Spring-kafka —— 生產者消費者重要配置


 

一、生產者配置

屬性 描述 類型 默認值 重要性

bootstrap.servers

用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。
格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了
list ""
 

acks

 procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
  • acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區並視為已發送。

   在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。

  • acks = 1 這意味着leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,

         在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。

  • acks = all 這意味着leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,

     這是最強有力的保證,這相當於acks = -1的設置。

  • 可以設置的值為:all, -1, 0, 1
string  1  高

retries

發送失敗重試次數。

設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。

允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。

string 1  高

retry.backoff.ms

發送失敗,每次重試的間隔毫秒數。 long  100   低

buffer.memory

producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。

這項設置將和producer能夠使用的總內存相關,但並不是一個硬性的限制,因為不是producer使用的所有內存都是用於緩存。

一些額外的內存會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。

long

33554432 

(32M)

 高

batch.size

批處理大小。

每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求,
這有助於提升客戶端和服務端之間的性能,此配置控制默認批量大小(以字節為單位)

int   16384

(16K)

中 

linger.ms

生產者將在請求傳輸之間到達的所有記錄組合到一個個Batch中。一個Batch被創建之后,最多過linger.ms,不管這個Batch有沒有寫滿,都必須發送出去了。 long  中 

max.request.size

請求的最大大小(字節)。

這個參數決定了每次發送給Kafka服務器請求的最大大小,同時也會限制你一條消息的最大大小也不能超過這個參數設置的值。

int 

1048576

(1M)

中 

compression.type

producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好 string none

key.serializer

key的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口
class  

value.serializer

值的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口
class  

client.id

當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。

這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤

string ""

client.dns.lookup

控制客戶端如何使用DNS查找。 string default
connections.max.idle.ms 在此配置指定的毫秒數之后關閉空閑連接。 long 540000

delivery.timeout.ms

調用send()返回后報告成功或失敗的時間上限。

此配置的值應大於或等於 request.timeout.ms 加 linger.ms 的總和.

int 120000

max.block.ms

控制block的時長,當buffer空間不夠或者metadata丟失時產生block. long 60000

request.timeout.ms        

配置控制客戶端等待請求響應的最長時間。如果在超時時間過去之前未收到響應,則客戶端將在必要時重新發送請求,或者在重試次數用盡時使請求失敗。 int 30000

partitioner.class

實現 org.apache.kafka.clients.producer.Partitioner 接口

默認值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

class  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

二、消費者配置

屬性 描述 類型 默認值 重要性

bootstrap.servers

以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接 list ""

group.id

用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬於同一個consumer group                                                   string null

fetch.min.bytes

服務器應為獲取請求返回的最小數據量。如果可用的數據不足,則請求將在響應請求之前等待該數據的累積。

默認設置為1字節意味着,只要有一個字節的數據可用,或者提取請求在等待數據到達時超時,就會響應提取請求。

int 1

fetch.max.bytes

服務器應為獲取請求返回的最大數據量。記錄由使用者分批獲取,如果獲取的第一個非空分區中的第一個記錄批大於此值,

則仍將返回該記錄批,以確保使用者能夠取得進展。

int   52428800 中 

max.partition.fetch.bytes

服務器將返回的每個分區的最大數據量。記錄由消費者分批提取。

如果fetch的第一個非空分區中的第一個記錄批大於此限制,則仍將返回該批以確保使用者能夠取得進展。

int  1048576

heartbeat.interval.ms

使用Kafka的組管理工具時,消費者協調器的心跳之間的預期時間。

心跳用於確保消費者的會話保持活動狀態,並在新消費者加入或離開組時促進重新平衡。該值必須設置為低於session.timeout.ms,

但通常應設置為不高於該值的1/3。它可以調整得更低,以控制正常再平衡的預期時間。

int 3000

session.timeout.ms

使用Kafka的組管理工具時用於檢測客戶端故障的超時。

如果consumer在這段時間內沒有發送心跳信息,則它會被認為掛掉了,並且reblance將會產生,

必須在[group.min.session.timeout.ms, group.max.session.timeout.ms]范圍內

int  10000

request.timeout.ms

配置控制客戶端等待請求響應的最長時間。

如果在超時時間過去之前未收到響應,則客戶端將在必要時重新發送請求,或者在重試次數用盡時使請求失敗。 

int  3000

key.deserializer

key的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口 class  

value.deserializer

值的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口 class  

allow.auto.create.topics

允許在訂閱或分配主題時在代理上自動創建主題。

只有當代理允許使用 auto.create.topics.enable 的情況下才生效。

boolean true

exclude.internal.topics

是否應將與訂閱模式匹配的內部主題從訂閱中排除。始終可以顯式訂閱內部主題。 boolean true

enable.auto.commit

啟動自動提交。

如果為真,則用戶的偏移量將在后台定期提交。

boolean true

auto.commit.interval.ms

使用者偏移自動提交到Kafka的頻率(以毫秒為單位),enable.auto.commit設置為true。  int 5000 低 

auto.offset.reset

當Kafka中沒有初始偏移量或服務器上不再存在當前偏移量時該怎么辦(例如,由於該數據已被刪除):

  • earliest:將偏移量自動重置為最早的偏移量
  • latest:自動將偏移量重置為最新偏移量
  • none:如果未找到消費者組的先前偏移量,則向消費者拋出異常
  • anything else:向消費者拋出異常
string latest

max.poll.interval.ms

使用使用者組管理時調用poll()之間的最大延遲。這為消費者在獲取更多記錄之前可以空閑的時間量設置了上限。

如果在此超時過期之前未調用poll(),則認為使用者失敗,該組將重新平衡,以便將分區重新分配給另一個成員。

對於使用非空group.instance.id組如果達到此超時,則不會立即重新分配分區。

int 300000

max.poll.records

在對poll()的單個調用中返回的最大記錄數。

max.poll.records條數據需要在session.timeout.ms這個時間內處理完

int 500

client.dns.lookup

控制客戶端如何使用DNS查找。 string default

connections.max.idle.ms

在此配置指定的毫秒數之后關閉空閑連接。 long 540000

default.api.timeout.ms

指定客戶端API的超時(毫秒)。  int 60000

group.instance.id

最終用戶提供的使用者實例的唯一標識符。  string  null

partition.assignment.strategy

在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數據流的策略; 循環的partition分配器分配所有可用的partitions以及所有可用consumer 線程。

它會將partition循環的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的划分是確定的分布。循環分配策略只有在以下條件滿足時才可以:

(1)每個topic在每個consumer實例上都有同樣數量的數據流。(2)訂閱的topic的集合對於consumer group中每個consumer實例來說都是確定的

list class

send.buffer.bytes

發送數據時要使用的TCP發送緩沖區(SO_SNDBUF)的大小。如果值為-1,將使用操作系統默認值。 int 131072 中 

receive.buffer.bytes

讀取數據時要使用的TCP接收緩沖區(SO_RCVBUF)的大小。如果值為-1,將使用操作系統默認值。  int  65536

client.id

請求時傳遞給服務器的id字符串。這樣做的目的是通過允許在服務器端請求日志記錄中包含邏輯應用程序名來跟蹤請求源,而不僅僅是ip/端口。 string ""

fetch.max.wait.ms

Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。

這個配置就是來配置consumer最多等待response多久。

int 500

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

引用官網:http://kafka.apache.org/documentation/#consumerconfigs

https://www.cnblogs.com/yx88/p/11013338.html

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM