Module Streaming.Stream

Module with definitions for streams.

Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.
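As a rough illustration of what "push-based" means, the sketch below models a stream as a function that pushes each element into a consumer callback. This is the same shape that of_iter accepts. It is a simplifying assumption, not the library's actual representation, which also has to support early termination. The names `push`, `of_list_p`, `map_p`, `filter_p`, `fold_p` and `to_list_p` are hypothetical.

```ocaml
(* Simplified model (an assumption): a stream is a function that pushes
   each element, in order, to the consumer callback [k]. *)
type 'a push = ('a -> unit) -> unit

(* Producer: push every list element in order. *)
let of_list_p (xs : 'a list) : 'a push = fun k -> List.iter k xs

(* Transformers wrap the consumer instead of building intermediate lists. *)
let map_p (f : 'a -> 'b) (s : 'a push) : 'b push = fun k -> s (fun x -> k (f x))

let filter_p (p : 'a -> bool) (s : 'a push) : 'a push =
  fun k -> s (fun x -> if p x then k x)

(* Consumer: the producer drives the loop; the consumer only updates state. *)
let fold_p (step : 'r -> 'a -> 'r) (init : 'r) (s : 'a push) : 'r =
  let acc = ref init in
  s (fun x -> acc := step !acc x);
  !acc

let to_list_p (s : 'a push) : 'a list =
  List.rev (fold_p (fun acc x -> x :: acc) [] s)

(* Odd numbers from 1 to 5, multiplied by 10. *)
let result =
  of_list_p [1; 2; 3; 4; 5]
  |> filter_p (fun x -> x mod 2 = 1)
  |> map_p (fun x -> x * 10)
  |> to_list_p
```

In this model map_p and filter_p only wrap the consumer callback, so no intermediate collection is built between pipeline stages.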

Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) and in case of exceptions in the streaming pipeline.
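The early-termination guarantee can be sketched in the same simplified model, where a stream is assumed to be a function that pushes elements into a callback. Here a hypothetical with_resource acquires its resource only when the stream is consumed and releases it through Stdlib's Fun.protect. A hypothetical take_list stops early by raising a private exception from inside the consumer. This illustrates the idea only and is not the library's implementation.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Hypothetical: acquire the resource only when the stream is consumed and
   release it through Fun.protect, whether consumption returns or raises. *)
let with_resource ~(acquire : unit -> 'r) ~(release : 'r -> unit)
    (produce : 'r -> 'a push) : 'a push =
  fun k ->
    let r = acquire () in
    Fun.protect ~finally:(fun () -> release r) (fun () -> produce r k)

(* Hypothetical early termination: stop after [n] elements by raising a
   private exception from inside the consumer. *)
let take_list (n : int) (s : 'a push) : 'a list =
  let exception Stop in
  let acc = ref [] and count = ref 0 in
  (if n > 0 then
     try
       s (fun x ->
           acc := x :: !acc;
           incr count;
           if !count >= n then raise Stop)
     with Stop -> ());
  List.rev !acc

let opened = ref 0
let closed = ref 0

let numbers : int push =
  with_resource
    ~acquire:(fun () -> incr opened)
    ~release:(fun () -> incr closed)
    (fun () k -> List.iter k [1; 2; 3; 4; 5])

(* Only two of the five elements are consumed, yet the resource is released. *)
let first_two = take_list 2 numbers
```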

Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source use Stream.from, to consume a stream with a sink use Stream.into and to transform stream elements with a flow use Stream.via. For more sophisticated pipelines that might have source leftovers, run can be used.

A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:

# Stream.stdin |> Stream.stdout;;
- : unit = ()

type 'a t = 'a stream

Type for streams with elements of type 'a.

Creating a stream

val empty : 'a t

Empty stream with no elements.

val single : 'a -> 'a t

single a is a stream with a single element a.

val double : 'a -> 'a -> 'a t

double a b is a stream with two elements: a and b.

val triple : 'a -> 'a -> 'a -> 'a t

triple a b c is a stream with elements: a, b and c.

val count : int -> int t

count n is an infinite stream with integers starting from n.

val range : ?by:int -> int -> int -> int t

range ~by:step n m is a stream of integers starting from n and incremented by step, up to but excluding m; that is, the range is open on the right side.

val iota : int -> int t

iota n is range ~by:1 0 n, that is a range from 0 to n incremented by 1.
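Under the same simplified push model (an assumption, not the library's implementation), range and iota can be sketched as a loop that stops before reaching m. This sketch only accepts a positive ~by. How the library treats zero or negative steps is not covered here, and the helper to_list is hypothetical.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of range ~by:step n m: push n, n + step, ... while the value is
   below m, so m itself is never included. Assumption: step must be > 0. *)
let range ?(by = 1) (n : int) (m : int) : int push =
  if by <= 0 then invalid_arg "range: this sketch requires ~by > 0";
  fun k ->
    let i = ref n in
    while !i < m do
      k !i;
      i := !i + by
    done

(* iota n is range ~by:1 0 n. *)
let iota (n : int) : int push = range ~by:1 0 n

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

let evens = to_list (range ~by:2 0 7)
let first_four = to_list (iota 4)
```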

val (-<) : int -> int -> int t

n -< m is range n m.

val (--) : int -> int -> int t

n -- m is range n (m + 1), that is, the inclusive range from n to m.

val generate : len:int -> (int -> 'a) -> 'a t

generate ~len f generates a stream of length len by mapping each index to an element with f.

val repeat : ?times:int -> 'a -> 'a t

repeat ~times:n x produces a stream by repeating x n times. If times is omitted, x is repeated ad infinitum.

val repeatedly : ?times:int -> (unit -> 'a) -> 'a t

repeatedly ~times:n f produces a stream by repeatedly calling f () n times. If times is omitted, f is called ad infinitum.
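A possible sketch of generate and repeat in the simplified push model (an assumption, not the library's code). It assumes that generate counts indices from 0, like Array.init. The infinite case of repeat is an unbounded loop that only the consumer can stop, for example by raising an exception.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of generate ~len f: pushes f 0, f 1, ..., f (len - 1). *)
let generate ~(len : int) (f : int -> 'a) : 'a push =
  fun k -> for i = 0 to len - 1 do k (f i) done

(* Sketch of repeat ?times x: finite when ~times is given; otherwise an
   unbounded loop that only the consumer can stop, e.g. by raising. *)
let repeat ?times (x : 'a) : 'a push =
  fun k ->
    match times with
    | Some n -> for _i = 1 to n do k x done
    | None -> while true do k x done

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

let squares = to_list (generate ~len:4 (fun i -> i * i))
let zs = to_list (repeat ~times:3 'z')
```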

val iterate : 'a -> ('a -> 'a) -> 'a t

iterate x f is an infinite stream where the first item is calculated by applying f to x, the second item by applying f to the previous result, and so on.

val unfold : 's -> ('s -> ('a * 's) option) -> 'a t

unfold seed next is a stream created from a seed state and a function that produces elements and an updated state. The stream will terminate when next produces None.
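unfold can be sketched in the simplified push model (an assumption, not the library's implementation) as a loop that threads the state through next and pushes every produced element. The Fibonacci usage is an illustrative example and is not taken from the library.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of unfold seed next: thread the state through [next], pushing each
   produced element, and stop as soon as [next] returns None. *)
let unfold (seed : 's) (next : 's -> ('a * 's) option) : 'a push =
  fun k ->
    let rec loop state =
      match next state with
      | None -> ()
      | Some (x, state') -> k x; loop state'
    in
    loop seed

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

(* Illustrative: Fibonacci numbers up to 20; the state holds the next two. *)
let fib =
  unfold (0, 1) (fun (a, b) -> if a > 20 then None else Some (a, (b, a + b)))
```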

val yield : 'a -> 'a t

yield x is a stream with a single element x. Alias for single.

Stream converters

val of_list : 'a list -> 'a t

of_list items is a stream with all elements in the list items.

val to_list : 'a t -> 'a list

to_list stream converts stream into a list.

val of_array : 'a array -> 'a t

of_array items is a stream with all elements in the array items.

val to_array : 'a t -> 'a array

to_array stream converts stream into an array.

val of_string : string -> char t

of_string string is a stream with all characters in string.

val to_string : char t -> string

to_string stream converts stream of characters into a string.

val of_iter : (('a -> unit) -> unit) -> 'a t

of_iter iter is a stream created with elements generated by iter.

iter is an iterator that takes a consumer function, which will receive the produced values. For example, fun k -> List.iter k items is such an iterator.


# let iter k = List.iter k [1; 2; 3] in
  Stream.fold (+) 0 (Stream.of_iter iter)
- : int = 6

Transforming a stream

val map : ('a -> 'b) -> 'a t -> 'b t

A stream with all elements transformed with a mapping function.

val tap : ('a -> unit) -> 'a t -> 'a t

Pass each element through an effectful function.

val filter : ('a -> bool) -> 'a t -> 'a t

A stream that includes only the elements that satisfy a predicate.

val filter_map : ('a -> 'b option) -> 'a t -> 'b t

filter_map f source applies f to every element x of source, discarding it if f x produces None, and keeping the transformed value otherwise.

val take : int -> 'a t -> 'a t

Take first n elements from the stream and discard the rest.

val take_while : ('a -> bool) -> 'a t -> 'a t

Take first elements from the stream that satisfy a predicate and discard the rest.

val drop_while : ('a -> bool) -> 'a t -> 'a t

Drop first elements from the stream that satisfy a predicate and keep the rest.

val drop : int -> 'a t -> 'a t

Drop first n elements from the stream and keep the rest.
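What distinguishes drop_while from filter is that drop_while stops testing once the first element fails the predicate. A sketch in the simplified push model, assuming a stream is a function that pushes elements into a callback; the helper names are hypothetical.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of drop_while: skip elements while [p] holds; from the first element
   that fails [p] onwards, keep everything without consulting [p] again. *)
let drop_while (p : 'a -> bool) (s : 'a push) : 'a push =
  fun k ->
    let dropping = ref true in
    s (fun x ->
        if !dropping && p x then ()
        else begin
          dropping := false;
          k x
        end)

(* For contrast, filter tests every element independently. *)
let filter (p : 'a -> bool) (s : 'a push) : 'a push =
  fun k -> s (fun x -> if p x then k x)

let of_list xs : 'a push = fun k -> List.iter k xs

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

let dropped = to_list (drop_while (fun x -> x < 3) (of_list [1; 2; 5; 1; 2]))
let filtered = to_list (filter (fun x -> x >= 3) (of_list [1; 2; 5; 1; 2]))
```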

val rest : 'a t -> 'a t

Drops the first element of the stream.

val indexed : 'a t -> (int * 'a) t

Adds an index to each element in the stream.

Combining streams

val concat : 'a t -> 'a t -> 'a t

concat stream1 stream2 is a stream that exhausts all elements from stream1 and then all elements from stream2.


# let stream1 = Stream.of_list ['a'; 'b'; 'c'] in
  let stream2 = Stream.of_list ['d'; 'e'; 'f'] in
  Stream.to_list (Stream.concat stream1 stream2)
- : char list = ['a'; 'b'; 'c'; 'd'; 'e'; 'f']

val (++) : 'a t -> 'a t -> 'a t

stream1 ++ stream2 is the infix operator version of concat stream1 stream2.

val append : 'a -> 'a t -> 'a t

append x stream adds the element x to the end of stream.

val prepend : 'a -> 'a t -> 'a t

prepend x stream adds the element x to the beginning of stream.

val flatten : 'a t t -> 'a t

Concatenates a stream of streams.

val flat_map : ('a -> 'b t) -> 'a t -> 'b t

flat_map f stream is a stream concatenated from sub-streams produced by applying f to all elements of stream.

let duplicated =
  [1; 2; 3]
  |> Stream.of_list
  |> Stream.flat_map (fun x -> Stream.of_list [x; x])
  |> Stream.to_list in
assert (duplicated = [1; 1; 2; 2; 3; 3])

val cycle : ?times:int -> 'a t -> 'a t

cycle ~times:n stream produces a stream by repeating all elements from stream n times. If times is omitted, stream is repeated ad infinitum.

val interpose : 'a -> 'a t -> 'a t

Inserts a separator element between each stream element.
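A sketch of interpose in the simplified push model (an assumption; the names are hypothetical). A mutable flag records whether the first element has been seen, so the separator is pushed only between elements and never at either end.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of interpose: push [sep] before every element except the first,
   so separators never appear at either end. *)
let interpose (sep : 'a) (s : 'a push) : 'a push =
  fun k ->
    let first = ref true in
    s (fun x ->
        if !first then first := false else k sep;
        k x)

let of_list xs : 'a push = fun k -> List.iter k xs

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

let csv = String.concat "" (to_list (interpose "," (of_list ["a"; "b"; "c"])))
```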

val product : 'a t -> 'b t -> ('a * 'b) t

product outer inner is a cartesian product of streams outer and inner.

Same as product_with (fun x y -> (x, y)).

let pairs =
  Stream.product (Stream.range 0 3) (Stream.of_string "ab")
  |> Stream.to_list in
assert (pairs = [(0, 'a'); (0, 'b'); (1, 'a'); (1, 'b'); (2, 'a'); (2, 'b')])

val product_with : ('a -> 'b -> 'c) -> 'a t -> 'b t -> 'c t

product_with combine outer inner is a cartesian product of streams outer and inner with pairs combined with combine.

Grouping and splitting

val partition : int -> 'a t -> 'a t t

partition n partitions the stream into sub-streams of size n.
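A sketch of partition in the simplified push model, under two stated assumptions. First, sub-streams are materialised as lists here for simplicity. Second, when the stream length is not a multiple of n, a final shorter group is still emitted. This section does not specify the library's behaviour for that trailing group.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of partition n: buffer up to [n] elements and push each full group;
   a shorter trailing group is pushed at the end (an assumption). *)
let partition (n : int) (s : 'a push) : 'a list push =
  if n <= 0 then invalid_arg "partition: n must be positive";
  fun k ->
    let buf = ref [] and size = ref 0 in
    s (fun x ->
        buf := x :: !buf;
        incr size;
        if !size = n then begin
          k (List.rev !buf);
          buf := [];
          size := 0
        end);
    if !size > 0 then k (List.rev !buf)

let of_list xs : 'a push = fun k -> List.iter k xs

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

let groups = to_list (partition 3 (of_list [1; 2; 3; 4; 5; 6; 7]))
```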

val split : by:('a -> bool) -> 'a t -> 'a t t

split ~by:predicate stream splits stream when predicate x is true for some stream item x.

val split_between : ('a -> 'a -> bool) -> into:('a, 'r) sink -> 'a t -> 'r t

Split stream elements based on a binary predicate.

split_between predicate ~into:sink stream is a stream of values computed from groups of consecutive elements by continuously feeding sink. When the binary predicate is true for a pair of elements, the currently aggregated sink value is yielded and a new group is started.

If the input stream has fewer than two elements, predicate is never called and the sink is computed from a single element or from no elements.

let sums_of_groups =
  [1; 2; 3; 101; 102; 103; 1001; 1002; 1003]
  |> Stream.of_list
  |> Stream.split_between
      (fun x y -> y - x > 10)
      ~into:Sink.sum in
assert (Stream.to_list sums_of_groups = [6; 306; 3006])

val through : ('a, 'r) sink -> 'a t -> 'r t

through sink stream repeatedly processes the elements of stream with sink, streaming each computed result.

Note: The provided sink might consume the whole input if it never fills, or if the stream terminates before filling the sink.

let large_numbers =
  [1; 100; 2; 200; 3; 300]
  |> Stream.of_list
  |> Stream.through (Sink.find ~where:(fun x -> x > 10)) in
assert (Stream.to_list large_numbers = [Some 100; Some 200; Some 300])


Operations that traverse the stream, computing a single result value.

If the stream is infinite and the consumer accumulates the elements, the processing will not terminate, potentially resulting in a memory leak.

val len : 'a t -> int

len stream counts the number of elements in stream.

Will exhaust the stream during processing.


# Stream.len (Stream.of_list ['a'; 'b'; 'c']);;
- : int = 3

val each : ('a -> unit) -> 'a t -> unit

each f stream applies an effectful function f to all elements of stream.

val fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r

fold step init stream reduces the values of stream with the step function, starting with init.

If the step function raises an exception, the stream will be properly terminated.
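The guarantee that the stream is terminated when step raises can be sketched as follows, again in a simplified push model (an assumption, not the library's code). A hypothetical from_resource releases its resource through Fun.protect. The release therefore runs even when fold's step function raises, and the original exception still reaches the caller.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Hypothetical resource-backed stream: [release] runs through Fun.protect
   whether consumption finishes normally or with an exception. *)
let from_resource ~(acquire : unit -> 'r) ~(release : 'r -> unit)
    (elements : 'r -> 'a list) : 'a push =
  fun k ->
    let r = acquire () in
    Fun.protect ~finally:(fun () -> release r)
      (fun () -> List.iter k (elements r))

let fold (step : 'acc -> 'a -> 'acc) (init : 'acc) (s : 'a push) : 'acc =
  let acc = ref init in
  s (fun x -> acc := step !acc x);
  !acc

let released = ref false

let numbers =
  from_resource ~acquire:(fun () -> ()) ~release:(fun () -> released := true)
    (fun () -> [1; 2; 3])

(* The step function fails on the second element. *)
let outcome =
  match fold (fun acc x -> if x = 2 then failwith "boom" else acc + x) 0 numbers with
  | sum -> Ok sum
  | exception Failure msg -> Error msg
```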

val is_empty : 'a t -> bool

is_empty stream is true if stream has no elements and false otherwise. This operation consumes the first element of the stream.

val first : 'a t -> 'a option

Return the first element in the stream.
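The reason is_empty and first consume only the first element can be sketched in the simplified push model (an assumption, not the library's code). Both escape the producer's loop with a local exception as soon as one element arrives. The counter in the usage is hypothetical and only records how many elements were produced.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* first escapes the producer's loop as soon as one element arrives. *)
let first (s : 'a push) : 'a option =
  let exception Found in
  let result = ref None in
  (try s (fun x -> result := Some x; raise Found) with Found -> ());
  !result

(* is_empty only needs to know whether a first element exists. *)
let is_empty (s : 'a push) : bool = Option.is_none (first s)

(* Hypothetical instrumented stream that counts the elements it produced. *)
let produced = ref 0

let counted : int push =
  fun k -> List.iter (fun x -> incr produced; k x) [10; 20; 30]

let head = first counted
```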

val last : 'a t -> 'a option

Return the last element in the stream, in linear time.

val drain : 'a t -> unit

IO streams

val of_file : string -> string t

of_file path is a stream of lines read from the file located at path.

The file is opened lazily only when the stream is consumed and will be closed even if the stream processing terminates with an exception.

val to_file : string -> string t -> unit

to_file path stream writes lines from stream into the file located at path.
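A sketch of of_file and to_file using only Stdlib channels, in the simplified push model (an assumption, not the library's implementation). The file is opened inside the stream function, so nothing happens until the stream is consumed. Fun.protect closes the channel even if the consumer raises. input_line strips the trailing newline of each line.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* Sketch of of_file: the channel is opened only when the stream is consumed
   and closed by Fun.protect even if the consumer raises. *)
let of_file (path : string) : string push =
  fun k ->
    let ic = open_in path in
    Fun.protect ~finally:(fun () -> close_in ic) (fun () ->
        let rec loop () =
          match input_line ic with
          | line -> k line; loop ()
          | exception End_of_file -> ()
        in
        loop ())

(* Sketch of to_file: write each line followed by a newline. *)
let to_file (path : string) (s : string push) : unit =
  let oc = open_out path in
  Fun.protect ~finally:(fun () -> close_out oc) (fun () ->
      s (fun line -> output_string oc line; output_char oc '\n'))

(* Round trip through a temporary file. *)
let path = Filename.temp_file "stream_sketch" ".txt"
let () = to_file path (fun k -> List.iter k ["alpha"; "beta"])

let lines =
  let acc = ref [] in
  of_file path (fun line -> acc := line :: !acc);
  List.rev !acc

let () = Sys.remove path
```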

val stdin : string t

The stream that reads lines from the standard input channel.

val stdout : string t -> unit

stdout stream writes the lines of stream to the standard output channel.

val stderr : string t -> unit

stderr stream writes the lines of stream to the standard error channel.


Integration adaptors for sources, sinks and flows.

val from : 'a source -> 'a t

from source is a stream created from a source.


# Stream.len (Stream.from (Source.list [0; 1; 2]))
- : int = 3

val into : ('a, 'b) sink -> 'a t -> 'b

into sink stream is the result value produced by streaming all elements of stream into sink.


# Stream.into Sink.sum (Stream.of_list [0; 1; 2])
- : int = 3

fill sink stream is similar to into but, in addition to the result value produced by sink, will optionally return a leftover stream with elements that were not consumed by sink.

val via : ('a, 'b) flow -> 'a stream -> 'b stream

via flow stream is the stream produced by transforming all elements of stream via flow.


Stream.count 100
|> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
|> Stream.via (Flow.take 50)
|> Stream.into Sink.sum

val run : from:'a source -> via:('a, 'b) flow -> into:('b, 'r) sink -> 'r * 'a source option

Fuses sources, sinks and flows and produces a result and a leftover.

let (r, leftover) = Stream.run ~from:source ~via:flow ~into:sink

Streams elements from source into sink via a stream transformer flow. In addition to the result value r produced by sink, a leftover source is returned, if source was not exhausted.

Warning: If a leftover source is produced, you must either consume it or manually dispose of its resources. Not doing so might lead to resource leaks.


# let (x, leftover) =
    let source = Source.list ["1"; "2"; "3"] in
    let flow = Flow.map int_of_string in
    Stream.run ~from:source ~via:flow ~into:Sink.first
val x : int option = Some 1
val leftover : string source option = Some <abstr>
# match leftover with
  | Some source -> Source.dispose source
  | None -> print_endline "No leftover"
- : unit = ()
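To illustrate how a leftover arises, here is a sketch of run under loudly simplified assumptions. The source is Stdlib's pull-based Seq.t, the flow is a plain mapping function, and the sink is a hypothetical step_sink record with a full predicate. None of these are the library's types. The sketch only shows why a sink that stops early, like Sink.first in the example above, leaves an unread remainder that the caller must handle.

```ocaml
(* Assumptions: a source is Stdlib's pull-based Seq.t, a flow is a plain
   mapping function, and a sink is this hypothetical step record. *)
type ('a, 'r) step_sink = {
  init : 'r;
  push : 'r -> 'a -> 'r;
  full : 'r -> bool; (* true once the sink needs no more input *)
}

(* Sketch of run: pull, transform, feed. If the sink fills first, the
   unread rest of the source is returned (it may itself be empty). *)
let run ~(from : 'a Seq.t) ~(via : 'a -> 'b) ~(into : ('b, 'r) step_sink)
    : 'r * 'a Seq.t option =
  let rec loop acc source =
    if into.full acc then (acc, Some source)
    else
      match source () with
      | Seq.Nil -> (acc, None)
      | Seq.Cons (x, rest) -> loop (into.push acc (via x)) rest
  in
  loop into.init from

(* A sink that keeps only the first element, mimicking Sink.first. *)
let first_sink = { init = None; push = (fun _ x -> Some x); full = Option.is_some }

let (x, leftover) =
  run ~from:(List.to_seq ["1"; "2"; "3"]) ~via:int_of_string ~into:first_sink
```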

Syntax definitions

Streams can be constructed with the let-binding syntax which is similar to list comprehensions. The following example demonstrates this feature:

open Stream.Syntax

let items =
  let* n = Stream.range 1 3 in
  let* c = Stream.of_list ['x'; 'y'] in
  yield (n, c) in
assert (Stream.to_list items = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')])

module Syntax : sig ... end

Module with syntax definitions for streams.
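The let* syntax behaves like flat_map with its arguments flipped: each element bound by let* runs the remainder of the comprehension once. A sketch in the simplified push model reproduces the list-comprehension example from the syntax section. The model is an assumption, the helpers range and of_list are hypothetical stand-ins, and the real Syntax module may be defined differently.

```ocaml
(* Simplified model (an assumption): a stream pushes each element to [k]. *)
type 'a push = ('a -> unit) -> unit

(* flat_map: for every element x of [s], push all elements of [f x]. *)
let flat_map (f : 'a -> 'b push) (s : 'a push) : 'b push =
  fun k -> s (fun x -> f x k)

(* let* is flat_map with the arguments flipped; yield is a one-element stream. *)
let ( let* ) s f = flat_map f s
let yield x : 'a push = fun k -> k x

(* Hypothetical helpers mirroring Stream.range and Stream.of_list. *)
let range n m : int push = fun k -> for i = n to m - 1 do k i done
let of_list xs : 'a push = fun k -> List.iter k xs

let to_list (s : 'a push) : 'a list =
  let acc = ref [] in
  s (fun x -> acc := x :: !acc);
  List.rev !acc

let items =
  let* n = range 1 3 in
  let* c = of_list ['x'; 'y'] in
  yield (n, c)
```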