I am brand new to the multiprocessing package in Python, and my confusion will probably be easy for someone who knows more to clear up. I've been reading about concurrency and have searched for other questions like this, but have found nothing. (FYI, I do NOT want to use multithreading because the GIL will slow down my application a lot.)
I am thinking in the framework of events. I want to have multiple processes running, waiting for an event to happen. If the event happens, it gets assigned to a particular process, which operates and then returns to its idle state. There may be a better way to do this, but my reasoning is that I should spawn all the processes once and keep them open indefinitely, rather than creating then closing a process every time an event happens. Speed is an issue for me and my events can occur many thousands of times per second.
I came up with the following toy example, which is meant to send even numbers to one process and odd numbers to another. Both processes are the same; they just append the number to a list.

    from multiprocessing import Process, Queue, Pipe

    slist=['even','odd']

    Q={}
    Q['even'] = Queue()
    Q['odd'] = Queue()

    ev,od = [],[]

    Q['even'].put(ev)
    Q['odd'].put(od)

    P={}
    P['even'] = Pipe()
    P['odd'] = Pipe()

    def add_num(s):
        """ The worker function, invoked in a process. The results are placed in
        a list that's pushed to a queue."""
        # while True :
        if not P[s][1].recv():
            print s,'- do nothing'
        else:
            d = Q[s].get()
            print d
            d.append(P[s][1].recv())
            Q[s].put(d)
            print Q[s].get()
            P[s][0].send(False)
            print 'ya'

    def piper(s,n):
        P[s][0].send(n)
        for k in [S for S in slist if S != s]:
            P[k][0].send(False)
        add_num(s)

    procs = [ Process (
                target=add_num,
                args=(i,)
                )
              for i in ['even','odd']]

    for s in slist:
        P[s][0].send(False)

    for p in procs:
        p.start()
        p.join()

    for i in range(10):
        print i
        if i%2==0:
            s = 'even'
        else:
            s = 'odd'
        piper(s,i)

    print 'results:', Q['odd'].get(),Q['even'].get()

This code produces the following:
even - do nothing
Any insight from the wise into this problem, where my code or reasoning falls short etc. would be greatly appreciated.
Here is an approach I've used a couple of times with good success:
1. Launch a multiprocessing pool.
2. Use a multiprocessing SyncManager to create multiple queues (one for each type of data that needs to be handled differently).
3. Use apply_async to launch the functions that process data. Just like the queues, there should be one function for each type of data that needs to be processed differently. Each function gets the queue that corresponds to its data as an input argument, and does its work in an infinite loop that starts by getting data from that queue.
4. Begin processing. The main process sorts the data and decides which function should handle each item, then places the item on the queue that corresponds to that function.
5. After all data has been handled, the main process puts a value called a "poison pill" into each queue. The poison pill is a value that every worker process recognizes as a signal to exit. Since the queues are first-in first-out (FIFO), a worker only reaches the poison pill after everything queued ahead of it has been pulled off the queue.
6. Close and join the multiprocessing pool.
Below is an example of this algorithm. It uses the steps described above to divide odd numbers by 2 and even numbers by -2. All results are placed in a shared list accessible by the main process.

    import multiprocessing

    POISON_PILL = "STOP"

    def process_odds(in_queue, shared_list):
        while True:
            # block until something is placed on the queue
            new_value = in_queue.get()

            # check to see if we just got the poison pill
            if new_value == POISON_PILL:
                break

            # we didn't, so do the processing and put the result in the
            # shared data structure
            shared_list.append(new_value/2)

        return

    def process_evens(in_queue, shared_list):
        while True:
            new_value = in_queue.get()
            if new_value == POISON_PILL:
                break

            shared_list.append(new_value/-2)

        return

    def main():
        # create a manager - it lets us share native Python object types like
        # lists and dictionaries without worrying about synchronization -
        # the manager will take care of it
        manager = multiprocessing.Manager()

        # now using the manager, create our shared data structures
        odd_queue = manager.Queue()
        even_queue = manager.Queue()
        shared_list = manager.list()

        # lastly, create our pool of workers - this spawns the processes,
        # but they don't start actually doing anything yet
        pool = multiprocessing.Pool()

        # now we'll assign two functions to the pool for them to run -
        # one to handle even numbers, one to handle odd numbers
        odd_result = pool.apply_async(process_odds, (odd_queue, shared_list))
        even_result = pool.apply_async(process_evens, (even_queue, shared_list))
        # this code doesn't do anything with the odd_result and even_result
        # variables, but you have the flexibility to check exit codes
        # and other such things if you want - see docs for AsyncResult objects

        # now that the processes are running and waiting for their queues
        # to have something, let's give them some work to do by iterating
        # over our data, deciding who should process it, and putting it in
        # their queue
        for i in range(6):
            if (i % 2) == 0:  # use mod operator to see if "i" is even
                even_queue.put(i)
            else:
                odd_queue.put(i)

        # now we've finished giving the processes their work, so send the
        # poison pill to tell them to exit
        even_queue.put(POISON_PILL)
        odd_queue.put(POISON_PILL)

        # wait for them to exit
        pool.close()
        pool.join()

        # now we can check the results
        print(shared_list)

        # ...and exit!
        return

    if __name__ == "__main__":
        main()

This code produces this output:
[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]
Notice that the order of the results is unpredictable, because we can't guarantee in what order the functions will get items from their queues and put the results into the list. But you can certainly do whatever post-processing you need, which could include sorting.
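If the original input order matters, one option is to tag each item with its position before queuing it and sort on that tag afterwards. Here is a minimal sketch of that idea; `tagging_worker`, `restore_order`, and the `divisor` argument are hypothetical names I made up for illustration, not part of the code above.

```python
import multiprocessing

POISON_PILL = "STOP"


def tagging_worker(in_queue, shared_list, divisor):
    """Divide each queued value and store it with the index it arrived with."""
    while True:
        item = in_queue.get()  # blocks until something is available
        if item == POISON_PILL:
            break
        index, value = item
        shared_list.append((index, value / divisor))


def restore_order(tagged_results):
    """Sort (index, result) pairs by index, then drop the index."""
    return [result for _, result in sorted(tagged_results)]


def main():
    manager = multiprocessing.Manager()
    odd_queue = manager.Queue()
    even_queue = manager.Queue()
    shared_list = manager.list()

    # two workers, so both handlers can run at the same time
    pool = multiprocessing.Pool(processes=2)
    pool.apply_async(tagging_worker, (odd_queue, shared_list, 2))
    pool.apply_async(tagging_worker, (even_queue, shared_list, -2))

    for i in range(6):
        target = even_queue if i % 2 == 0 else odd_queue
        target.put((i, i))  # (position in the input, value to process)

    even_queue.put(POISON_PILL)
    odd_queue.put(POISON_PILL)

    pool.close()
    pool.join()

    # copy the proxy into a plain list, then put it back in input order
    print(restore_order(list(shared_list)))


if __name__ == "__main__":
    main()
```

The extra tuple costs a little more pickling per item, but it keeps the workers independent of each other, which is why I'd reach for it before trying to coordinate the append order between processes.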
I think this would be a good solution to your issue because:
- You're correct that there is huge overhead to spawning processes. This single-producer/multiple-consumer approach avoids that overhead by using a pool to keep the workers alive for the entire duration of the program.
- It addresses your concerns about being able to handle data differently depending on attributes of the data. In your comments, you expressed concerns about being able to send data to specific processes. In this approach, you choose which process gets each item, because you choose which queue to put it on. (By the way, I think you're thinking of the pool.map function, which, as you correctly believe, doesn't allow you to perform different operations in the same job. apply_async does.)
- I've found it to be very expandable and flexible. Need to add more types of data handling? Just write your handler function, add one more queue, and add logic to main to route the data to your new function. Are you finding that one queue is getting backed up and becoming a bottleneck? You can call apply_async with the same target function and queue multiple times to get multiple workers working on the same queue. Just make sure you give the queue enough poison pills so that all of the workers get one.
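To make that last point concrete, here is a rough sketch of several workers sharing one queue, with one poison pill sent per worker. The names `NUM_WORKERS`, `square_worker`, and `run` are hypothetical, and the squaring is just placeholder work.

```python
import multiprocessing

POISON_PILL = "STOP"
NUM_WORKERS = 3  # assumed worker count for this sketch


def square_worker(in_queue, shared_list):
    """Square values from the queue until a poison pill arrives."""
    while True:
        value = in_queue.get()
        if value == POISON_PILL:
            break  # this worker consumes exactly one pill, then exits
        shared_list.append(value * value)


def run(values, num_workers=NUM_WORKERS):
    manager = multiprocessing.Manager()
    in_queue = manager.Queue()
    shared_list = manager.list()

    pool = multiprocessing.Pool(processes=num_workers)
    # same function, same queue, launched once per worker
    results = [pool.apply_async(square_worker, (in_queue, shared_list))
               for _ in range(num_workers)]

    for value in values:
        in_queue.put(value)

    # one pill per worker; with fewer pills, the leftover workers would
    # block on get() forever and pool.join() would never return
    for _ in range(num_workers):
        in_queue.put(POISON_PILL)

    pool.close()
    pool.join()

    for result in results:
        result.get()  # re-raises any exception raised inside a worker

    return sorted(list(shared_list))


if __name__ == "__main__":
    print(run(range(10)))
```

Keeping the pool size equal to the number of long-running workers matters here: a worker that never gets a free process never starts pulling from the queue.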
One limitation to keep in mind: any data you want to pass on a queue must be picklable (serializable) by the pickle module. See the pickle documentation for what can and can't be pickled.
There are probably other limitations as well, but I can't think of any others off the top of my head.
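If you're unsure whether something can go on a queue, a quick check is to try pickling it yourself first. The helper below is only a sketch; `is_picklable` is a hypothetical name, not something provided by multiprocessing or pickle.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle.dumps accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    candidates = {
        "dict of plain data": {"id": 1, "values": [1, 2, 3]},
        "built-in function": len,
        "lambda": lambda x: x + 1,
        "thread lock": threading.Lock(),
    }
    for label, obj in candidates.items():
        print(label, "->", is_picklable(obj))
```

Plain data structures and functions that can be imported by name generally pass, while lambdas, locks, and similar objects tied to the current process generally do not.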