multiprocessing - execute external command and wait before proceeding

nibbana · Aug 4, 2014 · Viewed 15k times

I am using Linux. I have an external executable called "combine" and a loop of 20 iterations. On each iteration, "combine" needs to be called with an argument that depends on the iteration index i. Example:

arguments = " "

for i in range(1, 21):
    arguments += str(i) + "_image.jpg "
    # begin of pseudo-code 
    execute: "./combine" + arguments  # in parallel using all cores

# pseudo-code continues
wait_for_all_previous_process_to_terminate
execute: "./merge_resized_images"  # use all cores - possible for one single command?

How do I achieve this using the multiprocessing module in Python?

Answer

dano · Aug 4, 2014

You can use subprocess.Popen to launch the external commands asynchronously, storing each returned Popen object in a list. Once you've launched all the processes, iterate over the list and wait for each one to finish with Popen.wait().

import shlex
import subprocess

arguments = " "
processes = []
for i in range(1, 21):
    arguments += str(i) + "_image.jpg "
    # Popen returns immediately, so all the "combine" commands run concurrently.
    processes.append(subprocess.Popen(shlex.split("./combine" + arguments)))

# Block until every "combine" process has exited.
for p in processes:
    p.wait()
subprocess.call("./merge_resized_images")
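Popen.wait() also returns the process's exit status, so if you want to notice a failed "combine" run instead of silently ignoring it, you can check the return codes as you wait. A minimal sketch building on the processes list above (the RuntimeError and its message are just illustrative choices):

# Wait for each process and fail loudly on a nonzero exit status.
for p in processes:
    if p.wait() != 0:
        raise RuntimeError("a combine process exited with a nonzero status")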

However, this will launch twenty concurrent processes, which is probably going to hurt performance.

To avoid that, you can use a multiprocessing.pool.ThreadPool to limit yourself to some lower number of concurrent processes (multiprocessing.cpu_count() is a good default), and then use pool.close() followed by pool.join() to wait for them all to finish.

import multiprocessing
import subprocess
import shlex

from multiprocessing.pool import ThreadPool

def call_proc(cmd):
    """ This runs in a separate thread. """
    #subprocess.call(shlex.split(cmd))  # This will block until cmd finishes
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    return (out, err)


pool = ThreadPool(multiprocessing.cpu_count())
arguments = " "
results = []
for i in range(1, 21):
    arguments += str(i) + "_image.jpg "
    # apply_async hands the command to a worker thread and returns immediately.
    results.append(pool.apply_async(call_proc, ("./combine" + arguments,)))

# Close the pool and wait for each running task to complete
pool.close()
pool.join()
for result in results:
    out, err = result.get()
    print("out: {} err: {}".format(out, err))
subprocess.call("./merge_resized_images")

Each worker thread releases the GIL while it waits for its subprocess to exit, so all the external commands run in parallel even though the pool uses threads rather than processes.
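If you're on Python 3 (or have the futures backport installed), concurrent.futures.ThreadPoolExecutor gives you the same pattern with a slightly tidier API. This is an equivalent sketch under that assumption, not part of the original answer:

import multiprocessing
import shlex
import subprocess
from concurrent.futures import ThreadPoolExecutor

def call_proc(cmd):
    # Runs in a worker thread; blocks that thread (not the GIL) until cmd exits.
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return p.communicate()

commands = []
arguments = " "
for i in range(1, 21):
    arguments += str(i) + "_image.jpg "
    commands.append("./combine" + arguments)

with ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    # executor.map yields results in order and blocks until each command finishes.
    for out, err in executor.map(call_proc, commands):
        print("out: {} err: {}".format(out, err))

subprocess.call("./merge_resized_images")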