在前文浅谈内存映射文件中详细介绍了什么是内存映射文件(Memory-Mapped File, MMF),以及内存映射文件的用法。在其诸多应用中,很重要的一个就是进程间通讯(Inter-Process Communication,IPC)。内存映射文件允许将文件内容直接映射到进程的虚拟地址空间,使得应用程序能够像访问内存一样访问文件数据。但要正确和高效地使用内存映射文件进行进程间通讯则需要一些技巧,它需要处理多个进程对同一块内存区域进行高效的读写问题,这里就自然涉及到竞争和同步。
本文首先介绍了一个简单的一对一的使用MMF进行进程间单向通讯的例子,紧接着指出其中的低效,然后通过引入环形队列数据结构来分隔读和写,从而达到移除锁的使用,进而大幅提高效率。最后在此基础上介绍了一对一双向通讯,以及一对多通讯的实现思路。
一个简单的例子
下面这个例子演示了一个服务端和一个客户端的单向通讯,服务端的代码如下:
public class SharedMemoryServer : IDisposable
{
public const int BufferSize = 1024 * 1024;//1M共享内存大小
private readonly string _mapName;
private readonly MemoryMappedFile _mmf;
private readonly MemoryMappedViewAccessor _accessor;
private readonly Semaphore _semaphore; // 使用信号量替代 EventWaitHandle
private readonly Mutex _mutex;
public SharedMemoryServer(string mapName)
{
_mapName = mapName;
// 创建或打开共享内存
_mmf = MemoryMappedFile.CreateOrOpen(_mapName, BufferSize, MemoryMappedFileAccess.ReadWrite);
_accessor = _mmf.CreateViewAccessor();
_mutex = new Mutex(true, $"{_mapName}_Mutex", out bool createdNew);
if (createdNew)
{
_mutex.ReleaseMutex();
}
// 信号量用于通知消息数量
// 初始数量为0,最大数量不限
_semaphore = new Semaphore(0, int.MaxValue, mapName + "_Semaphore");
}
//向客户端写入数据
public void Write(byte[] data)
{
if (data.Length > BufferSize - 4)
{
throw new ArgumentException($"Data size exceeds the buffer size of {BufferSize} bytes.");
}
_mutex.WaitOne();
try
{
_accessor.Write(0, data.Length);
_accessor.WriteArray(4, data, 0, data.Length);
}
finally
{
_mutex.ReleaseMutex();
}
// 写入完成后,释放一个信号量,通知客户端有新消息
// 这会使信号量的计数加一
_semaphore.Release();
}
public void Dispose()
{
_accessor?.Dispose();
_mmf?.Dispose();
_semaphore?.Dispose();
_mutex?.Dispose();
}
}
SharedMemoryServer创建了一个MemoryMappedFile、一个MemoryMappedViewAccessor,以及一个Mutex和Semaphore信号量全局内核对象。它提供了一个Write方法,将一个二进制数组通过MemoryMappedViewAccessor写入到MMF对象中。写入时使用mutex保护,先写入数组大小,然后写入内容。写入完成之后触发Semaphore通知,客户端等待该信号量内核对象,如果有信号则表示有数据可以读取。
接下来是客户端代码:
public class SharedMemoryClient : IDisposable
{
private readonly string _mapName;
private readonly MemoryMappedFile _mmf;
private readonly MemoryMappedViewAccessor _accessor;
private readonly Semaphore _semaphore; // 使用信号量替代 EventWaitHandle
private readonly Mutex _mutex;
public SharedMemoryClient(string mapName)
{
_mapName = mapName;
// 打开共享内存
_mmf = MemoryMappedFile.OpenExisting(_mapName);
_accessor = _mmf.CreateViewAccessor();
_mutex = Mutex.OpenExisting($"{_mapName}_Mutex");
_semaphore = Semaphore.OpenExisting(mapName + "_Semaphore");
}
//从服务端读取数据
public byte[] Read(int timeout = Timeout.Infinite)
{
// 等待一个信号量。如果计数器大于0,则立即返回并使计数器减一。
// 如果计数器为0,则阻塞等待,直到服务端 Release。
if (!_semaphore.WaitOne(timeout))
{
return null; // 超时
}
_mutex.WaitOne();
try
{
int dataSize = _accessor.ReadInt32(0);
if (dataSize <= 0 || dataSize > SharedMemoryServer.BufferSize - 4)
{
throw new InvalidOperationException("Invalid data size read from shared memory.");
}
byte[] data = new byte[dataSize];
_accessor.ReadArray(4, data, 0, dataSize);
return data;
}
finally
{
_mutex.ReleaseMutex();
}
}
public void Dispose()
{
_accessor?.Dispose();
_mmf?.Dispose();
_semaphore?.Dispose();
_mutex?.Dispose();
}
}
客户端不再创建MMF、Mutex和Semaphore这些全局的内核对象,而是打开由服务端创建好的这些对象。所以这里要求服务端必须先启动,然后客户端才能打开成功。
客户端中有一个Read方法,专门用来读取服务端在MMF中写入的内容。Read方法首先等待信号量,如果有信号则进行读取,读取的时候也使用Mutex进行保护,因为这块内存客户端和服务端是可以同时访问的,所以需要排它。读取与写入相同,首先读取4个字节的消息长度,然后根据消息长度来读取消息内容。
这个里面最主要的问题是读和写不能同时进行,读的时候不能写,写的时候不能读,因为是跨进程,因此使用了全局锁。这极大地影响了性能。改进的方法就是使用之前介绍的环形队列。
基于环形队列的高效实现
使用环形队列的目的在于避免使用重量级的锁(如Mutex),转而使用其它方式来同步生产者和消费者的行为。它的核心思想是队列中有两个指针Head
和 Tail
,生产者写入数据后只更新Head
指针,消费者读取数据后只更新Tail
指针,这两个指针是逻辑指针,它们只会不断增加,不会对其归零或取余。
基于这一思想,共享内存布局被分为两部分:一个小的控制块 (Control Block) 和一个大的数据缓冲区 (Data Buffer),控制块中存储一些公共的信息,比如最重要的两个64位的整数:Head
和 Tail
指针,其它的也可以存储比如心跳时间,协定好的控制区的总大小(即创建MMF时,预订分配的内存大小),以及一些常量,比如消息头大小等。这些存储在以下常量类中:
public static class MmfConstants
{
// 头部布局:
// 0: Head (long, 8 bytes)
// 8: Tail (long, 8 bytes)
// 16: WriterTimestamp (long, 8 bytes, 由写入方更新)
public const long HeadOffset = 0;//下一个要写入的位置,在前
public const long TailOffset = 8;//目前已经读取的位置,在后
public const long WriterTimestampOffset = 16;//时间戳,心跳用
public const long CapacityOffset = 24; // MMF总大小字段
public const int HeaderSize = 32; //头部控制区大小,之后存储数据
public const int MessageLengthSize = sizeof(int);
}
下面来看写入端即生产者MmfRingBufferWriter的代码:
/// <summary>
/// MMF环形缓冲区的写入方 (Producer)。
/// 它现在包含一个后台心跳线程来表明自己处于活动状态。
/// </summary>
public sealed class MmfRingBufferWriter : IDisposable
{
private readonly MemoryMappedViewAccessor _accessor;
private readonly EventWaitHandle _dataWrittenEvent;
private readonly long _capacity;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private bool _isDisposed = false;
public MmfRingBufferWriter(MemoryMappedViewAccessor accessor, EventWaitHandle dataWrittenEvent, int heartbeatIntervalMs = 1000)
{
_accessor = accessor;
_dataWrittenEvent = dataWrittenEvent;
_capacity = _accessor.ReadInt64(MmfConstants.CapacityOffset);
// 启动心跳线程
Task.Run(() => HeartbeatLoop(heartbeatIntervalMs, _cts.Token));
}
private void HeartbeatLoop(int interval, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
_accessor.Write(MmfConstants.WriterTimestampOffset, DateTime.UtcNow.Ticks);
token.WaitHandle.WaitOne(interval);
}
catch { break; }
}
}
public void Write(byte[] data)
{
var requiredSpace = data.Length + MmfConstants.MessageLengthSize;
if (requiredSpace > _capacity)
throw new ArgumentException("Message size exceeds buffer capacity.", nameof(data));
var spinWait = new SpinWait();
while (true)
{
// 读取无限增长的逻辑指针
var head = _accessor.ReadInt64(MmfConstants.HeadOffset);
var tail = _accessor.ReadInt64(MmfConstants.TailOffset);
// 基于指针的直接差值计算已用空间
var usedSpace = head - tail;
// 总容量减去已用空间,再减1(用于区分满/空),得到真实可用空间
var freeSpace = _capacity - usedSpace - 1;
if (freeSpace >= requiredSpace)
{
break; // 空间足够,跳出等待循环
}
spinWait.SpinOnce();
}
// 读取当前的逻辑 head 指针
var currentLogicalHead = _accessor.ReadInt64(MmfConstants.HeadOffset);
// 将长度和数据合并到一个包中,然后用分块拷贝的方式写入,保证原子性和正确性。
// 1. 在内存中准备好完整的消息包 (Length + Data)
byte[] lengthBytes = BitConverter.GetBytes(data.Length);
byte[] messagePacket = new byte[requiredSpace];
Buffer.BlockCopy(lengthBytes, 0, messagePacket, 0, lengthBytes.Length);
Buffer.BlockCopy(data, 0, messagePacket, lengthBytes.Length, data.Length);
// 2. 计算物理写入位置
long physicalStartPos = currentLogicalHead % _capacity;
long spaceToEnd = _capacity - physicalStartPos;
if (spaceToEnd >= requiredSpace)
{
// 消息足够在本行写完,无需环绕
_accessor.WriteArray(MmfConstants.HeaderSize + physicalStartPos, messagePacket, 0, requiredSpace);
}
else
{
// 消息需要环绕写入
int part1Length = (int)spaceToEnd;
int part2Length = requiredSpace - part1Length;
// 写入第一部分(到缓冲区末尾)
_accessor.WriteArray(MmfConstants.HeaderSize + physicalStartPos, messagePacket, 0, part1Length);
// 写入第二部分(从缓冲区开头)
_accessor.WriteArray(MmfConstants.HeaderSize, messagePacket, part1Length, part2Length);
}
// 3. 提交写入:推进逻辑头指针
_accessor.Write(MmfConstants.HeadOffset, currentLogicalHead + requiredSpace);
Console.WriteLine($"write HeadOffset:{currentLogicalHead + requiredSpace}");
_dataWrittenEvent.Set();
}
public void CancelHeartbeat()
{
_cts.Cancel();
}
/// <summary>
/// Stops the internal heartbeat thread.
/// </summary>
public void Dispose()
{
if (_isDisposed) return;
_isDisposed = true;
_cts.Cancel();
_cts.Dispose();
}
}
MmfRingBufferWriter里面的MemoryMappedViewAccessor和EventWaitHandle由构造函数传入,这是为了将MmfRingBufferWriter封装为服务端时方便对对象进行全局控制。在这里MmfRingBufferWriter添加了心跳控制,它会往控制区字段以特定频率写入当前的UTC时间,虽然理论上IPC通讯不会存在连接断开的情况,但使用心跳可以简化连接状态管理。另外,在构造函数中,从控制区的CapacityOffset读取了创建MMF时预设的空间大小_capacity,这个大小会用来对消息长度进行校验、判断消息是否需要环绕,以及根据逻辑指针来推断物理指针的位置。
MmfRingBufferWriter中的重点是Write方法:
- 首先将待写入的二进制数组的大小加上头部大小requiredSpace 与整个缓冲区大小进行对比,如果待写入的二进制数组过大,则直接返回。
- 紧接着,在while循环中,判断是否有足够的剩余缓冲区空间写入。判断方法是读取控制区的HeadOffset和TailOffset,使用HeadOffset-TailOffset即可判断已用空间。剩余空间就是整个缓冲区大小减去已用空间freeSpace=_capacity - usedSpace - 1,如果剩余空闲空间fressSpace大于需要的requiredSpace空间,则直接跳出while循环进行后续操作,否则使用SpinWait短暂等待,等待MmfRingBufferReader方法读取后增加TailOffset使得freeSpace变大。
- 经过以上两个步骤,确定写入空间足够之后,首先将待写入的二进制数据和该二进制的长度信息,打包成一个完整的消息包,消息长度在前内容在后(Length+Data)。
- 读取上一次写入后的逻辑指针位置HeadOffset,通过对缓冲区大小_capacity取模,得到物理指针待写入位置physicalStartPos。然后判断从待写入位置到缓冲区末尾的空间大小spaceToEnd=_capacity-physicalStartPos。
- 如果剩余空间spaceToEnd>=待写入的长度requiredSpace,表示可以一次性写入,不需要处理环绕。
- 否则,表示一部分消息长度spaceToEnd需要写到剩余空间,剩下的部分长度requiredSpace-spaceToEnd需要绕到缓冲区开头写入。
- 最后将最新的写入完成之后的逻辑指针:currentLogicalHead+requiredSpace,写回到HeadOffSet中,以方便下次继续写入。
- 最最后,在写入完成之后,触发一次_dataWrittenEvent消息,以通知读取端有新的数据被写入可以进行读取操作。
上述写入操作,关键部分在于判断待写入空间以及处理消息可能的环绕。
接下来就是读取端也就是消费端MmfRingBufferReader的代码:
/// <summary>
/// MMF环形缓冲区的读取方 (Consumer)。
/// 现在是事件驱动的,并能检测连接状态。
/// </summary>
public sealed class MmfRingBufferReader : IDisposable
{
public event Action<byte[]> MessageReceived;
public event Action Connected;
public event Action Disconnected;
private readonly MemoryMappedViewAccessor _accessor;
private readonly EventWaitHandle _dataWrittenEvent;
private readonly long _capacity;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly TimeSpan _heartbeatTimeout;
private bool _isDisposed = false;
public MmfRingBufferReader(MemoryMappedViewAccessor accessor, EventWaitHandle dataWrittenEvent, int heartbeatIntervalMs = 1000)
{
_accessor = accessor;
_dataWrittenEvent = dataWrittenEvent;
_capacity = _accessor.ReadInt64(MmfConstants.CapacityOffset);
// 超时时间设置为心跳间隔的2.5倍,提供容错空间
_heartbeatTimeout = TimeSpan.FromMilliseconds(heartbeatIntervalMs * 2.5);
// 启动监控线程
Task.Run(() => MonitorLoop(heartbeatIntervalMs, _cts.Token));
}
private void MonitorLoop(int interval, CancellationToken token)
{
bool wasConnected = false;
var waitHandles = new WaitHandle[] { _dataWrittenEvent, token.WaitHandle };
while (!token.IsCancellationRequested)
{
// 等待新数据或超时
WaitHandle.WaitAny(waitHandles, interval);
// 1. 检查连接状态
var writerTicks = _accessor.ReadInt64(MmfConstants.WriterTimestampOffset);
var isCurrentlyConnected = (writerTicks > 0) && (DateTime.UtcNow - new DateTime(writerTicks) < _heartbeatTimeout);
if (isCurrentlyConnected && !wasConnected)
{
wasConnected = true;
Connected?.Invoke();
}
else if (!isCurrentlyConnected && wasConnected)
{
wasConnected = false;
Disconnected?.Invoke();
}
// 2. 如果连接着,就读取所有可用数据
if (isCurrentlyConnected)
{
while (TryRead(out byte[] message))
{
MessageReceived?.Invoke(message);
}
}
}
// 循环结束后,如果之前是连接状态,触发最后一次断开事件
if (wasConnected) Disconnected?.Invoke();
}
private bool TryRead(out byte[] message)
{
message = null;
var head = _accessor.ReadInt64(MmfConstants.HeadOffset);
var tail = _accessor.ReadInt64(MmfConstants.TailOffset);
// 在“无限增长指针”模式下,判空条件依然是 head == tail
if (head == tail)
return false;
var currentLogicalTail = tail;
// 对称地,小心地分块读取长度,然后再分块读取数据体。
// 1. 读取4字节的消息长度前缀 (必须处理其本身环绕的可能性)
byte[] lengthBytes = new byte[MmfConstants.MessageLengthSize];
long physicalStartPos = currentLogicalTail % _capacity;
long spaceToEnd = _capacity - physicalStartPos;
if (spaceToEnd >= MmfConstants.MessageLengthSize)
{
_accessor.ReadArray(MmfConstants.HeaderSize + physicalStartPos, lengthBytes, 0, MmfConstants.MessageLengthSize);
}
else
{
int part1Length = (int)spaceToEnd;
_accessor.ReadArray(MmfConstants.HeaderSize + physicalStartPos, lengthBytes, 0, part1Length);
_accessor.ReadArray(MmfConstants.HeaderSize, lengthBytes, part1Length, MmfConstants.MessageLengthSize - part1Length);
}
var messageLength = BitConverter.ToInt32(lengthBytes, 0);
var requiredSpace = messageLength + MmfConstants.MessageLengthSize;
// 防护性检查
if (messageLength <= 0 || requiredSpace > _capacity - 1) return false;
if ((head - tail) < requiredSpace) return false;
// 2. 读取消息体
message = new byte[messageLength];
long bodyStartPos = (currentLogicalTail + MmfConstants.MessageLengthSize) % _capacity;
spaceToEnd = _capacity - bodyStartPos;
if (spaceToEnd >= messageLength)
{
_accessor.ReadArray(MmfConstants.HeaderSize + bodyStartPos, message, 0, messageLength);
}
else
{
int part1Length = (int)spaceToEnd;
_accessor.ReadArray(MmfConstants.HeaderSize + bodyStartPos, message, 0, part1Length);
_accessor.ReadArray(MmfConstants.HeaderSize, message, part1Length, messageLength - part1Length);
}
// 3. 提交读取:推进逻辑尾指针
_accessor.Write(MmfConstants.TailOffset, currentLogicalTail + requiredSpace);
return true;
}
public byte[] Read(int timeout = -1)
{
// 1. 尝试立即读取一次。这是最常见的快速路径。
if (TryRead(out byte[] message))
{
return message;
}
// 2. 如果没读到,进入一个循环等待模式。
// 这样可以确保即使我们被信号唤醒,也只会返回一条消息,
// 剩余的消息将由下一次对 Read() 的调用来处理。
var stopwatch = timeout > -1 ? System.Diagnostics.Stopwatch.StartNew() : null;
while (true)
{
int waitTime = timeout;
if (stopwatch != null)
{
var elapsed = (int)stopwatch.ElapsedMilliseconds;
if (elapsed >= timeout) return null; // 已经超时
waitTime = timeout - elapsed;
}
// 等待新数据信号
if (!_dataWrittenEvent.WaitOne(waitTime))
{
return null; // 等待超时
}
// 被唤醒后,再次尝试读取。如果成功,立即返回。
// 如果因为多条消息的“合并”信号而没读到,循环会继续等待。
if (TryRead(out message))
{
return message;
}
// 如果被唤醒但没读到数据(极小概率,比如另一个Reader进程抢先了),
// 循环会继续,重新计算超时并等待。
}
}
/// <summary>
/// Stops the internal monitoring thread.
/// </summary>
public void Dispose()
{
if (_isDisposed) return;
_isDisposed = true;
_cts.Cancel();
//_monitorThread?.Join(500);
_cts.Dispose();
}
}
与MmfRingBufferWriter一样,这里也是在构造函数里面接收MemoryMappedViewAccessor和EventWaitHandle对象,这两个对象理论上与创建MmfRingBufferWriter是相同的对象。
在构造函数中,同样从控制区的CapacityOffset读取创建MMF时预设的空间大小_capacity。另外也启动一个监控线程,判断心跳以及读取数据,在MonitorLoop代码中:
- 在while循环中,等待_dataWrittenEvent或CancellationToken取消,或者超时interval,三者只要有一个触发就开始处理。
- 首先检查连接状态,方法是读取在控制区里面的生产者写入的心跳时间,如果该时间与本地时间差距在心跳间隔范围之内,则认为服务端还在活动,连接依然有效。否则则认为生产者已经不处于活动状态,连接已经断开。
- 如果当前处于连接状态,则开始在while循环中尝试进行读取操作,一直到没有消息读取时退出。
TryRead方法是MmfRingBufferReader的核心,它的操作逻辑和MmfRingBufferWriter的Write方法类似:
- 首先读取控制区内部的上一次写入后的指针位置HeadOffset和上一次读取后的指针位置TailOffset,如果两者相等,则表示数据已经读取完毕,直接返回。
- 记下上次读取后的逻辑指针地址currentLogicalTail即TailOffset。计算当前待读取指针的物理指针位置physicalStartPos即currentLogicalTail对_capacity进行取模操作。
- 现在要先读取4个字节即MmfConstants.MessageLengthSize长度的内容,这4个字节存储了消息的长度。读取消息长度时,需要考虑消息环绕的可能。
- 判断当前待读取位置距离缓冲区末尾的空间大小spaceToEnd=_capacity-physicalStartPos。
- 如果剩余空间>=4个字节的消息长度,则表示消息长度可以一次取完。否则,要先去读一部分spaceToEnd长度的数据,然后跳转到开头读取剩余的4-spaceToEnd长度的内容。
- 4个字节读取完成之后,读取messageLength长度。
- 读取完消息长度之后,计算需要的空间即消息长度和4字节头部长度requiredSpace= messageLength +4,然后进行一些基本的判断。
- 继续读取消息体,现在消息体的物理起始位置bodyStartPos应该是currentLogicalTail+4字节然后对_capacity进行去模。接下来同样要去里消息体可能的环绕问题
- 判断物理其实位置距离缓冲区末尾的空间spaceToEnd=_capacity - bodyStartPos,如果spaceToEnd大于messageLength,则表示当前剩余的空间足以容纳下整个消息体,直接读取。
- 否则要分两部分进行消息体的读取,即首先从bodyStartPos读取spaceToEnd长度的字节。然后转到开头读取剩余的messageLength-spaceToEnd长度的字节。
- 最后将本次读取后的指针位置currentLogicalTail+requiredSpace写回到TailOffset,供下次读取时使用。
这就是一个完整的基于MMF的一对一的高性能的基于环形队列的生产者消费者模型的实现。
这里面有重要的改进就是,使用环形队列在单生产者单消费者(SPSC)模型下,读写根本不需要加锁,它不会造成导致数据状态出错的竟态,这个设计的安全性来源于几个技巧,可以使得能够在无锁的情况下安全运行:
核心:指针的单一写入权
这是最重要的保证,在这个模型中,严格遵守:
- Head 指针:永远只由写入方(Writer)进行修改。
- Tail 指针:永远只由读取方(Reader)进行修改。
这就从根本上杜绝了最危险的“写-写冲突”。绝对不会出现两个进程同时去修改同一个指针而导致其值被破坏的情况。
“读-写冲突”的安全处理
当Writer在读取Tail的同时,Reader可能正在修改它(反之亦然)。这种情况确实会发生,但目前的设计可以安全地处理这种“过时读取”(Stale Read)的情况:
写入方(Writer)的视角:
- Writer 在 while 循环中读取 tail 的值来计算空闲空间。
- 假设它读到的 tail 值是1000。
- 就在此时,Reader 进程运行,读取了一条消息,并将共享内存中的 tail 值更新为1100。
- Writer 进程恢复运行,它仍然拿着那个“过时”的 tail 值(1000)进行计算。
- 结果会怎样? Writer 会认为空闲空间比实际上更小。这是一种安全的错误。它最多只会导致 Writer 多等待一个SpinOnce周期,在下一轮循环中它就会读到最新的 tail 值(1100)。它永远不会因为读到旧的tail值而错误地认为某个位置是空闲的,从而覆盖掉未读的数据。
读取方(Reader)的视角:
- Reader 在 TryRead 中读取 head 的值来判断有多少数据可读。
- 假设它读到的 head 值是2000。
- 就在此时,Writer 进程运行,写入了一条新消息,并将 head 更新为2100。
- Reader 进程恢复运行,它仍然用那个“过时”的 head 值(2000)来判断。
- 结果会怎样? Reader 会认为可读的数据比实际上更少。这也是一种安全的错误。它会正确地读取直到 head 为2000之前的所有数据。对于那条新写入的消息(从2000到2100),它会在下一次 MonitorLoop 循环中再去读取。它永远不会读到 Writer 尚未完全“提交”(即更新head指针)的数据。
64位读写的原子性
在现代的64位CPU架构上,对齐的64位整数(long)的读写操作本身就是原子的。这意味着当Writer在写入一个新的head值(例如从1000变为1100)时,Reader绝不会读到一个“一半是旧值,一半是新值”的损坏数据(例如1050)。它要么读到完整的旧值1000,要么读到完整的新值1100。这保证了指针本身的数据完整性。
所以,通过“单一写入权”和对“过时读取”的安全处理,这个SPSC环形缓冲区设计是线程安全和进程安全的,无需使用性能开销更大的锁(lock或Mutex)。
另一种实现
上述对MMF的操作都是使用.NET里面包装好的类,MMF也提供了对原始指针的获取和操作,这就要求在.NET中使用unsafe代码,另外一种MMF的实现就是编写一个类,里面包含有对MMF的读和写,只不过控制头是一个struct对象指针,思路和上述一致,也是使用环形队列来去除读写锁的使用,代码如下:
/// <summary>
/// A lock-free, single-producer, single-consumer circular buffer for IPC using a Memory-Mapped File.
/// This implementation handles message wrap-around.
/// </summary>
public sealed unsafe class LockFreeCircularBuffer : IDisposable
{
[StructLayout(LayoutKind.Explicit)]
private struct Header
{
[FieldOffset(0)]
public int Capacity;
[FieldOffset(4)]
public int Head; // Read position
[FieldOffset(8)]
public int Tail; // Write position
}
private readonly MemoryMappedFile _mmf;
private readonly MemoryMappedViewAccessor _accessor;
private readonly EventWaitHandle _dataAvailableEvent;
private readonly int _capacity;
private readonly int _bufferSize;
private readonly int _headerSize;
public WaitHandle ReadWaitHandle => _dataAvailableEvent;
public LockFreeCircularBuffer(string name, int capacity = 1024 * 1024)
{
_capacity = capacity;
_bufferSize = capacity;
_headerSize = Marshal.SizeOf(typeof(Header));
long totalSize = _headerSize + _bufferSize;
_mmf = MemoryMappedFile.CreateOrOpen(name, totalSize);
_accessor = _mmf.CreateViewAccessor();
_dataAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset, name + "_evt");
if (IsCreator())
{
var header = new Header { Capacity = _capacity, Head = 0, Tail = 0 };
_accessor.Write(0, ref header);
}
else
{
Header header;
_accessor.Read(0, out header);
if (header.Capacity != _capacity)
{
throw new InvalidOperationException("Circular buffer capacity does not match existing instance.");
}
}
}
private bool IsCreator()
{
return _accessor.ReadInt32(0) == 0;
}
public bool TryWrite(byte[] data)
{
int head;
int tail;
int nextTail;
Header* header = null;
try
{
header = GetHeaderPointer();
head = Thread.VolatileRead(ref header->Head);
tail = Thread.VolatileRead(ref header->Tail);
int messageLength = data.Length;
int requiredSpace = messageLength + 4; // +4 for length prefix
int freeSpace;
if (head > tail)
freeSpace = head - tail - 1;
else
freeSpace = _capacity - tail + head - 1;
//int freeSpace = (head - tail - 1 + _capacity) % _capacity;
if (freeSpace < requiredSpace) // +4 for length prefix
{
return false; // Not enough space
}
// Write the message length
byte[] lengthBytes = BitConverter.GetBytes(messageLength);
WriteBytes(tail, lengthBytes);
nextTail = (tail + 4) % _capacity;
// Write the message data, handling wrap-around
WriteBytes(nextTail, data);
nextTail = (nextTail + messageLength) % _capacity;
// Atomically update the tail pointer
Interlocked.Exchange(ref header->Tail, nextTail);
}
finally
{
if (header != null)
_accessor.SafeMemoryMappedViewHandle.ReleasePointer();
}
_dataAvailableEvent.Set();
return true;
}
public bool TryRead(out byte[] data)
{
data = null;
int head;
int tail;
int nextHead;
Header* header = null;
try
{
header = GetHeaderPointer();
head = Thread.VolatileRead(ref header->Head);
tail = Thread.VolatileRead(ref header->Tail);
if (head == tail)
{
return false; // Buffer is empty
}
// Read message length
byte[] lengthBytes = ReadBytes(head, 4);
int messageLength = BitConverter.ToInt32(lengthBytes, 0);
nextHead = (head + 4) % _capacity;
// Read message data, handling wrap-around
data = ReadBytes(nextHead, messageLength);
nextHead = (nextHead + messageLength) % _capacity;
// Atomically update the head pointer
Interlocked.Exchange(ref header->Head, nextHead);
}
finally
{
if (header != null)
_accessor.SafeMemoryMappedViewHandle.ReleasePointer();
}
return true;
}
private void WriteBytes(int position, byte[] data)
{
int firstChunkSize = Math.Min(data.Length, _capacity - position);
_accessor.WriteArray(_headerSize + position, data, 0, firstChunkSize);
if (firstChunkSize < data.Length)
{
int remaining = data.Length - firstChunkSize;
_accessor.WriteArray(_headerSize, data, firstChunkSize, remaining);
}
}
private byte[] ReadBytes(int position, int count)
{
byte[] buffer = new byte[count];
int firstChunkSize = Math.Min(count, _capacity - position);
_accessor.ReadArray(_headerSize + position, buffer, 0, firstChunkSize);
if (firstChunkSize < count)
{
int remaining = count - firstChunkSize;
_accessor.ReadArray(_headerSize, buffer, firstChunkSize, remaining);
}
return buffer;
}
private unsafe Header* GetHeaderPointer()
{
byte* ptr = null;
_accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr);
if (ptr == null)
{
return null;
}
return (Header*)ptr;
}
public void Dispose()
{
_dataAvailableEvent?.Close();
_accessor?.Dispose();
_mmf?.Dispose();
}
}
实现一对一双向通讯
有了上述对MMF的读和写的基本操作,就能很方便的实现一对一的双向通讯。其主要思路是,服务端和客户端各维护一个单向向对方发送数据的通道。即服务端和客户端互为客户端服务端。为了演示,这里还是简单区分服务端和客户端。
先看服务端代码:
public sealed class MmfIpcServer : IDisposable
{
public event Action<byte[]> MessageReceived;
public event Action ClientConnected;
public event Action ClientDisconnected;
private readonly string _s2cChannelName;
private readonly string _c2sChannelName;
private readonly int _bufferCapacity;
private MemoryMappedFile _s2cMmf, _c2sMmf;
private EventWaitHandle _s2cEvent, _c2sEvent;
private MmfRingBufferWriter _s2cWriter;
private MmfRingBufferReader _c2sReader;
private bool _isClientConnected = false;
private bool _isDisposed = false;
public bool IsClientConnected => _isClientConnected;
public MmfIpcServer(string serverToClientChannelName, string clientToServerChannelName, int capacity)
{
_s2cChannelName = serverToClientChannelName;
_c2sChannelName = clientToServerChannelName;
_bufferCapacity = capacity;
}
/// <summary>
/// 启动服务端,创建通道并开始监听。
/// </summary>
public void Start()
{
_s2cMmf = MemoryMappedFile.CreateOrOpen(_s2cChannelName, MmfConstants.HeaderSize + _bufferCapacity);
_c2sMmf = MemoryMappedFile.CreateOrOpen(_c2sChannelName, MmfConstants.HeaderSize + _bufferCapacity);
_s2cEvent = MmfUtils.CreateOrOpenEvent(_s2cChannelName, isCreator: true);
_c2sEvent = MmfUtils.CreateOrOpenEvent(_c2sChannelName, isCreator: true);
using (var s2cAccessor = _s2cMmf.CreateViewAccessor())
using (var c2sAccessor = _c2sMmf.CreateViewAccessor())
{
s2cAccessor.Write(MmfConstants.CapacityOffset, (long)_bufferCapacity);
c2sAccessor.Write(MmfConstants.CapacityOffset, (long)_bufferCapacity);
s2cAccessor.Write(MmfConstants.HeadOffset, 0L);
s2cAccessor.Write(MmfConstants.TailOffset, 0L);
c2sAccessor.Write(MmfConstants.HeadOffset, 0L);
c2sAccessor.Write(MmfConstants.TailOffset, 0L);
}
_s2cWriter = new MmfRingBufferWriter(_s2cMmf.CreateViewAccessor(), _s2cEvent);
_c2sReader = new MmfRingBufferReader(_c2sMmf.CreateViewAccessor(), _c2sEvent);
_c2sReader.Connected += OnClientConnected;
_c2sReader.Disconnected += OnClientDisconnected;
_c2sReader.MessageReceived += OnMessageReceived;
Console.WriteLine("[Server] Service started and listening for clients.");
}
/// <summary>
/// 停止服务端并释放所有资源。
/// </summary>
public void Stop()
{
Dispose();
}
/// <summary>
/// 向已连接的客户端发送消息。
/// </summary>
/// <param name="data">要发送的数据。</param>
/// <returns>如果发送成功则为 true,如果没有客户端连接则为 false。</returns>
public bool Send(byte[] data)
{
if (!IsClientConnected) return false;
try { _s2cWriter.Write(data); return true; }
catch { return false; }
}
private void OnClientConnected()
{
_isClientConnected = true;
ClientConnected?.Invoke();
}
private void OnClientDisconnected()
{
_isClientConnected = false;
ClientDisconnected?.Invoke();
}
private void OnMessageReceived(byte[] data)
{
MessageReceived?.Invoke(data);
}
public void Dispose()
{
if (_isDisposed) return;
_isDisposed = true;
_c2sReader?.Dispose();
_s2cWriter?.Dispose();
_c2sEvent?.Dispose();
_s2cEvent?.Dispose();
_c2sMmf?.Dispose();
_s2cMmf?.Dispose();
Console.WriteLine("[Server] Service stopped.");
}
}
可以看到,服务端MmfIpcServer创建了两个MMF对象,分别是用来给客户端发送消息的_s2cMmf(即写入的MMF)和_s2cEvent(即服务端写入之后通知客户端的EventWaitHandle),以及一个读取客户端发送消息的_c2sMmf(即读取的MMF)和_c2sEvent(即客户端写入之后通知服务端的EventWaitHandle)。服务端负责所有这些对象的创建,因为如果客户端负责创建_c2sMmf或_c2sEvent,那么在某些频繁关闭开启的场景下,上一次可能内核对象还没有来得及关闭,再次开启时可能会报错。且服务端一般都是宿主程序,它的生命周期与应用程序的生命周期是一致的,所以由服务端创建这些内核对象比较合适。客户端只负责尝试打开这些已经创建好的对象,如果打开报错,则表示服务端还没有启动好。
服务端创建好MMF对象之后,还要对两个MMF对象的控制头部信息使用各自的MemoryMappedViewAccessor进行初始化,其中最重要的是初始化MMF时设置的预分配大小。在上面的代码中,假定的是两个MMF的大小是相等的,实际上在很多情况下,如果主要是由服务端给客户端发送信息,客户端只是给服务端发送请求。那么服务端到客户端的这个MMF可以把缓冲区设置的更大一些,客户端到服务端的这个MMF可以设置的稍微小一些。
初始化完成底层的MMF之后,就可以用MMF的MemoryMappedViewAccessor以及EventWaitHandle来创建上述封装的Read和Write对象,对于服务端来说,需要创建一个类型为MmfRingBufferWriter的_s2cWriter对象,对这个对象执行写入操作,就相当于给客户端发送消息。还需要一个类型为MmfRingBufferReader的_c2sReader对象,对这个对象进行读操作,就相当于读取读取客户端发送过来的消息。这里的读取操作使用的是事件回调,所以注册了MmfRingBufferReader的连接状态事件和新消息事件。
MmfIpcServer对象还定义了一个Send方法,它实际上就是调用_s2cWriter对象的Write方法。
接下来看客户端对象:
public sealed class MmfIpcClient : IDisposable
{
public event Action<byte[]> MessageReceived;
public event Action Connected;
public event Action Disconnected;
private readonly string _s2cChannelName;
private readonly string _c2sChannelName;
private readonly int _reconnectIntervalMs = 3000;
// 使用 volatile 确保多线程的可见性
private volatile MmfRingBufferWriter _c2sWriter;
private CancellationTokenSource _mainCts;
private Task _connectLoopTask;
public bool IsConnected => _c2sWriter != null;
public MmfIpcClient(string serverToClientChannelName, string clientToServerChannelName)
{
_s2cChannelName = serverToClientChannelName;
_c2sChannelName = clientToServerChannelName;
}
/// <summary>
/// 启动客户端,开始连接和自动重连循环。
/// </summary>
public void Start()
{
_mainCts = new CancellationTokenSource();
_connectLoopTask = Task.Run(() => ClientConnectionLoop(_mainCts.Token), _mainCts.Token);
}
/// <summary>
/// 停止客户端,断开连接并停止重连。
/// </summary>
public void Stop()
{
Dispose();
}
/// <summary>
/// 向服务端发送消息。
/// </summary>
/// <param name="data">要发送的数据。</param>
/// <returns>如果发送成功则为 true,如果当前未连接则为 false。</returns>
public bool Send(byte[] data)
{
// 从 volatile 字段读取到本地变量,进行线程安全的访问
var writer = _c2sWriter;
if (writer == null) return false;
try { writer.Write(data); return true; }
catch { return false; }
}
private void ClientConnectionLoop(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
MmfRingBufferReader s2cReader = null;
MmfRingBufferWriter c2sWriter = null;
try
{
Console.WriteLine("[Client] Attempting to connect...");
var s2cMmf = MemoryMappedFile.OpenExisting(_s2cChannelName);
var c2sMmf = MemoryMappedFile.OpenExisting(_c2sChannelName);
var s2cEvent = MmfUtils.CreateOrOpenEvent(_s2cChannelName, isCreator: false);
var c2sEvent = MmfUtils.CreateOrOpenEvent(_c2sChannelName, isCreator: false);
s2cReader = new MmfRingBufferReader(s2cMmf.CreateViewAccessor(), s2cEvent);
c2sWriter = new MmfRingBufferWriter(c2sMmf.CreateViewAccessor(), c2sEvent);
_c2sWriter = c2sWriter;
var connectedSignal = new ManualResetEventSlim(false);
var disconnectedSignal = new ManualResetEventSlim(false);
s2cReader.Connected += connectedSignal.Set;
s2cReader.Disconnected += disconnectedSignal.Set;
s2cReader.MessageReceived += (data) => MessageReceived?.Invoke(data);
if (!connectedSignal.Wait(5000, token)) continue;
Connected?.Invoke();
// 阻塞在这里,直到断开或程序退出
WaitHandle.WaitAny(new[] { disconnectedSignal.WaitHandle, token.WaitHandle });
}
catch (Exception)
{
// 连接失败(例如服务端未启动),将继续循环重试
}
finally
{
// 确保在任何情况下断开连接时,共享字段都被清空
if (_c2sWriter != null)
{
_c2sWriter = null;
Disconnected?.Invoke();
}
c2sWriter?.Dispose();
s2cReader?.Dispose();
}
if (!token.IsCancellationRequested)
{
token.WaitHandle.WaitOne(_reconnectIntervalMs);
}
}
}
public void Dispose()
{
_mainCts?.Cancel();
try { _connectLoopTask?.Wait(2000); } catch { }
_mainCts?.Dispose();
}
}
MmfIpcClient对象的构造函数接受两个参数,这两个参数就是服务端写入数据的MMF名称serverToClientChannelName和客户端写入数据的MMF名称clientToServerChannelName。一般情况下,这个名称是服务端和客户端进行通讯时约定好的。另外为了减少名称的传递,约定对应的EventWaitHandle名称为MMF的名称后面加_Event后缀,创建EventWaitHandle被包装到一个工具方法里:
public static EventWaitHandle CreateOrOpenEvent(string channelName, bool isCreator)
{
string eventName = channelName + "_Event";
var eventSecurity = new EventWaitHandleSecurity();
eventSecurity.AddAccessRule(new EventWaitHandleAccessRule(new SecurityIdentifier(WellKnownSidType.WorldSid, null), EventWaitHandleRights.FullControl, AccessControlType.Allow));
if (isCreator)
{
return new EventWaitHandle(false, EventResetMode.AutoReset, eventName, out _, eventSecurity);
}
else
{
return EventWaitHandle.OpenExisting(eventName);
}
}
MmfIpcClient中提供了一个Start方法,在Start方法内部开启了一个Task任务,任务的主体是一个While循环ClientConnectionLoop。在循环体内首先尝试打开2个MMF内核对象和2个EventWaitHandle内核对象,如果打开失败,这表示服务端没有启动,过一个指定时间段之后再次重试。
这些内核对象打开成功之后,创建对应的客户端写入的对象,即类型为MmfRingBufferWriter的c2sWriter变量和一个读取服务端写入的MMF对象,即类型为MmfRingBufferReader的s2cReader对象,同样地这里读取采用事件驱动,即只需要注册s2cReader对应的连接状态变更事件和新消息到来事件。完成之后,任务阻塞,直到收到连接断开事件,或者手动调用CancellationToken的Cancel方法取消。
MmfIpcClient也提供Send方法,该方法用来给服务端发送消息,它在内部就是调用c2sWriter的Write方法。可以将MmfIpcClient和MmfIpcServer进行对比,可以看到,设计非常简洁明了。
另一种实现
如果使用前面所述的unsafe代码LockFreeCircularBuffer,思路也类似就是创建两个LockFreeCircularBuffer对象,一个用来服务端向客户端发送数据,一个用来客户端向服务端发送数据。这里不详述,直接上代码,MmfIpcServer代码如下:
public class MmfIpcServer : IDisposable
{
private readonly string _channelName;
private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
private CancellationTokenSource _cancellationTokenSource;
private readonly ILogger _logger;
private LockFreeCircularBuffer _receiveBuffer; // Client -> Server
private LockFreeCircularBuffer _sendBuffer; // Server -> Client
public event EventHandler<byte[]> MessageReceived;
public event EventHandler<ConnectionChangedEventArgs> ClientConnectionChanged;
public MmfIpcServer(string channelName, ILogger logger = null)
{
_channelName = channelName;
_logger = logger ?? new ConsoleLogger();
}
public void Start()
{
_cancellationTokenSource = new CancellationTokenSource();
_receiveBuffer = new LockFreeCircularBuffer(_channelName + "_C2S");
_sendBuffer = new LockFreeCircularBuffer(_channelName + "_S2C");
Task.Run(() => ReadLoop(_cancellationTokenSource.Token));
_logger.Info("MMF Server started.");
OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
}
public void StartMessageListener()
{
Task.Run(() =>
{
foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
{
OnMessageReceived(message);
}
});
}
private void ReadLoop(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
if (_receiveBuffer.ReadWaitHandle.WaitOne(1000))
{
byte[] buffer;
while (_receiveBuffer.TryRead(out buffer) && !token.IsCancellationRequested)
{
//var json = Encoding.UTF8.GetString(buffer);
//var message = JsonConvert.DeserializeObject<T>(json);
if (buffer != null && buffer.Length > 0) _receiveQueue.Add(buffer, token);
}
}
}
catch (OperationCanceledException) { break; }
catch (Exception ex) { _logger.Error("MMF Server Read Error: ", ex); }
}
_receiveQueue.CompleteAdding();
}
public byte[] Read() => _receiveQueue.Take();
public Task<byte[]> ReadAsync() => Task.Run(() => _receiveQueue.Take());
private void WriteInternal(byte[] message)
{
if (!_sendBuffer.TryWrite(message))
{
_logger.Warn("MMF send buffer is full. Message dropped.");
}
}
public void Write(byte[] message) => WriteInternal(message);
public Task WriteAsync(byte[] message) => Task.Run(() => WriteInternal(message));
protected virtual void OnMessageReceived(byte[] e) => MessageReceived?.Invoke(this, e);
protected virtual void OnClientConnectionChanged(ConnectionChangedEventArgs e) => ClientConnectionChanged?.Invoke(this, e);
public void Stop()
{
_logger.Info("Stopping MMF server...");
_cancellationTokenSource?.Cancel();
OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
}
public void Dispose()
{
Stop();
_receiveBuffer?.Dispose();
_sendBuffer?.Dispose();
_receiveQueue.Dispose();
_cancellationTokenSource?.Dispose();
}
}
客户端MmfIpcClient代码如下:
public class MmfIpcClient : IDisposable
{
private readonly string _channelName;
private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
private CancellationTokenSource _cancellationTokenSource;
private readonly ILogger _logger;
private LockFreeCircularBuffer _sendBuffer;
private LockFreeCircularBuffer _receiveBuffer;
public bool AutoReconnect { get; set; } = true;
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
public event EventHandler<byte[]> MessageReceived;
public event EventHandler<ConnectionChangedEventArgs> ConnectionChanged;
public MmfIpcClient(string channelName, ILogger logger = null)
{
_channelName = channelName;
_logger = logger ?? new ConsoleLogger();
}
public async Task ConnectAsync()
{
_cancellationTokenSource = new CancellationTokenSource();
await AttemptConnectionAsync(_cancellationTokenSource.Token);
}
private async Task AttemptConnectionAsync(CancellationToken token)
{
OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connecting));
while (!token.IsCancellationRequested)
{
try
{
_sendBuffer = new LockFreeCircularBuffer(_channelName + "_C2S");
_receiveBuffer = new LockFreeCircularBuffer(_channelName + "_S2C");
OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
Task.Run(() => ReadLoop(token), token);
return;
}
catch (Exception ex)
{
_logger.Error("Failed to connect to MMF server.", ex);
HandleDisconnection(false);
if (!AutoReconnect) break;
_logger.Info(string.Format("Will attempt to reconnect in {0} seconds...", ReconnectInterval.TotalSeconds));
await Task.Delay(ReconnectInterval, token);
}
}
}
private void HandleDisconnection(bool triggerReconnect)
{
OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
_sendBuffer?.Dispose();
_receiveBuffer?.Dispose();
if (AutoReconnect && triggerReconnect && !_cancellationTokenSource.IsCancellationRequested)
{
Task.Run(() => AttemptConnectionAsync(_cancellationTokenSource.Token));
}
}
private void ReadLoop(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
if (_receiveBuffer.ReadWaitHandle.WaitOne(1000))
{
byte[] buffer;
while (_receiveBuffer.TryRead(out buffer) && !token.IsCancellationRequested)
{
//var json = Encoding.UTF8.GetString(buffer);
//var message = JsonConvert.DeserializeObject<T>(json);
//if (message != null) _receiveQueue.Add(message, token);
_receiveQueue.Add(buffer, token);
}
}
}
catch (Exception ex)
{
_logger.Error("MMF Client Read Error: ", ex);
HandleDisconnection(true);
break;
}
}
_receiveQueue.CompleteAdding();
}
public byte[] Read() => _receiveQueue.Take();
public Task<byte[]> ReadAsync() => Task.Run(() => _receiveQueue.Take());
private void WriteInternal(byte[] message)
{
if (_sendBuffer == null) throw new InvalidOperationException("Not connected.");
//var json = JsonConvert.SerializeObject(message);
//var buffer = Encoding.UTF8.GetBytes(json);
if (!_sendBuffer.TryWrite(message))
{
_logger.Warn("MMF send buffer is full. Message dropped.");
}
}
public void Write(byte[] message) => WriteInternal(message);
public Task WriteAsync(byte[] message) => Task.Run(() => WriteInternal(message));
public void StartMessageListener()
{
Task.Run(() =>
{
foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
{
OnMessageReceived(message);
}
});
}
protected virtual void OnMessageReceived(byte[] e) => MessageReceived?.Invoke(this, e);
protected virtual void OnConnectionChanged(ConnectionChangedEventArgs e)
{
_logger.Info(string.Format("Connection status changed to: {0}", e.Status));
ConnectionChanged?.Invoke(this, e);
}
public void Disconnect()
{
_logger.Info("Disconnecting...");
_cancellationTokenSource?.Cancel();
}
public void Dispose()
{
Disconnect();
_sendBuffer?.Dispose();
_receiveBuffer?.Dispose();
_receiveQueue.Dispose();
_cancellationTokenSource?.Dispose();
}
}
原理都是类似的。
一对多的实现
IPC大部分都是一对一的实现,宿主担任服务端,负责与分离出来的作为客户端的“部分”进行一对一的通讯。但在有些场景下,需要实现一对多的IPC通讯,比如之前介绍的将不稳定的部分或者用其它语言编写的部分独立出来作为一个单独的模块。参考前述的SPSC单生产者单消费者环形缓冲区,为满足一对多、双向、广播、单播和满客户端流控,一个高效的多通道架构如下,它包含:
- 一个 服务端->多客户端的“广播通道”(S2C-Server to Client)
- 它是一个单生产者,多消费者的环形缓冲区。
- 服务端是唯一的写入方,负责写入所有广播消息和针对特定客户端的单播消息,写入完成之后,会更新头部指针head。
- 所有的客户端都是读取方,它们会读取这个通道,在通道的头部,会记录每个客户端当前读取之后的指针位置tail。
- 为了解决“慢客户端”问题,通道的头部会为每个客户端专门设置一个指针槽位数组。服务端在写入新数据之前,会检查所有已连接客户端的tail指针,并且以最慢的那个客户端为准来计算可用空间。这样,如果某个客户端读取缓慢,它的tail指针就不会前进,最终会“撑满”缓冲区,自动地对服务端产生反压(Backpressure),迫使其减慢发送速度。
- 多个客户端->服务端的“上行通道”(C2S-Client to Server)
- 为每一个客户端创建一个专属的、独立的单向通道。
- 这是一个标准的单生产者、单消费者(SPSC)模型,客户端是唯一的写入方,服务端是唯一的读取方。
- 这种方式可以避免多个客户端写入同一个通道时参数复杂的锁竞争,保证了上行通道的最高效率。
- 客户端注册与发现
- 当一个新的客户端启动时,它会首先连接到“广播通道”。
- 它会在广播通道的头部寻找一个空的“客户端槽位”,并通过操作“认领”这个槽位。这个槽位的索引,比如5号槽,就成了这个客户端的唯一ID
- 服务端会监控这些槽位的变化,一旦发现有新客户端认领了槽位,就知道有新的客户端上线了。
- 客户端根据自己获得的ID(比如5),就知道自己应该使用第5号上行通道,比如C2S_Channel_5,来向服务端发送消息。
首先来看头部控制区的布局:
public static class MmfConstants
{
// 支持的最大客户端数
public const int MaxClients = 64;
public const int ClientSlotSize = 24;// 单个客户端在头部占用的槽位大小 (PID + Tail Ptr + Heartbeat Ptr)
// S2C (广播) 通道头部布局
public const int S2CHeaderSize = 32 + (ClientSlotSize * MaxClients);
public const int S2CMagicOffset = 0;
public const int S2CCapacityOffset = 8;
public const long S2CServerHeadOffset = 16;
public const int S2CClientBitmapOffset = 24;//Bitmap结构,记录所有的槽位
public const long S2CClientSlotsStartOffset = 32;//每个客户端的槽位开始
// C2S (上行) 通道头部布局
public const int C2SHeaderSize = 24;
public const int C2SCapacityOffset = 0;
public const long C2SHeadOffset = 8;
public const long C2STailOffset = 16;
// S2C 通道中的消息格式: [目标客户端ID (long)][消息长度 (int)][消息体]
public const int TargetClientIdSize = sizeof(long);
public const int MessageLengthSize = sizeof(int);
public const int BroadcastClientId = 0;
public const long MagicNO = 0xFEEDBEEFL;
}
S2C前缀表示服务端->客户端的下行广播通道,它的头部控制区包含:
- S2CMagicOffset=0,魔术数写入起始位置。控制区设置一个S2CMagicOffset并写入一个固定的“魔术数”,是出于健壮性和安全性的考虑。它主要有三个作用:
- 识别与校验 (Identification and Validation),这是最主要的目的,可以把它想象成一个“秘密握手暗号”。客户端程序尝试连接到一个名为 "My1ToManyIpc_Broadcast" 的共享内存区域。操作系统确实找到了这个名称的内存,但如何100%确定这个内存就是由您的服务端程序创建的,而不是由另一个碰巧使用了相同名称的、完全不相关的程序创建的?解决方案就是服务端在创建共享内存后,会在头部的固定位置(S2CMagicOffset)写入一个双方约定好的、独特的暗号(即魔术数 0xFEEDBEEFL)。客户端在连接时,会先读取这个位置的值。如果值与约定的暗号一致,客户端就知道:“连接正确,这是我们的自己人!”。如果值不一致,客户端就知道它连接到了一个错误的内存区域,应该立刻报错并断开,而不是尝试读取可能会导致程序崩溃的垃圾数据。
- 防止数据损坏 (Preventing Data Corruption),魔术数也是一种非常基础的完整性校验。如果因为某些意外(例如另一个进程的野指针错误地修改了共享内存),导致共享内存头部的数据被破坏,那么这个固定的魔术数有很大概率也会被改变。当新客户端连接时,一检查魔术数不匹配,就能立即发现数据已损坏,从而避免了更严重的后续错误。
- 同步初始化状态 (Synchronizing Initialization State),在一个更复杂的系统中,服务端可能会分好几步来初始化共享内存的头部。通常的做法是,在所有其他字段(如Capacity、指针等)都设置完毕后,最后一步才写入魔术数。这样,客户端就可以通过轮询这个魔术数是否存在,来判断服务端是否已完成全部的初始化工作。一旦魔术数出现,客户端就知道整个头部都是有效且可读的了。
- MagicNO,魔术数值,它没有任何数学或算法上的含义。这个值的选择是完全任意的,它源于一种在程序员中非常流行的、有趣的传统,叫做“十六进制魔术语”(Hexspeak)。
- Hexspeak 指的是用十六进制的数字(0-9, A-F)来拼凑出一些类似英文的、容易记忆的单词。比如 0xFEEDBEEF:FEED -> "feed" (喂养),BEEF -> "beef" (牛肉),所以,0xFEEDBEEF 就是 "feed beef"(喂牛肉)。它只是一个好玩的、在调试内存时非常显眼、容易辨认的词组。程序员喜欢用这类词组,因为当你在调试器中查看一大堆原始内存数据时,0xFEEDBEEF 这样的值会立刻抓住你的眼球。
- 其他著名的“十六进制魔术语”:
0xDEADBEEF (dead beef - 死牛肉): 最著名的魔术数,经常被用来标记已经释放或无效的内存。
0xCAFEBABE (cafe babe - 咖啡宝贝): Java 语言的 .class 文件就是用这个魔术数开头的。
0xBAADF00D (bad food - 坏掉的食物): 由微软使用,用于标记未初始化的堆内存。
- S2CCapacityOffset=8,广播通道的容量大小写入位置。
- S2CServerHeadOffset=16,服务端一次写入后,在缓冲区里的指针位置,便于下次继续写入。
- S2CClientBitmapOffset=24,广播通道的所有可用槽位数,用二进制来表示,初始全部为0,表示全部槽位可用。如果这是来了一个客户端,这遍历这个槽位,如果它发现第一位是0,表示可用,于是把第1位从0改为1写入,同时客户端自己的ClientID就是第1个槽位。
- S2CClientSlotsStartOffset=32,客户端槽位的开始位置。每个客户端的槽位由:
- PID 某个客户端的进程ID,为什么要加入进程ID,后面会详解。
- Tail 某个客户端读取广播消息后,在下行通道中的指针位置,便于下次继续读取。
- Heartbeat某个客户端写入的心跳事件,便于服务端读取之后判断客户端是否存活。
- ClientSlotSize=24,就是上述每个客户端槽的大小,里面包含三个字段,每个字段分配8个字节。
- S2CHeaderSize=32+ (ClientSlotSize * MaxClients),这个是下行通道控制区的总大小,前面32是固定的控制区大小,后面是根据支持最大的客户端数乘以每个客户端的槽位大小数得来。
客户端槽位里面包含PID进程ID,因为进程ID是一个至关重要的元数据(Metadata),PID 在 ClientSlot 中的主要作用是“身份识别”,它有三大作用:
- 核心作用:调试与监控 (Debugging and Monitoring),这是最直接、最实用的一个作用。想象一下一个正在运行的生产环境,
- 如果没有 PID 的情况:
服务端日志显示:“警告:客户端 #5 读取缓慢,导致广播队列阻塞。”管理员看到了这条日志,但他完全不知道“客户端 #5”对应的是服务器上运行的哪一个具体进程。如果服务器上运行了20个相同名称的客户端程序(例如 MyClientApp.exe),管理员将无法定位到是哪一个出了问题。他无法轻易地结束这个有问题的进程,也无法附加调试器或查看其具体的资源占用。
- 有 PID 的情况:
服务端日志可以显示:“警告:客户端 #5 (PID: 8132) 读取缓慢...”
管理员看到后,可以立刻打开任务管理器,根据进程ID 8132,精准地找到那个出问题的 MyClientApp.exe 进程。
他可以立即对这个进程进行分析:查看它的CPU/内存占用、附加调试器、或者在必要时将其终止,从而快速解决问题。
PID 在这里扮演了一个从“逻辑身份(客户端ID #5)”到“物理身份(操作系统进程ID 8132)”的关键桥梁。
- 如果没有 PID 的情况:
- 增强作用:“僵尸槽位”的自动清理 (Automatic Cleanup of "Zombie Slots"),这个作用极大地提升了服务端的鲁棒性,尤其是在服务端自身需要重启的情况下。假设服务端正在运行,有5个客户端连接着。服务端进程因为某些原因崩溃或被重启了。
当服务端重新启动时,它会使用 CreateOrOpen 重新打开之前的广播通道MMF文件。此时,文件头部的 ClientRegisterBitmap 中,之前那5个客户端的标志位可能仍然是1,这些都是“僵尸槽位”。之前的客户端进程可能也已经退出了,也可能还在运行并等待重连。
PID 如何解决这个问题:新启动的服务端在初始化时,会扫描 ClientRegisterBitmap。对于每一个被标记为“已连接”的槽位,它会读取其中存储的PID。然后,它会向操作系统查询:“PID为xxxx的进程当前是否存在?” (在C#中可以通过 Process.GetProcessById(pid) 实现,如果进程不存在会抛出异常)。
如果查询失败(进程不存在),服务端就知道这是一个“僵尸槽位”,是上一个会话的残留物。此时,服务端可以安全地、主动地将这个槽位从ClientRegisterBitmap中清除,为新客户端腾出空间。这使得服务端在重启后,能够自动清理无效的连接状态,恢复到一个干净的环境。 - 辅助作用:防止槽位重用冲突 (Preventing Slot Reuse Conflicts),这是一个更细微但有用的安全校验。加入:客户端 #5 (PID 8132) 崩溃了。服务端的“僵尸驱逐”机制有一定延迟(例如5秒),还没来得及清理这个槽位。在这5秒内,一个新的客户端启动了,并且恰好因为某种逻辑错误,也开始尝试使用客户端 #5 的上行通道 C2S_Channel_5。有了PID,服务端在收到来自 C2S_Channel_5 的消息时,可以做一次校验:它会去广播通道的头部读取5号槽位中记录的 PID (仍然是 8132)。如果它发现收到的消息宣称自己来自一个新的PID,与槽位中记录的PID不符,服务端就可以判定这是一个非法的或过时的连接,并拒绝处理该消息。
C2S前缀表示服务端->客户端的下行广播通道,这个就比较简单了,与之前SPSC中类似,它的头部控制区包含:
- C2SCapacityOffset=0,每个上行通道的缓冲区大小。
- C2SHeadOffset=8,每个客户端往上行通道的缓冲区写入数据后,在缓冲区的位置,方便下次继续写入。
- C2STailOffset=16,服务端读取上行通道里缓冲区中的数据后的指针位置,方便下次继续读取。
- C2SHeaderSize=24,上行通道头部控制区的总大小。
因为下行通道可能包含广播和单播消息,所以消息格式前面需要加上ClientID或者特殊的表示广播的ID,下行通道的消息格式为[目标客户端ID (long)][消息长度 (int)][消息体]。
- TargetClientIdSize=8,目标客户端ID,如果是广播这为0,或者其它不与ClientID冲突的值。
- MessageLengthSize=4,消息长度大小,消息长度默认大小为一个int所占大小。
- BroadcastClientId=0,常量,表示广播消息的ID,它与ClientID区分,ClientID从1开始自增。
控制区结构定义完成之后,还有一些辅助类,如下:
public static class MmfUtils
{
public static EventWaitHandle CreateOrOpenEvent(string name)
{
var eventSecurity = new EventWaitHandleSecurity();
eventSecurity.AddAccessRule(new EventWaitHandleAccessRule(new SecurityIdentifier(WellKnownSidType.WorldSid, null), EventWaitHandleRights.FullControl, AccessControlType.Allow));
return new EventWaitHandle(false, EventResetMode.AutoReset, name, out _);
}
}
public class MmfIpcException : Exception
{
public MmfIpcException(string message) : base(message) { }
public MmfIpcException(string message, Exception innerException) : base(message, innerException) { }
}
现在来看两个基础类MmfChannelWriter和MmfChannelReader,首先来看MmfChannelWriter,它的作用是对下行通道或上行通道进行写入。因为下行通道和上行通道的控制区结构不同(主要是控制区大小不同),所以要区分当前写入的是下行通道,还是上行通道。MmfChannelWriter代码如下:
/// <summary>
/// A low-level writer for a single MMF channel. Can be a broadcast or point-to-point writer.
/// </summary>
public sealed class MmfChannelWriter : IDisposable
{
private readonly MemoryMappedViewAccessor _accessor;
private readonly EventWaitHandle _dataWrittenEvent;
private readonly long _capacity;
private readonly bool _isBroadcastChannel;
public MmfChannelWriter(MemoryMappedViewAccessor accessor, EventWaitHandle ewh, bool isBroadcastChannel)
{
_accessor = accessor;
_dataWrittenEvent = ewh;
_isBroadcastChannel = isBroadcastChannel;
_capacity = _accessor.ReadInt64(_isBroadcastChannel ? MmfConstants.S2CCapacityOffset : MmfConstants.C2SCapacityOffset);
}
public void Write(long targetClientId, byte[] data)
{
int headerSize = _isBroadcastChannel ? MmfConstants.TargetClientIdSize + MmfConstants.MessageLengthSize : MmfConstants.MessageLengthSize;
int requiredSpace = data.Length + headerSize;
if (requiredSpace > _capacity - 1)
throw new ArgumentException("Message is too large for the buffer.");
var spinWait = new SpinWait();
while (true)
{
long head;
long tail;
if (_isBroadcastChannel)
{
head = _accessor.ReadInt64(MmfConstants.S2CServerHeadOffset);
long bitmap = _accessor.ReadInt64(MmfConstants.S2CClientBitmapOffset);
long slowestTail = head; // Start with head, min will find the true slowest
bool anyClients = false;
for (int i = 0; i < MmfConstants.MaxClients; i++)
{
if ((bitmap & (1L << i)) != 0) // Check if client at slot i is active
{
anyClients = true;
long clientTail = _accessor.ReadInt64(MmfConstants.S2CClientSlotsStartOffset + (i * MmfConstants.ClientSlotSize) + 8);
if (clientTail < slowestTail)
{
slowestTail = clientTail;
}
}
}
tail = anyClients ? slowestTail : head; // If no clients, buffer is empty
}
else
{
head = _accessor.ReadInt64(MmfConstants.C2SHeadOffset);
tail = _accessor.ReadInt64(MmfConstants.C2STailOffset);
}
if (_capacity - 1 - (head - tail) >= requiredSpace) break;
spinWait.SpinOnce();
}
byte[] messagePacket = new byte[requiredSpace];
int currentOffset = 0;
if (_isBroadcastChannel)
{
Buffer.BlockCopy(BitConverter.GetBytes(targetClientId), 0, messagePacket, currentOffset, MmfConstants.TargetClientIdSize);
currentOffset += MmfConstants.TargetClientIdSize;
}
Buffer.BlockCopy(BitConverter.GetBytes(data.Length), 0, messagePacket, currentOffset, MmfConstants.MessageLengthSize);
currentOffset += MmfConstants.MessageLengthSize;
if (data.Length > 0)
{
Buffer.BlockCopy(data, 0, messagePacket, currentOffset, data.Length);
}
long currentLogicalHead = _isBroadcastChannel ? _accessor.ReadInt64(MmfConstants.S2CServerHeadOffset) : _accessor.ReadInt64(MmfConstants.C2SHeadOffset);
long physicalStartPos = currentLogicalHead % _capacity;
long bufferStartOffset = _isBroadcastChannel ? MmfConstants.S2CHeaderSize : MmfConstants.C2SHeaderSize;
long spaceToEnd = _capacity - physicalStartPos;
if (spaceToEnd >= requiredSpace)
{
_accessor.WriteArray(bufferStartOffset + physicalStartPos, messagePacket, 0, requiredSpace);
}
else
{
int part1Len = (int)spaceToEnd;
_accessor.WriteArray(bufferStartOffset + physicalStartPos, messagePacket, 0, part1Len);
_accessor.WriteArray(bufferStartOffset, messagePacket, part1Len, requiredSpace - part1Len);
}
long newHead = currentLogicalHead + requiredSpace;
if (_isBroadcastChannel)
{
_accessor.Write(MmfConstants.S2CServerHeadOffset, newHead);
}
else
{
_accessor.Write(MmfConstants.C2SHeadOffset, newHead);
}
_dataWrittenEvent.Set();
}
public void Dispose()
{
_dataWrittenEvent?.Dispose();
_accessor?.Dispose();
}
}
MmfChannelWriter构造函数中的isBroadcastChannel表示是否是广播通道。它影响着消息的格式和写入的逻辑。
这里面的核心逻辑在于Write方法中,Write方法接收两个参数ClientID和待发送的二进制数组:
- 首先计算消息的大小,消息包含消息头,如果是广播通道,这消息头里面需要包含ClientID+消息长度;如果是上行通道,这消息头只包含消息长度。消息的大小requiredSpace=消息头+消息长度。得到待发送的消息大小之后,比较该大小和缓冲区大小,如果超过了缓冲区大小,则直接返回。
- 接着判断消息大小和剩余缓冲区大小之间的关系,计算剩余缓冲区大小根据通道类型不同,计算方法不同:
- 如果是广播通道,需要读取上次写入时的Head位置,以及所有的客户端上次读取的位置中的最小的那个位置。方法就是便利广播通道中控制头中的S2CClientBitmapOffset,逐位查看,如果某个位i为1,这表示该位数的ClientID客户端存在,读取该偏移位置(MmfConstants.S2CClientSlotsStartOffset + (i * MmfConstants.ClientSlotSize) + 8)的Tail,取所有Tail中最小的那个值,Tail越小,表示客户端读取的更慢,缓冲区中还没有读取的数据越多。
- 如果是上行通道,则比较简单,只需要读取客户端上次写入的位置Head和服务端上次读取的位置Tail
- 缓冲区剩余空间为Head-Tail,然后用总容量大小_capacity-1减去这个值,即为剩余缓冲区空间,如果剩余缓冲区空间大于requiredSpace,这表示空间足够,跳出循环,否则稍微等待后再次读取,在这期间客户端可能已经读取了数据,这样Tail就会增加,剩余缓冲区空间就会增加。(这里有个问题是必须要处理那些僵尸客户端,也就是那些已经断块的客户端,必须将其在S2CClientBitmapOffset中,对应的ClientID-1的位数重置为0,否则随着广播通道的不断写入,最慢的那个Tail一直没有更新,通道中有一天会写满,到时候就广播通道就无法写如数据了)。
- 判断剩余缓冲区足够之后,接着就需要把消息打包为二进制数组,如果是广播通道,则需要在消息头写入ClientID,接着写入消息长度,接着写入消息体;如果是上行通道,这直接写入消息长度+消息体。
- 在写入消息体时,需要处理环绕问题。首先要读取上一次写入的物理位置,方法是,先读取上次写入的逻辑位置,即Head,它也需要根据是广播通道还是上行通道获取逻辑位置currentLogicalHead。紧接着对容量取余,就等于物理指针的位置physicalStartPos。
- 计算物理写入位置距离缓冲区末尾的长度spaceToEnd=_capacity - physicalStartPos。
- 如果spaceToEnd足够写入,这不需要对消息进行环绕处理,直接写入对应的长度。
- 如果spaceToEnd不够写,这需要先将spaceToEnd长度的数据,写入;然后跳到开头,写入剩余的requiredSpace-spaceToEnd长度的数据。
- 上述写入完成之后,要更新Head回通道,方便下次接着写。新的head地址是physicalStartPos+requiredSpace。如果是广播通道,就更新回广播通道控制区的S2CServerHeadOffset位置,如果是下行通道,这更新回下行通道控制区的C2SHeadOffset位置。
- 最后,触发_dataWrittenEvent事件,告知等待读取数据的客户端有新的数据写入。
接下来是MmfChannelReader的代码:
/// <summary>
/// A low-level reader for a single MMF channel. Can be a broadcast or point-to-point reader.
/// </summary>
public sealed class MmfChannelReader
{
private readonly MemoryMappedViewAccessor _accessor;
private readonly long _capacity;
private readonly bool _isBroadcastChannel;
private readonly long _clientId;
public MmfChannelReader(MemoryMappedViewAccessor accessor, bool isBroadcastChannel, long clientId = -1)
{
_accessor = accessor;
_isBroadcastChannel = isBroadcastChannel;
_clientId = clientId;
if (_isBroadcastChannel)
{
// 在读取任何其他数据之前,首先校验魔术数。
// 这确保我们连接到了一个由兼容的 MmfIpcServer 创建的、正确的广播通道。
if (_accessor.ReadInt64(MmfConstants.S2CMagicOffset) != MmfConstants.MagicNO)
{
throw new MmfIpcException("Broadcast channel magic number mismatch. Ensure the server is running and the channel name is correct.");
}
_capacity = _accessor.ReadInt64(MmfConstants.S2CCapacityOffset);
}
else
{
_capacity = _accessor.ReadInt64(MmfConstants.C2SCapacityOffset);
}
}
public bool TryRead(out Tuple<long, byte[]> message)
{
message = null;
long head, tail;
long tailOffset;
long bufferStartOffset;
int msgHeaderSize;
if (_isBroadcastChannel)
{
if (_clientId <= 0) throw new InvalidOperationException("Reader must have a valid ClientId to read from a broadcast channel.");
head = _accessor.ReadInt64(MmfConstants.S2CServerHeadOffset);
tailOffset = MmfConstants.S2CClientSlotsStartOffset + ((_clientId - 1) * MmfConstants.ClientSlotSize) + 8;
tail = _accessor.ReadInt64(tailOffset);
bufferStartOffset = MmfConstants.S2CHeaderSize;
msgHeaderSize = MmfConstants.TargetClientIdSize + MmfConstants.MessageLengthSize;
}
else
{
head = _accessor.ReadInt64(MmfConstants.C2SHeadOffset);
tailOffset = MmfConstants.C2STailOffset;
tail = _accessor.ReadInt64(tailOffset);
bufferStartOffset = MmfConstants.C2SHeaderSize;
msgHeaderSize = MmfConstants.MessageLengthSize;
}
if (head == tail) return false;
var currentLogicalTail = tail;
byte[] messageHeader = new byte[msgHeaderSize];
long physicalStartPos = currentLogicalTail % _capacity;
long spaceToEnd = _capacity - physicalStartPos;
if (spaceToEnd >= msgHeaderSize)
{
_accessor.ReadArray(bufferStartOffset + physicalStartPos, messageHeader, 0, msgHeaderSize);
}
else
{
int part1Len = (int)spaceToEnd;
_accessor.ReadArray(bufferStartOffset + physicalStartPos, messageHeader, 0, part1Len);
_accessor.ReadArray(bufferStartOffset, messageHeader, part1Len, msgHeaderSize - part1Len);
}
long targetClientId = _isBroadcastChannel ? BitConverter.ToInt64(messageHeader, 0) : _clientId;
int messageLength = _isBroadcastChannel ? BitConverter.ToInt32(messageHeader, MmfConstants.TargetClientIdSize) : BitConverter.ToInt32(messageHeader, 0);
int requiredSpace = messageLength + msgHeaderSize;
if (messageLength <= 0 || requiredSpace > _capacity - 1) return false;
if ((head - tail) < requiredSpace) return false;
byte[] body = new byte[messageLength];
long bodyStartPos = (currentLogicalTail + msgHeaderSize) % _capacity;
spaceToEnd = _capacity - bodyStartPos;
if (spaceToEnd >= messageLength)
{
_accessor.ReadArray(bufferStartOffset + bodyStartPos, body, 0, messageLength);
}
else
{
int part1Len = (int)spaceToEnd;
_accessor.ReadArray(bufferStartOffset + bodyStartPos, body, 0, part1Len);
_accessor.ReadArray(bufferStartOffset, body, part1Len, messageLength - part1Len);
}
_accessor.Write(tailOffset, currentLogicalTail + requiredSpace);
if (_isBroadcastChannel)
{
if (targetClientId == MmfConstants.BroadcastClientId || targetClientId == _clientId)
{
message = new Tuple<long, byte[]>(targetClientId, body);
return true;
}
return false; // Message is not for us
}
else
{
message = new Tuple<long, byte[]>(_clientId, body);
return true;
}
}
}
也遵循由服务端创建所有资源的原则,这个类不创建资源,它只通过构造函数接受参数MemoryMappedViewAccessor,是否是广播通道isBroadcastChannel以及ClientID。
MmfChannelReader如果是广播通道,这表示客户端读取广播消息,那么就需要判断传进来的MemoryMappedViewAccessor的头部的魔术数是否是预设的魔术数,如果不是,这表示这个客户端是不小心读到了一个恰好名字为预设的广播通道,直接抛出异常。接下来就读取缓冲区大小的参数,这个也要根据是否广播通道来读取不同的位置。
MmfChannelReader只有一个核心的TryRead方法,方法的返回值是一个Tuple,第一项为ClientID,第二项为读取到的二进制数据。
- 方法首先要根据是广播通道还是上行通道来判断读取Tail和Head。
- 如果是广播通道,这表示是某个为ClientID的客户端尝试在读取广播消息。首先要校验clientid,如果不大于0这表示非法。接下来读取服务端写入的头S2CServerHeadOffset即Head,以及该ClientID上次读取后的位置MmfConstants.S2CClientSlotsStartOffset + ((_clientId - 1) * MmfConstants.ClientSlotSize) + 8,得到Tail。这里也同时记录消息内容的起始位置bufferStartOffset和消息头的长度msgHeaderSize。
- 如果是上行通道,这表示是服务端在读取某个客户端自己的上行通道里发送给服务端的MMF数据,这很简单了,直接读取控制区里的C2SHeadOffset和C2SHeaderSize,同时也记录消息内容的起始位置bufferStartOffset和消息头的长度msgHeaderSize。
- 接下来,如果头位置和尾位置相等,这表示所有的数据已经读取过了,没有新的数据,直接返回。
- 否则,再次根据消息头长度,考虑环绕,读取消息的大小。如果是广播通道,这在消息头中,还需要读取ClientID。
- 根据读取到的消息长度,考虑环绕,读取消息内容。
- 消息内容读取完成之后,把当前读取后的位置写回到Tail中,方便下次继续读取。
- 紧接着,如果是广播通道,则判断如果是单播,这判断消息中的ClientID和当前客户端的ClientID是否一致。如果不是则直接忽略这个消息(当然这里有优化的地方,如果读取完成消息头之后,接着读取ClientID,如果不是当前客户端的ClientID,这直接返回,不需要接着读取消息体。),如果是广播,则也处理。
- 如果是上行通道,则这条消息是客户端发送给服务端的,也需要直接处理返回。
上述代码与之前的SPSC中的逻辑一致,核心地方在于要判断是广播通道还是上行通道,以及消息的环绕读取等。
有了上述的读和写MMF的代码,接下来要封装服务端和客户端的代码了。与SPSC一致,秉承的核心思想是服务端创建所有的资源,客户端只负责打开,服务端代码MmfIpcServer如下:
public sealed class MmfIpcServer : IDisposable
{
public event Action<long> ClientConnected;
public event Action<long> ClientDisconnected;
public event Action<long, byte[]> MessageReceived;
private readonly string _broadcastChannelName;
private readonly string _c2sChannelNamePrefix;
private readonly int _capacity;
private readonly int _maxClients;
private readonly TimeSpan _clientTimeout = TimeSpan.FromSeconds(5);
private MmfChannelWriter _broadcastWriter;
private readonly ConcurrentDictionary<long, Tuple<MmfChannelReader, CancellationTokenSource>> _clientReaders = new ConcurrentDictionary<long, Tuple<MmfChannelReader, CancellationTokenSource>>();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private MemoryMappedFile _s2cMmf;
private MemoryMappedViewAccessor _s2cAccessor;
private EventWaitHandle _s2cEvent;
// 使用一个列表来持有所有预创建的C2S通道所需要的资源,确保它们在服务端运行期间持续存在。
private readonly ConcurrentDictionary<long, Tuple<MemoryMappedFile, EventWaitHandle>> _c2sResources = new ConcurrentDictionary<long, Tuple<MemoryMappedFile, EventWaitHandle>>();
private long _lastKnownBitmap = 0;
private bool _isDisposed = false;
public MmfIpcServer(string broadcastChannelName, string c2sChannelNamePrefix, int capacity, int maxClients = 64)
{
if (maxClients > MmfConstants.MaxClients) throw new ArgumentOutOfRangeException(nameof(maxClients));
_broadcastChannelName = broadcastChannelName;
_c2sChannelNamePrefix = c2sChannelNamePrefix;
_capacity = capacity;
_maxClients = maxClients;
}
public void Start()
{
// 1. 服务端创建所有MMF和Event资源
_s2cMmf = MemoryMappedFile.CreateOrOpen(_broadcastChannelName, MmfConstants.S2CHeaderSize + _capacity);
_s2cEvent = MmfUtils.CreateOrOpenEvent(_broadcastChannelName + "_Event");
_s2cAccessor = _s2cMmf.CreateViewAccessor();
// 2. 初始化广播通道头部
_s2cAccessor.Write(MmfConstants.S2CMagicOffset, MmfConstants.MagicNO);
_s2cAccessor.Write(MmfConstants.S2CCapacityOffset, (long)_capacity);
_s2cAccessor.Write(MmfConstants.S2CServerHeadOffset, 0L);
_s2cAccessor.Write(MmfConstants.S2CClientBitmapOffset, 0L);
for (int i = 0; i < _maxClients; i++)
{
long slotOffset = MmfConstants.S2CClientSlotsStartOffset + (i * MmfConstants.ClientSlotSize);
_s2cAccessor.Write(slotOffset, 0L); // PID
_s2cAccessor.Write(slotOffset + 8, 0L); // Tail
_s2cAccessor.Write(slotOffset + 16, 0L); // Heartbeat
}
// 3. 实例化广播通道写入器
_broadcastWriter = new MmfChannelWriter(_s2cAccessor, _s2cEvent, isBroadcastChannel: true);
// 4. 预创建所有C2S通道资源
for (int i = 0; i < _maxClients; i++)
{
long clientId = i + 1;
var c2sMmf = MemoryMappedFile.CreateOrOpen($"{_c2sChannelNamePrefix}_{clientId}", MmfConstants.C2SHeaderSize + _capacity);
var c2sEvent = MmfUtils.CreateOrOpenEvent($"{_c2sChannelNamePrefix}_{clientId}_Event");
_c2sResources.TryAdd(clientId, new Tuple<MemoryMappedFile, EventWaitHandle>(c2sMmf, c2sEvent));
// 初始化C2S通道头部
using (var accessor = c2sMmf.CreateViewAccessor())
{
accessor.Write(MmfConstants.C2SCapacityOffset, (long)_capacity);
accessor.Write(MmfConstants.C2SHeadOffset, 0L);
accessor.Write(MmfConstants.C2STailOffset, 0L);
}
}
// 5. 启动客户端扫描/心跳检测任务
Task.Run(() => ScanForClientChanges(_cts.Token), _cts.Token);
}
private void ScanForClientChanges(CancellationToken token)
{
while (!_cts.IsCancellationRequested)
{
long currentBitmap = _s2cAccessor.ReadInt64(MmfConstants.S2CClientBitmapOffset);
if (currentBitmap != _lastKnownBitmap)
{
for (int i = 0; i < _maxClients; i++)
{
long clientId = i + 1;
bool isConnectedNow = (currentBitmap & (1L << i)) != 0;
bool wasConnected = (_lastKnownBitmap & (1L << i)) != 0;
if (isConnectedNow && !wasConnected)
{
if (_c2sResources.TryGetValue(clientId, out var resource))
{
// New client connected
var readerCts = new CancellationTokenSource();
var reader = new MmfChannelReader(resource.Item1.CreateViewAccessor(), false, clientId);
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(token, readerCts.Token);
if (_clientReaders.TryAdd(clientId, Tuple.Create(reader, readerCts)))
{
Task.Run(() => ClientReadLoop(reader, resource.Item2, linkedCts.Token), linkedCts.Token);
ClientConnected?.Invoke(clientId);
}
}
}
else if (!isConnectedNow && wasConnected)
{
// Client disconnected
if (_clientReaders.TryRemove(clientId, out var reader))
{
reader.Item2.Cancel();
ClientDisconnected?.Invoke(clientId);
}
}
}
_lastKnownBitmap = currentBitmap;
}
foreach (var clientReaderPair in _clientReaders)
{
long clientId = clientReaderPair.Key;
int slotIndex = (int)clientId - 1;
long clientSlotOffset = MmfConstants.S2CClientSlotsStartOffset + (slotIndex * MmfConstants.ClientSlotSize);
long heartbeatTicks = _s2cAccessor.ReadInt64(clientSlotOffset + 16);
if (DateTime.UtcNow - new DateTime(heartbeatTicks) > _clientTimeout)
{
Console.WriteLine($"[Server] Client {clientId} heartbeat timed out. Evicting.");
if (_clientReaders.TryRemove(clientId, out var reader))
{
// The reader will call ReleaseClientSlot on its own dispose
reader.Item2.Cancel();
// Forcefully clear the client's bit in the bitmap
long currentClientBitmap = _s2cAccessor.ReadInt64(MmfConstants.S2CClientBitmapOffset);
long slotBit = 1L << ((int)clientId - 1);
_s2cAccessor.Write(MmfConstants.S2CClientBitmapOffset, currentClientBitmap & ~slotBit); // 按位清零
ClientDisconnected?.Invoke(clientId);
}
}
}
_cts.Token.WaitHandle.WaitOne(1000);
}
}
public void Broadcast(byte[] data) => _broadcastWriter.Write(MmfConstants.BroadcastClientId, data);
public void Send(long clientId, byte[] data) => _broadcastWriter.Write(clientId, data);
private void ClientReadLoop(MmfChannelReader reader, EventWaitHandle ewh, CancellationToken token)
{
var waitHandles = new WaitHandle[] { ewh, token.WaitHandle };
while (!token.IsCancellationRequested)
{
try
{
while (!token.IsCancellationRequested && reader.TryRead(out var message))
{
MessageReceived?.Invoke(message.Item1, message.Item2);
}
int I = WaitHandle.WaitAny(waitHandles, 250);
}
catch { break; }
}
}
public void Stop() { Dispose(); }
public void Dispose()
{
if (_isDisposed) return;
_isDisposed = true;
_cts.Cancel();
_broadcastWriter.Dispose();
foreach (var resTuple in _c2sResources.Values)
{
resTuple.Item1.Dispose();
resTuple.Item2.Dispose();
}
foreach (var reader in _clientReaders)
{
reader.Value.Item2.Cancel();
}
_s2cAccessor.Dispose();
_s2cMmf.Dispose();
_s2cEvent.Dispose();
}
}
在MmfIpcServer的构造函数中,接受广播通道名称和上行通道名称的前缀,以及服务端支持的客户端数量,以及MMF的大小。核心逻辑在Start方法里:
- 首先创建广播通道需要的内核对象,包括MMF和EventWaitHandle,只要是写入,都需要创建一个EventWaitHandle,它可以通知客户端有新数据可以读取。
- 紧接着初始化广播通道对象的头部信息,包括魔术数,缓存容量,初始的写入位置,以及表示客户端是否在线的Bitmap,最后就是初始化所有客户端的槽位里的基本信息,包括PID,客户端读取的位置Tail和客户端写入的心跳数据。
- 以上数据初始化完成之后,实例化一个MmfChannelWrite广播对象。在服务端,对这个对象进行写入,就是发送广播或单播消息;在客户端,对这个对象进行读取就是接受广播或单播消息(客户端不是直接打开这个MmfChannelWrite广播对象,不负责创建。)
- 紧接着预创建所有上行通道以及实例化。这里需要保存创建上行通道所需要的内核对象,同样,一个上行通道需要一个MMF对象和EventWaitHandle对象,这些对象存储在以ClientID为key,MMF和EventWaitHandle为value的名为_c2sResources字典中。创建完上行通道之后,还需要初始化上行通道里面的控制区头部信息。
- 最后就是启动客户端扫描,心跳检测任务,任务内部是一个ScanForClientChanges方法。
ScanForClientChanges方法的目的是检测客户端的变化,其原理是读取表示客户端是否在线的S2CClientBitmapOffset控制区,如果某个位数为1,则表示该位数+1的ClientID已经连接。这里为了优化,只有S2CClientBitmapOffset的值发生变化了,才去进行逐个按位数来判断。判断的时候,主要是判断状态的变化。
- 如果之前某一位数为0,现在该位数的值变成了1,则表示有新客户端连接。于是根据ClientID,获取出之前预创建的MMF和EventWaitHandle对象,新建一个MmfChannelReader对象,这个对象用来实现服务端对该上线的客户端的上行通道的数据的读取,这个就是该客户端给服务端发送信息的通道。创建完成之后,将该ChannelReader添加到一个_clientReaders字典中。然后启动一个对该MmfChannelReader的循环读取的ClientReadLoop方法,里面需要一个EventWaitHandle来通知是否有新数据到来。最后触发新客户端连接事件。
- 如果之前某一位数为1,现在该位数的值变成了0,则表示有新客户端断开连接。根据ClientID,移除先前添加到_clientReaders字典中的项,并取消对该对象的ClientReadLoop读取方法。最后触发客户端断开连接事件。
- 方法的最后,进行心跳判断,如果心跳超时,则将该客户端连接强制断开,将S2CClientBitmapOffset中对应的位数改为0,然后写回。移除先前添加到_clientReaders字典中的项,并取消对该对象的ClientReadLoop读取方法。最后触发客户端断开连接事件。
ClientReadLoop读取上行通道数据的方法非常简单,当有客户端写入事件触发,或者间隔250毫秒,就主动去读一下MmfChannelReader,如果有数据,就触发MessageReceived事件,传递回ClientID和读取到的二进制数组数据。
最后一个核心的类MmfIpcClient代码如下:
public sealed class MmfIpcClient : IDisposable
{
public event Action Connected;
public event Action Disconnected;
public event Action<long, byte[]> MessageReceived;
private readonly string _broadcastChannelName;
private readonly string _c2sChannelNamePrefix;
private readonly int _reconnectIntervalMs = 3000;
private CancellationTokenSource _mainCts;
private Task _connectLoopTask;
public long ClientId { get; private set; } = -1;
public bool IsConnected => ClientId > 0;
private volatile MmfChannelWriter _c2sWriter;
private Mutex _bitmapMutex;
public MmfIpcClient(string broadcastChannelName, string c2sChannelNamePrefix)
{
_broadcastChannelName = broadcastChannelName;
_c2sChannelNamePrefix = c2sChannelNamePrefix;
}
public void Start()
{
_bitmapMutex = new Mutex(false, _broadcastChannelName + "_BitmapMutex");
_mainCts = new CancellationTokenSource();
_connectLoopTask = Task.Run(() => ConnectionLoop(_mainCts.Token));
}
private void ConnectionLoop(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
MemoryMappedFile s2cMmf = null;
MemoryMappedViewAccessor s2cAccessor = null;
EventWaitHandle s2cEvent = null;
try
{
// 1. 打开广播通道资源
s2cMmf = MemoryMappedFile.OpenExisting(_broadcastChannelName);
s2cAccessor = s2cMmf.CreateViewAccessor();
s2cEvent = MmfUtils.CreateOrOpenEvent(_broadcastChannelName + "_Event");
// 2. 关键步骤:认领一个客户端槽位
ClientId = ClaimClientSlot(s2cAccessor);
if (ClientId == -1) throw new MmfIpcException("Failed to claim a client slot.");
var c2sChannelName = $"{_c2sChannelNamePrefix}_{this.ClientId}";
// 3. 打开自己专属的上行通道资源
using (var c2sMmf = MemoryMappedFile.OpenExisting(c2sChannelName))
using (var c2sAccessor = c2sMmf.CreateViewAccessor())
using (var c2sEvent = MmfUtils.CreateOrOpenEvent(c2sChannelName + "_Event"))
{
// 4. 创建纯逻辑的 Reader 和 Writer
var s2cReader = new MmfChannelReader(s2cAccessor, true, this.ClientId);
var c2sWriter = new MmfChannelWriter(c2sAccessor, c2sEvent, false);
{
_c2sWriter = c2sWriter;
Connected?.Invoke();
// 启动心跳和监听任务
var tasks = new[]
{
Task.Run(() => HeartbeatLoop(s2cAccessor, token), token),
Task.Run(() => ReaderLoop(s2cReader, s2cEvent, token), token)
};
int i = Task.WaitAny(tasks); // 等待任何一个任务结束(或出错)
Console.WriteLine(i.ToString());
}
}
}
catch (Exception ex)
{ /* 抑制异常以便重连 */
Console.WriteLine(ex.Message);
}
finally
{
Console.WriteLine("client ConnectionLoop finally");
if (IsConnected)
{
ReleaseClientSlot(s2cAccessor); // 优雅地释放槽位
ClientId = -1;
_c2sWriter = null;
Disconnected?.Invoke();
}
// 清理本次连接打开的广播通道资源
s2cEvent?.Dispose();
s2cAccessor?.Dispose();
s2cMmf?.Dispose();
}
if (!token.IsCancellationRequested) token.WaitHandle.WaitOne(_reconnectIntervalMs);
}
}
private void ReaderLoop(MmfChannelReader reader, EventWaitHandle ewh, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
if (reader.TryRead(out var message))
{
MessageReceived?.Invoke(message.Item1, message.Item2);
}
else
{
WaitHandle.WaitAny(new[] { ewh, token.WaitHandle }, 250);
}
}
}
private void HeartbeatLoop(MemoryMappedViewAccessor accessor, CancellationToken token)
{
int slotIndex = (int)ClientId - 1;
long heartbeatOffset = MmfConstants.S2CClientSlotsStartOffset + (slotIndex * MmfConstants.ClientSlotSize) + 16;
while (!token.IsCancellationRequested)
{
try
{
accessor.Write(heartbeatOffset, DateTime.UtcNow.Ticks);
token.WaitHandle.WaitOne(1000);
}
catch { break; }
}
}
public void Send(byte[] data) => _c2sWriter.Write(this.ClientId, data);
/// <summary>
/// 使用 Mutex 安全地认领一个客户端槽位。
/// </summary>
private int ClaimClientSlot(MemoryMappedViewAccessor _accessor)
{
var spinWait = new SpinWait();
while (true)
{
if (_bitmapMutex.WaitOne(TimeSpan.FromSeconds(5)))
{
try
{
long currentBitmap = _accessor.ReadInt64(MmfConstants.S2CClientBitmapOffset);
for (int i = 0; i < MmfConstants.MaxClients; i++)
{
if ((currentBitmap & (1L << i)) == 0) // 找到一个空槽位
{
long newBitmap = currentBitmap | (1L << i);
// 因为有锁保护,所以可以直接写入,无需原子比较
_accessor.Write(MmfConstants.S2CClientBitmapOffset, newBitmap);
int clientId = i + 1; // 使用1-based的客户端ID
// 将自己的tail指针初始化为服务端当前的head指针,避免读取旧数据
long serverHead = _accessor.ReadInt64(MmfConstants.S2CServerHeadOffset);
_accessor.Write(MmfConstants.S2CClientSlotsStartOffset + (i * MmfConstants.ClientSlotSize), (long)Process.GetCurrentProcess().Id);
_accessor.Write(MmfConstants.S2CClientSlotsStartOffset + (i * MmfConstants.ClientSlotSize) + 8, serverHead);
return clientId; // 成功认领,退出
}
}
}
finally
{
_bitmapMutex.ReleaseMutex();
}
}
// 如果没找到空槽位或获取锁超时,则稍后重试
spinWait.SpinOnce();
}
}
/// <summary>
/// 使用 Mutex 安全地释放客户端槽位。
/// </summary>
public void ReleaseClientSlot(MemoryMappedViewAccessor _accessor)
{
if (ClientId <= 0) return;
if (_bitmapMutex.WaitOne(TimeSpan.FromSeconds(1)))
{
try
{
long currentBitmap = _accessor.ReadInt64(MmfConstants.S2CClientBitmapOffset);
long slotBit = 1L << ((int)ClientId - 1);
_accessor.Write(MmfConstants.S2CClientBitmapOffset, currentBitmap & ~slotBit); // 按位清零
}
finally
{
_bitmapMutex.ReleaseMutex();
}
}
}
public void Dispose()
{
_mainCts?.Cancel();
try { _connectLoopTask?.Wait(2000); } catch { }
_mainCts?.Dispose();
}
}
与MmfIpcServer的构造函数类似,MmfIpcClient构造函数接收广播通道名称和上行通道名称的前缀。其余信息可以从通道控制头中读取。这里有一个认领客户端槽位和释放客户端槽位的操作,所以需要一个锁来控制并发。MmfIpcClient的Start方法中,启动了一个连接循环ConnectionLoop:
- 首先打开广播通道相关的资源,包括MMF和EventWaitHandle。
- 然后从广播通道上的客户端槽位中认领一个槽位作为当前的ClientID。
- 根据ClientID和上行通道的前缀,就可以知道服务端预先创建的上行通道的名称。
- 有了名称这些信息,就可以打开服务端创建的上行通道的MMF和EventWaitHandle等内核对象。
- 以上信息准备好之后,创建一个客户端读取广播通道的类型为MmfChannelReader的s2cReader和客户端给服务发送消息的上行通道的类型为MmfChannelWriter的c2sWriter。其中c2sWriter需要保存到私有变量中,方便后续发送消息时调用该对象的写入方法。
- 紧接着启动心跳和监听任务,心跳主要是往广播通道中写入心跳时间。监听任务主要是监听广播通道的写入事件EventWaitHandle,如果有写入时间触发或间隔250ms,则尝试从广播通道中读取数据,读取成功之后触发回调。
以上4个类是一对多模型的核心实现。 现在编写一个测试用例:
测试用法
测试控制台程序的完整代码如下:
internal class Program
{
private const string BroadcastChannel = "My1ToManyIpc_Broadcast";
private const string C2SPrefix = "My1ToManyIpc_C2S";
private const int BufferCapacity = 1024 * 1024; // 1 MB
private const int MaxClients = 8;
public static void Main()
{
Console.WriteLine("Select mode: [1] Server or [2] Client");
var choice = Console.ReadKey(true).KeyChar;
Console.WriteLine();
if (choice == '1') RunServer();
else RunClient();
}
private static void RunServer()
{
Console.Title = "IPC Server";
using (var server = new MmfIpcServer(BroadcastChannel, C2SPrefix, BufferCapacity, MaxClients))
{
server.ClientConnected += (id) => Console.WriteLine($"[Server] Client {id} has connected.");
server.ClientDisconnected += (id) => Console.WriteLine($"[Server] Client {id} has disconnected.");
server.MessageReceived += (id, data) =>
{
var msg = Encoding.UTF8.GetString(data);
Console.WriteLine($"[Server] Received from Client {id}: '{msg}'");
server.Send(id, Encoding.UTF8.GetBytes($"Server acknowledges your message: '{msg}'"));
};
server.Start();
Console.WriteLine("Server is running. Commands: 'b:message' (broadcast), 's:id:message' (send), 'exit'");
while (true)
{
var input = Console.ReadLine();
if (input?.ToLower() == "exit") break;
if (string.IsNullOrEmpty(input)) continue;
if (input.StartsWith("b:"))
{
server.Broadcast(Encoding.UTF8.GetBytes(input.Substring(2)));
}
else if (input.StartsWith("s:"))
{
var parts = input.Substring(2).Split(new[] { ':' }, 2);
if (parts.Length == 2 && long.TryParse(parts[0], out long id))
{
server.Send(id, Encoding.UTF8.GetBytes(parts[1]));
}
}
}
}
}
private static void RunClient()
{
try
{
using (var client = new MmfIpcClient(BroadcastChannel, C2SPrefix))
{
client.Connected += () =>
{
Console.Title = $"IPC Client #{client.ClientId}";
Console.WriteLine($"[Client #{client.ClientId}] Connected to server.");
client.Send(Encoding.UTF8.GetBytes("Hello from new client!"));
};
client.MessageReceived += (id, data) =>
{
var msg = Encoding.UTF8.GetString(data);
string from = id == MmfConstants.BroadcastClientId ? "Broadcast" : $"Server (to me)";
Console.WriteLine($"[Client #{client.ClientId}] Received message from [{from}]: '{msg}'");
};
client.Start();
Console.WriteLine($"Client starting... will connect and get ID. Type a message and press [Enter] to send, or 'exit' to quit.");
while (true)
{
var input = Console.ReadLine();
if (input?.ToLower() == "exit") break;
if (string.IsNullOrEmpty(input)) continue;
client.Send(Encoding.UTF8.GetBytes(input));
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Client failed to start. Is the server running? Error: {ex.Message}");
Console.ReadLine();
}
}
}
现在可以启动多个实例,启动时,可以选择是客户端还是服务端,服务端只启动一个,客户端可以启动多个。服务端可以发送广播消息和单播消息。
参考:
- Fast IPC Communication Using Shared Memory and InterlockedCompareExchange (updated!) - CodeProject
- Memory-Mapped Files - .NET | Microsoft Learn
- C# Send Data Between Processes (w/ Memory Mapped File) | coding.vision (codingvision.net)
- Sharing is Caring: Using Memory Mapped Files in .NET - Simple Talk (red-gate.com)
- A C# Framework for Interprocess Synchronization and Communication - CodeProject
- Client/Server interprocess communication via shared memory | Developer.com
- https://blog.darkthread.net/blog/about-shared-memory/
- Using Memory-Mapped Files in .NET 4.0 | Developer.com