RabbitMQ簡介


l  JAVA平台異步消息模塊

JAVA平台異步消息模塊,是一個針對RabbitMQ的消息發送及處理封裝,包含消息的配置、發送、接收、失敗重試、日志記錄等,總共分為4個部分:

1)RabbitMQ訪問封裝:JAMQP(Jar包)

2)消息模塊公共對象、配置讀取及接口定義:JMSG(Jar包)

3)消息發送端:JMSG—Client(Jar包)

4)消息接收端:JMSG—Server(War包)

l  RabbitMQ簡介

MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷的寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是 AMQP的標准實現。

RabbitMQ的結構圖如下:

 

Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。

Alternate exchange:發送給某Exchange的消息路由失敗時,發送至該exchange。

Dead letter exchange:死信Exchange,將超過一定時間的Queue中的消息發送至該exchange。

Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。

Binding:綁定,它的作用就是把Exchange和Queue按照路由規則綁定起來。

Routing Key:路由關鍵字,Exchange根據這個關鍵字進行消息投遞。

channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

l  RabbitMQ封裝

1) RabbitConfig :所有RabbitMQ有關配置

2) RabbitConnectPool :RabbitMQ連接池,管理所有對RabbitMQ的連接

3) RabbitProxy :RabbitMQ連接的封裝,包含

4) RabbitSendProxy : 專門用於發送消息的連接,繼承自RabbitProxy

5) RabbitReceiveProxy :專門用於接收消息的連接,繼承自RabbitProxy

6) RabbitReceiverDispatcher : 消息分發器管理類,用於消息的接

7) 消息發送:從連接池RabbitConnectPool中Get出RabbitSendProxy,調用send方法發送消息,return獲得的RabbitSendProxy。發送失敗的消息記錄本地文件,由輪詢線程獲取消息后重試3次

8) 消息接收:服務啟動時向RabbitReceiverDispatcher注冊要監聽的隊列,每一個監聽對應一個線程以及一個處理消息的線程池,從RabbitMQ獲取到推送來的消息后,通過線程池並行處理

l  內部流程

n  消息發送邏輯

 

n  消息接收邏輯

 

l  配置說明

n  RabbitMQ配置表

n  消息配置表

1) MessageName : 消息名稱,MessageConfig表中唯一

2) Url : 消息的業務處理Http API地址

3) Priority : 消息優先級,必須寫成P1,P2,P3....P8,P9,用於RabbitMQ的RouteKey,發送消息時的最終RouteKey = MessageName.Priority

l  使用方法

n  發送消息

消息發送方法定義:

    /**

     * 發送消息

     * @param transferObject

     *  消息主體對象

     * @param customTag

     *  自定義消息標簽

     * @param messageName

     *  消息名稱

     */

    void send(Object transferObject, String customTag, String messageName);

在消息配置表中配置好消息的信息,引用JMSG和JMSG-Client 兩個Jar包,調用方法如下:

   public void SendMsg() {

              RabbitMQSender sender = new RabbitMQSender();

             sender.send(new Object(), "tag", "messageName");

      }

注意:傳輸的對象(transferObject)進行JSON序列化后,大小不能超過64K,否則會拋出異常

n  接收消息

1) 實現一個能夠接收Post請求的Http API,Post請求參數形式如下:

   NameValuePair[] data = {

         new NameValuePair("message", message.getTransferObjectJSON()),

         new NameValuePair("messageName", message.getMessageName()),

         new NameValuePair("tag", message.getCustomTag()),

};

 

2) 該API的返回值要求為JSON串,內容要求如下(responseCode:0表示處理成功,小於0表示系統異常,大於0表示業務異常):

"{exceptionMessgage:null, responseCode:0}"

3) 將該API的Url配置到MessageConsumersConfig表中

下面是一個簡單的實例:

     @Controller

      public class ConsumerController {

 

      private static final Logger logger = Logger.getLogger(ConsumerController.class);

 

      @RequestMapping(value = "testmsgconsumer", method=RequestMethod.POST)

      @ResponseBody

      public String LogMessage(HttpServletRequest request) {

         logger.info(request.getParameter("messageName") + "(" + request.getParameter("tag") + "): " + request.getParameter("message"));

         return "{exceptionMessgage:null, responseCode:0}";

      }

   }

RabbitMQ組成及原理介紹

rabbitmq作為成熟的企業消息中間件,實現了應用程序間接口調用的解耦,提高系統的吞吐量。

1.RabbitMQ組成

  • 是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現,由以高性能、健壯以及可伸縮性出名的 Erlang 寫成,因此也是繼承了這些優點。
  • AMQP 里主要要說兩個組件:Exchange 和 Queue (在 AMQP 1.0 里還會有變動),如下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Producer 和 Consumer 兩種類型:

 

 

  •  RabbitMQ Server 也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對於普通的應用來說這已經足夠了。當然對於商業系統來說,可以再做一層數據一致性的guard,就可以徹底保證系統的一致性了。
  • Client A & B: 也叫Producer,數據的發送方。create messages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標簽)。payload顧名思義就是傳輸的數據。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。
  • Client 1,2,3:也叫Consumer,數據的接收方。Consumers attach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱后,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來說,它是不知道誰發送的這個信息的。就是協議本身不支持。但是當然了如果Producer發送的payload包含了Producer的信息就另當別論了。

  對於一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。

  • Exchanges are where producers publish their messages.
  • Queues are where the messages end up and are received by consumers
  • Bindings are how the messages get routed from the exchange to particular queues.

   還有幾個概念是上述圖中沒有標明的,那就是Connection(連接),Channel(通道,頻道)。

  • Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
  • Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
  •  那么,為什么使用Channel,而不是直接使用TCP連接?

        對於OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高並發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數據可以Publish10K的數據包。當然對於不同的硬件環境,不同的數據包大小這個數據肯定不一樣,但是我只想說明,對於普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。

Exchange

Exchange是屬於Vhost的。同一個Vhost不能有重復的Exchange名稱。 Exhange有四種類型:fanout,Direct,header,topic。

Exchange接受Producer發送的Message並根據不同路由算法將Message發送到Message Queue。

  從架構圖可以看出,Procuder Publish的Message進入了Exchange。接着通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue里。queue也是通過這個routing keys來做的綁定。

    Exchange的綁定功能,可以綁定queue,也可以綁定Exchange。這個看具體業務了。如果綁定數據,需要在分發或者重新被分派,使用To Exchange綁定。如果要被直接處理,使用queue綁定。如果Exchange綁定了Queue,如果Route Key不對,也會導致數據不可達,被丟掉。(一個數據可以被Queue處理,需要Exchange綁定Queue,並且在Message發送的時候,Route Key 與綁定的Key相等。我們發送一個空數據,Route Key隨便制定了Hello,消息雖然發送,但是客戶獲得失敗。

 

    有四種類型的Exchanges:direct, fanout,topic。 每個實現了不同的路由算法(routing algorithm)。

  • Direct exchange: 如果 routing key 匹配, 那么Message就會被傳遞到相應的queue中。其實在queue創建時,它會自動的以queue的名字  作為routing key來綁定那個exchange。(轉發消息到routigKey指定的隊列)
  • Fanout exchange: 會向響應的queue廣播。(轉發消息到所有綁定隊列)
  • Topic exchange: 對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。(按規則轉發消息(最靈活))

 Queue:

  Message Queue會在Message不能被正常消費時將其緩存起來,但是當Consumer與Message Queue之間的連接通暢時,Message Queue將Message轉發給Consumer。
    Message由Header和Body組成,Header是由Producer添加的各種屬性的集合,包括Message是否客被緩存、由哪個Message Queue接受、優先級是多少等。而Body是真正需要傳輸的APP數據。
    Exchange與Message Queue之間的關聯通過Binding來實現。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。

  Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,兩者的匹配方式由Exchange Type決定。
    Exchange Type分為Direct(單播)、Topic(組播)、Fanout(廣播)。當為Direct(單播)時,Routing Key必須與Binding Key相等時才能匹配成功,當為Topic(組播)時,Routing Key與Binding Key符合一種模式關系即算匹配成功,當為Fanout(廣播)時,不受限制。默認Exchange Type是Direct(單播)。

routing:exchange和queue之間綁定的媒介,成為routing key。

message acknowledgment: 消息確認,解決消息確認問題,只有收到ack之后才能從消息系統中刪除。

message durability: 消息持久化,當rabbitmq退出或崩潰后,會把queue中的消息持久化。但注意,RabbitMQ並不能百分之百保證消息一定不會丟失,因為為了提 升性能,RabbitMQ會把消息暫存在內存緩存中,直到達到閥值才會批量持久化到磁盤,也就是說如果在持久化到磁盤之前RabbitMQ崩潰了,那么就 會丟失一小部分數據,這對於大多數場景來說並不是不可接受的,如果確實需要保證任務絕對不丟失,那么應該使用事務機制。

Virtual hosts

   每個virtual host本質上都是一個RabbitMQ Server,擁有它自己的queue,exchange,和bind rule等等。這保證了你可以在多個不同的application中使用RabbitMQ。

 RabbitMQ的Vhost主要是用來划分不同業務模塊。不同業務模塊之間沒有信息交互。Vhost之間相互完全隔離,不同Vhost之間無法共享Exchange和Queue。因此Vhost之間數據無法共享和分享。如果要實現這種功能,需要Vhost之間手動構建對應代碼邏輯。

 

    Vhost其實在用戶之間是透明的,Vhost可以被多個用戶訪問,而一個用戶可以同時訪問多個Vhost。 例如:peter可以同時訪問"/"和"/vhost"。

    Virtual Host是個虛擬概念,可以持有一些Exchange和Message Queue。一個Virtual Host可以是一台服務器,也可以是由多台服務器組成的集群。Exchange和Message Queue可以分別部署在一台或者多台服務器上。

第三章 啟動rabbitmq的webUI

一、啟動步驟

1、啟動rabbitmq

  • rabbitmq-server (前台啟動)或者rabbitmq-server -detached(后台啟動)

2、啟動rabbitmq_management

  • rabbitmq-plugins enable rabbitmq_management

執行命令后啟動的插件:

3、瀏覽器登入

http://localhost:15672/打開登錄頁使用{username:"guest",password:"guest"}登入。

 

二、webUI包含的監視項

  • overview頁面:可以看到消息的接收和消費情況以及如下的其他一些總控的部分

  

  • connectionstcp連接
  • channels:信道:一個connection內可以有多個channel,一個channel供一個線程使用
  • exchanges:交換器:消息生成者發送消息到交換器,最后交換器中的消息通過路由規則到達綁定的隊列中供消費者使用
  • queues:隊列:消息的最終到達地點。也是消費者獲取消息的地方
  • admin:權限部分、用戶列表、虛擬主機部分(虛擬主機與rabbitmq的關系就與虛擬機與物理機的關系一樣)

三、注意點

我使用的是rabbitmq3.6.1,在這個版本登錄webUI的話,有下面幾種情況:

1、使用默認的guest登錄

http://localhost:15672/,只能用localhost,不能寫成具體的IP,包括127.0.0.1,包括當前的本機IP(這是>=3.3.0版本引入的功能)

2、自己創建用戶進行登錄

2.1、添加用戶及密碼

  • rabbitmqctl add_user zhaojigang wangna(添加了用戶zhaojigang,密碼是wangna)

2.2、設置用戶角色

  • rabbitmqctl set_user_tags zhaojigang administrator(將用戶zhaojigang設為administrator)

查看一下用戶列表:

  • rabbitmqctl list_users

  

2.3、登錄webUI

http://192.168.23.238:15672/登錄即可。

注意:必須為用戶設置角色,否則可能無法登陸進入。各種角色、各種權限查看下文:

http://my.oschina.net/hncscwc/blog/262246

 

附:設置指定用戶在指定虛擬主機上的配置、讀、寫權限

  • 添加虛擬主機:rabbitmqctl add_vhost zhaojigangvhost
  • 設置指定用戶在指定虛擬主機上的配置、讀、寫權限: rabbitmqctl set_permissions -p zhaojigangvhost zhaojigang ".*" ".*" ".*"(設置用戶zhaojigang在虛擬主機zhaojigangvhost的配置權限、寫權限、讀權限)

配置了這些權限后,zhaojigang在zhaojigangvhost上才有權限進行配置、讀、寫。

第四章 第一個rabbitmq程序

rabbitmq消息發送模型

要素:

  • 生產者
  • 消費者
  • 交換器:生產者將消息發送到交換器
  • 隊列:交換器通過某種路由規則綁定到指定隊列,將消息加入隊列,消費者從隊列消費消息

前提:

引入rabbitmq的java客戶端jar包

        <!-- import rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.6</version>
        </dependency>

 

一、消息生產者

1、代碼:

 1 package com.xxx.producer;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 
10 /**
11  * 消息生產者
12  */
13 public class HelloWorldProducer {
14     private static final String QUEUE_NAME    = "helloQueue";
15     private static final String EXCHANGE_NAME = "helloExchange";
16 
17     public static void main(String[] args) throws IOException, TimeoutException {
18         ConnectionFactory factory = new ConnectionFactory();// 建立連接工廠
19         factory.setHost("192.168.20.238");// 設置rabbitmq服務器地址
20         factory.setPort(5672);// 設置rabbitmq服務器端口
21         factory.setUsername("zhaojigang");
22         factory.setPassword("wangna");
23         factory.setVirtualHost("zhaojigangvhost");
24 
25         Connection connection = factory.newConnection();// 建立連接
26         Channel channel = connection.createChannel();// 建立信道
27 
28         /**
29          * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
30          * durable:隊列是否持久化
31          * exclusive:當最后一個消費者取消訂閱時,是否自動刪除
32          * autoDelete:只有當前應用程序才能夠消費隊列消息(場景:限制一個隊列只有一個消費者)
33          * arguments:other properties (construction arguments) for the queue
34          */
35         channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 創建隊列(如果隊列不存在,創建;如果存在,什么都不做)
36         /**
37          * exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
38          * exchange:交換器名字
39          * type:3種類型 direct/fanout/topic
40          */
41         channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
42 
43         for (int i = 0; i < 10; i++) {
44             String msg = "helloworld_" + i;// 創建消息
45             /**
46              * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
47              * exchange:交換器
48              * routingKey:路由鍵
49              * props:other properties for the message - routing headers etc
50              * body:消息體
51              */
52             channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, msg.getBytes());// 發布消息
53             System.out.println("發送消息:msg-->" + msg);
54         }
55 
56         channel.close();// 關閉信道
57         connection.close();// 關閉連接
58     }
59 }

2、步驟:

  • 創建並設置連接工廠
    • host、port、username、password、vhost
    • 值得注意的是,一定要現在rabbitmq server上把username和password設置好,並且開啟該用戶在指定vhost上的權限,才可以設置連接工廠成功
  • 創建連接
  • 創建信道
  • 創建隊列
  • 創建交換器
  • 創建(創建之后也可以配置消息)並發送消息
  • 關閉信道
  • 關閉連接

3、注意點:

  • queueDeclare方法:如果隊列不存在,創建;如果存在,什么都不做

  • basicPublish:發布消息到指定的交換器,並制定路由規則(用於消費者部分的綁定操作)

 

二、消息消費者

1、代碼:

 1 package com.xxx.consumer;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12 import com.rabbitmq.client.AMQP.BasicProperties;
13 
14 /**
15  * 消息消費者
16  */
17 public class HelloWorldConsumer {
18     private final static String QUEUE_NAME = "helloQueue";
19     private static final String EXCHANGE_NAME = "helloExchange";
20 
21     public static void main(String[] args) throws IOException, TimeoutException {
22         ConnectionFactory factory = new ConnectionFactory();// 建立連接工廠
23         factory.setHost("192.168.20.238");// 設置rabbitmq服務器地址
24         factory.setPort(5672);// 設置rabbitmq服務器端口
25         factory.setUsername("zhaojigang");
26         factory.setPassword("wangna");
27         factory.setVirtualHost("zhaojigangvhost");
28 
29         Connection connection = factory.newConnection();// 建立連接
30         Channel channel = connection.createChannel();// 建立信道
31 
32         /**
33          * Note that we declare the queue here, as well. 
34          * Because we might start the receiver before the sender, 
35          * we want to make sure the queue exists before we try to consume messages from it.
36          */
37         channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 創建隊列(如果隊列不存在,創建;如果存在,什么都不做)
38         channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
39         /**
40          * queueBind(String queue, String exchange, String routingKey)
41          */
42         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME);
43         Consumer consumer = new DefaultConsumer(channel){
44             @Override
45             public void handleDelivery(String consumerTag, 
46                                        Envelope envelope, 
47                                        BasicProperties properties,
48                                        byte[] body) throws IOException {
49                 String msg = new String(body,"UTF-8");
50                 System.out.println("接收消息:msg-->" + msg);
51             }
52         };54             /**
55              * basicConsume(String queue, boolean autoAck, Consumer callback)
56              * autoAck true if the server should consider messages acknowledged once delivered; 
57              * false if the server should expect explicit acknowledgements
          * 這里啟動一個consume,該consume會不斷的接收消息,如果此處用while(true)包起來的話,就會不斷的啟動consume 58 */ 59 channel.basicConsume(QUEUE_NAME, true, consumer);61 62 // channel.close();// 關閉信道 63 // connection.close();// 關閉連接 64 } 65 }

2、步驟:

  • 創建並設置連接工廠
    • host、port、username、password、vhost
  • 創建連接
  • 創建信道
  • 創建隊列
  • 創建交換器
  • 通過路由規則綁定隊列和交換器
  • 創建消息處理函數
  • 從隊列獲取消息並消費消息(根據消息處理函數) 

三、測試

1、啟動rabbitmq服務器

2、啟動消費者進程

3、啟動生產者進程

4、查看console即可或者查看rabbitmq的webUI

多個消費者監聽同一個隊列

生產者:代碼如上一章

消費者1:代碼如前一章

消費者2:與消費者1代碼完全相同

注意:此時,消費者1和2監聽在同一個隊列上,隊列會以輪訓的方式將10個消息分別交給消費者1和2進行處理。

但是這種情況下,如果消費者1處理的消息比較繁重,而消費者2處理的消息比較輕松地話,實際上應該讓消費者2多處理一些消息,在消費者代碼添加中如下代碼:

        /**
         * basicQos(int prefetchCount)
         * prefetchCount:maximum number of messages that the server will deliver, 0 if unlimited
         */
     channel.basicQos(1);//阻止rabbitmq將消息平均分配到每一個消費者,會優先的發給不忙的消費者,如果當前的消費者在忙的話,就將消息分配給下一個消費者
 

附1 rabbitmq常用命令

1、rabbitmq的啟動和停止

  • rabbitmq-server (前台啟動)
  • rabbitmq-server -detached(后台啟動)
  • rabbitmqctl stop(停止)

2、查看rabbitmq的狀態

  • rabbitmqctl status

3、用戶管理

  • rabbitmqctl add_user zhaojigang wangna(添加用戶zhaojigang,密碼是wangna)
  • rabbitmqctl delete_user zhaojigang(刪除用戶zhaojigang)
  • rabbitmqctl list_users(列出所有用戶名及其角色)
  • rabbitmqctl change_password zhaojigang wangna2(修改zhaojigang的密碼為wangna2)

4、用戶角色管理(5類角色)

5、虛擬主機vhost管理

  • rabbitmqctl list_vhosts(列出所有的虛擬主機)
  • rabbitmqctl add_vhost zhaojigangvhost(添加虛擬主機zhaojigangvhost)
  • rabbitmqctl delete_vhost zhaojigangvhost(刪除虛擬主機zhaojigangvhost)

6、用戶權限管理(3種權限)

  • rabbitmqctl set_permissions -p zhaojigangvhost zhaojigang ".*" ".*" ".*"(設置用戶zhaojigang在虛擬主機zhaojigangvhost的配置權限、寫權限、讀權限)

  • rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}

附2 rabbitmq用戶管理、角色管理與權限管理

本文摘自:http://my.oschina.net/hncscwc/blog/262246

1. 用戶管理

用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。

相應的命令

(1) 新增一個用戶

rabbitmqctl  add_user  Username  Password

(2) 刪除一個用戶

rabbitmqctl  delete_user  Username

(3) 修改用戶的密碼

rabbitmqctl  change_password  Username  Newpassword

(4) 查看當前用戶列表

rabbitmqctl  list_users

2. 用戶角色

按照個人理解,用戶角色可分為五類,超級管理員, 監控者, 策略制定者, 普通管理者以及其他。

(1) 超級管理員(administrator)

可登陸管理控制台(啟用management plugin的情況下),可查看所有的信息,並且可以對用戶,策略(policy)進行操作。

(2) 監控者(monitoring)

可登陸管理控制台(啟用management plugin的情況下),同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)

(3) 策略制定者(policymaker)

可登陸管理控制台(啟用management plugin的情況下), 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。

與administrator的對比,administrator能看到這些內容

(4) 普通管理者(management)

僅可登陸管理控制台(啟用management plugin的情況下),無法看到節點信息,也無法對策略進行管理。

(5) 其他

無法登陸管理控制台,通常就是普通的生產者和消費者。

了解了這些后,就可以根據需要給不同的用戶設置不同的角色,以便按需管理。

設置用戶角色的命令為:

rabbitmqctl  set_user_tags  User  Tag

User為用戶名, Tag為角色名(對應於上面的administrator,monitoring,policymaker,management,或其他自定義名稱)。

也可以給同一用戶設置多個角色,例如

rabbitmqctl  set_user_tags  hncscwc  monitoring  policymaker

3. 用戶權限

用戶權限指的是用戶對exchange,queue的操作權限,包括配置權限,讀寫權限。配置權限會影響到exchange,queue的聲明和刪除。讀寫權限影響到從queue里取消息,向exchange發送消息以及queue和exchange的綁定(bind)操作。

例如: 將queue綁定到某exchange上,需要具有queue的可寫權限,以及exchange的可讀權限;向exchange發送消息需要具有exchange的可寫權限;從queue里取數據需要具有queue的可讀權限。詳細請參考官方文檔中"How permissions work"部分。

相關命令為:

(1) 設置用戶權限

rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP

(2) 查看(指定hostpath)所有用戶的權限信息

rabbitmqctl  list_permissions  [-p  VHostPath]

(3) 查看指定用戶的權限信息

rabbitmqctl  list_user_permissions  User

(4)  清除用戶的權限信息

rabbitmqctl  clear_permissions  [-p VHostPath]  User


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM