C# 多线程 08-使用 Reactive Extensions 04-创建可观察的对象
🏷️ 《C# 多线程》
本例展示了创建可观察的对象的不同场景
示例代码
csharp
/// <summary>
/// 创建可观察的对象
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
Console.WriteLine("使用值创建 Observable 方法");
Console.WriteLine("Observable.Return(0)");
IObservable<int> o = Observable.Return(0);
using (var sub = OutputToConsole(o)) ;
Console.WriteLine("--------------------");
Console.WriteLine("不使用值创建 Observable 方法");
Console.WriteLine("Observable.Empty<int>()");
o = Observable.Empty<int>();
using (var sub = OutputToConsole(o)) ;
Console.WriteLine("--------------------");
Console.WriteLine("通过 Observable.Throw 触发 OnError 处理器");
Console.WriteLine("Observable.Throw<int>( new Exception())");
o = Observable.Throw<int>( new Exception());
using (var sub = OutputToConsole(o)) ;
Console.WriteLine("--------------------");
Console.WriteLine("使用 Observable.Repeat 创建无尽序列");
Console.WriteLine("Observable.Repeat(42)");
o = Observable.Repeat(42);
using (var sub = OutputToConsole(o.Take(5))) ;
Console.WriteLine("--------------------");
Console.WriteLine("使用 Observable.Range 创建一组值");
Console.WriteLine("Observable.Range(0, 10)");
o = Observable.Range(0, 10);
using (var sub = OutputToConsole(o)) ;
Console.WriteLine("--------------------");
Console.WriteLine("Observable.Create 方法支持很多的自定义场景");
// Create 方法接受一个函数,该函数接受一个观察者实例,并且返回 IDisposable 对象来代表订阅者
o = Observable.Create<int>(ob =>
{
for (int i = 0; i < 10; i++)
{
ob.OnNext(i);
}
return Disposable.Empty;
});
using (var sub = OutputToConsole(o)) ;
Console.WriteLine("--------------------");
Console.WriteLine("Observable.Generate 是另一个创建自定义序列的方式");
o = Observable.Generate(
0 // install state 初始值
, i => i < 5 // while this is true we continue the sequence 一个断言,用来决定是否需要生成更多元素或者完成序列
, i => ++i // iteration 迭代逻辑
, i => i * 2 // selecting result 选择器函数,允许我们定制化结果
);
using (var sub = OutputToConsole(o)) ;
Console.WriteLine("--------------------");
Console.WriteLine("Interval 会以 TimeSpan 间隔产生计时器标记事件");
Console.WriteLine("Observable.Interval(TimeSpan.FromSeconds(1))");
IObservable<long> ol = Observable.Interval(TimeSpan.FromSeconds(1));
using (var sub = OutputToConsole(ol))
{
Thread.Sleep(TimeSpan.FromSeconds(3));
}
Console.WriteLine("--------------------");
Console.WriteLine("Timer 指定了启动时间");
Console.WriteLine("Observable.Timer(DateTimeOffset.Now.AddSeconds(2))");
ol = Observable.Timer(DateTimeOffset.Now.AddSeconds(2));
using (var sub = OutputToConsole(ol))
{
Thread.Sleep(TimeSpan.FromSeconds(3));
}
Console.WriteLine("--------------------");
Console.ReadLine();
}
static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
return sequence.Subscribe(
obj => Console.WriteLine($"{obj}")
, ex => Console.WriteLine($"Error: {ex.Message}")
, () => Console.WriteLine("Completed")
);
}
运行结果
txt
使用值创建Observable方法
Observable.Return(0)
0
Completed
--------------------
不使用值创建Observable方法
Observable.Empty<int>()
Completed
--------------------
通过Observable.Throw触发OnError处理器
Observable.Throw<int>( new Exception())
Error: 引发类型为“System.Exception”的异常。
--------------------
使用Observable.Repeat创建无尽序列
Observable.Repeat(42)
42
42
42
42
42
Completed
--------------------
使用Observable.Range创建一组值
Observable.Range(0, 10)
0
1
2
3
4
5
6
7
8
9
Completed
--------------------
Observable.Create方法支持很多的自定义场景
0
1
2
3
4
5
6
7
8
9
--------------------
Observable.Generate是另一个创建自定义序列的方式
0
2
4
6
8
Completed
--------------------
Interval会以TimeSpan间隔产生计时器标记事件
Observable.Interval(TimeSpan.FromSeconds(1))
0
1
--------------------
Timer指定了启动时间
Observable.Timer(DateTimeOffset.Now.AddSeconds(2))
0
Completed
--------------------