kafka系列七、kafka核心配置


一、producer核心配置

1、acks :發送應答(默認值:1)

生產者在考慮完成請求之前要求leader收到的確認的數量。這控制了發送的記錄的持久性。允許以下設置:

  1. acks=0:設置為0,則生產者將完全不等待來自服務器的任何確認。記錄將立即添加到socket緩沖區,並被認為已發送。在這種情況下,不能保證服務器已經收到記錄,重試配置將不會生效(因為客戶機通常不會知道任何失敗)。每個記錄返回的偏移量總是-1。
  2. acks=1:leader會將記錄寫到本地日志中,但不會等待所有follower的完全確認。在這種情況下,如果leader在記錄失敗后立即失敗,但在追隨者復制記錄之前失敗,那么記錄就會丟失。
  3. acks=all / -1:leader將等待完整的同步副本來確認記錄。這保證了只要至少有一個同步副本仍然存在,記錄就不會丟失。這是最有力的保證。這相當於acks=-1設置。

2、batch.size:批量發送大小(默認:16384,16K)

緩存到本地內存,批量發送大小,意思每次發送16K到broke。當多個記錄被發送到同一個分區時,生產者將嘗試將記錄批處理成更少的請求。這有助於客戶機和服務器上的性能。此配置以字節為單位控制默認批處理大小。

3、bootstrap.servers:服務器地址

broke服務器地址,多個用逗號割開。

4、buffer.memory:生產者最大可用緩存 (默認:33554432,32M)

生產者可以用來緩沖等待發送到服務器的記錄的總內存字節。如果記錄被發送的速度超過了它們可以被發送到服務器的速度,那么生產者將阻塞max.block。然后它會拋出一個異常。

該設置應該大致與生成器將使用的總內存相對應,但不是硬綁定,因為生成器使用的並非所有內存都用於緩沖。一些額外的內存將用於壓縮(如果啟用了壓縮)以及維護飛行中的請求。

生產者產生的消息緩存到本地,每次批量發送batch.size大小到服務器。

5、client.id:生產者ID(默認“”)

請求時傳遞給服務器的id字符串。這樣做的目的是通過允許在服務器端請求日志中包含邏輯應用程序名稱,從而能夠跟蹤ip/端口之外的請求源。

6、compression.type:壓縮類型(默認值:producer)

指定給定主題的最終壓縮類型。此配置接受標准壓縮編解碼器(“gzip”、“snappy”、“lz4”、“zstd”)。它還接受“未壓縮”,相當於沒有壓縮;以及“生產者”,即保留生產者設置的原始壓縮編解碼器。

“gzip”:壓縮效率高,適合高內存、CPU

“snappy”:適合帶寬敏感性,壓縮力度大

7、retries:失敗重試次數(默認:2147483647)

異常是RetriableException類型或者TransactionManager允許重試;

transactionManager.canRetry()后面會分析;先看看哪些異常是RetriableException類型異常。

允許重試,但不需要設置max.in.flight.requests.per.connection(單個連接上發送的未確認請求的最大數量)連接到1可能會改變記錄的順序,因為如果將兩個批發送到單個分區,第一個批處理失敗並重試,但是第二個批處理成功,那么第二個批處理中的記錄可能先出現。

通過delivery.timeout.ms也可以控制重試次數,如果重試次數沒有用盡,傳輸超時也會停止。

retry.backoff.ms:重試阻塞時間(默認:100)

這避免了在某些失敗場景下以緊密循環的方式重復發送請求。

8、delivery.timeout.ms:傳輸時間(默認:120000,2分鍾)

生產者發送完請求接受服務器ACk的時間,該時間允許重試 ,該配置應該大於request.timeout.ms + linger.ms。

9、connections.max.idle.ms:關閉空閑連接時間(默認:540000)

 在此配置指定的毫秒數之后關閉空閑連接。

10、enable.idempotence:開啟冪等(默認:false)

當設置為“true”時,生產者將確保在流中准確地寫入每個消息的副本。如果“false”,則由於代理失敗而導致生產者重試,等等,可能會在流中寫入重試消息的副本。請注意,啟用冪等需要使用max.in.flight.requests.per.connection,連接小於或等於5,重試大於0且ack必須為“all”。如果用戶沒有顯式地設置這些值,將選擇合適的值。如果設置了不兼容的值,就會拋出ConfigException。

11、max.in.flight.requests.per.connection:單個連接上發送的未確認請求的最大數量(默認:5)

阻塞前客戶端在單個連接上發送的未確認請求的最大數量。請注意,如果該設置設置為大於1,並且發送失敗,則有由於重試(即,如果啟用重試)。

12、interceptor.classes:攔截器(默認:無)

用作攔截器的類的列表。實現接口:org.apache.kafka.clients.producer。ProducerInterceptor接口允許將生產者接收到的記錄發布到Kafka集群之前攔截它們(可能還會發生突變)。默認情況下,沒有攔截器。

13、key.serializer:key序列化器(默認無)

實現org.apache.kafka.common. serialize .Serializer接口的key的序列化器類。String可配置:class org.apache.kafka.common.serialization.StringSerializer。

14、value.serializer:value序列化器(默認無)

序列化器類的值,該值實現org.apache.kafka.common. serialize .Serializer接口。String可配置:class org.apache.kafka.common.serialization.StringSerializer

15、linger.ms:發送延遲時間(默認:0)

為減少負載和客戶端的請求數量,生產者不會一條一條發送,而是會逗留一段時間批量發送。batch.size和linger.ms滿足任何一個條件都會發送。

16、max.block.ms:阻塞時間(默認:60000,一分鍾)

配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()阻塞的時間。由於緩沖區已滿或元數據不可用,也會阻塞。用戶提供的序列化器或分區程序中的阻塞將不計入此超時。

17、max.request.size:最大請求字節大小(默認:1048576,1M)

請求的最大字節大小。此設置將限制生產者在單個請求中發送記錄批的數量,以避免發送大量請求。這也有效地限制了最大記錄批大小。注意,服務器對記錄批處理大小有自己的上限,這可能與此不同。

18、metric.reporters:自定義指標報告器

用作指標報告器的類的列表。metricsreporter接口實現了org.apache.kafka.common.metrics.MetricsReporter接口,該接口允許插入將在創建新度量時得到通知的類。JmxReporter始終包含在注冊JMX統計信息中。

19、partitioner.class:自定義分區策略

實現接口 org.apache.kafka.clients.producer.Partitioner,默認值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

20、request.timeout.ms:請求超時時間(默認:30000)

配置控制客戶機等待請求響應的最長時間。如果在超時超時之前沒有收到響應,客戶端將在需要時重新發送請求,或者在重試耗盡時失敗請求。這個應該大於replica.lag.time.max。ms(代理配置),以減少由於不必要的生產者重試而導致消息重復的可能性。

21、receive.buffer.bytes(默認:32768,32K)

讀取數據時使用的TCP接收緩沖區(SO_RCVBUF)的大小。如果值是-1,將使用OS默認值。

22、send.buffer.bytes(默認:131072,128K)

發送數據時使用的TCP發送緩沖區(SO_SNDBUF)的大小。如果值是-1,將使用OS默認值。

23、retry.backoff.ms:重試阻塞時間(默認:100)

這避免了在某些失敗場景下以緊密循環的方式重復發送請求。

二、consumer核心配置

1、enable.auto.commit:開啟自動提交(默認:true)

如果為true,consumer的偏移量將在后台定期提交。

2、auto.commit.interval.ms:自動提交頻率(默認:5000)

如果enable.auto.commit設置為true,則使用者偏移量自動提交到Kafka的頻率(毫秒)。

3、client.id:客戶ID

便於跟蹤日志。

4、check.crcs:是否開啟數據校驗(默認:true)

自動檢查消耗的記錄的CRC32。這確保不會發生對消息的在線或磁盤損壞。此檢查增加了一些開銷,因此在尋求極端性能的情況下可能禁用此檢查。

5、bootstrap.servers:服務器配置

多個用都好隔開。

6、connections.max.idle.ms:關閉空間連接時間(默認:540000)

在此配置指定的毫秒數之后關閉空閑連接。

7、group.id:群組(默認:“”)

唯一標識用戶群組,同一個group每個partition只會分配到一個consumer。

8、max.poll.records:拉起最大記錄(默認:500)

單次輪詢()調用中返回的記錄的最大數量。

9、max.poll.interval.ms:拉取記錄間隔(默認:300000,5分鍾)

使用消費者組管理時輪詢()調用之間的最大延遲。這為使用者在獲取更多記錄之前空閑的時間設置了上限。如果在此超時過期之前沒有調用poll(),則認為使用者失敗,組將重新平衡,以便將分區重新分配給另一個成員。

10、request.timeout.ms:請求超時時間(默認:30000 ,30S)

配置控制客戶機等待請求響應的最長時間。如果在超時超時之前沒有收到響應,客戶端將在需要時重新發送請求,或者在重試耗盡時失敗請求。

11、session.timeout.ms:consumer session超時

用於檢測worker程序失敗的超時。worker定期發送心跳,以向代理表明其活性。如果在此會話超時過期之前代理沒有接收到心跳,則代理將從組中刪除。請注意,該值必須位於group.min.session.timeout在broker配置中配置的允許范圍內group.max.session.timeout.ms。

12、auto.offset.reset:初始偏移量 (默認:latest)

如果Kafka中沒有初始偏移量,或者服務器上不再存在當前偏移量(例如,因為該數據已被刪除),該怎么辦:

earliest:自動重置偏移到最早的偏移

latest:自動將偏移量重置為最新偏移量

none:如果沒有為使用者的組找到以前的偏移量,則向使用者拋出exception

anything else:向使用者拋出異常

13、key.deserializer

用於實現org.apache.kafka.common. serialize .Deserializer接口的key的反序列化類,class org.apache.kafka.common.serialization.StringDeserializer

14、value.deserializer

用於實現org.apache.kafka.common. serialize .Deserializer接口的value的反序列化類,class org.apache.kafka.common.serialization.StringDeserializer

15、max.partition.fetch.bytes

每個分區服務器將返回的最大數據量。記錄由consumer成批提取。如果fetch的第一個非空分區中的第一個記錄批處理大於這個限制,那么仍然會返回批處理,以確保使用者能夠取得進展。broker接受的最大記錄批處理大小是通過message.max定義的。字節(broker配置)或max.message。字節(topic配置)。看fetch.max.bytes用於限制consumer請求大小的字節。

16、partition.assignment.strategy:consumer訂閱分區策略

(默認:class org.apache.kafka.clients.consumer.RangeAssignor)

當使用組管理時,客戶端將使用分區分配策略的類名在使用者實例之間分配分區所有權。

17、fetch.max.bytes:拉取最大字節(默認:52428800,50M)

服務器應該為獲取請求返回的最大數據量。記錄由使用者成批地獲取,並且如果獲取的第一個非空分區中的第一個記錄批處理大於這個值,仍然會返回記錄批處理,以確保使用者能夠取得進展。因此,這不是一個絕對最大值。代理接受的最大記錄批處理大小是通過message.max定義的。字節(代理配置)或max.message。字節(主題配置)。請注意,使用者並行執行多個獲取。

18、heartbeat.interval.ms:心跳時間(默認:3000, 3S)

使用Kafka的組管理工具時,從心跳到消費者協調器的預期時間。心跳被用來確保消費者的會話保持活躍,並在新消費者加入或離開組時促進再平衡。該值必須設置為小於session.timeout.ms的1/3。它可以調整甚至更低,以控制正常再平衡的預期時間。

19、fetch.max.wait.ms:拉取阻塞時間(默認:500)

如果沒有足夠的數據立即滿足fetch.min.bytes提供的要求,服務器在響應fetch請求之前將阻塞的最長時間。

20、fetch.min.bytes:拉取最小字節數(默認:1)

服務器應該為獲取請求返回的最小數據量。如果沒有足夠的數據可用,請求將等待那么多數據累積后再響應請求。默認的1字節設置意味着,只要數據的一個字節可用,或者獲取請求超時等待數據到達,就會響應獲取請求。將此設置為大於1的值將導致服務器等待更大數量的數據累積,這可以稍微提高服務器吞吐量,但代價是增加一些延遲。

21、exclude.internal.topics:公開內部topic(默認:true)

是否應該將來自內部主題(如偏移量)的記錄公開給使用者,consumer共享offset。如果設置為true,從內部主題接收記錄的唯一方法是訂閱它。

22、isolation.level(隔離級別:默認:read_uncommitted)

控制如何以事務方式讀取寫入的消息。如果設置為read_committed, consumer.poll()將只返回已提交的事務消息。如果設置為read_uncommitted'(默認),consumer.poll()將返回所有消息,甚至是已經中止的事務消息。在任何一種模式下,非事務性消息都將無條件返回。

三、broke配置

1、zookeeper.connect:zk地址

多個用逗號隔開。

2、advertised.host.name(默認:null)

不贊成使用:

在server.properties 里還有另一個參數是解決這個問題的, advertised.host.name參數用來配置返回的host.name值,把這個參數配置為外網IP地址即可。

這個參數默認沒有啟用,默認是返回的java.net.InetAddress.getCanonicalHostName的值,在我的mac上這個值並不等於hostname的值而是返回IP,但在linux上這個值就是hostname的值。

3、advertised.listeners

hostname和端口注冊到zk給生產者和消費者使用的,如果沒有設置,將會使用listeners的配置,如果listeners也沒有配置,將使用java.net.InetAddress.getCanonicalHostName()來獲取這個hostname和port,對於ipv4,基本就是localhost了。

4、auto.create.topics.enable(自動創建topic,默認:true)

第一次發動消息時,自動創建topic。

5、auto.leader.rebalance.enable:自動rebalance(默認:true)

支持自動領導平衡。如果需要,后台線程定期檢查並觸發leader balance。

6、background.threads:處理線程(默認:10)

用於各種后台處理任務的線程數。

7、broker.id 默認:-1

此服務器的broke id。如果未設置,將生成唯一的代理id。為了避免zookeeper生成的broke id和用戶配置的broke id之間的沖突,生成的代理id從reserve .broker.max開始id + 1。

8、compression.type:壓縮類型,默認:producer

指定給定主題的最終壓縮類型。此配置接受標准壓縮編解碼器(“gzip”、“snappy”、“lz4”、“zstd”)。它還接受“未壓縮”,相當於沒有壓縮;以及“producer”,即保留producer設置的原始壓縮編解碼器。

9、delete.topic.enable 刪除topic(默認:true)

允許刪除主題。如果關閉此配置,則通過管理工具刪除主題將無效。

10、leader.imbalance.check.interval.seconds(rebalance檢測頻率,默認:300)

控制器觸發分區rebalance檢查的頻率。

11、leader.imbalance.per.broker.percentage(觸發rebalance比率,默認:10,10%)

每個broke允許的lead不平衡比率。如果控制器超過每個broke的這個值,控制器將觸發一個leader balance。該值以百分比指定。

12、log.dir(日志目錄,默認:/tmp/kafka-logs)

保存日志數據的目錄。

13、log.dirs

保存日志數據的目錄。如果未設置,則為日志中的值。使用dir。

14、log.flush.interval.messages(默認:9223372036854775807)

在將消息刷新到磁盤之前,日志分區上累積的消息數量

15、log.flush.interval.ms(默認:null)

任何主題中的消息在刷新到磁盤之前保存在內存中的最長時間。如果沒有設置,則使用log.flush.scheduler.interval.ms中的值。

16、log.flush.offset.checkpoint.interval.ms(默認:60000)

作為日志恢復點的上次刷新的持久記錄的更新頻率。

17、log.retention.bytes 保存日志文件的最大值(默認:-1)

刪除前日志的最大大小。

18、log.retention.hours日志文件最大保存時間(小時)默認:168,一周

日志文件最大保存時間。

19、log.retention.minutes日志文件最大保存時間(分鍾)默認:null

20、log.retention.ms日志文件最大保存時間(毫秒)默認:null

21、log.roll.hours:新segment產生時間,默認:168,一周

即使文件沒有到達log.segment.bytes,只要文件創建時間到達此屬性,就會創建新文件。

22、log.roll.ms :新segment產生時間

滾出新日志段之前的最大時間(以毫秒為單位)。如果未設置,則為log.roll中的值使用。

23、log.segment.bytes:segment文件最大值,默認:1073741824(1G)

24、log.segment.delete.delay.ms:segment刪除等待時間, 默認:60000

從文件系統中刪除文件之前等待的時間量。

25、message.max.bytes 最大batch size 默認:1000012,0.9M

Kafka允許的最大記錄batch size。如果增加了這個值,並且存在大於0.10.2的使用者,那么還必須增加consumer的fetch大小,以便他們能夠獲取這么大的記錄批。在最新的消息格式版本中,記錄總是按批進行分組,以提高效率。在以前的消息格式版本中,未壓縮記錄沒有分組成批,這種限制只適用於單個記錄。可以使用主題級別max.message設置每個主題。字節的配置。

26、min.insync.replicas(insync中最小副本值)

當producer將acks設置為“all”(或“-1”)時,min.insync。副本指定必須確認寫操作成功的最小副本數量。如果不能滿足這個最小值,則生產者將引發一個異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

當一起使用時,min.insync.replicas和ack允許您執行更大的持久性保證。一個典型的場景是創建一個復制因子為3的主題,設置min.insync復制到2個,用“all”配置發送。將確保如果大多數副本沒有收到寫操作,則生產者將引發異常。

27、num.io.threads,默認:8

服務器用於處理請求的線程數,其中可能包括磁盤I/O。

28、num.network.threads,默認:3

服務器用於接收來自網絡的請求和向網絡發送響應的線程數。

29、num.recovery.threads.per.data.dir 默認:1

每個數據目錄在啟動時用於日志恢復和在關閉時用於刷新的線程數。

30、num.replica.alter.log.dirs.threads 默認:null

可以在日志目錄(可能包括磁盤I/O)之間移動副本的線程數。

31、num.replica.fetchers

從leader復制數據到follower的線程數。

32、offset.metadata.max.bytes 默認:4096

與offset提交關聯的metadata的最大大小。

33、offsets.commit.timeout.ms 默認:5000

偏移量提交將被延遲,直到偏移量主題的所有副本收到提交或達到此超時。這類似於生產者請求超時。

34、offsets.topic.num.partitions 默認:50

偏移量提交主題的分區數量(部署后不應更改)。

35、offsets.topic.replication.factor 副本大小,默認:3

低於上述值,主題將創建失敗。

36、offsets.topic.segment.bytes 默認104857600 ,100M

segment映射文件(index)文件大小,以便加快日志壓縮和緩存負載。

37、queued.max.requests 默認:500

阻塞網絡線程之前允許排隊的請求數。

38、replica.fetch.min.bytes默認:1

每個fetch響應所需的最小字節。如果字節不夠,則等待replicaMaxWaitTimeMs。

39、replica.lag.time.max.ms 默認:10000

如果follower 沒有發送任何獲取請求,或者至少在這段時間沒有消耗到leader日志的結束偏移量,那么leader將從isr中刪除follower 。

40、transaction.max.timeout.ms 默認:900000

事務執行最長時間,超時則拋出異常。

41、unclean.leader.election.enable 默認:false

是否選舉ISR以外的副本作為leader,會導致數據丟失。

42、zookeeper.connection.timeout.ms

客戶端等待與zookeeper建立連接的最長時間。如果未設置,則用zookeeper.session.timeout中的值。

43、zookeeper.max.in.flight.requests

阻塞之前consumer將發送給Zookeeper的未確認請求的最大數量。

44、group.max.session.timeout.ms

注冊使用者允許的最大會話超時。超時時間越長,消費者在心跳之間處理消息的時間就越多,而檢測故障的時間就越長。

45、group.min.session.timeout.ms

注冊使用者允許的最小會話超時。更短的超時導致更快的故障檢測,但代價是更頻繁的用戶心跳,這可能會壓倒broke資源。

46、num.partitions 默認:1

每個主題的默認日志分區數量。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM