Kafka's consumer API comes in two versions.
high-level
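One is the high-level version. It is simpler: you don't have to manage offsets, because it automatically reads the consumer group's last committed offset from ZooKeeper.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
There are a few caveats, though, when you have multiple partitions and multiple consumers: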
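1. Having more consumers than partitions is wasteful. Kafka does not allow concurrent consumption of a single partition (within a consumer group, each partition is read by at most one consumer thread), so the number of consumers should not exceed the number of partitions; any extra consumers sit idle.
2. If there are fewer consumers than partitions, some consumers are assigned several partitions. Choose the two numbers carefully, otherwise data will be pulled from the partitions unevenly.
Ideally the number of partitions is an integer multiple of the number of consumers, which is why the partition count matters: with 24, for example, it is easy to pick a consumer count.
3. When a consumer reads from several partitions, there is no ordering guarantee across them. Kafka only guarantees ordering within a single partition; across partitions, the order you see depends on the order in which you read them.
4. Adding or removing consumers, brokers or partitions triggers a rebalance, so the partitions assigned to each consumer may change afterwards.
5. In the high-level API, reading blocks when no data is available (by default: consumer.timeout.ms defaults to -1, meaning wait indefinitely).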
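To make items 1 and 2 concrete, here is a minimal sketch of a range-style assignment (sort the partitions, split them into contiguous chunks), which is roughly how the 0.8 high-level consumer divides partitions among consumer threads. It is a simplified model written for illustration, not Kafka's own code; `RangeAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class RangeAssignmentSketch {
    // Returns, for each consumer index, the partition ids that consumer would own.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers; // everyone gets at least this many
        int withExtra = numPartitions % numConsumers;   // the first 'withExtra' consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5));  // [[0], [1], [2], [], []]  two consumers idle
        System.out.println(assign(5, 2));  // [[0, 1, 2], [3, 4]]      uneven load
        System.out.println(assign(24, 6)); // four partitions each
    }
}
```

With 3 partitions and 5 consumers two consumers get nothing; with 24 partitions, any of 1, 2, 3, 4, 6, 8, 12 or 24 consumers gets an even split.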
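A simple example:
A common pitfall: if your test first produces some data and then reads it with a consumer, remember to add the first setting below (auto.offset.reset).
A new consumer group starts without a valid offset, and this setting decides how an invalid offset is corrected. The default is largest, i.e. the newest offset, so without it you will not read the data you produced earlier. Worse, adding smallest afterwards does not help: by then the group has a valid offset, which is no longer corrected, so you have to reset the offset manually or with a tool.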
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHighLevelConsumer { // illustrative wrapper class name
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest"); // required if you want to read old data
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "pv");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conf = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);

        String topic = "page_visits";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // one stream (one reading thread) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        KafkaStream<byte[], byte[]> stream = streams.get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("message: " + new String(it.next().message()));
        }
        if (consumer != null) consumer.shutdown(); // never actually reached, because hasNext() above blocks
    }
}
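The offset-correction rule described above can be modelled in a few lines. This is a simplified sketch, not Kafka's implementation: `OffsetResetSketch` is a hypothetical name, `committed == null` stands for "no valid offset stored for the group yet", and the `smallest`/`largest` strings mirror the auto.offset.reset values used in the text.

```java
class OffsetResetSketch {
    // committed == null models a group that has no stored offset yet.
    // [earliest, latest] is the range of offsets currently available in the partition log.
    static long resolveStartOffset(Long committed, long earliest, long latest, String autoOffsetReset) {
        boolean valid = committed != null && committed >= earliest && committed <= latest;
        if (valid) {
            return committed; // a valid offset is used as-is: the reset policy is ignored
        }
        // Only a missing or out-of-range offset is "corrected" according to the policy.
        return "smallest".equals(autoOffsetReset) ? earliest : latest;
    }

    public static void main(String[] args) {
        // Fresh group, 21 messages already produced (offsets 0..20, log end = 21).
        System.out.println(resolveStartOffset(null, 0, 21, "largest"));  // 21: old data skipped
        System.out.println(resolveStartOffset(null, 0, 21, "smallest")); // 0: old data read
        // The group already committed 21; switching to smallest changes nothing.
        System.out.println(resolveStartOffset(21L, 0, 21, "smallest"));  // 21
    }
}
```

The last call is exactly the trap from the paragraph above: once an offset has been committed, only a tool such as UpdateOffsetsInZK can move it back.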
When using the high-level consumer, two handy tools:
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
This shows the current offsets of a group; here, for group pv, whose topic has 3 partitions:
Group  Topic        Pid  Offset  logSize  Lag  Owner
pv     page_visits  0    21      21       0    none
pv     page_visits  1    19      19       0    none
pv     page_visits  2    20      20       0    none
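The key columns are Offset, logSize and Lag (Lag = logSize - Offset, the number of messages not yet consumed).
Everything has already been read here, so Offset = logSize and Lag = 0.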
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
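It takes 3 parameters:
[earliest | latest]: where to move the offset
consumer.properties: path to the consumer configuration file (the group is taken from its group.id)
topic: the topic name, here page_visits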
After running it for the pv group above, checking the group's offsets again gives:
Group  Topic        Pid  Offset  logSize  Lag  Owner
pv     page_visits  0    0       21       21   none
pv     page_visits  1    0       19       19   none
pv     page_visits  2    0       20       20   none
The offsets have been reset to 0, so Lag = logSize.
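Since Lag is just logSize minus Offset per partition, both tables can be checked with a tiny calculation. `LagSketch` is a hypothetical helper; the numbers are the ones from the ConsumerOffsetChecker output above.

```java
class LagSketch {
    // Lag of one partition: messages in the log that the group has not consumed yet.
    static long lag(long offset, long logSize) {
        return logSize - offset;
    }

    // Total lag over rows of {offset, logSize}, as read off ConsumerOffsetChecker output.
    static long totalLag(long[][] rows) {
        long total = 0;
        for (long[] row : rows) {
            total += lag(row[0], row[1]);
        }
        return total;
    }

    public static void main(String[] args) {
        long[][] caughtUp = {{21, 21}, {19, 19}, {20, 20}}; // first table
        long[][] reset = {{0, 21}, {0, 19}, {0, 20}};       // after UpdateOffsetsInZK earliest
        System.out.println(totalLag(caughtUp)); // 0
        System.out.println(totalLag(reset));    // 60
    }
}
```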
Below is the complete multi-threaded consumer code from the original wiki article.
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        // create the connector; note the configuration built in createConsumerConfig below
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) { // create the concurrent consumers
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, a_numThreads); // which topic to read, and with how many threads
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap); // create the streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // one KafkaStream per thread

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // start a consumer thread
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}

// ConsumerTest.java: the worker submitted above (a minimal version of the one in the wiki example)
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
    private final KafkaStream<byte[], byte[]> m_stream;
    private final int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) // blocks until a message arrives or the connector is shut down
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
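The example gives each thread its own KafkaStream, and each thread blocks in hasNext() while its stream is empty; shutting down from the main thread is what lets the workers finish. The sketch below imitates that pattern with plain java.util.concurrent queues so it runs without a broker. It is an analogy, not Kafka code: each LinkedBlockingQueue stands in for a KafkaStream, take() for the blocking hasNext()/next(), and a hypothetical poison-pill message for the effect of consumer.shutdown().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StreamPerThreadSketch {
    // Hypothetical poison pill standing in for the effect of consumer.shutdown().
    static final String SHUTDOWN = "__shutdown__";

    // One worker thread per "stream"; returns how many messages the workers consumed in total.
    static int run(int numThreads, int messagesPerStream) {
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            streams.add(new LinkedBlockingQueue<String>());
        }
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (BlockingQueue<String> stream : streams) {
            executor.submit(() -> {
                try {
                    while (true) {
                        String message = stream.take(); // blocks while the stream is empty, like hasNext()
                        if (SHUTDOWN.equals(message)) {
                            return; // the worker exits only when told to shut down
                        }
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (BlockingQueue<String> stream : streams) {
                for (int m = 0; m < messagesPerStream; m++) {
                    stream.put("message-" + m);
                }
            }
            // Without this step every worker would block forever, like the single-threaded example.
            for (BlockingQueue<String> stream : streams) {
                stream.put(SHUTDOWN);
            }
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 5)); // 15
    }
}
```

Because each queue is FIFO and the pill is enqueued after the messages, every worker drains its stream before exiting.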
SimpleConsumer
The other one is SimpleConsumer. Despite the name, it is not the simple interface: it is the low-level consumer, and the more complex API.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
When should you use this API?
- Read a message multiple times
- Consume only a subset of the partitions in a topic in a process
- Manage transactions to make sure a message is processed once and only once
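Of course, this API comes at a price: partitions, brokers and offsets are no longer transparent to you. You have to manage them yourself and also handle broker leader changes, which is quite a hassle.
So unless you really need it, don't use it.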
- You must keep track of the offsets in your application to know where you left off consuming.
- You must figure out which Broker is the lead Broker for a topic and partition
- You must handle Broker leader changes
Steps for using SimpleConsumer:
- Find an active Broker and find out which Broker is the leader for your topic and partition
- Determine who the replica Brokers are for your topic and partition
- Build the request defining what data you are interested in
- Fetch the data
- Identify and recover from leader changes
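First, you must know which partition of which topic to read.
Then find the broker that is the leader for that partition, as well as the brokers that hold its replicas.
Next, build the request and fetch the data yourself.
Finally, detect and handle changes of the partition's leader broker.
Let's go through these one at a time.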
Finding the Lead Broker for a Topic and Partition
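The idea is to iterate over the seed brokers, fetch the topic's metadata from each, then iterate over the partition metadata it contains and return as soon as the partition we want is found.
The leader broker is then given by PartitionMetadata.leader().host().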
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) { // iterate over every seed broker
        SimpleConsumer consumer = null;
        try {
            // create a SimpleConsumer; the constructor is
            // class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int,
            //                      val bufferSize: Int, val clientId: String)
            consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); // send the TopicMetadataRequest

            List<TopicMetadata> metaData = resp.topicsMetadata(); // the topic's metadata
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) { // each partition's metadata
                    if (part.partitionId() == a_partition) { // is this the partition we are looking for?
                        returnMetaData = part;
                        break loop; // found it: stop searching
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                    + ", " + a_partition + "] Reason: " + e);
        } finally {
            if (consumer != null) consumer.close();
        }
    }
    return returnMetaData;
}
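The lookup loop can be exercised without a cluster by replacing the TopicMetadataRequest round trip with an in-memory map. In this sketch (all names hypothetical), each seed broker maps to the metadata it would return (partition id to leader host), and a seed missing from the map models an unreachable broker; as in findLeader, failures are logged and the next seed is tried.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LeaderLookupSketch {
    // metadataBySeed: seed host -> (partition id -> leader host); a missing seed = unreachable broker.
    static String findLeaderHost(List<String> seeds, Map<String, Map<Integer, String>> metadataBySeed, int partition) {
        for (String seed : seeds) {
            Map<Integer, String> metadata = metadataBySeed.get(seed);
            if (metadata == null) {
                System.out.println("Error communicating with Broker [" + seed + "]");
                continue; // like the catch block in findLeader: move on to the next seed
            }
            if (metadata.containsKey(partition)) {
                return metadata.get(partition); // may be null while a leader election is in progress
            }
        }
        return null; // no seed broker knew about this partition
    }

    // Example cluster view: broker1 is down, broker2 answers for partitions 0 and 1.
    static Map<String, Map<Integer, String>> demoMetadata() {
        Map<Integer, String> fromBroker2 = new HashMap<>();
        fromBroker2.put(0, "broker3");
        fromBroker2.put(1, "broker2");
        Map<String, Map<Integer, String>> bySeed = new HashMap<>();
        bySeed.put("broker2", fromBroker2);
        return bySeed;
    }

    public static void main(String[] args) {
        List<String> seeds = Arrays.asList("broker1", "broker2");
        System.out.println(findLeaderHost(seeds, demoMetadata(), 1)); // leader of partition 1: broker2
        System.out.println(findLeaderHost(seeds, demoMetadata(), 7)); // unknown partition: null
    }
}
```

Listing more than one seed broker is what makes this loop useful: a single unreachable seed does not stop the lookup.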
Finding Starting Offset for Reads
The main content of the request is a Map<TopicAndPartition, PartitionOffsetRequestInfo>.
TopicAndPartition simply wraps the topic and partition.
PartitionOffsetRequestInfo is defined as:
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
The time parameter says where to start reading data; it takes one of two values:
kafka.api.OffsetRequest.EarliestTime(),the beginning of the data in the logs
kafka.api.OffsetRequest.LatestTime(),will only stream new messages
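Don't assume the starting offset is always 0: messages expire and are deleted.
The other parameter, maxNumOffsets, is the maximum number of offsets the broker should return; the code uses 1 and reads the first element.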
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); // build the offset request info
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request); // fetch the offsets

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition); // the returned offsets
    return offsets[0]; // start reading from the first one
}
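Why EarliestTime() does not necessarily give 0: retention deletes whole old log segments, so the earliest available offset moves forward over time. Here is a toy model, assuming a partition log is just an ascending list of retained segment base offsets plus a log end offset (`OffsetRangeSketch` is hypothetical and not a Kafka API).

```java
class OffsetRangeSketch {
    // Hypothetical model of one partition: base offsets of the segments still on disk
    // (ascending) and the log end offset (the offset the next produced message will get).
    static long earliestOffset(long[] retainedSegmentBaseOffsets, long logEndOffset) {
        // An empty log has nothing to read, so earliest == latest.
        return retainedSegmentBaseOffsets.length == 0 ? logEndOffset : retainedSegmentBaseOffsets[0];
    }

    static long latestOffset(long logEndOffset) {
        return logEndOffset;
    }

    public static void main(String[] args) {
        long logEnd = 2500;
        // Before retention kicks in: segments starting at 0, 1000 and 2000.
        System.out.println(earliestOffset(new long[] {0, 1000, 2000}, logEnd)); // 0
        // After retention deleted the two oldest segments.
        System.out.println(earliestOffset(new long[] {2000}, logEnd));          // 2000
        System.out.println(latestOffset(logEnd));                               // 2500
    }
}
```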
Reading the Data
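First, add a fetch to the FetchRequest, specifying the topic, partition, starting offset and the number of bytes to fetch.
If the producer writes large messages (or large batches), the 100000 bytes used here may not be enough and you may get back an empty message set; in that case increase the value until you get a non-empty message set.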
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000) // fetch size: 100000 bytes
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See Error Handling
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        // Required check: for compressed messages the whole block is returned, so it may contain old messages
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset(); // the next offset to read from
    ByteBuffer payload = messageAndOffset.message().payload();

    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
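The `currentOffset < readOffset` check is easy to get wrong, so here is the same bookkeeping on plain numbers. This is a hypothetical sketch: `fetchedOffsets` stands in for the offsets of a fetched message set (for example a whole compressed block that starts before `readOffset`), and `currentOffset + 1` plays the role of `nextOffset()`.

```java
import java.util.ArrayList;
import java.util.List;

class FetchLoopSketch {
    // Processes one fetched batch. Offsets below readOffset can show up because a compressed
    // message set comes back as a whole block; they are skipped rather than processed twice.
    // Processed offsets are appended to 'processed'; the next offset to fetch is returned.
    static long process(long[] fetchedOffsets, long readOffset, List<Long> processed) {
        for (long currentOffset : fetchedOffsets) {
            if (currentOffset < readOffset) {
                continue; // already consumed earlier
            }
            processed.add(currentOffset);
            readOffset = currentOffset + 1; // stands in for messageAndOffset.nextOffset()
        }
        return readOffset;
    }

    public static void main(String[] args) {
        List<Long> processed = new ArrayList<>();
        // We asked for offset 12, but the block we got back starts at offset 10.
        long next = process(new long[] {10, 11, 12, 13, 14}, 12, processed);
        System.out.println(processed + " next=" + next); // [12, 13, 14] next=15
        // An empty message set (e.g. fetch size too small) leaves readOffset unchanged.
        System.out.println(process(new long[0], next, processed)); // 15
    }
}
```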
Error Handling
if (fetchResponse.hasError()) {
    numErrors++;
    // Something went wrong!
    short code = fetchResponse.errorCode(a_topic, a_partition);
    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    if (numErrors > 5) break;
    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
        // Invalid offset: fall back to the latest offset.
        // We asked for an invalid offset. For simple case ask for the last element to reset
        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); // look up the (new) leader broker
    continue;
}
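findNewLeader has no special logic: it simply calls findLeader again (using the replica brokers remembered from the last lookup) to get the leader broker,
and it sleeps between attempts because leader information may be temporarily unavailable during a failover.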
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
            // first time through if the leader hasn't changed give ZooKeeper a second to recover
            // second time, assume the broker did recover before failover, or it was a non-Broker issue
            //
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
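The retry policy in findNewLeader can be tested in isolation. In this sketch (hypothetical names), `lookup.apply(attempt)` stands in for `findLeader(...)` returning the leader host, with `null` meaning no leader is known yet; the sleep is a parameter so the logic can be run quickly.

```java
import java.util.function.IntFunction;

class NewLeaderSketch {
    // Mirrors the retry policy of findNewLeader: up to 3 lookups with a pause between them.
    // lookup.apply(attempt) returns the leader host seen on that attempt, or null if none is known.
    static String findNewLeader(String oldLeader, IntFunction<String> lookup, long sleepMs) {
        for (int i = 0; i < 3; i++) {
            String leader = lookup.apply(i);
            boolean retry = leader == null                            // election still in progress
                    || (leader.equalsIgnoreCase(oldLeader) && i == 0); // first try still shows the old leader
            if (!retry) {
                return leader; // a new leader, or the old one confirmed on a later attempt
            }
            try {
                Thread.sleep(sleepMs); // give ZooKeeper time to catch up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a new leader", e);
            }
        }
        throw new IllegalStateException("Unable to find new leader after Broker failure");
    }

    public static void main(String[] args) {
        // Attempt 0 still reports the old leader, attempt 1 reports the new one.
        System.out.println(findNewLeader("broker1", i -> i == 0 ? "broker1" : "broker2", 1000)); // broker2
        // The old broker came back: accepted on the second attempt.
        System.out.println(findNewLeader("broker1", i -> "broker1", 1000)); // broker1
    }
}
```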
Full Source Code
package com.test.simple;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}