RabbitMQ 消息模式


消息模式實例

視頻教程:https://ke.qq.com/course/304104

編寫代碼前,最好先添加好用戶並設置virtual hosts

一、簡單模式

1.導入jar包

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.5.0</version>
    </dependency>

2.創建連接

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String QUEUE = "testhello"; //隊列名字

    public static void main(String[] args) throws Exception{
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();

        //創建通道
        Channel channel = connection.createChannel();

        //聲明隊列,如果隊列存在則什么都不做,如果隊列不存在才創建
        //參數一: 隊列的名字
        //參數二: 是否持久化隊列,我們的隊列模式是在內存中的,如果rabbit重啟會丟失,如果我們設置為true 則會保存到erlng自帶的數據庫中,重啟會重新獲取
        //參數三: 是否排外,有兩個作用,第一個當我們的鏈接關閉后是否會自動刪除隊列,作用二,是否私有當前隊列,如果私有了,其他通道不可以訪問當前隊列,如果為true 一般適合一個隊列消費者的時候
        //參數四: 是否自動刪除
        //參數五 我們的一些其他的參數
        channel.queueDeclare(QUEUE, false, false, false, null);

        //發送內容
        channel.basicPublish("", QUEUE, null, "hello world".getBytes());

        //關閉連接
        channel.close();
        connection.close();
    }
}

3.消費者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Receiver {
    private final static String QUEUE = "testhello"; //隊列名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //接收消息,參數二 是自動確認
        channel.basicConsume(QUEUE, true, consumer);

       while (true) {
            //獲取消息   如果沒有消息會等待,有的話就獲取執行然后銷毀,是一次性的
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("message:"+message);
       }
    }
}

 

二、工作模式

1.生產者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String QUEUE = "testwork"; //隊列名字

    public static void main(String[] args) throws Exception{
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();

        //創建通道
        Channel channel = connection.createChannel();

        //聲明隊列,如果隊列存在則什么都不做,如果隊列不存在才創建
        //參數一: 隊列的名字
        //參數二: 是否持久化隊列,我們的隊列模式是在內存中的,如果rabbit重啟會丟失,如果我們設置為true 則會保存到erlng自帶的數據庫中,重啟會重新獲取
        //參數三: 是否排外,有兩個作用,第一個當我們的鏈接關閉后是否會自動刪除隊列,作用二,是否私有當前隊列,如果私有了,其他通道不可以訪問當前隊列,如果為true 一般適合一個隊列消費者的時候
        //參數四: 是否自動刪除
        //參數五 我們的一些其他的參數
        channel.queueDeclare(QUEUE, false, false, false, null);

        for (int i = 0; i < 20; i++){
            //發送內容
            channel.basicPublish("", QUEUE, null, ("hello world "+i).getBytes());
        }

        //關閉連接
        channel.close();
        connection.close();
    }
}

2.消費者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String QUEUE = "testwork"; //隊列名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        //告訴服務器,在我沒有確認當前消息完成之前,不要給我發新消息
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //當我們收到消息的時候調用
                System.out.println("消費者1 收到的消息內容是:" + new String(body));
                //確認 參數2,false為確認收到消息,true 為拒絕接收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //注冊消費者,參數2 收到確認,代表我們收到消息后需要手動告訴服務器,我們收到消息了
        channel.basicConsume(QUEUE, false, consumer);
    }
}

3.消費者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String QUEUE = "testwork"; //隊列名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        //告訴服務器,在我沒有確認當前消息完成之前,不要給我發新消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //當我們收到消息的時候調用
                System.out.println("消費者2 收到的消息內容是:" + new String(body));
                //確認 參數2,false為確認收到消息,true 為拒絕接收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //注冊消費者,參數2 收到確認,代表我們收到消息后需要手動告訴服務器,我們收到消息了
        channel.basicConsume(QUEUE, false, consumer);
    }
}

 

三、發布訂閱模式

1.生產者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String EXCHANGE_NAME = "testexchange"; //定義交換機名字

    public static void main(String[] args) throws Exception{

       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//定義一個交換機,類型是fanout

        //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息則會丟失
        channel.basicPublish(EXCHANGE_NAME, "", null, "發布訂閱模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

2.消費者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String EXCHANGE_NAME = "testexchange"; //定義交換機名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testpubQueue1", false, false, false, null);

        //綁定隊列到交換機
        channel.queueBind("testpubQueue1", EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testpubQueue1", false, consumer);
    }
}

3.消費者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String EXCHANGE_NAME = "testexchange"; //定義交換機名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testpubQueue2", false, false, false, null);

        //綁定隊列到交換機
        channel.queueBind("testpubQueue2", EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testpubQueue2", false, consumer);
    }
}

 

四、路由模式

1.生產者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String EXCHANGE_NAME = "testexroute"; //定義交換機名字

    public static void main(String[] args) throws Exception{

       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");//定義一個路由格式的交換機

        //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息則會丟失
        // routingKey 為key1
        channel.basicPublish(EXCHANGE_NAME, "key3", null, "路由模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

2.消費者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String EXCHANGE_NAME = "testexroute"; //定義交換機名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testRouteQueue1", false, false, false, null);

        //綁定隊列到交換機
        //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到
        channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1");
        //如果需要綁定多個標記 在執行一次即可
        channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key3");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testRouteQueue1", false, consumer);
    }
}

3.消費者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String EXCHANGE_NAME = "testexroute"; //定義交換機名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testRouteQueue2", false, false, false, null);

         //綁定隊列到交換機
        //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到
        channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key1");
        //如果需要綁定多個標記 在執行一次即可
        channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testRouteQueue2", false, consumer);
    }
}

 

五、主題模式

1.生產者

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    private final static String EXCHANGE_NAME = "testexchangetopic"; //定義交換機名字

    public static void main(String[] args) throws Exception{

       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");//定義一個topic 格式的交換機

        //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息則會丟失
        // routingKey 為key1
        // * 只能匹配一個字符 # 可以匹配多個字符
        channel.basicPublish(EXCHANGE_NAME, "abc.1.3", null, "topic模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

2.消費者1

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver1 {
    private final static String EXCHANGE_NAME = "testexchangetopic"; //定義交換機名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testTopicQueue1", false, false, false, null);

        //綁定隊列到交換機
        //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到
        channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*");
        //如果需要綁定多個標記 在執行一次即可
        channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "abc.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testTopicQueue1", false, consumer);
    }
}

3.消費者2

import com.idelan.rabbitmq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Receiver2 {
    private final static String EXCHANGE_NAME = "testexchangetopic"; //定義交換機名字

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare("testTopicQueue2", false, false, false, null);

         //綁定隊列到交換機
        //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到
        channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "key.#");
        //如果需要綁定多個標記 在執行一次即可
        channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "abc.*");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume("testTopicQueue2", false, consumer);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM