Python: execute cat subprocess in parallel

Question :

I am running several cat | zgrep commands on a remote server and gathering their output individually for further processing:

import multiprocessing as mp
import subprocess

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.run()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = subprocess.check_output(command, shell=True).split('\n')

This, however, results in sequential execution of the subprocess ('ssh … cat …') commands: the second peak waits for the first to finish, and so on.

How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output for each individually?

Answer #1:

Another approach (rather than the other suggestion of putting shell processes in the background) is to use multithreading.

The run method that you have would then do something like this:

thread.start_new_thread(myFuncThatDoesZGrep, ())

To collect results, you can do something like this:

class MyThread(threading.Thread):
    def run(self):
        self.finished = False
        self.results = []
        # Your code to run the command and fill self.results goes here.
        # When finished....
        self.finished = True

Start the thread as usual with the threading module. When your thread object has myThread.finished == True, you can collect the results via myThread.results.
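Putting those pieces together, a minimal sketch might look like the following. Note two substitutions: hypothetical echo commands stand in for the real ssh/zgrep pipelines, and Thread.join() is used instead of polling the .finished flag.

```python
import subprocess
import threading

class ZGrepThread(threading.Thread):
    """Run one shell command in its own thread and keep its output lines."""
    def __init__(self, command):
        super().__init__()
        self.command = command
        self.results = None

    def run(self):
        out = subprocess.check_output(self.command, shell=True)
        self.results = out.decode().splitlines()

# placeholder echo commands stand in for the real ssh/zgrep pipelines
threads = [ZGrepThread('echo line{}'.format(i)) for i in range(3)]
for t in threads:
    t.start()   # all commands run concurrently
for t in threads:
    t.join()    # block until each thread is done, instead of polling .finished
all_results = [t.results for t in threads]
```

join() avoids the busy-wait and the ordering subtlety of setting a finished flag by hand.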

Answered By: FrobberOfBits

Answer #2:

You need neither multiprocessing nor threading to run subprocesses in parallel, e.g.:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

This runs 5 shell commands simultaneously. Note: neither threads nor the multiprocessing module are used here. There is no point in adding an ampersand & to the shell commands: Popen doesn’t wait for the command to complete; you need to call .wait() explicitly.
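Applied to the question’s setup, the same pattern looks like this. The commands here are hypothetical placeholders for the real 'ssh … | zgrep …' pipelines, chosen so the exit codes illustrate success and failure:

```python
from subprocess import Popen

# placeholder commands; substitute the real 'ssh ... | zgrep ...' pipelines here
commands = ['true', 'false', 'echo done']

processes = [Popen(cmd, shell=True) for cmd in commands]  # all start immediately
exitcodes = [p.wait() for p in processes]                 # then wait for each in turn
```

exitcodes[i] is 0 on success; be aware that zgrep (like grep) exits with status 1 when nothing matches, so a nonzero code is not necessarily an error.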

It is convenient, but not necessary, to use threads to collect the output from the subprocesses:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?.

Here’s a code example that gets output from several subprocesses concurrently in the same thread, using asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
results = loop.run_until_complete(asyncio.gather(*coros))
loop.close()
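On modern Python (3.7+) the same approach can be written with async/await and asyncio.run. This is a sketch along those lines, with plain echo commands standing in for the original python -c invocations:

```python
#!/usr/bin/env python3
import asyncio
from asyncio.subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    # start the subprocess and wait for all of its output
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (await p.communicate())[0].splitlines()

async def main():
    # echo commands stand in for the original python -c invocations
    coros = [get_lines('echo {}'.format(i)) for i in range(5)]
    return await asyncio.gather(*coros)

outputs = asyncio.run(main())
```

asyncio.run() creates and closes the event loop for you, so the platform-specific loop setup above is no longer needed.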
Answered By: jfs
