Kafka consumer: reading data from Kafka and writing it to a file



Recently I had a requirement to consume real-time data from Kafka and write each record's key out to a file, so the keys could be compared against the original points on the publishing side to determine whether any data was lost in transit.

Straight to the code. In our company's architecture the Kafka cluster carries multiple topics, and this code consumes one specified topic at a time. Each topic has 3 partitions, so consumption uses multiple threads, one consumer per partition.

Duplicate keys are filtered out while reading, because the publisher pushes around 200,000 original points (possibly within one or a few seconds). At the time I simply used a HashMap for the de-duplication.
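One caveat with that approach: the same map is shared by all three consumer threads, and a plain HashMap is not thread-safe, so a key can slip through the check-then-put twice. A `ConcurrentHashMap` with `putIfAbsent` makes the check-and-insert atomic. A minimal sketch (the class and method names here are my own, not from the code below):

```java
import java.util.concurrent.ConcurrentHashMap;

// Thread-safe de-duplication of point keys shared across consumer threads.
public class PointDeduper {
    private final ConcurrentHashMap<String, Boolean> seen = new ConcurrentHashMap<>();

    // putIfAbsent returns null only for the first thread to register this key,
    // so exactly one caller per key sees `true`.
    public boolean markIfNew(String key) {
        return seen.putIfAbsent(key, Boolean.TRUE) == null;
    }
}
```

Only the thread that gets `true` back would write the key to the file, so each key is written exactly once even under concurrent consumption.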

1. ConsumerGroup

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class ConsumerGroup {
    // One runnable per partition; each holds its own KafkaConsumer.
    private final List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList, HashMap<String, String> points) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic, points);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}
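`execute()` starts raw threads and keeps no handle to them, so there is no way to stop or join the consumers later. An `ExecutorService` gives the same fan-out with an orderly-shutdown handle. A sketch under that assumption (the `TaskPool` name is illustrative, not part of the original code):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Alternative to execute(): run the consumer tasks on a fixed-size pool
// so the caller can later shut them down instead of losing the threads.
public class TaskPool {
    public static ExecutorService start(List<Runnable> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        tasks.forEach(pool::submit);
        return pool;   // caller keeps this to call shutdown()/awaitTermination()
    }
}
```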

 

2. ConsumerRunnable

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    // Each thread maintains its own private KafkaConsumer instance.
    private final KafkaConsumer<String, String> consumer;

    // Shared by all consumer threads. Note: HashMap is not thread-safe;
    // a ConcurrentHashMap would make the check-then-put below race-free.
    private HashMap<String, String> points;

    public ConsumerRunnable(String brokerList, String groupId, String topic, HashMap<String, String> nodepoint) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");        // this example uses automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));   // this example relies on automatic partition assignment
        points = nodepoint;
    }

    @Override
    public void run() {
        JsonParser parser = new JsonParser();   // reuse one parser instead of allocating per record
        while (true) {
            // poll(long) is deprecated in newer clients; use poll(Duration.ofMillis(200)) there
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
//                System.out.printf("Partition = %s , offset = %d, key = %s, value=%s", record.partition(), record.offset(), record.key(), record.value());

                JsonObject jsonObject = (JsonObject) parser.parse(record.value());
                JsonArray jsonArray = jsonObject.get("list").getAsJsonArray();
                for (int i = 0; i < jsonArray.size(); i++) {
                    JsonObject subject = jsonArray.get(i).getAsJsonObject();
                    String cedian = subject.get("id").getAsString().trim();   // "cedian" = measurement point ID
                    if (!points.containsKey(cedian)) {
                        points.put(cedian, cedian);
                        WriterDataFile.writeData(cedian);
                    }
                }
            }
        }
    }
}
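The `while (true)` loop above never terminates, so the process can only exit by being killed. A stoppable variant replaces the constant with a flag that a controller thread can flip. A stdlib-only sketch of the pattern (the `StoppableWorker` name is mine; the real loop body would be the poll-and-process code above):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a stoppable polling loop: run() spins until another thread
// calls shutdown(), which waits for the loop's final pass to finish.
public class StoppableWorker implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch stopped = new CountDownLatch(1);

    @Override
    public void run() {
        while (running.get()) {
            // poll / process records here
        }
        stopped.countDown();   // signal that the loop has exited
    }

    public void shutdown() throws InterruptedException {
        running.set(false);
        stopped.await();       // wait for the loop to observe the flag and exit
    }
}
```

With a real KafkaConsumer, `poll()` can block, so the usual companion is calling `consumer.wakeup()` from the shutting-down thread, which makes `poll()` throw `WakeupException`; the consumer is then closed in a `finally` block.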

 

3. ConsumerTest

import java.util.HashMap;

public class ConsumerTest {

    public static void main(String[] args) {
        String brokerList = "172.16.10.22:9092,172.16.10.23:9092,172.16.10.21:9092";
        String groupId = "test20190722";
        String topic = "SDFD";
        int consumerNum = 3;    // one consumer thread per partition

        HashMap<String, String> points = new HashMap<>();

        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList, points);
        consumerGroup.execute();
    }
}

4. WriterDataFile

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class WriterDataFile {

    private static String path = "E:\\kafkadata_SDFD.txt";

    // synchronized: several consumer threads append concurrently, and
    // interleaved writes from separate FileWriters could corrupt lines.
    // try-with-resources closes the writer even if write() throws.
    public static synchronized void writeData(String strvalue) {
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(path, true))) {
            bw.write(strvalue + "\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
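Opening and closing the file once per key is expensive at this volume (~200,000 keys in a few seconds); a long-lived buffered writer flushed periodically would be faster. As a middle ground, `java.nio.file.Files` can append a line in one call. A sketch under that assumption (the `KeyAppender` name and constructor-injected path are illustrative, not from the original class):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Append one key per line, creating the file on first use.
public class KeyAppender {
    private final Path path;

    public KeyAppender(Path path) {
        this.path = path;
    }

    public void append(String key) {
        try {
            Files.write(path,
                    (key + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Taking the path as a constructor argument (instead of a hard-coded `E:\...` static) also makes the writer testable and reusable for other topics.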

 

These are all basic patterns and I haven't had time to tidy them up, so please bear with anything that looks rough.

Writing this up was no small effort...

