在某些情况下,我们需要在本机上的多个进程间进行通讯(inter process communication,IPC)。进程间进行通讯的方式有很多种,比如共享内存、TCP/IP、命名管道(Named Pipes)、Windows消息等等。本文主要介绍命名管道这种进程间通讯的使用方法。

为什么需要进程间通讯


需要使用IPC的可能原因有:

  • 隔离。在某些情况下,一个程序的功能实现可能需要依赖其它功能或者模块。比如某一部分的功能需要与非托管代码进行交互。我们当然可以使用P/Invoke(Platform Invoke)或者C++ CLI之类的技术对API进行封装,但如果这部分不受我们控制的非托管代码出现问题,比如报错或崩溃,可能会连带导致托管程序也一起崩溃。这时就可以将这部分非托管代码单独出来做成一个程序(不管是使用托管代码或者非托管代码实现),然后通过IPC技术与主程序进行通讯,这样即使这部分非托管代码导致了程序崩溃,也不会影响到主程序的运行。
  • 系统限制。在Windows中大部分的UI程序,无论是Word,Excel,还是我们使用Winform技术编写的程序,都是STA线程模式,即只允许有一个UI线程。所有的UI渲染、交互都只能在UI线程上进行,可以在后台线程或者线程池里完成一些耗时操作,但是涉及到UI更新则必须要将消息通过消息队列“泵送”到UI线程中。由于只有一个UI线程,在某些情况下,如果UI线程的工作量比较大,或者需要较高的刷新频率,刷新率可能就达不到我们的要求。比如在我们的某个交易客户端里,有大量的K线图需要展示,当来了一笔成交数据,就需要更新K线图,所以K线图的渲染工作量很大。当行情激烈的时候,不管是界面上的K线图,还是展示快照行情的地方,因为UI线程忙不过来,刷新率就会降低,用户就会感觉到界面“卡顿”。可能的办法就是将需要集中展示K线或者UI渲染工作量大的部分,独立出来做成单独的应用,然后将这部分UI更新需要的数据通过IPC的方式从主程序发送到独立出来的应用。这样原来的一个程序现在拆成了两个程序,一定程度上突破了STA程序只有一个UI线程,当UI渲染工作量大的时候会导致刷新率低的限制。

进行间通讯相比程序内部不同线程间通讯毫无疑问耗时会稍高一些,但是相比以上两个限制,这种消耗在某些情况下是值得的。在现实中有些我们常用的软件也会在主程序之外启动其它的进程挺住在后台,比如常用的WeChat客户端。

▲微信客户端,当打开公众号时会在WeChatAppEx中显示相关内容

进程间通讯的实现方式


进程间通讯的实现方式有很多种:

  • 共享内存(Shared Memory)、文件映射(Memory Mapped Files),系统剪贴板也是一种方式。
  • 命名管道(Named Piped)、匿名管道(Anonymous Pipes)
  • TCP
  • Windows消息(Windows Messaging),比如利用两个Windows窗体相互发送和订阅消息。
  • 消息队列,比如ActiveMQ,RabbitMQ这些很容易实现发布订阅模式。

Windows消息,可以利用Win32里面的api来实现,比如:

[DllImport("User32.dll", EntryPoint = "SendMessage")]
private static extern int SendMessage(IntPtr wnd,int msg,IntPtr wP,IntPtr lP);

其中:

  • wnd:接收消息的窗口的句柄。如果此参数为HWND_BROADCAST,则消息将被发送到系统中所有顶层窗口,包括无效或不可见的非自身拥有的窗口、被覆盖的窗口和弹出式窗口,但消息不被发送到子窗口。
  • msg:指定被发送的消息类型。
  • wP:消息内容。
  • lP:指定附加的消息指定信息。

Windows消息的局限性在于通讯双方必须是Winform窗体,有些应用比如Windows Service服务,就无法利用这一点进行通讯。另外这种P/Invoke的方式,方法参数实际传递的是指针,会涉及到对象固定。如果有兴趣可以参考相关的文档说明。

这里主要想介绍的是命名管道。从速度上来看,共享内存最快,命名管道其次但至少不比TCP慢。因为共享内存涉及到Unsafe操作,所以更偏向Named Piped。

什么是命名管道


命名管道看起来是Windows上的一种资源,在PowerShell中,可以使用如下命令查看本机上的所有命名管道

get-childitem \\.\pipe\

列出结果如下(太多了,部分删减):

PS C:\Users\yangyang> get-childitem \\.\pipe\

    目录: \\.\pipe\SQLLocal

Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
------          1601/1/1      8:00              2 SQLLocal\MSSQLSERVER

    目录: \\.\pipe\sql

Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
------          1601/1/1      8:00              2 sql\query

    目录: \\.\pipe\GoogleCrashServices

Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
------          1601/1/1      8:00              1 GoogleCrashServices\S-1-5-18
------          1601/1/1      8:00              1 GoogleCrashServices\S-1-5-18-x64

    目录: \\.\pipe

Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
------          1601/1/1      8:00              3 MsFteWds
------          1601/1/1      8:00              1 SearchTextHarvester
------          1601/1/1      8:00             38 TSVNCache-000000000075190b
------          1601/1/1      8:00              5 TSVNCacheCommand-000000000075190b
------          1601/1/1      8:00              2 LogiOptions{c5255338-1a20-4aaf-8825-defe7013f405-1}
------          1601/1/1      8:00              2 LogiOptions{43934ff4-74ad-4df3-aabf-d1a70657cdbc-1}
------          1601/1/1      8:00              1 LogiOverlay{1baf0af1-7425-4411-9a85-6c6a0a71d4f3-1}
------          1601/1/1      8:00              3 com.adobe.AdobeIPCBroker.ctrl-yangyang
------          1601/1/1      8:00              2 crashpad_12712_GLCKYFLKXHUIRLWD
------          1601/1/1      8:00              1 mojo.12712.5384.12067546639256367799
------          1601/1/1      8:00              1 mojo.12712.5384.11928156104756002532
------          1601/1/1      8:00              1 04C80331FAB0F216FD73D9930DC00A0E
------          1601/1/1      8:00              1 mojo.12712.5384.14872135345273823422
------          1601/1/1      8:00              1 mojo.12712.5384.17704450154668616578
------          1601/1/1      8:00              6 sogouCloudImeAgentPipe4436
------          1601/1/1      8:00              3 efsrpc
------          1601/1/1      8:00              1 devenv.perfwatson.20012
------          1601/1/1      8:00              1 mojo.12712.11624.7492884987350168219
------          1601/1/1      8:00              1 mojo.12712.11624.5261887561502754823
------          1601/1/1      8:00              1 mojo.12712.11624.13371054178782983191
------          1601/1/1      8:00              1 mojo.12712.11624.1875763664438552037
------          1601/1/1      8:00              1 named_pipe_test_server_1
------          1601/1/1      8:00              1 named_pipe_test_server_2
------          1601/1/1      8:00              1 named_pipe_test_server
------          1601/1/1      8:00              1 mojo.12712.5384.17906999344906419012
------          1601/1/1      8:00              1 mojo.12712.11624.13266080779881921418

可以看到,有很多程序都使用了NamedPipe,包括上面以mojo开头的应该就是Chrome浏览器、还有SQLServer等等。

除此之外,通过使用Sysinternals提供pipelist.exe的这个小程序,在命令行中也可以列出来所有的NamedPipe,除了上述信息之外,它还列出来了实例个数和最大允许实列个数,

E:\用户目录\Downloads\PipeList>pipelist64

PipeList v1.02 - Lists open named pipes
Copyright (C) 2005-2016 Mark Russinovich
Sysinternals - www.sysinternals.com

Pipe Name                                    Instances       Max Instances
---------                                    ---------       -------------
InitShutdown                                      3               -1
lsass                                             4               -1
ntsvcs                                            3               -1
scerpc                                            3               -1
Winsock2\CatalogChangeListener-2b4-0              1                1
Winsock2\CatalogChangeListener-440-0              1                1
epmapper                                          3               -1
LSM_API_service                                   3               -1
eventlog                                          3               -1
atsvc                                             3               -1
spoolss                                           3               -1
wkssvc                                            4               -1
trkwks                                            3               -1
TermSrv_API_service                               3               -1
Ctx_WinStation_API_service                        3               -1
srvsvc                                            4               -1
SessEnvPublicRpc                                  3               -1
Winsock2\CatalogChangeListener-13ac-0             1                1
ROUTER                                            3               -1
PIPE_EVENTROOT\CIMV2SCM EVENT PROVIDER            1               -1
_QPIPC_PUB1015_                                   1              128
SQLLocal\MSSQLSERVER                              2               -1
sql\query                                         2               -1
Winsock2\CatalogChangeListener-3fc-0              1                1
msfte\MSSQL10_50.MSSQLSERVERG308d9124f3a6af46f3fa7baf2f038facd30a3j8g6FTEtoFDAdmin          1                1
msfte\MSSQL10_50.MSSQLSERVERG308d9124f3a6af46f3fa7baf2f038facd30a3j8g6FDReq          1                1
msfte\MSSQL10_50.MSSQLSERVERG308d9124f3a6af46f3fa7baf2f038facd30a3j8g6CBStatus          1                1
GoogleCrashServices\S-1-5-18                      1                1
GoogleCrashServices\S-1-5-18-x64                  1                1
MsFteWds                                          3               -1
SearchTextHarvester                               1               -1
TSVNCache-000000000075190b                       38               -1
TSVNCacheCommand-000000000075190b                 5               -1
LogiOptions{c5255338-1a20-4aaf-8825-defe7013f405-1}          2               -1
LogiOptions{43934ff4-74ad-4df3-aabf-d1a70657cdbc-1}          2               -1
LogiOverlay{1baf0af1-7425-4411-9a85-6c6a0a71d4f3-1}          1                1
com.adobe.AdobeIPCBroker.ctrl-yangyang            3               -1
crashpad_12712_GLCKYFLKXHUIRLWD                   2                2
sogouCloudImeAgentPipe4436                        6               -1
efsrpc                                            3               -1
mojo.12712.11624.7931918328911422035              1                1
devenv.perfwatson.20012                           1                1
IdentityNexusIntegrationPipe\98f1536cbfed495ea79ec1fbd387c8ea          1               -1
mojo.12712.11624.5288610044236809969              1                1
ProtectedPrefix\LocalService\FTHPIPE              1                1
com.adobe.acrobat.rna.yangyang.DC.0               2               -1
com.adobe.acrobat.rna.35b8                        1               -1
mojo.11636.14816.7371205822873667664              1                1
mojo.11636.16844.11288117515834302909             1                1
mojo.11636.16844.13317992225715912740             1                1
dotnet-diagnostic-33136                           1               -1
LINQPad7.33136                                    1                1
dotnet-diagnostic-34364                           1               -1
dotnet-diagnostic-25868                           1               -1
dotnet-diagnostic-27700                           1               -1
LOCAL\mojo.33136.24864.17432818372899607161          1                1
crashpad_24324_ABBZIPCUBHCUVYQY                   2                2
LOCAL\mojo.24324.3272.13000764280322328871          1                1
LOCAL\mojo.24324.3272.10295390335632547853          1                1
LOCAL\mojo.24324.3272.7752123814181536120          1                1
LOCAL\mojo.24324.3272.2213263502785307946          1                1
LOCAL\mojo.24324.13676.496841217634845385          1                1
dotnet-diagnostic-27880                           1               -1
LOCAL\mojo.24324.13676.2990309382102775294          1                1
0cb2880c86240da8d1445e81e93254b6e723b1bb\devenv.exe.config         13               -1
19188E17466D16FAC048CE78B6B46731E9B6C             2               -1
1918899B6842291AEB372A1445F4EEAB80255            10               -1
devenv.perfwatson.35596                           1                1
4122a0bc6bc640db82e0ce1303f8a540                  1               -1
crashpad_508_HFJIVMLAFCIXFFOB                     2                2
mojo.508.33440.16278103835832306208               1                1
crashpad_35096_OWROVTHMAOKINLOT                   2                2
RDMChannel                                        1               -1
LOCAL\mojo.3352.21048.4615447561446550860          1                1
crashpad_11832_KGLARGUTZRNQIKSK                   2                2
LOCAL\mojo.11832.12904.6920560339268544532          1                1
LOCAL\mojo.11832.12904.11843395221099164572          1                1
LOCAL\mojo.11832.12904.17899706466599852410          1                1
mojo.12712.5384.17968125367119843034              1                1
named_pipe_test_server_1                          1                1
named_pipe_test_server_2                          1                1
named_pipe_test_server                            1                1
mojo.12712.5384.17906999344906419012              1                1
mojo.12712.11624.13266080779881921418             1                1
mojo.12712.5384.15523601107665812595              1                1
mojo.12712.11624.16169632426378812064             1                1
mojo.12712.11624.4665868630448807921              1                1

在C#代码中,可以使用如下命令列出所有命名管道:

String[] listOfPipes = System.IO.Directory.GetFiles(@"\\.\pipe\"); 

命名管道的使用


MSDN官网上对Named Pipe有很详细的介绍。使用方法也很简单,主要有两个对象NamedPipeServerStream和NamePipeClientStream:

 private static void ServerThread(object? data)
    {
        NamedPipeServerStream pipeServer =
            new NamedPipeServerStream("testpipe", PipeDirection.InOut, numThreads);

        int threadId = Thread.CurrentThread.ManagedThreadId;

        // Wait for a client to connect
        pipeServer.WaitForConnection();

        Console.WriteLine("Client connected on thread[{0}].", threadId);
        try
        {
            // Read the request from the client. Once the client has
            // written to the pipe its security token will be available.

            StreamString ss = new StreamString(pipeServer);

            // Verify our identity to the connected client using a
            // string that the client anticipates.

            ss.WriteString("I am the one true server!");
            string filename = ss.ReadString();

            // Read in the contents of the file while impersonating the client.
            ReadFileToStream fileReader = new ReadFileToStream(ss, filename);

            // Display the name of the user we are impersonating.
            Console.WriteLine("Reading file: {0} on thread[{1}] as user: {2}.",
                filename, threadId, pipeServer.GetImpersonationUserName());
            pipeServer.RunAsClient(fileReader.Start);
        }
        // Catch the IOException that is raised if the pipe is broken
        // or disconnected.
        catch (IOException e)
        {
            Console.WriteLine("ERROR: {0}", e.Message);
        }
        pipeServer.Close();
    }

// Defines the data protocol for reading and writing strings on our stream
public class StreamString
{
    private Stream ioStream;
    private UnicodeEncoding streamEncoding;

    public StreamString(Stream ioStream)
    {
        this.ioStream = ioStream;
        streamEncoding = new UnicodeEncoding();
    }

    public string ReadString()
    {
        int len = 0;

        len = ioStream.ReadByte() * 256;
        len += ioStream.ReadByte();
        byte[] inBuffer = new byte[len];
        ioStream.Read(inBuffer, 0, len);

        return streamEncoding.GetString(inBuffer);
    }

    public int WriteString(string outString)
    {
        byte[] outBuffer = streamEncoding.GetBytes(outString);
        int len = outBuffer.Length;
        if (len > UInt16.MaxValue)
        {
            len = (int)UInt16.MaxValue;
        }
        ioStream.WriteByte((byte)(len / 256));
        ioStream.WriteByte((byte)(len & 255));
        ioStream.Write(outBuffer, 0, len);
        ioStream.Flush();

        return outBuffer.Length + 2;
    }
}

// Contains the method executed in the context of the impersonated user
public class ReadFileToStream
{
    private string fn;
    private StreamString ss;

    public ReadFileToStream(StreamString str, string filename)
    {
        fn = filename;
        ss = str;
    }

    public void Start()
    {
        string contents = File.ReadAllText(fn);
        ss.WriteString(contents);
    }
}

服务端实例化一个NamedPipedServerStream,然后调用WaitForConnection等待客户端连接。客户端启动之后,就可以从这个Stream里面读取和写入二进制数据了。使用完成之后调用Close关闭流。

客户端也非常简单:

var pipeClient = new NamedPipeClientStream(".", "testpipe",
                       PipeDirection.InOut, PipeOptions.None,
                       TokenImpersonationLevel.Impersonation);

Console.WriteLine("Connecting to server...\n");
pipeClient.Connect();

var ss = new StreamString(pipeClient);
// Validate the server's signature string.
if (ss.ReadString() == "I am the one true server!")
{
    // The client security token is sent with the first write.
    // Send the name of the file whose contents are returned
    // by the server.
    ss.WriteString("c:\\textfile.txt");

    // Print the file to the screen.
    Console.Write(ss.ReadString());
}
else
{
    Console.WriteLine("Server could not be verified.");
}
pipeClient.Close();

客户端实例化一个NamedPipeClientStream,指定命名空间的名称,然后调用Connect就能连接服务端。这些方法都是阻塞的同步方法。

NamedPipeServerStream和NamedPipeClientStream可以“当作”常见的Stream来使用。都是一种数据流,可以读取和写入数据,完成之后需要调用Close关闭。服务端和客户端分别对各自的NamedPipeServerStream和NamedPipeClientStream进行读写就可以实现服务端和客户端进行收发。

命名管道通讯设计


上面就是命名管道的介绍,但实际应用中,还有很多需要注意的地方。比如服务端如何识别和标识客户端。在TCP中,服务端可以根据连接上来的客户端的IP和端口来实现客户端的辨别(通过Socket.RemoteEndPoint方法获取)。但在命名管道中,服务端只有一个WaitForConnect的阻塞方法,当有客户端连接上时,WaitForConnect返回void,然后执行后续方法,可以从NamedPipeServerStream读数据(表示读取客户端发过来的数据)和写数据(往客户端写入数据),仅此而已。

▲大部分的demo和“可用”就差了那些“其它细节”😂

这里的设计是基于named-pipe-wrapper这个开源项目,对其进行了一些修改和提炼:

  • 将序列化和反序列化单独出来,原项目里发送和接收的消息类型是自定义的泛型类型,然后内置是使用的是二进制序列化 BinaryFormatter对这个消息类型进行序列化和反序列化,二进制的序列化反序列化则要求这里的发送和接收的泛型类型必须标记为Serializable,否则序列化和反序列化可能会报错,这个对消息对象类型具有很大的局限性,尤其是那些已经写好无法修改的代码没办法添加Serializable标记。
  • 将发送和接受类型统一改为了byte[],对象类型以及序列化和反序列化操作改为在更上层进行操作,可以更加灵活的选择序列化和反序列化操作。
  • 改进了NamedPipedClinet的Name的生成方式,原项目使用的id自增的方式,并且在服务端也是按照同样的接收到的客户端连接数的方式进行自增,这样并不能确认一致性。修改为了通过用户指定Name,或者通过ProcessID的方式来产生,同时通过handshake的方式,告知服务端客户端的identity。
  • 移除了很多为了封装而进行的封装,结构更加精简和优化。

基本的设计思路是:

  • NamedPipeServer对象,表示命名管道服务端,一个服务端能服务多个NamedPipeClient客户端,但它不直接包含NamedPipedClient,而是以一个NamedPipeConnection呈现,所以一个NamedPipeServer应该包含多个NamedPipeConnection,其中每个NamedPipeConnection表示该服务端和某个客户端的连接,可以往该连接写入和读取数据。服务端可以给所有的NamedPipeConnection发送消息,那就是遍历这个NamedPipeConnection的List;也可以单独给指定的某个NamedPipeConnection发送消息。
  • NamedPipeClient对象,表示命名管道客户端,它应该包含一个唯一的标识,这个标识可以是用户指定,也可以是自动产生,比如根据id自增,或者直接根据process id来获取。这个唯一标识需要告知NamedPipeServer。
  • NamedPipeConnection对象,表示一个服务端客户端的连接。NamedPipeConnection内部应该包含一个PipeStreamWrapper对象,可以对其进行读取或者写入。
  • PipeStreamWrapper是对PipeStream的封装,它封装了PipeStream的读写操作。PipedStream是NamedPipeServerStream和NamedPipeClientStream的父类。如果是服务端内部包含的NamedPipeConnection,那么其内部的PipeStreamWarpper包装的就是NamedPipeServerStream,该对象读取的就是客户端发送给服务端的消息,写入的就是服务端往客户端的消息。如果时客户端内部包含的NamedPipeConnection,那么其内部的PipeStreamWrapper包装的是NamedPipeClientStream,该对象读取的就是服务端发送给客户端的消息,写入的就是客户端发送给服务端的消息。

下面就来看具体的每个对象实现:

NamedPipeConnection


NamedPipeConnect表示一个服务端和客户端的连接。服务端和客户端是在管道的两端,它们各自操作自己的NamedPipeServerStream和NamedPipeClientStream,这两个Stream都继承自PipeStream。所以NamedPipeConnection实际上是对NamedPipeStream的一个封装,封装了它的读写操作。

public class NamedPipeConnection
{
    private static int lastId;
    public readonly int Id;
    public string Name { get; }
    public bool IsConnected => _streamWrapper.IsConnected;
    public Action<NamedPipeConnection> OnDisconnected;
    public Action<NamedPipeConnection, byte[]> OnReceiveMessage;
    public Action<NamedPipeConnection, Exception> Error;
    private readonly PipeStreamWrapper _streamWrapper;
    private readonly AutoResetEvent _writeSignal = new AutoResetEvent(false);
    private readonly BlockingCollection<byte[]> _writeQueue = new BlockingCollection<byte[]>();
    private bool _notifiedSucceeded;
    public NamedPipeConnection(string name, PipeStream serverStream)
    {
        Id = ++lastId;
        Name = name;
        _streamWrapper = new PipeStreamWrapper(serverStream);
    }

    public void Open()
    {
        var readWorker = new Worker();
        readWorker.OnSucceeded += OnSucceeded;
        readWorker.OnError += OnError;
        readWorker.DoWork(ReadPipe);

        var writeWorker = new Worker();
        writeWorker.OnSucceeded += OnSucceeded;
        writeWorker.OnError += OnError;
        writeWorker.DoWork(WritePipe);
    }

    public void Send(byte[] message)
    {
        _writeQueue.Add(message);
        _writeSignal.Set();
    }

    public void Close()
    {
        CloseImpl();
    }

    private void CloseImpl()
    {
        _streamWrapper.Close();
        _writeSignal.Set();
    }

    private void OnSucceeded()
    {
        // Only notify observers once
        if (_notifiedSucceeded)
            return;

        _notifiedSucceeded = true;

        if (Disconnected != null)
            Disconnected(this);
    }

    private void OnError(Exception exception)
    {
        if (Error != null)
            Error(this, exception);
    }

    private void ReadPipe()
    {
        while (IsConnected && _streamWrapper.CanRead)
        {
            try
            {
                var obj = _streamWrapper.Read();
                if (obj == null)
                {
                    CloseImpl();
                    return;
                }
                if (OnReceiveMessage != null)
                    OnReceiveMessage(this, obj);
            }
            catch
            {
                //we must igonre exception, otherwise, the namepipe wrapper will stop work.
            }
        }
    }

    private void WritePipe()
    {
        while (IsConnected && _streamWrapper.CanWrite)
        {
            try
            {
                _streamWrapper.Write(_writeQueue.Take());
                _streamWrapper.WaitForPipeDrain();
            }
            catch
            {
                //we must igonre exception, otherwise, the namepipe wrapper will stop work.
            }
        }
    }
}

可以看到NamedPipeConnection里面定义了一些事件,包括链接的连接状态OnDisconnected、新消息事件OnReceiveNewMessage。

在Open方法里,启动了两个Work(对Task的简单包装,额外提供报错和完成事件),他们分别用来执行ReadPipe读取和写入WritePipe的操作。 这两个操作都是while循环。

在读取里面就是一直从PipedStream里面读取(通过PipedStreamWraper包装读写操作),读取完成之后调用OnReceiveNewMessage事件将数据发送出去。

写入的时候,当用户调用Send方法时,先把待发送的消息放到一个BlockingCollection中,然后在WritePipe方法里,不断的从BlockingCollection中获取数据写入到PipedStream里。

综上,这个NamedPipedConnect主要时对客户端和服务端的连接做了抽象和封装,它实际上只是对PipedStream的读写操作进行高层次的封装。

接下来要看下PipeStreamWrapper类了。

PipeStreamWrapper


这个类在NamedPipeConnection中负责对底层的PipeStream进行读写封装,非常简单,它就是提供了读写操作,读写的时候,先将消息的长度读出来,然后紧接着读写Stream后面紧接着指定长度的数据。

public class PipeStreamWrapper
{
    private PipeStream baseStream;
    private bool isReaderConnected = true;
    public bool IsConnected
    {
        get { return baseStream.IsConnected && isReaderConnected; }
    }

    public bool CanRead
    {
        get { return baseStream.CanRead; }
    }


    public bool CanWrite
    {
        get { return baseStream.CanWrite; }
    }

    public PipeStreamWrapper(PipeStream stream)
    {
        baseStream = stream;
    }

    public byte[] Read()
    {
        var len = ReadLength();
        return ReadObject(len);
    }


    public void Write(byte[] data)
    {
        WriteLength(data.Length);
        WriteBytes(data);
        Flush();
    }

    /// <summary>
    ///     Waits for the other end of the pipe to read all sent bytes.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The pipe is closed.</exception>
    /// <exception cref="NotSupportedException">The pipe does not support write operations.</exception>
    /// <exception cref="IOException">The pipe is broken or another I/O error occurred.</exception>
    public void WaitForPipeDrain()
    {
        baseStream.WaitForPipeDrain();
    }

    public void Close()
    {
        baseStream.Close();
    }


    #region  read write

    private int ReadLength()
    {
        const int size = sizeof(int);
        var buf = new byte[size];
        var bytesRead = baseStream.Read(buf, 0, size);
        if (bytesRead == 0)
        {
            isReaderConnected = false;
            return 0;
        }
        if (bytesRead != size)
            throw new IOException($"Expected {size} bytes but read {bytesRead}");
        return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(buf, 0));
    }

    private byte[] ReadObject(int len)
    {
        var data = new byte[len];
        baseStream.Read(data, 0, len);
        return data;
    }

    private void WriteLength(int len)
    {
        var buf = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(len));
        baseStream.Write(buf, 0, buf.Length);
    }

    private void WriteBytes(byte[] data)
    {
        baseStream.Write(data, 0, data.Length);
    }

    private void Flush()
    {
        baseStream.Flush();
    }
    #endregion
}

接下来就是服务端和客户端的代码了,先看客户端。

NamedPipeClient


客户端主要时对NamedPipeClientStream对象的封装以及与服务端的通讯。这里面主要时如何获取客户端的唯一标识,以及如何通知服务端这一标志。

public class NamedPipeClient
{
    public bool AutoReconnect { get; set; }
    public Action<NamedPipeConnection, byte[]> NewMessage;
    public Action<NamedPipeConnection> Disconnected;
    public Action<Exception> Error;
    private readonly string _pipeName;
    private NamedPipeConnection connection;
    private readonly AutoResetEvent _connected = new AutoResetEvent(false);
    private readonly AutoResetEvent _disconnected = new AutoResetEvent(false);

    private volatile bool _closedExplicitly;
    private string _serverName { get; set; }
    public string _clientName { get; set; }

    public NamedPipeClient(string pipeName, string clientName = "", string serverName = ".")
    {
        if (string.IsNullOrEmpty(clientName))
        {
           Process processes = Process.GetCurrentProcess();
            _clientName = Helper.GetClientName(processes.Id);
        }
        else
        {
            _clientName = clientName;
        }
        _pipeName = pipeName;
        _serverName = serverName;
        AutoReconnect = true;
    }

    public void Start()
    {
        _closedExplicitly = false;
        var worker = new Worker();
        worker.OnError += OnError;
        worker.DoWork(ListenSync);
    }

    public void Send(byte[] message)
    {
        if (connection != null)
            connection.Send(message);
    }

    public void Stop()
    {
        _closedExplicitly = true;
        if (connection != null)
            connection.Close();
    }

    #region Wait for connection/disconnection

    public void WaitForConnection()
    {
        _connected.WaitOne();
    }

    public void WaitForConnection(int millisecondsTimeout)
    {
        _connected.WaitOne(millisecondsTimeout);
    }

    public void WaitForConnection(TimeSpan timeout)
    {
        _connected.WaitOne(timeout);
    }

    public void WaitForDisconnection()
    {
        _disconnected.WaitOne();
    }

    public void WaitForDisconnection(int millisecondsTimeout)
    {
        _disconnected.WaitOne(millisecondsTimeout);
    }

    public void WaitForDisconnection(TimeSpan timeout)
    {
        _disconnected.WaitOne(timeout);
    }

    #endregion

    #region Private methods

    private void ListenSync()
    {
        // Get the name of the data pipe that should be used from now on by this NamedPipeClient
        var handshakePipe = CreateNamedPipe(_pipeName, _serverName);
        handshakePipe.Connect();
        var handshake = new PipeStreamWrapper(handshakePipe);

        var dataPipeNameBytes = handshake.Read();
        string dataPipeName = Encoding.UTF8.GetString(dataPipeNameBytes);
        //tell the server the clientName
        handshake.Write(Encoding.UTF8.GetBytes(_clientName));
        handshake.Close();

        // Connect to the actual data pipe
        var dataPipe = CreateNamedPipe(dataPipeName, _serverName);
        dataPipe.Connect();

        // Create a Connection object for the data pipe
        connection = new NamedPipeConnection(_clientName, dataPipe);
        connection.OnDisconnected += OnDisconnected;
        connection.OnReceiveMessage += OnReceiveMessage;
        connection.Error += ConnectionOnError;
        connection.Open();

        _connected.Set();
    }

    private void OnDisconnected(NamedPipeConnection connection)
    {
        if (Disconnected != null)
            Disconnected(connection);

        _disconnected.Set();

        // Reconnect
        if (AutoReconnect && !_closedExplicitly)
            Start();
    }

    private void OnReceiveMessage(NamedPipeConnection connection, byte[] message)
    {
        if (NewMessage != null)
            NewMessage(connection, message);
    }

    private void ConnectionOnError(NamedPipeConnection connection, Exception exception)
    {
        OnError(exception);
    }

    private void OnError(Exception exception)
    {
        if (Error != null)
            Error(exception);
    }

    private static NamedPipeClientStream CreateNamedPipe(string pipeName, string serverName)
    {
        return new NamedPipeClientStream(serverName, pipeName, PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough);
    }
    #endregion

}

在NamedPipeClient里面,如果用户没有手动指定_clientName,则将当前应用的进程ID作为_clientName。当用户调用Start方法是,会启动一个Task里面执行ListenSync方法,这是一个同步阻塞方法,在方法里面,首先创建一个握手连接handshakepipe,这个握手连接里的管道名称,是在初始化时指定的,该名称必须和服务端监听的管道名称一致。建立好握手之后,读取服务端的第一个字符串,作为后续的用来通讯的命名管道的连接名称dataPipeName,然后将自己的_clinetName发送给服务端,告知自己的identity。

上述的握手完毕之后,获取到了服务端下发的通讯管道名称,然后调用CreateNamedPipe创建一个真正的数据通讯连接dataPipe(NamedPipeClientStream)。连接成功之后,创建一个NamedPipeConnection,正如上所述,它实际上是对NamedPipeClientStream的一个封装。然后注册它的OnReceiveMessage事件,该事件表示接收到新的服务端发送来的数据就会触发。最后调用Open方法开始监听。

CreateNamedPipe方法就是创建一个NamedPipeClientStream。Send方法表示客户端往服务端发送消息,它直接调用NamedPipeConnection的Send方法,其内部就是王NamedPipeClientStream里面写入数据。

最后就是NamedPipeServer服务端代码了。

NamedPipeServer


和客户端封装的逻辑相似,服务端主要时对NamedPipeServerStream对象的封装以及与客户端的通讯。由于一个服务都可以服务多个客户端,每个服务都被封装为了NamedPipeConnection,所以它包含一个List类型的NamedPipeConnection集合。同时也对外暴漏了一些诸如新客户端连接状态OnClientConnected,新消息OnNewMessage等事件。另外内部还有个表示一对一连接的List<NamedPipeConnection>对象,每一个客户端连接上来之后,都会新建一个NamedPipeConnection然后保存起来。

public class NamedPipeServer
{
    private readonly string _pipeName;
    private readonly PipeSecurity _pipeSecurity;
    public Action<NamedPipeConnection> OnClientConnected;
    public Action<NamedPipeConnection> OnClientDisconnected;
    public Action<NamedPipeConnection, byte[]> OnNewMessage;
    public Action<Exception> OnError;

    private List<NamedPipeConnection> _connections = new List<NamedPipeConnection>();
    private int _nextPipeId;
    private volatile bool _shouldKeepRunning;
    private volatile bool _isRunning;

    public NamedPipeServer(string pipeName, PipeSecurity pipeSecurity = null)
    {
        _pipeName = pipeName;
        _pipeSecurity = pipeSecurity;
    }

    public void Start()
    {
        _shouldKeepRunning = true;
        var worker = new Worker();
        worker.OnError += OnWorkerError;
        worker.DoWork(ListenSync);
    }

    public void Send(byte[] message)
    {
        lock (_connections)
        {
            foreach (var client in _connections)
            {
                client.Send(message);
            }
        }
    }

    public void Send(byte[] message, string clientName)
    {
        lock (_connections)
        {
            foreach (var client in _connections)
            {
                if (client.Name == clientName)
                    client.Send(message);
            }
        }
    }

    public void Stop()
    {
        _shouldKeepRunning = false;
        lock (_connections)
        {
            foreach (var client in _connections.ToArray())
            {
                client.Close();
            }
        }

        // If background thread is still listening for a client to connect,
        // initiate a dummy connection that will allow the thread to exit.
        //dummy connection will use the local server name.
        var dummyClient = new NamedPipeClient(_pipeName, ".");
        dummyClient.Start();
        dummyClient.WaitForConnection(TimeSpan.FromSeconds(2));
        dummyClient.Stop();
        dummyClient.WaitForDisconnection(TimeSpan.FromSeconds(2));
    }

    #region Private methods

    private void ListenSync()
    {
        _isRunning = true;
        while (_shouldKeepRunning)
        {
            WaitForConnection(_pipeName, _pipeSecurity);
        }
        _isRunning = false;
    }

    private void WaitForConnection(string pipeName, PipeSecurity pipeSecurity)
    {
        NamedPipeServerStream handshakePipe = null;
        NamedPipeServerStream dataPipe = null;
        NamedPipeConnection connection = null;

        var connectionPipeName = GetNextConnectionPipeName(pipeName);
        byte[] connectionPipeNameBytes = Encoding.UTF8.GetBytes(connectionPipeName);
        try
        {
            // Send the client the name of the data pipe to use
            handshakePipe = CreateNamedPipe(pipeName, pipeSecurity);
            handshakePipe.WaitForConnection();
            var handshakeWrapper = new PipeStreamWrapper(handshakePipe);

            handshakeWrapper.Write(connectionPipeNameBytes);
            handshakeWrapper.WaitForPipeDrain();
            //uint clientProcessID = Helper.GetNamedPipeClientProcID(handshakePipe);
            byte[] clientNameBytes = handshakeWrapper.Read();
            string clientName = Encoding.UTF8.GetString(clientNameBytes);
            handshakeWrapper.Close();

            // Wait for the client to connect to the data pipe
            dataPipe = CreateNamedPipe(connectionPipeName, pipeSecurity);
            dataPipe.WaitForConnection();

            // Add the client's connection to the list of connections
            connection = new NamedPipeConnection(clientName/*Helper.GetClientName((int)clientProcessID)*/, dataPipe);
            connection.OnReceiveMessage += ClientOnReceiveMessage;
            connection.OnDisconnected += ClientOnDisconnected;
            connection.Error += ConnectionOnError;
            connection.Open();

            lock (_connections)
            {
                _connections.Add(connection);
            }

            ClientOnConnected(connection);
        }
        // Catch the IOException that is raised if the pipe is broken or disconnected.
        catch (Exception)
        {
            Cleanup(handshakePipe);
            Cleanup(dataPipe);
            ClientOnDisconnected(connection);
        }
    }

    private void ClientOnConnected(NamedPipeConnection connection)
    {
        if (OnClientConnected != null)
            OnClientConnected(connection);
    }

    private void ClientOnReceiveMessage(NamedPipeConnection connection, byte[] message)
    {
        if (OnNewMessage != null)
            OnNewMessage(connection, message);
    }

    private void ClientOnDisconnected(NamedPipeConnection connection)
    {
        if (connection == null)
            return;

        lock (_connections)
        {
            _connections.Remove(connection);
        }

        if (OnClientDisconnected != null)
            OnClientDisconnected(connection);
    }

    private void ConnectionOnError(NamedPipeConnection connection, Exception exception)
    {
        OnError(exception);
    }

    private void OnWorkerError(Exception exception)
    {
        if (OnError != null)
            OnError(exception);
    }

    private string GetNextConnectionPipeName(string pipeName)
    {
        return string.Format("{0}_{1}", pipeName, ++_nextPipeId);
    }

    private void Cleanup(NamedPipeServerStream pipe)
    {
        if (pipe == null) return;
        using (var x = pipe)
        {
            x.Close();
        }
    }

    private static NamedPipeServerStream CreateNamedPipe(string pipeName, PipeSecurity pipeSecurity)
    {
        return new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.WriteThrough, 0, 0, pipeSecurity);
    }
    #endregion
}

当用户调用Start时,会启动一个task,在内部会调用一个ListenSync同步方法。该方法在while循环中,_shouldKeepRunning为true,就不断地等待新的连接。在WaitForConnection中,首先使用跟客户端约定好的pipeName建立一个handshakePipe,等待客户端连接,连接上之后,告知客户端用来进行数据传输的connectionPipeName。在客户端获取到新的PipeName之后,读取客户端的名称作为创建NamedPipeConnection时的标识。获取完成各自基本信息之后,创建使用CreateNamePipe新的通讯命令管道,该方法在内部实际上时创建一个NamedPipeServerStream。

上面有一句注释代码,如果不采用handshake由客户端主动传递标识,客户端那边默认采用进程IP,服务端这边也可以通过win32 api获取管道客户端的进程id。

uint clientProcessID = Helper.GetNamedPipeClientProcID(handshakePipe);

GetNamedPipeClientProcID方法如下:

class Helper
{
    [DllImport("kernel32.dll", SetLastError = true)]
    internal static extern bool GetNamedPipeClientProcessId(IntPtr Pipe, out uint ClientProcessId);
    public static uint GetNamedPipeClientProcID(NamedPipeServerStream pipeServer)
    {
        UInt32 nProcID;
        IntPtr hPipe = pipeServer.SafePipeHandle.DangerousGetHandle();
        if (GetNamedPipeClientProcessId(hPipe, out nProcID))
            return nProcID;
        return 0;
    }

    public static string GetClientName(int processID)
    {
        return "Client_" + processID;
    }
}

当然最好就是由客户端自己告诉服务端自己的标识,而不是采用上面这种约定使用ProcessID的方式,这样最优雅和准确。

新建好NamePipeConnection并注册相关事件之后,将该NamedPipedConnection添加到集合中,并报告ClientOnConnected,然后while循环继续等待下一个对象连接。

这里面有一个需要注意的地方就是Stop方法。当需要关闭的时候,Start方法创建了一个Task,在Task内部会调用ListenSync方法,该方法是一个while循环,它会循环调用WaitForConnection方法,该方法是一个阻塞方法,当需要Stop的时候,没有办法直接关闭Start方法里面创建的那个Task,因为Task里面还在运行,且方法在WaitForConnection方法里面正在阻塞等待下一个连接。所以Stop方法的做法就是首先把_shouldKeepRunning方法置为false,然后发起一个无用的NamedPipeClient连接。使得WaitForConnection继续运行,知道下一次在while循环判断_shouldKeepRunning为false时自然推出。这是一个讨巧的做法,因为没有办法使用CancellationTokenSource的方式来优雅的使得Task来结束。

至此实现管道通讯的可以在实战环境使用的几个框架结构类都介绍完毕,唯一还未介绍的时Worker这个类只是简单的对Task进行了一下封装,代码很简单,如下:

class Worker
{
    private readonly TaskScheduler _callbackThread;

    private static TaskScheduler CurrentTaskScheduler
    {
        get
        {
            return (SynchronizationContext.Current != null
                ? TaskScheduler.FromCurrentSynchronizationContext()
                : TaskScheduler.Default);
        }
    }

    public Action OnSucceeded;
    public Action<Exception> OnError;

    public Worker() : this(CurrentTaskScheduler)
    {
    }

    public Worker(TaskScheduler callbackThread)
    {
        _callbackThread = callbackThread;
    }

    public void DoWork(Action action)
    {
        new Task(DoWorkImpl, action, CancellationToken.None, TaskCreationOptions.LongRunning).Start();
    }

    private void DoWorkImpl(object oAction)
    {
        var action = (Action)oAction;
        try
        {
            action();
            Callback(Succeed);
        }
        catch (Exception e)
        {
            Callback(() => Fail(e));
        }
    }

    private void Succeed()
    {
        OnSucceeded?.Invoke();
    }

    private void Fail(Exception exception)
    {
        OnError?.Invoke(exception);
    }

    private void Callback(Action action)
    {
        Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, _callbackThread);
    }
}

一个小例子


利用上述介绍的几个类,可以很简单的编写一个使用命名管道通讯的例子。该实例有两部分组成,一部分是服务端一部分时客户端,客户端可以有多个。代码很简单。

首先是主窗体FormMain,主窗体上面只有两个按钮,启动后用户可以选择时开启一个服务端还是客户端。

public partial class FormMain : Form
{
    public FormMain()
    {
        InitializeComponent();
    }

    private void buttonClient_Click(object sender, EventArgs e)
    {
        Hide();
        new FormClient().ShowDialog(this);
        //Close();
        this.Dispose(false);
    }

    private void buttonServer_Click(object sender, EventArgs e)
    {
        Hide();
        new FormServer().ShowDialog(this);
        //Close();
        this.Dispose(false);
    }
}

服务端FormServer的代码如下:

public partial class FormServer : Form
{
    private readonly NamedPipeServer _server;
    private readonly ISet<string> _clients = new HashSet<string>();

    public FormServer()
    {
        _server = new NamedPipeServer(Constants.PIPE_NAME);
        InitializeComponent();
        Load += OnLoad;
    }

    private void OnLoad(object sender, EventArgs eventArgs)
    {
        _server.OnClientConnected += OnClientConnected;
        _server.OnClientDisconnected += OnClientDisconnected;
        _server.OnNewMessage += (client, message) => AddLine("<b>" + client.Name + "</b>: " + Constants.DeserializeFunc(message));
        _server.Start();
    }

    protected override void OnClosing(CancelEventArgs e)
    {
        _server.Stop();
        base.OnClosing(e);
        System.Environment.Exit(0);
    }

    private void OnClientConnected(NamedPipeConnection connection)
    {
        _clients.Add(connection.Name);
        AddLine("<b>" + connection.Name + "</b> connected!");
        UpdateClientList();
        connection.Send(Constants.SerializeFunc("Welcome!  You are now connected to the server."));
    }

    private void OnClientDisconnected(NamedPipeConnection connection)
    {
        _clients.Remove(connection.Name);
        AddLine("<b>" + connection.Name + "</b> disconnected!");
        UpdateClientList();
    }

    private void AddLine(string html)
    {
        richTextBoxMessages.Invoke(new Action(delegate
        {
            richTextBoxMessages.Text += "<div>" + html + "</div>" + Environment.NewLine;
        }));
    }

    private void UpdateClientList()
    {
        listBoxClients.Invoke(new Action(UpdateClientListImpl));
    }

    private void UpdateClientListImpl()
    {
        listBoxClients.Items.Clear();
        foreach (var client in _clients)
        {
            listBoxClients.Items.Add(client);
        }
    }

    private void buttonSend_Click(object sender, EventArgs e)
    {
        if (string.IsNullOrWhiteSpace(textBoxMessage.Text))
            return;
        byte[] serverMsg = Constants.SerializeFunc(textBoxMessage.Text);

        if (listBoxClients.SelectedIndex > -1)
        {
            string selectIndex = listBoxClients.SelectedItem.ToString();
            _server.Send(serverMsg, selectIndex);
        }
        else
        {
            _server.Send(serverMsg);
        }
        textBoxMessage.Text = "";
    }
}

服务端左侧会显示所有的消息记录,右侧会显示所有正在连接的客户端,下方是发送消息对话框,可以选择右侧列表里面的某个客户端单独的一对一发送消息,或者不选即表示群发消息。

客户端FormClient就更简单了,代码如下:

public partial class FormClient : Form
{
    private readonly NamedPipeClient _client;

    public FormClient()
    {
        _client = new NamedPipeClient(Constants.PIPE_NAME);
        InitializeComponent();
        Load += OnLoad;
        this.Text = _client._clientName;
    }

    private void OnLoad(object sender, EventArgs eventArgs)
    {
        _client.NewMessage += OnServerMessage;
        _client.Disconnected += OnDisconnected;
        _client.Start();
    }

    private void OnServerMessage(NamedPipeConnection connection, byte[] message)
    {
        richTextBoxMessages.Invoke(new Action(delegate
        {
            AddLine("<b>Server</b>: " + Constants.DeserializeFunc(message));
        }));
    }

    private void OnDisconnected(NamedPipeConnection connection)
    {
        richTextBoxMessages.Invoke(new Action(delegate
        {
            AddLine("<b>Disconnected from server</b>");
        }));
    }

    private void AddLine(string html)
    {
        richTextBoxMessages.Invoke(new Action(delegate
        {
            richTextBoxMessages.Text += "<div>" + html + "</div>" + Environment.NewLine;
        }));
    }

    private void buttonSend_Click(object sender, EventArgs e)
    {
        if (string.IsNullOrWhiteSpace(textBoxMessage.Text))
            return;
        byte[] clientMsg = Constants.SerializeFunc(textBoxMessage.Text);
        _client.Send(clientMsg);
        textBoxMessage.Text = "";
    }

    protected override void OnClosing(CancelEventArgs e)
    {
        _client.Stop();
        base.OnClosing(e);
        System.Environment.Exit(0);
    }
}

上方显示消息记录,下方是消息发送对话框。

下面是运行了一个服务端窗体,两个客户端窗体,并发送了若干条消息之后的截图:

▲使用命名管道,利用一个服务端两个客户端实现一对多,一对一通讯的例子

总结


本文简单介绍了进程间通讯的基本概念、使用场景以及一些常用的实现进程间通讯的方法。然后介绍了命名管道的基本概念和用法,再此基础之上,借鉴开源项目并改造了一个可用于生产环境的命名管道框架,最后以这个简单的框架实现了一个可以一对多和一对一通讯的演示项目,希望本文对了解在C#中使用命名管道来进行进程间通讯有所帮助。

 

参考: