# Use numpy array in shared memory for multiprocessing

I would like to use a numpy array in shared memory for use with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

```python
from multiprocessing import Process, Array

import numpy as np


def f(a):
    a[0] = -a[0]


if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)  # replaces the deprecated scipy.rand alias
    arr = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % arr[:2])
```

This produces output such as:

```
Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]
```

The array can be accessed in a ctypes manner, e.g. `arr[i]` makes sense. However, it is not a numpy array, and I cannot perform operations such as `-1*arr`, or `arr.sum()`. I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don’t believe it would be shared anymore.

It seems there would be a standard solution to what has to be a common problem.

To add to the answers by @unutbu (no longer available) and @Henry Gomersall, you can use `shared_arr.get_lock()` to synchronize access when needed:

```python
import ctypes
import multiprocessing as mp

import numpy as np

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i):  # could be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock():  # synchronize access
        arr = np.frombuffer(shared_arr.get_obj())  # no data copying
        arr[i] = -arr[i]
```

### Example

```python
import ctypes
import logging
import multiprocessing as mp
from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2  # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)


def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())


def f(i):
    """synchronized."""
    with shared_arr.get_lock():  # synchronize access
        g(i)


def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))


if __name__ == '__main__':
    mp.freeze_support()
    main()
```

If you don’t need synchronized access or you create your own locks then `mp.Array()` is unnecessary. You could use `mp.sharedctypes.RawArray` in this case.
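As a rough illustration of the lock-free variant, the sketch below wraps a `RawArray` in a NumPy view and lets two child processes negate disjoint halves of it. The helper names (`as_numpy`, `negate`, `run_demo`) are made up for this example, and it assumes a POSIX system where the `fork` start method is available.

```python
import multiprocessing as mp

import numpy as np


def as_numpy(raw):
    """View a RawArray of doubles as a NumPy array without copying."""
    return np.frombuffer(raw, dtype=np.float64)


def negate(raw, index):
    """Child-process worker: flip the sign of the selected elements in place."""
    arr = as_numpy(raw)
    arr[index] = -arr[index]


def run_demo(n=8):
    # Assumption: 'fork' is available (POSIX). With other start methods,
    # call run_demo() only from an `if __name__ == "__main__":` guard.
    ctx = mp.get_context("fork")
    raw = ctx.RawArray("d", n)  # zero-initialised, no lock attached
    arr = as_numpy(raw)
    arr[:] = np.arange(n, dtype=np.float64)

    # The two children touch disjoint halves, so no lock is needed.
    half = n // 2
    workers = [
        ctx.Process(target=negate, args=(raw, slice(0, half))),
        ctx.Process(target=negate, args=(raw, slice(half, n))),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return arr.copy()


if __name__ == "__main__":
    print(run_demo())
```

Because nothing guards the buffer, this is only safe when the processes write to non-overlapping regions or when you coordinate access yourself.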

The `Array` object has a `get_obj()` method, which returns the underlying ctypes array; that array exposes a buffer interface. I think the following should work…

```python
from multiprocessing import Process, Array

import numpy


def f(a):
    a[0] = -a[0]


if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)  # replaces the deprecated scipy.rand alias
    a = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (a[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % a[:2])

    # b is a NumPy view onto the same shared buffer as a
    b = numpy.frombuffer(a.get_obj())
    b[0] = 10.0
    print(a[0])
```

When run, this prints out the first element of `a` now being 10.0, showing `a` and `b` are just two views into the same memory.

To make sure it is still multiprocess-safe, I believe you will have to use the `acquire` and `release` methods that exist on the `Array` object, `a`, and its built-in lock to make sure it’s all safely accessed (though I’m not an expert on the multiprocessing module).
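A minimal sketch of that locking pattern follows, using the lock returned by `get_lock()` with explicit `acquire()`/`release()` calls around a read-modify-write on the NumPy view. The function names and the process and repeat counts are illustrative, and the `fork` start method is assumed to be available.

```python
import multiprocessing as mp

import numpy as np


def add_one(shared, repeats):
    """Child-process worker: increment every element `repeats` times."""
    lock = shared.get_lock()
    for _ in range(repeats):
        lock.acquire()
        try:
            arr = np.frombuffer(shared.get_obj())  # float64 view, no copy
            arr += 1.0  # read-modify-write happens while the lock is held
        finally:
            lock.release()


def run_locked_increments(n_procs=4, repeats=500, size=3):
    # Assumption: the 'fork' start method is available (POSIX systems).
    ctx = mp.get_context("fork")
    shared = ctx.Array("d", size)  # zero-initialised, created with a lock by default
    procs = [
        ctx.Process(target=add_one, args=(shared, repeats))
        for _ in range(n_procs)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return np.frombuffer(shared.get_obj()).copy()


if __name__ == "__main__":
    print(run_locked_increments())
```

With the lock held around each update, every element should end up equal to `n_procs * repeats`; without it, concurrent increments could overwrite each other. Writing `with shared.get_lock():` is a shorter way to express the same acquire/release pair.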

While the answers already given are good, there is a much easier solution to this problem provided two conditions are met:

1. You are on a POSIX-compliant operating system (e.g. Linux, macOS); and
2. Your child processes need read-only access to the shared object.

In this case you do not need to explicitly make variables shared, because the child processes are created using a fork. A forked child automatically shares the parent’s memory space (copy-on-write, so anything a child writes stays private to that child). In the context of Python multiprocessing, this means it shares all module-level variables. Note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a `multiprocessing.Pool`.

A simple example:

```python
import multiprocessing

import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None


# child worker function
def job_handler(num):
    # built-in id() returns the unique memory ID of an object
    return id(data_array), np.sum(data_array)


def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data
    # relies on the fork start method, so the workers inherit data_array
    with multiprocessing.Pool(num_worker) as pool:
        return pool.map(job_handler, range(num_jobs))


if __name__ == '__main__':
    # create some random data and execute the child jobs
    mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

    # prints 'True' when the workers were forked, since the data was shared
    print(np.all(np.asarray(mem_ids) == id(data_array)))
```
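One caveat of relying on fork is that the inherited memory is copy-on-write: children can read the parent’s data for free, but their writes never reach the parent. The sketch below tries to make that visible; the function names are hypothetical and the `fork` start method is requested explicitly, so it assumes a POSIX system.

```python
import multiprocessing as mp

import numpy as np

data_array = None  # module-level, inherited by forked children


def read_sum(queue):
    """Child: read the inherited array."""
    queue.put(float(np.sum(data_array)))


def write_first(queue):
    """Child: modify the inherited array (affects only the child's copy)."""
    data_array[0] = -1.0
    queue.put(float(data_array[0]))


def run_fork_demo():
    global data_array
    ctx = mp.get_context("fork")  # assumption: fork is available
    data_array = np.ones(5)
    queue = ctx.Queue()
    results = []
    for target in (read_sum, write_first):
        p = ctx.Process(target=target, args=(queue,))
        p.start()
        results.append(queue.get())  # fetch the result before joining
        p.join()
    # The parent's first element is returned to show it was not changed.
    return results, float(data_array[0])


if __name__ == "__main__":
    print(run_fork_demo())
```

If the children need to write results back into the array, one of the explicitly shared approaches (`mp.Array`, `RawArray`) is the better fit.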

I’ve written a small Python module that uses POSIX shared memory to share NumPy arrays between Python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here’s how it works:

```python
import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
```
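For comparison, a similar create-and-attach-by-name pattern can be sketched with the standard library’s `multiprocessing.shared_memory` module (Python 3.8+). This is not the SharedArray API, just a stdlib alternative shown under that assumption; the function name `create_and_attach` is made up for the example.

```python
import numpy as np
from multiprocessing import shared_memory


def create_and_attach(n=10):
    dtype = np.dtype(np.float64)

    # Create a named block large enough for n doubles and wrap it in an array.
    owner = shared_memory.SharedMemory(create=True, size=n * dtype.itemsize)
    a = np.ndarray((n,), dtype=dtype, buffer=owner.buf)
    a[:] = 0.0

    # Attach to the same block by name; another interpreter on the same
    # machine could do this too if it is given owner.name.
    other = shared_memory.SharedMemory(name=owner.name)
    b = np.ndarray((n,), dtype=dtype, buffer=other.buf)

    # A write through one view is visible through the other.
    a[0] = 42
    seen_through_b = float(b[0])

    # Drop the NumPy views before closing; close() can raise BufferError
    # while views onto the buffer still exist.
    del a, b
    other.close()
    owner.close()
    owner.unlink()  # release the block itself once nobody needs it
    return seen_through_b


if __name__ == "__main__":
    print(create_and_attach())
```

Unlike `mp.Array`, this block carries no lock, so concurrent writers still need their own synchronization.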

You can use the `sharedmem` module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here’s your original code then, this time using shared memory that behaves like a NumPy array (note the additional last statement calling a NumPy `sum()` function):

```python
from multiprocessing import Process

import numpy as np
import sharedmem


def f(a):
    a[0] = -a[0]


if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)  # replaces the deprecated scipy.rand alias
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % arr[:2])

    # Perform some NumPy operation
    print(arr.sum())
```

The answers are collected from Stack Overflow and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.