Module Streaming

Streaming abstractions that combine, transform and reduce large amounts of sequential data efficiently, in constant space and without leaking resources.

Sources

Sources are decoupled producers of values.

Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.

Sources are a great way to define decoupled producers that can be consumed with Stream.from. To learn more about how to create sources see "Creating a source".

The following example creates a source that counts down to zero:

let countdown n =
  let init () = n in
  let pull i =
    if i = 0 then None
    else Some (i, i - 1) in
  Source.make ~init ~pull ()

It can be consumed with:

# Stream.(from (countdown 3) |> into Sink.sum)
- : int = 6

type +'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

Type for sources that produce elements of type 'a.
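To make the init/pull/stop protocol concrete, here is a minimal sketch of a consumer that drives a source by hand. It assumes only the type definition above, copied locally so the code compiles on its own; `to_list` is a hypothetical helper, not part of the library's API. `Fun.protect` from the standard library makes sure `stop` receives the latest state both when the source runs out and when an exception escapes.

```ocaml
(* Local copy of the source type above, so this sketch compiles on its own. *)
type +'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

(* Hypothetical helper (not a library function): drain a source into a list.
   [init] runs only when [to_list] is called, and [stop] receives the latest
   state whether the loop ends normally or an exception escapes. *)
let to_list (source : 'a source) : 'a list =
  match source with
  | Source src ->
    let state0 = src.init () in
    (* Track the most recent state so [stop] disposes of the right one. *)
    let last = ref state0 in
    let rec loop acc s =
      match src.pull s with
      | None -> List.rev acc
      | Some (x, s') ->
        last := s';
        loop (x :: acc) s'
    in
    Fun.protect ~finally:(fun () -> src.stop !last) (fun () -> loop [] state0)

(* The countdown example, written directly against the constructor. *)
let countdown n =
  Source {
    init = (fun () -> n);
    pull = (fun i -> if i = 0 then None else Some (i, i - 1));
    stop = (fun _ -> ());
  }
```

With these definitions, `to_list (countdown 3)` evaluates to `[3; 2; 1]`. Because `init` is only called inside `to_list`, the state is created when (and if) elements are requested.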

module Source : sig ... end

Module with definitions for sources.

Sinks

Sinks are decoupled consumers of values.

Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can acquire resources that are guaranteed to be released once the sink is done being filled.

Sinks are a great way to define decoupled consumers that can be filled with Stream.into. To learn more about how to create sinks see "Creating a sink".

The following example demonstrates a sink that consumes all elements into a list:

let list_sink =
  let init () = [] in
  let push acc x = x :: acc in
  let stop acc = List.rev acc in
  Sink.make ~init ~push ~stop ()

It can be used with:

# Stream.(iota 5 |> into list_sink)
- : int list = [0; 1; 2; 3; 4]

type ('a, 'b) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'b;
    } -> ('a, 'b) sink

Type for sinks that consume elements of type 'a and, once done, produce a value of type 'b.
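The role of `full` is easiest to see with a driver that respects it. The sketch below copies the sink type above and uses a hypothetical `fill_list` function (not a library function) that pushes list elements into a sink until the input is exhausted or `full` returns true, then calls `stop` to extract the result. The `sum` and `take` sinks are likewise illustrative.

```ocaml
(* Local copy of the sink type above. *)
type ('a, 'b) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'b;
    } -> ('a, 'b) sink

(* Hypothetical driver: push list items into a sink, stopping early as soon
   as [full] reports true, then extract the result with [stop]. *)
let fill_list (sink : ('a, 'b) sink) (items : 'a list) : 'b =
  match sink with
  | Sink k ->
    let rec loop acc = function
      | [] -> acc
      | _ when k.full acc -> acc
      | x :: rest -> loop (k.push acc x) rest
    in
    k.stop (loop (k.init ()) items)

(* A sink that sums its input and is never full. *)
let sum = Sink {
  init = (fun () -> 0);
  push = ( + );
  full = (fun _ -> false);
  stop = (fun acc -> acc);
}

(* A sink that keeps the first [n] elements; [full] enables early exit. *)
let take n = Sink {
  init = (fun () -> (0, []));
  push = (fun (count, acc) x -> (count + 1, x :: acc));
  full = (fun (count, _) -> count >= n);
  stop = (fun (_, acc) -> List.rev acc);
}
```

For example, `fill_list (take 2) [10; 20; 30]` returns `[10; 20]` without pushing the third element.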

module Sink : sig ... end

Module with definitions for sinks.

Flows

Flows are decoupled transformers of values.

Flows define streaming transformations, filtering or grouping operations that are fully disconnected from input and output. Their implementation intercepts an internal folding function and modifies the input one value at a time.

Flows are a great way to define decoupled transformations. A flow can be applied to a stream with Stream.via:

# Stream.range 10 100
  |> Stream.via (Flow.map (fun x -> x + 1))
  |> Stream.into Sink.sum
- : int = 4995

Flows can also be composed to form a pipeline:

# let flow = Flow.(map (fun x -> x + 1) >> filter (fun x -> x mod 2 = 0)) in
  Stream.range 10 100
  |> Stream.via flow
  |> Stream.into Sink.sum
- : int = 2475

type ('a, 'b) flow = {
  flow : 'r. ('b, 'r) sink -> ('a, 'r) sink;
}

Stream transformers that consume values of type 'a and produce values of type 'b.
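Because a flow is a function from a downstream sink to an upstream sink, a transformation can be written by wrapping the downstream sink's push function (the internal folding function mentioned above). The following sketch assumes only the sink and flow types shown here; `map`, `filter`, `fill_list` and `sum` are hypothetical stand-ins written for illustration, not the library's implementations.

```ocaml
type ('a, 'b) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'b;
    } -> ('a, 'b) sink

type ('a, 'b) flow = { flow : 'r. ('b, 'r) sink -> ('a, 'r) sink }

(* Hypothetical [map]: wrap the downstream sink so each input is
   transformed by [f] before it is pushed. *)
let map f =
  { flow = fun sink ->
      match sink with
      | Sink k ->
        Sink { init = k.init; push = (fun s x -> k.push s (f x));
               full = k.full; stop = k.stop } }

(* Hypothetical [filter]: only push inputs that satisfy [pred]. *)
let filter pred =
  { flow = fun sink ->
      match sink with
      | Sink k ->
        Sink { init = k.init;
               push = (fun s x -> if pred x then k.push s x else s);
               full = k.full; stop = k.stop } }

(* Minimal driver and sink, as in the sink sketch. *)
let fill_list sink items =
  match sink with
  | Sink k ->
    let rec loop acc = function
      | [] -> acc
      | _ when k.full acc -> acc
      | x :: rest -> loop (k.push acc x) rest
    in
    k.stop (loop (k.init ()) items)

let sum = Sink { init = (fun () -> 0); push = ( + );
                 full = (fun _ -> false); stop = (fun acc -> acc) }

(* Nesting applies the outer flow first: increment, then keep evens. *)
let result =
  fill_list
    ((map (fun x -> x + 1)).flow ((filter (fun x -> x mod 2 = 0)).flow sum))
    [1; 2; 3; 4]
```

Here `result` is `6`: `map` turns `[1; 2; 3; 4]` into `[2; 3; 4; 5]`, `filter` keeps `2` and `4`, and `sum` adds them.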

module Flow : sig ... end

Module with definitions for flows.

Streams

Streams combine sources, sinks and flows into a flexible streaming toolkit.

Streams are a purely functional abstraction for incremental, push-based, sequential processing of elements. They can be easily and efficiently transformed and concatenated.

Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) or in case of exceptions in the streaming pipeline.

Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source use Stream.from, to consume a stream with a sink use Stream.into and to transform stream elements with a flow use Stream.via. For more sophisticated pipelines that might have source leftovers, Stream.run can be used.

A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:

# Stream.stdin |> Stream.stdout;;
hello<Enter>
hello
world<Enter>
world
<Ctrl+d>
- : unit = ()

type 'a stream = {
  stream : 'r. ('a, 'r) sink -> 'r;
}

Type for streams with elements of type 'a.
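Since a stream is a function that receives a sink and returns the sink's result, the stream itself owns the loop that moves elements into the sink, which is what lets it guarantee cleanup. The sketch below assumes only the three types shown in this section, copied locally; `from`, `into`, `countdown` and `sum` are hypothetical re-implementations for illustration and may differ from the library's actual code.

```ocaml
type +'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

type ('a, 'b) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'b;
    } -> ('a, 'b) sink

type 'a stream = { stream : 'r. ('a, 'r) sink -> 'r }

(* Hypothetical [from]: running the stream initializes the sink, and only
   initializes the source if the sink still wants input. The source is
   stopped on every exit path: exhaustion, a full sink, or an exception. *)
let from (source : 'a source) : 'a stream =
  match source with
  | Source src ->
    { stream = fun sink ->
        match sink with
        | Sink k ->
          let acc0 = k.init () in
          if k.full acc0 then k.stop acc0
          else begin
            let s0 = src.init () in
            let last = ref s0 in
            let rec loop acc s =
              if k.full acc then acc
              else
                match src.pull s with
                | None -> acc
                | Some (x, s') ->
                  last := s';
                  loop (k.push acc x) s'
            in
            let acc =
              Fun.protect ~finally:(fun () -> src.stop !last)
                (fun () -> loop acc0 s0)
            in
            k.stop acc
          end }

(* Hypothetical [into]: consuming a stream is just applying it to a sink. *)
let into sink stream = stream.stream sink

let countdown n =
  Source { init = (fun () -> n);
           pull = (fun i -> if i = 0 then None else Some (i, i - 1));
           stop = (fun _ -> ()) }

let sum = Sink { init = (fun () -> 0); push = ( + );
                 full = (fun _ -> false); stop = (fun acc -> acc) }
```

With these definitions, `into sum (from (countdown 3))` evaluates to `6`, mirroring the countdown example earlier. If the sink reports `full` before any element is requested, the source's `init` is never called.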

module Stream : sig ... end

Module with definitions for streams.