一:消費端
消費端的參數定義在類:org.apache.kafka.clients.consumer.ConsumerConfig。
1.1:bootstrap.servers:默認值:空
用於建立到Kafka群集的初始連接的主機/端口對的列表。客戶機將使用所有服務器而不僅僅使用這里配置的節點。因為這些服務器地址僅用於初始化連接,並通過現有配置的來發現全部的kafka集群成員(集群隨時會變化),所以此列表不需要包含完整的集群地址(但盡量多配置幾個,以防止配置的服務器宕機)
格式:host1:port1,host2:port2,...
1.2:client.dns.lookup:默認值:default
控制客戶端如何使用DNS查找。如果配置為use_all_dns_ips,則依次連接到每個返回的IP地址,直到成功建立連接。如果配置為resolve_canonical_bootstrap_servers_only,則將每個引導地址解析成一個canonical名稱列表。
1.3:group.id:默認值:null
消費者所屬的消費者組的唯一標識。下面2種情況下,group.id必須要設置:(1):基於kafka的offset管理策略。 (2):KafkaConsumer使用subscribe接口訂閱消息。
1.4:group.instance.id:默認值:null
消費者實例ID,只允許非空字符串。如果設置,則使用者將被視為靜態成員,這意味着在任何時候消費者組中只允許有一個具有此ID的實例。如果不設置,消費者將作為動態成員加入群,這是傳統行為。
1.5:session.timeout.ms:默認值:10000毫秒
消費者組里面,檢測消費者失敗的會話超時時間。消費者會固定周期發送心跳消息到服務端,當服務端在指定時間內沒有收到心跳消息,則認為消費者丟失。這時候,服務端會從消費者組里踢出該節點,然后重新再平衡。需要注意的是:該值必須在 group.min.session.timeout.ms 和group.max.session.timeout.ms 之間。
1.6:heartbeat.interval.ms:默認值:3000毫秒
kafka消費組里面期望的心跳間隔時間。心跳是用來確保消費者會議保持活躍,並在新消費者加入或離開團體時促進再平衡。該值必須設置為低於session.timeout.ms的配置。但通常應設置為比這個值的三分之一還小。
1.7:partition.assignment.strategy:默認值:RangeAssignor.class
當使用組管理時,客戶端將使用分區分配策略的類名來分配消費者實例之間的分區所有權。通過實現org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口,可以插入自定義分配策略。
1.8:metadata.max.age.ms:默認值:5*60*1000毫秒
強制刷新元數據的周期時間。即使沒有任何分區領導層更改,也可以主動發現任何新的代理或分區。
1.9:enable.auto.commit:默認值:true
如果為true,消費者的offset將在周期性的在后台自動提交。
1.10:auto.commit.interval.ms:默認值:5000毫秒
消費者自動提交offset的頻率。當enable.auto.commit的值為true時有效。
1.11:client.id:默認值:""
發出請求時要傳遞給服務器的id字符串,這樣做的目的是通過允許在服務器端請求日志記錄中包含邏輯應用程序名稱,從而能夠跟蹤ip/端口以外的請求源。
1.12:client.rack:默認值:""
此客戶端的機架標識符。這可以是任何字符串值,指示此客戶端的物理位置。它與broker配置“broker.rack”相對應
1.13:max.partition.fetch.bytes:默認值:1*1024*1024 (1M)。
每個請求從服務器上每個分區返回的最大數據量。消費者批量從服務端獲取數據,如果獲取到的第一個非空的partition的數量大於該值,則仍然會返回。 服務器端最大的返回數據量由 message.max.bytes 配置決定(這個是服務端的配置)。或者通過topic的 max.message.bytes 配置設置。
1.14:send.buffer.bytes:默認值:128*1024
發送數據時要使用的TCP發送緩沖區的大小(SO_SNDBUF)。如果值為-1,則使用OS默認值。
1.15:receive.buffer.bytes: 默認值:64*1024
讀取數據時要使用的TCP接收緩沖區(SO_RCVBUF)的大小。如果值為-1,則使用OS默認值。
1.16:fetch.min.bytes: 默認值:1
從服務器上獲取請求返回的最小數據量。如果沒有足夠的數據可用,請求將等待大量數據積累后再回答請求。默認設置為1字節意味着只要有一個字節的數據可用,或者提取請求在等待數據到達時超時,提取請求就會得到響應。將此值設置為大於1的值將導致服務器等待更大數量的數據累積,這會稍微提高服務器吞吐量,但會增加一些延遲。
1.17:fetch.max.bytes:默認值:50*1024*1024(50M)
每個請求從服務器上返回的最大數據量。消費者批量從服務端獲取數據,如果獲取到的第一個非空的partition的數量大於該值,則仍然會返回。服務器端最大的返回數據量由 message.max.bytes 配置決定(這個是服務端的配置)。或者通過topic的 max.message.bytes 配置設置。
1.18:fetch.max.wait.ms: 默認值:500毫秒。
如果沒有足夠的數據來立即滿足fetch.min.bytes給出的要求,則服務器在響應fetch請求之前將阻止的最長時間。
1.19:reconnect.backoff.ms: 默認值:50毫秒
嘗試重新連接到給定主機之前等待的基本時間量。這樣可以避免在緊密循環中重復連接到主機。此回退適用於客戶端到代理的所有連接嘗試。
1.20:reconnect.backoff.max.ms: 默認值:1000毫秒
重新連接到重復連接失敗的代理時等待的最大時間(毫秒)
1.21: retry.backoff.ms: 默認值值:100毫秒
嘗試重試對給定主題分區的失敗請求之前等待的時間量。這樣可以避免在某些故障情況下以緊密循環的方式重復發送請求。
1.22:auto.offset.reset: 默認值:latest。可選值("latest", "earliest", "none")
如果Kafka中沒有初始偏移量,或者服務器上不再存在當前偏移量(例如,因為該數據已被刪除),該怎么辦:
earliest:自動將偏移量重置為最早偏移量
latest:自動將偏移量重置為最新偏移量
none:消費者組沒有找到位置偏移量,拋出異常
1.23:check.crcs: 默認值:true
自動檢查已消耗記錄的CRC32。這可確保不會發生對消息的在線或磁盤損壞。此檢查會增加一些開銷,因此在尋求極端性能的情況下可能會禁用它。
1.24:metrics.sample.window.ms: 默認值:30000毫秒
計算度量樣本的時間窗口。
1.25:metrics.num.samples
為計算度量而保留的樣本數。
1.26: metrics.recording.level: 默認值:INFO。可選值:DEBUG,INFO
計算最高紀錄級別。
1.27:key.deserializer: 默認值:
接口 org.apache.kafka.common.serialization.Deserializer 的實現類。對KEY進行反序列化。
1.28:value.deserializer: 默認值:
接口org.apache.kafka.common.serialization.Deserializer的實現類。用以對VALUE值進行反序列化。
1.29:request.timeout.ms:默認值:30000毫秒
配置控制客戶端對發起的請求響應等待的最長時間。如果在超時前沒有收到響應,當重試次數沒有用完之前,將發起重試,當重試次數用完之后,將得到失敗。
1.30:default.api.timeout.ms: 默認值:60000毫秒
消費者API可能阻塞的默認超時時間。沒有設置 timeout 參數時,使用該默認值。
1.31:connections.max.idle.ms:默認值:9*60000毫秒
配置連接的最大空閑時間。超過這個時間連接將關閉。
1.32:interceptor.classes:默認值:空列表
接口org.apache.kafka.clients.consumer.ConsumerInterceptor的實現類列表。用以添加在消費者受到消息之前的攔截器。
1.33:max.poll.records: 默認值:500
單次poll請求獲取的最大記錄條數。
1.34:max.poll.interval.ms: 默認值:300000毫秒
使用消費者組管理時,調用poll命令的最大延遲時間。此值是消費者獲取記錄前的最大空閑時間。如果消費者在此超時時間到達之前沒有調用poll命令,則認為此消費者是失敗的,此時會觸發消費者組的再平衡,以便將此分區分配給其它的消費者。如果消費者的 group.instance.id 配置是非空的,那么達到此超時時間,不會立刻重新分配分區,此時消費者將停止發送心跳消息,在到達 session.timeout.ms 配置的超時時間后,分區才會被重新分配。此反映的是已關閉消費者的行為。
1.35:exclude.internal.topics: 默認值:true
訂閱模式的內部topic是否應該從訂閱topic中排除
1.36:internal.leave.group.on.close: 默認值:true
消費者關閉后,是否從消費者組里移除
1.37:isolation.level: 默認值:READ_UNCOMMITTED,可選值:READ_COMMITTED,READ_UNCOMMITTED
控制如何讀取事務性寫入的消息。如果配置的是 read_committed,那么消費者使用poll命令只能讀取事務性提交的消息。如果配置的是 read_uncommitted,那么將得到所以的消息,甚至事務已經中斷。非事務性消息在任何一種模式下都將無條件返回。
1.38:allow.auto.create.topics: 默認值:true
在訂閱或者分配topic時,是否允許在服務端自動創建topic。同時,服務端的auto.create.topics.enable配置也必須為true才能自動創建。
1.39:security.providers: 默認值:null
接口 org.apache.kafka.common.security.auth.SecurityProviderCreator 的實現類,用以實現安全算法。
1.40:security.protocol: 默認值:PLAINTEXT
和服務端的通訊協議。有效值是:Utils.join(SecurityProtocol.names(), ", ")
二:生產端
生產端的參數定義在類:org.apache.kafka.clients.producer.ProducerConfig
2.1:bootstrap.servers:默認值:空
用於建立到Kafka群集的初始連接的主機/端口對的列表。客戶機將使用所有服務器而不僅僅使用這里配置的節點。因為這些服務器地址僅用於初始化連接,並通過現有配置的來發現全部的kafka集群成員(集群隨時會變化),所以此列表不需要包含完整的集群地址(但盡量多配置幾個,以防止配置的服務器宕機)
格式:host1:port1,host2:port2,...
2.2:client.dns.lookup:默認值:default
控制客戶端如何使用DNS查找。如果配置為use_all_dns_ips,則依次連接到每個返回的IP地址,直到成功建立連接。如果配置為resolve_canonical_bootstrap_servers_only,則將每個引導地址解析成一個canonical名稱列表。
2.3:buffer.memory: 默認值:32*1024*1024L
生產者可以用來緩沖等待發送到服務器的記錄的總內存字節數。如果發送記錄的速度比傳輸到服務端的速度快,那么生產端將被阻塞 MAX_BLOCK_MS_CONFIG 配置的時間,然后會拋出異常。此設置應大致與生產者將使用的總內存相對應,但不是硬限制。因為不是所有生產者都使用的緩沖。
2.4: retries: 默認值:Integer.MAX_VALUE。 范圍:[0,Integer.MAX_VALUE]
設置一個大於零的值將導致客戶端重新發送任何出現暫時性錯誤而發送失敗的記錄。此重試與客戶端在收到錯誤時重新發送記錄沒有區別。如果 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設置的不是1,那么重試可能改變消息到達partition的順序。比如第一個消息失敗了重試,第二個消息成功,那么第二個消息可能先於第一個達到partition。 在配置了 DELIVERY_TIMEOUT_MS_CONFIG 的超時時間后,即使重試次數沒有使用完,但是超時時間已到,那么也會失敗。同城情況下,可以不設置此屬性,而使用 DELIVERY_TIMEOUT_MS_CONFIG 來控制。
2.5:acks: 默認值:"1",可選值:"all", "-1", "0", "1"
Leader收到的應答數以確定生產者請求是否處理完成。
如果設置為0,生產者將不會等待服務端的應答,消息將立即添加到套接字緩沖區並被視為已發送。在這樣的情況下,不能保證消息已發送到服務端,而且 retries 配置將不會生效。
如果設置為1, 消息被發送到Leader,並寫入Leader的log后,並不會等待follow的應答就直接響應。在這樣的情況下,Leader可以確定收到消息,但是Follow可能會存在消息丟失。
如果設置為all,那Leader會等待所有的follow都應答后再響應。這強力保證了消息不會丟失。
如果設置為-1,效果和設置為all一樣。
2.6:compression.type: 默認值:none
生產者生成的所有數據的壓縮類型。默認值為none,表示無壓縮。有效值為:none,gzip,snappy,lz4,zstd。壓縮是對整批數據的壓縮,所以批處理的效果也會影響壓縮比。
2.7:batch.size: 默認值:16384
每當多個消息發送到同一個partition,生產者將嘗試將記錄批處理到一起,以減少請求。這有助於提高客戶機和服務器的性能。此配置控制以字節為單位的默認批處理大小。不會嘗試批處理大於此大小的記錄。發送到代理的請求將包含多個批處理,每個分區一個批處理。比較小的batch.size並不是很通用,並可能降低吞吐量。一個非常大的batch.size可能會使用內存有點浪費,因為我們總是分配一個指定批量大小的緩沖區,以預期其他記錄。
2.8: linger.ms: 默認值:0
生產者將在請求傳輸之間到達的所有記錄組合到一個單獨的批處理請求中。通常情況下,只有當記錄到達的速度比發送的速度快時,才會發生這種情況。但在某些情況下,客戶可能希望即使在合適負載下也要減少請求數。這個配置設置的是一個延遲。這樣生產者不必立馬發送消息,而是等待配置的時間,以便進行批量發送消息,這個類似於TCP中的Nagle算法。當我們配置了 BATCH_SIZE_CONFIG 后,linger.ms是等待的上限,即使消息字節數沒有達到配置的值。如果LINGER_MS_CONFIG 配置為0,表示不等待。
2.9:delivery.timeout.ms: 默認值:120*1000 毫秒
調用 send() 方法后,報告成功或者失敗的時間上限。這個配置限制了消息延遲發送,等待服務端確認,失敗重試的最大時間上限。當遇到不可恢復的錯誤,重試次數已用盡,當時間沒有達到這個上限值,也會提前返回結果。此值應該不小於 REQUEST_TIMEOUT_MS_CONFIG 和 LINGER_MS_CONFIG 之和的值。
2.10:client.id: 默認值:""
發出請求時要傳遞給服務器的id字符串。這樣做的目的是通過允許在服務器端請求日志記錄中包含邏輯應用程序名稱,從而能夠跟蹤ip/端口以外的請求源。
2.11:send.buffer.bytes: 默認值:128*1024
發送數據時要使用的TCP發送緩沖區(SO_SNDBUF)的大小。如果值為-1,則使用OS默認值。
2.12:receive.buffer.bytes: 默認值:32*1024
在讀取數據時要使用的TCP接收緩沖區(SO_RCVBUF)的大小。如果值為-1,則使用OS默認值。
2.13:max.request.size: 默認值:1024*1024
請求的最大大小(字節)。當生產者批量發送消息時候,該設置限制着最大值,以免發送一個超大的請求。注意服務端也有一個請求的最大值,可能和這個值不一樣。
2.14:reconnect.backoff.ms:默認值:50L
嘗試重新連接到給定主機之前等待的基本時間量。這樣可以避免在緊密循環中重復連接到主機。
2.15:reconnect.backoff.max.ms: 默認值:1000L
重新連接到服務器的最大等待時間。如果提供的話,每台主機的重連將隨着每個連續的連接失敗而成倍增加,直到這個最大值。
2.16:retry.backoff.ms: 默認值:100L
嘗試重試對給定主題分區的失敗請求之前等待的時間量。這樣可以避免在某些故障情況下以緊密循環的方式重復發送請求。
2.17:max.block.ms: 默認值:60*1000
控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 命令的阻塞時間。由於緩沖區已滿或元數據不可用,這些方法可能被阻止。用戶提供的序列化程序或分區程序中的阻塞將不計入此超時。
2.18:request.timeout.ms: 默認值:30*1000
配置控制客戶端等待請求的響應的最大時間。如果在超時時間到達之前仍然沒有得到響應,那么將重試或者得到失敗。此配置的值應該大於 replica.lag.time.max.ms 的值,以減少由於不必要的生產者重試而導致消息重復的可能性。
2.19:metadata.max.age.ms: 默認值:5*60*1000
以毫秒為單位的一段時間,在這段時間之后,我們強制刷新元數據,即使我們沒有看到任何分區領導層更改,也可以主動發現任何新的代理或分區。
2.20:metrics.sample.window.ms: 默認值:30000
計算度量樣本的時間窗口。
2.21:metrics.num.samples: 默認值:2
為計算度量而保留的樣本數。
2.22:metrics.recording.level: 默認值:INFO。 可選值:INFO,DEBUG
度量的最高記錄級別。
2.23: metric.reporters: 默認值:空
用作度量報告器的類的列表。 是org.apache.kafka.common.metrics.MetricsReporter接口的實現類。JmxReporter總是包含在注冊JMX統計信息中。
2.24: max.in.flight.requests.per.connection: 默認值:5
在阻塞之前,客戶端將在單個連接上發送的最大未確認請求數。請注意,如果將此設置設置為大於1並且存在失敗的發送,則存在由於重試而導致消息重新排序的風險。
2.25: key.serializer: 默認值:無
接口org.apache.kafka.common.serialization.Serializer的實現類,用以對KEY進行序列化。
2.26:value.serializer: 默認值:無
接口 org.apache.kafka.common.serialization.Serializer 的實現類,用以對value進行序列化。
2.27:connections.max.idle.ms: 默認值:9*60*1000
在此配置指定的毫秒數之后關閉空閑連接。
2.28:partitioner.class: 默認值:無
接口org.apache.kafka.clients.producer.Partitioner的實現類,用以自定義分片算法。
2.29:interceptor.classes: 默認值:空
接口 org.apache.kafka.clients.producer.ProducerInterceptor 的實現類的攔截器列表。允許消息在發送到Kafka集群之前,對消息進行攔截。默認情況,是沒有攔截器的。
2.30:security.protocol: 默認值:PLAINTEXT
和服務器的通訊協議。可能的值是:Utils.join(SecurityProtocol.names(), ", ")
2.31:security.providers: 默認值:空
接口 org.apache.kafka.common.security.auth.SecurityProviderCreator 的實現類列表,用以實現安全算法。
2.32:enable.idempotence: 默認值:false
當設置為“true”時,生產者將確保流中只寫入每條消息的一個副本。如果“false”,則由於服務端失敗等原因導致的生產者重試可能會在流中寫入重試消息的副本。需要注意的是,如果設置為true,那么配置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 的值必須小於等於5,RETRIES_CONFIG配置必須大於0,ACKS_CONFIG 配置必須是all。如果用戶未明確設置這些值,則將選擇合適的值。如果設置了不兼容的值,將拋出ConfigException。
2.33: transaction.timeout.ms: 默認值:60000
事務協調器在主動中止正在進行的事務之前等待生產者更新事務狀態的最長時間(毫秒)。如果此值大於代理中的transaction.max.timeout.ms設置,求將失敗,並出現InvalidTransactionTimeout錯誤。
2.34:transactional.id: 默認值:無
用於事務傳遞的TransactionalId。這支持跨多個生產者會話的可靠性語義,因為它允許客戶機保證在啟動任何新事務之前,使用相同TransactionalId的事務已經完成。如果未提供TransactionalId,則生產者僅限於冪等傳遞。請注意,如果配置了TransactionalId,那么enable.idempotence的配置必須是true。默認值為null,這意味着不能使用事務。