ActiveMQ之 TCP通訊機制


  

  ActiveMQ支持多種通訊協議TCP/UDP等,我們選取最常用的TCP來分析ActiveMQ的通訊機制。首先我們來明確一個概念
  客戶(Client):消息的生產者、消費者對ActiveMQ來說都叫作客戶。 
  消息中介(Message broker)接收消息並進行相關處理后分發給消息的消費者.

  

     

 

  為了能清楚的描述出ActiveMQ的核心通訊機制,我們選擇3個部分來進行說明,它們分別是建立鏈接、關閉鏈接、心跳。 

一、ClientactiveMQTCP通訊的初始化過程分析如下: 
  (1) ActiveMQ初始化時,通過TcpTransportServer類根據配置打開TCP偵聽端口,客戶端通過該端口發起建立鏈接的動作。 
  (2) 把接收到的socket放入阻塞隊列。 
  (3) 另外一個線程Socket handler阻塞着,監聽是否有新的socket,如果有則取出來。 
  (4) 生成一個TransportConnection的實例。TransportConnection類的主要作用是處理鏈路的狀態信息,並實現CommandVisitor接口來完成各類消息的處理。 
  (5) TransportConnection會使用一個由多個TransportFilter實例組成的消息處理鏈條,負責對接收到的各類消息進行處理並發送相應的應答。這個鏈條的典型組成順序: 

    MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在這條鏈條中最后的一環是TcpTransport類,它是實際和Client獲取和發送數據的地方,該類的重要方法有run()oneway(),一個負責讀取,一個負責發送。 
  (6) 建鏈完成,可以進行通訊操作。 


二、關閉鏈接 
  ActiveMQ發現TCP鏈接的關閉,最關鍵的代碼在TcpBufferedInputStream類中的 

  int n = in.read(buffer, position, buffer.length - position); 


三、心跳 
  為了更好的維護TCP鏈路的使用,ActiveMQ采用了心跳機制作為判斷雙方鏈路的健康情況。ActiveMQ使用的是雙向心跳,也就是ActiveMQBrokerClient雙方都進行相互心跳,但不管是BrokerClient心跳的具體處理情況是完全一樣的,都在InactivityMonitor類中實現,下面具體介紹。 
  心跳會產生兩個線程InactivityMonitor ReadCheck”InactivityMonitor WriteCheck”,它們都是Timer類型,都會隔一段固定時間被調用一次。ReadCheck線程主要調用的方法是readCheck(),當在等待時間內,有消息接收到,則該方法會返回trueWriteCheck線程主要調用的方法是writeCheck()

  這有個小技巧,大家可以參考一下,那就是當WriteCheck線程休眠時,有任何數據發送成功,則該線程被喚醒后,不用通過TCP向對方真的發送心跳消息,這樣可以從一定程度上減少網絡傳輸的數據量。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM