C# 多线程 10-并行编程模式 02-使用 BlockingCollection 实现并行管道
🏷️ 《C# 多线程》
示例代码
csharp
// Number of source BlockingCollection<int> instances that Main creates.
private const int CollectionsNumber = 4;
// Bounded capacity passed to every BlockingCollection in this sample; CreateInitialValues also
// multiplies it by the number of source collections to decide how many values to generate.
private const int Count = 5;
/// <summary>
/// Builds a three-stage parallel pipeline on top of BlockingCollection and runs it
/// together with the producer that fills the source collections.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    var cancellation = new CancellationTokenSource();
    CancellationToken token = cancellation.Token;

    // Watch the keyboard: if the first key pressed is 'c', cancel the whole run.
    Task.Run(() =>
    {
        ConsoleKeyInfo pressed = Console.ReadKey();
        if (pressed.KeyChar != 'c')
        {
            return;
        }
        cancellation.Cancel();
    }, token);

    // Create the source collections, each bounded to Count elements.
    BlockingCollection<int>[] sources = Enumerable
        .Range(0, CollectionsNumber)
        .Select(_ => new BlockingCollection<int>(Count))
        .ToArray();

    // Stage 1: convert the int values to decimal.
    var decimalStage = new PipelineWorker<int, decimal>(
        sources,
        number => Convert.ToDecimal(number * 100),
        token,
        "Decimal Converter");

    // Stage 2: format the decimal values as en-us currency strings.
    var formatStage = new PipelineWorker<decimal, string>(
        decimalStage.Output,
        amount => "--" + amount.ToString("C", CultureInfo.GetCultureInfo("en-us")) + "--",
        token,
        "Console Formatter");

    // Stage 3: print the formatted result.
    var printStage = new PipelineWorker<string, string>(
        formatStage.Output,
        text =>
        {
            Console.WriteLine($"The final result is {text} on thread id {Thread.CurrentThread.ManagedThreadId}");
        },
        token,
        "Console Output");

    try
    {
        // Run the producer and all three stages side by side.
        Parallel.Invoke(
            () => CreateInitialValues(sources, cancellation),
            decimalStage.Run,
            formatStage.Run,
            printStage.Run);
    }
    catch (AggregateException aggregate)
    {
        // Report every failure collected from the parallel actions.
        for (int i = 0; i < aggregate.InnerExceptions.Count; i++)
        {
            Exception inner = aggregate.InnerExceptions[i];
            Console.WriteLine(inner.Message + inner.StackTrace);
        }
    }

    string farewell = token.IsCancellationRequested
        ? "Operation has been canceled! Press ENTER to exit."
        : "Press ENTER to exit.";
    Console.WriteLine(farewell);
    Console.ReadLine();
}
/// <summary>
/// Produces the initial values (0 .. sourceArrays.Length * Count - 1) in parallel, adding each
/// one to whichever source collection accepts it, and then marks every source collection as
/// complete for adding.
/// </summary>
/// <param name="sourceArrays">The source collections that feed the first pipeline stage.</param>
/// <param name="cts">Cancellation source; once cancellation is requested no further values are added.</param>
static void CreateInitialValues(BlockingCollection<int>[] sourceArrays, CancellationTokenSource cts)
{
    try
    {
        Parallel.For(0, sourceArrays.Length * Count, (j, state) =>
        {
            if (cts.Token.IsCancellationRequested)
            {
                // Stop() only prevents further iterations from starting; the current one
                // must return explicitly, otherwise it would still add its value below.
                state.Stop();
                return;
            }

            // Non-blocking add: a negative index means no collection accepted the value.
            int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
            if (k >= 0)
            {
                Console.WriteLine($"added {j} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
                // Simulate work with a delay derived deterministically from the value.
                Thread.Sleep(TimeSpan.FromMilliseconds(GetRandomNumber(j)));
            }
        });
    }
    finally
    {
        // Always signal completion, even if the loop threw, so consumers polling
        // IsCompleted on these collections can finish instead of waiting forever.
        foreach (var arr in sourceArrays)
        {
            arr.CompleteAdding();
        }
    }
}
/// <summary>
/// Returns a pseudo-random integer in the range [0, 500) drawn from a generator
/// seeded with <paramref name="seed"/>; the same seed always yields the same value.
/// </summary>
/// <param name="seed">Seed for the random number generator.</param>
/// <returns>A non-negative integer less than 500.</returns>
static int GetRandomNumber(int seed)
{
    const int exclusiveUpperBound = 500;
    var generator = new Random(seed);
    int value = generator.Next(exclusiveUpperBound);
    return value;
}
/// <summary>
/// One stage of the pipeline. It repeatedly takes an item from any of its input collections and
/// either transforms it and forwards the result to its output collections, or (for a terminal
/// stage) hands it to a rendering action.
/// </summary>
/// <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
/// <typeparam name="TOutput">Type of the items placed into the output collections.</typeparam>
class PipelineWorker<TInput, TOutput>
{
    readonly Func<TInput, TOutput> _processor;
    readonly Action<TInput> _outputProcessor;
    readonly BlockingCollection<TInput>[] _input;
    readonly CancellationToken _token;
    readonly Random _rnd;

    /// <summary>
    /// Collections this stage writes to; <c>null</c> for a terminal stage created with a renderer.
    /// </summary>
    public BlockingCollection<TOutput>[] Output { get; private set; }

    /// <summary>Display name used in the console messages.</summary>
    public string Name { get; private set; }

    /// <summary>
    /// Creates a transforming stage with one bounded output collection per input collection.
    /// </summary>
    /// <param name="input">Collections to take items from.</param>
    /// <param name="processor">Transformation applied to every item taken.</param>
    /// <param name="token">Token that stops the stage when cancellation is requested.</param>
    /// <param name="name">Display name of the stage.</param>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Func<TInput, TOutput> processor,
        CancellationToken token,
        string name)
    {
        if (input == null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        _input = input;
        Output = new BlockingCollection<TOutput>[_input.Length];
        for (int i = 0; i < Output.Length; i++)
        {
            // Mirror the input layout: a null input slot yields a null output slot.
            Output[i] = null == input[i] ? null : new BlockingCollection<TOutput>(Count);
        }
        _processor = processor;
        _token = token;
        Name = name;
        _rnd = CreateRandom();
    }

    /// <summary>
    /// Creates a terminal stage that passes every item to <paramref name="renderer"/> and has no output.
    /// </summary>
    /// <param name="input">Collections to take items from.</param>
    /// <param name="renderer">Action invoked for every item taken.</param>
    /// <param name="token">Token that stops the stage when cancellation is requested.</param>
    /// <param name="name">Display name of the stage.</param>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Action<TInput> renderer,
        CancellationToken token,
        string name)
    {
        if (input == null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        _input = input;
        _outputProcessor = renderer;
        _token = token;
        Name = name;
        Output = null;
        _rnd = CreateRandom();
    }

    /// <summary>
    /// Creates the per-worker delay generator. A GUID-derived seed is used because a seed taken
    /// from the current millisecond is identical for workers constructed back to back, which
    /// would give every stage the same delay sequence.
    /// </summary>
    static Random CreateRandom()
    {
        return new Random(Guid.NewGuid().GetHashCode());
    }

    /// <summary>
    /// Processes items until every input collection is completed or cancellation is requested,
    /// then marks the output collections (if any) as complete for adding.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested while waiting to take from the inputs or to add to the outputs.
    /// </exception>
    public void Run()
    {
        Console.WriteLine($"{Name} is running");
        try
        {
            while (!_input.All(bc => bc.IsCompleted) && !_token.IsCancellationRequested)
            {
                TInput receivedItem;
                // Try to take an item from any input collection, waiting up to 50 ms.
                int i = BlockingCollection<TInput>.TryTakeFromAny(_input, out receivedItem, 50, _token);
                if (i >= 0)
                {
                    if (Output != null)
                    {
                        TOutput outputItem = _processor(receivedItem);
                        // Pass the token so a stage blocked on full outputs can still be cancelled.
                        BlockingCollection<TOutput>.AddToAny(Output, outputItem, _token);
                        Console.WriteLine($"{Name} sent {outputItem} to next, on thread id {Thread.CurrentThread.ManagedThreadId}");
                        // Simulate a variable amount of work per item.
                        Thread.Sleep(TimeSpan.FromMilliseconds(_rnd.Next(200)));
                    }
                    else
                    {
                        _outputProcessor(receivedItem);
                    }
                }
                else
                {
                    // Nothing available yet; back off briefly before polling again.
                    Thread.Sleep(TimeSpan.FromMilliseconds(50));
                }
            }
        }
        finally
        {
            // Complete the outputs even when the loop exits through an exception, so the
            // next stage can observe completion instead of polling forever.
            if (Output != null)
            {
                foreach (var bc in Output)
                {
                    if (bc != null)
                    {
                        bc.CompleteAdding();
                    }
                }
            }
        }
    }
}
运行结果
txt
Decimal Converter is running
Console Output is running
Console Formatter is running
added 0 to source data on thread id 1
Decimal Converter sent 0 to next, on thread id 4
Console Formatter sent --$0.00-- to next, on thread id 6
The final result is --$0.00-- on thread id 5
added 1 to source data on thread id 1
Console Formatter sent --$100.00-- to next, on thread id 6
Decimal Converter sent 100 to next, on thread id 4
The final result is --$100.00-- on thread id 5
added 2 to source data on thread id 1
Decimal Converter sent 200 to next, on thread id 4
The final result is --$200.00-- on thread id 5
Console Formatter sent --$200.00-- to next, on thread id 6
added 3 to source data on thread id 1
Decimal Converter sent 300 to next, on thread id 4
Console Formatter sent --$300.00-- to next, on thread id 6
The final result is --$300.00-- on thread id 5
added 4 to source data on thread id 1
Console Formatter sent --$400.00-- to next, on thread id 6
Decimal Converter sent 400 to next, on thread id 4
added 5 to source data on thread id 7
The final result is --$400.00-- on thread id 5
added 6 to source data on thread id 7
Decimal Converter sent 500 to next, on thread id 4
Console Formatter sent --$500.00-- to next, on thread id 6
Decimal Converter sent 600 to next, on thread id 4
Console Formatter sent --$600.00-- to next, on thread id 6
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
added 8 to source data on thread id 1
Console Formatter sent --$800.00-- to next, on thread id 6
Decimal Converter sent 800 to next, on thread id 4
The final result is --$800.00-- on thread id 5
added 10 to source data on thread id 8
The final result is --$1,000.00-- on thread id 5
Console Formatter sent --$1,000.00-- to next, on thread id 6
Decimal Converter sent 1000 to next, on thread id 4
added 7 to source data on thread id 7
Decimal Converter sent 700 to next, on thread id 4
Console Formatter sent --$700.00-- to next, on thread id 6
The final result is --$700.00-- on thread id 5
added 11 to source data on thread id 7
Decimal Converter sent 1100 to next, on thread id 4
Console Formatter sent --$1,100.00-- to next, on thread id 6
The final result is --$1,100.00-- on thread id 5
added 9 to source data on thread id 1
The final result is --$900.00-- on thread id 5
Console Formatter sent --$900.00-- to next, on thread id 6
Decimal Converter sent 900 to next, on thread id 4
added 15 to source data on thread id 8
Decimal Converter sent 1500 to next, on thread id 4
Console Formatter sent --$1,500.00-- to next, on thread id 6
added 12 to source data on thread id 7
The final result is --$1,500.00-- on thread id 5
added 17 to source data on thread id 1
Decimal Converter sent 1200 to next, on thread id 4
Console Formatter sent --$1,200.00-- to next, on thread id 6
The final result is --$1,200.00-- on thread id 5
Decimal Converter sent 1700 to next, on thread id 4
Console Formatter sent --$1,700.00-- to next, on thread id 6
The final result is --$1,700.00-- on thread id 5
added 16 to source data on thread id 8
Decimal Converter sent 1600 to next, on thread id 4
Console Formatter sent --$1,600.00-- to next, on thread id 6
The final result is --$1,600.00-- on thread id 5
added 18 to source data on thread id 1
Decimal Converter sent 1800 to next, on thread id 4
Console Formatter sent --$1,800.00-- to next, on thread id 6
The final result is --$1,800.00-- on thread id 5
added 19 to source data on thread id 1
Decimal Converter sent 1900 to next, on thread id 4
Console Formatter sent --$1,900.00-- to next, on thread id 6
The final result is --$1,900.00-- on thread id 5
added 13 to source data on thread id 7
Decimal Converter sent 1300 to next, on thread id 4
Console Formatter sent --$1,300.00-- to next, on thread id 6
The final result is --$1,300.00-- on thread id 5
added 14 to source data on thread id 7
Decimal Converter sent 1400 to next, on thread id 4
Console Formatter sent --$1,400.00-- to next, on thread id 6
The final result is --$1,400.00-- on thread id 5
Press ENTER to exit.
运行结果分析
混在一起不方便理解,将上面的结果中同类信息分类后整理如下:
1. 初始化集合 CreateInitialValues
因为方法中使用了并行迭代函数 Parallel.For,所以有多个线程 id 出现。
txt
added 0 to source data on thread id 1
added 1 to source data on thread id 1
added 2 to source data on thread id 1
added 3 to source data on thread id 1
added 4 to source data on thread id 1
added 5 to source data on thread id 7
added 6 to source data on thread id 7
added 8 to source data on thread id 1
added 10 to source data on thread id 8
added 7 to source data on thread id 7
added 11 to source data on thread id 7
added 9 to source data on thread id 1
added 15 to source data on thread id 8
added 12 to source data on thread id 7
added 17 to source data on thread id 1
added 16 to source data on thread id 8
added 18 to source data on thread id 1
added 19 to source data on thread id 1
added 13 to source data on thread id 7
added 14 to source data on thread id 7
2. 管道 1:convertToDecimal
这是定义的第一个管道(PipelineWorker),将 int 型转换成 Decimal 型。
注意其执行顺序:该管道消费的是 CreateInitialValues 填充的集合,所以处理顺序和元素加入到集合中的顺序是一致的。
另外线程也是单独的同一个线程。
txt
Decimal Converter sent 0 to next, on thread id 4
Decimal Converter sent 100 to next, on thread id 4
Decimal Converter sent 200 to next, on thread id 4
Decimal Converter sent 300 to next, on thread id 4
Decimal Converter sent 400 to next, on thread id 4
Decimal Converter sent 500 to next, on thread id 4
Decimal Converter sent 600 to next, on thread id 4
Decimal Converter sent 800 to next, on thread id 4
Decimal Converter sent 1000 to next, on thread id 4
Decimal Converter sent 700 to next, on thread id 4
Decimal Converter sent 1100 to next, on thread id 4
Decimal Converter sent 900 to next, on thread id 4
Decimal Converter sent 1500 to next, on thread id 4
Decimal Converter sent 1200 to next, on thread id 4
Decimal Converter sent 1700 to next, on thread id 4
Decimal Converter sent 1600 to next, on thread id 4
Decimal Converter sent 1800 to next, on thread id 4
Decimal Converter sent 1900 to next, on thread id 4
Decimal Converter sent 1300 to next, on thread id 4
Decimal Converter sent 1400 to next, on thread id 4
3. 管道 2:stringifyNumber
定义的第二个管道,将 Decimal 格式化成金额字符串。
同样,处理顺序与上一步的顺序一致,并且始终运行在同一个单独的线程上。
txt
Console Formatter sent --$0.00-- to next, on thread id 6
Console Formatter sent --$100.00-- to next, on thread id 6
Console Formatter sent --$200.00-- to next, on thread id 6
Console Formatter sent --$300.00-- to next, on thread id 6
Console Formatter sent --$400.00-- to next, on thread id 6
Console Formatter sent --$500.00-- to next, on thread id 6
Console Formatter sent --$600.00-- to next, on thread id 6
Console Formatter sent --$800.00-- to next, on thread id 6
Console Formatter sent --$1,000.00-- to next, on thread id 6
Console Formatter sent --$700.00-- to next, on thread id 6
Console Formatter sent --$1,100.00-- to next, on thread id 6
Console Formatter sent --$900.00-- to next, on thread id 6
Console Formatter sent --$1,500.00-- to next, on thread id 6
Console Formatter sent --$1,200.00-- to next, on thread id 6
Console Formatter sent --$1,700.00-- to next, on thread id 6
Console Formatter sent --$1,600.00-- to next, on thread id 6
Console Formatter sent --$1,800.00-- to next, on thread id 6
Console Formatter sent --$1,900.00-- to next, on thread id 6
Console Formatter sent --$1,300.00-- to next, on thread id 6
Console Formatter sent --$1,400.00-- to next, on thread id 6
4. 管道 3:outputResultToConsole
定义的第三个管道,打印格式化后的结果。
同样,处理顺序与上一步一致,并且始终运行在同一个单独的线程上。
txt
The final result is --$0.00-- on thread id 5
The final result is --$100.00-- on thread id 5
The final result is --$200.00-- on thread id 5
The final result is --$300.00-- on thread id 5
The final result is --$400.00-- on thread id 5
The final result is --$500.00-- on thread id 5
The final result is --$600.00-- on thread id 5
The final result is --$800.00-- on thread id 5
The final result is --$1,000.00-- on thread id 5
The final result is --$700.00-- on thread id 5
The final result is --$1,100.00-- on thread id 5
The final result is --$900.00-- on thread id 5
The final result is --$1,500.00-- on thread id 5
The final result is --$1,200.00-- on thread id 5
The final result is --$1,700.00-- on thread id 5
The final result is --$1,600.00-- on thread id 5
The final result is --$1,800.00-- on thread id 5
The final result is --$1,900.00-- on thread id 5
The final result is --$1,300.00-- on thread id 5
The final result is --$1,400.00-- on thread id 5