异常处理
代码异常十之八九,十段代码九个bug,哈哈哈哈。
平常程序异常我们使用try catch捕获异常,在catch方法中根据异常类型进行相关处理,
既然我们可以使用try catch处理异常,那为什么还要使用ConsumerAwareErrorHandler异常处理器去处理异常呢?
首先,KafkaListener要做的事只是监听Topic中的数据并消费,如果在KafkaListener中还需要对异常进行处理则会显得代码块非常臃肿不利于维护,
我们可以把异常处理的这些代码抽象出来,构造成一个异常处理器,KafkaListener中所抛出的异常都会经过ConsumerAwareErrorHandler异常处理器进行处理,
这样就非常方便我们进行后期维护,比如后期更改异常处理业务的时候,
只需要修改ConsumerAwareErrorHandler处理器就行了,而不需要KafkaListener的一堆代码中去修改代码。这也是一种思想的体现。
单消息消费异常处理器
这里主要就是注册一个ConsumerAwareListenerErrorHandler 类型的异常处理器,bean的注册默认使用的是方法名,
所以我们将这个异常处理的BeanName放到@KafkaListener注解的errorHandler属性里面。
当KafkaListener抛出异常的时候,则会自动调用异常处理器。

@Component public class ErrorListener { private static final Logger log= LoggerFactory.getLogger(ErrorListener.class); @KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler") public void errorListener(String data) { log.info("topic.quick.error receive : " + data); throw new RuntimeException("fail"); } @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); return null; } }; } }
编写测试方法,发送一条消息到topic.quick.error中,运行测试方法后我们可以看到异常处理器已经能正常使用了。

@Autowired private KafkaTemplate kafkaTemplate; @Test public void testErrorHandler() { kafkaTemplate.send("topic.quick.error", "test error handle"); }

2018-09-14 11:42:05.099 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle 2018-09-14 11:42:05.101 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : consumerAwareErrorHandler receive : test error handle
批量消费异常处理器
批量消费代码也是差不多的,只不过传递过来的数据都是List集合方式,这里就不做其他代码的展示了。

@Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); MessageHeaders headers = message.getHeaders(); List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class); List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class); Map<TopicPartition, Long> offsetsToReset = new HashMap<>(); return null; } }; }