Module Streaming.Source

Module with definitions for sources.

Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.

Sources are a great way to define decoupled producers that can be consumed with Stream.from.

Sources are "single shot" and will have their input exhausted by most operations. Consider buffering sources if you need to reuse their input.

type +'a t = 'a source

The type for sources that produce elements of type 'a.

Creating a source

Implementing your own custom sources enables access to many useful operations. The most flexible way to create a source is with the Source.make function.

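The following example creates a source that counts down from n to 1:

let countdown n =
  (* The state is the next number to emit; it is created lazily. *)
  let init () = n in
  let pull i =
    (* Returning None ends the source; a non-positive n yields nothing. *)
    if i <= 0 then None
    else Some (i, i - 1) in
  (* The trailing unit is required because ?stop is optional. *)
  Source.make ~init ~pull ()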

Alternatively, existing list/array/seq/string sources, or others listed below, can be used.

val empty : 'a t

empty is an empty source.

val single : 'a -> 'a t

single a is a source with a single element a.

val list : 'a list -> 'a t

list items is a source with all elements from the items list.

val seq : 'a Stdlib.Seq.t -> 'a t

seq items is a source with all elements from the items sequence.

val array : 'a array -> 'a t

array items is a source with all elements from the items array.

val string : string -> char t

string str is a source with all characters from the str string.

val bytes : bytes -> char t

bytes b is a source with all characters from the b bytes.

val queue : 'a Stdlib.Queue.t -> 'a t

queue q is a source with all elements from the q queue.

val generate : len:int -> (int -> 'a) -> 'a t

generate ~len f generates a source of length len mapping each index to an element with f.

val count : int -> int t

count n is an infinite source with integers starting from n.

val iterate : 'a -> ('a -> 'a) -> 'a t

iterate x f is an infinite source where the first item is calculated by applying f to x, the second item by applying the function on the previous result and so on.

val unfold : 's -> ('s -> ('a * 's) option) -> 'a t

unfold seed next is a source created from a seed state and a function next that produces an element together with an updated state. The source ends when next returns None.
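As an illustration of unfold, the sketch below builds a source of the decimal digits of an integer, least significant first. It assumes the streaming package is installed and exposes the Streaming module documented here. The names to_list and digits are hypothetical helpers, not part of the library; to_list is written with Source.fold from this module.

```ocaml
open Streaming

(* Hypothetical helper: collect a source into a list using Source.fold. *)
let to_list src =
  Source.fold (fun acc x -> x :: acc) [] src |> List.rev

(* Hypothetical example: the decimal digits of [n], least significant first.
   The state is the part of the number not yet emitted; returning [None]
   ends the source. *)
let digits n =
  Source.unfold n (fun k ->
    if k <= 0 then None
    else Some (k mod 10, k / 10))

let () =
  assert (to_list (digits 1234) = [4; 3; 2; 1]);
  assert (to_list (digits 0) = [])
```

The state type (here an int) does not appear in the resulting source type, so callers only see an int source.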

val make : init:(unit -> 's) -> pull:('s -> ('a * 's) option) -> ?stop:('s -> unit) -> unit -> 'a t

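make ~init ~pull ~stop () is a source created from the init, pull and stop functions. This function is similar to unfold, but with lazy state initialization and a state termination function: init is called only when a consumer requests elements, and stop is called with the current state when the source is terminated.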

Note: For better performance, it is recommended that the pull function caches the termination condition in case it is expensive.
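To make the roles of init, pull and stop concrete, here is a minimal sketch of a source that owns a resource: the lines of a text file. It assumes the streaming package is installed; lines_of_file is a hypothetical helper, and the file handling uses only the OCaml standard library.

```ocaml
open Streaming

(* Hypothetical helper: a source of the lines of the file at [path]. *)
let lines_of_file path =
  (* Runs only when a consumer first requests an element. *)
  let init () = open_in path in
  let pull ic =
    match input_line ic with
    | line -> Some (line, ic)
    | exception End_of_file -> None
  in
  (* Releases the channel once the source is terminated. *)
  let stop ic = close_in ic in
  Source.make ~init ~pull ~stop ()

let () =
  (* Write a small temporary file to read back. *)
  let path = Filename.temp_file "source_demo" ".txt" in
  let oc = open_out path in
  output_string oc "first\nsecond\n";
  close_out oc;
  let lines =
    Source.fold (fun acc line -> line :: acc) [] (lines_of_file path)
    |> List.rev
  in
  Sys.remove path;
  assert (lines = ["first"; "second"])
```

Because the file is opened in init rather than when lines_of_file is called, building the source and never consuming it does not open the file.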

Zipping sources

val zip_with : ('a -> 'b -> 'c) -> 'a t -> 'b t -> 'c t

zip_with f src1 src2 is a source that pulls elements from src1 and src2 one by one, combining them with f.

val zip : 'a t -> 'b t -> ('a * 'b) t

zip src1 src2 is a source of pairs with elements pulled from src1 and src2 one by one.

Equivalent to zip_with (fun x y -> (x, y)) src1 src2.
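A short usage sketch of both zipping functions, assuming the streaming package is installed. The inputs have equal lengths on purpose, and to_list is a hypothetical helper built on Source.fold.

```ocaml
open Streaming

(* Hypothetical helper: collect a source into a list using Source.fold. *)
let to_list src =
  Source.fold (fun acc x -> x :: acc) [] src |> List.rev

let () =
  (* Pair integers with characters, position by position. *)
  let pairs = Source.zip (Source.list [1; 2; 3]) (Source.string "abc") in
  assert (to_list pairs = [(1, 'a'); (2, 'b'); (3, 'c')]);
  (* Combine two integer sources with addition. *)
  let sums =
    Source.zip_with ( + ) (Source.list [1; 2; 3]) (Source.list [10; 20; 30])
  in
  assert (to_list sums = [11; 22; 33])
```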

Transforming a source

Note: Instead of applying the transformation functions at the source, consider using Stream.from or defining your computation as a Flow to make it reusable.

val map : ('a -> 'b) -> 'a t -> 'b t

A source with all elements transformed with a mapping function.

val filter : ('a -> bool) -> 'a t -> 'a t

A source that includes only the elements that satisfy a predicate.

val filter_map : ('a -> 'b option) -> 'a t -> 'b t

filter_map f source applies f to every element x of source, discarding it if f x produces None, and keeping the transformed value otherwise.

val take : int -> 'a t -> 'a t

Take first n elements from the source and discard the rest.

Similar to take but takes the last n elements.

val take_while : ('a -> bool) -> 'a t -> 'a t

Take first elements from the source that satisfy a predicate and discard the rest.

val drop : int -> 'a t -> 'a t

Drop first n elements from the source and keep the rest.

Similar to drop but drops the last n elements.

val drop_while : ('a -> bool) -> 'a t -> 'a t

Drop first elements from the source that satisfy a predicate and keep the rest.
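The transformations in this section compose with the pipe operator. The sketch below assumes the streaming package is installed and uses only functions documented in this module; to_list is a hypothetical helper built on Source.fold. Since elements are pulled on demand, take can bound the infinite source produced by count.

```ocaml
open Streaming

(* Hypothetical helper: collect a source into a list using Source.fold. *)
let to_list src =
  Source.fold (fun acc x -> x :: acc) [] src |> List.rev

let () =
  (* Squares of the first three even numbers, from an infinite source. *)
  let squares_of_evens =
    Source.count 1
    |> Source.filter (fun x -> x mod 2 = 0)
    |> Source.map (fun x -> x * x)
    |> Source.take 3
  in
  assert (to_list squares_of_evens = [4; 16; 36]);
  (* Keep only the strings that parse as integers. *)
  let parsed =
    Source.list ["1"; "x"; "3"] |> Source.filter_map int_of_string_opt
  in
  assert (to_list parsed = [1; 3]);
  (* take_while and drop_while split a source at the first failing element. *)
  let small x = x < 10 in
  assert (to_list (Source.take_while small (Source.list [1; 2; 10; 3])) = [1; 2]);
  assert (to_list (Source.drop_while small (Source.list [1; 2; 10; 3])) = [10; 3]);
  assert (to_list (Source.drop 2 (Source.list [1; 2; 3])) = [3])
```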

Consuming a source

Many consumers are available in the Sink module. You can consume any source using a sink with:

let source = Source.generate ~len:10 (fun i -> i) in
source
|> Stream.from
|> Stream.into Sink.last

Alternatively use the source consumers below for simple operations.

val fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r

fold step init source reduces the values of source with the step function, starting with init.

If the step function raises an exception, the source will be properly terminated.

val len : 'a t -> int

len src is the count of elements in src.

val each : ('a -> unit) -> 'a t -> unit

each f src applies an effectful function f to all elements in src.
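A brief sketch of the three consumers above, assuming the streaming package is installed. Because sources are single shot, the hypothetical numbers function builds a fresh source for every consumer.

```ocaml
open Streaming

(* Hypothetical helper: a fresh source for each consumer. *)
let numbers () = Source.list [1; 2; 3; 4]

let () =
  (* fold: sum the elements, starting from 0. *)
  assert (Source.fold ( + ) 0 (numbers ()) = 10);
  (* len: count the elements. *)
  assert (Source.len (numbers ()) = 4);
  (* each: run a side effect per element, in order. *)
  let seen = ref [] in
  Source.each (fun x -> seen := x :: !seen) (numbers ());
  assert (List.rev !seen = [1; 2; 3; 4])
```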

val next : 'a t -> ('a * 'a t) option

next src is Some (x, rest) where x is the first element of src and rest is src without x; or None, if src is empty.

Warning: If rest is produced, it is required to either consume it or manually dispose its resources. Not doing so might lead to resource leaks.
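The two ways of handling the leftover source can be sketched as follows, assuming the streaming package is installed. head_and_sum and head_only are hypothetical helpers: the first consumes rest to the end, the second disposes it explicitly with Source.dispose.

```ocaml
open Streaming

(* Hypothetical helper: split off the head, then sum the remainder.
   Folding [rest] consumes it fully, so nothing is left to dispose. *)
let head_and_sum src =
  match Source.next src with
  | None -> None
  | Some (first, rest) -> Some (first, Source.fold ( + ) 0 rest)

(* Hypothetical helper: keep only the head and dispose the leftover source. *)
let head_only src =
  match Source.next src with
  | None -> None
  | Some (first, rest) ->
    Source.dispose rest;
    Some first

let () =
  assert (head_and_sum (Source.list [1; 2; 3]) = Some (1, 5));
  assert (head_only (Source.list [1; 2; 3]) = Some 1);
  assert (head_only Source.empty = None)
```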

Resource handling

val dispose : 'a t -> unit

dispose source forces the termination of the source state. This function is useful in situations when a leftover source is produced in Stream.run.

Note: If the source is not already initialized, calling this function will first initialize its state before it is terminated.
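The behavior described in the note can be observed with a source whose init and stop functions record that they ran. This is a sketch assuming the streaming package is installed; the initialized and stopped flags exist only for this illustration.

```ocaml
open Streaming

(* Flags used only to observe when init and stop are called. *)
let initialized = ref false
let stopped = ref false

let src =
  Source.make
    ~init:(fun () -> initialized := true; 0)
    ~pull:(fun i -> Some (i, i + 1))
    ~stop:(fun _ -> stopped := true)
    ()

let () =
  (* Building the source runs neither init nor stop. *)
  assert (not !initialized && not !stopped);
  Source.dispose src;
  (* Following the note: the state is initialized first, then terminated. *)
  assert (!initialized && !stopped)
```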