Streaming

Streaming abstractions that combine, transform and reduce large amounts of sequential data efficiently, in constant space and without leaking resources.

Overview

Streaming uses composable stream producers (sources), consumers (sinks) and transformers (flows). The central model that abstracts over them is a Stream.

The following features are provided:

Modules

Stream: Push-based streams with excellent overall performance, safe and lazy resource management and a multitude of operations. Streams can be built with comprehensions.
Source: Pull-based producers of values with good performance, safe and lazy resource management and zipping operations.
Sink: Consumers of values with excellent performance, safe and lazy resource management and flexible composition operations. Sinks are like first-class folds with early termination!
Flow: Transformers of values that form composable streaming pipelines. Excellent for defining source- and sink-independent value transformations.

For more information on each module consult the entrypoint module Streaming.
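The idea that flows are independent of any particular source or sink can be illustrated with a small model. In it, a sink is a fold with early termination, and a flow is a function from sinks to sinks. This sketch uses only the standard library. The types `sink` and `flow` and the helpers `map`, `filter`, `( >> )`, `to_list`, `sum` and `run_list` are hypothetical simplifications, not Streaming's actual definitions.

```ocaml
(* Hypothetical, simplified model (not Streaming's definitions): a sink is a
   fold over a hidden state 's, with a fullness check for early termination
   and a final step producing the result. *)
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    } -> ('a, 'r) sink

(* A flow turns any sink of 'b into a sink of 'a. It never sees the source
   or the result type 'r, which is what makes it reusable. *)
type ('a, 'b) flow = { flow : 'r. ('b, 'r) sink -> ('a, 'r) sink }

let map f =
  { flow = (fun sink ->
      match sink with
      | Sink { init; push; full; stop } ->
        Sink { init; push = (fun s x -> push s (f x)); full; stop }) }

let filter p =
  { flow = (fun sink ->
      match sink with
      | Sink { init; push; full; stop } ->
        Sink { init; push = (fun s x -> if p x then push s x else s); full; stop }) }

(* Flow composition: values pass through f first, then through g. *)
let ( >> ) f g = { flow = (fun sink -> f.flow (g.flow sink)) }

(* Two unrelated sinks. *)
let to_list =
  Sink { init = (fun () -> []); push = (fun acc x -> x :: acc);
         full = (fun _ -> false); stop = List.rev }

let sum =
  Sink { init = (fun () -> 0); push = ( + );
         full = (fun _ -> false); stop = (fun s -> s) }

(* Feed a list into a sink, stopping early once the sink reports it is full. *)
let run_list sink xs =
  match sink with
  | Sink { init; push; full; stop } ->
    let rec loop s = function
      | x :: rest when not (full s) -> loop (push s x) rest
      | _ -> s
    in
    stop (loop (init ()) xs)

let evens_squared = filter (fun x -> x mod 2 = 0) >> map (fun x -> x * x)
```

Because `evens_squared` only rewrites the sink it is given, the same flow can feed a list-building sink and a summing sink without any change.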

Quickstart

Install using opam

The library can be installed with opam: opam install streaming. You can run opam info streaming to make sure that the library is installed.

Install using esy

Add "@opam/streaming": "0.1" to dependencies in your package.json file and install the dependencies with esy install. Run esy ls-modules to make sure that the library is installed for your project.

Use with dune

To start using Streaming in your dune project, add it to the libraries field in your dune file:

(executable
  (public_name myexe)
  (libraries streaming))

Open the entrypoint module in your code to start using Streams:

open Streaming

Use in the top-level

Fire up utop or down and run:

# #require "streaming";;
# open Streaming;;
# Stream.(stdin |> stdout);;
We're streaming!<Enter>
We're streaming!
<Ctrl+d>
- : unit = ()

That's it! Scroll down to see some examples or jump into the API documentation.

Examples

Read lines from STDIN

# Stream.stdin
  |> Stream.filter ((<>) "")
  |> Stream.map (fun line -> "You wrote: " ^ line)
  |> Stream.each print_endline;;

Using stream notation

# let items =
    let open Stream.Syntax in
    let* n = Stream.range 1 3 in
    let* c = Stream.of_list ['x'; 'y'] in
    yield (n, c);;

# Stream.to_list items;;
- : (int * char) list = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')]
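The `let*` notation behaves like a nested loop. Each element of the outer stream is bound in turn, and the rest of the comprehension runs once for it. As an illustration that uses the standard library's `Seq` instead of Streaming, the same comprehension can be written by defining `let*` as `Seq.flat_map` and `yield` as `Seq.return`. The helper `range` is a hypothetical stand-in for `Stream.range`. Like `Stream.range` in the output above, it excludes its upper bound.

```ocaml
(* Comprehension notation over the standard library's Seq, used only to
   illustrate how let* sequences nested generators. *)
let ( let* ) s f = Seq.flat_map f s
let yield x = Seq.return x

(* Hypothetical stand-in for Stream.range: integers from n (inclusive)
   to m (exclusive). *)
let rec range n m () = if n >= m then Seq.Nil else Seq.Cons (n, range (n + 1) m)

let items =
  let* n = range 1 3 in
  let* c = List.to_seq ['x'; 'y'] in
  yield (n, c)

let () =
  List.of_seq items
  |> List.iter (fun (n, c) -> Printf.printf "(%d, %c)\n" n c)
```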

Using sink notation

Compute the arithmetic mean in a single iteration of the input.

# let mean =
    let open Sink.Syntax in
    let+ total = Sink.sum
    and+ count = Sink.len in
    total / count;;

# Stream.(iota 20 |> into mean);;
- : int = 9
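The `let+ ... and+ ...` notation combines two sinks so that both consume the same input in a single pass. Below is a stdlib-only sketch of that idea. The `fold` type and the `map`/`both` combinators are hypothetical illustrations of an applicative fold, not Streaming's implementation, and early termination is omitted for brevity. As in the example above, integer division truncates the mean.

```ocaml
(* Hypothetical model of a sink as a fold over a hidden state 's
   (early termination omitted). Not Streaming's actual Sink type. *)
type ('a, 'r) fold =
  | Fold : { init : 's; step : 's -> 'a -> 's; finish : 's -> 'r } -> ('a, 'r) fold

(* Transform the final result of a fold. *)
let map f = function
  | Fold { init; step; finish } -> Fold { init; step; finish = (fun s -> f (finish s)) }

(* Run two folds side by side over the same input by pairing their states. *)
let both l r =
  match l, r with
  | Fold { init = i1; step = s1; finish = f1 },
    Fold { init = i2; step = s2; finish = f2 } ->
    Fold {
      init = (i1, i2);
      step = (fun (a, b) x -> (s1 a x, s2 b x));
      finish = (fun (a, b) -> (f1 a, f2 b));
    }

let ( let+ ) fold f = map f fold
let ( and+ ) = both

let sum = Fold { init = 0; step = ( + ); finish = (fun s -> s) }
let len = Fold { init = 0; step = (fun n _ -> n + 1); finish = (fun n -> n) }

(* Integer mean in one pass; raises Division_by_zero on empty input. *)
let mean =
  let+ total = sum
  and+ count = len in
  total / count

(* Consume a list with a fold: one traversal feeds both paired states. *)
let run fold xs =
  match fold with
  | Fold { init; step; finish } -> finish (List.fold_left step init xs)

(* prints 9 *)
let () = Printf.printf "%d\n" (run mean (List.init 20 (fun i -> i)))
```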

Resource handling

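Input and output resources involved in stream processing will have the following two properties:

Laziness: resources are initialized only when they are actually needed.
Prompt termination: resources are terminated as soon as they are no longer needed, even if the pipeline raises an exception.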

The following examples demonstrate these two properties in practice.

Examples

(* Create a source that must not be initialized. *)
# let bomb () =
    Source.make
      ~init:(fun () -> failwith "Boom!")
      ~pull:(fun () -> None)
      ();;
val bomb : unit -> 'a source = <fun>

(* Feed it into a stream that terminates early. *)
# bomb ()
  |> Stream.from
  |> Stream.take 0
  |> Stream.to_list;;
- : 'a list = []

And... nothing! As you can see, our "bomb" was never detonated. This is because in streams, the sink (in our case, to_list combined with take) is checked for "fullness" before the source is initialized.
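The check-before-initialize order described above can be sketched in a few lines. Below, `run` is a hypothetical driver, not Streaming's code. It initializes the sink first and checks whether the sink is already full. Only after that does it initialize the source. With a sink that accepts zero elements, the source's `init` therefore never runs. The `source`/`sink` types and `take_list` are simplified assumptions modelled on the `init`/`pull`/`stop` functions passed to `Source.make` and `Sink.make` above.

```ocaml
(* Hypothetical pull-based source: init allocates the state, pull yields the
   next element and the new state, stop releases the state. *)
type 'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

(* Hypothetical sink with an explicit fullness check. *)
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    } -> ('a, 'r) sink

(* Collects at most n elements into a list (like take followed by to_list). *)
let take_list n =
  Sink {
    init = (fun () -> (0, []));
    push = (fun (k, acc) x -> (k + 1, x :: acc));
    full = (fun (k, _) -> k >= n);
    stop = (fun (_, acc) -> List.rev acc);
  }

(* The sink is initialized and checked for fullness before the source is
   touched, so a sink that is already full never initializes the source.
   Exception safety is left out to keep the sketch short. *)
let run source sink =
  match source, sink with
  | Source { init = src_init; pull; stop = src_stop },
    Sink { init = snk_init; push; full; stop = snk_stop } ->
    let acc = snk_init () in
    if full acc then snk_stop acc
    else begin
      let rec loop st acc =
        if full acc then (st, acc)
        else
          match pull st with
          | None -> (st, acc)
          | Some (x, st') -> loop st' (push acc x)
      in
      let st, acc = loop (src_init ()) acc in
      src_stop st;
      snk_stop acc
    end

(* A source that must not be initialized. *)
let bomb () =
  Source {
    init = (fun () -> failwith "Boom!");
    pull = (fun () -> None);
    stop = (fun () -> ());
  }

let naturals () =
  Source { init = (fun () -> 0); pull = (fun n -> Some (n, n + 1)); stop = (fun _ -> ()) }

let () =
  assert (run (bomb ()) (take_list 0) = []);
  print_endline "The bomb was never detonated."
```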

In the next example let's look at how streams behave in the presence of exceptions.

# let i'm_a_source_i_must_not_leak () =
    Source.make
      ~init:(fun () -> `Dangerous_input)
      ~pull:(fun st -> Some ("always blue", st))
      ~stop:(fun `Dangerous_input ->
          print_endline "Stopping input... Phew!")
      ();;
val i'm_a_source_i_must_not_leak : unit -> string source = <fun>

# let i'm_a_sink_i_must_not_leak () =
    Sink.make
      ~init:(fun () -> `Dangerous_output)
      ~push:(fun `Dangerous_output x -> `Dangerous_output)
      ~stop:(fun `Dangerous_output ->
          print_endline "Stopping output... That was close!")
      ();;
val i'm_a_sink_i_must_not_leak : unit -> ('a, unit) sink = <fun>

# i'm_a_source_i_must_not_leak ()
  |> Stream.from
  |> Stream.map (fun x -> failwith "Boom!")
  |> Stream.into (i'm_a_sink_i_must_not_leak ());;
Stopping input... Phew!
Stopping output... That was close!
Exception: Failure "Boom!".

Our termination functions ran just before the world exploded!
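The guarantee shown above can be modelled with the standard library's `Fun.protect`, which runs its `~finally` callback whether the wrapped function returns normally or raises. This is a simplified sketch under that assumption, not Streaming's implementation. The hypothetical `with_resource` helper brackets a single resource. Nesting it, with the output outside and the input inside, reproduces the order of the messages in the example.

```ocaml
(* Acquire a resource, use it, and always release it, whether [use]
   returns normally or raises. *)
let with_resource ~init ~stop use =
  let r = init () in
  Fun.protect ~finally:(fun () -> stop r) (fun () -> use r)

(* Hypothetical pipeline mirroring the example above: the output is
   acquired first and the input second, so the input is released first. *)
let pipeline log =
  with_resource
    ~init:(fun () -> `Dangerous_output)
    ~stop:(fun `Dangerous_output -> log "Stopping output... That was close!")
    (fun `Dangerous_output ->
      with_resource
        ~init:(fun () -> `Dangerous_input)
        ~stop:(fun `Dangerous_input -> log "Stopping input... Phew!")
        (fun `Dangerous_input -> failwith "Boom!"))

let () =
  match pipeline print_endline with
  | () -> ()
  | exception Failure msg -> Printf.printf "Exception: Failure %S.\n" msg
```

Running it prints the same three lines as the toplevel session above.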

Limitations

The termination functions in streaming are always guaranteed to be called. What is currently not well specified is the state they will be called with. For example, a source can stream lines from multiple files, seamlessly opening and closing them as the input is read. Under normal termination, streaming calls all termination functions with the correct states.

The same is not true when exceptions are raised. Currently, when an exception occurs, streaming calls the termination function with the first instance of the state, even though the state might have changed since then.

This is not a difficult problem to solve, but a correct implementation has a high performance cost.

In the future, streaming might expose safer stream management functions to help with these situations. For now, it is recommended that sources and sinks implement their stop functions so that they can close all allocated resources when given only the first state. This can be achieved by aggregating the intermediate states, or by using refs that let the initial state point to the latest state.
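The ref-based workaround recommended above might look like this in a minimal, stdlib-only sketch. The hypothetical source streams lines from several simulated files, and the `opened` list stands in for real file handles. Every state the source produces shares one `handle` ref. That ref is already present in the first state, so `stop` can close whichever file is currently open even when it only receives the initial state. The `consume_then_fail` helper simulates that situation. All names here are illustrative assumptions.

```ocaml
(* Simulated file handles: we only track which names are currently open. *)
let opened : string list ref = ref []

let open_file name =
  opened := name :: !opened;
  name

let close_file name = opened := List.filter (fun n -> n <> name) !opened

(* Every state produced by pull shares the same [handle] ref, so the
   initial state always points at the currently open file. *)
type state = {
  remaining : (string * string list) list;  (* files not opened yet *)
  lines : string list;                      (* unread lines of the open file *)
  handle : string option ref;               (* currently open file, if any *)
}

let init files () = { remaining = files; lines = []; handle = ref None }

let close st =
  match !(st.handle) with
  | Some h -> close_file h; st.handle := None
  | None -> ()

(* Streams lines from each file in turn, closing one before opening the next. *)
let rec pull st =
  match st.lines with
  | line :: rest -> Some (line, { st with lines = rest })
  | [] ->
    close st;
    (match st.remaining with
     | [] -> None
     | (name, contents) :: others ->
       st.handle := Some (open_file name);
       pull { st with remaining = others; lines = contents })

(* Needs only the shared ref, so the first state is enough. *)
let stop st = close st

(* Mimics the limitation described above: on an exception, stop receives
   the initial state rather than the latest one. *)
let consume_then_fail files n =
  let st0 = init files () in
  let rec loop st k =
    if k = n then failwith "Boom!"
    else match pull st with
      | None -> ()
      | Some (_, st') -> loop st' (k + 1)
  in
  Fun.protect ~finally:(fun () -> stop st0) (fun () -> loop st0 0)

let () =
  let files = [ ("a.txt", ["1"; "2"]); ("b.txt", ["3"; "4"]) ] in
  (try consume_then_fail files 3 with Failure _ -> ());
  Printf.printf "files left open: %d\n" (List.length !opened)
```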

Questions

When should I use streaming?

streaming is a general-purpose streaming library whose abstractions are meant to be used as drop-in replacements for concrete sequential data structures such as lists. It is always a good idea to use a streaming model if you need sequential access to data. For very small collections that do not need to be processed multiple times, using lists is adequate. In all other situations, streams are significantly better in terms of performance and composition.

Streaming abstractions are an excellent choice for stateful producers and consumers that require precise resource management. Consuming elements from a file or a database handle with streams is significantly safer. All models in streaming are lazy (they only initialize resources when needed) and support prompt termination (resources are terminated as soon as they are no longer needed). This is guaranteed even when streaming pipelines raise exceptions.

Finally, streaming encourages the implementation of small, decoupled sources, sinks and flows that can be reused in a wide spectrum of situations.

How fast are Streams compared to other streaming models?

In short: fast (see for yourself). The Streaming.Stream module was designed to improve on the most efficient iteration model currently available for OCaml, the so-called "internal iterator" with the type ('a -> unit) -> unit. This type corresponds to the "iter" functions commonly found in the standard library (such as List.iter) once the collection argument has been supplied.
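An internal iterator of type `('a -> unit) -> unit` is a function that receives a consumer callback and calls it once per element. A partially applied `List.iter` has exactly this shape. The stdlib-only sketch below shows this basic model. The names `iter`, `of_list`, `map`, `filter` and `fold` are illustrative, not Streaming's API.

```ocaml
(* An internal iterator: the producer drives the loop and pushes every
   element into the callback it is given. *)
type 'a iter = ('a -> unit) -> unit

(* List.iter with the list already supplied has this shape. *)
let of_list (xs : 'a list) : 'a iter = fun k -> List.iter k xs

(* Combinators wrap the callback instead of building intermediate lists. *)
let map f (it : 'a iter) : 'b iter = fun k -> it (fun x -> k (f x))
let filter p (it : 'a iter) : 'a iter = fun k -> it (fun x -> if p x then k x)

(* Consuming a plain internal iterator needs mutable state; stopping early
   would also need an exception, since the producer controls the loop. *)
let fold f init (it : 'a iter) =
  let acc = ref init in
  it (fun x -> acc := f !acc x);
  !acc

let () =
  of_list [1; 2; 3; 4]
  |> filter (fun x -> x mod 2 = 0)
  |> map (fun x -> x * 10)
  |> fold ( + ) 0
  |> Printf.printf "%d\n"
```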

Streams are a variant of internal iterators that add support for resource safety and early termination, while avoiding the need for mutation and exceptions in the combinators. At the same time, they keep a similar performance profile.
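A push-based stream can terminate early without exceptions or mutation if the producer threads the consumer's accumulator through its loop and consults a `full` predicate before each element. The sketch below is a hypothetical stdlib-only model of that idea, not Streaming's internal definition. The record type `t` and the functions `of_list`, `naturals`, `map` and `take_list` are illustrative assumptions, and resource handling is left out.

```ocaml
(* A hypothetical push-based producer that threads the consumer's
   accumulator through the loop and asks [full] before every element. *)
type 'a t = { fold : 'r. full:('r -> bool) -> ('r -> 'a -> 'r) -> 'r -> 'r }

let of_list xs =
  { fold = (fun ~full step init ->
      let rec go acc = function
        | x :: rest when not (full acc) -> go (step acc x) rest
        | _ -> acc
      in
      go init xs) }

(* Infinite producer: safe to consume because the consumer can stop it. *)
let naturals =
  { fold = (fun ~full step init ->
      let rec go acc n = if full acc then acc else go (step acc n) (n + 1) in
      go init 0) }

let map f s =
  { fold = (fun ~full step init -> s.fold ~full (fun acc x -> step acc (f x)) init) }

(* take + to_list as a consumer: the loop simply returns once n elements
   have been collected, with no exception and no mutable reference. *)
let take_list n s =
  let _, acc =
    s.fold ~full:(fun (k, _) -> k >= n) (fun (k, acc) x -> (k + 1, x :: acc)) (0, [])
  in
  List.rev acc

let () =
  take_list 5 (map (fun x -> x * x) naturals)
  |> List.iter (Printf.printf "%d ");
  print_newline ()
```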

What's the difference between Sources and Streams?

Both sources and streams produce values. The main difference is flow control: with sources, consumers of the elements are in charge of control; while with streams, it's the producers who drive the computation. This means that sources should be used for situations where the elements are requested on demand, while streams are best suited for "reactive" inputs.

In general, streams offer better performance than sources for the most common operations (including concatenation) and offer integration with sinks and flows. On the other hand, sources are easier to create and support zipping.
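With a pull-based source, the consumer decides when to request the next element, so two sources can be advanced in lockstep. This is what makes zipping straightforward. A push-based stream drives its own loop and cannot easily be paused halfway. Below is a stdlib-only sketch in which `source`, `of_list`, `zip` and `to_list` are hypothetical names modelled on the `init`/`pull` shape used with `Source.make` above. Termination functions are omitted for brevity.

```ocaml
(* A pull-based source: an initial state and a step that returns the next
   element with the new state, or None when exhausted. *)
type 'a source =
  | Source : { init : unit -> 's; pull : 's -> ('a * 's) option } -> 'a source

let of_list xs =
  Source {
    init = (fun () -> xs);
    pull = (function [] -> None | x :: rest -> Some (x, rest));
  }

(* The consumer pulls from both sources in lockstep and stops at the
   shorter one. *)
let zip a b =
  match a, b with
  | Source { init = init_a; pull = pull_a }, Source { init = init_b; pull = pull_b } ->
    Source {
      init = (fun () -> (init_a (), init_b ()));
      pull = (fun (sa, sb) ->
        match pull_a sa with
        | None -> None
        | Some (x, sa') ->
          (match pull_b sb with
           | None -> None
           | Some (y, sb') -> Some ((x, y), (sa', sb'))));
    }

let to_list src =
  match src with
  | Source { init; pull } ->
    let rec go st acc =
      match pull st with
      | None -> List.rev acc
      | Some (x, st') -> go st' (x :: acc)
    in
    go (init ()) []

let () =
  to_list (zip (of_list [1; 2; 3]) (of_list ["a"; "b"]))
  |> List.iter (fun (n, s) -> Printf.printf "(%d, %s)\n" n s)
```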

It is recommended to use sources to define decoupled producers that can be consumed with streams. Any source can become a stream, but the opposite is not so easy.
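The asymmetry can be seen in a small sketch. Turning a pull-based source into a push-based producer is a simple loop that keeps pulling and pushing. Going the other way would generally require suspending the producer in the middle of its loop (for example with effects or threads) or buffering its output. The code below is a hypothetical stdlib-only model: `source`, `stream_of_source` and the internal-iterator representation of a stream are assumptions for illustration. Unlike Streaming, it does not support early termination.

```ocaml
type 'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

(* A stream modelled as an internal iterator. *)
type 'a stream = ('a -> unit) -> unit

(* The stream drives the loop: pull until exhausted, push each element to
   the consumer, and always release the latest source state. *)
let stream_of_source (src : 'a source) : 'a stream = fun k ->
  match src with
  | Source { init; pull; stop } ->
    let st = init () in
    let last = ref st in
    let rec loop st =
      last := st;
      match pull st with
      | None -> ()
      | Some (x, st') -> k x; loop st'
    in
    Fun.protect ~finally:(fun () -> stop !last) (fun () -> loop st)

let countdown n =
  Source {
    init = (fun () -> n);
    pull = (fun i -> if i <= 0 then None else Some (i, i - 1));
    stop = (fun _ -> print_endline "source stopped");
  }

let () = stream_of_source (countdown 3) (fun i -> Printf.printf "%d\n" i)
```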

Why are all streaming types in this library abstract?

Even though the core types in streaming are very simple, stable and have many nice properties, it is possible that the types might change in the future to support new functionality such as backpressure and concurrency. To avoid breaking changes, and until the library reaches the 1.0 milestone, the types are going to be abstract.

If you have a use-case that requires access to the internal type, please open an issue.

How can the net amount of entropy of the universe be massively decreased?

You should ask AC.
let answer () =
  let x = "84726982693273833265833289698432737883857070736773697" ^
          "8843268658465327079823265327769657873787170857632657883876982
  What if there was no space and time?" in
  let open Format in let (%^) = printf in
  let open Streaming in let open Stream in
  let (%)=fun f g x->f (g x) and
  (|/-)=(%^)"\027[1m\027[41m";"\\|/-"in let (//) x = (|/-)
    |> of_string
    |> interpose '\b' |> append '\b'
    |> cycle ~times:(Random.int 10)
    |> append x
    |> each (fun c -> "%c%!" %^ (Unix.sleepf 0.003; c)) in
  from @@ (let pull i =
      if i >= 114 then None
      else Some (String.sub x i 2, i + 2)
    in Source.make ~init:(fun () -> 0) ~pull ())
  |> via (Flow.map (char_of_int%int_of_string))
  |> into (Sink.each (//)); (%^) "\027[0m"

Troubleshooting

Acknowledgements

This library is based on ideas found in other libraries and research projects such as: Haskell's Pipes and Foldl libraries, Scala's ZIO Streams, Clojure's Transducers and the Iteratees streaming model by Oleg Kiselyov.