轉載自:http://www.cnblogs.com/huxi2b/p/4583249.html 供參考
本文分析的Kafka代碼為kafka-0.8.2.1。另外,由於Kafka目前提供了兩套Producer代碼,一套是Scala版的舊版本;一套是Java版的新版本。雖然Kafka社區極力推薦大家使用Java版本的producer,但目前很多已有的程序還是調用了Scala版的API。今天我們就分析一下舊版producer的代碼。

1
2
3
4
5
|
val
requestRequiredAcksOpt
=
parser.accepts(
"request-required-acks"
,
"The required acks of the producer requests"
)
.withRequiredArg
.describedAs(
"request required acks"
)
.ofType(classOf[java.lang.Integer])
.defaultsTo(
0
)
// 此處默認設置為0
|
1
2
3
4
5
|
do
{
message
=
reader.readMessage()
// 從LineMessageReader類中讀取消息。該類接收鍵盤輸入的一行文本作為消息
if
(message !
=
null
)
producer.send(message.topic, message.key, message.message)
// key默認是空,如果想要指定,需傳入參數parse.key=true,默認key和消息文本之間的分隔符是'\t'
}
while
(message !
=
null
)
// 循環接收消息,除非Ctrl+C或其他其他引發IOException操作跳出循環
|
下面代碼是Producer.scala中的發送方法:
1
2
3
4
5
6
7
8
9
10
11
|
def
send(messages
:
KeyedMessage[K,V]*) {
lock synchronized {
if
(hasShutdown.get)
//如果producer已經關閉了拋出異常退出
throw
new
ProducerClosedException
recordStats(messages
//更新producer統計信息
sync
match
{
case
true
=
> eventHandler.handle(messages)
//如果是同步發送,直接使用DefaultEventHandler的handle方法發送
case
false
=
> asyncSend(messages)
// 否則,使用ayncSend方法異步發送消息——本文不考慮這種情況
}
}
}
|
由上面的分析可以看出,真正的發送邏輯其實是由DefaultEventHandler類的handle方法來完成的。下面我們重點分析一下這個類的代碼結構。
五、DefaultEventHandler與消息發送
這個類的handler方法可以同時支持同步和異步的消息發送。我們這里只考慮同步的代碼路徑。下面是消息發送的完整流程圖:
以下代碼是發送消息的核心邏輯:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
while
(remainingRetries >
0
&& outstandingProduceRequests.size >
0
) {
// 屬性message.send.max.retries指定了消息發送的重試次數,而outstandingProducerRequests就是序列化之后待發送的消息集合
topicMetadataToRefresh ++
=
outstandingProduceRequests.map(
_
.topic)
//將待發送消息所屬topic加入到待刷新元數據的topic集合
if
(topicMetadataRefreshInterval >
=
0
&&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
//查看是否已過刷新元數據時間間隔
Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
// 更新topic元數據信息
sendPartitionPerTopicCache.clear()
//如果消息key是空,代碼隨機選擇一個分區並記住該分區,以后該topic的消息都會往這個分區里面發送。sendPartitionPerTopicCache就是這個緩存
topicMetadataToRefresh.clear
//清空待刷新topic集合
lastTopicMetadataRefreshTime
=
SystemTime.milliseconds
}
outstandingProduceRequests
=
dispatchSerializedData(outstandingProduceRequests)
// 真正的消息發送方法
if
(outstandingProduceRequests.size >
0
) {
// 如果還有未發送成功的消息
info(
"Back off for %d ms before retrying send. Remaining retries = %d"
.format(config.retryBackoffMs, remainingRetries-
1
))
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.retryBackoffMs)
// 等待一段時間並重試
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(
_
.topic).toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
remainingRetries -
=
1
// 更新剩余重試次數
producerStats.resendRate.mark()
}
}
|
下面具體說說各個子模塊的代碼邏輯:
1
2
3
4
5
6
|
serializedMessages +
=
new
KeyedMessage[K,Message](
topic
=
e.topic,
key
=
e.key,
partKey
=
e.partKey,
message
=
new
Message(bytes
=
encoder.toBytes(e.message)))
// new Message時沒有指定key
|
構建完KeyedMessage之后返回對應的消息集合即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def
updateInfo(topics
:
Set[String], correlationId
:
Int) {
var
topicsMetadata
:
Seq[TopicMetadata]
=
Nil
// TopicMetadata = topic信息+ 一組PartitionMetadata (partitionId + leader + AR + ISR)
val
topicMetadataResponse
=
ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
//構造TopicMetadataRequest並隨機排列所有broker,然后從第一個broker開始嘗試發送請求。一旦成功就終止后面的請求發送嘗試。
topicsMetadata
=
topicMetadataResponse.topicsMetadata
//從response中取出zookeeper中保存的對應topic元數據信息
// throw partition specific exception
topicsMetadata.foreach(tmd
=
>{
trace(
"Metadata for topic %s is %s"
.format(tmd.topic, tmd))
if
(tmd.errorCode
==
ErrorMapping.NoError) {
topicPartitionInfo.put(tmd.topic, tmd)
//更新到broker的topic元數據緩存中
}
else
warn(
"Error while fetching metadata [%s] for topic [%s]: %s "
.format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
tmd.partitionsMetadata.foreach(pmd
=
>{
if
(pmd.errorCode !
=
ErrorMapping.NoError && pmd.errorCode
==
ErrorMapping.LeaderNotAvailableCode) {
warn(
"Error while fetching metadata %s for topic partition [%s,%d]: [%s]"
.format(pmd, tmd.topic, pmd.partitionId,
ErrorMapping.exceptionFor(pmd.errorCode).getClass))
}
// any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
})
})
producerPool.updateProducer(topicsMetadata)
}
|
關於上面代碼中的最后一行, 我們需要着重說一下。每個producer應用程序都會保存一個producer池對象來緩存每個broker上對應的同步producer實例。具體格式為brokerId -> SyncProducer。SyncProducer表示一個同步producer,其主要的方法是send,支持兩種請求的發送:ProducerRequest和TopicMetadataRequest。前者是發送消息的請求,后者是更新topic元數據信息的請求。為什么需要這份緩存呢?我們知道,每個topic分區都應該有一個leader副本在某個broker上,而只有leader副本才能接收客戶端發來的讀寫消息請求。對producer而言,即只有這個leader副本所在的broker才能接收ProducerRequest請求。在發送消息時候,我們會首先找出這個消息要發給哪個topic,然后發送更新topic元數據請求給任意broker去獲取最新的元數據信息——這部分信息中比較重要的就是要獲取topic各個分區的leader副本都在哪些broker上,這樣我們稍后會創建連接那些broker的阻塞通道(blocking channel)去實現真正的消息發送。Kafka目前的做法就是重建所有topic分區的leader副本所屬broker上對應的SyncProducer實例——雖然我覺得這樣實現有線沒有必要,只更新消息所屬分區的緩存信息應該就夠了(當然,這只是我的觀點,如果有不同意見歡迎拍磚)。以下是更新producer緩存的一些關鍵代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
val
newBrokers
=
new
collection.mutable.HashSet[Broker]
topicMetadata.foreach(tmd
=
> {
tmd.partitionsMetadata.foreach(pmd
=
> {
if
(pmd.leader.isDefined)
//遍歷topic元數據信息中的每個分區元數據實例,如果存在leader副本的,添加到newBrokers中以備后面更新緩存使用
newBrokers+
=
(pmd.leader.get)
})
})
lock synchronized {
newBrokers.foreach(b
=
> {
//遍歷newBrokers中的每個broker實例,如果在緩存中已經存在,直接關閉掉然后創建一個新的加入到緩存中;否則直接創建一個加入
if
(syncProducers.contains(b.id)){
syncProducers(b.id).close()
syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
}
else
syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
})
}
|
前面說了,如果只發送一條消息的話,其實真正需要更新的分區leader副本所述broker對應的SyncProducer實例只有一個,但目前的代碼中會更新所有分區,不知道Java版本的producer是否也是這樣實現,這需要后面繼續調研!
Topic | 分區 | Leader副本所在的broker ID |
test-topic | P0 | 0 |
test-topic | P1 | 1 |
test-topic | P2 | 3 |
如果基於這樣的配置,假定我們使用producer API一次性發送4條消息,分別是M1,M2, M3和M4。現在就可以開始分析代碼了,首先從消息分組及整理開始:
消息 | 要被發送到的分區ID | 該分區leader副本所在broker ID |
M1 | P0 | 0 |
M2 | P0 | 0 |
M3 | P1 | 1 |
M4 | P2 | 3 |
1
2
3
|
val
index
=
Utils.abs(Random.nextInt)
%
availablePartitions.size
// 隨機確定broker id
val
partitionId
=
availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
// 加入緩存中以便后續使用
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def
startup() {
...
// 創建一個請求處理的線程池,在構造時就會開啟多個線程准備接收請求
requestHandlerPool
=
new
KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
...
}
class
KafkaRequestHandlerPool {
...
for
(i <-
0
until numThreads) {
runnables(i)
=
new
KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i)
=
Utils.daemonThread(
"kafka-request-handler-"
+ i, runnables(i))
threads(i).start()
// 啟動每個請求處理線程
}
...
}
|
KafkaRequestHandler實際上是一個Runnable,它的run核心方法中以while (true)的方式調用api.handle(request)不斷地接收請求處理,如下面的代碼所示:
1
2
3
4
5
6
7
8
9
10
11
12
|
class
KafkaRequestHandler...
extends
Runnable {
...
def
run() {
...
while
(
true
) {
...
apis.handle(request)
// 調用apis.handle等待請求處理
}
...
}
...
}
|
在KafkaApis中handle的主要作用就是接收各種類型的請求。本文只關注ProducerRequest請求:
1
2
3
4
5
6
7
8
|
def
handle(request
:
RequestChannel.Request) {
...
request.requestId
match
{
case
RequestKeys.ProduceKey
=
> handleProducerOrOffsetCommitRequest(request)
// 如果接收到ProducerRequest交由handleProducerOrOffsetCommitRequest處理
case
...
}
...
}
|
如此看來,核心的方法就是handleProducerOrOffsetCommitRequest了。這個方法之所以叫這個名字,是因為它同時可以處理ProducerRequest和OffsetCommitRequest兩種請求,后者其實也是一種特殊的ProducerRequest。從Kafka 0.8.2之后kafka使用一個特殊的topic來保存提交位移(commit offset)。這個topic名字是__consumer_offsets。本文中我們關注的是真正的ProducerRequest。下面來看看這個方法的邏輯,如下圖所示:
整體邏輯看上去非常簡單,如下面的代碼所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
def
handleProducerOrOffsetCommitRequest(request
:
RequestChannel.Request) {
...
val
localProduceResults
=
appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
// 將消息追加寫入本地提交日志
val
numPartitionsInError
=
localProduceResults.count(
_
.error.isDefined)
// 計算是否存在發送失敗的分區
if
(produceRequest.requiredAcks
==
0
) {
// request.required.acks = 0時的代碼路徑
if
(numPartitionsInError !
=
0
) {
info((
"Send the close connection response due to error handling produce request "
+
"[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0"
)
.format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(
","
)))
requestChannel.closeConnection(request.processor, request)
// 關閉底層Socket以告知客戶端程序有發送失敗的情況
}
else
{
...
}
}
else
if
(produceRequest.requiredAcks
==
1
||
// request.required.acks = 0時的代碼路徑,當然還有其他兩個條件
produceRequest.numPartitions <
=
0
||
numPartitionsInError
==
produceRequest.numPartitions) {
val
response
=
offsetCommitRequestOpt.map(
_
.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
.getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
requestChannel.sendResponse(
new
RequestChannel.Response(request,
new
BoundedByteBufferSend(response)))
// 發送response給客戶端
}
else
{
// request.required.acks = -1時的代碼路徑
// create a list of (topic, partition) pairs to use as keys for this delayed request
val
producerRequestKeys
=
produceRequest.data.keys.toSeq
val
statuses
=
localProduceResults.map(r
=
>
r.key -> DelayedProduceResponseStatus(r.end +
1
, ProducerResponseStatus(r.errorCode, r.start))).toMap
val
delayedRequest
=
new
DelayedProduce(...)
// 此時需要構造延時請求進行處理,此段邏輯比較復雜,需要理解Purgatory的概念,本文暫不考慮
...
}
|
由上面代碼可見,無論request.required.acks是何值,都需要首先將待發送的消息集合追加寫入本地的提交日志中。此時如何按照默認值是是0的情況,那么這寫入日志后需要判斷下所有消息是否都已經發送成功了。如果出現了發送錯誤,那么就將關閉連入broker的Socket Server以通知客戶端程序錯誤的發生。現在的關鍵是追加寫是如何完成的?即方法appendToLocalLog如何實現的?該方法整體邏輯流程圖如下圖所示:
由於邏輯很直觀,不對代碼做詳細分析,不過值得關注的是這個方法會捕獲很多異常:
異常名稱 | 具體含義 | 異常處理 |
KafakStorageException | 這可能是不可恢復的IO錯誤 | 既然無法恢復,則終止該broker上JVM進程 |
InvalidTopicException | 顯式給__consumer_offsets topic發送消息就會有這個異常拋出,不要這么做,因為這是內部topic | 將InvalidTopicException封裝進ProduceResult返回 |
UnknownTopicOrPartitionException | topic或分區不在該broker上時拋出該異常 | 將UnknownTopicOrPartitionException封裝進ProduceResult返回 |
NotLeaderForPartitionException | 目標分區的leader副本不在該broker上 | 將NotLeaderForPartitionException封裝進ProduceResult返回 |
NotEnoughReplicasException | 只會出現在request.required.acks=-1且ISR中的副本數不滿足min.insync.replicas指定的最少副本數時會拋出該異常 | 將NotEnoughReplicasException封裝進ProduceResult返回 |
其他 | 處理ProducerRequest時發生的其他異常 | 將對應異常封裝進ProduceResult返回 |
okay,貌似現在我們就剩下最后一個主要的方法沒說了。分析完這個方法之后整個producer發送消息的流程應該就算是完整地走完了。最后的這個方法就是Partition的appendMessagesToLeader,其主要代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
def
appendMessagesToLeader(messages
:
ByteBufferMessageSet, requiredAcks
:
Int
=
0
)
=
{
inReadLock(leaderIsrUpdateLock) {
val
leaderReplicaOpt
=
leaderReplicaIfLocal()
// 判斷目標分區的leader副本是否在該broker上
leaderReplicaOpt
match
{
case
Some(leaderReplica)
=
>
// 如果leader副本在該broker上
val
log
=
leaderReplica.log.get
// 獲取本地提交日志文件句柄
val
minIsr
=
log.config.minInSyncReplicas
val
inSyncSize
=
inSyncReplicas.size
// Avoid writing to leader if there are not enough insync replicas to make it safe
if
(inSyncSize < minIsr && requiredAcks
==
-
1
) {
//只有request.required.acks等於-1時才會判斷ISR數是否不足
throw
new
NotEnoughReplicasException(
"Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
.format(topic,partitionId,minIsr,inSyncSize))
}
val
info
=
log.append(messages, assignOffsets
=
true
)
// 真正的寫日志操作,由於涉及Kafka底層寫日志的,以后有機會寫篇文章專門探討這部分功能
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.unblockDelayedFetchRequests(
new
TopicAndPartition(
this
.topic,
this
.partitionId))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
case
None
=
>
// 如果不在,直接拋出異常表明leader不在該broker上
throw
new
NotLeaderForPartitionException(
"Leader not local for partition [%s,%d] on broker %d"
.format(topic, partitionId, localBrokerId))
}
}
|
至此,一個最簡單的scala版同步producer的代碼走讀就算正式完成了,可以發現Kafka設計的思路就是在每個broker上啟動一個server不斷地處理從客戶端發來的各種請求,完成對應的功能並按需返回對應的response。希望本文能對希望了解Kafka producer機制的人有所幫助。