Spring Cloud Stream消費失敗后的處理策略(二):自定義錯誤處理邏輯


應用場景

上一篇《Spring Cloud Stream消費失敗后的處理策略(一):自動重試》介紹了默認就會生效的消息重試功能。對於一些因環境原因、網絡抖動等不穩定因素引發的問題可以起到比較好的作用。但是對於諸如代碼本身存在的邏輯錯誤等,無論重試多少次都不可能成功的問題,是無法修復的。對於這樣的情況,前文中說了可以利用日志記錄消息內容,配合告警來做補救,但是很顯然,這樣做非常原始,並且太過笨拙,處理復雜度過高。所以,我們需要需求更好的辦法,本文將介紹針對該類問題的一種處理方法:自定義錯誤處理邏輯。

動手試試

准備一個會消費失敗的例子,可以直接沿用前文的工程,也可以新建一個,然后創建如下代碼的邏輯:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生產接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消費邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

內容很簡單,既包含了消息的生產,也包含了消息消費。消息消費的時候主動拋出了一個異常來模擬消息的消費失敗。

在啟動應用之前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名)、並設置一下分組,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,啟動應用並訪問localhost:8080/sendMessage?message=hello接口來發送一個消息到MQ中了,此時可以看到消費失敗后拋出了異常,跟上一篇文章的結果一樣,消息消費失敗,記錄了日志,消息信息丟棄。

下面,針對消息消費失敗,在TestListener中針對消息消費邏輯創建一段錯誤處理邏輯,比如:

@Slf4j
@Component
static class TestListener {

    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received payload : " + payload);
        throw new RuntimeException("Message consumer failed!");
    }

    /**
     * 消息消費失敗的降級處理邏輯
     *
     * @param message
     */
    @ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
    public void error(Message<?> message) {
        log.info("Message consumer failed, call fallback!");
    }

}

通過使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某個通道的錯誤處理映射。其中,inputChannel的配置中對應關系如下:

  • test-topic:消息通道對應的目標(destination,即:spring.cloud.stream.bindings.example-topic-input.destination的配置)
  • stream-exception-handler:消息通道對應的消費組(group,即:spring.cloud.stream.bindings.example-topic-input.group的配置)

再啟動應用並訪問localhost:8080/sendMessage?message=hello接口來發送一個消息到MQ中,此時可以看到日志如下:

2018-12-11 12:00:35.500  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Received: hello,
2018-12-11 12:00:38.541  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Message consumer failed, call fallback!

雖然消費邏輯中輸出了消息內容之后拋出了異常,但是會進入到error函數中,執行錯誤處理邏輯(這里只是答應了一句話),用戶可以根據需要讀取消息內容以及異常詳情做更進一步的細化處理。

深入思考

由於error邏輯是通過編碼方式來實現的,所以這段邏輯相對來說比較死。通常,只有業務上有明確的錯誤處理邏輯的時候,這種方法才可以比較好的被應用到。不然能做的可能也只是將消息記錄下來,然后具體的分析原因后再去做補救措施。所以這種方法也不是萬能的,主要適用於有明確錯誤處理方案的方式來使用(這種場景並不多),另外。。。

注意:有坑! 這個方案在目前版本(2.0.x)其實還有一個坑,這種方式並不能很好的處理異常消息,會有部分消息得不到正確的處理,由於應用場景也不多,所以目前不推薦使用這種方法來做(完全可以用原始的異常捕獲機制來處理,只是沒有這種方式那么優雅)。目前看官方issue是在Spring Cloud Stream的2.1.0版本中會修復,后續發布之后可以使用該功能,具體點擊查看:Issue #1357

而對於沒有特定的錯誤處理方案的,也只能通過記錄和后續處理來解決,可能這樣的方式也只是比從日志中抓去簡單那么一些,並沒有得到很大的提升。但是,不要緊,因為下一篇我們將繼續介紹其他更好的處理方案。

代碼示例

本文示例讀者可以通過查看下面倉庫的中的stream-exception-handler-2項目:

如果您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!

以下專題教程也許您會有興趣

本文首發:http://blog.didispace.com/spring-cloud-starter-finchley-7-3/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM