当多个线程同时访问共享数据对象时,就需要线程同步,以保证数据状态不会被破坏。线程同步的通常做法是加“锁”,以保证某一时刻只有拥有这个锁的对象才能够去操作数据。加“锁”能够保证共享数据不会被破坏,但是它增加了代码的复杂性,并且有时候不容易测试和重现。另外“锁”增加了系统开销,会损害系统性能。
理想的情况下,通过梳理逻辑流程消除多个线程同时访问同一块数据从而避免线程同步问题。一些函数式编程语言,比如Erlang、Lisp、F#中,对象默认是immutant不可变的,每个线程都只能操作自己的数据副本,所以不需要加锁,天然适合多线程。在C#中,也有一些不可变类型(immutable),比如最常见的String,每次对String类型对象的操作比如Substring都会返回一个全新的对象,旧的对象就会丢弃直至被垃圾回收。另外在System.Collections.Immutable命名空间下也有一系列不可变集合,比如Immutable
public static Int32 Max(Int32 val1, int32 val2)
{
return (val1 < val2) ? val2 : val1;
}
但有些情况下,线程同步不可避免。这时候就必须加锁,否则就会产生数据不一致、甚至数据损坏等严重问题。在C#中, “锁”即所谓的线程同步构造。线程同步构造有很多种,不同种类的线程同步构造有各自的优缺点,它们对性能影响的程度也不同。
按照《CLR via C#》这本书的归类方法,最基础的同步构造有两大类:用户模式构造(User-Model Constructs)和内核模式构造(Kernal-Model Constructs)两种。在此基础上又分为两类:如果只使用用户模式构造或者内核模式构造其中的一种就叫基元线程同步构造(Primitive Thread Synchronization Constructs),基元(Primitive)是指可以在代码中使用的最简单的构造,比较单纯;如果同时使用这两种,就是混合线程同步构造(Hybrid Thread Synchronization Constructs)。下面就各种类型做简要说明。
用户模式构造和内核模式构造
基元线程同步构造分为用户模式构造和内核模式构造。用户模式构造的速度要明显快于内核模式构造,之所以用户模式构造这么快是因为它使用了特殊的CPU指令来协调线程,这种协调是在硬件中发生的,这也意味着Windows操作系统检测不到一个线程在用户构造模式下产生了阻塞(这有点类似在执行异步IO操作的时候,线程向硬盘上的控制单元发出读写指令,此时的CPU是不需要做任何事情的,直到接到通知)。所以假如线程池的某个线程在用户模式构造上产生了阻塞,线程池不会认为该线程被阻塞,所以不会创建更多的线程来替换被阻塞的线程来执行更多的操作。另外,这些CPU指令只会阻塞线程相当短的时间。
所以应该尽量采用用户模式构造,但它也有缺点:只有Windows操作系统内核才可以停止一个线程的运行以防止它浪费CPU时间。在用户模式中运行的线程可能被抢占(preempted),但线程会以最快的速度再次调度,所以想要获取资源,但暂时获取不到的线程会一直在用户模式中“自旋”,这可能会浪费大量的CPU时间,而这些CPU时间本可以用来执行其它更有用的工作,更好的做法就是让CPU停下来,至少能省电。
内核模式构造这时就派上用场了,它是由Windows操作系统自身提供的,所以需要在程序中调用由操作系统内核来实现函数。将线程从用户模式切换到内核模式会产生巨大的性能损失,这也是为什么要避免使用内核模式构造的原因。但它有个最大的优点,线程通过内核模式的构造获取其它线程拥有的资源时,Windows会阻塞当前线程避免它浪费CPU时间。当资源变得可用时,Windows会恢复线程,允许它访问资源。
对于一个在构造上等待的线程,如果拥有这个构造的线程一直不释放它,前者就会一直阻塞。如果是用户模式的构造,则线程将一直在一个CPU上运行,这种称之为“活锁”(livelock)。如果是内核模式的构造,则线程将一直阻塞,这种称之为“死锁”(deadlock)。这两种情况都不好,但“死锁”优于“活锁”,因为活锁既浪费CPU时间,也浪费内存,而死锁只浪费内存。
理想中的构造应该兼具两者的长处。就是说在没有竞争的条件下,这个构造应该快且不会阻塞,就像用户模式构造那样。但如果存在对构造的竞争,则希望它被操作系统内核阻塞,这样的构造确实存在,称之为混合构造(hybird construct)。在应用程序中使用混合构造很常见,因为在大多数应用中,很少会有两个或多个线程同时访问相同的数据,混合构造使得应用程序在大多数时间都快速运行,偶尔运行的慢是为了阻塞线程,但这时慢不要紧,因为线程反正都要阻塞。
用户模式构造
CLR保证对Boolean、Char、(S)Byte、(U)Int16、(U)Int32、(U)IntPtr、Single以及引用类型的变量的读写是原子性的。这意味着变量中的所有字节都是一次性的写入或者读出,不会有中间状态的值。比如:
public static class SomeType
{
public static int x = 0;
}
SomeType.x = 0x1234567;
x变量会一次性的从0x00000000变为0x01234567。不可能有其它线程查询SomeType.x并得到0x01230000这样的值。但如果x是一个Int64类型的值,那么当一个线程执行下面代码时:
SomeType.x = 0x0123456789abcdef;
另外一个线程查询x的值,可能会得到0x123456700000000或0x0000000089abcdef。因为读取和写入不是原子性的,它需要两条CPU的MOV指令来完成操作。
虽然对变量的原子访问可以保证读取或者写入一次性完成,但是由于编译器和CPU的优化,不保证这个操作什么时候发生。基元用户模式构造可以规划这些原子性读取、写入操作的时间。C#中有两种基元用户模式线程同步构造:
- volatile 构造。
- interlock构造。
上面这两种同步构造都需要传递值类型的引用,就是内存地址。
Volatile构造
编译器在除了执行诸如函数内联,常数展开等优化之外,有时候还会调整语句执行的先后顺序,如果是单线程这种问题不大,如果是多线程,语句的先后执行顺序在多线程条件下可能会输出不一样的结果,另外对于变量的读取,可能直接从CPU寄存器里读取之前缓存的数据,而不是每次都从RAM里去读最新的数据。比如下面的代码:
class ThreadShareData
{
private int m_flag = 0;
private int m_value = 0;
public void Thread1()
{
//以下代码可能会以相反的顺序执行
m_value = 5;
m_flag = 1;
}
public void Thread2()
{
//m_value可能限于m_flag读取
if (m_flag == 1)
{
Console.WriteLine(m_value);
}
}
}
它有一下可能的输出结果:
- 5:Thread1执行完成之后,开始执行Thread2。
- 0:Thread1可能会认为调整两个赋值的先后顺序不会改变程序的逻辑,所以有可能先给m_flag赋值为1,然后在准备给m_value赋值之前,Thread2抢先执行,此时m_value还未赋值,所以输出结果为0;或者Thread1在給两个变量赋值之后,Thread2在读取m_value的时候,并没有实时的从RAM中读取最新的数据,而是从CPU缓存中读取的m_value的0值,所以还是输出0。
- 什么都不显示:Thread2在Thread1为m_flag赋值之前已经执行;或者没有读取到Thread1对m_flag的修改,这两种情况都会导致条件不满足,所以输出为空。
要解决这一问题,可以使用Volatile.Read和Volatile.Write对变量进行读写:
- Volatile.Write方法强迫location的值在调用时写入,并且保证在该语句调用之前的加载和写入都已经完成。简单来说,就是可以用Volatile.Write来写入最后一个值。
- Volatile.Read方法强迫location的值在调用时读取,并且保证在在该语句之后的加载和读取都发生在该语句之后,简单来说就是可以用Volatile.Read来读取第一个值。
这两个语句保证的代码中的某条语句的读取或者赋值发生的时间不会被调整(需要注意的是,它不能保证之前或之后的多条语句的先后顺序,之前或之后的语句先后顺序仍然可能被调整。)同时,也能保证变量的读取实时从RAM中读取最新的值,杜绝了之前可能产生的从CPU缓存中读取的情况。修改后的代码如下:
class ThreadShareData
{
private int m_flag = 0;
private int m_value = 0;
public void Thread1()
{
//保证在将1写入m_flag之前,必然已经将5写入m_value
m_value = 5;
Volatile.Write(ref m_flag, 1);
}
public void Thread2()
{
//保证m_value一定在m_flag读取之后读取
if (Volatile.Read(ref m_flag) == 1)
{
Console.WriteLine(m_value);
}
}
}
现在,输出结果要么为5,要么为为空,不会出现输出为0的情况了。
为了简化Volatile.Read和Write的书写,C#编译器提供了volatile关键字,这个关键字可以用到静态或实例字段,还可以应用到引用字段。JIT编译器确保对易变字段的访问都是以易变读或易变写的方式进行,另外它也高速编译器不要讲字段缓存到CPU的寄存器中,确保所有的读写操作都在RAM中进行。重写后的代码如下:
class ThreadShareData
{
private volatile int m_flag = 0;
private int m_value = 0;
public void Thread1()
{
//保证在将1写入m_flag之前,必然已经将5写入m_value
m_value = 5;
m_flag = 1;
}
public void Thread2()
{
//保证m_value一定在m_flag读取之后读取
if (m_flag == 1)
{
Console.WriteLine(m_value);
}
}
}
但volatile关键字会有性能问题。因为大多数算法都不需要对字段进行易变的读取或者写入,大多数字段的访问都可以按照正常的方式进行,这样能提高性能,要求对字段的都有访问都是易变的这种情况极为少见。如果将易变读取操作用在下面的算法上,就会有非常严重的性能问题:
m_value = m_value + m_value;
通常,要将某个数增加一倍,只需要将他的所有位数向左移1位,大多数编译器都能检查到上述代码的意图并执行优化。如果m_value是volatile字段,就不允许执行这个优化了。编译器必须生成代码将m_value读入一个寄存器,再把它读入另外一个寄存器,然后把两个寄存器加到一起,在将结果返回m_value字段,这种代码更大、更慢。如果这条语句在一个循环中,那么性能问题就更严重。
Interlocked互锁构造
Interlocked的每个方法也都执行原子的读取或者写入操作。它有以下常用的方法:
public static class Interlocked
{
//将location位置的int值+1,并返回+1的值
public static int Increment(ref int location);
//将location位置的int值-1,并返回-1的值
public static int Decrement(ref int location);
//将location位置的int值+value,并返回+value后的值
public static int Add(ref int location1, int value);
//将location位置的int值替换为value,并返回localtion1的原始值,即替换之前的值
//相当于int old=location1;location1=value;return old;
public static extern int Exchange(ref int location1, int value);
//如果location1的值等于comparand,则将value赋值给location1,否则什么也不做。
//int old=location1;if (location1==comparand) location1=value; return old
public static extern int CompareExchange(ref int location1, int value, int comparand);
}
Exchange方法类似Volatile.Write方法。可以用它来构造一个简单的自旋锁SpinLock。
public struct SimpleSpinLock
{
private int m_resourceInUse = 0;//0-默认 1-use
public void Enter()
{
while (true)
{
//当线程A第一次进来时,m_resourceInUse为0,然后将它改为1,表示已使用
//该方法返回m_resourceInUse原始值0,所以第一次返回0,满足条件退出
//假设线程B也想进来,因为m_resourceInUse已经为1,这里不满足条件,所以一直在这里while循环等待
//直到等待线程A调用Leave方法,将m_resourceInUse置为0未使用,线程B才会跳出循环并拥有锁继续操作
if (Interlocked.Exchange(ref m_resourceInUse, 1) == 0)
{
return;
}
}
}
public void Leave()
{
//将资源标记为未使用
Volatile.Write(ref m_resourceInUse, 0);
}
}
使用起来也非常简单:
public class SomeResource
{
private SimpleSpinLock m_sl=new SimpleSpinLock();
public void AccessResource()
{
m_sl.Enter();
//Do something only one thread can do things
m_sl.Leave();
}
}
这个SimpleSpinLock是一个线程同步锁的简单实现。这种锁的最大问题在于,当存在对锁的竞争时,会造成线程“自旋”,就是前面看到的while死循环,这种“自旋”会浪费CPU资源,阻止CPU做其他更有意义的事情。所以,自旋锁应该用来保护那些执行的非常快的代码。
在FCL中提供了 System.Threading.SpinLock 结构,它的作用跟前面的SimpleSpinLock很相似,而且都是结构体,而不是类,所以它是轻量级的内存友好的。在内部它使用了System.Threading.SpinWait。如果要将锁同集合中的每一项进行关联,则SpinLock是比较好的选择,因为它是值类型,所以不要传递SpinkLock的实例,因为值类型会复制一份副本,也不要将字段定义为readonly,因为在操作锁的时候,其内部装填必须改变。
默认的SpinLock会追踪调用者的线程,可以在构造函数里传递false来禁用该功能以提升性能。如果开启该功能,则只有拥有该锁的对象,才能调用Exit方法,否则会抛出异常。在我们的SimpleSpinLock里则不具备追踪调用者线程的功能,任何一个其它的线程都可以调用Leave方法,来释放当前线程拥有的锁。
Interlocked提供了有限的方法,但是利用Interlocked的CompareExchange可以实现乐观并发控制。
乐观锁,假设多用户并发的事物在处理时不会彼此相互影响,各事物在不产生锁的情况下处理各自影响的那部分数据,在提交数据更新之前,每个事务会先检查在该事物读取数据之后,有没有其他事务又修改了该数据。如果其他事务有更新,则正在提交的数据进行回滚。乐观并发控制适用于数据争用不大,冲突较少的环境,这种环境下,偶尔的事物回滚成本低于读取数据时锁定数据的成本,因此可以获得比其他并发控制更高的吞吐量。
悲观锁则相反,是基于一种悲观的态度类来防止一切数据冲突,它是以一种预防的姿态在修改数据之前把数据锁住,然后再对数据进行读写,在它释放锁之前任何人都不能对其数据进行操作,直到前面一个人把锁释放后下一个人数据加锁才可对数据进行加锁,然后才可以对数据进行操作。
下面是一个计算最大值的方法代码:
public static int Maxmum(ref int target, int value)
{
int currentVal = target;
int startVal, desireVal;
//不要在循环中访问target,这个值,其他线程可能会修改;除非是比较是否其他线程有修改
//value值是以传值进来的,所以不会被其它线程修改。
do
{
//记录该次迭代的起始值
startVal = currentVal;
//这里做其它任何操作,但是要基于startValue和传进来的value进行
desireVal = Math.Max(startVal, value);
//这里,可能其它线程已经修改了target的值,这样target就与startVal不相等了
//如果相等,表示暂时没有其它线程修改target,于是可以将结果赋值给target,
//但是,这里的赋值不是原子性的,所以不能直接写下面的代码,需要用原子性的操作来赋值
//if (target == startVal) target = desireVal;
//下面是原子性的操作,
//如果target和startVal相等,则表示其它线程没有修改target值,我们基于target和value计算出的
//desireVal就是正确的,所以直接将desireValue赋值给target,就是我们需要的结果。
//如果其它线程修改了target,那么target就跟startVal不相等,desireVal就是基于target的旧值来计算的,而不是最新的值,
//此时对target什么都不做,将其它线程修改后的值赋值给currentVal
currentVal = Interlocked.CompareExchange(ref target, desireVal, startVal);
//如果target在这一次循环中被其他线程修改,就重复以上操作
} while (startVal != currentVal);
//返回结果,该结果跟target的值是一样的
return desireVal;
}
以上方法,每一行都很重要,细节和注意事项都在注释里,这里再解释一下。在进入方法后,用currentVal保存了开始执行时候的target值。然后在循环内部,startVal也被初始化为target值。可以用startVal和value执行任何的操作,这个操作可以非常赋值,最终将结果保存到desireVal中。
在准备将desireVal会写到target中的时候,有可能其它线程也修改了target的值,如果真的发生了,那么我们计算的desireVal就是基于旧的存储在startVal中的值获得的,而不是基于最新的被其它线程修改后的新值。这时候就不应该修改target的值。这里用到了Interlocked.CompareExchange,currentVal始终存储的是target在调用Interlock.CompareExchagne之前的值,如果其它线程修改了target的值,那么也会反映到currentVal中,然后while循环中判断startVal跟currentVal的值,如果相等,表示没有其他线程修改target,结束。否则表示其它线程修改了target,而修改后的最新值就存储在了currentVal中,然后重新循环,使用最新的currentVal重新计算结果。
以上是一个通用的模式,可以封装成一个类来执行各种操作,比如:
delegate int Morpher<TResult, TArgument>(int startValue, TArgument argument, out TResult morphResult);
static TResult Morph<TResult, TArgument>(ref int target, TArgument argument, Morpher<TResult, TArgument> morpher)
{
TResult morphResult;
int currentValue = target;
int startVal, desireVal;
do
{
startVal = currentValue;
desireVal = morpher(startVal, argument, out morphResult);
currentValue = Interlocked.CompareExchange(ref target, desireVal, startVal);
} while (startVal != currentValue);
return morphResult;
}
使用方法如下:
int calcMax(int start, int end, out int result)
{
result = Math.Max(start, end);
return result;
}
void Main()
{
int target = 1;
int max = Morph<int, int>(ref target, 3, calcMax);
max.Dump();
Console.ReadLine();
}
内核模式构造
Windows提供了几个内核模式的构造来同步线程。内核模式构造比用户模式构造要慢得多,一个原因是他们要求Windows操作系统自身进行配合。另外一个原因是内核对象上调用的每个方法都会造成调用线程从托管代码转换为本机(native)用户模式代码,再转换为本机(native)内核模式代码,然后还要朝相反的方向一路返回。这些转换需要大量的CPU时间。经常执行会对应用程序的性能造成负面影响。
但内核模式构造具备用户模式构造不具备的优点。
- 内核模式的构造检测到在一个资源上的竞争时,Windows会阻塞输掉的线程,使它不占着一个CPU“自旋”,无谓的浪费CPU资源。
- 内核模式可以实现本机(native)和托管线程相互之间的同步,并且可以实现在同一台机器的不同进程中进行同步,比如可以实现诸如应用只可以启动一个实例的功能。
- 内核模式可以追踪当前拥有锁的线程,防止未拥有锁的线程访问锁的退出方法。并且可以应用递归锁。
- 线程可以一直阻塞,直到集合中的所有内核模式构造都可用,或者知道集合中的任何内核构造模式可用,比如可以使用WaitAny或者WaitAll。
- 内核模式构造上的阻塞可以设置超时时间,如果在指定的超时时间内获取不到希望的资源,线程就可以解除阻塞并执行其他任务,比如WaitOne可以设置timeout时间,如果没有收到信号,最多阻塞timeout时间,之后返回false,可以继续执行该语句后面的代码。
事件和信号量是两种基原(Primitive)的内核模式线程同步构造。其它的内核模式构造,都是在这两个构造上构建的。在System.Threading命名空间下提供了一个名为WaitHandle的抽象基类。他下面包含了几个派生类:
WaitHandle
EventWaitHandle
AutoResetEvent
ManualResetEvent
Mutex
Semaphore
WaitHandle提供了一些方法:
public abstract class WaitHandle : MarshalByRefObject, IDisposable
{
public virtual bool WaitOne();
public virtual bool WaitOne(int millisecondsTimeout);
public virtual bool WaitOne(TimeSpan timeout);
public static int WaitAny(WaitHandle[] waitHandles);
public static int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout);
public static int WaitAny(WaitHandle[] waitHandles, TimeSpan timeout);
public static bool WaitAll(WaitHandle[] waitHandles);
public static bool WaitAll(WaitHandle[] waitHandles, int millisecondsTimeout);
public static bool WaitAll(WaitHandle[] waitHandles, TimeSpan timeout);
public void Dispose();
}
- WaitOne会等待底层内核对象收到信号,如果收到信号,就返回true;如果有超时参数,则等待超时时间,如果超时,则返回false。如果不含超时参数,且没收到信号,则一直阻塞下去。
- WaitAll,会让调用线程等待WaitHandle[]数组中的所有内核对象都收到信号,如果都收到信号返回true,超时返回false,如果不含超时参数,且没有都收到信号,则一直阻塞下去。
- WaitAny,会让调用线程等待WaitHandle[]数组中任何对象只要有一个收到信号,就返回该收信号的内核对象在数组中的Index;如果在等待期间没有收到信号,则返回WaitHandle.WaitTimeout,如果不含超时参数,且没有都收任何信号,则一直阻塞下去。
- WaitAll或者WaitAny方法的数组,元素不能超过64个。
- 不接受超时的方法,隐含着等待时间无限长,如果没有信号,则一直等待,一直阻塞。
EventWaitHandle,Semaphore,和Mutex类都提供了静态的TryOpenExisting方法,并传递一个名称,以标记内核对象的名称。如果已经存在,则返回true,如果不存在则返回false。
/// initialState,初始状态,true信号通过,false信号不通过
/// mode,一个 EventResetMode 值,它确定是自动还是手动重置事件。
/// name,唯一标识
/// 是否创建成功
public EventWaitHandle(bool initialState, System.Threading.EventResetMode mode, string? name, out bool createdNew)
public static bool TryOpenExisting (string name, out System.Threading.EventWaitHandle? result);
/// initiallyOwned 是否标记所有权,true标记,否则false
public Mutex (bool initiallyOwned, string? name, out bool createdNew);
public static bool TryOpenExisting (string name, out System.Threading.Mutex? result);
public Semaphore (int initialCount, int maximumCount, string? name, out bool createdNew);
public static bool TryOpenExisting (string name, out System.Threading.Semaphore? result);
另外,在上面各个对象的构造函数中,也有类似的功能。
比如要实现只允许一个实例运行的应用程序,比如之前WPF单例应用程序的两种实现的这篇文章,可以用以上任意一个内核构造函数实现:
void Main()
{
bool createNew;
using (new Semaphore(0, 1, "SomeUniqueStringIdentifyingThisApp", out createNew)
{
if (createNew)
{
//该线程创建了该内核对象,所以没有其它实例正在运行
//这里执行应用程序的其余部分
}
else
{
//该线程打开了一个具有相同名称的,现有的内核对象
//表示肯定有一个实例正在运行
//所以这里可以做的事情,就是直接返回
//但是更好的办法是唤起之前的那个实例
}
}
}
Event构造
事件其实只是内核维护的Boolean变量,为false,在事件上等待的线程就阻塞;为true,就解除阻塞。有两种事件,自动重置事Auto
public class EventWaitHandle:WaitHandle
{
public Boolean Set();//将事件置为true,解除阻塞
public Boolean Reset();//重新将事件设置为false,继续阻塞等待线程
}
public sealed class AutoResetEvent:EventWaitHandle
{
public AutoResetEvent(Boolean initialState);
}
public sealed class ManualResetEvent : EventWaitHandle
{
public ManualResetEvent(Boolean initialState);
}
可以使用AutoResetEvent来创建线程同步锁,他的作用跟前面的SimpleSpinLock类似:
class SimpleWaitLock : IDisposable
{
private readonly AutoResetEvent m_available;
public SimpleWaitLock()
{
m_available = new AutoResetEvent(true);//最开始将事件置为true,不阻塞
}
public void Enter()
{
//因为m_availabe初始化为了true,可以自由使用,所以第一个线程调用Enter时,不会阻塞
//调用完成之后,会把事件重置为false,第二个线程进来时就会阻塞,需要等到第一个线程Leave
m_available.WaitOne();//在内核中阻塞,直到资源可用
}
public void Leave()
{
m_available.Set();//让另外一个线程可以访问资源。
}
public void Dispose()
{
m_available.Dispose();
}
}
跟之前的用户模式实现的SimpleSpinLock相比,这里的SimpleWaitLock性能截然不同。当锁上没有竞争时,SimpleWaitLock比SimpleSpinLock要慢的多,因为SimpleWaitLock内部是使用内核同步构造实现的。Enter和Leave方法每次调用都需要强迫线程从托管代码转为内核代码,然后再转回来。但在有竞争的时候,输掉的线程会在那里WaitOne阻塞,不会在哪里While死循环“自旋”,从而浪费CPU时间,这是好的地方。最后,调用AutoResetEvent的Dispose方法时,也会造成线程从托管向内核的转换,从而对性能产生负面影响。
线程同步能避免就避免,如果一定要进行线程同步,就尽量使用用户模式构造。内核模式构造要尽量避免。
Semaphore构造
信号量(semaphore)其实是由内核维护的Int32变量。信号量为0时,在信号量上等待的线程会阻塞;信号量大于0时,解除阻塞。在信号量上等待的线程解除阻塞时,内核自动从信号量的计数中减1。信号量还关联了一个最大的Int32值,使得当前的计数绝对不会超过该值。Semaphore的基本方法如下:
public sealed class Semaphore : WaitHandle
{
public Semaphore(int initialCount,int maximumCount);
public int Release();//调用Release(1);返回上一个计数
public int Release(int releaseCount);//返回上一个计数
}
可以总结一下三种内核模式同步构造的行为:
- 多个线程在一个AutoResetEvent上等待时(WaitOne),设置事件(调用Set方法)只会导致一个线程被解除阻塞
- 多个线程在一个ManualResetEvent上等待时(WaitOne),设置事件(调用Set方法)导致所有的线程被解除阻塞(除非手动Reset)
- 多个线程在一个Semaphore上等待时(WaitOne),释放信号量导致releaseCount(调用Release方法)个线程解除阻塞。
所以AutoResetEvent,在行为上和最大计数为1的Semaphore非常相似。两者的区别在于在一个AutoResetEvent上可以多次调用Set,同时仍然只有一个线程解除阻塞。相反,在一个Semaphore上连续多次调用Release,会导致内部计数器一直递增,这可以解除大量的线程阻塞。如果在一个信号量上多次调用Release,可能会导致计数超过最大计数,这是就会抛出异常。
可以使用信号量重新实现SimpleWaitLock,允许多个线程并发访问一个资源,如果所有线程是以只读的方式访问资源,那就是安全的。
public class SimpleWaitLock : IDisposable
{
private Semaphore m_available;
public SimpleWaitLock(int maxConcurrent)
{
//一开始就是初始化了maxCocurrent个信号量,所以前maxConcucurrent个
//线程调用WaitOne时,会直接收到信号,返回。然后信号量会减1
m_available = new Semaphore(maxConcurrent, maxConcurrent)
}
public void Enter()
{
//一直阻塞,直到资源可用,资源可用时,信号量-1
m_available.WaitOne();
}
public void Leave()
{
//释放信号量,让其他线程访问资源,信号量+1
m_available.Release();
}
public void Dispose()
{
m_available.Dispose();
}
}
MSDN上的例子更好的说明了信号量的使用方法:
private static Semaphore _pool;
private static int _padding;
static void Main()
{
_pool = new Semaphore(initialCount: 0, maximumCount: 5);
for (int i = 1; i <= 5; i++)
{
Thread t = new Thread(new ParameterizedThreadStart(Worker));
t.Start(i);
}
Thread.Sleep(500);
Console.WriteLine("Main thread call Release (3).");
_pool.Release(3);
Console.WriteLine("Main Thread wait.");
Console.ReadLine();
}
static void Worker(object obj)
{
Console.WriteLine($"Thread {obj} begins and waits for the semaphore");
_pool.WaitOne();
int padding = Interlocked.Add(ref _padding, 100);
Console.WriteLine($"Thread {obj} enters the semaphore.");
Thread.Sleep(1000 + padding);
Console.WriteLine($"Thread {obj} release the semaphore.");
Console.WriteLine($"Thread {obj} previouse semaphore count:{_pool.Release()}");
}
上面的代码中,在Main函数里,初始化了一个Semaphore,然后起始信号量为0,最大信号量为5。然后启动了5个线程,在每个线程里面等待信号量,因为此时信号量为0,所以这五个线程都阻塞。
待5个线程创建完成并都已经启动之后,Main函数释放了3个信号量。则会是的有3个线程被唤醒,3个线程唤醒之后,信号量变为了0,剩下的2个线程继续阻塞。当3个线程中的某个线程处理完成之后调用Release,释放信号量时,信号量会+1,此时剩下的2个阻塞线程其中之一会被唤醒。可以看到以上程序的输出结果如下:
Thread 1 begins and waits for the semaphore
Thread 2 begins and waits for the semaphore
Thread 3 begins and waits for the semaphore
Thread 4 begins and waits for the semaphore
Thread 5 begins and waits for the semaphore
Main thread call Release (3).
Main Thread wait.
Thread 1 enters the semaphore.
Thread 3 enters the semaphore.
Thread 2 enters the semaphore.
Thread 1 release the semaphore.
Thread 1 previouse semaphore count:0
Thread 4 enters the semaphore.
Thread 3 release the semaphore.
Thread 3 previouse semaphore count:0
Thread 5 enters the semaphore.
Thread 2 release the semaphore.
Thread 2 previouse semaphore count:0
Thread 4 release the semaphore.
Thread 4 previouse semaphore count:1
Thread 5 release the semaphore.
Thread 5 previouse semaphore count:2
Main函数释放3个信号量之后,线程1、3、2解除阻塞,线程4、5继续阻塞。线程1完成任务后,释放信号量,在释放之前信号量为0,释放信号量之后,信号量+1。此时线程4解除阻塞,信号量-1,再次变为0,线程5继续阻塞。线程3完成任务后,释放信号量,在释放信号量之前信号量为0,释放之后,信号量+1。此时线程5解除阻塞,信号-1,信号量再次变为0。
接下来,线程2完成任务,释放信号量,释放信号量之前信号量为0,释放之后信号量+1,此时没有被阻塞的线程在等待。紧接着线程4完成任务,释放信号量,释放信号量之前,信号量为1,释放之后信号量+1,为2。最后,线程5完成任务,释放信号量,释放信号量之前信号量为2,释放信号量+1,最终信号量为3。
Mutex构造
互斥体(mutex)代表一个互斥锁,它的工作方式跟AutoResetEvent或者信号为1的Semaphore类似,三者都是一次只释放一个正在等待的线程。
public sealed class Mutex : WaitHandle
{
public Mutex();
public void ReleaseMutex();
}
Mutex有一些额外的逻辑,是的它比其它的构造更复杂:
首先,Mutex对象会查询调用线程的ID(int类型),记录是那个线程获取了它。当一个线程调用ReleaseMutex的时候,Mutex会确保调用线程就是获取Mutex的那个线程,如果不是,Mutex对象的状态就不会改变,而且调用ReleaseMutex会抛出异常。另外,拥有Mutex的线程因为任何原因而终止,在Mutex上等待的线程就会因为抛出AbandonedMutexException异常而被唤醒。该异常通常为未处理异常,从而终止整个进程。
其次,Mutex维护着一个递归计数,惠济路拥有该Mutex的线程,拥有了它多少次。如果一个线程当前拥有一个Mutex,然后改线程再次在Mutex上等待,计数就会递增,且线程允许继续运行。线程调用ReleaseMutex会导致计数递减。只有计数变成0,另外一个线程才能成为该Mutex的拥有者。
这两个附加功能是有“代价的”,它需要更多的内存来容纳额外的线程ID和计数信息。而且Mutex必须维护这些信息,这使得锁变得更慢。如果应用程序需要这些额外的功能,应用程序可以自己实现,代码不一定要放到Mutex对象中。
通常,当一个方法获取了一个锁,然后调用也需要锁的另外一个方法,这时就需要用到递归锁,否则就会阻塞,如下代码所示:
public class SomeClass : IDisposable
{
private readonly Mutex m_lock = new Mutex();
public void Method1()
{
m_lock.WaitOne();
//TODO:随便做些事情
Mthod2();//Method2递归地获取锁
m_lock.ReleaseMutex();
}
void Mthod2()
{
m_lock.WaitOne();
//TODO:随便做些事情
m_lock.ReleaseMutex();
}
public void Dispose()
{
m_lock.Dispose();
}
}
可以看到,拥有SomeClass对象的代码可以调用Method1,它获取了Mutex,执行一些线程安全的操作,然后调用Method2,它也执行一些线程安全的操作。因为Mutex支持递归,所以线程会获取两次锁,然后释放两次。在此之后,另一个线程才能拥有Mutex。
如果SomeClass使用了不支持递归的锁,比如AutoResetEvent,那么Method1在内部调用Method2的时候,Method2里面的WaitOne方法就会阻塞,导致死锁。
如果需要递归锁,可以使用AutoResetEvent来创建一个,而不一定要用Mutex,因为它太“重”了:
public sealed class RecursiveAutoResetEvent : IDisposable
{
private AutoResetEvent m_lock = new AutoResetEvent(true);
private int m_owingThreadId = 0;
private int m_recursiveCount = 0;
public void Enter()
{
int currentThreadId = Thread.CurrentThread.ManagedThreadId;
//如果调用线程拥有锁,就递增计数器
if (m_owingThreadId == currentThreadId)
{
m_recursiveCount++;
return;
}
//调用线程不拥有锁,等待它。由于信号初始化为true,所以第一个线程能够立即获取信号
m_lock.WaitOne();
//调用线程拥有所,初始化拥有的线程ID和递归计数
m_owingThreadId = currentThreadId;
m_recursiveCount = 1;
}
public void Leave()
{
//如果调用线程不拥有锁,直接报错
if (m_owingThreadId != Thread.CurrentThread.ManagedThreadId)
{
throw new InvalidOperationException("当前线程不拥有锁,不能直接调用该方法");
}
if (--m_recursiveCount == 0)
{
//如果递归到0,表示没有线程拥有锁
m_owingThreadId = 0;
m_lock.Set();//唤醒另外一个等待的线程(如果有的话,没有的话也没关系,把信号重新置为true)
}
}
public void Dispose()
{
throw new NotImplementedException();
}
}
RecursiveAutoResetEvent类的行为跟Mutex类完全一样,但是当线程试图递归获取锁时,它的性能会好很多,因为现在追踪线程所有权和递归计数的都是托管代码,只有在第一次获取AutoResetEvent,或者最后把它放弃给其它线程时,线程才需要从托管代码转为内核代码。
总结
本文大部分内容来自《CLR Via C#》,作为这本书的读书笔记,这里总结了C#中的两大类基本的同步构造:用户同步构造和内核同步构造,并分析了它们各自的优缺点和使用场景以及注意事项,了解这些基本的同步构造类型对于写出高效率代码以及理解后续的混合同步构造至关重要。这里用表格整理出了基本同步构造的各种实现以及优缺点。
参考
- CLR via C#, Forth Edition
- https://learn.microsoft.com/en-us/dotnet/api/system.collections.immutable?view=net-6.0
- https://learn.microsoft.com/en-us/dotnet/api/system.threading.spinlock?view=net-6.0
- https://learn.microsoft.com/en-us/dotnet/api/system.threading.semaphore?view=net-6.0
- https://learn.microsoft.com/en-us/dotnet/api/system.threading.mutex?view=net-6.0
- https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.eventwaithandle?view=net-6.0
- https://www.yycoding.xyz/post/2022/4/14/two-method-of-implement-wpf-single-instance-application
- https://zhuanlan.zhihu.com/p/136031306