Appearance
.NET 线程安全
1. 用户模式、内核模式
1.1. 基元
基元(Primitive):指代码中可以使用的最简单的构造。
有两种基元构造:用户模式(user-mode)和内核模式(kernel-mode)。
1.2. 用户模式
它是用 CPU 指令来协调线程,这种协调是在硬件中发生的,所以速度会快于内核模式。但是也意味着,Windows 操作系统永远也检测不到一个线程在一个基元用户模式构造上阻塞了。由于在一个基元用户模式构造上阻塞的线程永远不认为已经阻塞,所以线程池不会创建新的线程来替换这种阻塞的线程。另外,这些 CPU 指令只是阻塞线程极短的时间。
缺点:只有 Windows 系统的内核才能停止一个线程的执行。用户模式中的线程可能会被系统抢占,但很快就会被再次调度。如果一个线程想获得资源又暂时取不到资源,它会一直在用户模式中运行,这会大大浪费 CPU 的时间。
1.3. 内核模式
内核模式的构造是由 Windows 操作系统自身提供的。它们要求在应用程序的线程中调用操作系统内核的函数。将线程从用户模式切换成内核模式(或相反)会导致巨大的新能损失,这也是为什么要避免使用内核模式的原因。
优点:一个线程使用一个内核模式的构造获取一个其它线程拥有的资源时,Windows 会阻塞线程,使它不浪费 CPU 的时间。然后当资源变得可用时,Windows 会恢复线程,允许它访问资源。
1.4. 活锁 & 死锁
对于在一个构造上等待的线程,如果拥有这个构造的线程一直不释放它,前者就可能一直阻塞。
“活锁”:如果这是一个用户模式的构造,线程将一直在 CPU 上运行,我们称之为 “活锁”。
“死锁”:如果这是一个内核模式的构造,线程将一直阻塞,我们称之为 “死锁”。
“死锁” 总是优于 “活锁”,因为 “活锁” 既浪费 CPU 时间又浪费内存,而 “死锁” 只浪费内存。
1.5. 原子操作
对简单数据类型进行原子性的读和写。
比如:对于 32 位的 cpu,4 字节及以下是原子操作。64 位的 cpu,8 字节及以下是原子操作。
如对于 32 位的 cpu
csharp
class SomeType
{
public class Int32 x=0;
}1
2
3
4
2
3
4
我们对它进行赋值:
csharp
SomeType.x = 0x01234567x 变量的值会一次性(原子性)从 0x00000000 变成 0x01234567,这期间另一个线程不可能看到一个中间状态的值。但是如果 x 是一个 Int64 的类型。
csharp
SomeType.x = 0x0123456789abcdef另一个线程查询 x 的值,可能得到一个 0x0123456700000000 或 0x0000000089abcdef。因为读写操作不是原子性的。对于 32 位的 cpu 必须分两次写入这个数据。
2. 用户模式构造
这种方式是非阻止的同步,主要原理就是用了上面描述的原子操作特性。因为它不刻意阻塞线程,所以速度非常快。Thread.VolatileRead、Thread.VolatileWrite、System.Threading.Interlocked 类提供的方法,以及以及 C# 的 volatile 关键字都支持原子性的操作。
重中之重:上面提到的方法,并不是真正意义上的不阻塞,而是这个阻塞发生在 cpu 上,是用指令来协调的,阻塞时间非常短而已。例如:如果有 5 个线程同时访问到 Thread.VolatileRead 方法,其中 4 个线程肯定会被阻塞。
2.1. VolatileRead 和 VolatileWrite
这两个方法的解释让人非常迷惑,很不容易理解。从字面意思来看,就是进行易失性读取。
对于 Jeffrey 总结的这条规则:当线程通过共享内存相互通信时,调用 VolatileWrite 来写入最后一个值,调用 VolatileRead 来读取第一个值。
上面的这条规则是怎么的出来的?难道因为 Jeffrey 是牛人,我们就不动脑袋盲目接受?适度的探寻是必要的。
先来看看 Thread.MemoryBarrier 这个方法的作用:它强迫按照程序的顺序,之前的加载和存储操作必须在 MemoryBarrier 方法之前完成;之后的加载和存储操作必须在 MemoryBarrier 方法之后完成。这个方法是一个完整的栅栏(full fence),关于内存栅栏的概念可以 google 搜索。VolatileRead 和 VolatileWrite 在内部都调用了这个类。
csharp
public static int VolatileRead(ref int address)
{
int num = address;
MemoryBarrier();
return num;
}1
2
3
4
5
6
2
3
4
5
6
csharp
public static void VolatileWrite(ref int address, int value)
{
MemoryBarrier();
address = value;
}1
2
3
4
5
2
3
4
5
所以真正起作用的是 Thread.MemoryBarrier 方法:该方法可以阻止 CPU 指令的重新排列(也可阻止编译器的优化),在调用 MemoryBarrier 之后的内存访问不能在这之前就完成(也就是不能缓存的意思)。到现在明白了,MemoryBarrier 方法后的变量访问,都会去读内存最新的值。
有了这个解释,我们在来理解 VolatileRead 方法就相对容易了。在调用 MemoryBarrier 之前,它做了一步 int num = address; 这会造成到内存中去取 address 的值赋给 num,并且因为下面调用了 MemoryBarrier 方法,所以这一步不能被编译器优化掉,最后在 MemoryBarrier 方法后,返回这个最新的值。背后的实质就是利用了 MemoryBarrier 的特性,对要取的值做一步计算(简单赋值),然后返回,每次调用这个函数它都会重新取值。
而 VolatileWrite 方法,它只调用了 MemoryBarrier 保证前面的代码都执行了并写入到了内存,最后写入新值。所以,如果你的代码和顺序无关,或代码就只有一句,你完全可以直接赋值,而不用调用这个方法。
有点混乱,再归纳 2 点:
调用这两个方法,可以保证程序代码的顺序,因为写入(write)一个值,其他线程可能马上就会用这个值,所以要保证
VolatileWrite放在函数块的最后(这样编译器就不会优化代码,移动代码的顺序)。以保证VolatileWrite前面的内容都正确的计算和存储到内存中了。其他线程根据VolatileWrite写的值,可能会用到我们刚才计算的内容,这样就不会出错。对于read一个值,把VolatileRead放在函数块的最前面(个人觉得位置不是很重要),它在这里的主要作用是保证对变量的读取是从内存中读取。这两个方法中并没有保证是不是原子操作,看反编译代码你就知道。所以要自己控制使用的变量类型。这和你的 CPU 是 32 和 64 位密切相关。(这一点有待进一步考证)
备注:从 volatile 关键字不支持 Int64,double 等 64 位类型,也可以间接推断出这一点。尽管这两个方法中提供了 Int64, double 等版本,但我觉得和 cpu 的位数是相关的。
案例 1:对于一个可能被多线程访问的变量 x,如果你在另外一个线程中轮询这个变量是否被改变,必须要用 VolatileRead,以保证读到的是内存中得最新值,否则可能会出现死循环。
csharp
private void VolatileRW()
{
m_stopWork = 0;
Thread t = new Thread(new ParameterizedThreadStart(Worker));
t.Start(0);
Thread.Sleep(5000);
Thread.VolatileWrite(ref m_stopWork, 1); // 设定 m_stopWrok 为 1,这里和顺序有关,这里应该用 VolatileWrite,不要妄图去猜想编译器的优化顺序
LogHelper.WriteInfo("Main thread waiting for worker to stop");
t.Join();
}
private void Worker(object o)
{
int x = 0;
// while (m_stopWork == 0)
// 如果这样判定,m_stopWork 被缓存后可能不会再去读取内存的值(循序变量可能会被编译器优化),所以可能会是个死循环
while (Thread.VolatileRead(ref m_stopWork) == 0) // 用 VolatileRead 每次就会去读新的值
{
x++;
}
LogHelper.WriteInfo(string.Format("worker stoped:x={0}", x));
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
实际测试中,用 release 模式编译,并且不用 vs 直接调试程序,上面的第一个 while 就会出现死循环。现在应该知道这两个函数的作用了吧。
2.2. Interlocked 类
这个类的每个方法执行的是一次原子性的读取以及写入操作。这个类的所有方法都建立了完整的内存栅栏(和 Thread.MemoryBarrier 一样),保证了对内存的读取是最新数据。并且它能对 Int64, double 等执行原子操作(内部做了处理,但不是 lock 处理,而是用循环判断不成功就继续尝试),这一点和上面的两个方法是有区别的。
练习 1(Interlocked.Add, Interlocked.Decrement, Interlocked.Read):
csharp
private long m_threadNum = 5; // 工作线程计数
private long m_Total = 0; // 总和
private void InterlockTest()
{
// 开启 5 个线程,分别计算 10 以内的和
ThreadPool.QueueUserWorkItem(o => AddNumber(10));
ThreadPool.QueueUserWorkItem(o => AddNumber(10));
ThreadPool.QueueUserWorkItem(o => AddNumber(10));
ThreadPool.QueueUserWorkItem(o => AddNumber(10));
ThreadPool.QueueUserWorkItem(o => AddNumber(10));
// 上面 5 个都计算完了,打印出结果
ThreadPool.QueueUserWorkItem(o => DisplaySum());
}
private void AddNumber(int num)
{
int sum = 0;
while (num > 0)
{
sum += num;
num--;
}
Console.WriteLine("thread {0} ok. sum={1}", Thread.CurrentThread.ManagedThreadId, sum);
Interlocked.Add(ref m_Total, sum); // 把结果加到总和上面
Interlocked.Decrement(ref m_threadNum); // 线程计数减一
}
private void DisplaySum()
{
while (Interlocked.Read(ref m_threadNum) != 0)
{
// 其他线程没有做完就在这里自旋,故意浪费 cpu 时间
}
// 都做完了就输出结果
Console.WriteLine("thread {0} all done,total sum = {1}", Thread.CurrentThread.ManagedThreadId, m_Total);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
运行结果:
Text
thread 7 ok. sum=55
thread 11 ok. sum=55
thread 7 ok. sum=55
thread 11 ok. sum=55
thread 7 ok. sum=55
thread 11 all done,total sum = 2751
2
3
4
5
6
2
3
4
5
6
这里的 thread id 为 7 和 11,感觉好像只有两个线程一样,其实不是这样的。因为这里用了 Theadpool,由于我们的计算量太小(10 的累加),所以 CLR 重复利用了空闲的线程,这也是为什么倡导多用线程池的原因。
另外:上面代码的 Interlocked.Read 方法只能用于 Int64 位的读取,所以对于 Int32 等应该用 Thread.VolatileRead 方法会比较合理。
练习 2:Interlocked.Exchange 方法用于单例模式出现的问题:
csharp
private void InterExchangeTest()
{
// 开启 5 个线程
new Thread(() => { Singleton.Instance().ToString(); }).Start();
new Thread(() => { Singleton.Instance().ToString(); }).Start();
new Thread(() => { Singleton.Instance().ToString(); }).Start();
new Thread(() => { Singleton.Instance().ToString(); }).Start();
new Thread(() => { Singleton.Instance().ToString(); }).Start();
}
private class Singleton
{
private static Singleton _instance;
private Guid _id;
private Singleton(Guid id)
{
this._id = id;
}
public override string ToString()
{
return _id.ToString(); // 为了便于标识对象,这里用了 Guid 来表示
}
public static Singleton Instance()
{
if (_instance == null)
{
Singleton temp = new Singleton(Guid.NewGuid());
Console.WriteLine("temp:" + temp.ToString());
if (Interlocked.Exchange(ref _instance, temp) == null) // 如果是 null,代表是第一次初始化值,否者不是
{
Console.WriteLine("single thead entered.thread id = { 0 }", Thread.CurrentThread.Manag edThreadId);
}
else
{
Console.WriteLine("other thead entered.thread id = { 0 }", Thread.CurrentThread.Manag edThreadId);
}
Console.WriteLine("_instace:" + _instance.ToString());
}
return _instance;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
上面的代码用 Interlocked.Exchange 方法来初始化单例对象,这个方法返回的是改变之前的值。如果返回的是 null,则代表是第一次访问;如果不是 null,则一定是多个线程进入到了代码段,第一个线程改变了 _instance 变量,其他线程访问时返回的就不是 null 了,但它们还会继续覆盖初始化好的值。测试中确实有多个线程进入,实例被多次初始化了:
Text
temp:1d20c10a-be7b-4352-ade7-bb15e0a6ee38
single thead entered.thread id = 1 3
_instace:1d20c10a-be7b-4352-a de7-bb15e0a6ee38 // _instance 第一次被初始化
temp:e333502c-757d-4526-b3c3-74f052ae0951
other thead entered.thread id = 1 2
_instace:e333502c-757d-4526-b 3c3-74f052ae0951 // _instance 第二次被改变1
2
3
4
5
6
2
3
4
5
6
上面的问题出在 Interlocked.Exchange 方法会强制每个线程对 _instance 变量赋值,这会导致后面的线程覆盖了前面线程创建号的对象,这和单例模式相违背了。解决这个问题的关键在于,当 _instance 不为 null 时,我们希望不要强制交换值。幸好,Interlocked 类提供了一个可用于比较后再赋值的原子操作方法 CompareExchange,下面对上面的代码进行一个小改动:
csharp
private class Singleton
{
private static Singleton _instance;
private Guid _id;
private Singleton(Guid id)
{
this._id = id;
}
public override string ToString()
{
return _id.ToString();
}
public static Singleton Instance()
{
if (_instance == null)
{
Singleton temp = new Singleton(Guid.NewGuid());
Console.WriteLine("temp:" + temp.ToString());
if (Interlocked.CompareExchange(ref _instance, temp, null) == null) // 这里换用 compareExchange 方法
{
Console.WriteLine("single thead entered.thread id = { 0 }", Thread.CurrentThread.Manag edThreadId);
}
else
{
Console.WriteLine("other thead entered.thread id = { 0 }", Thread.CurrentThread.Manag edThreadId);
}
Console.WriteLine("_instace:" + _instance.ToString());
}
return _instance;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CompareExchange 方法,判断原始值是我们期望的值时,才进行交换。如这句 Interlocked.CompareExchange(ref _instance, temp, null),我们就是希望 _instance 为 null 是才把 temp 赋给它。运行结果:
Text
temp:6be7b8d8-ff1e-49b0-abf4-fafbf89c40e7
single thead entered.thread id = 1 2
_instace:6be7b8d8-ff1e-49b0-a bf4-fafbf89c40e7 // 第一次初始化
temp:90cc00e8-98be-4b44-be22-9957384dfd86
other thead entered.thread id = 1 1
_instace:6be7b8d8-ff1e-49b0-a bf4-fafbf89c40e7 // 第二次有其他线程进来,它的值还是第一次的值,并没有被覆盖1
2
3
4
5
6
2
3
4
5
6
特别注意:可能你会认为下面的代码和 CompareExchange 是等价的,这是不对的!
csharp
if (_instance == null)
{
// 这里仍然可能有多个线程进来
Interlocked.Exchange(ref _instance, temp);
}1
2
3
4
5
2
3
4
5
CompareExchange 相当于把判断放在了函数内部,但它是一个原子性的操作,其他线程进不来的。
有了以上的验证,我们实现单例模式完全可以这样写,而不用 lock:
csharp
private class Singleton
{
private static Singleton _instance;
private Singleton(){}
public static Singleton Instance()
{
if (_instance == null)
{
Singleton temp = new Singleton();
Interlocked.CompareExchange(ref _instance, temp, null);
}
return _instance;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2.3. volatile 关键字
这是为简化 VolatileWrite 和 VolatileRead 的编程而提供的关键字。对于标注为 volatile 的变量:就是表明该变量可能会被多线程访问,每次访问该变量都应从内存中读取新值,而不应该用寄存器中保留的值。对于多 cpu 的机器而言,每个 cpu 的寄存器可能都需要刷新,在一定程度上会损害部分性能。(上面的几个方法同样会有这个问题)
csharp
private volatile bool m_finish = false;
private void VolatileKeywordTest()
{
m_finish = false;
ThreadPool.QueueUserWorkItem(o => WaitingForFinish());
Thread.Sleep(5000);
m_finish = true;
}
private void WaitingForFinish()
{
while (m_finish == false) { }
Console.WriteLine("the work is down");
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
有了这个关键字,我们就不需要再用 VolatileRead 方法去读取一个变量了。
2.4. Interlocked Anything 模式
这个模式的名字是 Jeffrey 给起的,它究竟要解决什么问题,我们为什么要用它?带着这些疑问,我们来看看它的应用场景。
看看下面这个设定最大值的例子:
csharp
private static Int32 Maximum(ref int target, int value)
{
int oldValue;
oldValue = target;
// 计算最大值
int temp = Math.Max(target, value);
// 注意:这里可能有其他线程恰好修改了 target 的值
// 如果 target 是 oldValue,则替换成最大的值
Interlocked.CompareExchange(ref target, temp, oldValue);
return target;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
测试代码:
csharp
int v1 = 10;
v1 = Maximum(ref v1, 20);
Console.WriteLine("v1:" + v1);1
2
3
4
2
3
4
如果恰好有一个线程在 Interlocked.CompareExchange 这句代码执行前修改了 target 的值为 15,那么我们这里的最大值 20 就不会生效,因为 CompareExchange 的比较条件将会失败。我们得到的输出,有可能是:v1:15。
这样的结果可能不是我们的预期,问题在于上面的代码只调用了 CompareExchange 一次,如果不成功就跳过了这次修改。所以,我们可以考虑对 CompareExchange 做一个循环判断,不成功就再度尝试修改,直到成功为止。而这正是 Jeffery 所说的 Interlock Anything 模式。
我们进一步修改上面的代码:
csharp
private static Int32 Maximum(ref int target, int value)
{
int oldValue, currentValue;
currentValue = target;
do
{
// oldValue 记录当前循环开始时的原始值
oldValue = currentValue;
// 计算最大值
int temp = Math.Max(target, value);
// 注意:这里可能有其他线程恰好修改了 target 的值
// 如果 target 是 oldValue,则替换成最大的值。
// 如果被其他线程修改了,currentValue 返回的将会是被其他线程修改后的最新值,比如:15
currentValue = Interlocked.CompareExchange(ref target, temp, oldValue);
}
while (currentValue != oldValue); // 如果被修该,则进入下一次循环,尝试再次修改
return target;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
注释中已经写得很清楚了,可能你需要反复的思考一下上面的代码,理解其意图。这里只是对 Int32 的值进行了修改,利用这个模式实际可以对任何引用类型的值进行修改。从而避免多线程访问共同变量带来的冲突。
对于这个模式的应用,我们看看微软给我们的示范:在实现事件(Event)时是如何运用了这个模式的。
首先定义下面的一个事件:
csharp
public delegate bool UploadFileCompleteEventHander(string fileName);
public event UploadFileCompleteEventHander UploadFileComplete;1
2
2
我们知道,事件本身还是用代理来实现的,只不过编译器帮我们做了这部分工作,反编译这个事件的代码:
csharp
public event UploadFileCompleteEventHander UploadFileComplete
{
add
{
UploadFileCompleteEventHander hander2;
UploadFileCompleteEventHander uploadFileComplete = this.UploadFileComplete;
do
{
hander2 = uploadFileComplete;
UploadFileCompleteEventHander hander3 = (UploadFileCompleteEventHander) Delegate.Combine(hander2, value);
uploadFileComplete = Interlocked.CompareExchange<UploadFileCompleteEventHander>(ref this.UploadFileComplete, hander3, hander2);
}
while (uploadFileComplete != hander2);
}
remove
{
UploadFileCompleteEventHander hander2;
UploadFileCompleteEventHander uploadFileComplete = this.UploadFileComplete;
do
{
hander2 = uploadFileComplete;
UploadFileCompleteEventHander hander3 = (UploadFileCompleteEventHander) Delegate.Remove(hander2, value);
uploadFileComplete = Interlocked.CompareExchange<UploadFileCompleteEventHander>(ref this.UploadFileComplete, hander3, hander2);
}
while (uploadFileComplete != hander2);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
看见了吗,上面的 add 和 remove 方法,对代理的添加和删除就是用的这个模式,多么经典。保证多个线程注册一个事件时,不会出现冲突。
总结:
- 一个变量可能被多线程访问时,可以考虑运用这个模式。
- 它的速度比阻塞式同步,如
lock等,要快很多。 - 它能对任何类型的变量进行赋值,而不仅仅局限于
Int32,等原子操作类型。
3. 内核模式构造
3.1. 内核模式构造
内核模式构造,采用的是 Windows 操作系统来同步线程,比 VolatileRead、VolatileWrite、Interlocked 等用户模式的构造慢很多。相对于用户模式的构造,它也有自己的优点:
- 不用像用户模式那样占着 cpu “自旋”,浪费 cpu 资源;
- 内核模式可同步在同一机器不同进程中运行的线程;
- 可实现本地和托管线程相互之间的同步;
- 一个线程可以一直阻塞,直到一个集合中的内核对象全部可用,或部分可用(WaitAll,WaitAny);
- 阻塞一个线程时,可以指定一个超时值,超过这个时间就解除阻塞;
3.2. FCL 提供的内核模式构造
WaitHandle(抽象类) ├ EventWaitHandle │ ├ AutoResetEvent │ └ ManualResetEvent ├ Semaphore └ Mutex
他们都继承了 WaitHandle 抽象类,WaitHandle 提供了下列共同的静态方法:
WaitOne:阻塞调用线程,直到收到一个信号;WaitAny:阻塞调用线程,直到收到任意一个信号;WaitAll:阻塞调用线程,直到收到全部信号;SingleAndWait:向指定的内核对象发出信号,并等待另一个内核对象收到信号;Close/Dispose:关闭内核对象句柄;
3.2.1. EventWaitHandle
它属于事件(event),事件是内核维护的 Boolean 变量。如果事件为 false,在事件上等待的线程就阻塞;如果事件为 true,就解除阻塞。它主要有两个方法:
Set:将事件设为true;ReSet:将事件设为false;
注意:初始化的时候我们可以指定事件的初始值,比如下面的例子就是指定初始时事件为 false。
csharp
EventWaitHandle ewh = new EventWaitHandle(false, EventResetMode.AutoReset); // 等同于 AutoResetEvent
EventWaitHandle ewh = new EventWaitHandle(false, EventResetMode.ManualReset); // 等同于 ManualResetEvent1
2
2
3.2.2. AutoResetEvent
AutoResetEvent 是 EventWaitHandle 的一个简单包装,内部没有额外的任何逻辑。它最大的特点就是,调用了 Set 方法将事件设为 true 之后,其中一个等待线程得到执行后,它会自动调用 Reset 方法,将事件信号设为 false,以阻塞其它的线程。相当于放一个线程进来,门自动就关了(自动门)。
例子,使用 AutoResetEvent 实现一个简单的线程同步锁。
csharp
private class SimpleWaitLock : IDisposable
{
// 初始化一定要是 true,否者,第一个调用 Enter 方法的线程会被阻塞
private AutoResetEvent are = new AutoResetEvent(true);
#region IDisposable
public void Enter()
{
are.WaitOne(); // 第一个线程调用这个方法后,事件将会为 false,其他线程会被阻塞
Console.WriteLine("thread={0}", Thread.CurrentThread.ManagedThreadId);
}
public void Exit()
{
are.Set(); // 退出时,将事件信号设为 true,放一个线程进来后,马上设为 false(调用 reset)
}
public void Dispose()
{
are.Dispose();
}
#endregion
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
3.2.3. ManualResetEvent
ManualResetEvent 是 EventWaitHandle 的一个简单包装,内部也没有额外的任何逻辑。它和 AutoResetEvent 唯一的不同是,调用了 Set 方法将事件设为 true 后,不会去调用 Reset 方法,这将导致事件一直处于 true,其它等待的多个线程都会得到执行,直到你手动调用 Reset 方法。相当于你把门打开后,需要手动去关(非自动门)。
3.2.4. Semaphore
信号量(semaphore)是内核维护的一个 Int32 的变量。信号量为 0 时,在信号量上等待的线程会阻塞;信号量大于 0 时,就解除阻塞。主要方法:
Release():就是一个加1的操作;Release(int32 releasecount):就是一个加releasecount的操作;
初始化 Semaphore 时,可以指定最大和最小信号量的值。
用 Semaphore 实现同样功能的同步锁:
csharp
private class SimpleWaitLock : IDisposable
{
// 初始化指定计数值为 1,允许第一个线程可用
private Semaphore sp = new Semaphore(1, 1);
#region IDisposable
public void Enter()
{
sp.WaitOne(); // 第一个线程调用这个方法后,计数值减 1,变为 0,其他线程会被阻塞
Console.WriteLine("thread={0}", Thread.CurrentThread.ManagedThreadId);
}
public void Exit()
{
sp.Release(); // 计数值加 1,其他线程可用
}
public void Dispose()
{
sp.Dispose();
}
#endregion
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
3.2.5. Mutex
互斥体(mutex)和计数值为 1 的 Semaphore 或 AutoResetEvent 的工作方式非常相似。这三种方式每次都只释放一个等待的线程。主要方法:
ReleaseMutex():计数减去1;
它有一个最大的不同是,它可以在同一线程上循环调用,也就是多次调用 WaitOne(),然后在调用等次数的 ReleaseMutex()。直到 Mutex 的计数为 0 时,其他等待的线程才能被调用。这种方式在平常中可能不太会用到。
可以用 Mutex 来防止应用程序二次启动,这在平常工作中也经常会碰到。
简单示例:
csharp
static void Main()
{
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
bool createNew;
Mutex mutex = new Mutex(false, "ApplicationGuidName", out createNew);
// 没有启动,就创建一个新的
if (createNew)
{
Application.Run(new Form1());
}
else
{
// 已经启动了
MessageBox.Show("程序已经启动,不能重复启动!");
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
注意:也可以用 Semaphore 和 EventWaithandle 来实现上面的功能,原理是一样的。但 AutoResetEvent 和 ManualResetEvnet 不行,他们没有提供类似的构造方法。
3.2.6. ThreadPool.RegisterWaitForSingleObject
可以通过 ThreadPool.RegisterWaitForSingleObject 方法注册一个方法。当一个事件收到信号,或是指定的时间超时,就会自动调用这个方法。
这个方法对于 AutoResetEvent 特别有用。但不太适合 ManualResetEvent,因为要手动去调用 Reset 方法,不然会无限的调用这个方法。
csharp
private void TestAutoCallBack()
{
AutoResetEvent mre = new AutoResetEvent(false);
// 设定超时为 2000ms
ThreadPool.RegisterWaitForSingleObject(mre, new WaitOrTimerCallback(WaitCallBack), "123", 2000, false);
// 发出一个信号
mre.Set();
// 故意等代 3000ms
Thread.Sleep(3000);
}
// 有信号或超时就会调用这个方法
private void WaitCallBack(object state, bool timeOut)
{
Console.WriteLine("is time out = {0}", timeOut);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
运行结果:
Text
is time out = False // 得到信号时的调用
is time out = True // 超时的调用
is time out = True // 超时的调用1
2
3
2
3
4. 混合模式
4.1. 一个简单的混合同步锁
csharp
/// <summary>
/// 简单的混合同步锁
/// </summary>
private sealed class HybirdLock {
private int m_waiters = 0;
AutoResetEvent m_waitLock = new AutoResetEvent(false);
public void Enter() {
// 如果只有一个线程,直接返回
if (Interlocked.Increment(ref m_waiters) == 1)
return;
// 1 个以上的线程在这里被阻塞
m_waitLock.WaitOne();
}
public void Leave() {
// 如果只有一个线程,直接返回
if (Interlocked.Decrement(ref m_waiters) == 0)
return;
// 如果有多个线程等待,就唤醒一个
m_waitLock.Set();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
优点:只有一个线程的时候仅在用户模式下运行(速度极快),多于一个线程时才会用到内核模式(AutoRestEvent),这大大的提升了性能。由于线程的并发访问毕竟是少数,多数情况下都是一个线程在访问资源,利用用户模式构造可以保证速度,利用内核模式又可以阻塞其它线程(虽然也有线程切换代价,但比起用户模式的一直自旋浪费 cpu 时间可能会更好,况且只有在多线程冲突时才会使用这个内核模式,几率很低)。
4.2. 自旋 + 线程所有权 + 递归的混合同步锁
- 自旋:使多线程并发时,可以在一定的时间内维持在用户模式,如果在这个期间获得了锁,就不用切换到内核模式,以避免切换的开销;
- 线程所有权:只有获得锁的线程才能释放锁;
- 递归:就是同一线程可以多次调用获取锁的方法,然后调用等次数的释放锁的操作(mutex 就属于这种类型);
下面来看看具体的实现:
csharp
/// <summary>
/// 加入自旋,线程多有权,递归的混合同步锁
/// </summary>
private sealed class AnotherHybirdLock : IDisposable {
private int m_waiters = 0; // 等待的线程数
private int m_spinCount = 4000; // 用户模式自旋的次数(可以调整大小)
private int m_owningThreadId = 0; // 用于判断获取和释放锁是不是同一线程
private int m_recursion = 0; // 同一线程循环计数(为 0 时,代表该线程不拥有锁了)
AutoResetEvent m_waitLock = new AutoResetEvent(false); // 切换到内核模式是,用于同步
private void Enter() {
int threadId = Thread.CurrentThread.ManagedThreadId;
// 同一线程,多次调用的情况
if (m_owningThreadId == threadId) {
m_recursion++;
return;
}
// 先采用用户模式自旋,这避免了切换
SpinWait spinWait = new SpinWait(); // .Net 自带的用于用户模式等待的类
for (int i = 0; i < m_spinCount; i++) {
// 试图在用户模式等待获得锁,如果获得成功,应跳过内核模式的阻塞
if (Interlocked.CompareExchange(ref m_waiters, 1, 0) == 0) {
// 这里用了 goto 语句,可以用 flag 等去掉 goto
goto GotLock;
}
spinWait.SpinOnce();
}
// 内核模式阻塞(在尝试获取了一次)
// 如果 = 1,也不用在内核模式阻塞
if (Interlocked.Increment(ref m_waiters) > 1) {
// 多个线程在这里都会被阻塞
m_waitLock.WaitOne(); // 性能损失
// 等这个线程醒来时,它拥有锁,并记录一些状态
}
GotLock:
// 线程获取锁是记录线程 Id,重置计数为 1
m_owningThreadId = threadId;
m_recursion = 1;
}
private void Leave() {
int threadId = Thread.CurrentThread.ManagedThreadId;
// 检查释放锁的线程的一致性
if (threadId != m_owningThreadId)
throw new SynchronizationLockException("Lock not owned by calling thread");
// 同一线程,循环计数没有归 0,不能递减线程计数
if (--m_recursion > 0) return;
m_owningThreadId = 0; // 没有线程拥有锁
// 么有其它线程被阻塞,直接返回
if (Interlocked.Decrement(ref m_waiters) == 0)
return;
// 有其他线程被阻塞,唤醒其中一个
m_waitLock.Set(); // 这里有性能损失
}
#region IDisposable
[MethodImpl(MethodImplOptions.Synchronized)]
public void Dispose() {
m_waitLock.Dispose();
}
#endregion
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
注释中,已经写的相当详细了,一定要好好理解它的实现方式,我们最常用的 Monitor 类和它的实现方式几乎一样。
4.3. 细数 FCL 提供的混合构造
有了上面的自定义混合同步构造的基础,再来看看 .net 为我们都准备了哪些能够直接使用的混合同步构造。
特别要注意一点:它们的性能都会比单纯的内核模式构造(如 Mutex、AutoResetEvent 等)要好很多,在实际项目中,要酌情使用。
4.3.1. ManualResetEventSlim、SemaphoreSlim
它们的构造和内核模式的 ManualResetEvent、Semaphore 完全一致,只是它们都在用户模式中 “自旋”,而且都推迟到发生第一次竞争时,才创建内核模式的构造。另外,可以向 Wait 方法传入 CancellationToken 以支持取消。
4.3.2. Monitor 类和同步块
Monitor 类应该是我们我们使用得最频繁的同步技术。它提供了一个互斥锁,这个锁支持自旋,线程所有权和递归。和我们上面展示的那个自定义同步类 AnotherHybirdLock 相似。它是一个静态类,提供了 Enter 和 Exit 方法用于获取锁和释放锁,会使用到传递给 Enter 和 Exit 方法对象的同步块。同步块的构造和 AnotherHybirdLock 的字段相似,包含一个内核对象、拥有线程的 ID、一个递归计数、以及一个等待线程的计数。关于同步块的概念,可以查阅其它的资料,这里不做太多的讲解。
Monitor 存在的问题以及使用建议:
- 最好提供一个私有的专用字段用于锁。如:
private objectm_lock = new object();如果方法是静态的,那么这个锁字段也标注成静态(static)就可以了。 - 不要对
string对象加锁。原因是,字符串可能留用(interning),两个完全不同的代码段可能指向同一个string对象。如果加锁,两个代码段在完全不知情的情况下就被同步了。另一个原因是跨界一个AppDomain传递一个字符串时,不会复制副本,相反,它传递的是一个引用,如果加锁,也会出现上面的情况。这是 CLR 在AppDomain隔离中的一个 bug。 - 不要锁住一个类型(Type)。如果一个类型对象是以 “AppDomain 中立” 的方式加载,它会被其它
AppDomain共享。线程会跨越AppDomain对该类型对象加锁,这也是 CLR 的一个已知 bug。 - 不要对值类型加锁。每次调用
Monitor的Enter方法,都会对这个值类型装箱,造成每次锁的对象都不一样,无法做到线程同步。 - 避免向一个方法应用
[MethodImpl( MethodImplOptions.Synchronized)]特性。如果方法是一个实例方法,那么 JIT 编译器会加入Monitor.Enter(this)和Monitor.Exit(this)来包围代码。如果是一个静态方法,传给Enter方法的就是这个类的类型。 - 调用一个类型的类型构造器(静态构造函数)时,CLR 要获取类型对象上的一个锁,确保只有一个线程初始化类型对象及其静态字段。同样,如果类型是以 “AppDomain 中立” 的方式加载,也会出现问题。例如,静态构造函数里出现一个死循环,进程中所有
AppDomain都不能使用该类型。所以要尽量保证静态函数短小简单,或尽量避免用类型构造器。
4.3.3. lock 关键字
lock 关键字是对 Monitor 类的一个简化语法。
csharp
public void SomeMethod() {
lock (this) {
// 对数据的独占访问。。。
}
}
// 等价于下面这样
public void SomeMehtodOther() {
bool lockToken = false;
try {
// 线程可能在这里推出,还没有执行 Enter 方法
Monitor.Enter(this, ref lockToken);
// 对数据的独占访问。。。
} finally {
if (lockToken) Monitor.Exit(this);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
lockToken 变量的作用:如果一个线程在没有调用 Enter 方法时就退出,这时它的值为 false,finally 块中就不会调用 Exit 方法;如果成功获得锁,它就为 true,这时就可以调用 Exit 方法。
lock 关键字存在的问题:
Jeffrey 指出,编译器为 lock 关键字生成的代码默认加上了 try / finally 块,如果在对数据的独占访问时发生了异常,当前线程是可以正常退出的。但是,如果有其他的线程正在等待,它们会被唤醒,从而访问到由于异常而被破坏掉的脏数据,进而引发安全漏洞。与其这样,还不如让进程终止。另外,进入一个 try / finally 块会使代码的速度变慢。它建议我们杜绝使用 lock 关键字,当然,估计太多的程序员都在使用 lock 关键字,该不该杜绝使用,自己判断。
4.3.4. ReaderWriterLockSlim
互斥锁保证多线程在访问一个资源时,只有一个线程才会运行,其它的线程都阻塞了,这会降低应用程序的吞吐量。如果所有线程都以只读的方法访问资源,我们就没有必要阻塞它。另一方面,如果一个线程希望修改数据,就需要独占的访问。ReaderWriterLockSlim 就能解决这个问题。
它的实现方式是这样的:
- 一个线程写入数据时,其它的所有线程都被阻塞;
- 一个线程读取数据时,请求读取的线程继续执行,请求写入的线程被阻塞;
- 一个线程写入结束后,要么解除一个写入线程的阻塞,要么解除一个读取线程的阻塞。如果没有线程被阻塞,锁就自由了;
- 所有读取线程结束后,一个写入线程解除阻塞(可见读取更优先);
一个简单的例子:
csharp
public class MyResource : IDisposable {
// LockRecursionPolicy(NoRecursion, SupportsRecursion)
// SupportsRecursion 会导致增加递归,开销会变得很大,尽量用 NoRecursion
private ReaderWriterLockSlim m_lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
private object m_source;
public void WriteSource(object source) {
m_lock.EnterWriteLock();
// 写独占访问
m_source = source;
m_lock.ExitWriteLock();
}
public object GetSource() {
m_lock.EnterReadLock();
// 共享访问
object temp = m_source;
m_lock.ExitReadLock();
return temp;
}
#region IDisposable
public void Dispose() {
m_lock.Dispose();
}
#endregion
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
.net 1.0 提供了一个 ReaderWriterLock,少了一个 Slim 后缀。它存在下面的几个问题:
- 不存在线程竞争,数度也很慢;
- 线程所有权和递归被它进行了封装,并且还取消不了;
- 相比
writer,它更青睐reader,这可能造成writer排很长的对而得不到执行;
4.3.5. CountDownEvent
不太常用。这个构造阻塞一个线程,直到它的内部计数为 0。这和 Semaphore 恰恰相反。如果它的 CurrentCount 变为 0,就不能再度更改了。再次调用 AddCount 方法会抛出异常。
4.3.6. Barrier
不太常用。它可以用于一系列线程并行工作。每个参与者线程完成阶段性工作后,都调用 SignalAndWait 方法阻塞自己,最后一个参与者线程调用 SignalAndWait 方法后会解除所有线程的阻塞。
原文链接