kafka誕生之初,它自帶一個基於scala的生產者和消費者客戶端。但是慢慢的我們認識到這些API有很多限制。比如,消費者有一個“高級”API支持分組和異常控制,但是不支持很多更復雜的應用場景;它也有一個“低級”API,支持對細節的完全控制,但是要求碼農自己控制失敗和異常。所以重新設計了它們。
這個過程的第一階段就是在0.8.1版本的時候重寫了生產者API。在最近的0.9版本中完成了第二階段,提供了消費者的新API。建立在新的分組協議只是,新的消費者帶來以下好處:
- API更加簡潔:新的消費者API綜合了老版本的“高級”和“低級”API的功能,同時提供了分組機制和lower level access來實現自己的消費策略;
- 減少了依賴:新的消費者API是用純java寫的。沒有了scala和zk的依賴,讓代碼工程更輕量級;
- 更安全:新的消費者API支持kafka0.9版本的安全機制;
- 新的消費者也增加了一系列的機制來控制組消費時的容錯。老的API使用大量的java代碼實現的(與ZK交互過多),復雜的邏輯很難讓其他語言的消費者實現。新的API使這變得更簡單。現在已經有C版本的客戶端了。
雖然新的消費者是被重新設計過的和新的交互機制,但很多感念沒有本質區別,所以熟悉老API的碼農也不會覺得新API生硬。但是,也有一些特別細微的細節相對於組管理和線程模型需要在碼代碼的時候注意。
還有一個注意點:新的消費者API還是測試版本。(不穩定哦,隨時會有BUG冒出來,偉大的踩坑者)
Getting Started
略過舊API中的分組消費介紹。。。
舊的API強依賴ZK做分組管理,新的API使用kafka自己的分組協調機制。針對每個消費組,會從所有的broker中挑選出一個出來充當這個組的“協調員”。協調員負責管理該組的狀態。它的主要任務是,當新的組成員進入、老的組成員離開和元數據改變時進行分區的協調分配。這種重新分配分區的行為稱之為“重新平衡組”。
當一個組首次被初始化,每個分區的消費者一般會從最早或最近的數據開始讀。然后在每個分區的消息被依次讀出。在消費過程中,消費者會提交已經成功處理了的消息的偏移量。例如,在下圖中,消費者正在讀的消息的偏移量是6,而它最近一次提交的偏移量是1:
當一個分區被重新分配給組中的另一個消費者時,這個消費者會從上一個消費者最后一次提交的偏移量處開始讀。如果上面例子中的消費者突然崩潰了,其他組成員讀的時候會從1開始讀。這種情況下,它會從1到6重新消費一遍。
上圖中還標注了其他兩個位置。Log End Offset標記了最后一條消息寫入后的偏移量。High Watermark標記了最后被其他replicas同步成功了的偏移量。對於消費者來說,只能讀到High Watermark處,這樣為了防止未同步的消息被讀了以后丟失掉。
配置和初始化
在開始使用新的消費者API之前,先把 kafka-clients 這個依賴加到工程中。

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
消費者通過Properties文件來配置消費屬性,下面是一個最小配置:

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumer-tutorial"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
與舊的消費者和生產者一樣,我們需要配置broker連接參數。我們不需要提供集群中所有服務器的連接參數,客戶端會根據給定的連接參數集合得到所有的存活broker。客戶端還需要配置key和value的初始化類。最后配置group.id。
訂閱TOPIC
在開始消費之前,必須先訂閱一些需要讀取消息的topic。下面的例子中,同時訂閱了foo和bar兩個topic:

consumer.subscribe(Arrays.asList("foo", "bar"));
訂閱后,消費者會與組內其他消費者協調分區的分配。在開始消費消息的時候這些事自動完成的。稍后會展示如何使用分配API手動指定分區 。但是不能手動和自動一起用。
訂閱topic的方法不能增量訂閱:每次訂閱必須包含要訂閱的所有topic。可以隨時改變訂閱,新的訂閱會替換舊的訂閱。
基本的POLL循環
消費者需要並行化地讀取數據,可能從分布在不同broker的不同topic的不同分區。為了做到這一點,新的API用了近似unix得pool或者select調用:一旦訂閱了一些topic,所有未來的協調、重新平衡和數據獲取都被一個調用事件所驅動。這需要單個線程掌控所有IO的一個簡單而有效的實現。
訂閱一個主題后,需要一個事件循環來接受分區的分配和數據的獲取。聽起來復雜,其實只需要在循環調用poll方法,然后消費者客戶端就會處理剩下的事情。每次調用poll方法,都會收到(可能為空)被分配的分區里面的一系列數據。下面是基本例子:

try { while (running) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) System.out.println(record.offset() + ": " + record.value()); } } finally { consumer.close(); }
傳進poll方法里面的參數是一個Long類型的,表示等待消息的時間:如果隊列里面有消息,會立馬返回,如果沒有,會等待指定的時間然后返回。
消費者被設計成在自己的線程里面運行。沒有外部同步的多線程是不安全的,也是不建議這樣做的。
當消費完成后一定記得關閉它,這樣會保證組內協調分配分區不會混亂(因為一個分區只能被組內的一個消費者消費)。
上例中使用了一個較小的超時時間為了保證不會有太多延時去關閉消費者。下面這個例子中使用了很長的超時時間和用wakeup API來跳出循環:

try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (ConsumerRecord<String, String> record : records) System.out.println(record.offset() + “: ” + record.value()); } } catch (WakeupException e) { // ignore for shutdown } finally { consumer.close(); }
wakeup操作是線程安全的:

/** * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}. */ @Override public void wakeup() { this.client.wakeup(); }
整合到一起:

public class ConsumerLoop implements Runnable { private final KafkaConsumer<String, String> consumer; private final List<String> topics; private final int id; public ConsumerLoop(int id, String groupId, List<String> topics) { this.id = id; this.topics = topics; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put(“group.id”, groupId); props.put(“key.deserializer”, StringDeserializer.class.getName()); props.put(“value.deserializer”, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(props); } @Override public void run() { try { consumer.subscribe(topics); while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (ConsumerRecord<String, String> record : records) { Map<String, Object> data = new HashMap<>(); data.put("partition", record.partition()); data.put("offset", record.offset()); data.put("value", record.value()); System.out.println(this.id + ": " + data); } } } catch (WakeupException e) { // ignore for shutdown } finally { consumer.close(); } } public void shutdown() { consumer.wakeup(); } }
測試這里例子的話需要造一些數據。最簡單的方式是使用kafka-verifiable-producer.sh這個腳本。

# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181 # bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092
然后是驅動類:

public static void main(String[] args) { int numConsumers = 3; String groupId = "consumer-tutorial-group" List<String> topics = Arrays.asList("consumer-tutorial"); ExecutorService executor = Executors.newFixedThreadPool(numConsumers); final List<ConsumerLoop> consumers = new ArrayList<>(); for (int i = 0; i < numConsumers; i++) { ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics); consumers.add(consumer); executor.submit(consumer); } Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (ConsumerLoop consumer : consumers) { consumer.shutdown(); } executor.shutdown(); try { executor.awaitTermination(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace; } } }); }
例子中啟動了三個線程來消費消息,每個線程給一個單獨的ID,這樣就能清楚的看到哪個線程消費到了哪些信息。shutdown hook會調用線程的wakeup方法來結束消費。在IDE里面可以點擊關閉或者在命令行里面使用Ctrl-C。輸出結果:

2: {partition=0, offset=928, value=2786} 2: {partition=0, offset=929, value=2789} 1: {partition=2, offset=297, value=891} 2: {partition=0, offset=930, value=2792} 1: {partition=2, offset=298, value=894} 2: {partition=0, offset=931, value=2795} 0: {partition=1, offset=278, value=835} 2: {partition=0, offset=932, value=2798} 0: {partition=1, offset=279, value=838} 1: {partition=2, offset=299, value=897} 1: {partition=2, offset=300, value=900} 1: {partition=2, offset=301, value=903} 1: {partition=2, offset=302, value=906} 1: {partition=2, offset=303, value=909} 1: {partition=2, offset=304, value=912} 0: {partition=1, offset=280, value=841} 2: {partition=0, offset=933, value=2801}
Consumer Liveness
當組內的一個消費者消費某個分區的時候,這些分區上會有一個基於組的鎖,即一個組里面一個消費者正在消費某個分區,組內的其他消費者就不能消費這個分區,如果這個消費者一直健康的運行當然最好,如果因為某些原因死掉,你需要把這個鎖解掉,然后把分區分給其他消費者。
kafka的組協調機制使用了心跳機制來解決這個問題。每次重新平衡分區分配后,組內消費者開始向組協調員(某個broker)發送心跳。組協調員持續收到某個消費者的心跳,它就認為這個消費者是健康的。協調員每次收到心跳,都會啟動一個計時器。當計時器到時間后還沒有收到后面的心跳,就認為這個消費者已經掛掉了,就會把這個分區分配給其他合適的消費者。計時器的持續時間是被稱為會話超時,由客戶端的session.timeout.ms配置。
props.put("session.timeout.ms", "60000");
會話超時機制能保證當消費者掛掉或者網絡故障的時候,分區的鎖會被釋放,並分配給其他消費。老的消費者再發送心跳也不認為它是健康的。
心跳發送線程和poll線程是一起的,正常poll數據的時候才會發送心跳,否則不會發。
會話超時時間默認是30秒,在網絡延時大的集群中可以適當調大這個參數,避免非異常情況下的重新分配分區。
Delivery Semantics
當一個組剛創建時,它的初始化offset是根據 auto.offset.reset 這個配置屬性來獲取的(在0.8中就加入了這個配置項)。一旦消費者開始消費,它根據應用的需求來提交offset。每次組內重新平衡partition以后,讀offset的位置就是上一次最后提交的offset。如果一個應用成功處理了某條消息,但是在成功提交offset之前就崩潰掉了,那么下一個消費者將重新讀這條消息,造成重復讀。當然,offset的提交頻率越快,這種損失就越小。
當我們將 enable.auto.commit 屬性設置為true時(默認為true),消費者會在配置屬性 auto.commit.interval.ms 的時間間隔后自動提交offset。時間間隔越小,崩潰造成的損失越小,隨之影響性能。
如果要自己手動控制offset的提交,則必須將 auto.offset.reset 設置為false。
手動提交offset的API現在還是測試版,但是重要的是如果將它集成到poll循環中。下面代碼是一個例子:

try { while (running) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) System.out.println(record.offset() + ": " + record.value()); try { consumer.commitSync(); } catch (CommitFailedException e) { // application specific failure handling } } } finally { consumer.close(); }
上例中使用了commitSync API來提交,它會在成功返回或者遇到錯誤之前阻塞。你需要關心的主要錯誤就是消息處理的時間超過session的時間造成超時。當這種事情真正發生的時候,這個消費者會被踢出去,然后造成CommitFailedException異常。應用應該處理這種異常,在上次成功提交offset之后和失敗提交offset之后的消息造成的改變進行回滾。
另外你應該保證必須在消息成功處理后再提交offset。
譯自:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client