Ocsigen

This is a preliminary version of the documentation. Help us improve it by filing tickets. We are looking for native English speakers to proofread the documentation. Contact us!

Lwt is a library for cooperative threads in OCaml, implemented by Jérôme Vouillon (Lwt module). It uses a monadic style, which makes it really easy to use. Lwt can be downloaded here.

Unlike preemptive threads, cooperative threads do not rely on a scheduler to distribute processor time between threads. Instead, each thread must explicitly give the other threads the opportunity to continue. An uncooperative thread keeps all other threads blocked until it has completed its work.

  • Advantages
    • They are lighter
    • Little need for mutexes and little risk of deadlock!
    • Using many (small) threads makes implementation very easy (for example, for user interfaces, there is no need to implement another event loop: make a thread for each widget!)
  • Drawbacks
    • Threads must cooperate ... Otherwise the whole program will hang.

Using monadic threads

The first and main commandment for programming with Lwt is:

Never use functions that may take time to complete.

For example, Unix.sleep and Unix.read must be banned. If you use them, the whole process will hang and the server won't be able to fulfill any request during that time!

Fortunately, Lwt implements cooperative versions of all useful functions, for example:

Lwt_unix.sleep : float -> unit Lwt.t
Lwt_unix.read : file_descr -> string -> int -> int -> int Lwt.t

Note that all cooperative functions return σ Lwt.t instead of σ (where σ is any type). This means that the computation may not have terminated yet. It is the type of threads returning a value of type σ.

The only way to use such a value is to bind the thread to another one, using Lwt.bind:

Lwt.bind : 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t

or the infix operator >>=, which is equivalent.

If you want to create an (already terminated) thread from a value, use Lwt.return:

Lwt.return : 'a -> 'a Lwt.t

The binary operator >>= is used to bind the result of a non-blocking computation to another. In other words, it means: "if the left-hand side takes time, do not block here, continue to the next instruction, but remember to come back here and give the result to the following function once you get it".

Put differently, it is used to specify a sequence of computations that depend on one another. It is a kind of let binding. e1 >>= (fun r -> return e2) will try to evaluate e1, and once e1 is evaluated, it will give the result to the function given as second parameter. If the left-hand side (e1) takes time (for example because it is waiting for a read on a socket), the whole computation will be saved in a table and the program will continue to the next instruction that does not depend on e1. The computation will resume at a future cooperation point, if it is ready to continue. Instead of e1 >>= fun r -> return e2, you can write bind e1 (fun r -> return e2).
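As an illustration of the sequencing described above, here is a minimal sketch. It assumes a recent Lwt release that provides the Lwt.Infix and Lwt_main modules (with the lwt.unix package); slow_double is a hypothetical cooperative function introduced only for this example.

```ocaml
(* Sketch only: assumes a recent Lwt with Lwt.Infix and Lwt_main. *)
open Lwt.Infix

(* Hypothetical cooperative function: waits a little, then returns 2 * x. *)
let slow_double (x : int) : int Lwt.t =
  Lwt_unix.sleep 0.01 >>= fun () ->
  Lwt.return (2 * x)

(* Sequencing with >>=: the second call depends on the result of the first. *)
let computation : int Lwt.t =
  slow_double 10 >>= fun a ->
  slow_double a >>= fun b ->
  Lwt.return (a + b)

(* The same dependency chain written with Lwt.bind. *)
let computation' : int Lwt.t =
  Lwt.bind (slow_double 10) (fun a ->
    Lwt.bind (slow_double a) (fun b ->
      Lwt.return (a + b)))

(* Lwt_main.run drives the scheduler until the thread has terminated. *)
let result = Lwt_main.run computation
let result' = Lwt_main.run computation'
```

Both versions describe the same dependency: b cannot be computed before a is known, but nothing blocks the process while the sleeps are pending.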

Cooperation points are inserted when you call cooperative functions such as Lwt_unix.read or Lwt_unix.write. You can add other cooperation points by calling Lwt_unix.yield (). The thread will suspend itself, Lwt will wake up the oldest waiting thread, and this thread will resume as soon as possible.
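The effect of an explicit cooperation point can be sketched as follows. This example assumes a recent Lwt release, where Lwt.pause plays the role of the Lwt_unix.yield function mentioned above; worker, record and events are hypothetical names used only here.

```ocaml
(* Sketch only: assumes a recent Lwt where Lwt.pause is the yielding primitive. *)
open Lwt.Infix

(* Shared log of events, most recent first. *)
let log : string list ref = ref []
let record s = log := s :: !log

(* A worker records a step, then yields so that other threads can run. *)
let rec worker name i n : unit Lwt.t =
  if i >= n then Lwt.return_unit
  else begin
    record (Printf.sprintf "%s%d" name i);
    Lwt.pause () >>= fun () ->
    worker name (i + 1) n
  end

let () =
  (* Each worker runs until its first cooperation point, then suspends. *)
  let a = worker "a" 0 3 in
  let b = worker "b" 0 3 in
  Lwt_main.run (Lwt.join [a; b])

(* Events in chronological order. *)
let events = List.rev !log
```

Because worker "a" suspends itself after its first step, worker "b" records its first step before "a" records its second one. Without the cooperation point, "a" would run to completion before "b" even started.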

Monadic cooperative threads are not difficult to use once you get used to thinking in the following way: programming is no longer putting instructions one after another, but defining a dependency relation (>>=) between function calls. Remember:

  • Functions that may take time to complete always return something of type σ Lwt.t (where σ is any type). They are called cooperative functions.
  • The only way to use the result of such a function is to bind it to another cooperative function (what to do after) using >>=.

Exceptions

Use Lwt.fail and Lwt.catch inside threads instead of raise and try ... with.

To raise an exception e inside an Lwt thread, use fail e, and be careful about functions that may raise exceptions.

You must be careful when catching exceptions with Lwt. If you use the try ... with construct for an expression of type 'a Lwt.t, it may not work (as the computation may happen later, after the try block has already been left).

Remember the following: if e has type 'a Lwt.t (where 'a is any type), do not write:

try
  e
with
  ...

but write:

catch
  (fun () -> e)
  (function ... | exn -> fail exn)

or, with the syntax extension:

try_lwt
  e
with
  ...
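The difference between the two forms can be observed with a small sketch. It assumes a recent Lwt release (Lwt.Infix, Lwt_main); delayed_failure and the other top-level names are hypothetical and exist only for this example.

```ocaml
(* Sketch only: assumes a recent Lwt with Lwt.Infix and Lwt_main. *)
open Lwt.Infix

(* Hypothetical thread that fails only after a delay. *)
let delayed_failure () : unit Lwt.t =
  Lwt_unix.sleep 0.01 >>= fun () ->
  Lwt.fail Not_found

(* Wrong: try ... with only protects the creation of the thread.
   The failure happens later, outside of the try block. *)
let with_try : string Lwt.t =
  try delayed_failure () >>= fun () -> Lwt.return "ok"
  with Not_found -> Lwt.return "caught by try"

(* Right: Lwt.catch installs the handler on the thread itself. *)
let with_catch : string Lwt.t =
  Lwt.catch
    (fun () -> delayed_failure () >>= fun () -> Lwt.return "ok")
    (function
      | Not_found -> Lwt.return "caught by Lwt.catch"
      | exn -> Lwt.fail exn)

(* Lwt_main.run re-raises the exception of a failed thread. *)
let try_outcome =
  try Lwt_main.run with_try with Not_found -> "escaped"
let catch_outcome = Lwt_main.run with_catch
```

In the first version the handler is never triggered, so the exception escapes when the thread is run. In the second version the handler runs when the thread fails.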

What if my function is not implemented in cooperative way?

If my function is thread-safe (for preemptive threads)

Ocsigen implements a way to have a non-cooperative computation executed automatically by another, preemptive thread. By preemptive, we mean "system threads", implemented using OCaml's Thread module. This is useful, for example, for a database request using a non-cooperative database library such as postgresql-ocaml. To do this, use the detach function. For example:

Lwt_preemptive.detach Unix.sleep 5 >>= fun () -> return e

Lwt_preemptive.detach creates a fake Lwt thread. It is a way to make actual system threads interact with cooperative threads.

A pool of preemptive threads is waiting for such "detached functions". You can specify the number of threads in the pool in the configuration file.

Warning: Detached functions must be thread-safe! Be careful about concurrent access to data. Be sure to use mutexes for your own functions, and use only thread-safe libraries. The libraries from Ocsigen are NOT thread-safe for now. Let us know if you really need them to be thread-safe.

Lwt itself is not thread-safe. Do not use Lwt threads inside a detached function!
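A minimal sketch of detaching a blocking function while a cooperative thread keeps running is given below. It assumes a recent Lwt built with preemptive thread support (lwt.unix and the threads library); blocking_square and ticker are hypothetical names, and blocking_square merely stands for a call into a non-cooperative library.

```ocaml
(* Sketch only: assumes lwt.unix with Lwt_preemptive available. *)
open Lwt.Infix

(* Hypothetical blocking function; it touches no shared data,
   so it is safe to run in a system thread. *)
let blocking_square (n : int) : int =
  Unix.sleepf 0.05;
  n * n

(* A cooperative thread that keeps ticking in the meantime. *)
let ticks = ref 0
let rec ticker () : unit Lwt.t =
  Lwt_unix.sleep 0.01 >>= fun () ->
  incr ticks;
  ticker ()

let result : int =
  let background = ticker () in
  (* The blocking call runs in a system thread from the pool. *)
  let r = Lwt_main.run (Lwt_preemptive.detach blocking_square 7) in
  Lwt.cancel background;
  r
```

Calling blocking_square directly from a cooperative thread would freeze the ticker for the whole duration of the call. With detach, the ticker can continue while the system thread is working.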

If my function is not thread-safe (for preemptive threads)

If you want to use a function that takes time to execute but is not written in a thread-safe way (for example some functions of OCaml's Str module), consider rewriting it in a cooperative manner, or delegating the work to another process.

Examples

A thread that prints "hello" every 10 seconds

Just add the following lines to your program:

let rec f () =
  Lwt_io.printl "hello" >>= fun () ->
  Lwt_unix.sleep 10. >>= f
in f ();

or, with the syntax extension:

let rec f () =
  lwt () = Lwt_io.printl "hello" in
  lwt () = Lwt_unix.sleep 10. in
  f ()

Launching two computations in parallel

If you want to run two computations in parallel (for example f : unit -> σ Lwt.t and g : unit -> τ Lwt.t), do something like:

(* Firstly, I launch both threads: *)
let first_thread = f () in
let second_thread = g () in
(* Then I wait for the first one to finish: *)
first_thread >>= fun first_result ->
(* Then I wait for the second one to finish: *)
second_thread >>= fun second_result ->
...

or, with the syntax extension:

lwt first_result = f () and second_result = g () in
...

More advanced use: Create a thread waiting for an event

Lwt.wait () creates a thread that waits forever, and a "wakener". You can wake it up using Lwt.wakeup on the wakener.

(* Create the event *)
let waiter, wakener = Lwt.wait () in

(* Bind a thread on this event *)
Lwt.ignore_result (waiter >>= Lwt_io.printl);
...

(* Trigger the event *)
Lwt.wakeup wakener "HELLO";
(* All threads waiting for waiter are awoken, and waiter's value is "HELLO". *)

Note: up to version 1.1.0, there was no wakener and you could call wakeup on the thread itself.

Cooperative List.map

Here is an example taken from Lwt_util. It defines two map functions on lists. A thread is launched for each value of the list. In the first version, all threads are launched at the same moment. In the second one, they are run one after another.

let rec map f l =
  match l with
  | [] -> return []
  | v :: r ->
      let t = f v in
      let rt = map f r in
      t >>= fun v' ->
      rt >>= fun l' ->
      return (v' :: l')

let rec map_serial f l =
  match l with
  | [] -> return []
  | v :: r ->
      f v >>= fun v' ->
      map_serial f r >>= fun l' ->
      return (v' :: l')
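The difference between the two versions can be made visible by logging when each job starts and ends. The sketch below repeats the two definitions so that it is self-contained, and assumes a recent Lwt release (Lwt.Infix, Lwt_main); run_with and job are hypothetical helpers introduced for this example.

```ocaml
(* Sketch only: assumes a recent Lwt with Lwt.Infix and Lwt_main. *)
open Lwt.Infix

(* Concurrent version: every call to f is started before any result is awaited. *)
let rec map f l =
  match l with
  | [] -> Lwt.return []
  | v :: r ->
      let t = f v in
      let rt = map f r in
      t >>= fun v' ->
      rt >>= fun l' ->
      Lwt.return (v' :: l')

(* Sequential version: each call to f starts after the previous one has finished. *)
let rec map_serial f l =
  match l with
  | [] -> Lwt.return []
  | v :: r ->
      f v >>= fun v' ->
      map_serial f r >>= fun l' ->
      Lwt.return (v' :: l')

(* Run a mapper on [1; 2; 3] and return the results with a chronological log. *)
let run_with mapper =
  let log = ref [] in
  let job x =
    log := Printf.sprintf "start %d" x :: !log;
    Lwt_unix.sleep 0.01 >>= fun () ->
    log := Printf.sprintf "end %d" x :: !log;
    Lwt.return (x * 10)
  in
  let results = Lwt_main.run (mapper job [1; 2; 3]) in
  (results, List.rev !log)

let concurrent_results, concurrent_log = run_with map
let serial_results, serial_log = run_with map_serial
```

Both versions return the same list. With map, the three "start" entries are logged before any "end" entry. With map_serial, each job ends before the next one starts.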

Example

Here is an example of a program that uses Lwt. It waits for a connection on a port given as first parameter. It then connects to another port (second parameter) and relays everything it receives on either side to the other side. It exits when either side closes the connection.

(* Usage: relay <listening_port> <dest_port> *)

(* This program waits for a connection on <listening_port>. It then
   connects to <dest_port> and relays everything it receives on either
   side to the other side.  It exits when either side closes the
   connection. *)

let listening_port = int_of_string Sys.argv.(1)
let dest_port = int_of_string Sys.argv.(2)

open Lwt

let rec really_write out_ch buffer pos len =
  Lwt_unix.write out_ch buffer pos len >>= fun len' ->
  if len = len' then return () else
  really_write out_ch buffer (pos + len') (len - len')

let relay in_ch out_ch =
  let rec relay_rec previous_write =
    let buffer = String.create 8192 in
    (* Read some data from the input socket *)
    Lwt_unix.read in_ch buffer 0 8192 >>= fun len ->
    (* If we read nothing, this means that the connection has been
       closed.  In this case, we stop relaying. *)
    if len = 0 then return () else begin
      (* Otherwise, we write the data to the output socket *)
      let write =
        (* First wait for the previous write to terminate *)
        previous_write >>= (fun () ->
        (* Then write the contents of the buffer *)
        really_write out_ch buffer 0 len)
      in
      relay_rec write
    end
  in
  relay_rec (return ())

let new_socket () = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0
let local_addr num = Unix.ADDR_INET (Unix.inet_addr_any, num)

let _ =
  Lwt_unix.run begin
    (* Initialize the listening address *)
    let listening_socket = new_socket () in
    Lwt_unix.setsockopt listening_socket Unix.SO_REUSEADDR true;
    Lwt_unix.bind listening_socket (local_addr listening_port);
    Lwt_unix.listen listening_socket 1024;
    (* Wait for a connection *)
    Lwt_unix.accept listening_socket >>= fun (inp, _) ->
    (* Connect to the destination port *)
    let out = new_socket () in
    Lwt_unix.connect out (local_addr dest_port) >>= fun () ->
    (* Start relaying *)
    Lwt.choose [relay inp out; relay out inp]
  end

Library features

Stream

Lwt provides data streams, with a lot of functions to manipulate them.
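A short sketch of stream manipulation follows. It assumes the Lwt_stream module of a recent Lwt release; the top-level names are hypothetical.

```ocaml
(* Sketch only: assumes Lwt_stream from a recent Lwt release. *)

(* Build a stream from a list, keep the even values, square them,
   and collect the result back into a list. *)
let squares_of_evens : int list =
  Lwt_stream.of_list [1; 2; 3; 4; 5; 6]
  |> Lwt_stream.filter (fun x -> x mod 2 = 0)
  |> Lwt_stream.map (fun x -> x * x)
  |> Lwt_stream.to_list
  |> Lwt_main.run

(* A push-based stream: values are pushed by the producer,
   and None marks the end of the stream. *)
let pushed : string list =
  let stream, push = Lwt_stream.create () in
  push (Some "a");
  push (Some "b");
  push None;
  Lwt_main.run (Lwt_stream.to_list stream)
```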

Low-level unix functions

The Lwt_unix module offers low-level unix functions. It is a cooperative equivalent of the Unix module of the standard library.

Buffered byte channels

The Lwt_io module implements efficient buffered channels. They offer high-level functions to handle IO operations, like Lwt_io.read_lines which returns a stream of all the lines of a channel.
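As a sketch of Lwt_io.read_lines, the following example writes two lines to a temporary file and reads them back as a stream. It assumes a recent Lwt release (Lwt_io.with_file, Lwt_io.write_line); the file name prefix and the lines binding are arbitrary choices for this example.

```ocaml
(* Sketch only: assumes Lwt_io and Lwt_stream from a recent Lwt release. *)
open Lwt.Infix

let lines : string list =
  (* Temporary file used only for this demonstration. *)
  let path = Filename.temp_file "lwt_io_demo" ".txt" in
  let program =
    (* Write two lines through a buffered output channel. *)
    Lwt_io.with_file ~mode:Lwt_io.output path (fun oc ->
      Lwt_io.write_line oc "first line" >>= fun () ->
      Lwt_io.write_line oc "second line")
    >>= fun () ->
    (* Read them back; the stream is consumed before the channel is closed. *)
    Lwt_io.with_file ~mode:Lwt_io.input path (fun ic ->
      Lwt_stream.to_list (Lwt_io.read_lines ic))
  in
  let result = Lwt_main.run program in
  Sys.remove path;
  result
```

The stream is converted to a list inside the with_file callback because, as far as we know, the lines are read lazily from the channel, which is closed when the callback returns.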

Process utilities

The Lwt_process module offers functions to easily create sub-processes and communicate with them.
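For instance, the output of a shell command can be collected cooperatively. This sketch assumes a Unix-like system where the shell provides echo, and a recent Lwt release offering Lwt_process.shell and Lwt_process.pread; the output binding is a hypothetical name.

```ocaml
(* Sketch only: assumes a Unix-like system and a recent Lwt release. *)

(* Run a shell command in a sub-process and read its whole standard output. *)
let output : string =
  Lwt_main.run (Lwt_process.pread (Lwt_process.shell "echo hello"))
```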

Text channels

Text channels are like byte channels except that they deal with text instead of bytes. They decode or encode Unicode characters on the fly given a character encoding.

Terminal utilities

Lwt provides functions to write curses-oriented applications. Lwt_term offers functions to control the terminal, obtain its size, read a key, and so on. Lwt_read_line offers read-line functions with line editing support. It is also possible to use a cooperative completion function which can be aborted by user input.

GLib integration

Lwt offers the possibility to use cooperative threads with a glib application. For that, use Lwt_glib.init instead of GLib.init.
