Tuesday, January 18, 2011

The Eventually workflow

The F# specification gives an example of custom workflow in its section on computation expressions. I have extended it as shown below:

type Eventually<'R> =
    | Completed of 'R
    | Blocked of float32 * (unit -> Eventually<'R>)
    | BlockedNextFrame of (unit -> Eventually<'R>)
    | Running of (unit -> Eventually<'R>)
    | Yield of (unit -> Eventually<'R>)

An Eventually computation can be in one of the following states:
  • Completed, in which case the state includes the result of the computation.
  • Blocked, when the computation is paused, and the state contains the amount of time remaining before it resumes.
  • BlockedNextFrame, when the computation is paused for a short while, until the start of next frame. The meaning of "next frame" is up to the scheduler, which is described in an upcoming article
  • Running, which indicates the computation is ready to proceed.
  • Yield, when the computation is willing to pass control to another computation which is in state Running, if any. Otherwise, the computation is ready to proceed.
All states but the first one, Completed contain a function value which when executed produces a new state. It represents the work that the computation can perform "in one block" before returning control to the scheduler. When that happens, the scheduler updates the state of the computation.

Although it is possible to build computations "manually" using this discriminated union, F# makes it possible to do so conveniently using a subset of F# extended with a few keywords, namely yield, yield!, return, return!, let! and do!. The meaning of these keywords is up to the designer of the custom workflow, but they have an expected behaviour which should be respected.

yield and yield! are used to produce values in sequences. The second variant allows to insert sequences from other sequence expressions. Here I use sequence expression as a synonym for "a computation expression used to produce sequences". Below is an interesting example (from the F# Programming wikibook).
let allEvens = 
  let rec loop x = seq { yield x; yield! loop (x + 2) }
  loop 0;;
loop is a function which takes an integer x and returns a sequence expression which when evaluated produces the integer, followed by the result of a recursive call to loop using x+2. In clear, this will produce x, x+2, x+2+2, x+2+2+2..., one value at a time. Note the difference between yield and yield!: the first produces a single value, the second produces the values of another sequence expression.

let! allows to "nest" computation expressions. In other words, a computation expression can "call" another computation expression. I'm being pretty vague here, so let's look at an example using asynchronous computations (also from the wikibook).
let extractLinksAsync html =
    async {
        return System.Text.RegularExpressions.Regex.Matches(html, @"http://\S+")
let downloadAndExtractLinks url =
    async {
        let webClient = new System.Net.WebClient()
        let html = webClient.DownloadString(url : string)
        let! links = extractLinksAsync html
        return url, links.Count
return and return! are used for the final results of a computation expression. The example above uses return. My guess is return! should behave as
let! x = ...
return x
I am not quite sure if return is meant to have imperative semantics, i.e. discarding the remainder of a computation when encountered. My attempts to use such C-like semantics have not succeeded yet.

Going back to my library, XNAUtils, I have written a computation builder which is used to turn computation expressions (code between curly brackets) into state machines that can be executed in a controlled manner. The builder takes the form of a class with methods are called at selected places in a computation expression. A builder allows to override the meaning of certain keywords and specify the effect of the special keywords I listed above. The code is in CoopMultiTasking.fs
let rec bind k e =
    match e with
    | Completed r ->
        Running(fun () -> k r)
    | Running f ->
        Running(fun () -> f() |> bind k)
    | Blocked(dt, f) ->
        Blocked(dt, fun () -> f() |> bind k)
    | BlockedNextFrame f ->
        BlockedNextFrame(fun () -> f() |> bind k)
    | Yield f ->
        Yield(fun () -> f() |> bind k)


type TaskBuilder() =
    member x.Bind(e, f) = bind f e
    member x.Return(r) = result r
    member x.Combine(e1, e2) = combine e1 e2
    member x.Delay(f) = delay f
    member x.Zero() = result ()
    member x.TryWith(e, handler) = tryWith e handler
    member x.TryFinally(e, compensation) = tryFinally e compensation
    member x.While(cond, body) = whileLoop cond body
    member x.For(xs, body) = forLoop xs body
    member x.Using(e, f) = using e f

let task = TaskBuilder()
The module-level variable task can be used to create instances of Eventually. I have provided a few helpful tasks that can be used to wait a fixed amount of time, wait until next frame, give control to another task:
// Wait a fixed amount of time.
let wait dt =
    Blocked(dt, fun () -> Completed())

// Wait until next frame, i.e. next call to Scheduler.RunFor
let nextFrame () =
    BlockedNextFrame(fun() -> Completed())

// Stop executing, let some other task execute, if any is ready. Otherwise, we get control back.
let nextTask () =
    Yield(fun() -> Completed())

// Wait until a specified condition is true.
// nextTask is always called, then the condition is checked.
// If it's false, we wait until next frame, check the condition,
// call nextTask if it's not true, and so on...
let waitUntil f = task {
    let stop = ref false
    while not !stop do
        do! nextTask()
        if f() then
            stop := true
            do! nextFrame()
            stop := f()
waitUntil uses a while loop, it could have used recursion too. I initially refrained from using recursion in all code meant to be executed on the Xbox 360, because tail calls are not available on this platform. On second thoughts, I don't think that's a problem, as the kind of recursion that would appear inside of tasks is of the loose kind. Recursive calls a interrupted by calls to nextTask and nextFrame, which should prevent the stack from overflowing.

Although I have not had any use for synchronization between tasks yet, I have provided two functions to that effect:
// Lock, can be used for mutual exclusion.
// Locks should not be shared across instances of Scheduler.
type Lock() =
    let mutable locked = false;

    member this.Grab() =
        task {
            do! waitUntil (fun() -> not locked)
            locked <- true

    member this.Release() =
        task {
            locked <- false
            do! nextTask()

// A blocking channel which can be used for cross-task communication.
// Note that sending blocks until the message is received.
type BlockingChannel<'M>() =
    let mutable content = None
    let mutable flag_send = false
    let mutable flag_receive = false

    member this.Send(m : 'M) =
        task {
            do! waitUntil (fun () -> not flag_send)
            content <- Some m
            flag_send <- true
            do! waitUntil (fun () -> flag_receive)
            flag_send <- false
            content <- None

    member this.Receive() =
        task {
            do! waitUntil (fun () -> flag_send)
            let ret = content.Value
            flag_receive <- true
            do! waitUntil (fun () -> not flag_send)
            flag_receive <- false
            return ret

Cooperative multi-tasking makes synchronization between tasks easy, as there is no concurrency involved.

That's all for this time, in the next article I will describe how multiple tasks are executed by a single scheduler.

No comments: