在上一篇文章中提到了最近写的一个网络收发程序的一个问题。一开始还以为是因为计算机休眠会把线程杀死,导致程序在唤醒后接收不到服务端程序发送的数据,后来通过使用 WinDbg
分析dmp文件发现在网络异常断开后,接收数据的线程并没有退出,而是程序卡在了 NetworkStream.Read
方法那里,即使后来网络连通了也得不到通知和接收不了数据。这就引出了本篇要解决的问题:即在网络收发中,如何解决由于网络意外断开,客户端程序 Socket
的卡在了网络读取方法的问题。解决方法有两种:一种是在 Socket
上收发数据时设置超时时间,第二种就是增加心跳发送和检测逻辑。
问题
下面是一个典型的通过 Socket
接收数据的代码:
private void ReadData()
{
using (NetworkStream stream = new NetworkStream(so, ownsSocket: true))
{
int length;
byte[] head = new byte[4];
while (isRunning)
{
int count;
for (int offset = 0; offset < 4; offset += count)
{
count = stream.Read(head, offset, 4 - offset);
if (count == 0)
{
throw new Exception("socket 读取返回0");
}
}
length = BitConverter.ToInt32(head, 0);
if (length > 0 && length <= 1024 * 1024 * 30)
{
byte[] buff = new byte[length];
int count2;
for (int offset = 0; offset < length; offset += count2)
{
count2 = stream.Read(buff, offset, length - offset);
if (count2 == 0)
{
throw new Exception("socket 读取返回0");
}
}
_lastHeartbeatTime = DateTime.Now;
this.NewMessage?.Invoke(buff);
}
else if (length == 0)
{
//心跳包不用处理
_lastHeartbeatTime = DateTime.Now;
}
else
{
throw new Exception("数据长度错误:" + length);
}
}
}
}
方法通过 NetworkStream
的 Read
方法来从 Socket
读取数据。首先读取4个字节来获取数据包的长度,然后读取指定长度的内容。这里面一定要对长度进行校验,防止解析错误或者异常导致获得的数据长度过大,从而一直等待读取后续的过大长度的数据导致“卡死”。
上述代码,如果网络正常没有任何问题。但是当网络由于某种情况下异常断开(比如计算机休眠,网络连接会断开),断开的时候可能正处在执行 Read
读取数据方法的过程中,或者服务端在发送连接关闭请求时,恰好网络断开。在这些情况下,客户端的 Socket
是感知不到网络断开的,它会一直傻傻的等待不会到来的数据。即使随后网络已经恢复,但其持有的已经不是之前的那个 Socket
连接了。
解决方法
将上述的同步 Read
方法改为异步的 BeginReceive
,问题同样存在。解决方法有两种:
- 增加接收数据的超时时间,当一段时间没有数据到来时,方法会超时从而抛出异常。此时有两种情况,一种是服务端真的没有新的数据要发送过来,一种是连接断开了。对于同步的
Read
方法,可以直接设置超时;对于异步的方法,需要依赖通知机制来实现。 - 增加心跳发送和检测机制。当一段时间没有收到心跳数据时,断开当前连接并尝试重新连接。心跳机制根据客户端与服务端的交互逻辑又可以分为两种。一种是客户端和服务端只是单向数据传输,则发送方可以在没有数据发送时,定时发送心跳数据,接收数据的一方无论是在接收到正常数据还是心跳数据,都更新最后接收时间,然后在另外一个线程中判断当前时间与最后接收数据时间的差值,如果大于心跳数据发送频率的某个倍数,则认为此时网络连接可能已经断开,接收端在心跳检查线程里直接断开当前连接(
Socket.Close()
),尝试重新连接发送端。另一种是客户端和服务端都有双向数据发送,那么客户端可以按指定频率给服务端发送心跳消息,服务端接收到心跳消息后发送响应的心跳回复消息,客户端可以根据收到的心跳数据的时间判断连接是否已经断开。这种情况下如果网络连接已经断开,很大程度上在发送心跳数据时,就会报错抛出异常。如果此时发送心跳方法是同步方法,也要处理发送超时本身的问题,否则发送心跳的方法可能本身阻塞,如果发送是异步的,则不需要处理。
下面就上面的两种方法逐一分析。
设置超时时间
设置发送或接收超时能够避免发送数据或接收数据时连接断开, Socket
没有感知到从而阻塞的问题。但这一过程中要区分是连接断开导致的超时,还是没有数据过来导致的超时。
对于同步方法,只需要在 Socket
上设置超时时间即可,比如:
using System;
using System.Net;
using System.Net.Sockets;
class Program
{
static void Main()
{
try
{
// 创建一个 Socket 实例
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// 连接到服务器
IPEndPoint remoteEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8080);
socket.Connect(remoteEP);
// 设置接收超时时间为 3 秒(3000 毫秒)
socket.ReceiveTimeout = 3000;
byte[] buffer = new byte[1024];
try
{
// 尝试接收数据
int bytesRead = socket.Receive(buffer);
if (bytesRead > 0)
{
string data = System.Text.Encoding.ASCII.GetString(buffer, 0, bytesRead);
Console.WriteLine("接收到数据: " + data);
}
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut)
{
Console.WriteLine("接收超时");
}
else
{
Console.WriteLine("发生套接字错误: " + ex.Message);
}
}
finally
{
// 关闭套接字
socket.Close();
}
}
catch (Exception ex)
{
Console.WriteLine("发生错误: " + ex.Message);
}
}
}
上面设置了接收数据时超时间为3000ms,如果3秒后没有收到任何数据,则 Receive
方法会抛出异常。
如果是采用异步的收发,则需要配合同步原语如 ManualResetEvent
来进行判断:
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
class Program
{
static Socket socket;
static byte[] buffer = new byte[1024];
static ManualResetEvent receiveDone = new ManualResetEvent(false);
static bool received = false;
static void Main()
{
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint remoteEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8080);
socket.Connect(remoteEP);
// 开始异步接收
socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None,
new AsyncCallback(ReceiveCallback), socket);
// 设置超时时间为 3 秒
if (!receiveDone.WaitOne(3000))
{
Console.WriteLine("接收超时");
socket.Close();
}
else if (received)
{
Console.WriteLine("接收到数据: " + System.Text.Encoding.ASCII.GetString(buffer));
}
}
static void ReceiveCallback(IAsyncResult ar)
{
try
{
Socket client = (Socket)ar.AsyncState;
int bytesRead = client.EndReceive(ar);
if (bytesRead > 0)
{
received = true;
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
finally
{
receiveDone.Set();
}
}
}
在很多时候,没有收到数据并不表示连接已经断开,有很可能是当时没有数据交互。也就是说采用超时的收发方式不能判断是由于网络连接断开导致的超时,还是因为网络连接是正常的,但是由于没有数据交互导致了长时间没有接收到数据导致超时(当然可以通过抛出的异常类型 SocketException
来区别是否是 SocketError.TimedOut
)。如果是因为后者,超时后去断开然后重连显然是不合逻辑的。因为在这种情况下,即使断开重连,仍然可能会没有数据。所以更合理的方式是采用接下来介绍的心跳的方式。
心跳检测
根据收发的应用场景,心跳检测也有两种策略,分别是单向的心跳数据发送和检测,以及双向的心跳数据发送和检测。
单向心跳检测模式
如果是单向的数据收发,则发送端只需要在发送数据的时候,如果没有新的待发送数据,则按照一定的频率夹杂着发送心跳数据。接收端记录接收到的数据时间,不管是正常的数据,还是心跳数据。然后另外开启一个专门的检查心跳超时的程序,如果发现超时,直接将当前的连接 Close
,然后尝试重新连接。这里面数据包可以分为头4个字节为数据长度,后续的为该长度的数据内容。收发双方约定,如果头4个字节表示包长度的数据的值为0,则认为是一个心跳包。
代码如下:
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
class Client
{
private const int Port = 8888;
private const string ServerIp = "127.0.0.1";
private static Socket clientSocket;
private static DateTime lastHeartbeatTime = DateTime.Now;
private static bool isRunning = true;
private static Thread readThread;
private static CancellationTokenSource cts = new CancellationTokenSource();
private const int JoinTimeout = 5000; // 5 秒超时时间
private const int MaxDataLength = 1024; // 最大数据长度,可根据实际情况调整
static void Main()
{
while (true)
{
try
{
clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
clientSocket.Connect(IPAddress.Parse(ServerIp), Port);
Console.WriteLine("Connected to the server.");
isRunning = true;
cts = new CancellationTokenSource();
// 启动心跳检查线程
Thread heartbeatCheckThread = new Thread(CheckHeartbeat);
heartbeatCheckThread.Start();
// 启动数据读取线程
readThread = new Thread(() => ReadData(cts.Token));
readThread.Start();
// 等待读取线程结束
readThread.Join();
clientSocket.Close();
Console.WriteLine("Connection closed. Reconnecting...");
Thread.Sleep(10000);
}
catch (Exception e)
{
Console.WriteLine("Connection error: " + e.Message);
Thread.Sleep(10000);
}
}
}
static void ReadData(CancellationToken token)
{
try
{
byte[] buffer = new byte[1024];
while (isRunning && !token.IsCancellationRequested)
{
try
{
if (clientSocket.Available > 0)
{
int bytesRead = clientSocket.Receive(buffer, 0, 4, SocketFlags.None);
if (bytesRead == 4)
{
int length = BitConverter.ToInt32(buffer, 0);
if (length == 0)
{
lastHeartbeatTime = DateTime.Now;
}
else
{
if (length > MaxDataLength)
{
Console.WriteLine("Received data length is too long. Closing connection...");
isRunning = false;
cts.Cancel();
if (clientSocket != null && clientSocket.Connected)
{
clientSocket.Close();
}
break;
}
lastHeartbeatTime = DateTime.Now;
byte[] data = new byte[length];
int totalRead = 0;
while (totalRead < length)
{
int read = clientSocket.Receive(data, totalRead, length - totalRead, SocketFlags.None);
totalRead += read;
}
string message = Encoding.UTF8.GetString(data);
Console.WriteLine("Received: " + message);
}
}
}
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.OperationAborted || ex.SocketErrorCode == SocketError.Disconnecting)
{
// 套接字关闭,正常退出
isRunning = false;
break;
}
Console.WriteLine("Error reading from socket: " + ex.Message);
isRunning = false;
break;
}
catch (Exception ex)
{
Console.WriteLine("Error in ReadData thread: " + ex.Message);
isRunning = false;
break;
}
}
}
catch (Exception ex)
{
Console.WriteLine("Unexpected error in ReadData thread: " + ex.Message);
}
}
static void CheckHeartbeat()
{
while (isRunning)
{
if ((DateTime.Now - lastHeartbeatTime).TotalSeconds > 10)
{
Console.WriteLine("Server disconnected. Reconnecting...");
isRunning = false;
// 优雅地请求取消操作
cts.Cancel();
// 关闭 Socket 以终止 ReadData 线程中的 Receive 操作
if (clientSocket != null && clientSocket.Connected)
{
clientSocket.Close();
}
// 等待读取线程退出,设置超时时间
if (readThread != null && readThread.IsAlive)
{
if (!readThread.Join(JoinTimeout))
{
Console.WriteLine("Read thread did not exit within the timeout period.");
}
}
}
Thread.Sleep(1000);
}
}
}
这里面逻辑很简单。
Main
函数是一个 while
循环,在循环内部,首先去连接服务端,这是一个同步的阻塞方法。当连接成功之后开启心跳检测线程,然后继续开启数据接收处理线程,然后在主线程里等待接收线程完成。
在接收线程的读取数据的方法 ReadData
里。首先读取前4个字节,判断这4个字节的长度,如果为0,则表示是一个心跳包。如果过长,则认为是一个非法数据,需要断开当前连接重新连接。接下来根据读取的数据长度,读取接下来的指定长度字节的内容,这些内容就是这次完整的实际的数据。如果这个方法里面出现异常,比如正要接收数据之前,网络断开,那么 Read
方法会抛出异常, readThread
会正常退出, Main
函数里的 Join
方法将返回,程序将当前的连接关闭,然后尝试重新连接。
如果网络断开,但是 Read
方法正在读取,它没有感知到网络断开,就会一直卡在 ReadData
方法里。这个时候检查心跳数据的线程就会发生作用,它长时间判断没有最新的数据到来,就会认为连接可能断开,继而调用 Socket.Close()
方法,此时卡住的 Receive
方法会立即抛出异常, readThread
线程会正常退出。紧接着在心跳程序里检查 readThread
的状态如果不为空且没退出,设置超时时间,尝试退出。如果还没退出,那就是遇到了异常,日志记录一下。
在判断超时之后,会将 isRunning
置为 false
,从而跳出当前 CheckHeartBeat
方法,结束心跳检查线程。然后主线程尝试重新连接,然后再次创建心跳检测线程和数据读取线程。这一点很重要,否则可能会创建多个心跳和数据读取线程。
服务端的处理也很简单,它只需要在没有数据发送时,发送心跳数据即可。
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
class Server
{
private const int Port = 8888;
private static Socket serverSocket;
private static Socket clientSocket;
private static ConcurrentQueue<byte[]> sendQueue = new ConcurrentQueue<byte[]>();
private static AutoResetEvent dataAvailable = new AutoResetEvent(false);
private const int HeartbeatInterval = 5000; // 5 秒没有发送数据开始发送心跳
static void Main()
{
try
{
serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, Port);
serverSocket.Bind(localEndPoint);
serverSocket.Listen(1);
Console.WriteLine("Server started. Waiting for a client...");
while (true)
{
try
{
clientSocket = serverSocket.Accept();
Console.WriteLine("Client connected.");
// 启动数据发送和心跳处理线程
Thread sendThread = new Thread(SendDataAndHeartbeat);
sendThread.Start();
// 模拟调用 Send 方法发送数据
Thread.Sleep(2000);
Send("Hello from server!");
// 等待客户端连接断开
while (clientSocket.Connected)
{
Thread.Sleep(100);
}
Console.WriteLine("Client disconnected. Waiting for a new client...");
}
catch (Exception e)
{
Console.WriteLine("Connection error: " + e.Message);
}
}
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
}
public static void Send(string message)
{
byte[] data = Encoding.UTF8.GetBytes(message);
byte[] lengthBytes = BitConverter.GetBytes(data.Length);
byte[] packet = new byte[4 + data.Length];
Array.Copy(lengthBytes, 0, packet, 0, 4);
Array.Copy(data, 0, packet, 4, data.Length);
sendQueue.Enqueue(packet);
dataAvailable.Set();
}
static void SendDataAndHeartbeat()
{
while (true)
{
try
{
if (clientSocket != null && clientSocket.Connected)
{
if (dataAvailable.WaitOne(HeartbeatInterval))
{
// 有数据可发送
if (sendQueue.TryDequeue(out byte[] packet))
{
clientSocket.Send(packet);
Console.WriteLine("Sent data: " + Encoding.UTF8.GetString(packet, 4, packet.Length - 4));
}
}
else
{
// 超时,发送心跳
byte[] heartbeat = new byte[4];
clientSocket.Send(heartbeat);
Console.WriteLine("Sent heartbeat");
}
}
else
{
break;
}
}
catch (Exception e)
{
Console.WriteLine("Send error: " + e.Message);
break;
}
}
}
}
在 Main
函数中,主线程等待远程连接的到来,连接上以后就开启数据发送和心跳处理线程。在 SendDataAndHeartbeat
里面是一个 while
循环,在循环中通过 dataAvalible.Wait()
方法,提供一个等待超时时间,如果在该超时时间内有状态变更,则 Wait
方法返回,表示队列中有新添加的数据,继而从队列中读取数据并发送。如果超时,则认为此时没有需要发送的数据,就发送一个心跳。 Send
就是把数据准备好放到队列里面,然后将 dataAvailable
状态置为 Set
,通知 SendDataAndHeartbeat
方法处理。当客户端连接断开时,会重新回到 Main
方法的 while
循环里面,继续等待下一次客户端的连接。
双向心跳检测模式
然而有时候,数据不是单向流动,而是双向传输。当然也可以按照上述的模式,只做单向的心跳检测。但添加双向的心跳检测也不复杂,这样双方都可以判断对方是否断线。一般的情况下服务端是一直运行的,可以接受多个客户端的连接,客户端可以与服务端收发信息。
在这种模式下,服务端默认客户端会按照特定频率发送心跳。客户端发送心跳数据后,如果服务端收到客户端的心跳,则回发给客户端心跳。客户端在接收数据处判断是否是心跳数据,如果是则更新心跳时间。这里需要注意的是,心跳发送和心跳超时检测逻辑最好放在同一个单独的线程里。心跳检测不能放在接收数据的方法里判断,因为如果此时服务端连接断开,虽然客户端在按照特定频率给服务端发送消息,但是如果服务端没有相应,客户端则不会收到数据,那么就走不到心跳超时判断的逻辑那里。
客户端代码如下:
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class Client
{
private const int LengthHeaderSize = 4;
private const int HeartbeatInterval = 5000; // 心跳间隔时间,单位:毫秒
private const int HeartbeatTimeout = 10000; // 心跳超时时间,单位:毫秒
private const int MaxDataLength = 1024 * 1024; // 最大数据长度,1MB
private Socket _clientSocket;
private readonly IPEndPoint _serverEndPoint;
private readonly TimeSpan _reconnectInterval = TimeSpan.FromSeconds(5);
private bool _isRunning = false;
private DateTime _lastHeartbeatTime;
private DateTime _lastSendHeartbeatTime;
private CancellationTokenSource _cancellationTokenSource;
private Task _heartbeatTask;
private Task _receiveTask;
public Client(IPEndPoint endPoint)
{
_serverEndPoint = endPoint;
}
public async Task Start()
{
_isRunning = true;
while (_isRunning)
{
_cancellationTokenSource = new CancellationTokenSource();
try
{
_clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await ConnectAsyncWrapper(_clientSocket, _serverEndPoint);
Console.WriteLine("Client connected to the server.");
_lastHeartbeatTime = DateTime.Now;
_lastSendHeartbeatTime = DateTime.Now;
// 启动心跳发送任务
_heartbeatTask = SendHeartbeats(_cancellationTokenSource.Token);
// 启动数据接收任务
_receiveTask = ReceiveDataAsync(_cancellationTokenSource.Token);
await Task.WhenAll(_heartbeatTask, _receiveTask);
break;
}
catch (SocketException)
{
Console.WriteLine($"Failed to connect to the server. Retrying in {_reconnectInterval.TotalSeconds} seconds...");
await Task.Delay(_reconnectInterval);
}
}
}
public void Stop()
{
_isRunning = false;
_cancellationTokenSource?.Cancel();
if (_clientSocket != null && _clientSocket.Connected)
{
try
{
_clientSocket.Shutdown(SocketShutdown.Both);
}
catch (SocketException)
{
// 忽略异常,因为在关闭时可能已经断开连接
}
_clientSocket.Close();
}
Console.WriteLine("Client stopped.");
}
private async Task ConnectAsyncWrapper(Socket socket, IPEndPoint endPoint)
{
return await Task.Factory.FromAsync(
(callback, state) => socket.BeginConnect(endPoint, callback, state),
socket.EndConnect,
null);
}
private async Task ReceiveDataAsync(CancellationToken cancellationToken)
{
try
{
while (_isRunning && _clientSocket.Connected &&!cancellationToken.IsCancellationRequested)
{
byte[] lengthBuffer = new byte[LengthHeaderSize];
int bytesRead = await ReadBytesAsync(_clientSocket, lengthBuffer, LengthHeaderSize);
if (bytesRead == 0)
{
// 服务器断开连接
break;
}
int contentLength = BitConverter.ToInt32(lengthBuffer, 0);
if (contentLength > MaxDataLength)
{
Console.WriteLine("Received data length is too long. Reconnecting...");
await Reconnect();
continue;
}
if (contentLength == 0)
{
// 收到心跳回复,更新最后心跳时间
_lastHeartbeatTime = DateTime.Now;
}
else
{
byte[] contentBuffer = new byte[contentLength];
await ReadBytesAsync(_clientSocket, contentBuffer, contentLength);
string content = Encoding.UTF8.GetString(contentBuffer);
Console.WriteLine($"Client received: {content}");
}
}
}
catch (SocketException ex)
{
if (_isRunning)
{
Console.WriteLine($"Error receiving data on client: {ex.Message}. Server may have disconnected.");
await Reconnect();
}
}
catch (Exception ex)
{
if (_isRunning)
{
Console.WriteLine($"Unexpected error on client: {ex.Message}");
}
}
finally
{
if (_clientSocket != null)
{
_clientSocket.Close();
}
}
}
private async Task<int> ReadBytesAsync(Socket socket, byte[] buffer, int totalBytesToRead)
{
int bytesRead = 0;
while (bytesRead < totalBytesToRead && socket.Connected)
{
try
{
int received = await ReceiveAsyncWrapper(socket, buffer, bytesRead, totalBytesToRead - bytesRead);
if (received == 0)
{
break;
}
bytesRead += received;
}
catch (SocketException ex)
{
if (socket.Connected)
{
Console.WriteLine($"Error reading data: {ex.Message}");
throw;
}
}
}
return bytesRead;
}
private async Task<int> ReceiveAsyncWrapper(Socket socket, byte[] buffer, int offset, int count)
{
if (socket.Connected)
{
return await Task<int>.Factory.FromAsync(
(callback, state) => socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state),
socket.EndReceive,
null);
}
return 0;
}
private async Task SendHeartbeats(CancellationToken cancellationToken)
{
while (_isRunning &&!cancellationToken.IsCancellationRequested)
{
await Task.Delay(HeartbeatInterval, cancellationToken);
if (_clientSocket == null ||!_clientSocket.Connected)
{
if (DateTime.Now - _lastSendHeartbeatTime > TimeSpan.FromMilliseconds(HeartbeatTimeout))
{
Console.WriteLine("Server disconnected due to heartbeat timeout.");
await Reconnect();
}
continue;
}
try
{
byte[] lengthBytes = BitConverter.GetBytes(0);
_lastSendHeartbeatTime = DateTime.Now;
await SendAsyncWrapper(_clientSocket, lengthBytes);
}
catch (SocketException ex)
{
if (_isRunning)
{
Console.WriteLine($"Error sending heartbeat: {ex.Message}");
if (DateTime.Now - _lastSendHeartbeatTime > TimeSpan.FromMilliseconds(HeartbeatTimeout))
{
Console.WriteLine("Server disconnected due to heartbeat timeout.");
await Reconnect();
}
}
}
}
}
private async Task Reconnect()
{
if (_isRunning)
{
Console.WriteLine("Trying to reconnect...");
_cancellationTokenSource.Cancel();
// 先关闭 clientSocket
if (_clientSocket != null && _clientSocket.Connected)
{
_clientSocket.Close();
}
try
{
// 等待任务完成
await Task.WhenAny(_heartbeatTask, _receiveTask);
}
catch (Exception ex)
{
Console.WriteLine($"Error waiting for tasks to complete: {ex.Message}");
}
await Start();
}
}
private async Task<int> SendAsyncWrapper(Socket socket, byte[] buffer)
{
if (socket.Connected)
{
return await Task<int>.Factory.FromAsync(
(callback, state) => socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, callback, state),
socket.EndSend,
null);
}
return 0;
}
}
当客户端发送数据出现错误,接收到非法长度数据或者心跳超时,都会尝试重新连接。需要注意的是,在 Start
方法中,当连接成功之后,创建了两个任务,一个是发送和检测心跳任务,一个是接收服务端数据的服务。然后等待这两个 Task
结束。
在 Reconnect
方法中,如果要尝试重连,需要先将上述两个 Task
结束。然后再重新调用 Start
方法,否则可能会创建多个心跳和接收线程。要结束可能卡住的 Socket.BeginReceive
方法,需要首先调用 Socket
的 Close
方法。然后使用 Task.WhenAll
等待确认两个 Task
全部结束。
服务端代码如下:
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class Server
{
private const int LengthHeaderSize = 4;
private const int HeartbeatInterval = 5000; // 心跳间隔时间,单位:毫秒
private const int HeartbeatTimeout = 10000; // 心跳超时时间,单位:毫秒
private const int MaxDataLength = 1024 * 1024; // 最大数据长度,1MB
private readonly Socket _serverSocket;
private readonly IPEndPoint _serverEndPoint;
private readonly Dictionary<Socket, DateTime> _clientHeartbeats = new Dictionary<Socket, DateTime>();
private readonly List<Socket> _connectedClients = new List<Socket>();
private CancellationTokenSource _cancellationTokenSource;
private Task _heartbeatTask;
public Server(IPEndPoint endPoint)
{
_serverEndPoint = endPoint;
_serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
}
public async Task Start()
{
_cancellationTokenSource = new CancellationTokenSource();
try
{
_serverSocket.Bind(_serverEndPoint);
_serverSocket.Listen(10);
Console.WriteLine("Server started. Waiting for connections...");
// 启动心跳检查任务
_heartbeatTask = Task.Run(() => CheckHeartbeats(_cancellationTokenSource.Token));
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
Socket clientSocket = await AcceptAsyncWrapper(_serverSocket);
if (clientSocket != null)
{
Console.WriteLine("Client connected.");
// 记录客户端的最后心跳时间
_clientHeartbeats[clientSocket] = DateTime.Now;
// 将新客户端添加到连接列表
_connectedClients.Add(clientSocket);
// 处理客户端数据
_ = HandleClientAsync(clientSocket);
}
}
}
catch (Exception ex)
{
if (!_cancellationTokenSource.Token.IsCancellationRequested)
{
Console.WriteLine($"An error occurred on server: {ex.Message}");
}
}
finally
{
_serverSocket?.Close();
}
}
private async Task<Socket> AcceptAsyncWrapper(Socket socket)
{
return await Task<Socket>.Factory.FromAsync(socket.BeginAccept, socket.EndAccept, null);
}
private async Task HandleClientAsync(Socket clientSocket)
{
try
{
while (clientSocket.Connected &&!_cancellationTokenSource.Token.IsCancellationRequested)
{
byte[] lengthBuffer = new byte[LengthHeaderSize];
int bytesRead = await ReadBytesAsync(clientSocket, lengthBuffer, LengthHeaderSize);
if (bytesRead == 0)
{
// 客户端断开连接
break;
}
int contentLength = BitConverter.ToInt32(lengthBuffer, 0);
if (contentLength > MaxDataLength)
{
Console.WriteLine($"Received data length from {clientSocket.RemoteEndPoint} is too long. Disconnecting...");
DisconnectClient(clientSocket);
break;
}
if (contentLength == 0)
{
// 收到心跳包,回复客户端
if (clientSocket.Connected)
{
byte[] responseLength = BitConverter.GetBytes(0);
await SendAsyncWrapper(clientSocket, responseLength);
}
// 更新客户端的最后心跳时间
_clientHeartbeats[clientSocket] = DateTime.Now;
}
else
{
byte[] contentBuffer = new byte[contentLength];
await ReadBytesAsync(clientSocket, contentBuffer, contentLength);
string content = Encoding.UTF8.GetString(contentBuffer);
Console.WriteLine($"Server received from {clientSocket.RemoteEndPoint}: {content}");
}
}
}
catch (SocketException ex)
{
Console.WriteLine($"Error receiving data from client {clientSocket.RemoteEndPoint}: {ex.Message}");
}
finally
{
// 移除断开连接的客户端
DisconnectClient(clientSocket);
}
}
private void DisconnectClient(Socket clientSocket)
{
if (_clientHeartbeats.ContainsKey(clientSocket))
{
_clientHeartbeats.Remove(clientSocket);
}
if (_connectedClients.Contains(clientSocket))
{
_connectedClients.Remove(clientSocket);
}
if (clientSocket != null && clientSocket.Connected)
{
clientSocket.Close();
}
}
private async Task<int> SendAsyncWrapper(Socket socket, byte[] buffer)
{
if (socket != null && socket.Connected)
{
return await Task<int>.Factory.FromAsync(
(callback, state) => socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, callback, state),
socket.EndSend,
null);
}
return 0;
}
private async Task<int> ReadBytesAsync(Socket socket, byte[] buffer, int totalBytesToRead)
{
if (socket == null ||!socket.Connected)
{
return 0;
}
int bytesRead = 0;
while (bytesRead < totalBytesToRead && socket.Connected)
{
try
{
int received = await ReceiveAsyncWrapper(socket, buffer, bytesRead, totalBytesToRead - bytesRead);
if (received == 0)
{
break;
}
bytesRead += received;
}
catch (SocketException ex)
{
Console.WriteLine($"Error reading data: {ex.Message}");
throw;
}
}
return bytesRead;
}
private async Task<int> ReceiveAsyncWrapper(Socket socket, byte[] buffer, int offset, int count)
{
if (socket != null && socket.Connected)
{
return await Task<int>.Factory.FromAsync(
(callback, state) => socket.BeginReceive(buffer, offset, count, SocketFlags.None, callback, state),
socket.EndReceive,
null);
}
return 0;
}
private void CheckHeartbeats(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Thread.Sleep(HeartbeatInterval);
List<Socket> disconnectedClients = new List<Socket>();
foreach (var client in _clientHeartbeats)
{
if (DateTime.Now - client.Value > TimeSpan.FromMilliseconds(HeartbeatTimeout))
{
// 心跳超时,认为客户端断开连接
Console.WriteLine($"Client {client.Key.RemoteEndPoint} disconnected due to heartbeat timeout.");
DisconnectClient(client.Key);
disconnectedClients.Add(client.Key);
}
}
// 移除断开连接的客户端
foreach (var client in disconnectedClients)
{
if (_clientHeartbeats.ContainsKey(client))
{
_clientHeartbeats.Remove(client);
}
if (_connectedClients.Contains(client))
{
_connectedClients.Remove(client);
}
}
}
}
// 向指定客户端发送消息的方法
public async Task Send(Socket clientSocket, string message)
{
if (clientSocket == null ||!clientSocket.Connected)
{
Console.WriteLine($"Cannot send message to {clientSocket?.RemoteEndPoint}. Socket is null or disconnected.");
return;
}
try
{
byte[] content = Encoding.UTF8.GetBytes(message);
byte[] lengthBytes = BitConverter.GetBytes(content.Length);
byte[] combinedBuffer = new byte[LengthHeaderSize + content.Length];
Array.Copy(lengthBytes, 0, combinedBuffer, 0, LengthHeaderSize);
Array.Copy(content, 0, combinedBuffer, LengthHeaderSize, content.Length);
await SendAsyncWrapper(clientSocket, combinedBuffer);
Console.WriteLine($"Sent message to {clientSocket.RemoteEndPoint}: {message}");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending message to {clientSocket.RemoteEndPoint}: {ex.Message}");
}
}
// 获取所有连接的客户端
public List<Socket> GetConnectedClients()
{
return _connectedClients;
}
// 停止服务器的方法
public void Stop()
{
_cancellationTokenSource.Cancel();
foreach (var client in _connectedClients)
{
DisconnectClient(client);
}
_serverSocket.Close();
Console.WriteLine("Server stopped.");
}
}
服务端可以接受多个客户端连接。在 Start
方法里,只需要启动一个心跳任务。在后面的while循环中,当有一个新的连接到来时,将该连接保存下来,然后创建一个处理接收客户端数据的任务。在 HandleClientAsync
方法里,主要是接收客户端的数据,如果收到的是一个心跳数据,则立即给客户端回发心跳,并更新最后收到该客户端的心跳时间。
心跳检测任务则比较简单,只需要按照一定频率检测最后收到的心跳时间与当前时间的差值,如果大于心跳发送频率的某个倍数,则任务该连接已经断开,需要关闭当前的 Socket
连接,并在连接信息的相关记录里面移除该连接。
参考: