MQ介紹 & 實例


閱讀目錄

 

定義:

消息隊列(MQ)是一種應用程序對應用程序的通信方法,應用程序通過隊列進行通信,而不是通過直接調用彼此來通信,隊列的使用除去了接收和發送應用程序同時執行的要求。是進行通信的中間件產品。(換言之:MQ負責兩個系統之間傳遞消息,這兩個系統可以是異構的,處於不同硬件、不同操作系統、用不同語言編寫,只需要簡單的調用幾個MQ的API,就可以互相通訊,你不必考慮底層系統和網絡的復雜性。MQ能夠應付多種異常情況,例如網絡阻塞、臨時中斷等等)

PS:直接調用通常是用於諸如遠程過程調用的技術。

補充知識:MB(消息路由、數據轉換)

優秀MQ特點

  a>.高可用性,希望MQ能支撐7x24小時應用,而不是三天兩頭當機,要做到高可用性,就需要做MQ的集群,一台當了,不影響整個集群的服務能力,這里涉及到告警、流控、消息的負載均衡、數據庫的使用、測試的完備程度等等。

  b>.消息存儲的高可靠性。要保證100%不丟消息。這不僅僅是MQ的責任,更涉及到硬件、操作系統、語言平台和數據庫的一整套方案。許多號稱可靠存儲的MQ產品其實都不可靠,要知道,硬件錯誤是常態,如果在硬件錯誤的情況下還能保證消息的可靠存儲這才是難題。這里可能需要用到特殊的存儲硬件,特殊的數據庫,分布式的數據存儲,數據庫的分庫分表和同步等等。你要考慮消息存儲在哪里,是文件系統,還是數據庫,是本地文件,還是分布式文 件,是搞主輔備份呢還是多主機寫入等等。

  c>.高可擴展性,MQ集群能很好地支持水平擴展,這就要求我們的節點之間最好不要有通信和數據同步。

  d>.性能,性能是實現高可用性的前提,很難想象單機性能極差的MQ組成的集群能在高負載下幸免於難。性能因素跟采用的平台、語言、操作系統、代碼質量、數據 庫、網絡息息相關。MQ產品的核心其實是消息的存儲,在保證存儲安全的前提下如何保證和提高消息入隊的效率是性能的關鍵因素。這里需要開發人員建立起性能觀念,不需要你對一行行代碼斤斤計較,但是你需要知道這樣做會造成什么后果,有沒有更好更快的方式,你怎么證明它更好更快。軟件實現的性能是一方面,另一 方面就是平台相關了,因為MQ本質上是IO密集型的系統,瓶頸在IO,如何優化網絡IO、文件IO這需要專門的知識。性能另一個相關因素是消息的調度上, 引入消息順序和消息優先級,允許消息的延遲發送,都將增大消息發送調度的復雜性,如何保證高負載下的調度也是要特別注意的地方。

  e>.高可配置性和監控工具的完整,這是一個MQ產品容易忽略的地方。異步通信造成了查找問題的難度,不像同步調用那樣有相對明確的時序關系。因此查找異步通信 的異常是很困難的,這就需要MQ提供方便的DEBUG工具,查找分析日志的工具,查看消息生命周期的工具,查看系統間依賴關系的工具等等。可定制也是MQ 產品非常重要的一方面,可方便地配置各類參數並在集群中同步,並且可動態調整各類參數,這將大大降低維護難度。

Ps:一句話總結:全天候不宕機,安全消息存儲,100%不丟失數據高效率的寫入讀出,同時要求方便查錯

產品比較 

RabbitMQ

是使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味着消息在發送給客戶端時先在中 心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。

Ps: 結合erlang語言本身的並發優勢,性能較好,但是不利於做二次開發和維護

Redis

是一個Key-Value的NoSQL數據庫,開發維護很活躍,雖然它是一個Key-Value數據庫存儲系統,但它本身支持MQ功能,所以完全可 以當做一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試數據分為 128Bytes、512Bytes、1K和10K四個不同大小的數據。實驗表明:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而如 果數據大小超過了10K,Redis則慢的無法忍受;出隊時,無論數據大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低於 Redis。

 

                     入隊

                     出隊

 

128B

512B

1K

10K

128B

512B

1K

10K

Redis

16088

15961

17094

25

15955

20449

18098

9355

RabbitMQ

10627

9916

9370

2366

3219

3174

2982

1588

Ps: 做為一個基於內存的K-V數據庫,其提供了消息訂閱的服務,可以當作MQ來使用,目前應用案例較少,且不方便擴展

ZeroMQ

號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。ZMQ能夠實現RabbitMQ不擅長的高級/復雜的隊列,但是開發人員需要自己組合多種技 術框架,技術上的復雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息服務器或中間件,因為你的 應用程序將扮演了這個服務角色。你只需要簡單的引用ZeroMQ程序庫,可以使用NuGet安裝,然后你就可以愉快的在應用程序之間發送消息了。但是 ZeroMQ僅提供非持久性的隊列,也就是說如果down機,數據將會丟失。其中,Twitter的Storm中使用ZeroMQ作為數據流的傳輸。

Ps: 擴展性好,開發比較靈活,采用C語言實現,實際上他只是一個socket庫的重新封裝,如果我們做為消息隊列使用,需要開發大量的代碼

ActiveMQ

是Apache下的一個子項目。 類似於ZeroMQ,它能夠以代理人和點對點的技術實現隊列。同時類似於RabbitMQ,它少量代碼就可以高效地實現高級應用場景。RabbitMQ、 ZeroMQ、ActiveMQ均支持常用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。

Ps: 歷史悠久的開源項目,已經在很多產品中得到應用,實現了JMS1.1規范,可以和spring-jms輕松融合,實現了多種協議,不夠輕巧(源代碼比RocketMQ多).支持持久化到數據庫,對隊列數較多的情況支持不好.

Jafka/Kafka

Kafka是Apache下的一個子項目,是一個高性能跨語言分布式Publish/Subscribe消息隊列系統,而Jafka是在Kafka 之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一台普通的服務器上既可以 達到10W/s的吞吐速率;完全的分布式系統,Broker、Producer、Consumer都原生自動支持分布式,自動實現復雜均衡;支持 Hadoop數據並行加載,對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過 Hadoop的並行加載機制來統一了在線和離線的消息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。

RocketMQ

阿里巴巴的MQ中間件,在其多個產品下使用,並能夠撐住雙十一的大流量,他並沒有實現JMS規范,使用起來很簡單。部署由一個 命名服務(nameserver)和一個代理(broker)組成,nameserver和broker以及producer都支持集群,隊列的容量受機器硬盤的限制,隊列滿后可以支持持久化到硬盤(也可以自己適配代碼,將其持久化到NOSQL數據庫中),隊列滿后會影響吞吐量,可以采用主備來保證穩定性,支持回溯消費,可以在broker端進行消息過濾.

其他一些隊列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。

比較ActiveMQ and RocketMQ

  RocketMQ ActiveMQ   
優先級 需要新建一個特殊隊列來接收優先級高的隊列,無法實現從0-65535這種細粒度的控制 可以精細控制  
順序 可以保證嚴格的消費順序 無法保證嚴格的順序  
持久化 支持 支持  
穩定性 更高    
消息過濾 RocketMQ可以在broker端進行過濾,對於我們的消息總線,這里可以節省大量的網絡傳輸是否會有消息重發造成的重復消費:RocketMQ可以保證,ActiveMQ無法保證 僅支持在客戶端消費的時候進行判斷是否是自己需要的消息  
回溯消費 支持 不支持 即重新將某一個時刻之前的消息重新消費一遍
事務 支持 支持  
定時消費 支持 不支持  
消息堆積 更優   就是當緩存消息的內存滿了之后的解決方案,一種是丟棄策略,這種不會影響吞吐量,還有一種就是將消息持久化到磁盤,這種會影響吞吐量
客戶端不在線 RocketMQ可以在客戶端上線后繼續將未消費的消息推送到客戶端    

比較主流的MQ:

  ActiveMQ RabbitMQ RocketMq ZeroMQ
關注度  
成熟度   成熟 成熟 比較成熟 不成熟
所屬社區/公司 Apache  Mozilla Public License Alibaba      
社區活躍度  
文檔  
特點   功能齊全,被大量開源項目使用 由於Erlang 語言的並發能力,性能很好    各個環節分布式擴展設計,主從 HA;支持上萬個隊列;多種消費模式;性能很好 低延時,高性能,最高 43萬條消息每秒  
授權方式   開源 開源 開源 開源
開發語言   Java Erlang   Java   C
支持的協議   OpenWire、STOMP、REST、XMPP、AMQP AMQP   自己定義的一套(社區提供JMS--不成熟) TCP、UDP
客戶端支持語言   Java、C、C++、Python、PHP、Perl、.net 等  Java、C、C++、Python、 PHP、Perl 等 Java、C++(不成熟)   python、 java、 php、.net 等
持久化   內存、文件、數據庫 內存、文件 磁盤文件 在消息發送端保存
事務   支持 不支持 支持 不支持
集群   支持 支持 支持 不支持
負載均衡 支持 支持 支持 不支持
管理界面   一般 無社區有 webconsole   實現
部署方式   獨立、嵌入 獨立 獨立 獨立
評價   優點:成熟的產品,已經在很多公司得到應用(非大規模場景)。有較多的文檔。各種協議支持較好,有多重語言的成熟的客戶端;
缺點:根據其他用戶反饋,會出莫名其妙的問題,切會丟失消息。其重心放到activemq6.0 產品—apollo上去了,目前社區不活躍,且對5.x維護較少;
Activemq不適合用於上千個隊列的應用場景
優點:由於erlang語言的特性,mq性能較好;管理界面較豐富,在互聯網公司也有較大規模的應用;支持amqp系誒,有多中語言且支持amqp的客戶端可用
缺點:erlang語言難度較大。集群不支持動態擴展。
優點:模型簡單,接口易用(JMS的接口很多場合並不太實用)。在阿里大規模應用。目前支付寶中的余額寶等新興產品均使用rocketmq。集群規模大概在50 台左右,單日處理消息上百億;性能非常好,可以大量堆積消息在broker中;支持多種消費,包括集群消費、廣播消費等。開發度較活躍,版本更新很快。
缺點:沒有在mq核心中去實現JMS等接口,
 

 

 實例(簡單的實戰)

   ActiveMQ入門實例:

    1.去官方網站下載:http://activemq.apache.org/
    2.運行ActiveMQ 解壓縮apache-activemq-5.8.0-bin.zip,然后雙擊apache-activemq-5.5.1\bin\activemq.bat運行ActiveMQ程序。
      啟動ActiveMQ以后,登陸:http://localhost:8161/admin/,創建一個Queue,命名為FirstQueue。
    3.創建Eclipse項目並運行
    創建java project:ActiveMQ,新建lib文件夾,導入如下Jar包

    activemq-broker-5.8.0.jar 、activemq-client-5.8.0.jar 、geronimo-j2ee-management_1.1_spec-1.0.1.jar 、geronimo-jms_1.1_spec-1.1.1.jar 、slf4j-api-1.6.6.jar

    創建類如下類:
    

 

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Sender {  
    private static final int SEND_NUMBER = 5;  
  
    public static void main(String[] args) {  
        // ConnectionFactory :連接工廠,JMS 用它創建連接  
        ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS  
        // Provider 的連接  
        Connection connection = null; // Session: 一個發送或接收消息的線程  
        Session session; // Destination :消息的目的地;消息發送給誰.  
        Destination destination; // MessageProducer:消息發送者  
        MessageProducer producer; // TextMessage message;  
        // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
        try { // 構造從工廠得到連接對象  
            connection = connectionFactory.createConnection();  
            // 啟動  
            connection.start();  
            // 獲取操作連接  
            session = connection.createSession(Boolean.TRUE,  
                    Session.AUTO_ACKNOWLEDGE);  
            // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            // 得到消息生成者【發送者】  
            producer = session.createProducer(destination);  
            // 設置不持久化,此處學習,實際根據項目決定  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // 構造消息,此處寫死,項目就是參數,或者方法獲取  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer)  
            throws Exception {  
        for (int i = 1; i <= SEND_NUMBER; i++) {  
            TextMessage message = session.createTextMessage("ActiveMq 發送的消息"  + i);  
            // 發送消息到目的地方  
            System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);  
            producer.send(message);  
        }  
    }  
}  

 

 

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.MessageConsumer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Receiver {  
    public static void main(String[] args) {  
        // ConnectionFactory :連接工廠,JMS 用它創建連接  
        ConnectionFactory connectionFactory;  
        // Connection :JMS 客戶端到JMS Provider 的連接  
        Connection connection = null;  
        // Session: 一個發送或接收消息的線程  
        Session session;  
        // Destination :消息的目的地;消息發送給誰.  
        Destination destination;  
        // 消費者,消息接收者  
        MessageConsumer consumer;  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
        try {  
            // 構造從工廠得到連接對象  
            connection = connectionFactory.createConnection();  
            // 啟動  
            connection.start();  
            // 獲取操作連接  
            session = connection.createSession(Boolean.FALSE,  
                    Session.AUTO_ACKNOWLEDGE);  
            // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            consumer = session.createConsumer(destination);  
            while (true) {  
                // 設置接收者接收消息的時間,為了便於測試,這里誰定為100s  
                TextMessage message = (TextMessage) consumer.receive(100000);  
                if (null != message) {  
                    System.out.println("收到消息" + message.getText());  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
}  

 

 

IBM WebSphere MQ介紹安裝以及配置服務詳解(鏈接)

 

關於消息隊列與分布式的那些事

  消息隊列技術是分布式應用間交換信息的一種技術。消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。通過消息隊列,應用程序可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程序接收此消息。在分布式計算環境中,為了集成分布式應用,開發者需要對異構網絡環境下的分布式應用提供有效的通信手段。為了管理需要共享的信息,對應用提供公共的信息交換機制是重要的。

 

設計分布式應用的方法主要有:

遠程過程調用(PRC)--分布式計算環境(DCE)的基礎標准成分之一;

對象事務監控(OTM)--基於CORBA的面向對象工業標准與事務處理(TP)監控技術的組合;

消息隊列(MessageQueue)--構造分布式應用的松耦合方法。

 


  (a) 分布計算環境/遠程過程調用 (DCE/RPC)

 

  RPC是DCE的成分,是一個由開放軟件基金會(OSF)發布的應用集成的軟件標准。RPC模仿一個程序用函數引用來引用另一程序的傳統程序設計方法,此引用是過程調用的形式,一旦被調用,程序的控制則    轉向被調用程序。

 

  在RPC實現時,被調用過程可在本地或遠地的另一系統中駐留並在執行。當被調用程序完成處理輸入數據,結果放在過程調用的返回變量中返回到調用程序。RPC完成后程序控制則立即返回到調用程序。因此    RPC模仿子程序的調用/返回結構,它僅提供了Client(調用程序)和Server(被調用過程)間的同步數據交換。

 


  (b) 對象事務監控 (OTM)

 

  基於CORBA的面向對象工業標准與事務處理(TP)監控技術的組合,在CORBA規范中定義了:使用面向對象技術和方法的體系結構;公共的Client/Server程序設計接口;多平台間傳輸和翻譯數據的指導方     針;開發分布式應用接口的語言(IDL)等,並為構造分布的Client/Server應用提供了廣泛及一致的模式。

 


  (c) 消息隊列 (Message Queue)

 

  消息隊列為構造以同步或異步方式實現的分布式應用提供了松耦合方法。消息隊列的API調用被嵌入到新的或現存的應用中,通過消息發送到內存或基於磁盤的隊列或從它讀出而提供信息交換。消息隊列可用    在應用中以執行多種功能,比如要求服務、交換信息或異步處理等。

 

中間件是一種獨立的系統軟件或服務程序,分布式應用系統借助這種軟件在不同的技術之間共享資源,管理計算資源和網絡通訊。它在計算機系統中是一個關鍵軟件,它能實現應用的互連和互操作性,能保證    系統的安全、可靠、高效的運行。中間件位於用戶應用和操作系統及網絡軟件之間,它為應用提供了公用的通信手段,並且獨立於網絡和操作系統。中間件為開發者提供了公用於所有環境的應用程序接口,當    應用程序中嵌入其函數調用,它便可利用其運行的特定操作系統和網絡環境的功能,為應用執行通信功能。

 

如果沒有消息中間件完成信息交換,應用開發者為了傳輸數據,必須要學會如何用網絡和操作系統軟件的功能,編寫相應的應用程序來發送和接收信息,且交換信息沒有標准方法,每個應用必須進行特定的編程從而和多平台、不同環境下的一個或多個應用通信。例如,為了實現網絡上不同主機系統間的通信,將要求具備在網絡上如何交換信息的知識(比如用TCP/IP的socket程序設計);為了實現同一主機內不同進程之間的通訊,將要求具備操作系統的消息隊列或命名管道(Pipes)等知識。

 

  目前中間件的種類很多,如交易管理中間件(如IBM的CICS)、面向Java應用的Web應用服務器中間件(如IBM的WebSphere Application Server)等,而消息傳輸中間件(MOM)是其中的一種。它簡化了應用之間數據的傳輸,屏蔽底層異構操作系統和網絡平台,提供一致的通訊標准和應用開發,確保分布式計算網絡環境下可靠的、跨平台的信息傳輸和數據交換。它基於消息隊列的存儲-轉發機制,並提供特有的異步傳輸機制,能夠基於消息傳輸和異步事務處理實現應用整合與數據交換。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM