C# 多线程 08-使用 Reactive Extensions 05-对可观察的集合使用 LINQ 查询
示例代码
csharp
/// <summary>
/// 对可观察的集合使用 LINQ 查询
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
// 计时器序列
IObservable<long> sequence = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(21);
// 偶数序列
var evenNumbers = from n in sequence
where n % 2 == 0
select n;
// 奇数序列
var oddNumbers = from n in sequence
where n % 2 != 0
select n;
// 连接序列
var combine = from n in evenNumbers.Concat(oddNumbers)
select n;
// 副作用序列(Do 函数)
var nums = (from n in combine
where n % 5 == 0
select n)
.Do(n => Console.WriteLine($"------Number {n} is processed in Do method"));
// 以上测查询均是惰性的,只有订阅了其结果,查询才会运行
using (var sub = OutputToConsole(sequence, 0))
using (var sub2 = OutputToConsole(combine, 1))
using (var sub3 = OutputToConsole(nums, 2))
{
Console.WriteLine("Press Enter to finish the demo");
Console.ReadLine();
}
}
static IDisposable OutputToConsole<T>(IObservable<T> sequence, int innerLevel)
{
string delimiter = innerLevel == 0
? String.Empty
: new string('-', innerLevel * 3);
return sequence.Subscribe(
obj => Console.WriteLine($"{delimiter}{obj}")
, ex => Console.WriteLine($"Error: {ex.Message}")
, () => Console.WriteLine($"{delimiter}Completed")
);
}
运行结果
txt
Press Enter to finish the demo
0
---0
------Number 0 is processed in Do method
------0
1
---2
2
3
4
---4
5
---6
6
7
8
---8
9
---10
------Number 10 is processed in Do method
------10
10
11
12
---12
13
14
---14
15
---16
16
17
---18
18
19
---20
20
------Number 20 is processed in Do method
------20
Completed
---1
---3
------Number 5 is processed in Do method
------5
---5
---7
---9
---11
---13
------Number 15 is processed in Do method
------15
---15
---17
---19
---Completed
------Completed
总结
按照书上的说法,只有连接的序列不是并行的执行的。
不知道这部分是翻译错误还是我的理解不对。
根据结果,计时器序列、偶数序列、奇数序列、连接序列及最后的副作用序列都是并行执行的,也就是说同时有 5 个序列以并行的方式运行。
只不过连接序列那里由于 Concat
函数的原因,在前面的偶数序列结束后才连接,导致连接序列的奇数部分在后面才打印,进而也导致了副作用序列的奇数部分也在后面打印。