I am interested in a data structure identical to the Java BlockingQueue, with the exception that it must be able to batch objects in the queue. In other words, I would like the producer to be able to put objects into the queue, but have the consumer block on take()
untill the queue reaches a certain size (the batch size).
Then, once the queue has reached the batch size, the producer must block on put()
untill the consumer has consumed all of the elements in the queue (in which case the producer will start producing again and the consumer block untill the batch is reached again).
Does a similar data structure exist? Or should I write it (which I don't mind), I just don't want to waste my time if there is something out there.
UPDATE
Maybe to clarify things a bit:
The situation will always be as follows. There can be multiple producers adding items to the queue, but there will never be more than one consumer taking items from the queue.
Now, the problem is that there are multiple of these setups in parallel and serial. In other words, producers produce items for multiple queues, while consumers in their own right can also be producers. This can be more easily thought of as a directed graph of producers, consumer-producers, and finally consumers.
The reason that producers should block until the queues are empty (@Peter Lawrey) is because each of these will be running in a thread. If you leave them to simply produce as space becomes available, you will end up with a situation where you have too many threads trying to process too many things at once.
Maybe coupling this with an execution service could solve the problem?
I would suggest you use BlockingQueue.drainTo(Collection, int). You can use it with take() to ensure you get a minimum number of elements.
The advantage of using this approach is that your batch size grows dynamically with the workload and the producer doesn't have to block when the consumer is busy. i.e. it self optimises for latency and throughput.
To implement exactly as asked (which I think is a bad idea) you can use a SynchronousQueue with a busy consuming thread.
i.e. the consuming thread does a
list.clear();
while(list.size() < required) list.add(queue.take());
// process list.
The producer will block when ever the consumer is busy.