Springboot整合RabbitMQ


1、簡介

RabbitMQ 即一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩沖,消息分發的作用。

2、創建一個springboot的項目

3、添加RabbitMQ依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

4、在application.yml中配置RabbitMQ

spring

rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: hxj
  password: 123hanxujie
  publisher-confirmstrue
  virtual-host
: /

5、創建一個rabbitMQ配置類(這個一定要看明白)

/**
 * FileName: Application
 * Author:   韓旭傑
 * Date:     2019/2/13 10:42
 * Description: 該類初始化創建隊列、轉發器,並把隊列綁定到轉發器
 */
package com.example.springboot.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 說明:〈該類初始化創建隊列、轉發器,並把隊列綁定到轉發器〉
 *
 * @author 韓旭傑
 * @date 2019/2/13
 * @since 1.0.0
 */
@Configuration
public class Application {
    private static Logger log = LoggerFactory.getLogger(Application.class);
    @Autowired
    private CachingConnectionFactory connectionFactory;
    final static String queueName = "helloQuery";
    @Bean
    public Queue helloQueue() {
        return new Queue(queueName);
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    @Bean
    public Queue dirQueue() {
        return new Queue("direct");
    }
    //===============以下是驗證topic Exchange的隊列==========
    // Bean默認的name是方法名
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //===============以上是驗證topic Exchange的隊列==========

    //===============以下是驗證Fanout Exchange的隊列==========
    @Bean(name="AMessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //===============以上是驗證Fanout Exchange的隊列==========

    /**
     *     exchange是交換機交換機的主要作用是接收相應的消息並且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout.
     *
     *   Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即創建消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被發送到該消息隊列中.
     *
     *   topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發送到該消息隊列中.
     *
     *   headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被發送到匹配的消息隊列中.
     *
     *   Fanout是路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略.
     */
    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }
    @Bean
    TopicExchange exchange() {
        // 參數1為交換機的名稱
        return new TopicExchange("exchange");
    }

    /**
     * //配置廣播路由器
     * @return FanoutExchange
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // 參數1為交換機的名稱
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){
        return  BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
    }

    /**
     * 將隊列topic.message與exchange綁定,routing_key為topic.message,就是完全匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    // 如果參數名和上面用到方法名稱一樣,可以不用寫@Qualifier
    Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 將隊列topic.messages與exchange綁定,routing_key為topic.#,模糊匹配
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
    @Bean
    Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(){
        //若使用confirm-callback或return-callback,必須要配置publisherConfirms或publisherReturns為true
        //每個rabbitTemplate只能有一個confirm-callback和return-callback,如果這里配置了,那么寫生產者的時候不能再寫confirm-callback和return-callback
        //使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
//        /**
//         * 如果消息沒有到exchange,則confirm回調,ack=false
//         * 如果消息到達exchange,則confirm回調,ack=true
//         * exchange到queue成功,則不回調return
//         * exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
//         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }else{
                    log.info("消息發送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }
}

 

rabbitMQ配置類大約就這些內容,里面我基本上都做了注釋。

下面我們就開始寫rabbitMQ的用法了

6、單生產者和單消費者

6.1、生產者

@Component

public class HelloSender1 {

    /**

     * AmqpTemplate可以說是RabbitTemplate父類,RabbitTemplate實現類RabbitOperations接口,RabbitOperations繼承了AmqpTemplate接口

     */

    @Autowired

    private AmqpTemplate rabbitTemplate;

    @Autowired

    private RabbitTemplate rabbitTemplate1;

    /**

     * 用於單生產者-》單消費者測試

     */

    public void send() {

        String sendMsg = "hello1 " + new Date();

        System.out.println("Sender1 : " + sendMsg);

        this.rabbitTemplate1.convertAndSend("helloQueue", sendMsg);

    }
}

 

名為helloQueue的隊列在配置類創建好了,項目啟動的時候會自動創建

6.2、消費者

@Component

@RabbitListener(queues = "helloQueue")

public class HelloReceiver1 {



    @RabbitHandler

    public void process(String hello) {

        System.out.println("Receiver1  : " + hello);

    }

}

 

@RabbitListener注解是監聽隊列的,當隊列有消息的時候,它會自動獲取。

@RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換后的參數類型

注意

  • 消息處理方法參數是由 MessageConverter 轉化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實例中去設置(默認 Spring 使用的實現是 SimpleRabbitListenerContainerFactory)
  • 消息的 content_type 屬性表示消息 body 數據以什么數據格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應類型接收消息 body 內容,但若方法參數類型不正確會拋異常:
    • application/octet-stream:二進制字節數組存儲,使用 byte[]
    • application/x-java-serialized-object:java 對象序列化格式存儲,使用 Object、相應類型(反序列化時類型應該同包同名,否者會拋出找不到類異常)
    • text/plain:文本數據類型存儲,使用 String
    • application/json:JSON 格式,使用 Object、相應類型

6.3、controller

 /** * 最簡單的hello生產和消費實現(單生產者和單消費者) */ @RequestMapping("/hello") public void hello() { helloSender1.send(); } 

6.4、結果

控制台的結果

Sender1 : hello1 Mon Feb 18 10:13:35 CST 2019

2019-02-18 10:13:35,831 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

Receiver1  : hello1 Mon Feb 18 10:13:35 CST 2019

7、單生產者對多消費者

7.1、生產者

/**

 * 用於單/多生產者-》多消費者測試

 */

public void send(String msg) {

    String sendMsg = msg + new Date();

    System.out.println("Sender1 : " + sendMsg);

    this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);

}

 

7.2、消費者

消費者1

@Component

@RabbitListener(queues = "helloQueue")

public class HelloReceiver1 {



    @RabbitHandler

    public void process(String hello) {

        System.out.println("Receiver1  : " + hello);

    }

}

 

消費者2

@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }
}

 

7.3、controller

/**
* 單生產者-多消費者
*/
@RequestMapping("/oneToMany")
public void oneToMany() {
for(int i=0;i<10;i++){
helloSender1.send("hellomsg:"+i);
}
}

 

7.4、結果

Sender1 : hellomsg:0Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:2Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:4Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:6Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019

Sender1 : hellomsg:8Mon Feb 18 10:19:10 CST 2019

Sender1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019

Receiver2  : hellomsg:0Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:2Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:4Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:1Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:6Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:3Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:8Mon Feb 18 10:19:10 CST 2019

Receiver1  : hellomsg:5Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:7Mon Feb 18 10:19:10 CST 2019

Receiver1  : hellomsg:9Mon Feb 18 10:19:10 CST 2019

2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,044 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

8、實體類的傳輸,必須格式化

8.1、實體類

public class User implements Serializable {

    private String name;

    private String pass;

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

    public String getPass() {

        return pass;

    }

    public void setPass(String pass) {

        this.pass = pass;

    }

    @Override

    public String toString() {

        return "User{" +

                "name='" + name + '\'' +

                ", pass='" + pass + '\'' +

                '}';
    }
}

 

8.2、生產者

/**

 * 實體類的傳輸(springboot完美的支持對象的發送和接收,不需要格外的配置。實體類必須序列化)

 * @param user

 */

public void send(User user) {

    System.out.println("user send : " + user.getName()+"/"+user.getPass());

    this.rabbitTemplate.convertAndSend("userQueue", user);

}

 

8.3、消費者

@Component

@RabbitListener(queues = "userQueue")

public class HelloReceiver3 {



    @RabbitHandler

    public void process(User user){

        System.out.println("user receive  : " + user.getName()+"/"+user.getPass());

    }

}

 

8.4、controller

/**

 * 實體列的傳輸

 */

@RequestMapping("/userTest")

public void userTest(){

    User user=new User();

    user.setName("韓旭傑");

    user.setPass("123456");

    userSender.send(user);

}

 

8.5、結果

user send : 韓旭傑/123456

2019-02-18 10:24:24,251 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)

user receive  : 韓旭傑/123456

9、directExchange

Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即創建消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被發送到該消息隊列中.

9.1、在rabbitMQ配置類中添加內容

@Bean

public Queue dirQueue() {

    return new Queue("direct");

}
@Bean

DirectExchange directExchange(){

    return new DirectExchange("directExchange");

}
/**

 * 將隊列dirQueue與directExchange交換機綁定,routing_key為direct

 * @param dirQueue

 * @param directExchange

 * @return

 */

@Bean

Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){

    return  BindingBuilder.bind(dirQueue).to(directExchange).with("direct");

}

 

9.2、生產者

@Component

public class DirectSender {

    @Autowired

    private AmqpTemplate rabbitTemplate;

    public void send() {

        String msgString="directSender :hello i am hzb";

        System.out.println(msgString);

        this.rabbitTemplate.convertAndSend("direct", msgString);

    }

}

 

9.3、消費者

@Component

@RabbitListener(queues = "direct")

public class DirectReceiver {

    @RabbitHandler

    public void process(String msg) {

        System.out.println("directReceiver  : " + msg);

    }

}

 

9.4、controller

 @RequestMapping("/directTest") public void directTest() { directSender.send(); } 

9.5、結果

directSender :hello i am hzb

directReceiver  : directSender :hello i am hzb

2019-02-18 10:33:25,974 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)

10、topicExchange

topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發送到該消息隊列中.

10.1、在rabbitMQ配置類中添加內容

// Bean默認的name是方法名
@Bean(name="message")
public Queue queueMessage() {

    return new Queue("topic.message");

}

@Bean(name="messages")
public Queue queueMessages() {

    return new Queue("topic.messages");

}
@Bean TopicExchange exchange() {
// 參數1為交換機的名稱 return new TopicExchange("exchange"); } /** * 將隊列topic.message與exchange綁定,routing_key為topic.message,就是完全匹配 * @param queueMessage * @param exchange * @return */ @Bean // 如果參數名和上面用到方法名稱一樣,可以不用寫@Qualifier Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 將隊列topic.messages與exchange綁定,routing_key為topic.#,模糊匹配 * @param queueMessages * @param exchange * @return */ @Bean Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }

 

10.2、生產者

@Component

public class TopicSender {



    @Autowired

    private AmqpTemplate rabbitTemplate;



    public void send() {

        String msg1 = "I am topic.mesaage msg======";

        System.out.println("sender1 : " + msg1);

        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);



        String msg2 = "I am topic.mesaages msg########";

        System.out.println("sender2 : " + msg2);

        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }
}

 

10.3、消費者

消費者1

@Component

@RabbitListener(queues = "topic.message")

public class TopicMessageReceiver {

    @RabbitHandler

    public void process(String msg) {

        System.out.println("topicMessageReceiver  : " +msg);

    }

}

 

消費者2

@Component

@RabbitListener(queues = "topic.messages")

public class TopicMessagesReceiver {

    @RabbitHandler

    public void process(String msg) {

        System.out.println("topicMessagesReceiver  : " +msg);

    }
}

 

10.4、controller

 /** * topic exchange類型rabbitmq測試 */ @RequestMapping("/topicTest") public void topicTest() { topicSender.send(); } 

10.5、結果

sender1 : I am topic.mesaage msg======

sender2 : I am topic.mesaages msg########

topicMessageReceiver  : I am topic.mesaage msg======

topicMessagesReceiver  : I am topic.mesaage msg======

topicMessagesReceiver  : I am topic.mesaages msg########

2019-02-18 10:39:46,150 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:39:46,206 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)

11、fanoutExchange

Fanout是路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略.

11.1、在rabbitMQ配置類中添加內容

//===============以下是驗證Fanout Exchange的隊列==========

@Bean(name="AMessage")
public Queue AMessage() {

    return new Queue("fanout.A");

}

@Bean
public Queue BMessage() {

    return new Queue("fanout.B");

}

@Bean
public Queue CMessage() {

    return new Queue("fanout.C");

}
@Bean
FanoutExchange fanoutExchange() {

    // 參數1為交換機的名稱

    return new FanoutExchange("fanoutExchange");

}
@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {

    return BindingBuilder.bind(AMessage).to(fanoutExchange);

}

@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {

    return BindingBuilder.bind(BMessage).to(fanoutExchange);

}

@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {

    return BindingBuilder.bind(CMessage).to(fanoutExchange);

}

 

11.2、生產者

@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send() {
        String msgString="fanoutSender :hello i am hzb";
        System.out.println(msgString);
        // 參數2被忽略
        this.rabbitTemplate.convertAndSend("fanoutExchange","", msgString);
    }
}

 

11.3、消費者

消費者A

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }
}

 

消費者B

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler

    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }
}

 

消費者C

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }
}

 

11.4、controller

 /** * fanout exchange類型rabbitmq測試 */ @RequestMapping("/fanoutTest") public void fanoutTest() { fanoutSender.send(); } 

11.5、結果

fanoutSender :hello i am hzb

FanoutReceiverA  : fanoutSender :hello i am hzb

FanoutReceiverC  : fanoutSender :hello i am hzb

FanoutReceiverB  : fanoutSender :hello i am hzb

2019-02-18 10:45:38,760 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)

12、配置類中的rabbitTemplate

   
@Bean
public RabbitTemplate rabbitTemplate() {

        //若使用confirm-callback或return-callback,必須要配置publisherConfirms或publisherReturns為true

        //每個rabbitTemplate只能有一個confirm-callback和return-callback

        // 配置文件配置了publisher-confirms: true,那么這句話可以省略

        connectionFactory.setPublisherConfirms(true);

        connectionFactory.setPublisherReturns(true);

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        //使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true

        rabbitTemplate.setMandatory(true);

//        /**

//         * 如果消息沒有到exchange,則confirm回調,ack=false

//         * 如果消息到達exchange,則confirm回調,ack=true

//         * exchange到queue成功,則不回調return

//         * exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)

//         */

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息發送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                } else {
                    log.info("消息發送失敗:correlationData({}),ack({}),cause({})", correlationData, ack, cause);

                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);

            }
        });
        return rabbitTemplate;
    }

 

好好看看注釋

13、不在配置類中配置callback

方法一:

13.1、配置一個接口

/**

 * 說明:〈定義一個名為SendMessageService 的接口,這個接口繼承了RabbitTemplate.ConfirmCallback,

 * ConfirmCallback接口是用來回調消息發送成功后的方法,當一個消息被成功寫入到RabbitMQ服務端時,

 * 會自動的回調RabbitTemplate.ConfirmCallback接口內的confirm方法完成通知

 *

 * @author 韓旭傑

 * @date 2019/2/14

 * @since 1.0.0

 */
public interface SendMessageService extends RabbitTemplate.ConfirmCallback{

    void sendMessage(String exchange,String routekey,Object message);

}

 

13.2、實現這個接口

/**

 * 說明:〈該類注入了RabbitTemplate,RabbitTemplate封裝了發送消息的方法,我們直接使用即可。

 * 可以看到我們構建了一個回調返回的數據,並使用convertAndSend方法發送了消息。同時實現了confirm回調方法,

 * 通過判斷isSendSuccess可以知道消息是否發送成功,這樣我們就可以進行進一步處理。

 *

 * @author 韓旭傑

 * @date 2019/2/14

 * @since 1.0.0

 */

@Service
public class SendMessageServiceImpl implements SendMessageService{

    private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String exchange,String routekey,Object message) {
        //設置回調對象

        //rabbitTemplate.setConfirmCallback(this);

        //rabbitTemplate.setMandatory(true);

        //構建回調返回的數據

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
       //rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
        rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
        logger.info("SendMessageServiceImpl() >>> 發送消息到RabbitMQ, 消息內容: " + message);

    }


    /**

     * 消息回調確認方法

     *

     * @param correlationData 回調數據

     * @param isSendSuccess   是否發送成功

     * @param

     */

    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {

        logger.info("confirm回調方法>>>>>>>>>>>>>回調消息ID為: " + correlationData.getId());

        if (isSendSuccess) {
            logger.info("confirm回調方法>>>>>>>>>>>>>消息發送成功");
        } else {
            logger.info("confirm回調方法>>>>>>>>>>>>>消息發送失敗" + s);
        }

    }
}

 

方法二:

直接在生產者發送信息的時候修改rabbitTemplate

@Service
public class SendMessage1 {

    private static Logger log = LoggerFactory.getLogger(SendMessage1.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routekey, Object message) {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {

                if(ack){
                    log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }else{
                    log.info("消息發送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }

            }

        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);

            }

        });

 

13、有時候消費者出現錯誤,需要人工處理

//構建回調返回的數據

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);

//rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);



// 將 CorrelationData的id 與 Message的correlationId綁定,然后關系保存起來,例如放到緩存中,然后人工處理

// 當confirm或return回調時,根據ack類別等,分別處理. 例如return或者ack=false則說明有問題,報警, 如果ack=true則刪除關系

// (因為return在confirm前,所以一條消息在return后又ack=true的情況也是按return處理)

Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(correlationData.getId()).build();

rabbitTemplate.send(exchange, routekey, message1, correlationData);

 

將 CorrelationData的id 與 Message的correlationId綁定,然后關系保存起來,例如放到緩存中,然后人工處理

 

我們可以看到,這兩條消息關聯起來了。

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM