C# 多线程 07-使用 PLINQ 03-调整 PLINQ 查询的参数
🏷️ 《C# 多线程》
调整 PLINQ 查询的参数
ParallelQuery
本例中使用了 ParallelQuery
中的如下方法
WithDegreeOfParallelism
设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。
WithExecutionMode
设置查询的执行模式
参数
ParallelExecutionMode
查询执行模式Default
此设置为默认设置。PLINQ 将检查查询的结构,仅在可能带来加速时才对查询进行并行化。如果查询结构指示不可能获得加速,则 PLINQ 会将查询当作普通的 LINQ to Objects 查询来执行。
ForceParallelism
并行化整个查询,即使要使用系统开销大的算法。如果认为并行执行查询将带来加速,则使用此标志,但处于默认模式的 PLINQ 将按顺序执行它。
WithMergeOptions
设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。
参数
ParallelMergeOptions
指定查询中要使用的输出合并的首选类型。换言之,它指示 PLINQ 如何将多个分区的结果合并回单个结果序列。这仅是一个提示,系统在并行处理所有查询时可能不会考虑这一点。
Default
使用默认合并类型,即
AutoBuffered
。NotBuffered
不利用输出缓冲区进行合并。一旦计算出结果元素,就向查询使用者提供这些元素。
AutoBuffered
利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。
FullyBuffered
利用整个输出缓冲区进行合并。在向查询使用者提供任何结果之前,系统会先累计所有结果。
WithCancellation
设置要与查询关联的
System.Threading.CancellationToken
。AsOrdered
启用将数据源视为“已经排序”的处理方法,重写默认的将数据源视为“未经排序”的处理方法。只可以对由
AsParallel
、ParallelEnumerable.Range
和ParallelEnumerable.Repeat
返回的泛型序列调用AsOrdered
。
示例代码
点击查看代码
/// <summary>
/// 调整 PLINQ 查询的参数
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
var parallelQuery = from t in GetTypes().AsParallel()
select EmulateProcessing(t);
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(3));
try
{
parallelQuery
// 设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。
.WithDegreeOfParallelism(Environment.ProcessorCount)
// 设置查询的执行模式
// (ForceParallelism:并行化整个查询,即使要使用系统开销大的算法。)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
// 设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。
// (Default:使用默认合并类型,即 AutoBuffered。)
// (AutoBuffered:利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。)
.WithMergeOptions(ParallelMergeOptions.Default)
// 设置要与查询关联的 System.Threading.CancellationToken。
.WithCancellation(cts.Token)
.ForAll(Console.WriteLine);
}
catch (OperationCanceledException)
{
Console.WriteLine("---");
Console.WriteLine("操作已被取消");
}
Console.WriteLine("---");
Console.WriteLine("执行未排序的 PLINQ 查询");
var unorderedQuery = from i in ParallelEnumerable.Range(1, 30)
select i;
foreach (var i in unorderedQuery)
{
Console.WriteLine(i);
}
Console.WriteLine("---");
Console.WriteLine("执行已排序的 PLINQ 查询");
var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered()
select i;
foreach (var i in orderedQuery)
{
Console.WriteLine(i);
}
Console.ReadLine();
}
static string EmulateProcessing(string typeName)
{
Thread.Sleep(TimeSpan.FromMilliseconds(new Random(DateTime.Now.Millisecond).Next(250,350)));
Console.WriteLine($"{typeName} type was processed on a thread id {Thread.CurrentThread.ManagedThreadId}");
return typeName;
}
/// <summary>
/// 使用反射 API 查询加载到当前应用程序域中的所有组件中名称以“Web”开头的类型
/// </summary>
/// <returns></returns>
static IEnumerable<string
GetTypes()
{
return from assembly in AppDomain.CurrentDomain.GetAssemblies()
from type in assembly.GetExportedTypes()
where type.Name.StartsWith("Web")
orderby type.Name.Length
select type.Name;
}
运行结果
点击查看运行结果
WebSocket type was processed on a thread id 1
WebClient type was processed on a thread id 5
WebClient
WebSocket
WebRequest type was processed on a thread id 4
WebProxy type was processed on a thread id 6
WebProxy
WebRequest
WebPermission type was processed on a thread id 4
WebPermission
WebException type was processed on a thread id 6
WebException
WebUtility type was processed on a thread id 5
WebUtility
WebResponse type was processed on a thread id 1
WebResponse
WebSocketState type was processed on a thread id 6
WebSocketState
WebSocketError type was processed on a thread id 4
WebSocketError
WebSocketContext type was processed on a thread id 5
WebSocketContext
WebRequestMethods type was processed on a thread id 1
WebRequestMethods
WebUtilityElement type was processed on a thread id 6
WebUtilityElement
WebExceptionStatus type was processed on a thread id 4
WebExceptionStatus
WebHeaderCollection type was processed on a thread id 1
WebHeaderCollection
WebSocketException type was processed on a thread id 5
WebSocketException
WebSocketCloseStatus type was processed on a thread id 6
WebSocketCloseStatus
WebSocketMessageType type was processed on a thread id 4
WebSocketMessageType
WebProxyScriptElement type was processed on a thread id 1
WebProxyScriptElement
WebPermissionAttribute type was processed on a thread id 5
WebPermissionAttribute
WebSocketReceiveResult type was processed on a thread id 6
WebSocketReceiveResult
WebRequestModuleElement type was processed on a thread id 4
WebRequestModuleElement
WebRequestModulesSection type was processed on a thread id 1
WebRequestModulesSection
WebRequestModuleElementCollection type was processed on a thread id 5
WebRequestModuleElementCollection
---
执行未排序的PLINQ查询
1
9
17
24
2
10
18
25
3
11
19
26
4
12
20
27
5
13
21
28
6
14
22
29
7
15
23
30
8
16
---
执行已排序的PLINQ查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30