C# 多线程 10-并行编程模式 02-使用 BlockingCollection 实现并行管道


private const int CollectionsNumber = 4;
private const int Count = 5;

/// <summary>
/// 使用 BlockingCollection 实现并行管道
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
    var cts = new CancellationTokenSource();

    // 监视 c 键(按下 c 键取消执行)
    Task.Run(() =>
        if (Console.ReadKey().KeyChar == 'c')
    }, cts.Token);

    // 创建 4 个集合,每个集合中有 5 个元素
    var sourceArrays = new BlockingCollection<int>[CollectionsNumber];
    for (int i = 0; i < sourceArrays.Length; i++)
        sourceArrays[i] = new BlockingCollection<int>(Count);

    // 第一个管道:将 int 型数据转换成 Decimal 型
    var convertToDecimal = new PipelineWorker<int, decimal>(
        n => Convert.ToDecimal(n * 100),
        "Decimal Converter"

    // 第二个管道:格式化 Decimal 数据为金额字符串
    var stringifyNumber = new PipelineWorker<decimal, string>(
        s => $"--{s.ToString("C", CultureInfo.GetCultureInfo("en-us"))}--",
        "Console Formatter"

    // 第三个管道:打印格式化后的结果
    var outputResultToConsole = new PipelineWorker<string, string>(
        s => Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}"),
        "Console Output"

            // 初始化集合数据
            () => CreateInitialValues(sourceArrays, cts),
            // 将 int 型数据转换成 Decimal 型
            () => convertToDecimal.Run(),
            // 格式化 Decimal 数据为金额字符串
            () => stringifyNumber.Run(),
            // 打印格式化后的结果
            () => outputResultToConsole.Run()
    catch (AggregateException ae)
        foreach (var ex in ae.InnerExceptions)
            Console.WriteLine(ex.Message + ex.StackTrace);

    if (cts.Token.IsCancellationRequested)
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
        Console.WriteLine("Press ENTER to exit.");


/// <summary>
/// 初始化集合数据
/// </summary>
/// <param name="sourceArrays"></param>
/// <param name="cts"></param>
static void CreateInitialValues(BlockingCollection<int>[] sourceArrays, CancellationTokenSource cts)
    Parallel.For(0, sourceArrays.Length * Count, (j, state) =>
        if (cts.Token.IsCancellationRequested)
        int number = GetRandomNumber(j);
        int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
        if (k >= 0)
            Console.WriteLine($"added {j} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");

    foreach (var arr in sourceArrays)

static int GetRandomNumber(int seed)
    return new Random(seed).Next(500);

/// <summary>
/// 管道类
/// </summary>
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TOutput"></typeparam>
class PipelineWorker<TInput, TOutput>
    Func<TInput, TOutput> _processor;
    Action<TInput> _outputProcessor;
    BlockingCollection<TInput>[] _input;
    CancellationToken _token;
    Random _rnd;

    public BlockingCollection<TOutput>[] Output { get; private set; }

    public string Name { get; private set; }

    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Func<TInput, TOutput> processor,
        CancellationToken token,
        string name)
        _input = input;
        Output = new BlockingCollection<TOutput>[_input.Length];
        for (int i = 0; i < Output.Length; i++)
            Output[i] = null == input[i] ? null : new BlockingCollection<TOutput>(Count);

        _processor = processor;
        _token = token;
        Name = name;
        _rnd = new Random(DateTime.Now.Millisecond);

    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Action<TInput> renderer,
        CancellationToken token,
        string name)
        _input = input;
        _outputProcessor = renderer;
        _token = token;
        Name = name;
        Output = null;
        _rnd = new Random(DateTime.Now.Millisecond);

    public void Run()
        Console.WriteLine($"{Name} is running");
        while (!_input.All(bc => bc.IsCompleted) && !_token.IsCancellationRequested)
            TInput receivedItem;
            // 尝试从集合中获取元素;如没有元素则会等待
            int i = BlockingCollection<TInput>.TryTakeFromAny(_input, out receivedItem, 50, _token);
            if (i >= 0)
                if (Output != null)
                    TOutput outputItem = _processor(receivedItem);
                    BlockingCollection<TOutput>.AddToAny(Output, outputItem);
                    Console.WriteLine($"{Name} sent {outputItem} to next, on thread id {Thread.CurrentThread.ManagedThreadId}");

        if (Output != null)
            foreach (var bc in Output)


Decimal Converter is running
Console Output is running
Console Formatter is running
added 0 to source data on thread id 1
Decimal Converter sent 0 to next, on thread id 4
Console Formatter sent --$0.00-- to next, on thread id 6
The final result is --$0.00-- on thread id 5
added 1 to source data on thread id 1
Console Formatter sent --$100.00-- to next, on thread id 6
Decimal Converter sent 100 to next, on thread id 4
The final result is --$100.00-- on thread id 5
added 2 to source data on thread id 1
Decimal Converter sent 200 to next, on thread id 4
The final result is --$200.00-- on thread id 5
Console Formatter sent --$200.00-- to next, on thread id 6
added 3 to source data on thread id 1
Decimal Converter sent 300 to next, on thread id 4
Console Formatter sent --$300.00-- to next, on thread id 6
The final result is --$300.00-- on thread id 5
added 4 to source data on thread id 1
Console Formatter sent --$400.00-- to next, on thread id 6
Decimal Converter sent 400 to next, on thread id 4
added 5 to source data on thread id 7
The final result is --$400.00-- on thread id 5
added 6 to source data on thread id 7
Decimal Converter sent 500 to next, on thread id 4
Console Formatter sent --$500.00-- to next, on thread id 6
Decimal Converter sent 600 to next, on thread id 4
Console Formatter sent --$600.00-- to next, on thread id 6
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
added 8 to source data on thread id 1
Console Formatter sent --$800.00-- to next, on thread id 6
Decimal Converter sent 800 to next, on thread id 4
The final result is --$800.00-- on thread id 5
added 10 to source data on thread id 8
The final result is --$1,000.00-- on thread id 5
Console Formatter sent --$1,000.00-- to next, on thread id 6
Decimal Converter sent 1000 to next, on thread id 4
added 7 to source data on thread id 7
Decimal Converter sent 700 to next, on thread id 4
Console Formatter sent --$700.00-- to next, on thread id 6
The final result is --$700.00-- on thread id 5
added 11 to source data on thread id 7
Decimal Converter sent 1100 to next, on thread id 4
Console Formatter sent --$1,100.00-- to next, on thread id 6
The final result is --$1,100.00-- on thread id 5
added 9 to source data on thread id 1
The final result is --$900.00-- on thread id 5
Console Formatter sent --$900.00-- to next, on thread id 6
Decimal Converter sent 900 to next, on thread id 4
added 15 to source data on thread id 8
Decimal Converter sent 1500 to next, on thread id 4
Console Formatter sent --$1,500.00-- to next, on thread id 6
added 12 to source data on thread id 7
The final result is --$1,500.00-- on thread id 5
added 17 to source data on thread id 1
Decimal Converter sent 1200 to next, on thread id 4
Console Formatter sent --$1,200.00-- to next, on thread id 6
The final result is --$1,200.00-- on thread id 5
Decimal Converter sent 1700 to next, on thread id 4
Console Formatter sent --$1,700.00-- to next, on thread id 6
The final result is --$1,700.00-- on thread id 5
added 16 to source data on thread id 8
Decimal Converter sent 1600 to next, on thread id 4
Console Formatter sent --$1,600.00-- to next, on thread id 6
The final result is --$1,600.00-- on thread id 5
added 18 to source data on thread id 1
Decimal Converter sent 1800 to next, on thread id 4
Console Formatter sent --$1,800.00-- to next, on thread id 6
The final result is --$1,800.00-- on thread id 5
added 19 to source data on thread id 1
Decimal Converter sent 1900 to next, on thread id 4
Console Formatter sent --$1,900.00-- to next, on thread id 6
The final result is --$1,900.00-- on thread id 5
added 13 to source data on thread id 7
Decimal Converter sent 1300 to next, on thread id 4
Console Formatter sent --$1,300.00-- to next, on thread id 6
The final result is --$1,300.00-- on thread id 5
added 14 to source data on thread id 7
Decimal Converter sent 1400 to next, on thread id 4
Console Formatter sent --$1,400.00-- to next, on thread id 6
The final result is --$1,400.00-- on thread id 5
Press ENTER to exit.



1. 初始化集合 CreateInitialValues

因为方法中使用了并行迭代函数Parallel.For,所以有多个线程 id 出现。

2. 管道 1:convertToDecimal

这是定义的第一个管道 (PipelineWorker),将int型转换成Decimal型。

3. 管道 2:stringifyNumber


4. 管道 3:outputResultToConsole


