Kafka "HelloWorld" in Practice


  The previous posts covered Kafka's basic principles, cluster setup, and related configuration. This post puts that theory into practice by using the Kafka Java API to build a simple client demo.

Development Environment

  • OS: macOS 10.12.3
  • IDE: Eclipse Neon.2 Release (4.6.2)
  • JDK: 1.8.0_121
  • ZooKeeper: zookeeper-3.4.9
  • Kafka: kafka_2.10-0.10.2.0

Creating and Implementing the Project

  First, a look at the final structure of the project:

  Now let's build it:

  • Create a basic Maven Java project named kafkaDemo; for the setup steps, see the earlier post on building a basic Maven skeleton.
  • Then edit pom.xml to add the Kafka client jar to the project:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>

  After saving pom.xml, run a Maven project update. Once the update completes, kafka-clients should appear among the jars under the project's Maven dependencies.

  Next, add the producer and consumer client code.

  Create a new package under src/main/java named com.unionpay.producer. Since the Kafka producer can send either synchronously or asynchronously, this project demonstrates both, starting with the synchronous ProducerSync.

  ProducerSync.java:

package com.unionpay.producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSync {

    private static final String TOPIC = "my-replicated-topic";

    public static void main(String[] args) {

        Properties properties = new Properties();
        // host:port list the client uses to bootstrap its connection to the cluster;
        // separate multiple brokers with commas: "host1:port1,host2:port2,host3:port3"
        properties.put("bootstrap.servers", "127.0.0.1:9092");

        // acks controls what acknowledgement the producer requires from the brokers:
        // acks=0: the producer does not wait for any acknowledgement;
        // acks=1: the producer waits for the partition leader's acknowledgement only;
        // acks=all: the producer waits for the leader and all in-sync replicas --
        //           the strongest delivery guarantee.
        properties.put("acks", "all");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("buffer.memory", "33554432");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 100; i++) {

            String message = "Sync : this is the " + i + "th message for test!";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC, message);

            try {
                // Blocking on the returned Future is what makes this send synchronous.
                producer.send(producerRecord).get();
                Thread.sleep(1000);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }
}
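The key.serializer and value.serializer settings above tell the producer how to turn keys and values into bytes on the wire. As a rough sketch (not the actual kafka-clients source), StringSerializer and its matching StringDeserializer amount to plain UTF-8 encoding:

```java
import java.nio.charset.StandardCharsets;

public class SerializerSketch {

    // Roughly what org.apache.kafka.common.serialization.StringSerializer does:
    // encode the String to UTF-8 bytes (null stays null).
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // The matching StringDeserializer: decode UTF-8 bytes back into a String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String message = "Sync : this is the 0th message for test!";
        byte[] wire = serialize(message);
        System.out.println(deserialize(wire).equals(message)); // prints "true"
    }
}
```

This is also why the consumer later configures the mirror-image StringDeserializer classes.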

  Next, the asynchronous ProducerAsync.

  ProducerAsync.java:

package com.unionpay.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerAsync {

    private static final String TOPIC = "my-replicated-topic";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Note: "producer.type" was a setting of the old Scala producer;
        // the new KafkaProducer ignores it and is always asynchronous internally.
        props.put("producer.type", "async");
        // batch.size: how many bytes the producer batches per partition before sending.
        props.put("batch.size", "16384");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {

            String message = "Async : this is the " + i + "th message for test!";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC, message);

            // send() returns immediately; the callback fires once the broker
            // acknowledges (or the send fails).
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
            });

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();
    }
}

  Comparing the two files, the real difference is not the extra props.put("producer.type", "async") line: that setting belonged to the old Scala producer and is ignored by the new client. The new KafkaProducer is always asynchronous internally; a send only behaves synchronously when you block on the Future it returns (producer.send(record).get()), whereas the asynchronous style either fires and forgets or registers a callback.
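To make the distinction concrete, here is a sketch (assuming the same kafka-clients 0.10.2.0 API and a broker on 127.0.0.1:9092) of the three common send idioms; only blocking on the returned Future makes a send synchronous:

```java
package com.unionpay.producer;

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendIdioms {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("my-replicated-topic", "hello");

        // 1. Fire-and-forget: send() returns immediately; failures go unnoticed.
        producer.send(record);

        // 2. Synchronous: block on the Future until the broker acknowledges.
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get();
        System.out.println("written at offset " + metadata.offset());

        // 3. Asynchronous with a callback: returns immediately; the callback
        //    runs when the broker acknowledges or the send fails.
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata md, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                }
            }
        });

        producer.close();
    }
}
```

Running this requires a live broker, so it is meant as a reading aid rather than a standalone test.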

  Finally, the consumer code.

  GroupConsumer.java:

package com.unionpay.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {

    private static final String BROKER = "127.0.0.1:9092";
    private static final String TOPIC = "my-replicated-topic";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER);
        // A unique string identifying the consumer group this process belongs to;
        // processes that set the same group.id form one consumer group.
        props.put("group.id", "group1");
        // If true, offsets of fetched messages are committed automatically (the new
        // consumer commits them to Kafka itself, not to ZooKeeper). Committed offsets
        // let a new consumer resume where a crashed one left off.
        props.put("enable.auto.commit", "true");
        // How often the consumer commits offsets, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Subscribe to one or more topics, e.g. Arrays.asList("topic1", "topic2").
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
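With enable.auto.commit=true, the consumer above commits offsets on a timer, so a crash between commits can cause messages to be redelivered. A common variant (a sketch, assuming the same broker and topic as the demo) disables auto-commit and commits only after each batch has been processed:

```java
package com.unionpay.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "group1");
        // Turn off the timer-based commit; we commit explicitly below.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-replicated-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
            // Commit the offsets of everything the last poll() returned,
            // but only after it has been processed.
            consumer.commitSync();
        }
    }
}
```

Like the demo itself, this needs a running broker, so treat it as a pattern rather than a ready-made test.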

  The project is now complete. Next, start the ZooKeeper server and the Kafka brokers:

# Start the ZooKeeper server
cd ~/DevelopEnvironment/zookeeper-3.4.9-kafka/bin
./zkServer.sh start

# Start the Kafka cluster (three brokers)
cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server-1.properties
./kafka-server-start.sh ../config/server-2.properties
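The clients all use the topic my-replicated-topic, which was created in the earlier cluster post. If it does not exist yet, it can be created with Kafka's topic tool (the replication factor of 3 assumes the three-broker setup above):

```shell
cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin
./kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 3 --partitions 1 --topic my-replicated-topic

# List topics to confirm it exists
./kafka-topics.sh --list --zookeeper localhost:2181
```

These commands need the running ZooKeeper/Kafka cluster, so they are shown for reference rather than as a standalone script.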

   Once the ZooKeeper server and the Kafka brokers are up, run GroupConsumer.java and then ProducerAsync.java; the consumer prints the following output:

  Then run ProducerSync.java; the consumer prints the following output:

  That's a wrap: this concludes our Kafka API demo.
