Skip to content
微信扫码关注公众号

C# 多线程 07-使用 PLINQ 03-调整 PLINQ 查询的参数

调整 PLINQ 查询的参数

ParallelQuery

本例中使用了 ParallelQuery 中的如下方法

  • WithDegreeOfParallelism

    设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。

  • WithExecutionMode

    设置查询的执行模式

    • 参数

      • ParallelExecutionMode 查询执行模式

        • Default

          此设置为默认设置。PLINQ 将检查查询的结构,仅在可能带来加速时才对查询进行并行化。如果查询结构指示不可能获得加速,则 PLINQ 会将查询当作普通的 LINQ to Objects 查询来执行。

        • ForceParallelism

          并行化整个查询,即使要使用系统开销大的算法。如果认为并行执行查询将带来加速,则使用此标志,但处于默认模式的 PLINQ 将按顺序执行它。

  • WithMergeOptions

    设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。

    • 参数

      • ParallelMergeOptions

        指定查询中要使用的输出合并的首选类型。换言之,它指示 PLINQ 如何将多个分区的结果合并回单个结果序列。这仅是一个提示,系统在并行处理所有查询时可能不会考虑这一点。

        • Default

          使用默认合并类型,即 AutoBuffered

        • NotBuffered

          不利用输出缓冲区进行合并。一旦计算出结果元素,就向查询使用者提供这些元素。

        • AutoBuffered

          利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。

        • FullyBuffered

          利用整个输出缓冲区进行合并。在向查询使用者提供任何结果之前,系统会先累计所有结果。

  • WithCancellation

    设置要与查询关联的 System.Threading.CancellationToken

  • AsOrdered

    启用将数据源视为“已经排序”的处理方法,重写默认的将数据源视为“未经排序”的处理方法。只可以对由 AsParallelParallelEnumerable.RangeParallelEnumerable.Repeat 返回的泛型序列调用 AsOrdered

示例代码

点击查看代码
csharp
/// <summary>
/// 调整 PLINQ 查询的参数
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var parallelQuery = from t in GetTypes().AsParallel()
                        select EmulateProcessing(t);
    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(3));

    try
    {
        parallelQuery
            // 设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            // 设置查询的执行模式
            // (ForceParallelism:并行化整个查询,即使要使用系统开销大的算法。)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            // 设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。
            // (Default:使用默认合并类型,即 AutoBuffered。)
            // (AutoBuffered:利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。)
            .WithMergeOptions(ParallelMergeOptions.Default)
            // 设置要与查询关联的 System.Threading.CancellationToken。
            .WithCancellation(cts.Token)
            .ForAll(Console.WriteLine);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("---");
        Console.WriteLine("操作已被取消");
    }

    Console.WriteLine("---");
    Console.WriteLine("执行未排序的 PLINQ 查询");

    var unorderedQuery = from i in ParallelEnumerable.Range(1, 30)
                            select i;

    foreach (var i in unorderedQuery)
    {
        Console.WriteLine(i);
    }

    Console.WriteLine("---");
    Console.WriteLine("执行已排序的 PLINQ 查询");
    var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered()
                        select i;

    foreach (var i in orderedQuery)
    {
        Console.WriteLine(i);
    }

    Console.ReadLine();
}

static string EmulateProcessing(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(new Random(DateTime.Now.Millisecond).Next(250,350)));
    Console.WriteLine($"{typeName} type was processed on a thread id {Thread.CurrentThread.ManagedThreadId}");
    return typeName;
}

/// <summary>
/// 使用反射 API 查询加载到当前应用程序域中的所有组件中名称以“Web”开头的类型
/// </summary>
/// <returns></returns>
static IEnumerable<string
GetTypes()
{
    return from assembly in AppDomain.CurrentDomain.GetAssemblies()
            from type in assembly.GetExportedTypes()
            where type.Name.StartsWith("Web")
            orderby type.Name.Length
            select type.Name;
}

运行结果

点击查看运行结果
txt
WebSocket type was processed on a thread id 1
WebClient type was processed on a thread id 5
WebClient
WebSocket
WebRequest type was processed on a thread id 4
WebProxy type was processed on a thread id 6
WebProxy
WebRequest
WebPermission type was processed on a thread id 4
WebPermission
WebException type was processed on a thread id 6
WebException
WebUtility type was processed on a thread id 5
WebUtility
WebResponse type was processed on a thread id 1
WebResponse
WebSocketState type was processed on a thread id 6
WebSocketState
WebSocketError type was processed on a thread id 4
WebSocketError
WebSocketContext type was processed on a thread id 5
WebSocketContext
WebRequestMethods type was processed on a thread id 1
WebRequestMethods
WebUtilityElement type was processed on a thread id 6
WebUtilityElement
WebExceptionStatus type was processed on a thread id 4
WebExceptionStatus
WebHeaderCollection type was processed on a thread id 1
WebHeaderCollection
WebSocketException type was processed on a thread id 5
WebSocketException
WebSocketCloseStatus type was processed on a thread id 6
WebSocketCloseStatus
WebSocketMessageType type was processed on a thread id 4
WebSocketMessageType
WebProxyScriptElement type was processed on a thread id 1
WebProxyScriptElement
WebPermissionAttribute type was processed on a thread id 5
WebPermissionAttribute
WebSocketReceiveResult type was processed on a thread id 6
WebSocketReceiveResult
WebRequestModuleElement type was processed on a thread id 4
WebRequestModuleElement
WebRequestModulesSection type was processed on a thread id 1
WebRequestModulesSection
WebRequestModuleElementCollection type was processed on a thread id 5
WebRequestModuleElementCollection
---
执行未排序的PLINQ查询
1
9
17
24
2
10
18
25
3
11
19
26
4
12
20
27
5
13
21
28
6
14
22
29
7
15
23
30
8
16
---
执行已排序的PLINQ查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

Page Layout Max Width

Adjust the exact value of the page width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the page layout
A ranged slider for user to choose and customize their desired width of the maximum width of the page layout can go.

Content Layout Max Width

Adjust the exact value of the document content width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the content layout
A ranged slider for user to choose and customize their desired width of the maximum width of the content layout can go.