C# 多线程 10-并行编程模式 03-使用 TPL 数据流实现并行管道
🏷️ 《C# 多线程》
TIP
需要添加对 Microsoft.Tpl.Dataflow
包的引用(通过 nuget 搜【Microsoft.Tpl.Dataflow】)
示例代码
csharp
/// <summary>
/// Program entry point: starts the TPL Dataflow pipeline demo and blocks
/// until the pipeline task has finished.
/// </summary>
/// <param name="args">Command-line arguments; not read by this method.</param>
static void Main(string[] args)
{
    // Main is synchronous here, so block on the awaiter rather than awaiting.
    // GetAwaiter().GetResult() rethrows the original exception instead of
    // wrapping it in an AggregateException the way Wait()/Result would.
    ProcessAsynchronously().GetAwaiter().GetResult();
}
/// <summary>
/// Builds a four-stage TPL Dataflow pipeline (buffer -> int-to-decimal ->
/// decimal-to-string -> console output), feeds it the integers 0..19 from a
/// parallel loop, and waits for the last stage to complete.
/// Pressing 'c' on the console cancels the whole pipeline.
/// </summary>
/// <returns>
/// A task that completes after the pipeline has finished (or was canceled)
/// and the user has pressed ENTER.
/// </returns>
static async Task ProcessAsynchronously()
{
    var cts = new CancellationTokenSource();

    // System.Random is not thread-safe, and the two transform stages below each
    // run with MaxDegreeOfParallelism = 4, so every draw goes through this lock.
    // Unsynchronized concurrent calls can corrupt the generator's internal state.
    var rnd = new Random(DateTime.Now.Millisecond);
    var rndLock = new object();
    Func<int> nextDelayMs = () =>
    {
        lock (rndLock)
        {
            return rnd.Next(200);
        }
    };

    // Background key listener: cancels the pipeline when 'c' is pressed.
    // Intentionally fire-and-forget; assigning the task to a local makes that
    // explicit and avoids the "call is not awaited" compiler warning (CS4014).
    var cancelListener = Task.Run(() =>
    {
        if (Console.ReadKey().KeyChar == 'c')
        {
            cts.Cancel();
        }
    }, cts.Token);

    // BufferBlock: hands its elements to the next block in the flow.
    // BoundedCapacity: once this many elements are buffered, new elements are
    // not accepted until an existing one has been passed on to the next block.
    var inputBlock = new BufferBlock<int>(
        new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

    // TransformBlock: a data-conversion step.
    // MaxDegreeOfParallelism: upper bound on concurrently running workers.
    // Stage 1: int -> decimal.
    var convertToDecimalBlock = new TransformBlock<int, decimal>(n =>
    {
        decimal result = Convert.ToDecimal(n * 100);
        Console.WriteLine($"Decimal Converter sent {result} to the next stage on {Thread.CurrentThread.ManagedThreadId}");
        // Simulated work of random duration (0-199 ms).
        Thread.Sleep(TimeSpan.FromMilliseconds(nextDelayMs()));
        return result;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // Stage 2: decimal -> currency-formatted string.
    var stringifyBlock = new TransformBlock<decimal, string>(n =>
    {
        string result = $"--{n.ToString("C", CultureInfo.GetCultureInfo("en-us"))}--";
        Console.WriteLine($"String Formatter sent {result} to the next stage on {Thread.CurrentThread.ManagedThreadId}");
        // Simulated work of random duration (0-199 ms).
        Thread.Sleep(TimeSpan.FromMilliseconds(nextDelayMs()));
        return result;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // ActionBlock: runs the given action for every incoming element.
    var outputBlock = new ActionBlock<string>(s =>
    {
        Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // LinkTo wires the blocks together.
    // PropagateCompletion = true: when a stage completes (or faults), that
    // completion is forwarded automatically to the next stage.
    inputBlock.LinkTo(convertToDecimalBlock, new DataflowLinkOptions { PropagateCompletion = true });
    convertToDecimalBlock.LinkTo(stringifyBlock, new DataflowLinkOptions { PropagateCompletion = true });
    stringifyBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });

    try
    {
        // Feed items into the first block from up to four producer threads.
        Parallel.For(0, 20, new ParallelOptions
        {
            MaxDegreeOfParallelism = 4,
            CancellationToken = cts.Token
        }, i =>
        {
            Console.WriteLine($"added {i} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
            // Block this producer until the bounded buffer has accepted
            // (or declined) the item; the lambda itself is synchronous.
            inputBlock.SendAsync(i).GetAwaiter().GetResult();
        });

        // After the last item has been offered, Complete must be called so that
        // completion can propagate down the pipeline.
        inputBlock.Complete();

        // Wait for the final block to finish.
        await outputBlock.Completion;
        Console.WriteLine("Press ENTER to exit.");
    }
    catch (OperationCanceledException)
    {
        // Reached when cts is canceled while the producer loop is running or
        // while the final block's completion is being awaited.
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
    }

    Console.ReadLine();
}
运行结果
txt
added 10 to source data on thread id 5
added 0 to source data on thread id 1
added 5 to source data on thread id 4
added 15 to source data on thread id 6
added 11 to source data on thread id 5
added 12 to source data on thread id 5
added 16 to source data on thread id 6
added 1 to source data on thread id 1
added 6 to source data on thread id 4
Decimal Converter sent 1000 to the next stage on 7
Decimal Converter sent 0 to the next stage on 7
Decimal Converter sent 1500 to the next stage on 7
Decimal Converter sent 500 to the next stage on 7
Decimal Converter sent 1100 to the next stage on 7
String Formatter sent --$1,000.00-- to the next stage on 7
String Formatter sent --$0.00-- to the next stage on 7
String Formatter sent --$1,500.00-- to the next stage on 7
String Formatter sent --$500.00-- to the next stage on 7
String Formatter sent --$1,100.00-- to the next stage on 7
added 7 to source data on thread id 4
added 8 to source data on thread id 4
added 9 to source data on thread id 4
added 13 to source data on thread id 4
added 17 to source data on thread id 6
added 2 to source data on thread id 1
Decimal Converter sent 1200 to the next stage on 8
added 14 to source data on thread id 4
added 18 to source data on thread id 6
The final result is --$1,000.00-- on thread id 7
The final result is --$0.00-- on thread id 7
The final result is --$1,500.00-- on thread id 7
The final result is --$500.00-- on thread id 7
The final result is --$1,100.00-- on thread id 7
Decimal Converter sent 1600 to the next stage on 7
Decimal Converter sent 100 to the next stage on 8
Decimal Converter sent 600 to the next stage on 7
Decimal Converter sent 700 to the next stage on 7
Decimal Converter sent 800 to the next stage on 8
Decimal Converter sent 900 to the next stage on 8
Decimal Converter sent 1300 to the next stage on 7
Decimal Converter sent 1700 to the next stage on 8
Decimal Converter sent 200 to the next stage on 7
added 3 to source data on thread id 4
added 4 to source data on thread id 4
Decimal Converter sent 1400 to the next stage on 4
added 19 to source data on thread id 6
Decimal Converter sent 1800 to the next stage on 6
String Formatter sent --$1,200.00-- to the next stage on 5
Decimal Converter sent 300 to the next stage on 7
Decimal Converter sent 400 to the next stage on 8
Decimal Converter sent 1900 to the next stage on 7
String Formatter sent --$1,600.00-- to the next stage on 5
String Formatter sent --$100.00-- to the next stage on 9
String Formatter sent --$600.00-- to the next stage on 6
String Formatter sent --$700.00-- to the next stage on 4
The final result is --$1,200.00-- on thread id 7
String Formatter sent --$800.00-- to the next stage on 5
The final result is --$1,600.00-- on thread id 7
String Formatter sent --$900.00-- to the next stage on 5
String Formatter sent --$1,300.00-- to the next stage on 6
String Formatter sent --$1,700.00-- to the next stage on 6
String Formatter sent --$200.00-- to the next stage on 4
String Formatter sent --$1,400.00-- to the next stage on 5
String Formatter sent --$1,800.00-- to the next stage on 6
The final result is --$100.00-- on thread id 8
The final result is --$700.00-- on thread id 8
The final result is --$800.00-- on thread id 8
The final result is --$900.00-- on thread id 8
The final result is --$600.00-- on thread id 7
String Formatter sent --$300.00-- to the next stage on 9
The final result is --$1,300.00-- on thread id 8
The final result is --$1,700.00-- on thread id 7
String Formatter sent --$400.00-- to the next stage on 9
String Formatter sent --$1,900.00-- to the next stage on 9
The final result is --$200.00-- on thread id 4
The final result is --$1,400.00-- on thread id 9
The final result is --$300.00-- on thread id 7
The final result is --$1,900.00-- on thread id 9
The final result is --$1,800.00-- on thread id 5
The final result is --$400.00-- on thread id 8
Press ENTER to exit.
运行结果分析
将打印结果分类整理后如下。
可以看出同上一个使用 BlockingCollection
的例子还是有区别的。
管道的执行顺序同加入块中的顺序有一小部分是不一致的,而且管道也不再是一个线程。
可以通过设定管道的最大线程数为 1 来实现顺序执行的效果。
txt
added 10 to source data on thread id 5
added 0 to source data on thread id 1
added 5 to source data on thread id 4
added 15 to source data on thread id 6
added 11 to source data on thread id 5
added 12 to source data on thread id 5
added 16 to source data on thread id 6
added 1 to source data on thread id 1
added 6 to source data on thread id 4
added 7 to source data on thread id 4
added 8 to source data on thread id 4
added 9 to source data on thread id 4
added 13 to source data on thread id 4
added 17 to source data on thread id 6
added 2 to source data on thread id 1
added 14 to source data on thread id 4
added 18 to source data on thread id 6
added 3 to source data on thread id 4
added 4 to source data on thread id 4
added 19 to source data on thread id 6
txt
Decimal Converter sent 1000 to the next stage on 7
Decimal Converter sent 0 to the next stage on 7
Decimal Converter sent 1500 to the next stage on 7
Decimal Converter sent 500 to the next stage on 7
Decimal Converter sent 1100 to the next stage on 7
Decimal Converter sent 1200 to the next stage on 8
Decimal Converter sent 1600 to the next stage on 7
Decimal Converter sent 100 to the next stage on 8
Decimal Converter sent 600 to the next stage on 7
Decimal Converter sent 700 to the next stage on 7
Decimal Converter sent 800 to the next stage on 8
Decimal Converter sent 900 to the next stage on 8
Decimal Converter sent 1300 to the next stage on 7
Decimal Converter sent 1700 to the next stage on 8
Decimal Converter sent 200 to the next stage on 7
Decimal Converter sent 1400 to the next stage on 4
Decimal Converter sent 1800 to the next stage on 6
Decimal Converter sent 300 to the next stage on 7
Decimal Converter sent 400 to the next stage on 8
Decimal Converter sent 1900 to the next stage on 7
txt
String Formatter sent --$1,000.00-- to the next stage on 7
String Formatter sent --$0.00-- to the next stage on 7
String Formatter sent --$1,500.00-- to the next stage on 7
String Formatter sent --$500.00-- to the next stage on 7
String Formatter sent --$1,100.00-- to the next stage on 7
String Formatter sent --$1,200.00-- to the next stage on 5
String Formatter sent --$1,600.00-- to the next stage on 5
String Formatter sent --$100.00-- to the next stage on 9
String Formatter sent --$600.00-- to the next stage on 6
String Formatter sent --$700.00-- to the next stage on 4
String Formatter sent --$800.00-- to the next stage on 5
String Formatter sent --$900.00-- to the next stage on 5
String Formatter sent --$1,300.00-- to the next stage on 6
String Formatter sent --$1,700.00-- to the next stage on 6
String Formatter sent --$200.00-- to the next stage on 4
String Formatter sent --$1,400.00-- to the next stage on 5
String Formatter sent --$1,800.00-- to the next stage on 6
String Formatter sent --$300.00-- to the next stage on 9
String Formatter sent --$400.00-- to the next stage on 9
String Formatter sent --$1,900.00-- to the next stage on 9
txt
The final result is --$1,000.00-- on thread id 7
The final result is --$0.00-- on thread id 7
The final result is --$1,500.00-- on thread id 7
The final result is --$500.00-- on thread id 7
The final result is --$1,100.00-- on thread id 7
The final result is --$1,200.00-- on thread id 7
The final result is --$1,600.00-- on thread id 7
The final result is --$100.00-- on thread id 8
The final result is --$700.00-- on thread id 8
The final result is --$800.00-- on thread id 8
The final result is --$900.00-- on thread id 8
The final result is --$600.00-- on thread id 7
The final result is --$1,300.00-- on thread id 8
The final result is --$1,700.00-- on thread id 7
The final result is --$200.00-- on thread id 4
The final result is --$1,400.00-- on thread id 9
The final result is --$300.00-- on thread id 7
The final result is --$1,900.00-- on thread id 9
The final result is --$1,800.00-- on thread id 5
The final result is --$400.00-- on thread id 8