Basic multiprocessing with while loop

Wapiti · Apr 11, 2015

I am brand new to the multiprocessing package in Python, and my confusion will probably be easy for someone who knows more to clear up. I've been reading about concurrency and have searched for other questions like this but found nothing. (FYI, I do NOT want to use multithreading because the GIL will slow down my application a lot.)

I am thinking in the framework of events. I want to have multiple processes running, waiting for an event to happen. If the event happens, it gets assigned to a particular process, which operates and then returns to its idle state. There may be a better way to do this, but my reasoning is that I should spawn all the processes once and keep them open indefinitely, rather than creating then closing a process every time an event happens. Speed is an issue for me and my events can occur many thousands of times per second.

I came up with the following toy example, which is meant to send even numbers to one process and odd numbers to another. Both processes are the same: they just append the number to a list.

from multiprocessing import Process, Queue, Pipe

slist=['even','odd']

Q={}
Q['even'] = Queue()
Q['odd'] = Queue()

ev,od = [],[]

Q['even'].put(ev)
Q['odd'].put(od)

P={}
P['even'] = Pipe()
P['odd'] = Pipe()



def add_num(s):
    """ The worker function, invoked in a process. The results are placed in
        a list that's pushed to a queue."""
#    while True :
    if not P[s][1].recv():
        print s,'- do nothing'

    else:            
        d = Q[s].get()
        print d
        d.append(P[s][1].recv())
        Q[s].put(d)
        print Q[s].get()
        P[s][0].send(False)
        print 'ya'




def piper(s,n):

    P[s][0].send(n)    
    for k in [S for S in slist if S != s]:
        P[k][0].send(False) 
    add_num(s)


procs = [ Process (
                   target=add_num,
                   args=(i,)
                   ) 
         for i in ['even','odd']]

for s in slist: 
    P[s][0].send(False)

for p in procs:
    p.start()  
    p.join()

for i in range(10):
    print i
    if i%2==0:
        s = 'even'
    else:
        s = 'odd'
    piper(s,i)


print 'results:', Q['odd'].get(),Q['even'].get()

This code produces the following:

even - do nothing

Any insight from the wise into this problem, where my code or reasoning falls short etc. would be greatly appreciated.

Answer

skrrgwasme · Apr 26, 2015

Here is an approach I've used a couple of times with good success:

  1. Launch a multiprocessing pool.

  2. Use a multiprocessing SyncManager to create multiple queues (one for each type of data that needs to be handled differently).

  3. Use apply_async to launch the functions that process data. Just like the queues, there should be one function for each type of data that needs to be processed differently. Each function launched gets the queue that corresponds to its data as an input argument. The functions will do their work in an infinite loop that starts by getting data from the queue.

  4. Begin processing. During processing, the main process sorts the data and decides which function should be handling it. Once the decision is made, the data is placed on the queue that corresponds to that function.

  5. After all data has been handled, the main process puts a value called a "poison pill" into each queue. The poison pill is a value that the worker processes all recognize as a signal to exit. Since the queues are first-in, first-out (FIFO), the poison pill is guaranteed to be the last item pulled from each queue.

  6. Close and join the multiprocessing pool.

Code

Below is an example of this algorithm. It divides odd numbers by 2 and even numbers by -2, and places all results in a shared list accessible by the main process.

import multiprocessing

POISON_PILL = "STOP"

def process_odds(in_queue, shared_list):

    while True:

        # block until something is placed on the queue
        new_value = in_queue.get() 

        # check to see if we just got the poison pill
        if new_value == POISON_PILL:
            break

        # we didn't, so do the processing and put the result in the
        # shared data structure
        shared_list.append(new_value/2)

    return

def process_evens(in_queue, shared_list):

    while True:    
        new_value = in_queue.get() 
        if new_value == POISON_PILL:
            break

        shared_list.append(new_value/-2)

    return

def main():

    # create a manager - it lets us share native Python object types like
    # lists and dictionaries without worrying about synchronization - 
    # the manager will take care of it
    manager = multiprocessing.Manager()

    # now using the manager, create our shared data structures
    odd_queue = manager.Queue()
    even_queue = manager.Queue()
    shared_list = manager.list()

    # lastly, create our pool of workers - this spawns the processes, 
    # but they don't start actually doing anything yet; ask for one
    # process per long-running function so neither waits on the other
    pool = multiprocessing.Pool(processes=2)

    # now we'll assign two functions to the pool for them to run - 
    # one to handle even numbers, one to handle odd numbers
    odd_result = pool.apply_async(process_odds, (odd_queue, shared_list))
    even_result = pool.apply_async(process_evens, (even_queue, shared_list))
    # this code doesn't do anything with the odd_result and even_result
    # variables, but you can call .get() on them once the workers finish
    # to retrieve return values or re-raise any exception a worker hit -
    # see docs for AsyncResult objects

    # now that the processes are running and waiting for their queues
    # to have something, lets give them some work to do by iterating
    # over our data, deciding who should process it, and putting it in
    # their queue
    for i in range(6):

        if (i % 2) == 0: # use mod operator to see if "i" is even
            even_queue.put(i)

        else:
            odd_queue.put(i)

    # now we've finished giving the processes their work, so send the 
    # poison pill to tell them to exit
    even_queue.put(POISON_PILL)
    odd_queue.put(POISON_PILL)

    # wait for them to exit
    pool.close()
    pool.join()

    # now we can check the results
    print(shared_list)

    # ...and exit!
    return


if __name__ == "__main__":
    main()

Output

This code produces output like this (under Python 3, where / is true division):

[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]

Notice that the order of the results is unpredictable, because we can't guarantee in what order the functions will be able to get items from their queues and put the results into the list. But you can certainly do whatever post-processing you need, which could include sorting.
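
If the original order matters, one option is to have each worker append a pair of (input, result) rather than the bare result, then sort afterwards. Here is a minimal sketch of that post-processing step. The helper name restore_input_order and the tagged-tuple layout are my own assumptions for illustration; the code above appends bare results.

```python
def restore_input_order(tagged_results):
    """Sort (input_value, result) pairs by input and return only the results."""
    ordered = sorted(tagged_results, key=lambda pair: pair[0])
    return [result for _, result in ordered]


# Hypothetical contents of the shared list if the workers had appended
# (new_value, result) tuples, in whatever order they happened to finish.
tagged = [(1, 0.5), (0, -0.0), (3, 1.5), (2, -1.0), (5, 2.5), (4, -2.0)]

print(restore_input_order(tagged))  # [-0.0, 0.5, -1.0, 1.5, -2.0, 2.5]
```

In the worker functions this would mean appending something like (new_value, new_value/2) in place of the bare quotient.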

Rationale

I think this would be a good solution to your issue because:

  1. You're correct that there is huge overhead to spawning processes. This single-producer/multiple-consumer approach pays that cost only once, because the pool keeps the workers alive for the entire duration of the program.

  2. It addresses your concerns about being able to handle data differently depending on attributes of the data. In your comments, you expressed concerns about being able to send data to specific processes. In this approach, you can choose which processes to give data to, because you have to choose which queue to put it on. (By the way, I think you're thinking of the pool.map function, which, as you correctly believe, doesn't allow you to perform different operations in the same job. apply_async does.)

  3. I've found it to be very expandable and flexible. Need to add more types of data handling? Just write your handler function, add one more queue, and add logic to main to route the data to your new function. Are you finding that one queue is getting backed up and becoming a bottleneck? You can call apply_async with the same target function and queue multiple times to get multiple workers working on the same queue. Just make sure you give the queue enough poison pills so that all of the workers get one.
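
To illustrate the last point, here is a rough sketch of several workers draining one queue, with one poison pill per worker. The names halve, work_queue, and NUM_WORKERS are made up for this example, and the worker count of 3 is an arbitrary assumption.

```python
import multiprocessing

POISON_PILL = "STOP"
NUM_WORKERS = 3  # assumed worker count for this sketch


def halve(in_queue, shared_list):
    """Consume values until a poison pill arrives, appending value / 2."""
    while True:
        new_value = in_queue.get()  # blocks until an item is available
        if new_value == POISON_PILL:
            break
        shared_list.append(new_value / 2)


def main():
    manager = multiprocessing.Manager()
    work_queue = manager.Queue()
    shared_list = manager.list()

    pool = multiprocessing.Pool(processes=NUM_WORKERS)

    # launch the same function several times on the same queue
    results = [pool.apply_async(halve, (work_queue, shared_list))
               for _ in range(NUM_WORKERS)]

    for i in range(10):
        work_queue.put(i)

    # each worker exits after consuming one pill, so send one per worker
    for _ in range(NUM_WORKERS):
        work_queue.put(POISON_PILL)

    pool.close()
    pool.join()

    # .get() re-raises any exception that occurred inside a worker
    for result in results:
        result.get()

    # copy the proxy into a plain list and sort, since arrival order varies
    print(sorted(list(shared_list)))


if __name__ == "__main__":
    main()
```

If you send fewer pills than there are workers, the leftover workers block forever on in_queue.get() and pool.join() never returns.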

Limitations

Any data you want to pass on a queue must be picklable (serializable) by the pickle module. See the pickle documentation for what can and can't be pickled.
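
If you're unsure whether a particular object will survive the trip, a quick check is to try pickling it yourself before putting it on a queue. The helper below is only a sketch; is_picklable is a name I made up, not something the pickle module provides.

```python
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# plain data such as dicts, lists, numbers, and strings is fine
print(is_picklable({"id": 7, "payload": [1, 2, 3]}))  # True

# lambdas cannot be pickled by the standard pickle module
print(is_picklable(lambda x: x + 1))  # False
```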

There are probably other limitations as well, but I can't think of any off the top of my head.