Kafka provides two Consumer APIs: the High Level Consumer API and the Low Level Consumer API (also called the Simple Consumer API).
High Level Consumer API: a highly abstracted consumer API. It hides the low-level details of fetching data, committing offsets, and choosing start positions, and hands the application nothing but a stream of messages to process. Pros: simple to use. Cons: very little control, so it cannot be adapted to every business scenario. (Entry class: ConsumerConnector)
Low Level Consumer API: fetches data from Kafka by calling the low-level API directly; the partition, offset, and other parameters must be supplied by hand. Pros: full control. Cons: the code is comparatively complex. (Entry class: SimpleConsumer)
This post implements the Low Level Consumer API in Java and tests it:
The High Level Consumer API is covered in the post: [Kafka] - Kafka Java Consumer實現(二).
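For contrast, here is a minimal, illustrative sketch of the High Level API (not from the post referenced above; the ZooKeeper address, group id, and single-stream setup are placeholder choices, reusing the topic and host from the test class below):

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class HighLevelConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.187.146:2181"); // placeholder ZK address
        props.put("group.id", "test-group");                    // placeholder group id
        props.put("auto.offset.reset", "smallest");             // start from the earliest offset

        // The entry class of the High Level API: ConsumerConnector
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for the topic; partition assignment, offset commits and
        // leader failover are all handled internally by the connector
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test2", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test2").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

Note how everything that the Low Level implementation below has to do by hand (finding the leader, tracking offsets, reconnecting) is invisible here.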
===============================================================
1. KafkaBrokerInfo: a custom bean class that stores the metadata needed to connect to a Kafka broker, i.e. host & port. Code:
/**
 * Kafka broker connection parameters.
 * Created by gerry on 12/21.
 */
public class KafkaBrokerInfo {
    // Host name
    public final String brokerHost;
    // Port number
    public final int brokerPort;

    /**
     * Constructor.
     *
     * @param brokerHost host name or IP address of the Kafka broker
     * @param brokerPort port number
     */
    public KafkaBrokerInfo(String brokerHost, int brokerPort) {
        this.brokerHost = brokerHost;
        this.brokerPort = brokerPort;
    }

    /**
     * Constructor using the default port 9092.
     *
     * @param brokerHost
     */
    public KafkaBrokerInfo(String brokerHost) {
        this(brokerHost, 9092);
    }
}
2. KafkaTopicPartitionInfo: a custom bean class that identifies the partition to read, i.e. the topic name and partition ID. Code:
/**
 * Created by gerry on 02/22.
 */
public class KafkaTopicPartitionInfo {
    // Topic name
    public final String topic;
    // Partition id
    public final int partitionID;

    /**
     * Constructor.
     *
     * @param topic       topic name
     * @param partitionID partition id
     */
    public KafkaTopicPartitionInfo(String topic, int partitionID) {
        this.topic = topic;
        this.partitionID = partitionID;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        KafkaTopicPartitionInfo that = (KafkaTopicPartitionInfo) o;

        if (partitionID != that.partitionID) return false;
        return topic != null ? topic.equals(that.topic) : that.topic == null;
    }

    @Override
    public int hashCode() {
        int result = topic != null ? topic.hashCode() : 0;
        result = 31 * result + partitionID;
        return result;
    }
}
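The equals()/hashCode() overrides are not boilerplate: the consumer class in section 3 uses KafkaTopicPartitionInfo as a HashMap key for its replica-broker cache (replicaBrokers). A tiny illustrative check, with hypothetical values:

import java.util.HashMap;
import java.util.Map;

public class KeyCheck {
    public static void main(String[] args) {
        Map<KafkaTopicPartitionInfo, String> cache = new HashMap<KafkaTopicPartitionInfo, String>();
        cache.put(new KafkaTopicPartitionInfo("test2", 0), "leader: 192.168.187.146:9092");
        // A logically-equal key built separately hits the same entry,
        // thanks to the equals()/hashCode() overrides above
        System.out.println(cache.get(new KafkaTopicPartitionInfo("test2", 0)));
    }
}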
3. JavaKafkaSimpleConsumerAPI: the code that drives Kafka through the low-level API, covering reading data and reading/committing offsets. Code:
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;

/**
 * TODO: add proper logging
 * Kafka Low Level Consumer API ==> Kafka Simple Consumer API
 * Created by gerry on 12/21.
 */
public class JavaKafkaSimpleConsumerAPI {
    // Maximum number of retries
    private int maxRetryTimes = 5;
    // Interval between retries (milliseconds)
    private long retryIntervalMillis = 1000;
    // Cache of the broker connection info for each topic/partition
    private Map<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>> replicaBrokers =
            new HashMap<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>>();

    /**
     * Entry point.
     *
     * @param maxReads           maximum number of records to read
     * @param topicPartitionInfo topic/partition to read from
     * @param seedBrokers        seed brokers for the initial connection
     * @throws Exception
     */
    public void run(long maxReads,
                    KafkaTopicPartitionInfo topicPartitionInfo,
                    List<KafkaBrokerInfo> seedBrokers) throws Exception {
        // By default, start consuming from the earliest offset of the partition
        long whichTime = kafka.api.OffsetRequest.EarliestTime();

        // Build the client name and group id
        String topic = topicPartitionInfo.topic;
        int partitionID = topicPartitionInfo.partitionID;
        String clientName = this.createClientName(topic, partitionID);
        String groupId = clientName;

        // Fetch the partition metadata (mainly the leader's connection info)
        PartitionMetadata metadata = this.findLeader(seedBrokers, topic, partitionID);
        // Validate the metadata
        this.validatePartitionMetadata(metadata);

        // Connect to the leader and build the SimpleConsumer
        SimpleConsumer consumer = this.createSimpleConsumer(metadata.leader().host(),
                metadata.leader().port(), clientName);

        try {
            // Fetch the consumer's current offset for this topic/partition
            int times = 0;
            long readOffSet = -1;
            while (true) {
                readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, whichTime, clientName);
                if (readOffSet == -1) {
                    // -1 signals an error
                    if (times > this.maxRetryTimes) {
                        throw new RuntimeException("Fetch the last offset of the group:" + groupId + " occur exception");
                    }
                    // Sleep first, then rebuild the consumer connection
                    times++;
                    this.sleep();
                    consumer = this.createNewSimpleConsumer(consumer, topic, partitionID);
                    continue;
                }
                // Normal case: leave the loop
                break;
            }
            System.out.println("The first read offset is:" + readOffSet);

            int numErrors = 0;
            boolean ever = maxReads <= 0;
            // Read loop: when maxReads is non-positive read forever,
            // otherwise read at most maxReads records
            while (ever || maxReads > 0) {
                // Build the fetch request: topic, partition, start offset,
                // and the maximum number of bytes per fetch
                kafka.api.FetchRequest request = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partitionID, readOffSet, 100000)
                        .build();

                // Send the request to Kafka and get the response
                FetchResponse response = consumer.fetch(request);

                // If the response signals an error, handle it and reconnect the consumer;
                // after more than 5 consecutive errors, give up
                if (response.hasError()) {
                    String leaderBrokerHost = consumer.host();
                    numErrors++;
                    short code = response.errorCode(topic, partitionID);
                    System.out.println("Error fetching data from the Broker:" + leaderBrokerHost + " Reason:" + code);
                    if (numErrors > 5) break;
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // The offset is out of range; re-fetch a valid offset
                        readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID,
                                kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }

                    consumer.close();
                    // Recreate the SimpleConsumer against the new leader; host() and
                    // clientId() are plain getters, so the closed consumer can still seed the lookup
                    consumer = this.createNewSimpleConsumer(consumer, topic, partitionID);
                    continue;
                }
                // Reset the error counter
                numErrors = 0;

                // No error: process the fetched messages, e.g. print them
                long numRead = 0;
                for (MessageAndOffset messageAndOffset : response.messageSet(topic, partitionID)) {
                    // Validate the offset
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffSet) {
                        System.out.println("Found an old offset:" + currentOffset + " Expecting:" + readOffSet);
                        continue;
                    }

                    // Offset from which the next fetch starts
                    readOffSet = messageAndOffset.nextOffset();

                    // Read the message value
                    ByteBuffer payload = messageAndOffset.message().payload();
                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println(currentOffset + ": " + new String(bytes, "UTF-8"));

                    numRead++;
                    maxReads--;
                }

                // Commit the offset
                consumer = this.updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, 0);

                // If nothing was read, sleep for one second
                if (numRead == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        // nothing
                    }
                }
            }
            System.out.println("Done....");
        } finally {
            // Release resources
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Exception e) {
                    // nothing
                }
            }
        }
    }

    /**
     * Validate the partition metadata; throws IllegalArgumentException on failure.
     *
     * @param metadata
     */
    private void validatePartitionMetadata(PartitionMetadata metadata) {
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting!!");
            throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting!!");
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting!!");
            throw new IllegalArgumentException("Can't find Leader for Topic and Partition. Exiting!!");
        }
    }

    /**
     * Finding the lead broker for a topic and partition.<br/>
     * Returns the metadata of the broker that leads the given topic/partition,
     * and caches the replica brokers for later leader re-election lookups.
     *
     * @param brokers     Kafka cluster connection info, eg: {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
     * @param topic       topic name
     * @param partitionID partition id
     * @return
     */
    public PartitionMetadata findLeader(List<KafkaBrokerInfo> brokers, String topic, int partitionID) {
        PartitionMetadata returnMetadata = null;
        for (KafkaBrokerInfo broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                // 1. Create a simple consumer connection
                consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, 100000, 64 * 1024, "leaderLookUp");
                // 2. Build the list of topic names to query
                List<String> topics = Collections.singletonList(topic);
                // 3. Build the request
                TopicMetadataRequest request = new TopicMetadataRequest(topics);
                // 4. Send the request and get the response
                TopicMetadataResponse response = consumer.send(request);
                // 5. Extract the result
                List<TopicMetadata> metadatas = response.topicsMetadata();
                // 6. Iterate over the result
                for (TopicMetadata metadata : metadatas) {
                    String currentTopic = metadata.topic();
                    if (topic.equalsIgnoreCase(currentTopic)) {
                        // Iterate over all partitions, looking for the requested one
                        for (PartitionMetadata part : metadata.partitionsMetadata()) {
                            if (part.partitionId() == partitionID) {
                                // 1. Found the matching metadata
                                returnMetadata = part;
                                // 2. Update the cached replica broker hosts
                                KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
                                List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
                                if (brokerInfos == null) {
                                    brokerInfos = new ArrayList<KafkaBrokerInfo>();
                                } else {
                                    brokerInfos.clear();
                                }
                                for (Broker replica : returnMetadata.replicas()) {
                                    brokerInfos.add(new KafkaBrokerInfo(replica.host(), replica.port()));
                                }
                                this.replicaBrokers.put(topicPartitionInfo, brokerInfos);
                                return returnMetadata;
                            }
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + broker.brokerHost
                        + "] to find Leader for [" + topic + ", " + partitionID + "] Reason:" + e);
            } finally {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Exception e) {
                        // nothing
                    }
                }
            }
        }
        // Not found; normally this point is never reached
        return null;
    }

    /**
     * Get the offset of the given groupId's consumer for the given topic/partition.
     *
     * @param consumer    the consumer
     * @param groupId     consumer group id
     * @param topic       topic name
     * @param partitionID partition id
     * @param whichTime   when the consumer has never committed an offset, decides whether
     *                    to start from the earliest or the latest offset of the partition
     * @param clientName  client name
     * @return a non-negative offset in the normal case; -1 on error
     */
    public long getLastOffSet(SimpleConsumer consumer, String groupId, String topic,
                              int partitionID, long whichTime, String clientName) {
        // 1. Fetch the committed offset from ZK; a value greater than 0 is a valid offset
        long offset = this.getOffsetOfTopicAndPartition(consumer, groupId, clientName, topic, partitionID);
        if (offset > 0) {
            return offset;
        }

        // 2. Otherwise, ask the broker for the partition's offset at the given time
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionID));
            return -1;
        }
        // Extract the offset
        long[] offsets = response.offsets(topic, partitionID);
        return offsets[0];
    }

    /**
     * Read the consumer's committed offset from the offset storage.
     *
     * @param consumer    the consumer
     * @param groupId     group id
     * @param clientName  client name
     * @param topic       topic name
     * @param partitionID partition id
     * @return
     */
    public long getOffsetOfTopicAndPartition(SimpleConsumer consumer, String groupId, String clientName,
                                             String topic, int partitionID) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        List<TopicAndPartition> requestInfo = new ArrayList<TopicAndPartition>();
        requestInfo.add(topicAndPartition);
        OffsetFetchRequest request = new OffsetFetchRequest(groupId, requestInfo, 0, clientName);
        OffsetFetchResponse response = consumer.fetchOffsets(request);

        // Extract the result
        Map<TopicAndPartition, OffsetMetadataAndError> returnOffsetMetadata = response.offsets();
        // Process the result
        if (returnOffsetMetadata != null && !returnOffsetMetadata.isEmpty()) {
            // Offset info for the requested partition
            OffsetMetadataAndError offset = returnOffsetMetadata.get(topicAndPartition);
            if (offset.error() == ErrorMapping.NoError()) {
                // No error: return the offset
                return offset.offset();
            } else {
                // The first time a consumer connects (no data for this topic in ZK yet),
                // an UnknownTopicOrPartitionCode error is returned
                System.out.println("Error fetching data Offset Data the Topic and Partition. Reason: " + offset.error());
            }
        }
        // All error cases fall back to 0
        return 0;
    }

    /**
     * Find the partition metadata of the new leader after the old one failed.
     *
     * @param oldLeader
     * @param topic
     * @param partitionID
     * @return
     */
    private PartitionMetadata findNewLeaderMetadata(String oldLeader, String topic, int partitionID) {
        KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
        List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
        for (int i = 0; i < 3; i++) {
            boolean gotoSleep = false;
            PartitionMetadata metadata = this.findLeader(brokerInfos, topic, partitionID);
            if (metadata == null) {
                gotoSleep = true;
            } else if (metadata.leader() == null) {
                gotoSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // The leader election may still be in progress
                gotoSleep = true;
            } else {
                return metadata;
            }

            if (gotoSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // nothing
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting!!");
        throw new RuntimeException("Unable to find new leader after Broker failure. Exiting!!");
    }

    /**
     * Commit the offset; when the SimpleConsumer has to be replaced,
     * a new one is built and returned.
     *
     * @param consumer
     * @param topic
     * @param partitionID
     * @param readOffSet
     * @param groupId
     * @param clientName
     * @param times
     * @return
     * @throws RuntimeException when the commit ultimately fails
     */
    private SimpleConsumer updateOffset(SimpleConsumer consumer, String topic, int partitionID,
                                        long readOffSet, String groupId, String clientName, int times) {
        // Build the request
        Map<TopicAndPartition, OffsetAndMetadata> requestInfoMap = new HashMap<TopicAndPartition, OffsetAndMetadata>();
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        requestInfoMap.put(topicAndPartition, new OffsetAndMetadata(readOffSet, OffsetAndMetadata.NoMetadata(), -1));
        kafka.javaapi.OffsetCommitRequest ocRequest = new OffsetCommitRequest(groupId, requestInfoMap, 0, clientName);
        // Send the commit request and get the response
        kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(ocRequest);

        // Handle the response
        if (response.hasError()) {
            short code = response.errorCode(topicAndPartition);
            if (times > this.maxRetryTimes) {
                throw new RuntimeException("Update the Offset occur exception," +
                        " the current response code is:" + code);
            }

            if (code == ErrorMapping.LeaderNotAvailableCode()) {
                // The leader is being re-elected: sleep for a while, rebuild the
                // consumer against the new leader, then retry
                try {
                    Thread.sleep(this.retryIntervalMillis);
                } catch (InterruptedException e) {
                    // nothing
                }
                PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(), topic, partitionID);
                this.validatePartitionMetadata(metadata);
                consumer = this.createSimpleConsumer(metadata.leader().host(), metadata.leader().port(), clientName);
                // Retry
                return updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
            }

            if (code == ErrorMapping.RequestTimedOutCode()) {
                // The request timed out: simply retry
                return updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
            }

            // Any other code is fatal
            throw new RuntimeException("Update the Offset occur exception," +
                    " the current response code is:" + code);
        }

        // Return the (possibly replaced) consumer
        return consumer;
    }

    /**
     * Build the client name from the topic name and partition id.
     *
     * @param topic
     * @param partitionID
     * @return
     */
    private String createClientName(String topic, int partitionID) {
        return "client_" + topic + "_" + partitionID;
    }

    /**
     * Build a new consumer to replace a failed one.
     *
     * @param consumer
     * @param topic
     * @param partitionID
     * @return
     */
    private SimpleConsumer createNewSimpleConsumer(SimpleConsumer consumer, String topic, int partitionID) {
        // Find the new leader
        PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(), topic, partitionID);
        // Validate the metadata
        this.validatePartitionMetadata(metadata);
        // Connect to the new leader
        return this.createSimpleConsumer(metadata.leader().host(), metadata.leader().port(), consumer.clientId());
    }

    /**
     * Build a SimpleConsumer.
     *
     * @param host
     * @param port
     * @param clientName
     * @return
     */
    private SimpleConsumer createSimpleConsumer(String host, int port, String clientName) {
        return new SimpleConsumer(host, port, 100000, 64 * 1024, clientName);
    }

    /**
     * Sleep between retries.
     */
    private void sleep() {
        try {
            Thread.sleep(this.retryIntervalMillis);
        } catch (InterruptedException e) {
            // nothing
        }
    }

    /**
     * Close the given consumer, swallowing any exception.
     *
     * @param consumer
     */
    private static void closeSimpleConsumer(SimpleConsumer consumer) {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (Exception e) {
                // nothing
            }
        }
    }

    /**
     * Fetch the partition ids of the given topic from the Kafka cluster.<br/>
     * Returns an empty list when the topic does not exist in the cluster.
     *
     * @param brokers    Kafka cluster connection info, eg: {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
     * @param topic      the topic whose partition ids are requested
     * @param soTimeout  socket timeout
     * @param bufferSize buffer size
     * @param clientId   client id
     * @return
     */
    public static List<Integer> fetchTopicPartitionIDs(List<KafkaBrokerInfo> brokers, String topic,
                                                       int soTimeout, int bufferSize, String clientId) {
        Set<Integer> partitionIDs = new HashSet<Integer>();
        List<String> topics = Collections.singletonList(topic);

        // Connect to each Kafka broker in turn and query the metadata
        for (KafkaBrokerInfo broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                // Build a simple consumer connection
                consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, soTimeout, bufferSize, clientId);
                // Build the request
                TopicMetadataRequest tmRequest = new TopicMetadataRequest(topics);
                // Send the request
                TopicMetadataResponse response = consumer.send(tmRequest);
                // Extract the result
                List<TopicMetadata> metadatas = response.topicsMetadata();
                // Iterate over the result, looking for the requested topic
                for (TopicMetadata metadata : metadatas) {
                    if (metadata.errorCode() == ErrorMapping.NoError()) {
                        // Only process error-free metadata
                        if (topic.equals(metadata.topic())) {
                            // Collect the partition ids of this topic
                            for (PartitionMetadata part : metadata.partitionsMetadata()) {
                                partitionIDs.add(part.partitionId());
                            }
                            // Done; leave the loop
                            break;
                        }
                    }
                }
            } finally {
                // Close the connection
                closeSimpleConsumer(consumer);
            }
        }

        // Return the result
        return new ArrayList<Integer>(partitionIDs);
    }
}
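A note on the hard-coded versionId 0 passed to OffsetFetchRequest and OffsetCommitRequest above: with version 0 the broker reads and writes the consumer offsets in ZooKeeper, which is why the comments speak of ZK; from Kafka 0.8.2 onward, version 1 and higher store offsets in the internal __consumer_offsets topic instead.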
4. JavaKafkaSimpleConsumerAPITest: the test class. Main code:
import java.util.ArrayList;
import java.util.List;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaSimpleConsumerAPITest {
    public static void main(String[] args) {
        JavaKafkaSimpleConsumerAPI example = new JavaKafkaSimpleConsumerAPI();
        long maxReads = 300;
        String topic = "test2";
        int partitionID = 0;

        KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
        List<KafkaBrokerInfo> seeds = new ArrayList<KafkaBrokerInfo>();
        seeds.add(new KafkaBrokerInfo("192.168.187.146", 9092));

        try {
            example.run(maxReads, topicPartitionInfo, seeds);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Print the list of all partition ids of the topic
        System.out.println(JavaKafkaSimpleConsumerAPI.fetchTopicPartitionIDs(seeds, topic, 100000, 64 * 1024, "client-id"));
    }
}
5. Testing
Kafka command-line usage is covered in the post [Kafka] - Kafka基本操作命令; a minimal smoke test of the consumer is sketched below.
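To exercise JavaKafkaSimpleConsumerAPITest end to end, something like the following should work against a 0.8.2.x broker (a sketch only: it assumes a single-node cluster with ZooKeeper at 192.168.187.146:2181, which is not given in the original post):

# Create the topic consumed by the test class
bin/kafka-topics.sh --create --zookeeper 192.168.187.146:2181 \
    --replication-factor 1 --partitions 1 --topic test2

# Type a few test messages, then run JavaKafkaSimpleConsumerAPITest;
# each message should be printed together with its offset
bin/kafka-console-producer.sh --broker-list 192.168.187.146:9092 --topic test2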
At this point, the development work is essentially complete.
========================================================
6. Kafka POM dependencies
<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>