RabbitMQ(topic主題模式)


一、topic主題模式

特點:模糊的routingkey的匹配模式
注意:*代表是必須為一個;#代表0個或者多個

二、代碼

RabbitMQ界面配置

創建交換機

創建隊列

將隊列綁定在交換機上

生產者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生產者
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //所有的中間件技術都是基於TCP/IP協議基礎構建的協議規范,rabbitmq遵循的是ampq協議
        //1.創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2.創建連接Connection
        Connection connection = connectionFactory.newConnection();
        //3.通過連接獲取通道channel
        Channel channel = connection.createChannel();
        //4.通過通道創建交換機、聲明隊列、綁定關系、路由key、發送消息、和接受消息


        //5.准備消息內容
        String msg="hello topic";
        //6.准備交換機
        String exchangeName="topic_exchange";
        //定義路由key
        String routingkey="a.add";
        //定義指定交換機類型
        String exchangeType="topic";
//        //聲明隊列
//        channel.queueDeclare("q1",false,false,false,null);
//        channel.queueDeclare("q2",false,false,false,null);
//        channel.queueDeclare("q3",false,false,false,null);
//        //聲明交換機
//        channel.exchangeDeclare("fanout_exchange","fanout");
//        //將隊列綁定到交換機
//        channel.queueBind("q1",exchangeName,null);
//        channel.queueBind("q2",exchangeName,null);
//        channel.queueBind("q3",exchangeName,null);

        //6.發送消息給隊列queue
        /*
        參數一:交換機
        參數二:隊列、路由key
        參數三:消息的狀態控制
        參數四:消息主體
         */
        channel.basicPublish(exchangeName,routingkey,null,msg.getBytes());
        //7.關閉通道
        channel.close();
        //8.關閉連接
        connection.close();
        System.out.println("生產成功");
    }
}

消費者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            //1.創建連接工廠
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //獲取隊列名稱
            final String queueName=Thread.currentThread().getName();
            Connection connection = null;
            Channel channel=null;
            try {
                //2.創建連接Connection
                connection = connectionFactory.newConnection();
                //3.通過連接獲取通道channel
                 channel = connection.createChannel();

                 //定義接受消息回調
                Channel finalChannel =channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(delivery.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {

                    }
                });
                System.out.println(queueName+":開始接受消息");
                System.in.read();

            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                System.out.println("發送消息異常。");
            }finally {
                //7.關閉通道釋放連接
                if (channel !=null && channel.isOpen()){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                    if (connection !=null && connection.isOpen()){
                        try {
                            connection.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }

        }
    };


    public static void main(String[] args)  {
        //啟動三個線程
        new Thread(runnable,"q7").start();
        new Thread(runnable,"q8").start();
        new Thread(runnable,"q9").start();
    }
}

add的結果



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM