.NET Core 实战 [No.199] 多个 Task 同时操作 ConcurrentBag 集合
ConcurrentBag<T>
特点:泛型集合、无序、线程安全。
泛型集合
Add
方法添加元素TryTake
方法取出元素然后从集合中删除该元素TryPeek
方法取出元素但不会从集合中删除该元素IsEmpty
属性表示集合是否是空集合
无序
从集合中取出元素的顺序和放入的顺序 可能 不一致。
线程安全
ConcurrentBag<T>
的优点就是可以在多个线程上都可以访问,而且是线程安全的。
后面的几个示例就是为了验证这一点,并且另外使用List<T>
和Queue<T>
来做对比。
示例
使用三个线程同时操作同一个集合,一个线程用来添加元素到集合,另外两个取出元素打印到窗口并从集合中删除该元素。
1. ConcurrentBag<T>
ConcurrentBag<int> bag = new ConcurrentBag<int>();
bool isOver = false;
Task t1 = Task.Run(() => {
for (int i = 0; i < 20; i++)
{
Console.WriteLine($"即将添加元素:{i}");
bag.Add(i);
}
isOver = true;
});
Task t2 = Task.Run(() => {
while (true)
{
if (isOver && bag.IsEmpty)
{
break;
}
if (bag.TryTake(out int item))
{
Console.WriteLine($"T2 已取出元素:{item}");
}
}
});
Task t3 = Task.Run(() => {
while (true)
{
if (isOver && bag.IsEmpty)
{
break;
}
if (bag.TryTake(out int item))
{
Console.WriteLine($"T3 已取出元素:{item}");
}
}
});
Task.WaitAll(t1, t2, t3);
由于 ConcurrentBag<T>
线程安全的特性,多线程同时操作时,无需额外的处理即可正常的执行。
由下图可以看出,每个元素被正确的添加一次和取出一次。
2. List<T>
这里改成使用 List<T>
实现相同的功能。由于列表没有类似 TryTake
方法的功能,这里使用下标 0 来获取收个元素,然后使用 Remove
方法删除。
注意:列表中如果有重复的元素,多线程中使用 Remove
方法会导致删除本不应被删除的列表项。
List<int> list = new List<int>();
bool isOver = false;
Task t1 = Task.Run(() => {
for (int i = 0; i < 20; i++)
{
Console.WriteLine($"即将添加元素:{i}");
list.Add(i);
}
isOver = true;
});
Task t2 = Task.Run(() => {
while (true)
{
if (isOver && list.Count <= 0)
{
break;
}
if (list.Count > 0)
{
var item = list[0];
Console.WriteLine($"T2 已取出元素:{item}");
list.Remove(item);
}
}
});
Task t3 = Task.Run(() => {
while (true)
{
if (isOver && list.Count <= 0)
{
break;
}
if (list.Count > 0)
{
var item = list[0];
Console.WriteLine($"T3 已取出元素:{item}");
list.Remove(item);
}
}
});
Task.WaitAll(t1, t2, t3);
从下面结果的截图可以看出,同一个元素很容易被两个线程同时取出。
3. Queue<T>
“先进先出”的队列(Queue<T>
)类型和 ConcurrentBag<T>
比较像,有类似的 Enqueue
和 TryDequeue
方法。
Queue<int> queue = new Queue<int>();
bool isOver = false;
Task t1 = Task.Run(() => {
for (int i = 0; i < 20; i++)
{
Console.WriteLine($"即将添加元素:{i}");
queue.Enqueue(i);
}
isOver = true;
});
Task t2 = Task.Run(() => {
while (true)
{
if (isOver && queue.Count <= 0)
{
break;
}
if (queue.TryDequeue(out int item))
{
Console.WriteLine($"T2 已取出元素:{item}");
}
}
});
Task t3 = Task.Run(() => {
while (true)
{
if (isOver && queue.Count <= 0)
{
break;
}
if (queue.TryDequeue(out int item))
{
Console.WriteLine($"T3 已取出元素:{item}");
}
}
});
Task.WaitAll(t1, t2, t3);
从下面的运行结果截图可以看出,Queue<T>
的表现相当奇怪,队列中应该是没有值的情况下 TryDequeue
方法的返回值仍然会 True 。而且不仅两个线程会打印重复的数值,同一个线程也会打印重复的数值。
有时运行还会引发如下异常:
System.ArgumentException
HResult=0x80070057
Message=Source array was not long enough. Check the source index, length, and the array's lower bounds.
Arg_ParamName_Name
Source=System.Private.CoreLib
StackTrace:
at System.Array.Copy(Array sourceArray, Int32 sourceIndex, Array destinationArray, Int32 destinationIndex, Int32 length, Boolean reliable)
at System.Collections.Generic.Queue`1.SetCapacity(Int32 capacity)
at System.Collections.Generic.Queue`1.Enqueue(T item)
at QueueDemo.Program.<>c__DisplayClass0_0.<Main>b__0() in C:\Users\Administrator\source\repos\ConcurrentBagDemo\QueueDemo\Program.cs:line 18
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
那么,如果我们需要在多线程中使用“先进先出”的队列或者“先进后出”的堆栈应该怎么实现呢?
C# 为我们提供了线程安全版的队列 ConcurrentQueue<T>
和堆栈 ConcurrentStack<T>
类型。这两个类型都在 System.Collections.Concurrent 命名空间下面。
参考:《.NET Core 实战:手把手教你掌握 380 个精彩案例》 -- 周家安 著