ActiveMQ源碼架構解析第一節(轉)


 

  工作四年已久,也快到了而立之年,本人也酷愛技術,總是想找一些途徑來提升自己,想着溫故而知新所以就寫起了博客,然而寫博客這個想法也是醞釀了很久,近期也看到了有很多人在問關於ActiveMQ的相關問題,有幸接觸ActiveMQ從今天開始我會定期總結一些關於ActiveMQ的相關知識,本篇重點是先分析ActiveMQ的架構以及設計模式等相關知識,然后在說如何更好的使用,讓ActiveMQ發揮最好的性能,歡迎大家跟帖討論,有錯誤的地方還請大家不吝雅正,小方在此謝過!

 

第一篇文章我們先從hello world寫起,下面是使用java代碼調用activemq的api發送一條消息。

 

public class test {

    public static void main(String[] args) throws Exception {

       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

              "tcp://localhost:61616");

       Connection connection = connectionFactory.createConnection(); 

       connection.start();      

       Session session = connection.createSession(false,

              Session.AUTO_ACKNOWLEDGE);

       MessageProducer producer = session.createProducer(session

              .createQueue("testQueue"));

       Message message = session.createTextMessage("hello everybody!");

       producer.send(message);

       producer.close();

       session.close();

       connection.close();

    }

}

 

 

 

下面來分析發送消息的幾個步驟,如下:

 

1、創建ActiveMQConnectionFactory,入參是url,指定schema以及要連接的ip和端口號,其次是創建ActiveMQConnection,tcp協議交互肯定是要使用Socket類,所以說明下ActiveMQConnection->Transport->Socket的關系,Transport是對Socket的封裝,而ActiveMQConnection則是對Transport的封裝,如下圖所示:

 

2、創建ActiveMQConnection,因為上圖已經說明ActiveMQConnection和Transport是組合關系,所以創建ActiveMQConnection時首先要創建Transport,因為ActiveMQ的交互方式分為Tcp、Udp以及HTTP協議,ActiveMQ使用了非常經典的簡單工廠設計模式,使用這個模式的好處是工廠可以根據uri的schema頭來動態創建相應的TransportFactory工廠,例如用戶輸入tcp://localhost:61616,ObjectFactory則可以獲取到schema是tcp然后來實例化TcpTransportFactory,然后在調用TcpTransportFactory工廠來生產TcpTransport對象,簡單工廠模式如下圖,我是把2個工廠畫到了一起:          創建完TcpTransport還不夠,因為TcpTransport只實現了發送和接受消息,還需要做一些封裝來實現相應的業務處理,說到封裝這里使用到了包裝設計模式,也叫裝飾者模式,jdk的java.io輸入輸出流也有使用,類圖如下:   

1、1、MutexTransportFilter類實現了對每個請求的同步鎖,同一時間只允許發送一個請求,如果有第二個請求需要等待第一個請求發送完畢才可繼續發送。

 

2、2、WireFormatNegotiator類實現了在客戶端連接broker的時候先發送數據解析相關的協議信息,例如解析版本號,是否使用緩存等信息。

 

3、3、InactivityMonitor類實現了連接成功后啟動心跳檢查機制,客戶端每10秒發送一次心跳信息,服務端每30秒讀一次心跳信息,如果沒有讀到則會斷開連接,心跳檢測是相互的,客戶端也會每30秒讀取服務端發送來的心跳信息,如果沒有讀到也一樣會斷開連接。

 

4、4、ResponseCorrelator類實現了異步請求但需要獲取響應信息否則就會阻塞等待功能。

 

創建ActiveMQConnection的時序圖如下:

 

 

客戶端調用createConnection()方法,通過TcpTransportFactory獲取到TcpTransport對象,得到TcpTransport對象后,TcpTransportFactory又調用自己的configure方法對TcpTransport進行了包裝,代碼如下:

 

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {

    transport = compositeConfigure(transport, wf, options);

    transport = new MutexTransport(transport);

    transport = new ResponseCorrelator(transport);

    return transport;

}

 

這就是包裝設計模式的使用,當調用transport.request()發送消息時,時序圖如下:

 

       每次transport調用request方法時都會先判斷next是否為空,如果不為空則先調用next.request(),當然在調用之前和之后都會加入相應的邏輯,包裝設計模式的好處就是當我不想啟動心跳檢測功能時則可以非常簡單的實現,只需要不為WireFormatNegotiator設置next屬性為InactivityMonitor即可,獲取加入其它功能不需要修改原來的設計只需要新增一個類即可,充分的體現了設計模式的開放封閉原則以及單一職責原則,設計模式在ActiveMQ中可以說是使用的淋漓盡致,恰到好處。

 

    創建完ActiveMQConnection之后就是創建ActiveMQSession以及ActiveMQMessageProducer了,這兩個對象的創建就比較簡單了沒什么可說的,當然在創建這些對象的時候客戶端會發送相應的信息給服務端,本節主要講解連接的建立。信息的交互以及Message的發送就留到下一節在說了,第一次寫這么長的博文,可能有的地方講的思路不清晰,還請大家多多指正,謝謝大家,睡覺了,明天還要加班!

http://www.iteye.com/topic/1137523

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM