I'm trying out the multiple-producer, multiple-consumer case of the producer-consumer problem. I'm using a BlockingQueue to share a common queue among the producers and consumers.
Below is my code.
Producer
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {
        //produce messages
        for(i=0; i<10; i++)
        {
            try {
                inputQueue.put(new Integer(i));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    }
}
Consumer
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private BlockingQueue inputQueue;
    private volatile boolean isRunning = true;
    private final Integer POISON_PILL = new Integer(-1);

    Consumer(BlockingQueue queue) {
        this.inputQueue = queue;
    }

    public void run() {
        //worker loop keeps taking an element from the queue as long as the producer is still running or as
        //long as the queue is not empty:
        while(!inputQueue.isEmpty()) {
            try {
                Integer queueElement = (Integer) inputQueue.take();
                System.out.println("Consumed : " + queueElement.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Queue ");
    }

    //this is used to signal from the main thread that the producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
}
Test Class
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerService {
    public static void main(String[] args) {
        //Creating BlockingQueue of size 10
        BlockingQueue queue = new ArrayBlockingQueue(10);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        //starting producer to produce messages in queue
        new Thread(producer).start();
        //starting producer to produce messages in queue
        new Thread(producer).start();
        //starting consumer to consume messages from queue
        new Thread(consumer).start();
        //starting consumer to consume messages from queue
        new Thread(consumer).start();
        System.out.println("Producer and Consumer has been started");
    }
}
I don't see the correct output when I run the code above.
Am I making a mistake here?
There is quite a bit of your code which doesn't make sense. I suggest you sit down and work out why the code is there and what it is doing.
If you deleted the isRunning flag, nothing would change: it is set but never read.
If you removed synchronized from the producer's run() method, you would have concurrent producers. There is no benefit in making a field volatile when it is only accessed inside a synchronized block.
It makes no sense for producers to share a loop counter, if they are to be concurrent.
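As a rough sketch of the two points above, the producer below drops synchronized and uses a local loop counter, so each producer thread counts independently. The class name IndependentProducers, the helper runProducers, and the queue sizing are assumptions made for illustration; they are not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical wrapper class, used only to keep the sketch self-contained.
class IndependentProducers {

    /** Puts `count` integers on the queue using its own local loop counter. */
    static final class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int count;

        Producer(BlockingQueue<Integer> queue, int count) {
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() { // not synchronized: BlockingQueue.put is already thread-safe
            try {
                for (int i = 0; i < count; i++) { // local counter, not a shared static field
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop producing
            }
        }
    }

    /** Runs the producers concurrently and returns how many items ended up in the queue. */
    static int runProducers(int producers, int itemsEach) {
        // Assumption: the queue is sized to hold everything, so put() never blocks here.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(producers * itemsEach);
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(new Producer(queue, itemsEach)); // one instance per thread
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the count below may then be partial
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("Items queued: " + runProducers(2, 10)); // prints "Items queued: 20"
    }
}
```

With a shared static counter, two unsynchronized producers would skip or repeat values; with a local counter each producer reliably emits 0 through 9.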
Normally, a producer sends a poison pill, and a consumer doesn't consume the pill. E.g., if you have two consumers, one might add the pill and the other might consume it. Your consumer ignores poison pills, just as it ignores the isRunning flag.
You don't want to stop the consumer just because the queue is temporarily empty. Otherwise it will not see all the messages the producer produces, possibly none of them.
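One way to put the last two points together is sketched below: consumers block in take() instead of checking isEmpty(), and they stop only when they receive a poison pill. This is one common variant, not the only one: here the coordinating thread queues one pill per consumer after all producers have finished. The class name PoisonPillDemo, the method run, and the assumption that real messages are never -1 are all illustrative choices.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names and sizes are assumptions for illustration.
class PoisonPillDemo {
    // Sentinel value. Assumption: real messages are never -1.
    static final Integer POISON_PILL = -1;

    /** Runs producers and consumers to completion and returns every consumed item. */
    static List<Integer> run(int producers, int consumers, int itemsEach) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());

        Thread[] producerThreads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            producerThreads[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads[p].start();
        }

        Thread[] consumerThreads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            consumerThreads[c] = new Thread(() -> {
                try {
                    while (true) {
                        Integer item = queue.take(); // blocks while empty instead of exiting
                        if (item.equals(POISON_PILL)) {
                            break; // each consumer stops after taking exactly one pill
                        }
                        consumed.add(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads[c].start();
        }

        try {
            for (Thread t : producerThreads) {
                t.join(); // wait until every real message has been queued
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON_PILL); // one pill per consumer, queued behind all messages
            }
            for (Thread t : consumerThreads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + run(2, 2, 10).size() + " items"); // prints "Consumed 20 items"
    }
}
```

Because the pills are queued only after every producer has finished, they sit behind all real messages in the FIFO queue, so no message is left unread when the consumers exit.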