Springboot2.x整合RabbitMQ


1、RabbitMQ介紹

可參照RabbitMQ筆記

2、接入配置

pom依賴

<!--amqp依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

server.port=8080

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=192.168.242.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開啟發送確認
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、一對一模式

  即一個生產者對一個消費者模式

配置類

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue kinsonQueue() {
        return new Queue("kinson");
    }

}

消費者

@Component
//監聽隊列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver1 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver1 :" + msg);
    }
}

消息生產者測試接口

    /**
     * 單條消息發送給單個隊列,該隊列只有一個消費者
     *
     * @return
     */
    @GetMapping(value = "send")
    public String send() {
        String content = "Date:" + System.currentTimeMillis();
        //發送默認交換機對應的的隊列kinson
        amqpTemplate.convertAndSend("kinson", content);
        return content;
    }

4、一對多模式

  即一個生產者對多個消費者,該模式下可以是一個生產者將消息投遞到一個隊列,該隊列對應多個消費者,此時每條消息只會被消費一次,多個消費者循環處理。另外也可以是一個生產者將消息投遞到多個隊列里,此時消息是被復制處理。

模式一:

配置類

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue kinsonQueue() {
        return new Queue("kinson");
    }

}

消費者1

@Component
//監聽隊列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver1 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver1 :" + msg);
    }
}

消費者2

@Component
//監聽隊列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver2 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver2 :" + msg);
    }
}

消息生產者測試接口

    /**
     * 發送多條消息到一個隊列,該隊列有多個消費者
     *
     * @return
     */
    @GetMapping(value = "sendMore")
    public String sendMore() {
        List<String> result = new ArrayList<String>();
        //發送10條數據
        for (int i = 0; i < 10; i++) {
            String content = "第" + (i + 1) + "次發送 Date:" + System.currentTimeMillis();
            //發送默認交換機對應的的隊列kinson,此時有兩個消費者MyReceiver1和MyReceiver2,每條消息只會被消費一次
            amqpTemplate.convertAndSend("kinson", content);
            result.add(content);
        }
        return String.join("<br/>", result);
    }

模式二:

配置類

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue kinsonQueue() {
        return new Queue("kinson");
    }

    @Bean
    public Queue kinsonQueue2() {
        return new Queue("kinson2");
    }
}

kinson隊列消費者

@Component
//監聽隊列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver1 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver1 :" + msg);
    }
}

kinson2隊列消費者

@Component
//監聽隊列kinson2
@RabbitListener(queues = {"kinson2"})
public class MyReceiver3 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver3 :" + msg);
    }
}

消息生產者測試接口

  /**
     * 發送多條消息到多個隊列
     *
     * @return
     */
    @GetMapping(value = "sendMoreQueue")
    public String sendMoreQueue() {
        List<String> result = new ArrayList<String>();
        //發送10條數據
        for (int i = 0; i < 10; i++) {
            String content = "第" + (i + 1) + "次發送 Date:" + System.currentTimeMillis();
            //發送默認交換機對應的的隊列kinson
            amqpTemplate.convertAndSend("kinson", content);
            //發送默認交換機對應的的隊列kinson2
            amqpTemplate.convertAndSend("kinson2", content);
            result.add(content);
        }
        return String.join("<br/>", result);
    }

相應測試結果請自測

5、ACK消息確認

配置文件加入相應配置

# 開啟發送確認
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啟ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

配置類,使用Fanout類型的Exchange,主要是設置隊列,交換機及綁定

@Configuration
public class RabbitMqFanoutACKConfig {

    @Bean
    public Queue ackQueue() {
        return new Queue("ackQueue");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(ackQueue).to(fanoutExchange);
    }

}

消息發送服務

@Service
public class AckSenderService implements RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
} /** * 消息發送 */ public void send() { final String content = "現在時間是" + LocalDateTime.now(ZoneId.systemDefault()); //設置返回回調 rabbitTemplate.setReturnCallback(this); //設置確認回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息發送成功!"); } else { System.out.println("消息發送失敗," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }

消息消費者

@Component
@RabbitListener(queues = {"ackQueue"})
public class MyAckReceiver {

    @RabbitHandler
    public void process(String sendMsg, Channel channel, Message message) {

        System.out.println("AckReceiver  : 收到發送消息 " + sendMsg + ",收到消息時間"
                + LocalDateTime.now(ZoneId.systemDefault()));

        try {
            //告訴服務器收到這條消息已經被當前消費者消費了,可以在隊列安全刪除,這樣后面就不會再重發了,
            //否則消息服務器以為這條消息沒處理掉,后續還會再發
            //第二個參數是消息的標識,false只確認當前一個消息收到,true確認所有consumer獲得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("process success");
        } catch (Exception e) {
            System.out.println("process fail");
            e.printStackTrace();
        }

    }
}

測試訪問接口

   /**
     * @return
     */
    @GetMapping(value = "ackSend")
    public String ackSend() {
        senderService.send();

        return "ok";
    }

測試將Consumer確認代碼注釋掉,即

@Component
@RabbitListener(queues = {"ackQueue"})
public class MyAckReceiver {

    @RabbitHandler
    public void process(String sendMsg, Channel channel, Message message) {

        System.out.println("AckReceiver  : 收到發送消息 " + sendMsg + ",收到消息時間"
                + LocalDateTime.now(ZoneId.systemDefault()));

        try {
            //告訴服務器收到這條消息已經被當前消費者消費了,可以在隊列安全刪除,這樣后面就不會再重發了,
            //否則消息服務器以為這條消息沒處理掉,后續還會再發
            //第二個參數是消息的標識,false只確認當前一個消息收到,true確認所有consumer獲得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("process success");
        } catch (Exception e) {
            System.out.println("process fail");
            e.printStackTrace();
        }

    }
}

此時訪問測試接口,可以看到當消息發送完被消費掉之后,隊列的狀態變為unacked。

當停掉服務時,unacked狀態變為Ready

再重新啟動服務時會重新發送消息

6、事務機制

事務的實現主要是對信道(Channel)的設置,主要的方法有三個:
//聲明啟動事務模式
channel.txSelect();
//提交事務
channel.txComment();
//回滾事務
channel.txRollback();

消息發送示例

public void publish()
            throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // 創建連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost("/");
        factory.setHost(host);
        factory.setPort(port);
        Connection conn = factory.newConnection();
        // 創建信道
        Channel channel = conn.createChannel();
        // 聲明隊列
        channel.queueDeclare(TX_QUEUE, true, false, false, null);

        try {

            long startTime = System.currentTimeMillis();

            for (int i = 0; i < 10; i++) {
                // 聲明事務
                channel.txSelect();
                String message = String.format("時間 => %s", System.currentTimeMillis());
                // 發送消息
                channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                // 提交事務
                channel.txCommit();
            }

            long endTime = System.currentTimeMillis();

            System.out.println("事務模式,發送10條數據,執行花費時間:" + (endTime - startTime) + "s");

        } catch (Exception e) {
            channel.txRollback();
        } finally {
            channel.close();
            conn.close();
        }
    }

消息消費示例

public void consume() throws IOException, TimeoutException, InterruptedException {

        Connection conn = RabbitMqConnFactoryUtil.getRabbitConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(TX_QUEUE, true, false, false, null);
        // 聲明事務
        channel.txSelect();
        try {
            //單條消息獲取進行消費
            GetResponse resp = channel.basicGet(TX_QUEUE, false);
            String message = new String(resp.getBody(), "UTF-8");
            System.out.println("收到消息:" + message);
            //消息拒絕
            // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true);
            // 消息確認
            channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
            // 提交事務
            channel.txCommit();
        } catch (Exception e) {
            // 回滾事務
            channel.txRollback();
        } finally {
            //關閉通道、連接
            channel.close();
            conn.close();
        }
    }

7、Confirm消息確認

Confirm發送方確認模式使用和事務類似,也是通過設置Channel進行發送方確認的,Confirm的三種實現方式:
//方式一:普通發送方確認模式
channel.waitForConfirms();
//方式二:批量確認模式
channel.waitForConfirmsOrDie();
//方式三:異步監聽發送方確認模式
channel.addConfirmListener();

消息發布示例

public void publish() throws IOException, TimeoutException, InterruptedException {
        // 創建連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost("/");
        factory.setHost(host);
        factory.setPort(port);
        Connection conn = factory.newConnection();
        // 創建信道
        Channel channel = conn.createChannel();
        // 聲明隊列
        channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 10; i++) {
            // 開啟發送方確認模式
            channel.confirmSelect();
            String message = String.format("時間 => %s", System.currentTimeMillis());
            channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8"));
        }

        //添加確認監聽器
        channel.addConfirmListener(new ConfirmListener() {

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("未確認消息,標識:" + deliveryTag);
            }

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple));
            }
        });

        long endTime = System.currentTimeMillis();

        System.out.println("執行花費時間:" + (endTime - startTime) + "s");

    }

 

RabbitMQ簡單示例源碼參照Github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM