RabbitMQ消息隊列入門篇(環境配置+Java實例+基礎概念)


一、消息隊列使用場景或者其優點

消息隊列通常是在項目中,將一些無需即時返回且耗時的操作提取出來。進行了異步處理,而這種異步處理的方式大大的節省了server的請求響應時間,從而提高了系統的吞吐量。

在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息隊列在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。

這同意你獨立的擴展或改動兩邊的處理過程。僅僅要確保它們遵守相同的接口約束。

消息隊列能夠解決這樣一個問題,也就是其解耦性。解耦伴隨的優點就是減少冗余,靈活,易於擴展。

峰值處理能力:當你的應用上了Hacker News的首頁。你將發現訪問流量攀升到一個不同平常的水平。在訪問量劇增的情況下。你的應用仍然須要繼續發揮作用,可是這種突發流量並不常見;假設為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住增長的訪問壓力,而不是由於超出負荷的請求而全然崩潰。
消息隊列還有可恢復性、異步通信、緩沖………等各種優點,在此不一一介紹,用到自然理解。

二、RabbitMQ來源

RabbitMQ是用Erlang實現的一個高並發高可靠AMQP消息隊列server。

顯然,RabbitMQ跟Erlang和AMQP有關。以下簡介一下Erlang和AMQP。

Erlang是一門動態類型的函數式編程語言,它也是一門解釋型語言,由Erlang虛擬機解釋執行。從語言模型上說。Erlang是基於Actor模型的實現。在Actor模型里面。萬物皆Actor。每一個Actor都封裝着內部狀態,Actor相互之間僅僅能通過消息傳遞這一種方式來進行通信。相應到Erlang里,每一個Actor相應着一個Erlang進程。進程之間通過消息傳遞進行通信。相比共享內存,進程間通過消息傳遞來通信帶來的直接優點就是消除了直接的鎖開銷(不考慮Erlang虛擬機底層實現中的鎖應用)。

AMQP(Advanced Message Queue Protocol)定義了一種消息系統規范。這個規范描寫敘述了在一個分布式的系統中各個子系統怎樣通過消息交互。

而RabbitMQ則是AMQP的一種基於erlang的實現。

AMQP將分布式系統中各個子系統隔離開來,子系統之間不再有依賴。子系統僅依賴於消息。子系統不關心消息的發送者。也不關心消息的接受者。

這里不必要對Erlang和AMQP作過於深入介紹。畢竟本文RabbitMQ才是主角哦。哈哈。以下直接看主角表演(實例)啦,至於主角的一些不得不深入介紹的點我們放到最后面。

三、RabbitMQ實例(Java)

3.1、環境配置

RabbitMQ的執行須要erlang的支持,因此我們先安裝erlang。


32位下載地址:http://www.erlang.org/download/otp_win64_18.2.1.exe
64位下載地址:http://www.erlang.org/download/otp_win32_18.2.1.exe
雙擊選擇默認安裝就好。

前面我們也講到RabbitMQ就是一個server,以下我們就安裝相應server。


下載地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.exe
雙擊選擇默認安裝就好。安裝好之后須要啟動服務,cmd。進入到安裝文件夾的sbin文件夾下。命令例如以下:

cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.4\sbin
rabbitmq-server start

這里寫圖片描寫敘述

博主的之前啟動過了,所以報錯,假設你的也啟動了就沒問題了。

接下來自然是jar包依賴,本文project採用eclipse + maven,maven依賴例如以下:

<!-- rabbitmq相關依賴 -->
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.0.4</version>
</dependency>
<!-- 序列化相關依賴 -->
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>

由於興許樣例里面實用到序列化的,因此加上序列化工具包相關依賴。

3.2、樣例一代碼和效果

新建發送者Send.java,代碼例如以下:

package com.luo.rabbit.test.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    //隊列名稱 
    private final static String QUEUE_NAME = "queue";  

    public static void main(String[] argv) throws java.io.IOException  
    {  
        /** * 創建連接連接到MabbitMQ */  
        ConnectionFactory factory = new ConnectionFactory();  
        //設置MabbitMQ所在主機ip或者主機名 
        factory.setHost("127.0.0.1"); 
        //創建一個連接 
        Connection connection = factory.newConnection();  
        //創建一個頻道 
        Channel channel = connection.createChannel();  
        //指定一個隊列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        //發送的消息 
        String message = "hello world!";  
        //往隊列中發出一條消息 
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
        System.out.println("Sent '" + message + "'");  
        //關閉頻道和連接 
        channel.close();  
        connection.close();  
     }  
}

新建接收者Recv.java,代碼例如以下:

package com.luo.rabbit.test.one;

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.QueueingConsumer; 

public class Recv  { 
    //隊列名稱  
    private final static String QUEUE_NAME = "queue"; 
    public static void main(String[] argv) throws java.io.IOException,
    java.lang.InterruptedException  
    {  
        //打開連接和創建頻道,與發送端一樣  
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名  
        factory.setHost("127.0.0.1"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        //聲明隊列,主要為了防止消息接收者先執行此程序,隊列還不存在時創建隊列。

channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Waiting for messages. To exit press CTRL+C"); //創建隊列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消費隊列 channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { //nextDelivery是一個堵塞方法(內部實現事實上是堵塞隊列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Received '" + message + "'"); } } }

分別執行這兩個類。先后順序沒有關系。先執行發送者再執行接收者,效果例如以下:

這里寫圖片描寫敘述

這里寫圖片描寫敘述

3.3、樣例二代碼和效果

樣例一可能通俗易懂,可是並非非常規范,而且有些缺陷,比方我要發送一個對象過去呢?以下看另外一個樣例:

首先建一個連接類。由於發送者和接收者的連接代碼都是一樣的。之后讓二者繼承這個連接類就可以。連接類代碼BaseConnector.java:

package com.luo.rabbit.test.two;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class BaseConnector {
    protected Channel channel;
    protected Connection connection;
    protected String queueName;
    public BaseConnector(String queueName) throws IOException{
        this.queueName = queueName;
        //打開連接和創建頻道
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名 127.0.0.1即localhost
        factory.setHost("127.0.0.1");
        //創建連接 
        connection = factory.newConnection();
        //創建頻道 
        channel = connection.createChannel();
        //聲明創建隊列
        channel.queueDeclare(queueName, false, false, false, null);
    }
}

發送者Sender.java:

package com.luo.rabbit.test.two;

import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;

public class Sender extends BaseConnector {
    public Sender(String queueName) throws IOException {
        super(queueName);
    }

    public void sendMessage(Serializable object) throws IOException {
        channel.basicPublish("",queueName, null, SerializationUtils.serialize(object));
    }   
}

前面講過,我們想發送一個對象給接受者。因此,我們先新建一個對象,由於發送過程須要序列化。因此這里須要實現java.io.Serializable接口:

package com.luo.rabbit.test.two;

import java.io.Serializable;

public class MessageInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    //渠道
    private String channel;
    //來源
    private String content;
    public String getChannel() {
        return channel;
    }
    public void setChannel(String channel) {
        this.channel = channel;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}

關於序列化,這里小寶鴿就再嘮叨兩句,序列化就是將一個對象的狀態(各個屬性量)保存起來,然后在適當的時候再獲得。序列化分為兩大部分:序列化和反序列化。

序列化是這個過程的第一部分。將數據分解成字節流。以便存儲在文件里或在網絡上傳輸。反序列化就是打開字節流並重構對象。

對象序列化不僅要將基本數據類型轉換成字節表示,有時還要恢復數據。恢復數據要求有恢復數據的對象實例。

接收者代碼Receiver.java:

package com.luo.rabbit.test.two;

import java.io.IOException;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Receiver extends BaseConnector implements Runnable, Consumer {

    public Receiver(String queueName) throws IOException {
        super(queueName);
    }

    //實現Runnable的run方法
    public void run() {
         try {
            channel.basicConsume(queueName, true,this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** * 以下這些方法都是實現Consumer接口的 **/    
    //當消費者注冊完畢自己主動調用
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer "+consumerTag +" registered");
    }
    //當消費者接收到消息會自己主動調用
    public void handleDelivery(String consumerTag, Envelope env,
                BasicProperties props, byte[] body) throws IOException {
        MessageInfo messageInfo = (MessageInfo)SerializationUtils.deserialize(body);
        System.out.println("Message ( "
                + "channel : " + messageInfo.getChannel() 
                + " , content : " + messageInfo.getContent() 
                + " ) received.");

    }
    //以下這些方法能夠臨時不用理會
    public void handleCancelOk(String consumerTag) {
    }
    public void handleCancel(String consumerTag) throws IOException {
    }
    public void handleShutdownSignal(String consumerTag,
            ShutdownSignalException sig) {
    }
    public void handleRecoverOk(String consumerTag) {
    }
}

這里。接收者實現了。Runnable接口和com.rabbitmq.client.Consumer接口。

實現Runnable接口的目的是為了實現多線程。java實現多線程的方式有兩種:一種是繼承Thread類,一種是實現Runnable接口。詳情請看這篇文章:http://developer.51cto.com/art/201203/321042.htm

實現Consumer接口的目的是什么呢?猿友們應有看到實例一中的接收者代碼:

//指定消費隊列 
channel.basicConsume(QUEUE_NAME, true, consumer);

最后一個參數是須要傳遞com.rabbitmq.client.Consumer參數的,實現了Consumer接口之后我們僅僅須要傳遞this就好了。另外,Consumer有非常多方法。上面代碼除了構造方法和run方法(run是實現Runnable接口的),其它都是實現Consumer接口的,這些方法的詳細含義,大家能夠直接看com.rabbitmq.client.Consumer源代碼。

接下來就是測試類了Test.java:

package com.luo.rabbit.test.two;

public class Test {
    public static void main(String[] args) throws Exception{
        Receiver receiver = new Receiver("testQueue");
        Thread receiverThread = new Thread(receiver);
        receiverThread.start();
        Sender sender = new Sender("testQueue");
        for (int i = 0; i < 5; i++) {
            MessageInfo messageInfo = new MessageInfo();
            messageInfo.setChannel("test");
            messageInfo.setContent("msg" + i);
            sender.sendMessage(messageInfo);
        }
    }
}

執行效果:

這里寫圖片描寫敘述

記得執行完畢之后一定要把進程關掉,不然你每執行一次Test.java就會開啟一個進程。之后會出現什么問題呢?我是十分建議大家試試。會有驚喜哦,哈哈。驚喜就是,發送的消息會平均(數量平均)的出現到各個接收者的控制台。最好還是將發送的數量改大一點試試。

四、RabbitMQ使用的道具的詳細介紹

RabbitMQ是用Erlang實現的一個高並發高可靠AMQP消息隊列server。

Erlang就是RabbitMQ的一個依賴環境。這里沒什么好說的。我們更加關注它的一身表演技巧哪里來的,這里就看AMQP吧,看完AMQP之后預計你會對RabbitMQ的理解更加深刻。

開始吧
AMQP其中有四個概念非常重要:虛擬主機(virtual host)。交換機(exchange),隊列(queue)和綁定(binding)。一個虛擬主機持有一組交換機、隊列和綁定。為什么須要多個虛擬主機呢?非常easy,RabbitMQ其中。用戶僅僅能在虛擬主機的粒度進行權限控制。因此,假設須要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQserver都有一個默認的虛擬主機“/”。假設這就夠了,那如今就能夠開始了。

交換機,隊列,還有綁定……天哪。
剛開始我思維的列車就是在這里脫軌的…… 這些鬼東西怎么結合起來的?

隊列(Queues)是你的消息(messages)的終點,能夠理解成裝消息的容器。

消息就一直在里面。直到有客戶端(也就是消費者,Consumer)連接到這個隊列而且將其取走為止。只是。你能夠將一個隊列配置成這種:一旦消息進入這個隊列,biu~。它就煙消雲散了。

這個有點跑題了……

須要記住的是,隊列是由消費者(Consumer)通過程序建立的。不是通過配置文件或者命令行工具。

這沒什么問題,假設一個消費者試圖創建一個已經存在的隊列,RabbitMQ就會起來拍拍他的腦袋,笑一笑。然后忽略這個請求。

因此你能夠將消息隊列的配置寫在應用程序的代碼里面。這個概念不錯。

OK。你已經創建而且連接到了你的隊列,你的消費者程序正在百無聊賴的敲着手指等待消息的到來,敲啊,敲啊…… 沒有消息。

發生了什么?你當然須要先把一個消息放進隊列才行。只是要做這個,你須要一個交換機(Exchange)……

交換機能夠理解成具有路由表的路由程序。僅此而已。每一個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字符串。

交換機其中有一系列的綁定(binding),即路由規則(routes),比如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊列其中去。先不討論這個,我們有點超前了。

你的消費者程序要負責創建你的交換機們(復數)。

啥?你是說你能夠有多個交換機?是的,這個能夠有。只是為啥?非常easy。每一個交換機在自己獨立的進程其中執行。因此添加多個交換機就是添加多個進程,能夠充分利用server上的CPU核以便達到更高的效率。比如,在一個8核的server上,能夠創建5個交換機來用5個核,另外3個核留下來做消息處理。相似的,在RabbitMQ的集群其中。你能夠用相似的思路來擴展交換機一邊獲取更高的吞吐量。

OK,你已經創建了一個交換機。可是他並不知道要把消息送到哪個隊列。你須要路由規則,即綁定(binding)。一個綁定就是一個相似這種規則:將交換機“desert(沙漠)”其中具有路由鍵“阿里巴巴”的消息送到隊列“hideout(山洞)”里面去。

換句話說,一個綁定就是一個基於路由鍵將交換機和隊列連接起來的路由規則。

比如,具有路由鍵“audit”的消息須要被送到兩個隊列,“log-forever”和“alert-the-big-dude”。要做到這個,就須要創建兩個綁定,每一個都連接一個交換機和一個隊列,兩者都是由“audit”路由鍵觸發。

在這種情況下。交換機會復制一份消息而且把它們分別發送到兩個隊列其中。

交換機只是就是一個由綁定構成的路由表。

如今復雜的東西來了:交換機有多種類型。

他們都是做路由的。只是接受不同類型的綁定。為什么不創建一種交換機來處理所有類型的路由規則呢?由於每種規則用來做匹配分子的CPU開銷是不同的。比如,一個“topic”類型的交換機試圖將消息的路由鍵與相似“dogs.*”的模式進行匹配。

匹配這種末端的通配符比直接將路由鍵與“dogs”比較(“direct”類型的交換機)要消耗很多其它的CPU。

假設你不須要“topic”類型的交換機帶來的靈活性,你能夠通過使用“direct”類型的交換機獲取更高的處理效率。

那么有哪些類型,他們又是怎么處理的呢?

上面這些都是參考另外一篇文章的,http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/,當然這篇文章的實例是Python的,可是我們不看他的實例。僅僅看他吹水的那部分,哈哈。

五、源代碼project下載

http://download.csdn.net/detail/u013142781/9396830

小寶鴿向來有個壞習慣,即便博客里面已經將所有代碼貼出來了,還是會提供源代碼project供大家下載。哈哈。

有些時候有些猿友常常會問,寫一篇博客非常花時間吧,我不能假裝跟你說不花時間。盡管花時間,可是當你看到方向,看到了目標,能夠將自己學習的東西分享出來。你就會非常有動力了。根本停不下來。

本博客自己查資料,建實例驗證,動手寫博客,約花了8個小時左右吧。只是當我了解到RabbitMQ的博大精深,這些時間都不是事,歡迎關注,盡管剛畢業半年,但小寶鴿會繼續將工作中遇到的技術點分享給大家。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM