I have a function that performs a simulation and returns an array in string format.
I want to run the simulation (the function) for varying input parameter values, over 10000 possible input values, and write the results to a single file.
I am using multiprocessing, specifically the pool.map function, to run the simulations in parallel.
Since running the simulation function over 10000 times takes a very long time, I would really like to track the progress of the entire operation.
I think the problem in my current code below is that pool.map runs the function 10000 times without any progress tracking during those operations. Once the parallel processing finishes all 10000 simulations (which could take hours to days), I then track the results as they are saved to a file. So this is not really tracking the progress of the pool.map operation itself.
Is there an easy fix to my code that will allow progress tracking?
import multiprocessing
import numpy as np

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing
inputs = np.arange(0, 10000, 1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=numCores)
    t = pool.map(simFunction, inputs)  # blocks until all 10000 results are back
    with open('results.txt', 'w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter % 100 == 0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')
If you use an iterated map function, it's pretty easy to keep track of progress.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> def simFunction(x, y):
...     import time
...     time.sleep(2)
...     return x**2 + y
... 
>>> x, y = range(100), range(-100, 100, 2)
>>> res = Pool().imap(simFunction, x, y)
>>> with open('results.txt', 'w') as out:
...     for i in x:
...         out.write("%s\n" % next(res))
...         if i % 10 == 0:
...             print("%s of %s simulated" % (i, len(x)))
... 
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated
Or, you can use an asynchronous map. Here I'll do things a little differently, just to mix it up.
>>> import time
>>> res = Pool().amap(simFunction, x, y)
>>> while not res.ready():
...     print("waiting...")
...     time.sleep(5)
... 
waiting...
waiting...
waiting...
waiting...
>>> res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]
Note that I'm using pathos.multiprocessing instead of multiprocessing. It's just a fork of multiprocessing that enables you to do map functions with multiple inputs, has much better serialization, and allows you to execute map calls anywhere (not just in __main__). You could use multiprocessing to do the above as well; however, the code would be very slightly different, as in the sketch below.
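For reference, here's a minimal sketch of the same progress loop using only the standard library (my own rough translation, not from the original answer). Since the stdlib pool's imap passes a single argument, the two inputs are zipped into one tuple and unpacked inside the function.

import multiprocessing

def simFunction(args):
    # stdlib imap passes one argument, so unpack the (x, y) tuple here
    x, y = args
    return x**2 + y

if __name__ == "__main__":
    x, y = range(100), range(-100, 100, 2)
    with multiprocessing.Pool() as pool:
        # imap yields results lazily, so progress can be reported as they arrive
        res = pool.imap(simFunction, zip(x, y))
        with open('results.txt', 'w') as out:
            for i, r in enumerate(res):
                out.write("%s\n" % r)
                if i % 10 == 0:
                    print("%s of %s simulated" % (i, len(x)))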
Either an iterated or asynchronous map will enable you to write whatever code you want for better process tracking. For example, pass a unique "id" to each job and watch which ones come back, or have each job return its process id. There are lots of ways to track progress and processes… but the above should give you a start.
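As a rough illustration of that idea (the tracked_sim function and its body are made-up stand-ins, not part of the original answer), each job below returns its own id and the pid of the worker that ran it:

import os
import multiprocessing

def tracked_sim(job_id):
    result = job_id**2  # stand-in for the real simulation
    return job_id, os.getpid(), result

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # imap_unordered yields each result as soon as its job finishes
        for job_id, pid, result in pool.imap_unordered(tracked_sim, range(20)):
            print("job %s finished on worker %s -> %s" % (job_id, pid, result))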
You can get pathos here: https://github.com/uqfoundation