Spring Kafka(9)ConsumerAwareErrorHandler异常处理器


异常处理

代码异常十之八九,十段代码九个bug,哈哈哈哈。

平常程序异常我们使用try catch捕获异常,在catch方法中根据异常类型进行相关处理,

既然我们可以使用try catch处理异常,那为什么还要使用ConsumerAwareErrorHandler异常处理器去处理异常呢?

首先,KafkaListener要做的事只是监听Topic中的数据并消费,如果在KafkaListener中还需要对异常进行处理则会显得代码块非常臃肿不利于维护,

我们可以把异常处理的这些代码抽象出来,构造成一个异常处理器,KafkaListener中所抛出的异常都会经过ConsumerAwareErrorHandler异常处理器进行处理,

这样就非常方便我们进行后期维护,比如后期更改异常处理业务的时候,

只需要修改ConsumerAwareErrorHandler处理器就行了,而不需要KafkaListener的一堆代码中去修改代码。这也是一种思想的体现。

单消息消费异常处理器

这里主要就是注册一个ConsumerAwareListenerErrorHandler 类型的异常处理器,bean的注册默认使用的是方法名,

所以我们将这个异常处理的BeanName放到@KafkaListener注解的errorHandler属性里面。

当KafkaListener抛出异常的时候,则会自动调用异常处理器。

@Component public class ErrorListener { private static final Logger log= LoggerFactory.getLogger(ErrorListener.class); @KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler") public void errorListener(String data) { log.info("topic.quick.error  receive : " + data); throw new RuntimeException("fail"); } @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); return null; } }; } }
View Code

编写测试方法,发送一条消息到topic.quick.error中,运行测试方法后我们可以看到异常处理器已经能正常使用了。

 @Autowired private KafkaTemplate kafkaTemplate; @Test public void testErrorHandler() { kafkaTemplate.send("topic.quick.error", "test error handle"); }
View Code
2018-09-14 11:42:05.099  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle 2018-09-14 11:42:05.101  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener       : consumerAwareErrorHandler receive : test error handle
View Code

批量消费异常处理器

批量消费代码也是差不多的,只不过传递过来的数据都是List集合方式,这里就不做其他代码的展示了。

 @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString()); MessageHeaders headers = message.getHeaders(); List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class); List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class); Map<TopicPartition, Long> offsetsToReset = new HashMap<>(); return null; } }; }
View Code

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM