I am using concurrent.futures.ProcessPoolExecutor
to find the occurrence of a number from a number range. The intent is to investigate the amount of speed-up performance gained from concurrency. To benchmark performance, I have a control - a serial code to perform said task (shown below). I have written 2 concurrent codes, one using concurrent.futures.ProcessPoolExecutor.submit()
and the other using concurrent.futures.ProcessPoolExecutor.map()
to perform the same task. They are shown below. Advice on drafting the former and latter can be seen here and here, respectively.
The task issued to all three codes was to find the number of occurrences of the number 5 in the number range of 0 to 1E8. Both .submit() and .map() were assigned 6 workers, and .map() had a chunksize of 10,000. The manner of discretising the workload was identical in the two concurrent codes. However, the functions used to find occurrences in the two codes were different, because the way arguments are passed to a function called by .submit() differs from the way they are passed to one called by .map().
All 3 codes reported the same number of occurrences, i.e. 56,953,279 times. However, the times taken to complete the task were very different. .submit() performed 2 times faster than the control, while .map() took twice as long as the control to complete its task.
Questions:
1. Is the slow performance of .map() an artifact of my coding, or is it inherently slow? If the former, how can I improve it? I am just surprised that it performed slower than the control, as there would not be much incentive to use it.
2. Is there any way to make the .submit() code perform even faster? A condition I have is that the function _concurrent_submit() must return an iterable with the numbers/occurrences containing the number 5.

concurrent.futures.ProcessPoolExecutor.submit()
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from traceback import print_exc
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('\n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
def _concurrent_submit(nmax, number, workers):
    '''Find the numbers in range(nmax) that contain number, using
    concurrent.futures.ProcessPoolExecutor.submit.

    The range is split into `workers` contiguous chunks (the last chunk
    absorbs the remainder of the integer division) and each chunk is handed
    to _findmatch in a separate process.

    Parameters:
        nmax    -- exclusive upper bound of the number range.
        number  -- value forwarded to _findmatch as the thing to look for.
        workers -- number of worker processes, and also the number of chunks.

    Returns a single list holding the results of every chunk, concatenated
    in submission order.  An exception raised inside a worker propagates from
    future.result().'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Consolidate the results into one list.  future.result() blocks
        #      until that chunk is finished, so no separate wait step is
        #      needed; walking the futures in submission order keeps the
        #      chunks in the order they were created.
        for future in futures:
            found.extend(future.result())
    foundsize = len(found)
    end = time() - start
    print('within statement of def _concurrent_submit():')
    print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found
if __name__ == '__main__':
    # Benchmark set-up.
    workers = 6        # Size of the worker pool.
    number = '5'       # Digit to look for, as text.
    nmax = 10 ** 8     # Exclusive upper bound of the number range.

    # Time the whole concurrent search as seen by the caller.
    began = time()
    matches = _concurrent_submit(nmax, number, workers)
    elapsed = time() - began

    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
concurrent.futures.ProcessPoolExecutor.map()
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
def _findmatch(listnumber, number):
'''Function to find the occurrence of number in another number and return
a string value.'''
#print('def _findmatch(listnumber, number):')
#print('listnumber = {0} and ref = {1}'.format(listnumber, number))
if number in str(listnumber):
x = listnumber
#print('x = {0}'.format(x))
return x
def _concurrent_map(nmax, number, workers):
    '''Find the numbers in range(nmax) that contain number, using
    concurrent.futures.ProcessPoolExecutor.map.

    The range is split into `workers` contiguous sub-ranges and one
    executor.map call is issued per sub-range; _findmatch is invoked once
    for every individual number, with number repeated as its second argument.

    Parameters:
        nmax    -- exclusive upper bound of the number range.
        number  -- value forwarded to _findmatch as the thing to look for.
        workers -- number of worker processes, and also the number of
                   sub-ranges.

    Returns a list of every non-None value produced by _findmatch, in the
    order the sub-ranges were created.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.2. Consolidate result as a list and return this list.
        #      Filter on "is not None" rather than truthiness so that a
        #      legitimate match of 0 is not discarded.
        for results in futures:
            found.extend(f for f in results if f is not None)
    foundsize = len(found)
    end = time() - start
    print('within statement of def _concurrent(nmax, number):')
    print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found
if __name__ == '__main__':
    # Benchmark set-up.
    workers = 6        # Size of the worker pool.
    number = '5'       # Digit to look for, as text.
    nmax = 10 ** 8     # Exclusive upper bound of the number range.

    # Time the whole concurrent search as seen by the caller.
    began = time()
    matches = _concurrent_map(nmax, number, workers)
    elapsed = time() - began

    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Serial Code:
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
from time import time
def _serial(nmax, number):
start = time()
match=[]
nlist = range(nmax)
for n in nlist:
if number in str(n):match.append(n)
end=time()-start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
if __name__ == '__main__':
    # Benchmark set-up.
    number = '5'       # Digit to look for, as text.
    nmax = 10 ** 8     # Exclusive upper bound of the number range.

    # Time the serial search as seen by the caller.
    began = time()
    matches = _serial(nmax, number)
    elapsed = time() - began

    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Update 13th Feb 2017:
In addition to @niemmi's answer, I have provided an answer following some personal research to show:
1. how to further speed up @niemmi's .map() and .submit() solutions, and
2. when ProcessPoolExecutor.map() can lead to more speed-up than ProcessPoolExecutor.submit().

Overview:
There are 2 parts to my answer:
Part 1 shows how to gain more speed-up from @niemmi's ProcessPoolExecutor.map() solution.
Part 2 shows when ProcessPoolExecutor's methods .submit() and .map() yield non-equivalent compute times.
=======================================================================
Part 1: More Speed-up for ProcessPoolExecutor.map()
Background:
This section builds on @niemmi's .map() solution, which by itself is excellent. While doing some research on his discretization scheme to better understand how it interacts with the .map() chunksize argument, I found this interesting solution.
I regard @niemmi's definition of chunk = nmax // workers
to be a definition for chunksize, i.e. a smaller size of actual number range (given task) to be tackled by each worker in the worker pool. Now, this definition is premised on the assumption that if a computer has x number of workers, dividing the task equally among each worker will result in optimum use of each worker and hence the total task will be completed fastest. Therefore, the number of chunks to break up a given task into should always equal the number of pool workers. However, is this assumption correct?
Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time when used with ProcessPoolExecutor.map()
. Rather, discretising a task to an amount greater than the number of pool workers can lead to speed-up, i.e. faster completion of a given task.
Experiment: I have modified @niemmi's code to allow the number of discretized tasks to exceed the number of pool workers. This code is given below and was used to find the number of times the number 5 appears in the number range of 0 to 1E8. I have executed this code using 1, 2, 4, and 6 pool workers and for various ratios of the number of discretized tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times were tabulated. "Speed-up" is defined here as the average compute time when the number of chunks equals the number of pool workers, divided by the average compute time when the number of discretized tasks is greater than the number of pool workers.
Findings:
Figure on left shows the compute time taken by all the scenarios mentioned in the experiment section. It shows that the compute time taken by number of chunks / number of workers = 1 is always greater than the compute time taken by number of chunks > number of workers. That is, the former case is always less efficient than the latter.
Figure on right shows that a speed-up of 1.2 times or more was gained when the number of chunks / number of workers reach a threshold value of 14 or more. It is interesting to observe that the speed-up trend also occurred when ProcessPoolExecutor.map()
was executed with 1 worker.
Conclusion: When customizing the number of discrete tasks that ProcessPoolExecutor.map() should use to solve a given task, it is prudent to ensure that this number is greater than the number of pool workers, as this practice shortens compute time.
concurrent.futures.ProcessPoolExecutor.map() code. (revised parts only)
def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Find the numbers in range(nmax) that contain number, using
    concurrent.futures.ProcessPoolExecutor.map over num_of_chunks sub-ranges.

    The range is cut into num_of_chunks contiguous pieces of
    nmax // num_of_chunks numbers each (the last piece is extended to nmax)
    and _findmatch is called once per piece with (start, stop, number).

    Parameters:
        nmax          -- exclusive upper bound of the number range.
        number        -- value forwarded to _findmatch as the thing to look
                         for.
        workers       -- number of worker processes in the pool.
        num_of_chunks -- number of pieces the range is divided into; may
                         exceed workers.

    Returns one list containing every item of every per-piece result, in the
    order the pieces were created.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    found = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.  Every
        #      item is kept as-is: filtering on truthiness would wrongly
        #      discard a legitimate match of 0.
        for chunk_matches in futures:
            found.extend(chunk_matches)
    foundsize = len(found)
    end = time() - start
    print('\n within statement of def _concurrent(nmax, number):')
    print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found
if __name__ == '__main__':
    # Benchmark set-up.
    workers = 4                # Size of the worker pool.
    chunks_vs_workers = 14     # A factor of =>14 can provide optimum performance
    num_of_chunks = workers * chunks_vs_workers
    number = '5'               # Digit to look for, as text.
    nmax = 10 ** 8             # Exclusive upper bound of the number range.

    # Time the whole concurrent search as seen by the caller.
    began = time()
    matches = _concurrent_map(nmax, number, workers, num_of_chunks)
    elapsed = time() - began

    print('\n main')
    summary = 'nmax={}, workers={}, num_of_chunks={}'
    print(summary.format(nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
=======================================================================
Part 2: Total compute time from using the ProcessPoolExecutor methods .submit() and .map() can be dissimilar when returning a sorted/ordered result list.
Background: I have amended both the .submit() and .map() codes to allow an "apple-to-apple" comparison of their compute times and to make it possible to visualize the compute time of the main code, the compute time of the _concurrent method called by the main code to perform the concurrent operations, and the compute time of each discretized task/worker called by the _concurrent method. Furthermore, the _concurrent method in these codes was structured to return an unordered or an ordered list of the results directly from the future objects of .submit() and from the iterator of .map(). Source code is provided below (hope it helps you).
Experiments: These two newly improved codes were used to perform the same experiment described in Part 1, save that only 6 pool workers were considered and the Python built-in list and sorted methods were used to return an unordered and an ordered list of the results to the main section of the code, respectively.
Findings:
1. The compute times taken to create the future objects of ProcessPoolExecutor.submit(), and to create the iterator of ProcessPoolExecutor.map(), as a function of the number of discretized tasks over the number of pool workers, are equivalent. This result simply means that the ProcessPoolExecutor methods .submit() and .map() are equally efficient/fast.
2. The difference between the compute time of the main code and that of its _concurrent method reflects the compute time of the list and sorted methods (and that of the other methods encased within these methods). Clearly, the list method took less compute time to return a result list than the sorted method. The average compute times of the list method for both the .submit() and .map() codes were similar, at ~0.47sec. The average compute times of the sorted method for the .submit() and .map() codes were 1.23sec and 1.01sec, respectively. In other words, the list method performed 2.62 times and 2.15 times faster than the sorted method for the .submit() and .map() codes, respectively.
3. The sorted method generated an ordered list from .map() faster than from .submit() as the number of discretized tasks increased beyond the number of pool workers, save when the number of discretized tasks equaled the number of pool workers.
That said, these findings show that the decision to use the equally fast .submit() or .map() methods can be encumbered by the sorted method. For example, if the intent is to generate an ordered list in the shortest time possible, the use of ProcessPoolExecutor.map() should be preferred over ProcessPoolExecutor.submit(), as .map() can allow the shortest total compute time.
Using more discretized tasks than pool workers, as described in Part 1, can speed up both the .submit() and .map() methods. The amount of speed-up can be as much as 20% over the case when the number of discretized tasks equaled the number of pool workers.

Improved .map() code
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from itertools import repeat, chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurence of number in range nmin to nmax and return
the found occurences in a list.'''
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
#print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
# format(nmin, nmax, number, len(match),end))
return match
def _concurrent(nmax, number, workers, num_of_chunks):
    '''Search range(nmax) for number with ProcessPoolExecutor.map.

    The range is cut into num_of_chunks contiguous pieces of
    nmax // num_of_chunks numbers each (the last piece runs up to nmax);
    _findmatch is called once per piece with (start, stop, number).  The
    time spent until the executor block is left is printed, and the
    per-piece results are then flattened into a single list.'''
    began = time()
    step = nmax // num_of_chunks

    # Piece boundaries: 0, step, 2*step, ..., with nmax as the final bound.
    bounds = [step * k for k in range(num_of_chunks)]
    bounds.append(nmax)
    lower_bounds = bounds[:-1]
    upper_bounds = bounds[1:]

    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        piece_results = executor.map(_findmatch, lower_bounds, upper_bounds,
                                     repeat(number))

    elapsed = time() - began
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(elapsed))

    # executor.map yields results in the order the pieces were supplied, so
    # the flattened list follows piece order.  Swap list(...) for sorted(...)
    # to benchmark the explicitly sorted variant.
    flattened = chain.from_iterable(piece_results)
    return list(flattened)
if __name__ == '__main__':
    # Benchmark set-up.
    workers = 6                # Size of the worker pool.
    chunks_vs_workers = 30     # A factor of =>14 can provide optimum performance
    num_of_chunks = workers * chunks_vs_workers
    number = '5'               # Digit to look for, as text.
    nmax = 10 ** 8             # Exclusive upper bound of the number range.

    # Time the whole concurrent search as seen by the caller.
    began = time()
    matches = _concurrent(nmax, number, workers, num_of_chunks)
    elapsed = time() - began

    print('\n main')
    summary = 'nmax={}, workers={}, num_of_chunks={}'
    print(summary.format(nmax, workers, num_of_chunks))
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Improved .submit() code.
This code is same as .map code except you replace the _concurrent method with the following:
def _concurrent(nmax, number, workers, num_of_chunks, *, ordered=False):
    '''Find the numbers in range(nmax) that contain number, using
    concurrent.futures.ProcessPoolExecutor.submit over num_of_chunks
    sub-ranges.

    The range is cut into num_of_chunks contiguous pieces of
    nmax // num_of_chunks numbers each (the last piece is extended to nmax)
    and each piece is submitted to _findmatch as (start, stop, number).

    Parameters:
        nmax          -- exclusive upper bound of the number range.
        number        -- value forwarded to _findmatch as the thing to look
                         for.
        workers       -- number of worker processes in the pool.
        num_of_chunks -- number of pieces the range is divided into; may
                         exceed workers.
        ordered       -- keyword-only.  False (default) concatenates the
                         piece results in completion order, as before.  True
                         concatenates them in submission order, which gives
                         an ascending list without a separate sort, assuming
                         _findmatch returns each piece's matches in ascending
                         order.

    Returns one list with the items of every piece result.  The time spent
    until the executor block is left is printed before the list is built.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    # 2.2. Consolidate.  as_completed yields futures as they finish, so its
    #      order is not deterministic; the futures list itself preserves the
    #      order in which the pieces were submitted.
    sequence = futures if ordered else cf.as_completed(futures)
    return list(chain.from_iterable(f.result() for f in sequence))
=======================================================================