module Lwt_stream : sig..end
type 'a t
A stream holding values of type 'a.
Naming convention: in this module, all functions applying a function to each element of a stream are suffixed by:
- _s when the function returns a thread and calls are serialised
- _p when the function returns a thread and calls are parallelised
from f creates a stream from the given input function. f is called each time more input is needed, and the stream ends when f returns None.
If f, or the thread produced by f, raises an exception, that exception is forwarded to the consumer of the stream (for example, a caller of Lwt_stream.get). Note that this does not end the stream. A subsequent attempt to read from the stream will cause another call to f, which may succeed with a value.
val from_direct : (unit -> 'a option) -> 'a t
from_direct f does the same as Lwt_stream.from but with a function that does not return a thread. It is preferred that this function be used rather than wrapping f into a function which returns a thread.
The behavior when f raises an exception is the same as for Lwt_stream.from, except that f does not produce a thread.
Exception raised by the push function of a push-stream when pushing an element after the end of stream (= None) has been pushed.
val create : unit -> 'a t * ('a option -> unit)
create () returns a new stream and a push function.
To notify the stream's consumer of errors, either use a separate communication channel, or use a result stream. There is no way to push an exception into a push-stream.
val create_with_reference :
unit -> 'a t * ('a option -> unit) * ('b -> unit)
create_with_reference () returns a new stream and a push function. The last function allows a reference to be set to an external source. This prevents the external source from being garbage collected.
For example, to convert a reactive event to a stream:
let stream, push, set_ref = Lwt_stream.create_with_reference () in set_ref (map_event push event)
Exception raised by the push function of a bounded push-stream when the stream queue is full and a thread is already waiting to push an element.
class type ['a]bounded_push = object..end
Type of sources for bounded push-streams.
val create_bounded : int -> 'a t * 'a bounded_push
create_bounded size returns a new stream and a bounded push source. The stream can hold a maximum of size elements. When this limit is reached, pushing a new element will block until one is consumed.
Note that you cannot clone or parse (with Lwt_stream.parse) a bounded stream. These functions will raise Invalid_argument if you try to do so.
It raises Invalid_argument if size < 0.
val of_seq : 'a Stdlib.Seq.t -> 'a t
of_seq s creates a stream returning all elements of s. The elements are
evaluated from s and pushed onto the stream as the stream is consumed.
val of_list : 'a list -> 'a t
of_list l creates a stream returning all elements of l. The elements are pushed into the stream immediately, resulting in a closed stream (in the sense of Lwt_stream.is_closed).
val of_array : 'a array -> 'a t
of_array a creates a stream returning all elements of a. The elements are pushed into the stream immediately, resulting in a closed stream (in the sense of Lwt_stream.is_closed).
val of_string : string -> char t
of_string str creates a stream returning all characters of str. The characters are pushed into the stream immediately, resulting in a closed stream (in the sense of Lwt_stream.is_closed).
clone st clone the given stream. Operations on each stream will not affect the other.
# let st1 = Lwt_stream.of_list [1; 2; 3];; val st1 : int Lwt_stream.t = <abstr> # let st2 = Lwt_stream.clone st1;; val st2 : int Lwt_stream.t = <abstr> # lwt x = Lwt_stream.next st1;; val x : int = 1 # lwt y = Lwt_stream.next st2;; val y : int = 1
It raises Invalid_argument if st is a bounded push-stream.
Returns the list of elements of the given stream
Returns the word composed of all characters of the given stream
Data retrieval ¶
Exception raised when trying to retrieve data from an empty stream.
peek st returns the first element of the stream, if any, without removing it.
npeek n st returns at most the first n elements of st, without removing them.
get st removes and returns the first element of the stream, if any.
nget n st removes and returns at most the first n elements of st.
get_while f st returns the longest prefix of st where all elements satisfy f.
next st removes and returns the next element of the stream or fails with Lwt_stream.Empty, if the stream is empty.
last_new st returns the last element that can be obtained without sleeping, or wait for one if none is available.
It fails with Lwt_stream.Empty if the stream has no more elements.
junk st removes the first element of st.
njunk n st removes at most the first n elements of the stream.
junk_while f st removes all elements at the beginning of the streams which satisfy f.
junk_old st removes all elements that are ready to be read without yielding from st.
For example, the read_password function of Lwt_read_line uses it to flush keys previously typed by the user.
val get_available : 'a t -> 'a list
get_available st returns all available elements of l without blocking.
val get_available_up_to : int -> 'a t -> 'a list
get_available_up_to n st returns up to n elements of l without blocking.
is_empty st returns whether the given stream is empty.
val is_closed : 'a t -> bool
is_closed st returns whether the given stream has been closed. A closed
stream is not necessarily empty. It may still contain unread elements. If
is_closed s = true, then all subsequent reads until the end of the
stream are guaranteed not to block.
closed st returns a thread that will sleep until the stream has been
val on_termination : 'a t -> (unit -> unit) -> unit
val on_terminate : 'a t -> (unit -> unit) -> unit
Stream transversal ¶
Note: all the following functions are destructive.
# let st1 = Lwt_stream.of_list [1; 2; 3];; val st1 : int Lwt_stream.t = <abstr> # let st2 = Lwt_stream.map string_of_int st1;; val st2 : string Lwt_stream.t = <abstr> # lwt x = Lwt_stream.next st1;; val x : int = 1 # lwt y = Lwt_stream.next st2;; val y : string = "2"
choose l creates an stream from a list of streams. The resulting stream will return elements returned by any stream of l in an unspecified order.
map f st maps the value returned by st with f
filter f st keeps only values, x, such that f x is true
filter_map f st filter and map st at the same time
map_list f st applies f on each element of st and flattens the lists returned
fold f s x fold_like function for streams.
iter f s iterates over all elements of the stream.
iter_n ?max_concurrency f s iterates over all elements of the stream s. Iteration is performed concurrently with up to max_threads concurrent instances of f.
Iteration is not guaranteed to be in order as this function will
attempt to always process max_concurrency elements from s at once.
Raises Invalid_argument if max_concurrency < 1.
max_concurrency : defaults to 1.
find f s find an element in a stream.
find_map f s find and map at the same time.
combine s1 s2 combines two streams. The stream will end when either stream ends.
append s1 s2 returns a stream which returns all elements of s1, then all elements of s2
concat st returns the concatenation of all streams of st.
flatten st = map_list (fun l -> l) st
val wrap_exn : 'a t -> 'a Lwt.result t
wrap_exn s is a stream s' such that each time s yields a value v, s' yields Result.Ok v, and when the source of s raises an exception e, s' yields Result.Error e.
Note that push-streams (as returned by Lwt_stream.create) never raise exceptions.
If the stream source keeps raising the same exception e each time the
stream is read, s' is unbounded. Reading it will produce Result.Error e
parse st f parses st with f. If f raise an exception, st is restored to its previous state.
It raises Invalid_argument if st is a bounded push-stream.
hexdump byte_stream returns a stream which is the same as the output of hexdump -C.
Basically, here is a simple implementation of hexdump -C:
let () = Lwt_main.run (Lwt_io.write_lines Lwt_io.stdout (Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin)))
type 'a result = | Value of 'a | Error of exn(* <<div class="odocwiki_info"|<<span class="odocwiki_warning"|Deprecated.>>~Replaced by <<a_api | val Lwt_stream.wrap_exn >>~, which uses <<a_api | type Lwt.result >>~. ~A value or an error~. >> *)
Note that for push-streams (as returned by Lwt_stream.create) all elements of the mapped streams are values.
If the stream source keeps raising the same exception e each time the stream is read, the stream produced by map_exn is unbounded. Reading it will produce Lwt_stream.Error e indefinitely.