Multiprocessing: How to use Pool.map on a function defined in a class?

Posted on

Question :

Multiprocessing: How to use Pool.map on a function defined in a class?

When I run something like:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

it works fine. However, putting this as a function of a class:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Gives me the following error:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I’ve seen a post from Alex Martelli dealing with the same kind of problem, but it wasn’t explicit enough.

Asked By: Mermoz

||

Answer #1:

I also was annoyed by restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
Answered By: mrule

Answer #2:

I could not use the codes posted so far because the codes using “multiprocessing.Pool” do not work with lambda expressions and the codes not using “multiprocessing.Pool” spawn as many processes as there are work items.

I adapted the code s.t. it spawns a predefined amount of workers and only iterates through the input list if there exists an idle worker. I also enabled the “daemon” mode for the workers s.t. ctrl-c works as expected.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
Answered By: klaus se

Answer #3:

Multiprocessing and pickling is broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocesssing, you can directly use classes and class methods in multiprocessing’s map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See discussions:
What can multiprocessing and dill do together?

and:
http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that’s more fragile and specific to a single case?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Get the code here:
https://github.com/uqfoundation/pathos

And, just to show off a little more of what it can do:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
Answered By: Mike McKerns

Answer #4:

There is currently no solution to your problem, as far as I know: the function that you give to map() must be accessible through an import of your module. This is why robert’s code works: the function f() can be obtained by importing the following code:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

I actually added a “main” section, because this follows the recommendations for the Windows platform (“Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects”).

I also added an uppercase letter in front of Calculate, so as to follow PEP 8. 🙂

Answered By: Eric O Lebigot

Answer #5:

The solution by mrule is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe’s buffer, blocking on the child’s pipe.send(), while the parent is waiting for the child to exit on pipe.join(). The solution is to read the child’s data before join()ing the child. Furthermore the child should close the parent’s end of the pipe to prevent a deadlock. The code below fixes that. Also be aware that this parmap creates one process per element in X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise to the reader so as not to spoil the conciseness of the nice answer by mrule. 😉

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))
Answered By: Bob McElrath

Answer #6:

I’ve also struggled with this. I had functions as data members of a class, as a simplified example:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

I needed to use the function self.f in a Pool.map() call from within the same class and self.f did not take a tuple as an argument. Since this function was embedded in a class, it was not clear to me how to write the type of wrapper other answers suggested.

I solved this problem by using a different wrapper that takes a tuple/list, where the first element is the function, and the remaining elements are the arguments to that function, called eval_func_tuple(f_args). Using this, the problematic line can be replaced by return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Here is the full code:

File: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

File: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Running main.py will give [11, 22, 33]. Feel free to improve this, for example eval_func_tuple could also be modified to take keyword arguments.

On another note, in another answers, the function “parmap” can be made more efficient for the case of more Processes than number of CPUs available. I’m copying an edited version below. This is my first post and I wasn’t sure if I should directly edit the original answer. I also renamed some variables.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         
Answered By: Brandt

Answer #7:

I took klaus se’s and aganders3’s answer, and made a documented module that is more readable and holds in one file. You can just add it to your project. It even has an optional progress bar !

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDIT: Added @alexander-mcfarlane suggestion and a test function

Answered By: xApple

Answer #8:

I know this was asked over 6 years ago now, but just wanted to add my solution, as some of the suggestions above seem horribly complicated, but my solution was actually very simple.

All I had to do was wrap the pool.map() call to a helper function. Passing the class object along with args for the method as a tuple, which looked a bit like this.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
Answered By: nightowl

Leave a Reply

Your email address will not be published. Required fields are marked *