I’m trying to use multiprocessing with a pandas dataframe: split the dataframe into 8 parts and apply some function to each part using apply (with each part processed in a different process).
Here’s the solution I finally found:
    import multiprocessing as mp
    import numpy as np
    import pandas as pd
    import pandas.util.testing as pdt

    def process_apply(x):
        # do some stuff to data here

    def process(df):
        res = df.apply(process_apply, axis=1)
        return res

    if __name__ == '__main__':
        p = mp.Pool(processes=8)
        split_dfs = np.array_split(big_df, 8)
        pool_results = p.map(process, split_dfs)
        p.close()
        p.join()

        # merging parts processed by different processes
        parts = pd.concat(pool_results, axis=0)

        # merging newly calculated parts to big_df
        big_df = pd.concat([big_df, parts], axis=1)

        # checking if the dfs were merged correctly
        pdt.assert_series_equal(parts['id'], big_df['id'])
You can use https://github.com/nalepae/pandarallel, as in the following example:
    from pandarallel import pandarallel
    from math import sin

    pandarallel.initialize()

    def func(x):
        return sin(x**2)

    df.parallel_apply(func, axis=1)
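If you want to control the number of workers or show a progress bar, initialize takes options for that; nb_workers and progress_bar are the parameter names as I recall them from the pandarallel docs, so double-check against your installed version:

    import pandas as pd
    from pandarallel import pandarallel

    # nb_workers / progress_bar are assumed option names; verify against your pandarallel version
    pandarallel.initialize(nb_workers=4, progress_bar=True)

    def square_row(row):
        # made-up row function for illustration
        return row['x'] ** 2

    df = pd.DataFrame({'x': range(10_000)})
    df['x_squared'] = df.parallel_apply(square_row, axis=1)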
A more generic version based on the author's solution that lets you run it on any function and dataframe:
    from multiprocessing import Pool
    from functools import partial
    import numpy as np
    import pandas as pd

    def parallelize(data, func, num_of_processes=8):
        data_split = np.array_split(data, num_of_processes)
        pool = Pool(num_of_processes)
        data = pd.concat(pool.map(func, data_split))
        pool.close()
        pool.join()
        return data

    def run_on_subset(func, data_subset):
        return data_subset.apply(func, axis=1)

    def parallelize_on_rows(data, func, num_of_processes=8):
        return parallelize(data, partial(run_on_subset, func), num_of_processes)
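A rough usage sketch of the helpers above on a toy dataframe; the column names and the row function are made up for illustration:

    import pandas as pd

    def add_columns(row):
        # example row function: returns a single value per row
        return row['a'] + row['b']

    if __name__ == '__main__':
        df = pd.DataFrame({'a': range(10), 'b': range(10)})
        df['sum'] = parallelize_on_rows(df, add_columns, num_of_processes=4)
        print(df.head())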
Since I don’t have much of your data script, this is a guess, but I’d suggest using p.map instead of apply_async with the callback. So the code would look something like the following:
    p = mp.Pool(8)
    pool_results = p.map(process, np.array_split(big_df, 8))
    p.close()
    p.join()

    results = []
    for result in pool_results:
        results.extend(result)
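For contrast, the apply_async-with-callback pattern this is steering away from would look roughly like this (a sketch reusing process and big_df from above; note that callbacks fire as tasks finish, so result order is not guaranteed):

    results = []

    def collect_result(result):
        # callback runs in the parent process as each task finishes (order not guaranteed)
        results.append(result)

    p = mp.Pool(8)
    for chunk in np.array_split(big_df, 8):
        p.apply_async(process, args=(chunk,), callback=collect_result)
    p.close()
    p.join()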
This worked well for me:
    import multiprocessing

    rows_iter = (row for _, row in df.iterrows())

    with multiprocessing.Pool() as pool:
        df['new_column'] = pool.map(process_apply, rows_iter)
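To make that snippet self-contained, here is roughly how it can be wrapped; the row function and column names are made up, and the __main__ guard matters on platforms that spawn processes:

    import multiprocessing

    import pandas as pd

    def process_apply(row):
        # made-up row function for illustration: returns one value per row
        return row['a'] * 2

    if __name__ == '__main__':
        df = pd.DataFrame({'a': range(100)})
        rows_iter = (row for _, row in df.iterrows())
        with multiprocessing.Pool() as pool:
            # pool.map preserves input order, so results line up with df's rows
            df['new_column'] = pool.map(process_apply, rows_iter)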
With the mapply package, you can set the number of cores (and the chunking behaviour) upon init:
    import pandas as pd
    import mapply

    mapply.init(n_workers=-1)

    def process_apply(x):
        # do some stuff to data here

    def process(df):
        # spawns a pathos.multiprocessing.ProcessPool if sensible
        res = df.mapply(process_apply, axis=1)
        return res
By default (n_workers=-1), the package uses all physical CPUs available on the system. If your system uses hyper-threading (usually twice the number of physical CPUs will show up), mapply will spawn one extra worker to prioritise the multiprocessing pool over other processes on the system.
You could also use all logical cores instead (beware that this way the CPU-bound processes will be fighting over physical CPUs, which might slow down your operation):
    import multiprocessing
    n_workers = multiprocessing.cpu_count()

    # or more explicit
    import psutil
    n_workers = psutil.cpu_count(logical=True)
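You can then pass that value into mapply.init. A minimal end-to-end sketch, with a made-up row function and columns just for illustration:

    import pandas as pd
    import mapply
    import psutil

    mapply.init(n_workers=psutil.cpu_count(logical=True))

    def row_sum(row):
        # made-up row function for illustration
        return row['a'] + row['b']

    df = pd.DataFrame({'a': range(10_000), 'b': range(10_000)})
    df['total'] = df.mapply(row_sum, axis=1)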
I also ran into the same problem when I used multiprocessing.map() to apply a function to different chunks of a large dataframe. I just want to add a few points in case other people run into the same problem as I did:
- remember to add if __name__ == '__main__':
- execute the code in a .py file; if you use ipython/jupyter notebook, then you cannot run multiprocessing (this was true in my case, though I have no clue why); see the minimal skeleton after this list
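A minimal .py-file skeleton showing where the guard goes; the worker function here is just a placeholder:

    # my_script.py -- run with: python my_script.py
    import multiprocessing as mp

    import numpy as np
    import pandas as pd

    def process(chunk):
        # placeholder worker: apply your row-level function to the chunk here
        return chunk

    if __name__ == '__main__':
        big_df = pd.DataFrame({'x': range(1000)})
        with mp.Pool(processes=4) as pool:
            parts = pool.map(process, np.array_split(big_df, 4))
        big_df = pd.concat(parts)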
Install Pyxtension, which simplifies using parallel map, and use it like this:
    import multiprocessing
    import numpy as np
    import pandas as pd
    from pyxtension.streams import stream

    big_df = pd.concat(stream(np.array_split(df, multiprocessing.cpu_count())).mpmap(process))
I ended up using concurrent.futures.ProcessPoolExecutor.map in place of multiprocessing.Pool.map, which took 316 microseconds for some code that took 12 seconds in serial.
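For reference, a minimal sketch of what swapping in ProcessPoolExecutor.map can look like for chunked dataframes; the per-chunk function is made up for illustration:

    import concurrent.futures

    import numpy as np
    import pandas as pd

    def process(chunk):
        # made-up per-chunk function: square one column
        chunk['y'] = chunk['x'] ** 2
        return chunk

    if __name__ == '__main__':
        big_df = pd.DataFrame({'x': range(10_000)})
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # executor.map returns results in the order the chunks were submitted
            big_df = pd.concat(executor.map(process, np.array_split(big_df, 8)))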