Monday, May 18, 2015

Particle Swarm Optimization in F# part 2 - Parallelizing

In the last post I gave an example of a particle swarm optimization algorithm in F#. F# has a few nice features, but the main reason I wanted to use it was that it makes writing multi-threaded applications so easy.

Multi-threaded PSO version 1

If I want my algorithm to run multi-threaded, all I have to do is take this line in the update_particles function:

let updated_particles = particles |> List.map (fun x -> update_particle args loss_func x global_best_params)   

And change it to:
 let updated_particles = particles |> List.map (fun x -> async { return update_particle args loss_func x global_best_params })
                                   |> Async.Parallel   
                                   |> Async.RunSynchronously  
                                   |> Array.toList  

Like magic, the whole application now runs in parallel, and because I have no mutable types I can guarantee there are no issues with cross-thread calls. There are a few things I don't like about this though: it adds three lines to what should really be a one-liner, and it creates an array that I then have to map back into a list. It is an annoying quirk of F# that it is much easier to run arrays in parallel, with the Array.Parallel.map function, than lists, but arrays are mutable, which loses the guaranteed thread safety.
A bit of help from Stack Overflow yielded PSeq, a library (from the FSharp.Collections.ParallelSeq package) that lets you run parallel operations against F# sequences. So the above can be rewritten:
 let updated_particles = particles  |> PSeq.map (fun x -> update_particle args loss_func x global_best_params)   
                                    |> PSeq.toList  
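
If pulling in PSeq is not an option, the array round trip can at least be hidden behind a helper. This is only a minimal sketch using FSharp.Core; parallel_map and square are hypothetical names, with square standing in for update_particle:

```fsharp
// Hypothetical stand-in for update_particle: any pure function works here.
let square (x : int) = x * x

// Convert the immutable list to an array, map it in parallel, convert back.
// The intermediate array never escapes this function, so callers still
// only ever see immutable lists.
let parallel_map (f : 'a -> 'b) (items : 'a list) : 'b list =
    items
    |> List.toArray
    |> Array.Parallel.map f
    |> Array.toList

let squares = parallel_map square [1; 2; 3; 4]
printfn "%A" squares
```

Array.Parallel.map keeps the results in the same order as the input, so the output list lines up with the original particles.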

Multi-threaded PSO version 2

Now that we are running in parallel our speed has improved, but we still don't use all cores as efficiently as possible. Each iteration of updating the particles waits for every particle to complete. This is reasonable if they all take the same amount of time, but let's say the function is something that could execute in 1 second or in 100 seconds. We always have to wait for the slowest particle to complete, and once only one is left we are running on just a single thread.
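
To put rough numbers on that, here is a small back-of-the-envelope calculation. The timings and the core count are made up for illustration, not measurements:

```fsharp
// Hypothetical per-particle run times (seconds) for one iteration.
let times = [1.0; 1.0; 1.0; 100.0]
// Hypothetical number of cores, one per particle.
let cores = 4.0

// With a barrier at the end of each iteration, the wall-clock time
// of the iteration is the time of the slowest particle.
let wall_clock = List.max times
// Total core time spent doing useful work during the iteration.
let busy = List.sum times
// Fraction of the available core time that was actually used.
let utilization = busy / (wall_clock * cores)

printfn "wall clock: %.0fs, utilization: %.2f" wall_clock utilization
```

With these numbers three of the four cores sit idle for 99 of the 100 seconds, so only about a quarter of the available core time is used.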
A better alternative is to run the whole lifetime of each particle in parallel. The only piece of data that needs to travel between the particles is the global best parameters. This can be handled by holding the global best in a ref and providing getter and setter functions, so each particle reads the current global best whenever it updates and we replace it whenever we have a better value.
The changes we need to make for this are:
Remove the update_particles and run_until_stop_condition methods and replace them with this:
let rec private run_particle (args : Args) (particle : Particle) (get_global_best :unit -> Particle) check_particle_against_global_best loss_func iterations_to_run : Particle =
    let updated_particle = update_particle args loss_func particle (get_global_best()).Parameters
    check_particle_against_global_best updated_particle

    let new_iterations_to_run = iterations_to_run - 1
    if stop_condition args iterations_to_run (get_global_best()).Local_best_loss then
        updated_particle
    else
        run_particle args updated_particle get_global_best check_particle_against_global_best loss_func new_iterations_to_run

The execute method needs to be modified to run run_particle in parallel:
 let execute (args : Args) (loss_func : list<float> -> float) (initial_weights : seq<list<float>>) =      
   let particles = initial_weights |> Seq.take args.particles  
             |> PSeq.map (fun w -> Particle(w, [for _ in 1 .. w.Length -> 0.0], w, loss_func w))  
             |> Seq.toList  

   let global_best = ref (particles |> List.minBy (fun x -> x.Local_best_loss) )  
   let monitor = new System.Object()  
   // run_particle expects a unit -> Particle getter, so wrap the ref rather than passing it directly  
   let get_global_best () = global_best.Value  
   let check_particle_against_global_best (particle : Particle) =  
     lock monitor (fun() -> if particle.Local_best_loss < global_best.Value.Local_best_loss then  
                   global_best.Value <- particle)  

   let final_particles = particles |> PSeq.map (fun x -> run_particle args x get_global_best check_particle_against_global_best loss_func args.iterations)  
                   |> PSeq.toList  
   (global_best.Value.Local_best, global_best.Value.Local_best_loss, final_particles)  

Now this looks a bit more like traditional C# multi-threaded code, and we now have the possibility of screwing it up. But hopefully we have contained the problem enough to be confident we haven't. We keep a ref to the particle that is our global best, a monitor to lock against when updating it, and finally our check_particle_against_global_best function, to which we pass each new particle we create to see if it is an improvement.
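
The same pattern can be tried out in isolation. The following is only a sketch using FSharp.Core, not the code from the post: offer and worker are hypothetical names, and the "losses" are just numbers generated from the worker id and step.

```fsharp
// Shared state: the best (lowest) loss seen so far by any worker.
let best = ref System.Double.MaxValue
let monitor = new System.Object()

// Only replace the shared value if the candidate is an improvement.
// The comparison and the write happen under the same lock, so two
// workers cannot both pass the check and then overwrite each other.
let offer (candidate : float) =
    lock monitor (fun () ->
        if candidate < best.Value then
            best.Value <- candidate)

// Hypothetical worker: offers a series of made-up losses.
let worker (id : int) = async {
    for step in 1 .. 1000 do
        offer (float (id * step)) }

[ for id in 1 .. 8 -> worker id ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

printfn "best loss: %f" best.Value
```

Whatever order the workers happen to run in, the smallest value offered is 1.0 (worker 1, step 1), so that is what ends up in best. If the check were made outside the lock, a worse candidate could overwrite a better one that arrived between the check and the write.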

Speed tests

Here is a method to write out the execution speed of an action:
 let speed_test action title =    
   let iterations = 20   
   let sw = System.Diagnostics.Stopwatch()   
   let results =   
      // 0 .. iterations gives 21 samples, so the middle element is the true median   
      [for i in 0 .. iterations do   
          System.GC.Collect()   
          sw.Restart()   
          action()   
          sw.Stop()   
          yield sw.Elapsed] |> List.sort   
   let median_result = List.item (iterations/2) results   
   printfn "%s : %A" title median_result   

The GC.Collect() call forces the .NET framework to do a full garbage collection. If this isn't done, the speed of one test can be affected by the memory used in the previous test. I'm taking the median time here, which I think is a better measure than the mean. Whenever you run a set of speed tests there will always be a few that take ages because of spooky operating system behavior. The middle time ignores these occasional outliers.
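
A quick illustration of why the median is more robust, using made-up timings where one run was hit by such a hiccup:

```fsharp
// Hypothetical timings in milliseconds; the last run is an outlier.
let timings = [102.0; 98.0; 101.0; 99.0; 950.0]

let mean = List.average timings
// Median of an odd-length list: sort and take the middle element.
let median = timings |> List.sort |> List.item (List.length timings / 2)

printfn "mean = %.1f ms, median = %.1f ms" mean median
// prints "mean = 270.0 ms, median = 101.0 ms"
```

The single slow run drags the mean to nearly three times the typical value, while the median stays put.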

Results

From my fairly old 4-core desktop (times in seconds):

                           Single threaded   Multi-threaded 1   Multi-threaded 2
Fixed length function      2.45              1.56               1.33
Variable length function   4.45              1.44               0.71

So it looks like Multi-threaded 2 gives a pretty decent improvement, especially when the function's run time varies. Full code is here on GitHub.
