Skip to content

C# 多线程 08-使用 Reactive Extensions 01-将普通集合转换为异步的可观察集合

🏷️ 《C# 多线程》

添加 NuGet 包

  1. 右键单击项目的引用,选择“管理 NuGet 程序包”

  2. 在浏览中搜“rx-main”

  3. 搜索结果中选择“System.Reactive”(当前的最新稳定版是 3.1.1),点击右边的“安装”按钮

    Reactive Extensions (Rx) Main Library combining the interfaces, core, LINQ, and platform services libraries.

示例代码

csharp
/// <summary>
/// 将普通集合转换为异步的可观察集合
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    foreach (int i in EnumerableEventSequence())
    {
        Console.Write(i);
    }

    Console.WriteLine();
    Console.WriteLine("IEnumerable");

    // 使用 ToObservable 扩展方法,把可枚举的集合转换为可观察的集合
    IObservable<int> o = EnumerableEventSequence().ToObservable();
    // 使用 Subscribe 方法订阅该可观察集合的更新
    using (IDisposable subscription = o.Subscribe(Console.Write))
    {
        Console.WriteLine();
        Console.WriteLine("IObservable");
    }

    // 使用 SubscribeOn 方法使其异步化,并提供其 TPL 任务池调度程序。
    // 这可以使 UI 在集合更新时仍保持响应并做些其它的事情。
    o = EnumerableEventSequence().ToObservable()
        .SubscribeOn(TaskPoolScheduler.Default);
    using (IDisposable subscription = o.Subscribe(Console.Write))
    {
        // 因为是异步的,所以会先执行 using 块中的代码
        Console.WriteLine();
        Console.WriteLine("IObservable async");
        // 如果注释掉该行代码,会导致主线程立即结束,从而异步处理也会结束
        Console.ReadLine();
    }
}

/// <summary>
/// 模拟一个效率不高的可枚举的集合
/// </summary>
/// <returns></returns>
static IEnumerable<int> EnumerableEventSequence()
{
    for (int i = 0; i < 10; i++)
    {
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
        yield return i;
    }
}

运行结果

txt
0123456789
IEnumerable
0123456789
IObservable

IObservable async
0123456789