Skip to content

C# 多线程 08-使用 Reactive Extensions 05-对可观察的集合使用 LINQ 查询

示例代码

csharp
/// <summary>
/// 对可观察的集合使用 LINQ 查询
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 计时器序列
    IObservable<long> sequence = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(21);
    // 偶数序列
    var evenNumbers = from n in sequence
                        where n % 2 == 0
                        select n;
    // 奇数序列
    var oddNumbers = from n in sequence
                    where n % 2 != 0
                    select n;
    // 连接序列
    var combine = from n in evenNumbers.Concat(oddNumbers)
                    select n;
    // 副作用序列(Do 函数)
    var nums = (from n in combine
                where n % 5 == 0
                select n)
                .Do(n => Console.WriteLine($"------Number {n} is processed in Do method"));

    // 以上测查询均是惰性的,只有订阅了其结果,查询才会运行
    using (var sub = OutputToConsole(sequence, 0))
    using (var sub2 = OutputToConsole(combine, 1))
    using (var sub3 = OutputToConsole(nums, 2))
    { 
        Console.WriteLine("Press Enter to finish the demo");
        Console.ReadLine();
    }
}

static IDisposable OutputToConsole<T>(IObservable<T> sequence, int innerLevel)
{
    string delimiter = innerLevel == 0
        ? String.Empty
        : new string('-', innerLevel * 3);

    return sequence.Subscribe(
        obj => Console.WriteLine($"{delimiter}{obj}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine($"{delimiter}Completed")
    );
}

运行结果

txt
Press Enter to finish the demo
0
---0
------Number 0 is processed in Do method
------0
1
---2
2
3
4
---4
5
---6
6
7
8
---8
9
---10
------Number 10 is processed in Do method
------10
10
11
12
---12
13
14
---14
15
---16
16
17
---18
18
19
---20
20
------Number 20 is processed in Do method
------20
Completed
---1
---3
------Number 5 is processed in Do method
------5
---5
---7
---9
---11
---13
------Number 15 is processed in Do method
------15
---15
---17
---19
---Completed
------Completed

总结

按照书上的说法,只有连接的序列不是并行的执行的。

不知道这部分是翻译错误还是我的理解不对。

根据结果,计时器序列、偶数序列、奇数序列、连接序列及最后的副作用序列都是并行执行的,也就是说同时有 5 个序列以并行的方式运行。

只不过连接序列那里由于 Concat 函数的原因,在前面的偶数序列结束后才连接,导致连接序列的奇数部分在后面才打印,进而也导致了副作用序列的奇数部分也在后面打印。