4

C# 线程同步查漏补缺 - 鹅群中的鸭霸

 1 year ago
source link: https://www.cnblogs.com/wengzp/p/17086895.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

C# 线程同步查漏补缺

当线程 A 在等待一个同步构造,另一个线程 B 持有构造一直不释放,那么就会导致线程 A 阻塞。同步构造有用户模式构造和内核模式构造。

  • 用户模式构造通过 CPU 指令来协调线程,所以速度很快。也意味着不受操作系统控制,所以等待构造的线程会不停自旋,浪费 CPU 时间。
  • 内核模式构造通过操作系统来协调线程。内核构造在获取时,需要先转换成本机代码,在转换成内核代码,返回时则需要反方向再转换一次,所以速度会比用户构造慢很多。
    因为使用了操作系统来协调线程,所以也有了更加强大的功能。
    1. 不同线程在竞争一个资源时,操作系统会阻塞线程,所以不会自旋。
    2. 可以实现托管线程和本机线程的同步。
    3. 可以跨进程跨 domain 同步。
    4. 可以利用 WaitHandle 类的方法实现多个构造的同步或者超时机制。

活锁和死锁:
当线程获取不到资源,从而不停在 CPU 上自旋等待资源,就会形成活锁。这是通过用户构造实现的。
当线程获取不到资源,被操作系统阻塞,就会形成死锁。这是通过内核构造实现的。

用户模式构造

.Net 提供了两种用户构造,易变构造 Volatile、互锁构造 Interlocked,这两种构造都提供了原子性读写的功能。
.Net 提供了基于易变构造、互锁构造、SpinWait 实现的自旋锁 SpinLock。

原子性读写:
在 32 位 CPU 中,CPU 一次只能存储 32 位的数据,所以如果是 64 位的数据类型(如 double),就得执行两次 MOV 指令,所以在 32 位 CPU 和 32 位操作系统中,不同线程对 64 位的数据类型进行读写可能得到不同的结果。原子性读写就是保证了即使是 64 位的数据类型,不同线程读写也会得到相同的结果。现在的 CPU 和操作系统基本都是 64 位的,所以一般也不会遇到这种问题。

易变构造 Volatile 和 volatile 关键字

Volatile 一般用于阻止编译器代码优化,编译器优化代码会优化掉一些在单线程情况下无用的变量或者语句,在多线程代码下有时候会导致程序运行结果跟设计的不一样。
Volatile.Read() 强制对变量的取值必须在调用时读取,Volatile.Write() 强制对变量的赋值必须在调用时写入。

/// <summary>
/// 在 debug 模式下不开启代码优化,所以需要用 release 模式下生成。
/// 执行 dotnet build -c release --no-incremental 后运行代码,如果没有标记为易变,则不会打印 x。
/// </summary>
public void Test2()
{
    var switchTrue = false;

    var t = new Thread(() =>
    {
        var x = 0;
        while (!switchTrue) // 如果没有标记变量为易变,编译器会把 while(!switchTrue) 优化为 while(true) 从而导致永远不会打印出 x 的值
        //while (!Volatile.Read(ref switchTrue)) // 标记为易变,可以保证在调用时才进行取值,不会进行代码优化。
        {
            x++;
        }
        Console.WriteLine($"x: {x}");
    });
    t.IsBackground = true;
    t.Start();

    Thread.Sleep(100);
    switchTrue = true;
    Console.WriteLine("ok");
}

互锁构造 Interlocked

  1. Interlocked 除了保证原子性读写外,还提供了很多方便的方法,在调用的地方建立了内存屏障,所以可以用来实现各种锁。
/// <summary>
/// 用 Interlocked 实现一个简单的自旋锁
/// 注意:
/// 1. 自旋锁在获取不到锁的时候,会进行空转。所以在自旋的时候,会占用 CPU,所以一般不在单 CPU 机器上用。
/// 2. 当占有锁的线程优先级比获取锁的线程更低的时候,会导致占有锁的线程一直获取不到CPU进行工作,从而无法释放锁,导致活锁。
///    所以使用自旋锁的线程,应该禁用线程优先级提升功能。
/// </summary>
public class SimpleSpinLock
{
    private int _count;
    public void Enter()
    {
        while (true)
        {
            if (Interlocked.Exchange(ref _count, 1) == 0)
            {
                return;
            }
        }
    }

    public void Exit()
    {
        Volatile.Write(ref _count, 0);
    }
}
  1. Interlocked 也经常用来实现单例模式。实现单例模式经常用 lock 关键字和双检索模式的,但我都是用 Interlocked 或者 Lazy,因为更轻量代码也简单。
/// <summary>
/// 使用 Interlocked 实现的单例,轻量且简单。
/// 可能会同时调用多次构造函数,所以适合构造函数没有副作用的类
/// </summary>
internal class DoubleCheckLocking3
{
    private static DoubleCheckLocking3? _value;

    private DoubleCheckLocking3()
    {

    }

    private DoubleCheckLocking3 GetInstance()
    {
        if (_value != null) return _value;
        Interlocked.CompareExchange(ref _value, new DoubleCheckLocking3(), null);
        return _value;
    }
}

/// <summary>
/// 使用 lock 和双检索实现的单例化
/// </summary>
internal class DoubleCheckLocking
{
    private static DoubleCheckLocking? _value;

    private static readonly object _lock = new();

    private DoubleCheckLocking()
    {

    }

    public static DoubleCheckLocking GetInstance()
    {
        if (_value != null) return _value;
        lock (_lock)
        {
            if (_value == null)
            {
                var t = new DoubleCheckLocking();
                Volatile.Write(ref _value, t); 
            }
        }
        return _value;
    }
}

自旋锁 SpinLock

.Net 提供了一个轻量化的同步构造 SpinLock,很适合在不常发生竞争的场景使用。如果发生竞争了,会先在 CPU 上自旋一段时间,如果还不能获取到资源,就会让出 CPU 控制权给其他线程(使用 SpinWait 实现的)。

  1. SpinLock 不支持重入锁,当给构造函数 SpinLock(bool) 传入 true 时,重入锁会抛出异常,否则就会死锁。

重入锁(Re-Enter): 就是一个线程调用了 SpinLock.Enter() 后,没有调用 SpinLock.Exit(),再次调用了 SpinLock.Enter()。

/// <summary>
/// 测试 SpinLock 重入锁
/// </summary>
public void Test3()
{
    var spinLock = new SpinLock(true); // 如果传 true,如果 SpinLock 重入锁,就会抛出异常,传 false 则不会,只会死锁。

    ThreadPool.QueueUserWorkItem(_ => DoWork());

    void DoWork()
    {
        var lockTaken = false;

        for (int i = 0; i < 10; i++)
        {
            try
            {
                Thread.Sleep(100);
                if (!spinLock.IsHeldByCurrentThread)  // SpinLock.IsHeldByCurrentThread 可以判断是不是当前线程拥有锁,如果是就不再获取锁
                {
                    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获取锁,i 为 {i}");
                    spinLock.Enter(ref lockTaken);
                }
                //spinLock.Enter(ref lockTaken); // 重入锁会死锁

            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }

        if (lockTaken) // 使用 lockTaken 来判断锁是否已经被持有
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 释放锁");
            spinLock.Exit();
        }
        Console.WriteLine("结束");
    }
}
  1. SpinLock 是 Struct 类型的,所以注意装箱拆箱。
/// <summary>
/// 测试装箱拆箱问题
/// </summary>
public void Test4()
{
    var spinLock = new SpinLock(false);
    Task.Run(() => DoWork(ref spinLock));
    Task.Run(() => DoWork(ref spinLock));

    // SpinLock 是 Struct 类型,要注意装箱拆箱的问题,试试看不加 ref 关键字的效果
    void DoWork(ref SpinLock spinLock)
    {
        var lockTaken = false;
        Thread.Sleep(500);
        spinLock.Enter(ref lockTaken);
        Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获取锁");
    }
}

内核模式构造

WaitHandle

.Net 提供了 System.Threading.WaitHandle 和 WaitHandle 的子类来支持内核构造,WaitHandle 封装内核同步构造的句柄,并且提供了操作的方法,并且每个方法都会在调用处建立内存屏障。

WaitHandle 有以下实现类,这些类定义了一个信号机制,根据信号去释放线程或者阻塞线程,用于在多线程的场景下访问共享资源:
WaitHandle:抽象基类,封装了系统内核构造的句柄。继承自 MarshalByRefObject,所以可以跨进程和 domain 边界。

  • EventWaitHandle:事件构造。由内核维护了一个 bool 变量,为 false 阻塞线程,为 true 时释放线程。
    • AutoResetEvent:自动重置事件构造。调用 AutoResetEvent.Set() 每次只释放一个阻塞线程。
    • ManualResetEvent:手动重置事件构造。调用 ManualResetEvent.Set() 会释放所有阻塞线程,并且不会有阻塞线程的功能,需要调用 ManualResetEvent.ReSet() 才能再次阻塞线程。
  • Semaphore:信号量。由内核维护了一个 Int32 变量,为当值为 0 时,阻塞线程,调用 Semaphore.Release() 会把变量加 1,调用 WaitHandle.WaitOne() 会把变量减 1。
  • Mutex:互斥体。功能跟 Semaphore(1) 和 AutoResetEvent 类似,一次只能释放一个线程。

WaitHandle 有以下常用方法:

  • WaitHandle.WaitOne() 虚方法,等待一个同步构造。
  • WaitHandle.WaitAll() 等待一组同步构造全部解除阻塞。
  • WaitHandle.WaitAny() 等待一组同步构造中的一个解除阻塞。
  • WaitHandle.SignalAndWait(WaitHandle x, WaitHandle y) 传入两个同步构造,解除第一个构造的阻塞,等待第二个构造。
public class WaitHandleDemo
{
    /// <summary>
    /// 测试 WaitHandle.WaitAll(), 成功运行返回 true, 支持超时,当超时时,返回 false
    ///  WaitHandle.WaitAny(), 成功运行返回对应的 索引,支持超时,当超时时,返回 WaitHandle.WaitTimeout
    /// </summary>
    public void Test()
    {
        var waitHandleList = new WaitHandle[] { new AutoResetEvent(false), new AutoResetEvent(false) };

        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
        var timeout = WaitHandle.WaitAll(waitHandleList);
        Console.WriteLine($"是否超时:{!timeout},WaitHandle.WaitAll() 结束");

        Thread.Sleep(500);

        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
        timeout = WaitHandle.WaitAll(waitHandleList,1000);
        Console.WriteLine($"是否超时:{!timeout},WaitHandle.WaitAll() 结束");

        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
        var index = WaitHandle.WaitAny(waitHandleList);
        Console.WriteLine($"{index} 已经结束运行,WaitHandle.WaitAny() 结束");

        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
        ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
        index = WaitHandle.WaitAny(waitHandleList, 1000);
        Console.WriteLine($"是否超时:{WaitHandle.WaitTimeout == index},WaitHandle.WaitAny() 结束");
        

        void DoWork(object? state)
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 开始");

            var r = new Random();
            var interval = 1000 * r.Next(2, 10);
            Thread.Sleep(interval);
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 结束");

            ((AutoResetEvent)state).Set();
        }
    }

    /// <summary>
    /// 测试 WaitHandle.SignalAndWait(), 成功运行返回 true, 支持超时,当超时时,返回 false
    /// </summary>
    public void Test2()
    {
        var are = new AutoResetEvent(false);
        var are2 = new AutoResetEvent(false);

        foreach (var i in Enumerable.Range(1,5))
        {
            Console.WriteLine($"按下 Enter 启动线程 {i}");
            Console.ReadLine();
            var t = new Thread(DoWork)
            {
                Name = $"线程 {i}"
            };
            t.Start();
            WaitHandle.SignalAndWait(are, are2); // 给 are 发信号,同时等待 are2
        }

        Console.WriteLine("全部线程运行结束");

        void DoWork()
        {
            are.WaitOne();
            Console.WriteLine($"{Thread.CurrentThread.Name} 开始");
            Thread.Sleep(1000);
            Console.WriteLine($"{Thread.CurrentThread.Name} 结束");
            are2.Set();
        }
    }
}

EventWaitHandle、ManualResetEvent、AutoResetEvent、ManualResetEventSlim

EventWaitHandle、ManualResetEvent、AutoResetEvent 是内核同步构造,EventWaitHandle 由内核维护了一个 bool 变量,为 false 的时候阻塞线程,为 true 的时候释放线程。ManualResetEvent、AutoResetEvent 继承自 EventWaitHandle,所以拥有一样的行为,同时可以跨进程跨 domain 通信。
ManualResetEventSlim 并不继承自 EventWaitHandle,只是功能跟 ManualResetEvent、AutoResetEvent 一样的混合同步构造,使用用户构造和内核构造混合实现,遇到竞争的情况,会先自旋一下,还无法获取到资源,再使用内核构造阻塞线程,所以有更好的性能。

  1. EventWaitHandle 一般在构造函数中传入 name,用来跨进程或者跨 domain 通信。
/// <summary>
/// 测试 EventWaitHandle 跟其他线程通信
/// </summary>
public void Test2()
{
    EventWaitHandle ewh;
    if (EventWaitHandle.TryOpenExisting("multi-process", out ewh))
    {
        Console.WriteLine("等待 EventWaitHandle");
        ewh.WaitOne();
        Console.WriteLine("结束运行");
    }
    else
    {
        ewh = new EventWaitHandle(false, EventResetMode.AutoReset, "multi-process");
        while (true)
        {
            Console.WriteLine("按下 Enter 跟其他线程通讯");
            Console.ReadLine();
            ewh.Set();
        }
    }
}
  1. ManualResetEvent 调用完 ManualResetEvent.Set() 后会释放所有阻塞线程,如果需要再次阻塞线程,需要调用 ManualResetEvent.Reset()。
/// <summary>
/// 测试 ManualResetEvent.Set() 和 ManualResetEvent.Reset()
/// </summary>
public void Test1()
{
    var mre = new ManualResetEvent(false);

    foreach (var i in Enumerable.Range(1, 3))
    {
        StartThread(i);
    }
    Thread.Sleep(500);
    Console.WriteLine("按下 Enter 调用 Set(),释放所有线程");
    Console.ReadLine();
    mre.Set();
    Thread.Sleep(500);

    Console.WriteLine("ManualResetEvent 内部值为 true 时,不会阻塞线程。按下 Enter 启动一个新线程进行测试");
    Console.ReadLine();

    StartThread(4);
    Thread.Sleep(500);

    Console.WriteLine("按下 Enter 调用 Reset(),可以再次阻塞线程");
    Console.ReadLine();
    mre.Reset();
    Thread.Sleep(500);


    foreach (var i in Enumerable.Range(5, 2))
    {
        StartThread(i);
    }
    Thread.Sleep(500);

    Console.WriteLine("按下 Enter 调用 Set(),释放所有线程,结束 demo");
    Console.ReadLine();
    mre.Set();
    Thread.Sleep(500);

    void StartThread(int i)
    {
        var t = new Thread(() =>
        {
            Console.WriteLine($"{Thread.CurrentThread.Name} 启动并调用 WaitOne()");
            mre.WaitOne();
            Console.WriteLine($"{Thread.CurrentThread.Name} 结束运行");
        })
        {
            Name = $"线程_{i}"
        };
        t.Start();
    }
}
  1. AutoResetEvent 每次调用 AutoResetEvent.Set() 都只会释放一个阻塞的线程。
public void Test()
{
    var are = new AutoResetEvent(false);

    Task.Run(() =>
    {
        for (int i = 0; i < 5; i++)
        {
            Thread.Sleep(500);
            Console.WriteLine("按下 Enter 释放一个线程");
            Console.ReadLine();
            are.Set();
        }
    });

    foreach (var i in Enumerable.Range(1,5))
    {
        var t = new Thread(DoWork);
        t.Name = $"线程 {i}";
        t.Start();
    }

    void DoWork()
    {
        Console.WriteLine($"{Thread.CurrentThread.Name} 开始");
        are.WaitOne();
        Console.WriteLine($"{Thread.CurrentThread.Name} 结束");
    }
}

Semaphore、SemaphoreSlim

Semaphore 是一个内核构造,由内核维护了一个 Int32 变量,为当值为 0 时,阻塞线程,调用 Semaphore.Release() 会把变量加 1,调用 WaitHandle.WaitOne() 会把变量减 1。
SemaphoreSlim 是一个混合构造,功能跟 Semaphore 一致,使用用户构造和内核构造混合实现,遇到竞争的情况,会先自旋一下,还无法获取到资源,再使用内核构造阻塞线程,所以有更好的性能。

  1. 使用 Semaphore 释放多个线程。
/// <summary>
/// 测试 Semaphore
/// </summary>
public void Test4()
{
    var pool = new Semaphore(1, 3); // 初始化计数 1,最大计数 3

    foreach (var i in Enumerable.Range(1, 5))
    {
        var t = new Thread(DoWork);
        t.Name = $"线程 {i}";
        t.Start();
    }

    Thread.Sleep(500);
    Console.WriteLine("按下 Enter 释放 3 个线程");
    Console.ReadLine();
    pool.Release(3); // 计数加3
    Thread.Sleep(500);
    Console.WriteLine("再按下 Enter 释放 1 个线程");
    Console.ReadLine();
    pool.Release(); // 计数加1

    void DoWork()
    {
        Console.WriteLine($"{Thread.CurrentThread.Name} 开始");
        pool.WaitOne(); // 计数减1
        Console.WriteLine($"{Thread.CurrentThread.Name} 结束");
    }
}
  1. Semaphore 继承自 WaitHandle,所以在构造函数中传入 name 可以跨进程跨 domain 同步。把 Semaphore 的最大计数设置为 1,可以实现跟 AutoResetEvent 一样每次只解除一个阻塞线程的行为。
/// <summary>
/// 测试跟其他进程通讯
/// </summary>
public void Test5()
{
    Semaphore pool;
    if (Semaphore.TryOpenExisting("multi-process", out pool))
    {
        Console.WriteLine("等待 Semaphore");
        pool.WaitOne();
        Console.WriteLine("结束");
    }
    else
    {
        pool = new Semaphore(0, 1, "multi-process"); // 最大计数设置为 1,每次只解除一个阻塞。

        while (true)
        {
            Console.WriteLine("按下 Enter 跟其他线程通讯");
            Console.ReadLine();
            pool.Release();
        }
    }
}

Mutex

Mutex 是一个内核构造,经常用于进程同步(如保证只有程序只能有一个进程)。功能跟 AutoResetEvent(false) 和 Semaphore(0,1) 类似,每次只能阻塞一个线程或者进程。
Mutex 跟 EventWaitHandle 和 Semaphore 不一样的地方是,Mutex 要求线程一致(也就是获取和释放都必须在同一个线程),并且支持重入锁。

/// <summary>
/// Mutex 支持重入锁,支持线程一致
/// </summary>
public void Test()
{
    var mutex = new Mutex(false);
    var count = 0;
    DoWork(mutex);

    void DoWork(Mutex mutex)
    {
        try
        {
            mutex.WaitOne();
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获取 Mutex");
            Interlocked.Increment(ref count);
            Thread.Sleep(1000);
            if (Interlocked.CompareExchange(ref count, 3, 3) == 3)
            {
                return;
            }
            DoWork(mutex);
        }
        finally
        {
            mutex.ReleaseMutex(); // 调用几次 WaitOne() 就必须调用几次 ReleaseMutex(),并且调用 WaitOne() 和 ReleaseMutex() 必须在同一个线程。
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 释放 Mutex");
        }
    }
}

用户模式构造、内核模式构造性能对比

从上面可以看出,.Net 内核构造功能比用户构造强大得多,所以看起来似乎直接使用内核构造,而不使用用户模式构造更加明智。
但是用户构造会比内核构造快,所以在不常发生竞争或者性能敏感的场景下,使用用户构造会是一个更加优秀的做法。接下来用一个没有竞争的空方法测试一下快多少。

internal class PerformanceDemo
{
    /// <summary>
    /// 测试用户模式构造和内核模式构造,在锁没有发生竞争的情况下的性能差距
    /// </summary>
    public void Test()
    {
        var count = 1000 * 10000;
        var spinLock = new SpinLock(false);
        var are = new AutoResetEvent(true);
        var pool = new Semaphore(1, 1);

        var sw = Stopwatch.StartNew();
        foreach (var _ in Enumerable.Range(0, count))
        {
            var lockTaken = false;
            spinLock.Enter(ref lockTaken);
            DoWork();
            spinLock.Exit(lockTaken);
        }
        Console.WriteLine($"在没有竞争的场景下,执行一个空方法一千万次,SpinLock 耗时:{sw.ElapsedMilliseconds} ms");

        sw.Restart();
        foreach (var _ in Enumerable.Range(0, count))
        {
            are.WaitOne();
            DoWork();
            are.Set();
        }
        Console.WriteLine($"在没有竞争的场景下,执行一个空方法一千万次,AutoResetEvent 耗时:{sw.ElapsedMilliseconds} ms");

        sw.Restart();
        foreach (var _ in Enumerable.Range(0, count))
        {
            pool.WaitOne();
            DoWork();
            pool.Release();
        }
        Console.WriteLine($"在没有竞争的场景下,执行一个空方法一千万次,Semaphore 耗时:{sw.ElapsedMilliseconds} ms");

        // 空方法
        void DoWork()
        {

        }
    }
}
// 输出:
// 在没有竞争的场景下,执行一个空方法一千万次,SpinLock 耗时:184 ms
// 在没有竞争的场景下,执行一个空方法一千万次,AutoResetEvent 耗时:5449 ms
// 在没有竞争的场景下,执行一个空方法一千万次,Semaphore 耗时:5366 ms

最终在我的机子上测试,在没有发生竞争的场景下,.NET 提供的用户构造性能是内核构造的 30 倍,所以性能差距还是非常大的。

用户构造在遇到竞争,在长时间获取不到资源的场景,会一直在 CPU 上自旋,既浪费 CPU 时间,又耽误其他线程执行,内核构造在操作系统的协调下,会把获取不到资源的线程阻塞,不会浪费 CPU 时间。
内核构造在没有竞争的场景下,性能会比用户构造差几十倍。
混合构造就是组合用户构造和内核构造的实现,遇到竞争的时候,先使用用户构造自旋一下,自旋一段时间还没获取到资源,就使用内核构造阻塞线程,这样就能结合两种构造的优点了。
.Net 提供了 ManualResetEventSlim、SemaphoreSlim、Monitor、lock 关键字、ReaderWriterLockSlim、CountDownEvent、Barrier 等混合构造,可以在不同的场景下使用。

自定义一个简单的混合构造

通过这个例子可以了解一下是怎么组合内核构造和用户构造的。

/// <summary>
/// 一个简单的混合构造,组合 AutoResetEvent 和 Interlocked 实现
/// </summary>
internal class SimpleHybridLock : IDisposable
{
    private int _waiter;
    private AutoResetEvent _waiterLock = new(false);


    public void Enter()
    {
        if (Interlocked.Increment(ref _waiter) == 1)
        {
            return;
        }

        _waiterLock.WaitOne();
    }

    public void Exit()
    {
        if (Interlocked.Decrement(ref _waiter) == 0)
        {
            return;
        }

        _waiterLock.Set();
    }

    public void Dispose()
    {
        _waiterLock.Dispose();
    }
}

Monitor 和 lock 关键字

lock 关键字是最常使用的同步构造了,lock 可以锁定一个代码块,保证每次只有一个线程访问执行该代码块,lock 是基于 Montor 实现的,通过 try{...}finally{...} 把代码块包围起来。

  1. Monitor 是一个静态类,调用 Monitor.Enter(obj) 获取锁,调用 Monitor.Exit(obj) 释放。还可以在已经获取锁的线程上,调用 Monitor.Wait(obj) 释放锁,同时把线程放到等待队列,其他线程可以调用 Monitor.Pulse() 或 Monitor.PulseAll() 通知调用了 Monitor.Wait() 的线程继续获得锁。
    Monitor 支持重入锁,线程一致。
/// <summary>
/// 测试 Monitor.Wait(object)、Monitor.Pulse(object)、Monitor.PulseAll(object)
/// 注意点:
/// 调用 Wait()、Pulse()、PulseAll() 也必须先调用 Enter() 获取锁,退出的时候也必须调用 Exit() 释放锁
/// </summary>
public void Test()
{
    var lockObj = new object();

    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(500);
        Console.WriteLine("按下 c 调用 Monitor.Pulse(object)");

        if (Console.ReadKey().Key == ConsoleKey.C)
        {
            try
            {
                Monitor.Enter(lockObj);
                Monitor.Pulse(lockObj);
            }
            finally
            {
                Monitor.Exit(lockObj);
            }
        }
        Thread.Sleep(500);

        if (Console.ReadKey().Key == ConsoleKey.C)
        {
            try
            {
                Monitor.Enter(lockObj);
                Monitor.PulseAll(lockObj);
            }
            finally
            {
                Monitor.Exit(lockObj);
            }
        }
    });

    Parallel.Invoke(DoWork, DoWork, DoWork);

    void DoWork()
    {
        Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 启动");
        try
        {
            Monitor.Enter(lockObj);
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获得 Monitor");
            Thread.Sleep(100);
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 调用 Monitor.Wait()");
            Monitor.Wait(lockObj);
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 重新获得 Monitor");

        }
        finally
        {
            Monitor.Exit(lockObj);
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 释放 Monitor");
        }
    }
}
  1. Monitor.Enter(object) 参数是一个 object 类型,代表可以传入任何类型的参数,所以就有一些细节需要注意。
  • Monitor.Enter(值类型),涉及到值类型传参,就必须注意装箱拆箱的问题。
  • Monitor.Enter(字符串),虽然字符串是引用类型,但是字符串会留用,所以锁定同一个字符串就会导致互斥。
  • 如果一个实例对象的方法使用了 lock(this),如果外部调用也 lock 这个实例方法,那么就会死锁,所以最佳做法是永远不要 lock(this)。
/// <summary>
/// 测试 Monitor.Enter(字符串)
/// 因为字符串会被留用,所以会导致不同线程间互斥访问。
/// </summary>
public void Test2()
{
    var mre = new ManualResetEventSlim(false);

    Task.Run(() =>
    {
        Console.WriteLine("按下 c 启动");
        if (Console.ReadKey().Key == ConsoleKey.C)
        {
            mre.Set();
        }
    });

    Parallel.Invoke(DoWork, DoWork, DoWork);


    void DoWork()
    {
        mre.Wait();

        try
        {
            Monitor.Enter("1");
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 进入同步代码块");
            Thread.Sleep(1000);

        }
        finally
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 退出同步代码块");
            Monitor.Exit("1");
        }

    }
}

/// <summary>
/// 测试 Monitor.Enter(值类型)
/// 因为 Monitor.Enter(object) 参数是 object,所以值类型必须装箱,那样其实就会有问题了。
/// 值类型在堆栈上,没有引用,引用类型在堆上,有引用,所以装箱就是在堆上新建一个实例,然后复制栈上值的内容,拆箱就是把堆上实例的值,复制到栈上。
/// </summary>
public void Test3()
{
    var mre = new ManualResetEventSlim(false);
    var i = 1;
    //Object o = i;

    Task.Run(() =>
    {
        Console.WriteLine("按下 c 启动");
        if (Console.ReadKey().Key == ConsoleKey.C)
        {
            mre.Set();
        }
    });

    Parallel.Invoke(DoWork, DoWork, DoWork);

    void DoWork()
    {
        mre.Wait();
        object o = i;
        try
        {
            Monitor.Enter(o);
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 进入同步代码块");
            Thread.Sleep(1000);
        }
        finally
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 退出同步代码块");
            Monitor.Exit(o);
        }

    }
}

CountdownEvent

CountdownEvent 是一个混合构造,经常用于 fork/join 等场景,就是等待多个并行任务完成,再执行下一个任务。CountdownEvent 内部会维护一个计数,当计数为 0 时,解除线程的阻塞。

  • 调用 CountdownEvent.Reset(int) 可以重新初始化 CountdownEvent。
  • 调用 Signal() Signal(int count) 把计数减 1 或减 count。
  • 调用 AddCount() AddCount(int) 把计数加 1 或加 count。
 public void Test2()
{
    var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 100));
    var cde = new CountdownEvent(queue.Count);

    var doWork = new Action(() =>
    {
        while (queue.TryDequeue(out var result))
        {
            Thread.Sleep(100);
            Console.WriteLine(result);
            cde.Signal();
        }
    });

    var _ = Task.Run(doWork); // fork
    var _2 = Task.Run(doWork); // fork


    var complete = new Action(() =>
    {
        cde.Wait(); // join
        Console.WriteLine($"queue Count {queue.Count}");
    });

    var t = Task.Run(complete);
    var t2 = Task.Run(complete);

    Task.WaitAll(t, t2);


    Console.WriteLine($"CountdownEvent 重新初始化");
    cde.Reset(2); // 调用 Reset() 将 cde 重新初始化
    cde.AddCount(10); // 调用 AddCount() cde 内部计数 + 1
    var cts = new CancellationTokenSource(1000); // 测试超时机制

    try
    {
        cde.Wait(cts.Token);
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }

    cde.Dispose();

}

Barrier

Barrier 是一个混合构造,可以通过 participantCount 来指定一个数值,同时会维护一个内部数值 total,每次调用 SignalAndWait() 的时候,阻塞调用线程,同时把total 加 1,等到 total == participantCount,调用 postPhaseAction,通过 postPhaseAction 来确定汇总每个线程的数据,并且执行下个阶段的工作。
Barrier 适合一种特殊场景,把一个大任务拆分成多个小任务,然后每个小任务又会分阶段执行。像是 Parallel 的 Plus 版,如果任务步骤很多,用 Parallel 来分拆很麻烦,可以考虑用 Barrier。

public class BarrierDemo
{
    public void Test()
    {
        var words = new string[] { "山", "飞", "千", "鸟", "绝" };
        var words2 = new string[] { "人", "灭", "径", "万", "踪" };
        var solution = "千山鸟飞绝,万径人踪灭";
        bool success = false;

        var barrier = new Barrier(2, b =>
        {
            var sb = new StringBuilder();
            sb.Append(string.Concat(words));
            sb.Append(',');
            sb.Append(string.Concat(words2));
            
            Console.WriteLine(sb.ToString());
            //Thread.Sleep(1000);
            if (string.CompareOrdinal(solution, sb.ToString()) == 0)
            {
                success = true;
                Console.WriteLine($"已完成");
            }
            Console.WriteLine($"当前阶段数:{b.CurrentPhaseNumber}");

        });

        var t = Task.Run(() => DoWork(words));
        var t2 = Task.Run(() => DoWork(words2));

        Console.ReadLine();

        void DoWork(string[] words)
        {
            while (!success)
            {
                var r = new Random();
                for (int i = 0; i < words.Length; i++)
                {
                    var swapIndex = r.Next(i, words.Length);
                    (words[swapIndex], words[i]) = (words[i], words[swapIndex]);
                }

                barrier.SignalAndWait();
            }
        }
    }
}

ReaderWriterLockSlim

ReaderWriterLockSlim 是一个混合构造。一般场景中在读取数据的时候,不会涉及到数据的修改,所以可以并发读取,在修改数据的时候,才会涉及到数据的修改,所以应该互斥修改。其他同步构造无论读取还是修改数据都是锁定的,所以 .Net 提供了一个读写锁 ReaderWriterLockSlim。
ReaderWriterLockSlim 的逻辑如下:

  • 一个线程向数据写入时,请求访问的其他所有线程都阻塞。
  • 一个线程向数据读取时,请求读取的其他线程允许继续执行,但是请求写入的线程仍被阻塞。
  • 一个向数据写入的线程结束后,要么解除一个写入线程(writer)的阻塞,使它能向数据写入,要么解除所有读取线程(reader)的阻塞,使它们能够进行并发读取。如果没有线程被阻塞,则锁进入自由状态,可以被下一个 reader 或者 writer 线程获取。
  • 所有向数据读取的线程结束后,一个 writer 线程被解除阻塞,使它能向数据写入。如果没有线程被阻塞,则锁进入自由状态,可以被下一个reader 或者 writer 线程获取。
/// <summary>
/// ReaderWriterLockerSlim 用法
/// </summary>
internal class Transaction2
{
    private DateTime _timeLastTrans;

    public DateTime TimeLastTrans
    {
        get
        {
            _lock.EnterReadLock();
            Thread.Sleep(1000);
            var t = _timeLastTrans;
            Console.WriteLine($"调用 ReadLock {Thread.CurrentThread.ManagedThreadId}");

            _lock.ExitReadLock();
            return t;
        }
    }

    private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);

    public void PerformTransaction()
    {
        _lock.EnterWriteLock();
        _timeLastTrans = DateTime.Now;
        Console.WriteLine($"调用 WriteLock {Thread.CurrentThread.ManagedThreadId}");
        _lock.ExitWriteLock();
    }

    public void Test()
    {
        PerformTransaction();

        ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(TimeLastTrans));

        PerformTransaction();
        Thread.Sleep(500); // 就算睡眠500ms,在锁释放后,依旧先进行读操作,读完才有写操作。
        ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(TimeLastTrans)); 
    }
}

回顾了一下知识,总结了一下,发现自己又学到不少。下次回顾一下 Task 的知识。
源码 https://github.com/yijidao/blog/tree/master/TPL/ThreadDemo/ThreadDemo3


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK