RabbitMQ


 

RabbitMQ是一個消息中間件,在一些需要異步處理、發布/訂閱等場景的時候,使用RabbitMQ可以完成我們的需求。 下面是我在學習java語言實現RabbitMQ(自RabbitMQ官網的Tutorials)的一些記錄。

首先有三個名稱了解一下(以下圖片來自rabbitMQ官網)

  • producer是用戶應用負責發送消息

  • queue是存儲消息的緩沖(buffer)

  • consumer是用戶應用負責接收消息

下面是我使用rabbitMQ原生的jar包做的測試方法

maven pom.xml 加入

<dependency>

    <groupId>com.rabbitmq</groupId>

    <artifactId>amqp-client</artifactId>

    <version>3.5.6</version>

</dependency>

方法實現示意圖

 

發送消息方法(Send.java)

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 
 5 public class Send {
 6      
 7     private static final String QUEUE_NAME = "hello";
 8  
 9     public static void main(String[] args) throws Exception {
10         ConnectionFactory factory = new ConnectionFactory();
11         factory.setHost("192.168.1.7");
12         factory.setPort(5672);
13         factory.setUsername("admin");
14         factory.setPassword("admin");
15         Connection connection = factory.newConnection();
16         Channel channel = connection.createChannel();
17  
18         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
19         String message = "Hello World!";
20         // "" 表示默認exchange
21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
22         System.out.println(" [x] Sent '" + message + "'");
23  
24         channel.close();
25         connection.close();
26     }
27  
28 }

10~16行 是獲取rabbitmq.client.Channel, rabbitMQ的API操作基本都是通過channel來完成的。

18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),這里channel聲明了一個名字叫“hello”的queue,聲明queue的操作是冪等的,也就是說只有不存在相同名稱的queue的情況下才會創建一個新的queue。

21行 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()),chaneel在這個queue里發布了消息(字節數組)。

24~25行 則是鏈接的關閉,注意關閉順序就好了。

接受消息方法 (Recv.java)

 1 import com.rabbitmq.client.AMQP;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 import com.rabbitmq.client.Consumer;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 
 9 import java.io.IOException;
10 
11 public class Recv {
12 
13   private final static String QUEUE_NAME = "hello";
14 
15   public static void main(String[] argv) throws Exception {
16     ConnectionFactory factory = new ConnectionFactory();
17     factory.setHost("192.168.1.7");
18     factory.setPort(5672);
19     factory.setUsername("admin");
20     factory.setPassword("admin");
21     Connection connection = factory.newConnection();
22     Channel channel = connection.createChannel();
23 
24     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
26 
27     Consumer consumer = new DefaultConsumer(channel) {
28       @Override
29       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
30           throws IOException {
31         String message = new String(body, "UTF-8");
32         System.out.println(" [x] Received '" + message + "'");
33       }
34     };
35     channel.basicConsume(QUEUE_NAME, true, consumer);
36   }
37 }

16~22行 和Send類中一樣,也是獲取同一個rabbitMQ服務的channel,這也是能接受到消息的基礎。

24行 同樣聲明了一個和Send類中發布的queue相同的queue。

27~35行 DefaultConsumer類實現了Consumer接口,由於推送消息是異步的,因此在這里提供了一個callback來緩沖接受到的消息。

先運行Recv 然后再運行Send,就可以看到消息被接受輸出到控制台了,如果多啟動幾個Recv,會發現消息被每個消費者按順序分別消費了,

這也就是rabbitMQ默認采用Round-robin dispatching(輪詢分發機制)。

 

Work queues

上面簡單的實現了rabbitMQ消息的發送和接受,但是無論Send類中的queueDeclare 、basicPublish方法還有Recv類中的basicConsume方法都有很多的參數,

下面我們分析一下幾個重要的參數。

(一)Message acknowledgment 消息答復

上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),

在Channel接口中定義為 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

這個autoAck我們當前實現為true,表示服務器會自動確認ack,一旦RabbitMQ將一個消息傳遞到consumer,它馬上會被標記為刪除狀態。

這樣如果consumer在正常執行任務過程中,一旦consumer服務掛了,那么我們就永遠的失去了這個consumer正在處理的所有消息。

為了防止這種情況,rabbitMQ支持Message acknowledgment,當消息被一個consumer接受並處理完成后,consumer發送給rabbitMQ一個回執,然后rabbitMQ才會刪除這個消息。

當一個消息掛了,rabbitMQ會給另外可用的consumer繼續發送上個consumer因為掛了而沒有處理成功的消息。

因此我們可以設置autoAck=false,來顯示的讓服務端做消息成功執行的確認。

(二)Message durability 消息持久化

Message acknowledgment 確保了consumer掛了的情況下,消息還可以被其他consumer接受處理,但是如果rabbitMQ掛了呢?

在聲明隊列的方法中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

durable=true 意味着該隊列將在服務器重啟后繼續存在。Send和Recv兩個類中聲明隊列的方法都要設置durable=true。

現在,我們需要將消息標記為持久性——通過將MessageProperties(它實現BasicProperties)設置為PERSISTENT_TEXT_PLAIN

(三)Fair dispatch 公平分發

rabbitMQ默認是輪詢分發,這樣對多個consumer而言,可能就會出現負載不均衡的問題,無論是任務本身難易度,還是consumer處理能力的不同,都是導致這種問題。

為了處理這種情況我們可以使用basicQos方法來設置prefetchCount = 1。 這告訴rabbitMQ一次只給consumer一條消息,換句話來說,就是直到consumer發回ack,然后再向這個consumer發送下一條消息。

int prefetchCount = 1 ;
channel.basicQos(prefetchCount);

正是因為Fair dispatch是基於ack的,所有它最好和Message acknowledgment同時使用,否則在autoAck=true的情況下,單獨設置Fair dispatch並沒有效果。

下面是本人測試以上三種情況的測試代碼,可以直接使用。

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
     
    private static final String QUEUE_NAME = "task_queue";
 
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.7");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        boolean durable = true;    //消息持久化
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 多個消息使用空格分隔 Scanner sc
= new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

  private final static String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.7");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 
    // basicQos方法來設置prefetchCount = 1。 這告訴RabbitMQy一次只給worker一條消息,換句話來說,就是直到worker發回ack,然后再向這個worker發送下一條消息。
    channel.basicQos(1);
 
    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
 
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    // 當consumer確認收到某個消息,並且已經處理完成,RabbitMQ可以刪除它時,consumer會向RabbitMQ發送一個ack(nowledgement)。
    boolean autoAck = true;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

    protected static void doWork(String message) throws InterruptedException {
        for (char ch: message.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

 

發布/訂閱(Publish/Subscribe)

一個完整的rabbitMQ消息模型是會有Exchange的。

rabbitMQ的消息模型的核心思想是producer永遠不會直接發送任何消息到queue中,實際上,在很多情況下producer根本不知道一條消息是否被發送到了哪個queue中。

在rabbitMQ中,producer僅僅將消息發送到一個exchange中。要理解exchange也非常簡單,它一邊負責接收producer發送的消息, 另一邊將消息推送到queue中。

exchange必須清楚的知道在收到消息之后該如何進行下一步的處理,比如是否應該將這條消息發送到某個queue中? 還是應該發送到多個queue中?還是應該直接丟棄這條消息等。

exchange模型如下:

exchange類型也有好幾種:directtopicheaders以及fanout。

Fanout exchange

下面我們來創建一個fanout類型的exchange,顧名思義,fanout會向所有的queue廣播所有收到的消息。

 1 import java.io.IOException;
 2 import java.util.Scanner;
 3 import java.util.concurrent.TimeoutException;
 4 
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 
 9 import rabbitMQ.RabbitMQTestUtil;
10 
11 public class EmitLog {
12      
13     private static final String EXCHANGE_NAME = "logs";
14  
15     public static void main(String[] argv) throws IOException, TimeoutException {
16  
17         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
18         Connection connection = factory.newConnection();
19         Channel channel = connection.createChannel();
20         
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22  
23         // 多個消息使用空格分隔
24         Scanner sc = new Scanner(System.in);
25         String[] splits = sc.nextLine().split(" ");
26         for (int i = 0; i < splits.length; i++) {
27                  channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes());
28              System.out.println(" [x] Sent '" + splits[i] + "'");
29         }
30  
31         channel.close();
32         connection.close();
33     }
34 }

 

 1 import java.io.IOException;
 2 
 3 import com.rabbitmq.client.AMQP;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 import com.rabbitmq.client.Consumer;
 8 import com.rabbitmq.client.DefaultConsumer;
 9 import com.rabbitmq.client.Envelope;
10 
11 import rabbitMQ.RabbitMQTestUtil;
12 
13 public class ReceiveLogs {
14 
15     private static final String EXCHANGE_NAME = "logs";
16      
17       public static void main(String[] argv) throws Exception {
18         ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
19           Connection connection = factory.newConnection();
20           Channel channel = connection.createChannel();
21      
22         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
23         String queueName = channel.queueDeclare().getQueue();
24         channel.queueBind(queueName, EXCHANGE_NAME, "");
25      
26         Consumer consumer = new DefaultConsumer(channel) {
27           @Override
28           public void handleDelivery(String consumerTag, Envelope envelope,
29                                      AMQP.BasicProperties properties, byte[] body) throws IOException {
30             String message = new String(body, "UTF-8");
31             System.out.println(" [x] Received '" + message + "'");
32           }
33         };
34         channel.basicConsume(queueName, true, consumer);
35       }
36 }

Direct exchange

在fanout的exchange類型中,消息的發布已經隊列的綁定方法中,routingKey參數都是默認空值,因為fanout類型會直接忽略這個值,

但是在其他exchange類型中它擁有很重要的意義,

      

rabbitMQ支持以上兩種綁定,消息在發布的時候,會指定一個routing key,而圖一中exchange會把routing key為orange發送的消息將會被路由到queue Q1中,使用routing key為black或者green的將會被路由到Q2中。

將多個queue使用相同的binding key進行綁定也是可行的。可以在X和Q1中間增加一個routing key black。 它會向所有匹配的queue進行廣播,使用routing key為black發送的消息將會同時被Q1Q2接收。

 下面是我測試debug和error兩種routing key發布消息並接受處理消息的代碼:

import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import rabbitMQ.RabbitMQTestUtil;

public class EmitLog {
     
    private static final String EXCHANGE_NAME = "direct_logs";
 
    public static void main(String[] argv)
                  throws java.io.IOException, TimeoutException {
 
            ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
            Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
 
        // 多個消息使用空格分隔
        Scanner sc = new Scanner(System.in);
        String[] splits = sc.nextLine().split(" ");
        for (int i = 0; i < splits.length; i++) {
                 channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes());
             System.out.println(" [x] Sent '" + splits[i] + "'");
        }
 
        channel.close();
        connection.close();
    }
}
View Code
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsDebug {

    private static final String EXCHANGE_NAME = "direct_logs";
     
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
     
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
     
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}
View Code
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import rabbitMQ.RabbitMQTestUtil;

public class ReceiveLogsError {

    private static final String EXCHANGE_NAME = "direct_logs";
     
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory();
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
     
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
     
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
}
View Code

發送輸入:

debug接受:

error接受:

 

Topic exchange

發送到topic exchange中的消息不能有一個任意的routing_key——它必須是一個使用點分隔的單詞列表。單詞可以是任意的。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。

routing key的長度限制為255個字節數。

binding key也必須是相同的形式。topic exchange背后的邏輯類似於direct——一條使用特定的routing key發送的消息將會被傳遞至所有使用與該routing key相同的binding key進行綁定的隊列中。 然而,對binding key來說有兩種特殊的情況:

  1. *(star)可以代替任意一個單詞
  2. #(hash)可以代替0個或多個單詞

和Direct exchange差不多,代碼就不copy了,有興趣的直接看看教程http://www.rabbitmq.com/tutorials/tutorial-five-java.html

Push vs Pull

An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer.

傳統大多數的消息系統(比如Kafka)都是采用的pull data from brokers,數據被從生產者推送到brokers,並被消費者從brokers那里拉出(Pull),

然而無論是推模式還是拉模式,都各有利弊,

基於推模式的系統可以最快速的將消息推送給消息者,但是當消費者消費速率低時,消費者會拒絕(a denial of service attack),

而且很難與多樣化的消費者打交道,因為brokers控制着數據傳輸的速率。

基於拉模式的系統更關注消費者的消費能力,有助於對發送給使用者的數據進行積極的批處理,但是如果brokers沒有數據,

那么消費者可能會在緊密的循環中進行輪詢,解決方案是允許消費者請求阻塞在“長時間輪詢”中,直到數據到達為止。

RabbitMQ支持兩種方式

  • Have messages delivered to them ("push API")
  • Fetch messages as needed ("pull API")

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM