rabbitmq學習(三):rabbitmq之扇形交換機、主題交換機


 前言

上篇我們學習了rabbitmq的作用以及直連交換機的代碼實現,這篇我們繼續看如何用代碼實現扇形交換機和主題交換機

一、扇形交換機

  1.生產者

  

/**
 * Fanout-exchange producer.
 *
 * <p>Connects with the default {@code ConnectionFactory} settings, declares the
 * {@code "logs"} fanout exchange and publishes five text messages to it.
 */
public class LogProducer {
    // Name of the exchange the messages are published to
    public static final String EXCHANGE_NAME = "logs";

    /**
     * Publishes {@code "Hello Rabbit 0"} .. {@code "Hello Rabbit 4"} and then releases
     * the channel and connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            for (int i = 0; i < 5; i++) {
                String message = "Hello Rabbit " + i;
                // Encode explicitly as UTF-8: the consumers decode the body with "UTF-8",
                // so relying on the platform default charset here could corrupt the text.
                // The routing key is empty because a fanout exchange ignores it.
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("EmitLog send message " + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeResources(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever was never opened
     * (or is already closed). Each close is attempted independently so that a failure
     * while closing the channel cannot leak the connection.
     */
    private static void closeResources(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

  2.消費者

  Consumer1

/**
 * Fanout-exchange consumer 1.
 *
 * <p>Binds a server-named queue to the {@code "logs"} fanout exchange and prints every
 * message it receives until the thread is interrupted or the connection fails.
 */
public class Consumer1 {
    public static final String EXCHANGE_NAME = "logs";

    /**
     * Declares the queue/exchange/binding and then blocks in a receive loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // No-arg queueDeclare() asks the broker for a server-named temporary queue;
            // per the client documentation it is exclusive and auto-delete, so it is
            // removed once this connection goes away.
            String queueName = channel.queueDeclare().getQueue();
            // Declare the exchange with type "fanout"
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Bind the queue to the exchange; a fanout exchange ignores the routing key,
            // so an empty string is sufficient
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument `true` enables automatic acknowledgement
            channel.basicConsume(queueName, true, consumer);
            System.out.println("ReceiveLogTopic1 Waitting for message");
            while (true) {
                // Blocks until the next message is delivered
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("ReceiveLogTopic1 receives message " + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for anything further up the stack
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // The loop only ends through an exception; release the broker resources so
            // the client's background threads do not keep the JVM alive.
            closeResources(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever was never opened
     * (or is already closed). Each close is attempted independently so that a failure
     * while closing the channel cannot leak the connection.
     */
    private static void closeResources(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

  Consumer2

  

/**
 * Fanout-exchange consumer 2.
 *
 * <p>Binds its own server-named queue to the {@code "logs"} fanout exchange and prints
 * every message it receives until the thread is interrupted or the connection fails.
 */
public class Consumer2 {
    public static final String EXCHANGE_NAME = "logs";

    /**
     * Declares the queue/exchange/binding and then blocks in a receive loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // No-arg queueDeclare() asks the broker for a server-named temporary queue;
            // per the client documentation it is exclusive and auto-delete, so it is
            // removed once this connection goes away.
            String queueName = channel.queueDeclare().getQueue();
            // Declare the exchange with type "fanout"
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Bind the queue to the exchange; a fanout exchange ignores the routing key,
            // so an empty string is sufficient
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument `true` enables automatic acknowledgement
            channel.basicConsume(queueName, true, consumer);
            System.out.println("ReceiveLog2 Waitting for message");
            while (true) {
                // Blocks until the next message is delivered
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("ReceiveLog2 receives message " + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for anything further up the stack
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // The loop only ends through an exception; release the broker resources so
            // the client's background threads do not keep the JVM alive.
            closeResources(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever was never opened
     * (or is already closed). Each close is attempted independently so that a failure
     * while closing the channel cannot leak the connection.
     */
    private static void closeResources(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

  先啟動Consumer1,Consumer2,再啟動LogProducer。結果如下:

  LogProducer:

  

 

  Consumer1:

  

 

  Consumer2:

  

  從輸出結果中我們可以看出,扇形交換機所接收到的消息會被分發到所有綁定到該交換機上的隊列中,和路由鍵無關。

 

二、主題交換機

  1.生產者

  

/**
 * Topic-exchange producer.
 *
 * <p>Declares the {@code "topic_logs"} topic exchange and publishes one message per
 * entry of {@link #routingKeys}, using that entry as the routing key.
 */
public class Producer {
    private static final String EXCHANGE_NAME = "topic_logs";
    // Routing keys to publish with; a mix of three- and four-word keys
    private static final String[] routingKeys = new String[]{
            "quick.orange.rabbit",
            "lazy.orange.elephant",
            "quick.orange.fox",
            "lazy.brown.fox",
            "quick.brown.fox",
            "quick.orange.male.rabbit",
            "lazy.orange.male.rabbit"};

    /**
     * Publishes {@code "<routingKey>--->biu~"} for every routing key and then releases
     * the channel and connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // Declare the exchange with type "topic"
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Send one message per routing key
            for (String routingKey : routingKeys) {
                String message = routingKey + "--->biu~";
                // Encode explicitly as UTF-8: the consumers decode the body with "UTF-8",
                // so relying on the platform default charset here could corrupt the text.
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("Producer -> routingkey: " + routingKey + ", send message " + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeResources(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever was never opened
     * (or is already closed). Each close is attempted independently so that a failure
     * while closing the channel cannot leak the connection.
     */
    private static void closeResources(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

2.消費者

  Consumer1

  

/**
 * Topic-exchange consumer 1.
 *
 * <p>Binds a server-named queue to the {@code "topic_logs"} exchange with the pattern
 * {@code "*.orange.*"} and prints each delivered message together with its routing key.
 */
public class Consumer1 {
    private static final String EXCHANGE_NAME = "topic_logs";
    // Binding patterns for this consumer's queue
    private static final String[] routingKeys = new String[]{"*.orange.*"};

    /**
     * Declares the queue/exchange/bindings and then blocks in a receive loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // Declare a server-named queue
            String queueName = channel.queueDeclare().getQueue();
            // Declare the exchange with type "topic"
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Bind the queue to the exchange once per pattern
            for (String routingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("Consumer1 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
            }

            // Start consuming; second argument `true` enables automatic acknowledgement
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            System.out.println("Consumer1 waitting for message");

            while (true) {
                // Blocks until the next message is delivered
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                System.out.println("Consumer1 receive message " + message + ", routingKey: " + routingKey);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for anything further up the stack
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // The loop only ends through an exception; release the broker resources so
            // the client's background threads do not keep the JVM alive.
            closeResources(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever was never opened
     * (or is already closed). Each close is attempted independently so that a failure
     * while closing the channel cannot leak the connection.
     */
    private static void closeResources(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

  Consumer2

  

/**
 * Topic-exchange consumer 2.
 *
 * <p>Binds a server-named queue to the {@code "topic_logs"} exchange with the patterns
 * {@code "*.*.rabbit"} and {@code "lazy.#"} and prints each delivered message together
 * with its routing key.
 */
public class Consumer2 {
    private static final String EXCHANGE_NAME = "topic_logs";
    // Binding patterns for this consumer's queue
    private static final String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};

    /**
     * Declares the queue/exchange/bindings and then blocks in a receive loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // Declare a server-named queue
            String queueName = channel.queueDeclare().getQueue();
            // Declare the exchange with type "topic"
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Bind the queue to the exchange once per pattern
            for (String routingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("Consumer2 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
            }

            // Start consuming; second argument `true` enables automatic acknowledgement
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            System.out.println("Consumer2 waitting for message");

            while (true) {
                // Blocks until the next message is delivered
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                Envelope envelope = delivery.getEnvelope();
                String routingKey = envelope.getRoutingKey();
                System.out.println("Consumer2 receive message " + message + ", routingKey: " + routingKey);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for anything further up the stack
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // The loop only ends through an exception; release the broker resources so
            // the client's background threads do not keep the JVM alive.
            closeResources(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection, skipping whichever was never opened
     * (or is already closed). Each close is attempted independently so that a failure
     * while closing the channel cannot leak the connection.
     */
    private static void closeResources(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

  同樣先運行消費者,再運行生產者,結果如下:

  Producer:

  

 

   Consumer1:

  

  Consumer2:

  

  由運行結果我們可以看到:消息被交換機按照模式路由鍵的規則路由到相應的隊列中。

 

代碼GitHub地址:https://github.com/wutianqi/rabbitmq-learn.git

參考資料:https://www.cnblogs.com/LipeiNet/p/5978276.html

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM