實時大規模數據的訂閱和推送服務


1. 背景  

     服務后台實時收集千萬級別在線終端、全國近400個城市的出租車、手機和pad等移動終端的位置點gps信息,然后根據gps所在城市區域,持久化並推送分發給不同的訂閱用戶。

     其業務邏輯圖如下:

                               

 

 1.1 需求特征

  a 實時性(gps點本身具有實時性的特征,例如打車服務,需要周邊實時出租車位置信息)

  b 數據量大(全國實時gps點數據規模 T級別/per day,高峰期時達到1G/min) 

 1.2 推送方式選擇    

  數據推送方式通常有兩種類型:  

  a Pull方式,這種方式服務端開發相對簡單,可以采用緩存+httpserver的方式解決;

  b Push方式,這種方式通常滿足實時性的需求,對服務端而言邏輯相對復雜,需要維持大並發的連接和發送

  由於實時性和大數據量需求的特征,所以系統采用"Push+長連接方式"進行推送。當然實現一套支持實時海量數據和客戶推送的系統,需要解決的關鍵技術問題有很多: 如分布式集群,集群的failover和balancer能力,集群節點的配置管理等等。本系統借鑒hadoop的RPC模塊,實現一套訂閱發布實時推送服務,下面主要說說為了提高單節點的並發和吞吐量的些trick. 

2. 架構圖    

3. 性能優化

 3.1 異步數據發送

   異步發送邏輯如下:  

private int channelIO(WritableByteChannel writeCh, ByteBuffer buf)
            throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;
        while (buf.remaining() > 0) {
            try {
                int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                buf.limit(buf.position() + ioSize);
                ret = writeCh.write(buf);
                if (ret < ioSize) {
                    break;
                }
            } finally {
                buf.limit(originalLimit);
            }
        }
        int nBytes = initialRemaining - buf.remaining(); 
        return (nBytes > 0) ? nBytes : ret;
    }

   這里主要有兩優化點:①為防止待寫的數據量過大導致獨占線程時間片過長,在8行代碼對ByteBuffer進行了分片發送(盡管tcp盡量避免數據分片和組包),②在11行代碼,通道沒法寫完數據時,應讓出線程,立刻返回注冊到selector,待下次writeCh通道變成writable可寫狀態時,再進行channelIO寫操作(這是niobio的最大區別)。 

 3.2 用多selector機制,分離網絡讀寫操作

      ReadSelector負責監聽用戶的請求和鑒權響應,若用戶請求為合法,則把相應連接注冊給WriteSelector;WriteSelector負責將接收的實時gps點數據推送給已鑒權成功的注冊用戶連接         

 3.3 使用多selector機制,進行異步寫數據

     可以根據客戶端的端口hash到不同的Selector上去執行寫的操作,如下: 

    private Responder selectResponder(int remotePort){
        int index = Math.abs(remotePort % responderCount);
        return responders[index];
    }

4. 容錯健壯性

      最后還得考慮實時數據流大和頻率高的特征,當存在網絡不好或帶寬不足時,服務會存在數據發送不贏而導致堆積的潛在風險。所以為每個連接增加個隊列ResponseQueue,來維護待發送數據集。只有數據隊列中存在數據時,就將相應連接注冊到WriteSelector。如下圖:

                                       

        這里主要用到兩個trick:  

   4.1 避免服務數據堆積

     當網絡狀況不好對方接收較慢或發送數據量比較大時,這兩種情況下,都會造成服務數據堆積。因此,引入參數連接的緩沖數據隊列大小限制maxAllowedQueueSize如果數據批次隊列大於maxAllowedQueueSize,則直接丟棄,避免數據無上限增長,如下代碼: 

void doRespond(Call call) throws IOException {
        try {
                    synchronized (call.connection.responseQueue) {
                        if (call.connection.responseQueue.size() < maxAllowedQueueSize) {
                            call.connection.responseQueue.addLast(call);
                            if (call.connection.responseQueue.size() == 1) {
                                processResponse(call.connection.responseQueue, true);
                            }
                        } else {
                            logger.warn(
                                    "incoming data discarded from connection {}",
                                    call.connection);
                        }
                    }
                } catch (NullPointerException e) {
                    logger.error(e.getMessage(), e);
                }
            }

   4.2 定期掃描和關閉壞掉的連接資源

     這里的壞掉是指數據在一段時間內一直停留在連接connection的數據隊列里,則認為該連接已失效而直接清理隊列數據和關閉相應連接。代碼如下: 

            private void doPurge(Call call, long now) throws IOException {
              if(call.connection == null || call.connection.responseQueue == null){
                  return ;
              }
              LinkedList<Call> responseQueue = call.connection.responseQueue;
              synchronized (responseQueue) {
                Iterator<Call> iter = responseQueue.listIterator(0);
                while (iter.hasNext()) {
                  call = iter.next();
                  if (now > call.timestamp + PURGE_INTERVAL) {
                    logger.info("dalay of current connection {}  exceeds 10 mins",call.connection);
                    closeConnection(call.connection);
                    
                  }
                }
              }
            }

     希望對有類似需求的網友能提供些參考和討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM