在构建交易系统时,全速盘口(OrderBook)的实时重建是数据处理链路中计算密度最高、对延迟最敏感的环节之一。本文将详细阐述全速盘口计算组件 FastMarketDataManager 的演进历程,分析从单线程模型到静态分片,再到基于消息队列的动态负载均衡架构的演进过程,重点介绍如何在保证数据严格按照时序处理的前提下,解决热点股票引发的队头阻塞(Head-of-Line Blocking)问题。
一、从逐笔数据重建订单簿
FastMarketDataManager的核心职责是顺序处理交易所推送的逐笔数据(包括逐笔成交和逐笔委托),实时合成高精度的全速订单簿,这是典型的事件驱动状态机模型。

▲ 图片来自华泰证券
1. 数据源定义
系统接收两类核心数据流:
- 逐笔委托(Order):包含挂单价格、数量、方向及订单号。
- 逐笔成交(TAS/Trade):包含成交价格、数量、方向及对应的买卖订单号。
2. 合成逻辑
算法需要维护两个核心有序队列:Bid Queue(买盘) 和 Ask Queue(卖盘)。通常使用价格优先、时间优先的原则管理。
- Order 事件处理:当接收到新的 Order 时,如果是放单,则算法在对应方向的价格档位上增加挂单量;如果是撤单,则在对应的价格档位上减少相应数量。如果该价格档位不存在,则创建新档位并插入有序队列。
- TAS 事件处理:当接收到 Trade 时,意味着挂单被消耗。算法根据成交信息,在对应价格档位上扣减挂单量。若某档位剩余量归零,则从队列中移除该档位。
- 快照生成:系统根据当前的 Bid/Ask 队列状态,截取前 N 档数据(如 Level-10),生成标准化的行情快照(Market Data Snapshot)供下游策略使用。
上交所和深交所对于撤单的情形可能有所不同,有的是撤单委托,有的是撤单成交,所以必须要有所区别。
技术约束:该计算过程必须严格遵循交易所推送的序列号(Sequence Number),任何乱序处理都会导致订单簿状态错误(如出现“负库存”或买卖倒挂)。
简单的代码如下,实际情况中,还需要考虑很多情况,比如上交所和深交所对数据字段定义的差别,具体一些出错的消除等等,更详细的介绍,可以查看当提到订单簿,我们指的是什么?这篇文章,这里不展开,它不是本文的重点:
// 核心逻辑抽象示意
public class OrderBookEngine
{
// 使用价格优先的有序字典维护挂单量
private SortedDictionary<double, long> _bids;
private SortedDictionary<double, long> _asks;
public void OnOrder(OrderData order)
{
var book = order.Side == Buy ? _bids : _asks;
// 挂单:在对应价格档位增加挂单量
if (book.TryGetValue(order.Price, out long vol))
book[order.Price] = vol + order.Volume;
else
book.Add(order.Price, order.Volume);
}
public void OnTrade(TAS trade)
{
var book = trade.Side == Buy ? _asks : _bids;
// 成交:消耗挂单量
if (book.TryGetValue(trade.Price, out long vol))
{
long remain = vol - trade.Volume;
if (remain <= 0) book.Remove(trade.Price); // 击穿档位
else book[trade.Price] = remain;
}
}
}
二、演进与瓶颈分析
第一阶段:单线程模型及其局限性
初期采用的是简单的单生产者-单消费者模型。所有订阅的股票数据汇入一个 ConcurrentQueue,由单一工作线程串行处理,代码大致如下:
public class FastMarketDataManager
{
private ConcurrentQueue<object> _globalQueue; // 全局单一队列
private Thread _workerThread;
public void UpdateTAS(TAS trade)
{
_globalQueue.Enqueue(trade); // 所有股票数据入同一个队
}
private void ProcessLoop()
{
while (_running)
{
if (_globalQueue.TryDequeue(out object data))
{
// 逐条串行计算
// 瓶颈:若某只股票数据量暴增,后续所有股票都会被阻塞
Process(data);
}
}
}
}
当时的主要考虑是,订阅行情回调的方法应该及时响应,它需要把数据放到队列里面后尽快返回,否则可能会导致接收行情的线程断开。上述单一线程处理逻辑,在实际的环境中会存在一些问题:
- 情形:随着订阅股票数量增加,该模型暴露出严重的性能问题。尤其在成交量增大或市场剧烈波动时,比如早盘开盘、下午开盘或者个别“热点股票”(Hot Spot,如新股或涨跌停股票)会在毫秒级产生海量数据。
- 后果:由于单线程处理能力的物理上限,热点股票的数据积压会阻塞队列,导致排在队列后方的非热点股票处理延迟急剧上升,形成”队头阻塞(Head-of-Line Blocking)“效应,全市场行情更新出现系统性滞后,需要等待单个线程处理完成之后才能恢复,这个恢复时间可能很长。
第二阶段:静态哈希分片
为解决单线程瓶颈,我们引入了 FastMDPostProcessor 处理单元,采用数据分片策略,即把所有需要计算全速盘口的股票分成几个组,每个组即由 FastMDPostProcessor 来处理。
实现方式:将N个股票进行对股票总数取余取模,静态分配到 M个处理线程(Batch)中,即静态哈希分片(Static Sharding)。
public class FastMarketDataManager
{
private Dictionary<string, int> symbolBatchIndex; // 路由表:Symbol -> BatchID
private List<FastMDPostProcessor> postProcessors; // 处理单元集合
private readonly int _batchCount;
public FastMarketDataManager(FastMarketDataConfig cfg)
{
_batchCount = cfg.ProcessFastMarketDataBatchCount;
symbolBatchIndex = new Dictionary<string, int>();
// 初始化 M 个处理线程
for (int i = 0; i < _batchCount; i++)
{
postProcessors.Add(new FastMDPostProcessor(fastMdDic, ...));
}
}
// 该方法通常在 UpdateMD 初始化股票时调用
private void AddSymbolsBatchIndex(string symbol)
{
// 只有新股票才需要计算路由
if (!symbolBatchIndex.ContainsKey(symbol))
{
// 算法核心:使用哈希取模 (Hash Modulo)
int batchId = symbolBatchIndex.Count() % _batchCount;
symbolBatchIndex.Add(symbol, batchId);
}
}
// 运行时分发
public void UpdateTAS(TAS trade)
{
// 查表路由:O(1) 复杂度找到对应的处理线程
if (symbolBatchIndex.TryGetValue(trade.Symbol, out int batchID))
{
postProcessors[batchID].Enqueue(trade);
}
}
}
// 消费者:FastMDPostProcessor
class FastMDPostProcessor
{
private ConcurrentQueue<object> _queue; // 待处理数据队列
private AutoResetEvent _signal;
public void Enqueue(object data)
{
_queue.Enqueue(data);
_signal.Set();
}
private void PostProcess()
{
while (_running)
{
_signal.WaitOne(); // 等待信号
while (_queue.TryDequeue(out object obj))
{
// 具体的盘口计算逻辑
if (obj is TAS tas) _fmd.Update(tas);
else if (obj is OrderData order) _fmd.Update(order);
}
}
}
}
上述结构在线上运行了相当长时间。但最近随着成交量的放大,出现了一些问题,某些股票的全速盘口在成交量放大时,会变慢,而其它另外一些股票的盘口却正常。我们的初步猜测是:
静态哈希无法解决负载倾斜(Load Skew)。若两只或多只高频热点股票偶然被分配至同一个 Batch,该 Batch 的处理延迟依然会飙升,而其它 Batch 的计算资源却处于闲置状态。这种资源分配的不均衡性在实盘中构成了显著的尾延迟(Tail Latency)风险。
要印证这个问题,首先需要将批次里面的股票以及每个股票的积压情况,可视化显示出来,以方便下次出现问题时,能够及时观测到以印证我们的想法。
class FastMDPostProcessor
{
// 待处理队列
private ConcurrentQueue<object> _queue;
// 监控核心:实时记录每只股票的积压量 [Key: Symbol, Value: Count]
private ConcurrentDictionary<string, int> _stockPendingCounts;
public FastMDPostProcessor(...)
{
_stockPendingCounts = new ConcurrentDictionary<string, int>();
// ...
}
// 生产者调用:入队并计数
public void Enqueue(TAS tas)
{
_queue.Enqueue(tas);
// 原子操作:计数 +1
_stockPendingCounts.AddOrUpdate(tas.Symbol, 1, (k, v) => v + 1);
_signal.Set();
}
// 消费者循环:处理并减数
private void PostProcess()
{
while (_queue.TryDequeue(out object obj))
{
// 1. 提取股票代码
string symbol = GetSymbol(obj);
// 2. 原子操作:计数 -1
if (symbol != null)
{
_stockPendingCounts.AddOrUpdate(symbol, 0, (k, v) => v > 0 ? v - 1 : 0);
}
// 3. 执行具体的盘口计算 (OrderBook Update)
if (obj is TAS tas) _fmd.Update(tas);
else if (obj is OrderData order) _fmd.Update(order);
}
}
public long PendingCount => _queue.Count;
// 对外接口:获取积压最严重的股票(用于监控UI显示或自动迁移决策)
public List<KeyValuePair<string, int>> GetTopPendingStocks(int topN = 5)
{
return _stockPendingCounts
.OrderByDescending(kv => kv.Value) // 按积压量降序排列
.Take(topN)
.ToList();
}
}
要实现可观测性也非常简单,只需要提供一个总积压数PendingCount 和一个获取积压明细GetTopPnedingStocks方法即可,并在FastMarketDataManager中提供对应的方法:
public List<BatchPendingInfo> GetPendingInfo()
{
var result = new List<BatchPendingInfo>(postProcessors.Count);
foreach (var processor in postProcessors)
{
result.Add(new BatchPendingInfo
{
BatchID = processor.ID,
TotalPendingCount = processor.PendingCount,
StockDetails = processor.GetPendingCounts()
});
}
// 按总积压量降序排列批次
result.Sort((x, y) => y.TotalPendingCount.CompareTo(x.TotalPendingCount));
return result;
}
这样,就可以按照总积压数TotalPendingCount倒序,然后通过属性StockDetails获取积压明细。
三、 基于消息驱动的动态负载均衡
为彻底解决负载倾斜问题,这里设计了一套无锁动态迁移机制。该机制允许系统在检测到某个 Batch 出现积压时,将积压严重的股票从忙碌线程(Source)Batch 移动到空闲线程(Target)Batch。
1.核心挑战
动态迁移的最大难点在于维护数据处理的严格顺序一致性。在迁移过程中,旧线程(Source Batch)队列中尚存该股票的未处理数据,而新数据已开始流向新线程(Target Batch)。若不加干预,新线程可能先于旧线程中的旧数据处理而先处理新到来数据,这样就违反了需要严格按照先后顺序处理数据的原则从而导致出错。比如:在迁移过程中,Source 队列里可能还有该股票的 5000 条旧数据未处理,而 Target 已经开始接收新数据了。如果 Target 直接开始处理新数据,会导致 SeqID 乱序(先处理了 Seq 10005,后处理 Seq 10001),破坏订单簿状态。
2.解决方案
这里摒弃了传统的加锁同步方案,转而利用消息队列的 FIFO 特性,设计了基于控制指令(Command Pattern)的异步迁移流程。
- 步骤一:缓冲区初始化 (Prepare)
- 监控系统识别出热点股票 S 需从 Batch A 迁移至 Batch B。
- 系统向 Batch B 发送 MigrateStartCmd。
- Batch B 收到指令后,为股票 S 初始化暂存缓冲区(Buffer)。此时,Batch B 并不处理股票 S 的逻辑,仅接收并缓存后续到达的数据。
- 步骤二:路由原子切换 (Switch)
- 系统更新全局路由表。
- 此后到达的股票 S 的所有新数据(Seq N+1 以后)将被路由至 Batch B 并进入其缓冲区。
- 步骤三:源线程极速排空 (Fast Drain)
- 系统向 Batch A 发送 MigrateFlushCmd,该指令排在 Batch A 当前积压的所有旧数据(Seq N 以前)之后。
- 优化关键点:系统同时通知 Batch A 进入“打包模式”。Batch A 不再对股票 S 的残留数据执行高耗时的订单簿计算,而是直接将其从队列中取出并存储到LegacyData 列表中。
- 此举将 Batch A 的处理耗时从 O(计算) 降级为 O(移动),极大缩短了积压数据的排空时间,快速释放 Batch A 的算力。
- 步骤四:状态接力与回放 (Handover & Replay)
- 当 Batch A 处理到 MigrateFlushCmd 时,意味着旧数据已全部打包。Batch A 发送包含 LegacyData 列表数据的 MigrateCompleteCmd 至 Batch B。
- Batch B 收到指令后,执行严格顺序的回放操作:
- Replay Legacy:处理 Batch A 移交的旧数据,更新订单簿状态至 Seq N。
- Replay Buffer:处理缓冲区内积压的新数据(Seq N+1 ...)。
- Resume:移除缓冲区,恢复正常的实时计算模式。
3. 架构优势
- 零锁竞争:全流程无 lock 或 WaitHandle 阻塞,完全依赖队列消息传递,最大化了 CPU 利用率。
- 顺序保证:利用 TCP/IP 类似的滑动窗口思想,通过消息传递确保了 Seq N 与 Seq N+1 的无缝衔接。
- 性能隔离:通过“打包模式”快速释放源线程压力,避免了迁移过程对源线程中其他股票的持续影响。
4.具体实现
核心思想就是通过引入三种控制指令(Command),利用消息队列的 FIFO 特性来协调迁移,全程无锁。
- 定义控制指令:为了在数据流中插入控制逻辑,我们定义了三种特殊消息:
// 指令1:通知目标线程(Target)建立缓冲区 class MigrateStartCmd { public string Symbol; } // 指令2:通知源线程(Source)停止计算,开始打包排空 class MigrateFlushCmd { public string Symbol; public int TargetBatchId; } // 指令3:源线程发给目标线程的交接数据包 class MigrateCompleteCmd { public string Symbol; public List<object> LegacyData; // 核心优化:打包好的旧数据 } - 调度器逻辑 (FastMarketDataManager),调度器负责编排迁移的四个原子步骤:准备 -> 降级 -> 切换 -> 哨兵。
/// <summary> /// 新建批次并移动股票 /// </summary> public bool MoveStockToNewBatch(string symbol) { lock (postProcessors) { int newBatchId = postProcessors.Count; //这里传入 newBatchId 作为 ID FastMDPostProcessor newProcessor = new FastMDPostProcessor(newBatchId, fastMdDic, postProcessors, config.LogFastMDError); postProcessors.Add(newProcessor); if (MoveStockToBatch(symbol, newBatchId)) { return true; } return false; } } public void MoveStockToBatch(string symbol, int targetBatchId) { int currentBatchId = symbolBatchIndex[symbol]; // Step 1: 通知 Target 开启缓冲模式 // Target 收到此指令后,会暂存该股票的新数据,不进行计算 postProcessors[targetBatchId].Enqueue(new MigrateStartCmd(symbol)); // Step 2: 通知 Source 开启“打包模式” (Direct Flag) // 这一步至关重要:Source 线程后续不再执行高耗时的 OrderBook 计算, // 而是直接将数据 Add 到 List,极大提升排空速度。 postProcessors[currentBatchId].MarkStockAsMigratingOut(symbol); // Step 3: 原子切换路由 // 此后的新数据流向 Target symbolBatchIndex[symbol] = targetBatchId; // Step 4: 发送哨兵指令 // Source 处理完当前队列积压的数据后,会遇到此指令,触发移交 postProcessors[currentBatchId].Enqueue(new MigrateFlushCmd(symbol, targetBatchId)); } - 处理器逻辑 (FastMDPostProcessor),消费者线程的循环逻辑升级为支持状态机的处理模式。
private void PostProcess() { while (postProcessQueue.TryDequeue(out object obj)) { string symbol = GetSymbol(obj); // === 场景 A: 正在移入 (Target) === // 新数据到达 Target,但旧数据还没来,先缓存 if (incomingBuffers.TryGetValue(symbol, out var buffer)) { buffer.Enqueue(obj); continue; } // === 场景 B: 正在移出 (Source) === // 核心优化:降级处理。不计算盘口,直接打包,加速队列排空 if (outgoingBuffers.TryGetValue(symbol, out var legacyList)) { legacyList.Add(obj); continue; } // === 场景 C: 正常计算 === if (obj is TAS tas) ProcessTAS(tas); // === 场景 D: 指令处理 === else if (obj is MigrateFlushCmd cmd) { // Source 完成打包,发送数据包给 Target var legacyData = outgoingBuffers[cmd.Symbol]; var target = allProcessors[cmd.TargetBatchId]; target.Enqueue(new MigrateCompleteCmd(cmd.Symbol, legacyData)); } else if (obj is MigrateCompleteCmd cmd) { // Target 收到旧数据包,开始严格顺序回放 // 1. 回放 Source 的旧数据 foreach(var item in cmd.LegacyData) Process(item); // 2. 回放 Buffer 中的新数据 while(incomingBuffers[cmd.Symbol].Count > 0) Process(incomingBuffers[cmd.Symbol].Dequeue()); // 3. 恢复正常 incomingBuffers.Remove(cmd.Symbol); } } }
四、验证与测试
为了验证这套复杂状态机的正确性, 构建了名为 MockDataFeed的数据类。
Mock 策略:
- 确定性数据源:生成带有严格递增序列号 (SN) 的 Mock TAS 数据。
- 热点模拟:通过 SetHotSpot(symbol, true) 方法,瞬间将某只股票的数据生成速率提升 2000 倍,人为制造队列积压。
- 一致性断言:在迁移发生时,通过日志系统追踪 SN 的连续性。
- Expect: Batch A 处理 SN: 1000 -> Batch B 处理 SN: 1001。
- Actual: 验证通过。
代码如下:
class MockDataFeed : DataFeedAdaptor, IFastMarketDataPendingReporter
{
/// <summary>
/// 内部类:维护单个模拟股票的状态
/// </summary>
class MockStockContext
{
public string Symbol { get; }
public ExchangeType Exchange { get; }
public double LastPrice { get; set; } = 10.0;
public long CurrentSN { get; set; } = 0; // 维护严格递增的序列号
public volatile bool IsHotSpot = false; // 是否开启洪水模式
public MockStockContext(string symbol, ExchangeType exchange)
{
Symbol = symbol;
Exchange = exchange;
}
}
private string name;
private long receivedData;
private FastMarketDataManager fastMarketDataManager;
private readonly XTPConfig config;
private volatile bool _running;
private Thread _generatorThread;
private readonly List<MockStockContext> _stocks;
private readonly Random _random = new Random();
// 配置参数
private const int StockCount = 40; // 模拟20只股票
private const int NormalSpeed = 1; // 普通股票每次循环生成1条数据
private const int HotSpotSpeed = 2000; // 热点股票每次循环生成2000条数据 (制造积压)
// 新增:每生成多少笔TAS后,生成一笔MarketData快照
private const int MarketDataInterval = 10;
public MockDataFeed(string configName)
{
name = configName;
config = new XTPConfig(configName);
FastMarketDataConfig cfg = new FastMarketDataConfig(
config.FastMDUpdateInterval,
config.CallAuctionFastMDUpdateInterval,
config.SZFastMDUpdateDelay,
config.SHFastMDUpdateDelay,
config.ProcessFastMarketDataBatchCount,
config.LogFastMDError);
fastMarketDataManager = new FastMarketDataManager(cfg);
fastMarketDataManager.NewFastMarketDatas += (mds) =>
{
// 可以在这里打断点或日志观察生成的 MarketData
// Logger.LogInfo($"Output {mds.Count} MarketDatas");
};
// 3. 初始化模拟股票列表
_stocks = new List<MockStockContext>();
for (int i = 0; i < StockCount; i++)
{
// 混合生成沪深股票
string prefix = i % 2 == 0 ? "600" : "000";
string symbol = $"{prefix}{i.ToString("D3")}";
// 交易所类型
var exchange = i % 2 == 0 ? ExchangeType.SH : ExchangeType.SZ;
_stocks.Add(new MockStockContext(symbol, exchange));
}
SymbolFilter.Instance.SetFastSymbolForDebug(_stocks.Select(x => x.Symbol).ToList());
}
public override string Name
{
get { return name; }
}
public override long ReceivedData
{
get { return receivedData; }
}
public override void Start()
{
if (_running) return;
_running = true;
// 启动生成线程
_generatorThread = new Thread(DataGeneratorWork)
{
Name = "MockDataGenerator",
IsBackground = true
};
_generatorThread.Start();
Logger.LogInfo("MockDataFeed Started. Generating data...");
}
public override void Stop()
{
_running = false;
if (_generatorThread != null && _generatorThread.IsAlive)
{
_generatorThread.Join(2000);
}
fastMarketDataManager.Stop();
}
/// <summary>
/// 数据生成主循环
/// </summary>
private void DataGeneratorWork()
{
// 先推送一波初始的 MarketData,确保 FastMarketData 对象被创建
foreach (var stock in _stocks)
{
var md = GenerateMarketData(stock);
fastMarketDataManager.UpdateMD(md);
}
while (_running)
{
try
{
foreach (var stock in _stocks)
{
// 决定本次生成的数据量
int count = stock.IsHotSpot ? HotSpotSpeed : NormalSpeed;
for (int i = 0; i < count; i++)
{
// 随机生成 TAS 或 OrderData
// 为了测试顺序性,我们主要生成 TAS,因为 TAS 有明确的 Sequence Number (SN)
TAS tas = GenerateNextTAS(stock);
// 推送给 Manager
fastMarketDataManager.UpdateTAS(tas);
}
}
// 稍微休眠一下,控制整体节奏,避免 CPU 100%
// 如果有热点股票,这个 Sleep 会导致积压(因为生成速度远大于消费速度)
Thread.Sleep(100);
}
catch (Exception ex)
{
Logger.LogError($"Mock Generator Error: {ex.Message}");
}
}
}
private MarketData GenerateMarketData(MockStockContext stock)
{
double currentPrice = Math.Round(stock.LastPrice, 2);
MarketData md = new MarketData
{
Symbol = stock.Symbol,
Exchange = stock.Exchange,
Time = DateTime.Now,
CurrentPrice = currentPrice,
Type = InstrumentType.Stock, // 确保类型正确
};
return md;
}
private TAS GenerateNextTAS(MockStockContext stock)
{
// 价格随机波动
double change = (_random.NextDouble() - 0.5) * 0.1;
stock.LastPrice = Math.Max(0.01, stock.LastPrice + change);
// 严格递增 SN
stock.CurrentSN++;
TAS tas = new TAS(stock.Symbol)
{
Exchange = stock.Exchange,
Time = DateTime.Now,
Price = Math.Round(stock.LastPrice, 2),
Size = _random.Next(100, 10000),
SN = stock.CurrentSN, // 关键:用于验证顺序
TradeType = TASTradeType.Trade,
BSFlag = TAS.BSType.Buy // 简单模拟
};
return tas;
}
public override string GetStatus()
{
return base.GetStatus();
}
/// <summary>
/// 移动股票到指定批次 (对接 UI)
/// </summary>
public void MoveStockToBatch(string symbol, int targetBatchID)
{
Logger.LogInfo($"UI Command: Move {symbol} to Batch {targetBatchID}");
fastMarketDataManager.MoveStockToBatch(symbol, targetBatchID);
}
/// <summary>
/// 设置某只股票是否为热点 (用于制造积压)
/// </summary>
public void SetStockHotSpot(string symbol, bool isHot)
{
var stock = _stocks.FirstOrDefault(s => s.Symbol == symbol);
if (stock != null)
{
stock.IsHotSpot = isHot;
Logger.LogInfo($"Stock {symbol} HotSpot set to: {isHot}");
}
}
public List<BatchPendingInfo> GetPendingInfo()
{
if (fastMarketDataManager != null)
{
return fastMarketDataManager.GetPendingInfo();
}
return new List<BatchPendingInfo>();
}
}
这里面,提供了将某只股票设置为热点和取消热点的方法,提供了将股票从一个热点移动到另外一个热点的方法。
五:可观测的结果
最后,设计了一个界面,根据我们预想的,可以看到新的结构能够处理好方案二中的静态哈希可能产生的负载倾斜,达到了预期的效果。

▲ 正常情况下,处理批次,以及积压情况

▲将批次0中的600030股票设置为热点股票,可以看到批次0的积压量迅速上升,而且还会影响到同批次里面的其它股票产生积压,但是目前不会对其它批次产生影响

▲ 将批次0中的热点股票600030,移动到空闲的批次4中,可以看到,批次0很快恢复正常,但会导致批次4产生新的积压。

▲再次将600030从批次4移动到新的批次(方法里传参数-1),可以看到新建了批次5,里面只有一只股票。批次5产生了积压,其它批次恢复正常。
参考: