C# 多线程 06-使用并发集合 02-使用 ConcurrentQueue 实现异步处理
🏷️ 《C# 多线程》
使用 ConcurrentQueue 实现异步处理
csharp
/// <summary>
/// Entry point: demonstrates asynchronous processing with a ConcurrentQueue.
/// Blocks until <c>RunProgram</c> has completed, then waits for a line of
/// console input so the window stays open.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    // Block the main thread on the async workflow, then keep the console open.
    RunProgram().Wait();
    Console.ReadLine();
}
/// <summary>
/// Runs the producer/consumer demo: one producer posts work items to a shared
/// ConcurrentQueue while four processors poll it concurrently. Once the producer
/// is done, cancellation is scheduled and the processors are awaited.
/// </summary>
/// <returns>
/// A task that completes after the producer and all four processors have finished.
/// </returns>
static async Task RunProgram()
{
    var taskQueue = new ConcurrentQueue<CustomTask>();

    // The source owns a timer once CancelAfter is called, so dispose it when done.
    using (var cts = new CancellationTokenSource())
    {
        // Capture the token once so the worker lambdas never touch the source itself.
        CancellationToken token = cts.Token;

        // Start producing work items in the background.
        var taskSource = Task.Run(() => TaskProducer(taskQueue));

        // Start 4 processors that poll the queue.
        Task[] processors = new Task[4];
        for (int i = 1; i <= 4; i++)
        {
            string processorId = i.ToString();
            processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processorId}", token));
        }

        // Wait until every work item has been posted.
        await taskSource;

        // Give the processors 2 more seconds to drain the queue before cancelling.
        // This is a grace period, not a hard guarantee that the queue is empty.
        cts.CancelAfter(TimeSpan.FromSeconds(2));

        // Wait for the processors to observe the cancellation and exit, so that
        // no worker is still running (or failing unobserved) when this method returns.
        await Task.WhenAll(processors);
    }
}
/// <summary>
/// Produces 20 work items (ids 1 through 20), waiting 50 ms before each one,
/// and enqueues them on the supplied queue.
/// </summary>
/// <param name="queue">Queue that receives the newly created work items.</param>
/// <returns>A task that completes once all 20 items have been enqueued.</returns>
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
    const int lastId = 20;
    int nextId = 1;

    while (nextId <= lastId)
    {
        // Pace the producer: pause 50 ms before posting each item.
        await Task.Delay(50);

        var posted = new CustomTask();
        posted.Id = nextId;
        queue.Enqueue(posted);
        Console.WriteLine($"Task {posted.Id} has been posted");

        nextId++;
    }
}
/// <summary>
/// Consumer loop: repeatedly tries to take one work item from the queue, reports
/// it when one was available, and then pauses for a random delay. The loop ends
/// once cancellation has been requested.
/// </summary>
/// <param name="queue">Queue to take work items from.</param>
/// <param name="name">Display name of this processor, used in the console output.</param>
/// <param name="token">Token that signals the processor to stop polling.</param>
/// <returns>A task that completes when the processor has stopped.</returns>
static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
    // Initial random pause before the first poll.
    await GetRandomDelay();

    while (true)
    {
        CustomTask item;
        if (queue.TryDequeue(out item))
        {
            Console.WriteLine($"Task {item.Id} has been processed by {name}");
        }

        // Pause whether or not an item was available, then check for cancellation.
        // The check comes last so at least one poll always happens.
        await GetRandomDelay();
        if (token.IsCancellationRequested)
        {
            return;
        }
    }
}
// Single shared generator. Creating a new Random seeded with the current
// millisecond on every call yields only 1000 possible seeds and gives callers
// that run within the same millisecond identical delays.
private static readonly Random s_delayRandom = new Random();

// System.Random is not thread-safe; this guards access from concurrent processors.
private static readonly object s_delayRandomLock = new object();

/// <summary>
/// Creates a delay task with a random duration.
/// </summary>
/// <returns>A task that completes after a random delay of 1 to 499 milliseconds.</returns>
static Task GetRandomDelay()
{
    int delay;
    lock (s_delayRandomLock)
    {
        // The upper bound of Next is exclusive, so the result is in [1, 499].
        delay = s_delayRandom.Next(1, 500);
    }

    return Task.Delay(delay);
}
/// <summary>
/// A work item passed from the producer to the processors through the queue.
/// </summary>
private class CustomTask
{
/// <summary>
/// Identifier of the work item; printed when the item is posted and processed.
/// </summary>
public int Id { get; set; }
}
运行结果
txt
Task 1 has been posted
Task 1 has been processed by Processor 3
Task 2 has been posted
Task 3 has been posted
Task 2 has been processed by Processor 3
Task 3 has been processed by Processor 2
Task 4 has been posted
Task 5 has been posted
Task 6 has been posted
Task 7 has been posted
Task 4 has been processed by Processor 3
Task 7 has been processed by Processor 4
Task 5 has been processed by Processor 2
Task 6 has been processed by Processor 1
Task 8 has been posted
Task 9 has been posted
Task 9 has been processed by Processor 2
Task 8 has been processed by Processor 1
Task 10 has been posted
Task 10 has been processed by Processor 1
Task 11 has been posted
Task 12 has been posted
Task 11 has been processed by Processor 4
Task 12 has been processed by Processor 1
Task 13 has been posted
Task 14 has been posted
Task 13 has been processed by Processor 2
Task 14 has been processed by Processor 3
Task 15 has been posted
Task 15 has been processed by Processor 4
Task 16 has been posted
Task 16 has been processed by Processor 2
Task 17 has been posted
Task 18 has been posted
Task 17 has been processed by Processor 3
Task 18 has been processed by Processor 3
Task 19 has been posted
Task 19 has been processed by Processor 1
Task 20 has been posted
Task 20 has been processed by Processor 4