異步處理特點
1. 異步處理是互聯網應用不可或缺的一種架構模式,大多數業務項目都是由同步處理、異步處理和定時任務處理三種模式相輔相成實現的。 2. 異步處理無需同步等待流程處理完畢,因此適用場景主要包括: 2.1 服務於主流程的分支流程。比如,在注冊流程中,把數據寫入數據庫的操作是主流程,但注冊后給用戶發優惠券或歡迎短信的操作是分支流程,時效性不那么強,可以進行異步處理。 2.2 用戶不需要實時看到結果的流程。比如,下單后的配貨、送貨流程完全可以進行異步處理,每個階段處理完成后,再給用戶發推送或短信讓用戶知曉即可。 3. 異步處理因為可以有 MQ 中間件的介入用於任務的緩沖的分發,所以相比於同步處理,在應對流量洪峰、實現模塊解耦和消息廣播方面有功能優勢。
異步處理需要注意的四個問題
1. 異步處理流程的可靠性問題:異步流程丟消息或處理中斷需要備線進行補償。
2. 異步處理消息重復的問題:處理邏輯時需要實現冥等防止重復處理。 3. 消息發送模式的區分問題:不同服務多個實例監聽消息情況,
-- 不同服務需要同時收到相同消息,相同服務的多個實例需要輪詢接收消息
-- 需要確認MQ消息路由配置是否滿足需求,以避免消息重復或漏發。 4. 大量死信消息堵塞隊列的問題:可能引發堵塞MQ,需要設置一定的重試策略處理死信隊列。
操作RabbitMQ引入amqp依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
異步處理需要消息補償閉環
背景
1. 使用類似 RabbitMQ、RocketMQ 等 MQ 系統來做消息隊列實現異步處理,雖然說消息可以落地到磁盤保存, 2. 即使 MQ 出現問題消息數據也不會丟失,但是異步流程在消息發送、傳輸、處理等環節,都可能發生消息丟失。 3. 此外,任何 MQ 中間件都無法確保 100% 可用,需要考慮不可用時異步流程如何繼續進行。 //對於異步處理流程,必須考慮補償或建立主備雙活流程
用戶注冊后異步發送歡迎消息案例
//調用邏輯 1. 藍色的線,使用 MQ 進行的異步處理,我們稱作主線,可能存在消息丟失的情況(虛線代表異步調用); 2. 綠色的線,使用補償 Job 定期進行消息補償,我們稱作備線,用來補償主線丟失的消息; 3. 考慮到極端的 MQ 中間件失效的情況,我們要求備線的處理吞吐能力達到主線的能力水平。
代碼實現
定義 UserController 用於注冊 + 發送異步消息。
//一次注冊10個用戶,不能發送出去概率50% @RestController @Slf4j @RequestMapping("user") public class UserController { @Autowired private UserService userService; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("register") public void register() { //模擬10個用戶注冊 IntStream.rangeClosed(1, 10).forEach(i -> { //落庫 User user = userService.register(); //模擬50%的消息可能發送失敗 if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) { //通過RabbitMQ發送消息 rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user); log.info("sent mq user {}", user.getId()); } }); } }
定義 MemberService 類用於模擬會員服務
//會員服務監聽用戶注冊成功的消息,並發送歡迎短信 //使用 ConcurrentHashMap 來存放那些發過短信的用戶 ID 實現冪等,避免相同的用戶進行補償時重復發送短信: @Component @Slf4j public class MemberService { //發送歡迎消息的狀態 private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>(); //監聽用戶注冊成功的消息,發送歡迎消息 @RabbitListener(queues = RabbitConfiguration.QUEUE) public void listen(User user) { log.info("receive mq user {}", user.getId()); welcome(user); } //發送歡迎消息 public void welcome(User user) { //去重操作 if (welcomeStatus.putIfAbsent(user.getId(), true) == null) { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { } log.info("memberService: welcome new user {}", user.getId()); } } }
對於 MQ 消費程序,處理邏輯務必考慮去重(支持冪等),原因有幾個:
1. MQ 消息可能會因為中間件本身配置錯誤、穩定性等原因出現重復。 2. 自動補償重復,比如本例,同一條消息可能既走 MQ 也走補償,肯定會出現重復,而且考慮到高內聚,補償 Job 本身不會做去重處理。 3. 人工補償重復。 3.1 出現消息堆積時,異步處理流程必然會延遲。 3.2 如果我們提供了通過后台進行補償的功能,那么在處理遇到延遲的時候,很可能會先進行人工補償,過了一段時間后處理程序又收到消息了,重復處理。 3.3 之前就遇到過一次由 MQ 故障引發的事故,MQ 中堆積了幾十萬條發放資金的消息,導致業務無法及時處理,運營以為程序出錯了就先通過后台進行了人工處理, 3.4 結果 MQ 系統恢復后消息又被重復處理了一次,造成大量資金重復發放。
定義補償 Job 也就是備線操作。
//在 CompensationJob 中定義一個 @Scheduled 定時任務,5 秒做一次補償操作, //因為 Job 並不知道哪些用戶注冊的消息可能丟失,所以是全量補償, //補償邏輯是:每 5 秒補償一次,按順序一次補償 5 個用戶,下一次補償操作從上一次補償的最后一個用戶 ID 開始; //對於補償任務我們提交到線程池進行“異步”處理,提高處理能力。 @Component @Slf4j public class CompensationJob { //補償Job異步處理線程池 private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor( 10, 10, 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get()); @Autowired private UserService userService; @Autowired private MemberService memberService; //目前補償到哪個用戶ID private long offset = 0; //10秒后開始補償,5秒補償一次 @Scheduled(initialDelay = 10_000, fixedRate = 5_000) public void compensationJob() { log.info("開始從用戶ID {} 補償", offset); //獲取從offset開始的用戶 userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> { compensationThreadPool.execute(() -> memberService.welcome(user)); offset = user.getId(); }); } }
為了實現高內聚,主線和備線處理消息,最好使用同一個方法。比如,本例中 MemberService 監聽到 MQ 消息和 CompensationJob 補償,調用的都是 welcome 方法。
生產級的代碼應該在以下幾個方面進行加強
1. 考慮配置補償的頻次、每次處理數量,以及補償線程池大小等參數為合適的值,以滿足補償的吞吐量。 2. 考慮備線補償數據進行適當延遲。比如,對注冊時間在 30 秒之前的用戶再進行補償,以方便和主線 MQ 實時流程錯開,避免沖突。 3. 諸如當前補償到哪個用戶的 offset 數據,需要落地數據庫。 4. 補償 Job 本身需要高可用,可以使用類似 XXLJob 或 ElasticJob 等任務系統。 //針對消息的補償閉環處理的最高標准是,能夠達到補償全量數據的吞吐量。 //也就是說,如果補償備線足夠完善,即使直接把 MQ 停機,雖然會略微影響處理的及時性,但至少確保流程都能正常執行。
注意消息模式是廣播還是工作隊列
背景
1. 消息廣播,和我們平時說的“廣播”意思差不多,就是希望同一條消息,不同消費者都能分別消費;
2. 而隊列模式,就是不同消費者共享消費同一個隊列的數據,相同消息只能被某一個消費者消費一次。
案例
1. 同一個用戶的注冊消息,會員服務需要監聽以發送歡迎短信,營銷服務同樣需要監聽以發送新用戶小禮物。 2. 但是,會員服務、營銷服務都可能有多個實例,我們期望的是同一個用戶的消息,可以同時廣播給不同的服務(廣播模式), 3. 但對於同一個服務的不同實例(比如會員服務 1 和會員服務 2),不管哪個實例來處理,處理一次即可(工作隊列模式):
MQ這里的區別
1. 對RocketMQ來說,實現類似功能比較簡單直白:如果消費者屬於一個組,那么消息只會由同一個組的一個消費者來消費;如果消費者屬於不同組,那么每個組都能消費一遍消息。
2. 對RabbitMQ來說,消息路由的模式采用的是隊列 + 交換器,隊列是消息的載體,交換器決定了消息路由到隊列的方式,配置比較復雜,容易出錯。
使用 RabbitMQ 實現廣播模式和工作隊列模式的坑
第一步 實現會員服務監聽用戶服務發出的新用戶注冊消息的那部分邏輯
如果我們啟動兩個會員服務,那么同一個用戶的注冊消息應該只能被其中一個實例消費。
//分別實現 RabbitMQ 隊列、交換器、綁定三件套。 //其中,隊列用的是匿名隊列,交換器用的是直接交換器 DirectExchange,交換器綁定到匿名隊列的路由 Key 是空字符串。 //在收到消息之后,我們會打印所在實例使用的端口: //為了代碼簡潔直觀,我們把消息發布者、消費者、以及MQ的配置代碼都放在了一起 @Slf4j @Configuration @RestController @RequestMapping("workqueuewrong") public class WorkQueueWrong { private static final String EXCHANGE = "newuserExchange"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } //使用匿名隊列作為消息隊列 @Bean public Queue queue() { return new AnonymousQueue(); } //聲明DirectExchange交換器,綁定隊列到交換器 @Bean public Declarables declarables() { DirectExchange exchange = new DirectExchange(EXCHANGE); return new Declarables(queue(), exchange, BindingBuilder.bind(queue()).to(exchange).with("")); } //監聽隊列,隊列名稱直接通過SpEL表達式引用Bean @RabbitListener(queues = "#{queue.name}") public void memberService(String userName) { log.info("memberService: welcome message sent to new user {} from {}", userName, System.getProperty("server.port")); } }
使用 12345 和 45678 兩個端口啟動兩個程序實例后,調用 sendMessage 接口發送一條消息,輸出的日志,顯示同一個會員服務兩個實例都收到了消息:
出現這個問題的原因是,我們沒有理清楚 RabbitMQ 直接交換器和隊列的綁定關系。
以上代碼RabbitMQ 的發消息邏輯
如下圖所示,RabbitMQ 的直接交換器根據 routingKey 對消息進行路由。
由於我們的程序每次啟動都會創建匿名(隨機命名)的隊列,所以相當於每一個會員服務實例都對應獨立的隊列,以空 routingKey 綁定到直接交換器。用戶服務發出消息的時候也設置了 routingKey 為空,所以直接交換器收到消息之后,發現有兩條隊列匹配,於是都轉發了消息:
修復問題
//對於會員服務不要使用匿名隊列,而是使用同一個隊列即可 //把上面代碼中的匿名隊列替換為一個普通隊列: private static final String QUEUE = "newuserQueue"; @Bean public Queue queue() { return new Queue(QUEUE); }
以上代碼RabbitMQ 的發消息邏輯
第二步 進一步完整實現用戶服務需要廣播消息給會員服務和營銷服務的邏輯。
//希望會員服務和營銷服務都可以收到廣播消息,但會員服務或營銷服務中的每個實例只需要收到一次消息 //聲明一個隊列和廣播交換器模擬兩個用戶服務和兩個營銷服務 @Slf4j @Configuration @RestController @RequestMapping("fanoutwrong") public class FanoutQueueWrong { private static final String QUEUE = "newuser"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } //聲明FanoutExchange,然后綁定到隊列,FanoutExchange綁定隊列的時候不需要routingKey @Bean public Declarables declarables() { Queue queue = new Queue(QUEUE); FanoutExchange exchange = new FanoutExchange(EXCHANGE); return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange)); } //會員服務實例1 @RabbitListener(queues = QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } //會員服務實例2 @RabbitListener(queues = QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } //營銷服務實例1 @RabbitListener(queues = QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } //營銷服務實例2 @RabbitListener(queues = QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
調用sendMessage接口發現,一條用戶注冊的消息,要么被會員服務收到,要么被營銷服務收到,顯然這不是廣播。
使FanoutExchange,實現廣播的交換器生效
廣播交換器非常簡單,它會忽略 routingKey,廣播消息到所有綁定的隊列。
在這個案例中,兩個會員服務和兩個營銷服務都綁定了同一個隊列,所以這四個服務只能收到一次消息:
//拆分隊列,會員和營銷兩組服務分別使用一條獨立隊列綁定到廣播交換器即可 @Slf4j @Configuration @RestController @RequestMapping("fanoutright") public class FanoutQueueRight { private static final String MEMBER_QUEUE = "newusermember"; private static final String PROMOTION_QUEUE = "newuserpromotion"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } @Bean public Declarables declarables() { //會員服務隊列 Queue memberQueue = new Queue(MEMBER_QUEUE); //營銷服務隊列 Queue promotionQueue = new Queue(PROMOTION_QUEUE); //廣播交換器 FanoutExchange exchange = new FanoutExchange(EXCHANGE); //兩個隊列綁定到同一個交換器 return new Declarables(memberQueue, promotionQueue, exchange, BindingBuilder.bind(memberQueue).to(exchange), BindingBuilder.bind(promotionQueue).to(exchange)); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
修改后交換器和隊列的結構:
調用接口驗證,對於每一條 MQ 消息,會員服務和營銷服務分別都會收到一次,一條消息廣播到兩個服務的同時,在每一個服務的兩個實例中通過輪詢接收。
總結
1. 需要理解RabbitMQ 直接交換器、廣播交換器的工作方式。 2. 對於異步流程來說,消息路由模式一旦配置出錯,輕則可能導致消息的重復處理,重則可能導致重要的服務無法接收到消息,最終造成業務邏輯錯誤。 3. 每個 MQ 中間件對消息的路由處理的配置各不相同,需要了解原理再編碼。
注意死信堵塞了消息隊列
消息處理失敗,重新進入隊列,繼續處理失敗,這樣循環導致堵塞,就是死信;最終MQ可能因為數據量過大而奔潰。
模擬死信場景
//定義一個隊列、一個直接交換器,然后把隊列綁定到交換器: @Bean public Declarables declarables() { //隊列 Queue queue = new Queue(Consts.QUEUE); //交換器 DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE); //快速聲明一組對象,包含隊列、交換器,以及隊列到交換器的綁定 return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY)); } //實現一個 sendMessage 方法來發送消息到 MQ,訪問一次提交一條消息,使用自增標識作為消息內容: //自增消息標識 AtomicLong atomicLong = new AtomicLong(); @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMessage") public void sendMessage() { String msg = "msg" + atomicLong.incrementAndGet(); log.info("send message {}", msg); //發送消息 rabbitTemplate.convertAndSend(Consts.EXCHANGE, msg); } //收到消息后,直接拋出空指針異常,模擬處理出錯的情況: @RabbitListener(queues = Consts.QUEUE) public void handler(String data) { log.info("got message {}", data); throw new NullPointerException("error"); } //調用 sendMessage 接口發送兩條消息,然后來到 RabbitMQ 管理台,可以看到這兩條消息始終在隊列中,不斷被重新投遞,導致重新投遞 QPS 達到很高;
解決死信
//在處理程序出錯的時候,直接拋出AmqpRejectAndDontRequeueException 異常,避免消息重新進入隊列: throw new AmqpRejectAndDontRequeueException("error"); //完整的邏輯 1. 對於同一條消息,先進行幾次重試解決因網絡問題導致的偶發消息處理失敗; 2. 還是不行,再把消息投遞到專門的一個死信隊列; 3. 對於死信隊列的數據,可以只做記錄日志發送報警。
//Spring AMQP 提供了非常方便的解決方案: 1. 定義處理死信消息的交換器和死信隊列。 2. 通過RetryInterceptorBuilder 構建一個 RetryOperationsInterceptor,用於處理失敗時候的重試。 3. 設置策略如:最多嘗試5次(重試4次);並且采取指數退避重試, 4. 首次重試延遲1秒,第二次2秒,以此類推,最大延遲10秒;如果4次重試還是失敗,則使用RepublishMessageRecoverer把消息重新投入一個“死信交換器”中; 5. 最后,定義死信隊列的處理程序。如記錄日志告警提示。 //定義死信交換器和隊列,並且進行綁定 @Bean public Declarables declarablesForDead() { Queue queue = new Queue(Consts.DEAD_QUEUE); DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE); return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY)); } //定義重試操作攔截器 @Bean public RetryOperationsInterceptor interceptor() { return RetryInterceptorBuilder.stateless() .maxAttempts(5) //最多嘗試(不是重試)5次 .backOffOptions(1000, 2.0, 10000) //指數退避重試 .recoverer(new RepublishMessageRecoverer(rabbitTemplate, Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY)) //重新投遞重試達到上限的消息 .build(); } //通過定義SimpleRabbitListenerContainerFactory,設置其adviceChain屬性為之前定義的RetryOperationsInterceptor來啟用重試攔截器 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAdviceChain(interceptor()); return factory; } //死信隊列處理程序 @RabbitListener(queues = Consts.DEAD_QUEUE) public void deadHandler(String data) { log.error("got dead message {}", data); }
默認情況下 SimpleMessageListenerContainer 只有一個消費線程。可以通過增加消費線程來避免性能問題,如下我們直接設置 concurrentConsumers 參數為 10,來增加到 10 個工作線程:
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAdviceChain(interceptor()); factory.setConcurrentConsumers(10); return factory; } //也可以設置maxConcurrentConsumers參數,讓SimpleMessageListenerContainer自己冬天地調整消費者線程數。 //動態開啟新線程
進一步完整實現用戶服務需要廣播消息給會員服務和營銷服務的邏輯。
原文鏈接:https://time.geekbang.org/column/article/234928