從0開始搭建kafka客戶端


上一節,我們實現了搭建kafka集群。本節我們將從0開始,使用Java,搭建kafka客戶端生產消費模型。

1.創建maven項目2.kafka producer3.kafka consumer4.結果生產者:消費者:可能遇到的坑:最后:

1.創建maven項目

   首先我們使用idea創建項目。


  這里我們使用maven來管理jar包,所以創建的是一個maven項目。

  然后輸入GroupId和ArtifactId即可,這兩個id在maven中相當於“坐標”,其中ArtifactId是你的項目名。

  這時候,一個maven項目就創建完成了。但是maven還需要配置,我們在idea中找到Preferences(mac系統快捷鍵:command + ,),搜索maven,接着配置 maven home directory(maven安裝路徑), User settings file(maven 配置文件所在位置 settings.xml), Local repository(本地倉庫位置,在setting.xml中配置),配置完成后,點擊APPLY即可。到此,一個maven項目就配置完成了。
  最后,我們需要在pom.xml中配置kafka依賴的坐標

 

1 <dependencies>
2    <dependency>
3        <groupId>org.apache.kafka</groupId>
4        <artifactId>kafka-clients</artifactId>
5        <version>0.11.0.3</version>
6    </dependency>
7</dependencies>

2.kafka producer

  接下來,我們要對kafka中的生產者進行開發。在開發之前,要保證我們kafka服務處於可用的狀態。
  生產者程序如下:

 1public class Producer {
2    public static void main(String[] args) {
3        Properties props = new Properties();
4        props.put("bootstrap.servers""localhost:9092,localhost:9093");
5        props.put("key.serializer",
6                "org.apache.kafka.common.serialization.StringSerializer");
7        props.put("value.serializer",
8                "org.apache.kafka.common.serialization.StringSerializer");
9        props.put("acks""-1");
10        props.put("retries"3);
11        props.put("batch.size"232840);
12        props.put("linger.ms"10);
13        props.put("buffer.memory"33554432);
14        props.put("max.block.ms"3000);
15        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
16        for (int i = 0; i < 100; i++) {
17            producer.send(new ProducerRecord<String, String>(
18                    "my-topic", Integer.toString(i), Integer.toString(i)), new Callback() {
19                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
20                    if (e == null) {
21                        System.out.println("消息發送成功");
22                    } else {
23                        System.out.println(String.format("消息發送失敗: %s", e.getMessage()));
24                    }
25                }
26            });
27        }
28        producer.close();
29    }
30}

3.kafka consumer

  接下來是kafka中的消費者代碼。

 1public class Consumer {
2    public static void main(String[] args) {
3        String topicName = "my-topic";
4        String groupId = "test-group";
5
6        Properties props = new Properties();
7        props.put("bootstrap.servers""localhost:9092,localhost:9093");
8        props.put("group.id", groupId);
9        props.put("enable.auto.commit""true");
10        props.put("auto.commit.interval.ms""1000");
11        props.put("auto.offset.reset""earliest");
12        props.put("key.deserializer",
13                "org.apache.kafka.common.serialization.StringDeserializer");
14        props.put("value.deserializer",
15                "org.apache.kafka.common.serialization.StringDeserializer");
16        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
17        // 訂閱主題
18        consumer.subscribe(Collections.singletonList(topicName));
19        try {
20            while (true) {
21                ConsumerRecords<String, String> records = consumer.poll(1000);
22                for (ConsumerRecord<String, String> record : records) {
23                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
24                }
25            }
26        } finally {
27            consumer.close();
28        }
29    }
30}

4.結果

  最后是對代碼進行測試。依次啟動生產者、消費者實例,觀察控制台輸出接口。

生產者:

1消息發送成功
2...
3消息發送成功

消費者:

1offset = 0, key = 0, value = 0
2...
3offset = 99, key = 99, value = 99

可能遇到的坑:

  如果你的程序出錯,請首先檢查代碼props是否正確,其次應該確認你的kafka client和kafka server 版本相同,最后bootstrap.servers這個參數的配置值,要與kafka中server.properties中一致,否則將會出現獲取不到元數據信息的異常。(消息發送失敗: Failed to update metadata after 3000 ms.)

最后:

  上述代碼,只是實現了一個最小的生產消費模型,寫法上並不規范(比如配置應該寫在配置文件中、對於異常應該有處理方法,不能只是輸出日志),不能直接使用在生產環境中。只能用於kafka入門學習

  最后,期待您的訂閱和點贊,專欄每周都會更新,希望可以和您一起進步,同時也期待您的批評與指正!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM