C# 多线程 08-使用 Reactive Extensions 06-使用 Rx 创建异步操作
🏷️ 《C# 多线程》
示例代码
csharp
/// <summary>
/// Delegate taking a string and returning a string. In this sample it wraps
/// LongRunningOperation so that its BeginInvoke/EndInvoke pair can be handed to
/// Observable.FromAsyncPattern.
/// </summary>
delegate string AsyncDelegate(string name);
/// <summary>
/// Walks through several ways of creating asynchronous operations with Rx:
/// Observable.Start, Task.ToObservable, Observable.FromAsyncPattern, awaiting an
/// observable, and Observable.FromEventPattern. Each step prints its output and
/// is followed by a separator line.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    const string separator = "--------------------";
    // How long each subscription is kept alive before it is disposed.
    TimeSpan observationWindow = TimeSpan.FromSeconds(2);

    // Step 1: an observable produced by Observable.Start.
    IObservable<string> startedOperation = LongRunningOperationAsync("Task1");
    using (OutputToConsole(startedOperation))
    {
        Thread.Sleep(observationWindow);
    }
    Console.WriteLine(separator);

    // Step 2: convert a Task into an observable sequence with ToObservable.
    Task<string> runningTask = LongRunningOperationTaskAsync("Task2");
    IObservable<string> taskAsObservable = runningTask.ToObservable();
    using (OutputToConsole(taskAsObservable))
    {
        Thread.Sleep(observationWindow);
    }
    Console.WriteLine(separator);

    // Step 3: convert the Asynchronous Programming Model (Begin/End pair) into an observable.
    // NOTE(review): delegate BeginInvoke/EndInvoke is reportedly unsupported on
    // .NET Core / .NET 5+ - confirm the target framework before reusing this step.
    AsyncDelegate apmDelegate = LongRunningOperation;
    Func<string, IObservable<string>> createObservable =
        Observable.FromAsyncPattern<string, string>(apmDelegate.BeginInvoke, apmDelegate.EndInvoke);
    IObservable<string> apmOperation = createObservable("Task3");
    using (OutputToConsole(apmOperation))
    {
        Thread.Sleep(observationWindow);
    }
    Console.WriteLine(separator);

    // Step 4: use await on an observable; Main blocks until the awaiting task finishes.
    IObservable<string> awaitedOperation = createObservable("Task4");
    AwaitOnObservable(awaitedOperation).Wait();
    Console.WriteLine(separator);

    // Step 5: convert the Event-based Asynchronous Pattern directly into an observable.
    using (var timer = new System.Timers.Timer(1000))
    {
        IObservable<EventPattern<ElapsedEventArgs>> ticks =
            Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
                handler => timer.Elapsed += handler,
                handler => timer.Elapsed -= handler);

        timer.Start();
        using (OutputToConsole(ticks))
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
        }
        Console.WriteLine(separator);
        timer.Stop();
    }

    // Keep the console window open until the user presses Enter.
    Console.ReadLine();
}
/// <summary>
/// Awaits the given observable, writes the awaited value to the console and
/// returns that same value.
/// </summary>
/// <typeparam name="T">Element type of the observable.</typeparam>
/// <param name="observable">The observable to await.</param>
/// <returns>A task whose result is the value obtained from awaiting the observable.</returns>
static async Task<T> AwaitOnObservable<T>(IObservable<T> observable)
{
    // NOTE(review): awaiting an IObservable relies on an awaiter extension being in
    // scope (presumably from Rx, yielding the last element) - confirm.
    var awaited = await observable;

    Console.WriteLine($"{awaited}");

    return awaited;
}
/// <summary>
/// Runs LongRunningOperation via Task.Run and returns the resulting task.
/// </summary>
/// <param name="name">Name forwarded to LongRunningOperation.</param>
/// <returns>A task whose result is the string returned by LongRunningOperation.</returns>
static Task<string> LongRunningOperationTaskAsync(string name)
    => Task.Run(() => LongRunningOperation(name));
/// <summary>
/// Wraps LongRunningOperation in an observable created by Observable.Start.
/// </summary>
/// <remarks>
/// Observable.Start is very similar to Task.Run in the TPL: it starts the
/// asynchronous operation, returns its single string result, and then finishes.
/// </remarks>
/// <param name="name">Name forwarded to LongRunningOperation.</param>
/// <returns>An observable of the string produced by LongRunningOperation.</returns>
static IObservable<string> LongRunningOperationAsync(string name)
    => Observable.Start(() => LongRunningOperation(name));
/// <summary>
/// Simulates a slow operation by blocking the calling thread for one second,
/// then reports completion together with the managed id of the thread it ran on.
/// </summary>
/// <param name="name">Name embedded in the returned message.</param>
/// <returns>A completion message containing <paramref name="name"/> and the current thread id.</returns>
static string LongRunningOperation(string name)
{
    // Stand-in for real work: block the current thread for one second.
    TimeSpan simulatedWork = TimeSpan.FromSeconds(1);
    Thread.Sleep(simulatedWork);

    int threadId = Thread.CurrentThread.ManagedThreadId;
    return $"Task {name} is completed. Thread id {threadId}";
}
/// <summary>
/// Subscribes to a sequence of timer Elapsed events and writes each event's
/// SignalTime to the console. Errors are written as "Error: ..." and completion
/// as "Completed".
/// </summary>
/// <param name="sequence">The event sequence to subscribe to.</param>
/// <returns>The subscription handle returned by Subscribe; dispose it to unsubscribe.</returns>
static IDisposable OutputToConsole(IObservable<EventPattern<ElapsedEventArgs>> sequence)
{
    Action<EventPattern<ElapsedEventArgs>> printSignalTime =
        tick => Console.WriteLine($"{tick.EventArgs.SignalTime}");
    Action<Exception> printError =
        error => Console.WriteLine($"Error: {error.Message}");
    Action printCompleted =
        () => Console.WriteLine("Completed");

    return sequence.Subscribe(printSignalTime, printError, printCompleted);
}
/// <summary>
/// Subscribes to any observable sequence and writes each element to the console.
/// Errors are written as "Error: ..." and completion as "Completed".
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="sequence">The sequence to subscribe to.</param>
/// <returns>The subscription handle returned by Subscribe; dispose it to unsubscribe.</returns>
static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
    Action<T> printItem =
        item => Console.WriteLine($"{item}");
    Action<Exception> printError =
        error => Console.WriteLine($"Error: {error.Message}");
    Action printCompleted =
        () => Console.WriteLine("Completed");

    return sequence.Subscribe(printItem, printError, printCompleted);
}
运行结果
txt
Task Task1 is completed. Thread id 4
Completed
--------------------
Task Task2 is completed. Thread id 4
Completed
--------------------
Task Task3 is completed. Thread id 4
Completed
--------------------
Task Task4 is completed. Thread id 5
--------------------
2017/8/25 17:30:35
2017/8/25 17:30:36
2017/8/25 17:30:37
2017/8/25 17:30:38
--------------------