| 屬性 | 描述 | 類型 | 默認值 |
|---|---|---|---|
| bootstrap.servers | 用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。 格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了 |
list | |
| acks | Server完成 producer request 前需要確認的數量。acks=0時,producer不會等待確認,直接添加到socket等待發送;acks=1時,等待leader寫到local log就行;acks=all或acks=-1時,等待isr中所有副本確認(注意:確認都是 broker 接收到消息放入內存就直接返回確認,不是需要等待數據寫入磁盤后才返回確認,這也是kafka快的原因) |
string | 1 |
| buffer.memory | Producer可以用來緩存數據的內存大小。該值實際為RecordAccumulator類中的BufferPool,即Producer所管理的最大內存。 如果數據產生速度大於向broker發送的速度,producer會阻塞 max.block.ms,超時則拋出異常 |
long | 33554432 |
| compression.type | Producer用於壓縮數據的壓縮類型,取值:none, gzip, snappy, or lz4 |
string | none |
| batch.size | Producer可以將發往同一個Partition的數據做成一個Produce Request發送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。 另外每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request發送的目的Broker均為這些partition的leader副本。 若將該值設為0,則不會進行批處理 |
int | 16384 |
| linger.ms | Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然后再發送,以此提高吞吐量,而linger.ms則更進一步,這個參數為每次發送增加一些delay,以此來聚合更多的Message。 官網解釋翻譯:producer會將request傳輸之間到達的所有records聚合到一個批請求。通常這個值發生在欠負載情況下,record到達速度快於發送。但是在某些場景下,client即使在正常負載下也期望減少請求數量。這個設置就是如此,通過人工添加少量時延,而不是立馬發送一個record,producer會等待所給的時延,以讓其他records發送出去,這樣就會被聚合在一起。這個類似於TCP的Nagle算法。該設置給了batch的時延上限:當我們獲得一個partition的 batch.size大小的records,就會立即發送出去,而不管該設置;但是如果對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設置的默認值為0(無時延)。例如,設置linger.ms=5,會減少request發送的數量,但是在無負載下會增加5ms的發送時延。 |
long | 0 |
| max.request.size | 請求的最大字節數。這也是對最大消息大小的有效限制。注意:server具有自己對消息大小的限制,這些大小和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 | int | 1048576 |
| receive.buffer.bytes | TCP的接收緩存 SO_RCVBUF 空間大小,用於讀取數據 | int | 32768 |
| request.timeout.ms | client等待請求響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求,超過重試次數發送失敗 | int | 30000 |
| send.buffer.bytes | TCP的發送緩存 SO_SNDBUF 空間大小,用於發送數據 | int | 131072 |
| timeout.ms | 指定server等待來自followers的確認的最大時間,根據acks的設置,超時則返回error |
int | 30000 |
| max.in.flight.requests.per.connection | 在block前一個connection上允許最大未確認的requests數量。 當設為1時,即是消息保證有序模式,注意:這里的消息保證有序是指對於單個Partition的消息有順序,因此若要保證全局消息有序,可以只使用一個Partition,當然也會降低性能 |
int | 5 |
| metadata.fetch.timeout.ms | 在第一次將數據發送到某topic時,需先fetch該topic的metadata,得知哪些服務器持有該topic的partition,該值為最長獲取metadata時間 | long | 60000 |
| reconnect.backoff.ms | 連接失敗時,當我們重新連接時的等待時間 | long | 50 |
| retry.backoff.ms | 在重試發送失敗的request前的等待時間,防止若目的Broker完全掛掉的情況下Producer一直陷入死循環發送,折中的方法 | long | 100 |
其余參數(注:以下均為默認值)
# metrics系統維護可配置的樣本數量,在一個可修正的window size
metrics.sample.window.ms=30000
# 用於維護metrics的樣本數
metrics.num.samples=2
# 類的列表,用於衡量指標。實現MetricReporter接口
metric.reporters=[]
# 強制刷新metadata的周期,即使leader沒有變化
metadata.max.age.ms=300000
# 與broker會話協議,取值:LAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.protocol=PLAINTEXT
# 分區類,實現Partitioner接口
partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner
# 控制block的時長,當buffer空間不夠或者metadata丟失時產生block
max.block.ms=60000
# 關閉達到該時間的空閑連接
connections.max.idle.ms=540000
# 當向server發出請求時,這個字符串會發送給server,目的是能夠追蹤請求源
client.id=""
# 發生錯誤時,重傳次數。當開啟重傳時,需要將`max.in.flight.requests.per.connection`設置為1,否則可能導致失序
retries=0
# key 序列化方式,類型為class,需實現Serializer interface
key.serializer=
# value 序列化方式,類型為class,需實現Serializer interface
value.serializer=
