Skip to content

.NET Core 实战 [No.200] 跨线程访问 BlockingCollection 集合

🏷️ 《.NET Core 实战》

BlockingCollection<T> 类似于 ConcurrentBag<T> ,也是一个用于多线程访问的集合类,但是功能上要强大很多。

  • BlockingCollection<T> 本身实现了类似于消息队列(MQ)的生产者 - 消费者模式

  • 可以设置集合的容量上限

    只能在创建实例时设置。
    通过 BoundedCapacity 只读属性可以获取其容量上限。
    若未指定,则默认值为 int.MaxValue,此时 BoundedCapacity 属性值为 -1

  • 集合为空时移除(Take)操作会被阻塞,集合满时新增(Add)操作会被阻塞。

    消费者可以阻塞处理,直到生产者新增。
    而当消费者消费过慢导致集合堆积达到容量上限时,会阻塞新增操作,直到消费者消费后释放了容量空间。

    如果不想新增和移除处理被阻塞,BlockingCollection<T> 也提供了 TryAddTryTake 方法。

    生产者可以调用 CompleteAdding 方法来标记生产已结束,此时消费者可以根据 IsCompleted 来判断是否所有的项都已被消费完毕。

  • 封装实现了 IProducerConsumerCollection<T> 的任何集合类型。

    上一篇博客中介绍了 ConcurrentBag<T>,并在最后提到了 ConcurrentQueue<T>ConcurrentStack<T> 类,这三种类型都实现了 IProducerConsumerCollection<T> 接口。

    可在创建实例时指定封装的集合类型,默认为 ConcurrentQueue<T> 类型。

    csharp
    BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000);
  • 支持使用 CancellationToken 取消 TakeAdd 等操作。

    所有 BlockingCollection<T> 的操作都有带 CancellationToken 参数的重载。
    取消时会触发 OperationCanceledException 异常,需要时可以手动捕捉该异常来响应取消请求。

下面是示例代码同上一篇博客一样,只是改成使用 BlockingCollection<T> 来实现。

由于 BlockingCollection<T> 实现了 IDisposable 接口,所以推荐使用 using 的方式创建实例。

csharp
using (BlockingCollection<int> bc = new BlockingCollection<int>(10))
{
    Task t1 = Task.Run(() => {
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine($"即将添加元素:{i}");
            bc.Add(i);
        } 
        bc.CompleteAdding();
    });

    Task t2 = Task.Run(() => {
        while (true)
        {
            if (bc.TryTake(out int item))
            {
                Console.WriteLine($"T2 已取出元素:{item}");
            }
            if (bc.IsCompleted) break;
        }
    });

    Task t3 = Task.Run(() => {
        while (true)
        {
            if (bc.TryTake(out int item))
            {
                Console.WriteLine($"T3 已取出元素:{item}");
            }
            if (bc.IsCompleted) break;
        }
    });

    Task.WaitAll(t1, t2, t3);
    t1.Dispose();
    t2.Dispose();
    t3.Dispose();
}

执行结果如下:

上面的示例中移除集合项使用的是 TryTake 方法,没有利用到 Take 方法的阻塞效果。

改成使用 Take 方法后代码如下。

csharp
using (BlockingCollection<int> bc = new BlockingCollection<int>(10))
{
    Task t1 = Task.Run(() => {
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine($"即将添加元素:{i}");
            bc.Add(i);
        }
        bc.CompleteAdding();
    });

    Task t2 = Task.Run(() => {
        while (true)
        {
            var item = bc.Take();
            Console.WriteLine($"T2 已取出元素:{item}");
            if (bc.IsCompleted) break;
        }
    });

    Task t3 = Task.Run(() => {
        while (true)
        {
            var item = bc.Take();
            Console.WriteLine($"T2 已取出元素:{item}");
            if (bc.IsCompleted) break;
        }
    });

    Task.WaitAll(t1, t2, t3);
    t1.Dispose();
    t2.Dispose();
    t3.Dispose();
}

上面的代码有时可以正常执行,有时会引发如下异常:

csharp
System.InvalidOperationException
  HResult=0x80131509
  Message=The collection argument is empty and has been marked as complete with regards to additions.
  Source=System.Collections.Concurrent
  StackTrace:
   at System.Collections.Concurrent.BlockingCollection`1.Take()
   at BlockingCollectionDemo.TakeDemo.<>c__DisplayClass0_0.<Main>b__2() in C:\Users\Administrator\source\repos\ConcurrentBagDemo\BlockingCollectionDemo\TakeDemo.cs:line 36
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)

这是由于集合调用了 CompleteAdding 方法,且集合已经全部被消费结束后,仍然还有消费者调用 Take 方法时,就会引发该异常。

若要使用 Take 方法最好是加上 try/catch 处理,以免程序崩溃。


参考:《.NET Core 实战:手把手教你掌握 380 个精彩案例》 -- 周家安 著