C# Producer/Consumer pattern

Southsouth picture Southsouth · Sep 3, 2009 · Viewed 20.6k times · Source

I have a simple one-producer/two-consumer program, shown below, but the output shows that only c2 is consuming. Are there any bugs in my code?

/// <summary>
/// Demo entry point: wires one producer and two consumers to a shared queue
/// and runs each of them on its own thread.
/// </summary>
class Program
{
    /// <summary>
    /// Starts both consumer threads, then the producer thread, and blocks
    /// until the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // One lock object is handed to the producer and to both consumers.
        Object lockObj = new object();
        Queue<string> queue = new Queue<string>();
        Producer p = new Producer(queue, lockObj);
        Consumer c1 = new Consumer(queue, lockObj, "c1");
        Consumer c2 = new Consumer(queue, lockObj, "c2");

        // The consumers loop forever, so they must be background threads;
        // as foreground threads they would keep the process alive even
        // after Main returns.
        Thread t1 = new Thread(c1.consume);
        Thread t2 = new Thread(c2.consume);
        t1.IsBackground = true;
        t2.IsBackground = true;
        t1.Start();
        t2.Start();

        Thread t = new Thread(p.produce);
        t.Start();

        // Keep the process alive until the user presses Enter.
        Console.ReadLine();
    }
}
/// <summary>
/// Puts string items onto a shared queue and wakes up waiting consumers
/// whenever the queue goes from empty to holding exactly one item.
/// </summary>
public class Producer
{
    // Shared by all Producer instances; bumped once per loop-condition check.
    static int seq = 0;

    Queue<string> queue;
    Object lockObject;

    /// <summary>Creates a producer bound to the given queue and lock.</summary>
    /// <param name="queue">Queue that receives the produced items.</param>
    /// <param name="lockObject">Object locked around every queue access made here.</param>
    public Producer(Queue<string> queue, Object lockObject)
    {
        this.lockObject = lockObject;
        this.queue = queue;
    }

    /// <summary>
    /// Enqueues items named "item" + counter until the static counter has
    /// passed 15, printing each one. On the first call in a process that is
    /// "item1" through "item15".
    /// </summary>
    public void produce()
    {
        while (true)
        {
            // just testing 15 items; the counter is incremented even on the
            // final, failing check.
            if (seq++ >= 15)
            {
                break;
            }

            lock (lockObject)
            {
                string produced = "item" + seq;
                queue.Enqueue(produced);
                Console.WriteLine("Producing {0}", produced);

                // A count of exactly one means the queue was empty before
                // this Enqueue, so consumers may be blocked in Monitor.Wait.
                bool becameNonEmpty = queue.Count == 1;
                if (becameNonEmpty)
                {
                    Monitor.PulseAll(lockObject);
                }
            }
        }
    }
}

/// <summary>
/// Takes items off a shared queue forever, printing each one together with
/// this consumer's name.
/// </summary>
public class Consumer
{
    string name;
    Queue<string> queue;
    Object lockObject;

    /// <summary>Creates a named consumer bound to the given queue and lock.</summary>
    /// <param name="queue">Queue to take items from.</param>
    /// <param name="lockObject">Object locked around every queue access made here.</param>
    /// <param name="name">Label printed with every consumed item.</param>
    public Consumer(Queue<string> queue, Object lockObject, string name)
    {
        this.name = name;
        this.lockObject = lockObject;
        this.queue = queue;
    }

    /// <summary>
    /// Endless consume loop; never returns. While the queue is empty the
    /// thread blocks in Monitor.Wait until another thread pulses the lock.
    /// </summary>
    public void consume()
    {
        for (; ; )
        {
            lock (lockObject)
            {
                if (queue.Count > 0)
                {
                    string taken = queue.Dequeue();
                    Console.WriteLine(" {0} Consuming {1}", name, taken);
                }
                else
                {
                    // Nothing to take: give up the lock and sleep until
                    // pulsed. The next iteration re-checks the count, so a
                    // wake-up with an already-drained queue is harmless.
                    Monitor.Wait(lockObject);
                }
            }
        }
    }
}

The output is:

Producing item1
 c2 Consuming item1

Producing item2
 c2 Consuming item2

Producing item3
 c2 Consuming item3

Producing item4
 c2 Consuming item4

Producing item5
 c2 Consuming item5

Producing item6
 c2 Consuming item6

Producing item7
 c2 Consuming item7

Producing item8
 c2 Consuming item8

Producing item9
 c2 Consuming item9

Producing item10
 c2 Consuming item10

Producing item11
 c2 Consuming item11

Producing item12
 c2 Consuming item12

Producing item13
 c2 Consuming item13

Producing item14
 c2 Consuming item14

Producing item15
 c2 Consuming item15

Answer

Ariel Popovsky picture Ariel Popovsky · Sep 3, 2009

First, I can't reproduce your problem; here both threads consume some of the items. I guess your machine is faster, but adding a Sleep as gw suggests will solve that. I would also suggest that you don't try to sync the producer: let it queue items as fast as it can, and let the consumers sync among themselves to decide who handles each item. I made a quick modification and it seems to be working fine:

/// <summary>
/// Starts two consumer threads and one producer thread over a shared queue,
/// then blocks until the user presses Enter.
/// </summary>
static void Main()
    {
        // Only the consumers receive lockObj; the producer is built from the
        // queue alone.
        Object lockObj = new object();
        Queue<string> queue = new Queue<string>();
        Producer p = new Producer(queue);
        Comsumer c1 = new Comsumer(queue, lockObj, "c1");
        Comsumer c2 = new Comsumer(queue, lockObj, "c2");

        // The consumers loop forever, so they must be background threads;
        // as foreground threads they would keep the process alive even
        // after Main returns.
        Thread t1 = new Thread(c1.consume);
        Thread t2 = new Thread(c2.consume);
        t1.IsBackground = true;
        t2.IsBackground = true;
        t1.Start();
        t2.Start();

        Thread t = new Thread(p.produce);
        t.Start();

        // Keep the process alive until the user presses Enter.
        Console.ReadLine();
    }
}
/// <summary>
/// Pushes string items onto a queue as fast as it can, printing each one.
/// </summary>
public class Producer
{
    // Shared by all Producer instances; bumped once per loop-condition check.
    static int seq;

    Queue<string> queue;

    /// <summary>Creates a producer bound to the given queue.</summary>
    /// <param name="queue">Queue that receives the produced items.</param>
    public Producer(Queue<string> queue)
    {
        this.queue = queue;
    }

    /// <summary>
    /// Enqueues items named "item" + counter until the static counter has
    /// passed 1000. On the first call in a process that is "item1" through
    /// "item1000".
    /// </summary>
    public void produce()
    {
        // NOTE(review): no lock is taken around Enqueue, while the consumers
        // in this listing dequeue from the same queue on other threads.
        // Queue<T> is not documented as safe for that; confirm whether the
        // producer should share the consumers' lock.
        for (; ; )
        {
            // produces up to 1000 items; the counter is incremented even on
            // the final, failing check.
            if (seq++ >= 1000)
            {
                break;
            }

            string produced = "item" + seq;
            queue.Enqueue(produced);
            Console.WriteLine("Producing {0}", produced);
        }
    }
}

/// <summary>
/// Polls a shared queue forever, printing each item it removes together
/// with this consumer's name.
/// </summary>
public class Comsumer
{
    Queue<string> queue;
    Object lockObject;
    string name;

    /// <summary>Creates a named consumer bound to the given queue and lock.</summary>
    /// <param name="queue">Queue to take items from.</param>
    /// <param name="lockObject">Object locked around every queue access made here.</param>
    /// <param name="name">Label printed with every consumed item.</param>
    public Comsumer(Queue<string> queue, Object lockObject, string name)
    {
        this.queue = queue;
        this.lockObject = lockObject;
        this.name = name;
    }

    /// <summary>
    /// Endless polling loop; never returns. Each pass takes the lock,
    /// removes and prints one item if any is present, and otherwise backs
    /// off briefly before polling again.
    /// </summary>
    public void consume()
    {
        while (true)
        {
            bool queueWasEmpty;
            lock (lockObject)
            {
                queueWasEmpty = queue.Count == 0;
                if (!queueWasEmpty)
                {
                    string item = queue.Dequeue();
                    Console.WriteLine(" {0} Comsuming {1}", name, item);
                }
            }

            if (queueWasEmpty)
            {
                // Nothing pulses this lock, so the only option is to poll.
                // Sleep outside the lock so an empty queue does not turn
                // into a hot spin that re-takes the lock continuously and
                // contends with the other consumer.
                Thread.Sleep(1);
            }
        }
    }
}

You may also add a Sleep call to slow down the consumer loops.