消息追蹤
消息跟蹤,排查問題。追蹤消息的生產和消費
Firehose
Firehose 的原理是將生產者投遞給RabbitMQ 的消息,或者RabbitMQ 投遞給消費者的消息按照指 定的格式發送到默認的交換器上。這個默認的交換器的名稱為 amq.rabbitmq.trace ,它是一個 topic 類型的交換器。發送到這個交換器上的消息的路由鍵為 publish.{exchangename} 和 deliver. {queuename} 。其中 exchangename 和 queuename 為交換器和隊列的名稱,分別對應生產者投遞到交
換器的消息和消費者從隊列中獲取的消息。
默認是非持久化的,且對性能有影響,生產環境不用,默認也是關閉的
rabbitmq_tracing
可視化
TTL機制
使用場景:
在京東下單,訂單創建成功,等待支付,一般會給30分鍾的時間,開始倒計時。如果在這段時間內 用戶沒有支付,則默認訂單取消。
該如何實現?
定期輪詢(數據庫等)
用戶下單成功,將訂單信息放入數據庫,同時將支付狀態放入數據庫,用戶付款更 改數據庫狀態。定期輪詢數據庫支付狀態,如果超過30分鍾就將該訂單取消。 優點:設計實現簡單 缺點:需要對數據庫進行大量的IO操作,效率低下。
Timer
缺點:
- Timers沒有持久化機制.
- Timers不靈活 (只可以設置開始時間和重復間隔,對等待支付貌似夠用)
- Timers 不能利用線程池,一個timer一個線程
- Timers沒有真正的管理計划
ScheduledExecutorService
- 優點:可以多線程執行,一定程度上避免任務間互相影響,單個任務異常不影響其它任務。
- 在高並發的情況下,不建議使用定時任務去做,因為太浪費服務器性能,不建議。
RabbitMQ
使用TTL
Quartz
Redis Zset+過期時間
JCronTab
SchedulerX
TTL介紹
TTL,Time to Live 的簡稱,即過期時間。 RabbitMQ 可以對消息和隊列兩個維度來設置TTL。
任何消息中間件的容量和堆積能力都是有限的,如果有一些消息總是不被消費掉,那么需要有一種 過期的機制來做兜底。
目前有兩種方法可以設置消息的TTL。
- 通過Queue屬性設置,隊列中所有消息都有相同的過期時間。
- 對消息自身進行單獨設置,每條消息的TTL 可以不同。
如果兩種方法一起使用,則消息的TTL 以兩者之間較小數值為准。
通常來講,消息在隊列中的生存時間一旦超過設置的TTL 值時,就會變成“死信”(Dead Message),消費者默認就無法再收到該消息。當然,“死信”也是可以被取出來消費的,下一小節我們會講解。
RabbitMQ TTL實戰
通過原生API設置
public class Producer {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@10.211.55.11:5672/%2f");
try (final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel()) {
Map<String, Object> arguments = new HashMap<>();
// 消息隊列中消息過期時間,30s
arguments.put("x-message-ttl", 10 * 1000);
// 如果消息隊列沒有消費者,則10s后消息過期,消息隊列也刪除
// arguments.put("x-expires", 10 * 1000);
arguments.put("x-expires", 60 * 1000);
channel.queueDeclare("queue.ttl.waiting",
true,
false,
false,
arguments);
channel.exchangeDeclare("ex.ttl.waiting",
"direct",
true,
false,
null);
channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentEncoding("utf-8")
.deliveryMode(2) // 持久化的消息
.build();
channel.basicPublish("ex.ttl.waiting",
"key.ttl.waiting",
null,
"等待的訂單號".getBytes("utf-8"));
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
關鍵參數:
- x-message-ttl
- x-expires(消息隊列很重要,就不要設置,否則如果沒有消息,達到過期時間會刪掉隊列)
通過命令行方式設置全局TTL,執行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
設置隊列中的消息過期時間30秒
通過restful api方式設置
默認規則:
- 如果不設置TTL,則表示此消息不會過期;
- 如果TTL設置為0,則表示除非此時可以直接將消息投遞到消費者,否則該消息會被立即丟 棄;
注意理解 message-ttl 、 x-expires 這兩個參數的區別,有不同的含義。但是這兩個參數屬性都 遵循上面的默認規則。一般TTL相關的參數單位都是毫秒(ms)
SpringBoot方式設置
todo
死信隊列
TTL可以用於訂單支付的時候,如果30分鍾沒有支付,消息就刪掉
事實上,一會直接將消息刪除,而是把消息加入死信隊列,發送給其他外賣小哥。
用戶下單,調用訂單服務,然后訂單服務調用派單系統通知外賣人員送單,這時候訂單系統與派單系統采用 MQ異步通訊。
在定義業務隊列時可以考慮指定一個 死信交換機,並綁定一個死信隊列。當消息變成死信時,該消 息就會被發送到該死信隊列上,這樣方便我們查看消息失敗的原因。
DLX,全稱為Dead-Letter-Exchange,死信交換器。消息在一個隊列中變成死信(Dead Letter) 之后,被重新發送到一個特殊的交換器(DLX)中,同時,綁定DLX的隊列就稱為“死信隊列”。
以下幾種情況導致消息變為死信:
- 消息被拒絕(Basic.Reject/Basic.Nack),並且設置requeue參數為false;
- 消息過期;
- 隊列達到最大長度。
對於RabbitMQ 來說,DLX 是一個非常有用的特性。
它可以處理異常情況下,消息不能夠被費者正確消費(消費者調用了Basic.Nack 或者Basic.Reject)而被置入死信隊列中的情況,后續分析程序可以通過消費這個死信隊列中的內容來分析當時所遇到的異常情況,進而可以改善和優化系統。
RabbitMq死信隊列實戰
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@10.211.55.11:5672/%2f");
try (final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel()) {
// 正常業務的交換器
channel.exchangeDeclare("ex.biz", "direct", true);
// 聲明死信交換器 DLX
channel.exchangeDeclare("ex.dlx", "direct", true);
// 聲明隊列做死信隊列
channel.queueDeclare("queue.dlx", true, false, false, null);
// 綁定死信交換器和死信隊列
channel.queueBind("queue.dlx", "ex.dlx", "key.dlx");
Map<String, Object> arguments = new HashMap<>();
// 指定消息隊列中的消息過期時間
arguments.put("x-message-ttl", 10000);
// 指定過期消息通過死信交換器發送到死信隊列,死信交換器的名稱,DLX
arguments.put("x-dead-letter-exchange", "ex.dlx");
// 指定死信交換器的路由鍵
arguments.put("x-dead-letter-routing-key", "key.dlx");
channel.queueDeclare("queue.biz", true, false, false, arguments);
// 綁定業務的交換器和消息隊列
channel.queueBind("queue.biz", "ex.biz", "key.biz");
channel.basicPublish("ex.biz", "key.biz", null, "orderid.45789987678".getBytes());
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
死信隊列基於TTL,過期后將消息發送到死信隊列
死信隊列本身是一個普通隊列,只是在聲明業務隊列時,設置私信隊列參數,綁定死信需要發送到哪個隊列。
SpringBoot方式死信隊列實戰
todo
延遲隊列
延遲消息是指的消息發送出去后並不想立即就被消費,而是需要等(指定的)一段時間后才觸發消 費。 例如下面的業務場景:在支付寶上面買電影票,鎖定了一個座位后系統默認會幫你保留15分鍾時 間,如果15分鍾后還沒付款那么不好意思系統會自動把座位釋放掉。怎么實現類似的功能呢?
-
可以用定時任務每分鍾掃一次,發現有占座超過15分鍾還沒付款的就釋放掉。但是這樣做很 低效,很多時候做的都是些無用功;
-
可以用分布式鎖、分布式緩存的被動過期時間,15分鍾過期后鎖也釋放了,緩存key也不存在 了;
-
還可以用延遲隊列,鎖座成功后會發送1條延遲消息,這條消息15分鍾后才會被消費,消費的 過程就是檢查這個座位是否已經是“已付款”狀態;
你在公司的協同辦公系統上面預約了一個會議,邀請汪產品和陳序員今晚22點准時參加會有。系統 還比較智能,除了默認發會議邀請的郵件告知參會者以外,到了今晚21:45分的時候(提前15分鍾)就 會通知提醒參會人員做好參會准備,會議馬上開始...
同樣的,這也可以通過輪詢“會議預定表”來實現,比如我每分鍾跑一次定時任務看看當前有哪些會 議即將開始了。當然也可以通過延遲消息來實現,預定會議以后系統投遞一條延遲消息,而這條消息比 較特殊不會立馬被消費,而是延遲到指定時間后再觸發消費動作(發通知提醒參會人准備)。不過遺憾 的是,在AMQP協議和RabbitMQ中都沒有相關的規定和實現。不過,我們似乎可以借助上一小節介紹 的“死信隊列”來變相的實現。
如何通過死信隊列變相實現上面的延遲消息功能?
可以給要延遲的消息一個TTL,到期后,發送到死信隊列,客戶端從死信隊列消費消息。
但是這樣實現有個問題,由於消息隊列是個隊列,只支持FIFO。如果隊列里有4個消息順序如下,右邊的先出來:
40sTTL,10sTTL,20sTTL,30sTTL
那么30sTTL的消息出來后,才能檢查20sTTL的消息,問題是20sTTL的消息早就過期了。
即必須保證業務上的過期時間是一致的,如果想上面那種業務過期時間不一致,就不能用死信隊列。
或者把相同過期時間的放入同一個死信隊列
可以使用rabbitmq_delayed_message_exchange插件實現。
這里和TTL方式有個很大的不同就是TTL存放消息在死信隊列(delayqueue)里,二基於插件存放消息 在延時交換機里(x-delayed-message exchange)。
- 生產者將消息(msg)和路由鍵(routekey)發送指定的延時交換機(exchange)上
- 延時交換機(exchange)存儲消息等待消息到期根據路由鍵(routekey)找到綁定自己的隊列 (queue)並把消息給它
- 隊列(queue)再把消息發送給監聽它的消費者(customer)
消息緩存到延遲交換器中
-
下載插件 下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
-
安裝插件 將插件拷貝到rabbitmq-server的安裝路徑:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
-
啟用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重啟rabbitmq-server
systemctl restart rabbitmq-server
- 編寫代碼,首先是SpringBootApplication主入口類
application.properties文件
spring.application.name=delayed_exchange
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 設置手動確認消息
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
@SpringBootApplication
public class Demo19DelayedExchangeApplication {
public static void main(String[] args) {
SpringApplication.run(Demo19DelayedExchangeApplication.class, args);
}
}
RabbitMQ的對象配置
@Configuration
@EnableRabbit
@ComponentScan("com.lagou.rabbitmq.demo")
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue.delayed", true, false, false, null);
}
@Bean
public Exchange exchange() {
Map<String, Object> arguments = new HashMap<>();
// 使用x-delayed-type指定交換器的類型
arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
// 使用x-delayed-message表示使用delayed exchange插件處理消息
return new CustomExchange("ex.delayed", "x-delayed-message", true, false, arguments);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
}
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
return new RabbitAdmin(factory);
}
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
@Bean
@Autowired
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory
= new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
使用推消息模式接收延遲隊列的廣播
@Component
public class MyMeetingListener {
@RabbitListener(queues = "queue.delayed")
public void onMessage(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
// 消息確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
開發RestController,用於向延遲隊列發送消息,並指定延遲的時長
@RestController
public class DelayedController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/meeting/{second}")
public String bookMeeting(@PathVariable Integer second) throws UnsupportedEncodingException {
final MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
// 設置消息的過期時間
.setHeader("x-delay", (second - 10) * 1000)
.setContentEncoding("utf-8")
.build();
final Message message = MessageBuilder
.withBody("還有10s開始開會了".getBytes("utf-8"))
.andProperties(messageProperties)
.build();
amqpTemplate.send("ex.delayed", "key.delayed", message);
return "會議定好了";
}
}
pom.xml添加依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.lagou.rabbitmq.demo</groupId>
<artifactId>demo_19_delayed_exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo_19_delayed_exchange</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 結果:按照時長倒序發送請求,結果時間先到的先消費。