C# 多线程 05-使用 C#6.0 08-自定义 awaitable 类型

awaitable 表达式要求

为了与 await 操作符保持兼容,类型应当遵守在 C# 规则说明中规定的一些要求。

我安装的是 VS2017,其路径为:

*C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC#\Specifications\1033     [CSharp Language Specification.docx](/uploads/userfiles/e671fced-86e2-4d56-a20a-d79aab766aa7/files/2017/08/CSharp Language Specification.docx)*
C# 多线程 05-使用 C#6.0 07-使用 async void 方法

07-使用 async void 方法

/// <summary>
/// 使用 async void 方法
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 使用 async Task 可以通过返回值的 Task 示例,监控任务的状态
    Task t = AsyncTask();
    t.Wait();

    // 使用 async void 没有返回值,无法监控任务状态
    AsyncVoid();
    // 这里使用 Sleep 方法确保任务完成
    Thread.Sleep(TimeSpan.FromSeconds(3));

    // 根据 Task.IsFaulted 属性可以判断是否发生异常
    // 捕获的异常信息可以从 Task.Exception 中获取
    t = AsyncTaskWithErrors();
    while (!t.IsFaulted)
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    Console.WriteLine(t.Exception);

    // 该段代码虽然使用 try/catch 捕获异常,但是由于使用了 async void 方法,
    // 异常处理方法会被放置到当前的同步上下文中(即线程池的线程中)。
    // 线程池中未被处理的异常会终止整个进程。

    //try
    //{
    //    AsyncVoidWithErrors();
    //    Thread.Sleep(TimeSpan.FromSeconds(3));
    //}
    //catch (Exception ex)
    //{
    //    Console.WriteLine(ex);
    //}

    // Action 类型也是可以使用 async 关键字的;
    // 在 lambda 表达式中很容易忘记对异常的处理,而这会导致程序崩溃。
    int[] numbers = { 1, 2, 3, 4, 5 };
    Array.ForEach(numbers, async number =>
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        if (number == 3)
        {
            throw new Exception("Boom!");
        }
        Console.WriteLine(number);
    });

    Console.ReadLine();
}

static async Task AsyncTaskWithErrors()
{
    string result = await GetInfoAsync("AsyncTaskException", 2);
    Console.WriteLine(result);
}

static async void AsyncVoidWithErrors()
{
    string result = await GetInfoAsync("AsyncVoidException", 2);
    Console.WriteLine(result);
}

static async Task AsyncTask()
{
    string result = await GetInfoAsync("AsyncTask", 2);
    Console.WriteLine(result);
}

static async void AsyncVoid()
{
    string result = await GetInfoAsync("AsyncVoid", 2);
    Console.WriteLine(result);
}

static async Task<string> GetInfoAsync(string name, int seconds)
{
    await Task.Delay(TimeSpan.FromSeconds(seconds));
    if (name.Contains("Exception"))
    {
        throw new Exception($"Boom from {name}");
    }
    return $"Task {name} is running on a thrad id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}";
}
C# 多线程 05-使用 C#6.0 06-避免使用捕获的同步上下文

避免使用捕获的同步上下文

/// <summary>
/// 避免使用捕获的同步上下文
/// </summary>
/// <param name="args"></param>
[STAThread]
static void Main(string[] args)
{
    var app = new Application();
    var win = new Window();
    var panel = new StackPanel();
    var button = new Button();

    _label = new Label();
    _label.FontSize = 32;
    _label.Height = 200;

    button.Height = 100;
    button.FontSize = 32;
    button.Content = new TextBlock { Text = "开始异步操作" };
    button.Click += Click;

    panel.Children.Add(_label);
    panel.Children.Add(button);

    win.Content = panel;
    app.Run(win);

    Console.ReadLine();
}

private static Label _label;

static async void Click(object sender, EventArgs e)
{
    _label.Content = new TextBlock { Text = "计算中......" };
    TimeSpan resultWithContent = await Test();
    TimeSpan resultNoContent = await TestNoContent();
    //TimeSpan resultNoContent = await TestNoContent().ConfigureAwait(false);

    var sb = new StringBuilder();
    sb.AppendLine($"With the content: {resultWithContent}");
    sb.AppendLine($"Without the content: {resultNoContent}");
    sb.AppendLine($"Ratio: {resultWithContent.TotalMilliseconds / resultNoContent.TotalMilliseconds:0.00}");

    _label.Content = new TextBlock { Text = sb.ToString() };
}

static async Task<TimeSpan> Test()
{
    const int iterationsNumber = 100000;
    var sw = new Stopwatch();
    sw.Start();
    for (int i = 0; i < iterationsNumber; i++)
    {
        var t = Task.Run(() => { });
        await t;
    }
    sw.Stop();
    return sw.Elapsed;
}

static async Task<TimeSpan> TestNoContent()
{
    const int interationsNumber = 100000;
    var sw = new Stopwatch();
    sw.Start();
    for (int i = 0; i < interationsNumber; i++)
    {
        var t = Task.Run(() => { });
        // 将 continueOnCapturedContext 指定为 false,不使用捕获的同步上下文运行后续操作代码
        await t.ConfigureAwait(continueOnCapturedContext: false);
    }
    sw.Stop();
    return sw.Elapsed;
}
C# 多线程 05-使用 C#6.0 05-处理异步操作中的异常

处理异步操作中的异常

/// <summary>
/// 处理异步操作中的异常
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task t = AsynchronousProcessing();
    t.Wait();

    Console.ReadLine();
}

static async Task AsynchronousProcessing()
{
    Console.WriteLine("1. 单个异常");
    Console.WriteLine("   使用 try/catch 捕获单个异常");
    try
    {
        string result = await GetInfoAsync("任务 1", 2);
        Console.WriteLine(result);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception details: {ex}");
    }

    Console.WriteLine();
    Console.WriteLine("2. 多个异常");
    Console.WriteLine("   尝试使用 try/catch 捕获多个异常,但此种写法仅能捕获其中一个异常");
    
    Task<string> t1 = GetInfoAsync("任务 1", 3);
    Task<string> t2 = GetInfoAsync("任务 2", 2);
    try
    {
        string[] results = await Task.WhenAll(t1, t2);
        Console.WriteLine(results.Length);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception details: {ex}");
    }

    Console.WriteLine();
    Console.WriteLine("3. 使用 AggregateException 捕获多个异常");
    Console.WriteLine("   从 task 的 Exception 属性中获取多个异常的信息");
    
    t1 = GetInfoAsync("任务 1", 3);
    t2 = GetInfoAsync("任务 2", 2);
    Task<string[]> t3 = Task.WhenAll(t1, t2);
    try
    {
        string[] results = await t3;
        Console.WriteLine(results.Length);
    }
    catch
    {
        var ae = t3.Exception.Flatten();
        var exceptions = ae.InnerExceptions;
        Console.WriteLine($"Exceptions caght: {exceptions.Count}");
        foreach (var e in exceptions)
        {
            Console.WriteLine($"Exception details: {e}");
            Console.WriteLine();
        }
    }

    Console.WriteLine();
    Console.WriteLine("4. 在 catch 和 finally 块中使用 await");
    Console.WriteLine("   c#6.0中,允许在catch和finally块中使用await,在之前的版本中这种写法会报错");
    Console.WriteLine("   若将语言版本改为C#5.0编译,则会报如下错误:");
    Console.WriteLine("     CS1985 无法在 catch 子句中等待");
    Console.WriteLine("     CS1984  无法在 finally 子句体中等待");
    
    try
    {
        string result = await GetInfoAsync("任务 1", 2);
        Console.WriteLine(result);
    }
    catch (Exception ex)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine($"Catch block with await: Exception details: {ex}");
    }
    finally
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.Write("Finally block");
    }
}

static async Task<string> GetInfoAsync(string name, int seconds)
{
    await Task.Delay(TimeSpan.FromSeconds(seconds));
    throw new Exception($"Boom from {name}");
}
C# 多线程 05-使用 C#6.0 04-对并行执行的异步任务使用 await 操作符

对并行执行的异步任务使用 await 操作符

/// <summary>
/// 对并行执行的异步任务使用 await 操作符
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task t = AsynchronousProcessing();
    t.Wait();

    Console.ReadLine();
}

static async Task AsynchronousProcessing()
{
    // 定义了两个任务,分别运行 3 秒和 5 秒
    Task<string> t1 = GetInfoAsync("Task 1", 3);
    Task<string> t2 = GetInfoAsync("Task 2", 5);
    // 使用 Task.WhenALL 辅助方法创建一个任务,该任务只有在所有底层任务完成后才会运行
    string[] results = await Task.WhenAll(t1, t2);
    // 5 秒后获得观察结果
    foreach (string result in results)
    {
        Console.WriteLine(result);
    }
}

static async Task<string> GetInfoAsync(string name, int seconds)
{
    // Task.Delay 方法不阻塞线程,而是在幕后使用了一个计时器
    // 导致两个任务一般总是由同一个线程运行
    await Task.Delay(TimeSpan.FromSeconds(seconds));
    // 如果使用下面注释掉的代码模拟运行时间,则两个任务总是由不同的线程运行
    // 因为 Thread.Sleep 方法阻塞线程
    // await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds)));
    return $"Task {name} is running on a thrad id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}";
}
C# 多线程 11-更多信息 03-在通用 Windows 平台应用中使用 BackgroundTask

实现方式

  1. 新建 C# => Windows 通常 => 空 App 项目

  2. 打开 Package.appxmanifest 文件。在 Declarations(声明)标签中,添加 Background Tasks(后台任务)到 Supported Declarations(支持的声明)。

    Properties(属性中),选择 System event(系统事件)和 Timer(计时器),并将Entry point(入口点)设置为 YourNamespace.TileSchedulerTask

    YourNamespace是你的命名空间。

C# 多线程 11-更多信息 02-在通常的应用程序中使用 WinRT

实现方式

  1. 从解决方案中卸载项目

  2. 右键单击被卸载的项目,选择【编辑 projectname.csproj】

  3. <TargetFrameworkVersion> 元素下添加如下 XML 代码

    <TragetPlatformVersion>10.0</TragetPlatformVersion>
    
  4. 保存工程文件,并再次右键单击项目,选择【重新加载项目】

  5. 添加如下引用

    • C:\Program Files (x86)\Windows Kits\10\UnionMetadata\10.0.15063.0\Windows.winmd
    • C:\Program Files (x86)\Reference Assemblies\Microsoft\Framework\.NETCore\v4.5\System.Runtime.WindowsRuntime.dll
C# 多线程 11-更多信息 01-在通用 Windows 平台应用中使用计时器

本例需要 Win10 系统,并且开启开发者模式。

示例代码

/// <summary>
/// 可用于自身或导航至 Frame 内部的空白页。
/// </summary>
public sealed partial class MainPage : Page
{
    public MainPage()
    {
        this.InitializeComponent();
        // 初始化计时器
        _timer = new DispatcherTimer();
        _ticks = 0;
    }

    private readonly DispatcherTimer _timer;
    private int _ticks;

    protected override void OnNavigatedTo(NavigationEventArgs e)
    {
        base.OnNavigatedTo(e);
        Grid.Children.Clear();

        var commonPanel = new StackPanel
        {
            Orientation = Orientation.Vertical,
            HorizontalAlignment = HorizontalAlignment.Center
        };

        var buttonPanel = new StackPanel
        {
            Orientation = Orientation.Horizontal,
            HorizontalAlignment = HorizontalAlignment.Center
        };

        var textBlock = new TextBlock
        {
            Text = "Sample timer application",
            FontSize = 32,
            HorizontalAlignment = HorizontalAlignment.Center,
            Margin = new Thickness(40)
        };

        var timerTextBlock = new TextBlock
        {
            Text = "0",
            FontSize = 32,
            HorizontalAlignment = HorizontalAlignment.Center,
            Margin = new Thickness(40)
        };

        var timerStateTextBlock = new TextBlock
        {
            Text = "Timer is enabled",
            FontSize = 32,
            HorizontalAlignment = HorizontalAlignment.Center,
            Margin = new Thickness(40)
        };

        var startButton = new Button { Content = "Start", FontSize = 32 };
        var stopButton = new Button { Content = "Stop", FontSize = 32 };

        buttonPanel.Children.Add(startButton);
        buttonPanel.Children.Add(stopButton);

        commonPanel.Children.Add(textBlock);
        commonPanel.Children.Add(timerTextBlock);
        commonPanel.Children.Add(timerStateTextBlock);
        commonPanel.Children.Add(buttonPanel);

        // 设置间隔时间为 1s
        _timer.Interval = TimeSpan.FromSeconds(1);
        // 设置每个计时器间隔事件
        _timer.Tick += (sender, eventArgs) =>
        {
            timerTextBlock.Text = _ticks.ToString();
            _ticks++;
        };
        // 启动计时器
        _timer.Start();

        startButton.Click += (sender, eventArgs) =>
        {
            timerTextBlock.Text = "0";
            // 启动计时器
            _timer.Start();
            // 重置计数
            _ticks = 1;
            timerStateTextBlock.Text = "Timer is enabled";
        };

        stopButton.Click += (sender, eventArgs) =>
        {
            // 停止计时器
            _timer.Stop();
            timerStateTextBlock.Text = "Timer is disabled.";
        };

        Grid.Children.Add(commonPanel);
    }
}
C# 多线程 10-并行编程模式 04-使用 PLINQ 实现 Map/Reduce 模式

Map/Reduce 模式

Map/Reduce 功能是另一个重要的并行编程模式。它适用于小程序以及拥有大量的多个服务器端计算的场景。

该模式的含义是你有两个特殊的功能要应用于你的数据。

  1. Map 函数

    接收一组键/值列表的初始数据,并产生另一组键/值序列,将初始数据转换为适合的格式以便进行下一部处理。

  2. Reduce 函数

    使用 Map 函数的结果,并将其转换为我们真正需要的尽可能小的数据集。

C# 多线程 10-并行编程模式 03-使用 TPL 数据流实现并行管道

提示

需要添加对 Microsoft.Tpl.Dataflow 包的引用(通过 nuget 搜【Microsoft.Tpl.Dataflow】)

示例代码

/// <summary>
/// 使用 TPL 数据流实现并行管道
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var t = ProcessAsynchronously();
    t.GetAwaiter().GetResult();
}

async static Task ProcessAsynchronously()
{
    var cts = new CancellationTokenSource();
    Random _rnd = new Random(DateTime.Now.Millisecond);

    Task.Run(() =>
    {
            if (Console.ReadKey().KeyChar == 'c')
            {
                cts.Cancel();
            }
    }, cts.Token);

    // BufferBlock:将元素传给流中的下一个块
    // BoundedCapacity:指定其容量,超过时不再接受新元素,直到一个现有元素被传递给下一个块
    var inputBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

    // TransformBlock:用于数据转换步骤
    //   MaxDegreeOfParallelism:通过该选项指定最大工作者线程数
    // 将 int 转换为 decimal
    var convertToDecimalBlock = new TransformBlock<int, decimal>(n =>
    {
        decimal result = Convert.ToDecimal(n * 100);
        Console.WriteLine($"Decimal Converter sent {result} to the next stage on {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(TimeSpan.FromMilliseconds(_rnd.Next(200)));
        return result;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // 将 decimal 转换为 string
    var stringifyBlock = new TransformBlock<decimal, string>(n =>
    {
        string result = $"--{n.ToString("C", CultureInfo.GetCultureInfo("en-us"))}--";
        Console.WriteLine($"String Formatter sent {result} to the next stage on {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(TimeSpan.FromMilliseconds(_rnd.Next(200)));
        return result;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // ActionBlock:对每个传入的元素运行一个指定的操作
    var outputBlock = new ActionBlock<string>(s => {
        Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

    // 通过 LinkTo 方法将这些块连接到一起
    // PropagateCompletion = true : 当前步骤完成时,自动将结果和异常传播到下一个阶段
    inputBlock.LinkTo(convertToDecimalBlock, new DataflowLinkOptions { PropagateCompletion = true });
    convertToDecimalBlock.LinkTo(stringifyBlock, new DataflowLinkOptions { PropagateCompletion = true });
    stringifyBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });

    try
    {
        // 向块中添加项
        Parallel.For(0, 20, new ParallelOptions {
            MaxDegreeOfParallelism = 4, CancellationToken = cts.Token
        }, i => {
            Console.WriteLine($"added {i} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
            inputBlock.SendAsync(i).GetAwaiter().GetResult();
        });
        // 添加完成后需要调用 Complete 方法
        inputBlock.Complete();
        // 等待最后的块完成
        await outputBlock.Completion;
        Console.WriteLine("Press ENTER to exit.");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
    }

    Console.ReadLine();
}
C# 多线程 10-并行编程模式 02-使用 BlockingCollection 实现并行管道

示例代码

private const int CollectionsNumber = 4;
private const int Count = 5;

/// <summary>
/// 使用 BlockingCollection 实现并行管道
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var cts = new CancellationTokenSource();

    // 监视 c 键(按下 c 键取消执行)
    Task.Run(() =>
    {
        if (Console.ReadKey().KeyChar == 'c')
        {
            cts.Cancel();
        }
    }, cts.Token);

    // 创建 4 个集合,每个集合中有 5 个元素
    var sourceArrays = new BlockingCollection<int>[CollectionsNumber];
    for (int i = 0; i < sourceArrays.Length; i++)
    {
        sourceArrays[i] = new BlockingCollection<int>(Count);
    }

    // 第一个管道:将 int 型数据转换成 Decimal 型
    var convertToDecimal = new PipelineWorker<int, decimal>(
        sourceArrays,
        n => Convert.ToDecimal(n * 100),
        cts.Token,
        "Decimal Converter"
        );

    // 第二个管道:格式化 Decimal 数据为金额字符串
    var stringifyNumber = new PipelineWorker<decimal, string>(
        convertToDecimal.Output,
        s => $"--{s.ToString("C", CultureInfo.GetCultureInfo("en-us"))}--",
        cts.Token,
        "Console Formatter"
        );

    // 第三个管道:打印格式化后的结果
    var outputResultToConsole = new PipelineWorker<string, string>(
        stringifyNumber.Output,
        s => Console.WriteLine($"The final result is {s} on thread id {Thread.CurrentThread.ManagedThreadId}"),
        cts.Token,
        "Console Output"
        );

    try
    {
        Parallel.Invoke(
            // 初始化集合数据
            () => CreateInitialValues(sourceArrays, cts),
            // 将 int 型数据转换成 Decimal 型
            () => convertToDecimal.Run(),
            // 格式化 Decimal 数据为金额字符串
            () => stringifyNumber.Run(),
            // 打印格式化后的结果
            () => outputResultToConsole.Run()
            );
    }
    catch (AggregateException ae)
    {
        foreach (var ex in ae.InnerExceptions)
        {
            Console.WriteLine(ex.Message + ex.StackTrace);
        }
    }

    if (cts.Token.IsCancellationRequested)
    {
        Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
    }
    else
    {
        Console.WriteLine("Press ENTER to exit.");
    }

    Console.ReadLine();
}

/// <summary>
/// 初始化集合数据
/// </summary>
/// <param name="sourceArrays"></param>
/// <param name="cts"></param>
static void CreateInitialValues(BlockingCollection<int>[] sourceArrays, CancellationTokenSource cts)
{
    Parallel.For(0, sourceArrays.Length * Count, (j, state) =>
    {
        if (cts.Token.IsCancellationRequested)
        {
            state.Stop();
        }
        int number = GetRandomNumber(j);
        int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
        if (k >= 0)
        {
            Console.WriteLine($"added {j} to source data on thread id {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(TimeSpan.FromMilliseconds(number));
        }
    });

    foreach (var arr in sourceArrays)
    {
        arr.CompleteAdding();
    }
}

static int GetRandomNumber(int seed)
{
    return new Random(seed).Next(500);
}

/// <summary>
/// 管道类
/// </summary>
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TOutput"></typeparam>
class PipelineWorker<TInput, TOutput>
{
    Func<TInput, TOutput> _processor;
    Action<TInput> _outputProcessor;
    BlockingCollection<TInput>[] _input;
    CancellationToken _token;
    Random _rnd;

    public BlockingCollection<TOutput>[] Output { get; private set; }

    public string Name { get; private set; }

    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Func<TInput, TOutput> processor,
        CancellationToken token,
        string name)
    {
        _input = input;
        Output = new BlockingCollection<TOutput>[_input.Length];
        for (int i = 0; i < Output.Length; i++)
        {
            Output[i] = null == input[i] ? null : new BlockingCollection<TOutput>(Count);
        }

        _processor = processor;
        _token = token;
        Name = name;
        _rnd = new Random(DateTime.Now.Millisecond);
    }

    public PipelineWorker(
        BlockingCollection<TInput>[] input,
        Action<TInput> renderer,
        CancellationToken token,
        string name)
    {
        _input = input;
        _outputProcessor = renderer;
        _token = token;
        Name = name;
        Output = null;
        _rnd = new Random(DateTime.Now.Millisecond);
    }

    public void Run()
    {
        Console.WriteLine($"{Name} is running");
        while (!_input.All(bc => bc.IsCompleted) && !_token.IsCancellationRequested)
        {
            TInput receivedItem;
            // 尝试从集合中获取元素;如没有元素则会等待
            int i = BlockingCollection<TInput>.TryTakeFromAny(_input, out receivedItem, 50, _token);
            if (i >= 0)
            {
                if (Output != null)
                {
                    TOutput outputItem = _processor(receivedItem);
                    BlockingCollection<TOutput>.AddToAny(Output, outputItem);
                    Console.WriteLine($"{Name} sent {outputItem} to next, on thread id {Thread.CurrentThread.ManagedThreadId}");
                    Thread.Sleep(TimeSpan.FromMilliseconds(_rnd.Next(200)));
                }
                else
                {
                    _outputProcessor(receivedItem);
                }
            }
            else
            {
                Thread.Sleep(TimeSpan.FromMilliseconds(50));
            }
        }

        if (Output != null)
        {
            foreach (var bc in Output)
            {
                bc.CompleteAdding();
            }
        }
    }
}
C# 多线程 10-并行编程模式 01-实现惰性求值的共享状态

示例代码

/// <summary>
/// 实现惰性求值的共享状态
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var t = ProcessAsynchronously();
    t.Wait();

    Console.WriteLine("Press Enter to exit");
    Console.ReadLine();
}

static async Task ProcessAsynchronously()
{
    // 不安全的对象,构造方法会被调用了多次,
    // 并且不同的线程中值是不同的
    var unsafeState = new UnsafeState();
    Task[] tasks = new Task[4];

    for (int i = 0; i < 4; i++)
    {
        tasks[i] = Task.Run(() => Worker(unsafeState));
    }

    await Task.WhenAll(tasks);
    Console.WriteLine("------------------------------");

    // 使用双重锁定模式
    var firstState = new DoubleCheckedLocking();
    for (int i = 0; i < 4; i++)
    {
        tasks[i] = Task.Run(() => Worker(firstState));
    }

    await Task.WhenAll(tasks);
    Console.WriteLine("------------------------------");

    // 使用 LazyInitializer.EnsureInitialized 方法
    var secondState = new BCLDoubleChecked();
    for (int i = 0; i < 4; i++)
    {
        tasks[i] = Task.Run(() => Worker(secondState));
    }

    await Task.WhenAll(tasks);
    Console.WriteLine("------------------------------");

    // 使用 Lazy<T>类型
    var lazy = new Lazy<ValueToAccess>(Compute);
    var thirdState = new LazyWrapper(lazy);
    for (int i = 0; i < 4; i++)
    {
        tasks[i] = Task.Run(() => Worker(thirdState));
    }

    await Task.WhenAll(tasks);
    Console.WriteLine("------------------------------");

    // 使用 LazyInitializer.EnsureInitialized 方法的一个不使用锁的重载
    var fourthState = new BCLThreadSafeFactory();
    for (int i = 0; i < 4; i++)
    {
        tasks[i] = Task.Run(() => Worker(fourthState));
    }

    await Task.WhenAll(tasks);
    Console.WriteLine("------------------------------");
}

private static void Worker(IHasValue state)
{
    Console.WriteLine($"Worker runs on thread id {Thread.CurrentThread.ManagedThreadId}");
    Console.WriteLine($"State value: {state.Value.Text}");
}

class UnsafeState : IHasValue
{
    private ValueToAccess _value;

    // 不安全的对象,构造方法 Compute 会被调用多次
    public ValueToAccess Value => _value ?? (_value = Compute());
}

class DoubleCheckedLocking : IHasValue
{
    private readonly object _syncRoot = new object();
    private volatile ValueToAccess _value;
    
    public ValueToAccess Value
    {
        get
        {
            // 使用锁及双重验证,确保构造方法 Compute 仅执行一次
            if (_value == null)
            {
                lock (_syncRoot)
                {
                    if (_value == null)
                    {
                        _value = Compute();
                    }
                }
            }
            return _value;
        }
    }
}

class BCLDoubleChecked : IHasValue
{
    private object _syncRoot = new object();
    private ValueToAccess _value;
    private bool _initialized;

    // 使用 LazyInitializer.EnsureInitialized 方法初始化
    // 该方法内部实现了双重锁定模式
    public ValueToAccess Value => LazyInitializer.EnsureInitialized(ref _value, ref _initialized, ref _syncRoot, Compute);
}

class BCLThreadSafeFactory : IHasValue
{
    private ValueToAccess _value;

    // 使用 LazyInitializer.EnsureInitialized 方法初始化
    // 这个构造函数的重载没有使用锁,会导致初始化方法 Compute 被执行多次,但是结果的对象仍然是线程安全的
    public ValueToAccess Value => LazyInitializer.EnsureInitialized(ref _value, Compute);
}

class LazyWrapper : IHasValue
{
    // 使用 Lazy<T>类型
    // 效果同使用 LazyInitializer 一样
    // 区别是 LazyInitializer 是静态类,不需要初始化
    private readonly Lazy<ValueToAccess> _value;

    public LazyWrapper(Lazy<ValueToAccess> value)
    {
        _value = value;
    }

    public ValueToAccess Value => _value.Value;
}

static ValueToAccess Compute()
{
    Console.WriteLine($"The value is being constructed on a thread id {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(TimeSpan.FromSeconds(1));
    return new ValueToAccess($"Constructed on thread id {Thread.CurrentThread.ManagedThreadId}");
}

interface IHasValue
{
    ValueToAccess Value { get; }
}

class ValueToAccess
{
    private readonly string _text;
    public ValueToAccess(string text)
    {
        _text = text;
    }
    public string Text => _text;
}
C# 多线程 09-使用异步 I/O 04-异步调用 WCF 服务

提示

示例代码需要添加对 System.ServiceModel 的引用。

示例代码

/// <summary>
/// 异步调用 WCF 服务
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    ServiceHost host = null;

    try
    {
        // 创建一个的主机(服务类型为 HelloWorldService;承载服务的地址为 http://localhost:1234/HelloWorld)
        host = new ServiceHost(typeof(HelloWorldService), new Uri(SERVICE_URL));
        var metadata = host.Description.Behaviors.Find<ServiceMetadataBehavior>() ?? new ServiceMetadataBehavior();
        // 允许通过 HTTPGET 获取元数据
        // (即可以通过浏览器访问 http://localhost:1234/HelloWorld?wsdl 或 http://localhost:1234/HelloWorld?singleWsdl 获取 xml 格式的元数据)
        metadata.HttpGetEnabled = true;
        // 指定正在使用的 WS-Policy 规范的版本。
        metadata.MetadataExporter.PolicyVersion = PolicyVersion.Policy15;
        host.Description.Behaviors.Add(metadata);

        host.AddServiceEndpoint(ServiceMetadataBehavior.MexContractName, MetadataExchangeBindings.CreateMexHttpBinding(), "mex");

        var endpoint = host.AddServiceEndpoint(typeof(IHelloWorldService), new BasicHttpBinding(), SERVICE_URL);

        host.Faulted += (sender, e) => Console.WriteLine("Error!");

        // 开启主机
        host.Open();

        Console.WriteLine($"Greeting service is running and listening on: {endpoint.Address} ({endpoint.Binding.Name})");

        var client = RunServiceClient();
        client.GetAwaiter().GetResult();
        
        Console.WriteLine("Press Enter to exit");
        Console.ReadLine();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error in catch block: {ex}");
    }
    finally
    {
        if (null != host)
        {
            if (host.State == CommunicationState.Faulted)
            {
                host.Abort();
            }
            else
            {
                host.Close();
            }
        }
    }
}

const string SERVICE_URL = "http://localhost:1234/HelloWorld";

/// <summary>
/// 模拟远程访问 WCF 服务
/// </summary>
/// <returns></returns>
static async Task RunServiceClient()
{
    var endpoint = new EndpointAddress(SERVICE_URL);
    // 创建服务通道
    var channel = ChannelFactory<IHelloWorldServiceClient>.CreateChannel(new BasicHttpBinding(), endpoint);

    // 通过通道异步访问 Greet 服务
    var greeting = await channel.GreetAsync("Eugene");
    Console.WriteLine(greeting);
}

/// <summary>
/// 服务端接口
/// </summary>
[ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")]
public interface IHelloWorldService
{
    [OperationContract]
    string Greet(string name);
}

/// <summary>
/// 客户端服务接口
/// </summary>
[ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")]
public interface IHelloWorldServiceClient
{
    [OperationContract]
    string Greet(string name);

    [OperationContract]
    Task<string> GreetAsync(string name);
}

/// <summary>
/// 服务端实现
/// </summary>
public class HelloWorldService : IHelloWorldService
{
    public string Greet(string name)
    {
        return $"Greetings, {name}";
    }
}
C# 多线程 09-使用异步 I/O 03-异步操作数据库

示例代码

/// <summary>
/// 异步操作数据库
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    const string dataBaseName = "CustomDatabase";
    var t = ProcessAsynchronousIO(dataBaseName);
    t.GetAwaiter().GetResult();
    Console.WriteLine("Press Enter to exit");
    Console.ReadLine();
}

static async Task ProcessAsynchronousIO(string dbName)
{
    try
    {
        const string connectionString = @"Data Source=(LocalDB)\MSSQLLocalDB;Initial Catalog=master;Integrated Security=True";
        string outputFolder = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);

        string dbFileName = Path.Combine(outputFolder, $"{dbName}.mdf");
        string dbLogFileName = Path.Combine(outputFolder, $"{dbName}_log.ldf");
        string dbConnectionString = $"Data Source=(LocalDB)\\MSSQLLocalDB;AttachDBFileName={dbFileName};Integrated Security=True;";

        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync();

            if (File.Exists(dbFileName))
            {
                Console.WriteLine("Detaching the database...");
                // 分离数据库(sp_detach_db:该命令常用来删除数据库日志文件或者移动数据库文件)
                var detachCommand = new SqlCommand("sp_detach_db", connection);
                detachCommand.CommandType = System.Data.CommandType.StoredProcedure;
                detachCommand.Parameters.AddWithValue("@dbname", dbName);

                await detachCommand.ExecuteNonQueryAsync();

                Console.WriteLine("The database was detached successfully.");
                Console.WriteLine("Deleting the database");

                // 删除数据库日志文件
                if (File.Exists(dbLogFileName))
                {
                    File.Delete(dbLogFileName);
                }
                // 删除数据库文件
                File.Delete(dbFileName);

                Console.WriteLine("The database was deleted successfully");
            }

            Console.WriteLine("Creating the database");
            // 创建数据库
            string createCommand = $"CREATE DATABASE {dbName} ON (NAME = N'{dbName}', FILENAME = '{dbFileName}')";
            var cmd = new SqlCommand(createCommand, connection);

            await cmd.ExecuteNonQueryAsync();

            Console.WriteLine("The database was created successfully");
        }

        using (var connection = new SqlConnection(dbConnectionString))
        {
            // 异步连接数据库
            await connection.OpenAsync();
            var cmd = new SqlCommand("SELECT newid()", connection);
            var result = await cmd.ExecuteNonQueryAsync();

            // 创建表
            cmd = new SqlCommand(@"CREATE TABLE [dbo].[CustomTable]([ID] [int] IDENTITY(1,1) NOT NULL,
[NAME] [nvarchar](50) NOT NULL,
CONSTRAINT [PK_ID] PRIMARY KEY CLUSTERED ([ID] ASC) ON [PRIMARY]) ON [PRIMARY]", connection);
            await cmd.ExecuteNonQueryAsync();
            Console.WriteLine("Table was created successfully");

            // 插入数据
            cmd = new SqlCommand(@"INSERT INTO [dbo].[CustomTable] (Name) VALUES ('John');
INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Peter');
INSERT INTO [dbo].[CustomTable] (Name) VALUES ('james');
INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Eugene');", connection);
            await cmd.ExecuteNonQueryAsync();
            Console.WriteLine("Inserted data successfully");
            Console.WriteLine("Reading data from table...");

            // 查询上面插入的数据
            cmd = new SqlCommand(@"SELECT * FROM [dbo].[CustomTable]", connection);
            // 异步查询数据
            using (SqlDataReader reader = await cmd.ExecuteReaderAsync())
            {
                // 异步读取查询结果
                while (await reader.ReadAsync())
                {
                    var id = reader.GetFieldValue<int>(0);
                    var name = reader.GetFieldValue<string>(1);

                    Console.WriteLine("Table row: Id {0}, Name {1}", id, name);
                }
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Error: {0}", ex.Message);
    }
}
C# 多线程 09-使用异步 I/O 02-编写一个异步的 HTTP 服务器和客户端

示例代码

/// <summary>
/// 编写一个异步的 HTTP 服务器和客户端
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var server = new AsyncHttpServer(1234);
    var t = Task.Run(() => server.Start());
    Console.WriteLine("Listening on port 1234. Opent http://localhost:1234 iin your browser.");
    Console.WriteLine("Trying to conect:");
    Console.WriteLine();

    GetResponseAsync("http://localhost:1234").GetAwaiter().GetResult();

    Console.WriteLine();
    Console.WriteLine("Press Enter to stop the server");
    Console.ReadLine();

    server.Stop().GetAwaiter().GetResult();

    Console.WriteLine("Press Enter to exit");
    Console.ReadLine();
}

static async Task GetResponseAsync(string url)
{
    using (var client = new HttpClient())
    {
        HttpResponseMessage responseMessage = await client.GetAsync(url);
        string responseHeaders = responseMessage.Headers.ToString();
        string response = await responseMessage.Content.ReadAsStringAsync();

        Console.WriteLine("Response headers:");
        Console.WriteLine(responseHeaders);
        Console.WriteLine("Response body:");
        Console.WriteLine(response);
    }
}

class AsyncHttpServer
{
    // 使用 HttpListener 实现一个简单的服务器
    readonly HttpListener _listener;
    const string RESPONSE_TEMPLATE = "<html><head><title>Test</title></head><body><h2>Testpage</h2><h4>Today is: {0}</h4></body></html>";

    public AsyncHttpServer(int portNumber)
    {
        _listener = new HttpListener();
        _listener.Prefixes.Add($"http://localhost:{portNumber}/");
    }
    
    public async Task Start()
    {
        // 启动服务器
        _listener.Start();
        while (true)
        {
            // 调用 GetContextAsync 会发生异步 I/O 操作
            var ctx = await _listener.GetContextAsync();
            // 接收到请求时继续下面的处理
            Console.WriteLine("Client connected...");
            // 返回一个简单的 HTML 页面
            var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now);

            using (var sw = new StreamWriter(ctx.Response.OutputStream))
            {
                await sw.WriteAsync(response);
                await sw.FlushAsync();
            }
        }
    }

    public async Task Stop()
    {
        // 调用 Abort 方法丢弃所有连接并关闭服务器
        _listener.Abort();
    }
}
C# 多线程 09-使用异步 I/O 01-异步地使用文件

示例代码

/// <summary>
/// 异步地使用文件
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var t = ProcessAsynchronousIO();
    t.GetAwaiter().GetResult();

    Console.ReadLine();
}

const int BUFFER_SIZE = 4096;

static async Task ProcessAsynchronousIO()
{
    // 使用 FileStream 创建文件
    using (var stream = new FileStream("test1.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE))
    {
        Console.WriteLine($"1. Uses I/O Threads: {stream.IsAsync}");

        byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent());
        // 将异步编程模型 API 转换成任务
        var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null);
        await writeTask;
    }

    // 使用 FileStream 创建文件(提供了 FileOptions.Asynchronous 参数)
    // 只有提供了提供了 FileOptions.Asynchronous 选项,才能对 FileStream 类使用异步 IO
    using (var stream = new FileStream("test2.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous))
    {
        Console.WriteLine($"2. Uses I/O Threads: {stream.IsAsync}");

        byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent());
        // 将异步编程模型 API 转换成任务
        var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null);
        await writeTask;
    }

    // 使用 File.Create(提供了 FileOptions.Asynchronous 参数)和 StreamWriter 创建和写入文件
    using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous))
    using (var sw = new StreamWriter(stream))
    {
        Console.WriteLine($"3. Uses I/O Threads: {stream.IsAsync}");
        // 异步写入流
        await sw.WriteAsync(CreateFileContent());
    }

    // 仅使用 StreamWriter 创建文件
    using (var sw = new StreamWriter("test4.txt", true))
    {
        Console.WriteLine($"4. Uses I/O Threads: {((FileStream)sw.BaseStream).IsAsync}");

        // 异步写入流(因为没有提供 FileOptions.Asynchronous 参数,Stream 其实并没有使用异步 I/O)
        await sw.WriteAsync(CreateFileContent());
    }

    Console.WriteLine("Starting parsing files in parallel");
    var readTasks = new Task<long>[4];
    for (int i = 0; i < 4; i++)
    {
        string fileName = $"test{i + 1}.txt";
        // 异步读取文件并 Sum
        readTasks[i] = SumFileContent(fileName);
    }
    // 等待所有异步 Task 完成,并获取返回值数组
    long[] sums = await Task.WhenAll(readTasks);
    Console.WriteLine($"Sum is all files: {sums.Sum()}");

    Console.WriteLine("Deleting files");
    Task[] deleteTasks = new Task[4];
    for (int i = 0; i < 4; i++)
    {
        string filename = $"test{i + 1}.txt";
        // 异步删除文件
        deleteTasks[i] = SimulateAsynchronousDelete(filename);
    }
    // 等待所有异步删除操作结束
    await Task.WhenAll(deleteTasks);
    Console.WriteLine("Deleting complete.");
}

/// <summary>
/// 异步删除文件
/// </summary>
/// <param name="filename"></param>
/// <returns></returns>
static Task SimulateAsynchronousDelete(string filename)
{
    // 使用 Task.Run 模拟异步删除
    return Task.Run(() => File.Delete(filename));
}

/// <summary>
/// 异步统计文件中随机数的合计值
/// </summary>
/// <param name="fileName"></param>
/// <returns></returns>
static async Task<long> SumFileContent(string fileName)
{
    // 使用 FileStream 和 StreamReader 异步读取文件
    using (var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous))
    using (var sr = new StreamReader(stream))
    {
        long sum = 0;
        while (sr.Peek() > -1)
        {
            string line = await sr.ReadLineAsync();
            sum += long.Parse(line);
        }
        return sum;
    }
}

/// <summary>
/// 创建随机的文件内容
/// </summary>
/// <returns></returns>
static string CreateFileContent()
{
    var sb = new StringBuilder();
    for (int i = 0; i < 100000; i++)
    {
        sb.Append($"{ new Random(DateTime.Now.Millisecond).Next(0, 99999)}");
        sb.AppendLine();
    }
    return sb.ToString();
}
C# 多线程 08-使用 Reactive Extensions 06-使用 Rx 创建异步操作

示例代码

delegate string AsyncDelegate(string name);

/// <summary>
/// 使用 Rx 创建异步操作
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    IObservable<string> o = LongRunningOperationAsync("Task1");
    using (var sub = OutputToConsole(o))
    {
        Thread.Sleep(TimeSpan.FromSeconds(2));
    }
    Console.WriteLine("--------------------");

    Task<string> t = LongRunningOperationTaskAsync("Task2");
    // 使用 ToObservable 方法将 Task 转换为 Observable 方法
    using (var sub = OutputToConsole(t.ToObservable()))
    {
        Thread.Sleep(TimeSpan.FromSeconds(2));
    }
    Console.WriteLine("--------------------");
    
    // 将异步编程模块模式转换为 Observable 类
    AsyncDelegate asyncMethod = LongRunningOperation;
    Func<string, IObservable<string>> observableFactory = Observable.FromAsyncPattern<string, string>(asyncMethod.BeginInvoke, asyncMethod.EndInvoke);
    o = observableFactory("Task3");
    using (var sub = OutputToConsole(o))
    {
        Thread.Sleep(TimeSpan.FromSeconds(2));
    }
    Console.WriteLine("--------------------");

    // 对 Observable 操作使用 await
    o = observableFactory("Task4");
    AwaitOnObservable(o).Wait();
    Console.WriteLine("--------------------");

    // 把基于事件的异步模式直接转换为 Observable 类
    using (var timer = new System.Timers.Timer(1000))
    {
        var ot = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
            h => timer.Elapsed += h
            , h => timer.Elapsed -= h
        );
        timer.Start();

        using (var sub = OutputToConsole(ot))
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
        }
        Console.WriteLine("--------------------");
        timer.Stop();
    }

    Console.ReadLine();
}

static async Task<T> AwaitOnObservable<T>(IObservable<T> observable)
{
    T obj = await observable;
    Console.WriteLine($"{obj}");
    return obj;
}

static Task<string> LongRunningOperationTaskAsync(string name)
{
    return Task.Run(() => LongRunningOperation(name));
}

static IObservable<string> LongRunningOperationAsync(string name)
{
    // Observable.Start 与 TPL 中的 Task.Run 方法很相似。
    // 启动异步操作并返回同一个字符串结果,然后退出
    return Observable.Start(() => LongRunningOperation(name));
}

static string LongRunningOperation(string name)
{
    Thread.Sleep(TimeSpan.FromSeconds(1));
    return $"Task {name} is completed. Thread id {Thread.CurrentThread.ManagedThreadId}";
}

static IDisposable OutputToConsole(IObservable<EventPattern<ElapsedEventArgs>> sequence)
{
    return sequence.Subscribe(
        obj => Console.WriteLine($"{obj.EventArgs.SignalTime}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine("Completed")
    );
}

static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
    return sequence.Subscribe(
        obj => Console.WriteLine($"{obj}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine("Completed")
    );
}
C# 多线程 08-使用 Reactive Extensions 05-对可观察的集合使用 LINQ 查询

示例代码

/// <summary>
/// 对可观察的集合使用 LINQ 查询
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 计时器序列
    IObservable<long> sequence = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(21);
    // 偶数序列
    var evenNumbers = from n in sequence
                        where n % 2 == 0
                        select n;
    // 奇数序列
    var oddNumbers = from n in sequence
                    where n % 2 != 0
                    select n;
    // 连接序列
    var combine = from n in evenNumbers.Concat(oddNumbers)
                    select n;
    // 副作用序列(Do 函数)
    var nums = (from n in combine
                where n % 5 == 0
                select n)
                .Do(n => Console.WriteLine($"------Number {n} is processed in Do method"));

    // 以上测查询均是惰性的,只有订阅了其结果,查询才会运行
    using (var sub = OutputToConsole(sequence, 0))
    using (var sub2 = OutputToConsole(combine, 1))
    using (var sub3 = OutputToConsole(nums, 2))
    { 
        Console.WriteLine("Press Enter to finish the demo");
        Console.ReadLine();
    }
}

static IDisposable OutputToConsole<T>(IObservable<T> sequence, int innerLevel)
{
    string delimiter = innerLevel == 0
        ? String.Empty
        : new string('-', innerLevel * 3);

    return sequence.Subscribe(
        obj => Console.WriteLine($"{delimiter}{obj}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine($"{delimiter}Completed")
    );
}
C# 多线程 08-使用 Reactive Extensions 04-创建可观察的对象

本例展示了创建可观察的对象的不同场景

示例代码

/// <summary>
/// 创建可观察的对象
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Console.WriteLine("使用值创建 Observable 方法");
    Console.WriteLine("Observable.Return(0)");
    IObservable<int> o = Observable.Return(0);
    using (var sub = OutputToConsole(o)) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("不使用值创建 Observable 方法");
    Console.WriteLine("Observable.Empty<int>()");
    o = Observable.Empty<int>();
    using (var sub = OutputToConsole(o)) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("通过 Observable.Throw 触发 OnError 处理器");
    Console.WriteLine("Observable.Throw<int>( new Exception())");
    o = Observable.Throw<int>( new Exception());
    using (var sub = OutputToConsole(o)) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("使用 Observable.Repeat 创建无尽序列");
    Console.WriteLine("Observable.Repeat(42)");
    o = Observable.Repeat(42);
    using (var sub = OutputToConsole(o.Take(5))) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("使用 Observable.Range 创建一组值");
    Console.WriteLine("Observable.Range(0, 10)");
    o = Observable.Range(0, 10);
    using (var sub = OutputToConsole(o)) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("Observable.Create 方法支持很多的自定义场景");
    // Create 方法接受一个函数,该函数接受一个观察者实例,并且返回 IDisposable 对象来代表订阅者
    o = Observable.Create<int>(ob =>
    {
        for (int i = 0; i < 10; i++)
        {
            ob.OnNext(i);
        }
        return Disposable.Empty;
    });
    using (var sub = OutputToConsole(o)) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("Observable.Generate 是另一个创建自定义序列的方式");
    o = Observable.Generate(
        0 // install state 初始值
        , i => i < 5 // while this is true we continue the sequence 一个断言,用来决定是否需要生成更多元素或者完成序列
        , i => ++i // iteration 迭代逻辑
        , i => i * 2  // selecting result 选择器函数,允许我们定制化结果
    );
    using (var sub = OutputToConsole(o)) ;
    Console.WriteLine("--------------------");

    Console.WriteLine("Interval 会以 TimeSpan 间隔产生计时器标记事件");
    Console.WriteLine("Observable.Interval(TimeSpan.FromSeconds(1))");
    IObservable<long> ol = Observable.Interval(TimeSpan.FromSeconds(1));
    using (var sub = OutputToConsole(ol))
    {
        Thread.Sleep(TimeSpan.FromSeconds(3));
    }
    Console.WriteLine("--------------------");


    Console.WriteLine("Timer 指定了启动时间");
    Console.WriteLine("Observable.Timer(DateTimeOffset.Now.AddSeconds(2))");
    ol = Observable.Timer(DateTimeOffset.Now.AddSeconds(2));
    using (var sub = OutputToConsole(ol))
    {
        Thread.Sleep(TimeSpan.FromSeconds(3));
    }
    Console.WriteLine("--------------------");

    Console.ReadLine();
}

static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
    return sequence.Subscribe(
        obj => Console.WriteLine($"{obj}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine("Completed")
    );
}
C# 多线程 08-使用 Reactive Extensions 03-使用 Subject

示例代码

/// <summary>
/// 使用 Subject
/// Subject 代表了 IObservable 和 IObserver 这两个接口的实现
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 一旦订阅了 Subject,它就会把事件序列发送给订阅者
    Console.WriteLine("Subject");
    var subject = new Subject<string>();
    
    subject.OnNext("A"); // A 在订阅之前,不会被打印
    using (var subscription = OutputToConsole(subject))
    {
        subject.OnNext("B");
        subject.OnNext("C");
        subject.OnNext("D");
        // 当调用 OnCompleted 或 OnError 方法时,事件序列传播会被停止
        subject.OnCompleted();
        // 事件传播停止之后的事件不会被打印
        subject.OnNext("Will not be printed out");
    }

    Console.WriteLine("ReplaySubject");
    // ReplaySubject 可以缓存从广播开始的所有事件
    var replaySubject = new ReplaySubject<string>();

    replaySubject.OnNext("A");
    // 稍后订阅也可以获得之前的事件
    using (var subscription = OutputToConsole(replaySubject))
    {
        replaySubject.OnNext("B");
        replaySubject.OnNext("C");
        replaySubject.OnNext("D");
        replaySubject.OnCompleted();
    }
    Console.WriteLine("Buffered ReplaySubject");
    // 指定 ReplaySubject 缓存的大小
    // 参数 2 表示只可以缓存最后的 2 个事件
    var bufferedSubject = new ReplaySubject<string>(2);

    bufferedSubject.OnNext("A");
    bufferedSubject.OnNext("B");
    bufferedSubject.OnNext("C");
    using (var subscription = OutputToConsole(bufferedSubject))
    {
        bufferedSubject.OnNext("D");
        bufferedSubject.OnCompleted();
    }

    Console.WriteLine("Time window ReplaySubject");
    // 指定 ReplaySubject 缓存的事件
    // TimeSpan.FromMilliseconds(200) 表示只缓存 200ms 内发生的事件
    var timeSubject = new ReplaySubject<string>(TimeSpan.FromMilliseconds(200));
    timeSubject.OnNext("A");
    Thread.Sleep(TimeSpan.FromMilliseconds(100));
    timeSubject.OnNext("B");
    Thread.Sleep(TimeSpan.FromMilliseconds(100));
    timeSubject.OnNext("C");
    Thread.Sleep(TimeSpan.FromMilliseconds(100));
    using (var subscription = OutputToConsole(timeSubject))
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(300));
        timeSubject.OnNext("D");
        timeSubject.OnCompleted();
    }

    Console.WriteLine("AsyncSubject");
    // AsyncSubject 类似于任务并行库中的 Task 类型
    // 它代表了单个异步操作
    // 如果有多个事件发布,它将等待事件序列完成,并把最后一个事件提供给订阅者
    var asyncSubject = new AsyncSubject<string>();

    asyncSubject.OnNext("A");
    using (var subscription = OutputToConsole(asyncSubject))
    {
        asyncSubject.OnNext("B");
        asyncSubject.OnNext("C");
        asyncSubject.OnNext("D");
        asyncSubject.OnCompleted();
    }

    Console.WriteLine("BehaviorSubject");
    // BehaviorSubject 与 ReplaySubject 很相似,但它只缓存一个值
    // 并允许万一还没有发送任何通知时,指定一个默认值
    // 默认值会被自动替换为订阅前的最后一个事件
    var behaviorSubject = new BehaviorSubject<string>("Default");
    using (var subscription = OutputToConsole(behaviorSubject))
    {
        behaviorSubject.OnNext("B");
        behaviorSubject.OnNext("C");
        behaviorSubject.OnNext("D");
        behaviorSubject.OnCompleted();

    }

    Console.ReadLine();
}

static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
    return sequence.Subscribe(
        obj => Console.WriteLine($"{obj}")
        , ex => Console.WriteLine($"Error: {ex.Message}")
        , () => Console.WriteLine("Completed")
    );
}
C# 多线程 08-使用 Reactive Extensions 02-编写自定义的可观察对象

示例代码

/// <summary>
/// 编写自定义的可观察对象
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var observer = new CustomObserver();

    var goodObservable = new CustomSequence(new[] { 1, 2, 3, 4, 5 });
    var badObservable = new CustomSequence(null);

    // 同步订阅
    using (IDisposable subscription = goodObservable.Subscribe(observer))
    {
    }

    // 异步订阅
    using (IDisposable subscription = goodObservable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(observer))
    {
        // 延迟一段时间等待异步任务完成
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        Console.WriteLine("Press Enter to continue");
        Console.ReadLine();
    }

    // 异步订阅异常时
    using (IDisposable subscription = badObservable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(observer))
    {
        // 延迟一段时间等待异步任务完成
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        Console.WriteLine("Press Enter to continue");
        Console.ReadLine();
    }
}

/// <summary>
/// 自定义观察者
/// </summary>
class CustomObserver : IObserver<int>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine($"Error: {error.Message}");
    }

    public void OnNext(int value)
    {
        Console.WriteLine($"Next value: {value}; Thread id: {Thread.CurrentThread.ManagedThreadId}");
    }
}

/// <summary>
/// 自定义可观察对象
/// </summary>
class CustomSequence : IObservable<int>
{
    private readonly IEnumerable<int> _numbers;

    public CustomSequence(IEnumerable<int> numbers)
    {
        _numbers = numbers;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var number in _numbers)
        {
            observer.OnNext(number);
        }
        observer.OnCompleted();
        return Disposable.Empty;
    }
}
C# 多线程 08-使用 Reactive Extensions 01-将普通集合转换为异步的可观察集合

添加 NuGet 包

  1. 右键单击项目的引用,选择“管理 NuGet 程序包”

  2. 在浏览中搜“rx-main”

  3. 搜索结果中选择“System.Reactive”(当前的最新稳定版是 3.1.1),点击右边的“安装”按钮

    Reactive Extensions (Rx) Main Library combining the interfaces, core, LINQ, and platform services libraries.

C# 多线程 08-使用 Reactive Extensions 00-简介

观察者模式

  • 基于拉去(pull-based)

  • 基于推送(push-based)

反应扩展(Reactive Extensions,简称 Rx)

.NET Framework 从 4.0 版本开始包含了接口 IObservable<out T>IObserver<in T> 的定义,它们一起代表了异步的**基于推送 (push-based)**的集合及其客户端。

它们都来自叫做 Reactive Extensions(简称 Rx)的库,其由微软创建,用于使用可观察的集合来有效的构造事件序列,以及实际上任何其他类型的异步程序。

C# 多线程 07-使用 PLINQ 05-管理 PLINQ 查询中的数据分区

不同的线程在处理奇数长度和偶数长度的字符串

示例代码

/// <summary>
/// 管理 PLINQ 查询中的数据分区
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var sw = Stopwatch.StartNew();
    var partitioner = new StringPartitioner(GetTypes());
    var parallelQuery = from t in partitioner.AsParallel()
                        //.WithDegreeOfParallelism(1)
                        select EmulateProcessing(t);

    parallelQuery.ForAll(PrintInfo);
    int count = parallelQuery.Count();
    sw.Stop();
    Console.WriteLine("------------------------------");
    Console.WriteLine($"Total items processed: {count}");
    Console.WriteLine($"Time elapsesd: {sw.Elapsed}");

    Console.ReadLine();
}

static void PrintInfo(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(150));
    Console.WriteLine($"{typeName} type was printed on a thread id {Thread.CurrentThread.ManagedThreadId}");
}

static string EmulateProcessing(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(150));
    Console.WriteLine($"{typeName} type was processed on a thread id {Thread.CurrentThread.ManagedThreadId}. Has {(typeName.Length % 2 == 0 ? "even" : "odd")} length.");
    return typeName;
}

static IEnumerable<string> GetTypes()
{
    var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetExportedTypes());
    return from type in types
            where type.Name.StartsWith("Web")
            select type.Name;
}

/// <summary>
/// 自定义的分区器
/// </summary>
public class StringPartitioner : Partitioner<string>
{
    private readonly IEnumerable<string> _data;

    public StringPartitioner(IEnumerable<string> data)
    {
        _data = data;
    }

    /// <summary>
    /// 重写为 false,声明只支持静态分区
    /// </summary>
    public override bool SupportsDynamicPartitions => false;

    /// <summary>
    /// 重载生成静态分区方法(长度为奇数和偶数的字符串,分别放在不同的分区)
    /// </summary>
    /// <param name="partitionCount"></param>
    /// <returns></returns>
    public override IList<IEnumerator<string>> GetPartitions(int partitionCount)
    {
        var result = new List<IEnumerator<string>>(partitionCount);

        for (int i = 1; i <= partitionCount; i++)
        {
            result.Add(CreateEnumerator(i, partitionCount));
        }

        return result;
    }
    
    private IEnumerator<string> CreateEnumerator(int partitionNumber, int partitionCount)
    {
        int evenPartitions = partitionCount / 2;
        bool isEven = partitionNumber % 2 == 0;
        int step = isEven ? evenPartitions : partitionCount - evenPartitions;

        int startIndex = partitionNumber / 2 + partitionNumber % 2;

        var q = _data
            .Where(v => !(v.Length % 2 == 0 ^ isEven) || partitionCount == 1)
            .Skip(startIndex - 1);
        return q
            .Where((x, i) => i % step == 0)
            .GetEnumerator();
    }
}
C# 多线程 07-使用 PLINQ 06-为 PLINQ 查询创建一个自定义的聚合器

示例代码

/// <summary>
/// 为 PLINQ 查询创建一个自定义的聚合器
/// 统计集合中所有字母出现的频率
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var parallelQuery = from t in GetTypes().AsParallel()
                        select t;

    var parallelAggregator = parallelQuery.Aggregate(
        // 一个工厂类
        () => new ConcurrentDictionary<char, int>(),
        // 每个分区的聚合函数
        (taskTotal, item) => AccumulateLettersInformation(taskTotal, item),
        // 高阶聚合函数
        (total, taskTotal) => MergeAccululators(total, taskTotal),
        // 选择器函数(指定全局对象中我们需要的确切数据)
        total => total);

    Console.WriteLine();
    Console.WriteLine("There were the following letters in type names:");
    // 按字符出现的频率排序
    var orderedKeys = from k in parallelAggregator.Keys
                        orderby parallelAggregator[k] descending
                        select k;
    // 打印聚合结果
    foreach (var c in orderedKeys)
    {
        Console.WriteLine($"Letter '{c}' ---- {parallelAggregator[c]} times");
    }

    Console.ReadLine();
}

static ConcurrentDictionary<char, int> AccumulateLettersInformation(ConcurrentDictionary<char, int> taskTotal, string item)
{
    foreach (var c in item)
    {
        if (taskTotal.ContainsKey(c))
        {
            taskTotal[c] += 1;
        }
        else
        {
            taskTotal[c] = 1;
        }
    }

    Console.WriteLine($"{item} type was aggregated on a thread id {Thread.CurrentThread.ManagedThreadId}");

    return taskTotal;
}

static ConcurrentDictionary<char, int> MergeAccululators(ConcurrentDictionary<char, int> total, ConcurrentDictionary<char, int> taskTotal)
{
    foreach (var key in taskTotal.Keys)
    {
        if (total.ContainsKey(key))
        {
            total[key] += taskTotal[key];
        } else
        {
            total[key] = 1;
        }
    }

    Console.WriteLine("---");
    Console.WriteLine($"Total aggregate value was calculated on a thread in {Thread.CurrentThread.ManagedThreadId}");

    return total;
}

static IEnumerable<string> GetTypes()
{
    var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(a => a.GetExportedTypes());
    return from type in types
            where type.Name.StartsWith("Web")
            select type.Name;
}
C# 多线程 07-使用 PLINQ 04-处理 PLINQ 查询中的异常

示例代码

/// <summary>
/// 处理 PLINQ 查询中的异常
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    IEnumerable<int> numbers = Enumerable.Range(-5, 10);

    Console.WriteLine("执行顺序的 LINQ 查询");
    var query = from number in numbers
                select 100 / number;
    try
    {
        foreach (var n in query)
        {
            Console.WriteLine(n);
        }
    }
    catch (DivideByZeroException)
    {
        Console.WriteLine("被 0 除!");
    }
    
    Console.WriteLine("---");
    Console.WriteLine();

    Console.WriteLine("执行并行的 LINQ 查询");
    var parallelQuery = from number in numbers.AsParallel()
                        select 100 / number;
    try
    {
        parallelQuery.ForAll(Console.WriteLine);
    }
    catch (DivideByZeroException)
    {
        Console.WriteLine("被 0 除! - 通常的异常处理");
    }
    catch (AggregateException e)
    {
        e.Flatten().Handle(ex => {
            if (ex is DivideByZeroException)
            {
                Console.WriteLine("被 0 除! - Aggregate 异常处理");
                return true;
            }
            return false;
        });
    }
    Console.WriteLine("---");

    Console.ReadLine();
}
C# 多线程 07-使用 PLINQ 03-调整 PLINQ 查询的参数

调整 PLINQ 查询的参数

ParallelQuery

本例中使用了 ParallelQuery 中的如下方法

  • WithDegreeOfParallelism

    设置要在查询中使用的并行度。并行度是将用于处理查询的同时执行的任务的最大数目。

  • WithExecutionMode

    设置查询的执行模式

    • 参数

      • ParallelExecutionMode 查询执行模式

        • Default

          此设置为默认设置。PLINQ 将检查查询的结构,仅在可能带来加速时才对查询进行并行化。如果查询结构指示不可能获得加速,则 PLINQ 会将查询当作普通的 LINQ to Objects 查询来执行。

        • ForceParallelism

          并行化整个查询,即使要使用系统开销大的算法。如果认为并行执行查询将带来加速,则使用此标志,但处于默认模式的 PLINQ 将按顺序执行它。

  • WithMergeOptions

    设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。

    • 参数

      • ParallelMergeOptions

        指定查询中要使用的输出合并的首选类型。换言之,它指示 PLINQ 如何将多个分区的结果合并回单个结果序列。这仅是一个提示,系统在并行处理所有查询时可能不会考虑这一点。

        • Default

          使用默认合并类型,即 AutoBuffered

        • NotBuffered

          不利用输出缓冲区进行合并。一旦计算出结果元素,就向查询使用者提供这些元素。

        • AutoBuffered

          利用系统选定大小的输出缓冲区进行合并。在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。

        • FullyBuffered

          利用整个输出缓冲区进行合并。在向查询使用者提供任何结果之前,系统会先累计所有结果。

  • WithCancellation

    设置要与查询关联的 System.Threading.CancellationToken

  • AsOrdered

    启用将数据源视为“已经排序”的处理方法,重写默认的将数据源视为“未经排序”的处理方法。只可以对由 AsParallelParallelEnumerable.RangeParallelEnumerable.Repeat 返回的泛型序列调用 AsOrdered

C# 多线程 07-使用 PLINQ 02-PLINQ 查询

示例代码

/// <summary>
/// PLINQ 查询
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var sw = new Stopwatch();
    sw.Start();
    // 正常的顺序 LINQ 查询
    // 所有操作都运行在当前线程
    var query = from t in GetTypes()
                select EmulateProcessing(t);

    foreach (string typeName in query)
    {
        PrintInfo(typeName);
    }

    sw.Stop();
    Console.WriteLine("---");
    Console.WriteLine("Sequential LINQ query.");
    Console.WriteLine("正常的顺序 LINQ 查询");
    Console.WriteLine("所有操作都运行在当前线程");
    Console.WriteLine($"Time elapsed: {sw.Elapsed}");
    Console.WriteLine("Press Enter to continue....");
    Console.ReadLine();
    Console.Clear();
    sw.Reset();

    sw.Start();
    // 使用 AsParallel 方法将查询并行化
    // 默认情况下结果会被合并到单个线程中
    var paralleQuery = from t in GetTypes().AsParallel()
                        select EmulateProcessing(t);
    foreach (var typeName in paralleQuery)
    {
        PrintInfo(typeName);
    }
    sw.Stop();
    Console.WriteLine("---");
    Console.WriteLine("Parallel LINQ query. The results are being merged on a single thrad");
    Console.WriteLine("使用 AsParallel 方法将查询并行化");
    Console.WriteLine("默认情况下结果会被合并到单个线程中");
    Console.WriteLine($"Time elapsed: {sw.Elapsed}");
    Console.WriteLine("Press Enter to continue....");
    Console.ReadLine();
    Console.Clear();
    sw.Reset();


    sw.Start();
    // 使用 AsParallel 方法将查询并行化
    paralleQuery = from t in GetTypes().AsParallel()
                    select EmulateProcessing(t);
    // 使用 ForAll 方法将打印操作和查询操作放到了同一个线程,跳过了结果合并的步骤
    paralleQuery.ForAll(PrintInfo);

    sw.Stop();
    Console.WriteLine("---");
    Console.WriteLine("Parallel LINQ query. The results are being processed in parallel");
    Console.WriteLine("使用 ForAll 方法将打印操作和查询操作放到了同一个线程,跳过了结果合并的步骤");
    Console.WriteLine($"Time elapsed: {sw.Elapsed}");
    Console.WriteLine("Press Enter to continue....");
    Console.ReadLine();
    Console.Clear();
    sw.Reset();


    sw.Start();
    // 使用 AsSequential 方法将 PLINQ 查询已顺序方式执行
    query = from t in GetTypes().AsParallel().AsSequential()
            select EmulateProcessing(t);
    foreach (string typeName in query)
    {
        PrintInfo(typeName);
    }

    sw.Stop();
    Console.WriteLine("---");
    Console.WriteLine("Parallel LINQ query, transformed into sequential.");
    Console.WriteLine("使用 AsSequential 方法将 PLINQ 查询以顺序方式执行");
    Console.WriteLine("运行结果同第一个示例完全一样");
    Console.WriteLine($"Time elapsed: {sw.Elapsed}");
    Console.WriteLine("Press Enter to continue....");
    Console.ReadLine();
    Console.Clear();
    sw.Reset();
}

static void PrintInfo(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(15));
    Console.WriteLine($"{typeName} type was printed on a thread id {Thread.CurrentThread.ManagedThreadId}");
}

static string EmulateProcessing(string typeName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(150));
    Console.WriteLine($"{typeName} type was processed on a thread id {Thread.CurrentThread.ManagedThreadId}");
    return typeName;
}

/// <summary>
/// 使用反射 API 查询加载到当前应用程序域中的所有组件中名称以“Web”开头的类型
/// </summary>
/// <returns></returns>
static IEnumerable<string> GetTypes()
{
    return from assembly in AppDomain.CurrentDomain.GetAssemblies()
            from type in assembly.GetExportedTypes()
            where type.Name.StartsWith("Web")
            select type.Name;
}
C# 多线程 07-使用 PLINQ 01-使用 Parallel 类

示例代码

static void Main(string[] args)
{
    // 调用 Invoke 方法并行的运行多个任务
    Parallel.Invoke(
        () => EmulateProcessing("Task1"),
        () => EmulateProcessing("Task2"),
        () => EmulateProcessing("Task3")
    );

    // 使用 ForEach 方法并行的循环任务
    var cts = new CancellationTokenSource();
    var result = Parallel.ForEach(
        Enumerable.Range(1, 30),
        new ParallelOptions
        {
            // 可以指定 CancellationToken 取消循环
            CancellationToken = cts.Token,
            // 限制最大并行度
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            // 设置自定义的 TaskScheduler 类
            TaskScheduler = TaskScheduler.Default
        },
        // Action 可以接受一个附加的 ParallelLoopState 参数
        (i, state) =>
        {
            Console.WriteLine(i);
            if (i == 20)
            {
                // 调用 Break 方法停止循环
                // Bread 方法停止之后的迭代,但之前的迭代还要继续工作
                state.Break();
                // 也可以使用 Stop 方法停止循环
                // Stop 方法会告诉循环停止任何工作,并设置并行循环状态属性 IsStopped 值为 true
                // state.Stop();
                Console.WriteLine($"Loop is stopped: {state.IsStopped}");
            }
        });

    Console.WriteLine("---");
    // 循环是否已完成
    Console.WriteLine($"IsCompleted: {result.IsCompleted}");
    // 最低迭代索引
    Console.WriteLine($"Lowest break iteration: {result.LowestBreakIteration}");

    Console.ReadLine();
}

static string EmulateProcessing(string taskName)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(new Random(DateTime.Now.Millisecond).Next(250, 350)));
    Console.WriteLine($"{taskName} task was processed on a thrad id {Thread.CurrentThread.ManagedThreadId}");
    return taskName;
}
C# 多线程 07-使用 PLINQ 00-简介

并行库

并行库随着 .NET Framework 4.0 一起发布,包含三大主要部分:

  • 任务并行库(TPL)

  • 并发集合

  • 并行 LINQ(或 PLINQ)

编程模型

  • 任务并行(task parallelism)

    • 将程序分割成一组任务并使用不同的线程来运行不同的任务

    • 无结构并行(unstructured parallelism)

  • 数据并行(data parallelism)

    • 将数据分割成较小的数据块,对这些数据块进行并行运算,然后聚合这些计算结果

    • 结构并行(structured parallelism)

C# 多线程 06-使用并发集合 05-使用 BlockingCollection 进行异步处理

使用 BlockingCollection 进行异步处理

示例代码

/// <summary>
/// 使用 BlockingCollection 进行异步处理
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Console.WriteLine($"Using a Queue inside of BlockingCollection");
    Console.WriteLine();
    // 默认使用 ConcurrentQueue 容器
    Task t = RunProgram();
    t.Wait();
    Console.WriteLine();
    Console.WriteLine("Using a Stack inside of BlockingCollection");
    Console.WriteLine();
    // 使用 ConcurrentStack 容器
    t = RunProgram(new ConcurrentStack<CustomTask>());
    t.Wait();

    Console.ReadLine();
}

static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
{
    // BlockingCollection 类可以改变存储在阻塞集合中的方式
    var taskCollection = new BlockingCollection<CustomTask>();
    // 默认使用 ConcurrentQueue 容器
    // 如果指定容器,则使用指定的容器
    if (collection != null)
    {
        taskCollection = new BlockingCollection<CustomTask>(collection);
    }

    // 调用生产者创建任务
    var taskSource = Task.Run(() => TaskProducer(taskCollection));

    // 生成消费者,消费任务
    Task[] processors = new Task[4];
    for (int i = 1; i <= 4; i++)
    {
        string processorId = $"Processor {i}";
        processors[i - 1] = Task.Run(() => TaskProcessor(taskCollection, processorId));
    }

    // 等待任务创建完毕
    await taskSource;

    // 等待任务全部消费完
    await Task.WhenAll(processors);
}

/// <summary>
/// 生产者
/// </summary>
/// <param name="collection"></param>
/// <returns></returns>
static async Task TaskProducer(BlockingCollection<CustomTask> collection)
{
    for (int i = 1; i <= 20; i++)
    {
        await Task.Delay(20);
        var workItem = new CustomTask { Id = i };
        collection.Add(workItem);
        Console.WriteLine($"Task {workItem.Id} has been posted");
    }
    // 生产者调用 CompleteAdding 方法时,该迭代周期会结束
    collection.CompleteAdding();
}

/// <summary>
/// 消费者
/// </summary>
/// <param name="collection"></param>
/// <param name="name"></param>
/// <returns></returns>
static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name)
{
    await GetRandomDelay();
    // 使用 GetConsumingEnumerable 方法获取工作项
    foreach (CustomTask item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine($"Task {item.Id} has been processed by {name}");
        await GetRandomDelay();
    }
}

static Task GetRandomDelay()
{
    int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
    return Task.Delay(delay);
}

private class CustomTask
{
    public int Id { get; set; }
}
C# 多线程 06-使用并发集合 04-使用 ConcurrentBag 创建一个可扩展的爬虫

使用 ConcurrentBag 创建一个可扩展的爬虫

示例代码

static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

/// <summary>
/// 使用 ConcurrentBag 创建一个可扩展的爬虫
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    CreateLinks();
    Task t = RunProgram();
    t.Wait();

    Console.ReadLine();
}

/// <summary>
/// 创建模拟用的链接数据
/// </summary>
private static void CreateLinks()
{
    _contentEmulation["http://liujiajia.me"] = new[] { "http://liujiajia.me/#/blog/it", "http://liujiajia.me/#/blog/game" };

    _contentEmulation["http://liujiajia.me/#/blog/it"] = new[] {
        "http://liujiajia.me/#/blog/details/csharp-multi-threading-06-concurrent-00-summary",
        "http://liujiajia.me/#/blog/details/cookie-http-only" };

    _contentEmulation["http://liujiajia.me/#/blog/game"] = new[] {
        "http://liujiajia.me/#/blog/details/wow-7-3-ptr",
        "http://liujiajia.me/#/blog/details/63b737b6-7663-43f6-acd2-dc6e020c14ba" };
}

static async Task RunProgram()
{
    var bag = new ConcurrentBag<CrawlingTask>();
    // 定义 4 个网站根 url 地址,并创建 4 个对应的 Task
    string[] urls =
    {
        "http://liujiajia.me",
        "http://weibo.com",
        "http://sf.gg",
        "http://ngacn.cc"
    };

    var crawlers = new Task[4];
    for (int i = 1; i <= 4; i++)
    {
        string crawlerName = $"Crawler {i}";
        bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root" });
        crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
    }

    await Task.WhenAll(crawlers);
}

/// <summary>
/// 模拟爬虫程序
/// </summary>
/// <param name="bag"></param>
/// <param name="crawlerName"></param>
/// <returns></returns>
static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
{
    CrawlingTask task;
    while (bag.TryTake(out task))
    {
        // 如果页面中存在 URL 地址,则将这些地址放入待爬取的任务集合
        IEnumerable<string> urls = await GetLinksFromContent(task);
        if (urls != null)
        {
            foreach (var url in urls)
            {
                var t = new CrawlingTask
                {
                    UrlToCrawl = url,
                    ProducerName = crawlerName
                };

                bag.Add(t);
            }

            Console.WriteLine($"Indexing url {task.UrlToCrawl} posted by {task.ProducerName} is completed by {crawlerName}");
        }
    }
}

/// <summary>
/// 获取页面上的 URL 地址
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
{
    await GetRandomDelay();

    if (_contentEmulation.ContainsKey(task.UrlToCrawl))
    {
        return _contentEmulation[task.UrlToCrawl];
    }

    return null;
}

static Task GetRandomDelay()
{
    int delay = new Random(DateTime.Now.Millisecond).Next(150, 200);
    return Task.Delay(delay);
}

private class CrawlingTask
{
    public string UrlToCrawl { get; set; }
    public string ProducerName { get; set; }
}
C# 多线程 06-使用并发集合 03-改变 ConcurrentStack 异步处理顺序

改变 ConcurrentStack 异步处理顺序

该示例同 02-使用 ConcurrentQueue 实现异步处理 大部分是一样的,只是将 ConcurrentQueue 改成了 ConcurrentStack

Queue 是 FIFO(先进先出),Stack 则是 LIFO(后进先出),执行的顺序不一样。

/// <summary>
/// 改变 ConcurrentStack 异步处理顺序
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task t = RunProgram();
    t.Wait();

    Console.ReadLine();
}

static async Task RunProgram()
{
    var taskStack = new ConcurrentStack<CustomTask>();
    var cts = new CancellationTokenSource();
    // 异步创建任务
    var taskSource = Task.Run(() => TaskProducer(taskStack));

    // 创建 4 个任务处理线程
    Task[] processors = new Task[4];
    for (int i = 1; i <= 4; i++)
    {
        string processorId = i.ToString();
        processors[i - 1] = Task.Run(() => TaskProcessor(taskStack, $"Processor {processorId}", cts.Token));
    }

    // 等待创建任务结束
    await taskSource;
    // 延迟 2 秒发送取消指令,确保创建的任务被处理完
    cts.CancelAfter(TimeSpan.FromSeconds(2));
    // 等待所有消费结束
    await Task.WhenAll(processors);
}

/// <summary>
/// 创建任务
/// </summary>
/// <param name="queue"></param>
/// <returns></returns>
static async Task TaskProducer(ConcurrentStack<CustomTask> queue)
{
    for (int i = 1; i <= 20; i++)
    {
        await Task.Delay(50);
        // 创建任务加入队列
        var workItem = new CustomTask { Id = i };
        queue.Push(workItem);
        Console.WriteLine($"Task {workItem.Id} has been posted");
    }
}

/// <summary>
/// 任务处理程序
/// </summary>
/// <param name="queue">队列</param>
/// <param name="name">消费程序名</param>
/// <param name="token">令牌(取消任务用)</param>
/// <returns></returns>
static async Task TaskProcessor(ConcurrentStack<CustomTask> queue, string name, CancellationToken token)
{
    CustomTask workItem;
    bool dequeueSuccesful = false;

    // 若任务未取消,则延迟随机时间后尝试从队列中获取任务
    await GetRandomDelay();
    do
    {
        dequeueSuccesful = queue.TryPop(out workItem);
        if (dequeueSuccesful)
        {
            Console.WriteLine($"Task {workItem.Id} has been processed by {name}");
        }
        await GetRandomDelay();
    } while (!token.IsCancellationRequested);

}

/// <summary>
/// 获取随机的延迟时间
/// </summary>
/// <returns></returns>
static Task GetRandomDelay()
{
    int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
    return Task.Delay(delay);
}

private class CustomTask
{
    public int Id { get; set; }
}
C# 多线程 06-使用并发集合 02-使用 ConcurrentQueue 实现异步处理

使用 ConcurrentQueue 实现异步处理

/// <summary>
/// 使用 ConcurrentQueue 实现异步处理
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task t = RunProgram();
    t.Wait();

    Console.ReadLine();
}

static async Task RunProgram()
{
    var taskQueue = new ConcurrentQueue<CustomTask>();
    var cts = new CancellationTokenSource();
    // 异步创建任务
    var taskSource = Task.Run(() => TaskProducer(taskQueue));

    // 创建 4 个任务处理线程
    Task[] processors = new Task[4];
    for (int i = 1; i <= 4; i++)
    {
        string processorId = i.ToString();
        processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processorId}", cts.Token));
    }

    // 等待创建任务结束
    await taskSource;
    // 延迟 2 秒发送取消指令,确保创建的任务被处理完
    cts.CancelAfter(TimeSpan.FromSeconds(2));
}

/// <summary>
/// 创建任务
/// </summary>
/// <param name="queue"></param>
/// <returns></returns>
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
    for (int i = 1; i <= 20; i++)
    {
        await Task.Delay(50);
        // 创建任务加入队列
        var workItem = new CustomTask { Id = i };
        queue.Enqueue(workItem);
        Console.WriteLine($"Task {workItem.Id} has been posted");
    }
}

/// <summary>
/// 任务处理程序
/// </summary>
/// <param name="queue">队列</param>
/// <param name="name">消费程序名</param>
/// <param name="token">令牌(取消任务用)</param>
/// <returns></returns>
static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
    CustomTask workItem;
    bool dequeueSuccesful = false;

    // 若任务未取消,则延迟随机时间后尝试从队列中获取任务
    await GetRandomDelay();
    do
    {
        dequeueSuccesful = queue.TryDequeue(out workItem);
        if (dequeueSuccesful)
        {
            Console.WriteLine($"Task {workItem.Id} has been processed by {name}");
        }
        await GetRandomDelay();
    } while (!token.IsCancellationRequested);

}

/// <summary>
/// 获取随机的延迟时间
/// </summary>
/// <returns></returns>
static Task GetRandomDelay()
{
    int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
    return Task.Delay(delay);
}

private class CustomTask
{
    public int Id { get; set; }
}
C# 多线程 06-使用并发集合 01-使用 ConcurrentDictionary

使用 ConcurrentDictionary

const string Item = "Dictionary item";
const int Iterations = 1000000;
public static string CurrentItem;

static void Main(string[] args)
{
    var concurrentDictionary = new ConcurrentDictionary<int, string>();
    var dictionary = new Dictionary<int, string>();

    var sw = new Stopwatch();

    sw.Start();
    for (int i = 0; i < Iterations; i++)
    {
        lock (dictionary)
        {
            dictionary[i] = Item;
        }
    }
    sw.Stop();
    Console.WriteLine($"写入使用粗粒度锁的字典: {sw.Elapsed}");

    sw.Restart();
    for (int i = 0; i < Iterations; i++)
    {
        concurrentDictionary[i] = Item;
    }
    sw.Stop();
    Console.WriteLine($"写入一个 ConcurrentDictionary(细粒度锁): {sw.Elapsed}");

    sw.Restart();
    for (int i = 0; i < Iterations; i++)
    {
        lock (dictionary)
        {
            CurrentItem = dictionary[i];
        }
    }
    sw.Stop();
    Console.WriteLine($"使用粗粒度锁从字典中读取: {sw.Elapsed}");

    sw.Restart();
    for (int i = 0; i < Iterations; i++)
    {
        CurrentItem = concurrentDictionary[i];
    }
    sw.Stop();
    Console.WriteLine($"从一个 ConcurrentDictionary 中读取: {sw.Elapsed}");


    Console.WriteLine();


    int taskSize = 10;
    Task[] ts = new Task[taskSize];
    for (int j = 0; j < taskSize; j++)
    {
        Task t = new Task(() => {
            for (int i = 0; i < Iterations; i++)
            {
                lock (dictionary)
                {
                    dictionary[i] = Item;
                }
            }
        });
        ts[j] = t;
    }
    var whenAllTask = Task.WhenAll(ts);
    sw.Restart();
    for (int i = 0; i < ts.Length; i++)
    {
        ts[i].Start();
    }
    whenAllTask.Wait();
    sw.Stop();
    Console.WriteLine($"【多线程】写入使用粗粒度锁的字典: {sw.Elapsed}");

    
    ts = new Task[taskSize];
    for (int j = 0; j < taskSize; j++)
    {
        Task t = new Task(() => {
            for (int i = 0; i < Iterations; i++)
            {
                concurrentDictionary[i] = Item;
            }
        });
        ts[j] = t;
    }
    whenAllTask = Task.WhenAll(ts);
    sw.Restart();
    for (int i = 0; i < ts.Length; i++)
    {
        ts[i].Start();
    }
    whenAllTask.Wait();
    sw.Stop();
    Console.WriteLine($"【多线程】写入一个 ConcurrentDictionary(细粒度锁): {sw.Elapsed}");
    

    ts = new Task[taskSize];
    for (int j = 0; j < taskSize; j++)
    {
        Task t = new Task(() => {
            for (int i = 0; i < Iterations; i++)
            {
                lock (dictionary)
                {
                    CurrentItem = dictionary[i];
                }
            }
        });
        ts[j] = t;
    }
    whenAllTask = Task.WhenAll(ts);
    sw.Restart();
    for (int i = 0; i < ts.Length; i++)
    {
        ts[i].Start();
    }
    whenAllTask.Wait();
    sw.Stop();
    Console.WriteLine($"【多线程】使用粗粒度锁从字典中读取: {sw.Elapsed}");
    

    ts = new Task[taskSize];
    for (int j = 0; j < taskSize; j++)
    {
        Task t = new Task(() => {
            for (int i = 0; i < Iterations; i++)
            {
                CurrentItem = concurrentDictionary[i];
            }
        });
        ts[j] = t;
    }
    whenAllTask = Task.WhenAll(ts);
    sw.Restart();
    for (int i = 0; i < ts.Length; i++)
    {
        ts[i].Start();
    }
    whenAllTask.Wait();
    sw.Stop();
    Console.WriteLine($"【多线程】从一个 ConcurrentDictionary 中读取: {sw.Elapsed}");
    
    Console.ReadLine();
}
C# 多线程 06- 使用并发集合 00- 简介

简介

.NET framework 4 引入了 System.Collections.Concurrent 命名空间,其中包含了一些数据结构。这些数据结构具备可伸缩性,尽可能地避免锁,同时还能提供线程安全的访问。

1. ConcurrentQueue

  • 该集合使用了 原子的比较和交换(Copmare and Swap,简称 CAS)操作,以及 SpinWait 来保证线程安全。

  • 它实现了一个 先进先出(First In First Out,简称 FIFO)的集合

  • 调用 Enqueue 方法向队列中加入元素

  • TryDequeue 方法试图取出队列中的第一个元素

  • TryPeek 方法试图得到第一个元素但并不从队列中删除该元素

C# 多线程 05-使用 C#6.0 09-对动态类型使用 await

实现的功能同 05-使用 C#6.0 08-自定义 awaitable 类型 是一样的,只是这里是使用动态类型实现的。

注意

本例需要导入 ImpromptuInterface

示例代码

C# 多线程 03- 使用线程池 01- 在线程池中调用委托

在线程池中调用委托

异步编程模型(Asynchronous Programming Model,简称 APM),这是 .net 历史中的第一个异步编程模式。

点击查看代码
private delegate string RunOnThreadPool(out int threadId);

static void Main(string[] args)
{
    int threadId = 0;

    RunOnThreadPool poolDelegate = Test;

    // 不使用异步处理
    var t = new Thread(() => Test(out threadId));
    t.Start();
    // 等待 t 结束
    t.Join();
    // 打印结果
    Console.WriteLine($"线程 Id:{threadId}");

    // 开始异步处理,并指定回调函数
    IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "一个代理异步调用");
    // 等待 poolDelegate 执行结束
    r.AsyncWaitHandle.WaitOne();
    // 获取异步处理结果
    // 虽然 threadId 是按址传参,但也必须通过 EndInvoke 获取返回结果
    string result = poolDelegate.EndInvoke(out threadId, r);
    // 打印结果
    Console.WriteLine($"线程池工作线程 Id:{threadId}");
    Console.WriteLine(result);
    // 这里的延迟是为了给回调函数足够的执行时间
    Thread.Sleep(TimeSpan.FromSeconds(2));
}

private static void Callback(IAsyncResult ar)
{
    Console.WriteLine("开始一个回调");
    // AsyncState 值为 BeginInvoke 的第三个参数值
    Console.WriteLine($"回调状态:{ar.AsyncState}");
    Console.WriteLine($"是否为线程池中的线程:{Thread.CurrentThread.IsThreadPoolThread}"); // true
    // 回调函数的线程 ID 和异步处理的线程 ID 是相同的
    Console.WriteLine($"线程池工作线程 Id:{Thread.CurrentThread.ManagedThreadId}");
}

private static string Test(out int threadId)
{
    Console.WriteLine("开始...");
    Console.WriteLine($"是否为线程池中的线程:{Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(2));
    threadId = Thread.CurrentThread.ManagedThreadId;
    // 返回值可以通过代理的 EndInvoke 方法获取
    return $"线程池工作线程 Id 是 {Thread.CurrentThread.ManagedThreadId}";
}
C# 多线程 05-使用 C#6.0 03-对连续的异步任务使用 await 操作符

对连续的异步任务使用 await 操作符

/// <summary>
/// 对连续的异步任务使用 await 操作符
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task t = AsynchronyWithTPL();
    t.Wait();

    t = AsynchronyWithAwait();
    t.Wait();

    Console.ReadLine();
}

/// <summary>
/// 使用 TPL 方式的实现
/// 功能等同于 AsynchronyWithAwait 方法的功能
/// 但是写法复杂很多
/// </summary>
/// <returns></returns>
static Task AsynchronyWithTPL()
{
    var containerTask = new Task(() => {
        Task<string> t = GetInfoAsync("TPL 1");
        t.ContinueWith(task => {
            Console.WriteLine(t.Result);
            Task<string> t2 = GetInfoAsync("TPL 2");
            t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Result),
                TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent);
            t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Exception.InnerException),
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent);
        }, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent);

        t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException),
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent);
    });
    containerTask.Start();
    return containerTask;
}

static async Task AsynchronyWithAwait()
{
    try
    {
        // 使用了两个 await 操作符,但代码还是按顺序执行的
        string result = await GetInfoAsync("Async 1");
        Console.WriteLine(result);
        result = await GetInfoAsync("Async 2");
        Console.WriteLine(result);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}

static async Task<string> GetInfoAsync(string name)
{
    Console.WriteLine($"Task {name} started!");
    await Task.Delay(TimeSpan.FromSeconds(2));
    if (name == "TPL 2" || name == "Async 2")
    {
        throw new Exception("Boom!");
    }
    return $"Task {name} is running on a thrad id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}";
}
C# 多线程 05-使用 C#6.0 02-在 lambda 表达式中使用 await 操作符

在 lambda 表达式中使用 await 操作符

/// <summary>
/// 在 lambda 表达式中使用 await 操作符
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 由于不能在 main 方法中使用 async,所以将异步函数移到了 AsynchronousProcessing 方法中
    Task t = AsynchronousProcessing();
    t.Wait();

    Console.ReadLine();
}

static async Task AsynchronousProcessing()
{
    // 由于任何 lambda 表达式的类型都不能通过 lambda 自身来推断,
    // 所以不得不显式向C#编译器指定它的类型
    // 这里 lambda 的参数类型为 string,返回值类型为 Task<string>
    // lambda 表达式中虽然只返回了一个 string,但是因为使用 async 操作符,会自动封装成 Task<string>
    Func<string, Task<string>> asyncLambda = async name =>
    {
        await Task.Delay(TimeSpan.FromSeconds(2));
        return $"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread:{Thread.CurrentThread.IsThreadPoolThread}";
    };

    // 使用 await 操作符获取异步操作的返回值
    string result = await asyncLambda("async lambda");
    Console.WriteLine(result);
}
C# 多线程 05-使用 C#6.0 01-使用 await 操作符获取异步任务结果

使用 await 操作符获取异步任务结果

C# 5.0 中引入了新的语言特性,称为异步函数(asynchronous function)。它是 TPL 之上的更高级别的抽象,真正简化了异步编程。

下面代码中 AsynchronyWithTPLAsynchronyWithAwait 实现的功能是一样的,可以看出使用 asyncawait 关键字的写法更加简洁而且易懂。

/// <summary>
/// 使用 await 操作符获取异步任务结果
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task t = AsynchronyWithTPL();
    t.Wait();

    t = AsynchronyWithAwait();
    t.Wait();

    Console.ReadLine();
}

// 标准 TPL 的写法
static Task AsynchronyWithTPL()
{
    Task<string> t = GetInfoAsync("Task 1");
    Task t2 = t.ContinueWith(task => Console.WriteLine(t.Result),
        TaskContinuationOptions.NotOnFaulted);
    Task t3 = t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException),
        TaskContinuationOptions.OnlyOnFaulted);
    return Task.WhenAny(t2, t3);
}

// 使用 async await 关键字的写法
static async Task AsynchronyWithAwait()
{
    try
    {
        string result = await GetInfoAsync("Task 2");
        Console.WriteLine(result);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}

static async Task<string> GetInfoAsync(string name)
{
    await Task.Delay(TimeSpan.FromSeconds(2));
    return $"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}";
}
C# 多线程 04-使用任务平行库 09-使用 TaskScheduler 配置任务的执行

使用 TaskScheduler 配置任务的执行

本节新建了一个 WPF 项目,用以观察异步时界面的响应效果。

页面代码

<Window x:Class="Recipe4_9.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        xmlns:local="clr-namespace:Recipe4_9"
        mc:Ignorable="d"
        Title="MainWindow" Height="350" Width="525">
    <Grid>
        <TextBlock
            Name="ContentTextBlock"
            HorizontalAlignment="Left"
            Margin="44,134,0,0" 
            VerticalAlignment="Top"
            Width="425"
            Height="40" />
        
        <Button
            Content="Sync"
            HorizontalAlignment="Left"
            Margin="45,190,0,0" 
            VerticalAlignment="Top"
            Width="75"
            Click="ButtonSync_Click"/>
        
        <Button
            Content="Async"
            HorizontalAlignment="Left"
            Margin="165,190,0,0"
            VerticalAlignment="Top"
            Width="75"
            Click="ButtonAsync_Click"/>

        <Button
            Content="Async OK"
            HorizontalAlignment="Left"
            Margin="285,190,0,0"
            VerticalAlignment="Top"
            Width="75"
            Click="ButtonAsyncOK_Click"/>

    </Grid>
</Window>
C# 多线程 04-使用任务平行库 08-并行运行任务

并行运行任务

/// <summary>
/// 并行运行任务
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 创建两个任务
    var firstTask = new Task<int>(() => TaskMethod("First Task", 3));
    var secondTask = new Task<int>(() => TaskMethod("Second Task", 2));
    // 使用 Task.WhenAll 创建第三个任务,该任务将在所有任务完成后执行
    // 该任务的结果提供了一个结果数组
    var whenAllTask = Task.WhenAll(firstTask, secondTask);
    whenAllTask.ContinueWith(
        t => Console.WriteLine($"The first answer is {t.Result[0]}, the second is {t.Result[1]}"),
        TaskContinuationOptions.OnlyOnRanToCompletion);

    firstTask.Start();
    secondTask.Start();

    Thread.Sleep(TimeSpan.FromSeconds(4));

    // 创建一个任务列表
    var tasks = new List<Task<int>>();
    for (int i = 0; i < 4; i++)
    {
        int counter = i;
        var task = new Task<int>(() => TaskMethod($"Task {counter}", counter));
        tasks.Add(task);
        task.Start();
    }

    while (tasks.Count > 0)
    {
        // 通过 Task.WhenAny 方法等待任何一个任务完成
        var completedTask = Task.WhenAny(tasks).Result;
        tasks.Remove(completedTask);
        Console.WriteLine($"A task has been completed with result {completedTask.Result}.");
    }

    Thread.Sleep(TimeSpan.FromSeconds(1));
    Console.ReadLine();
}

static int TaskMethod(string name, int seconds)
{
    Console.WriteLine($"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    return 42 * seconds;
}
C# 多线程 04-使用任务平行库 07-处理任务中的异常

处理任务中的异常

/// <summary>
/// 处理任务中的异常
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Task<int> task;
    try
    {
        task = Task.Run(() => TaskMethod("Task 1", 2));
        // 尝试同步获取 task 的结果
        // 捕获的异常是一个被封装的异常(AggregateExcption),可以访问 InnerException 获取底层异常
        int result = task.Result;
        Console.WriteLine($"Result: {result}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception caught: {ex}");
    }
    Console.WriteLine("----------------------------------------");
    Console.WriteLine();

    try
    {
        task = Task.Run(() => TaskMethod("Task 2", 2));
        // 使用 GetAwaiter 和 GetResult 方法获取任务结果
        // 这种情况下不会封装异常,因为 TPL 基础设施会提取该异常。
        // 如果只有一个底层任务,那么一次只能获取一个原始异常。
        int result = task.GetAwaiter().GetResult();
        Console.WriteLine($"Result: {result}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception caught: {ex}");
    }
    Console.WriteLine("----------------------------------------");
    Console.WriteLine();

    var t1 = new Task<int>(() => TaskMethod("Task 3", 3));
    var t2 = new Task<int>(() => TaskMethod("Task 4", 2));
    var complexTask = Task.WhenAll(t1, t2);
    // 待 t1 和 t2 都结束后才会打印异常
    // 异常类型为 AggregateExcption,其内部封装了 2 个任务抛出的异常
    var exceptionHandler = complexTask.ContinueWith(
        t => Console.WriteLine($"Exception caught: {t.Exception}"),
        TaskContinuationOptions.OnlyOnFaulted);
    t1.Start();
    t2.Start();
    Thread.Sleep(TimeSpan.FromSeconds(5));
    Console.ReadLine();
}

static int TaskMethod(string name, int seconds)
{
    Console.WriteLine($"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    throw new Exception("Boom!");
    return 42 * seconds;
}
C# 多线程 04-使用任务平行库 06-实现取消选项

实现取消选项

本节是 Task 中实现取消选项,同使用线程池中实现取消是一样的。(参照:C# 多线程 03-使用线程池 04-实现一个取消选项)

/// <summary>
/// 实现取消选项
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var cts = new CancellationTokenSource();
    var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token));
    Console.WriteLine(longTask.Status); // Created
    cts.Cancel();
    Console.WriteLine(longTask.Status); // Created
    Console.WriteLine($"First task has benn cancelled before execution");

    cts = new CancellationTokenSource();
    longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token));
    longTask.Start();
    for (int i = 0; i < 5; i++)
    {
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
        Console.WriteLine(longTask.Status); // Running
    }
    cts.Cancel();
    for (int i = 0; i < 5; i++)
    {
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
        Console.WriteLine(longTask.Status); // RanToCompletion
    }

    Console.WriteLine($"A task has been completed with result {longTask.Result}"); // -1
    Console.ReadLine();
}

static int TaskMethod(string name, int seconds, CancellationToken token)
{
    Console.WriteLine($"Task {name} is running on a thread id : {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    for (int i = 0; i < seconds; i++)
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        if (token.IsCancellationRequested)
        {
            return -1;
        }
    }
    return 42 * seconds;
}
C# 多线程 04-使用任务平行库 05-将 EAP 模式转换为任务

将 EAP 模式转换为任务

关于 基于事件的异步模式(Event-based Asynchronous Pattern,简称 EAP)的实现方法可以参照 C# 多线程 03-使用线程池 07-使用 BackgroundWorker 组件

可以使用 TaskCompletionSource 类型将 EAP 转换为 Task。

/// <summary>
/// 将 EAP 模式转换为任务
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 主要是使用 TaskCompletionSource 类型
    // T 是异步处理返回结果类型
    var tcs = new TaskCompletionSource<int>();
    var worker = new BackgroundWorker();
    worker.DoWork += (sender, eventArgs) =>
    {
        eventArgs.Result = TaskMethod("Background worker", 5);
    };

    worker.RunWorkerCompleted += (sender, eventArgs) =>
    {
        if (eventArgs.Error != null)
        {
            tcs.SetException(eventArgs.Error);
        }
        else if (eventArgs.Cancelled)
        {
            tcs.SetCanceled();
        }
        else
        {
            try
            {
                // 将 SetResult 方法封装在 try-catch 中,保证发生错误时仍然会设置给任务完成源对象
                // 若发生异常导致没有执行 SetResult,则程序会一直阻塞在获取 Task 结果那里 (tcs.Task.Result)
                tcs.SetResult((int)eventArgs.Result);
            }
            catch (Exception)
            {
                tcs.SetResult(0);
            }
        }
    };

    worker.RunWorkerAsync();
    int result = tcs.Task.Result;

    Console.WriteLine($"Result is: {result}");
    Console.ReadLine();
}

static int TaskMethod(string name, int seconds)
{
    Console.WriteLine($"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    return 42 * seconds;
}
C# 多线程 04-使用任务平行库 04-将 APM 模式转换为任务

将 APM 模式转换为任务

关于 APM(Asynchronous Programming Model:异步编程模型)可以参考 01-在线程池中调用委托

本节介绍将 APM 转换为 Task。

/// <summary>
/// 将 APM 模式转换为任务
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    int threadId;
    AsynchronousTask d = Test;
    IncompatibleAsynchronousTask e = Test;

    // 将 APM 转换为 TPL 的关键是 Task<T>.Factory.FromAsync 方法
    // 其中 T 为异步操作结果的类型
    // 该方法有多个重载。

    Console.WriteLine("Option 1");
    // 这种方法可以指定回调函数
    Task<string> task = Task<string>.Factory.FromAsync(
        d.BeginInvoke(
            "AsyncTaskThread",
            Callback,
            "a delegate asynchronous call"),
        d.EndInvoke);

    task.ContinueWith(t => Console.WriteLine($"Callback is finished, now running a continuation! Result: {t.Result}"));

    while (!task.IsCompleted)
    {
        Console.WriteLine(task.Status);
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
    }
    Console.WriteLine(task.Status);
    Thread.Sleep(TimeSpan.FromSeconds(1));

    Console.WriteLine("------------------------------------");
    Console.WriteLine();
    Console.WriteLine("Option 2");

    // 这种方法不能直接指定回调函数
    // 如果需要,可以使用 Task.ContinueWith 方法执行回调函数。
    task = Task<string>.Factory.FromAsync(
        d.BeginInvoke,
        d.EndInvoke,
        "AsyncTaskThread",
        "a delegate asynchronous call");

    task.ContinueWith(t => Console.WriteLine($"Task is completed, now running a continuation! Result: {t.Result}"));

    while (!task.IsCompleted)
    {
        Console.WriteLine(task.Status);
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
    }
    Console.WriteLine(task.Status);
    Thread.Sleep(TimeSpan.FromSeconds(1));

    Console.WriteLine("------------------------------------");
    Console.WriteLine();
    Console.WriteLine("Option 3");

    // 这里的异步方法带有 out 参数,导致 EndInvoke 的签名和 FromAsync 的签名不一致
    // 这里展示了一个小技巧,使用 lambda 表达式封装了 EndInvoke 方法
    IAsyncResult ar = e.BeginInvoke(out threadId, Callback, "a delegate asynchronous call");
    task = Task<string>.Factory.FromAsync(ar, _ => e.EndInvoke(out threadId, ar));

    task.ContinueWith(t => Console.WriteLine($"Task is completed, now running a continuation! Result: {t.Result}, ThreadId: {threadId}"));

    while (!task.IsCompleted)
    {
        Console.WriteLine(task.Status);
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
    }
    Console.WriteLine(task.Status);
    Thread.Sleep(TimeSpan.FromSeconds(1));

    Console.ReadLine();
}

delegate string AsynchronousTask(string threadName);
delegate string IncompatibleAsynchronousTask(out int threadId);

static void Callback(IAsyncResult ar)
{
    Console.WriteLine("Starting a callback ...");
    Console.WriteLine($"State passed to a callback: {ar.AsyncState}");
    Console.WriteLine($"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Console.WriteLine($"Thread pool worker thread id: {Thread.CurrentThread.ManagedThreadId}");
}

static string Test(string threadName)
{
    Console.WriteLine("Starting...");
    Console.WriteLine($"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Thread.CurrentThread.Name = threadName;
    return $"Thread anme: {Thread.CurrentThread.Name}";
}

static string Test(out int threadId)
{
    Console.WriteLine("Starting...");
    Console.WriteLine($"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(2));
    threadId = Thread.CurrentThread.ManagedThreadId;
    return $"Thread pool worker thread id was: {threadId}";
}
C# 多线程 04-使用任务平行库 03-组合任务

组合任务

/// <summary>
/// 组合任务
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var firstTask = new Task<int>(() => TaskMethod("First Task", 3));
    var secondTask = new Task<int>(() => TaskMethod("Second Task", 2));

    // 设置后续操作
    firstTask.ContinueWith(t => Console.WriteLine($"The first answer is {t.Result}. " +
        $"Thread id {Thread.CurrentThread.ManagedThreadId}, " +
        $"is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}"),
        TaskContinuationOptions.OnlyOnRanToCompletion);

    firstTask.Start();
    secondTask.Start();

    // 等待上面的两个任务完成
    Thread.Sleep(TimeSpan.FromSeconds(4));

    // 给第二个任务设置后续操作,并使用 TaskContinuationOptions.ExecuteSynchronously 选项尝试同步执行该后续操作
    Task continuation = secondTask.ContinueWith(t => Console.WriteLine($"The first answer is {t.Result}. " +
        $"Thread id {Thread.CurrentThread.ManagedThreadId}, " +
        $"is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}"),
        TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
    // 为上面的后续操作再定义一个后续操作
    continuation.GetAwaiter().OnCompleted(() => Console.WriteLine($"Continuation Task Completed! " +
        $"Thread id: {Thread.CurrentThread.ManagedThreadId}, " +
        $"is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}"));

    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine();

    firstTask = new Task<int>(() =>
    {
        // 使用 TaskCreationOptions.AttachedToParent 运行子任务
        // 只有所有子任务结束工作,父任务才会完成
        var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent);

        innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent);

        return TaskMethod("First task", 2);
    });

    firstTask.Start();

    // 打印任务状态
    // firstTask 父任务处理仍在执行中时状态为 running
    // 父任务执行结束而子任务仍在执行时状态为 WaitingForChildrenToComplete
    // 全部完成后状态为 RanToCompletion
    while (!firstTask.IsCompleted)
    {
        Console.WriteLine(firstTask.Status); // running / WaitingForChildrenToComplete
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
    }
    Console.WriteLine(firstTask.Status); // RanToCompletion
    Thread.Sleep(TimeSpan.FromSeconds(10));

    Console.ReadLine();
}

static int TaskMethod(string name, int seconds)
{
    Console.WriteLine($"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. " +
        $"Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    return 42 * seconds;
}
C# 多线程 04-使用任务平行库 02-使用任务执行基本的操作

使用任务执行基本的操作

/// <summary>
///  使用任务执行基本的操作
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    // 直接调用,同步执行,不是线程池中的线程
    TaskMethod("Main Thread Task");
    
    Task<int> task = CreateTask("Task 1");
    // 启动任务
    // 该任务会被放置在线程池中
    task.Start();
    // 等待结果(直到任务返回前,主线程一直处于阻塞状态)
    int result = task.Result;
    Console.WriteLine($"Result is: {result}");

    task = CreateTask("Task 2");
    // 同步运行该任务
    // 该任务会运行在主线程中,运行结果同直接调用一样
    task.RunSynchronously();
    result = task.Result;
    Console.WriteLine($"Result is: {result}");

    task = CreateTask("Task 3");
    Console.WriteLine(task.Status); // Created
    // 启动任务
    task.Start();
    // 这里没有阻塞主线程,而是在任务完成前循环打印任务状态
    // 任务状态分别为:Created、Running 和 RanToCompletion
    while (!task.IsCompleted)
    {
        Console.WriteLine(task.Status); // Running 和 RanToCompletion
        Thread.Sleep(TimeSpan.FromSeconds(0.5));
    }
    Console.WriteLine(task.Status); // RanToCompletion
    result = task.Result;
    Console.WriteLine($"Result is: {result}");

    Console.ReadLine();
}

static Task<int> CreateTask(string name)
{
    return new Task<int>(() => TaskMethod(name));
}

static int TaskMethod(string name)
{
    Console.WriteLine($"Task {name} is running on a thread id {Thread.CurrentThread.ManagedThreadId}. Is thread pool thread: {Thread.CurrentThread.IsThreadPoolThread}");
    Thread.Sleep(TimeSpan.FromSeconds(2));
    return 42;
}
C# 多线程 04-使用任务平行库 01-创建 Task

简介

.Net Framework4.0 引入了一个新的关于异步操作的 API - 任务并行库(Task Parallel Library,简称 TPL)。

.Net Framework4.5 对该 API 进行了轻微的改进,使用更简单。

TPL 可被认为是线程池之上的有一个抽象层,其对程序员隐藏了于线程池交互的底层代码,可以使用或不使用独立线程运行。

C#5.0已经内置了对TPL的支持,允许我们使用新的 awaitasync 关键字以平滑的、舒服的方式操作任务。

C# 多线程 03-使用线程池 07-使用 BackgroundWorker 组件

使用 BackgroundWorker 组件

/// <summary>
/// 使用 BackgroundWorker 组件 
/// 借助于该对象,可以将异步代码组织为一系列事件及事件处理器
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var bw = new BackgroundWorker();
    bw.WorkerReportsProgress = true; // 是否可以报告进度更新
    bw.WorkerSupportsCancellation = true; // 是否支持异步取消操作

    bw.DoWork += Worker_DoWork;
    bw.ProgressChanged += Worker_ProgressChanged;
    bw.RunWorkerCompleted += Worker_Completed;

    bw.RunWorkerAsync();

    Console.WriteLine("Press C to cancel work");
    do
    {
        if (Console.ReadKey(true).KeyChar == 'C')
        {
            bw.CancelAsync();
        }
    } while (bw.IsBusy);
}

static void Worker_DoWork(object sender, DoWorkEventArgs e)
{
    Console.WriteLine($"DoWork 线程池的线程 ID:{Thread.CurrentThread.ManagedThreadId}");
    var bw = (BackgroundWorker)sender;
    for (int i = 1; i <= 100; i++)
    {
        if (bw.CancellationPending)
        {
            e.Cancel = true;
            return;
        }
        if (i%10 == 0)
        {
            // 触发 BackgroundWorker 的 ProgressChanged 事件
            bw.ReportProgress(i);
        }
        Thread.Sleep(TimeSpan.FromSeconds(0.1));
    }

    e.Result = 42;
}

static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
    Console.WriteLine($"已完成 {e.ProgressPercentage}%. Progress 线程池 Id:{Thread.CurrentThread.ManagedThreadId}");
}

static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e)
{
    Console.WriteLine($"Completed 线程池 Id:{Thread.CurrentThread.ManagedThreadId}");
    if (e.Error != null)
    {
        Console.WriteLine($"发生异常:{e.Error.Message}");
    }
    else if (e.Cancelled)
    {
        Console.WriteLine($"操作已被取消.");
    } else
    {
        Console.WriteLine($"结果为:{e.Result}.");
    }
}
C# 多线程 03-使用线程池 06-使用计时器

使用 System.Threading.Timer 对象在线程池中创建周期性调用的异步操作。

/// <summary>
/// 使用计时器
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    Console.WriteLine("Press 'Enter' to stop the timer ...");
    DateTime start = DateTime.Now;
    // 第一个参数为定时器定时执行的处理
    // 第三个参数为多长时间后第一次执行
    // 第四个参数为第一次执行之后再次调用的间隔时间
    _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
    try
    {
        Thread.Sleep(TimeSpan.FromSeconds(6));
        // 使用 Change 方法改变第一次执行时间和之后再次调用的间隔时间
        _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4));
        Console.ReadLine();
    }
    catch (Exception)
    {
        // 注销定时器
        _timer.Dispose();
    }
}

static Timer _timer;

static void TimerOperation(DateTime start)
{
    TimeSpan elapsed = DateTime.Now - start;
    Console.WriteLine($"{elapsed.TotalSeconds} seconds from {start}. Timer thread pool thrad id:{Thread.CurrentThread.ManagedThreadId}");
}
C# 多线程 03-使用线程池 05-在线程中使用等待事件处理器及超时

本节将通过一个示例来展示如何在线程池中取消异步操作。

/// <summary>
/// 在线程中使用等待事件处理器及超时
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    RunOperations(TimeSpan.FromSeconds(5));
    RunOperations(TimeSpan.FromSeconds(7));
}

static void RunOperations(TimeSpan workerOperationTimeout)
{
    using (var evt = new ManualResetEvent(false))
    using (var cts = new CancellationTokenSource())
    {
        Console.WriteLine($"注册一个超时操作...");
        // 该方法允许我们将回调函数放入线程池中的队列中。
        // 当提供的等待时间处理器收到信号或发生超时时,该回调函数将被调用。
        // 第一个参数是等待对象(System.Threading.WaitHandle)
        // 第二个参数是回调函数
        // 第四个参数是超时事件
        // 第五个参数为 true,表示仅执行一次
        var worker = ThreadPool.RegisterWaitForSingleObject(evt,
            (state, isTimeOut) => WorkerOperationWait(cts, isTimeOut),
            null,
            workerOperationTimeout,
            true);

        Console.WriteLine("开始执行一个长操作");
        ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt));

        Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2)));
        worker.Unregister(evt);
    }
}

static void WorkerOperation(CancellationToken token, ManualResetEvent evt)
{
    for (int i = 0; i < 6; i++)
    {
        // 判断 token 是否已取消
        if (token.IsCancellationRequested)
        {
            return;
        }
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    // 6 秒后发送事件结束信号
    evt.Set();
}

static void WorkerOperationWait(CancellationTokenSource cts, bool isTimeOut)
{
    if (isTimeOut)
    {
        // 如果操作超时,则发送取消信号
        cts.Cancel();
        Console.WriteLine("操作超时且被取消");
    } else
    {
        Console.WriteLine("操作执行成功");
    }
}
C# 多线程 03-使用线程池 04-实现一个取消选项

/// <summary>
/// 实现一个取消选项
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    using (var cts = new CancellationTokenSource())
    {
        CancellationToken token = cts.Token;
        ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token));
        Thread.Sleep(TimeSpan.FromSeconds(2));
        cts.Cancel();
    }

    using (var cts = new CancellationTokenSource())
    {
        CancellationToken token = cts.Token;
        ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));
        Thread.Sleep(TimeSpan.FromSeconds(2));
        cts.Cancel();
    }

    using (var cts = new CancellationTokenSource())
    {
        CancellationToken token = cts.Token;
        ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));
        Thread.Sleep(TimeSpan.FromSeconds(2));
        cts.Cancel();
    }

    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.ReadLine();
}

static void AsyncOperation1(CancellationToken token)
{
    Console.WriteLine("开始第一个任务");
    for (int i = 0; i < 5; i++)
    {
        // 轮询检查 token.IsCancellationRequested,如果为 true,说明操作需要被取消
        if (token.IsCancellationRequested)
        {
            Console.WriteLine("第一个任务已经被取消");
            return;
        }
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    Console.WriteLine("第一个任务已经成功完成");
}

static void AsyncOperation2(CancellationToken token)
{
    try
    {
        Console.WriteLine("开始第二个任务");
        for (int i = 0; i < 5; i++)
        {
            // 如果已取消,则抛出异常
            token.ThrowIfCancellationRequested();
            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
        Console.WriteLine("第二个任务已经成功完成");
    }
    catch (OperationCanceledException)
    {
        // 在主处理意外处理取消过程
        Console.WriteLine("第二个任务已经被取消");
    }
}

static void AsyncOperation3(CancellationToken token)
{
    bool cancelltionFlag = false;
    // 注册一个回调函数
    // 当操作被取消时,线程池将调用该回调函数
    token.Register(() => cancelltionFlag = true);
    Console.WriteLine("开始第三个任务");
    for (int i = 0; i < 5; i++)
    {
        if (cancelltionFlag)
        {
            Console.WriteLine("第三个任务已经被取消");
            return;
        }
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    Console.WriteLine("第三个任务已经成功完成");
}
C# 多线程 03-使用线程池 03-线程池和并行度

本节展示线程池如何工作于大量的异步操作,以及它与创建大量单独的线程的方式有何不同。

/// <summary>
/// 线程池和并行度
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    const int numberOfOpeartions = 500;
    var sw = new Stopwatch();
    sw.Start();
    UseThreads(numberOfOpeartions);
    sw.Stop();
    Console.WriteLine($"使用线程的执行时间:{sw.ElapsedMilliseconds}ms");

    sw.Reset();
    sw.Start();
    UseThreadPool(numberOfOpeartions);
    sw.Stop();
    Console.WriteLine($"使用线程池的执行时间:{sw.ElapsedMilliseconds}ms");

    Console.ReadLine();
}

/// <summary>
/// 使用线程
/// </summary>
/// <param name="numberOfOperations"></param>
static void UseThreads(int numberOfOperations)
{
    using (var countdown = new CountdownEvent(numberOfOperations))
    {
        Console.WriteLine("通过创建线程执行处理");
        for (int i = 0; i < numberOfOperations; i++)
        {
            var thread = new Thread(() =>
            {
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(TimeSpan.FromSeconds(0.1));
                countdown.Signal();
            });
            thread.Start();
        }
        countdown.Wait();

        Console.WriteLine();
    }
}

/// <summary>
/// 使用线程池
/// </summary>
/// <param name="numberOfOperations"></param>
static void UseThreadPool(int numberOfOperations)
{
    using (var countdown = new CountdownEvent(numberOfOperations))
    {
        Console.WriteLine("在一个线程池中执行处理");
        for (int i = 0; i < numberOfOperations; i++)
        {
            ThreadPool.QueueUserWorkItem(_ => {
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(TimeSpan.FromSeconds(0.1));
                countdown.Signal();
            });
        }
        countdown.Wait();

        Console.WriteLine();
    }
}
C# 多线程 03-使用线程池 02-向线程池中放入异步操作

/// <summary>
/// 向线程池中放入异步操作
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    const int x = 1;
    const int y = 2;
    const string lambdaState = "lambda state 2";

    // 使用 QueueUserWorkItem 方法将 AsyncOperation 方法放到线程池中
    ThreadPool.QueueUserWorkItem(AsyncOperation);
    Thread.Sleep(TimeSpan.FromSeconds(3));

    // 带参数的调用(参数值传递到 AsyncOperation 方法的参数 object state)
    ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");
    // 使线程睡眠,从而让线程池拥有为新操作重用线程的可能性
    Thread.Sleep(TimeSpan.FromSeconds(3));

    // 使用 lamda 表达式放置到线程池
    ThreadPool.QueueUserWorkItem(state =>
    {
        Console.WriteLine($"操作状态:{state}");
        Console.WriteLine($"当前工作线程 Id:{Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(TimeSpan.FromSeconds(2));
    }, "lambda state");

    // 这个 lambda 表达式中使用了闭包机制,从而无须传递 lambda 表达式的状态
    // 闭包更灵活,允许我们向异步方法传递一个以上的对象,而且这些对象具有静态类型
    ThreadPool.QueueUserWorkItem(_ =>
    {
        Console.WriteLine($"操作状态:{x + y}, {lambdaState}");
        Console.WriteLine($"当前工作线程 Id:{Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(TimeSpan.FromSeconds(2));
    });

    Thread.Sleep(TimeSpan.FromSeconds(2));
}

private static void AsyncOperation(object state)
{
    Console.WriteLine($"操作状态:{state ?? "(null)"}");
    Console.WriteLine($"当前工作线程 Id:{Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(TimeSpan.FromSeconds(2));
}
C# 多线程 02 线程同步

实现线程同步的方法

  1. 如果无须共享对象,那么就无须进行线程同步

  2. 只是用原子操作

  3. 内核模式(kernel-mode)

    • 将等待的线程置于阻塞状态。

    • 当线程处于阻塞状态时,只会占用尽可能少的 CPU 时间。

    • 然而,这意味着将引入至少一次所谓的上下文切换(context switch)

    • 上下文切换是指操作系统的线程调度器。该调度器会保存等待的线程的状态,并切换到另一个线程,依次恢复等待的线程的状态。这需要消耗相当多的资源。

  4. 用户模式(user-mode)

    • 不将线程切换到阻塞状态。

    • 该模式非常轻量,速度很快,但如果线程需要等待较长时间则会浪费大量的 CPU 时间。

  5. 混合模式(hybrid)

    • 混合模式会先尝试使用用户模式等待,

    • 如果线程等待了足够长的时间,则会切换到阻塞状态以节省 CPU 资源。

C# 多线程 01 线程基础

创建线程

Thread t = new Thread(PrintNumbers);
t.Start();