combining python watchdog with multiprocessing or threading

havok2063 picture havok2063 · Feb 19, 2014 · Viewed 9.7k times · Source

I'm using Python's Watchdog to monitor a given directory for new files being created. When a file is created, some code runs that spawns a subprocess shell command to run different code to process this file. This should run for every new file that is created. I've tested this out when one file is created, and things work great, but am having trouble getting it working when multiple files are created, either at the same time, or one after another.

My current problem is this... the processing code run in the shell takes a while to run and will not finish before a new file is created in the directory. There's nothing I can do about that. While this code is running, watchdog will not recognize that a new file has been created, and will not proceed with the code.

So I think I need to spawn a new process for each new file, or do something get things to run concurrently, and not wait until one file is done before processing the next one.

So my questions are:

1.) In reality I will have 4 files, in different series, created at the same time, in one directory. What's the best way get watchdog to run the code on file creation for all 4 files at once?

2.) When the code is running for one file, how do I get watchdog to begin processing the next file in the same series without waiting until processing for the previous file has completed. This is necessary because the files are particular and I need to pause the processing of one file until another file is finished, but the order in which they are created may vary.

Do I need to combine my watchdog with multiprocessing or threading somehow? Or do I need to implement multiple observers? I'm kind of at a loss. Thanks for any help.

class MonitorFiles(FileSystemEventHandler):
    '''Sub-class of watchdog event handler'''

    def __init__(self, config=None, log=None):
        self.log = log
        self.config = config

    def on_created(self, event):
        file = os.path.basename(event.src_path)
        self.log.info('Created file {0}'.format(event.src_path))
        dosWatch.go(event.src_path, self.config, self.log)

    def on_modified(self, event):
        file = os.path.basename(event.src_path)
        ext = os.path.splitext(file)[1]
        if ext == '.fits':
            self.log.warning('Modifying a FITS file is not allowed')
            return

    def on_deleted(self, event):
        self.log.critical('Nothing should ever be deleted from here!')
        return      

Main Monitoring

def monitor(config, log):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code'''

    event_handler = dosclass.MonitorFiles(config, log)

    # add logging the the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.start()
    log.info('Begin MaNGA DOS!')
    log.info('Start watching directory {0} for new files ...'.format(config.fitsDir))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        log.info('Stop watching directory ...')
        log.info('End MaNGA DOS!')
        log.info('--------------------------')
        log.info('')
    observer.join() 

In the above, my monitor method sets up watchdog to monitor the main directory. The MonitorFiles class defines what happens when a file is created. It basically calls this dosWatch.go method which eventually calls a subprocess.Popen to run a shell command.

Answer

havok2063 picture havok2063 · Mar 3, 2014

Here's what I ended up doing, which solved my problem. I used multiprocessing to start a separate watchdog monitoring process to watch for each file separately. Watchdog already queues up new files for me, which is fine for me.

As for point 2 above, I needed, e.g. a file2 to process before a file1, even though file1 was created first. So during file1 I check for the output of the file2 processing. If it finds it, it goes ahead processing file1. If it doesn't it exits. On file2 processing, I check to see if file1 was created already, and if so, then process file1. (Code for this not shown)

Main Monitoring of Cameras

def monitorCam(camera, config, mainlog):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code.  Monitors each camera.'''

    mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name,mp.current_process().pid))

    #init cam log
    camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
    camlog.info('Camera {0}, PID {1} '.format(camera,mp.current_process().pid))
    config.camera=camera

    event_handler = dosclass.MonitorFiles(config, camlog, mainlog)

    # add logging the the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.daemon=True
    observer.start()
    camlog.info('Begin MaNGA DOS!')
    camlog.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
    camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir,camera))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        camlog.info('Stop watching directory ...')
        camlog.info('End MaNGA DOS!')
        camlog.info('--------------------------')
        camlog.info('')
    #observer.join()

    if observer.is_alive():
        camlog.info('still alive')
    else:
        camlog.info('thread ending')    

Start of Multiple Camera Processes

def startProcess(camera,config,log):
    ''' Uses multiprocessing module to start 4 different camera monitoring processes'''

    jobs=[]

    #pdb.set_trace()

    #log.info(mp.log_to_stderr(logging.DEBUG))
    for i in range(len(camera)):
        log.info('Starting to monitor camera {0}'.format(camera[i]))
        print 'Starting to monitor camera {0}'.format(camera[i])
        try:
            p = mp.Process(target=monitorCam, args=(camera[i],config, log), name=camera[i])
            p.daemon=True
            jobs.append(p)
            p.start()
        except KeyboardInterrupt:
            log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, camera[i]))
            p.terminate()
            log.info('Terminated: {0}, {1}'.format(p,p.is_alive()))

    for i in range(len(jobs)):
        jobs[i].join()  

    return