Skip to content

C# 多线程 02 线程同步

🏷️ 《C# 多线程》

实现线程同步的方法

  1. 如果无须共享对象,那么就无须进行线程同步

  2. 只是用原子操作

  3. 内核模式(kernel-mode)

    • 将等待的线程置于阻塞状态。

    • 当线程处于阻塞状态时,只会占用尽可能少的 CPU 时间。

    • 然而,这意味着将引入至少一次所谓的上下文切换(context switch)

    • 上下文切换是指操作系统的线程调度器。该调度器会保存等待的线程的状态,并切换到另一个线程,依次恢复等待的线程的状态。这需要消耗相当多的资源。

  4. 用户模式(user-mode)

    • 不将线程切换到阻塞状态。

    • 该模式非常轻量,速度很快,但如果线程需要等待较长时间则会浪费大量的 CPU 时间。

  5. 混合模式(hybrid)

    • 混合模式会先尝试使用用户模式等待,

    • 如果线程等待了足够长的时间,则会切换到阻塞状态以节省 CPU 资源。

执行基本的原子操作

借助于Interlocked类,无需锁定任何对象即可获得正确的结果。

csharp
private int _count;

public int Count => _count;

public override void Decrement()
{
    Interlocked.Decrement(ref _count);
}

public override void Increment()
{
    Interlocked.Increment(ref _count);
}

使用 Mutex 类

csharp
onst string MutexName = "CSharp Threading Cookbook";
// Mutex 是一种原始的同步方式,其只对一个线程授予对共享资源的独占访问。
// 具名的互斥量是全局的操作系统对象,务必正确关闭互斥量。
// 最好是使用 using 代码快来包裹互斥对象。
using (var m = new Mutex(false, MutexName))
{
    if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
    {
        Console.WriteLine("Second instance is running");
        Console.ReadLine();
    } else
    {
        Console.WriteLine("Running");
        Console.ReadLine();
        m.ReleaseMutex();
    }
}

使用 SemaphoreSlim 类

SemaphoreSlim类限制了同时访问同一个资源的线程数量。

csharp
// 构造函数中指定允许的并发线程数量
// 这里指定了并发数为 4 个线程
// 当有 4 个线程获取了资源后,其它的线程需要等待
static SemaphoreSlim _samphore = new SemaphoreSlim(4);

static void Main(string[] args)
{
    for (int i = 0; i < 6; i++)
    {
        string threadName = "Thread " + i;
        int secondsToWait = 2 + 2 * i;
        var t = new Thread(() => AccessDatabase(threadName, secondsToWait));
        t.Start();
    }
    Console.ReadLine();
}

static void AccessDatabase(String name, int secondes)
{
    Console.WriteLine($"{name} wait to access a database");
    // 调用 Wait 方法获取资源,当超过最大指定并发数量时,则需要等待其它线程释放资源
    _samphore.Wait();
    Console.WriteLine($"{name} was granted an access to a database");
    Thread.Sleep(TimeSpan.FromSeconds(secondes));
    Console.WriteLine($"{name} is completed");
    // 调用 Release 方法释放资源
    _samphore.Release();
}

这里使用了混合模式,其允许在等待时间很短的情况下无需使用上下文切换。
还有一个叫SemaphoreSemaphoreSlim类的老版本。该版本使用纯粹的内核时间(kernel-time)方法。
一般没必要使用它,除非是非常重要的场景。
SemaphoreSlim并不适用 Windows 内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用Semaphore.

使用 AutoResetEvent 类

csharp
static void Main(string[] args)
{
    var t = new Thread(() => Process(10));
    t.Start();

    Console.WriteLine("Waiting for another thread to complete work");
    _workerEvent.WaitOne();
    Console.WriteLine("First operation is completed!");
    Console.WriteLine("Performing an operation on a main thread");
    Thread.Sleep(TimeSpan.FromSeconds(5));
    _mainEvent.Set();
    Console.WriteLine("Now running the second operation on a second thread");
    _workerEvent.WaitOne();
    Console.WriteLine("Second operation is completed");
    Console.ReadLine();
}

// 参数为 false,定义了初始状态为 unsignaled
// 这意味着任何线程调用这个对象的 WaitOne 方法将会被阻塞,直到调用了 Set 方法
private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
private static AutoResetEvent _mainEvent = new AutoResetEvent(false);
// 如果参数为 true,则初始状态为 signaled,如果线程调用 WaitOne 方法则会被立即处理,然后事件状态自动变为 unsignaled,
// 所以需要对该实例调用一次 Set 方法,以便让其他的线程对该实例调用 WaitOne 方法从而继续执行。

static void Process(int seconds)
{
    Console.WriteLine("Starting a long running work...");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("Work is done!");
    _workerEvent.Set();
    Console.WriteLine("Waiting for a main thread to complete its work");
    _mainEvent.WaitOne();
    Console.WriteLine("Starting second operation...");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("Work is done!");
    _workerEvent.Set();
}

执行结果

Waiting for another thread to complete work
Starting a long running work...
Work is done!
Waiting for a main thread to complete its work
First operation is completed!
Performing an operation on a main thread
Now running the second operation on a second thread
Starting second operation...
Work is done!
Second operation is completed

使用 ManualResetEventSlim 类

csharp
static void Main(string[] args)
{
    var t1 = new Thread(() => TravelThroughGates("Thread 1", 5));
    var t2 = new Thread(() => TravelThroughGates("Thread 2", 6));
    var t3 = new Thread(() => TravelThroughGates("Thread 3", 12));
    t1.Start();
    t2.Start();
    t3.Start();
    Thread.Sleep(TimeSpan.FromSeconds(6));
    Console.WriteLine("门打开啦!");
    _mainEvent.Set();
    Thread.Sleep(TimeSpan.FromSeconds(2));
    _mainEvent.Reset();
    Console.WriteLine("门关上啦!");
    Thread.Sleep(TimeSpan.FromSeconds(10));
    Console.WriteLine("门又打开啦!");
    _mainEvent.Set();
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine("门又关上啦!");
    _mainEvent.Reset();
    Console.ReadLine();

}

static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);

static void TravelThroughGates(string threadName, int seconds)
{
    Console.WriteLine($"{threadName} 睡眠 {seconds} 秒");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine($"{threadName} 等待门打开!");
    _mainEvent.Wait();
    Console.WriteLine($"{threadName} 已进门!");
}

输出结果

Thread 1 睡眠 5 秒
Thread 2 睡眠 6 秒
Thread 3 睡眠 12 秒
Thread 1 等待门打开!
门打开啦!
Thread 1 已进门!
Thread 2 等待门打开!
Thread 2 已进门!
门关上啦!
Thread 3 等待门打开!
门又打开啦!
Thread 3 已进门!
门又关上啦!

ManualResetEventSlimManualResetEvent的混合版本,一直保持大门敞开直到手动调用 Reset 方法。

使用 CountDownEvent 类

csharp
static void Main(string[] args)
{
    Console.WriteLine("开始两个操作");
    var t1 = new Thread(() => PerformOperation("操作 1 已经完成", 4));
    var t2 = new Thread(() => PerformOperation("操作 2 已经完成", 8));

    t1.Start();
    t2.Start();

    // 直到_countdown 的计数变为 0 才会继续执行
    _countdown.Wait();
    Console.WriteLine("两个操作都已经完成");
    _countdown.Dispose();

    Console.ReadLine();
}

static CountdownEvent _countdown = new CountdownEvent(2);
        
static void PerformOperation(string message, int seconds)
{
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine(message);
    // _countdown 的计数减 1
    _countdown.Signal();
}

打印结果

开始两个操作
操作 1 已经完成
操作 2 已经完成
两个操作都已经完成

使用 Barrier 类

csharp
// Barrier 类用于组织多个线程及时在某个时刻碰面
// 其提供一个回调函数,每次线程调用了 SignalAndWait 方法后该回调函数会被执行
static Barrier _barrier = new Barrier(2, b => Console.WriteLine($"结束阶段 {b.CurrentPhaseNumber + 1}"));

static void Main(string[] args)
{
    var t1 = new Thread(() => PlayMusic("吉他手", "弹一段 Solo", 5));
    var t2 = new Thread(() => PlayMusic("歌手", "唱歌", 2));

    t1.Start();
    t2.Start();

    Console.ReadLine();
}

static void PlayMusic(string name, string message, int seconds)
{
    for (int i = 1; i < 3; i++)
    {
        Console.WriteLine("----------------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
        Console.WriteLine($"{name} 开始 {message}");
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
        Console.WriteLine($"{name} 完成了 {message}");
        _barrier.SignalAndWait();
    }
}

打印结果

----------------------------------------
----------------------------------------
歌手 开始 唱歌
歌手 完成了 唱歌
吉他手 开始 弹一段Solo
吉他手 完成了 弹一段Solo
结束阶段 1
----------------------------------------
----------------------------------------
歌手 开始 唱歌
歌手 完成了 唱歌
吉他手 开始 弹一段Solo
吉他手 完成了 弹一段Solo
结束阶段 2

使用 ReaderWriterLockSlim 类

csharp
// ReaderWriterLockSlim 代表一个管理资源访问的锁,允许多个线程同时读取,以及独占写
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
static Dictionary<int, int> _items = new Dictionary<int, int>();

static void Main(string[] args)
{
    new Thread(Read) { IsBackground = true, Name = "Read Thread 1"}.Start();
    new Thread(Read) { IsBackground = true, Name = "Read Thread 2" }.Start();
    new Thread(Read) { IsBackground = true, Name = "Read Thread 3" }.Start();

    new Thread(() => Write("Write Thread 1")) { IsBackground = true }.Start();
    new Thread(() => Write("Write Thread 2")) { IsBackground = true }.Start();

    // 30s 后主线程结束
    Thread.Sleep(TimeSpan.FromSeconds(30));
}

static void Read()
{
    Console.WriteLine("读取字典中的内容");
    while (true)
    {
        try
        {
            // 获取读锁(允许多个线程同时获取读锁)
            _rw.EnterReadLock();
            foreach (var key in _items)
            {
                // Console.WriteLine($"线程 {Thread.CurrentThread.Name} : {key}");
                Thread.Sleep(TimeSpan.FromSeconds(0.1));
            }
        }
        finally
        {
            // 在 finally 中释放锁,确保锁最终会被释放
            _rw.ExitReadLock();
        }
    }
}

static void Write(string threadName)
{
    while (true)
    {
        try
        {
            int newKey = new Random().Next(250);
            // 获取可升级读锁
            _rw.EnterUpgradeableReadLock();
            if (!_items.ContainsKey(newKey))
            {
                try
                {
                    // 等待所有的读锁释放后获取写锁,此时所有的获取读锁操作会被阻塞
                    _rw.EnterWriteLock();
                    _items[newKey] = 1;
                    Console.WriteLine($"新 Key {newKey} 被 {threadName} 加入字典");
                }
                finally
                {
                    // 在 finally 中释放锁,确保锁最终会被释放
                    _rw.ExitWriteLock();
                }
            }
            Thread.Sleep(TimeSpan.FromSeconds(0.1));
        }
        finally
        {
            // 在 finally 中释放锁,确保锁最终会被释放
            _rw.ExitUpgradeableReadLock();
        }
    }
}

打印结果

读取字典中的内容
读取字典中的内容
读取字典中的内容
新 Key 64 被 Write Thread 1 加入字典
新 Key 230 被 Write Thread 1 加入字典
新 Key 243 被 Write Thread 1 加入字典
新 Key 125 被 Write Thread 1 加入字典
新 Key 152 被 Write Thread 2 加入字典
新 Key 165 被 Write Thread 1 加入字典
新 Key 114 被 Write Thread 1 加入字典
新 Key 183 被 Write Thread 1 加入字典
新 Key 119 被 Write Thread 1 加入字典
新 Key 171 被 Write Thread 1 加入字典
新 Key 30 被 Write Thread 1 加入字典
新 Key 61 被 Write Thread 2 加入字典
新 Key 87 被 Write Thread 2 加入字典
新 Key 195 被 Write Thread 2 加入字典
新 Key 40 被 Write Thread 2 加入字典
新 Key 202 被 Write Thread 2 加入字典
新 Key 59 被 Write Thread 2 加入字典
新 Key 104 被 Write Thread 2 加入字典
新 Key 15 被 Write Thread 2 加入字典
新 Key 32 被 Write Thread 2 加入字典
新 Key 206 被 Write Thread 2 加入字典
新 Key 147 被 Write Thread 2 加入字典
新 Key 215 被 Write Thread 1 加入字典
新 Key 38 被 Write Thread 1 加入字典

csharp
// volatile 关键字指出一个字段可能会被同时执行的多个线程修改。
// 声明为 volatile 的字段不会被编译器和处理器优化为只能被单个线程访问。
// 这确保了该字段总是最新的值
static volatile bool _isCompleted = false;

static void Main(string[] args)
{
    var t1 = new Thread(UserModeWait);
    var t2 = new Thread(HybirdSpinWait);

    Console.WriteLine("执行用户模式等待");
    t1.Start();
    Thread.Sleep(20);
    // Thread.Sleep(TimeSpan.FromSeconds(20));
    _isCompleted = true;
    Thread.Sleep(TimeSpan.FromSeconds(1));
    _isCompleted = false;

    Console.WriteLine("执行混合模式等待");
    t2.Start();
    Thread.Sleep(5);
    // Thread.Sleep(TimeSpan.FromSeconds(20));
    _isCompleted = true;

    Console.ReadLine();
}

static void UserModeWait()
{
    while (!_isCompleted)
    {
        // 这里会一直消耗 CPU 时间
        Console.Write(".");
    }
    Console.WriteLine();
    Console.WriteLine("等待已完成");
}

static void HybirdSpinWait()
{
    // 使用 SpinWait 来使线程等待
    // 
    var w = new SpinWait();
    while (!_isCompleted)
    {
        w.SpinOnce();
        // 获取是否确保下次调用 System.Threading.SpinWait.SpinOnce 会产生处理器,同时触发强制的上下文切换。
        // false : 用户模式 不会发生上下文切换 但会浪费 CPU 时间
        // true : 内核模式 会发生上下文切换 但会节省 CPU 时间
        Console.WriteLine(w.NextSpinWillYield);
    }
    Console.WriteLine("等待已完成");
}

打印结果

执行用户模式等待
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
................................................................................
............................................
等待已完成
执行混合模式等待
False
False
False
False
False
False
False
False
False
False
True
True
True
True
True
True
True
True
True
True
True
True
True
等待已完成

看上面的数据结果没啥差别。我们将持续改为 20s 后执行,看一下 CPU 的负载。

CPU 时间曲线

效果很明显,使用 SpinWati 的版本明显 CPU 负载很低。