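In an earlier article, Several Ways to Implement Port Forwarding, I introduced three ways to implement port forwarding. All of them essentially build a TCP relay.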

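A complete TCP forwarder or proxy (TCP relay), however, has many details to get right. In the book 《Linux多线程服务端编程:使用muduo C++网络库》 (Linux Multithreaded Server Programming: Using the muduo C++ Network Library), the author lists the following issues: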

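  1. Establishing connections. TcpRelay opens connection S to the server only after it has accepted connection C from the client. What should it do with data that arrives on C before S is established? Should that data be buffered?
  2. Managing concurrent connections. A TcpRelay is not limited to one client; it can serve many clients at once. How should the concurrent connections on both sides be managed, and how can cross talk be prevented?
  3. Disconnection. Either the client or the server may close its connection first. When the client closes C, TcpRelay should close S immediately; when the server closes S, TcpRelay should close C immediately. If, at the very moment of closing, a new client connects and reuses the fd number that was just closed, could that cause cross talk? And how should TcpRelay cope if the client and the server disconnect almost simultaneously?
  4. Speed mismatch. Suppose connection C has a bandwidth of 100 kB/s while connection S has 10 MB/s. If the server sends at full speed, will TcpRelay's buffers blow up? How should the rate be limited?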

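Implementing TcpRelay with traditional multithreaded blocking I/O is not hard, and it solves the speed-mismatch problem automatically (a thread blocked in a write does not read any more data), but its concurrency is limited. A TCP relay, on the other hand, inherently needs high performance: it should add little overhead compared with a direct connection. This article implements a TCP relay in .NET 8 that meets these goals.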
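The heart of a relay session can be sketched as two asynchronous copy loops, one per direction. The block below is a minimal sketch under stated assumptions, not the article's TCPRelaySession: `RelayPump.PumpAsync` is a hypothetical helper that works on any `Stream` (for sockets, a `NetworkStream`). The next `ReadAsync` is issued only after the previous `WriteAsync` completes, so a slow receiver throttles a fast sender through TCP flow control. This gives the same protection against speed mismatch as blocking I/O, without dedicating a thread to each direction.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not the article's TCPRelaySession.
public static class RelayPump
{
    // Copies bytes from source to destination until the source reports end of stream
    // (ReadAsync returns 0) or the token is cancelled. Returns the number of bytes copied.
    public static async Task<long> PumpAsync(
        Stream source, Stream destination, int bufferSize, CancellationToken token)
    {
        var buffer = new byte[bufferSize];
        long total = 0;
        while (true)
        {
            int read = await source.ReadAsync(buffer.AsMemory(0, bufferSize), token);
            if (read == 0)
                break; // the peer has finished sending (FIN for a NetworkStream)

            // The next read is issued only after this write completes, so a slow
            // destination throttles the source instead of growing an unbounded buffer.
            await destination.WriteAsync(buffer.AsMemory(0, read), token);
            total += read;
        }
        await destination.FlushAsync(token);
        return total;
    }
}
```

In a session you would run `PumpAsync(clientStream, serverStream, ...)` and `PumpAsync(serverStream, clientStream, ...)` concurrently. When one direction reaches EOF, call `Shutdown(SocketShutdown.Send)` on the opposite socket, or close both sockets, so the disconnect propagates. Each session holds its own `Socket` objects rather than raw descriptor numbers, and their `SafeHandle` guards against handle recycling. A reused fd number therefore should not route bytes into the wrong session.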

1. Goals


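I built this tool to provide load balancing and failover for backend services. The forwarder supports both TCP and UDP (this article covers only TCP). Besides addressing the concerns above, it offers the following features: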

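  1. High-performance asynchronous I/O: built on async/await and the asynchronous Socket APIs for high concurrency with low resource usage.
  2. TCP multi-server failover: multiple backend servers can be configured, with random server selection and automatic failover.
  3. In-session TCP failover: if the current backend server disconnects during a relay session, the relay automatically tries the next available server in the list while the client connection stays open.
  4. Connection robustness: TCP keep-alives and an optional application-level timeout detect and clean up "zombie" connections.
  5. Real-time traffic and rate monitoring: a WinForms UI shows each relay's aggregate traffic and bytes-per-second rates, plus detailed statistics for every active connection.
  6. Flexible deployment: runs in the background as a Windows service, or as a desktop WinForms/console application for debugging and management.
  7. Configuration and logging: configuration is saved to and loaded from a JSON file, and log4net provides detailed, configurable logging.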
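Features 2 and 3 come down to choosing an order in which to try the enabled backends. The sketch below shows one possible approach. It is an assumption, not code taken from the article's TcpRelayInstance: start at a random enabled server and walk the rest of the list with wrap-around. New sessions are spread randomly, and every enabled server is still tried once. `Backend` and `FailoverOrder` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for RemoteServerEndpoint.
public record Backend(string Host, int Port, bool IsEnabled);

// Hypothetical helper; the article's TcpRelayInstance may order servers differently.
public static class FailoverOrder
{
    // Returns the enabled backends, starting at a random one and wrapping around the list,
    // so new sessions are spread randomly while every enabled server is still tried once.
    public static List<Backend> GetCandidates(IReadOnlyList<Backend> all, Random rng)
    {
        var enabled = all.Where(b => b.IsEnabled).ToList();
        var ordered = new List<Backend>(enabled.Count);
        if (enabled.Count == 0)
            return ordered;

        int start = rng.Next(enabled.Count); // 0 <= start < Count
        for (int i = 0; i < enabled.Count; i++)
            ordered.Add(enabled[(start + i) % enabled.Count]);
        return ordered;
    }
}
```

A session would connect to the first candidate that accepts the connection. If that backend drops mid-session, the session moves on to the next candidate. Bytes already delivered to the failed backend are not replayed. In-session failover is therefore transparent only for protocols where the client can tolerate switching to a fresh backend mid-stream.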

2. Project Structure, Configuration, and Management


2.1 Project Structure


To keep the code clear and maintainable, the solution is split into three projects:

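  • TcpUdpRelay.Core: a .NET 8 class library and the core of the whole system. It contains all relay logic, the configuration models (RelayConfig, RemoteServerEndpoint), network session management (TCPRelaySession), and logging and formatting utilities.
  • TcpUdpRelay.UI: a WinForms application that references TcpUdpRelay.Core. It lets users watch relay status, active connections, and traffic rates in real time, and provides an editor (RelayEditorForm) for adding, modifying, or deleting entries in relay-config.json.
  • TcpUdpRelay.Service: a .NET 8 Windows service project that also references TcpUdpRelay.Core. It uses Microsoft.Extensions.Hosting to register and run the core RelayManager, so the relays run reliably in the background and service start and stop are handled correctly.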

2.2 Configuration and Management


2.2.1 JSON Configuration

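Relay rules are not hard-coded. They are read from relay-config.json, which lets the program handle more complex requirements, multi-server failover in particular. First comes the server endpoint class, RemoteServerEndpoint.cs: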

public class RemoteServerEndpoint
{
    /// <summary>
    /// Hostname or IP address of the remote server.
    /// </summary>
    public string Host { get; set; } = string.Empty;

    /// <summary>
    /// Port number of the remote server.
    /// </summary>
    public int Port { get; set; }

    /// <summary>
    /// Indicates whether this specific server endpoint is enabled and can be used by the relay.
    /// </summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>
    /// Unique identifier for this server entry, primarily for UI management (e.g., in a list editor).
    /// </summary>
    public Guid Id { get; set; } = Guid.NewGuid();

    /// <summary>
    /// Returns a string representation of the server endpoint.
    /// </summary>
    public override string ToString() => $"{Host}:{Port} ({(IsEnabled ? "Enabled" : "Disabled")})";

    /// <summary>
    /// Default constructor for JSON deserialization and UI-driven creation.
    /// </summary>
    public RemoteServerEndpoint() { }

    /// <summary>
    /// Initializes a new instance of the <see cref="RemoteServerEndpoint"/> class with specified details.
    /// </summary>
    /// <param name="host">The server host.</param>
    /// <param name="port">The server port.</param>
    /// <param name="isEnabled">Whether the server is enabled.</param>
    public RemoteServerEndpoint(string host, int port, bool isEnabled = true)
    {
        Host = host;
        Port = port;
        IsEnabled = isEnabled;
    }

    // Equality members for comparing instances, e.g., if used in collections or for change detection.
    public override bool Equals(object? obj)
    {
        return obj is RemoteServerEndpoint endpoint && Host == endpoint.Host &&
               Port == endpoint.Port && IsEnabled == endpoint.IsEnabled &&
               Id.Equals(endpoint.Id); // Compare by ID for uniqueness in lists
    }

    public override int GetHashCode()
    {
        // Use a combination of properties that define uniqueness or essential state.
        // Id is a good candidate for hash code generation if it's guaranteed unique.
        return HashCode.Combine(Host, Port, IsEnabled, Id);
    }
}

Next is the complete relay configuration, RelayConfig.cs, which holds a list of RemoteServerEndpoint objects:

public class RelayConfig
{
    /// <summary>
    /// Unique identifier for this relay configuration.
    /// </summary>
    public Guid Id { get; set; } = Guid.NewGuid();

    /// <summary>
    /// User-friendly name for this relay configuration.
    /// </summary>
    public string Name { get; set; } = "New Relay";

    /// <summary>
    /// Network protocol to use for this relay (TCP or UDP).
    /// </summary>
    public ProtocolType Protocol { get; set; } = ProtocolType.Tcp;

    /// <summary>
    /// Local IP address to bind the listener to. "0.0.0.0" listens on all available network interfaces.
    /// </summary>
    public string LocalIpAddress { get; set; } = "0.0.0.0";

    /// <summary>
    /// Local port number for the listener.
    /// </summary>
    public int LocalPort { get; set; }

    /// <summary>
    /// List of potential remote server endpoints. For TCP, this list is used for random selection and failover.
    /// For UDP, currently, the first enabled server in this list is used.
    /// </summary>
    public List<RemoteServerEndpoint> RemoteServers { get; set; } = new List<RemoteServerEndpoint>();

    /// <summary>
    /// Indicates whether this entire relay configuration is enabled and should be started.
    /// </summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>
    /// Maximum number of bytes to buffer for sending to a slower peer (client or server)
    /// before the connection is considered overwhelmed and potentially closed (backpressure mechanism).
    /// </summary>
    public long MaxSendBufferBytes { get; set; } = 10 * 1024 * 1024; // 10 MB default

    /// <summary>
    /// Application-level inactivity timeout in seconds for client connections.
    /// If 0 or less, this timeout is disabled, relying on TCP keep-alives.
    /// </summary>
    public int ClientInactivityTimeoutSeconds { get; set; } = 0; // Default to disabled

    // --- Runtime Status Properties ---
    // These are not saved to the JSON configuration file but are updated at runtime.
    // Public setters are used here for easier updates by the RelayManager and instance.

    /// <summary>
    /// [Runtime] True if this relay instance is currently running (listening for connections).
    /// </summary>
    [JsonIgnore] public bool IsRunning { get; set; }

    /// <summary>
    /// [Runtime] Total bytes forwarded from clients to servers for this relay instance.
    /// </summary>
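    [JsonIgnore] public long TotalBytesForwardedCtoS { get; set; }

    /// <summary>
    /// [Runtime] Current client-to-server rate in bytes per second.
    /// </summary>
    [JsonIgnore] public double RateCtoS { get; set; }

    /// <summary>
    /// [Runtime] Total bytes forwarded from servers to clients for this relay instance.
    /// </summary>
    [JsonIgnore] public long TotalBytesForwardedStoC { get; set; }

    /// <summary>
    /// [Runtime] Current server-to-client rate in bytes per second.
    /// </summary>
    [JsonIgnore] public double RateStoC { get; set; }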

    /// <summary>
    /// [Runtime] Number of currently active client connections being relayed.
    /// </summary>
    [JsonIgnore] public int ActiveConnections { get; set; }

    /// <summary>
    /// [Runtime] Message of the last significant error encountered by this relay instance.
    /// </summary>
    [JsonIgnore] public string? LastError { get; set; }


    // --- Display Properties for UI Binding ---
    // These are calculated properties not stored in JSON, for convenient display in UI.

    /// <summary>
    /// [Display] Combined string for the local endpoint (e.g., "0.0.0.0:7001").
    /// </summary>
    [JsonIgnore] public string LocalEndpointDisplay => $"{LocalIpAddress}:{LocalPort}";

    /// <summary>
    /// [Display] Combined string for the remote endpoint(s).
    /// Shows the first enabled server, or indicates if multiple servers are configured.
    /// </summary>
    [JsonIgnore]
    public string RemoteEndpointDisplay
    {
        get
        {
            var enabledServers = RemoteServers?.Where(s => s.IsEnabled).ToList();
            if (enabledServers == null || !enabledServers.Any()) return "No enabled remote servers";

            var firstEnabled = enabledServers.First();
            if (enabledServers.Count > 1)
            {
                return $"{firstEnabled.Host}:{firstEnabled.Port} (+{enabledServers.Count - 1} others)";
            }
            return $"{firstEnabled.Host}:{firstEnabled.Port}";
        }
    }

    // Formatted strings for UI binding
    [JsonIgnore] public string TotalBytesCtoSDisplay => FormatUtils.FormatBytes(TotalBytesForwardedCtoS);
    [JsonIgnore] public string TotalBytesStoCDisplay => FormatUtils.FormatBytes(TotalBytesForwardedStoC);
    [JsonIgnore] public string RateCtoSDisplay => FormatUtils.FormatBytesRate(RateCtoS);
    [JsonIgnore] public string RateStoCDisplay => FormatUtils.FormatBytesRate(RateStoC);
}

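Many properties carry the [JsonIgnore] attribute, which tells the JSON serializer not to save them: they hold runtime status and UI display values rather than configuration.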
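To see the effect of [JsonIgnore], the sketch below round-trips a trimmed-down stand-in for RelayConfig. It assumes the attribute comes from System.Text.Json.Serialization; the article does not say whether System.Text.Json or Newtonsoft.Json is used. `MiniRelayConfig` and `ConfigJson` are hypothetical names, not the project's ConfigManager.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical, trimmed-down stand-in for RelayConfig (assumes System.Text.Json's [JsonIgnore]).
public class MiniRelayConfig
{
    public string Name { get; set; } = "New Relay";
    public int LocalPort { get; set; }

    [JsonIgnore] public bool IsRunning { get; set; }               // runtime only, never saved
    [JsonIgnore] public long TotalBytesForwardedCtoS { get; set; } // runtime only, never saved
    [JsonIgnore] public string LocalEndpointDisplay => $"0.0.0.0:{LocalPort}"; // computed for the UI
}

// Hypothetical load/save helper; the project's ConfigManager may differ.
public static class ConfigJson
{
    private static readonly JsonSerializerOptions Options = new() { WriteIndented = true };

    public static string Save(List<MiniRelayConfig> configs) =>
        JsonSerializer.Serialize(configs, Options);

    public static List<MiniRelayConfig> Load(string json) =>
        JsonSerializer.Deserialize<List<MiniRelayConfig>>(json, Options) ?? new List<MiniRelayConfig>();
}
```

Two System.Text.Json defaults matter here. A getter-only property such as LocalEndpointDisplay would be written to the file if it were not marked [JsonIgnore]. An enum property such as Protocol (ProtocolType) is written as a number unless a JsonStringEnumConverter is added to the options.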

2.2.2 Relay Manager

The relay manager, RelayManager, manages and schedules the core service. It is registered as a singleton and is responsible for:

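  1. Loading relay-config.json at startup.
  2. Creating and managing one IRelayInstance (either a TcpRelayInstance or a UdpRelayInstance) for each relay in the configuration list.
  3. Handling configuration updates: the UpdateConfigs method dynamically starts, stops, or restarts relay instances in response to configuration changes.
  4. Aggregating status: it is the entry point through which the UI queries aggregate status and active sessions.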

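The change detection behind point 3 is plain set arithmetic on configuration IDs, as done at the top of UpdateConfigs. Here it is isolated into a hypothetical helper; `ConfigDiff` is not part of the project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper mirroring the set logic at the top of RelayManager.UpdateConfigs.
public static class ConfigDiff
{
    public static (List<Guid> Remove, List<Guid> Add, List<Guid> Common) Compute(
        IEnumerable<Guid> currentIds, IEnumerable<Guid> newIds)
    {
        var current = currentIds.ToHashSet();
        var next = newIds.ToHashSet();

        var remove = current.Except(next).ToList();    // existed before, gone now -> stop
        var add = next.Except(current).ToList();       // new entries -> start if enabled
        var common = current.Intersect(next).ToList(); // in both -> restart only if needed
        return (remove, add, common);
    }
}
```

IDs present in both lists are not restarted blindly. NeedsRestart compares the listener, backend, and buffer settings, so a rename alone leaves running sessions untouched.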
IRelayInstance is defined as follows:

public interface IRelayInstance : IDisposable
{
    Guid Id { get; }
    Task StartAsync(CancellationToken cancellationToken);
    // Stop is implicitly handled by Dispose()

    /// <summary>
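    /// Gets the current aggregated status of this instance, including computed rates.
    /// This method is designed to be polled periodically by the manager.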
    /// </summary>
    RelayStatusUpdate GetCurrentAggregatedStatus();

    /// <summary>
    /// Gets a list of details for all active sessions of this instance.
    /// </summary>
    List<ActiveSessionInfo> GetActiveSessionDetails();
}

public class RelayStatusUpdate
{
    public bool IsRunning { get; set; }
    public int ActiveConnections { get; set; }
    public long TotalBytesForwardedCtoS { get; set; }
    public long TotalBytesForwardedStoC { get; set; }
    public double RateCtoS { get; set; }
    public double RateStoC { get; set; }
    public string? LastError { get; set; }
}

public class ActiveSessionInfo
{
    /// <summary>
    /// Unique identifier for this specific session instance.
    /// </summary>
    public Guid SessionId { get; set; }

    /// <summary>
    /// Identifier of the parent RelayConfig this session belongs to.
    /// </summary>
    public Guid ParentRelayConfigId { get; set; }

    /// <summary>
    /// Protocol used by this session (e.g., "Tcp", "Udp").
    /// </summary>
    public string Protocol { get; set; } = string.Empty;

    /// <summary>
    /// The remote endpoint address and port of the connected client.
    /// </summary>
    public string ClientRemoteEndPoint { get; set; } = "N/A";

    /// <summary>
    /// The remote endpoint address and port of the currently connected backend server.
    /// May show "Connecting..." or "Disconnected" if not actively relayed.
    /// </summary>
    public string ServerRemoteEndPoint { get; set; } = "N/A";

    /// <summary>
    /// Total bytes forwarded from the client to the server for this session.
    /// </summary>
    public long BytesForwardedCtoS { get; set; }
    public double RateCtoS { get; set; }
    /// <summary>
    /// Total bytes forwarded from the server to the client for this session.
    /// </summary>
    public long BytesForwardedStoC { get; set; }
    public double RateStoC { get; set; }

    /// <summary>
    /// The UTC time when this session was established (client connected).
    /// </summary>
    public DateTime ConnectedTimeUtc { get; set; }

    /// <summary>
    /// Calculated current duration of the session.
    /// </summary>
    public TimeSpan Duration => DateTime.UtcNow - ConnectedTimeUtc;

    // --- Display Helper Properties for DataGridView Binding ---

    /// <summary>
    /// [Display] Formatted duration string (hh:mm:ss).
    /// </summary>
    public string DurationDisplay => $"{(int)Duration.TotalHours:00}:{Duration.Minutes:00}:{Duration.Seconds:00}";

    /// <summary>
    /// [Display] Human-readable C->S byte count (via FormatUtils.FormatBytes).
    /// </summary>
    public string BytesCtoSDisplay => FormatUtils.FormatBytes(BytesForwardedCtoS);

    /// <summary>
    /// [Display] Human-readable S->C byte count (via FormatUtils.FormatBytes).
    /// </summary>
    public string BytesStoCDisplay => FormatUtils.FormatBytes(BytesForwardedStoC);

    public string RateCtoSDisplay => FormatUtils.FormatBytesRate(RateCtoS);
    public string RateStoCDisplay => FormatUtils.FormatBytesRate(RateStoC);

    /// <summary>
    /// [Display] Formatted connection time in local time.
    /// </summary>
    public string ConnectedTimeDisplay => ConnectedTimeUtc.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss");
}
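GetCurrentAggregatedStatus is meant to compute rates when the manager polls it periodically. The TcpRelayInstance implementation is not shown, so the following is an assumption. The I/O path updates a monotonically increasing byte counter with Interlocked, and bytes per second are derived from the difference between two polls. `TrafficCounter` is a hypothetical name.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of how an instance could derive bytes/second for GetCurrentAggregatedStatus.
public sealed class TrafficCounter
{
    private long _total;            // updated concurrently by the pump tasks
    private long _lastSampleTotal;  // only touched by the single poller
    private DateTime _lastSampleUtc;

    public TrafficCounter(DateTime startUtc) => _lastSampleUtc = startUtc;

    public long Total => Interlocked.Read(ref _total);

    // Called from the I/O path after each successful write; lock-free.
    public void Add(long bytes) => Interlocked.Add(ref _total, bytes);

    // Called by one poller (e.g. a UI timer); returns bytes/second since the previous sample.
    public double SampleRate(DateTime nowUtc)
    {
        long total = Interlocked.Read(ref _total);
        double seconds = (nowUtc - _lastSampleUtc).TotalSeconds;
        double rate = seconds > 0 ? (total - _lastSampleTotal) / seconds : 0;
        _lastSampleTotal = total;
        _lastSampleUtc = nowUtc;
        return rate;
    }
}
```

SampleRate assumes a single caller. The I/O tasks only call Add, so the hot path never takes a lock. One counter per direction and per session could be summed to produce the relay-level totals in RelayStatusUpdate.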

Here is the RelayManager code:

public class RelayManager : IDisposable
{
    private static readonly ILog Log = LogManager.GetLogger(typeof(RelayManager));
    // Dictionary to hold active relay instances, keyed by their configuration ID.
    private readonly ConcurrentDictionary<Guid, IRelayInstance> _activeRelays = new();
    // Current list of all relay configurations loaded.
    private List<RelayConfig> _configs;
    // Master cancellation token source for the RelayManager itself.
    private readonly CancellationTokenSource _managerCts = new();

    /// <summary>
    /// Delegate for handling status updates from individual relay instances.
    /// </summary>
    public delegate void RelayStatusUpdateHandler(Guid id, RelayStatusUpdate status);
    /// <summary>
    /// Event triggered when the status of a relay instance changes.
    /// </summary>
    public event RelayStatusUpdateHandler? OnStatusUpdate;

    /// <summary>
    /// Initializes a new instance of the RelayManager.
    /// </summary>
    /// <param name="initialConfigs">The initial list of relay configurations to manage.</param>
    public RelayManager(List<RelayConfig> initialConfigs)
    {
        LogConfigurator.Configure(); // Ensure logging is set up.
        _configs = initialConfigs ?? new List<RelayConfig>();
        Log.Info($"RelayManager initialized with {_configs.Count} configurations.");
        // Reset runtime status fields for all loaded configs.
        _configs.ForEach(ResetRuntimeStatus);
    }

    /// <summary>
    /// Resets runtime status fields of a RelayConfig object.
    /// </summary>
    private void ResetRuntimeStatus(RelayConfig c)
    {
        c.IsRunning = false;
        c.ActiveConnections = 0;
        c.TotalBytesForwardedCtoS = 0;
        c.TotalBytesForwardedStoC = 0;
        c.LastError = null;
        c.RateCtoS = 0;
        c.RateStoC = 0;
    }

    /// <summary>
    /// Updates the managed relay configurations.
    /// It compares the new list with the current one to determine which relays to add, remove, or update (restart).
    /// </summary>
    /// <param name="newConfigs">The new complete list of relay configurations.</param>
    public void UpdateConfigs(List<RelayConfig> newConfigs)
    {
        if (_managerCts.IsCancellationRequested)
        {
            Log.Warn("UpdateConfigs called on a disposing RelayManager. Ignoring.");
            return;
        }

        Log.Info($"Updating configurations. Old count: {_configs.Count}, New count: {newConfigs.Count}");
        var currentConfigMap = _configs.ToDictionary(c => c.Id);
        var newConfigMap = newConfigs.ToDictionary(c => c.Id);

        // Determine changes
        var idsToRemove = currentConfigMap.Keys.Except(newConfigMap.Keys).ToList();
        var idsToAdd = newConfigMap.Keys.Except(currentConfigMap.Keys).ToList();
        var idsToPotentiallyUpdate = currentConfigMap.Keys.Intersect(newConfigMap.Keys).ToList();

        // Process removals
        foreach (var idToRemove in idsToRemove)
        {
            StopRelay(idToRemove); // Stop and remove from active relays
            Log.Info($"Configuration removed: {currentConfigMap[idToRemove].Name} ({idToRemove})");
        }

        // Process updates to existing configurations
        foreach (var idToUpdate in idsToPotentiallyUpdate)
        {
            var oldConfig = currentConfigMap[idToUpdate];
            var newConfig = newConfigMap[idToUpdate];
            bool needsRestart = NeedsRestart(oldConfig, newConfig);
            bool currentlyRunning = _activeRelays.ContainsKey(idToUpdate);

            // If it's running and needs to stop (either disabled or config change requires restart)
            if (currentlyRunning && (!newConfig.IsEnabled || needsRestart))
            {
                Log.Info($"Stopping relay for update/disable: {newConfig.Name} ({idToUpdate})");
                StopRelay(idToUpdate);
                currentlyRunning = false; // Reflect that it's no longer in _activeRelays
            }

            // If it needs to run (due to being enabled and either needing restart or wasn't running)
            if (newConfig.IsEnabled && (needsRestart || !currentlyRunning))
            {
                Log.Info($"Starting/Restarting relay due to config update: {newConfig.Name} ({idToUpdate})");
                StartRelay(newConfig);
            }
            else if (!newConfig.IsEnabled && !currentlyRunning)
            {
                // Ensure UI reflects disabled and not running state
                UpdateLocalConfigState(idToUpdate, false, 0, 0, 0, null);
            }
        }

        // Process additions
        foreach (var idToAdd in idsToAdd)
        {
            var configToAdd = newConfigMap[idToAdd];
            Log.Info($"New configuration added: {configToAdd.Name} ({idToAdd})");
            if (configToAdd.IsEnabled)
            {
                StartRelay(configToAdd);
            }
            else
            {
                UpdateLocalConfigState(configToAdd.Id, false, 0, 0, 0, null); // Reflect disabled state
            }
        }

        // Update the internal master list of configurations and save to file
        _configs = new List<RelayConfig>(newConfigs);
        ConfigManager.SaveConfig(_configs);
        Log.Info("Configuration update process finished.");
    }

    /// <summary>
    /// Determines if changes between an old and new configuration require a relay instance to be restarted.
    /// </summary>
    private bool NeedsRestart(RelayConfig oldConfig, RelayConfig newConfig)
    {
        // Core listening parameters
        if (oldConfig.LocalIpAddress != newConfig.LocalIpAddress ||
            oldConfig.LocalPort != newConfig.LocalPort ||
            oldConfig.Protocol != newConfig.Protocol ||
            oldConfig.IsEnabled != newConfig.IsEnabled) // If IsEnabled changed, treat as needing restart/stop/start
        {
            return true;
        }

        // Changes to backend server list (for TCP, this involves connection logic)
        if (!RemoteServersEqual(oldConfig.RemoteServers, newConfig.RemoteServers))
        {
            return true;
        }

        // Other critical parameters
        if (oldConfig.MaxSendBufferBytes != newConfig.MaxSendBufferBytes)
        {
            return true; // Buffer size change often best handled by restart for simplicity
        }
        return false;
    }

    /// <summary>
    /// Helper to compare two lists of RemoteServerEndpoint objects.
    /// Considers Host, Port, and IsEnabled status for equality. Order doesn't matter.
    /// </summary>
    private static bool RemoteServersEqual(List<RemoteServerEndpoint>? list1, List<RemoteServerEndpoint>? list2)
    {
        if (ReferenceEquals(list1, list2)) return true;
        if (list1 is null || list2 is null) return false; // One is null, other is not
        if (list1.Count != list2.Count) return false;

        // Compare based on content, ignoring order and ID (as ID is for UI editing)
        var set1 = list1.Select(s => (s.Host, s.Port, s.IsEnabled)).ToHashSet();
        var set2 = list2.Select(s => (s.Host, s.Port, s.IsEnabled)).ToHashSet();
        return set1.SetEquals(set2);
    }

    /// <summary>
    /// Starts all relay configurations that are marked as enabled.
    /// </summary>
    public void StartAllEnabledRelays()
    {
        if (_managerCts.IsCancellationRequested) return;
        Log.Info("Starting all enabled relays...");
        // Iterate a copy in case _configs is modified by StartRelay (shouldn't happen here)
        _configs.Where(c => c.IsEnabled).ToList().ForEach(StartRelay);
        Log.Info("Finished attempting to start enabled relays.");
    }

    /// <summary>
    /// Stops all currently active relay instances.
    /// </summary>
    public void StopAllRelays()
    {
        Log.Info("Stopping all active relays...");
        // Iterate a copy of keys as StopRelay modifies _activeRelays
        _activeRelays.Keys.ToList().ForEach(StopRelay);
        Log.Info("All active relays have been signalled to stop.");
    }

    /// <summary>
    /// Starts a single relay instance based on its configuration.
    /// </summary>
    /// <param name="config">The configuration for the relay to start.</param>
    public void StartRelay(RelayConfig config)
    {
        if (config == null) { Log.Warn("StartRelay called with null config."); return; } // Early exit
        if (_managerCts.IsCancellationRequested) { Log.Warn($"StartRelay for '{config.Name}' called on a disposing RelayManager."); return; } // Early exit

        if (!config.IsEnabled)
        {
            Log.Info($"Relay '{config.Name}' is disabled, skipping start.");
            UpdateLocalConfigState(config.Id, false, 0, 0, 0, "Disabled");
            return; // Early exit
        }

        if (_activeRelays.ContainsKey(config.Id))
        {
            Log.Warn($"Relay '{config.Name}' ({config.Id}) is already running or a start attempt is in progress.");
            // Optionally, refresh status here if needed, but generally, the instance should manage its own.
            return; // Early exit (or handle as "already started successfully")
        }

        Log.Info($"Attempting to start relay: {config.Name} ({config.Protocol} {config.LocalEndpointDisplay} -> {config.RemoteEndpointDisplay})");
        try
        {
            IRelayInstance relayInstance;
            switch (config.Protocol)
            {
                case ProtocolType.Tcp:
                    if (config.RemoteServers == null || !config.RemoteServers.Any(s => s.IsEnabled))
                    {
                        Log.Error($"[{config.Name}] TCP relay requires at least one enabled RemoteServer in its configuration.");
                        UpdateLocalConfigState(config.Id, false, 0, 0, 0, "No enabled remote server");
                        return; // Early exit
                    }
                    relayInstance = new TcpRelayInstance(config, HandleRelayStatusUpdate);
                    break;
                case ProtocolType.Udp:
                    if (config.RemoteServers == null || !config.RemoteServers.Any(s => s.IsEnabled && !string.IsNullOrEmpty(s.Host) && s.Port > 0))
                    {
                        Log.Error($"[{config.Name}] UDP relay requires at least one enabled RemoteServer with valid Host and Port.");
                        UpdateLocalConfigState(config.Id, false, 0, 0, 0, "Valid RemoteServer needed for UDP");
                        return; // Early exit
                    }
                    relayInstance = new UdpRelayInstance(config, HandleRelayStatusUpdate);
                    break;
                default:
                    Log.Error($"Unsupported protocol type: {config.Protocol} for relay {config.Name}");
                    UpdateLocalConfigState(config.Id, false, 0, 0, 0, $"Unsupported protocol: {config.Protocol}");
                    return; // Early exit
            }

            if (_activeRelays.TryAdd(config.Id, relayInstance))
            {
                Log.Debug($"[{config.Name}] Added relay instance to active dictionary. Initiating StartAsync task...");
                _ = Task.Run(async () => await relayInstance.StartAsync(_managerCts.Token))
                    .ContinueWith(t => HandleRelayInstanceStartTaskCompletion(t, config, relayInstance), TaskScheduler.Default);

                UpdateLocalConfigState(config.Id, true, 0, 0, 0, null); // Optimistically set running status
                Log.Info($"Relay '{config.Name}' ({config.Id}) added and start process initiated.");
            }
            else
            {
                Log.Warn($"[{config.Name}] Failed to add relay instance {config.Id} to dictionary (race condition? already added?). Disposing created instance.");
                relayInstance.Dispose();
                if (!_activeRelays.ContainsKey(config.Id))
                {
                    UpdateLocalConfigState(config.Id, false, 0, 0, 0, "Failed to add to active relay list");
                }
            }
        }
        catch (Exception ex) // Catch exceptions from creating the relayInstance object itself
        {
            Log.Error($"Exception creating relay instance for '{config.Name}' ({config.Id})", ex);
            UpdateLocalConfigState(config.Id, false, 0, 0, 0, $"Instance init failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Handles the completion of the StartAsync task of a relay instance.
    /// </summary>
    private void HandleRelayInstanceStartTaskCompletion(Task startTask, RelayConfig config, IRelayInstance instance)
    {
        if (_managerCts.IsCancellationRequested)
        {
            Log.Info($"[{config.Name}] StartAsync completion handled during manager shutdown. Ensuring instance disposal.");
            // If manager is shutting down, the instance might be disposed via StopAllRelays.
            // Explicitly trying to remove and dispose here can be redundant or cause issues if already handled.
            // The instance's StartAsync should respect cancellation.
            if (_activeRelays.TryRemove(config.Id, out var removedInstance))
            {
                removedInstance.Dispose();
            }
            else
            {
                // If not in activeRelays, it might have already been stopped/disposed or failed to add.
                // The instance passed might be the one that was created but failed to be added.
                instance.Dispose();
            }
            return;
        }

        if (startTask.IsFaulted)
        {
            Log.Error($"[{config.Name}] Relay instance StartAsync task faulted.", startTask.Exception?.GetBaseException());
            UpdateLocalConfigState(config.Id, false, 0, 0, 0, $"StartAsyncFail: {startTask.Exception?.GetBaseException()?.Message}");
            // Remove the failed instance from active relays and dispose it
            if (_activeRelays.TryRemove(config.Id, out var failedInstance))
            {
                failedInstance.Dispose();
            }
            else // If not found, dispose the instance passed (might be one that failed TryAdd but still ran Task.Run)
            {
                instance.Dispose();
            }
        }
        else if (startTask.IsCanceled)
        {
            Log.Warn($"[{config.Name}] Relay instance StartAsync task was cancelled.");
            UpdateLocalConfigState(config.Id, false, 0, 0, 0, "StartAsync Canceled");
            // Instance should have cleaned up itself. Remove from active if still there.
            if (_activeRelays.TryRemove(config.Id, out var cancelledInstance))
            {
                cancelledInstance.Dispose(); // Ensure it's disposed
            }
            else
            {
                instance.Dispose();
            }
        }
        else // Task completed successfully (meaning the StartAsync method ran to completion - it's a long-running task)
        {
            Log.Info($"[{config.Name}] Relay instance StartAsync method completed (likely means its main loop exited or was never entered due to early cancellation).");
            // The instance itself should call ReportStatus when it finally stops running.
            // If it's still in _activeRelays, it might mean it exited prematurely.
            // This path is unusual unless StartAsync has a very short lifecycle.
            // For long-running listeners, this means the listener stopped.
            if (_activeRelays.ContainsKey(config.Id))
            {
                Log.Warn($"[{config.Name}] StartAsync completed but instance still in activeRelays. Instance might have stopped prematurely.");
                // Status will be updated by the instance via HandleRelayStatusUpdate or by StopRelay
            }
        }
    }

    /// <summary>
    /// Stops a single relay instance by its configuration ID.
    /// </summary>
    /// <param name="id">The ID of the relay configuration to stop.</param>
    public void StopRelay(Guid id)
    {
        Log.Info($"Attempting to stop relay with ID: {id}");
        if (_activeRelays.TryRemove(id, out var relayInstance))
        {
            var config = _configs.FirstOrDefault(c => c.Id == id); // For logging name
            Log.Info($"Stopping relay: {config?.Name ?? "ID: " + id.ToString()}");
            try
            {
                relayInstance.Dispose(); // This should cancel its CancellationToken and stop operations.
                UpdateLocalConfigState(id, false, 0, 0, 0, null); // Mark as stopped
                Log.Info($"Relay '{config?.Name ?? id.ToString()}' stop initiated and removed from active list.");
                return;
            }
            catch (Exception ex)
            {
                Log.Error($"Error disposing relay '{config?.Name ?? id.ToString()}'", ex);
                UpdateLocalConfigState(id, false, 0, 0, 0, $"Stop failed: {ex.Message}");
                return;
            }
        }
        else
        {
            // Relay not found in active list, might be already stopped or never started.
            // Ensure its status in the main config list reflects it's not running.
            Log.Warn($"Relay with ID {id} not found in active relays. Cannot stop (may already be stopped).");
            UpdateLocalConfigState(id, false, 0, 0, 0, null); // Ensure status is non-running
            return; // Indicate it wasn't actively running to stop from this call.
        }
    }

    /// <summary>
    /// Callback for IRelayInstance to report status changes.
    /// </summary>
    private void HandleRelayStatusUpdate(Guid id, RelayStatusUpdate status)
    {
        // Update the local non-traffic status fields and raise the public event.
        // Traffic counters and rates are pulled separately via GetCurrentConfigsWithStatus().
        UpdateLocalConfigState(id, status.IsRunning, status.ActiveConnections, status.LastError);
    }

    /// <summary>
    /// Updates the non-traffic runtime status fields (IsRunning, ActiveConnections, LastError) of a
    /// specific RelayConfig in the internal list and raises OnStatusUpdate if any of them changed.
    /// </summary>
    private void UpdateLocalConfigState(Guid id, bool isRunning, int activeConnections, string? error)
    {
        var config = _configs.FirstOrDefault(c => c.Id == id);
        if (config != null)
        {
            bool changed = config.IsRunning != isRunning ||
                           config.ActiveConnections != activeConnections ||
                           config.LastError != error;

            if (changed)
            {
                config.IsRunning = isRunning;
                config.ActiveConnections = activeConnections;
                config.LastError = error;

                // Raise the external event with non-traffic data only
                OnStatusUpdate?.Invoke(id, new RelayStatusUpdate
                {
                    IsRunning = isRunning,
                    ActiveConnections = activeConnections,
                    LastError = error
                    // 流量/速率字段将通过 GetCurrentConfigsWithStatus() 拉取
                });
                Log.Debug($"Status updated (non-traffic): {config.Name} Run={isRunning}, Conn={activeConnections}, Err={error ?? "N/A"}");
            }
        }
        else { Log.Warn($"Status update for unknown ID: {id}"); }
    }

    // --- Overload of UpdateLocalConfigState used when StartRelay/StopRelay fails or stops a relay ---
    // (Only sets an error or stopped state; the traffic values passed here are reset values (0), not live data.)
    private void UpdateLocalConfigState(Guid id, bool isRunning, int activeConnections, long bytesCtoS, long bytesStoC, string? error)
    {
        // Delegate the non-traffic fields to the simplified overload.
        UpdateLocalConfigState(id, isRunning, activeConnections, error);

        // This is a reset call: overwrite both traffic counters and clear the rates.
        // (Resetting unconditionally avoids leaving a stale S->C total when only the C->S total was already 0.)
        var config = _configs.FirstOrDefault(c => c.Id == id);
        if (config != null)
        {
            config.TotalBytesForwardedCtoS = bytesCtoS; // expected to be 0
            config.TotalBytesForwardedStoC = bytesStoC; // expected to be 0
            config.RateCtoS = 0;
            config.RateStoC = 0;
        }
    }

    /// <summary>
    /// Gets a copy of the current list of relay configurations with their latest runtime status.
    /// </summary>
    public List<RelayConfig> GetCurrentConfigsWithStatus()
    {
        // Walk the internal _configs list and refresh each entry's runtime status
        foreach (var config in _configs)
        {
            if (_managerCts.IsCancellationRequested) break;

            if (_activeRelays.TryGetValue(config.Id, out var instance))
            {
                // Instance is running -> actively poll its latest status
                var status = instance.GetCurrentAggregatedStatus();

                // Copy the latest status onto the config object
                config.IsRunning = status.IsRunning;
                config.ActiveConnections = status.ActiveConnections;
                config.TotalBytesForwardedCtoS = status.TotalBytesForwardedCtoS;
                config.TotalBytesForwardedStoC = status.TotalBytesForwardedStoC;
                config.RateCtoS = status.RateCtoS; // refresh the C->S rate
                config.RateStoC = status.RateStoC; // refresh the S->C rate
                config.LastError = status.LastError;
            }
            else
            {
                // Instance is not in _activeRelays -> reset its runtime status.
                // IsEnabled is left unchanged here.
                ResetRuntimeStatus(config);
            }
        }

        // Return a shallow copy: a new list, but the RelayConfig objects inside are shared with _configs
        return _configs.ToList();
    }

    /// <summary>
    /// Gets detailed information about active sessions for a specific relay configuration.
    /// </summary>
    /// <param name="relayId">The ID of the relay configuration.</param>
    /// <returns>A list of ActiveSessionInfo objects, or an empty list if relay not found or has no sessions.</returns>
    public List<ActiveSessionInfo> GetActiveSessionsForRelay(Guid relayId)
    {
        if (_activeRelays.TryGetValue(relayId, out var relayInstance))
        {
            return relayInstance.GetActiveSessionDetails();
        }
        return new List<ActiveSessionInfo>(); // Return empty list if relay instance not found
    }

    /// <summary>
    /// Disposes the RelayManager, stopping all relays and cleaning up resources.
    /// </summary>
    public void Dispose()
    {
        Log.Info("Disposing RelayManager...");
        if (!_managerCts.IsCancellationRequested)
        {
            try { _managerCts.Cancel(); } // Signal cancellation to all managed operations
            catch (ObjectDisposedException) { /* Already disposed, ignore */ }
        }
        StopAllRelays(); // Ensure all instances are stopped and disposed
        _managerCts.Dispose(); // Dispose the CancellationTokenSource
        Log.Info("RelayManager disposed.");
        GC.SuppressFinalize(this);
    }
}
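The RelayManager above does not compute rates itself. GetCurrentConfigsWithStatus() copies RateCtoS/RateStoC from GetCurrentAggregatedStatus(), whose implementation is outside this excerpt. TcpRelaySession, listed in section 3, keeps a previous byte count and timestamp (_lastReportedBytesCtoS_Session, _lastReportTimeUtc_Session) for the same purpose. Below is a minimal sketch of that kind of calculation. It assumes the rate is the counter delta since the previous poll divided by the elapsed time. TakeSample is a hypothetical helper, not part of the project:

```csharp
using System;

// Hypothetical helper (not from the project): turns a cumulative byte counter into a
// bytes-per-second rate and returns the baseline (count, time) to keep for the next poll.
static (double Rate, long LastBytes, DateTime LastTimeUtc) TakeSample(
    long totalBytes, long lastBytes, DateTime lastTimeUtc, DateTime nowUtc)
{
    double seconds = (nowUtc - lastTimeUtc).TotalSeconds;
    if (seconds <= 0)
        return (0, lastBytes, lastTimeUtc);           // no time elapsed: keep the old baseline
    long delta = Math.Max(0, totalBytes - lastBytes); // a counter reset (e.g. relay restart) yields 0, not a negative rate
    return (delta / seconds, totalBytes, nowUtc);
}

// Usage: two polls one second apart, with 250,000 bytes forwarded in between.
var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var sample = TakeSample(250_000, 0, t0, t0.AddSeconds(1));
Console.WriteLine($"{sample.Rate:F0} B/s"); // prints "250000 B/s"
```

Because the UI pulls the status, the averaging window equals the polling interval. A longer interval gives a smoother but less responsive number.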

3. Key Implementation: In-Session Failover


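One goal of this tool is in-session failover. Suppose client C is connected to relay R, and R is connected to server S1. If S1 suddenly disconnects, R should automatically try S2, S3, ... while C's connection stays open and data keeps flowing, so that C never sees the switch. This transparency holds only at the TCP level. S2 inherits none of S1's protocol state, and bytes that were in flight to S1 when it failed are not replayed, so the approach suits stateless or request/response protocols best. The logic lives in TcpRelaySession, which manages a single C<->S connection pair.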
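Without the socket handling, the server choice in TryConnectToAnAvailableServerAsync is a simple policy. It picks a random enabled server that has not been tried yet in the current cycle. Once every server has been tried, it clears the set and starts a new cycle. The real method also waits up to 5 seconds before starting over, and clears the set after a successful connect. Below is a minimal sketch of just that policy. Endpoints are plain strings, and a hypothetical PickNext helper stands in for RemoteServerEndpoint and the session fields:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper mirroring the selection policy of TryConnectToAnAvailableServerAsync:
// a random choice among enabled servers not yet attempted in the current cycle.
static string? PickNext(IReadOnlyList<string> enabled, HashSet<string> attempted, Random rng)
{
    if (enabled.Count == 0)
        return null;                                    // nothing configured: the caller closes the session
    var candidates = enabled.Where(s => !attempted.Contains(s)).ToList();
    if (candidates.Count == 0)
    {
        attempted.Clear();                              // every server failed once: start a new cycle
        candidates = enabled.ToList();                  // (the real method also delays here)
    }
    string pick = candidates[rng.Next(candidates.Count)];
    attempted.Add(pick);                                // mark it so the next failure picks another server
    return pick;
}

// Usage: with three servers that all keep failing, the first three picks are distinct.
var servers = new[] { "10.0.0.1:9000", "10.0.0.2:9000", "10.0.0.3:9000" };
var tried = new HashSet<string>();
var random = new Random(42);
for (int i = 0; i < 4; i++)
    Console.WriteLine(PickNext(servers, tried, random));
```

Randomizing within a cycle spreads failed-over sessions across the remaining backends, rather than sending them all to the next server in the list. The attempted set guarantees that each server is tried once before any server is retried.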

3.1 The Two-Loop Structure of ProcessAsync

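ProcessAsync is the core of the session and is built from two nested loops. The outer while loop runs for as long as the client connection is alive. Inside it, a per-server relay cycle (re)connects to a backend server, runs the two server-side relay tasks, and tears them down when that server fails. The complete TcpRelaySession code follows; ProcessAsync holds the core logic: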

public class TcpRelaySession : IDisposable
{
    private static readonly ILog Log = LogManager.GetLogger(typeof(TcpRelaySession));
    private readonly Guid _sessionId;
    private readonly RelayConfig _config;
    private readonly Socket _clientSocket;
    private Socket? _serverSocket;

    private readonly Action<Guid> _onClosedCallback;
    private readonly Action<long, long> _onTrafficCallback;

    private readonly CancellationTokenSource _sessionCts = new CancellationTokenSource();
    private readonly ConcurrentQueue<ArraySegment<byte>> _clientSendBuffer = new();
    private readonly SemaphoreSlim _dataAvailableSignal = new SemaphoreSlim(0);

    private long _pendingSendBytesC = 0;
    private long _pendingSendBytesS = 0;
    private long _sessionBytesForwardedCtoS = 0;
    private long _sessionBytesForwardedStoC = 0;
    // State used to compute the per-session transfer rate
    private long _lastReportedBytesCtoS_Session = 0;
    private long _lastReportedBytesStoC_Session = 0;
    private DateTime _lastReportTimeUtc_Session = DateTime.UtcNow;
    private double _currentSessionRateCtoS = 0;
    private double _currentSessionRateStoC = 0;

    private readonly DateTime _connectedTimeUtc = DateTime.UtcNow;

    private const int BufferSize = 8192;
    private volatile bool _disposed = false;

    private List<RemoteServerEndpoint> _availableServerEndpoints = new List<RemoteServerEndpoint>();
    private HashSet<RemoteServerEndpoint> _attemptedServersInCurrentCycle = new HashSet<RemoteServerEndpoint>();
    private Random _serverSelectorRandom = new Random();
    private RemoteServerEndpoint? _currentConnectedServerEndpoint;
    private string? _lastError = null;

    public Guid SessionId => _sessionId;

    public TcpRelaySession(Guid sessionId, RelayConfig config, Socket clientSocket, Action<Guid> onClosedCallback, Action<long, long> onTrafficCallback)
    {
        _sessionId = sessionId;
        _config = config;
        _clientSocket = clientSocket ?? throw new ArgumentNullException(nameof(clientSocket));
        _onClosedCallback = onClosedCallback;
        _onTrafficCallback = onTrafficCallback;
        if (_config.RemoteServers != null)
        {
            _availableServerEndpoints.AddRange(_config.RemoteServers.Where(s => s.IsEnabled));
        }
        if (!_availableServerEndpoints.Any())
        {
            Log.Warn($"[{_config.Name}][{_sessionId}] No enabled remote servers loaded from config.");
        }
    }

    private async Task<bool> TryConnectToAnAvailableServerAsync(CancellationToken token)
    {
        if (_disposed || token.IsCancellationRequested)
            return false;
        if (!_availableServerEndpoints.Any())
        {
            Log.Warn($"[{_config.Name}][{_sessionId}] No enabled remote servers configured.");
            _lastError = "No enabled remote servers.";
            return false;
        }

        var candidateServers = _availableServerEndpoints.Except(_attemptedServersInCurrentCycle).ToList();
        if (!candidateServers.Any())
        {
            Log.Info($"[{_config.Name}][{_sessionId}] All available servers attempted. Resetting/delaying.");
            _attemptedServersInCurrentCycle.Clear();
            candidateServers = _availableServerEndpoints.ToList();
            if (!candidateServers.Any())
            {
                Log.Warn($"[{_config.Name}][{_sessionId}] No candidates after reset.");
                _lastError = "All servers failed; none left.";
                return false;
            }
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(Math.Min(5, _availableServerEndpoints.Count)), token);
            }
            catch (OperationCanceledException)
            {
                return false;
            }
        }

        var serverToTry = candidateServers[_serverSelectorRandom.Next(candidateServers.Count)];
        _attemptedServersInCurrentCycle.Add(serverToTry);
        Log.Info($"[{_config.Name}][{_sessionId}] Attempting connect: {serverToTry.Host}:{serverToTry.Port}");
        var newServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            try
            {
                newServerSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
            }
            catch (Exception ex)
            {
                Log.Warn($"[{_config.Name}][{_sessionId}] Failed opts {serverToTry}: {ex.Message}");
            }
            await newServerSocket.ConnectAsync(serverToTry.Host, serverToTry.Port, token);
            if (!IsSocketConnected(newServerSocket))
                throw new SocketException((int)SocketError.NotConnected);
            Log.Info($"[{_config.Name}][{_sessionId}] Connected server: {serverToTry.Host}:{serverToTry.Port} ({newServerSocket.RemoteEndPoint})");
            _serverSocket = newServerSocket;
            _currentConnectedServerEndpoint = serverToTry;
            _attemptedServersInCurrentCycle.Clear();
            _lastError = null;
            return true;
        }
        catch (OperationCanceledException)
        {
            Log.Info($"[{_config.Name}][{_sessionId}] Connect cancelled: {serverToTry.Host}");
            CloseSocket(newServerSocket, $"Server (cancelled {serverToTry.Host})");
            return false;
        }
        catch (Exception ex)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}] Failed connect {serverToTry.Host}: {ex.Message}");
            CloseSocket(newServerSocket, $"Server (failed {serverToTry.Host})");
            _lastError = $"S-ConnFail:{serverToTry.Host}";
            return false;
        }
    }

    public async Task ProcessAsync(CancellationToken parentToken)
    {
        if (_disposed)
            return;
        Log.Debug($"[{_config.Name}][{_sessionId}] ProcessAsync starting. Client: {_clientSocket?.RemoteEndPoint}");
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(parentToken, _sessionCts.Token);
        var token = linkedCts.Token;
        Task? clientReadTask = null;
        try
        {
            clientReadTask = ReadAndBufferClientAsync(token);
            while (!token.IsCancellationRequested && IsSocketConnected(_clientSocket))
            {
                if (!IsSocketConnected(_serverSocket))
                {
                    Log.Info($"[{_config.Name}][{_sessionId}] Server disconnected. Trying connect/reconnect...");
                    if (!await TryConnectToAnAvailableServerAsync(token))
                    {
                        if (token.IsCancellationRequested || !IsSocketConnected(_clientSocket))
                            break;
                        Log.Warn($"[{_config.Name}][{_sessionId}] Failed connect attempt. Retrying after delay...");
                        if (_availableServerEndpoints.Any() && _attemptedServersInCurrentCycle.Count >= _availableServerEndpoints.Count)
                        {
                            try
                            {
                                await Task.Delay(TimeSpan.FromSeconds(5), token);
                            }
                            catch (OperationCanceledException)
                            {
                                break;
                            }
                        }
                        else if (!_availableServerEndpoints.Any())
                        {
                            Log.Error($"[{_config.Name}][{_sessionId}] No remote servers. Closing.");
                            SignalClosure();
                            break;
                        }
                        continue;
                    }
                    Log.Info($"[{_config.Name}][{_sessionId}] Now connected server: {_serverSocket.RemoteEndPoint} ({_currentConnectedServerEndpoint})");
                }

                using (var serverRelayCts = CancellationTokenSource.CreateLinkedTokenSource(token))
                {
                    var serverRelayToken = serverRelayCts.Token;
                    Task? serverWriteTask = null;
                    Task? serverReadTask = null;
                    bool serverCycleEnded = false;
                    try
                    {
                        Log.Debug($"[{_config.Name}][{_sessionId}] Starting relay tasks for server {_serverSocket.RemoteEndPoint}");
                        serverWriteTask = SendFromQueueToServerAsync(_serverSocket, _clientSendBuffer, _dataAvailableSignal, serverRelayToken);
                        serverReadTask = ReadAndRelayAsync(_serverSocket, _clientSocket, false, serverRelayToken);
                        var relayTasks = new List<Task> { serverWriteTask, serverReadTask };
                        var completedTask = await Task.WhenAny(relayTasks);
                        Log.Debug($"[{_config.Name}][{_sessionId}] Server task completed: {completedTask.Status} for {_serverSocket.RemoteEndPoint}");
                        if (completedTask.IsFaulted)
                        {
                            Log.Warn($"[{_config.Name}][{_sessionId}] Server task faulted {_serverSocket.RemoteEndPoint}: {completedTask.Exception?.GetBaseException().Message}");
                            _lastError = $"S-TaskFail:{_currentConnectedServerEndpoint}";
                        }
                        else if (completedTask.IsCanceled)
                        {
                            Log.Info($"[{_config.Name}][{_sessionId}] Server task cancelled {_serverSocket.RemoteEndPoint}.");
                            // Also leave the outer loop if the session-level token was cancelled.
                            if (token.IsCancellationRequested)
                            {
                                serverCycleEnded = true;
                            }
                        }
                        else
                        {
                            Log.Info($"[{_config.Name}][{_sessionId}] Server task completed normally {_serverSocket.RemoteEndPoint}.");
                        }
                    }
                    catch (Exception ex)
                    {
                        Log.Error($"[{_config.Name}][{_sessionId}] Ex managing server tasks for {_serverSocket?.RemoteEndPoint}: {ex.Message}", ex);
                        _lastError = $"S-TaskMgmtFail:{_currentConnectedServerEndpoint}";
                    }
                    finally
                    {
                        if (!serverRelayCts.IsCancellationRequested)
                        {
                            Log.Debug($"[{_config.Name}][{_sessionId}] Cancelling server-specific tasks for {_serverSocket?.RemoteEndPoint}.");
                            serverRelayCts.Cancel();
                        }
                        var tasksToWait = new List<Task>();
                        if (serverWriteTask != null)
                            tasksToWait.Add(serverWriteTask);
                        if (serverReadTask != null)
                            tasksToWait.Add(serverReadTask);
                        if (tasksToWait.Any())
                        {
                            // Wait for both tasks to finish cleaning up; routine exceptions are ignored.
                            try
                            {
                                await Task.WhenAll(tasksToWait);
                            }
                            catch { }
                        }
                        Log.Info($"[{_config.Name}][{_sessionId}] Closing server connection {_serverSocket?.RemoteEndPoint} for failover/end.");
                        CloseSocket(_serverSocket, $"Server (failover cycle for {_currentConnectedServerEndpoint})");
                        _serverSocket = null;
                        _currentConnectedServerEndpoint = null;
                    }
                    if (serverCycleEnded)
                        break; // Break outer loop if needed
                }
            } // End outer while loop
            Log.Info($"[{_config.Name}][{_sessionId}] Exited main processing loop. Client disconnected or session cancelled.");
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            Log.Info($"[{_config.Name}][{_sessionId}] Main session ProcessAsync cancelled.");
        }
        catch (Exception ex)
        {
            Log.Error($"[{_config.Name}][{_sessionId}] Critical error ProcessAsync setup: {ex.Message}", ex);
            _lastError = $"SessionFail:{ex.GetType().Name}";
            SignalClosure();
        }
        finally
        {
            Log.Info($"[{_config.Name}][{_sessionId}] ProcessAsync finally. Ensuring cleanup.");
            if (clientReadTask != null && !clientReadTask.IsCompleted)
            {
                if (!token.IsCancellationRequested)
                    SignalClosure();
                try
                {
                    await Task.WhenAny(clientReadTask, Task.Delay(1000));
                }
                catch { }
            }
            Dispose();
        }
    }

    // Default client inactivity timeout. ReadAndBufferClientAsync does not use this constant;
    // it reads RelayConfig.ClientInactivityTimeoutSeconds instead.
    private static readonly TimeSpan ClientInactivityTimeout = TimeSpan.FromMinutes(5);

    // Producer: client socket -> send queue (C -> Queue)
    private async Task ReadAndBufferClientAsync(CancellationToken token)
    {
        if (_disposed)
            return;
        var clientEndPoint = _clientSocket?.RemoteEndPoint?.ToString() ?? "N/A";
        // Inactivity timeout from config; 0 or less disables it.
        var inactivityTimeout = (_config.ClientInactivityTimeoutSeconds > 0)
            ? TimeSpan.FromSeconds(_config.ClientInactivityTimeoutSeconds)
            : Timeout.InfiniteTimeSpan; // Use Timeout.InfiniteTimeSpan if disabled
        Log.Debug($"[{_config.Name}][{_sessionId}] C->Buffer starting read loop from client {clientEndPoint}. Configured Inactivity Timeout: {(inactivityTimeout == Timeout.InfiniteTimeSpan ? "Disabled" : inactivityTimeout.ToString())}");
        var buffer = ArrayPool<byte>.Shared.Rent(BufferSize);
        try
        {
            while (!token.IsCancellationRequested && IsSocketConnected(_clientSocket))
            {
                var receiveMemory = new Memory<byte>(buffer);
                ValueTask<int> receiveValueTask = _clientSocket.ReceiveAsync(receiveMemory, SocketFlags.None, token); // Using ValueTask<int> might be slightly more performant for frequent completions
                int bytesRead;
                if (inactivityTimeout == Timeout.InfiniteTimeSpan)
                {
                    bytesRead = await receiveValueTask; // Wait indefinitely if timeout is disabled
                }
                else
                {
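                    // A per-iteration CTS lets us cancel the delay (and free its timer) as soon as the
                    // receive wins; otherwise every received chunk would leave a pending timer behind
                    // for the full inactivity period.
                    using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(token);
                    Task timeoutTask = Task.Delay(inactivityTimeout, delayCts.Token);
                    Task<int> receiveTask = receiveValueTask.IsCompletedSuccessfully ? Task.FromResult(receiveValueTask.Result) : receiveValueTask.AsTask();
                    var completedTask = await Task.WhenAny(receiveTask, timeoutTask);
                    delayCts.Cancel(); // harmless if the timeout already fired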
                    if (token.IsCancellationRequested)
                    {
                        Log.Info($"[{_config.Name}][{_sessionId}] C->Buffer: Operation cancelled while/after WhenAny.");
                        break;
                    }
                    if (completedTask == timeoutTask)
                    {
                        Log.Warn($"[{_config.Name}][{_sessionId}] Client {clientEndPoint} timed out after {inactivityTimeout.TotalSeconds}s of inactivity. Closing session.");
                        break; // Exit loop, finally block will call SignalClosure()
                    }
                    else
                    {
                        Log.Debug($"[{_config.Name}][{_sessionId}] C->Buffer: receiveTask completed before timeout. Status: {receiveTask.Status}");
                        if (receiveTask.IsFaulted)
                        {
                            Log.Warn($"[{_config.Name}][{_sessionId}] C->Buffer: receiveTask faulted before timeout processing.", receiveTask.Exception?.GetBaseException());
                            await receiveTask; // Re-throw to be caught by outer catch block
                        }
                        if (receiveTask.IsCanceled) // Should be caught by outer OperationCanceledException
                        {
                            Log.Info($"[{_config.Name}][{_sessionId}] C->Buffer: receiveTask was cancelled before timeout processing.");
                            break;
                        }
                        bytesRead = receiveTask.Result; // Get the actual result
                    }
                }
                if (ProcessReceivedBytes(bytesRead, buffer, clientEndPoint, token))
                    break; // client closed, backpressure limit hit, or cancelled
            }
        }
        catch (OperationCanceledException)
        {
            Log.Debug($"[{_config.Name}][{_sessionId}] C->Buffer cancelled: {clientEndPoint}.");
        }
        catch (SocketException se) when (IsConnectionCloseError(se.SocketErrorCode))
        {
            Log.Info($"[{_config.Name}][{_sessionId}] Client {clientEndPoint} closed: {se.SocketErrorCode} ex:{se.StackTrace}");
        }
        catch (ObjectDisposedException)
        {
            Log.Debug($"[{_config.Name}][{_sessionId}] Client socket disposed: {clientEndPoint}.");
        }
        catch (Exception ex)
        {
            Log.Error($"[{_config.Name}][{_sessionId}] Error reading client {clientEndPoint}", ex);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
            Log.Debug($"[{_config.Name}][{_sessionId}] Exiting C->Buffer loop: {clientEndPoint}. Signaling closure.");
            try
            {
                _dataAvailableSignal.Release();
            }
            catch { }
            SignalClosure();
        }
    }

    // Returns true when the client read loop should stop.
    private bool ProcessReceivedBytes(int bytesRead, byte[] buffer, string? clientEndPoint, CancellationToken token)
    {
        Log.Debug($"[{_config.Name}][{_sessionId}] C->Buffer Received {bytesRead} bytes from client {clientEndPoint}.");
        if (bytesRead == 0)
        {
            Log.Info($"[{_config.Name}][{_sessionId}] Client {clientEndPoint} disconnected gracefully (read 0 bytes).");
            return true; // Break loop
        }

        var segment = RentAndCopy(buffer, bytesRead);
        long currentPendingS = Interlocked.Read(ref _pendingSendBytesS);
        if (currentPendingS + bytesRead > _config.MaxSendBufferBytes)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}] Backpressure C->S. Pending ({currentPendingS + bytesRead}) > limit ({_config.MaxSendBufferBytes}). Closing session.");
            ArrayPool<byte>.Shared.Return(segment.Array!);
            SignalClosure();
            return true; // Break loop
        }
        _clientSendBuffer.Enqueue(segment);
        Interlocked.Add(ref _pendingSendBytesS, bytesRead);
        try
        {
            _dataAvailableSignal.Release();
        }
        catch (ObjectDisposedException)
        {
            Log.Debug($"[{_config.Name}][{_sessionId}] DataAvailableSignal disposed, could not release.");
            if (token.IsCancellationRequested) return true; // Break loop if cancelled
        }
        return false; // Continue loop
    }

    // Consumer Queue -> S
    private async Task SendFromQueueToServerAsync(Socket destinationSocket, ConcurrentQueue<ArraySegment<byte>> bufferQueue, SemaphoreSlim dataAvailableSignal, CancellationToken token)
    {
        if (_disposed || !IsSocketConnected(destinationSocket)) return;
        var destEndPoint = destinationSocket?.RemoteEndPoint?.ToString() ?? "N/A";
        string dir = "C->S";
        Log.Info($"[{_config.Name}][{_sessionId}][{dir}] SendFromQueue starting: {destEndPoint}.");
        try
        {
            while (!token.IsCancellationRequested && IsSocketConnected(destinationSocket))
            {
                if (!bufferQueue.TryDequeue(out var segment))
                {
                    try
                    {
                        // WaitAsync(TimeSpan, CancellationToken) returns false on timeout instead of throwing,
                        // so after either a signal or a 200 ms timeout we loop back and re-check the queue.
                        await dataAvailableSignal.WaitAsync(TimeSpan.FromMilliseconds(200), token);
                        if (token.IsCancellationRequested || !IsSocketConnected(destinationSocket))
                        {
                            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] CancellationRequested or Destination Disconnected.");
                            break;
                        }
                        continue;
                    }
                    catch (OperationCanceledException)
                    {
                        Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Waiting for queued data was cancelled.");
                        break;
                    }
                }
                int totalSent = 0;
                try
                {
                    while (totalSent < segment.Count && !token.IsCancellationRequested && IsSocketConnected(destinationSocket))
                    {
                        var sent = await destinationSocket.SendAsync(segment.Slice(totalSent), SocketFlags.None, token);
                        if (sent == 0)
                            throw new SocketException((int)SocketError.ConnectionAborted);
                        totalSent += sent;
                    }
                }
                finally
                {
                    // The segment has left the queue for good (fully sent or dropped), so release all of it
                    // from the pending counter. Subtracting only totalSent would let the counter creep up after
                    // every interrupted send and eventually trigger a false backpressure close.
                    Interlocked.Add(ref _pendingSendBytesS, -segment.Count);
                    if (segment.Array != null)
                        ArrayPool<byte>.Shared.Return(segment.Array);
                }
                Interlocked.Add(ref _sessionBytesForwardedCtoS, totalSent);
                _onTrafficCallback?.Invoke(totalSent, 0);
                if (totalSent < segment.Count)
                {
                    Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Send cancel/disconnect mid-segment {destEndPoint}. Sent {totalSent}/{segment.Count}.");
                    break;
                }
                Log.Debug($"[{_config.Name}][{_sessionId}][{dir}] Sent {totalSent} from queue to {destEndPoint}.");
            }
        }
        catch (OperationCanceledException)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] SendFromQueue cancelled: {destEndPoint}.");
        }
        catch (SocketException se) when (IsConnectionCloseError(se.SocketErrorCode))
        {
            Log.Info($"[{_config.Name}][{_sessionId}][{dir}] Conn closed send: {se.SocketErrorCode}. Dest: {destEndPoint}");
        }
        catch (ObjectDisposedException)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Dest socket disposed send: {destEndPoint}.");
        }
        catch (Exception ex)
        {
            Log.Error($"[{_config.Name}][{_sessionId}][{dir}] Error sending queue {destEndPoint}", ex);
        }
        finally
        {
            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Exiting SendFromQueue loop: {destEndPoint}.");
        } // Don't signal global closure
    }

    // Relay S -> C
    private async Task ReadAndRelayAsync(Socket sourceSocket, Socket destinationSocket, bool isClientToServer, CancellationToken token)
    {
        if (_disposed || !IsSocketConnected(sourceSocket) || !IsSocketConnected(destinationSocket))
            return;
        var buffer = ArrayPool<byte>.Shared.Rent(BufferSize);
        var srcEp = sourceSocket?.RemoteEndPoint?.ToString() ?? "N/A";
        var dstEp = destinationSocket?.RemoteEndPoint?.ToString() ?? "N/A";
        string dir = "S->C";
        Log.Debug($"[{_config.Name}][{_sessionId}][{dir}] ReadRelay starting: {srcEp} -> {dstEp}.");
        try
        {
            while (!token.IsCancellationRequested && IsSocketConnected(sourceSocket) && IsSocketConnected(destinationSocket))
            {
                var bytesRead = await sourceSocket.ReceiveAsync(new Memory<byte>(buffer), SocketFlags.None, token);
                if (bytesRead == 0)
                {
                    Log.Info($"[{_config.Name}][{_sessionId}][{dir}] Source disconnected {srcEp} (read 0).");
                    break;
                }
                Log.Debug($"[{_config.Name}][{_sessionId}][{dir}] Rx {bytesRead} from {srcEp}.");
                long pC = Interlocked.Read(ref _pendingSendBytesC);
                if (pC + bytesRead > _config.MaxSendBufferBytes)
                {
                    Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Backpressure S->C. Closing.");
                    SignalClosure();
                    break;
                }
                Interlocked.Add(ref _pendingSendBytesC, bytesRead);
                int totalSent = 0;
                try
                {
                    while (totalSent < bytesRead && !token.IsCancellationRequested && IsSocketConnected(destinationSocket))
                    {
                        var sent = await destinationSocket.SendAsync(new ReadOnlyMemory<byte>(buffer, totalSent, bytesRead - totalSent), SocketFlags.None, token);
                        if (sent == 0)
                            throw new SocketException((int)SocketError.ConnectionAborted);
                        totalSent += sent;
                    }
                }
                finally
                {
                    // Release the whole chunk from the pending counter whether it was fully sent or dropped,
                    // so the counter does not drift across failovers or send exceptions.
                    Interlocked.Add(ref _pendingSendBytesC, -bytesRead);
                }
                Interlocked.Add(ref _sessionBytesForwardedStoC, totalSent);
                _onTrafficCallback?.Invoke(0, totalSent);
                if (totalSent < bytesRead)
                {
                    Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Send cancel/disconnect client mid-segment. Sent {totalSent}/{bytesRead}.");
                    break;
                }
                Log.Debug($"[{_config.Name}][{_sessionId}][{dir}] Sent {totalSent} to {dstEp}.");
            }
        }
        catch (OperationCanceledException)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] ReadRelay cancelled: {srcEp}.");
        }
        catch (SocketException se) when (IsConnectionCloseError(se.SocketErrorCode))
        {
            Log.Info($"[{_config.Name}][{_sessionId}][{dir}] Conn closed R/R: {se.SocketErrorCode}. Src:{srcEp}, Dst:{dstEp},ex:{se.StackTrace}");
        }
        catch (ObjectDisposedException)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Socket disposed R/R: {srcEp}.");
        }
        catch (Exception ex)
        {
            Log.Error($"[{_config.Name}][{_sessionId}][{dir}] Error R/R {srcEp} -> {dstEp}", ex);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
            Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Exiting ReadRelay loop: {srcEp}.");
        } // Don't signal global closure
    }

    private ArraySegment<byte> RentAndCopy(byte[] source, int count)
    {
        var b = ArrayPool<byte>.Shared.Rent(count);
        Buffer.BlockCopy(source, 0, b, 0, count);
        return new ArraySegment<byte>(b, 0, count);
    }
    private void SignalClosure()
    {
        if (!_sessionCts.IsCancellationRequested && !_disposed)
        {
            try
            {
                Log.Debug($"[{_config.Name}][{_sessionId}] Signaling session closure.");
                _sessionCts.Cancel();
            }
            catch { }
        }
    }
    private static bool IsConnectionCloseError(SocketError e)
    {
        return e == SocketError.ConnectionReset || e == SocketError.ConnectionAborted || e == SocketError.Shutdown
            || e == SocketError.ConnectionRefused || e == SocketError.NotConnected || e == SocketError.NetworkReset
            || e == SocketError.TimedOut || e == SocketError.HostUnreachable || e == SocketError.NetworkUnreachable;
    }
    private static bool IsSocketConnected(Socket? s)
    {
        if (s == null || !s.Connected)
        {
            return false;
        }
        //try
        //{
        //    // Poll for 1 microsecond to check readability.
        //    // If Poll is true and Available is 0, remote has closed.
        //    // If Poll is false, socket is likely still okay (or timed out check).
        //    // This doesn't reliably detect all disconnects but catches graceful closures better than just .Connected.
        //    return !(s.Poll(1, SelectMode.SelectRead) && s.Available == 0);
        //}
        //// Catching general exceptions here might mask issues, but ObjectDisposed and SocketException are expected if closed.
        //catch (ObjectDisposedException) { return false; }
        //catch (SocketException) { return false; }
        return true;
    } // Basic check

    public void Dispose()
    {
        if (_disposed) return;
        lock (_sessionCts)
        {
            if (_disposed)
                return;
            _disposed = true;
            Log.Debug($"[{_config.Name}][{_sessionId}] Disposing session.");
        }
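        // SignalClosure() is a no-op once _disposed is true, so cancel the session token directly here.
        try
        {
            if (!_sessionCts.IsCancellationRequested)
                _sessionCts.Cancel();
        }
        catch { }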
        CloseSocket(_clientSocket, "Client");
        CloseSocket(_serverSocket, "Server(Current)");
        while (_clientSendBuffer.TryDequeue(out var s))
        {
            if (s.Array != null)
                ArrayPool<byte>.Shared.Return(s.Array);
        }
        try
        {
            _dataAvailableSignal.Dispose();
        }
        catch { }
        try
        {
            _sessionCts.Dispose();
        }
        catch { }
        try
        {
            _onClosedCallback?.Invoke(_sessionId);
        }
        catch (Exception ex)
        {
            Log.Warn($"[{_config.Name}][{_sessionId}] Error OnClosedCallback: {ex.Message}");
        }
        Log.Debug($"[{_config.Name}][{_sessionId}] Session disposed complete.");
        GC.SuppressFinalize(this);
    }
    private void CloseSocket(Socket? socket, string role)
    {
        if (socket == null)
            return;
        string ep = "N/A";
        try
        {
            if (socket.Connected)
                ep = socket.RemoteEndPoint?.ToString() ?? "N/A";
        }
        catch { }
        Log.Debug($"[{_config.Name}][{_sessionId}] Closing {role} socket {ep}");
        try
        {
            if (socket.Connected)
                socket.Shutdown(SocketShutdown.Both);
        }
        catch { }
        finally
        {
            try
            {
                socket.Close(); socket.Dispose();
            }
            catch { }
        }
    }

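    // The rate-tracking state used below (_lastReportTimeUtc_Session, _lastReported*_Session,
    // _currentSessionRate*) is declared together with the other TcpRelaySession fields.
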
    public ActiveSessionInfo GetCurrentInfo()
    {
        var now = DateTime.UtcNow;
        double elapsedSeconds = (now - _lastReportTimeUtc_Session).TotalSeconds;

        long currentTotalCtoS = Interlocked.Read(ref _sessionBytesForwardedCtoS);
        long currentTotalStoC = Interlocked.Read(ref _sessionBytesForwardedStoC);

        // Recompute the rate only once enough time (here 0.5 s) has elapsed
        if (elapsedSeconds > 0.5)
        {
            long bytesDeltaCtoS = currentTotalCtoS - _lastReportedBytesCtoS_Session;
            long bytesDeltaStoC = currentTotalStoC - _lastReportedBytesStoC_Session;

            _currentSessionRateCtoS = bytesDeltaCtoS / elapsedSeconds; // bytes/second
            _currentSessionRateStoC = bytesDeltaStoC / elapsedSeconds; // bytes/second

            // Update the baseline
            _lastReportedBytesCtoS_Session = currentTotalCtoS;
            _lastReportedBytesStoC_Session = currentTotalStoC;
            _lastReportTimeUtc_Session = now;
        }

        // If the interval was too short, the previously computed rate (_currentSessionRate...) is returned
        return new ActiveSessionInfo
        {
            SessionId = _sessionId,
            ParentRelayConfigId = _config.Id,
            Protocol = _config.Protocol.ToString(),
            ClientRemoteEndPoint = _clientSocket?.RemoteEndPoint?.ToString() ?? "N/A",
            ServerRemoteEndPoint = _serverSocket?.Connected == true ? (_serverSocket.RemoteEndPoint?.ToString() ?? "Active") : (_currentConnectedServerEndpoint?.ToString() ?? "Disconnected"),
            BytesForwardedCtoS = currentTotalCtoS,
            BytesForwardedStoC = currentTotalStoC,
            RateCtoS = _currentSessionRateCtoS, // return the stored rate
            RateStoC = _currentSessionRateStoC, // return the stored rate
            ConnectedTimeUtc = _connectedTimeUtc
        };

    }
}

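3.2 C->S Buffering

The client may keep sending data while the server side is disconnected (during a failover), so that data has to be buffered. TcpRelaySession handles this with a producer and a consumer:

  1. Producer (ReadAndBufferClientAsync):
    • Runs independently; ProcessAsync starts it once.
    • await _clientSocket.ReceiveAsync(...): receives data from the client.
    • _clientSendBuffer.Enqueue(segment): puts the data (copied into a buffer rented from ArrayPool) into a ConcurrentQueue.
    • _dataAvailableSignal.Release(): signals the consumer through a SemaphoreSlim that new data is available.
    • This method keeps running and buffering data while a server failover is in progress.
  2. Consumer (SendFromQueueToServerAsync):
    • Started in the inner loop of ProcessAsync (restarted every time a server connection is established).
    • await _dataAvailableSignal.WaitAsync(token): waits efficiently for the signal.
    • _clientSendBuffer.TryDequeue(out segment): takes the data off the queue.
    • await destinationSocket.SendAsync(...): sends the data to the currently connected _serverSocket.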
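The producer/consumer pair above can be modelled in isolation. The following is a simplified, hypothetical sketch, not the session's actual code: the class name ClientToServerBuffer and its members are illustrative, and it queues plain byte[] chunks instead of ArrayPool segments. Because the semaphore is released once per queued chunk, its count always matches the queue length, so a consumer restarted after a failover resumes exactly where the previous one stopped.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified model of the C->S buffer: one producer, one consumer.
public sealed class ClientToServerBuffer : IDisposable
{
    private readonly ConcurrentQueue<byte[]> _queue = new();
    private readonly SemaphoreSlim _dataAvailable = new(0);

    // Producer side: called for every chunk received from the client.
    public void Enqueue(byte[] chunk)
    {
        _queue.Enqueue(chunk);
        _dataAvailable.Release(); // one release per queued chunk
    }

    // Consumer side: drains the queue into 'send' until the token is cancelled.
    public async Task ConsumeAsync(Func<byte[], Task> send, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // Throws OperationCanceledException if cancelled while waiting.
            await _dataAvailable.WaitAsync(token);
            if (_queue.TryDequeue(out var chunk))
                await send(chunk);
        }
    }

    public void Dispose() => _dataAvailable.Dispose();
}
```

In this sketch, a chunk that has been dequeued but not fully sent when the server drops is lost; a failover-safe version would have to keep that chunk and resend it to the next server.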

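4. Key Implementation: Connection Robustness


4.1 Detecting Silently Disconnected Clients

A common network problem is a client (or its network) crashing or dropping off abruptly without sending a TCP FIN or RST packet. The relay is then left holding a "zombie" connection, waiting forever for ReceiveAsync to return.

  • Option A: an application-level timeout, using Task.WhenAny(receiveTask, Task.Delay(timeout)) in ReadAndBufferClientAsync. Its serious drawback is that it cannot tell a client that has silently died from a legitimate idle client that simply has nothing to send.
  • Option B: enhanced TCP keep-alives, the proper solution at the TCP layer. Keep-alive is enabled when the client socket is accepted, in TcpRelayInstance.AcceptClientsAsync: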
public class TcpRelayInstance : IRelayInstance
{
    private static readonly ILog Log = LogManager.GetLogger(typeof(TcpRelayInstance));
    private readonly RelayConfig _config;
    private readonly Action<Guid, RelayStatusUpdate> _statusUpdateCallback;
    private TcpListener? _listener;
    private readonly ConcurrentDictionary<Guid, TcpRelaySession> _sessions = new();
    private CancellationTokenSource _instanceCts = new();
    private long _totalBytesCtoS = 0;
    private long _totalBytesStoC = 0;
    // New: state fields used to compute the aggregated rate
    private long _lastRateCalcAggBytesCtoS = 0;
    private long _lastRateCalcAggBytesStoC = 0;
    private DateTime _lastRateCalcAggTimeUtc = DateTime.UtcNow;
    private double _currentAggRateCtoS = 0;
    private double _currentAggRateStoC = 0;

    private Task? _acceptLoopTask;
    private volatile bool _isRunning = false;
    private string? _lastError = null;
    public Guid Id => _config.Id;

    public TcpRelayInstance(RelayConfig config, Action<Guid, RelayStatusUpdate> statusUpdateCallback)
    {
        _config = config;
        _statusUpdateCallback = statusUpdateCallback;
    }

    public async Task StartAsync(CancellationToken managerCancellationToken)
    {
        Log.Info($"[{_config.Name}] Starting TCP listener on {_config.LocalIpAddress}:{_config.LocalPort}");
        if (!IPAddress.TryParse(_config.LocalIpAddress, out var ipAddress))
        {
            ipAddress = IPAddress.Any;
            Log.Warn($"[{_config.Name}] Invalid local IP '{_config.LocalIpAddress}'. Binding IPAddress.Any.");
        }
        _instanceCts = CancellationTokenSource.CreateLinkedTokenSource(managerCancellationToken);
        var cancellationToken = _instanceCts.Token;
        try
        {
            _listener = new TcpListener(ipAddress, _config.LocalPort);
            _listener.Start(100);
            _isRunning = true;
            _lastError = null;
            ReportStatus();
            _acceptLoopTask = AcceptClientsAsync(cancellationToken);
            await _acceptLoopTask;
        }
        catch (SocketException se) when (se.SocketErrorCode == SocketError.AddressAlreadyInUse)
        {
            Log.Error($"[{_config.Name}] Start FAIL: Address {ipAddress}:{_config.LocalPort} already in use.", se);
            _isRunning = false;
            _lastError = $"AddressInUse: {_config.LocalPort}";
            ReportStatus();
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            Log.Error($"[{_config.Name}] Start FAIL: Error starting TCP listener on {ipAddress}:{_config.LocalPort}", ex);
            _isRunning = false;
            _lastError = $"Start FAIL: {ex.Message}";
            ReportStatus();
        }
        finally
        {
            _isRunning = false;
            var localListener = _listener; _listener = null;
            if (localListener != null && localListener.Server.IsBound)
            {
                try
                {
                    localListener.Stop();
                }
                catch (Exception stopEx)
                {
                    Log.Warn($"[{_config.Name}] Ex stopping listener in finally: {stopEx.Message}");
                }
            }
            ReportStatus();
            Log.Info($"[{_config.Name}] TCP listener stopped: {_config.LocalPort}.");
        }
    }

    private async Task AcceptClientsAsync(CancellationToken cancellationToken)
    {
        Log.Info($"[{_config.Name}] Accept loop started: {_config.LocalPort}. Waiting...");
        var localListener = _listener; // Capture listener locally
        while (localListener != null && !cancellationToken.IsCancellationRequested)
        {
            try
            {
                Socket clientSocket = await localListener.AcceptSocketAsync(cancellationToken);
                try
                {
                    clientSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
                }
                catch (Exception soEx)
                {
                    Log.Warn($"[{_config.Name}] Failed socket opts {clientSocket.RemoteEndPoint}: {soEx.Message}");
                }
                Log.Info($"[{_config.Name}] Accepted client: {clientSocket.RemoteEndPoint}");

                var sessionId = Guid.NewGuid();
                var session = new TcpRelaySession(sessionId, _config, clientSocket, HandleSessionClosed, HandleSessionTraffic);

                if (_sessions.TryAdd(sessionId, session))
                {
                    ReportStatus();
                    _ = Task.Run(() => session.ProcessAsync(_instanceCts.Token), _instanceCts.Token);
                }
                else
                {
                    Log.Warn($"[{_config.Name}] Failed add session {sessionId}. Closing client.");
                    try
                    {
                        clientSocket.Shutdown(SocketShutdown.Both);
                    }
                    catch { }
                    finally
                    {
                        clientSocket.Close();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Log.Info($"[{_config.Name}] Accept loop cancelled: {_config.LocalPort}.");
                break;
            }
            catch (SocketException se) when (_instanceCts.IsCancellationRequested || cancellationToken.IsCancellationRequested || _listener == null)
            {
                Log.Info($"[{_config.Name}] Accept SocketEx ignore stopping: {se.Message}.");
                break;
            }
            catch (ObjectDisposedException) when (_instanceCts.IsCancellationRequested || cancellationToken.IsCancellationRequested || _listener == null)
            {
                Log.Info($"[{_config.Name}] Accept ObjectDisposed ignore stopping.");
                break;
            }
            catch (Exception ex)
            {
                if (!cancellationToken.IsCancellationRequested)
                {
                    Log.Error($"[{_config.Name}] Error accepting client {_config.LocalPort}", ex);
                    _lastError = $"Accept Err: {ex.Message}";
                    ReportStatus();
                    try
                    {
                        await Task.Delay(200, cancellationToken);
                    }
                    catch
                    {
                    }
                }
                else
                {
                    Log.Info($"[{_config.Name}] Accept Ex ignored stopping: {ex.Message}");
                    break;
                }
            }
        }
        Log.Info($"[{_config.Name}] Accept loop finished: {_config.LocalPort}.");
    }

    private void HandleSessionClosed(Guid sessionId)
    {
        if (_sessions.TryRemove(sessionId, out _))
        {
            Log.Debug($"[{_config.Name}] Removed session {sessionId}.");
            // Only report the change in connection count
            ReportStatus(false); // 'false' means this was not triggered by traffic
        }
    }

    private void HandleSessionTraffic(long bytesCtoS, long bytesStoC)
    {
        Interlocked.Add(ref _totalBytesCtoS, bytesCtoS);
        Interlocked.Add(ref _totalBytesStoC, bytesStoC);
        // ReportStatus() is no longer called from here, to avoid overly frequent callbacks.
        // Rates and totals are polled via GetCurrentAggregatedStatus().
    }

    /// <summary>
    /// Reports non-traffic status updates (connections, errors, running state).
    /// </summary>
    /// <param name="includeTraffic">If true, include traffic (deprecated; should be false).</param>
    private void ReportStatus(bool includeTraffic = false)
    {
        try
        {
            bool isBound = false; try { isBound = _listener?.Server?.IsBound ?? false; } catch { }
            bool isRunning = _isRunning && isBound && !_instanceCts.IsCancellationRequested;

            // Note: traffic/rate updates are no longer pushed from this method.
            // The UI pulls them via GetCurrentAggregatedStatus().
            // Connection and error state are still pushed.
            _statusUpdateCallback?.Invoke(Id, new RelayStatusUpdate
            {
                IsRunning = isRunning,
                ActiveConnections = _sessions.Count,
                LastError = _lastError,
                // Traffic/rate fields are filled in by polling; keep the defaults here
                TotalBytesForwardedCtoS = 0, // Interlocked.Read(ref _totalBytesCtoS), // no longer pushed
                TotalBytesForwardedStoC = 0, // Interlocked.Read(ref _totalBytesStoC), // no longer pushed
                RateCtoS = 0,
                RateStoC = 0
            });
        }
        catch (Exception ex) { Log.Warn($"[{_config.Name}] Error reporting status: {ex.Message}"); }
    }

    /// <summary>
    /// Implements IRelayInstance; called periodically (polled) by RelayManager.
    /// </summary>
    public RelayStatusUpdate GetCurrentAggregatedStatus()
    {
        var now = DateTime.UtcNow;
        double elapsedSeconds = (now - _lastRateCalcAggTimeUtc).TotalSeconds;

        long currentTotalCtoS = Interlocked.Read(ref _totalBytesCtoS);
        long currentTotalStoC = Interlocked.Read(ref _totalBytesStoC);

        // Only recompute after enough time has elapsed, to get a smooth rate
        if (elapsedSeconds > 0.5)
        {
            long bytesDeltaCtoS = currentTotalCtoS - _lastRateCalcAggBytesCtoS;
            long bytesDeltaStoC = currentTotalStoC - _lastRateCalcAggBytesStoC;

            _currentAggRateCtoS = bytesDeltaCtoS / elapsedSeconds;
            _currentAggRateStoC = bytesDeltaStoC / elapsedSeconds;

            // Update the baseline
            _lastRateCalcAggBytesCtoS = currentTotalCtoS;
            _lastRateCalcAggBytesStoC = currentTotalStoC;
            _lastRateCalcAggTimeUtc = now;
        }

        // Get the latest running state
        bool isBound = false; try { isBound = _listener?.Server?.IsBound ?? false; } catch { }
        bool isRunning = _isRunning && isBound && !_instanceCts.IsCancellationRequested;

        // Return a complete status object containing *all* the latest data
        return new RelayStatusUpdate
        {
            IsRunning = isRunning,
            ActiveConnections = _sessions.Count,
            LastError = _lastError,
            TotalBytesForwardedCtoS = currentTotalCtoS,
            TotalBytesForwardedStoC = currentTotalStoC,
            RateCtoS = _currentAggRateCtoS,
            RateStoC = _currentAggRateStoC
        };
    }

    // --- Method for UI to get active session details ---
    public List<ActiveSessionInfo> GetActiveSessionDetails()
    {
        return _sessions.Values.ToList() // Snapshot
            .Select(session => session.GetCurrentInfo())
            .Where(info => info != null)
            .OrderByDescending(info => info.ConnectedTimeUtc) // Show newest first
            .ToList();
    }

    public void Dispose()
    {
        Log.Info($"[{_config.Name}] Disposing TcpRelayInstance: {_config.LocalPort}.");
        _isRunning = false;
        if (!_instanceCts.IsCancellationRequested)
        {
            try
            {
                _instanceCts.Cancel();
            }
            catch { }
        }
        var localListener = _listener;
        _listener = null;
        if (localListener != null)
        {
            try
            {
                localListener.Stop();
                Log.Debug($"[{_config.Name}] Listener stopped.");
            }
            catch (Exception ex)
            {
                Log.Warn($"[{_config.Name}] Ex stopping listener: {ex.Message}");
            }
        }
        var acceptTask = _acceptLoopTask;
        if (acceptTask != null && !acceptTask.IsCompleted)
        {
            try
            {
                acceptTask.Wait(TimeSpan.FromMilliseconds(500));
            }
            catch { }
        }
        var sessionsToClose = _sessions.Values.ToList();
        Log.Info($"[{_config.Name}] Clearing/closing {_sessions.Count} sessions.");
        _sessions.Clear();
        sessionsToClose.ForEach(session =>
        {
            try
            {
                session.Dispose();
            }
            catch (Exception ex)
            {
                Log.Warn($"[{_config.Name}] Ex disposing session {session.SessionId}: {ex.Message}");
            }
        });
        try
        {
            _instanceCts.Dispose();
        }
        catch { }
        ReportStatus();
        Log.Info($"[{_config.Name}] TCP RelayInstance disposed: {_config.LocalPort}.");
        GC.SuppressFinalize(this);
    }
}

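In AcceptClientsAsync, keep-alive is enabled on every accepted client socket. Once a connection has been idle for the keep-alive time, the operating system starts sending keep-alive probes; if the client or its network is dead, the probes go unanswered and the connection is declared broken after the configured number of retries. With a 30-second idle time, a 5-second probe interval and 5 retries, that takes about 30 + 5 × 5 = 55 seconds. Note that the code above only sets SocketOptionName.KeepAlive, so the operating system defaults apply (on both Windows and Linux the default idle time is 2 hours); the 30 s / 5 s / 5-retry timing has to be configured on the socket explicitly. When the probes fail, the pending _clientSocket.ReceiveAsync throws a SocketException, which ReadAndBufferClientAsync catches; it then calls SignalClosure() and shuts the session down gracefully.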
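A minimal sketch of configuring that timing explicitly, assuming .NET Core 3.0 or later, where SocketOptionName.TcpKeepAliveTime, TcpKeepAliveInterval and TcpKeepAliveRetryCount exist at SocketOptionLevel.Tcp (times in seconds). On Windows these options also require a reasonably recent Windows 10 build or later. The class KeepAliveSketch and its methods are hypothetical names; EnableKeepAlive could replace the single KeepAlive option in AcceptClientsAsync.

```csharp
using System;
using System.Net.Sockets;

public static class KeepAliveSketch
{
    // Hypothetical helper: enables TCP keep-alive with explicit timing.
    // Idle time and interval are in seconds; retryCount is the number of unanswered probes.
    public static void EnableKeepAlive(Socket socket, int idleSeconds = 30, int intervalSeconds = 5, int retryCount = 5)
    {
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, idleSeconds);
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, intervalSeconds);
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, retryCount);
    }

    // Approximate time until an unresponsive peer is detected: idle + interval * retries.
    public static int DetectionSeconds(int idleSeconds, int intervalSeconds, int retryCount)
        => idleSeconds + intervalSeconds * retryCount;
}
```

Shorter values detect dead clients sooner but generate more probe traffic on idle connections; the trade-off depends on how many long-idle sessions the relay expects.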

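4.2 Backpressure

If the backend server S is slower than the client C, data cannot be buffered without limit. The MaxSendBufferBytes setting caps how much data may be pending in each direction; when the limit would be exceeded, the session is closed.

  • In ReadAndBufferClientAsync (the C->S producer), the check looks like this: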
    var segment = RentAndCopy(buffer, bytesRead);
    long currentPendingS = Interlocked.Read(ref _pendingSendBytesS);
    if (currentPendingS + bytesRead > _config.MaxSendBufferBytes)
    {
        Log.Warn($"[{_config.Name}][{_sessionId}] Backpressure C->S. Pending ({currentPendingS + bytesRead}) > limit ({_config.MaxSendBufferBytes}). Closing session.");
        ArrayPool<byte>.Shared.Return(segment.Array!);
        SignalClosure();
        return true; // Break loop
    }
    _clientSendBuffer.Enqueue(segment);
    Interlocked.Add(ref _pendingSendBytesS, bytesRead);
  • ReadAndRelayAsync (the S->C pass-through path) applies the same logic:
    long pC = Interlocked.Read(ref _pendingSendBytesC);
    if (pC + bytesRead > _config.MaxSendBufferBytes)
    {
        Log.Warn($"[{_config.Name}][{_sessionId}][{dir}] Backpressure S->C. Closing.");
        SignalClosure();
        break;
    }
    Interlocked.Add(ref _pendingSendBytesC, bytesRead);
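The pending-byte accounting in both snippets can be factored into a small byte budget. This is a hypothetical sketch (PendingByteBudget, TryReserve and Release are illustrative names): it reserves bytes before any buffer is rented, so a rejected chunk costs no allocation, and it uses a compare-and-swap loop so that the check and the increment happen atomically even if several threads reserve at once. As in the original, a sum exactly equal to the limit is still accepted.

```csharp
using System.Threading;

// Hypothetical helper mirroring the _pendingSendBytes* checks: a byte budget
// that is reserved when data is queued and released once it has been sent.
public sealed class PendingByteBudget
{
    private readonly long _maxBytes;
    private long _pending;

    public PendingByteBudget(long maxBytes) => _maxBytes = maxBytes;

    public long Pending => Interlocked.Read(ref _pending);

    // Returns false (and reserves nothing) if adding 'count' would exceed the limit.
    public bool TryReserve(int count)
    {
        while (true)
        {
            long current = Interlocked.Read(ref _pending);
            long next = current + count;
            if (next > _maxBytes) return false;
            // Only commit if no other thread changed _pending in the meantime.
            if (Interlocked.CompareExchange(ref _pending, next, current) == current) return true;
        }
    }

    // Called after 'count' bytes have actually been written to the socket.
    public void Release(int count) => Interlocked.Add(ref _pending, -count);
}
```

The original code closes the session when the budget is exceeded. An alternative design is to stop calling ReceiveAsync on the source socket until Release() brings the budget back under the limit; the source side's TCP receive window then fills up and throttles the sender instead of dropping the connection.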

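5. Key Implementation: Real-Time Traffic and Rate Monitoring


Showing the live traffic volume and rate of each relay and connection in the UI makes the tool much friendlier to use.

5.1 Formatting Utilities

These conversions are implemented in FormatUtils.cs: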

public static class FormatUtils
{
    private static readonly string[] SizeSuffixes = { "B", "KB", "MB", "GB", "TB", "PB" };
    private static readonly string[] RateSuffixes = { "B/s", "KB/s", "MB/s", "GB/s", "TB/s", "PB/s" };

    /// <summary>
    /// Formats a total byte count as a human-readable string (e.g. 1.23 MB).
    /// </summary>
    /// <param name="bytes">Total number of bytes.</param>
    /// <returns>The formatted string.</returns>
    public static string FormatBytes(long bytes)
    {
        if (bytes == 0)
        {
            return "0 B";
        }

        // Compute the order of magnitude (0=B, 1=KB, 2=MB, etc.)
        int mag = (int)Math.Log(bytes, 1024);

        // Compute the scaled value
        decimal adjustedSize = (decimal)bytes / (1L << (mag * 10)); // 1L << (mag * 10) equals Math.Pow(1024, mag)

        // Keep the magnitude within the bounds of the suffix array
        if (mag >= SizeSuffixes.Length)
        {
            mag = SizeSuffixes.Length - 1; // use the largest unit
            adjustedSize = (decimal)bytes / (1L << (mag * 10));
        }

        // If the result is below 1 and the unit is not B, use the next smaller unit
        if (adjustedSize < 1.0m && mag > 0)
        {
            mag--;
            adjustedSize = (decimal)bytes / (1L << (mag * 10));
        }

        // Choose the number of decimal places based on the value
        if (adjustedSize < 10) return $"{adjustedSize:F2} {SizeSuffixes[mag]}";
        if (adjustedSize < 100) return $"{adjustedSize:F1} {SizeSuffixes[mag]}";
        return $"{adjustedSize:F0} {SizeSuffixes[mag]}";
    }

    /// <summary>
    /// Formats a rate in bytes/second as a human-readable string (e.g. 1.23 MB/s).
    /// </summary>
    /// <param name="bytesPerSecond">Rate in bytes/second.</param>
    /// <returns>The formatted string.</returns>
    public static string FormatBytesRate(double bytesPerSecond)
    {
        if (bytesPerSecond < 0.01) // show 0 for very small rates
        {
            return "0 B/s";
        }

        // Compute the order of magnitude
        int mag = (int)Math.Log(bytesPerSecond, 1024);
        if (mag < 0) mag = 0; // guard against negative magnitudes

        // Compute the scaled value
        double adjustedRate = bytesPerSecond / Math.Pow(1024, mag);

        // Keep the magnitude within the bounds of the suffix array
        if (mag >= RateSuffixes.Length)
        {
            mag = RateSuffixes.Length - 1;
            adjustedRate = bytesPerSecond / Math.Pow(1024, mag);
        }

        // If the result is below 1 and the unit is not B/s, use the next smaller unit
        if (adjustedRate < 1.0 && mag > 0)
        {
            mag--;
            adjustedRate = bytesPerSecond / Math.Pow(1024, mag);
        }

        // Choose the number of decimal places based on the value
        if (adjustedRate < 10) return $"{adjustedRate:F2} {RateSuffixes[mag]}";
        if (adjustedRate < 100) return $"{adjustedRate:F1} {RateSuffixes[mag]}";
        return $"{adjustedRate:F0} {RateSuffixes[mag]}";
    }
}
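For comparison, here is a hypothetical alternative to FormatUtils (the class FormatSketch is an illustrative name, not part of the project). It divides by 1024 in a loop instead of using Math.Log, so the unit choice never depends on floating-point logarithm rounding near exact powers of 1024, and it formats with CultureInfo.InvariantCulture; the interpolated strings in FormatUtils use the current culture, so on a German system they would print "1,50 KB". The decimal-place rule is the same as in the original.

```csharp
using System;
using System.Globalization;

// Hypothetical alternative to FormatUtils.FormatBytes/FormatBytesRate:
// divides by 1024 in a loop instead of using Math.Log.
public static class FormatSketch
{
    private static readonly string[] Units = { "B", "KB", "MB", "GB", "TB", "PB" };

    private static string Format(double value, string suffix)
    {
        if (value < 0.01) return "0 B" + suffix; // zero, tiny or negative values

        int mag = 0;
        while (value >= 1024 && mag < Units.Length - 1)
        {
            value /= 1024;
            mag++;
        }

        // Same precision rule as FormatUtils: 2 decimals below 10, 1 below 100, else 0.
        string fmt = value < 10 ? "F2" : value < 100 ? "F1" : "F0";
        return value.ToString(fmt, CultureInfo.InvariantCulture) + " " + Units[mag] + suffix;
    }

    public static string FormatBytes(long bytes) => Format(bytes, "");

    public static string FormatBytesRate(double bytesPerSecond) => Format(bytesPerSecond, "/s");
}
```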

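5.2 Computing Rates with a Pull (Poll) Model

Rate = (current bytes - previous bytes) / (current time - previous time). A push model computes this on every packet, which makes the displayed rate fluctuate wildly. A better approach is a pull model that samples at a regular interval:

  1. MainForm uses a Timer control that fires periodically (for example once per second; the MainForm code uses a 1500 ms interval).
  2. The Timer Tick calls RefreshRelayList().
  3. RefreshRelayList() calls _relayManager.GetCurrentConfigsWithStatus().
  4. RelayManager iterates over its internal _activeRelays list and calls GetCurrentAggregatedStatus() on each IRelayInstance.
  5. TcpRelayInstance.GetCurrentAggregatedStatus():
    • Checks how much time has elapsed since the previous calculation (elapsedSeconds); the rate is only recomputed if more than 0.5 s has passed.
    • Computes the byte delta (currentTotalCtoS - _lastRateCalcAggBytesCtoS).
    • Computes the rate: _currentAggRateCtoS = bytesDeltaCtoS / elapsedSeconds;
    • Stores the current byte count and time as the baseline for the next calculation.
    • Returns a RelayStatusUpdate object containing the totals and the new rates.
  6. RelayManager copies this status onto the runtime properties of the corresponding RelayConfig object.
  7. MainForm rebinds the updated List<RelayConfig> to dataGridViewRelays.
  8. The columns of dataGridViewRelays are bound to the ...Display properties of RelayConfig, such as RateCtoSDisplay.
  9. The RateCtoSDisplay property calls FormatUtils.FormatBytesRate(RateCtoS) to return the formatted string.

The same flow is used to display per-session rates (UpdateActiveSessionsGrid -> GetActiveSessionsForRelay -> GetActiveSessionDetails -> session.GetCurrentInfo()).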
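The rate calculation shared by GetCurrentAggregatedStatus() and GetCurrentInfo() can be isolated into a small, testable class. This is a hypothetical sketch (PolledRateMeter is an illustrative name): the caller passes the running byte total and the current time explicitly, which makes the 0.5 s threshold easy to verify without real clocks. Like the original, it is not thread-safe and assumes a single poller such as the UI timer.

```csharp
using System;

// Hypothetical stand-alone version of the polling rate calculation:
// the rate is recomputed only when more than MinInterval has elapsed,
// otherwise the last computed rate is returned unchanged.
public sealed class PolledRateMeter
{
    private static readonly TimeSpan MinInterval = TimeSpan.FromSeconds(0.5);
    private long _lastBytes;
    private DateTime _lastTimeUtc;
    private double _lastRate;

    public PolledRateMeter(DateTime startUtc) => _lastTimeUtc = startUtc;

    // totalBytes is the running total (e.g. Interlocked.Read of a counter).
    public double Sample(long totalBytes, DateTime nowUtc)
    {
        TimeSpan elapsed = nowUtc - _lastTimeUtc;
        if (elapsed > MinInterval)
        {
            _lastRate = (totalBytes - _lastBytes) / elapsed.TotalSeconds; // bytes/second
            _lastBytes = totalBytes;   // new baseline
            _lastTimeUtc = nowUtc;
        }
        return _lastRate;
    }
}
```

For example, 1000 bytes after 1 s gives 1000 B/s; a poll 0.2 s later still returns 1000 B/s, and a poll at 3 s with a total of 5000 bytes gives (5000 - 1000) / 2 = 2000 B/s.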

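6. UI Implementation


The UI consists of two forms: a main form and a form for editing relay configurations.

The code of the main form is as follows: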

public partial class MainForm : Form
{
    private RelayManager? _relayManager;
    private System.Windows.Forms.Timer? _uiUpdateTimer;
    private BindingSource _relayConfigBindingSource = new BindingSource(); // Renamed for clarity
    private BindingSource _activeSessionsBindingSource = new BindingSource();
    private Guid _selectedRelayId = Guid.Empty;
    private bool _isRefreshingRelayList = false;
    public MainForm()
    {
        InitializeComponent();
        InitializeRelaySystem();
        SetupDataGridView();
        SetupUiUpdateTimer();
    }

    private void InitializeRelaySystem()
    {
        try
        {
            // Ensure Core logging is configured (if not already done by manager)
            LogConfigurator.Configure();

            var configs = ConfigManager.LoadConfig();
            _relayManager = new RelayManager(configs);
            _relayManager.OnStatusUpdate += RelayManager_OnStatusUpdate; // Subscribe to status updates
            _relayManager.StartAllEnabledRelays(); // Start active relays on load
        }
        catch (Exception ex)
        {
            MessageBox.Show($"Failed to initialize relay manager: {ex.Message}", "Initialization Error", MessageBoxButtons.OK, MessageBoxIcon.Error);
            // Optionally log the exception via log4net here
        }
    }

    private void SetupDataGridView()
    {
        // Configure DataGridView columns (Name, Protocol, Local, Remote, Status, Connections, C->S Bytes, S->C Bytes, Error)
        // Example column setup (do this in the designer or here)

        dataGridViewRelays.AutoGenerateColumns = false;
        dataGridViewRelays.AllowUserToAddRows = false;
        dataGridViewRelays.AllowUserToDeleteRows = false;
        dataGridViewRelays.SelectionMode = DataGridViewSelectionMode.FullRowSelect;
        dataGridViewRelays.MultiSelect = false;
        dataGridViewRelays.RowHeadersVisible = false; // Cleaner look
        dataGridViewRelays.ColumnHeadersDefaultCellStyle.Alignment = DataGridViewContentAlignment.MiddleLeft;

        dataGridViewRelays.Columns.Clear();
        AddColumn(dataGridViewRelays, "Name", "Name", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "Protocol", "Protocol", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "LocalEndpointDisplay", "Local Endpoint", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "RemoteEndpointDisplay", "Remote Endpoint", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddCheckBoxColumn(dataGridViewRelays, "IsRunning", "Running", readOnly: true);
        AddColumn(dataGridViewRelays, "ActiveConnections", "Conns", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "TotalBytesCtoSDisplay", "C->S Total", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "RateCtoSDisplay", "C->S Rate", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "TotalBytesStoCDisplay", "S->C Total", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "RateStoCDisplay", "S->C Rate", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewRelays, "LastError", "Last Error", readOnly: true, fillWeight: 1, autoSizeMode: DataGridViewAutoSizeColumnMode.Fill);

        _relayConfigBindingSource.DataSource = typeof(RelayConfig);
        dataGridViewRelays.DataSource = _relayConfigBindingSource;
        dataGridViewRelays.SelectionChanged += DataGridViewRelays_SelectionChanged;
        dataGridViewRelays.CellDoubleClick += dataGridViewRelays_CellDoubleClick;


        // --- Setup for dataGridViewActiveSessions ---
        // Ensure this DataGridView exists on your form (e.g., dataGridViewActiveSessions)
        dataGridViewActiveSessions.AutoGenerateColumns = false;
        dataGridViewActiveSessions.AllowUserToAddRows = false;
        dataGridViewActiveSessions.AllowUserToDeleteRows = false;
        dataGridViewActiveSessions.SelectionMode = DataGridViewSelectionMode.FullRowSelect;
        dataGridViewActiveSessions.MultiSelect = false;
        dataGridViewActiveSessions.RowHeadersVisible = false;
        dataGridViewActiveSessions.ColumnHeadersDefaultCellStyle.Alignment = DataGridViewContentAlignment.MiddleLeft;


        dataGridViewActiveSessions.Columns.Clear();
        AddColumn(dataGridViewActiveSessions, "ClientRemoteEndPoint", "Client Endpoint", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "ServerRemoteEndPoint", "Server Endpoint", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "BytesCtoSDisplay", "C->S Total", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "RateCtoSDisplay", "C->S Rate", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "BytesStoCDisplay", "S->C Total", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "RateStoCDisplay", "S->C Rate", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "ConnectedTimeDisplay", "Connected at", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "DurationDisplay", "Duration", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "Protocol", "Protocol", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.AllCells);
        AddColumn(dataGridViewActiveSessions, "SessionId", "Session ID", readOnly: true, autoSizeMode: DataGridViewAutoSizeColumnMode.Fill, fillWeight: 1); // Optional

        _activeSessionsBindingSource.DataSource = typeof(ActiveSessionInfo);
        dataGridViewActiveSessions.DataSource = _activeSessionsBindingSource;


        RefreshRelayList(); // Initial population
        UpdateActiveSessionsGrid(); // Initial empty state for sessions
    }

    private void dataGridViewRelays_CellDoubleClick(object? sender, DataGridViewCellEventArgs e)
    {
        if (e.RowIndex >= 0) // Ensure not header row
        {
            btnEdit_Click(sender, EventArgs.Empty);
        }
    }

    // Helper that adds a text column to a DataGridView (used for both grids)
    private void AddColumn(DataGridView dgv, string dataPropertyName, string headerText, string? format = null, bool readOnly = false, float fillWeight = 0, DataGridViewAutoSizeColumnMode autoSizeMode = DataGridViewAutoSizeColumnMode.DisplayedCells)
    {
        var column = new DataGridViewTextBoxColumn
        {
            DataPropertyName = dataPropertyName,
            HeaderText = headerText,
            Name = $"{dgv.Name}_col{dataPropertyName}", // Ensure unique name
            ReadOnly = readOnly,
            AutoSizeMode = autoSizeMode
        };

        if (autoSizeMode == DataGridViewAutoSizeColumnMode.Fill && fillWeight > 0)
        {
            column.FillWeight = fillWeight;
        }
        else if (autoSizeMode == DataGridViewAutoSizeColumnMode.Fill && fillWeight <= 0)
        {
            // If Fill mode is specified but no valid fillWeight, default to a small positive value
            column.FillWeight = 10; // Default positive fill weight
        }


        if (!string.IsNullOrEmpty(format))
        {
            column.DefaultCellStyle.Format = format;
        }
        dgv.Columns.Add(column);
    }

    private void AddCheckBoxColumn(DataGridView dgv, string dataPropertyName, string headerText, bool readOnly = false)
    {
        var column = new DataGridViewCheckBoxColumn
        {
            DataPropertyName = dataPropertyName,
            HeaderText = headerText,
            Name = $"{dgv.Name}_col{dataPropertyName}",
            ReadOnly = readOnly,
            AutoSizeMode = DataGridViewAutoSizeColumnMode.DisplayedCells
        };
        dgv.Columns.Add(column);
    }

    private void DataGridViewRelays_SelectionChanged(object? sender, EventArgs e)
    {
        // --- 1. Check the flag ---
        // If this event was raised by our timer-driven refresh, ignore it.
        if (_isRefreshingRelayList)
        {
            return;
        }

        if (dataGridViewRelays.CurrentRow != null && dataGridViewRelays.CurrentRow.DataBoundItem is RelayConfig selectedConfig)
        {
            if (_selectedRelayId != selectedConfig.Id)
            {
                _selectedRelayId = selectedConfig.Id;
                if (lblActiveSessions != null)
                {
                    lblActiveSessions.Text = $"Active Connections for: {selectedConfig.Name}";
                }
                UpdateActiveSessionsGrid(); // Refresh the sessions immediately when the selection changes
            }
        }
        else
        {
            _selectedRelayId = Guid.Empty;
            if (lblActiveSessions != null)
            {
                lblActiveSessions.Text = "Active Connections (no relay selected)";
            }
            UpdateActiveSessionsGrid(); // Refresh the session grid so that it shows as empty
        }
    }

    private void UpdateActiveSessionsGrid()
    {
        if (_relayManager == null || this.IsDisposed || !this.IsHandleCreated) return;

        List<ActiveSessionInfo> sessions;
        if (_selectedRelayId == Guid.Empty)
        {
            sessions = new List<ActiveSessionInfo>();
        }
        else
        {
            // This call makes the selected relay instance poll all of its sessions for their latest state and rates
            sessions = _relayManager.GetActiveSessionsForRelay(_selectedRelayId);
        }

        // Updating the BindingSource incrementally would reduce flicker (optional, but recommended);
        // for simplicity we still use ResetBindings here.
        _activeSessionsBindingSource.DataSource = sessions;
        _activeSessionsBindingSource.ResetBindings(false);
    }


    private void SetupUiUpdateTimer()
    {
        _uiUpdateTimer = new System.Windows.Forms.Timer();
        _uiUpdateTimer.Interval = 1500; // Update every 1.5 seconds
        _uiUpdateTimer.Tick += UiUpdateTimer_Tick;
        _uiUpdateTimer.Start();
    }

    private void UiUpdateTimer_Tick(object? sender, EventArgs e)
    {
        if (this.IsDisposed || !this.IsHandleCreated)
        {
            _uiUpdateTimer?.Stop();
            return;
        }
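        if (_isRefreshingRelayList) return; // Skip if the previous refresh cycle is still running
        try
        {
            _isRefreshingRelayList = true; // --- 1. Set the flag ---

            // --- 2. Refresh the aggregate relay list (this triggers the aggregate rate calculation)
            RefreshRelayList();

            // Refresh the session list (this triggers the per-session rate calculation)
            UpdateActiveSessionsGrid();
        }
        catch (Exception ex)
        {
            // Log errors raised in the tick handler, but don't let them break the timer
            log4net.LogManager.GetLogger(typeof(MainForm)).Error("Error in UiUpdateTimer_Tick", ex);
        }
        finally
        {
            _isRefreshingRelayList = false; // --- 3. Always clear the flag, no matter what ---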
        }
    }
 
    private void RefreshRelayList()
    {
        if (_relayManager == null) return;

        // Get the current state from the manager
        var configsWithStatus = _relayManager.GetCurrentConfigsWithStatus();

        // Update the BindingSource. This is more efficient than clearing and adding.
        // If you need to handle adds/removes correctly, you might need a BindingList<RelayConfig>
        // For simplicity here, we reset the binding source.
        _relayConfigBindingSource.DataSource = configsWithStatus;
        _relayConfigBindingSource.ResetBindings(false); // Refresh the grid
    }

    private void RelayManager_OnStatusUpdate(Guid id, RelayStatusUpdate status)
    {
        // Need to update the UI on the UI thread
        if (this.IsDisposed || !this.IsHandleCreated) return;

        if (this.InvokeRequired)
        {
            try
            {
                this.BeginInvoke(new Action(() => UpdateGridRowForRelayConfig(id, status)));
            }
            catch (ObjectDisposedException) { /* Form is closing */ }
            catch (InvalidOperationException) { /* Form handle not created or closing */ }
        }
        else
        {
            UpdateGridRowForRelayConfig(id, status);
        }
    }

    private void UpdateGridRowForRelayConfig(Guid id, RelayStatusUpdate status)
    {
        if (_relayConfigBindingSource.DataSource is not List<RelayConfig> list) return;

        var config = list.FirstOrDefault(c => c.Id == id);
        if (config != null)
        {
            bool changed = config.IsRunning != status.IsRunning ||
                           config.ActiveConnections != status.ActiveConnections ||
                           config.LastError != status.LastError;

            if (changed)
            {
                config.IsRunning = status.IsRunning;
                config.ActiveConnections = status.ActiveConnections;
                config.LastError = status.LastError;

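                // ResetItem touches the bindings too, so guard it with the same flag as the timer
                // refresh to keep SelectionChanged quiet, restoring the previous value afterwards.
                bool wasRefreshing = _isRefreshingRelayList;
                _isRefreshingRelayList = true;
                try
                {
                    for (int i = 0; i < _relayConfigBindingSource.Count; i++)
                    {
                        if (_relayConfigBindingSource[i] is RelayConfig bsConfig && bsConfig.Id == id)
                        {
                            _relayConfigBindingSource.ResetItem(i);
                            break;
                        }
                    }
                }
                finally
                {
                    _isRefreshingRelayList = wasRefreshing;
                }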
            }
        }
    }

    private void btnAdd_Click(object sender, EventArgs e)
    {
        using (var editorForm = new RelayEditorForm(null)) // Pass null for new config
        {
            if (editorForm.ShowDialog() == DialogResult.OK && editorForm.Config != null && _relayManager != null)
            {
                var currentConfigs = _relayManager.GetCurrentConfigsWithStatus();
                currentConfigs.Add(editorForm.Config);
                _relayManager.UpdateConfigs(currentConfigs); // Triggers add/start and save
                RefreshRelayList();
            }
        }
    }

    private void btnEdit_Click(object sender, EventArgs e)
    {
        if (dataGridViewRelays.SelectedRows.Count > 0 && dataGridViewRelays.SelectedRows[0].DataBoundItem is RelayConfig selectedConfig)
        {
            using (var editorForm = new RelayEditorForm(selectedConfig))
            {
                if (editorForm.ShowDialog() == DialogResult.OK && editorForm.Config != null && _relayManager != null)
                {
                    var currentConfigs = _relayManager.GetCurrentConfigsWithStatus();
                    // Find and replace the edited config
                    var index = currentConfigs.FindIndex(c => c.Id == editorForm.Config.Id);
                    if (index != -1)
                    {
                        currentConfigs[index] = editorForm.Config;
                        _relayManager.UpdateConfigs(currentConfigs); // Triggers update/restart and save
                        RefreshRelayList();
                    }
                }
            }
        }
        else
        {
            MessageBox.Show("Please select a relay configuration to edit.", "Edit Relay", MessageBoxButtons.OK, MessageBoxIcon.Information);
        }
    }

    private void btnDelete_Click(object sender, EventArgs e)
    {
        if (dataGridViewRelays.SelectedRows.Count > 0 && dataGridViewRelays.SelectedRows[0].DataBoundItem is RelayConfig selectedConfig)
        {
            if (MessageBox.Show($"Are you sure you want to delete the relay '{selectedConfig.Name}'?", "Confirm Delete", MessageBoxButtons.YesNo, MessageBoxIcon.Warning) == DialogResult.Yes)
            {
                if (_relayManager != null)
                {
                    var currentConfigs = _relayManager.GetCurrentConfigsWithStatus();
                    currentConfigs.RemoveAll(c => c.Id == selectedConfig.Id);
                    _relayManager.UpdateConfigs(currentConfigs); // Triggers stop/remove and save
                    RefreshRelayList();
                }
            }
        }
        else
        {
            MessageBox.Show("Please select a relay configuration to delete.", "Delete Relay", MessageBoxButtons.OK, MessageBoxIcon.Information);
        }
    }

    private void btnStart_Click(object sender, EventArgs e)
    {
        if (dataGridViewRelays.SelectedRows.Count > 0 && dataGridViewRelays.SelectedRows[0].DataBoundItem is RelayConfig selectedConfig)
        {
            if (_relayManager != null)
            {
                selectedConfig.IsEnabled = true; // Mark as enabled in the config
                var currentConfigs = _relayManager.GetCurrentConfigsWithStatus();
                // Update the specific config in the list
                var index = currentConfigs.FindIndex(c => c.Id == selectedConfig.Id);
                if (index != -1) currentConfigs[index] = selectedConfig;

                // Triggers a potential start and saves; the grid refresh may be handled by the status-update event
                _relayManager.UpdateConfigs(currentConfigs);
            }
        }
        else { MessageBox.Show("Select relay to start."); }
    }

    private void btnStop_Click(object sender, EventArgs e)
    {
        if (dataGridViewRelays.SelectedRows.Count > 0 && dataGridViewRelays.SelectedRows[0].DataBoundItem is RelayConfig selectedConfig)
        {
            if (_relayManager != null)
            {
                selectedConfig.IsEnabled = false; // Mark as disabled
                var currentConfigs = _relayManager.GetCurrentConfigsWithStatus();
                var index = currentConfigs.FindIndex(c => c.Id == selectedConfig.Id);
                if (index != -1) currentConfigs[index] = selectedConfig;

                _relayManager.UpdateConfigs(currentConfigs); // Trigger stop and save
            }
        }
        else { MessageBox.Show("Select relay to stop."); }
    }

    private void MainForm_FormClosing(object sender, FormClosingEventArgs e)
    {
        var logger = log4net.LogManager.GetLogger(typeof(MainForm));
        logger.Info("MainForm closing. Stopping timer and disposing Relay Manager...");
        _uiUpdateTimer?.Stop();
        _uiUpdateTimer?.Dispose();
        _relayManager?.Dispose();
        logger.Info("MainForm cleanup complete.");
    }
}

The main form has two DataGridViews. The upper one lists every configured relay. When a relay is selected, the lower grid shows the client-to-server connections of that relay.

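One thing needs attention here. UiUpdateTimer_Tick calls RefreshRelayList() every 1.5 seconds, which reassigns the BindingSource's DataSource and calls ResetBindings(). This in turn makes the DataGridView lose and re-acquire its selection, which raises SelectionChanged. As a result, the session list (dataGridViewActiveSessions) refreshes constantly even though the user has not clicked anything. The fix is a Boolean flag, _isRefreshingRelayList. UiUpdateTimer_Tick sets it to true before refreshing the bindings and resets it to false in a finally block afterwards. DataGridViewRelays_SelectionChanged checks the flag first and returns immediately if it is true. Any other path that resets the bindings, such as the update reached from RelayManager_OnStatusUpdate, should be guarded the same way.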
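The guard-flag idea can be tried out without WinForms. In the sketch below, `FakeRelayGrid` and `RefreshGuardDemo` are hypothetical stand-ins, not part of the project. `Rebind()` re-raises `SelectionChanged` the way `ResetBindings()` can make the DataGridView re-acquire its selection, and `_isRefreshing` plays the role of `_isRefreshingRelayList`.

```csharp
using System;

// Hypothetical stand-in for a data-bound grid: rebinding re-raises SelectionChanged,
// just as ResetBindings() can make a DataGridView drop and re-acquire its selection.
public sealed class FakeRelayGrid
{
    public event EventHandler? SelectionChanged;

    // A real user click on a row.
    public void UserSelects() => SelectionChanged?.Invoke(this, EventArgs.Empty);

    // A programmatic rebind that happens to raise the same event.
    public void Rebind() => SelectionChanged?.Invoke(this, EventArgs.Empty);
}

public sealed class RefreshGuardDemo
{
    private readonly FakeRelayGrid _grid = new();
    private bool _isRefreshing; // plays the role of _isRefreshingRelayList

    // Counts how often the detail (session) grid would be refreshed.
    public int DetailRefreshCount { get; private set; }

    public RefreshGuardDemo() => _grid.SelectionChanged += OnSelectionChanged;

    private void OnSelectionChanged(object? sender, EventArgs e)
    {
        if (_isRefreshing) return; // ignore selection noise caused by our own rebind
        DetailRefreshCount++;      // stands in for UpdateActiveSessionsGrid()
    }

    // Timer-driven refresh: set the flag, rebind, and always clear it in finally.
    public void TimerTick()
    {
        if (_isRefreshing) return;
        try
        {
            _isRefreshing = true;
            _grid.Rebind();
        }
        finally
        {
            _isRefreshing = false;
        }
    }

    public void UserClick() => _grid.UserSelects();
}
```

Calling `TimerTick()` any number of times leaves `DetailRefreshCount` unchanged, while each `UserClick()` increments it. This is the behavior the flag is meant to restore.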

The code of the editor window (RelayEditorForm) is as follows:

public partial class RelayEditorForm : Form
{
    private static readonly ILog Log = LogManager.GetLogger(typeof(RelayEditorForm));
    private readonly bool _isEditing;
    private RelayConfig _workingConfig; // The configuration being edited (a copy or original)
    private BindingList<RemoteServerEndpoint> _remoteServersBindingList; // For live editing in DataGridView

    /// <summary>
    /// Gets the resulting RelayConfig object after the dialog is closed with OK.
    /// Null if the dialog was cancelled or if there was an issue.
    /// </summary>
    public RelayConfig? Config { get; private set; }

    public RelayEditorForm(RelayConfig? configToEdit = null)
    {
        InitializeComponent(); // Standard call to load designer controls

        SetupProtocolComboBox();
        SetupNumericUpDowns();
        SetupRemoteServersGrid(); // Setup DGV columns before binding DataSource

        if (configToEdit == null) // Adding a new configuration
        {
            _isEditing = false;
            _workingConfig = new RelayConfig(); // Create a new configuration instance
            this.Text = "Add New Relay";
            // Ensure RemoteServers list is initialized for a new config
            if (_workingConfig.RemoteServers == null) _workingConfig.RemoteServers = new List<RemoteServerEndpoint>();
            // Create BindingList from the (empty) list of the new config
            _remoteServersBindingList = new BindingList<RemoteServerEndpoint>(_workingConfig.RemoteServers);
            LoadConfigDataIntoControls(null); // Load default values into UI controls
        }
        else // Editing an existing configuration
        {
            _isEditing = true;
            // Create a deep copy for editing, so original is not modified unless OK is pressed.
            // This is important if the RelayManager's list contains the original instance.
            // Simple way: Serialize and Deserialize, or implement a Clone method.
            // For this example, we'll make a semi-deep copy manually.
            _workingConfig = new RelayConfig
            {
                Id = configToEdit.Id,
                Name = configToEdit.Name,
                Protocol = configToEdit.Protocol,
                LocalIpAddress = configToEdit.LocalIpAddress,
                LocalPort = configToEdit.LocalPort,
                IsEnabled = configToEdit.IsEnabled,
                MaxSendBufferBytes = configToEdit.MaxSendBufferBytes,
                ClientInactivityTimeoutSeconds = configToEdit.ClientInactivityTimeoutSeconds,
                // Deep copy the list of remote servers
                RemoteServers = configToEdit.RemoteServers?.Select(s =>
                    new RemoteServerEndpoint(s.Host, s.Port, s.IsEnabled) { Id = s.Id }
                ).ToList() ?? new List<RemoteServerEndpoint>()
            };

            this.Text = $"Edit Relay: {_workingConfig.Name}";
            _remoteServersBindingList = new BindingList<RemoteServerEndpoint>(_workingConfig.RemoteServers);
            LoadConfigDataIntoControls(_workingConfig); // Load existing data into UI controls
        }
        // Bind the BindingList to the DataGridView after it's fully populated
        dgvRemoteServers.DataSource = _remoteServersBindingList;
    }

    /// <summary>
    /// Populates the Protocol ComboBox with supported values.
    /// </summary>
    private void SetupProtocolComboBox()
    {
        cmbProtocol.DataSource = Enum.GetValues(typeof(ProtocolType))
                                      .Cast<ProtocolType>()
                                      .Where(p => p == ProtocolType.Tcp || p == ProtocolType.Udp) // Filter for supported protocols
                                      .ToList();
        //cmbProtocol.DisplayMember = "ToString"; // Show enum names
        //cmbProtocol.ValueMember = "ToString";   // Store enum names (or use underlying value if needed)
    }

    /// <summary>
    /// Configures ranges and increments for NumericUpDown controls.
    /// </summary>
    private void SetupNumericUpDowns()
    {
        numLocalPort.Minimum = 1;
        numLocalPort.Maximum = 65535;
        // numRemotePort has been removed in favor of the grid

        numMaxBuffer.Minimum = 1 * 1024 * 1024;    // 1 MB
        numMaxBuffer.Maximum = 1024 * 1024 * 1024; // 1 GB
        numMaxBuffer.Increment = 1 * 1024 * 1024;  // 1 MB steps
        numMaxBuffer.ThousandsSeparator = true;    // For readability
    }

    /// <summary>
    /// Configures columns for the Remote Servers DataGridView.
    /// </summary>
    private void SetupRemoteServersGrid()
    {
        dgvRemoteServers.AutoGenerateColumns = false; // We define columns manually
        dgvRemoteServers.AllowUserToAddRows = false;  // Adding rows is done via the "Add Server" button
        dgvRemoteServers.RowHeadersVisible = false;
        dgvRemoteServers.SelectionMode = DataGridViewSelectionMode.FullRowSelect;
        dgvRemoteServers.Columns.Clear(); // Clear any designer-added columns

        dgvRemoteServers.Columns.Add(new DataGridViewTextBoxColumn
        {
            Name = "colServerHost",
            DataPropertyName = "Host",
            HeaderText = "Host",
            AutoSizeMode = DataGridViewAutoSizeColumnMode.Fill,
            FillWeight = 50 // Host takes up more space
        });
        dgvRemoteServers.Columns.Add(new DataGridViewTextBoxColumn
        {
            Name = "colServerPort",
            DataPropertyName = "Port",
            HeaderText = "Port",
            Width = 60,
            DefaultCellStyle = new DataGridViewCellStyle { Format = "N0" } // Numeric format, no decimals
        });
        dgvRemoteServers.Columns.Add(new DataGridViewCheckBoxColumn
        {
            Name = "colServerEnabled",
            DataPropertyName = "IsEnabled",
            HeaderText = "Enabled",
            Width = 70
        });

        // Event handlers for inline validation and error clearing
        dgvRemoteServers.CellValidating += dgvRemoteServers_CellValidating;
        dgvRemoteServers.CellEndEdit += dgvRemoteServers_CellEndEdit;
        dgvRemoteServers.DataError += dgvRemoteServers_DataError; // Handle data binding errors
    }

    /// <summary>
    /// Loads data from a RelayConfig object into the form's controls.
    /// If config is null, sets default values for a new configuration.
    /// </summary>
    private void LoadConfigDataIntoControls(RelayConfig? config)
    {
        if (config != null) // Editing existing config
        {
            txtName.Text = config.Name;
            cmbProtocol.SelectedItem = config.Protocol;
            txtLocalIp.Text = config.LocalIpAddress;
            // Ensure port value is within NumericUpDown control limits
            numLocalPort.Value = Math.Max(numLocalPort.Minimum, Math.Min(numLocalPort.Maximum, config.LocalPort > 0 ? config.LocalPort : 1));
            numMaxBuffer.Value = Math.Max(numMaxBuffer.Minimum, Math.Min(numMaxBuffer.Maximum, config.MaxSendBufferBytes > 0 ? config.MaxSendBufferBytes : numMaxBuffer.Minimum));
            chkEnabled.Checked = config.IsEnabled;
            txtClientSessionInactive.Text = config.ClientInactivityTimeoutSeconds.ToString();
            // RemoteServers are already loaded into _remoteServersBindingList which is bound to dgvRemoteServers
        }
        else // Setting defaults for a new configuration
        {
            txtName.Text = "New Relay";
            cmbProtocol.SelectedItem = ProtocolType.Tcp; // Default to TCP
            txtLocalIp.Text = "0.0.0.0";
            numLocalPort.Value = 7000; // Example default local port
            numMaxBuffer.Value = 10 * 1024 * 1024; // Default 10MB buffer
            chkEnabled.Checked = true;
            txtClientSessionInactive.Text = "0"; // Default to no inactivity timeout
            // Add a default remote server entry for user convenience when creating a new relay
            if (!_remoteServersBindingList.Any()) // Only if list is empty
            {
                _remoteServersBindingList.Add(new RemoteServerEndpoint("127.0.0.1", 8000));
            }
        }
    }

    /// <summary>
    /// Validates user input in the form controls.
    /// </summary>
    /// <returns>True if all inputs are valid, false otherwise.</returns>
    private bool ValidateInput()
    {
        errorProvider1.Clear(); // Clear previous error icons
        bool isValid = true;

        // Validate general settings
        if (string.IsNullOrWhiteSpace(txtName.Text))
        {
            errorProvider1.SetError(txtName, "Relay Name cannot be empty.");
            isValid = false;
        }
        if (cmbProtocol.SelectedItem == null)
        {
            errorProvider1.SetError(cmbProtocol, "A Protocol must be selected.");
            isValid = false;
        }
        if (string.IsNullOrWhiteSpace(txtLocalIp.Text))
        {
            errorProvider1.SetError(txtLocalIp, "Local IP address cannot be empty.");
            isValid = false;
        }
        else if (!txtLocalIp.Text.Equals("0.0.0.0") && !IPAddress.TryParse(txtLocalIp.Text.Trim(), out _))
        {
            errorProvider1.SetError(txtLocalIp, "Invalid Local IP address format. Use a valid IP or '0.0.0.0'.");
            isValid = false;
        }

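        // The timeout must be a whole number of seconds and must not be negative (0 disables it)
        if (!int.TryParse(txtClientSessionInactive.Text.Trim(), out int inactivitySeconds) || inactivitySeconds < 0)
        {
            errorProvider1.SetError(txtClientSessionInactive, "Client inactivity timeout must be a whole number of seconds greater than or equal to 0.");
            isValid = false;
        }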

        // Validate Remote Servers list
        if (!_remoteServersBindingList.Any(s => s.IsEnabled))
        {
            // Set error on the GroupBox or the DataGridView itself
            errorProvider1.SetError(gbRemoteServers, "At least one enabled remote server is required for the relay.");
            isValid = false;
        }

        for (int i = 0; i < _remoteServersBindingList.Count; i++)
        {
            var server = _remoteServersBindingList[i];
            if (string.IsNullOrWhiteSpace(server.Host))
            {
                MessageBox.Show(this, $"Host name cannot be empty for the server entry at row {i + 1}. Please fill it or remove the entry.", "Remote Server Error", MessageBoxButtons.OK, MessageBoxIcon.Warning);
                dgvRemoteServers.CurrentCell = dgvRemoteServers.Rows[i].Cells["colServerHost"]; // Focus the problematic cell
                dgvRemoteServers.BeginEdit(true);
                return false; // Stop further validation
            }
            if (server.Port <= 0 || server.Port > 65535)
            {
                MessageBox.Show(this, $"Port number for server '{server.Host}' (row {i + 1}) must be between 1 and 65535.", "Remote Server Error", MessageBoxButtons.OK, MessageBoxIcon.Warning);
                dgvRemoteServers.CurrentCell = dgvRemoteServers.Rows[i].Cells["colServerPort"];
                dgvRemoteServers.BeginEdit(true);
                return false; // Stop further validation
            }
        }

        // If still valid, check protocol specific requirements
        if (isValid && (ProtocolType)cmbProtocol.SelectedItem == ProtocolType.Udp && _remoteServersBindingList.Count(s => s.IsEnabled) > 1)
        {
            // For UDP, current core logic uses only the first enabled server. Multi-server failover is not implemented for UDP.
            // Inform the user or restrict to one server for UDP.
            // For now, let's allow multiple but core will only use first. This comment is for awareness.
            Log.Debug("Multiple remote servers configured for UDP. Current core logic will use the first enabled one.");
        }


        if (!isValid && !errorProvider1.HasErrors) // General message if specific errors aren't set by ErrorProvider
        {
            MessageBox.Show(this, "Please correct the errors indicated before saving.", "Validation Error", MessageBoxButtons.OK, MessageBoxIcon.Warning);
        }
        return isValid;
    }

    /// <summary>
    /// Handles the OK button click. Validates input, updates the RelayConfig object, and closes the dialog.
    /// </summary>
    private void btnOk_Click(object sender, EventArgs e)
    {
        // Force any active edit in dgvRemoteServers to commit. This raises CellValidating;
        // if that handler set e.Cancel = true, EndEdit() would return false and the cell would stay in edit mode.
        // That is why CellValidating here only reports errors and never cancels.
        if (dgvRemoteServers.IsCurrentCellInEditMode)
        {
            dgvRemoteServers.EndEdit(); // Attempt to commit current cell edit
        }

        // Ask every enabled child control to run its own validation:
        if (!this.ValidateChildren(ValidationConstraints.Enabled)) // Flags such as Visible or TabStop can be combined if needed
        {
            // One of the controls' own Validating events (or the DGV's CellValidating, if it set e.Cancel) failed.
            // The failing control or the ErrorProvider is expected to have indicated the problem already.
            MessageBox.Show(this, "Please correct the highlighted validation errors before saving.",
                            "Validation Error", MessageBoxButtons.OK, MessageBoxIcon.Warning);
            return;
        }

        // Now run the form-level validation. ValidateInput() also checks every entry in
        // _remoteServersBindingList for errors that CellValidating deliberately deferred
        // (such as empty hosts or invalid ports).
        if (!ValidateInput())
        {
            // ValidateInput() has already reported the problem (ErrorProvider icon or message box).
            return;
        }

        // If all validations passed, proceed to update the config and close.
        _workingConfig.Name = txtName.Text.Trim();
        _workingConfig.Protocol = (ProtocolType)cmbProtocol.SelectedItem;
        _workingConfig.LocalIpAddress = txtLocalIp.Text.Trim();
        _workingConfig.LocalPort = (int)numLocalPort.Value;
        _workingConfig.MaxSendBufferBytes = (long)numMaxBuffer.Value;
        _workingConfig.IsEnabled = chkEnabled.Checked;
        _workingConfig.ClientInactivityTimeoutSeconds = int.Parse(txtClientSessionInactive.Text.Trim());

        // _remoteServersBindingList wraps _workingConfig.RemoteServers, so edits already went into that list;
        // copying it into a new List decouples the result from the BindingList.
        _workingConfig.RemoteServers = new List<RemoteServerEndpoint>(_remoteServersBindingList);

        this.Config = _workingConfig;
        this.DialogResult = DialogResult.OK;
        this.Close();
    }

    /// <summary>
    /// Handles the Cancel button click. Closes the dialog.
    /// </summary>
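    private void btnCancel_Click(object sender, EventArgs e)
    {
        // Bypass all validation and discard any pending cell edit before closing.
        this.AutoValidate = AutoValidate.Disable;
        dgvRemoteServers.CancelEdit();
        this.DialogResult = DialogResult.Cancel;
        this.Close();
    }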

    /// <summary>
    /// Adds a new default remote server entry to the list.
    /// </summary>
    private void btnAddServer_Click(object sender, EventArgs e)
    {
        string defaultHost = $"server{_remoteServersBindingList.Count + 1}.example.com";
        int defaultPort = 8000 + _remoteServersBindingList.Count; // Ensure this doesn't exceed 65535 over time
        if (defaultPort > 65535) defaultPort = 65535; // Cap the port

        var newServer = new RemoteServerEndpoint(defaultHost, defaultPort, true);
        _remoteServersBindingList.Add(newServer);

        // Optional: Scroll to the new row and select it, but don't force edit mode.
        if (dgvRemoteServers.Rows.Count > 0)
        {
            int newRowIndex = dgvRemoteServers.Rows.GetLastRow(DataGridViewElementStates.None);
            if (newRowIndex >= 0)
            {
                dgvRemoteServers.ClearSelection(); // Clear previous selection
                dgvRemoteServers.Rows[newRowIndex].Selected = true; // Select the new row
                dgvRemoteServers.FirstDisplayedScrollingRowIndex = newRowIndex; // Scroll to make it visible

                // Intentionally not putting the new row into edit mode:
                // dgvRemoteServers.CurrentCell = dgvRemoteServers.Rows[newRowIndex].Cells["colServerHost"];
                // dgvRemoteServers.BeginEdit(true); 
            }
        }
    }

    /// <summary>
    /// Removes the currently selected remote server from the list.
    /// </summary>
    private void btnRemoveServer_Click(object sender, EventArgs e)
    {
        RemoteServerEndpoint? selectedServer = null;
        if (dgvRemoteServers.CurrentRow != null && dgvRemoteServers.CurrentRow.DataBoundItem is RemoteServerEndpoint currentItem)
        {
            selectedServer = currentItem;
        }
        // Fallback if CurrentRow isn't what we expect, try SelectedRows.
        else if (dgvRemoteServers.SelectedRows.Count > 0 && dgvRemoteServers.SelectedRows[0].DataBoundItem is RemoteServerEndpoint selectedItem)
        {
            selectedServer = selectedItem;
        }

        if (selectedServer != null)
        {
            if (MessageBox.Show(this, $"Are you sure you want to remove server '{selectedServer.Host}:{selectedServer.Port}'?", "Confirm Remove", MessageBoxButtons.YesNo, MessageBoxIcon.Question) == DialogResult.Yes)
            {
                _remoteServersBindingList.Remove(selectedServer);
            }
        }
        else
        {
            MessageBox.Show(this, "Please select a server from the list to remove.", "Remove Server", MessageBoxButtons.OK, MessageBoxIcon.Information);
        }
    }

    /// <summary>
    /// Validates cell input in the Remote Servers DataGridView, particularly for the Port column.
    /// </summary>
    private void dgvRemoteServers_CellValidating(object sender, DataGridViewCellValidatingEventArgs e)
    {
        DataGridViewRow row = dgvRemoteServers.Rows[e.RowIndex];
        string columnName = dgvRemoteServers.Columns[e.ColumnIndex].Name;

        // Always clear previous error for the cell/row before re-validating this cell
        // Note: Setting ErrorText on a cell applies to the whole row if not using cell-specific error icons.
        row.ErrorText = string.Empty;

        // If it's the special template row for adding new records (when AllowUserToAddRows = true),
        // and the user hasn't typed anything yet, we might skip validation or be lenient.
        // However, in our current setup, AllowUserToAddRows is false, so rows are added programmatically.
        // This IsNewRow check will likely be false for any cell being edited after programmatic add.
        if (row.IsNewRow)
        {
            // If user hasn't started editing the new template row, don't validate.
            if (string.IsNullOrWhiteSpace(e.FormattedValue?.ToString()))
            {
                return; // Skip validation for pristine new row template cell
            }
        }

        if (columnName == "colServerPort")
        {
            if (string.IsNullOrEmpty(e.FormattedValue?.ToString()))
            {
                // Allow temporarily empty port during editing.
                // Final validation will occur when 'OK' is clicked.
                // No error text needed here if empty is temporarily allowed.
            }
            else if (!int.TryParse(e.FormattedValue.ToString(), out int portValue) || portValue <= 0 || portValue > 65535)
            {
                // Provide feedback via ErrorText but AVOID e.Cancel = true to prevent focus trap.
                row.ErrorText = "Port must be a number between 1 and 65535.";
                // e.Cancel = true; // Avoid this to allow focus to move.
            }
        }
        else if (columnName == "colServerHost")
        {
            if (string.IsNullOrWhiteSpace(e.FormattedValue?.ToString()))
            {
                // Allow temporarily empty host. Final validation on 'OK'.
                // Provide feedback if desired, but don't trap focus.
                if (!row.IsNewRow) // Only show error for existing rows if host is cleared
                {
                    row.ErrorText = "Host cannot be empty.";
                }
                // e.Cancel = true; // Avoid this
            }
        }
    }

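    /// <summary>
    /// Intentionally leaves the row's ErrorText alone. CellEndEdit fires after CellValidating,
    /// and because CellValidating never cancels, clearing the error here would erase its
    /// non-blocking feedback immediately. CellValidating resets ErrorText on every validation.
    /// </summary>
    private void dgvRemoteServers_CellEndEdit(object sender, DataGridViewCellEventArgs e)
    {
        // No-op by design; see the summary above.
    }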

    /// <summary>
    /// Handles data errors in the DataGridView (e.g., failed type conversion during editing).
    /// This prevents the default .NET error dialog from showing and logs the error.
    /// </summary>
    private void dgvRemoteServers_DataError(object sender, DataGridViewDataErrorEventArgs e)
    {
        Log.Warn($"DataGridView DataError in '{this.Name}'. Row:{e.RowIndex}, Column:{e.ColumnIndex} (Header: {dgvRemoteServers.Columns[e.ColumnIndex].HeaderText}). Context:{e.Context}. Input Value Type: {dgvRemoteServers.Rows[e.RowIndex].Cells[e.ColumnIndex].EditedFormattedValue?.GetType()}", e.Exception);
        // Inform user gently
        MessageBox.Show(this, $"Invalid data for column '{dgvRemoteServers.Columns[e.ColumnIndex].HeaderText}'. Please ensure the format is correct.\nDetails: {e.Exception.Message}", "Data Input Error", MessageBoxButtons.OK, MessageBoxIcon.Warning);
        e.ThrowException = false; // Suppress the default .NET error dialog
        // Optionally, to revert the change, set e.Cancel = true; this may not behave as expected in DataError,
        // so reverting is better handled in CellValidating or by resetting the value manually.
    }

    private void btnEditServer_Click(object sender, EventArgs e)
    {
        // Intentionally empty: remote servers are edited inline in dgvRemoteServers.
    }
}

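Note the following. Suppose the user is editing a cell in dgvRemoteServers (for example, typing a port number) and clicks the "OK" button. The DataGridView first tries to validate and commit the cell edit. If the validation logic in the CellValidating event fails and sets e.Cancel = true (or shows a MessageBox), focus is trapped in the cell and never reaches the "OK" button, so its Click event never fires.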

The fix is to change the validation strategy:

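  1. dgvRemoteServers_CellValidating: give only immediate, non-blocking feedback. If the input is invalid, set row.ErrorText to show a red error icon, but do not set e.Cancel = true. This lets the user move focus freely.
  2. btnOk_Click: this is where the final validation happens. First call this.ValidateChildren() to force any in-progress edit to be committed, then call the ValidateInput() method.
  3. ValidateInput(): this method is now responsible for strictly checking every row of data in _remoteServersBindingList. If it finds an empty host or an invalid port, it sets dgvRemoteServers.Rows[i].ErrorText, sets isValid to false, and finally shows a summary message box.
  4. btnCancel_Click: set this.AutoValidate = AutoValidate.Disable; and call dgvRemoteServers.CancelEdit() to discard any cell edit and bypass all validation.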
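The row checks that ValidateInput() performs do not depend on WinForms, so they can be pulled out into a plain method that is easy to test. The following is a minimal sketch under stated assumptions: ServerRow and EndpointValidator.ValidateAll are hypothetical names (the real form binds RemoteServerEndpoint objects), and a valid port is assumed to lie in 1 to 65535. In the form, btnOk_Click would call it after ValidateChildren(), copy each message into dgvRemoteServers.Rows[i].ErrorText, and keep the dialog open when the result is non-empty.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one grid row; the real form binds
// RemoteServerEndpoint objects, assumed here to expose Host and Port.
public record ServerRow(string Host, int Port);

public static class EndpointValidator
{
    // Checks every row and returns (row index -> error message).
    // An empty dictionary means all rows are valid and the dialog may close.
    public static Dictionary<int, string> ValidateAll(IReadOnlyList<ServerRow> rows)
    {
        var errors = new Dictionary<int, string>();
        for (int i = 0; i < rows.Count; i++)
        {
            ServerRow row = rows[i];
            if (string.IsNullOrWhiteSpace(row.Host))
            {
                errors[i] = "Host cannot be empty.";
            }
            else if (row.Port < 1 || row.Port > 65535)
            {
                // Assumption: port 0 is rejected because it cannot be connected to.
                errors[i] = $"Port {row.Port} must be between 1 and 65535.";
            }
        }
        return errors;
    }
}
```

Because every row is checked in one pass, the user sees all problems at once in the summary message box instead of being stopped at the first invalid cell.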

The program's UI is shown below:

7. Deploying as a Windows Service


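If no UI is required, a TCP relay like this is better deployed as a Windows service. The TcpUdpRelay.Service project is based on the .NET Worker Service template and must reference TcpUdpRelay.Core. The implementation is as follows: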

First, Program.cs. The setup flow is:

  • Use Host.CreateDefaultBuilder(args).
  • Add .UseWindowsService(...).
  • Add .ConfigureLogging(logging => logging.AddLog4Net()).
  • Add .ConfigureServices((hostContext, services) => {...}).

Service configuration:

  • services.AddSingleton(sp => new RelayManager(relayConfigs));, where relayConfigs comes from ConfigManager.LoadConfig(): registers RelayManager as a singleton through a factory. The container creates the instance the first time it is resolved (when the Worker is constructed at startup) and, because it created the instance, automatically calls its Dispose() method when the host stops. Registering a pre-built instance with services.AddSingleton(new RelayManager(...)) would not give this behavior: the container never disposes instances it did not create.
  • services.AddHostedService<Worker>();: registers the Worker service.

The complete code is as follows:

[assembly: log4net.Config.XmlConfigurator(ConfigFile = "log4net.config", Watch = true)]
namespace TcpUdpRelay.Service
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Ensure Core logging is configured first for any early messages
            LogConfigurator.Configure();
            var hostLogger = log4net.LogManager.GetLogger(typeof(Program));
            hostLogger.Info("------------------------------------------");
            hostLogger.Info("TcpUdpRelay Service Starting Host Setup...");
            hostLogger.Info($"Arguments: {string.Join(" ", args)}");
            hostLogger.Info($"Base Directory: {AppContext.BaseDirectory}");
            hostLogger.Info("------------------------------------------");

            try
            {
                CreateHostBuilder(args).Build().Run();
                hostLogger.Info("TcpUdpRelay Service Host Stopped.");
            }
            catch (Exception ex)
            {
                hostLogger.Fatal("TcpUdpRelay Service Host terminated unexpectedly.", ex);
                // Ensure logs are flushed before exiting
                log4net.LogManager.Shutdown();
                Environment.Exit(1); // Indicate failure
            }
            finally
            {
                // Ensure logs are flushed on normal exit too
                log4net.LogManager.Shutdown();
            }
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
         Host.CreateDefaultBuilder(args)
             // Configure application to run as a Windows Service
             .UseWindowsService(options =>
             {
                 options.ServiceName = "TcpUdpRelayService";
             })
             .ConfigureServices((hostContext, services) =>
             {
                 // Load configuration
                 var relayConfigs = ConfigManager.LoadConfig();

                 // Register RelayManager as a singleton through a factory so that the
                 // container owns it and calls Dispose() (stopping all relays) on shutdown.
                 // Instances passed directly, e.g. AddSingleton(new RelayManager(...)),
                 // are never disposed by the container.
                 services.AddSingleton(sp => new RelayManager(relayConfigs));

                 // Register the background service worker
                 services.AddHostedService<Worker>();
             })
             .ConfigureLogging((hostingContext, logging) =>
             {
                 // Optional: Clear default providers if you only want log4net
                 // logging.ClearProviders();

                 // Add log4net provider integration
                 logging.AddLog4Net();

                 // Set minimum log level (can be overridden by log4net.config)
                 logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Debug);

                 // Log provider configuration (optional)
                 // LoggerProviderOptions.RegisterProviderOptions<Log4NetProviderOptions, Log4NetLoggerProvider>(services);

             });
    }
}

The log4net.config file is configured as follows:

<?xml version="1.0" encoding="utf-8" ?>
<log4net>
	<root>
		<!--Log to a file-->
		<level value="INFO" />
		<appender-ref ref="LogFileAppender" />
	</root>
	<!--Define the file output-->
	<appender name="LogFileAppender" type="log4net.Appender.RollingFileAppender">
		<param name="Encoding" value="utf-8" />
		<param name="file" value="./log/logfile_" />
		<param name="appendToFile" value="true" />
		<param name="rollingStyle" value="Date" />
		<param name="StaticLogFileName" value="false" />
		<datePattern value="yyyyMMdd'.log'" />
		<param name="Threshold" value="DEBUG" />
		<layout type="log4net.Layout.PatternLayout">
			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,fff}] %5p - %m%n" />
		</layout>
	</appender>

	<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
		<layout type="log4net.Layout.PatternLayout">
			<conversionPattern value="%date{HH:mm:ss} %-5level %logger - %message%newline%exception" />
		</layout>
	</appender>
</log4net>
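To see which file names this configuration produces: with rollingStyle set to Date and StaticLogFileName set to false, the appender appends the current date, formatted with datePattern, to the file value, so the quoted '.log' in the pattern is emitted literally and a new file starts when the date changes. The helper below is a hypothetical approximation of that naming rule using the standard .NET custom date format; it is not log4net's own code.

```csharp
using System;
using System.Globalization;

public static class LogFileName
{
    // Hypothetical helper: approximates the name RollingFileAppender uses when
    // rollingStyle=Date and StaticLogFileName=false (base "file" + formatted date).
    // Text inside single quotes in a .NET custom date format is copied literally.
    public static string For(DateTime date,
                             string baseFile = "./log/logfile_",
                             string datePattern = "yyyyMMdd'.log'")
    {
        return baseFile + date.ToString(datePattern, CultureInfo.InvariantCulture);
    }
}
```

For example, LogFileName.For(new DateTime(2024, 3, 5)) yields ./log/logfile_20240305.log.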

Next is Worker.cs, which is a BackgroundService:

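  • The ExecuteAsync method is the body of the service.
  • RelayManager and ILogger are injected through the constructor.
  • All it does is call _relayManager.StartAllEnabledRelays().
  • It then calls await Task.Delay(Timeout.Infinite, stoppingToken); to keep the service alive, while all relay logic runs in background tasks inside RelayManager.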
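The keep-alive wait in the last bullet can be tried outside the service. In this minimal sketch (ParkDemo is a hypothetical name), a CancellationTokenSource with CancelAfter plays the role of the host's stoppingToken. Task.Delay(Timeout.Infinite, token) never completes on its own; once the token is cancelled it ends with a TaskCanceledException, which derives from OperationCanceledException. That is why Worker's catch (OperationCanceledException) clause treats it as a normal shutdown.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParkDemo
{
    // Mirrors the Worker pattern: park on an infinite delay until the token is cancelled.
    public static async Task<string> RunAsync(TimeSpan stopAfter)
    {
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(stopAfter); // stands in for the host signalling a stop

        try
        {
            await Task.Delay(Timeout.Infinite, cts.Token);
            return "completed"; // not reached: an infinite delay only ends via cancellation
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException is a subclass of OperationCanceledException.
            return "cancelled";
        }
    }
}
```

Awaiting ParkDemo.RunAsync(TimeSpan.FromMilliseconds(100)) returns "cancelled" after roughly 100 ms, without any polling loop consuming CPU in the meantime.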

The complete code is as follows:

public class Worker : BackgroundService
{
    // Use ILogger from Microsoft.Extensions.Logging
    private readonly ILogger<Worker> _logger;
    private readonly RelayManager _relayManager;

    // Inject RelayManager and ILogger
    public Worker(ILogger<Worker> logger, RelayManager relayManager)
    {
        _logger = logger;
        _relayManager = relayManager;
        _logger.LogInformation("TcpUdpRelay Worker service initializing.");
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("TcpUdpRelay Worker service starting execution.");

        // Register a callback for when the application is stopping
        stoppingToken.Register(() =>
        {
            _logger.LogInformation("Cancellation token triggered. Initiating shutdown...");
            // RelayManager disposal should handle stopping relays
        });

        try
        {
            // Start the relays managed by RelayManager
            _relayManager.StartAllEnabledRelays();

            _logger.LogInformation("Relay Manager started. Worker running.");

            // Keep the service alive until cancellation is requested. The actual work
            // (accepting/relaying) happens within RelayManager's tasks. Task.Delay with
            // Timeout.Infinite never completes on its own, so no loop is needed: when
            // stoppingToken is cancelled it throws OperationCanceledException, handled below.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // This is expected when the service is stopping gracefully.
            _logger.LogInformation("Worker execution cancelled gracefully.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An unhandled exception occurred during worker execution.");
            // Consider stopping the host or specific actions based on the error
        }
        finally
        {
            _logger.LogInformation("Worker execution finishing. Cleaning up...");
            // RelayManager is Singleton and will be disposed by the host,
            // which should call its Dispose method, stopping all relays.
            // Explicitly calling Dispose here might be redundant or cause issues
            // if the host manages its lifetime. Let the host handle it.
            // _relayManager.Dispose(); // Avoid calling Dispose here directly
        }

        _logger.LogInformation("Worker service execution completed.");
    }

    public override Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("StopAsync called. Performing cleanup tasks...");

        // RelayManager disposal is handled by the host. Log the intention.
        _logger.LogInformation("RelayManager will be disposed by the service host.");

        // Perform any other cleanup specific to the Worker service itself if needed.

        _logger.LogInformation("StopAsync completed.");
        return base.StopAsync(cancellationToken); // Call base implementation
    }
}

After building the project, open cmd as Administrator and use sc.exe to install the service:

REM Install the service (sc.exe requires a space after each "option=")
sc.exe create "TcpUdpRelayService" binPath= "C:\Your\Full\Path\To\publish\TcpUdpRelay.Service.exe" DisplayName= "TcpUdpRelayService" start= auto
REM Add a service description
sc.exe description "TcpUdpRelayService" "Relays traffic over TCP"
REM Start the service
sc.exe start TcpUdpRelayService
REM Stop the service
sc.exe stop TcpUdpRelayService
REM Delete the service
sc.exe delete TcpUdpRelayService

After installation, the service appears as follows: