BlockingCollection的使用
BlockingCollection实现了生产者/消费者模式,是对IProducerConsumerCollection<T>接口的实现。与其他Concurrent集合一样,每次Add或Take元素,都会导致对集合的lock。只有当确定需要在内存中创建一个生产者,消费者模式时,再考虑这个类。
MSDN中的示例用法:
using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 1000; i++)
{
bc.Add(i);
Thread.Sleep(50);
}
// Need to do this to keep foreach below from hanging
bc.CompleteAdding();
});
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
foreach (var item in bc.GetConsumingEnumerable())
{
Console.WriteLine(item);
}
}
重点在于迭代时使用bc.GetConsumingEnumerable(),而不是直接对bc迭代。
可以在这里查看GetConsumingEnumerable的实现源码:
https://github.com/dotnet/corefx/blob/master/src/System.Collections.Concurrent/src/System/Collections/Concurrent/BlockingCollection.cs
public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
{
CancellationTokenSource linkedTokenSource = null;
try
{
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _consumersCancellationTokenSource.Token);
while (!IsCompleted)
{
T item;
if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource))
{
yield return item;
}
}
}
finally
{
if (linkedTokenSource != null)
{
linkedTokenSource.Dispose();
}
}
}
会不断的check IsCompleted属性:
public bool IsCompleted
{
get
{
CheckDisposed();
return (IsAddingCompleted && (_occupiedNodes.CurrentCount == 0));
}
}
会判断生产者线程是否已经完成添加并且没有元素被消费。
参考链接:
https://msdn.microsoft.com/en-us/library/dd267312(v=vs.110).aspx