RabbitMQ 概念與Java例子


RabbitMQ簡介

目前RabbitMQ是AMQP 0-9-1(高級消息隊列協議)的一個實現,使用Erlang語言編寫,利用了Erlang的分布式特性。


概念介紹:

  1. Broker:簡單來說就是消息隊列服務器實體。
  2. Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
  3. Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  4. Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
  5. Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  6. vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
  7. producer:消息生產者,就是投遞消息的程序。
  8. consumer:消息消費者,就是接受消息的程序。
  9. channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

 

使用流程

AMQP模型中,消息在producer中產生,發送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊列的使用過程大概如下:

  1. 客戶端連接到消息隊列服務器,打開一個channel。
  2. 客戶端聲明一個exchange,並設置相關屬性。
  3. 客戶端聲明一個queue,並設置相關屬性。
  4. 客戶端使用routing key,在exchange和queue之間建立好綁定關系。
  5. 客戶端投遞消息到exchange。

exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。 exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。

Exchange類型

Exchange路由消息的集中類型:

名稱

默認的預先定義exchange名字

作用描述

Direct exchange

(Empty string) and amq.direct

根據Binding指定的Routing Key,將符合Key的消息發送到BindingQueue

Fanout exchange

amq.fanout

將同一個message發送到所有同該Exchange bingdingqueue

Topic exchange

amq.topic

根據Binding指定的Routing KeyExchangekey進行模式匹配后路由到相應的Queue,模式匹配時符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。

Headers exchange

amq.match (and amq.headers in RabbitMQ)

direct exchange類似,不同之處是不再使用Routing Key路由,而是使用headersmessage attributes)進行匹配路由到指定Queue

參考:

http://www.choudan.net/2013/07/25/OpenStack-RabbitMQ(%E4%B8%80).html

http://stephansun.iteye.com/blog/1452853

http://www.diggerplus.org/archives/3110

http://backend.blog.163.com/blog/static/202294126201322563245975/

http://lynnkong.iteye.com/blog/1699684

 

特性:

  1. broker的持久化:exchange和queue聲明為durable時,exchange和queue的配置會在服務端磁盤保存起來,這樣在服務停掉重啟后,exchange和queue以及其相應的binding等配置不會丟失;
  2. message的持久化:當message的deliver mode attribute(message properties)設置為2時,每個未被消費的message將被保存在磁盤中,在服務重啟后仍能保存。
    message在文件中的保存參考:http://my.oschina.net/hncscwc/blog/182083
  3. cluster:RabbitMQ支持多個nodes(每個nodes是一個RabbitMQ實例)組成一個cluster,訪問cluster中的任意一個node的效果是相同的,也就是說任何一個message都可以在任意一個nodes上生產和消費(生產或消費的message會在nodes間中轉)。
  4. mirrored-queue:RabbitMQ在cluster的基礎上,支持同一個queue的message同時存儲在多個nodes上,這樣當部分節點失效時,可保證message和broker的配置不丟失。


安裝與配置

1.安裝erlang

sudo apt-get install tk tcl unixODBC erlang 
sudo vim /etc/profile
添加export PATH=$PATH:/usr/lib/erlang/bin/

2.安裝rabbitmq

sudo apt-get install rabbitmq-server
sudo vim /etc/profile 添加export PATH=$PATH:/usr/lib/rabbitmq/bin
source /etc/profile

rabbitmq的基本配置(端口等)參考:http://my.oschina.net/hncscwc/blog/302339

3.用戶與權限

在正式應用之前,我們先在RabbitMQ里創建一個vhost,加一個用戶,並設置該用戶的權限。使用rabbitmqctl客戶端工具,在根目錄下創建”/mq_test”這個vhost:

rabbitmqctl add_vhost /mq_test

創建一個用戶名”test,設置密碼”test123″:

rabbitmqctl add_user test test123

設置pyh用戶對/pyhtest這個vhost擁有全部權限:

rabbitmqctl set_permissions -p /mq_test test “.*” “.*” “.*”、

后面三個”*”代表pyh用戶擁有對/pyhtest的配置、寫、讀全部權限


參考:http://my.oschina.net/hncscwc/blog/262246

 

4.配置開啟web管理插件

cat <<EOF>> /etc/rabbitmq/enabled_plugins
[rabbitmq_management]. 
EOF

可以通過http://localhost:15672/ 查看運行情況

 

5.啟動

使用root權限運行rabbitmq-server 或使用/etc/init.d/rabbitmq-server start|restart|stop

 

6.在一台機器上啟動多個節點(模擬集群)

vim start_rabbitmq_cluster.sh

添加以下內容

#!/bin/bash

if [[ $# != 1 ]]
then
    echo "Usage: $0 process_num"
    exit 1
fi

HOST_NAME=`hostname`
START_NUM=0
PROCESS_NUM=$1
END_NUM=$(( START_NUM + PROCESS_NUM - 1))

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME="rabbit" RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" rabbitmq-server -detached

for (( i=$((START_NUM+1)); i<=$END_NUM; i++  ))
do
    RABBITMQ_PROT=$(( i + 5672 ))
    MANAGE_PORT=$(( i + 15672  ))
    NODE_NAME="rabbit_$i"
    echo $RABBITMQ_PROT
    echo $MANAGE_PORT
    echo $NODE_NAME
    RABBITMQ_NODE_PORT=$RABBITMQ_PROT RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,$MANAGE_PORT}]" RABBITMQ_NODENAME=$NODE_NAME rabbitmq-server -detached
    sleep 3
    
    rabbitmqctl -n $NODE_NAME stop_app
    rabbitmqctl -n $NODE_NAME reset
    echo "join cluster"
    rabbitmqctl -n $NODE_NAME join_cluster rabbit@$HOST_NAME
    rabbitmqctl -n $NODE_NAME start_app
done

rabbitmqctl cluster_status -n rabbit

運行

chmod a+x start_rabbitmq_cluster.sh
start_rabbitmq_cluster.sh 3

啟動后可以通過rabbitmqctl -n rabbit cluster_status查看集群節點配置情況,或者在web管理頁面中查看

 

7.在多台機器上建立集群

首先在主節點上啟動服務

然后將其他機器的rabbitmq加入集群

1.將主服務器的/var/log/rabbitmq/.erlang.cookie 拷貝到新節點
2.在新節點上將文件所有人更改為rabbitmq,注意保持文件權限為所有者只讀,其他人無權限
chown rabbitmq.rabbitmq /var/log/rabbitmq/.erlang.cookie
3.在新節點上加入集群、
/etc/init.d/rabbitmq-server start
rabbitmqctl -n rabbit stop_app
rabbitmqctl -n rabbit reset
rabbitmqctl -n rabbit join_cluster  rabbit@$MASTER_NODE
rabbitmqctl -n rabbit start_app

 

 

8.配置network partion時的處理方式

cat<<EOF>> /usr/local/rabbitmq/rabbitmq_server-3.1.0/etc/rabbitmq/rabbitmq.conf 
[
{rabbit, [{cluster_partition_handling, pause_minority}]}
].
EOF

參考:http://my.oschina.net/hncscwc/blog/174417

 

Java代碼示例

首先在項目中添加maven依賴

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>

 

Producer

import com.rabbitmq.client.*;
import com.sun.deploy.util.StringUtils;

import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

public class Producer {
    //exchange type
    public enum XT {
        DEFAULT, DIRECT, TOPIC, HEADERS, FANOUT
    }

    private static final String QUEUE_NAME = "log2";

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); //使用默認端口連接本地rabbitmq服務器

        Connection connection = factory.newConnection(); //聲明一個連接
        Channel channel = connection.createChannel(); //聲明消息通道

        //exchange類型 參考:http://stephansun.iteye.com/blog/1452853
        XT xt = XT.HEADERS;
        switch (xt) {
            case DEFAULT: //默認,向指定的隊列發送消息,消息只會被一個consumer處理,多個消費者消息會輪訓處理,消息發送時如果沒有consumer,消息不會丟失
                //為消息通道綁定一個隊列
                //隊列的相關參數需要與第一次定義該隊列時相同,否則會出錯
                //參數1:隊列名稱
                //參數2:為true時server重啟隊列不會消失
                //參數3:隊列是否是獨占的,如果為true只能被一個connection使用,其他連接建立時會拋出異常
                //參數4:隊列不再使用時是否自動刪除(沒有連接,並且沒有未處理的消息)
                //參數5:建立隊列時的其他參數
                channel.queueDeclare(QUEUE_NAME, true, false, true, null);

                while (GetInputString()) {
                    //向server發布一條消息
                    //參數1:exchange名字,若為空則使用默認的exchange
                    //參數2:routing key
                    //參數3:其他的屬性
                    //參數4:消息體
                    //RabbitMQ默認有一個exchange,叫default exchange,它用一個空字符串表示,它是direct exchange類型,
                    //任何發往這個exchange的消息都會被路由到routing key的名字對應的隊列上,如果沒有對應的隊列,則消息會被丟棄
                    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //設置消息為持久化,服務器重啟不會丟失

                    System.out.println("Send " + message);
                }
                break;
            case FANOUT:
                //廣播給所有隊列  接收方也必須通過fanout交換機獲取消息,所有連接到該交換機的consumer均可獲取消息
                //如果producer在發布消息時沒有consumer在監聽,消息將被丟棄


                //定義一個交換機
                //參數1:交換機名稱
                //參數2:交換機類型
                //參數3:交換機持久性,如果為true則服務器重啟時不會丟失
                //參數4:交換機在不被使用時是否刪除
                //參數5:交換機的其他屬性
                channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null);

                while (GetInputString()) {
                    //發送一條廣播消息,參數2此時無意義
                    channel.basicPublish(XCHG_NAME, "", null, message.getBytes());

                    System.out.println("Send " + message);
                }
                break;
            case DIRECT:
                //向所有綁定了相應routing key的隊列發送消息
                //如果producer在發布消息時沒有consumer在監聽,消息將被丟棄
                //如果有多個consumer監聽了相同的routing key  則他們都會受到消息

                channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null);

                while (GetInputString()) {
                    //input like : info message
                    String[] temp = StringUtils.splitString(message, " ");
                    channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
                    System.out.println("Send " + message);
                }
                break;
            case TOPIC:
                //與direct模式有類似之處,都使用routing key作為路由
                //不同之處在於direct模式只能指定固定的字符串,而topic可以指定一個字符串模式

                channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null);
                while (GetInputString()) {
                    //input like : topic message
                    String[] temp = StringUtils.splitString(message, " ");
                    channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
                    System.out.println("Send " + message);
                }
                break;
            case HEADERS:
                //與topic和direct有一定相似之處,但不是通過routing key來路由消息
                //通過headers中詞的匹配來進行路由

                channel.exchangeDeclare(XCHG_NAME, "headers", true, true, null);
                while (GetInputString()) {
                    //input like : headers message
                    String[] temp = StringUtils.splitString(message, " ");

                    Map<String, Object> headers = new HashMap<String, Object>();
                    headers.put("name", temp[0]); //定義headers
                    headers.put("sex", temp[1]);
                    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder().headers(headers);

                    channel.basicPublish(XCHG_NAME, "", builder.build(), temp[2].getBytes()); //根據headers路由到相應的consumer
                    System.out.println("Send " + message);
                }
                break;
        }
        channel.close();
        connection.close();
    }

    private static boolean GetInputString() {
        message = scanner.nextLine();
        if (message.length() == 0) return false;
        return true;
    }

    private static Scanner scanner = new Scanner(System.in);
    private static String message = "";
    public static String XCHG_NAME = "xchg3";
}

 

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Consumer {
    private static final String QUEUE_NAME = "log2";

    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = QUEUE_NAME;

        Producer.XT xt = Producer.XT.HEADERS;

        switch (xt) {
            case DEFAULT:
                //隊列的相關參數需要與第一次定義該隊列時相同,否則會出錯,使用channel.queueDeclarePassive()可只被動綁定已有隊列,而不創建
                channel.queueDeclare(queueName, true, false, true, null);
                break;
            case FANOUT:
                //接收端也聲明一個fanout交換機
                channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null);
                //channel.exchangeDeclarePassive() 可以使用該函數使用一個已經建立的exchange
                //聲明一個臨時隊列,該隊列會在使用完比后自動銷毀
                queueName = channel.queueDeclare().getQueue();
                //將隊列綁定到交換機,參數3無意義此時
                channel.queueBind(queueName, Producer.XCHG_NAME, "");
                break;
            case DIRECT:
                channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null);
                queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //綁定一個routing key,可以綁定多個
                channel.queueBind(queueName, Producer.XCHG_NAME, "warning");
                break;
            case TOPIC:
                channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null);
                queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //監聽兩種模式 #匹配一個或多個單詞 *匹配一個單詞
                channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue");
                break;
            case HEADERS:
                channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, true, null);
                queueName = channel.queueDeclare().getQueue();
                Map<String, Object> headers = new HashMap<String, Object>() {{
                    put("name", "test");
                    put("sex", "male");
                    put("x-match", "any");//all==匹配所有條件,any==匹配任意條件
                }};
                channel.queueBind(queueName, Producer.XCHG_NAME, "", headers);
                break;
        }

        // 在同一時間不要給一個worker一個以上的消息。
        // 不要將一個新的消息分發給worker知道它處理完了並且返回了前一個消息的通知標志(acknowledged)
        // 替代的,消息將會分發給下一個不忙的worker。
        channel.basicQos(1); //server push消息時的隊列長度

        //用來緩存服務器推送過來的消息
        QueueingConsumer consumer = new QueueingConsumer(channel);

        //為channel聲明一個consumer,服務器會推送消息
        //參數1:隊列名稱
        //參數2:是否發送ack包,不發送ack消息會持續在服務端保存,直到收到ack。  可以通過channel.basicAck手動回復ack
        //參數3:消費者
        channel.basicConsume(queueName, false, consumer);
        //channel.basicGet() //使用該函數主動去服務器檢索是否有新消息,而不是等待服務器推送

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("Received " + new String(delivery.getBody()));

            //回復ack包,如果不回復,消息不會在服務器刪除
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            //channel.basicReject(); channel.basicNack(); //可以通過這兩個函數拒絕消息,可以指定消息在服務器刪除還是繼續投遞給其他消費者
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM