Spring Kafka(9)ConsumerAwareErrorHandler異常處理器


異常處理

代碼異常十之八九,十段代碼九個bug,哈哈哈哈。

平常程序異常我們使用try catch捕獲異常,在catch方法中根據異常類型進行相關處理,

既然我們可以使用try catch處理異常,那為什么還要使用ConsumerAwareErrorHandler異常處理器去處理異常呢?

首先,KafkaListener要做的事只是監聽Topic中的數據並消費,如果在KafkaListener中還需要對異常進行處理則會顯得代碼塊非常臃腫不利於維護,

我們可以把異常處理的這些代碼抽象出來,構造成一個異常處理器,KafkaListener中所拋出的異常都會經過ConsumerAwareErrorHandler異常處理器進行處理,

這樣就非常方便我們進行后期維護,比如后期更改異常處理業務的時候,

只需要修改ConsumerAwareErrorHandler處理器就行了,而不需要KafkaListener的一堆代碼中去修改代碼。這也是一種思想的體現。

單消息消費異常處理器

這里主要就是注冊一個ConsumerAwareListenerErrorHandler 類型的異常處理器,bean的注冊默認使用的是方法名,

所以我們將這個異常處理的BeanName放到@KafkaListener注解的errorHandler屬性里面。

當KafkaListener拋出異常的時候,則會自動調用異常處理器。

@Component public class ErrorListener { private static final Logger log= LoggerFactory.getLogger(ErrorListener.class); @KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler") public void errorListener(String data) { log.info("topic.quick.error  receive : " + data); throw new RuntimeException("fail"); } @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); return null; } }; } }
View Code

編寫測試方法,發送一條消息到topic.quick.error中,運行測試方法后我們可以看到異常處理器已經能正常使用了。

 @Autowired private KafkaTemplate kafkaTemplate; @Test public void testErrorHandler() { kafkaTemplate.send("topic.quick.error", "test error handle"); }
View Code
2018-09-14 11:42:05.099  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle 2018-09-14 11:42:05.101  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener       : consumerAwareErrorHandler receive : test error handle
View Code

批量消費異常處理器

批量消費代碼也是差不多的,只不過傳遞過來的數據都是List集合方式,這里就不做其他代碼的展示了。

 @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); MessageHeaders headers = message.getHeaders(); List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class); List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class); Map<TopicPartition, Long> offsetsToReset = new HashMap<>(); return null; } }; }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM