異常處理
代碼異常十之八九,十段代碼九個bug,哈哈哈哈。
平常程序異常我們使用try catch捕獲異常,在catch方法中根據異常類型進行相關處理,
既然我們可以使用try catch處理異常,那為什么還要使用ConsumerAwareErrorHandler異常處理器去處理異常呢?
首先,KafkaListener要做的事只是監聽Topic中的數據並消費,如果在KafkaListener中還需要對異常進行處理則會顯得代碼塊非常臃腫不利於維護,
我們可以把異常處理的這些代碼抽象出來,構造成一個異常處理器,KafkaListener中所拋出的異常都會經過ConsumerAwareErrorHandler異常處理器進行處理,
這樣就非常方便我們進行后期維護,比如后期更改異常處理業務的時候,
只需要修改ConsumerAwareErrorHandler處理器就行了,而不需要KafkaListener的一堆代碼中去修改代碼。這也是一種思想的體現。
單消息消費異常處理器
這里主要就是注冊一個ConsumerAwareListenerErrorHandler 類型的異常處理器,bean的注冊默認使用的是方法名,
所以我們將這個異常處理的BeanName放到@KafkaListener注解的errorHandler屬性里面。
當KafkaListener拋出異常的時候,則會自動調用異常處理器。

@Component public class ErrorListener { private static final Logger log= LoggerFactory.getLogger(ErrorListener.class); @KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler") public void errorListener(String data) { log.info("topic.quick.error receive : " + data); throw new RuntimeException("fail"); } @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); return null; } }; } }
編寫測試方法,發送一條消息到topic.quick.error中,運行測試方法后我們可以看到異常處理器已經能正常使用了。

@Autowired private KafkaTemplate kafkaTemplate; @Test public void testErrorHandler() { kafkaTemplate.send("topic.quick.error", "test error handle"); }

2018-09-14 11:42:05.099 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle 2018-09-14 11:42:05.101 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : consumerAwareErrorHandler receive : test error handle
批量消費異常處理器
批量消費代碼也是差不多的,只不過傳遞過來的數據都是List集合方式,這里就不做其他代碼的展示了。

@Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); MessageHeaders headers = message.getHeaders(); List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class); List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class); Map<TopicPartition, Long> offsetsToReset = new HashMap<>(); return null; } }; }