Kafka生產者與分區策略


Kafka生產者與分區策略

注:部分內容參考自:https://www.cnblogs.com/cxuanBlog/p/11949238.html

生產者發送消息流程

image-20200927095842317

首先,我們創建了一個ProducerRecord對象,它由要發送的消息key-value、要發送的主題名、可選的分區號構成,在發送 ProducerRecord時,需要將key-value通過序列化器序列化為字節數組,這樣才能在網絡上傳輸,然后消息到達分區器。

如果發送過程中指定了有效的分區號,則直接發送到該分區,如果發送時未指定,則默認使用key的hash值指定一個分區,如果發送時未指定消息key,則采用輪詢的方式選擇一個分區,這就是Kafka默認的分區策略,后面會詳細講解

ProducerRecord 還有關聯的時間戳,如果用戶沒有提供時間戳,那么生產者將會在記錄中使用當前的時間作為時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

  • 如果將主題配置為使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 如果將主題配置為使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日志中時,將由 broker 重寫。

然后,這條消息被存放在一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到消息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量,上面兩種的時間戳類型也會返回給用戶。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之后會嘗試重新發送消息,幾次之后如果還是失敗的話,就返回錯誤消息。

分區策略

上面介紹了Kafka的默認分區策略,如果需要自定義分區策略,需要實現Partitioner接口中的partition方法,並在配置中指定。

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個參數: topic,表示需要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化過后的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化后的值數組;cluster表示當前集群的原數據。Kafka 給你這么多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區,計算出它要被發送到哪個分區中。
  • close() : 繼承了 Closeable 接口能夠實現 close() 方法,在分區關閉時調用。
  • onNewBatch(): 表示通知分區程序用來創建新的批次

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪詢

順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息,輪詢策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪詢策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機分配

實現隨機分配的代碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

生產者配置參數說明

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認為消息是寫入成功的。此參數對消息丟失的影響較大

  • 如果 acks = 0,就表示生產者也不知道自己產生的消息是否被服務器接收了,它才知道它寫成功了。如果發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,因為沒有返回任何消息。這就類似於 UDP 的運輸層協議,只管發,服務器接受不接受它也不關心。
  • 如果 acks = 1,只要集群的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。如果發送途中造成了網絡異常或者 Leader 還沒選舉出來等其他情況導致消息寫入失敗,生產者會受到錯誤消息,這時候生產者往往會再次重發數據。因為消息的發送也分為 同步異步,Kafka 為了保證消息的高效傳輸會決定是同步發送還是異步發送。如果讓客戶端等待服務器的響應(通過調用 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回調,就會解決這個問題。
  • 如果 acks = all,這種情況下是只有當所有參與復制的節點都收到消息時,生產者才會接收到一個來自服務器的消息。不過,它的延遲比 acks =1 時更高,因為我們要等待不只一個服務器節點接收消息。

buffer.memory

此參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send() 方法調用要么被阻塞,要么拋出異常,具體取決於 block.on.buffer.null參數的設置。

compression.type

此參數來表示生產者啟用何種壓縮算法,默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappy、gzip 和 lz4,它指定了消息發送給 broker 之前使用哪一種壓縮算法進行壓縮。

retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領),在這種情況下,reteis 參數的值決定了生產者可以重發的消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者在每次重試之間等待 100ms,這個等待參數可以通過 retry.backoff.ms 進行修改。

batch.size

當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次里的所有消息會被發送出去。不過生產者井不一定都會等到批次被填滿才發送,任意條數的消息都可能被發送。

client.id

此參數可以是任意的字符串,服務器會用它來識別消息的來源,一般配置在日志里

max.in.flight.requests.per.connection

此參數指定了生產者在收到服務器響應之前可以發送多少消息,它的值越高,就會占用越多的內存,不過也會提高吞吐量。把它設為1 可以保證消息是按照發送的順序寫入服務器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返回響應的時間。如果等待時間超時,生產者要么重試發送數據,要么返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配----如果在指定時間內沒有收到同步副本的確認,那么 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩沖區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,為了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩沖區的大小。如果它們被設置為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那么可以適當增大這些值。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM