前言:
kafka一些常用命令:
cd /opt/module/kafka
查看kafka主題:
./kafka-topics.sh --list --zookeeper localhost:2181
查看主題詳情
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafkaTopic1
kafka消費端數據過濾方案:
在生產端不做配置,只管按特定主題生產數據。
在消費端,對特定主題數據進行分組獲取,然后在獲取過程中對符合業務條件的數據進行處理,否則跳過,但還是會告訴kafka我已經消費過了。
示例代碼:
生產端:
public String hello() { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.11.101:9092"); props.put("acks", "all"); props.put("retries", "5"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //生產者實例 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); x++; producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value1:"+x)); producer.send(new ProducerRecord<String, String>(topic, "key1", "key1->value11:"+x)); producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value2:"+x)); producer.send(new ProducerRecord<String, String>(topic, "key2", "key2->value22:"+x)); producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value3:"+x)); producer.send(new ProducerRecord<String, String>(topic, "key3", "key3->value33:"+x)); return "ok"; }
消費端:
@KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group1")})
public void aaaa(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
String processRecord = (String) record.value();
String x = (String)record.key();
if(x.equals("key1"))
{
System.out.printf("group1獨立獲取key1:"+processRecord+"\r\n");
}
acknowledgment.acknowledge();
}
@KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group2")})
public void bbbb(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
String processRecord = (String) record.value();
String x = (String)record.key();
if(x.equals("key2"))
{
System.out.printf("group2獨立獲取key2:"+processRecord+"\r\n");
}
acknowledgment.acknowledge();
}
@KafkaListeners(value = {@KafkaListener(topics = {"kafkaTopic1"},groupId = "group3")})
public void ccc(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
String processRecord = (String) record.value();
String x = (String)record.key();
if(x.equals("key3"))
{
System.out.printf("group3獨立獲取key3:"+processRecord+"\r\n");
}
acknowledgment.acknowledge();
}
其中,group1和group2在調試環境下運行,group3打包成jar后,在命令行窗口中運行,在調用生產端產生數據后,消費端的消費情況如下:

完全符合預期。
