springboot整合rabbitMQ


  當前社區活躍度最好的消息中間件就是kafka和rabbitmq了,前面對kafaka的基礎使用做了一些總結,最近開始研究rabbitmq,查看了很多資料,自己仿着寫了一些demo,在博客園記錄一下。

rabbitmq基礎知識

  關於rabbitmq基礎知識,可以看這篇博客,介紹的很詳細了:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html,這里分享一張核心概念圖

rabbitmq安裝

  rabbitmq的安裝很簡單,我們可以根據自己的系統去網上找對應的安裝說明,這里我為了方便,采用docker鏡像的方式,我的虛擬機裝的是centos7。步驟如下:

  1、啟動docker,關閉防火牆

  2、拉取鏡像:docker pull rabbitmq,如需要管理界面:docker pull rabbitmq:management

  3、執行指令啟動RabbitMQ

  無管理界面:

  docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 rabbitmq

  有管理界面:

  docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 -p 15672:15672 rabbitmq:management

  4、啟動后輸入你的虛擬機地址+端口號15672,即可訪問到rabbitmq登錄界面,默認用戶名和密碼都是guest。

springboot與rabbitmq整合

  IDE:STS,這是spring官方推薦的開發工具,構建springboot項目非常方便。JDK:1.8

  1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>powerx.io</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

  2、定義常量

package com.example.demo.constant;

public interface QueueConstants {

    // 消息交換
    String MESSAGE_EXCHANGE = "message.direct.myexchange";
    // 消息隊列名稱
    String MESSAGE_QUEUE_NAME = "message.myqueue";
    // 消息路由鍵
    String MESSAGE_ROUTE_KEY = "message.myroute";

    // 死信消息交換
    String MESSAGE_EXCHANGE_DL = "message.direct.dlexchange";
    // 死信消息隊列名稱
    String MESSAGE_QUEUE_NAME_DL = "message.dlqueue";
    // 死信消息路由鍵
    String MESSAGE_ROUTE_KEY_DL = "message.dlroute";

}

 

  3、rabbitmq配置

package com.example.demo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.example.demo.constant.QueueConstants;

@Configuration
public class MyRabbitMqConfiguration {

    /**
     * 交換配置
     *
     * @return
     */
    @Bean
    public DirectExchange messageDirectExchange() {
        return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE)
                .durable(true)
                .build();
    }

    /**
     * 消息隊列聲明
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME)
                .build();
    }

    /**
     * 消息綁定
     *
     * @return
     */
    @Bean
    public Binding messageBinding() {
        return BindingBuilder.bind(messageQueue())
                .to(messageDirectExchange())
                .with(QueueConstants.MESSAGE_ROUTE_KEY);
    }



}

  4、生產者

package com.example.demo.producer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.example.demo.constant.QueueConstants;

@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String str) {
        
        rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE, QueueConstants.MESSAGE_ROUTE_KEY, str);
    }

}

  5、消費者

package com.example.demo.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer {
    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
    public void processMessage(Channel channel,Message  message) {
        System.out.println("MessageConsumer收到消息:"+new String(message.getBody()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            
        } 
    }
}

  6、控制器類

package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.producer.MessageProducer;

@RestController
public class TestController {

    @Autowired
    private MessageProducer messageProducer;
    
    @RequestMapping(value = "/index")
    public String index(String str) {
        // 將實體實例寫入消息隊列
        messageProducer.sendMessage(str);
        return "Success";
    }

}

  7、application.properties

#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#服務器ip
spring.rabbitmq.host=192.168.1.124
#虛擬空間地址
spring.rabbitmq.virtual-host=/
#端口號
spring.rabbitmq.port=5672

  至此,springboot整合rabbitmq基本demo完畢,這里不再貼出演示截圖。

消息序列化

  涉及網絡傳輸的應用序列化不可避免,發送端以某種規則將消息轉成 byte 數組進行發送,接收端則以約定的規則進行 byte[] 數組的解析,RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸的內容,RabbitMQ 抽象出一個 MessageConvert 接口處理消息的序列化,其實現有 SimpleMessageConverter(默認)、Jackson2JsonMessageConverter。

  SimpleMessageConverter 對於要發送的消息體 body 為 byte[] 時不進行處理,如果是 String 則轉成字節數組,如果是 Java 對象,則使用 jdk 序列化將消息轉成字節數組,轉出來的結果較大,含class類名,類相應方法等信息。因此性能較差;當使用 RabbitMQ 作為中間件時,數據量比較大,此時就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能。

  Jackson2JsonMessageConverter配置如下:

  1、消息發送者設置序列化方式 :rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

  2、消息消費者也應配置 MessageConverter 為 Jackson2JsonMessageConverter

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

}

  3、消費消息

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {

    @RabbitHandler
    public void processMessage1(@Payload User user) {
        System.out.println(user.getName());
    }

}

消息確認

  RabbitMQ的消息確認有兩種。

  一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。

  第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。

消息發送確認

  1、ConfirmCallback

  確認消息發送成功,通過實現ConfirmCallBack接口,消息發送到交換器Exchange后觸發回調,使用該功能需要開啟確認,spring-boot中配置如下:

  spring.rabbitmq.publisher-confirms = true

  在MessageProducer.java加入如下代碼:

rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
            System.out.println("消息唯一標識" + correlationData);
            System.out.println("消息確認結果" + ack);
            System.out.println("失敗原因" + cause);
        });

  2、ReturnCallback

  通過實現ReturnCallback接口,如果消息從交換器發送到對應隊列失敗時觸發(比如根據發送消息時指定的routingKey找不到隊列時會觸發),使用該功能需要開啟確認,spring-boot中配置如下:spring.rabbitmq.publisher-returns = true

  在MessageProducer.java加入如下代碼:

rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,
                String exchange, String routingKey) ->{
                    System.out.println("消息主體message" + message);
                    System.out.println("消息replyCode" + replyCode);
                    System.out.println("消息replyText" + replyText);
                    System.out.println("消息使用的交換器" + exchange);
                    System.out.println("消息使用的路由鍵" + routingKey);
                });

消息消費確認

  消費確認模式有三種:NONE、AUTO、MANUAL。

  開啟手動確認需要在配置中加入:spring.rabbitmq.listener.direct.acknowledge-mode=manual

  消息在處理失敗后將再次返回隊列,重新嘗試消費,如果再次失敗則直接拒絕。

  實例代碼如下:

package com.example.demo.consumer;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer {
    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
    public void processMessage(Channel channel, Message message) {
        System.out.println("MessageConsumer收到消息:" + new String(message.getBody()));
        try {
            //模擬消息處理失敗
            int a = 3 / 0;
            // false只確認當前一個消息收到,true確認所有consumer獲得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {

                System.out.println("消息已重復處理失敗,拒絕再次接收...");
                try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue為false,拒絕
                } catch (IOException e1) {
                }

            } else {

                System.out.println("消息即將再次返回隊列處理...");

                try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue為true重新回到隊列
                } catch (IOException e1) {
                }

            }
        }
    }
}

死信隊列

  DLX, Dead-Letter-Exchange。利用DLX, 當消息在一個隊列中變成死信(dead message)之后,它能被重新publish到另一個Exchange,這樣我們就可以重新去處理這個消息。DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列,可以監聽這個隊列中消息做相應的處理 。消息變成死信一向有一下幾種情況:

  利用DLX,我們可以實現消息的延遲消費,可參考:https://www.jianshu.com/p/b74a14c7f31d,還可以像我的demo那樣,對於有問題的消息進行重新處理,實例代碼如下

  首先在MyRabbitMqConfiguration上加入如下配置:

@Bean
    DirectExchange messagedlDirect() {
        return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE_DL).durable(true)
                .build();
    }

    @Bean
    Queue messagedlQueue() {
        return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME_DL)
                // 配置到期后轉發的交換
                .withArgument("x-dead-letter-exchange", QueueConstants.MESSAGE_EXCHANGE)
                // 配置到期后轉發的路由鍵
                .withArgument("x-dead-letter-routing-key", QueueConstants.MESSAGE_ROUTE_KEY).build();
    }

    @Bean
    public Binding messageTtlBinding(Queue messagedlQueue, DirectExchange messagedlDirect) {
        return BindingBuilder.bind(messagedlQueue).to(messagedlDirect).with(QueueConstants.MESSAGE_ROUTE_KEY_DL);
    }

  其次,修改我們的消息發送者,發送消息到我們新加入的交換器和路由鍵上,如下:

rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE_DL, QueueConstants.MESSAGE_ROUTE_KEY_DL, str);

  新添加一個消費者,同時將原來的消費者的監聽隊列換成新加入的

package com.example.demo.consumer;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer {
    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME_DL)
    public void processMessage(Channel channel, Message message) {
        System.out.println("MessageConsumer收到消息:" + new String(message.getBody()));
        try {
            //模擬消息處理失敗
            int a = 3 / 0;
            // false只確認當前一個消息收到,true確認所有consumer獲得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {

                System.out.println("消息已重復處理失敗,拒絕再次接收...");
                try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue為false,拒絕
                } catch (IOException e1) {
                }

            } else {

                System.out.println("消息即將再次返回隊列處理...");

                try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue為true重新回到隊列
                } catch (IOException e1) {
                }

            }
        }
    }
}
package com.example.demo.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer2 {
    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
    public void processMessage(Channel channel,Message  message) {
        System.out.println("MessageConsumer2收到消息:"+new String(message.getBody()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            
        } 
    }
}

  啟動項目,發送請求http://localhost:8082/index?str=asdfgh,可以看到后台日志如下:

 

  rabbitmq支持四種交換器,同時還支持很多種插件,功能非常強大,這里我自己還沒親手用過,所以不再展開。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM