🏷️ C# Multithreading

Asynchronous processing with BlockingCollection

Sample code

csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    // One shared generator. Creating a new Random seeded with DateTime.Now.Millisecond
    // on every call can yield identical delays when calls land in the same millisecond.
    private static readonly Random Rng = new Random();

    /// <summary>
    /// Asynchronous processing with BlockingCollection
    /// </summary>
    /// <param name="args"></param>
    static void Main(string[] args)
    {
        Console.WriteLine("Using a Queue inside of BlockingCollection");
        Console.WriteLine();
        // Uses the default ConcurrentQueue container
        Task t = RunProgram();
        t.Wait();
        Console.WriteLine();
        Console.WriteLine("Using a Stack inside of BlockingCollection");
        Console.WriteLine();
        // Uses a ConcurrentStack container
        t = RunProgram(new ConcurrentStack<CustomTask>());
        t.Wait();

        Console.ReadLine();
    }

    static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
    {
        // BlockingCollection can wrap different underlying containers, which changes
        // how items are stored and in what order they are taken.
        // With no container specified it uses ConcurrentQueue;
        // otherwise it uses the container that was passed in.
        var taskCollection = collection == null
            ? new BlockingCollection<CustomTask>()
            : new BlockingCollection<CustomTask>(collection);

        // Start the producer, which creates the tasks
        var taskSource = Task.Run(() => TaskProducer(taskCollection));

        // Start the consumers, which process the tasks
        Task[] processors = new Task[4];
        for (int i = 1; i <= 4; i++)
        {
            string processorId = $"Processor {i}";
            processors[i - 1] = Task.Run(() => TaskProcessor(taskCollection, processorId));
        }

        // Wait until all tasks have been created
        await taskSource;

        // Wait until all tasks have been consumed
        await Task.WhenAll(processors);
    }

    /// <summary>
    /// Producer
    /// </summary>
    /// <param name="collection"></param>
    /// <returns></returns>
    static async Task TaskProducer(BlockingCollection<CustomTask> collection)
    {
        for (int i = 1; i <= 20; i++)
        {
            await Task.Delay(20);
            var workItem = new CustomTask { Id = i };
            collection.Add(workItem);
            Console.WriteLine($"Task {workItem.Id} has been posted");
        }
        // After CompleteAdding is called, the consumers' enumeration ends
        // as soon as the collection has been emptied
        collection.CompleteAdding();
    }

    /// <summary>
    /// Consumer
    /// </summary>
    /// <param name="collection"></param>
    /// <param name="name"></param>
    /// <returns></returns>
    static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name)
    {
        await GetRandomDelay();
        // GetConsumingEnumerable removes and returns work items, blocking while the collection is empty
        foreach (CustomTask item in collection.GetConsumingEnumerable())
        {
            Console.WriteLine($"Task {item.Id} has been processed by {name}");
            await GetRandomDelay();
        }
    }

    static Task GetRandomDelay()
    {
        int delay;
        // Random is not thread-safe, so guard the shared instance
        lock (Rng)
        {
            delay = Rng.Next(1, 500);
        }
        return Task.Delay(delay);
    }

    private class CustomTask
    {
        public int Id { get; set; }
    }
}
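
The producer's call to CompleteAdding is what lets the consumer loops finish. The following is a minimal single-threaded sketch of that behavior, not part of the original sample: the class name `CompletionDemo` and its two helper methods are hypothetical names introduced only for illustration. It assumes the standard `System.Collections.Concurrent` behavior, in which `GetConsumingEnumerable` stops once the collection is both marked complete and empty, and `Add` throws `InvalidOperationException` after completion.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class CompletionDemo
{
    // Hypothetical helper: adds three items, marks the collection complete, then drains it.
    public static List<int> ProduceThenDrain()
    {
        using (var collection = new BlockingCollection<int>())
        {
            for (int i = 1; i <= 3; i++)
            {
                collection.Add(i);
            }
            collection.CompleteAdding();

            var drained = new List<int>();
            // The loop exits instead of blocking, because adding is complete
            // and the collection becomes empty after the third item.
            foreach (int item in collection.GetConsumingEnumerable())
            {
                drained.Add(item);
            }
            return drained;
        }
    }

    // Hypothetical helper: reports whether Add fails once adding has been completed.
    public static bool AddAfterCompleteThrows()
    {
        using (var collection = new BlockingCollection<int>())
        {
            collection.CompleteAdding();
            try
            {
                collection.Add(42);
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", ProduceThenDrain())); // prints "1, 2, 3"
        Console.WriteLine(AddAfterCompleteThrows());              // prints "True"
    }
}
```

If CompleteAdding were never called, the `foreach` in each consumer would block indefinitely on the empty collection, and `Task.WhenAll(processors)` in the sample would never complete.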

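Output

With the default ConcurrentQueue container, tasks are taken in first-in, first-out order. Four processors run concurrently, so the printed lines may appear slightly out of sequence.

With the ConcurrentStack container, tasks are taken in last-in, first-out order: each processor receives the most recently posted task still in the collection.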

txt
Using a Queue inside of BlockingCollection

Task 1 has been posted
Task 2 has been posted
Task 3 has been posted
Task 4 has been posted
Task 1 has been processed by Processor 1
Task 5 has been posted
Task 6 has been posted
Task 7 has been posted
Task 8 has been posted
Task 9 has been posted
Task 10 has been posted
Task 2 has been processed by Processor 1
Task 11 has been posted
Task 12 has been posted
Task 13 has been posted
Task 6 has been processed by Processor 2
Task 4 has been processed by Processor 3
Task 5 has been processed by Processor 1
Task 3 has been processed by Processor 4
Task 14 has been posted
Task 15 has been posted
Task 16 has been posted
Task 17 has been posted
Task 18 has been posted
Task 7 has been processed by Processor 4
Task 19 has been posted
Task 20 has been posted
Task 8 has been processed by Processor 4
Task 9 has been processed by Processor 1
Task 11 has been processed by Processor 2
Task 10 has been processed by Processor 3
Task 13 has been processed by Processor 1
Task 12 has been processed by Processor 2
Task 14 has been processed by Processor 4
Task 15 has been processed by Processor 1
Task 16 has been processed by Processor 3
Task 17 has been processed by Processor 4
Task 18 has been processed by Processor 3
Task 19 has been processed by Processor 2
Task 20 has been processed by Processor 4

Using a Stack inside of BlockingCollection

Task 1 has been posted
Task 2 has been posted
Task 3 has been posted
Task 4 has been posted
Task 5 has been posted
Task 6 has been posted
Task 7 has been posted
Task 8 has been posted
Task 9 has been posted
Task 8 has been processed by Processor 1
Task 9 has been processed by Processor 3
Task 7 has been processed by Processor 4
Task 6 has been processed by Processor 2
Task 10 has been posted
Task 11 has been posted
Task 12 has been posted
Task 13 has been posted
Task 14 has been posted
Task 15 has been posted
Task 16 has been posted
Task 17 has been posted
Task 18 has been posted
Task 18 has been processed by Processor 2
Task 17 has been processed by Processor 4
Task 15 has been processed by Processor 3
Task 16 has been processed by Processor 1
Task 14 has been processed by Processor 1
Task 13 has been processed by Processor 3
Task 12 has been processed by Processor 4
Task 19 has been posted
Task 11 has been processed by Processor 2
Task 20 has been posted
Task 20 has been processed by Processor 4
Task 19 has been processed by Processor 1
Task 10 has been processed by Processor 3
Task 5 has been processed by Processor 1
Task 4 has been processed by Processor 3
Task 3 has been processed by Processor 1
Task 2 has been processed by Processor 2
Task 1 has been processed by Processor 4
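
In the output above, the ordering is partly obscured by timing, because the producer and four consumers interleave. The difference between the two containers may be easier to see without concurrency. The sketch below is an illustration under that assumption, not the original program: `OrderDemo` and `FillThenDrain` are hypothetical names. It fills the collection completely before draining it on a single thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class OrderDemo
{
    // Hypothetical helper: wraps the given container, adds 1..5, then drains everything.
    public static List<int> FillThenDrain(IProducerConsumerCollection<int> storage)
    {
        using (var collection = new BlockingCollection<int>(storage))
        {
            for (int i = 1; i <= 5; i++)
            {
                collection.Add(i);
            }
            collection.CompleteAdding();

            // Enumerating removes items in the order the underlying container yields them.
            return new List<int>(collection.GetConsumingEnumerable());
        }
    }

    public static void Main()
    {
        // Queue-backed: first in, first out
        Console.WriteLine(string.Join(" ", FillThenDrain(new ConcurrentQueue<int>()))); // prints "1 2 3 4 5"
        // Stack-backed: last in, first out
        Console.WriteLine(string.Join(" ", FillThenDrain(new ConcurrentStack<int>()))); // prints "5 4 3 2 1"
    }
}
```

In the concurrent sample, the stack-backed run is not a clean reversal because consumers take items while the producer is still posting: each take returns whatever is on top of the stack at that moment.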