ActiveMQ詳解


Apache ActiveMQ介紹

ActiveMQ是一個開源的,實現了JMS1.1規范的面向消息(MOM)中間件,為應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實現代碼進行修改。編寫客戶端支持的語言包括java,C/C++, .NET, Perl, PHP, Python, Ruby等。 ActiveMQ的設計目標是提供標准的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。ActiveMQ實現了JMS標准並提供了很多附加的特性。這些附加的特性包括: 1. JMX管理(java Management Extensions,即java管理擴展) 2. 主從管理(master/salve,這是集群模式的一種,主要體現在可靠性方面,當主中介(代理)出現故障,那么從代理會替代主代理的位置,不至於使消息系統癱瘓) 3. 消息組通信(同一組的消息,僅會提交給一個客戶進行處理) 4. 有序消息管理(確保消息能夠按照發送的次序被接受者接收) 5. 消息優先級(優先級高的消息先被投遞和處理) 6. 訂閱消息的延遲接收(訂閱消息在發布時,如果訂閱者沒有開啟連接,那么當訂閱者開啟連接時,消息中介將會向其提交之前的,其未處理的消息) 7. 接收者處理過慢(可以使用動態負載平衡,將多數消息提交到處理快的接收者,這主要是對PTP消息所說) 8. 虛擬接收者(降低與中介的連接數目) 9. 成熟的消息持久化技術(部分消息需要持久化到數據庫或文件系統中,當中介崩潰時,信息不會丟失) 10. 支持游標操作(可以處理大消息) 11. 支持消息的轉換 12. 通過使用Apache的Camel可以支持EIP 13. 使用鏡像隊列的形式輕松的對消息隊列進行監控等。

使用MQ的場景

像COM、CORBA、DCE和EJB等應用技術使用RPC(Remote Procedural Calls,即遠程過程調用)屬於緊耦合技術。使用RPC,一個應用程序調用另一個應用程序,調用者必須阻塞,直到被調用者執行結束返回結果信息為止。 ![](http://images2017.cnblogs.com/blog/575312/201709/575312-20170907101916257-125661489.png)

下圖給出一種松耦合的方式,進行架構設計:應用程序1向消息中介(MOM)發送一條消息,很可能一段時間之后,應用程序2調用MOM來收取消息。任何一個應用程序都不知道對方是否存在也不需要阻塞等待。這種通信方式大大縮減了維護開銷,因為對於一個應用程序的修改,會對其他應用程序影響極小。

ActiveMQ就是采用了上面提到的松耦合方式,因此,我們經常說應用程序發送消息僅僅是觸發后即忘卻。應用程序將消息發送給ActiveMQ而並不關心什么時間以何種方式消息投遞給接收者。同樣的消息接收者也不會關心消息來源於哪里和消息是怎樣投遞給ActiveMQ的。因此下圖可以很好地表示出amq(broker,服務端,消息傳遞提供者)和客戶端(client,消息收發者)之間的模型建立。

MQ的安裝

Step1 運行ActiveMQ至少需要JavaSE 1.5的支持,在使用ActiveMQ之前,需要先下載和安裝Java 運行環境。 Step2 下載ActiveMQ,可以從官方網站(http://activemq.apache.org/download.html.)上免費下載最新版本的ActiveMQ。 Step3 運行 ActiveMQ:./bin/activemq。通過這條命令就會啟動ActiveMQ的代理,並啟動了一些常用的連接器供客戶端連接使用,主要包括TCP,SSL,STOMP和XMPP。當看到如下頁面,說明mq已成功啟動。 ![](http://images2017.cnblogs.com/blog/575312/201709/575312-20170907105821522-459846238.png)

Step4 http://localhost:8161/可進入網頁控制台(監控、操作broker)。默認用戶名和密碼都為admin。

收發消息的簡單實現

```java // 生產者 public class JMSProducer {

private static final String USERNAME = "admin";

private static final String PASSWORD = "123";

private static final String BROKEURL = "failover://tcp://localhost:61616";

private static final int SENDNUM = 10;

public static void main(String[] args) {

ConnectionFactory connectionFactory;

Connection connection = null;

Session session;

Destination destination;

MessageProducer messageProducer;

connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

try {

connection = connectionFactory.createConnection();

connection.start();

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue("hyr_event");

messageProducer = session.createProducer(destination);

// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//不用設置持久,默認的

// messageProducer.setTimeToLive(10000);

sendMessage(session, messageProducer);//發消息頭帶有可篩選屬性的消息

session.commit();

session.close();

} catch (Exception e) {

e.printStackTrace();

}

}

public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {

int i = 0;

String type1 = "car";

String type2 = "bike";

String color1 = "white";

String color2 = "black";

String color3 = "red";

for (i = 0; i < SENDNUM; i++) {

TextMessage message = session.createTextMessage();

if(i < 5){

message.setStringProperty("TYPE", type1);

  // message.setStringProperty("TYPE", type2);

message.setStringProperty("COLOR", color1);

message.setStringProperty("COLOR", color2);

}

if(i >= 5){

message.setStringProperty("TYPE", type1);

message.setStringProperty("TYPE", type2);

message.setStringProperty("COLOR", color1);

message.setStringProperty("COLOR", color2);

}

message.setText("發送的第" + (i + 1) + "條消息:");

System.out.println("發送的第" + (i + 1) + "條消息");

//通過消息生產者發出消息

messageProducer.send(message);

 //  sleep(1000);
                }
        }

}


```java
// 消費者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
  
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼 
    private static final String BROKEURL = "failover://tcp://10.63.240.216:61616"; // 默認的連接地址
 
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 連接工廠 
        Connection connection = null; // 連接 
        Session session; // 會話 接受或者發送消息的線程 
        Destination destination; // 消息的目的地 
        MessageConsumer messageConsumer;// 消息消費者 
 
        // 實例化連接工廠 
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
 
        try {
            connection = connectionFactory.createConnection();// 通過工廠獲取連接 
            connection.start();// 啟動連接 
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 第一個參數為是否開啟事務
            destination = session.createQueue("HELLO_ActiveMQ");// 創建消息隊列
            String selector = "SYMBOL = 'A'";
            messageConsumer = session.createConsumer(destination,selector);// 創建消息消費者
       //   MessageConsumer  messageConsumer1 = session.createConsumer(destination,selector);// 創建消息消費者
          //  messageConsumer = session.createConsumer(destination);
 
            /*
             * 實際應用中,不會這么用,會注冊一個監聽
             */
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if (textMessage != null) {
                    System.out.println("收到的消息:" + textMessage.getText());
                } else {
                 //   session.commit();
                   connection.close();
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ActiveMQ內部實現

![](http://images2017.cnblogs.com/blog/575312/201709/575312-20170907135625022-788833581.png) 生產者進程向activeMQ所在進程發送消息和消費者消費消息的過程如上圖所示,消息傳遞的路徑經過了核心領域模型,具體步驟如下: 步驟1:生產者通過向activeMQ為它建立好的TransportConnection發送消息給activeMQ。 步驟2:TransportConnection對象找到RegionBroker。 步驟3:RegionBroker根據消息的類型找到對應的消息區域(Region)。 步驟4:該Region在它自己里面找到相應的消息目的地。 步驟5、6:該目的地首先根據需要進行持久化操作,並使用待發送消息指針對象。 步驟7:當有合適的消息消費者、訂閱者來到時,目的地會找到這些消費者。 步驟8、9:通過該消費者對應的TransportConnection,消息從activeMQ中出來,發給相應的消費者進程。

queue和topic

在JMS中,topic實現的是發布訂閱的語義。您發布消息時,它將發送給所有感興趣的訂閱者——因此,對於許多訂閱者來說,0到許多訂閱者將收到消息的副本。值得注意的是只有在代理接收到消息的時候擁有一個活躍訂閱的訂閱者將獲得消息的副本。queue實現的是負載均衡的語義。一條消息只被一個消費者接收,如果在發送消息時沒有可用的用戶,則將該消息保留,直到可以接收該消息的用戶可用為止。如果消費者收到一條消息,並且在關閉之前不承認它(not ack),那么消息將被重新發送到另一個消費者。隊列可以讓許多消費者在可用的消費者之間平衡消息負載。

消息持久化

JMS規范支持兩種類型的消息傳遞:持久性和非持久性。持久性消息傳遞必須將持久性屬性記錄到穩定存儲中,非持久性只是進行最大努力的傳遞信息,不用記錄。ActiveMQ支持這兩種消息傳遞,也可配置為支持消息恢復,介於兩者之間的狀態的消息被存在內存中。ActiveMQ很好的支持了消息的持久性(Persistence)。消息持久性對於可靠消息傳遞來說應該是一種比較好的方法,有了消息持久化,即使發送者和接受者不是同時在線或者消息中心在發送者發送消息后宕機了,在消息中心重新啟動后仍然可以將消息發送出去,如果把這種持久化和 ReliableMessaging結合起來應該是很好的保證了消息的可靠傳送。消息持久性的原理很簡單,就是在發送者將消息發送出去后,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,然后試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啟動以后首先要檢查制定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。 對於ActiveMQ,消息的持久化是很簡單的,僅僅通過配置信息就可以實現。非持久性消息通常被用在發送通知和實時數據。當性能要求是第一位,確認消息交付在第二位時應該選用非持久性消息。

  • 配置文件

  • 消息怎樣存放在AMQ中
    了解消息存儲有助於我們進行配置,並了解在發送持久性消息時broker發生了什么。queue和topic存儲消息是不同的,因為有些存儲的優化可以被用在topics上而對queues毫無意義。
    隊列存儲概念直截了當,消息先進先出,如下圖:

某時一條消息被調度到單個消費者。只有當消息被消費並答復時才會從代理消息倉庫中刪除。
對於一個主題的持久訂閱者,每個消費者獲取一條消息的拷貝。為了節省存儲空間,只有消息的一個拷貝被代理存儲。存儲中的一個持久訂閱者對象維護一個指向它的下一個存儲消息的指針並分派它的一個拷貝到它的消費者,如下圖:

消息倉庫以這種方式實現因為每一個持有訂閱者可能以不同的速度消費消息,或者同一時間它們可能不都在運行。同時,因為每個消息可能潛在的有多個消費者,一條消息不能被倉庫刪除在直到它被成功地傳遞到每個對它感興趣的持久訂閱者。

kahadb原理

KahaDB消息倉庫(Amq默認的持久化方式)是所有提供消息存儲實現中最快的。它的速度是由於組合了包含數據日志文件的快速的事務性日報,高度優化的消息ID索引,和內存內消息緩存。下圖提供了一張上層KahaDB消息倉庫圖。

Data logs:作為消息日報,它包含了存儲在一定長度的數據文件的一個消息輪環日志和命令(例如事務性范圍和消息刪除)。如果當前使用的文件已達到最大長度,會創建一個新的數據文件。在數據文件中的所有消息被參考計算,所以一旦那個數據文件中的每個消息不再被需要,消息文件能被刪除或存檔。在數據日志中,消息只被附加到當前數據文件的尾部,所以存儲很快。
cache:如果消息有活動的消費者,則緩存(cache)臨時保存消息。如果有活動的消費者,當消息被安排存儲的時候同時被分派出去了,如果消息及時回應,那它們不必存到磁盤中。
BTree 索引:在由消息ID索引的數據日志中BTree 索引保存這些數據的說明。這些索引為隊列維護先進先出數據結構,為主題消息維護持續訂閱者指針。
Redo日志:僅用作當amq沒有正常關閉時保證btree索引的完整性。

了解了每一塊的含義,現在講一下每一塊到底可以做什么:

checkPoint:在內存(cache)中的那部分B-Tree是Metadata Cache,磁盤上的叫Metadata Store,它對應於文件db.data。顯而易見的,通過將索引緩存到內存中,可以加快查詢的速度,這個同步過程就稱為:check point。
消息的恢復和B-tree重建:有了B-Tree,Broker(消息服務器)可以快速地重啟恢復,因為它是消息的索引,根據它就能恢復出每條消息的location。如果Metadata Store被損壞,則只能掃描整個Data Logs來重建B樹了,這個過程是很復雜且緩慢的。 
消息載體:Data Logs以日志形式存儲消息,它是生產者生產的數據的真正載體,對應於文件 db-*.log,默認是32MB。

kahadb在磁盤上的目錄結構和上層設計圖是保持一致的:

db-編號 :數據日志文件產生,達到預定大小編號自動加1;銷毀:當沒有任何引用指向該數據日志文件中的消息,那么該日志文件被刪除或歸檔
 歸檔目錄:在啟用歸檔功能時才產生(默認關閉),如果關閉消息不再使用時日志文件會被刪除
db.data :保存 數據日志文件中消息的持久化btree索引
db.redo: 重做日志,用於kahaDb從一次非正常關閉后重啟時Btree索引的恢復

最關鍵的6個配置

1. indexWriteBatchSize 默認值1000,當Metadata Cache中更新的索引到達了1000時,才同步到磁盤上的Metadata Store中。不是每次更新都寫磁盤,而是批量更新寫磁盤,比較寫磁盤的代價是很大的。 2. indexCacheSize 默認值10000,(number of index pages cached in memory),在內存中最多分配多個頁面來緩存index。緩存的index越多,命中的概率就越大,檢索的效率就越高。 3. journalMaxFileLength 默認值32MB,當存儲的消息達到32MB時,新建一個新文件來保存消息。這個配置對生產者或消息者的速率有影響。比如,生產者速率很快而消費者速率很慢時,將它配置得大一點比較好。 4. enableJournalDiskSyncs 默認值true,默認采用同步寫磁盤,即消息先存儲到磁盤中再向Producer返回ACK 5. cleanupInterval 默認值30000ms,當消息被消息者成功消費之后,Broker就可以將消息刪除了。 6.checkpointInterval 默認值5s,每隔5s將內存中的Index(Metadata Cache)更新到磁盤的Index文件中(Metadata Store)

總結

MQ使用中,還是碰到過很多問題。如,MQ使用中意外重啟或崩掉,MQ發送方可以重連,MQ接收方必須重啟進程才可以重連。解決方案:新建一個斷鏈重連Listen接口,需要重連的接收方注冊監聽器(實現重新注冊MQ),服務端啟動一個Monitor線程(守護)輪詢MQ服務器,發現斷鏈,調用所有監聽器。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM