I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size all threads that are filling the queue will be blocked on add until an item is removed from the queue.
The solution below is what I am using right now and my question is: How can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?
internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
//todo: might be worth changing this into a proper QUEUE
private AutoResetEvent _FullEvent = new AutoResetEvent(false);
internal T this[int i]
{
get { return (T) List[i]; }
}
private int _MaxSize;
internal int MaxSize
{
get { return _MaxSize; }
set
{
_MaxSize = value;
checkSize();
}
}
internal BlockingCollection(int maxSize)
{
MaxSize = maxSize;
}
internal void Add(T item)
{
Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.WaitOne();
List.Add(item);
Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));
checkSize();
}
internal void Remove(T item)
{
lock (List)
{
List.Remove(item);
}
Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
}
protected override void OnRemoveComplete(int index, object value)
{
checkSize();
base.OnRemoveComplete(index, value);
}
internal new IEnumerator GetEnumerator()
{
return List.GetEnumerator();
}
private void checkSize()
{
if (Count < MaxSize)
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Set();
}
else
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Reset();
}
}
}
That looks very unsafe (very little synchronization); how about something like:
class SizeQueue<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly int maxSize;
public SizeQueue(int maxSize) { this.maxSize = maxSize; }
public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
Monitor.Wait(queue);
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(queue);
}
}
}
public T Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
T item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return item;
}
}
}
(edit)
In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):
bool closing;
public void Close()
{
lock(queue)
{
closing = true;
Monitor.PulseAll(queue);
}
}
public bool TryDequeue(out T value)
{
lock (queue)
{
while (queue.Count == 0)
{
if (closing)
{
value = default(T);
return false;
}
Monitor.Wait(queue);
}
value = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return true;
}
}