引言

在现代软件架构中,无论是模块化的单体应用、面向服务的架构(SOA),还是日益普及的微服务部署,进程间通信(Inter-Process Communication, IPC)都已从一个简单的实现细节,演变为影响系统性能、安全性与可维护性的基石性架构决策。在此背景下,选择一种合适的 IPC 机制,不仅关乎数据传输的效率,更直接决定了整个系统的响应能力、资源消耗和技术债务。在Windows平台上,有三种主流的IPC通讯方式,它们分别是:

  • 命名管道(Named Pipes):一种基于消息传递的机制,它在内核层面提供了一个结构化、可靠且安全的通信通道 。   
  • TCP/IP 套接字(TCP/IP Sockets):同样基于消息传递,但其设计初衷是为网络提供服务。当用于本地环回接口时,它成为了一种 IPC 选项 。 
  • 内存映射文件(Memory-Mapped Files, MMF):一种基于共享内存的机制,通过将同一块物理内存映射到多个进程的虚拟地址空间,实现了数据交换的“零拷贝” 。   

在之前的文章使用命名管道进行进程间通讯使用内存映射文件实现进程间通讯,详细介绍了其中的两种,TCP/IP通过本地环回也可以实现IPC通讯,本文对应用中常见的一对一、双向通讯场景,提供了一套功能完备的、包含连接状态管理、自动重连和日志抽象的企业级实现。并分析了各种方案的传输性能。

Windows平台IPC的基础


在深入比较具体技术之前,必须首先理解它们所依赖的操作系统底层原理。进程间通信的性能、复杂度和安全性,在很大程度上由其在内核层面的实现范式所决定。在 Windows 平台上,上述三种技术主要归结为两种根本不同的模型:消息传递(Message Passing)和共享内存(Shared Memory)。操作系统的核心职责之一是隔离进程,确保一个进程的错误不会影响到其他进程。这种隔离体现在每个进程都拥有独立的虚拟地址空间。IPC 的本质,正是在这种隔离的前提下,安全、高效地建立数据交换的桥梁 。   

  • 消息传递(命名管道与套接字):消息传递模型是一种高度结构化的通信方式。当发送方进程(Process A)希望向接收方进程(Process B)发送数据时,数据会经历一次从用户空间到内核空间的拷贝,再从内核空间拷贝到接收方的用户空间。这个过程涉及两次内存拷贝和两次上下文切换,但带来了显著的优势:内核保证了数据传输的原子性和顺序性,且进程之间没有直接的内存访问,安全性高 。
  • 共享内存(内存映射文件):共享内存模型则采取了截然不同的策略。它通过将同一块物理内存映射到多个进程的虚拟地址空间,最大限度地减少了内核的干预。数据交换时,数据始终停留在用户空间,完全避免了向内核空间的数据拷贝,从而实现了极致的性能 。然而,这种模型也将数据同步的全部责任转移给了开发者。   
  • 同步机制:当多个进程可以同时访问同一块内存区域时,如果缺乏协调,就会发生“竞态条件”(Race Condition),导致数据损坏 。因此,任何基于共享内存的 IPC 实现都必须构建一个强大的同步协议。在 Windows 上,这意味着必须使用内核提供的同步原语,如互斥体(Mutex)和事件等待句柄(EventWaitHandle)来协调访问 。这份“同步的负担”是 MMF 极致性能所必须付出的成本。

下面简要介绍这三种技术的原理和实现。

命名管道


原理

使用命名管道进行进程间通讯 这篇文章中已经详细介绍了命名管道。简言之命名管道(Named Pipes)提供了一种经过精心设计的、专门用于本地及局域网内进程通信的解决方案,实现了性能、安全性与易用性的高度平衡。一个命名管道是存在于命名管道文件系统(NPFS)中的一个内核对象,路径格式为 \\.\pipe\PipeName 。.NET 通过System.IO.Pipes 命名空间中的 NamedPipeServerStream 和 NamedPipeClientStream 类,将其封装成了熟悉的 Stream 接口 。    
命名管道在底层并不是通过 TCP/IP 协议实现的,它们是两种完全独立且在不同层级上运行的通信机制。命名管道是 Windows 操作系统提供的一种高级 IPC 机制。它的底层实现依赖于一个特殊的、仅存在于内核内存中的文件系统驱动,称为命名管道文件系统 (Named Pipe File System, NPFS) 。当创建一个 NamedPipeServerStream 时,Windows 内核会在 NPFS 中创建一个类似于文件的对象。客户端通过这个“文件”的路径来连接。所有的读写操作最终都会被转换为对这个内核对象的 I/O 请求。这个机制是为本地和局域网内的高效 IPC 专门设计和优化的。它绕过了大部分通用网络协议栈的复杂层次,通信路径更短、更直接,因此在本地通信时,其固定开销(overhead)远低于 TCP/IP

命名管道与 Windows 的核心安全模型深度集成。通过 PipeSecurity 类,可以为管道设置访问控制列表(ACL),精确地定义哪些用户或用户组可以访问管道 。此外,它还支持客户端模拟(Impersonation),允许服务端进程临时以客户端的安全上下文来执行代码,这在处理权限敏感的操作时至关重要 。

命名管道支持两种传输模式:字节模式(Byte Mode)和消息模式(Message Mode)。消息模式是其最具吸引力的特性之一,因为它保证了消息边界的完整性,极大地简化了通信协议的设计 。   

实现

使用命名管道进行进程间通讯这篇文章中,介绍了通过修改一个开源的命名管道进行通讯来实现的一个类库,现在看起来有些过于复杂。现在的实现提供以下接口:

  • 同步读/写:Write() 和 Read() 方法。
  • 异步读/写:WriteAsync() 和 ReadAsync() 方法。
  • 事件驱动读取:通过 MessageReceived 事件。
  • 这种设计通过内部使用一个 BlockingCollection<byte> 作为消息缓冲区来实现,将底层的 I/O 操作与上层的消息处理逻辑解耦 。

服务端代码NamedPipeIpcServer如下:

public class NamedPipeIpcServer : IDisposable
{
    private readonly string _pipeName;
    private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
    private CancellationTokenSource _cancellationTokenSource;
    private readonly ILogger _logger;
    private NamedPipeServerStream _currentPipeStream; // For Write operations

    public event EventHandler<byte[]> MessageReceived;
    public event EventHandler<ConnectionChangedEventArgs> ClientConnectionChanged;
    public NamedPipeIpcServer(string pipeName, ILogger logger = null)
    {
        _pipeName = pipeName;
        _logger = logger ?? new ConsoleLogger();
    }

    public void Start()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        Task.Run(() => ListenForClientsAsync(_cancellationTokenSource.Token));
        _logger.Info(string.Format("Server started on pipe '{0}'.", _pipeName));
    }

    public void StartMessageListener()
    {
        Task.Run(() =>
        {
            foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                OnMessageReceived(message);
            }
        });
    }

    private async Task ListenForClientsAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                await ProcessClientConnection(token);
            }
            catch (OperationCanceledException)
            {
                break; // Expected on shutdown
            }
            catch (Exception ex)
            {
                _logger.Error("An error occurred in the client listener loop.", ex);
                await Task.Delay(1000, token); // Prevent rapid-fire error loops
            }
        }
        _logger.Info("Server listener stopped.");
    }

    private async Task ProcessClientConnection(CancellationToken token)
    {
        var pipeSecurity = new PipeSecurity();
        pipeSecurity.AddAccessRule(new PipeAccessRule(new SecurityIdentifier(WellKnownSidType.WorldSid, null), PipeAccessRights.ReadWrite, System.Security.AccessControl.AccessControlType.Allow));

        using (var pipeStream = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous, 4096, 4096, pipeSecurity))
        {
            _currentPipeStream = pipeStream;
            _logger.Info("Waiting for client connection...");
            await pipeStream.WaitForConnectionAsync(token);
            OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));

            try
            {
                await ReadLoopAsync(pipeStream, token);
            }
            finally
            {
                OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
                _currentPipeStream = null;
            }
        }
    }

    private async Task ReadLoopAsync(NamedPipeServerStream stream, CancellationToken token)
    {
        var buffer = new byte[1024];
        try
        {
            while (stream.IsConnected && !token.IsCancellationRequested)
            {
                using (var memoryStream = new System.IO.MemoryStream())
                {
                    int bytesRead;
                    // 循环读取,直到消息完整
                    do
                    {
                        // 读取当前可用的消息片段(可能是部分消息)
                        bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
                        // 将读取到的片段写入内存流
                        if (bytesRead > 0)
                        {
                            memoryStream.Write(buffer, 0, bytesRead);
                        }
                        else
                        {
                            break;
                        }
                        // 检查消息是否读取完毕
                    } while (!stream.IsMessageComplete);

                    // 将内存流中的所有片段合并为完整消息
                    _receiveQueue.Add(memoryStream.ToArray(), token);
                }
            }
        }
        catch (IOException ex)
        {
            _logger.Warn(string.Format("Client disconnected with IO exception: {0}", ex.Message));
        }
        catch (OperationCanceledException) { /* Expected */ }
        finally
        {
            _receiveQueue.CompleteAdding();
        }
    }

    public byte[] Read() => _receiveQueue.Take();
    public Task<byte[]> ReadAsync() => Task.Run(() => _receiveQueue.Take());

    private void WriteInternal(byte[] message)
    {
        var pipe = _currentPipeStream;
        if (pipe == null || !pipe.IsConnected) throw new InvalidOperationException("Pipe is not connected.");
        byte[] buffer = new byte[message.Length];
        Buffer.BlockCopy(message, 0, buffer, 0, message.Length);
        pipe.Write(buffer, 0, buffer.Length);
    }

    private async Task WriteInternalAsync(byte[] message)
    {
        var pipe = _currentPipeStream;
        if (pipe == null || !pipe.IsConnected) throw new InvalidOperationException("Pipe is not connected.");
        byte[] buffer = new byte[message.Length];
        Buffer.BlockCopy(message, 0, buffer, 0, message.Length);
        await pipe.WriteAsync(buffer, 0, buffer.Length);
    }

    public void Write(byte[] message) => WriteInternal(message);
    public Task WriteAsync(byte[] message) => WriteInternalAsync(message);

    protected virtual void OnMessageReceived(byte[] e) => MessageReceived?.Invoke(this, e);

    protected virtual void OnClientConnectionChanged(ConnectionChangedEventArgs e)
    {
        _logger.Info(string.Format("Client connection status changed to: {0}", e.Status));
        ClientConnectionChanged?.Invoke(this, e);
    }

    public void Stop()
    {
        _logger.Info("Stopping server...");
        _cancellationTokenSource?.Cancel();
        // Unblock WaitForConnectionAsync by making a dummy connection
        try
        {
            using (var dummy = new NamedPipeClientStream(".", _pipeName, PipeDirection.Out))
            {
                dummy.Connect(100);
            }
        }
        catch { }
    }

    public void Dispose()
    {
        Stop();
        _receiveQueue.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

客户端代码NamedPipeIpcClient如下:

public class NamedPipeIpcClient : IDisposable
{
    private readonly string _pipeName;
    private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
    private CancellationTokenSource _cancellationTokenSource;
    private NamedPipeClientStream _pipeStream;
    private readonly ILogger _logger;
    public bool AutoReconnect { get; set; } = true;
    public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);

    public event EventHandler<byte[]> MessageReceived;
    public event EventHandler<ConnectionChangedEventArgs> ConnectionChanged;

    public NamedPipeIpcClient(string pipeName, ILogger logger = null)
    {
        _pipeName = pipeName;
        _logger = logger ?? new ConsoleLogger();
        _cancellationTokenSource = new CancellationTokenSource();
    }

    public async Task ConnectAsync()
    {
        await AttemptConnectionAsync(_cancellationTokenSource.Token);
    }

    private async Task AttemptConnectionAsync(CancellationToken token)
    {
        OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connecting));
        try
        {
            _pipeStream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            await _pipeStream.ConnectAsync(5000, token);
            _pipeStream.ReadMode = PipeTransmissionMode.Message;

            OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
            Task.Run(() => ReadLoopAsync(token), token);
        }
        catch (Exception ex)
        {
            _logger.Error("Failed to connect to server.", ex);
            HandleDisconnection();
        }
    }

    private void HandleDisconnection()
    {
        OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
        _pipeStream?.Dispose();

        if (AutoReconnect && !_cancellationTokenSource.IsCancellationRequested)
        {
            _logger.Info(string.Format("Will attempt to reconnect in {0} seconds...", ReconnectInterval.TotalSeconds));
            Task.Delay(ReconnectInterval, _cancellationTokenSource.Token)
              .ContinueWith(t => AttemptConnectionAsync(_cancellationTokenSource.Token), TaskContinuationOptions.NotOnCanceled);
        }
    }

    private async Task ReadLoopAsync(CancellationToken token)
    {
        try
        {
            while (_pipeStream.IsConnected && !token.IsCancellationRequested)
            {
                // 初始缓冲区(可根据实际场景调整大小)
                byte[] buffer = new byte[1024];
                // 用于累积消息片段的内存流
                using (var memoryStream = new System.IO.MemoryStream())
                {
                    int bytesRead;
                    // 循环读取,直到消息完整
                    do
                    {
                        // 读取当前可用的消息片段(可能是部分消息)
                        bytesRead = await _pipeStream.ReadAsync(buffer, 0, buffer.Length);
                        // 将读取到的片段写入内存流
                        if (bytesRead > 0)
                        {
                            memoryStream.Write(buffer, 0, bytesRead);
                        }
                        else
                        {
                            break;
                        }
                        // 检查消息是否读取完毕
                    } while (!_pipeStream.IsMessageComplete);

                    // 将内存流中的所有片段合并为完整消息
                    _receiveQueue.Add(memoryStream.ToArray(), token);
                }
            }
        }
        catch (Exception ex) when (ex is IOException || ex is OperationCanceledException)
        {
            _logger.Warn("Read loop ended due to disconnection or cancellation.");
        }
        finally
        {
            HandleDisconnection();
        }
    }

    public byte[] Read() => _receiveQueue.Take();
    public Task<byte[]> ReadAsync() => Task.Run(() => _receiveQueue.Take());

    private void WriteInternal(byte[] message)
    {
        if (_pipeStream == null || !_pipeStream.IsConnected) throw new InvalidOperationException("Pipe is not connected.");
        byte[] bytes = new byte[message.Length];
        Buffer.BlockCopy(message, 0, bytes, 0, message.Length);
        _pipeStream.Write(bytes, 0, bytes.Length);
    }

    private async Task WriteInternalAsync(byte[] message)
    {
        if (_pipeStream == null || !_pipeStream.IsConnected) throw new InvalidOperationException("Pipe is not connected.");
        byte[] bytes = new byte[message.Length];
        Buffer.BlockCopy(message, 0, bytes, 0, message.Length);
        await _pipeStream.WriteAsync(bytes, 0, bytes.Length);
    }

    public void Write(byte[] message) => WriteInternal(message);
    public Task WriteAsync(byte[] message) => WriteInternalAsync(message);

    public void StartMessageListener()
    {
        Task.Run(() =>
        {
            foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                OnMessageReceived(message);
            }
        });
    }

    protected virtual void OnMessageReceived(byte[] e) => MessageReceived?.Invoke(this, e);

    protected virtual void OnConnectionChanged(ConnectionChangedEventArgs e)
    {
        _logger.Info(string.Format("Connection status changed to: {0}", e.Status));
        ConnectionChanged?.Invoke(this, e);
    }

    public void Disconnect()
    {
        _logger.Info("Disconnecting...");
        _cancellationTokenSource?.Cancel();
        _pipeStream?.Dispose();
    }

    public void Dispose()
    {
        Disconnect();
        _receiveQueue.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

需要注意的是,上面采用的是消息模式来 PipeTransmissionMode.Message 处理消息,所以发送的时候直接发送,不需要手动计算和打包消息长度头。但读取的时候,要放在while循环里读取,直到本条消息读取完成。如果使用的是PipeTransmissionMode.Byte,则需要自己来处理消息的分帧,通用的处理策略是发送每条消息时,把消息长度和消息体打包到一起发送,在读取时,先读取4个字节的消息长度,然后再根据消息长度读取剩余的消息体。

TCP/IP 套接字


原理

TCP/IP 套接字是计算机网络领域的事实标准。当通信双方位于同一台主机时,可以通过环回接口(127.0.0.1)将其用作一种 IPC 机制。

  • 环回路径及其伴随开销:当一个进程向环回地址发送数据时,数据依然会完整地执行 TCP/IP 协议栈的大部分逻辑,包括数据封装、校验和计算、连接管理以及流量控制 。所有这些步骤都会消耗 CPU 周期和内存资源,导致其在本地通信中的固有开销和延迟高于命名管道 。   
  • 消息分帧的至关键要:TCP 是一个面向流(Stream-Oriented)的协议,而非面向消息(Message-Oriented)的协议 。这意味着 TCP 完全不保证消息边界。因此,任何一个健壮的基于 TCP 的应用层协议,都  必须实现自己的消息分帧(Message Framing)机制。最常用且高效的策略是长度前缀协议(Length-Prefix Framing) 。
  • 多范式通信接口设计:与命名管道的实现类似,我们的 TCP/IP 封装也将提供同步、异步和事件驱动的通信方法。基于 NetworkStream 和手动缓冲区管理,使用 async/await 来构建一个高效的异步 I/O 循环,该循环负责实现长度前缀协议。
  • 企业环境下的安全考量与端口管理:本地 TCP/IP 通信存在天然的安全短板。一个监听在 127.0.0.1:port 的服务对本机所有进程都是可见的。这意味着应用层安全是必须的,开发者必须自行设计和实现身份验证与授权机制。此外,在服务密集的服务器上,端口冲突与管理也是一项重要的运维工作 。

实现

TCP/IP C# 代码实现如下,先看服务端:

public class TcpIpcServer : IDisposable
{
    private readonly IPAddress _ipAddress;
    private readonly int _port;
    private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
    private TcpListener _listener;
    private CancellationTokenSource _cancellationTokenSource;
    private readonly ILogger _logger;

    public event EventHandler<byte[]> MessageReceived;
    public event EventHandler<ConnectionChangedEventArgs> ClientConnectionChanged;
    private TcpClient _currentClient; // For Write operations
    public TcpIpcServer(string ipAddress, int port, ILogger logger = null)
    {
        _ipAddress = IPAddress.Parse(ipAddress);
        _port = port;
        _logger = logger ?? new ConsoleLogger();
    }

    public void Start()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        _listener = new TcpListener(_ipAddress, _port);
        _listener.Start();
        Task.Run(() => ListenForClientsAsync(_cancellationTokenSource.Token));
        _logger.Info(string.Format("Server started on {0}:{1}.", _ipAddress, _port));
    }

    public void StartMessageListener()
    {
        Task.Run(() =>
        {
            foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                OnMessageReceived(message);
            }
        });
    }

    private async Task ListenForClientsAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                _logger.Info("Waiting for client connection...");
                var client = await _listener.AcceptTcpClientAsync();
                _logger.Info(string.Format("Client connected from {0}.", client.Client.RemoteEndPoint));
                // For 1-to-1, close previous connection if any
                _currentClient?.Close();
                _currentClient = client;
                // Handle each client in a new task
                Task.Run(() => HandleClientAsync(client, token), token);
            }
        }
        catch (OperationCanceledException) { /* Expected */ }
        catch (Exception ex)
        {
            _logger.Error("Error in listener loop.", ex);
        }
    }

    private async Task HandleClientAsync(TcpClient client, CancellationToken token)
    {
        OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
        try
        {
            using (client)
            using (var stream = client.GetStream())
            {
                await ReadLoopAsync(stream, token);
            }
        }
        catch (Exception ex)
        {
            _logger.Error("Error handling client.", ex);
        }
        finally
        {
            OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
            if (_currentClient == client)
            {
                _currentClient = null;
            }
        }
    }

    private async Task ReadLoopAsync(NetworkStream stream, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var lengthPrefix = new byte[4];
                await ReadExactBytesAsync(stream, lengthPrefix, 4, token);
                var messageLength = BitConverter.ToInt32(lengthPrefix, 0);

                var messageBuffer = new byte[messageLength];
                await ReadExactBytesAsync(stream, messageBuffer, messageLength, token);

                _receiveQueue.Add(messageBuffer, token);
            }
        }
        catch (IOException ex) { _logger.Warn(string.Format("Client disconnected with IO exception: {0}", ex.Message)); }
        catch (OperationCanceledException) { /* Expected */ }
        finally { _receiveQueue.CompleteAdding(); }
    }

    public byte[] Read()
    {
        return _receiveQueue.Take();
    }

    public Task<byte[]> ReadAsync()
    {
        return Task.Run(() => _receiveQueue.Take());
    }

    private void WriteInternal(byte[] message)
    {
        if (_currentClient == null || !_currentClient.Connected) throw new InvalidOperationException("Client not connected.");
        var stream = _currentClient.GetStream();
        var lengthPrefix = BitConverter.GetBytes(message.Length);
        stream.Write(lengthPrefix, 0, 4);
        stream.Write(message, 0, message.Length);
    }

    private async Task WriteInternalAsync(byte[] message)
    {
        if (_currentClient == null || !_currentClient.Connected) throw new InvalidOperationException("Client not connected.");
        var stream = _currentClient.GetStream();
        var lengthPrefix = BitConverter.GetBytes(message.Length);
        await stream.WriteAsync(lengthPrefix, 0, 4);
        await stream.WriteAsync(message, 0, message.Length);
    }

    public void Write(byte[] message) => WriteInternal(message);
    public Task WriteAsync(byte[] message) => WriteInternalAsync(message);

    private static async Task ReadExactBytesAsync(NetworkStream stream, byte[] buffer, int bytesToRead, CancellationToken token)
    {
        int offset = 0;
        while (offset < bytesToRead)
        {
            int bytesRead = await stream.ReadAsync(buffer, offset, bytesToRead - offset, token);
            if (bytesRead == 0) throw new EndOfStreamException();
            offset += bytesRead;
        }
    }

    protected virtual void OnMessageReceived(byte[] e)
    {
        var handler = MessageReceived;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    protected virtual void OnClientConnectionChanged(ConnectionChangedEventArgs e)
    {
        _logger.Info(string.Format("Client connection status changed to: {0}", e.Status));
        ClientConnectionChanged?.Invoke(this, e);
    }

    public void Stop()
    {
        _logger.Info("Stopping server...");
        _cancellationTokenSource?.Cancel();
        _listener?.Stop();
        _currentClient?.Close();
    }

    public void Dispose()
    {
        Stop();
        _receiveQueue.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

接下来是客户端实现:

public class TcpIpcClient : IDisposable
{
    private readonly string _serverAddress;
    private readonly int _port;
    private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
    private TcpClient _client;
    private NetworkStream _stream;
    private CancellationTokenSource _cancellationTokenSource;
    private readonly ILogger _logger;

    public bool AutoReconnect { get; set; } = true;
    public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);

    public event EventHandler<byte[]> MessageReceived;
    public event EventHandler<ConnectionChangedEventArgs> ConnectionChanged;

    public TcpIpcClient(string serverAddress, int port, ILogger logger = null)
    {
        _serverAddress = serverAddress;
        _port = port;
        _logger = logger ?? new ConsoleLogger();
    }

    public async Task ConnectAsync()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        await AttemptConnectionAsync(_cancellationTokenSource.Token);
    }

    public void StartMessageListener()
    {
        Task.Run(() =>
        {
            foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                OnMessageReceived(message);
            }
        });
    }

    private async Task AttemptConnectionAsync(CancellationToken token)
    {
        OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connecting));
        try
        {
            _client = new TcpClient();
            await _client.ConnectAsync(_serverAddress, _port);
            _stream = _client.GetStream();

            OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
            Task.Run(() => ReadLoopAsync(token), token);
        }
        catch (Exception ex)
        {
            _logger.Error("Failed to connect to server.", ex);
            HandleDisconnection();
        }
    }

    private void HandleDisconnection()
    {
        OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
        _stream?.Dispose();
        _client?.Close();

        if (AutoReconnect && !_cancellationTokenSource.IsCancellationRequested)
        {
            _logger.Info(string.Format("Will attempt to reconnect in {0} seconds...", ReconnectInterval.TotalSeconds));
            Task.Delay(ReconnectInterval, _cancellationTokenSource.Token)
               .ContinueWith(t => AttemptConnectionAsync(_cancellationTokenSource.Token), TaskContinuationOptions.NotOnCanceled);
        }
    }

    private async Task ReadLoopAsync(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var lengthPrefix = new byte[4];
                await ReadExactBytesAsync(_stream, lengthPrefix, 4, token);
                var messageLength = BitConverter.ToInt32(lengthPrefix, 0);

                var messageBuffer = new byte[messageLength];
                await ReadExactBytesAsync(_stream, messageBuffer, messageLength, token);

                _receiveQueue.Add(messageBuffer, token);
            }
        }
        catch (Exception ex) when (ex is IOException || ex is OperationCanceledException || ex is EndOfStreamException)
        {
            _logger.Warn("Read loop ended due to disconnection or cancellation.");
        }
        finally
        {
            HandleDisconnection();
        }
    }

    public byte[] Read()
    {
        return _receiveQueue.Take();
    }

    public Task<byte[]> ReadAsync()
    {
        return Task.Run(() => _receiveQueue.Take());
    }

    public void Write(byte[] message)
    {
        if (_stream == null) throw new InvalidOperationException("Not connected.");
        var lengthPrefix = BitConverter.GetBytes(message.Length);
        _stream.Write(lengthPrefix, 0, 4);
        _stream.Write(message, 0, message.Length);
    }

    public async Task WriteAsync(byte[] message)
    {
        if (_stream == null) throw new InvalidOperationException("Not connected.");
        var lengthPrefix = BitConverter.GetBytes(message.Length);
        await _stream.WriteAsync(lengthPrefix, 0, 4);
        await _stream.WriteAsync(message, 0, message.Length);
    }

    private static async Task ReadExactBytesAsync(NetworkStream stream, byte[] buffer, int bytesToRead, CancellationToken token)
    {
        int offset = 0;
        while (offset < bytesToRead)
        {
            int bytesRead = await stream.ReadAsync(buffer, offset, bytesToRead - offset, token);
            if (bytesRead == 0) throw new EndOfStreamException();
            offset += bytesRead;
        }
    }

    protected virtual void OnMessageReceived(byte[] e)
    {
        var handler = MessageReceived;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    protected virtual void OnConnectionChanged(ConnectionChangedEventArgs e)
    {
        _logger.Info(string.Format("Connection status changed to: {0}", e.Status));
        ConnectionChanged?.Invoke(this, e);
    }

    public void Disconnect()
    {
        _logger.Info("Disconnecting...");
        _cancellationTokenSource?.Cancel();
        _stream?.Dispose();
        _client?.Close();
    }

    public void Dispose()
    {
        Disconnect();
        _receiveQueue.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

内存映射文件

首先实现一个针对MMF读取写入的环形缓冲区实现,代码如下:

/// <summary>
/// A lock-free, single-producer, single-consumer circular buffer for IPC using a Memory-Mapped File.
/// This implementation handles message wrap-around.
/// </summary>
public sealed unsafe class LockFreeCircularBuffer : IDisposable
{
    [StructLayout(LayoutKind.Explicit)]
    private struct Header
    {
        [FieldOffset(0)]
        public int Capacity;
        [FieldOffset(4)]
        public int Head; // Read position
        [FieldOffset(8)]
        public int Tail; // Write position
    }

    private readonly MemoryMappedFile _mmf;
    private readonly MemoryMappedViewAccessor _accessor;
    private readonly EventWaitHandle _dataAvailableEvent;
    private readonly int _capacity;
    private readonly int _bufferSize;
    private readonly int _headerSize;

    public WaitHandle ReadWaitHandle => _dataAvailableEvent;

    public LockFreeCircularBuffer(string name, int capacity = 1024 * 1024)
    {
        _capacity = capacity;
        _bufferSize = capacity;
        _headerSize = Marshal.SizeOf(typeof(Header));
        long totalSize = _headerSize + _bufferSize;

        _mmf = MemoryMappedFile.CreateOrOpen(name, totalSize);
        _accessor = _mmf.CreateViewAccessor();
        _dataAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset, name + "_evt");

        if (IsCreator())
        {
            var header = new Header { Capacity = _capacity, Head = 0, Tail = 0 };
            _accessor.Write(0, ref header);
        }
        else
        {
            Header header;
            _accessor.Read(0, out header);
            if (header.Capacity != _capacity)
            {
                throw new InvalidOperationException("Circular buffer capacity does not match existing instance.");
            }
        }
    }

    private bool IsCreator()
    {
        return _accessor.ReadInt32(0) == 0;
    }

    public bool TryWrite(byte[] data)
    {
        int head;
        int tail;
        int nextTail;

        Header* header = null;
        try
        {

            header = GetHeaderPointer();
            head = Thread.VolatileRead(ref header->Head);
            tail = Thread.VolatileRead(ref header->Tail);

            int messageLength = data.Length;
            int requiredSpace = messageLength + 4; // +4 for length prefix

            int freeSpace;
            if (head > tail)
                freeSpace = head - tail - 1;
            else
                freeSpace = _capacity - tail + head - 1;

            //int freeSpace = (head - tail - 1 + _capacity) % _capacity;
            if (freeSpace < requiredSpace) // +4 for length prefix
            {
                return false; // Not enough space
            }

            // Write the message length
            byte[] lengthBytes = BitConverter.GetBytes(messageLength);
            WriteBytes(tail, lengthBytes);
            nextTail = (tail + 4) % _capacity;

            // Write the message data, handling wrap-around
            WriteBytes(nextTail, data);
            nextTail = (nextTail + messageLength) % _capacity;

            // Atomically update the tail pointer
            Interlocked.Exchange(ref header->Tail, nextTail);
        }
        finally
        {
            if (header != null)
                _accessor.SafeMemoryMappedViewHandle.ReleasePointer();
        }
        _dataAvailableEvent.Set();
        return true;
    }

    public bool TryRead(out byte[] data)
    {
        data = null;
        int head;
        int tail;
        int nextHead;

        Header* header = null;
        try
        {
            header = GetHeaderPointer();
            head = Thread.VolatileRead(ref header->Head);
            tail = Thread.VolatileRead(ref header->Tail);
            if (head == tail)
            {
                return false; // Buffer is empty
            }

            // Read message length
            byte[] lengthBytes = ReadBytes(head, 4);
            int messageLength = BitConverter.ToInt32(lengthBytes, 0);
            nextHead = (head + 4) % _capacity;

            // Read message data, handling wrap-around
            data = ReadBytes(nextHead, messageLength);
            nextHead = (nextHead + messageLength) % _capacity;

            // Atomically update the head pointer
            Interlocked.Exchange(ref header->Head, nextHead);
        }
        finally
        {
            if (header != null)
                _accessor.SafeMemoryMappedViewHandle.ReleasePointer();
        }
        return true;
    }

    private void WriteBytes(int position, byte[] data)
    {
        int firstChunkSize = Math.Min(data.Length, _capacity - position);
        _accessor.WriteArray(_headerSize + position, data, 0, firstChunkSize);

        if (firstChunkSize < data.Length)
        {
            int remaining = data.Length - firstChunkSize;
            _accessor.WriteArray(_headerSize, data, firstChunkSize, remaining);
        }
    }

    private byte[] ReadBytes(int position, int count)
    {
        byte[] buffer = new byte[count];
        int firstChunkSize = Math.Min(count, _capacity - position);
        _accessor.ReadArray(_headerSize + position, buffer, 0, firstChunkSize);

        if (firstChunkSize < count)
        {
            int remaining = count - firstChunkSize;
            _accessor.ReadArray(_headerSize, buffer, firstChunkSize, remaining);
        }
        return buffer;
    }

    private unsafe Header* GetHeaderPointer()
    {
        byte* ptr = null;
        _accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr);
        if (ptr == null)
        {
            return null;
        }
        return (Header*)ptr;
    }

    public void Dispose()
    {
        _dataAvailableEvent?.Close();
        _accessor?.Dispose();
        _mmf?.Dispose();
    }
}

基于上述结构,服务端代码MmfIpcServer如下:

public class MmfIpcServer : IDisposable
{
    private readonly string _channelName;
    private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
    private CancellationTokenSource _cancellationTokenSource;
    private readonly ILogger _logger;

    private LockFreeCircularBuffer _receiveBuffer; // Client -> Server
    private LockFreeCircularBuffer _sendBuffer;    // Server -> Client

    public event EventHandler<byte[]> MessageReceived;
    public event EventHandler<ConnectionChangedEventArgs> ClientConnectionChanged;

    public MmfIpcServer(string channelName, ILogger logger = null)
    {
        _channelName = channelName;
        _logger = logger ?? new ConsoleLogger();
    }

    public void Start()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        _receiveBuffer = new LockFreeCircularBuffer(_channelName + "_C2S");
        _sendBuffer = new LockFreeCircularBuffer(_channelName + "_S2C");

        Task.Run(() => ReadLoop(_cancellationTokenSource.Token));
        _logger.Info("MMF Server started.");
        OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
    }

    public void StartMessageListener()
    {
        Task.Run(() =>
        {
            foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                OnMessageReceived(message);
            }
        });
    }

    private void ReadLoop(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                if (_receiveBuffer.ReadWaitHandle.WaitOne(1000))
                {
                    byte[] buffer;
                    while (_receiveBuffer.TryRead(out buffer) && !token.IsCancellationRequested)
                    {
                        if (buffer != null && buffer.Length > 0) 
                           _receiveQueue.Add(buffer, token);
                    }
                }
            }
            catch (OperationCanceledException) { break; }
            catch (Exception ex) { _logger.Error("MMF Server Read Error: ", ex); }
        }
        _receiveQueue.CompleteAdding();
    }

    public byte[] Read() => _receiveQueue.Take();
    public Task<byte[]> ReadAsync() => Task.Run(() => _receiveQueue.Take());

    private void WriteInternal(byte[] message)
    {
        if (!_sendBuffer.TryWrite(message))
        {
            _logger.Warn("MMF send buffer is full. Message dropped.");
        }
    }

    public void Write(byte[] message) => WriteInternal(message);
    public Task WriteAsync(byte[] message) => Task.Run(() => WriteInternal(message));

    protected virtual void OnMessageReceived(byte[] e) => MessageReceived?.Invoke(this, e);
    protected virtual void OnClientConnectionChanged(ConnectionChangedEventArgs e) => ClientConnectionChanged?.Invoke(this, e);

    public void Stop()
    {
        _logger.Info("Stopping MMF server...");
        _cancellationTokenSource?.Cancel();
        OnClientConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
    }

    public void Dispose()
    {
        Stop();
        _receiveBuffer?.Dispose();
        _sendBuffer?.Dispose();
        _receiveQueue.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

客户端MmfIpcClient代码如下:

public class MmfIpcClient : IDisposable
{
    private readonly string _channelName;
    private readonly BlockingCollection<byte[]> _receiveQueue = new BlockingCollection<byte[]>();
    private CancellationTokenSource _cancellationTokenSource;
    private readonly ILogger _logger;

    private LockFreeCircularBuffer _sendBuffer;
    private LockFreeCircularBuffer _receiveBuffer;

    public bool AutoReconnect { get; set; } = true;
    public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);

    public event EventHandler<byte[]> MessageReceived;
    public event EventHandler<ConnectionChangedEventArgs> ConnectionChanged;

    public MmfIpcClient(string channelName, ILogger logger = null)
    {
        _channelName = channelName;
        _logger = logger ?? new ConsoleLogger();
    }

    public async Task ConnectAsync()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        await AttemptConnectionAsync(_cancellationTokenSource.Token);
    }

    private async Task AttemptConnectionAsync(CancellationToken token)
    {
        OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connecting));
        while (!token.IsCancellationRequested)
        {
            try
            {
                _sendBuffer = new LockFreeCircularBuffer(_channelName + "_C2S");
                _receiveBuffer = new LockFreeCircularBuffer(_channelName + "_S2C");

                OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Connected));
                Task.Run(() => ReadLoop(token), token);
                return;
            }
            catch (Exception ex)
            {
                _logger.Error("Failed to connect to MMF server.", ex);
                HandleDisconnection(false);
                if (!AutoReconnect) break;

                _logger.Info(string.Format("Will attempt to reconnect in {0} seconds...", ReconnectInterval.TotalSeconds));
                await Task.Delay(ReconnectInterval, token);
            }
        }
    }

    private void HandleDisconnection(bool triggerReconnect)
    {
        OnConnectionChanged(new ConnectionChangedEventArgs(ConnectionStatus.Disconnected));
        _sendBuffer?.Dispose();
        _receiveBuffer?.Dispose();

        if (AutoReconnect && triggerReconnect && !_cancellationTokenSource.IsCancellationRequested)
        {
            Task.Run(() => AttemptConnectionAsync(_cancellationTokenSource.Token));
        }
    }

    private void ReadLoop(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                if (_receiveBuffer.ReadWaitHandle.WaitOne(1000))
                {
                    byte[] buffer;
                    while (_receiveBuffer.TryRead(out buffer) && !token.IsCancellationRequested)
                    {
                        _receiveQueue.Add(buffer, token);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.Error("MMF Client Read Error: ", ex);
                HandleDisconnection(true);
                break;
            }
        }
        _receiveQueue.CompleteAdding();
    }

    public byte[] Read() => _receiveQueue.Take();
    public Task<byte[]> ReadAsync() => Task.Run(() => _receiveQueue.Take());

    private void WriteInternal(byte[] message)
    {
        if (_sendBuffer == null) throw new InvalidOperationException("Not connected.");
        if (!_sendBuffer.TryWrite(message))
        {
            _logger.Warn("MMF send buffer is full. Message dropped.");
        }
    }

    public void Write(byte[] message) => WriteInternal(message);
    public Task WriteAsync(byte[] message) => Task.Run(() => WriteInternal(message));

    public void StartMessageListener()
    {
        Task.Run(() =>
        {
            foreach (var message in _receiveQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                OnMessageReceived(message);
            }
        });
    }

    protected virtual void OnMessageReceived(byte[] e) => MessageReceived?.Invoke(this, e);
    protected virtual void OnConnectionChanged(ConnectionChangedEventArgs e)
    {
        _logger.Info(string.Format("Connection status changed to: {0}", e.Status));
        ConnectionChanged?.Invoke(this, e);
    }

    public void Disconnect()
    {
        _logger.Info("Disconnecting...");
        _cancellationTokenSource?.Cancel();
    }

    public void Dispose()
    {
        Disconnect();
        _sendBuffer?.Dispose();
        _receiveBuffer?.Dispose();
        _receiveQueue.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

性能实证分析


测试原则


为了对这三个IPC方法的性能进行量化评估,这里提供了一个基于 BenchmarkDotNet 的基准测试。基准测试的方法如下:

  • 测试框架:基于 BenchmarkDotNet。
  • 测试场景: 模拟一个典型的一对一、请求-响应(“乒乓”)场景。客户端发送一条消息,服务端收到后立即将完全相同的消息返回。测量完成一次“往返(Round-Trip)”所需的时间。
  • 测试实现: 为了隔离 IPC 传输本身的开销,基准测试将直接操作 byte,避免 JSON 序列化的影响。
  • 测试参数: 测试将在多种不同的消息负载(Payload)大小下运行:16 字节、256 字节、4 KB 和 16 KB。

基准测试代码


首先是BenchmarkBase代码,这是三个测试基准代码的基类:

public abstract class BenchmarkBase
{
    protected ILogger _logger = new ConsoleLogger();
    protected byte[] _payload;
 
    [Params(16, 256, 4096, 16384)]
    public int PayloadSize { get; set; }
 
    [GlobalSetup]
    public void GlobalSetup()
    {
        _payload = new byte[PayloadSize];
        new Random(42).NextBytes(_payload);
        Setup();
    }

    [GlobalCleanup]
    public void GlobalCleanup()
    {
        Cleanup();
    }

    public abstract void Setup();
    public abstract void Cleanup();
}

如果觉得4个不同大小的参数不够,还可以以步长的方式指定,比如:

public IEnumerable<int> Values => Enumerable.Range(1, 1024).Select(x => x * 16);

[ParamsSource(nameof(Values))]
public int PayloadSize { get; set; }

则表示PayloadSize会从16开始,每隔16,一直到1024×16。

测试是以客户端发送消息,服务端收到消息之后回传给客户端,客户端收到发送的消息为一次测试,也称之为”乒乓“测试。在代码中的做法就是定义一个AutoResetEvent,当客户端发送之后等待这个AutoResetEvent,等客户端再次接收到消息,触发AutoResetEvent事件,然后等待的方法收到事件后结束。

NamedPipeBenchmark的代码如下:

public class NamedPipeBenchmark : BenchmarkBase
{
    private NamedPipeIpcServer _server;
    private NamedPipeIpcClient _client;
    private const string PipeName = "ipc-benchmark-pipe";
    private AutoResetEvent receiveMessage = new AutoResetEvent(false);
    public override void Setup()
    {
        _server = new NamedPipeIpcServer(PipeName, _logger);
        _server.MessageReceived += (s, e) =>
        {
            _server.Write(e); // Echo server
        };
        _server.Start();
        _server.StartMessageListener();
        // Give server time to start
        Task.Delay(2000).Wait();

        _client = new NamedPipeIpcClient(PipeName, _logger);
        _client.MessageReceived += (s, e) =>
        {
            receiveMessage.Set();
        }; // Signal when a message is received
        _client.ConnectAsync().Wait();
        _client.StartMessageListener();
        Task.Delay(2000).Wait();
    }

    public override void Cleanup()
    {
        _client.Dispose();
        _server.Dispose();
    }

    [Benchmark(Baseline = true)]
    public void PingPong()
    {
        _client.Write(_payload);
        receiveMessage.WaitOne(); // Wait for the message to be received
    }
}

TcpBenchmark 代码如下:

public class TcpBenchmark : BenchmarkBase
{
    private TcpIpcServer _server;
    private TcpIpcClient _client;
    private const string Ip = "127.0.0.1";
    private const int Port = 9999;
    private AutoResetEvent receiveMessage = new AutoResetEvent(false);
    public override void Setup()
    {
        _server = new TcpIpcServer(Ip, Port, _logger);
        _server.MessageReceived += (s, e) =>
        {
            _server.Write(e); // Echo server
        };
        _server.Start();
        _server.StartMessageListener();
        // Give server time to start
        Task.Delay(2000).Wait();
        _client = new TcpIpcClient(Ip, Port, _logger);
        _client.MessageReceived += (s, e) =>
        {
            receiveMessage.Set();
        }; // Signal when a message is received
        _client.ConnectAsync().Wait();
        _client.StartMessageListener();
        Task.Delay(2000).Wait();
    }


    public override void Cleanup()
    {
        _client.Dispose();
        _server.Dispose();
    }

    [Benchmark]
    public void PingPong()
    {
        _client.Write(_payload);
        receiveMessage.WaitOne(); // Wait for the message to be received()
    }
}

MmfBenchmark代码如下:

public class MmfBenchmark : BenchmarkBase
{
    private MmfIpcServer _server;
    private MmfIpcClient _client;
    private const string ChannelName = "ipc-benchmark-mmf";
    private AutoResetEvent receiveMessage = new AutoResetEvent(false);
    public override void Setup()
    {
        _server = new MmfIpcServer(ChannelName, _logger);
        _server.MessageReceived += (s, e) =>
        {
            _server.Write(e); // Echo server
        };

        _server.Start();
        _server.StartMessageListener();
        // Give server time to start
        Task.Delay(2000).Wait();

        _client = new MmfIpcClient(ChannelName, _logger);
        _client.MessageReceived += (s, e) => receiveMessage.Set(); // Signal when a message is received
        _client.ConnectAsync().Wait();
        //如果是采用异步读取触发,则需要调用下卖弄的方法
        _client.StartMessageListener();
        Task.Delay(2000).Wait();
    }

    public override void Cleanup()
    {
        _client.Dispose();
        _server.Dispose();
    }

    [Benchmark]
    public void PingPong()
    {
        _client.Write(_payload);
        receiveMessage.WaitOne(); // Wait for the message to be received
    }
}

主测试的代码如下:

static void Main(string[] args)
{
    Console.WriteLine("Running Named Pipe Benchmark...");
    BenchmarkRunner.Run<NamedPipeBenchmark>();

    Console.WriteLine("\nRunning TCP Benchmark...");
    BenchmarkRunner.Run<TcpBenchmark>();

    Console.WriteLine("\nRunning MMF Benchmark...");
    BenchmarkRunner.Run<MmfBenchmark>();

    Console.ReadLine();
}

以Release条件编译,就可以直接运行输出结果了。

在运行测试期间如果报错,可以用以下方式debug,比如:

NamedPipeBenchmark b = new NamedPipeBenchmark();
b.PayloadSize = 4096;
b.GlobalSetup();
b.PingPong();

然后调试即可。

性能结果与分析


在以下条件基准下:

BenchmarkDotNet v0.15.2, Windows 11 (10.0.26100.4652/24H2/2024Update/HudsonValley)
Intel Core i7-14700 2.10GHz, 1 CPU, 28 logical and 20 physical cores
  [Host]     : .NET Framework 4.8.1 (4.8.9310.0), X86 LegacyJIT
  DefaultJob : .NET Framework 4.8.1 (4.8.9310.0), X86 LegacyJIT

测试结果如下:

| PayloadSize | MMF        | TCPIP    | NamedPipe|
|--------------|------------|----------|-------------|
| 16B             |  40.04 μs  | 64.08 μs | 47.95 μs    |
| 256B           |  43.75 μs  | 65.84 μs | 46.85 μs    |
| 4096B         | 101.45 μs | 67.82 μs | 46.41 μs    |
| 16384B       | 252.39 μs | 73.12 μs | 58.69 μs    |

分析:

  1. 小负载 (16B, 256B): MMF的绝对优势。对于小消息,总延迟主要由固定的系统调用开销决定。MMF 的无锁实现开销极低,因此性能遥遥领先。
  2. 大负载 (4KB, 16KB): 性能反转。当负载增大,数据拷贝的成本成为主导因素。
    • 命名管道/TCP: 它们的数据拷贝在操作系统内核中完成,由高度优化的原生代码执行,效率极高。
    • MMF (自研实现): 我们的实现虽然避免了用户态/内核态的切换拷贝,但从托管 byte 到共享内存视图的拷贝是通过 MemoryMappedViewAccessor 完成的。这个过程,特别是当需要处理回环而将一次写入拆分为两次物理拷贝时,其效率可能低于内核为管道和套接字提供的、一次性的原生内存拷贝。
  3. MMF 是低延迟、高频、中小负载场景下的性能之王。对于大块数据的持续流式传输,命名管道可能凭借其高效的内核拷贝机制提供更强的吞吐量。TCP 因其协议栈开销,在所有本地场景下都是最慢的。

最终结论


各种IPC方式的比较如下:

基于场景的选型建议:

场景一:标准服务与应用通信

  • 描述:一个在后台运行的 Windows 服务需要与一个桌面 UI 应用进行通信;或者一个主应用程序需要与多个插件进行交互。通信频率适中,数据量不大,但要求可靠和安全。
  • 推荐:命名管道 (Named Pipes)
  • 理由:这是命名管道的“甜点区”。它提供了远超 TCP 的性能和更适合本地通信的轻量级协议。其内置的 Message 模式和基于 ACL 的安全模型,完美解决了分帧和安全这两大难题,极大地降低了开发成本和风险。对于绝大多数本地 C/S 架构,命名管道应是首选方案。

场景二:超低延迟、高频交易或实时数据处理

  • 描述:金融领域的高频交易系统、科学计算中的实时数据采集与分析等场景,对延迟极其敏感,且消息通常为中小型。
  • 推荐:内存映射文件(采用无锁环形缓冲区)
  • 理由:在性能是唯一且压倒一切的考量因素时,MMF 是唯一的选择。其“零拷贝”和无锁特性提供了物理硬件所能支持的性能极限。但需注意,对于持续传输数 KB 以上的大块数据,应进行基准测试以确认其性能是否优于命名管道。

场景三:快速原型开发或未来可能网络化

  • 描述:一个项目初期,需要快速搭建一个 IPC 通道进行功能验证;或者架构上预留了未来将服务拆分并部署到不同机器的可能性。
  • 推荐:TCP/IP 套接字(但需高度警惕)
  • 理由:TCP API 的通用性使其成为一个看似便捷的选择。然而,决策者必须清醒地认识到,为了使本地通信正确工作,团队必须投入时间去实现消息分帧和应用层安全。如果应用长期内都只在本地运行,那么 TCP 是一个次优选择。

场景四:多进程共享大型静态数据集

  • 描述:一个大型的、数 GB 的数据集被一个进程加载到内存后,需要被多个其他只读进程频繁访问。
  • 推荐:内存映射文件(采用持久化 MMF 和简单锁)
  • 理由:这是 MMF 的另一个理想应用场景。通过将磁盘文件映射到内存,可以避免每个进程都将整个文件加载到自己的私有内存中,从而极大地节省了物理 RAM。由于数据主要是写入一次、读取多次,同步逻辑可以大大简化,使用基于 Mutex 的简单锁机制即可。

C# 本地进程间通信领域,不存在“一种通用”的最佳方案,而是存在一个基于具体需求的“最优”选择。

  • 内存映射文件(MMF) 是无可争议的低延迟之王,尤其是在处理高频率、中小型消息时,但它要求使用者具备驾驭底层并发编程的深厚功力。
  • TCP/IP 套接字,在高性能本地 IPC 场景中,其表现更像是一个有着沉重负担的协议。协议栈的开销和必需的应用层协议开发,使其成为效率和开发成本上都相对较差的选择。
  • 命名管道则扮演了更加务实的角色。它在性能、安全性和易用性之间取得了卓越的平衡,为绝大多数企业级应用提供了最合理的、风险收益比最高的解决方案,并在传输大块数据时可能展现出最佳的吞吐量。