Streaming.Source
Module with defintions for sources.
Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.
Sources are a great way to define decoupled producers that can be consumed with Stream.from
.
Sources are "single shot" and will haver their input exhausted by most operations. Consider buffering sources if you need to reuse their input.
type +'a t = 'a source
The type for sources that produce elements of type 'a
.
Implementing your own custom sources enables access to many useful operations. The most flexible way to create a source is with the Source.make
function.
The following example creates a source that counts down to zero:
let countdown n =
let init () = n in
let pull i =
if i = 0 then None
else Some (i, i - 1)) in
Source.make ~init ~pull
Alternatively, existing list
/array
/seq
/string
sources, or others listed below, can be used.
val empty : 'a t
zero
is an empty source.
val single : 'a -> 'a t
single a
is a source with a single element a
.
val list : 'a list -> 'a t
list items
is a source with all elements from the items
list.
val seq : 'a Stdlib.Seq.t -> 'a t
seq items
is a source with all elements from the items
sequence.
val array : 'a array -> 'a t
array items
is a source with all elements from the items
array.
val string : string -> char t
string str
is a source with all characters from the str
string.
val bytes : bytes -> char t
bytes b
is a source with all characters from the b
bytes.
val queue : 'a Stdlib.Queue.t -> 'a t
queue q
is a source with all characters from the q
queue.
val generate : len:int -> (int -> 'a) -> 'a t
generate ~len f
generates a source of length len
mapping each index to an element with f
.
val count : int -> int t
count n
is an infinite source with integers starting from n
.
val iterate : 'a -> ('a -> 'a) -> 'a t
iterate x f
is an infinite source where the first item is calculated by applying f
to x
, the second item by applying the function on the previous result and so on.
val unfold : 's -> ('s -> ('a * 's) option) -> 'a t
unfold seed next
is a finite source created from a seed
state and a function that produces elements and an updated state.
val make : init:(unit -> 's) -> pull:('s -> ('a * 's) option) -> ?stop:('s -> unit) ->
unit -> 'a t
make ~init ~pull ~stop ()
is a value source created from the init
, pull
and stop
. This function is similar to unfold
but without lazy state initialization and state termination functions.
Note: For better performance, it is recommended that the pull
function caches the termination condition in case it is expensive.
zip_with f src1 src2
is a source that pulls elements from src1
and src2
one by one, combining them with f
.
zip src1 src2
is a source of pairs with elements elements pulled from src1
and src2
one by one.
Equivalent to zip_with (fun x y -> (x, y)) src1 src2
.
Note: Instead of applying the transformation functions at the source, consider using Stream.from
or defining your compuation as a Flow
to make it reusable.
A source that includes only the elements that satisfy a predicate.
filter_map f source
applies f
to every element x
of source, discarding it if f x
produces None
, and keeping the transformed value otherwise.
Similar to take
but takes the last n
elements.
Take first elements from the source that satisfy a predicate and discard the rest.
Similar to drop
but drops the last n
elements.
Drpo first elements from the source that satisfy a predicate and keep the rest.
Many consumers are available in the Sink
module. You can consume any source using a sink with:
let source = Source.iota 10 in
source
|> Stream.from
|> Stream.into Sink.last
Alternatively use the source consumers below for simple operations.
val fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r
fold step init source
reduces the values of source
with the step
function, starting with init
.
If the step
function raises an exception, the source will be properly terminated.
val len : 'a t -> int
len src
is the count of elements in src
.
val each : ('a -> unit) -> 'a t -> unit
each f src
applies an effectful function f
to all elements in src
.
next src
is Some (x, rest)
where x
is the first element of src
and rest
is src
without x
; or None
, if src
is empty.
Warning: If rest
is produced, it is required to either consume it or manually dispose its resources. Not doing so might lead to resource leaks.
val dispose : 'a t -> unit
dispose source
forces the termination of the source state. This function is useful in situations when a leftover source is produced in Stream.run
.
Note: If the source is not already initialized, calling this function will first initialize its state before it is terminated.