`Streaming.Stream`

Module with definitions for streams.

Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.

Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) and in case of exceptions in the streaming pipeline.

Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source use `Stream.from`, to consume a stream with a sink use `Stream.into` and to transform stream elements with a flow use `Stream.via`. For more sophisticated pipelines that might have source leftovers, `run` can be used.

A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:

```
# Stream.stdin |> Stream.stdout;;
hello<Enter>
hello
world<Enter>
world
<Ctrl+d>
- : unit = ()
```

`type 'a t = 'a stream`

Type for streams with elements of type `'a`.

`val empty : 'a t`

Empty stream with no elements.

`val single : 'a -> 'a t`

`single a` is a stream with a single element `a`.

`val double : 'a -> 'a -> 'a t`

`double a b` is a stream with two elements: `a` and `b`.

`val triple : 'a -> 'a -> 'a -> 'a t`

`triple a b c` is a stream with elements: `a`, `b` and `c`.

`val count : int -> int t`

`count n` is an infinite stream with integers starting from `n`.
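
Because `count` never terminates on its own, it has to be bounded before it is collected. The following is a minimal sketch, assuming the `streaming` package is installed and its modules are brought into scope with `open Streaming`; `Flow.take` is the same flow that appears in the `via` example on this page.

```ocaml
open Streaming

(* Bound the infinite stream with a flow before collecting it. *)
let first_three =
  Stream.count 10
  |> Stream.via (Flow.take 3)
  |> Stream.to_list
(* Under the documented semantics this should be [10; 11; 12]. *)
```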

`val range : ?by:int -> int -> int -> int t`

`range ~by:step n m` is a sequence of integers starting from `n` to `m` (excluding `m`) incremented by `step`. The range is open on the right side.

`val iota : int -> int t`

`iota n` is `range ~by:1 0 n`, that is a range from `0` to `n` incremented by `1`.

`val (-<) : int -> int -> int t`

`n -< m` is `range n m`.
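
The relationship between `range`, `iota` and `-<` can be checked with a short sketch. It assumes `open Streaming` and relies only on the right-open behaviour described above; the variable names are illustrative.

```ocaml
open Streaming

(* Even numbers below 10: the upper bound is excluded. *)
let evens = Stream.range ~by:2 0 10 |> Stream.to_list

(* [iota 3] and [0 -< 3] should both describe the range [range 0 3]. *)
let from_iota = Stream.iota 3 |> Stream.to_list
let from_op = Stream.(0 -< 3) |> Stream.to_list
```

If the documented semantics hold, `evens` is `[0; 2; 4; 6; 8]` and both `from_iota` and `from_op` are `[0; 1; 2]`.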

`val (--) : int -> int -> int t`

`n -- m` is `range n (m - 1)`.

`val generate : len:int -> (int -> 'a) -> 'a t`

`generate ~len f` generates a stream of length `len`, mapping each index to an element with `f`.
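
As an illustration, `generate` can be used much like `List.init`. The sketch below assumes `open Streaming` and, as a further assumption not stated on this page, that indices start at `0`.

```ocaml
open Streaming

(* Map every index to its square. If indices run from 0 to len - 1
   (an assumption), the result would be [0; 1; 4; 9; 16]. *)
let squares = Stream.generate ~len:5 (fun i -> i * i) |> Stream.to_list

(* The length is fixed by ~len regardless of the index base. *)
let () = assert (List.length squares = 5)
```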

`val repeat : ?times:int -> 'a -> 'a t`

`repeat ~times:n x` produces a stream by repeating `x` `n` times. If `times` is omitted, `x` is repeated *ad infinitum*.

`val repeatedly : ?times:int -> (unit -> 'a) -> 'a t`

`repeatedly ~times:n f` produces a stream by repeatedly calling `f ()` `n` times. If `times` is omitted, `f` is called *ad infinitum*.
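
Unlike `repeat`, `repeatedly` calls the function again for every element, which matters when `f` is stateful. A small sketch, assuming `open Streaming`; `counter` and `next` are hypothetical names introduced for the example.

```ocaml
open Streaming

(* A stateful producer: each call returns the next counter value. *)
let counter = ref 0
let next () = incr counter; !counter

(* Call [next] three times and collect the results. *)
let ticks = Stream.repeatedly ~times:3 next |> Stream.to_list
```

Each element comes from a separate call to `next`, so `ticks` should be `[1; 2; 3]` rather than three copies of the same value.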

`val iterate : 'a -> ('a -> 'a) -> 'a t`

`iterate x f` is an infinite stream where the first item is calculated by applying `f` to `x`, the second item by applying the function on the previous result and so on.

`val unfold : 's -> ('s -> ('a * 's) option) -> 'a t`

`unfold seed next` is a stream created from a `seed` state and a function that produces elements and an updated state. The stream will terminate when `next` produces `None`.
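
The signature of `unfold` implies that `next` returns `Some (element, new_state)` to continue and `None` to stop. A minimal countdown sketch, assuming `open Streaming`:

```ocaml
open Streaming

(* The state is the remaining count; the stream stops once it reaches zero. *)
let countdown =
  Stream.unfold 3 (fun n -> if n = 0 then None else Some (n, n - 1))
  |> Stream.to_list
```

Here the seed `3` yields the elements `3`, `2` and `1`, after which `next` returns `None` and the stream ends.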

`val yield : 'a -> 'a t`

`yield x` is a stream with a single element `x`. Alias for `single`.

`val of_list : 'a list -> 'a t`

`of_list items` is a stream with all elements in the list `items`.

`val to_list : 'a t -> 'a list`

`to_list stream` converts `stream` into a list.

`val of_array : 'a array -> 'a t`

`of_array items` is a stream with all elements in the array `items`.

`val to_array : 'a t -> 'a array`

`to_array stream` converts `stream` into an array.

`val of_string : string -> char t`

`of_string string` is a stream with all characters in `string`.

`val to_string : char t -> string`

`to_string stream` converts `stream` of characters into a string.
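
The string conversions combine naturally with the other constructors. The sketch below assumes `open Streaming` and uses only functions documented above.

```ocaml
open Streaming

(* A string becomes a stream of its characters. *)
let chars = Stream.of_string "abc" |> Stream.to_list

(* Any stream of characters can be collected back into a string. *)
let joined = Stream.of_list ['o'; 'k'] |> Stream.to_string
let dashes = Stream.repeat ~times:3 '-' |> Stream.to_string
```

Under the documented behaviour, `chars` is `['a'; 'b'; 'c']`, `joined` is `"ok"` and `dashes` is `"---"`.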

`val of_iter : (('a -> unit) -> unit) -> 'a t`

`of_iter iter` is a stream created with elements generated by `iter`.

`iter` is an iterator that takes a consumer function that will receive the produced values. An example of such a function is `List.iter`.

Examples

```
# let iter k = List.iter k [1; 2; 3] in
Stream.fold (+) 0 (Stream.of_iter iter)
- : int = 6
```

A stream that includes only the elements that satisfy a predicate.

`filter_map f source` applies `f` to every element `x` of `source`, discarding it if `f x` produces `None`, and keeping the transformed value otherwise.
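
A common use of `filter_map` is parsing input while skipping malformed entries. This sketch assumes `open Streaming` and pairs `filter_map` with the standard library's `int_of_string_opt`.

```ocaml
open Streaming

(* Keep only the strings that parse as integers. *)
let numbers =
  ["1"; "two"; "3"]
  |> Stream.of_list
  |> Stream.filter_map int_of_string_opt
  |> Stream.to_list
```

The element `"two"` maps to `None` and is dropped, so `numbers` should be `[1; 3]`.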

Take first elements from the stream that satisfy a predicate and discard the rest.

Drop first elements from the stream that satisfy a predicate and keep the rest.

`concat stream1 stream2` is a stream that exhausts all elements from `stream1` and then all elements from `stream2`.

Examples

```
# let stream1 = Stream.of_list ['a'; 'b'; 'c'] in
let stream2 = Stream.of_list ['d'; 'e'; 'f'] in
Stream.to_list (Stream.concat stream1 stream2)
- : char list = ['a'; 'b'; 'c'; 'd'; 'e'; 'f']
```

`stream1 ++ stream2` is the infix operator version of `concat stream1 stream2`.

`flat_map f stream` is a stream concatenated from sub-streams produced by applying `f` to all elements of `stream`.

```
let duplicated =
  [1; 2; 3]
  |> Stream.of_list
  |> Stream.flat_map (fun x -> Stream.of_list [x; x])
  |> Stream.to_list in
assert (duplicated = [1; 1; 2; 2; 3; 3])
```

`cycle ~times:n stream` produces a stream by repeating all elements from `stream` `n` times. If `times` is omitted, `stream` is repeated *ad infinitum*.

`product outer inner` is a cartesian product of streams `outer` and `inner`.

Same as `product_with (fun x y -> (x, y))`.

```
let pairs =
  Stream.product (Stream.range 0 3) (Stream.of_string "ab")
  |> Stream.to_list in
assert (pairs = [(0, 'a'); (0, 'b'); (1, 'a'); (1, 'b'); (2, 'a'); (2, 'b')])
```

`product_with combine outer inner` is a cartesian product of streams `outer` and `inner` with pairs combined with `combine`.
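
To illustrate, `product_with` can build a value directly from each pair instead of a tuple. The sketch assumes `open Streaming` and the same element order as the `product` example, with `outer` varying slowest.

```ocaml
open Streaming

(* Combine every number with every letter into a label such as "0a". *)
let labels =
  Stream.product_with
    (fun n c -> Printf.sprintf "%d%c" n c)
    (Stream.range 0 2)
    (Stream.of_string "ab")
  |> Stream.to_list
```

Given that ordering, `labels` should be `["0a"; "0b"; "1a"; "1b"]`.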

`split ~by:predicate stream` splits `stream` when `predicate x` is `true` for some stream item `x`.

Split stream elements based on a binary predicate.

`split_between predicate ~into:sink stream` is a stream of values computed from groups of consecutive elements by continuously feeding `sink`. When the binary `predicate` is `true` for a pair of elements, the currently aggregated sink value is yielded and a new group is started.

If the input stream has fewer than 2 elements, `predicate` is never called and the sink is computed with 1 or no elements.

```
let sums_of_groups =
  [1; 2; 3; 101; 102; 103; 1001; 1002; 1003]
  |> Stream.of_list
  |> Stream.split_between
       (fun x y -> y - x > 10)
       ~into:Sink.sum in
assert (Stream.to_list sums_of_groups = [6; 306; 3006])
```

`through sink stream` repeatedly processes `stream` elements with `sink`, streaming computed results.

**Note:** The provided sink might consume the whole input if it never fills, or if the stream terminates before filling the sink.

```
let large_numbers =
  [1; 100; 2; 200; 3; 300]
  |> Stream.of_list
  |> Stream.through (Sink.find ~where:(fun x -> x > 10)) in
assert (Stream.to_list large_numbers = [Some 100; Some 200; Some 300])
```

Operations that traverse the stream computing a single result value.

If the stream is infinite and the consumer accumulates the elements, the processing will not terminate, potentially resulting in a memory leak.

`val len : 'a t -> int`

`len stream` counts the number of elements in `stream`.

Will exhaust the stream during processing.

Examples

```
# Stream.len (Stream.of_list ['a'; 'b'; 'c']);;
- : int = 3
```

`val each : ('a -> unit) -> 'a t -> unit`

`each f stream` applies an effectful function `f` to all elements of `stream`.
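
Since `each` returns `unit`, it is only useful for its side effects. The following sketch assumes `open Streaming` and collects the effects in a `Buffer` so that they can be inspected; printing with `print_endline` would work the same way.

```ocaml
open Streaming

(* Accumulate the upper-cased elements in a buffer as a visible side effect. *)
let buf = Buffer.create 16

let () =
  Stream.of_list ["a"; "b"; "c"]
  |> Stream.each (fun s -> Buffer.add_string buf (String.uppercase_ascii s))

let collected = Buffer.contents buf
```

After the stream has been consumed, `collected` should hold `"ABC"`.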

`val fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r`

`fold step init stream` reduces the values of `stream` with the `step` function, starting with `init`.

If the `step` function raises an exception, the stream will be properly terminated.
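
As the signature shows, the accumulator is the first argument of `step` and the element is the second. A short sketch, assuming `open Streaming`:

```ocaml
open Streaming

(* Sum the elements, starting from 0. *)
let total = Stream.fold (+) 0 (Stream.of_list [1; 2; 3; 4])

(* Elements arrive in order, so consing onto the accumulator reverses them. *)
let reversed =
  Stream.fold (fun acc x -> x :: acc) [] (Stream.of_list [1; 2; 3])
```

With sequential, in-order processing `total` is `10` and `reversed` is `[3; 2; 1]`.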

`val is_empty : 'a t -> bool`

`is_empty stream` is `true` if the stream has no elements and `false` otherwise. This operation consumes the first element of the stream.

`val first : 'a t -> 'a option`

Return the first element in the stream.

`val last : 'a t -> 'a option`

Return the last element in the stream, in linear time.
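
The option results of `first` and `last` distinguish an empty stream from a non-empty one. A minimal sketch, assuming `open Streaming`; `letters` is a hypothetical helper that builds a fresh stream for each consumer.

```ocaml
open Streaming

(* Build a fresh stream for every consumer. *)
let letters () = Stream.of_list ['a'; 'b'; 'c']

let first_letter = Stream.first (letters ())
let last_letter = Stream.last (letters ())

(* An empty stream has no first element. *)
let nothing : char option = Stream.first Stream.empty
let empty_is_empty = Stream.is_empty Stream.empty
```

Here `first_letter` should be `Some 'a'`, `last_letter` should be `Some 'c'`, and `nothing` should be `None`.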

`val drain : 'a t -> unit`

`val of_file : string -> string t`

`of_file path` is a stream of lines read from the file located at `path`.

The file is opened lazily only when the stream is consumed and will be closed even if the stream processing terminates with an exception.

`val to_file : string -> string t -> unit`

`to_file path stream` writes lines from `stream` into the file located at `path`.

`val stdin : string t`

The stream that reads lines from the standard input channel.

`val stdout : string t -> unit`

Writes lines from a stream to the standard output channel.

`val stderr : string t -> unit`

Writes lines from a stream to the standard error channel.

Integration adaptors for sources, sinks and flows.

`from source` is a stream created from a source.

Examples

```
# Stream.len (Stream.from (Source.list [0; 1; 2]))
- : int = 3
```

`into sink stream` is the result value produced by streaming all elements of `stream` into `sink`.

Examples

```
# Stream.into Sink.sum (Stream.of_list [0; 1; 2])
- : int = 3
```

`fill sink stream` is similar to `into` but, in addition to the result value produced by `sink`, will optionally return a leftover stream with elements that were not consumed by `sink`.

`via flow stream` is a stream produced by transforming all elements of `stream` via `flow`.

Examples

```
Stream.count 100
|> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
|> Stream.via (Flow.take 50)
|> Stream.into Sink.sum
```

Fuses sources, sinks and flows and produces a result and a leftover.

`let (r, leftover) = Stream.run ~from:source ~via:flow ~into:sink`

Streams elements from `source` into `sink` via a stream transformer `flow`. In addition to the result value `r` produced by `sink`, a `leftover` source is returned, if `source` was not exhausted.

**Warning:** If a leftover source is produced, it is required to either consume it or manually dispose of its resources. Not doing so might lead to resource leaks.

Examples

```
# let (x, leftover) =
let source = Source.list ["1"; "2"; "3"] in
let flow = Flow.map int_of_string in
Stream.run ~from:source ~via:flow ~into:Sink.first
val x : int option = Some 1
val leftover : string source option = Some <abstr>
# match leftover with
| Some source -> Source.dispose source
| None -> print_endline "No leftover"
- : unit = ()
```

Streams can be constructed with the let-binding syntax which is similar to list comprehensions. The following example demonstrates this feature:

```
open Stream.Syntax

let items =
  let* n = Stream.range 1 3 in
  let* c = Stream.of_list ['x'; 'y'] in
  yield (n, c)

let () =
  assert (Stream.to_list items = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')])
```

`module Syntax : sig ... end`

Module with syntax definitions for streams.