1非阻塞同步Permalink
之前,我们描述了即使是很简单的赋值或更新一个字段也需要同步。尽管锁总能满足这个需求,一个存在竞争的锁意味着肯定有线程会被阻塞,就会导致由上下文切换和调度的延迟带来的开销,在高并发以及对性能要求很高的场景,这不符合需要。.NET Framework 的 非阻塞(nonblocking)同步构造能够在没有阻塞、暂停或等待的情况下完成简单的操作。
正确编写无阻塞或无锁的多线程代码是棘手的!特别是内存屏障容易用错(volatile
关键字更容易用错)。在放弃使用传统锁之前,请仔细思考是否真的需要非阻塞同步带来的性能优化。切记获得和释放一个无竞争的锁在一个 2010 时代的计算机上仅仅需要 20ns 而已。
无阻塞的方式也可以跨进程工作。一个例子就是它可以被用来读写进程间共享内存。
1.1内存屏障和易失性Permalink
考虑下边这个例子:
class Foo { int _answer; bool _complete; void A() { _answer = 123; _complete = true; } void B() { if (_complete) Console.WriteLine (_answer); } }
如果方法A
和B
在不同的线程上并发运行,B
可能会打印 “ 0 “ 吗?答案是会的,原因如下:
- 编译器、CLR 或 CPU 可能会重新排序(reorder)程序指令以提高效率。
- 编译器、CLR 或 CPU 可能会进行缓存优化,导致其它线程不能马上看到变量的赋值。
C# 和运行时会非常小心的保证这些优化不会破坏普通的单线程代码,和正确使用锁的多线程代码。除这些情况外,你必须通过显式的创建内存屏障(memory barrier,也称作内存栅栏 (memory fence))来对抗这些优化,限制指令重排和读写缓存产生的影响。
全栅栏Permalink
最简单的内存屏障是完全内存屏障(full memory barrier,或全栅栏(full fence)),它可以阻止所有跨越栅栏的指令重排和缓存。调用Thread.MemoryBarrier
生成一个全栅栏。我们可以使用 4 个全栅栏来修正之前的例子:
class Foo { int _answer; bool _complete; void A() { _answer = 123; Thread.MemoryBarrier(); // 屏障 1 _complete = true; Thread.MemoryBarrier(); // 屏障 2 } void B() { Thread.MemoryBarrier(); // 屏障 3 if (_complete) { Thread.MemoryBarrier(); // 屏障 4 Console.WriteLine (_answer); } } }
屏障 1 和 4 可以使这个例子不会打印 “ 0 “。屏障 2 和 3 提供了一个“最新(freshness)”保证:它们确保如果B
在A
后运行,读取_complete
的值会是true
。
在 2010 时代的桌面电脑上,一个全栅栏的开销大约是 10 纳秒。
下列方式都会隐式的使用全栅栏:
- C# 的
lock
语句(Monitor.Enter / Monitor.Exit
) Interlocked
类中的所有方法(马上会讲到)- 使用线程池的异步回调,包括异步委托、APM 回调,以及任务延续(task continuations)
- 在信号构造上等待或对其设置(译者注:发信号、复位等等)
- 任何依赖于信号同步的情况,比如启动或等待
Task
因为最后一条的关系,下边的代码是线程安全的:
int x = 0; Task t = Task.Factory.StartNew (() => x++); t.Wait(); Console.WriteLine (x); // 1
不需要对每一个读或写都使用全栅栏。如果有 3 个 answer 字段,我们的例子仍然只需要 4 个栅栏:
class Foo { int _answer1, _answer2, _answer3; bool _complete; void A() { _answer1 = 1; _answer2 = 2; _answer3 = 3; Thread.MemoryBarrier(); _complete = true; Thread.MemoryBarrier(); } void B() { Thread.MemoryBarrier(); if (_complete) { Thread.MemoryBarrier(); Console.WriteLine (_answer1 + _answer2 + _answer3); } } }
好的方式是:首先在每一个读写共享字段的指令前后都加上内存屏障,然后再剔除那些不需要的。如果你无法确认是否需要,那就保留它们。或者,更好的方式是:换回使用锁!
真的需要锁和内存屏障吗?Permalink
如果在用共享可写字段(shared writable fields)时不加锁或栅栏是自找麻烦。关于这个话题有很多误导信息,包括 MSDN 文档中描述只有在弱内存排序的多处理器系统上MemoryBarrier
才是必需的,例如,使用多个 Intel Itanium 处理器的系统。我们可以通过下边这个简短的程序证明:在普通的 Intel Core-2 和 Pentium 处理器上,内存屏障也是非常重要的。在开启优化以及非调试模式下运行下边的程序(在 Visual Studio 中,解决方案的配置管理里选择 Release 模式,然后非调试模式下启动 )
static void Main() { bool complete = false; var t = new Thread (() => { bool toggle = false; while (!complete) toggle = !toggle; }); t.Start(); Thread.Sleep (1000); complete = true; t.Join(); // 无限阻塞 }
这个程序 不会终止,因为变量complete
被缓存在 CPU 寄存器中。在while
循环中加入一个Thread.MemoryBarrier
的调用(或在读取complete
的地方加锁)可以修正这个错误。
volatile 关键字Permalink
另一个(更高级的)解决这个问题的方法是对_complete
字段使用volatile
关键字。
volatile bool _complete;
volatile
关键字通知编译器在每个读这个字段的地方使用一个读栅栏(acquire-fence),并且在每个写这个字段的地方使用一个写栅栏(release-fence)。读栅栏防止其它读/写被移到栅栏之前,写栅栏防止其它读/写被移到栅栏之后。这种“半栅栏(half-fences)”比全栅栏更快,因为它给了运行时和硬件更大的优化空间。
巧的是,Intel 的 X86 和 X64 处理器总是在读时使用读栅栏,写时使用写栅栏,无论是否使用volatile
关键字。所以在使用这些处理器的情况下,这个关键字对硬件来说是无效的。然而,volatile
关键字对编译器和 CLR 进行的优化是有作用的,以及在 64 位 AMD 和 Itanium 处理器上也是有作用的。这意味着不能因为你的客户端运行在特定类型的 CPU 上而放松警惕。
(并且即使你使用了volatile
,也仍然应当保持一种健康的担忧,我们稍后会看到原因!)
关于对字段使用volatile
关键字的效果,总结如下:
第一条指令 | 第二条指令 | 是否会被交换 |
---|---|---|
读 | 读 | 不会 |
读 | 写 | 不会 |
写 | 写 | 不会(CLR 确保写-写操作永远不会被交换,就算是没有volatile 关键字) |
写 | 读 | 会! |
注意:使用volatile
不能阻止写-读被交换,这可能是一个难题。Joe Duffy 使用下面的例子很好的说明了这个问题:如果Test1
和Test2
同时运行在不同的线程上,可能a
和b
最后的值都是 0 (尽管在x
和y
上都使用了volatile
):
class IfYouThinkYouUnderstandVolatile { volatile int x, y; void Test1() // 运行在一个线程上 { x = 1; // Volatile 写 (写栅栏) int a = y; // Volatile 读 (读栅栏) // ... } void Test2() // 运行在另一线程上 { y = 1; // Volatile 写 (写栅栏) int b = x; // Volatile 读 (读栅栏) // ... } }
MSDN 文档描述:使用volatile
关键字可以确保该字段在任何时间呈现的都是最新的值。这是错误的,就像我们刚才看到的,写-读操作可能被重新排序。(译者注:其实不能说 MSDN 的说法错误,使用volatile
后x
和y
的值确实是最新的,只是因为指令重排,对它们的读可能在另一个线程上的写之前进行)
这给出了避免使用volatile
关键字的理由:就算你能很好的理解这个例子,可是其它一起工作的开发者也理解么?在Test1
和Test2
的两次赋值之间使用全栅栏(或锁)可以解决这个问题。
volatile
关键字不支持引用类型的参数和捕获的局部变量:这些情况下你必须使用VolatileRead
和VolatileWrite
方法。
VolatileRead 和 VolatileWritePermalink
使用Thread
类上的静态方法VolatileRead
和VolatileWrite
读/写变量时,相当于volatile
关键字产生的作用(技术上说,作用是其超集)。它们的实现相对低效,可是这是因为它们实际上使用了全栅栏。这是它们对于整型的实现:
public static void VolatileWrite (ref int address, int value) { MemoryBarrier(); address = value; } public static int VolatileRead (ref int address) { int num = address; MemoryBarrier(); return num; }
可以看出来,如果调用VolatileWrite
后紧接着调用VolatileRead
,在它们中间是没有屏障的:这会产生和我们之前看到的同样的难题。
内存屏障和锁Permalink
像前所述,Monitor.Enter
和Monitor.Exit
都使用了全栅栏。因此,如果我们忽略锁的互斥作用,可以这样说:
lock (someField) { ... }
相当于:
Thread.MemoryBarrier(); { ... } Thread.MemoryBarrier();
1.2InterlockedPermalink
无锁代码下,在读写字段时使用内存屏障往往是不够的。在 64 位字段上进行加、减操作需要使用Interlocked
工具类这样更加重型的方式。Interlocked
也提供了Exchange
和CompareExchange
方法,后者能够进行无锁的读-改-写(read-modify-write)操作,只需要额外增加一点代码。
如果一条语句在底层处理器上被当作一个独立不可分割的指令,那么它本质上是原子的(atomic)。严格的原子性可以阻止任何抢占的可能。对于 32 位(或更低)的字段的简单读写总是原子的。而操作 64 位字段仅在 64 位运行时环境下是原子的,并且结合了多个读写操作的语句必然不是原子的:
class Atomicity { static int _x, _y; static long _z; static void Test() { long myLocal; _x = 3; // 原子的 _z = 3; // 32位环境下不是原子的(_z 是64位的) myLocal = _z; // 32位环境下不是原子的(_z 是64位的) _y += _x; // 不是原子的 (结合了读和写操作) _x++; // 不是原子的 (结合了读和写操作) } }
在 32 位环境下读写 64 位字段不是原子的,因为它需要两条独立的指令:每条用于对应的 32 位内存地址。所以,如果线程 X 在读一个 64 位的值,同时线程 Y 更新它,那么线程 X 最终可能得到新旧两个值按位组合后的结果(一个撕裂读(torn read))。
编译器实现x++
这种一元运算,是通过先读一个变量,然后计算,最后写回去的方式。考虑如下类:
class ThreadUnsafe { static int _x = 1000; static void Go() { for (int i = 0; i < 100; i++) _x--; } }
抛开内存屏障的事情,你可能会认为如果 10 个线程并发运行Go
,最终_x
会为0
。然而,这并不一定,因为可能存在竞态条件(race condition),在一个线程完成读取x
的当前值,减少值,把值写回这个过程之间,被另一个线程抢占(导致一个过期的值被写回)。
当然,可以通过用lock
语句封装非原子的操作来解决这些问题。实际上,锁如果一致的使用,可以模拟原子性。然而,Interlocked
类为这样简单的操作提供了一个更方便更快的方案:
class Program { static long _sum; static void Main() { // _sum // 简单的自增/自减操作: Interlocked.Increment (ref _sum); // 1 Interlocked.Decrement (ref _sum); // 0 // 加/减一个值: Interlocked.Add (ref _sum, 3); // 3 // 读取64位字段: Console.WriteLine (Interlocked.Read (ref _sum)); // 3 // 读取当前值并且写64位字段 // (打印 "3",并且将 _sum 更新为 10 ) Console.WriteLine (Interlocked.Exchange (ref _sum, 10)); // 10 // 仅当字段的当前值匹配特定的值(10)时才更新它: Console.WriteLine (Interlocked.CompareExchange (ref _sum, 123, 10); // 123 } }
Interlocked
上的所有方法都使用全栅栏。因此,通过Interlocked
访问字段不需要额外的栅栏,除非它们在程序其它地方没有通过Interlocked
或lock
来访问。
Interlocked
的数学运算操作仅限于Increment
、Decrement
以及Add
。如果你希望进行乘法或其它计算,在无锁方式下可以使用CompareExchange
方法(通常与自旋等待一起使用)。我们会在并行编程中提供一个例子。
Interlocked
类通过将原子性的需求传达给操作系统和虚拟机来进行实现其功能。
Interlocked
类的方法通常产生 10ns 的开销,是无竞争锁的一半。此外,因为它们不会导致阻塞,所以不会带来上下文切换的开销。然而,如果在循环中多次迭代使用Interlocked
,就可能比在循环外使用一个锁的效率低(不过Interlocked
可以实现更高的并发度)。
2使用 Wait 和 Pulse 进行信号同步Permalink
(译者注:Pulse
翻译为脉冲,它和Wait
都是作用在一个变量上:Wait
等待一个变量上的脉冲,Pulse
对一个变量发送脉冲。脉冲也是一种信号形式,相对于事件等待句柄那种锁存信号,脉冲顾名思义是一种非锁存或者说易失的信号)
之前我们讨论了事件等待句柄,这是一种简单的信号同步机制:一个线程阻塞直到收到另一个线程发来的通知。
还有个更强大的信号构造,由Monitor
类通过两个静态方法Wait
和Pulse
(以及PulseAll
)提供。原理是使用自定义的标识和字段(封装在lock
语句中)自行实现信号同步逻辑,然后引入Wait
和Pulse
控制防止自旋。仅仅使用这些方法和lock
,你就可以实现AutoResetEvent
、ManualResetEvent
以及Semaphore
,还有WaitHandle
的静态方法WaitAll
和WaitAny
的功能。此外,Wait
和Pulse
也可以用于所有等待句柄都不适用的情况。
但是,使用Wait
和Pulse
进行信号同步,对比事件等待句柄有以下缺点:
Wait / Pulse
不能跨越应用程序域和进程使用。- 必须切记通过锁保护所有信号同步逻辑涉及的变量。
- 使用
Wait / Pulse
的程序可能会导致依赖微软文档的开发者困惑。
微软文档的问题的是就算你已经攻读了解了Wait
和Pulse
是如何工作的,也还是无法明白它们该如何使用。Wait
和Pulse
会让浅尝辄止的人感到特别恶心:它们会寻找你理解中的漏洞然后折磨你取乐!幸运的是,有一种简单的使用模式可以驯服Wait
和Pulse
。
性能方面,在 2010 时代的桌面电脑上,调用Pulse
花费大概 100ns 左右, 约是在等待句柄上调用Set
三分之一的时间。等待无竞争信号的开销完全取决于你,因为是你使用普通的字段和变量自行实现的逻辑。在实践中上,这非常简单,并且基本上相当于使用锁的代价。
2.1如何使用 Wait 和 PulsePermalink
下面是如何使用Wait
和Pulse
:
1. 定义一个字段,作为同步对象,例如:
readonly object _locker = new object();
2. 定义一个或多个字段,作为自定义的阻塞条件,例如:
bool _go; /* 或 */ int _semaphoreCount;
3. 当你希望阻塞的时候,使用下边的代码:
lock (_locker) while (/* <blocking-condition> */) Monitor.Wait (_locker);
4. 当改变(或隐式改变)一个阻塞条件的时候,使用下边的代码:
lock (_locker) { // 修改会影响阻塞条件的字段或数据 // ... Monitor.Pulse(_locker); // 或: Monitor.PulseAll (_locker); }
(如果想改变阻塞条件并等待,可以在一个lock
内合并第 3 步和第 4 步)
这个模式允许任意线程在任意时间使用任意条件等待。下边这个简单的例子,一个线程等待直到_go
字段被设置为true
:
class SimpleWaitPulse { static readonly object _locker = new object(); static bool _go; static void Main() { // 新线程会阻塞 new Thread (Work).Start(); // 因为 _go==false Console.ReadLine(); // 等待用户敲回车 lock (_locker) // 现在唤醒线程 { // 通过设置 _go=true 然后 Pulse _go = true; Monitor.Pulse (_locker); } } static void Work() { lock (_locker) while (!_go) Monitor.Wait (_locker); // 当等待时锁会被释放 Console.WriteLine ("Woken!!!"); } }
输出结果:
Woken!!! (按下回车键之后)
为了线程安全,我们确保所有共享字段的访问都在锁内。因此,在读取和更新_go
标识的地方都加上了lock
语句。这很必要(除非你希望使用非阻塞同步的方式)。
Work
方法会一直阻塞,等待_go
标识变为true
。Monitor.Wait
方法按顺序做了如下的操作:
- 释放
_locker
上的锁。 - 阻塞,直到收到
_locker
上的脉冲。 - 重新获取
_locker
上的锁。如果锁已被占用,那么线程阻塞,直到锁变为可用为止。
这意味着当Monitor.Wait
在等待脉冲时,同步对象上的锁没有被持有。这并不是像代码看上去那样。
lock (_locker) { while (!_go) Monitor.Wait (_locker); // 锁被释放 // 锁重新获得 // ... }
然后继续执行下一条语句。Monitor.Wait
被设计为在lock
语句内使用,否则调用它会抛出一个异常。Monitor.Pulse
也是一样。
在Main
方法中,我们通过设置_go
标识(在锁内)和调用Pulse
来给工作线程发信号。我们一释放锁,工作线程就可以继续执行,继续它的while
循环。
Pulse
和PulseAll
方法可以释放通过调用Wait
阻塞的线程。Pulse
最多释放一个线程,而PulseAll
释放全部。在我们的例子中,只有一个线程被阻塞,所以它们在这个例子中效果是一样的。如果有多个线程在等待,以我们建议的这个模式来说,调用PulseAll
通常最安全。
为了Wait
能够和Pulse
或PulseAll
进行通信,必须使用同一个同步对象(我们的例子中的_locker
)。
在我们的模式中,脉冲表示有些东西可能已经改变,等待线程应该重新检查它们的阻塞条件。在Work
方法内,检查是通过while
循环实现的。由等待方来决定是否要继续运行,而不是通知方。如果把脉冲直接当作通知继续的指令,那么Wait
的构造就没有任何价值了,这样使用就相当于一个残疾版的AutoResetEvent
。
如果我们抛弃该模式,移除while
循环、_go
标识以及ReadLine
,就获得了一个最基础的Wait / Pulse
的例子:
static void Main() { new Thread (Work).Start(); lock (_locker) Monitor.Pulse (_locker); } static void Work() { lock (_locker) Monitor.Wait (_locker); Console.WriteLine ("Woken!!!"); }
这可能不会有输出,因为它有不确定性!在主线程和工作线程之间存在竞争,如果Wait
先执行,信号可以正常工作。如果Pulse
先执行,它就会丢失,工作线程就永远卡在那里等待。这与AutoResetEvent
的行为不同,它的Set
方法有一种记忆效果,或者说锁存(latching)效果,所以即使它在WaitOne
之前调用,仍然有效。
但是Pulse
没有锁存效果,它需要你自行实现,就像我们之前使用的 “ go “ 标识。这就是为什么Wait
和Pulse
是万能的原因:使用一个布尔标识,我们可以实现类似AutoResetEvent
的功能;使用一个整型字段,可以实现 CountdownEvent
或Semaphore
。通过更复杂的数据结构,可以进一步实现类似生产者 / 消费者队列这样的构造。
2.2生产者 / 消费者队列Permalink
之前,我们描述了生产者 / 消费者队列的概念,以及如何通过AutoResetEvent
来实现它。现在,我们通过Wait
和Pulse
来实现一个更强大的版本。
这次,我们将允许多个消费者,各自拥有它们自己的线程。使用一个数组来存放这些线程:
Thread[] _workers;
这样可以让我们在关闭该队列的时候Join
这些线程。
每个工作线程会执行一个名为Consume
的方法。我们可以在一个循环中创建和启动线程,例如:
public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // 为每个worker创建和启动一个独立的线程 for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); }
之前我们只是使用一个字符串来代表任务,这次使用一种更灵活的方式,即一个委托。我们使用 .NET Framework 中的System.Action
委托,它定义如下:
public delegate void Action();
这个委托可以匹配任意无参方法,很像ThreadStart
委托。当然我们也可以描述需要参数的任务,通过把调用封装在匿名委托或 lambda 表达式中。
Action myFirstTask = delegate { Console.WriteLine ("foo"); }; Action mySecondTask = () => Console.WriteLine ("foo");
如之前一样,使用Queue<T>
来表示任务的队列:
Queue<Action> _itemQ = new Queue<Action>();
在讨论EnqueueItem
和Consume
方法之前,先来看一下完整的代码:
using System; using System.Threading; using System.Collections.Generic; public class PCQueue { readonly object _locker = new object(); Thread[] _workers; Queue<Action> _itemQ = new Queue<Action>(); public PCQueue (int workerCount) { _workers = new Thread [workerCount]; // 为每个worker创建和启动一个独立的线程 for (int i = 0; i < workerCount; i++) (_workers [i] = new Thread (Consume)).Start(); } public void Shutdown (bool waitForWorkers) { // 为每个线程加入一个 null 任务,使它们退出 foreach (Thread worker in _workers) EnqueueItem (null); // 等待工作线程完成 if (waitForWorkers) foreach (Thread worker in _workers) worker.Join(); } public void EnqueueItem (Action item) { lock (_locker) { _itemQ.Enqueue (item); // 因为改变了阻塞条件 Monitor.Pulse (_locker); // 所以发送脉冲通知 } } void Consume() { while (true) // 继续消费直到 { // 收到通知 Action item; lock (