使用kafka單筆消費時,listener函數寫成如下方式是可以正常執行的
@KafkaListener(topics = "dealmsg", id = "dealmsggroup") public String ReciveMsg(ConsumerRecord<?, ?> record, KafkaConsumer<?,?> consumer)
當改為批量消費時,listener函數寫成如下方式會報錯:A parameter of type 'List<ConsumerRecord>' must be the only parameter
@KafkaListener(topics = "dealmsg", id = "dealmsggroup") public String ReciveMsg(List<ConsumerRecord<?, ?>> records, KafkaConsumer<?,?> consumer)
報錯截圖
先說解決方案,把KafkaConsumer<?,?> consumer參數改成Consumer<?,?> consumer就可以了。
@KafkaListener(topics = "dealmsg", id = "dealmsggroup") public String ReciveMsg(List<ConsumerRecord<?, ?>> records, Consumer<?,?> consumer)
分析過程如下:
在哪報的錯,就去哪里看看代碼,找到org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:593)這一行,如下圖所示,分析代碼對於輸入參數為ConsumerRecords或List<ConsumerRecord>或List<Message<?>>,有效的參數個數不能小於輸入參數個數。輸入的參數為2,推導出的允許參數只有1,所以報錯了。
再分析整個determineInferredType函數,該函數根據輸入的參數來推斷類型,判斷輸入的每個參數的類型,是否允許的類型,即ConsumerRecord、或List<ConsumerRecord>、或Acknowledgment、或Consumer等,而KafkaConsumer不是有效類型。