For C++, we can use OpenMP to do parallel programming; however, OpenMP will not work for Python. What should I do if I want to parallelize some parts of my Python program?
The structure of the code may be considered as:
solve1(A)
solve2(B)
where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of sequentially, in order to reduce the running time?
The code is:
def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum iteration reached")
        i += 1  # advance the counter; without this the loop never terminates
    print(inneropt)
Here, setinner and setouter are two independent functions. That's the part I want to parallelize.
You can use the multiprocessing module. For this case I might use a process pool:
from multiprocessing import Pool

pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
This will spawn processes that can do generic work for you. Since we did not pass the processes argument, Pool starts one worker process for each CPU core on your machine (the number reported by os.cpu_count()). Each CPU core can execute one process at a time, so the workers run truly in parallel.
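A slightly fuller sketch, closer to the loop in the question, creates the pool once and submits both independent calls in every iteration. inner_bound, outer_bound and the list update are hypothetical stand-ins for setinner, setouter and updateGraph. The if __name__ == "__main__": guard matters on platforms where multiprocessing starts workers with spawn (the default on Windows and macOS), because each worker re-imports the main module.

```python
from multiprocessing import Pool


def inner_bound(values):
    # Hypothetical stand-in for setinner: any CPU-bound function works.
    return min(values)


def outer_bound(values):
    # Hypothetical stand-in for setouter.
    return max(values)


def solve_parallel(values, iterations=3):
    """Evaluate both independent functions concurrently in every iteration."""
    history = []
    # Create the pool once and reuse it; starting processes is expensive.
    with Pool(processes=2) as pool:
        for _ in range(iterations):
            inner_async = pool.apply_async(inner_bound, (values,))
            outer_async = pool.apply_async(outer_bound, (values,))
            # get() blocks until the corresponding worker has finished.
            inner, outer = inner_async.get(), outer_async.get()
            history.append((inner, outer))
            values = [v + 1 for v in values]  # hypothetical stand-in for updateGraph
    return history


if __name__ == "__main__":
    print(solve_parallel([3, 1, 4, 1, 5]))  # [(1, 5), (2, 6), (3, 7)]
```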
If you want to apply a single function to every element of a list, you would do this:
args = [A, B]
results = pool.map(solve1, args)
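For reference, a self-contained sketch of pool.map together with pool.starmap, which does the same for functions that take several arguments by unpacking each tuple; scale and power are hypothetical example functions:

```python
from multiprocessing import Pool


def scale(x):
    # Hypothetical single-argument worker function.
    return 2 * x


def power(base, exponent):
    # Hypothetical two-argument worker function.
    return base ** exponent


def run_examples():
    with Pool() as pool:
        # map: one call per list element, results returned in input order.
        doubled = pool.map(scale, [1, 2, 3])
        # starmap: each tuple is unpacked into the function's arguments.
        powers = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])
    return doubled, powers


if __name__ == "__main__":
    print(run_examples())  # ([2, 4, 6], [8, 9, 1])
```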
Don't use threads for this: the GIL (Global Interpreter Lock) lets only one thread at a time execute Python bytecode, so threads won't speed up CPU-bound Python code.
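One way to act on this with the standard library is concurrent.futures: ThreadPoolExecutor and ProcessPoolExecutor share the same interface, so threads (still fine for I/O-bound work) can be swapped for processes when the work is CPU-bound. A sketch with a hypothetical count_primes function; in a standard CPython build, only the process version spreads this work across cores:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    # CPU-bound pure-Python work: threads would take turns on the GIL here.
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run(executor_cls, limits):
    # Both executor classes share the same interface, so switching from
    # threads to processes is a one-line change at the call site.
    with executor_cls() as executor:
        return list(executor.map(count_primes, limits))


if __name__ == "__main__":
    print(run(ProcessPoolExecutor, [10, 100, 1000]))  # [4, 25, 168]
```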
This can be done very elegantly with Ray.
To parallelize your example, you'd need to define your functions with the @ray.remote decorator, and then invoke them with .remote.
import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
There are a number of advantages of this over the multiprocessing module.
- The same code will run on a multicore machine as well as a cluster of machines.
- Processes share data efficiently through shared memory and zero-copy serialization.
- Error messages are propagated nicely.
- These function calls can be composed together, e.g.,
  @ray.remote
  def f(x):
      return x + 1

  x_id = f.remote(1)
  y_id = f.remote(x_id)
  z_id = f.remote(y_id)
  ray.get(z_id)  # returns 4
- In addition to invoking functions remotely, classes can be instantiated remotely as actors.
Note that Ray is a framework I’ve been helping develop.
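For comparison, the same start-two-tasks-then-wait pattern can be sketched with the standard library's concurrent.futures, which runs on a single machine only; the trivial solve1 and solve2 mirror the Ray example above:

```python
from concurrent.futures import ProcessPoolExecutor


def solve1(a):
    return 1


def solve2(b):
    return 2


def main():
    with ProcessPoolExecutor() as executor:
        # submit() starts each task in the background and returns a Future,
        # playing a role similar to the IDs returned by .remote().
        x_future = executor.submit(solve1, 0)
        y_future = executor.submit(solve2, 1)
        # result() blocks until the task is done, much like ray.get().
        return x_future.result(), y_future.result()


if __name__ == "__main__":
    x, y = main()
    print(x, y)  # 1 2
```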
CPython uses the Global Interpreter Lock (GIL), which makes parallel programming a bit more interesting than in C++.
This topic has several useful examples and descriptions of the challenge:
The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).
There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) to the workers over and over, in each of the 1000 iterations. Since at least one worker will reside in a different process, this involves copying and sending the arguments to the other process(es), which can be very costly depending on the size of the objects. Instead, it makes sense to have workers store state and receive only the updated information.
For example, in charm4py this can be done like this:
class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...
Note that for this example we really only need one worker. The main loop could execute one of the functions, and have the worker execute the other. But my code helps to illustrate a couple of things:
- Worker A runs in process 0 (same as the main loop). While result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
- Arguments are automatically passed by reference to worker A, since it is in the same process (there is no copying involved).
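Without charm4py, the same idea (send the graph once, then only the updates) can be sketched with multiprocessing.Process and a Pipe. Following the note that one worker is enough, the main process computes one quantity locally while a single worker process computes the other on its own copy of the graph. add_edge, max_degree and edge_count are hypothetical stand-ins for updateGraph, setinner and setouter:

```python
from multiprocessing import Pipe, Process


def add_edge(graph, u, v):
    # Hypothetical stand-in for updateGraph: add an undirected edge in place.
    graph[u].add(v)
    graph[v].add(u)


def max_degree(graph):
    # Hypothetical stand-in for setinner.
    return max(len(nbrs) for nbrs in graph.values())


def edge_count(graph):
    # Hypothetical stand-in for setouter.
    return sum(len(nbrs) for nbrs in graph.values()) // 2


def outer_worker(conn, graph):
    # Runs in its own process with its own copy of the graph; after startup
    # it only ever receives small (u, v) updates.
    while True:
        update = conn.recv()
        if update is None:  # sentinel: shut down
            break
        add_edge(graph, *update)
        conn.send(edge_count(graph))


def solve(graph, updates):
    graph = {node: set(nbrs) for node, nbrs in graph.items()}  # keep caller's graph intact
    parent_conn, child_conn = Pipe()
    # The full graph is handed to the worker once, when it starts.
    worker = Process(target=outer_worker, args=(child_conn, graph))
    worker.start()
    results = []
    try:
        for u, v in updates:
            parent_conn.send((u, v))    # only the update crosses processes
            add_edge(graph, u, v)       # keep the local copy in sync
            inner = max_degree(graph)   # computed here while the worker runs
            outer = parent_conn.recv()  # block until the worker replies
            results.append((inner, outer))
    finally:
        parent_conn.send(None)
        worker.join()
    return results


if __name__ == "__main__":
    triangle = {0: set(), 1: set(), 2: set()}
    print(solve(triangle, [(0, 1), (0, 2), (1, 2)]))  # [(1, 1), (2, 2), (2, 3)]
```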
In some cases, it’s possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:
from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s
Unfortunately, Numba only supports a limited set of types (NumPy arrays, numeric scalars, tuples and a few other built-in types), not arbitrary Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven't tried this yet.
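Where Numba is not an option, the reduction in prange_test can be approximated with the standard library by splitting the data into chunks, summing each chunk in a worker process and adding up the partial sums. This is only a sketch of the pattern: for something as cheap as a sum, pickling the chunks to the workers will usually cost more than it saves.

```python
import os
from multiprocessing import Pool


def chunk_sum(chunk):
    # Each worker reduces its own slice of the data.
    total = 0
    for value in chunk:
        total += value
    return total


def parallel_sum(data, n_chunks=None):
    """Sum data by reducing chunks in worker processes, then combining."""
    n_chunks = n_chunks or os.cpu_count() or 1
    size = max(1, -(-len(data) // n_chunks))  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    if not chunks:
        return 0
    with Pool(processes=len(chunks)) as pool:
        partial_sums = pool.map(chunk_sum, chunks)
    # Combine the per-chunk results, as prange does for "s += A[i]".
    return sum(partial_sums)


if __name__ == "__main__":
    print(parallel_sum(list(range(1001)), n_chunks=4))  # 500500
```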
You can use the joblib library to do parallel computation and multiprocessing.
from joblib import Parallel, delayed
You can simply create a function foo that you want to run in parallel and, based on the following piece of code, implement parallel processing:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
num_cores can be obtained from the multiprocessing library as follows:
import multiprocessing

num_cores = multiprocessing.cpu_count()
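If you'd rather avoid the extra dependency, roughly the same computation can be written with the standard library's concurrent.futures; foo and the input list are hypothetical:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def foo(i):
    # Hypothetical worker function.
    return i * i


def run(inputs):
    num_cores = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        # Results come back in input order; chunksize batches several
        # inputs into one task to cut inter-process overhead.
        return list(executor.map(foo, inputs, chunksize=4))


if __name__ == "__main__":
    print(run([1, 2, 3, 4]))  # [1, 4, 9, 16]
```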
If you have a function with more than one input argument, and you just want to iterate over one of the arguments using a list, you can use the partial function from the functools library as follows:
from joblib import Parallel, delayed
import multiprocessing
from functools import partial

def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output

input = [11,32,44,55,23,0,100,...]  # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
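The same partial trick also works with multiprocessing.Pool. A small runnable sketch with a hypothetical foo; note that a partial of a module-level function can be pickled and sent to worker processes, whereas a lambda cannot:

```python
from functools import partial
from multiprocessing import Pool


def foo(arg1, arg2, arg3, arg4):
    # Hypothetical body: arg1 varies per task, the others are fixed.
    return arg1 * arg2 + arg3 - arg4


def run(inputs):
    # Bind the fixed keyword arguments; only arg1 comes from the list.
    foo_ = partial(foo, arg2=10, arg3=5, arg4=1)
    with Pool() as pool:
        return pool.map(foo_, inputs)


if __name__ == "__main__":
    print(run([1, 2, 3]))  # [14, 24, 34]
```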
You can find a complete explanation of multiprocessing in Python and R, with a couple of examples, here.