Tuesday, 4 June 2013

Parallel IPython

For a few months now, I've been using IPython to do a heavy but embarrassingly parallel calculation. I finally decided to work out how to use IPython's parallel computing mechanisms to do the job several times faster. Here's a summary of my routine for setting up the parallel calculation. Most of this can be found in the IPython documentation, but I'll mention a few extra points I noted along the way.

Starting the IPython cluster

To do parallel calculations, IPython needs to run a number of engines, which it calls on from the interface to do the heavy lifting. These are started with

ipcluster start -n 4

where here, for example, 4 engines will be started. My quad-core processor supports hyper-threading, so the OS actually reports 8 logical cores. I usually run 6 engines.
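
If you want to check how many cores the OS reports, the standard library's multiprocessing module will tell you (a quick check, not specific to IPython):

import multiprocessing
multiprocessing.cpu_count()   # e.g. 8 on a hyper-threaded quad-core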

This command must keep running alongside your IPython session. You can, for example, run it in a different terminal, or send it to the background of the same terminal, either by appending & to the command or by pressing Ctrl-Z and then typing bg. I tend to run it in a separate terminal tab and send it to the background.
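
For example, to start six engines in the background of the current terminal:

ipcluster start -n 6 &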

When the time comes to stop the engines, you can either bring the ipcluster job to the foreground and abort it (Ctrl-C) or type

ipcluster stop

Initializing the clients in IPython

Now, in your instance of IPython, you need to import IPython's parallel client module.

from IPython.parallel import Client

Then we can create a client object that has access to the engines you started with the ipcluster command.

c = Client()
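
As a quick sanity check, you can ask the client which engines have registered. With four engines running, you should see something like

c.ids   # e.g. [0, 1, 2, 3]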

We aren't quite ready to start calculating. From the documentation,
The two primary models for interacting with engines are:
  • A Direct interface, where engines are addressed explicitly.
  • A LoadBalanced interface, where the Scheduler is trusted with assigning work to appropriate engines.
I use the LoadBalanced interface because it decides on the most efficient way to assign work to the engines. The interface object provides a new map function, which works like Python's built-in map but farms the work out to the engines in parallel. To create the interface, type

lbv = c.load_balanced_view()

We also need the following command, which puts the view in blocking mode: map then waits for the engines to finish and returns the results directly, instead of immediately returning an asynchronous result object.

lbv.block = True
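
To see what blocking changes, here's a small sketch: with block set to False, map returns immediately with an asynchronous result object that you query for the output later.

lbv.block = False
async_result = lbv.map(lambda x: x**2, range(8))   # returns immediately
output = async_result.get()                        # waits for the engines to finish
lbv.block = True                                   # back to synchronous calls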

At this point, you could start calculating, if you have work that doesn't depend on having any data or any of your own functions. For example, try

lbv.map(lambda x: x**10, range(32))

In reality (or, at least, my reality), I need to make calculations that involve my own functions and data and there's a bit more to do to make all that work.

Preparing the engines

I think of the engines as fresh IPython instances that haven't issued any import statements or defined any variables. So I need to make those imports and define those variables on each engine.

There are two ways to import packages. The first, which I use, boils down to telling the engines to issue the import command themselves. For example, to import NumPy,

c[:].execute('import numpy')

Alternatively, you can enter

with c[:].sync_imports():
    import numpy

I'm not aware of either method being preferred.
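
Either way, a quick check that the import worked is to evaluate something on the engines and pull it back with dictionary-style access (numpy_version is just a throwaway name for this check):

c[:].execute('numpy_version = numpy.__version__')
c[:]['numpy_version']   # one version string per engine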

To define variables, we could use the execute method above. But that might get painful for complicated expressions like list comprehensions. Much better is to assign the variable directly in the engines' dictionary of global variables. For a variable my_var defined in the local IPython instance, enter

c[:]['my_var'] = my_var
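
If you have several variables to transfer, the view's push method takes a dictionary and sends everything in one call (var1 and var2 here stand in for your own variables):

c[:].push(dict(var1=var1, var2=var2))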

Calling your own functions

My work originally used a function with a call signature something like

output = my_fun(var1, var2, var3, list_of_arrays1, list_of_arrays2, list_of_arrays3, constant)

I couldn't figure out how to make this play nicely with the map command, so I reorganized the function in two ways. First, I pre-processed my data so that the final constant was no longer necessary. I was lucky that this was very easy. (In fact, I should've done it before, because it removed a list comprehension from the innermost loop.) Second, I combined the lists with zip and had the function unpack the resulting tuples when called. The call signature then became

output = my_package.my_fun(var1, var2, var3, zipped_up_arrays)
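
For concreteness, the unpacking inside the function might look something like this sketch, where my_fun and the arithmetic are placeholders for my real code:

def my_fun(var1, var2, var3, zipped_arrays):
    # zipped_arrays is a single tuple produced by zip()
    array1, array2, array3 = zipped_arrays
    # placeholder for the real calculation
    return var1 * array1 + var2 * array2 + var3 * array3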

Finally, I invoked the parallel calculation with

output = lbv.map(lambda x: my_package.my_fun(var1, var2, var3, x),
                 zip(list_of_arrays1, list_of_arrays2, list_of_arrays3))
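
As a side note, functools.partial from the standard library does the same job as the lambda of fixing the first three arguments, which some people find tidier:

from functools import partial
output = lbv.map(partial(my_package.my_fun, var1, var2, var3),
                 zip(list_of_arrays1, list_of_arrays2, list_of_arrays3))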

Et voilà! My calculation was done vastly faster.

The only problem...

...is that there seems to be a memory leak somewhere in ipcluster or the engines themselves. As a result, I kill the engines once in a while and re-initialize the client and interface objects before I run out of memory. Apparently this is a known problem that can be circumvented by manually clearing the client and interface objects' caches:

lbv.results.clear()
c.results.clear()
c.metadata.clear()

but I generally haven't found that this helps at all.
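
The client also has a purge_results method, which asks the hub to drop its cached results; that might be worth trying too:

c.purge_results('all')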

Have you used IPython's parallel routines? See something silly I'm doing? Let me know in the comments!
