Spring Cloud Alibaba學習筆記(13) - Spring Cloud Stream的監控與異常處理


Spring Cloud Stream監控

Spring Boot Actuator組件用於暴露監控端點,很多監控工具都需要依賴該組件的監控端點實現監控。而項目集成了Stream及Actuator后也會暴露相應的監控端點.

首先需要在項目里集成Actuator,添加依賴如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

添加配置,暴露所有監控端點,並顯示健康檢測詳情

management:
  endpoint:
    health:
      # 顯示健康檢測詳情
      show-details: always
  endpoints:
    web:
      exposure:
        # 暴露所有監控端點
        include: '*'

訪問http://localhost:端口號/actuator可以獲取所有暴露出來的監控端點,Stream的相關監控端點也在其中

/actuator/bindings端點可以用於查看bindings相關信息:

/actuator/channels端點用於查看channels的相關信息,“input”和“output”就是channel,可以認為這些channel是topic的抽象:

/actuator/health端點中可以查看binder及RocketMQ的狀態,主要是用於查看MQ的連接情況,如果連接不上其status則為DOWN:

Spring Cloud Stream異常處理

局部處理

配置文件

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-destination
          group: test-group
        output:
          destination: test-destination

代碼實現

@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableScheduling
public class Study01Application {
    public static void main(String[] args) {
        SpringApplication.run(Study01Application.class, args);
    }

    @StreamListener(value = Processor.INPUT)
    public void handle(String body) {
        throw new RuntimeException("運行時錯誤");
    }

    @ServiceActivator(inputChannel = "test-destination.test-group.errors")
    public void handleError(ErrorMessage message) {
        Throwable throwable = message.getPayload();
        log.error("截獲異常", throwable);

        Message<?> originalMessage = message.getOriginalMessage();
        assert originalMessage != null;

        log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload()));
    }

    @Bean
    @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> new GenericMessage<>("qwer");
    }
}

全局處理

代碼實現

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException("運行時錯誤");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    ErrorMessage errorMessage = (ErrorMessage) message;
    log.warn("Handling ERROR = {} " + errorMessage);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM