ActiveMQ支持多種通訊協議TCP/UDP等,我們選取最常用的TCP來分析ActiveMQ的通訊機制。首先我們來明確一個概念:
客戶(Client):消息的生產者、消費者對ActiveMQ來說都叫作客戶。
消息中介(Message broker):接收消息並進行相關處理后分發給消息的消費者.
為了能清楚的描述出ActiveMQ的核心通訊機制,我們選擇3個部分來進行說明,它們分別是建立鏈接、關閉鏈接、心跳。
一、Client跟activeMQ的TCP通訊的初始化過程分析如下:
(1) ActiveMQ初始化時,通過TcpTransportServer類根據配置打開TCP偵聽端口,客戶端通過該端口發起建立鏈接的動作。
(2) 把接收到的socket放入阻塞隊列。
(3) 另外一個線程Socket handler阻塞着,監聽是否有新的socket,如果有則取出來。
(4) 生成一個TransportConnection的實例。TransportConnection類的主要作用是處理鏈路的狀態信息,並實現CommandVisitor接口來完成各類消息的處理。
(5) TransportConnection會使用一個由多個TransportFilter實例組成的消息處理鏈條,負責對接收到的各類消息進行處理並發送相應的應答。這個鏈條的典型組成順序:
MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在這條鏈條中最后的一環是TcpTransport類,它是實際和Client獲取和發送數據的地方,該類的重要方法有run()和oneway(),一個負責讀取,一個負責發送。
(6) 建鏈完成,可以進行通訊操作。
二、關閉鏈接
ActiveMQ發現TCP鏈接的關閉,最關鍵的代碼在TcpBufferedInputStream類中的
int n = in.read(buffer, position, buffer.length - position);
三、心跳
為了更好的維護TCP鏈路的使用,ActiveMQ采用了心跳機制作為判斷雙方鏈路的健康情況。ActiveMQ使用的是雙向心跳,也就是ActiveMQ的Broker和Client雙方都進行相互心跳,但不管是Broker或Client心跳的具體處理情況是完全一樣的,都在InactivityMonitor類中實現,下面具體介紹。
心跳會產生兩個線程“InactivityMonitor ReadCheck”和“InactivityMonitor WriteCheck”,它們都是Timer類型,都會隔一段固定時間被調用一次。ReadCheck線程主要調用的方法是readCheck(),當在等待時間內,有消息接收到,則該方法會返回true。WriteCheck線程主要調用的方法是writeCheck()。
這有個小技巧,大家可以參考一下,那就是當WriteCheck線程休眠時,有任何數據發送成功,則該線程被喚醒后,不用通過TCP向對方真的發送心跳消息,這樣可以從一定程度上減少網絡傳輸的數據量。