Skip to content

C# 多线程 08-使用 Reactive Extensions 03-使用 Subject

🏷️ 《C# 多线程》

示例代码

csharp
/// <summary>
/// 使用 Subject
/// Subject 代表了 IObservable 和 IObserver 这两个接口的实现
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 一旦订阅了 Subject,它就会把事件序列发送给订阅者
    Console.WriteLine("Subject");
    var subject = new Subject<string>();
    
    subject.OnNext("A"); // A 在订阅之前,不会被打印
    using (var subscription = OutputToConsole(subject))
    {
        subject.OnNext("B");
        subject.OnNext("C");
        subject.OnNext("D");
        // 当调用 OnCompleted 或 OnError 方法时,事件序列传播会被停止
        subject.OnCompleted();
        // 事件传播停止之后的事件不会被打印
        subject.OnNext("Will not be printed out");
    }

    Console.WriteLine("ReplaySubject");
    // ReplaySubject 可以缓存从广播开始的所有事件
    var replaySubject = new ReplaySubject<string>();

    replaySubject.OnNext("A");
    // 稍后订阅也可以获得之前的事件
    using (var subscription = OutputToConsole(replaySubject))
    {
        replaySubject.OnNext("B");
        replaySubject.OnNext("C");
        replaySubject.OnNext("D");
        replaySubject.OnCompleted();
    }
    Console.WriteLine("Buffered ReplaySubject");
    // 指定 ReplaySubject 缓存的大小
    // 参数 2 表示只可以缓存最后的 2 个事件
    var bufferedSubject = new ReplaySubject<string>(2);

    bufferedSubject.OnNext("A");
    bufferedSubject.OnNext("B");
    bufferedSubject.OnNext("C");
    using (var subscription = OutputToConsole(bufferedSubject))
    {
        bufferedSubject.OnNext("D");
        bufferedSubject.OnCompleted();
    }

    Console.WriteLine("Time window ReplaySubject");
    // 指定 ReplaySubject 缓存的事件
    // TimeSpan.FromMilliseconds(200) 表示只缓存 200ms 内发生的事件
    var timeSubject = new ReplaySubject<string>(TimeSpan.FromMilliseconds(200));
    timeSubject.OnNext("A");
    Thread.Sleep(TimeSpan.FromMilliseconds(100));
    timeSubject.OnNext("B");
    Thread.Sleep(TimeSpan.FromMilliseconds(100));
    timeSubject.OnNext("C");
    Thread.Sleep(TimeSpan.FromMilliseconds(100));
    using (var subscription = OutputToConsole(timeSubject))
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(300));
        timeSubject.OnNext("D");
        timeSubject.OnCompleted();
    }

    Console.WriteLine("AsyncSubject");
    // AsyncSubject 类似于任务并行库中的 Task 类型
    // 它代表了单个异步操作
    // 如果有多个事件发布,它将等待事件序列完成,并把最后一个事件提供给订阅者
    var asyncSubject = new AsyncSubject<string>();

    asyncSubject.OnNext("A");
    using (var subscription = OutputToConsole(asyncSubject))
    {
        asyncSubject.OnNext("B");
        asyncSubject.OnNext("C");
        asyncSubject.OnNext("D");
        asyncSubject.OnCompleted();
    }

    Console.WriteLine("BehaviorSubject");
    // BehaviorSubject 与 ReplaySubject 很相似,但它只缓存一个值
    // 并允许万一还没有发送任何通知时,指定一个默认值
    // 默认值会被自动替换为订阅前的最后一个事件
    var behaviorSubject = new BehaviorSubject<string>("Default");
    using (var subscription = OutputToConsole(behaviorSubject))
    {
        behaviorSubject.OnNext("B");
        behaviorSubject.OnNext("C");
        behaviorSubject.OnNext("D");
        behaviorSubject.OnCompleted();

    }

    Console.ReadLine();
}

static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
    return sequence.Subscribe(
        obj => Console.WriteLine($"{obj}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine("Completed")
    );
}

运行结果

txt
Subject
B
C
D
Completed
ReplaySubject
A
B
C
D
Completed
Buffered ReplaySubject
B
C
D
Completed
Time window ReplaySubject
C
D
Completed
AsyncSubject
D
Completed
BehaviorSubject
Default
B
C
D
Completed