RabbitMQ-廣播模式


概述

fanout扇出也稱之為廣播

在廣播模式下,消息發送的流程是這樣的,如下所示:

  • 可以有多個消費者。
  • 每個消費者有自己的 queue(隊列)
  • 每個隊列都要綁定到 Exchange(交換機)
  • 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。
  • 交換機把消息發送給綁定過的所有隊列。
  • 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費。

創建生產者

/**
 * @author: BNTang
 */
public class Producer {

    @Test
    public void sendMessage() throws Exception {
        Connection connection = RabbitMQUtil.getConnection();

        // 創建通道
        Channel channel = connection.createChannel();

        // 設置交換機
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

        // 向交換機發消息
        channel.basicPublish("logs", "", null, ("我是個 fanout 類型的消息").getBytes());

        RabbitMQUtil.closeChannelAndConnection(channel, connection);

        System.out.println("消息發送成功");
    }
}

創建消費者 1

/**
 * @author BNTang
 */
public class Consumer1 {

    @Test
    public void receiveMessage() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();

        // 得到通道
        Channel channel = connection.createChannel();

        // 綁定交換機
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

        // 從通道里面得到一個臨時的隊列
        String queue = channel.queueDeclare().getQueue();

        // 把臨時隊列和交換機進行綁定
        channel.queueBind(queue, "logs", "");

        // 接收消息
        channel.basicConsume(queue, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者【1】接收到消息" + new String(body));
            }
        });

        System.out.println("消費者【1】啟動成功");
        System.in.read();
    }
}

創建消費者 2

/**
 * @author BNTang
 */
public class Consumer2 {

    @Test
    public void receiveMessage() throws Exception {

        Connection connection = RabbitMQUtil.getConnection();

        // 得到通道
        Channel channel = connection.createChannel();

        // 綁定交換機
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

        // 從通道里面得到一個臨時隊列
        String queue = channel.queueDeclare().getQueue();

        // 把臨時的隊列和交換機進行綁定
        channel.queueBind(queue, "logs", "");

        // 接收消息
        channel.basicConsume(queue, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者【2】接收到消息" + new String(body));
            }
        });

        System.out.println("消費者【2】啟動成功");
        System.in.read();
    }
}

測試方式,先啟動消費者1,和消費者2,在啟動消息生產者即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM