Multiple Producer Multiple Consumer Multithreading Java

sgokhales · Aug 20, 2014 · Viewed 17.6k times

I'm trying out the multiple-producer, multiple-consumer variant of the producer-consumer problem. I'm using a BlockingQueue as the common queue shared between the producers and consumers.

Below is my code.
Producer

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {

        //produce messages
        for(i=0; i<10; i++) 
        {
            try {
                inputQueue.put(new Integer(i));

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    }

}

Consumer

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue inputQueue;
    private volatile boolean isRunning = true;

    private final Integer POISON_PILL = new Integer(-1);

    Consumer(BlockingQueue queue) {
        this.inputQueue = queue;
    }

    public void run() {
        //worker loop keeps taking an element from the queue as long as the producer is still running or as
        //long as the queue is not empty:
        while(!inputQueue.isEmpty()) {

            try {
                Integer queueElement = (Integer) inputQueue.take();
                System.out.println("Consumed : " + queueElement.toString());

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Queue ");
    }

    //this is used to signal from the main thread that the producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
}

Test Class

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumerService {

    public static void main(String[] args) {

        //Creating BlockingQueue of size 10
        BlockingQueue queue = new ArrayBlockingQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        System.out.println("Producer and Consumer has been started");
    }

}

I don't see the correct output when I run the code above.

Is there a mistake in what I'm doing here?

Answer

Peter Lawrey · Aug 20, 2014

There is quite a bit of your code which doesn't make sense. I suggest you sit down and work out why the code is there and what it is doing.

If you deleted the isRunning flag, nothing would change: it is written in finish() but never read.

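If you deleted the use of synchronized in the producer you would have concurrent producers. As written, both threads share one Producer instance, so the second thread cannot enter run() until the first has finished. There is no benefit in making a field volatile when it is only accessed in a synchronized block.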
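
As a rough illustration of that point, here is a minimal sketch of a producer whose run() is not synchronized and whose loop counter is a local variable, so the threads share nothing except the queue. The class name IndependentProducers, the helper produceWithTwoThreads and the count parameter are made up for this example and are not part of the question's code. It also uses an unbounded LinkedBlockingQueue (an assumption, so the demo needs no consumer).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class IndependentProducers {

    // Hypothetical replacement for the question's Producer.
    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int count;

        Producer(BlockingQueue<Integer> queue, int count) {
            this.queue = queue;
            this.count = count;
        }

        @Override
        public void run() { // not synchronized: producers may run concurrently
            try {
                for (int i = 0; i < count; i++) { // local counter, one per thread
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs two producers to completion and returns how many items were queued.
    static int produceWithTwoThreads(int perProducer) {
        // unbounded queue (assumption) so put() never blocks without a consumer
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread first = new Thread(new Producer(queue, perProducer));
        Thread second = new Thread(new Producer(queue, perProducer));
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("Queued " + produceWithTwoThreads(10) + " items");
    }
}
```

Each thread gets its own Producer instance here. Nothing is declared volatile or static because no field is modified after construction.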

It makes no sense for producers to share a loop counter if they are to be concurrent. Normally, a producer sends a poison pill, and a consumer doesn't consume the pill. e.g. if you have two consumers, one might add the pill and the other might consume it. Your consumer ignores poison pills, just as it ignores the isRunning flag.

You don't want to stop the consumer just because the queue is temporarily empty. Otherwise it will not see all the messages the producers produce, possibly none of them.
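
Putting these points together, one possible shape for the whole program is sketched below. It is not the only way to do it. The names PoisonPillDemo and runDemo are invented for the example, and it assumes real messages are never negative so that -1 can serve as the pill. Consumers block in take() instead of checking isEmpty(), and the coordinating thread queues one pill per consumer only after every producer has finished.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class PoisonPillDemo {

    // Assumption: real messages are never negative, so -1 is safe as a sentinel.
    static final Integer POISON_PILL = -1;

    // Starts the given numbers of producers and consumers and returns
    // every message the consumers received (pills excluded).
    static List<Integer> runDemo(int producers, int consumers, int perProducer) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        List<Integer> consumed = new CopyOnWriteArrayList<>();

        Thread[] producerThreads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int base = p * perProducer; // distinct value range per producer
            producerThreads[p] = new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put(base + i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads[p].start();
        }

        Thread[] consumerThreads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            consumerThreads[c] = new Thread(() -> {
                try {
                    while (true) {
                        Integer item = queue.take(); // blocks while the queue is empty
                        if (item.equals(POISON_PILL)) {
                            break; // stop only when told to, not when empty
                        }
                        consumed.add(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads[c].start();
        }

        try {
            for (Thread t : producerThreads) t.join();   // all real messages are queued
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON_PILL);                  // one pill per consumer
            }
            for (Thread t : consumerThreads) t.join();   // each consumer saw its pill
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Integer> consumed = runDemo(2, 2, 10);
        System.out.println("Consumed " + consumed.size() + " messages");
    }
}
```

Because ArrayBlockingQueue is FIFO and the pills are queued after all producers have joined, every real message is taken before any consumer can see a pill.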