Spring Kafka(8)配置消息過濾器


消息過濾器

消息過濾器可以在消息抵達監聽容器前被攔截,過濾器根據系統業務邏輯去篩選出需要的數據再交由KafkaListener處理。

配置消息其實是非常簡單的額,只需要為監聽容器工廠配置一個RecordFilterStrategy(消息過濾策略),

返回true的時候消息將會被拋棄,返回false時,消息能正常抵達監聽容器。

這里我們將消息轉換為long類型,判斷該消息為基數還是偶數,把所有基數過濾,監聽容器只接收偶數。

@Component public class FilterListener { private static final Logger log= LoggerFactory.getLogger(TaskListener.class); @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory filterContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); //配合RecordFilterStrategy使用,被過濾的信息將被丟棄
        factory.setAckDiscarded(true); factory.setRecordFilterStrategy(new RecordFilterStrategy() { @Override public boolean filter(ConsumerRecord consumerRecord) { long data = Long.parseLong((String) consumerRecord.value()); log.info("filterContainerFactory filter : "+data); if (data % 2 == 0) { return false; } //返回true將會被丟棄
                return true; } }); return factory; } @KafkaListener(id = "filterCons", topics = "topic.quick.filter",containerFactory = "filterContainerFactory") public void filterListener(String data) { //這里做數據持久化的操作
        log.error("topic.quick.filter receive : " + data); } }
View Code

測試方法中,我們將當前時間戳發送到Kafka中。

 @Test public void testFilter() throws InterruptedException { kafkaTemplate.send("topic.quick.filter", System.currentTimeMillis()+""); }
View Code

額,這篇有點短,實在是不知道要講什么了,Kafka官方文檔也是一段描述,連代碼都沒有,后期想到了再補充吧。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM