How do I use subprocess.Popen to connect multiple processes by pipes?

Posted on

Problem :

How do I execute the following shell command using the Python subprocess module?

echo "input data" | awk -f script.awk | sort > outfile.txt

The input data will come from a string, so I don’t actually need echo. I’ve got this far, can anyone explain how I get it to pipe through sort too?

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

UPDATE: Note that while the accepted answer below doesn’t actually answer the question as asked, I believe S.Lott is right and it’s better to avoid having to solve that problem in the first place!

Solution :

You’d be a little happier with the following.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input datan" )

Delegate part of the work to the shell. Let it connect two processes with a pipeline.

You’d be a lot happier rewriting ‘script.awk’ into Python, eliminating awk and the pipeline.

Edit. Some of the reasons for suggesting that awk isn’t helping.

[There are too many reasons to respond via comments.]

  1. Awk is adding a step of no significant value. There’s nothing unique about awk’s processing that Python doesn’t handle.

  2. The pipelining from awk to sort, for large sets of data, may improve elapsed processing time. For short sets of data, it has no significant benefit. A quick measurement of awk >file ; sort file and awk | sort will reveal of concurrency helps. With sort, it rarely helps because sort is not a once-through filter.

  3. The simplicity of “Python to sort” processing (instead of “Python to awk to sort”) prevents the exact kind of questions being asked here.

  4. Python — while wordier than awk — is also explicit where awk has certain implicit rules that are opaque to newbies, and confusing to non-specialists.

  5. Awk (like the shell script itself) adds Yet Another Programming language. If all of this can be done in one language (Python), eliminating the shell and the awk programming eliminates two programming languages, allowing someone to focus on the value-producing parts of the task.

Bottom line: awk can’t add significant value. In this case, awk is a net cost; it added enough complexity that it was necessary to ask this question. Removing awk will be a net gain.

Sidebar Why building a pipeline (a | b) is so hard.

When the shell is confronted with a | b it has to do the following.

  1. Fork a child process of the original shell. This will eventually become b.

  2. Build an os pipe. (not a Python subprocess.PIPE) but call os.pipe() which returns two new file descriptors that are connected via common buffer. At this point the process has stdin, stdout, stderr from its parent, plus a file that will be “a’s stdout” and “b’s stdin”.

  3. Fork a child. The child replaces its stdout with the new a’s stdout. Exec the a process.

  4. The b child closes replaces its stdin with the new b’s stdin. Exec the b process.

  5. The b child waits for a to complete.

  6. The parent is waiting for b to complete.

I think that the above can be used recursively to spawn a | b | c, but you have to implicitly parenthesize long pipelines, treating them as if they’re a | (b | c).

Since Python has os.pipe(), os.exec() and os.fork(), and you can replace sys.stdin and sys.stdout, there’s a way to do the above in pure Python. Indeed, you may be able to work out some shortcuts using os.pipe() and subprocess.Popen.

However, it’s easier to delegate that operation to the shell.

import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

To emulate a shell pipeline:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

without invoking the shell (see 17.1.4.2. Replacing shell pipeline):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum provides some syntax sugar:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

The analog of:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

is:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

The accepted answer is sidestepping the issue.
here is a snippet that chains the output of multiple processes:
Note that it also prints the (somewhat) equivalent shell command so you can run it and make sure the output is correct.

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

http://www.python.org/doc/2.5.2/lib/node535.html covered this pretty well. Is there some part of this you didn’t understand?

Your program would be pretty similar, but the second Popen would have stdout= to a file, and you wouldn’t need the output of its .communicate().

Inspired by @Cristian’s answer. I met just the same issue, but with a different command. So I’m putting my tested example, which I believe could be helpful:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

This is tested.

What has been done

  • Declared lazy grep execution with stdin from pipe. This command will be executed at the ps command execution when the pipe will be filled with the stdout of ps.
  • Called the primary command ps with stdout directed to the pipe used by the grep command.
  • Grep communicated to get stdout from the pipe.

I like this way because it is natural pipe conception gently wrapped with subprocess interfaces.

The previous answers missed an important point. Replacing shell pipeline is basically correct, as pointed out by geocar. It is almost sufficient to run communicate on the last element of the pipe.

The remaining problem is passing the input data to the pipeline. With multiple subprocesses, a simple communicate(input_data) on the last element doesn’t work – it hangs forever. You need to create a a pipeline and a child manually like this:

import os
import subprocess

input = """
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

Now the child provides the input through the pipe, and the parent calls communicate(), which works as expected. With this approach, you can create arbitrary long pipelines without resorting to “delegating part of the work to the shell”. Unfortunately the subprocess documentation doesn’t mention this.

There are ways to achieve the same effect without pipes:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

Now use stdin=tf for p_awk. It’s a matter of taste what you prefer.

The above is still not 100% equivalent to bash pipelines because the signal handling is different. You can see this if you add another pipe element that truncates the output of sort, e.g. head -n 10. With the code above, sort will print a “Broken pipe” error message to stderr. You won’t see this message when you run the same pipeline in the shell. (That’s the only difference though, the result in stdout is the same). The reason seems to be that python’s Popen sets SIG_IGN for SIGPIPE, whereas the shell leaves it at SIG_DFL, and sort‘s signal handling is different in these two cases.

EDIT: pipes is available on Windows but, crucially, doesn’t appear to actually work on Windows. See comments below.

The Python standard library now includes the pipes module for handling this:

https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html

I’m not sure how long this module has been around, but this approach appears to be vastly simpler than mucking about with subprocess.

For me, the below approach is the cleanest and easiest to read

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

which can be called like so:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')

Leave a Reply

Your email address will not be published. Required fields are marked *