Spring Cloud Stream監控
Spring Boot Actuator組件用於暴露監控端點,很多監控工具都需要依賴該組件的監控端點實現監控。而項目集成了Stream及Actuator后也會暴露相應的監控端點.
首先需要在項目里集成Actuator,添加依賴如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
添加配置,暴露所有監控端點,並顯示健康檢測詳情
management:
endpoint:
health:
# 顯示健康檢測詳情
show-details: always
endpoints:
web:
exposure:
# 暴露所有監控端點
include: '*'
訪問http://localhost:端口號/actuator可以獲取所有暴露出來的監控端點,Stream的相關監控端點也在其中
/actuator/bindings端點可以用於查看bindings相關信息:
/actuator/channels端點用於查看channels的相關信息,“input”和“output”就是channel,可以認為這些channel是topic的抽象:
/actuator/health端點中可以查看binder及RocketMQ的狀態,主要是用於查看MQ的連接情況,如果連接不上其status則為DOWN:
Spring Cloud Stream異常處理
局部處理
配置文件
spring:
cloud:
stream:
bindings:
input:
destination: test-destination
group: test-group
output:
destination: test-destination
代碼實現
@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableScheduling
public class Study01Application {
public static void main(String[] args) {
SpringApplication.run(Study01Application.class, args);
}
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("運行時錯誤");
}
@ServiceActivator(inputChannel = "test-destination.test-group.errors")
public void handleError(ErrorMessage message) {
Throwable throwable = message.getPayload();
log.error("截獲異常", throwable);
Message<?> originalMessage = message.getOriginalMessage();
assert originalMessage != null;
log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload()));
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
return () -> new GenericMessage<>("qwer");
}
}
全局處理
代碼實現
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("運行時錯誤");
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
ErrorMessage errorMessage = (ErrorMessage) message;
log.warn("Handling ERROR = {} " + errorMessage);
}