1. 背景
服務后台實時收集千萬級別在線終端、全國近400個城市的出租車、手機和pad等移動終端的位置點gps信息,然后根據gps所在城市區域,持久化並推送分發給不同的訂閱用戶。
其業務邏輯圖如下:
1.1 需求特征
a 實時性(gps點本身具有實時性的特征,例如打車服務,需要周邊實時出租車位置信息)
b 數據量大(全國實時gps點數據規模 T級別/per day,高峰期時達到1G/min)
1.2 推送方式選擇
數據推送方式通常有兩種類型:
a Pull方式,這種方式服務端開發相對簡單,可以采用緩存+httpserver的方式解決;
b Push方式,這種方式通常滿足實時性的需求,對服務端而言邏輯相對復雜,需要維持大並發的連接和發送
由於實時性和大數據量需求的特征,所以系統采用"Push+長連接方式"進行推送。當然實現一套支持實時海量數據和客戶推送的系統,需要解決的關鍵技術問題有很多: 如分布式集群,集群的failover和balancer能力,集群節點的配置管理等等。本系統借鑒hadoop的RPC模塊,實現一套訂閱發布實時推送服務,下面主要說說為了提高單節點的並發和吞吐量的些trick.
2. 架構圖
3. 性能優化
3.1 異步數據發送
異步發送邏輯如下:
private int channelIO(WritableByteChannel writeCh, ByteBuffer buf) throws IOException { int originalLimit = buf.limit(); int initialRemaining = buf.remaining(); int ret = 0; while (buf.remaining() > 0) { try { int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); buf.limit(buf.position() + ioSize); ret = writeCh.write(buf); if (ret < ioSize) { break; } } finally { buf.limit(originalLimit); } } int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret; }
這里主要有兩優化點:①為防止待寫的數據量過大導致獨占線程時間片過長,在8行代碼對ByteBuffer進行了分片發送(盡管tcp盡量避免數據分片和組包),②在11行代碼,當通道沒法寫完數據時,應讓出線程,立刻返回注冊到selector,待下次writeCh通道變成writable可寫狀態時,再進行channelIO寫操作(這是nio與bio的最大區別)。
3.2 用多selector機制,分離網絡讀寫操作
ReadSelector負責監聽用戶的請求和鑒權響應,若用戶請求為合法,則把相應連接注冊給WriteSelector;WriteSelector負責將接收的實時gps點數據推送給已鑒權成功的注冊用戶連接
3.3 使用多selector機制,進行異步寫數據
可以根據客戶端的端口hash到不同的Selector上去執行寫的操作,如下:
private Responder selectResponder(int remotePort){ int index = Math.abs(remotePort % responderCount); return responders[index]; }
4. 容錯健壯性
最后還得考慮實時數據流大和頻率高的特征,當存在網絡不好或帶寬不足時,服務會存在數據發送不贏而導致堆積的潛在風險。所以為每個連接增加個隊列ResponseQueue,來維護待發送數據集。只有數據隊列中存在數據時,就將相應連接注冊到WriteSelector。如下圖:
這里主要用到兩個trick:
4.1 避免服務數據堆積
當網絡狀況不好對方接收較慢或發送數據量比較大時,這兩種情況下,都會造成服務數據堆積。因此,引入參數連接的緩沖數據隊列大小限制maxAllowedQueueSize。如果數據批次隊列大於maxAllowedQueueSize,則直接丟棄,避免數據無上限增長,如下代碼:
void doRespond(Call call) throws IOException { try { synchronized (call.connection.responseQueue) { if (call.connection.responseQueue.size() < maxAllowedQueueSize) { call.connection.responseQueue.addLast(call); if (call.connection.responseQueue.size() == 1) { processResponse(call.connection.responseQueue, true); } } else { logger.warn( "incoming data discarded from connection {}", call.connection); } } } catch (NullPointerException e) { logger.error(e.getMessage(), e); } }
4.2 定期掃描和關閉壞掉的連接資源
這里的壞掉是指數據在一段時間內一直停留在連接connection的數據隊列里,則認為該連接已失效而直接清理隊列數據和關閉相應連接。代碼如下:
private void doPurge(Call call, long now) throws IOException { if(call.connection == null || call.connection.responseQueue == null){ return ; } LinkedList<Call> responseQueue = call.connection.responseQueue; synchronized (responseQueue) { Iterator<Call> iter = responseQueue.listIterator(0); while (iter.hasNext()) { call = iter.next(); if (now > call.timestamp + PURGE_INTERVAL) { logger.info("dalay of current connection {} exceeds 10 mins",call.connection); closeConnection(call.connection); } } } }
希望對有類似需求的網友能提供些參考和討論。