一、生產者配置
| 屬性 | 描述 | 類型 | 默認值 | 重要性 |
bootstrap.servers |
用於建立與kafka集群的連接,這個list僅僅影響用於初始化的hosts,來發現全部的servers。 格式:host1:port1,host2:port2,…,數量盡量不止一個,以防其中一個down了 |
list | "" | 高 |
acks |
procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
在這種情況下,如果leader在確認記錄后立即失敗,但在將數據復制到所有的副本服務器之前,則記錄將會丟失。
這是最強有力的保證,這相當於acks = -1的設置。
|
string | 1 | 高 |
retries |
發送失敗重試次數。 設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。 允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 |
string | 1 | 高 |
retry.backoff.ms |
發送失敗,每次重試的間隔毫秒數。 | long | 100 | 低 |
buffer.memory |
producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。 這項設置將和producer能夠使用的總內存相關,但並不是一個硬性的限制,因為不是producer使用的所有內存都是用於緩存。 一些額外的內存會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。 |
long | 33554432 (32M) |
高 |
batch.size |
批處理大小。 每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求, |
int | 16384 (16K) |
中 |
linger.ms |
生產者將在請求傳輸之間到達的所有記錄組合到一個個Batch中。一個Batch被創建之后,最多過linger.ms,不管這個Batch有沒有寫滿,都必須發送出去了。 | long | 0 | 中 |
max.request.size |
請求的最大大小(字節)。 這個參數決定了每次發送給Kafka服務器請求的最大大小,同時也會限制你一條消息的最大大小也不能超過這個參數設置的值。 |
int | 1048576 (1M) |
中 |
compression.type |
producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好 | string | none | 高 |
key.serializer |
key的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口 |
class | 高 | |
value.serializer |
值的Serializer類,實現了org.apache.kafka.common.serialization.Serializer接口 |
class | 高 | |
client.id |
當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。 這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤 |
string | "" | 中 |
client.dns.lookup |
控制客戶端如何使用DNS查找。 | string | default | 中 |
| connections.max.idle.ms | 在此配置指定的毫秒數之后關閉空閑連接。 | long | 540000 | 中 |
delivery.timeout.ms |
調用send()返回后報告成功或失敗的時間上限。 此配置的值應大於或等於 |
int | 120000 | 中 |
max.block.ms |
控制block的時長,當buffer空間不夠或者metadata丟失時產生block. | long | 60000 | 中 |
request.timeout.ms |
配置控制客戶端等待請求響應的最長時間。如果在超時時間過去之前未收到響應,則客戶端將在必要時重新發送請求,或者在重試次數用盡時使請求失敗。 | int | 30000 | 中 |
partitioner.class |
實現 org.apache.kafka.clients.producer.Partitioner 接口 默認值:org.apache.kafka.clients.producer.internals.DefaultPartitioner |
class | 中 |
二、消費者配置
| 屬性 | 描述 | 類型 | 默認值 | 重要性 |
bootstrap.servers |
以逗號分隔的主機:端口對列表,用於建立與Kafka群集的初始連接 | list | "" | 高 |
group.id |
用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬於同一個consumer group | string | null | 高 |
fetch.min.bytes |
服務器應為獲取請求返回的最小數據量。如果可用的數據不足,則請求將在響應請求之前等待該數據的累積。 默認設置為1字節意味着,只要有一個字節的數據可用,或者提取請求在等待數據到達時超時,就會響應提取請求。 |
int | 1 | 高 |
fetch.max.bytes |
服務器應為獲取請求返回的最大數據量。記錄由使用者分批獲取,如果獲取的第一個非空分區中的第一個記錄批大於此值, 則仍將返回該記錄批,以確保使用者能夠取得進展。 |
int | 52428800 | 中 |
max.partition.fetch.bytes |
服務器將返回的每個分區的最大數據量。記錄由消費者分批提取。 如果fetch的第一個非空分區中的第一個記錄批大於此限制,則仍將返回該批以確保使用者能夠取得進展。 |
int | 1048576 | 高 |
heartbeat.interval.ms |
使用Kafka的組管理工具時,消費者協調器的心跳之間的預期時間。 心跳用於確保消費者的會話保持活動狀態,並在新消費者加入或離開組時促進重新平衡。該值必須設置為低於session.timeout.ms, 但通常應設置為不高於該值的1/3。它可以調整得更低,以控制正常再平衡的預期時間。 |
int | 3000 | 高 |
session.timeout.ms |
使用Kafka的組管理工具時用於檢測客戶端故障的超時。 如果consumer在這段時間內沒有發送心跳信息,則它會被認為掛掉了,並且reblance將會產生, 必須在[group.min.session.timeout.ms, group.max.session.timeout.ms]范圍內 |
int | 10000 | 高 |
request.timeout.ms |
配置控制客戶端等待請求響應的最長時間。 如果在超時時間過去之前未收到響應,則客戶端將在必要時重新發送請求,或者在重試次數用盡時使請求失敗。 |
int | 3000 | 高 |
key.deserializer |
key的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口 | class | 高 | |
value.deserializer |
值的反序列化類。實現了org.apache.kafka.common.serialization.Deserializer接口 | class | 高 | |
allow.auto.create.topics |
允許在訂閱或分配主題時在代理上自動創建主題。 只有當代理允許使用 auto.create.topics.enable 的情況下才生效。 |
boolean | true | 中 |
exclude.internal.topics |
是否應將與訂閱模式匹配的內部主題從訂閱中排除。始終可以顯式訂閱內部主題。 | boolean | true | 中 |
enable.auto.commit |
啟動自動提交。 如果為真,則用戶的偏移量將在后台定期提交。 |
boolean | true | 中 |
auto.commit.interval.ms |
使用者偏移自動提交到Kafka的頻率(以毫秒為單位),enable.auto.commit設置為true。 | int | 5000 | 低 |
auto.offset.reset |
當Kafka中沒有初始偏移量或服務器上不再存在當前偏移量時該怎么辦(例如,由於該數據已被刪除):
|
string | latest | 中 |
max.poll.interval.ms |
使用使用者組管理時調用poll()之間的最大延遲。這為消費者在獲取更多記錄之前可以空閑的時間量設置了上限。 如果在此超時過期之前未調用poll(),則認為使用者失敗,該組將重新平衡,以便將分區重新分配給另一個成員。 對於使用非空group.instance.id組如果達到此超時,則不會立即重新分配分區。 |
int | 300000 | 中 |
max.poll.records |
在對poll()的單個調用中返回的最大記錄數。 max.poll.records條數據需要在session.timeout.ms這個時間內處理完 |
int | 500 | 中 |
client.dns.lookup |
控制客戶端如何使用DNS查找。 | string | default | 中 |
connections.max.idle.ms |
在此配置指定的毫秒數之后關閉空閑連接。 | long | 540000 | 中 |
default.api.timeout.ms |
指定客戶端API的超時(毫秒)。 | int | 60000 | 中 |
group.instance.id |
最終用戶提供的使用者實例的唯一標識符。 | string | null | 中 |
| 在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數據流的策略; 循環的partition分配器分配所有可用的partitions以及所有可用consumer 線程。 它會將partition循環的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的划分是確定的分布。循環分配策略只有在以下條件滿足時才可以: (1)每個topic在每個consumer實例上都有同樣數量的數據流。(2)訂閱的topic的集合對於consumer group中每個consumer實例來說都是確定的 |
list | class | 中 | |
send.buffer.bytes |
發送數據時要使用的TCP發送緩沖區(SO_SNDBUF)的大小。如果值為-1,將使用操作系統默認值。 | int | 131072 | 中 |
receive.buffer.bytes |
讀取數據時要使用的TCP接收緩沖區(SO_RCVBUF)的大小。如果值為-1,將使用操作系統默認值。 | int | 65536 | 中 |
client.id |
請求時傳遞給服務器的id字符串。這樣做的目的是通過允許在服務器端請求日志記錄中包含邏輯應用程序名來跟蹤請求源,而不僅僅是ip/端口。 | string | "" | 低 |
fetch.max.wait.ms |
Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。 這個配置就是來配置consumer最多等待response多久。 |
int | 500 | 低 |
引用官網:http://kafka.apache.org/documentation/#consumerconfigs
https://www.cnblogs.com/yx88/p/11013338.html
