上一篇中分析了Scala版的console producer代碼,這篇文章為讀者帶來一篇console consumer工作原理分析的隨筆。其實不論是哪個consumer,大部分的工作原理都是類似的。本文利用console consumer作為切入點,既容易理解又不失一般性。
本文使用的Kafka環境是0.8.2.1版本,這也是當前最新的版本。(注:Kafka 0.9版本據說會用Java重新設計並編寫consumer代碼,對此我們拭目以待) 由於主要目的是分析consumer原理,因此本文並不過多糾結於console consumer特定的使用方法。一條最簡單的命令足以作為我們的開始:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic
kafka-console-consumer.sh腳本內容簡潔明了: exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@
很顯然,該shell腳本調用了kafka.tools包下的ConsoleConsumer類,並將提供的命令行參數全部傳給該類。由此可知,我們需要從這個類開始分析。不過在此之前,簡單說一下console consumer整體的啟動流程,如下圖所示:
上圖流程具體展開如下:
1. 加載並解析命令行參數,唯一的必要參數(Required)是zookeeper
2. 如果沒有傳入group.id,ConsoleConsumer將生成自己的group.id,即console-consumer-[10萬以內的一個隨機數]
3. 創建ConsumerConfig用於封裝consumer的各種配置
4. 創建默認的消息格式化類,其定義的writeTo方法會默認將消息輸出到控制台
5. 創建ZookeeperConsumerConnector。Kafka使用它來創建KafkaStream消費流
5.1 創建本地緩存, 保存topic下每個分區的信息,包括該分區底層的阻塞隊列,已消費的位移、已獲取到的最新位移以及獲取大小等
5.2 創建本地緩存,保存每個topic分區當前在zookeeper中保存的位移值
5.3 創建本地緩存,保存topic的每個讀取線程底層對應的阻塞隊列,主要用於關閉Connector時可以批量關閉底層的阻塞隊列
5.4 生成consumer id,規則為[group.id]_[主機名]_[時間戳]_[隨機產生的一個UUID的前8位]。其中主機名就是運行ConsoleConsumer所在broker節點的主機名
5.5 創建獲取線程管理器(ConsumerFetcherManager)
5.6 啟動一個特定線程,用於定時地(默認是1分鍾)向Zookeeper提交更改過的位移
6. 增加JVM關閉鈎子,確保JVM關閉后資源也能夠被釋放
7. 創建KafkaStream並通過迭代器不斷遍歷該stream, KafkaStream的迭代器的底層實現包含一個阻塞隊列,如果沒有新的消息到來,該迭代器會一直阻塞,除非你顯式設置了consumer.timeout.ms參數(默認是-1表示consumer會一直等待新消息的帶來)
8. 每接收到一條新的消息,默認的消息格式化類會將其輸出到控制台上。然后再次等待迭代器傳過來的下一條消息
本質上來說,console consumer啟動時會創建一個KafkaStream(可以簡單翻譯成Kafak流),該stream會不停地等待可消費的新消息——具體做法就是通過LinkedBlockingQueue阻塞隊列來實現,后續會有詳細描述。針對上面啟動的順序列表,我們在ConsoleConsumer.scala中逐一進行代碼走讀:
1. 加載必要參數 zookeeper
ConsoleConsumer.scala類定義了main方法,說明這是個可執行的類。類的前100多行幾乎都在處理命令行參數的解析。其中真正必要的參數只有zookeeper.connect一個,如下面代碼所示:
1 // REQUIRED表示這是一個必須要指定的參數
2 val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
3 "Multiple URLS can be given to allow fail-over.").withRequiredArg.describedAs("urls").ofType(classOf[String])
2. 生成group.id
乍一看和官網上要求的配置不匹配,因為官網中說過consumer真正必要的參數實際上有兩個:zookeeper.connect和group.id。由此可以推斷console consumer應該會生成group.id的值,且它本質上也是一個consumer,必然屬於一個消費組,因此也必然定義了consumer id。下面的代碼中即展示了console consumer如何生成自己的group id: (consumer id是如何生成的后面再說)
1 // 如果沒有顯式指定group.id,那么代碼就自己合成一個 2 // 具體格式: console-consumer-[10萬以內的一個隨機數] 3 // 10萬是一個很大的數,因此只有非常低的幾率會碰到多個console consumer的group id相同的情況
4 if(!consumerProps.containsKey("group.id")) { 5 consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) 6 groupIdPassed=false
7 }
3. 創建ConsumerConfig對象封裝配置
確定了consumer的group.id之后console consumer需要把傳入參數封裝進ConsumerConfig類中並把后者傳給Consumer的create方法以構造一個ConsumerConnector——即初始化consumer了,具體邏輯見下面的代碼:
1 val config = new ConsumerConfig(consumerProps) // 封裝ConsumerConfig配置類
2 val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
4. 創建默認的消息格式化類,其定義的writeTo方法會默認將消息輸出到控制台
1 val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) // 創建消息格式類,用於最后的輸出顯示
2 val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt)) 3 val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
5. 創建ZookeeperConsumerConnector
ZookeeperConsumerConnector非常重要,它實現了ConsumerConnector接口(該接口定義了創建KafkaStream和提交位移的操作,如createMessageStreams、commitOffsets等)。Kakfa官網把這個接口稱為high level的consumer API。對於大多數consumer來說,這個high level的consumer API提供的功能已經足夠了。不過很多用戶可能需要對位移有更大的控制,這個時候Kafka推薦用戶使用被稱為low level的consumer API—— SimpleConsumer。大家參考這篇文章來深入學習high level API的用法。目前為止,我們只需要知道Kafka通過下面的語句構建了ConsumerConnector這個consumer的核心接口:
1 val connector = Consumer.create(config) // 創建ConsumerConnector,Consumer核心接口
6. 構建JVM關閉鈎子線程
這部分非常簡單,就是在線程中關閉上一步創建的connector,並根據傳入的參數決定是否刪除zookeeper下/consumers/[group.id]節點
7. 創建KafkaStream,通過迭代器等待消息到來
由於console consumer支持同時消費多個topic的消息,因此它提供了類似於過濾器這樣的實現,這也是為什么connector調用createMessageStreamsByFilter來創建KafkaStream的原因,如下面的代碼所示。
1 val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) 2 val iter = if(maxMessages >= 0) 3 stream.slice(0, maxMessages) 4 else
5 stream
createMessageStreamsByFilter方法返回的是一組KafkaStream,但console consumer默認只是創建了1個stream,所以這里直接調用get(0)取到這個stream就可以了。
8. 通過迭代器以阻塞等待的方式消費消息
創建好KafkaStream之后,console consumer通過迭代器遍歷KafkaStream。這里值得注意的是,該迭代器底層實現依賴一個阻塞隊列。如果沒有顯式配置過consumer.timeout.ms參數(默認是-1表示consumer會一直等待新消息),那么迭代器會一直處於阻塞狀態等待可供消費的消息——具體的實現細節參見下一篇。迭代器每收到一條消息后,它就會使用默認的消息格式化類DefaultMessageFormatter將消息輸出到控制台,這也是console consumer名字的由來,如下面的代碼所示:
1 for(messageAndTopic <- iter) { 2 try { 3 formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) // 輸出到控制台
4 numMessages += 1
5 } catch { ... } 6 ... 7 }
好了,至此我們按照啟動順序概述了console consumer啟動時的各個階段。不過,ZookeeperConsumerConnector和創建和迭代器的實現我們並未詳細展開,這部分內容將作為后面續篇的內容呈現給大家。敬請期待!