Kafka提供了兩種Consumer API,分別是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API)
High Level Consumer API:高度抽象的Kafka消費者API;將底層具體獲取數據、更新offset、設置偏移量等操作屏蔽掉,直接將操作數據流的處理工作提供給編寫程序的人員。優點是:操作簡單;缺點:可操作性太差,無法按照自己的業務場景選擇處理方式。(入口類:ConsumerConnector)
Lower Level Consumer API:通過直接操作底層API獲取數據的方式獲取Kafka中的數據,需要自行給定分區、偏移量等屬性。優點:可操作性強;缺點:代碼相對而言比較復雜。(入口類:SimpleConsumer)
這裡主要使用Java代碼實現並測試High Level Consumer API:
Lower Level Consumer API詳見博客:[Kafka] - Kafka Java Consumer實現(一)
========================================================================
一、JavaKafkaConsumerHighAPI:使用Kafka High Level Consumer API多線程讀取數據的相關API實現,具體代碼如下:
import kafka.consumer.*; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 自定義簡單Kafka消費者, 使用高級API * Created by gerry on 12/21. */ public class JavaKafkaConsumerHighAPI implements Runnable { /** * Kafka數據消費對象 */ private ConsumerConnector consumer; /** * Kafka Topic名稱 */ private String topic; /** * 線程數量,一般就是Topic的分區數量 */ private int numThreads; /** * 線程池 */ private ExecutorService executorPool; /** * 構造函數 * * @param topic Kafka消息Topic主題 * @param numThreads 處理數據的線程數/可以理解為Topic的分區數 * @param zookeeper Kafka的Zookeeper連接字符串 * @param groupId 該消費者所屬group ID的值 */ public JavaKafkaConsumerHighAPI(String topic, int numThreads, String zookeeper, String groupId) { // 1. 創建Kafka連接器 this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId)); // 2. 數據賦值 this.topic = topic; this.numThreads = numThreads; } @Override public void run() { // 1. 指定Topic Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(this.topic, this.numThreads); // 2. 指定數據的解碼器 StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); // 3. 獲取連接數據的迭代器對象集合 /** * Key: Topic主題 * Value: 對應Topic的數據流讀取器,大小是topicCountMap中指定的topic大小 */ Map<String, List<KafkaStream<String, String>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); // 4. 從返回結果中獲取對應topic的數據流處理器 List<KafkaStream<String, String>> streams = consumerMap.get(this.topic); // 5. 創建線程池 this.executorPool = Executors.newFixedThreadPool(this.numThreads); // 6. 
構建數據輸出對象 int threadNumber = 0; for (final KafkaStream<String, String> stream : streams) { this.executorPool.submit(new ConsumerKafkaStreamProcesser(stream, threadNumber)); threadNumber++; } } public void shutdown() { // 1. 關閉和Kafka的連接,這樣會導致stream.hashNext返回false if (this.consumer != null) { this.consumer.shutdown(); } // 2. 關閉線程池,會等待線程的執行完成 if (this.executorPool != null) { // 2.1 關閉線程池 this.executorPool.shutdown(); // 2.2. 等待關閉完成, 等待五秒 try { if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly!!"); } } } /** * 根據傳入的zk的連接信息和groupID的值創建對應的ConsumerConfig對象 * * @param zookeeper zk的連接信息,類似於:<br/> * hadoop-senior01.ibeifeng.com:2181,hadoop-senior02.ibeifeng.com:2181/kafka * @param groupId 該kafka consumer所屬的group id的值, group id值一樣的kafka consumer會進行負載均衡 * @return Kafka連接信息 */ private ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { // 1. 構建屬性對象 Properties prop = new Properties(); // 2. 添加相關屬性 prop.put("group.id", groupId); // 指定分組id prop.put("zookeeper.connect", zookeeper); // 指定zk的連接url prop.put("zookeeper.session.timeout.ms", "400"); // prop.put("zookeeper.sync.time.ms", "200"); prop.put("auto.commit.interval.ms", "1000"); // 3. 構建ConsumerConfig對象 return new ConsumerConfig(prop); } /** * Kafka消費者數據處理線程 */ public static class ConsumerKafkaStreamProcesser implements Runnable { // Kafka數據流 private KafkaStream<String, String> stream; // 線程ID編號 private int threadNumber; public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream, int threadNumber) { this.stream = stream; this.threadNumber = threadNumber; } @Override public void run() { // 1. 獲取數據迭代器 ConsumerIterator<String, String> iter = this.stream.iterator(); // 2. 
迭代輸出數據 while (iter.hasNext()) { // 2.1 獲取數據值 MessageAndMetadata value = iter.next(); // 2.2 輸出 System.out.println(this.threadNumber + ":" + ":" + value.offset() + value.key() + ":" + value.message()); } // 3. 表示當前線程執行完成 System.out.println("Shutdown Thread:" + this.threadNumber); } } }
二、JavaKafkaConsumerHighAPITest:測試類
/** * Created by ibf on 12/21. */ public class JavaKafkaConsumerHighAPITest { public static void main(String[] args) { String zookeeper = "192.168.187.146:2181"; String groupId = "group1"; String topic = "test2"; int threads = 1; JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId); new Thread(example).start(); // 執行10秒后結束 int sleepMillis = 600000; try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } // 關閉 example.shutdown(); } }
三、運行測試截圖
Kafka相關命令可以參考博客[Kafka] - Kafka基本操作命令, 測試截圖如下:
至此,開發基本完成
========================================================
四、Kafka Pom文件依賴
<properties> <kafka.version>0.8.2.1</kafka.version> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>${kafka.version}</version> </dependency> </dependencies>