python: reading subprocess output in threads

muckl picture muckl · Mar 21, 2012 · Viewed 7.9k times · Source

I have an executable that I call using subprocess.Popen. Then, I intend to feed it some data via stdin using a thread that reads its value from a Queue which will later be populated in another thread. The output should be read using the stdout pipe in another thread and again be sorted in a Queue.

As far as I understand from my previous research, using threads with Queue is good practice.

The external executable, unfortunately, will not quickly give me an answer for every line that is piped in, so that simple write, readline cycles are not an option. The executable implements some internal multithreading and I want the output as soon as it becomes available, therefore the additional reader thread.

As an example for testing the executable will just shuffle each line (shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
    line = line.strip()

    # shuffle line
    line = list(line)
    shuffle(line)
    line = "".join(line)

    sys.stdout.write("%s\n"%(line))
    sys.stdout.flush() # avoid buffers

Please note that this is already as unbuffered as possible. Or isn't it? This is my stripped down test program:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
    def __init__(self, p_in, source_queue):
        threading.Thread.__init__(self)
        self.pipe = p_in
        self.source_queue = source_queue

    def run(self):
        while True:
            source = self.source_queue.get()
            print "writing to process: ", repr(source)
            self.pipe.write(source)
            self.pipe.flush()
            self.source_queue.task_done()

class ReadThread(threading.Thread):
    def __init__(self, p_out, target_queue):
        threading.Thread.__init__(self)
        self.pipe = p_out
        self.target_queue = target_queue

    def run(self):
        while True:
            line = self.pipe.readline() # blocking read
            if line == '':
                break
            print "reader read: ", line.rstrip()
            self.target_queue.put(line)

if __name__ == "__main__":

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    source_queue = Queue.Queue()
    target_queue = Queue.Queue()

    writer = WriteThread(proc.stdin, source_queue)
    writer.setDaemon(True)
    writer.start()

    reader = ReadThread(proc.stdout, target_queue)
    reader.setDaemon(True)
    reader.start()

    # populate queue
    for i in range(10):
        source_queue.put("string %s\n" %i)
    source_queue.put("")

    print "source_queue empty: ", source_queue.empty()
    print "target_queue empty: ", target_queue.empty()

    import time
    time.sleep(2) # expect some output from reader thread

    source_queue.join() # wait until all items in source_queue are processed
    proc.stdin.close()  # should end the subprocess
    proc.wait()

this give the following output (python2.7):

writing to process:  'string 0\n'
writing to process:  'string 1\n'
writing to process:  'string 2\n'
writing to process:  'string 3\n'
writing to process:  'string 4\n'
writing to process:  'string 5\n'
writing to process:  'string 6\n'
source_queue empty: writing to process:  'string 7\n'
writing to process:  'string 8\n'
writing to process:  'string 9\n'
writing to process:  ''
 True
target_queue empty:  True

then nothing for 2 seconds ...

reader read:  rgsn0i t
reader read:  nrg1sti
reader read:  tis n2rg
reader read:  snt gri3
reader read:  nsri4 tg
reader read:  stir5 gn
reader read:   gnri6ts
reader read:   ngrits7
reader read:  8nsrt ig
reader read:  sg9 nitr

The interleaving at the beginning is expected. However the output of the subprocess does not appear until after the subprocess ends. With more lines piped in I get some output, thus I assume a caching problem in the stdout pipe. According to other questions posted here flushing stdout (in the subprocess) should work, at least on Linux.

Answer

Thomas Wouters picture Thomas Wouters · Mar 21, 2012

Your problem has nothing to do the subprocess module, or threads (problematic as they are), or even mixing subprocesses and threads (a very bad idea, even worse than using threads to start with, unless you're using the backport of Python 3.2's subprocess module that you can get from code.google.com/p/python-subprocess32) or accessing the same things from multiple threads (as your print statements do.)

What happens is that your shuffleline.py program buffers. Not in output, but in input. Although it isn't very obvious, when you iterate over a fileobject, Python will read in blocks, usually 8k bytes. Since sys.stdin is a fileobject, your for loop will buffer until EOF or a full block:

for line in sys.stdin:
    line = line.strip()
    ....

If you want to not do this, either use a while loop to call sys.stdin.readline() (which returns '' for EOF):

while True:
    line = sys.stdin.readline()
    if not line:
        break
    line = line.strip()
    ...

or use the two-argument form of iter(), which creates an iterator that calls the first argument until the second argument (the "sentinel") is returned:

for line in iter(sys.stdin.readline, ''):
    line = line.strip()
    ...

I would also be remiss if I did not suggest not using threads for this, but non-blocking I/O on the subprocess's pipes instead, or even something like twisted.reactor.spawnProcess which has lots of ways of hooking processes and other things together as consumers and producers.