Module Streaming.Flow

Module with definitions for flows.

Flows define streaming transformation, filtering or groupping operations that are fully disconnected from input and output. Their implementation intercepts an internal folding function and modifies the input one value at a time.

Flows are a great way to define decoupled transformations that can be used with Stream.via.

A flow can be applied to a stream with Stream.via:

# Stream.range 10 100
  |> Stream.via (Flow.map (fun x -> x + 1))
  |> Stream.into Sink.sum
- : int = 4995

Flows can also be composed to form a pipeline:

# let flow = Flow.(map (fun x -> x + 1) >> filter (fun x -> x mod 2 = 0)) in
  Stream.range 10 100
  |> Stream.via flow
  |> Stream.into Sink.sum
- : int = 2475
type ('a, 'b) t = ('a'b) flow

Transforming a flow

val filter : ('a -> bool) -> ('a'a) t

A flow that includes only the elements that satisfy a predicate.

val map : ('a -> 'b) -> ('a'b) t

A flow with all elements transformed with a mapping function.

val tap : ('a -> unit) -> ('a'a) t

A flow with all elements passed through an effectful function.

val take : int -> ('a'a) t

Take first n elements from the source and discard the rest.

Buffering flow elements

val buffer : int -> ('a'a array) t

Collects n elements into an array buffer. Once the buffer is full it is emmited as a stream item.

Flowing into a sink

val through : ('a'r) sink -> ('a'r) t

through sink repeatedly processes incoming elements with sink producing computed results.

Note: The provided sink might consume the whole input if it is infinite, or if the input terminates before filling the sink.

Composing flows

val identity : ('a'a) t

A neutral flow that does not change the elements.

val compose : ('a'b) t -> ('b'c) t -> ('a'c) t

Compose two flows to form a new flow.

val (<<) : ('a'b) t -> ('b'c) t -> ('a'c) t

Composes two flows from right to left.

More precisely f1 << f2 is compose f1 f2.

val (>>) : ('b'c) t -> ('a'b) t -> ('a'c) t

Composes two flows from left to right.

More precisely f1 >> f2 is compose f2 f1.