.NET Core 实战 [No.200] 跨线程访问 BlockingCollection 集合
BlockingCollection<T>
类似于 ConcurrentBag<T>
,也是一个用于多线程访问的集合类,但是功能上要强大很多。
BlockingCollection<T>
本身实现了类似于消息队列(MQ)的生产者 - 消费者模式。可以设置集合的容量上限。
只能在创建实例时设置。
通过 BoundedCapacity 只读属性可以获取其容量上限。
若未指定,则默认值为 int.MaxValue,此时 BoundedCapacity 属性值为 -1。集合为空时移除(Take)操作会被阻塞,集合满时新增(Add)操作会被阻塞。
消费者可以阻塞处理,直到生产者新增。
而当消费者消费过慢导致集合堆积达到容量上限时,会阻塞新增操作,直到消费者消费后释放了容量空间。如果不想新增和移除处理被阻塞,
BlockingCollection<T>
也提供了 TryAdd 和 TryTake 方法。生产者可以调用 CompleteAdding 方法来标记生产已结束,此时消费者可以根据 IsCompleted 来判断是否所有的项都已被消费完毕。
封装实现了
IProducerConsumerCollection<T>
的任何集合类型。在上一篇博客中介绍了
ConcurrentBag<T>
,并在最后提到了ConcurrentQueue<T>
和ConcurrentStack<T>
类,这三种类型都实现了IProducerConsumerCollection<T>
接口。可在创建实例时指定封装的集合类型,默认为
ConcurrentQueue<T>
类型。csharpBlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000);
支持使用
CancellationToken
取消 Take、Add 等操作。所有
BlockingCollection<T>
的操作都有带CancellationToken
参数的重载。
取消时会触发OperationCanceledException
异常,需要时可以手动捕捉该异常来响应取消请求。
下面是示例代码同上一篇博客一样,只是改成使用 BlockingCollection<T>
来实现。
由于 BlockingCollection<T>
实现了 IDisposable
接口,所以推荐使用 using
的方式创建实例。
using (BlockingCollection<int> bc = new BlockingCollection<int>(10))
{
Task t1 = Task.Run(() => {
for (int i = 0; i < 20; i++)
{
Console.WriteLine($"即将添加元素:{i}");
bc.Add(i);
}
bc.CompleteAdding();
});
Task t2 = Task.Run(() => {
while (true)
{
if (bc.TryTake(out int item))
{
Console.WriteLine($"T2 已取出元素:{item}");
}
if (bc.IsCompleted) break;
}
});
Task t3 = Task.Run(() => {
while (true)
{
if (bc.TryTake(out int item))
{
Console.WriteLine($"T3 已取出元素:{item}");
}
if (bc.IsCompleted) break;
}
});
Task.WaitAll(t1, t2, t3);
t1.Dispose();
t2.Dispose();
t3.Dispose();
}
执行结果如下:
上面的示例中移除集合项使用的是 TryTake 方法,没有利用到 Take 方法的阻塞效果。
改成使用 Take 方法后代码如下。
using (BlockingCollection<int> bc = new BlockingCollection<int>(10))
{
Task t1 = Task.Run(() => {
for (int i = 0; i < 20; i++)
{
Console.WriteLine($"即将添加元素:{i}");
bc.Add(i);
}
bc.CompleteAdding();
});
Task t2 = Task.Run(() => {
while (true)
{
var item = bc.Take();
Console.WriteLine($"T2 已取出元素:{item}");
if (bc.IsCompleted) break;
}
});
Task t3 = Task.Run(() => {
while (true)
{
var item = bc.Take();
Console.WriteLine($"T2 已取出元素:{item}");
if (bc.IsCompleted) break;
}
});
Task.WaitAll(t1, t2, t3);
t1.Dispose();
t2.Dispose();
t3.Dispose();
}
上面的代码有时可以正常执行,有时会引发如下异常:
System.InvalidOperationException
HResult=0x80131509
Message=The collection argument is empty and has been marked as complete with regards to additions.
Source=System.Collections.Concurrent
StackTrace:
at System.Collections.Concurrent.BlockingCollection`1.Take()
at BlockingCollectionDemo.TakeDemo.<>c__DisplayClass0_0.<Main>b__2() in C:\Users\Administrator\source\repos\ConcurrentBagDemo\BlockingCollectionDemo\TakeDemo.cs:line 36
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
这是由于集合调用了 CompleteAdding 方法,且集合已经全部被消费结束后,仍然还有消费者调用 Take 方法时,就会引发该异常。
若要使用 Take 方法最好是加上 try/catch 处理,以免程序崩溃。
参考:《.NET Core 实战:手把手教你掌握 380 个精彩案例》 -- 周家安 著