Spring Kafka(8)配置消息过滤器


消息过滤器

消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。

配置消息其实是非常简单的额,只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),

返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

这里我们将消息转换为long类型,判断该消息为基数还是偶数,把所有基数过滤,监听容器只接收偶数。

@Component public class FilterListener { private static final Logger log= LoggerFactory.getLogger(TaskListener.class); @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory filterContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); //配合RecordFilterStrategy使用,被过滤的信息将被丢弃
        factory.setAckDiscarded(true); factory.setRecordFilterStrategy(new RecordFilterStrategy() { @Override public boolean filter(ConsumerRecord consumerRecord) { long data = Long.parseLong((String) consumerRecord.value()); log.info("filterContainerFactory filter : "+data); if (data % 2 == 0) { return false; } //返回true将会被丢弃
                return true; } }); return factory; } @KafkaListener(id = "filterCons", topics = "topic.quick.filter",containerFactory = "filterContainerFactory") public void filterListener(String data) { //这里做数据持久化的操作
        log.error("topic.quick.filter receive : " + data); } }
View Code

测试方法中,我们将当前时间戳发送到Kafka中。

 @Test public void testFilter() throws InterruptedException { kafkaTemplate.send("topic.quick.filter", System.currentTimeMillis()+""); }
View Code

额,这篇有点短,实在是不知道要讲什么了,Kafka官方文档也是一段描述,连代码都没有,后期想到了再补充吧。。。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM