Kafka的消費者提交方式手動同步提交、和異步提交


1、Kafka的消費者提交方式

  1)、自動提交,這種方式讓消費者來管理位移,應用本身不需要顯式操作。當我們將enable.auto.commit設置為true,那么消費者會在poll方法調用后每隔五秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一樣,自動提交也是由poll方法來驅動的,在調用poll方法的時候,消費者判斷是否到達提交時間,如果是則提交上一次poll返回的最大位移。需要注意的是,這種方式可能會導致消息重復消費,假如,某個消費者poll消息后,應用正在處理消息,在3秒后kafka進行了重平衡,那么由於沒有更新位移導致重平衡后這部分消息重復消費。

  2)、同步提交。

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Arrays;
 5 import java.util.Collections;
 6 import java.util.List;
 7 import java.util.Properties;
 8 import java.util.regex.Pattern;
 9 
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.consumer.KafkaConsumer;
14 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 
19 public class KafkaConsumerSimple {
20 
21     // 設置服務器地址
22     private static final String bootstrapServer = "192.168.110.142:9092";
23 
24     // 設置主題
25     private static final String topic = "topic-demo";
26 
27     // 設置主題
28     private static final String topic2 = "topic-demo2";
29 
30     // 設置消費者組
31     private static final String groupId = "group.demo";
32 
33     public static void main(String[] args) {
34         Properties properties = new Properties();
35         // 設置反序列化key參數信息
36         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37         // 設置反序列化value參數信息
38         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
39 
40         // 設置服務器列表信息,必填參數,該參數和生產者相同,,制定鏈接kafka集群所需的broker地址清單,可以設置一個或者多個
41         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
42 
43         // 設置消費者組信息,消費者隸屬的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱
44         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
45 
46         // 制定kafka消費者對應的客戶端id,默認為空,如果不設置kafka消費者會自動生成一個非空字符串。
47         properties.put("client.id", "consumer.client.id.demo");
48 
49         // 設置每次從最早的offset開始消費
50         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
51 
52         // 手動提交開啟
53         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
54 
55         // 將參數設置到消費者參數中
56         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
57 
58         // 消息訂閱
59         // consumer.subscribe(Collections.singletonList(topic));
60         // 可以訂閱多個主題
61         // consumer.subscribe(Arrays.asList(topic, topic2));
62         // 可以使用正則表達式進行訂閱
63         // consumer.subscribe(Pattern.compile("topic-demo*"));
64 
65         // 指定訂閱的分區
66         TopicPartition topicPartition = new TopicPartition(topic, 0);
67         consumer.assign(Arrays.asList(topicPartition));
68 
69         // 初始化offset位移為-1
70         long lastConsumeOffset = -1;
71         while (true) {
72             // 每隔一秒監聽一次,拉去指定主題分區的消息
73             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
74             if (records.isEmpty()) {
75                 break;
76             }
77             // 獲取到消息
78             List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
79             // 獲取到消息的offset位移信息,最后消費的位移
80             lastConsumeOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
81             // System.out.println("the last offset is " + lastConsumeOffset);
82             // 同步提交消費位移
83             consumer.commitSync();
84         }
85         // 當前消費者最后一個消費的位置
86         System.out.println("consumed offset is " + lastConsumeOffset);
87         // 提交,下次消費從哪個位置開始
88         OffsetAndMetadata committed = consumer.committed(topicPartition);
89         System.out.println("committed offset is " + committed.offset());
90         // 下次消費從哪個位置開始
91         long position = consumer.position(topicPartition);
92         System.out.println("the offset of the next record is " + position);
93 
94     }
95 
96 }

  3)、異步提交方式。手動提交有一個缺點,就是當發起提交時調用應用會阻塞。當然我們可以減少手動提交的頻率,但這個會增加消息重復的概率(和自動提交一樣)。另外一個解決方法是,使用異步提交。但是異步提交也有一個缺點,那就是如果服務器返回提交失敗,異步提交不會進行重試。相比較起來,同步提交會進行重試知道成功或者最后拋出異常給應用。異步提交沒有實現重試是因為,如果同時存在多個異步提交,進行重試可能會導致位移覆蓋。比如,我們發起一個異步提交commitA,此時提交位移是2000,隨后又發起了一個異步提交commitB且位移為3000,commitA提交失敗但commitB提交失敗,此時commitA進行重試並成功的話,會將實際上已經提交的位移從3000回滾到2000,導致消息重復消費。

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Arrays;
 5 import java.util.Map;
 6 import java.util.Properties;
 7 import java.util.concurrent.atomic.AtomicBoolean;
 8 
 9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
14 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 
18 public class KafkaConsumerAsyncSimple {
19 
20     private static AtomicBoolean running = new AtomicBoolean(true);
21 
22     // 設置服務器地址
23     private static final String bootstrapServer = "192.168.110.142:9092";
24 
25     // 設置主題
26     private static final String topic = "topic-demo";
27 
28     // 設置消費者組
29     private static final String groupId = "group.demo";
30 
31     public static void main(String[] args) {
32         Properties properties = new Properties();
33         // 設置反序列化key參數信息
34         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35         // 設置反序列化value參數信息
36         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37 
38         // 設置服務器列表信息,必填參數,該參數和生產者相同,,制定鏈接kafka集群所需的broker地址清單,可以設置一個或者多個
39         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40 
41         // 設置消費者組信息,消費者隸屬的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱
42         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
43 
44         // 制定kafka消費者對應的客戶端id,默認為空,如果不設置kafka消費者會自動生成一個非空字符串。
45         properties.put("client.id", "consumer.client.id.demo");
46 
47         // 設置每次從最早的offset開始消費
48         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
49 
50         // 將參數設置到消費者參數中
51         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
52         // 訂閱主題
53         consumer.subscribe(Arrays.asList(topic));
54 
55         try {
56             while (running.get()) {
57                 // 每隔一秒監聽一次,拉去指定主題分區的消息
58                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
59                 if (records.isEmpty()) {
60                     break;
61                 }
62                 for (ConsumerRecord<String, String> record : records) {
63                     System.out.println("我要開始消費了: " + record.toString());
64                 }
65 
66                 // 異步回調,適合消息量非常大,但是允許消息重復的
67                 consumer.commitAsync(new OffsetCommitCallback() {
68 
69                     @Override
70                     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
71                         if (exception == null) {
72                             System.out.println("異步回調成功了,offset : " + offsets);
73                         } else {
74                             System.err.println("fail to commit offsets " + offsets + " , " + exception);
75                         }
76 
77                     }
78                 });
79 
80             }
81         } finally {
82             // 關閉客戶端
83             consumer.close();
84         }
85 
86     }
87 
88 }

 

2、指定位移消費,seek方法提供了這個功能,可以追蹤之前的消費或者回溯消費。

  1 package com.demo.kafka.consumer;
  2 
  3 import java.time.Duration;
  4 import java.util.Arrays;
  5 import java.util.Map;
  6 import java.util.Properties;
  7 import java.util.Set;
  8 import java.util.concurrent.atomic.AtomicBoolean;
  9 
 10 import org.apache.kafka.clients.consumer.ConsumerConfig;
 11 import org.apache.kafka.clients.consumer.ConsumerRecord;
 12 import org.apache.kafka.clients.consumer.ConsumerRecords;
 13 import org.apache.kafka.clients.consumer.KafkaConsumer;
 14 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 15 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 16 import org.apache.kafka.common.TopicPartition;
 17 import org.apache.kafka.common.serialization.StringDeserializer;
 18 
 19 public class KafkaConsumerSeekSimple {
 20 
 21     private static AtomicBoolean running = new AtomicBoolean(true);
 22 
 23     // 設置服務器地址
 24     private static final String bootstrapServer = "192.168.110.142:9092";
 25 
 26     // 設置主題
 27     private static final String topic = "topic-demo3";
 28 
 29     // 設置消費者組
 30     private static final String groupId = "group.demo";
 31 
 32     public static void main(String[] args) {
 33         Properties properties = new Properties();
 34         // 設置反序列化key參數信息
 35         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 36         // 設置反序列化value參數信息
 37         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 38 
 39         // 設置服務器列表信息,必填參數,該參數和生產者相同,,制定鏈接kafka集群所需的broker地址清單,可以設置一個或者多個
 40         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
 41 
 42         // 設置消費者組信息,消費者隸屬的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱
 43         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 44 
 45         // 制定kafka消費者對應的客戶端id,默認為空,如果不設置kafka消費者會自動生成一個非空字符串。
 46         properties.put("client.id", "consumer.client.id.demo");
 47 
 48         // 設置每次從最早的offset開始消費
 49         // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 50 
 51         // 將參數設置到消費者參數中
 52         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
 53         // 訂閱主題
 54         consumer.subscribe(Arrays.asList(topic));
 55 
 56         // 獲取消費者所分配到的分區
 57         Set<TopicPartition> assignment = consumer.assignment();
 58         System.err.println("打印消費者獲取到的分區: " + assignment.toString());
 59 
 60         // timeout參數設置多少合適?太短會使分區分配失敗,太長有可能造成一些不必要的等待
 61         // 獲取到指定主題的消息
 62         consumer.poll(Duration.ofMillis(2000));
 63 
 64 //        for (TopicPartition topicPartition : assignment) {
 65 //            // 參數partition表示分區,offset表示指定從分區的那個位置開始消費
 66 //            // 方式一,可以指定位置進行消費
 67 //            consumer.seek(topicPartition, 3);
 68 //        }
 69 
 70         // 指定從分區末尾開始消費,方式二,可以從末端開始倒敘消費
 71         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
 72         for (TopicPartition topicPartition : assignment) {
 73             System.err.println("打印消費者獲取到offset : " + ( endOffsets.get(topicPartition) + 1 ));
 74             consumer.seek(topicPartition, endOffsets.get(topicPartition) + 1);
 75         }
 76 
 77         try {
 78             while (running.get()) {
 79                 // 每隔一秒監聽一次,拉去指定主題分區的消息
 80                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
 81                 if (records.isEmpty()) {
 82                     break;
 83                 }
 84                 for (ConsumerRecord<String, String> record : records) {
 85                     System.out.println("我要開始消費了: " + record.toString());
 86                 }
 87 
 88                 // 異步回調,適合消息量非常大,但是允許消息重復的
 89                 consumer.commitAsync(new OffsetCommitCallback() {
 90 
 91                     @Override
 92                     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
 93                         if (exception == null) {
 94                             System.out.println("異步回調成功了,offset : " + offsets);
 95                         } else {
 96                             System.err.println("fail to commit offsets " + offsets + " , " + exception);
 97                         }
 98 
 99                     }
100                 });
101 
102             }
103         } finally {
104             // 關閉客戶端
105             consumer.close();
106         }
107 
108     }
109 }

 

3、Kafka再均衡監聽器,再均衡是指分區的所屬從一個消費者轉移到另外一個消費者的行為,它為消費組具備了高可用性和伸縮性提供了保障,使得我們既方便又安全的刪除消費組內的消費者或者往消費組內添加消費者。不過再均衡期間,消費者是無法拉取消息的。

  1 package com.demo.kafka.consumer;
  2 
  3 import java.time.Duration;
  4 import java.util.Collection;
  5 import java.util.Collections;
  6 import java.util.HashMap;
  7 import java.util.Map;
  8 import java.util.Properties;
  9 
 10 import org.apache.kafka.clients.consumer.ConsumerConfig;
 11 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 12 import org.apache.kafka.clients.consumer.ConsumerRecord;
 13 import org.apache.kafka.clients.consumer.ConsumerRecords;
 14 import org.apache.kafka.clients.consumer.KafkaConsumer;
 15 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 16 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 17 import org.apache.kafka.common.TopicPartition;
 18 import org.apache.kafka.common.serialization.StringDeserializer;
 19 
 20 public class KafkaConsumerListenerSimple {
 21 
 22     // 設置服務器地址
 23     private static final String bootstrapServer = "192.168.110.142:9092";
 24 
 25     // 設置主題
 26     private static final String topic = "topic-demo";
 27 
 28     // 設置消費者組
 29     private static final String groupId = "group.demo";
 30 
 31     public static void main(String[] args) {
 32         Properties properties = new Properties();
 33         // 設置反序列化key參數信息
 34         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 35         // 設置反序列化value參數信息
 36         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 37 
 38         // 設置服務器列表信息,必填參數,該參數和生產者相同,,制定鏈接kafka集群所需的broker地址清單,可以設置一個或者多個
 39         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
 40 
 41         // 設置消費者組信息,消費者隸屬的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱
 42         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 43 
 44         // 制定kafka消費者對應的客戶端id,默認為空,如果不設置kafka消費者會自動生成一個非空字符串。
 45         properties.put("client.id", "consumer.client.id.demo");
 46 
 47         // 設置每次從最早的offset開始消費
 48         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 49 
 50         // 手動提交開啟
 51         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 52 
 53         // 將參數設置到消費者參數中
 54         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
 55 
 56         // 消息訂閱
 57         // consumer.subscribe(Collections.singletonList(topic));
 58 
 59         // 如果發生消息重復消費或者消息丟失的情況,當一個分區的消費者發生變更的時候,kafka會出現再均衡
 60         // kafka提供了再均衡監聽器,可以處理自己的行為,發生再均衡期間,消費者無法拉取消息的。
 61         Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
 62         consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
 63 
 64             //
 65             @Override
 66             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 67                 // 盡量避免重復消費
 68                 consumer.commitSync(currentOffsets);// 同步位移的提交
 69             }
 70 
 71             //
 72             @Override
 73             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 74 
 75             }
 76 
 77         });
 78 
 79         while (true) {
 80             // 每隔一秒監聽一次,拉去指定主題分區的消息
 81             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
 82             if (records.isEmpty()) {
 83                 break;
 84             }
 85             for (ConsumerRecord<String, String> record : records) {
 86                 System.out.println(record.toString());
 87 
 88                 // 異步提交消息位移,在發生再均衡動作之前通過再均衡監聽器的onPartitionsRevoked回調執行commitSync方法同步提交位移
 89                 currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
 90                         new OffsetAndMetadata(record.offset() + 1));
 91             }
 92             // 消費者的消費異步提交很有可能出現消息丟失的情況,所以在拉取完消息之后可以將消息的offset位移進行記錄
 93             consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
 94 
 95                 @Override
 96                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
 97                     if (exception == null) {
 98                         System.out.println("異步回調成功了,offset : " + offsets);
 99                     } else {
100                         System.err.println("fail to commit offsets " + offsets + " , " + exception);
101                     }
102                 }
103             });
104         }
105 
106         // 關閉客戶端
107         consumer.close();
108 
109     }
110 
111 }

 

4、Kafka消費者攔截器,消費者攔截器主要是在消息到消息或者在提交消息位移的時候進行一些定制化的操作。使用場景,對消費消息設置一個有效期的屬性,如果某條消息在既定的時間窗口內無法到達,那就視為無效,不需要再被處理。

 1 package com.demo.kafka.interceptor;
 2 
 3 import java.util.ArrayList;
 4 import java.util.HashMap;
 5 import java.util.List;
 6 import java.util.Map;
 7 
 8 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 9 import org.apache.kafka.clients.consumer.ConsumerRecord;
10 import org.apache.kafka.clients.consumer.ConsumerRecords;
11 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
12 import org.apache.kafka.common.TopicPartition;
13 
14 /**
15  * 
16  * @author 消費者攔截器
17  *
18  */
19 public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
20 
21     // 十秒鍾
22     private static final long EXPIRE_INTERVAL = 10 * 1000; // 10000
23 
24     @Override
25     public void configure(Map<String, ?> configs) {
26 
27     }
28 
29     @Override
30     public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
31         // 打印輸出消息
32         for (ConsumerRecord<String, String> record : records) {
33             System.out.println("==============================" + record.toString() + "==============================");
34         }
35 
36         // 獲取到當前時間
37         long now = System.currentTimeMillis();
38         // 創建一個map集合對象
39         Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>();
40         // 循環遍歷出消費者的消息分區
41         for (TopicPartition tp : records.partitions()) {
42             System.out.println(
43                     "==============獲取到的分區================" + tp.partition() + "==============================");
44             // 獲取到分區里面的消息
45             List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
46             // 創建一個集合對象newTpRecords
47             List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
48             // 循環遍歷消息
49             for (ConsumerRecord<String, String> record : tpRecords) {
50                 // 如果消息的時間戳大於當前時間超過10秒,就放到集合中
51                 if (now - record.timestamp() > EXPIRE_INTERVAL) {
52                     // 放到集合中
53                     newTpRecords.add(record);
54                 }
55             }
56             // 判斷是否為空
57             if (!newTpRecords.isEmpty()) {
58                 // 將分區和新的消息放到map集合中
59                 newRecords.put(tp, newTpRecords);
60             }
61         }
62 
63         for (Map.Entry<TopicPartition, List<ConsumerRecord<String, String>>> map : newRecords.entrySet()) {
64             for (int i = 0; i < map.getValue().size(); i++) {
65                 List<ConsumerRecord<String, String>> value = map.getValue();
66                 ConsumerRecord<String, String> consumerRecord = value.get(i);
67                 System.out.println("==============================" + consumerRecord.toString()
68                         + "==============================");
69             }
70         }
71 
72         return new ConsumerRecords<String, String>(newRecords);
73     }
74 
75     @Override
76     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
77         offsets.forEach((tp, offset) -> System.out.println("獲取到的offset位移: " + tp + " : " + offset.offset()));
78     }
79 
80     @Override
81     public void close() {
82 
83     }
84 
85     public static void main(String[] args) {
86         Map<String, String> map = new HashMap<>();
87         map.put("zhangsan", "hello world zhangsan!!!");
88         map.put("lisi", "hello world lisi!!!");
89         map.put("wangwu", "hello world wangwu!!!");
90         map.put("zhaoliu", "hello world zhaoliu!!!");
91 
92         map.forEach((key, value) -> System.out.println("key : " + key + " , value : " + value));
93     }
94 
95 }

消費者配置監聽,如下所示:

 1 package com.demo.kafka.consumer;
 2 
 3 import java.time.Duration;
 4 import java.util.Collections;
 5 import java.util.Properties;
 6 
 7 import org.apache.kafka.clients.consumer.ConsumerConfig;
 8 import org.apache.kafka.clients.consumer.ConsumerRecord;
 9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.common.serialization.StringDeserializer;
12 
13 import com.demo.kafka.interceptor.ConsumerInterceptorTTL;
14 
15 public class KafkaConsumerInterceptorSimple {
16 
17     // 設置服務器地址
18     private static final String bootstrapServer = "192.168.110.142:9092";
19 
20     // 設置主題
21     private static final String topic = "topic-demo3";
22 
23     // 設置消費者組
24     private static final String groupId = "group.demo";
25 
26     public static void main(String[] args) {
27         Properties properties = new Properties();
28         // 設置反序列化key參數信息
29         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
30         // 設置反序列化value參數信息
31         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
32 
33         // 設置服務器列表信息,必填參數,該參數和生產者相同,,制定鏈接kafka集群所需的broker地址清單,可以設置一個或者多個
34         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
35 
36         // 設置消費者組信息,消費者隸屬的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱
37         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
38 
39         // 制定kafka消費者對應的客戶端id,默認為空,如果不設置kafka消費者會自動生成一個非空字符串。
40         properties.put("client.id", "consumer.client.id.demo");
41 
42         // 設置每次從最早的offset開始消費
43         // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
44 
45         // 手動提交開啟
46         // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
47 
48         // 指定消費者攔截器
49         properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());
50         
51         // 將參數設置到消費者參數中
52         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
53 
54         // 消息訂閱
55         consumer.subscribe(Collections.singletonList(topic));
56 
57         while (true) {
58             // 每隔一秒監聽一次,拉去指定主題分區的消息
59             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
60             if (records.isEmpty()) {
61                 break;
62             }
63             for (ConsumerRecord<String, String> record : records) {
64                 System.out.println(record.toString());
65             }
66         }
67 
68     }
69 
70 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM