Python multiprocessing safely writing to a file

Big Dogg picture Big Dogg · Nov 19, 2012 · Viewed 70.2k times · Source

I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, otherwise skip the computation and just read the results from the file.

I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?

Answer

MikeHunter picture MikeHunter · Nov 23, 2012

@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read only access. This will eliminate collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()