RabbitMQ學習總結 第五篇:路由Routing


目錄

RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld

RabbitMQ學習總結 第三篇:工作隊列Work Queue

RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe

RabbitMQ學習總結 第五篇:路由Routing

RabbitMQ學習總結 第六篇:Topic類型的exchange

RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)

 

上一篇中我們構建了一個簡單的日志系統,我們可以把日志消息廣播給多個接受者。

這篇中我們將來添加一個特性只接收部分消息。例如我只將一些錯誤log存到文件中,把所有的log都打印到控制台里。

1、綁定(Bindings)

在上篇博文中,我們已經創建了一個binding,代碼如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個binding就是exchange和Queue之間的一個關系。可以簡單的理解為:這個Queue對其相對於的exchange的消息感興趣(原文是the queue is interested in messages from this exchange,能理解什么意思,但總覺得怪怪的)。

Binding可以使用一個已經存在的routingKey參數。為了避免和basic_publish參數混淆,我們稱之為binding key。下邊就是我們怎么用key來創建一個binding:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的意義有時候取決於exchange的類型。對於Fanout類型的exchange,會忽略binding key。

2、Direct類型的exchange

我們上篇博文中的日志系統會把所有的log消息廣播給所有的消費者。我們想擴展來根據他們的日志級別來過濾log消息。例如:我們只想把error級別的日志寫到磁盤文件中,而其它級別的日志消息則過濾掉。

我們之前使用的fanout類型的exchange,但這樣就不會有太多的靈活性。

在這里我們將要使用direct類型的exchange。Direct類型exchange的路由算法是很簡單的:要想一個消息能到達這個隊列,需要binding key和routing key正好能匹配得上。

為了說明這個道理,可以看看下邊的描述:

在這樣的結構中,我們可以看到direct類型的exchange X,有兩個queue綁定到它。第一個queue是以orange為binding key綁定到exchange X上的,第二個queue是由兩個binding key(black和green)綁定到exchange X的。

在這樣的設置中,一條消息被推送到exchange,如果使用的routing key是orange,那么消息就會被路由到Q1中;如果使用的routing key是black或green,那么該消息將會被路由到Q2中。其它的消息都將會被丟棄掉。

3、多重綁定(Multiple bindings)

用同一個binding來把多個queue綁定到同一個exchange也是可行的。例如在之前例子的基礎上,在X和Q1之間添加binding key名字為black,這樣的話,這里的direct類型的exchange就和fanout類型的一樣了,可以把消息推送給所有的queue。帶有routing key為black的消息將會被推送到Q1和Q2中。

4、發送日志(Emitting logs)

我們將會使用這種模型,不使用fanout類型的exchange,而是使用direct類型的。我們使用日志級別做為routing key,接收端根據設置的日志級別做為binding key來接收消息。首先來看看發射日志:

如之前一樣,首先來創建一個exchange:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后准備發送消息;

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

這里的”severity”可以是”info”、“warning”、”error”等。

5、訂閱(Subscribing)

這里接收消息和上篇博文中的一樣,只是有一點例外:我們將會為每一個感興趣的日志級別進行綁定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){   
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

6、最終實現

  • EmitLogDirect.java的代碼:
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv)
                  throws java.io.IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 
        //聲明direct類型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                  
    
//拿到日志級別 String severity = getSeverity(argv); //拿到日志消息 String message = getMessage(argv); //指定routing key,發送消息 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close(); connection.close(); }
//.. }
  • ReceiveLogsDirect.java的代碼:
public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //聲明direct類型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }
        
        //綁定我們需要接收的日志級別
        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}
  • 運行三個日志接收器:

接收error和info級別的日志:

接收error級別的日志:

接收info級別的日志:

  • 運行兩個日志發生器:

產生error級別的日志:

產生info級別的日志:

  • 觀察接收器端的變化:

接收error級別的接收器,只接收error級別的日志:

 

接收info級別的接收器,只接收info級別的日志:

 

Error和info級別日志都接收的接收器,info和error級別的日志都接收:

7、總結:

要記住生產者端的routing key,那么在消費者端設置binding key和之前的routing key一樣,就可以用direct類型的exchange了,以此來獲取到自己需要的消息。

 

參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-four-java.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM