rabbitmq-direct(直接交換模式)


生產者和消費者,具有相同的交換機名稱(Exchange)、交換機類型和相同的密匙(routingKey),那么消費者即可成功獲取到消息。
(PS:相對比只要交換機名稱即可接收到消息的廣播模式(fanout),direct模式在其基礎上,多加了一層密碼限制(routingKey)。)

 

一、什么是direct(直接交換模式)

RabbitMQ消息模型的核心思想(core idea): 生產者會把消息發送給RabbitMQ的交換中心(Exchange),Exchange的一側是生產者,另一側則是一個或多個隊列,由Exchange決定一條消息的生命周期–發送給某些隊列,或者直接丟棄掉。

二、代碼域

1. 生產者【DirectBoss】

package com.iyungu.phantaci.test.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
//消息生產者
public class DirectBoss {

private static final Logger logger = Logger.getLogger(DirectBoss.class);
public static void main(String[] args) {

    ConnectionFactory factory = new ConnectionFactory();
    try {
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明交換機(名稱和類型)
        channel.exchangeDeclare("directLogs", BuiltinExchangeType.DIRECT);
        String message = "2018年8月8日14:03:48";
        //消息發布(其中"directLogs"為交換機名稱,"jay"為routingKey)
        channel.basicPublish("directLogs","jay",null,message.getBytes());
        logger.info("********Message********:發送成功");
        channel.close();
        connection.close();
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

2. 消費者【DirectWorker

package com.iyungu.phantaci.test.rabbitmq;

import com.rabbitmq.client.*;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消費者
public class DirectWorker {

    private static final Logger logger = Logger.getLogger(DirectWorker.class);
    public static void main(String[] args) {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //交換機聲明(參數為:交換機名稱;交換機類型)
            channel.exchangeDeclare("directLogs", BuiltinExchangeType.DIRECT);
            //獲取一個臨時隊列
            String queueName = channel.queueDeclare().getQueue();
            //隊列與交換機綁定(參數為:隊列名稱;交換機名稱;密匙-routingKey)
            channel.queueBind(queueName,"directLogs","jay");

            logger.info("********Waiting for messages********");

            //這里重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這里要重新組裝成String
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String message = new String(body,"UTF-8");
                    logger.info("received:" + message);
                }
            };

            //聲明隊列中被消費掉的消息(參數為:隊列名稱;消息是否自動確認;consumer主體)
            channel.basicConsume(queueName,true,consumer);
            //這里不能關閉連接,調用了消費方法后,消費者會一直連接着rabbitMQ等待消費
        } catch (IOException |TimeoutException e) {
            e.printStackTrace();
        }


    }

}

三、direct模式效果


1.

 1)先運行一個消費者,即【DirectWorker】


2)把消費者【DirectWorker】里的routingKey進行修改
把channel.queueBind(queueName,"directLogs","jay");修改為channel.queueBind(queueName,"directLogs","jjlin");。修改完畢后,再運行該消費者


RabbitMQ網頁控制台如下,可看到兩個消費者隊列


2. 再運行一個生產者,即【DirectBoss】
控制台效果圖如下,一條消息發布后,和交換機routingKey一致的消費者收到了消息,不一致的無消息;
消息是由生產者發送給交換機,所以要以生產者發布消息時的routingKey為准。此時,生產者里的routingKey為:”jay”


消費者的routingKey為:”jay“,與生產者一致。消息接收成功


消費者的routingKey為:”jjlin“,與生產者不一致。消息接收失敗


四、多路由(routingKey)接收


通過給消費者綁定多個路由(routingKey),可以使該消費者同時接收多個路由獲取的消息。

如給消費者代碼【DirectWorker】同時綁定兩個routingKey,其余不變

channel.queueBind(queueName, "directLogs", "jjlin"); //routingKey為jjlin
channel.queueBind(queueName, "directLogs", "jay"); //routingKey為jay

1.綁定兩個routingKey后,運行一個消費者【DirectWorker】


2.運行一個生產者【DirectBoss】,代碼默認其routingKey為”jay”

 

3.修改生產者routingKey為”jjlin”,然后再運行生產者【DirectBoss】
把生產者代碼里的routingKey進行修改,即channel.basicPublish("directLogs","jay",null,message.getBytes());改為channel.basicPublish("directLogs","jjlin",null,message.getBytes());,修改完畢后再運行生產者。

 

4.局限性
通過多路由綁定的例子,可以體會到direct模式相對比fanout模式,可以選擇性的接收消息;但局限是面對更多、更復雜的路由匹配時,仍舊會力不從心,這時可以使用更全面的topic主題模式。

原文鏈接:https://blog.csdn.net/fakerswe/article/details/81508963

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM