Monday, September 15, 2008

Tutorial 7: Concurrency

It seems that every tutorial on concurrency has to cite The Free Lunch is Over: A Fundamental Turn Toward Concurrency in Software by Herb Sutter, and I won't make an exception. Basically, we are not going to get faster CPUs in the future. What we are going to get is more CPUs. The challenge for programmers is then to write software which can take advantage of the extra CPUs.

The code of this Tutorial shows how to achieve that in F#, and how easy it is.

#light

open System

let run_time_func (func: unit -> 'a) =
let time0 = System.DateTime.Now
let res = func ()
let diff0 = System.DateTime.Now - time0
(diff0, res)

let map_parallel func items =
let tasks =
seq {
for i in items -> async {
return (func i)
}
}
Async.Run (Async.Parallel tasks)

let gravity =
-9.81

let inline spring pos =
let k = 100.0 in
-k * pos

let inline drag k pos speed = -k * speed

let inline euler_implicit accel_func pos speed delta =
let speed2 = speed + delta * accel_func pos speed in
pos + delta * speed2, speed2

let inline adapt (f: 'num -> 'num -> 'num) =
fun pos speed ->
let accel = f pos speed in
speed, accel

let inline runge_kutta_4 deriv_func pos speed delta =
let half_delta = 0.5 * delta
let s1, a1 = deriv_func pos speed
let s2, a2 = deriv_func (pos + half_delta * s1) (speed + half_delta * a1)
let s3, a3 = deriv_func (pos + half_delta * s2) (speed + half_delta * a2)
let s4, a4 = deriv_func (pos + delta * s3) (speed + delta * a3)
let pos1 = pos + delta/6.0*(s1 + 2.0*s2 + 2.0*s3 + s4)
let speed1 = speed + delta/6.0*(a1 + 2.0*a2 + 2.0*a3 + a4) in
pos1, speed1

let inline simulate intg_func t0 t_fin delta pos0 speed0 =
let rec repeat t0 pos0 speed0 =
if t0 > t_fin then (pos0, speed0)
else
let t1 = t0 + delta
let pos1, speed1 = intg_func pos0 speed0 delta
repeat t1 pos1 speed1
repeat t0 pos0 speed0


let initial_states = [ for s in 1..1000 -> (float s * 1.0, 0.0) ]
let t0 = 0.0
let t_fin = 1000.0
let delta = 0.025

let accel pos speed = (drag 1.0) pos speed + spring pos + gravity in
for i in 0..3 do
let (run_time, res) = run_time_func (fun () -> initial_states |>
List.map(fun (x,y) -> simulate (euler_implicit accel) t0 t_fin (0.25*delta) x y))
printfn "Euler single: %f" run_time.TotalSeconds;

let (run_time, res) = run_time_func (fun () -> initial_states |>
map_parallel(fun (x,y) -> simulate (euler_implicit accel) t0 t_fin (0.25*delta) x y))
printfn "Euler multi: %f" run_time.TotalSeconds;

let (run_time, res) = run_time_func (fun () -> initial_states |>
List.map(fun (x,y) -> simulate (runge_kutta_4 (adapt accel)) t0 t_fin delta x y))
printfn "RK4 single: %f" run_time.TotalSeconds;

let (run_time, res) = run_time_func (fun () -> initial_states |>
map_parallel(fun (x,y) -> simulate (runge_kutta_4 (adapt accel)) t0 t_fin delta x y))
printfn "RK4 multi: %f" run_time.TotalSeconds;
done;

ignore(Console.ReadKey(true))


There are two changes in this code compared to Tutorial 6. One of them makes it easier to measure execution times. The other is the introduction of asynchronous computations. I will start with the description of the latter.

let map_parallel func items =
let tasks =
seq {
for i in items -> async {
return (func i)
}
}
Async.Run (Async.Parallel tasks)

This defines a function "map_parallel" taking a function and a list of items as parameters. It is similar to "Array.map", which was used in Tutorial 6: it applies the function "func" on each item of the list, and produces a list of the results. What is new here is it does so not in a sequential manner, but in an asynchronous way.

The "map_parallel" function starts by defining a sequence of tasks. The syntax
seq { for i in items -> Expr }
produces a sequence of items whose value is the result of evaluating Expr for each item.

async { return (func i) }
This defines an asynchronous computation, i.e. a computation that can be executed asynchronously. At this point it may seem that actually executing all these computations in parallel and wait for their completion is a complicated thing to do. Happily, there are functions in the standard library of F# that handle all the tricky business of creating threads, starting them, distributing computations on these threads and waiting for their completion.

Async.Run (Async.Parallel tasks)
I don't fully understand what these two functions exactly do, I'm happy enough that they do what I want when composed.

All there is to do is replace the call to Array.map by parallel_map:
map_parallel(fun (x,y) -> simulate (euler_implicit accel) t0 t_fin (0.25*delta) x y)


That's it, the performance numbers have now increased, running on a dual-core (although by less than a factor of two). Below is the updated performance table.


ImplementationPerformance EulerPerformance RK4
C++172413326530
F# Tutorial 6 66225111731
F# Tutorial 7 129032185185



The rest is just utility code intended to make it easier to measure execution times.

let run_time_func (func: unit -> 'a) =
let time0 = System.DateTime.Now
let res = func ()
let diff0 = System.DateTime.Now - time0
(diff0, res)
This defines a function "run_time_func" which takes a function of zero argument and returns a value of some type "a". Notice the use of "unit" to denote "no argument", which is similar to the "void" type of C.
"func ()" invokes this function. "func" alone would have denoted the function itself, instead of calling it.

In order to prevent calling the "simulate" function in the main code, I have added "fun () -> ", thus wrapping the call to "simulate" inside a function taking no argument.
run_time_func (
fun () ->
initial_states |>
List.map(fun (x,y) -> simulate (euler_implicit accel) t0 t_fin (0.25*delta) x y)
)

It is this wrapper that is passed to "run_time_func", and its execution results in the call to "simulate".

The "run_time_func" function returns a tuple composed of the execution time and the result of the function call.