How can I recover the return value of a function passed to multiprocessing.Process?

Posted on

Solving problem is about exposing yourself to as many situations as possible like How can I recover the return value of a function passed to multiprocessing.Process? and practice these strategies over and over. With time, it becomes second nature and a natural way you approach any problems in general. Big or small, always start with a plan, use other strategies mentioned here till you are confident and ready to code the solution.
In this post, my aim is to share an overview the topic about How can I recover the return value of a function passed to multiprocessing.Process?, which can be followed any time. Take easy to follow this discuss.

How can I recover the return value of a function passed to multiprocessing.Process?

In the example code below, I’d like to recover the return value of the function worker. How can I go about doing this? Where is this value stored?

Example Code:

import multiprocessing
def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    print jobs

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

I can’t seem to find the relevant attribute in the objects stored in jobs.

Asked By: blz

||

Answer #1:

Use shared variable to communicate. For example like this:

import multiprocessing
def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum
if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    print(return_dict.values())
Answered By: vartec

Answer #2:

I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:

import multiprocessing
from os import getpid
def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Which will print the return values:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

If you are familiar with map (the Python 2 built-in) this should not be too challenging. Otherwise have a look at sega_Sai’s link.

Note how little code is needed. (Also note how processes are re-used).

Answered By: Mark

Answer #3:

For some reason, I couldn’t find a general example of how to do this with Queue anywhere (even Python’s doc examples don’t spawn multiple processes), so here’s what I got working after like 10 tries:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)
def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join the Processes or else the queue fills up and blocks everything.

Update for those who are object-oriented (tested in Python 3.4):

from multiprocessing import Process, Queue
class Multiprocessor():
    def __init__(self):
        self.processes = []
        self.queue = Queue()
    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)
    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()
    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets
# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
Answered By: sudo

Answer #4:

This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:

import multiprocessing
def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)
def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()
    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list
if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

This solution uses fewer resources than a multiprocessing.Queue which uses

  • a Pipe
  • at least one Lock
  • a buffer
  • a thread

or a multiprocessing.SimpleQueue which uses

  • a Pipe
  • at least one Lock

It is very instructive to look at the source for each of these types.

Answered By: David Cullen

Answer #5:

For anyone else who is seeking how to get a value from a Process using Queue:

import multiprocessing
ret = {'foo': False}
def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)
if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Note that in Windows or Jupyter Notebook, with multithreading you have to save this as a file and execute the file. If you do it in a command prompt you will see an error like this:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Answered By: Matthew Moisen

Answer #6:

It seems that you should use the multiprocessing.Pool class instead and use the methods .apply() .apply_async(), map()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

Answered By: sega_sai

Answer #7:

You can use the exit built-in to set the exit code of a process. It can be obtained from the exitcode attribute of the process:

import multiprocessing
def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Answered By: David Cullen

Answer #8:

The pebble package has a nice abstraction leveraging multiprocessing.Pipe which makes this quite straightforward:

from pebble import concurrent
@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg
future = function(1, kwarg=1)
print(future.result())

Example from: https://pythonhosted.org/Pebble/#concurrent-decorators

Answered By: erikreed

Leave a Reply

Your email address will not be published.