rabbitmq基礎學習+springboot結合rabbitmq實現回調確認confirm


rabbitmq集群docker快速搭建 https://blog.csdn.net/u011058700/article/details/78708767
rabbitmq原理博客 https://www.jianshu.com/p/6376936845ff

基礎概念

  1. Queue
    • 隊列,用於儲存消息,先入先出,prefetchCount限制平分給消費者的消息個數
  2. Exchange
    • 交換機,生產者生產的消息先經過交換機,再路由到一個或多個Queue,這個過程通過binding key完成
    • Exchange交換類別
      • fanout:會把所有發到Exchange的消息路由到所有和它綁定的Queue
      • direct:會把消息路由到routing key和binding key完全相同的Queue,不相同的丟棄
      • topic:direct是嚴格匹配,那么topic就算模糊匹配,routing key和binding key都用.來區分單詞串,比如A.B.C,匹配任意單詞,#匹配任意多個或0個單詞,比如。B.*可以匹配到A.B.C
      • headers:不依賴routing key和binding key,通過對比消息屬性中的headers屬性,對比Exchange和Queue綁定時指定的鍵值對,相同就路由過來

集群

  1. rabbitmq基於Erlang語言編寫,天生支持分布式特性
  2. rabbitmq集群會進行元數據同步(相當於索引),同步的內容大致為隊列,交換機,路由,安全屬性等相關信息,這樣做使得客戶端訪問任意集群幾點,查詢到的隊列交換機等信息都是相同的。
  3. RabbitMQ集群中的節點只有兩種類型:內存節點/磁盤節點,單節點系統只運行磁盤類型的節點。而在集群中,可以選擇配置部分節點為內存節點。內存節點將所有的隊列,交換器,綁定關系,用戶,權限,和vhost的元數據信息保存在內存中。

rabbitmq實現回調

  1. 背景
    • 找的例子大多只是生產和消費,要實現消息的可靠性還是需要回調確認,下面記錄下最簡單的回調實現案例,使用的springboot搭建
  2. 生產者
package com.neo.rabbit.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class TopicSender implements RabbitTemplate.ConfirmCallback {


    private RabbitTemplate rabbitTemplate;

    @Autowired
    public TopicSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    public void send2() {
        for (int i = 0; i < 1000; i++) {
            String context = "hi, i am messages " + i;
            System.out.println("Sender : " + context);
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            System.out.println("callbackSender UUID: " + correlationData.getId());
            this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context, correlationData);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("confirm: " + correlationData.getId() + ",s=" + s + ",b:" + b);
    }
}
  1. 消費者
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }

}
  1. 配置文件
spring.application.name=spirng-boot-rabbitmq-example
spring.rabbitmq.addresses=ip:5672,ip:5673,ip:5674
spring.rabbitmq.username=dev
spring.rabbitmq.password=xxx
spring.rabbitmq.publisher-confirms=true

  1. test
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {

	@Autowired
	private TopicSender sender;

	@Test
	public void topic() throws Exception {
		sender.send2();
	}

}
  1. 完成上述步驟,調用測試方法,可以看到日志如下
Sender : hi, i am messages 19
callbackSender UUID: 23e5768f-ce01-400b-81ad-3259a6d9a312
Topic Receiver2  : hi, i am messages 19
confirm: 23e5768f-ce01-400b-81ad-3259a6d9a312,s=null,b:true
Sender : hi, i am messages 20
callbackSender UUID: d7c4757a-0311-4de9-bb6d-661de36ef03e
confirm: d7c4757a-0311-4de9-bb6d-661de36ef03e,s=null,b:true

表明回調測試成功


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM