Process list of 'N' items with multiple threads

Sajjad · Jun 4, 2015

I have a List of N items and I want to divide this list sequentially among a fixed number of threads.

By sequential I mean that I want to pass items 1 to N/4 to the first thread, N/4 + 1 to N/2 to the second thread, and N/2 + 1 to N to the third thread. Once all the threads have finished their work, I want to notify the main thread so that it can send a message saying that all the processing has been completed.

What I have done so far is implement an ExecutorService, something like this:

    ExecutorService threadPool = Executors.newFixedThreadPool(Number_of_threads);
    // List of items
    List<Item> itemList = getList();
    for (int i = 0; i < Number_of_threads; i++) {
        // How to divide the list here sequentially and pass each part to a processor that will process those items?
        Runnable processor = new Processor(Start, End); // Start and End are the bounds I don't know how to compute
        threadPool.execute(processor);
    }
    // Note: isTerminated() is never true unless shutdown() was called first
    if (threadPool.isTerminated()) {
        logger.info("All threads completed");
    }

  • How do I divide the list into sequential chunks?
  • Is there a better way to achieve this functionality?

Answer

starikoff · Jun 5, 2015

If what you want is for all threads to finish processing as fast as possible, and the number of items is not huge, then just submit one Runnable per item to a newFixedThreadPool(NUMBER_OF_THREADS):

    ExecutorService exec = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    List<Item> items = getItems();
    List<Future<?>> futures = new ArrayList<Future<?>>(items.size());
    for (Item item : items) {
        futures.add(exec.submit(new Processor(item)));
    }
    for (Future<?> f : futures) {
        // wait for a processor to complete; get() throws InterruptedException and ExecutionException
        f.get();
    }
    exec.shutdown(); // no more tasks will be submitted, so let the pool threads exit
    logger.info("all items processed");
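
For reference, the one-task-per-item approach above can be sketched as a self-contained program. This is only an illustration under assumptions: `PerItemDemo` and `processAll` are hypothetical names, the items are plain strings, and the "processing" is a placeholder that just counts each item, standing in for the `Processor` class from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class PerItemDemo {

    // Hypothetical helper: submits one task per item to a fixed pool,
    // waits for all of them, and returns how many items were processed.
    static int processAll(List<String> items, int threads) {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>(items.size());
            for (String item : items) {
                futures.add(exec.submit(() -> {
                    item.length(); // placeholder for the real per-item work
                    processed.incrementAndGet();
                }));
            }
            for (Future<?> f : futures) {
                try {
                    f.get(); // blocks until this task has finished
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("interrupted while waiting", e);
                } catch (ExecutionException e) {
                    // the task itself threw; its exception is the cause
                    throw new IllegalStateException("a task failed", e.getCause());
                }
            }
        } finally {
            exec.shutdown(); // allow the pool threads to exit
        }
        return processed.get();
    }

    public static void main(String[] args) {
        int n = processAll(Arrays.asList("a", "b", "c", "d", "e"), 3);
        System.out.println("all items processed: " + n);
    }
}
```

Once the loop over the futures has finished, the main thread knows every task is done, which is the "notify the main thread" step asked about in the question.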

If you really want to give each thread a contiguous portion of the list (but still want them to finish as fast as possible, and also expect that processing each item takes approximately the same amount of time), then split the items as evenly as you can, so that the maximum number of items per thread differs from the minimum by no more than one. For example, with 14 items and 4 threads you want the split to be [4,4,3,3], not e.g. [3,3,3,5]. For that, your code would be e.g.

    ExecutorService exec = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    List<Item> items = getItems();
    int minItemsPerThread = items.size() / NUMBER_OF_THREADS;
    int maxItemsPerThread = minItemsPerThread + 1;
    // the remainder of the division: this many threads take one extra item each
    int threadsWithMaxItems = items.size() - NUMBER_OF_THREADS * minItemsPerThread;
    int start = 0;
    List<Future<?>> futures = new ArrayList<Future<?>>(NUMBER_OF_THREADS);
    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
        int itemsCount = (i < threadsWithMaxItems ? maxItemsPerThread : minItemsPerThread);
        int end = start + itemsCount;
        // subList returns a view of the range [start, end), not a copy
        Runnable r = new Processor(items.subList(start, end));
        futures.add(exec.submit(r));
        start = end;
    }
    for (Future<?> f : futures) {
        f.get(); // wait for this chunk to complete
    }
    exec.shutdown(); // no more tasks will be submitted, so let the pool threads exit
    logger.info("all items processed");
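
The splitting arithmetic can also be checked on its own, without any threads. The following is a minimal sketch, assuming a hypothetical `ChunkDemo.split` helper that is not part of the answer's code; it returns contiguous `subList` views whose sizes differ by at most one, using the same "first few chunks get one extra item" rule.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkDemo {

    // Hypothetical helper: splits items into `parts` contiguous sublists
    // whose sizes differ by at most one.
    static <T> List<List<T>> split(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        int min = items.size() / parts;   // every chunk gets at least this many
        int extra = items.size() % parts; // the first `extra` chunks get one more
        List<List<T>> chunks = new ArrayList<>(parts);
        int start = 0;
        for (int i = 0; i < parts; i++) {
            int end = start + min + (i < extra ? 1 : 0);
            chunks.add(items.subList(start, end)); // a view of [start, end), not a copy
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 14; i++) {
            items.add(i);
        }
        // 14 items over 4 chunks gives sizes 4, 4, 3, 3
        for (List<Integer> chunk : split(items, 4)) {
            System.out.println(chunk.size() + " " + chunk);
        }
    }
}
```

Each returned chunk could then be handed to its own task. Because `subList` returns views backed by the original list, the list should not be structurally modified while the chunks are in use. If there are fewer items than chunks, the trailing chunks are simply empty.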