前面我們分別介紹了kafka的相關基本原理,kafka的集群服務器搭建以及kafka相關的配置,本文綜合前面的理論知識,運用kafka Java API實現一個簡單的客戶端Demo。
開發環境
- 操作系統:MacOS 10.12.3
- 開發平台:Eclipse Neon.2 Release (4.6.2)
- JDK: java version 1.8.0_121
- zookeeper: zookeeper-3.4.9
- kafka: kafka-2.10-0.10.2.0
項目的建立與實現
首先為大家展示一下項目最終的結構圖,如下:
下面開始建立項目:
- 首先建立一個基本的Maven Java Project 項目框架,項目名稱為 kafkaDemo,建立項目流程參考:maven 基本框架搭建;
- 然后修改pom.xml文件內容,為項目引入kafka 客戶端jar包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
添加完成后保存pom.xml,然后maven update project。當update完成后,maven依賴包里的jar包應該如上圖所示。
下面分別添加producer和consumer客戶端代碼。
在src/main/java目錄下新建package,命名為 com.unionpay.producer。由於kafka producer端有同步發送和異步發送之分,本項目將兩個示例都進行展示,首先編寫同步發送ProducerSync代碼。
ProducerSync.java:
1 package com.unionpay.producer; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.Producer; 7 import org.apache.kafka.clients.producer.ProducerRecord; 8 9 10 public class ProducerSync { 11 12 private static final String TOPIC = "my-replicated-topic"; 13 public static void main(String[] args) { 14 // TODO Auto-generated method stub 15 16 Properties properties = new Properties(); 17 //客戶端用於建立與kafka集群連接的host:port組,如果有多個broker,則用“,”隔開 18 // "host1:port1,host2:port2,host3,post3" 19 properties.put("bootstrap.servers", "127.0.0.1:9092"); 20 21 // producer在向servers發送信息后,是否需要serveres向客戶端(producer)反饋接受消息狀態用此參數配置 22 // acks=0:表示producer不需要等待集群服務器發送的確認消息;acks=1:表示producer需要等到topic對應的leader發送的消息確認; 23 // acks=all:表示producer需要等到leader以及所有followers的消息確認,這是最安全的消息保障機制 24 properties.put("acks", "all"); 25 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 26 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 27 properties.put("buffer.memory", "33554432"); 28 29 Producer<String,String> producer = new KafkaProducer<String,String>(properties); 30 31 for(int i=0;i<100;i++){ 32 33 String message = "Sync : this is the " + i + "th message for test!"; 34 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC, message); 35 producer.send(producerRecord); 36 37 try { 38 Thread.sleep(1000); 39 } catch (InterruptedException e) { 40 // TODO Auto-generated catch block 41 e.printStackTrace(); 42 } 43 } 44 45 producer.close(); 46 47 } 48 49 }
然后編寫異步ProducerAsync代碼。
ProducerAsync.java:
1 package com.unionpay.producer; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.Producer; 7 import org.apache.kafka.clients.producer.ProducerRecord; 8 9 public class ProducerAsync { 10 11 private static final String TOPIC = "my-replicated-topic"; 12 public static void main(String[] args) { 13 // TODO Auto-generated method stub 14 15 Properties props = new Properties(); 16 props.put("bootstrap.servers", "127.0.0.1:9092"); 17 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 18 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 19 props.put("producer.type", "async"); 20 props.put("batch.size", "16384"); 21 22 Producer<String,String> producer = new KafkaProducer<String,String>(props); 23 24 for(int i=0;i<100;i++){ 25 26 String message = "Async : this is the " + i + "th message for test!"; 27 28 ProducerRecord producerRecord = new ProducerRecord(TOPIC, message); 29 producer.send(producerRecord); 30 31 try { 32 Thread.sleep(1000); 33 } catch (InterruptedException e) { 34 // TODO Auto-generated catch block 35 e.printStackTrace(); 36 } 37 } 38 39 producer.close(); 40 } 41 }
從兩個代碼文件比對來看,異步中多了一句配置語句props.put("producer.type", "async");
然后編寫consumer端代碼
GroupConsumer.java:
1 package com.unionpay.consumer; 2 3 import java.util.Arrays; 4 import java.util.Properties; 5 6 import org.apache.kafka.clients.consumer.ConsumerRecord; 7 import org.apache.kafka.clients.consumer.ConsumerRecords; 8 import org.apache.kafka.clients.consumer.KafkaConsumer; 9 10 public class GroupConsumer { 11 12 private static final String BROKER = "127.0.0.1:9092"; 13 private static final String TOPIC = "my-replicated-topic"; 14 15 16 public static void main(String[] args) { 17 // TODO Auto-generated method stub 18 19 Properties props = new Properties(); 20 props.put("bootstrap.servers",BROKER); 21 // 用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬於同一個consumer group 22 props.put("group.id", "group1"); 23 // 如果為真,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用 24 props.put("enable.auto.commit", "true"); 25 // consumer向zookeeper提交offset的頻率 26 props.put("auto.commit.interval.ms", "1000"); 27 props.put("session.timeout.ms", "30000"); 28 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 29 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 30 31 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 32 33 // 訂閱topic,可以為多個用,隔開Arrays.asList("topic1","topic2"); 34 consumer.subscribe(Arrays.asList(TOPIC)); 35 36 while(true){ 37 ConsumerRecords<String,String> consumerRecords = consumer.poll(100); 38 39 for(ConsumerRecord<String,String> consumerRecord : consumerRecords){ 40 System.out.println(consumerRecord.value()); 41 } 42 } 43 } 44 }
到目前為止,我們的項目建立完成啦,下面啟動zookeeper集群服務器,啟動kafka集群服務器:
//啟動zookeeper集群服務器 cd ~/DevelopEnvironment/zookeeper-3.4.9-kafka/bin ./zkServer.sh start //啟動kafka集群服務器 cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin ./kafka-server-start.sh ../config/server.properties ./kafka-server-start.sh ../config/server-1.properties ./kafka-server-start.sh ../config/server-2.properties
當zookeeper集群服務器和kafka集群服務器啟動成功后,然后分別運行GroupConsumer.java和ProducerAsync.java,客戶端獲取如下信息:
然后運行ProducerSync.java,客戶端獲取如下信息:
到此,游戲結束,我們的kafka API 使用demo介紹到此結束。