Java: Watching a directory to move large files

nite picture nite · Jul 30, 2010 · Viewed 18.9k times · Source

I have been writing a program that watches a directory and when files are created in it, it changes the name and moves them to a new directory. In my first implementation I used Java's Watch Service API which worked fine when I was testing 1kb files. The problem that came up is that in reality the files getting created are anywhere from 50-300mb. When this happened the watcher API would find the file right away but could not move it because it was still being written. I tried putting the watcher in a loop (which generated exceptions until the file could be moved) but this seemed pretty inefficient.

Since that didn't work, I tried up using a timer that checks the folder every 10s and then moves files when it can. This is the method I ended up going for.

Question: Is there anyway to signal when a file is done being written without doing an exception check or continually comparing the size? I like the idea of using the Watcher API just once for each file instead of continually checking with a timer (and running into exceptions).

All responses are greatly appreciated!

nt

Answer

Jasper Krijgsman picture Jasper Krijgsman · Jan 24, 2011

I ran into the same problem today. I my usecase a small delay before the file is actually imported was not a big problem and I still wanted to use the NIO2 API. The solution I choose was to wait until a file has not been modified for 10 seconds before performing any operations on it.

The important part of the implementation is as follows. The program waits until the wait time expires or a new event occures. The expiration time is reset every time a file is modified. If a file is deleted before the wait time expires it is removed from the list. I use the poll method with a timeout of the expected expirationtime, that is (lastmodified+waitTime)-currentTime

private final Map<Path, Long> expirationTimes = newHashMap();
private Long newFileWait = 10000L;

public void run() {
    for(;;) {
        //Retrieves and removes next watch key, waiting if none are present.
        WatchKey k = watchService.take();

        for(;;) {
            long currentTime = new DateTime().getMillis();

            if(k!=null)
                handleWatchEvents(k);

            handleExpiredWaitTimes(currentTime);

            // If there are no files left stop polling and block on .take()
            if(expirationTimes.isEmpty())
                break;

            long minExpiration = min(expirationTimes.values());
            long timeout = minExpiration-currentTime;
            logger.debug("timeout: "+timeout);
            k = watchService.poll(timeout, TimeUnit.MILLISECONDS);
        }
    }
}

private void handleExpiredWaitTimes(Long currentTime) {
    // Start import for files for which the expirationtime has passed
    for(Entry<Path, Long> entry : expirationTimes.entrySet()) {
        if(entry.getValue()<=currentTime) {
            logger.debug("expired "+entry);
            // do something with the file
            expirationTimes.remove(entry.getKey());
        }
    }
}

private void handleWatchEvents(WatchKey k) {
    List<WatchEvent<?>> events = k.pollEvents();
    for (WatchEvent<?> event : events) {
        handleWatchEvent(event, keys.get(k));
    }
    // reset watch key to allow the key to be reported again by the watch service
    k.reset();
}

private void handleWatchEvent(WatchEvent<?> event, Path dir) throws IOException {
    Kind<?> kind = event.kind();

    WatchEvent<Path> ev = cast(event);
        Path name = ev.context();
        Path child = dir.resolve(name);

    if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE) {
        // Update modified time
        FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime();
        expirationTimes.put(name, lastModified.toMillis()+newFileWait);
    }

    if (kind == ENTRY_DELETE) {
        expirationTimes.remove(child);
    }
}