【轉】Kafka producer原理 (Scala版同步producer)


轉載自:http://www.cnblogs.com/huxi2b/p/4583249.html     供參考

本文分析的Kafka代碼為kafka-0.8.2.1。另外,由於Kafka目前提供了兩套Producer代碼,一套是Scala版的舊版本;一套是Java版的新版本。雖然Kafka社區極力推薦大家使用Java版本的producer,但目前很多已有的程序還是調用了Scala版的API。今天我們就分析一下舊版producer的代碼。

  producer還分為同步和異步模式,由屬性producer.type指定,默認是sync,即同步發送模式。本文主要關注於同步發送的代碼走讀。下面以console-producer為例——console producer是Kafka自帶的一個工具,它可以很方便地以鍵盤輸入的方式接收消息並發送給指定的topic,非常適合作為我們學習的一個起點。
 
一、運行console-producer命令
我們的第一步是要啟動一個console-producer實例。最簡單的方式就是使用下面的命令:
 
除了絕對必要的topic, borker-list屬性,我們並沒有指定其他的參數。這幾乎是最簡單的啟動方式了。
【刊誤】console-producer如果不指定--sync默認應該是異步發送消息而非同步的,筆者之前說錯了,所以命令應該調整為:
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --sync
 
二、構建Producer配置信息
producer的第一步就是要構造producer的配置信息,比如metadata.broker.list和request.required.acks等,完整的參數列表可以查詢Kafka官網,這些參數部分可以由啟動console-producer時候指定,部分是有默認值的。舉例來說,對於metadata.broker.list這樣必須要指定的參數,在調用console-producer時候就必須傳入broker-list的值給它賦值;而像request.required.acks這樣的參數,雖然從名稱上來看也是必要參數,但console-producer代碼中提供了默認值,因此我們可以選擇不顯式提供request-required-acks的值,如下面代碼所示:
1
2
3
4
5
val  requestRequiredAcksOpt  =  parser.accepts( "request-required-acks" "The required acks of the producer requests" )
      .withRequiredArg
      .describedAs( "request required acks" )
      .ofType(classOf[java.lang.Integer])
      .defaultsTo( 0 // 此處默認設置為0
三、構建JVM Shutdownhook
console-producer代碼此處添加了一個JVM關閉鈎子,用於確保producer的關閉。
 
四、發送消息
代碼此處循環從鍵盤中接收一行文本作為消息發送。需要注意的時,默認情況下構造的消息是沒有key的。由於是同步發送,每條消息都會在Producer的send方法中調用DefaultEventHandler的send方法進行發送,以下代碼是ConsoleProducer.scala中消息發送部分代碼:
1
2
3
4
5
do  {
           message  =  reader.readMessage()     // 從LineMessageReader類中讀取消息。該類接收鍵盤輸入的一行文本作為消息
           if (message ! =  null )
             producer.send(message.topic, message.key, message.message)  // key默認是空,如果想要指定,需傳入參數parse.key=true,默認key和消息文本之間的分隔符是'\t'
while (message ! =  null // 循環接收消息,除非Ctrl+C或其他其他引發IOException操作跳出循環

下面代碼是Producer.scala中的發送方法:  

1
2
3
4
5
6
7
8
9
10
11
def  send(messages :  KeyedMessage[K,V]*) {
     lock synchronized {
       if  (hasShutdown.get)  //如果producer已經關閉了拋出異常退出
         throw  new  ProducerClosedException
       recordStats(messages  //更新producer統計信息
       sync  match  {
         case  true  = > eventHandler.handle(messages)  //如果是同步發送,直接使用DefaultEventHandler的handle方法發送
         case  false  = > asyncSend(messages)  // 否則,使用ayncSend方法異步發送消息——本文不考慮這種情況
       }
     }
   }

由上面的分析可以看出,真正的發送邏輯其實是由DefaultEventHandler類的handle方法來完成的。下面我們重點分析一下這個類的代碼結構。

 

五、DefaultEventHandler與消息發送

這個類的handler方法可以同時支持同步和異步的消息發送。我們這里只考慮同步的代碼路徑。下面是消息發送的完整流程圖:

  

以下代碼是發送消息的核心邏輯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
while  (remainingRetries >  0  && outstandingProduceRequests.size >  0 ) {   // 屬性message.send.max.retries指定了消息發送的重試次數,而outstandingProducerRequests就是序列化之后待發送的消息集合
       topicMetadataToRefresh ++ =  outstandingProduceRequests.map( _ .topic)  //將待發送消息所屬topic加入到待刷新元數據的topic集合
       if  (topicMetadataRefreshInterval > =  0  &&   
           SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {  //查看是否已過刷新元數據時間間隔
         Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))  // 更新topic元數據信息
         sendPartitionPerTopicCache.clear()  //如果消息key是空,代碼隨機選擇一個分區並記住該分區,以后該topic的消息都會往這個分區里面發送。sendPartitionPerTopicCache就是這個緩存
         topicMetadataToRefresh.clear  //清空待刷新topic集合
         lastTopicMetadataRefreshTime  =  SystemTime.milliseconds
       }
       outstandingProduceRequests  =  dispatchSerializedData(outstandingProduceRequests)  // 真正的消息發送方法
       if  (outstandingProduceRequests.size >  0 ) {  // 如果還有未發送成功的消息
         info( "Back off for %d ms before retrying send. Remaining retries = %d" .format(config.retryBackoffMs, remainingRetries- 1 ))
         // back off and update the topic metadata cache before attempting another send operation
         Thread.sleep(config.retryBackoffMs)  // 等待一段時間並重試
         // get topics of the outstanding produce requests and refresh metadata for those
         Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map( _ .topic).toSet, correlationId.getAndIncrement))
         sendPartitionPerTopicCache.clear()
         remainingRetries - =  1  // 更新剩余重試次數
         producerStats.resendRate.mark()
       }
     }

下面具體說說各個子模塊的代碼邏輯:  

5.1 serialize方法
該方法雖然是叫序列化,但其實主要的作用就是將字節數組格式的消息體轉成KeyedMessage格式。由於默認情況下我們沒有指定key,因此在構造KeyedMessage時就只需要指定消息體就好了,如下面的代碼所示:
1
2
3
4
5
6
serializedMessages + =
     new  KeyedMessage[K,Message](
     topic  =  e.topic,
     key  =  e.key,
     partKey  =  e.partKey,
     message  =  new  Message(bytes  =  encoder.toBytes(e.message)))  // new Message時沒有指定key

構建完KeyedMessage之后返回對應的消息集合即可。

5.2 更新topic元數據信息
Kafka是如何刷新某些topic的元數據信息的呢?它會向任意一個broker發送TopicMetadataRequest請求(TopicMetadataRequest是唯一一個能發給任意broker的請求API),使用獲取的響應來更新連入broker的緩存。TopicMetadataRequest的響應信息包括對應topic的Leader、AR、ISR信息。
具體到代碼而言,BrokerPartitionInfo的updateInfo方法就是做這件事情的,這個方法代碼不多,我們逐行分析下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def  updateInfo(topics :  Set[String], correlationId :  Int) {
     var  topicsMetadata :  Seq[TopicMetadata]  =  Nil  // TopicMetadata = topic信息+ 一組PartitionMetadata (partitionId + leader + AR + ISR)
     val  topicMetadataResponse  =  ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)  //構造TopicMetadataRequest並隨機排列所有broker,然后從第一個broker開始嘗試發送請求。一旦成功就終止后面的請求發送嘗試。
     topicsMetadata  =  topicMetadataResponse.topicsMetadata  //從response中取出zookeeper中保存的對應topic元數據信息
     // throw partition specific exception
     topicsMetadata.foreach(tmd  = >{
       trace( "Metadata for topic %s is %s" .format(tmd.topic, tmd))
       if (tmd.errorCode  ==  ErrorMapping.NoError) {
         topicPartitionInfo.put(tmd.topic, tmd)  //更新到broker的topic元數據緩存中
       else
         warn( "Error while fetching metadata [%s] for topic [%s]: %s " .format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
       tmd.partitionsMetadata.foreach(pmd  = >{
         if  (pmd.errorCode ! =  ErrorMapping.NoError && pmd.errorCode  ==  ErrorMapping.LeaderNotAvailableCode) {
           warn( "Error while fetching metadata %s for topic partition [%s,%d]: [%s]" .format(pmd, tmd.topic, pmd.partitionId,
             ErrorMapping.exceptionFor(pmd.errorCode).getClass))
         // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
       })
     })
     producerPool.updateProducer(topicsMetadata)
   }

關於上面代碼中的最后一行, 我們需要着重說一下。每個producer應用程序都會保存一個producer池對象來緩存每個broker上對應的同步producer實例。具體格式為brokerId -> SyncProducer。SyncProducer表示一個同步producer,其主要的方法是send,支持兩種請求的發送:ProducerRequest和TopicMetadataRequest。前者是發送消息的請求,后者是更新topic元數據信息的請求。為什么需要這份緩存呢?我們知道,每個topic分區都應該有一個leader副本在某個broker上,而只有leader副本才能接收客戶端發來的讀寫消息請求。對producer而言,即只有這個leader副本所在的broker才能接收ProducerRequest請求。在發送消息時候,我們會首先找出這個消息要發給哪個topic,然后發送更新topic元數據請求給任意broker去獲取最新的元數據信息——這部分信息中比較重要的就是要獲取topic各個分區的leader副本都在哪些broker上,這樣我們稍后會創建連接那些broker的阻塞通道(blocking channel)去實現真正的消息發送。Kafka目前的做法就是重建所有topic分區的leader副本所屬broker上對應的SyncProducer實例——雖然我覺得這樣實現有線沒有必要,只更新消息所屬分區的緩存信息應該就夠了(當然,這只是我的觀點,如果有不同意見歡迎拍磚)。以下是更新producer緩存的一些關鍵代碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val  newBrokers  =  new  collection.mutable.HashSet[Broker]
     topicMetadata.foreach(tmd  = > {   
       tmd.partitionsMetadata.foreach(pmd  = > {   
         if (pmd.leader.isDefined)  //遍歷topic元數據信息中的每個分區元數據實例,如果存在leader副本的,添加到newBrokers中以備后面更新緩存使用
           newBrokers+ = (pmd.leader.get)
       })
     })
     lock synchronized {
       newBrokers.foreach(b  = > {  //遍歷newBrokers中的每個broker實例,如果在緩存中已經存在,直接關閉掉然后創建一個新的加入到緩存中;否則直接創建一個加入
         if (syncProducers.contains(b.id)){
           syncProducers(b.id).close()
           syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
         else
           syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
       })
     }

前面說了,如果只發送一條消息的話,其實真正需要更新的分區leader副本所述broker對應的SyncProducer實例只有一個,但目前的代碼中會更新所有分區,不知道Java版本的producer是否也是這樣實現,這需要后面繼續調研!  

5.3 發送消息
更新完topic元數據信息之后就該真正地發送消息了,這是由dispatchSerializedData方法來實現的。該方法接收一組KeyedMessage消息集合並返回發送失敗的消息集合。如果返回None自然表示發送成功。該方法主要的邏輯如下圖所示:

  

為了更加直觀地說明上圖是如何完成消息發送的,我們先對Kafka環境做一些基本的假設。假設我們的Kafka環境有5個broker,ID分別為0, 1, 2, 3, 4。我們還定義了一個topic,名字是test-topic(其實名字不重要)。該topic有3個分區,分區ID分別是0, 1, 2,並假設每個分區的leader replica都是存在的。現在假設leader與broker的對應關系假定如下:
Topic 分區 Leader副本所在的broker ID
test-topic P0 0
test-topic P1 1
test-topic P2 3

 

 

 

如果基於這樣的配置,假定我們使用producer API一次性發送4條消息,分別是M1,M2, M3和M4。現在就可以開始分析代碼了,首先從消息分組及整理開始:

5.3.1 partitionAndCollate方法
了解一個方法最簡單的方式就是學習它的輸入,分析它的輸出。該方法接收一組待發送的消息集合——用Scala表示的話就是Seq[KeyedMessage[K, Message]],在我們的例子中很顯然這個集合中有4條消息。這個方法的輸出比較復雜,完整的寫法是:
Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]]]
熟悉Scala語法的朋友可能會知道,這個返回值類型表示該方法可能會返回None——這表示producer代碼沒法對你要發送的消息按照broker進行分組或在分組過程中遇到了嚴重的錯誤,只能返回None由上層代碼來處理這種情況。如果確實返回了值,這個值長的是什么樣子呢?拿我們的例子來說,假定每條消息去被發送到的分區如下:( 這里的對應關系是假設的,其實在partitionAndCollate方法中會為每條消息都分配它要去的分區!)
消息 要被發送到的分區ID 該分區leader副本所在broker ID
M1 P0 0
M2 P0 0
M3 P1 1
M4 P2 3

 

 
 那么這個方法返回的結果就是:
0 - > {test-topic + P0 -> {M1, M2}},
1 -> {test-topic + P1 -> {M3}},
2 -> {test-topic + P2 -> {M4}} 
}

 

該方法的效果就是將所有待發送的消息首先按照broker進行分組,然后再按照分區進行整理。

 

當然了,上面我們假定了每條消息要去的分區,其實這也是在partitionAndCollate方法中被計算出來的。主要的邏輯是:

 

1. 首先判斷每條消息的分區key是否指定,如果指定了調用默認的分區類Partitioner的partition計算目標分區就是了。

 

2. 如果沒有指定key,就像默認使用console-producer的情況,代碼會首先從緩存中判斷以前是否保存該topic的信息——即該topic下所有沒有key的消息默認會被發送到同一個分區下。如果存在直接找出來就好了;否則隨機挑選一個返回並把它加入到緩存中,如下面代碼所示:
1
2
3
val  index  =  Utils.abs(Random.nextInt)  %  availablePartitions.size  // 隨機確定broker id
val  partitionId  =  availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)  // 加入緩存中以便后續使用
5.3.2 groupMessagesToSet方法
通過上一步中將待發送消息集合按照broker和topic分區進行分組,Kafka對要發送的消息進行了分區。該操作完成之后代碼就需要遍歷整理過的消息數據,獲取消息數據中每個broker對應的分區消息映射,也就是類似於 {test-topic + P0 -> {M1, M2}}這樣的數據。然后將每個映射轉換為這樣的格式:
{(topic + 分區,  ByteBufferMessageSet(message),  (topic + 分區, ByteBufferMessage(message) }。還是以我們的例子而言,經過groupMessageToSet之后,每個broker對應的數據變為:
{
(topic + P0, ByteBufferMessageSet(M1, M2)),
(topic + P1, ByteBufferMessageSet(M3)),
(topic + P2, ByteBufferMessageSet(M4)),
}
這個方法還考慮壓縮的情況,即producer的屬性compression.codec中指定的壓縮策略。如果啟用了壓縮,追加寫當前日志段的時候會先解壓縮消息再寫入(詳見Log.scala的append方法)。
 
5.3.3 send發送消息
這個方法基於上一步中構造的(topic+分區, ByteBufferMessageSet)元組構造ProducerRequest發送給對應的broker,並返回發送失敗的topic分區集合。具體的邏輯如下:
1. 判斷要發送到的broker id是否合法,如果小於0的話(通常是-1),說明消息要發送到的分區沒有leader。這種情況下直接記錄一個警告信息並直接返回未發送的消息集合
2. 如果broker id是合法的,那么還需要再判斷一下要發送的消息是否為空,如果為空自然也不需要做什么,直接返回空集合就好了
3. 如果上一步中的確有要發送的消息,那么就根據request.required.acks以及超時時間等配置構造一個ProducerRequest將消息封裝進這個請求中。
4. 獲取這個broker上的syncProducer——這個也是從producer池緩存中拿到的,如果池緩存中沒有的話也只是記錄為一個警告,下次重試的時候刷新一下topic元數據信息就能夠創建出來了。
5. 一旦拿到目標broker上的syncProducer,就可以使用它來發送請求了,即調用syncProducer.send(producerRequest)
6. 請求被Kafka server處理之后(如何處理的下面會有詳細介紹)會發送一個對應的響應(response)給eventHandler。
7. 拿到response之后需要判斷一下response是否為空。這其實還要看下request.required.acks的設置。當該值是默認值0時表示producer不需要等待broker的應答(acknowledgement),這可以帶來最低的延遲但持久性也最差,因為如果一個broker宕機了有可能會丟失數據。如果該值是0, 那么Kafka處理完ProducerRequest之后並不發送任何response。因此若發現response是空,那么自然表示所有數據已經被發送了,返回空集合表示沒有發送失敗的分區消息
8. 但倘若request.required.acks是1(其實還有兩種情況,比如分區數是0等——這里不做討論),那么就表示producer在leader副本獲得數據后需要等待broker的應答。這個值的設置有更好的持久化效果。假設request.required.acks是1的話,那么Kafka處理完請求后悔發送response,因此代碼還要繼續解析response中的數據以確定到底有無失敗消息
9. 在開始解析response代碼之前,先來說說ProduceResponse的格式,如下圖所示:

  

 

response中比較重要的信息是topic下面多個分區對應的錯誤碼和消息待追加的第一條消息的位移。

 

因此,在拿到response之后,需要先判斷一下response中總的分區數是否和請求中的分區數一樣,如果不同的話說明在返回的response不完整,Kafka代碼會拋出異常。否則,就從response中找出那些有錯誤的分區(即錯誤碼不是NoError的)並返回。

 

至此,客戶端的producer程序就已經執行完畢了。可能有些人會感到奇怪?貌似消息只是以請求的方式被發送到Kafka server上,但消息不是還要被寫入到日志中嗎?這部分功能又是在哪里做的呢? 下面我們來看看Kafka server是如何處理ProducerRequest的?
 
六、 KafkaServer處理請求

 

Kafka server在啟動的時候會開啟N個線程來處理請求。其中N是由num.io.threads屬性指定,默認是8。Kafka推薦你設置該值至少是機器上磁盤數。在KafkaServer的startup方法中,如代碼所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def  startup() {
     ...
     // 創建一個請求處理的線程池,在構造時就會開啟多個線程准備接收請求
     requestHandlerPool  =  new  KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     ...
}
 
class  KafkaRequestHandlerPool {
     ...
     for (i <-  0  until numThreads) {
         runnables(i)  =  new  KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
         threads(i)  =  Utils.daemonThread( "kafka-request-handler-"  + i, runnables(i))
         threads(i).start()  // 啟動每個請求處理線程
     }
     ...
}

KafkaRequestHandler實際上是一個Runnable,它的run核心方法中以while (true)的方式調用api.handle(request)不斷地接收請求處理,如下面的代碼所示:  

 

1
2
3
4
5
6
7
8
9
10
11
12
class  KafkaRequestHandler...  extends  Runnable {
     ...
     def  run() {
         ...
         while  ( true ) {
             ...
             apis.handle(request)  // 調用apis.handle等待請求處理
         }
         ...
     }
     ...   
}

在KafkaApis中handle的主要作用就是接收各種類型的請求。本文只關注ProducerRequest請求:  

 
1
2
3
4
5
6
7
8
def  handle(request :  RequestChannel.Request) {
     ...
     request.requestId  match  {
         case  RequestKeys.ProduceKey  = > handleProducerOrOffsetCommitRequest(request)  // 如果接收到ProducerRequest交由handleProducerOrOffsetCommitRequest處理
         case  ...
     }
     ...
}

如此看來,核心的方法就是handleProducerOrOffsetCommitRequest了。這個方法之所以叫這個名字,是因為它同時可以處理ProducerRequest和OffsetCommitRequest兩種請求,后者其實也是一種特殊的ProducerRequest。從Kafka 0.8.2之后kafka使用一個特殊的topic來保存提交位移(commit offset)。這個topic名字是__consumer_offsets。本文中我們關注的是真正的ProducerRequest。下面來看看這個方法的邏輯,如下圖所示:

整體邏輯看上去非常簡單,如下面的代碼所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def  handleProducerOrOffsetCommitRequest(request :  RequestChannel.Request) {
     ...
     val  localProduceResults  =  appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)  // 將消息追加寫入本地提交日志
     val  numPartitionsInError  =  localProduceResults.count( _ .error.isDefined)  // 計算是否存在發送失敗的分區
     if (produceRequest.requiredAcks  ==  0 ) {  // request.required.acks = 0時的代碼路徑
       if  (numPartitionsInError ! =  0 ) {
         info(( "Send the close connection response due to error handling produce request "  +
           "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0" )
           .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString( "," )))
         requestChannel.closeConnection(request.processor, request)  // 關閉底層Socket以告知客戶端程序有發送失敗的情況
       else  {
         ...
       }
     else  if  (produceRequest.requiredAcks  ==  1  ||  // request.required.acks = 0時的代碼路徑,當然還有其他兩個條件
         produceRequest.numPartitions < =  0  ||
         numPartitionsInError  ==  produceRequest.numPartitions) {
       val  response  =  offsetCommitRequestOpt.map( _ .responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                            .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
       requestChannel.sendResponse( new  RequestChannel.Response(request,  new  BoundedByteBufferSend(response)))  // 發送response給客戶端
     else  //  request.required.acks = -1時的代碼路徑
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val  producerRequestKeys  =  produceRequest.data.keys.toSeq
       val  statuses  =  localProduceResults.map(r  = >
         r.key -> DelayedProduceResponseStatus(r.end +  1 , ProducerResponseStatus(r.errorCode, r.start))).toMap
       val  delayedRequest  =   new  DelayedProduce(...)  // 此時需要構造延時請求進行處理,此段邏輯比較復雜,需要理解Purgatory的概念,本文暫不考慮
         ...
}

由上面代碼可見,無論request.required.acks是何值,都需要首先將待發送的消息集合追加寫入本地的提交日志中。此時如何按照默認值是是0的情況,那么這寫入日志后需要判斷下所有消息是否都已經發送成功了。如果出現了發送錯誤,那么就將關閉連入broker的Socket Server以通知客戶端程序錯誤的發生。現在的關鍵是追加寫是如何完成的?即方法appendToLocalLog如何實現的?該方法整體邏輯流程圖如下圖所示:

  

由於邏輯很直觀,不對代碼做詳細分析,不過值得關注的是這個方法會捕獲很多異常:

異常名稱 具體含義 異常處理
KafakStorageException 這可能是不可恢復的IO錯誤 既然無法恢復,則終止該broker上JVM進程
InvalidTopicException 顯式給__consumer_offsets topic發送消息就會有這個異常拋出,不要這么做,因為這是內部topic 將InvalidTopicException封裝進ProduceResult返回
UnknownTopicOrPartitionException topic或分區不在該broker上時拋出該異常 將UnknownTopicOrPartitionException封裝進ProduceResult返回
NotLeaderForPartitionException 目標分區的leader副本不在該broker上 將NotLeaderForPartitionException封裝進ProduceResult返回
NotEnoughReplicasException 只會出現在request.required.acks=-1且ISR中的副本數不滿足min.insync.replicas指定的最少副本數時會拋出該異常 將NotEnoughReplicasException封裝進ProduceResult返回
其他 處理ProducerRequest時發生的其他異常 將對應異常封裝進ProduceResult返回

okay,貌似現在我們就剩下最后一個主要的方法沒說了。分析完這個方法之后整個producer發送消息的流程應該就算是完整地走完了。最后的這個方法就是Partition的appendMessagesToLeader,其主要代碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def  appendMessagesToLeader(messages :  ByteBufferMessageSet, requiredAcks :  Int = 0 =  {
     inReadLock(leaderIsrUpdateLock) {
       val  leaderReplicaOpt  =  leaderReplicaIfLocal()  // 判斷目標分區的leader副本是否在該broker上
       leaderReplicaOpt  match  {
         case  Some(leaderReplica)  = // 如果leader副本在該broker上
           val  log  =  leaderReplica.log.get  // 獲取本地提交日志文件句柄
           val  minIsr  =  log.config.minInSyncReplicas
           val  inSyncSize  =  inSyncReplicas.size
 
           // Avoid writing to leader if there are not enough insync replicas to make it safe
           if  (inSyncSize < minIsr && requiredAcks  ==  - 1 ) {  //只有request.required.acks等於-1時才會判斷ISR數是否不足
             throw  new  NotEnoughReplicasException( "Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
               .format(topic,partitionId,minIsr,inSyncSize))
           }
           val  info  =  log.append(messages, assignOffsets  =  true // 真正的寫日志操作,由於涉及Kafka底層寫日志的,以后有機會寫篇文章專門探討這部分功能
           // probably unblock some follower fetch requests since log end offset has been updated
           replicaManager.unblockDelayedFetchRequests( new  TopicAndPartition( this .topic,  this .partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           info
         case  None  = // 如果不在,直接拋出異常表明leader不在該broker上
           throw  new  NotLeaderForPartitionException( "Leader not local for partition [%s,%d] on broker %d"
             .format(topic, partitionId, localBrokerId))
       }
     }

至此,一個最簡單的scala版同步producer的代碼走讀就算正式完成了,可以發現Kafka設計的思路就是在每個broker上啟動一個server不斷地處理從客戶端發來的各種請求,完成對應的功能並按需返回對應的response。希望本文能對希望了解Kafka producer機制的人有所幫助。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM