批量消息發送模式
批量消息是指把消息放到一個集合統一進行提交,這種方案設計思路是希望消息在一個會話里,比如放到ThreadLocal里的集合,擁有相同
的會話ID,帶有這次提交信息的size等屬性,最重要的是吧這一批消息進行合並。對於channel就是發送一次消息。這種方式也是希望消費端在消
費的時候,可以進行批量化的消費,針對一個原子業務的操作進行處理,但是不保證可靠性,需要進行補償機制。
圖例:

偽代碼思路:
@Data @AllArgsConstructor @NoArgsConstructor public class BatchMessage { private long sessionId; private List<String> messageHolder = new ArrayList<>(); //用來保存message的集合 private int listSize; //集合的條數 }
把message放到ThreadLocal里面使用,這些批量的消息需要同一個sessionId,如果要入庫,只是保存sessionId對應的消息集合,而不是每條消息
步驟:
1、首先業務數據入庫
2、將批量消息對應的BatchMessage入庫,狀態為發送中
3、發送message到Broker
4、返回confirm確認
5、修改狀態為消費成功
6、。。。。后面不講了,和之前博客思路一樣
延遲消息發送模式
使用場景:
1、在電商平台買到的商品簽收后,不點擊確認支付,系統自動在一定時間進行支付操作
2、自動超時作廢的場景,你的優惠券/紅包也有使用時限,也可以用延遲消息機制
實現:
1、DLX和TTL:Consumer訂閱DLX,Message發送到原Queue,設置TTL為30分鍾。TTL到期,消息發送到DLX,然后被Consumer消費,就可以實現延遲隊列
2、rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能
安裝、啟用插件
Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 30*60*1000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
