RabbitMQ入門-Topic模式


發送到topic的消息不能有任意的綁定鍵,綁定鍵的規則:必須由(.)分割的單詞列表。比如apple.banana.orange

綁定鍵也有兩個特殊字符:

 * 可以代替一個單詞。
# 可以替代零個或多個單詞。

比如:apple.#  *.banana.*

 

生產者:

package com.example.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 有選擇的接受消息
 */
public class TopicSend {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 連接工廠
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 獲取連接
        Channel channel = connection.createChannel();

        // topic類型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String[] msg = {"666的兔子","懶惰的大象"};
        // 第二個參數為綁定鍵
        channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg[0].getBytes());
        channel.basicPublish(EXCHANGE_NAME, "lazy.write.elephant", null, msg[1].getBytes());
        System.out.println("PS-Send:" + msg.toString());

        channel.close();
        connection.close();

    }
}

 

消費者:

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicReceive {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 連接工廠
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 獲取連接
        Channel channel = connection.createChannel();

        // 聲明一個topic交換類型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 當聲明隊列,不加任何參數,產生的將是一個臨時隊列,getQueue返回的是隊列名稱
        String queueA = channel.queueDeclare().getQueue();
        String queueB = channel.queueDeclare().getQueue();
        System.out.println("臨時隊列:" + queueA);
        System.out.println("臨時隊列:" + queueB);

        // 第三個參數為“綁定建”
        // * 可以代替一個單詞。
        // # 可以替代零個或多個單詞。
        channel.queueBind(queueA, EXCHANGE_NAME, "*.orange.*");
        channel.queueBind(queueB, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueB, EXCHANGE_NAME, "lazy.#");

        Consumer consumerA = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recv = new String(body, "UTF-8");
                System.out.println("Direct-Receive-A:" + recv);
            }
        };
        Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recv = new String(body, "UTF-8");
                System.out.println("Direct-Receive-B:" + recv);
            }
        };
        channel.basicConsume(queueA, true, consumerA);
        channel.basicConsume(queueB, true, consumerB);
    }
}

 

 

先啟動消費者:再啟動生產者:控制台

..

 

 第一條消息,匹配A和B

第二條消息,只匹配B

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM