Skip to content

C# 多线程 10-并行编程模式 02-使用 BlockingCollection 实现并行管道

🏷️ 《C# 多线程》

示例代码

csharp
// Number of source collections created in Main.
private const int CollectionsNumber = 4;
// Bounded capacity of each collection; the producer also generates
// (number of source collections * Count) items in total.
private const int Count = 5;

/// <summary>
/// Builds a three-stage parallel pipeline on top of BlockingCollection
/// (int -> decimal -> formatted string -> console) and runs the producer
/// and all three stages concurrently.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    var cts = new CancellationTokenSource();

    // Watch the keyboard: pressing the 'c' key cancels the whole pipeline.
    Task.Run(() =>
    {
        if (Console.ReadKey().KeyChar == 'c')
        {
            cts.Cancel();
        }
    }, cts.Token);

    // Create CollectionsNumber source collections, each bounded to Count items.
    var sourceArrays = new BlockingCollection<int>[CollectionsNumber];
    for (int i = 0; i < sourceArrays.Length; i++)
    {
        sourceArrays[i] = new BlockingCollection<int>(Count);
    }

    // Stage 1: convert the int values to decimal.
    var convertToDecimal = new PipelineWorker<int, decimal>(
        sourceArrays,
        n => Convert.ToDecimal(n * 100),
        cts.Token,
        "Decimal Converter"
        );

    // Stage 2: format the decimal values as currency strings.
    var stringifyNumber = new PipelineWorker<decimal, string>(
        convertToDecimal.Output,
        s => $"--{s.ToString("C", CultureInfo.GetCultureInfo("en-us"))}--",
        cts.Token,
        "Console Formatter"
        );

    // Stage 3: print the formatted result. The lambda returns void, so this
    // binds to the Action-based constructor and the stage has no Output.
    var outputResultToConsole = new PipelineWorker<string, string>(
        stringifyNumber.Output,
        s => Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}"),
        cts.Token,
        "Console Output"
        );

    try
    {
        Parallel.Invoke(
            // Producer: fill the source collections.
            () => CreateInitialValues(sourceArrays, cts),
            // Stage 1: int -> decimal.
            () => convertToDecimal.Run(),
            // Stage 2: decimal -> currency string.
            () => stringifyNumber.Run(),
            // Stage 3: print the result.
            () => outputResultToConsole.Run()
            );
    }
    catch (AggregateException ae)
    {
        // Flatten first: an exception thrown inside the producer's Parallel.For
        // arrives as a nested AggregateException, which would otherwise be
        // reported only as "One or more errors occurred.".
        foreach (var ex in ae.Flatten().InnerExceptions)
        {
            // Cancellation is an expected outcome and is reported below,
            // so do not dump a stack trace for it.
            if (ex is OperationCanceledException)
            {
                continue;
            }

            Console.WriteLine(ex.Message + ex.StackTrace);
        }
    }

    if (cts.Token.IsCancellationRequested)
    {
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
    }
    else
    {
        Console.WriteLine("Press ENTER to exit.");
    }

    Console.ReadLine();
}

/// <summary>
/// Producer: adds the integers 0 .. (sourceArrays.Length * Count - 1) to the
/// source collections in parallel, then marks every collection as complete.
/// </summary>
/// <param name="sourceArrays">Collections that receive the generated values.</param>
/// <param name="cts">Cancellation source; once cancellation is requested the loop is stopped.</param>
static void CreateInitialValues(BlockingCollection<int>[] sourceArrays, CancellationTokenSource cts)
{
    try
    {
        Parallel.For(0, sourceArrays.Length * Count, (j, state) =>
        {
            if (cts.Token.IsCancellationRequested)
            {
                state.Stop();
                // Stop() only prevents new iterations from starting; leave
                // this one explicitly so no item is added after cancellation.
                return;
            }

            int number = GetRandomNumber(j);

            // Non-blocking add: returns -1 (and the value is skipped) when
            // none of the collections can accept the item right now.
            int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
            if (k >= 0)
            {
                Console.WriteLine($"added {j} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(TimeSpan.FromMilliseconds(number));
            }
        });
    }
    finally
    {
        // Always complete the sources, even if the loop threw; otherwise the
        // consumers would keep polling for items that will never arrive.
        foreach (var arr in sourceArrays)
        {
            arr.CompleteAdding();
        }
    }
}

/// <summary>
/// Returns a pseudo-random value in the range [0, 500) derived from the given seed.
/// The same seed always produces the same value.
/// </summary>
/// <param name="seed">Seed used to initialise the random number generator.</param>
/// <returns>A non-negative integer smaller than 500.</returns>
static int GetRandomNumber(int seed)
{
    var generator = new Random(seed);
    int value = generator.Next(500);
    return value;
}

/// <summary>
/// One stage of the pipeline. It takes items from any of its input
/// collections and either transforms them and forwards the result to its
/// <see cref="Output"/> collections, or hands them to a terminal action.
/// </summary>
/// <typeparam name="TInput">Type of the items read from the input collections.</typeparam>
/// <typeparam name="TOutput">Type of the items written to the output collections.</typeparam>
class PipelineWorker<TInput, TOutput>
{
    // Transformation used by an intermediate stage (null for a terminal stage).
    readonly Func<TInput, TOutput> _processor;
    // Action used by a terminal stage (null for an intermediate stage).
    readonly Action<TInput> _outputProcessor;
    readonly BlockingCollection<TInput>[] _input;
    readonly CancellationToken _token;
    // Source of the random delay applied after each forwarded item.
    readonly Random _rnd;

    /// <summary>
    /// Collections this stage writes to; null for a terminal stage.
    /// </summary>
    public BlockingCollection<TOutput>[] Output { get; private set; }

    /// <summary>
    /// Display name used in the console messages.
    /// </summary>
    public string Name { get; private set; }

    /// <summary>
    /// Creates an intermediate stage that transforms each input item and
    /// forwards the result to a new set of bounded output collections
    /// (one per input collection).
    /// </summary>
    /// <param name="input">Collections to read from.</param>
    /// <param name="processor">Transformation applied to every item.</param>
    /// <param name="token">Token that stops the stage when cancelled.</param>
    /// <param name="name">Display name of the stage.</param>
    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Func<TInput, TOutput> processor,
        CancellationToken token,
        string name)
    {
        _input = input;
        Output = new BlockingCollection<TOutput>[_input.Length];
        for (int i = 0; i < Output.Length; i++)
        {
            Output[i] = null == input[i] ? null : new BlockingCollection<TOutput>(Count);
        }

        _processor = processor;
        _token = token;
        Name = name;
        // Seed from a GUID hash rather than the current millisecond: workers
        // constructed within the same millisecond would otherwise share a seed
        // and produce identical delay sequences.
        _rnd = new Random(Guid.NewGuid().GetHashCode());
    }

    /// <summary>
    /// Creates a terminal stage that passes each input item to
    /// <paramref name="renderer"/> and produces no output.
    /// </summary>
    /// <param name="input">Collections to read from.</param>
    /// <param name="renderer">Action invoked for every item.</param>
    /// <param name="token">Token that stops the stage when cancelled.</param>
    /// <param name="name">Display name of the stage.</param>
    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Action<TInput> renderer,
        CancellationToken token,
        string name)
    {
        _input = input;
        _outputProcessor = renderer;
        _token = token;
        Name = name;
        Output = null;
        _rnd = new Random(Guid.NewGuid().GetHashCode());
    }

    /// <summary>
    /// Processes items until every input collection is completed or
    /// cancellation is requested, then marks the output collections as
    /// complete so the next stage can finish as well.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// The token was cancelled while waiting to take or add an item.
    /// </exception>
    public void Run()
    {
        Console.WriteLine($"{Name} is running");
        try
        {
            while (!_input.All(bc => bc.IsCompleted) && !_token.IsCancellationRequested)
            {
                TInput receivedItem;
                // Try to take an item from any input collection, waiting up to 50 ms.
                int i = BlockingCollection<TInput>.TryTakeFromAny(_input, out receivedItem, 50, _token);
                if (i >= 0)
                {
                    if (Output != null)
                    {
                        TOutput outputItem = _processor(receivedItem);
                        // Pass the token so a blocked add (all outputs full)
                        // is released when the pipeline is cancelled.
                        BlockingCollection<TOutput>.AddToAny(Output, outputItem, _token);
                        Console.WriteLine($"{Name} sent {outputItem} to next, on thread id {Thread.CurrentThread.ManagedThreadId}");
                        Thread.Sleep(TimeSpan.FromMilliseconds(_rnd.Next(200)));
                    }
                    else
                    {
                        _outputProcessor(receivedItem);
                    }
                }
                else
                {
                    Thread.Sleep(TimeSpan.FromMilliseconds(50));
                }
            }
        }
        finally
        {
            // Complete the outputs on every exit path (normal end, cancellation
            // or a failing processor); otherwise the downstream stage would
            // keep polling for items that will never arrive.
            if (Output != null)
            {
                foreach (var bc in Output)
                {
                    bc?.CompleteAdding();
                }
            }
        }
    }
}

运行结果

txt
Decimal Converter is running
Console Output is running
Console Formatter is running
added 0 to source data on thread id 1
Decimal Converter sent 0 to next, on thread id 4
Console Formatter sent --$0.00-- to next, on thread id 6
The final result is --$0.00-- on thread id 5
added 1 to source data on thread id 1
Console Formatter sent --$100.00-- to next, on thread id 6
Decimal Converter sent 100 to next, on thread id 4
The final result is --$100.00-- on thread id 5
added 2 to source data on thread id 1
Decimal Converter sent 200 to next, on thread id 4
The final result is --$200.00-- on thread id 5
Console Formatter sent --$200.00-- to next, on thread id 6
added 3 to source data on thread id 1
Decimal Converter sent 300 to next, on thread id 4
Console Formatter sent --$300.00-- to next, on thread id 6
The final result is --$300.00-- on thread id 5
added 4 to source data on thread id 1
Console Formatter sent --$400.00-- to next, on thread id 6
Decimal Converter sent 400 to next, on thread id 4
added 5 to source data on thread id 7
The final result is --$400.00-- on thread id 5
added 6 to source data on thread id 7
Decimal Converter sent 500 to next, on thread id 4
Console Formatter sent --$500.00-- to next, on thread id 6
Decimal Converter sent 600 to next, on thread id 4
Console Formatter sent --$600.00-- to next, on thread id 6
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
added 8 to source data on thread id 1
Console Formatter sent --$800.00-- to next, on thread id 6
Decimal Converter sent 800 to next, on thread id 4
The final result is --$800.00-- on thread id 5
added 10 to source data on thread id 8
The final result is --$1,000.00-- on thread id 5
Console Formatter sent --$1,000.00-- to next, on thread id 6
Decimal Converter sent 1000 to next, on thread id 4
added 7 to source data on thread id 7
Decimal Converter sent 700 to next, on thread id 4
Console Formatter sent --$700.00-- to next, on thread id 6
The final result is --$700.00-- on thread id 5
added 11 to source data on thread id 7
Decimal Converter sent 1100 to next, on thread id 4
Console Formatter sent --$1,100.00-- to next, on thread id 6
The final result is --$1,100.00-- on thread id 5
added 9 to source data on thread id 1
The final result is --$900.00-- on thread id 5
Console Formatter sent --$900.00-- to next, on thread id 6
Decimal Converter sent 900 to next, on thread id 4
added 15 to source data on thread id 8
Decimal Converter sent 1500 to next, on thread id 4
Console Formatter sent --$1,500.00-- to next, on thread id 6
added 12 to source data on thread id 7
The final result is --$1,500.00-- on thread id 5
added 17 to source data on thread id 1
Decimal Converter sent 1200 to next, on thread id 4
Console Formatter sent --$1,200.00-- to next, on thread id 6
The final result is --$1,200.00-- on thread id 5
Decimal Converter sent 1700 to next, on thread id 4
Console Formatter sent --$1,700.00-- to next, on thread id 6
The final result is --$1,700.00-- on thread id 5
added 16 to source data on thread id 8
Decimal Converter sent 1600 to next, on thread id 4
Console Formatter sent --$1,600.00-- to next, on thread id 6
The final result is --$1,600.00-- on thread id 5
added 18 to source data on thread id 1
Decimal Converter sent 1800 to next, on thread id 4
Console Formatter sent --$1,800.00-- to next, on thread id 6
The final result is --$1,800.00-- on thread id 5
added 19 to source data on thread id 1
Decimal Converter sent 1900 to next, on thread id 4
Console Formatter sent --$1,900.00-- to next, on thread id 6
The final result is --$1,900.00-- on thread id 5
added 13 to source data on thread id 7
Decimal Converter sent 1300 to next, on thread id 4
Console Formatter sent --$1,300.00-- to next, on thread id 6
The final result is --$1,300.00-- on thread id 5
added 14 to source data on thread id 7
Decimal Converter sent 1400 to next, on thread id 4
Console Formatter sent --$1,400.00-- to next, on thread id 6
The final result is --$1,400.00-- on thread id 5
Press ENTER to exit.

运行结果分析

各个环节的输出混在一起不方便理解,下面把上面的结果按类别整理如下:

1. 初始化集合 CreateInitialValues

因为方法中使用了并行迭代函数Parallel.For,所以有多个线程 id 出现。

txt
added 0 to source data on thread id 1
added 1 to source data on thread id 1
added 2 to source data on thread id 1
added 3 to source data on thread id 1
added 4 to source data on thread id 1
added 5 to source data on thread id 7
added 6 to source data on thread id 7
added 8 to source data on thread id 1
added 10 to source data on thread id 8
added 7 to source data on thread id 7
added 11 to source data on thread id 7
added 9 to source data on thread id 1
added 15 to source data on thread id 8
added 12 to source data on thread id 7
added 17 to source data on thread id 1
added 16 to source data on thread id 8
added 18 to source data on thread id 1
added 19 to source data on thread id 1
added 13 to source data on thread id 7
added 14 to source data on thread id 7

2. 管道 1:convertToDecimal

这是定义的第一个管道 (PipelineWorker),将int型转换成Decimal型。
注意其执行顺序:该管道以 CreateInitialValues 填充的集合作为输入,因此在本次运行中,处理顺序与元素加入集合的顺序一致。
另外,该管道始终运行在同一个独立的线程上。

txt
Decimal Converter sent 0 to next, on thread id 4
Decimal Converter sent 100 to next, on thread id 4
Decimal Converter sent 200 to next, on thread id 4
Decimal Converter sent 300 to next, on thread id 4
Decimal Converter sent 400 to next, on thread id 4
Decimal Converter sent 500 to next, on thread id 4
Decimal Converter sent 600 to next, on thread id 4
Decimal Converter sent 800 to next, on thread id 4
Decimal Converter sent 1000 to next, on thread id 4
Decimal Converter sent 700 to next, on thread id 4
Decimal Converter sent 1100 to next, on thread id 4
Decimal Converter sent 900 to next, on thread id 4
Decimal Converter sent 1500 to next, on thread id 4
Decimal Converter sent 1200 to next, on thread id 4
Decimal Converter sent 1700 to next, on thread id 4
Decimal Converter sent 1600 to next, on thread id 4
Decimal Converter sent 1800 to next, on thread id 4
Decimal Converter sent 1900 to next, on thread id 4
Decimal Converter sent 1300 to next, on thread id 4
Decimal Converter sent 1400 to next, on thread id 4

3. 管道 2:stringifyNumber

定义的第二个管道,将Decimal格式化成金额。
同样,处理顺序与上一步一致,并且该管道始终运行在同一个独立的线程上。

txt
Console Formatter sent --$0.00-- to next, on thread id 6
Console Formatter sent --$100.00-- to next, on thread id 6
Console Formatter sent --$200.00-- to next, on thread id 6
Console Formatter sent --$300.00-- to next, on thread id 6
Console Formatter sent --$400.00-- to next, on thread id 6
Console Formatter sent --$500.00-- to next, on thread id 6
Console Formatter sent --$600.00-- to next, on thread id 6
Console Formatter sent --$800.00-- to next, on thread id 6
Console Formatter sent --$1,000.00-- to next, on thread id 6
Console Formatter sent --$700.00-- to next, on thread id 6
Console Formatter sent --$1,100.00-- to next, on thread id 6
Console Formatter sent --$900.00-- to next, on thread id 6
Console Formatter sent --$1,500.00-- to next, on thread id 6
Console Formatter sent --$1,200.00-- to next, on thread id 6
Console Formatter sent --$1,700.00-- to next, on thread id 6
Console Formatter sent --$1,600.00-- to next, on thread id 6
Console Formatter sent --$1,800.00-- to next, on thread id 6
Console Formatter sent --$1,900.00-- to next, on thread id 6
Console Formatter sent --$1,300.00-- to next, on thread id 6
Console Formatter sent --$1,400.00-- to next, on thread id 6

4. 管道 3:outputResultToConsole

定义的第三个管道,打印格式化后的结果。
同样,处理顺序与上一步一致,并且该管道始终运行在同一个独立的线程上。

txt
The final result is --$0.00-- on thread id 5
The final result is --$100.00-- on thread id 5
The final result is --$200.00-- on thread id 5
The final result is --$300.00-- on thread id 5
The final result is --$400.00-- on thread id 5
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
The final result is --$800.00-- on thread id 5
The final result is --$1,000.00-- on thread id 5
The final result is --$700.00-- on thread id 5
The final result is --$1,100.00-- on thread id 5
The final result is --$900.00-- on thread id 5
The final result is --$1,500.00-- on thread id 5
The final result is --$1,200.00-- on thread id 5
The final result is --$1,700.00-- on thread id 5
The final result is --$1,600.00-- on thread id 5
The final result is --$1,800.00-- on thread id 5
The final result is --$1,900.00-- on thread id 5
The final result is --$1,300.00-- on thread id 5
The final result is --$1,400.00-- on thread id 5