端口转发在有些时候还是比较有用的,它能够在数据传输的过程中增加一个“路由”,提供了某种程度的灵活。本文简单介绍了工作中遇到的一个可能使用端口转发能解决的例子,以及如何使用端口转发,包括介绍了在Windows上的内置的端口转发工具,以及一些开源的端口转发程序,最后再简单介绍了如何使用C#实现一个端口转发工具。

场景


我在工作中遇到的一个场景:一个运行在生产环境的某台机器M1上有一个行情接收程序 S1 会将收到的数据发送到本机的某个端口 P1,然后在另外一台机器 L1 上的程序 L1 会从这台生产环境的机器M1 的端口 P1 读取数据。由于行情接收程序中会有一些计算比如委托列表,这些计算结果会实时保存在内存中,所以要求接收程序S1不能出现异常,否则会导致数据丢失从而使得行情计算出现错误。

然而在进行功能更新或者bug修复的时候,最好能够在生产环境上部署一份修改后的程序来验证修改的内容,没问题后才发布到正式生产环境。但由于条件限制,我们在生产环境只有一台机器M1,但是行情接收程序可以通过使用同一个账号的不同的SessionID运行多个实例。所以可以在原来的程序S1运行并监听端口 P1 的同时,运行一个修改后的程序 S2,并监听本地端口 P2。然后在端口P2上验证修改后的内容。

现在,假设修改后的程序S2没有问题,那怎么才能在S1和S2程序不重启的情况下,将S2的数据发送到端口P1上?另外,如果S2验证时出现异常,如何将正常的S1又切换回来把数据发到P1上?一般来说有两步:

  • 让S1不再往端口P1发送数据。
  • 运行一个端口转发工具,把S2程序发送到P2端口的数据转发到P1端口上。

这样可以保证程序在不重启的情况下,动态的将数据发送到需要的端口上。

实现


有一些方法可以实现端口转发,包括Windows上自带的端口转发工具,开源的端口转发软件pjs-passport,以及使用Socket编程自己实现一个简单的端口转发工具。举个列子,假设需要将本机192.168.6.200的61619端口的所有流量转发到端口61615上来,同时将61615的流量转发到61619上。下面以这个场景来一一介绍。

Windows内置的端口转发


 从 Windows 2000 开始,系统就内置了最基本的端口转发功能,这是基于 Windows 的 IP Helper 提供的服务 ( 确保服务里面名称为“IP Helper”的服务处于运行状态) ,不仅可以提供端口转发功能,还可以通过将 IPv4 和 IPv6 的不同地址的数据进行转发,但是只可以转发 TCP 协议,暂不支持 UDP 协议。可以使用 netsh interface portproxy 系列命令来进行端口转发操作。

▲ IP Helper 服务

以管理员身份运行命令行程序,运行以下命令查看、添加、删除转发。

查看当前所有转发:

netsh interface portproxy show all

添加转发,将192.168.6.200端口61619转发到端口61615 :

netsh interface portproxy add v4tov4 listenport=61619 listenaddress=192.168.6.200 connectport=61615 connectaddress=192.168.6.200 protocol=tcp

现在端口转发应该建立起来了,使用下面的各种工具来验证一下:

首先查看一下目前所有的端口转发。

C:\WINDOWS\system32>netsh interface portproxy show all

侦听 ipv4:                 连接到 ipv4:

地址            端口        地址            端口
--------------- ----------  --------------- ----------
192.168.6.200   61619       192.168.6.200   61615

运行一个连接61619的NewTw程序,和一个监听61615的DateFeedGateway的行情网关程序,然后查看监听61615和61619两个端口的所有应用程序:

C:\WINDOWS\system32>netstat -ano | findstr "61615"
  TCP    0.0.0.0:61615          0.0.0.0:0              LISTENING       26604
  TCP    192.168.6.200:52028    192.168.1.9:61615      ESTABLISHED     36820
  TCP    192.168.6.200:56566    192.168.6.200:61615    ESTABLISHED     3824
  TCP    192.168.6.200:61615    192.168.6.200:56566    ESTABLISHED     26604

C:\WINDOWS\system32>netstat -ano | findstr "61619"
  TCP    192.168.6.200:56563    192.168.6.200:61619    ESTABLISHED     34156
  TCP    192.168.6.200:61619    0.0.0.0:0              LISTENING       3824
  TCP    192.168.6.200:61619    192.168.6.200:56563    ESTABLISHED     3824

查看监听61615和61619端口的 pid为3824的应用程序:

C:\WINDOWS\system32>tasklist | findstr 3824
svchost.exe                   3824 Services                   0      9,700 K

可以看到是系统服务,该服务在61615和61619之间转发数据。

查看监听61619端口的pid为34156的应用程序:

C:\WINDOWS\system32>tasklist | findstr 34156
NewTW.exe                    34156 Console                    1    137,976 K

可以看到这个是我们的行情查看程序,它监听的是本地的61619端口。

查看监听61615端口的pid为26604的应用程序:

C:\WINDOWS\system32>tasklist | findstr 26604
DataFeedGateWay.exe          26604 Console                    1     91,760 K

这个程序是一个行情转发程序,它监听本机的61615端口,如果有订阅就往订阅的Socket发送行情服务。

可以看到名为NewTw的行情程序订阅了192.168.6.200端口为61619的数据源服务。而名为DateFeedGateway的行情源网关监听192.168.6.200的61615端口。如果没有61619到61615的端口转发,NewTw是无法连接到61619来获取行情源服务的。

现在端口转发连接了61619和61615。NewTw的请求先从61619发送到端口转接程序,进而将请求转发到了61615。DateFeedGateway的行情数据从61615端口发送到端口转发程序,进而转发到61619,使得NewTw能够接收数据。

重启NewTw和重启DateFeedGateway,可以发现端口转发程序依然能够正常工作。

删除转发:

netsh interface portproxy delete v4tov4 listenport=61619 listenaddress=192.168.6.200

可以看到Windows这个内置的端口转发工具还是非常简单可易用的。只是它没有提供一个UI界面,只能通过命令行的形式来操作,并且网上还说这个不太稳定,有人的做法是定时重启IP Helper服务。

开源程序 pjs-passport


pjs-passport是一个开源软件,源代码可以直接下载。它是免费开源的、图形界面的、支持TCP/UDP、配置简单,可配置多组映射规则,可作为 Windows 系统服务运行。安装该软件后,必须以管理员身份运行。首先点击“Define/View forward”,可以配置转发规则和协议。

▲ 转发规则和协议配置 

然后回到主界面,点击Start,即可运行。

▲ 主界面 

运行之后,再次查看监控端口:

C:\WINDOWS\system32>netstat -ano | findstr 61619
  TCP    192.168.6.200:61619    0.0.0.0:0              LISTENING       44560
  TCP    192.168.6.200:61619    192.168.6.200:61764    ESTABLISHED     44560
  TCP    192.168.6.200:61764    192.168.6.200:61619    ESTABLISHED     14876

C:\WINDOWS\system32>netstat -ano | findstr 61615
  TCP    0.0.0.0:61615          0.0.0.0:0              LISTENING       45988
  TCP    192.168.6.200:52028    192.168.1.9:61615      ESTABLISHED     36820
  TCP    192.168.6.200:61615    192.168.6.200:61772    ESTABLISHED     45988
  TCP    192.168.6.200:61772    192.168.6.200:61615    ESTABLISHED     44560

查看,同时监听61619和61615的PID为44560的应用:

C:\WINDOWS\system32>tasklist | findstr 44560
PassPort.exe                 44560 Services                   0     21,048 K

可以看到,正是PassPort应用。

同样重启监听两个端口的程序,发现程序也能正常运行。PassPort也可安装作为Windows服务运行。

▲ Psj 服务

它的所有规则是存储在安装目录下的PassPortConfig.xml 文件中的。

<?xml version="1.0" encoding="utf-8"?>
<PassPort>
	<Forward>
		<Source address="192.168.6.200" port="61619" />
		<Target address="192.168.6.200" port="61615" />
		<Protocol type="tcp" />
	</Forward>
</PassPort>

打开PassPort的源码后发现,它是使用C++ CLR来实现的,需要依赖.NET Framework 2.0,可能是使用起来绘制UI界面更加方便。所有的核心代码都在PortForwarder.cpp类中,该类的接口PortForwarder.h如下:

#pragma once

#include <winsock2.h>

using namespace System;
using namespace System::Collections::Generic;
using namespace System::Threading;

namespace PassPort {

public ref class PortForwarder
{
public:
	PortForwarder(String ^srcAddr, String ^srcPort, String ^trgAddr, String ^trgPort,String ^proto);
	~PortForwarder(void);
	void Run();
	static void Init();
	static void ShutDown();
	static List<int> ^oldThreads = gcnew List<int>;
	//typedef ^SortedDictionary<int,SOCKET> malacka;

	static SortedDictionary<u_long,SortedDictionary<u_short ,SOCKET>^> ^udp_hosts = gcnew SortedDictionary<u_long, SortedDictionary<u_short ,SOCKET>^ >;
	static Mutex^ udp_hosts_mut = gcnew Mutex;
	static HANDLE h_Shutdown_Event ;
private: 
	static List<Thread^> ^forwarders = gcnew List<Thread^>;	
	//used to signal PortForwarder object to shutdown, ThreadAbortException not working on blocking socket operations
private:
	String ^ srcAddr;
	String ^ srcPort;
	String ^ trgAddr;
	String ^ trgPort;
	String ^ proto;
};

}

在实现上,可以查看PortForwarder.cpp类,它是使用C++ Windows Socket的IO模型中的WSAEventSelect模型来实现的,具体这里不详细介绍了,可以看源代码和相关文档,需要注意的是这种模型在C#中没有。

C#实现端口转发


有了源码就可以进行更多的修改,使用Socket编程,就能实现一个简单的端口转发程序,比如下面这段来自simple-tcp-forwarder-in-csharp的代码,非常简洁:

public class TcpForwarderSlim
{
    private class State
    {
        public Socket SourceSocket { get; private set; }
        public Socket DestinationSocket { get; private set; }
        public byte[] Buffer { get; private set; }

        public State(Socket source, Socket destination)
        {
            SourceSocket = source;
            DestinationSocket = destination;
            Buffer = new byte[1024 * 8];
        }
    }

    private readonly Socket _mainSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    public void Start(IPEndPoint local, IPEndPoint remote)
    {
        _mainSocket.Bind(local);
        _mainSocket.Listen(10);
        while (true)
        {
            Socket source = null;
            try
            {
                source = _mainSocket.Accept();
                Console.WriteLine($"[{source.RemoteEndPoint.ToString()}] accept");
                TcpForwarderSlim destination = new TcpForwarderSlim();
                State state = new State(source, destination._mainSocket);
                destination.Connect(remote, source);
                source.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, OnDataReceive, state);
            }
            catch (Exception ex)
            {
                source?.Close();
            }
        }
    }
    private void Connect(EndPoint remoteEndPoint, Socket destination)
    {
        State state = new State(_mainSocket, destination);
        try
        {
            _mainSocket.Connect(remoteEndPoint);
            _mainSocket.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, OnDataReceive, state);
        }
        catch (Exception e)
        {
            state.DestinationSocket.Close();
            state.SourceSocket.Close();
        }
    }
    private void OnDataReceive(IAsyncResult result)
    {
        State state = (State)result.AsyncState;
        try
        {
            int bytesRead = state.SourceSocket.EndReceive(result);
            if (bytesRead > 0)
            {
                state.DestinationSocket.Send(state.Buffer, bytesRead, SocketFlags.None);
                state.SourceSocket.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, OnDataReceive, state);
            }
        }
        catch (Exception e)
        {
            state.DestinationSocket.Close();
            state.SourceSocket.Close();
        }
    }
}

代码非常简洁,使用起来也很方便,比如:

new TcpForwarderSlim().Start(new IPEndPoint(“192.168.6.200”, 61619), new IPEndPoint(“192.168.6.200”, 61615));

在Start中,先使用Accept阻塞的方法连接Source,得到一个SourceSocket,然后再另外启动一个实例,发起另外一个阻塞的Connect方法连接Destination,并将连接Source和Destination的两个Socket放到State中进行保存。同时在异步的BeginReceive中,使用了State的Buffer数组来保存接收的数据,并且使用State对象来保存源和目标Socket的相关信息以便在回调方法中使用。

Source和Destination的BeginReceive方法的回调事件OnDateReceive事件中,首先获取源Socket收到的字节数,然后将收到的数据发送到目标Destination的Socket中,同时继续调用BeginRecieve继续接收数据。在Start方法和Connect方法中,可以看到是初始化了两个State对象的,所以在OnDateRecive事件中Destination和Source的角色是相互的。

上面的每个TcpForwarderSlim代表一个端口转发,在实际过程中我们可以管理多个实例,每个实例运行一个Task,然后再Task里面调用该实例的Start方法,可以在这个基础上,添加UI界面,以实现一个类似psj-passport的应用相信也不难,并且还能很方便的添加一些状态显示,比如Source和Destination的当前连接状态,转发的总流量和速率等等。

上面是一个端口转发的实例,它巧妙的利用了端口转发两端互为源和目标的特点,并且利用了异步的BeginReceive方法简化了代码,代码非常简洁。同样也可以使用同步的方法来实现一个端口转发,比如这个TcpForwardingProxy来自的例子(源代码有bug,下面代码有修复)。

public class TCPForwarder
{
    public class ProxyEdge
    {
        private string label;
        private Socket source;
        private Socket target;
        private SemaphoreSlim sem;
        private int bufSize;
        private static object locker = new object();
        public Action<int, string> LogAction;
        private void Log(int level, string m)
        {
            if (LogAction != null)
            {
                LogAction(level, m);
            }
        }

        public ProxyEdge(string label, Socket source, Socket target, SemaphoreSlim sem, int bufSize)
        {
            this.label = label;
            this.source = source;
            this.target = target;
            this.sem = sem;
            this.bufSize = bufSize;
        }

        internal void Start()
        {
            Log(1, $"{label} waiting for lock.");
            sem.Wait();
            Log(1, $"{label} begin run.");
            new Thread(() => Run()).Start();
        }

        private void Run()
        {
            while (source.Connected)
            {
                byte[] bytes = new byte[bufSize];
                int bytesRec = 0;
                try
                {
                    bytesRec = source.Receive(bytes);
                }
                catch (Exception)
                {
                    Log(1, $"{label} source recieved error.");
                    CloseSocket(source);
                }
                if (bytesRec > 0)
                {
                    if (target.Connected)
                    {
                        Log(1, $"{label} forwarding {bytesRec} bytes");
                        target.Send(bytes, bytesRec, SocketFlags.None);
                        Log(1, $"{label} sent {bytesRec} bytes");
                    }
                    else
                    {
                        Log(1, $"{label} target down. {bytesRec} bytes discarded");
                        CloseSocket(source);
                        break;
                    }
                }
                else
                {
                    Log(1, $"{label} source recieved 0. {bytesRec} bytes discarded");
                    CloseSocket(source);
                    break;
                }
            }
            Log(1, $"{label} source disconnected");
            if (target.Connected)
            {
                Log(1, $"{label} target shutdown");
                CloseSocket(target);
            }
            Log(1, $"{label} release lock");
            sem.Release();
        }

        private void CloseSocket(Socket so)
        {
            lock (locker)
            {
                try
                {
                    if (so != null && so.Connected)
                    {
                        so.Close();
                        so = null;
                    }
                }
                catch (Exception e)
                {
                    Log(1, $"{label} close socket error:{e.Message}");
                }
            }

        }
    }

    private ProxyEdge listener;
    private ProxyEdge forwarder;

    public Action<int, string> LogAction;
    private void Log(int level, string m)
    {
        if (LogAction != null)
        {
            LogAction(level, m);
        }
    }
    public void Start(IPEndPoint local, IPEndPoint remote)
    {
        Socket socketListener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socketListener.Bind(local);
        Log(1, "Start listening");
        socketListener.Listen(10);
        while (true)
        {
            try
            {
                using (Socket target = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    Log(1, "Client connect target");
                    target.Connect(remote);
                    Log(1, "Waiting for listener connect");
                    using (Socket source = socketListener.Accept())
                    {
                        var sem = new SemaphoreSlim(2);
                        listener = new ProxyEdge("Listener", source, target, sem, 2048);
                        forwarder = new ProxyEdge("Forwarder", target, source, sem, 2048);
                        listener.LogAction = LogAction;
                        forwarder.LogAction = LogAction;
                        Log(1, "Starting edges");
                        listener.Start();
                        forwarder.Start();
                        sem.Wait();
                    }
                }
            }
            catch (Exception ex)
            {
                Log(1, ex.ToString());
            }
            Thread.Sleep(1000);
        }
    }
}

使用方法也很简单:

new TCPForwarder().Start(new IPEndPoint(“192.168.6.200”, 61619), new IPEndPoint(“192.168.6.200”, 61615));

上面这个代码里面都是调用的Socket的同步方法,在Start方法中,它依次连接Source和Destination两端,连接建立之后,调用ProxyEdge实例,来实现将Souce的数据转发到Destination中。可以看到端口转发的两端互为源和目标,所以实例化了两个ProxyEdge方法。

在ProxyEdge方法中,也是先从source接收数据,然后向target发送数据。需要注意的是,这里当发生异常时,关闭Socket的时候一定要加锁,因为source和target这两个socket在两个ProxyEdge实例中公用的。

上面两个使用C#进行端口转发的示例代码都进行了与之前一样的测试,包括关闭重启监听两个端口的应用程序,重启转发程序,都能恢复正常的转发功能。

总结


端口转发能够在数据传输的过程中增加一个“路由”,提供了某种程度的灵活,在有些场景下能够提高应用程序的灵活性。本文简单介绍了工作中遇到的一个可能使用端口转发能解决的例子,并介绍了一些实施端口转发的方法,比如在Windows上的内置的端口转发工具,一些开源的端口转发程序如psj-passort,最后再简单介绍了如何使用C#实现一个端口转发工具,这其中又包括了使用Socket异步和同步方法来实现端口转发的方法。

 

 参考