3

C#多线程之线程高级(下) - xiaolipro

 1 year ago
source link: https://www.cnblogs.com/xiaolipro/p/16894817.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

四、Monitor信号构造

信号构造本质:一个线程阻塞直到收到另一个线程发来的通知。

1510705-20221116093207322-1527097438.png

当多线程Wait同一对象时,就形成了一个“等待队列(waiting queue)”,和用于等待获得锁的“就绪队列(ready queue)”不同,每次调用Pulse时会释放队头线程,它会进入就绪队列,然后重新获取锁。可以把它想象成一个自动停车场,首先你在收费站(等待队列)排队验票,然后在栅栏前(就绪队列)排队等待放行。

这个队列结构天然有序,但是,对于Wait/Pulse应用通常不重要,在这种场景下把它想象成一个等待线程的“池(pool)”更好理解,每次调用Pulse都会从池中释放一个等待线程。

PulseAll释放整个等待队列或者说等待池。收到Pulse的线程不会完全同时开始执行,而是有序的执行,因为每个Wait语句都要试图重新获取同一把锁。他们的效果就是,PulseAll将线程从等待队列移到就绪队列中,让它们可以继续有序执行。

使用Wait/Pulse需要注意:

  • Wait / Pulse不能lock块之外使用,否则会抛异常。
  • Pulse最多释放一个线程,而PulseAll释放所有线程。
  • Wait会立即释放当前持有的锁,然后进入阻塞,等待脉冲
  • 收到脉冲会立即尝试重新获取锁,如果在指定时间内重新获取,则返回true,如果在超过指定时间获取,则返回false,如果没有获取锁,则一直阻塞不会返回

Wait和Pulse

  1. 定义一个字段,作为同步对象

    private readonly object _locker = new object();
    
  2. 定义一个或多个字段,作为阻塞条件

    private bool _ok;
    
  3. 当你希望阻塞的时候

    Monitor.Wait在等待脉冲时,同步对象上的锁会被释放,并且进入阻塞状态,直到收到 _locker上的脉冲,收到脉冲后重新获取 _locker,如果此时 _locker 已经被别的线程占有,则继续阻塞,直至_获取 _locker

    lock (_locker) 
    {
        while (!_ok)
        {
            Monitor.Wait (_locker);
        }
    }
    
  4. 当你希望改变阻塞条件时

    lock (_locker)
    {
        _ok = true;
        Monitor.Pulse(_locker);  // Monitor.PulseAll(_locker);
    }
    

WaitPulse几乎是万能的,通过一个bool标识我们就能实现AutoResetEvent/ManualResetEvent的功能,同理使用一个整形字段,就可以实现CountdownEvent/Semaphore

性能方面,调用Pulse花费大概约是在等待句柄上调用Set三分之一的时间。但是,使用WaitPulse进行信号同步,对比事件等待句柄有以下缺点:

  • Wait / Pulse不能跨越应用程序域和进程使用。

  • 必须通过锁保护所有信号同步逻辑涉及的变量。

调用Wait方法时,你可以设定一个超时时间,可以是毫秒或TimeSpan的形式。如果因为超时而放弃了等待,那么Wait方法就会返回false

public static bool Wait(object obj, TimeSpan timeout)

如果在超时到达时仍然没有获得一个脉冲,CLR会主动给它发送一个虚拟的脉冲(virtual pulse),使其能够重新获得锁,然后继续执行,就像收到一个真实脉冲一样。

下面这个例子非常有用,它可以定期的检查阻塞条件。即使其它线程无法按照预期发送脉冲,例如程序之后被其他人修改,但没能正确使用Pulse,这样也可以在一定程度上免疫 bug。因此在复杂的同步设计中可以给所有Wait指定超时时间。

lock (_locker)
  while (/* <blocking-condition> */)
    Monitor.Wait (_locker, /* <timeout> */);

Monitor.Wait的boolean类型返回值其实还可以这么理解:其返回值意味着是否获得了一个“真实的脉冲“。

如果”虚拟的脉冲“并不是期待的行为,可以记录日志或抛出异常。

Wait等待一个变量上的脉冲,Pulse对一个变量发送脉冲。脉冲也是一种信号形式,相对于事件等待句柄那种锁存(latching)信号,脉冲顾名思义是一种非锁存或者说易失的信号

双向信号与竞争状态

Monitor.Pulse是一种单向通信机制:发送脉冲的线程不关心发出的脉冲被谁收到了,他没有返回值,不会阻塞,内部也没有确认机制。

当一个线程发起一次脉冲:

  • 如果等待队列中没有任何线程,那么这次发起的脉冲不会有任何效果。
  • 如果等待队列中有线程,线程发送完脉冲并释放锁后,并不能保证接到脉冲信号的等待线程能立即开始工作。

然后我们有一些场景依赖等待线程能够在收到脉冲后及时的响应,此时,双向信号出现了,这是一种自定义的确认机制。

在上文的信号构造基础上改造一个竞争状态的案例:

public class 竞争状态测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new object();
    private bool _ok;

    public 竞争状态测试(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        new Thread(() =>  // Worker
        {
            for (int i = 0; i < 5; i++)
                lock (_locker)
                {
                    while (!_ok) Monitor.Wait(_locker);
                    _ok = false;
                    _testOutputHelper.WriteLine("Wassup?");
                }
        }).Start();

        for (int i = 0; i < 5; i++)
        {
            lock (_locker)
            {
                _ok = true;
                Monitor.Pulse(_locker);
            }
        }
    }
}

我们期待的结果:

Wassup?
Wassup?
Wassup?
Wassup?
Wassup?

实际上这个这个程序可能一次”Wassup?“都不会输出:主线程可能在工作线程启动之前完成,这五次Pulse啥事都没干

还记得我们讲事件等待句柄时,使用AutoResetEvent来模拟的双向信号吗?现在使用Monitor来实现一个扩展性更好的版本

public class 双向信号测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new();
    private bool _entry; // 我是否可以工作了
    private bool _ready; // 我是否可以继续投递了

    public 双向信号测试(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        new Thread(() =>
        {
            Thread.Sleep(100);
            for (int i = 0; i < 5; i++)
            {
                lock (_locker)
                {
                    _ready = true;
                    Monitor.PulseAll(_locker);
                    while (!_entry) Monitor.Wait(_locker);
                    _entry = false;
                    _testOutputHelper.WriteLine("Wassup?");
                }
            }
        }).Start();

        for (int i = 0; i < 5; i++)
        {
            lock (_locker)
            {
                while (!_ready) Monitor.Wait(_locker);
                _ready = false;
                _entry = true;
                Monitor.PulseAll(_locker);
            }
        }
    }
}

我们仍然使用_ready来作为上游脉冲线程的自旋条件,使用_entry作为下游等待线程的自旋条件。由于我们的逻辑都在lock语句中,即使之后引入了第三个线程,我们的逻辑仍然不会出问题,_ready_entry的读写总是原子的。

升级生产消费队列

  1. 这次,我们将允许多个消费者,各自拥有独立的消费线程。使用一个数组来存放这些线程,并且他们接收的不再是string,而是更加灵活的委托:

    private Thread[] _workers;
    private Queue<Action> _queue = new Queue<Action>();
    
  2. 和上次一样,我们传递null来告知消费者线程退出:

    foreach (var worker in _workers)
    {
        AddTask(null);
    }
    
  3. 在告知消费线程退出后Join这些线程,等待未完成的任务被消费:

    foreach (var worker in _workers)
    { 
        worker.Join();
    }
    
  4. 每个工作线程会执行一个名为Consume的方法。我们在构造队列时循环创建和启动这些线程:

    _workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
        _workers[i] = new Thread(Consume);
        _workers[i].Start();
    }
    
  5. 消费Comsume方法,一个工作线程从队列中取出并执行一个项目。我们希望工作线程没什么事情做的时候,或者说当队列中没有任何项目时,它们应该被阻塞。因此,我们的阻塞条件是_queue.Count == 0

    private void Consume()
    {
        while (true)
        {
            Action task;
            lock (_locker)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_locker);  // 队列里没任务,释放锁,进入等待
                }
                // 获取新任务,重新持有锁
                task = _queue.Dequeue();
            }
            
            if (task == null) return;  // 空任务代表退出
            task();  // 执行任务
        }
    }
    
  6. 添加一个任务。出于效率考虑,加入一个任务时,我们调用Pulse而不是PulseAll。这是因为每个项目只需要唤醒(至多)一个消费者。如果你只有一个冰激凌,你不会把一个班 30 个正在睡觉的孩子都叫起来排队获取它。

    public void AddTask(Action task)
    {
        lock (_locker)
        {
            _queue.Enqueue(task);
            Monitor.Pulse(_locker);
        }
    }
    

模拟等待句柄

在双向信号中,你可能注意到了一个模式:_flag在当前线程被作为自旋阻塞条件,在另一线程中被设置为true,跳出自旋

lock(_locker)
{
    while (!_flag) Monitor.Wait(_locker);
	_flag = false;
}

ManualResetEvent

事实上它的工作原理就是模仿AutoResetEvent。如果去掉_flag=false,就得到了ManualResetEvent的基础版本。

private readonly object _locker = new object();
private bool _signal;
void WaitOne()
{
    lock (_locker)
    {
        while (!_signal) Monitor.Wait(_locker);
    }
}
void Set()
{
    lock (_locker)
    {
        _signal = true;
        Monitor.PulseAll(_locker);
    }
}
void Reset()
{
    lock (_locker) _signal = false;
}

使用PulseAll,是因为可能存在多个被阻塞的等待线程。而EventWaitHandle.WaitOne()的通行条件就是:是开着的,ManualResetEvent被放行通过后不会自己关门,只能通过Reset将门关上,再次期间其它所有阻塞线程都能通行。

AutoResetEvent

实现AutoResetEvent非常简单,只需要将WaitOne方法改为:

lock (_locker)
{
    while (!_signal) Monitor.Wait(_locker);
    _signal = false;  // 添加一条,自己关门
}

然后将Set方法改为:

lock (_locker)
{
    _signal = true;
    Monitor.Pulse(_locker);  // PulseAll替换成Pulse:
}

Semaphore

_signal替换为一个整型字段可以得到Semaphore的基础版本

public class 模拟信号量
{
    private readonly object _locker = new object();
    private int _count, _initialCount;
    public 模拟信号量(int initialCount)
    {
        _initialCount = initialCount;
    }
    
    void WaitOne()  // +1
    {
        lock (_locker)
        {
            _count++;
            while (_count >= _initialCount)
            {
                Monitor.Wait(_locker);
            }
        }
    }

    void Release()  // -1
    {
        lock (_locker)
        {
            _count --;
            Monitor.Pulse(_locker);
        }
    }
}

模拟CountdownEvent

是不是非常类似信号量?

public class 模拟CountdownEvent
{
    private object _locker = new object();
    private int _initialCount;

    public 模拟CountdownEvent(int initialCount)
    {
        _initialCount = initialCount;
    }

    public void Signal()  // +1
    {
        AddCount(-1);
    }

    public void AddCount(int amount)  // +amount
    {
        lock (_locker)
        {
            _initialCount -= amount;
            if (_initialCount <= 0) Monitor.PulseAll(_locker);
        }
    }

    public void Wait()
    {
        lock (_locker)
        {
            while (_initialCount > 0)
                Monitor.Wait(_locker);
        }
    }
}

CountdownEvent

利用我们刚刚实现的模拟CountdownEvent,来实现两个线程的会和,和同步基础中提到的WaitHandle.SignalAndWait一样。

并且我们也可以通过initialCount将会和的线程扩展到更多个,显而易见的强大。

public class 线程会和测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private 模拟CountdownEvent _countdown = new 模拟CountdownEvent(2);

    public 线程会和测试(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void Show()
    {
        // 每个线程都睡眠一段随机时间
        Random r = new Random();
        new Thread(Mate).Start(r.Next(10000));
        Thread.Sleep(r.Next(10000));

        _countdown.Signal();
        _countdown.Wait();

        _testOutputHelper.WriteLine("Mate! ");
    }

    void Mate(object delay)
    {
        Thread.Sleep((int)delay);

        _countdown.Signal(); //+1
        _countdown.Wait();

        _testOutputHelper.WriteLine("Mate! ");
    }
}

上面例子,每个线程随机休眠一段时间,然后等待对方,他们几乎在同时打印”Mate!“,这被称为线程执行屏障(thread execution barrier)

当你想让多个线程执行一个系列任务,希望它们步调一致时,可以用到线程执行屏障。然而,我们现在的解决方案有一定限制:我们不能重用同一个Countdown对象来第二次会合线程,至少在没有额外信号构造的情况下不能。为解决这个问题,Framework 4.0 提供了一个新的类Barrier

Barrier

Framework 4.0 加入的一个信号构造。它实现了线程执行屏障(thread execution barrier),允许多个线程在一个时间点会合。这个类非常快速和高效,它是建立在Wait / Pulse和自旋锁基础上的。

  1. 实例化它,指定有多少个线程参与会合(可以调用AddParticipants / RemoveParticipants来进行更改)。

    public Barrier(int participantCount)
    
  2. 当希望会合时,调用SignalAndWait。表示参与者已到达障碍,并等待所有其他参与者到达障碍

    public void SignalAndWait()
    

    他还实现了协作取消模式

    public void SignalAndWait(CancellationToken cancellationToken)
    

    并提供了超时时间的重载,返回一个bool类型,true标识在规定的时间,其他参与者到达障碍,false标识没有全部到达

    public bool SignalAndWait(TimeSpan timeout)
    

实例化Barrier,参数为 3 ,意思是调用SignalAndWait会被阻塞直到该方法被调用 3 次。但与CountdownEvent不同,它会自动复位:再调用SignalAndWait仍会阻塞直到被调用 3 次。这允许你保持多个线程“步调一致”,让它们执行一个系列任务。

1510705-20221116093324615-1155582214.png

下边的例子中,三个线程步调一致地打印数字 0 到 4:

private readonly ITestOutputHelper _testOutputHelper;
private Barrier _barrier = new Barrier(3);
public Barrier测试(ITestOutputHelper testOutputHelper)
{
    _testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
    new Thread(Speak).Start();
    new Thread(Speak).Start();
    new Thread(Speak).Start();
}
void Speak()
{
    for (int i = 0; i < 5; i++)
    {
        _testOutputHelper.WriteLine(i.ToString());
        _barrier.SignalAndWait();
    }
}

Barrier还提供一个非常用有的构造参数,他是一个委托,会在每个会和处执行。不用担心抢占,因为当它被执行时,所有的参与者都是被阻塞的。

public Barrier(int participantCount, Action<Barrier>? postPhaseAction)

前景回顾:

还记得我们在讲同步的时候提到的最小化共享数据无状态设计吗?经过前面的学习,稍加思考,其实引发线程安全的本质是多线程并发下的数据交互问题。如果我们的数据在线程之间没有交互,或者说我们的数据都是只读的,那不就天然的线程安全了吗?

现在你能理解为什么只读字段是天然线程安全的了吗?

然而有的场景下又需要对公共数据进行读写,同步篇中我们通过很简单的排它锁来保证线程安全,在这里,我们不在满足这种粗暴的粒度(事实上多数时候读总是多于写),这时,读写锁出现了。

ReaderWriterLockSlim

ReaderWriterLockSlim在 Framework 3.5 加入的,被加入了standard 1.0,此类型是线程安全的,用于保护由多个线程读取的资源。

ReaderWriterLockSlim出现的目的是为了取缔ReaderWriterLock,他简化了递归规则以及锁状态的升级和降级规则。避免了许多潜在的死锁情况。 另外,他的性能显著优于ReaderWriterLock。 建议对所有新开发的项目使用ReaderWriterLockSlim

然而如果与普通的lockMonitor.Enter / Exit)对比,他还是要慢一倍。

ReaderWriterLockSlim有三种模式:

  • 读取模式:允许任意多的线程处于读取模式

  • 可升级模式:只允许一个线程处于可升级模式,与读锁兼容

  • 写入模式:完全互斥,不允许任何模式下的线程获取任何锁

ReaderWriterLockSlim定义了如下的方法来获取和释放读 / 写锁:

public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();

另外,对应所有EnterXXX的方法,都有相应的TryXXX版本,可以接受一个超时参数,与Monitor.TryEnter类似。

让我们来看一个案例:

模拟三个读线程,两个写线程,并行执行

new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Write).Start();
new Thread(Write).Start();

读方法是这样的

while (true)
{
    _rw.EnterReadLock();
    foreach (int number in _items)
    {
        Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
        Thread.Sleep(100);
    }
    _rw.ExitReadLock();
}

写方法是这样的

while (true)
{
    int number = _rand.Value.Next(100);
    _rw.EnterWriteLock();
    _items.Add(number);
    _rw.ExitWriteLock();
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
    Thread.Sleep(100);
}

随机数生成方法就是用的TLS讲过的

new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode()));

需要注意ReaderWriterLockSlim实现了IDisposable,用完了请记得释放

public class ReaderWriterLockSlim : IDisposable

运行结果:

Thread 11 added 42
Thread 8 reading 42
Thread 6 reading 42
Thread 7 reading 42
Thread 10 added 98
Thread 8 reading 42
...

显而易见的,并发度变高了

ReaderWriterLockSlim提供一个构造参数LockRecursionPolicy用于配置锁递归策略

public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy)
public enum LockRecursionPolicy
{
  /// <summary>If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.</summary>
  NoRecursion,
  /// <summary>A thread can enter a lock recursively. Some classes may restrict this capability.</summary>
  SupportsRecursion,
}

默认情况下是使用NoRecursion策略:不允许递归或重入,这与GO的读写锁设计不谋而合,建议使用此默认策略,因为递归引入了不必要的复杂性,并使代码更易于死锁。

public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion)

开启支持递归策略后,以下代码不会抛出LockRecursionException异常

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

递归锁定级别只能越来越小,级别顺序如下:读锁,可升级锁,写锁。下面代码会抛出LockRecursionException异常

void F()
{
    var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
    rw.EnterReadLock();
    rw.EnterWriteLock();
    rw.EnterWriteLock();
    rw.ExitReadLock();
}
Assert.Throws<LockRecursionException>(F);

可升级锁例外,把可升级锁升级为写锁是合法的。

var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
rw.EnterUpgradeableReadLock();
rw.EnterWriteLock();
rw.ExitWriteLock();
rw.ExitUpgradeableReadLock();

思考一个问题:为什么只允许一个线程处于可升级模式?

SQL Server ReaderWriterLockSlim
共享锁(Share lock) 读锁(Read lock)
排它锁(Exclusive lock) 写锁(Write lock)
更新锁(Update lock) 可升级锁(Upgradeable lock)

Timer

如果你需要使用规律的时间间隔重复执行一些方法,这个例子会使得一个线程永远被占用

while (true)
{
    // do something
    Thread.Sleep(1000);
}

这时候你会需要Timer

创建计时器时,可以指定在方法首次执行之前等待的时间 dueTime ,以及后续执行之间等待的时间period。 类 Timer 的分辨率与系统时钟相同。 这意味着,如果period小于系统时钟的分辨率,委托将以系统时钟分辨率定义的时间间隔执行,在Windows 7 和Windows 8系统上大约为 15 毫秒。

public Timer(TimerCallback callback, object? state, int dueTime, int period)

下面这个例子首次间隔1s,之后间隔500ms打印tick...

Timer timer = new Timer ((data) =>
{
    _testOutputHelper.WriteLine(data.ToString());
}, "tick...", 1000, 500);
Thread.Sleep(3000);
timer.Dispose();

计时器委托是在构造计时器时指定的,不能更改。 该方法不会在创建计时器的线程上执行;而是在线程池(thread pool)执行。

如果计时器间隔period小于执行回调所需的时间,或者如果所有线程池线程都在使用,并且回调被多次排队,则可以在两个线程池线程上同时执行回调。

只要使用 Timer,就必须保留对它的引用。 与任何托管对象一样,当没有对其引用时,会受到垃圾回收的约束。 即使 Timer 仍然处于活动状态也不会阻止它被收集。

不再需要计时器时,请调用 Dispose 释放计时器持有的资源。请注意,调用 Dispose() 后仍然可能会发生回调,因为计时器将回调排队供线程池线程执行。可以使用public bool Dispose(WaitHandle notifyObject)重载等待所有回调完成。

System.Threading.Timer是一个普通计时器。 它会回调一个线程池线程(来自工作池)。

System.Timers.Timer是一个System.ComponentModel.Component ,它包装System.Threading.Timer ,并提供一些用于在特定线程上调度的附加功能。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK