kafka配置參數詳解【收藏】


3.1      Broker  Configs

基本配置如下:

-broker.id
-log.dirs
-zookeeper.connect

Topic-level配置以及其默認值將在下面討論。


Property

Default

Description

broker.id

 

每個broker都可以用一個唯一的非負整數id進行標識;這個id可以作為broker的“名字”,並且它的存在使得broker無須混淆consumers就可以遷移到不同的host/port上。你可以選擇任意你喜歡的數字作為id,只要id是唯一的即可。

log.dirs

/tmp/kafka-logs

kafka存放數據的路徑。這個路徑並不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當創建新partition時,都會選擇在包含最少partitions的路徑下進行。

port

6667

server接受客戶端連接的端口

zookeeper.connect

null

ZooKeeper連接字符串的格式為:hostname:port,此處hostname和port分別是ZooKeeper集群中某個節點的host和port;為了當某個host宕掉之后你能通過其他ZooKeeper節點進行連接,你可以按照一下方式制定多個hosts:
hostname1:port1, hostname2:port2, hostname3:port3.

ZooKeeper 允許你增加一個“chroot”路徑,將集群中所有kafka數據存放在特定的路徑下。當多個Kafka集群或者其他應用使用相同ZooKeeper集群時,可以使用這個方式設置數據存放路徑。這種方式的實現可以通過這樣設置連接字符串格式,如下所示:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
這樣設置就將所有kafka集群數據存放在/chroot/path路徑下。注意,在你啟動broker之前,你必須創建這個路徑,並且consumers必須使用相同的連接格式。

message.max.bytes

1000000

server可以接收的消息最大尺寸。重要的是,consumer和producer有關這個屬性的設置必須同步,否則producer發布的消息對consumer來說太大。

num.network.threads

3

server用來處理網絡請求的網絡線程數目;一般你不需要更改這個屬性。

num.io.threads

8

server用來處理請求的I/O線程的數目;這個線程數目至少要等於硬盤的個數。

background.threads

4

用於后台處理的線程數目,例如文件刪除;你不需要更改這個屬性。

queued.max.requests

500

在網絡線程停止讀取新請求之前,可以排隊等待I/O線程處理的最大請求個數。

host.name

null

broker的hostname;如果hostname已經設置的話,broker將只會綁定到這個地址上;如果沒有設置,它將綁定到所有接口,並發布一份到ZK

advertised.host.name

null

如果設置,則就作為broker 的hostname發往producer、consumers以及其他brokers

advertised.port

null

此端口將給與producers、consumers、以及其他brokers,它會在建立連接時用到; 它僅在實際端口和server需要綁定的端口不一樣時才需要設置。

socket.send.buffer.bytes

100 * 1024

SO_SNDBUFF 緩存大小,server進行socket 連接所用

socket.receive.buffer.bytes

100 * 1024

SO_RCVBUFF緩存大小,server進行socket連接時所用

socket.request.max.bytes

100 * 1024 * 1024

server允許的最大請求尺寸;  這將避免server溢出,它應該小於Java  heap size

num.partitions

1

如果創建topic時沒有給出划分partitions個數,這個數字將是topic下partitions數目的默認數值。

log.segment.bytes

1014*1024*1024

topic  partition的日志存放在某個目錄下諸多文件中,這些文件將partition的日志切分成一段一段的;這個屬性就是每個文件的最大尺寸;當尺寸達到這個數值時,就會創建新文件。此設置可以由每個topic基礎設置時進行覆蓋。
查看  the per-topic  configuration section

log.roll.hours

24 * 7

即使文件沒有到達log.segment.bytes,只要文件創建時間到達此屬性,就會創建新文件。這個設置也可以有topic層面的設置進行覆蓋;
查看the per-topic  configuration section

log.cleanup.policy

delete

 

log.retention.minutes和log.retention.hours

7 days

每個日志文件刪除之前保存的時間。默認數據保存時間對所有topic都一樣。
log.retention.minutes 和 log.retention.bytes 都是用來設置刪除日志文件的,無論哪個屬性已經溢出。
這個屬性設置可以在topic基本設置時進行覆蓋。
查看the per-topic  configuration section

log.retention.bytes

-1

每個topic下每個partition保存數據的總量;注意,這是每個partitions的上限,因此這個數值乘以partitions的個數就是每個topic保存的數據總量。同時注意:如果log.retention.hours和log.retention.bytes都設置了,則超過了任何一個限制都會造成刪除一個段文件。
注意,這項設置可以由每個topic設置時進行覆蓋。
查看the per-topic  configuration section

log.retention.check.interval.ms

5 minutes

檢查日志分段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。

log.cleaner.enable

false

當這個屬性設置為false時,一旦日志的保存時間或者大小達到上限時,就會被刪除;如果設置為true,則當保存屬性達到上限時,就會進行log compaction

log.cleaner.threads

1

進行日志壓縮的線程數

log.cleaner.io.max.bytes.per.second

None

進行log compaction時,log cleaner可以擁有的最大I/O數目。這項設置限制了cleaner,以避免干擾活動的請求服務。

log.cleaner.io.buffer.size

500*1024*1024

log cleaner清除過程中針對日志進行索引化以及精簡化所用到的緩存大小。最好設置大點,以提供充足的內存。

log.cleaner.io.buffer.load.factor

512*1024

進行log cleaning時所需要的I/O chunk尺寸。你不需要更改這項設置。

log.cleaner.io.buffer.load.factor

0.9

log cleaning中所使用的hash表的負載因子;你不需要更改這個選項。

log.cleaner.backoff.ms

15000

進行日志是否清理檢查的時間間隔

log.cleaner.min.cleanable.ratio

0.5

這項配置控制log  compactor試圖清理日志的頻率(假定log compaction是打開的)。默認避免清理壓縮超過50%的日志。這個比率綁定了備份日志所消耗的最大空間(50%的日志備份時壓縮率為50%)。更高的比率則意味着浪費消耗更少,也就可以更有效的清理更多的空間。這項設置在每個topic設置中可以覆蓋。
查看the per-topic  configuration section

log.cleaner.delete.retention.ms

1day

保存時間;保存壓縮日志的最長時間;也是客戶端消費消息的最長時間,榮log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮后的數據;會被topic創建時的指定時間覆蓋。

log.index.size.max.bytes

10*1024*1024

每個log segment的最大尺寸。注意,如果log尺寸達到這個數值,即使尺寸沒有超過log.segment.bytes限制,也需要產生新的log  segment。

log.index.interval.bytes

4096

當執行一次fetch后,需要一定的空間掃描最近的offset,設置的越大越好,一般使用默認值就可以

log.flush.interval.messages

Long.MaxValue

log文件“sync”到磁盤之前累積的消息條數。因為磁盤IO操作是一個慢操作,但又是一個“數據可靠性”的必要手段,所以檢查是否需要固化到硬盤的時間間隔。需要在“數據可靠性”與“性能”之間做必要的權衡,如果此值過大,將會導致每次“發sync”的時間過長(IO阻塞),如果此值過小,將會導致“fsync”的時間較長(IO阻塞),如果此值過小,將會導致”發sync“的次數較多,這也就意味着整體的client請求有一定的延遲,物理server故障,將會導致沒有fsync的消息丟失。

log.flush.scheduler.interval.ms

Long.MaxValue

檢查是否需要fsync的時間間隔

log.flush.interval.ms

Long.MaxValue

僅僅通過interval來控制消息的磁盤寫入時機,是不足的,這個數用來控制”fsync“的時間間隔,如果消息量始終沒有達到固化到磁盤的消息數,但是離上次磁盤同步的時間間隔達到閾值,也將觸發磁盤同步。

log.delete.delay.ms

60000

文件在索引中清除后的保留時間,一般不需要修改

auto.create.topics.enable

true

是否允許自動創建topic。如果是真的,則produce或者fetch 不存在的topic時,會自動創建這個topic。否則需要使用命令行創建topic

controller.socket.timeout.ms

30000

partition管理控制器進行備份時,socket的超時時間。

controller.message.queue.size

Int.MaxValue

controller-to-broker-channles的buffer 尺寸

default.replication.factor

1

默認備份份數,僅指自動創建的topics

replica.lag.time.max.ms

10000

如果一個follower在這個時間內沒有發送fetch請求,leader將從ISR重移除這個follower,並認為這個follower已經掛了

replica.lag.max.messages

4000

如果一個replica沒有備份的條數超過這個數值,則leader將移除這個follower,並認為這個follower已經掛了

replica.socket.timeout.ms

30*1000

leader 備份數據時的socket網絡請求的超時時間

replica.socket.receive.buffer.bytes

64*1024

備份時向leader發送網絡請求時的socket receive buffer

replica.fetch.max.bytes

1024*1024

備份時每次fetch的最大值

replica.fetch.min.bytes

500

leader發出備份請求時,數據到達leader的最長等待時間

replica.fetch.min.bytes

1

備份時每次fetch之后回應的最小尺寸

num.replica.fetchers

1

從leader備份數據的線程數

replica.high.watermark.checkpoint.interval.ms

5000

每個replica檢查是否將最高水位進行固化的頻率

fetch.purgatory.purge.interval.requests

1000

fetch 請求清除時的清除間隔

producer.purgatory.purge.interval.requests

1000

producer請求清除時的清除間隔

zookeeper.session.timeout.ms

6000

zookeeper會話超時時間。

zookeeper.connection.timeout.ms

6000

客戶端等待和zookeeper建立連接的最大時間

zookeeper.sync.time.ms

2000

zk follower落后於zk leader的最長時間

controlled.shutdown.enable

true

是否能夠控制broker的關閉。如果能夠,broker將可以移動所有leaders到其他的broker上,在關閉之前。這減少了不可用性在關機過程中。

controlled.shutdown.max.retries

3

在執行不徹底的關機之前,可以成功執行關機的命令數。

controlled.shutdown.retry.backoff.ms

5000

在關機之間的backoff時間

auto.leader.rebalance.enable

true

如果這是true,控制者將會自動平衡brokers對於partitions的leadership

leader.imbalance.per.broker.percentage

10

每個broker所允許的leader最大不平衡比率

leader.imbalance.check.interval.seconds

300

檢查leader不平衡的頻率

offset.metadata.max.bytes

4096

允許客戶端保存他們offsets的最大個數

max.connections.per.ip

Int.MaxValue

每個ip地址上每個broker可以被連接的最大數目

max.connections.per.ip.overrides

 

每個ip或者hostname默認的連接的最大覆蓋

connections.max.idle.ms

600000

空連接的超時限制

log.roll.jitter.{ms,hours}

0

從logRollTimeMillis抽離的jitter最大數目

num.recovery.threads.per.data.dir

1

每個數據目錄用來日志恢復的線程數目

unclean.leader.election.enable

true

指明了是否能夠使不在ISR中replicas設置用來作為leader

delete.topic.enable

false

能夠刪除topic

offsets.topic.num.partitions

50

The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).

offsets.topic.retention.minutes

1440

存在時間超過這個時間限制的offsets都將被標記為待刪除

offsets.retention.check.interval.ms

600000

offset管理器檢查陳舊offsets的頻率

offsets.topic.replication.factor

3

topic的offset的備份份數。建議設置更高的數字保證更高的可用性

offset.topic.segment.bytes

104857600

offsets topic的segment尺寸。

offsets.load.buffer.size

5242880

這項設置與批量尺寸相關,當從offsets segment中讀取時使用。

offsets.commit.required.acks

-1

在offset  commit可以接受之前,需要設置確認的數目,一般不需要更改



offsets.commit.timeout.ms     5000      offset commit的延遲時間,這和producer request的超時時間相似。


更多細節可以在scala 類 kafka.server.KafkaConfig中找到。




topic-level的配置

有關topics的配置既有全局的又有每個topic獨有的配置。如果沒有給定特定topic設置,則應用默認的全局設置。這些覆蓋會在每次創建topic發生。下面的例子:創建一個topic,命名為my-topic,自定義最大消息尺寸以及刷新比率為:
>    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

需要刪除重寫時,可以按照以下來做:

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic 
    --deleteConfig max.message.bytes


以下是topic-level的配置選項。server的默認配置在Server  Default Property列下給出了,設定這些默認值不會改變原有的設置

Property

Default

Server Default Property

Description

cleanup.policy

delete

log.cleanup.policy

要么是”delete“要么是”compact“; 這個字符串指明了針對舊日志部分的利用方式;默認方式("delete")將會丟棄舊的部分當他們的回收時間或者尺寸限制到達時。”compact“將會進行日志壓縮

delete.retention.ms

86400000 (24 hours)

log.cleaner.delete.retention.ms

對於壓縮日志保留的最長時間,也是客戶端消費消息的最長時間,通log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮后的數據。此項配置可以在topic創建時的置頂參數覆蓋

flush.messages

None

log.flush.interval.messages

此項配置指定時間間隔:強制進行fsync日志。例如,如果這個選項設置為1,那么每條消息之后都需要進行fsync,如果設置為5,則每5條消息就需要進行一次fsync。一般來說,建議你不要設置這個值。此參數的設置,需要在"數據可靠性"與"性能"之間做必要的權衡.如果此值過大,將會導致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導致"fsync"的次數較多,這也意味着整體的client請求有一定的延遲.物理server故障,將會導致沒有fsync的消息丟失.

flush.ms

None

log.flush.interval.ms

此項配置用來置頂強制進行fsync日志到磁盤的時間間隔;例如,如果設置為1000,那么每1000ms就需要進行一次fsync。一般不建議使用這個選項

index.interval.bytes

4096

log.index.interval.bytes

默認設置保證了我們每4096個字節就對消息添加一個索引,更多的索引使得閱讀的消息更加靠近,但是索引規模卻會由此增大;一般不需要改變這個選項

max.message.bytes

1000000

max.message.bytes

kafka追加消息的最大尺寸。注意如果你增大這個尺寸,你也必須增大你consumer的fetch 尺寸,這樣consumer才能fetch到這些最大尺寸的消息。

min.cleanable.dirty.ratio

0.5

min.cleanable.dirty.ratio

此項配置控制log壓縮器試圖進行清除日志的頻率。默認情況下,將避免清除壓縮率超過50%的日志。這個比率避免了最大的空間浪費

min.insync.replicas

1

min.insync.replicas

當producer設置request.required.acks為-1時,min.insync.replicas指定replicas的最小數目(必須確認每一個repica的寫數據都是成功的),如果這個數目沒有達到,producer會產生異常。

retention.bytes

None

log.retention.bytes

如果使用“delete”的retention  策略,這項配置就是指在刪除日志之前,日志所能達到的最大尺寸。默認情況下,沒有尺寸限制而只有時間限制

retention.ms

7 days

log.retention.minutes

如果使用“delete”的retention策略,這項配置就是指刪除日志前日志保存的時間。

segment.bytes

1GB

log.segment.bytes

kafka中log日志是分成一塊塊存儲的,此配置是指log日志划分成塊的大小

segment.index.bytes

10MB

log.index.size.max.bytes

此配置是有關offsets和文件位置之間映射的索引文件的大小;一般不需要修改這個配置

segment.ms

7 days

log.roll.hours

即使log的分塊文件沒有達到需要刪除、壓縮的大小,一旦log 的時間達到這個上限,就會強制新建一個log分塊文件

segment.jitter.ms

0

log.roll.jitter.{ms,hours}

The maximum jitter to subtract from logRollTimeMillis.




3.2  Consumer  Configs
    consumer基本配置如下:
       group.id
       zookeeper.connect

Property

Default

Description

group.id

 

用來唯一標識consumer進程所在組的字符串,如果設置同樣的group  id,表示這些processes都是屬於同一個consumer  group

zookeeper.connect

 

指定zookeeper的連接的字符串,格式是hostname:port,此處host和port都是zookeeper server的host和port,為避免某個zookeeper 機器宕機之后失聯,你可以指定多個hostname:port,使用逗號作為分隔:
hostname1:port1,hostname2:port2,hostname3:port3
可以在zookeeper連接字符串中加入zookeeper的chroot路徑,此路徑用於存放他自己的數據,方式:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path

consumer.id

null

不需要設置,一般自動產生

socket.timeout.ms

30*100

網絡請求的超時限制。真實的超時限制是   max.fetch.wait+socket.timeout.ms

socket.receive.buffer.bytes

64*1024

socket用於接收網絡請求的緩存大小

fetch.message.max.bytes

1024*1024

每次fetch請求中,針對每次fetch消息的最大字節數。這些字節將會督導用於每個partition的內存中,因此,此設置將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等,否則,producer可能發送的消息尺寸大於consumer所能消耗的尺寸。

num.consumer.fetchers

1

用於fetch數據的fetcher線程數

auto.commit.enable

true

如果為真,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用

auto.commit.interval.ms

60*1000

consumer向zookeeper提交offset的頻率,單位是秒

queued.max.message.chunks

2

用於緩存消息的最大數目,以供consumption。每個chunk必須和fetch.message.max.bytes相同

rebalance.max.retries

4

當新的consumer加入到consumer  group時,consumers集合試圖重新平衡分配到每個consumer的partitions數目。如果consumers集合改變了,當分配正在執行時,這個重新平衡會失敗並重入

fetch.min.bytes

1

每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。

fetch.wait.max.ms

100

如果沒有足夠的數據能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。

rebalance.backoff.ms

2000

在重試reblance之前backoff時間

refresh.leader.backoff.ms

200

在試圖確定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間

auto.offset.reset

largest

zookeeper中沒有初始化的offset時,如果offset是以下值的回應:
smallest:自動復位offset為smallest的offset
largest:自動復位offset為largest的offset
anything  else:向consumer拋出異常

consumer.timeout.ms

-1

如果沒有消息可用,即使等待特定的時間之后也沒有,則拋出超時異常

exclude.internal.topics

true

是否將內部topics的消息暴露給consumer

paritition.assignment.strategy

range

選擇向consumer 流分配partitions的策略,可選值:range,roundrobin

client.id

group id value

是用戶特定的字符串,用來在每次請求中幫助跟蹤調用。它應該可以邏輯上確認產生這個請求的應用

zookeeper.session.timeout.ms

6000

zookeeper 會話的超時限制。如果consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認為掛掉了,並且reblance將會產生

zookeeper.connection.timeout.ms

6000

客戶端在建立通zookeeper連接中的最大等待時間

zookeeper.sync.time.ms

2000

ZK follower可以落后ZK leader的最大時間

offsets.storage

zookeeper

用於存放offsets的地點: zookeeper或者kafka

offset.channel.backoff.ms

1000

重新連接offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間

offsets.channel.socket.timeout.ms

10000

當讀取offset的fetch/commit請求回應的socket 超時限制。此超時限制是被consumerMetadata請求用來請求offset管理

offsets.commit.max.retries

5

重試offset commit的次數。這個重試只應用於offset  commits在shut-down之間。他

dual.commit.enabled

true

如果使用“kafka”作為offsets.storage,你可以二次提交offset到zookeeper(還有一次是提交到kafka)。在zookeeper-based的offset  storage到kafka-based的offset storage遷移時,這是必須的。對任意給定的consumer  group來說,比較安全的建議是當完成遷移之后就關閉這個選項

partition.assignment.strategy

range

在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數據流的策略; 循環的partition分配器分配所有可用的partitions以及所有可用consumer  線程。它會將partition循環的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的划分是確定的分布。循環分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的數據流。(2)訂閱的topic的集合對於consumer  group中每個consumer實例來說都是確定的。



更多細節可以查看  scala類:  kafka.consumer.ConsumerConfig


3.3   Producer  Configs
producer基本的配置屬性包含:
(1) metadata.broker.list
(2)request.required.acks
(3)producer.type
(4)serializer.class

Property

Default

Description

metadata.broker.list

 

服務於bootstrapping。producer僅用來獲取metadata(topics,partitions,replicas)。發送實際數據的socket連接將基於返回的metadata數據信息而建立。格式是:
host1:port1,host2:port2
這個列表可以是brokers的子列表或者是一個指向brokers的VIP

request.required.acks

0

此配置是表明當一次produce請求被認為完成時的確認值。特別是,多少個其他brokers必須已經提交了數據到他們的log並且向他們的leader確認了這些信息。典型的值包括:
0: 表示producer從來不等待來自broker的確認信息(和0.7一樣的行為)。這個選擇提供了最小的時延但同時風險最大(因為當server宕機時,數據將會丟失)。
1:表示獲得leader replica已經接收了數據的確認信息。這個選擇時延較小同時確保了server確認接收成功。
-1:producer會獲得所有同步replicas都收到數據的確認。同時時延最大,然而,這種方式並沒有完全消除丟失消息的風險,因為同步replicas的數量可能是1.如果你想確保某些replicas接收到數據,那么你應該在topic-level設置中選項min.insync.replicas設置一下。請閱讀一下設計文檔,可以獲得更深入的討論。

request.timeout.ms

10000

broker盡力實現request.required.acks需求時的等待時間,否則會發送錯誤到客戶端

producer.type

sync

此選項置頂了消息是否在后台線程中異步發送。正確的值:
(1)  async: 異步發送
(2)  sync: 同步發送
通過將producer設置為異步,我們可以批量處理請求(有利於提高吞吐率)但是這也就造成了客戶端機器丟掉未發送數據的可能性

serializer.class

kafka.serializer.DefaultEncoder

消息的序列化類別。默認編碼器輸入一個字節byte[],然后返回相同的字節byte[]

key.serializer.class

 

關鍵字的序列化類。如果沒給與這項,默認情況是和消息一致

partitioner.class

kafka.producer.DefaultPartitioner

partitioner 類,用於在subtopics之間划分消息。默認partitioner基於key的hash表

compression.codec

none

此項參數可以設置壓縮數據的codec,可選codec為:“none”, “gzip”, “snappy”

compressed.topics

null

此項參數可以設置某些特定的topics是否進行壓縮。如果壓縮codec是NoCompressCodec之外的codec,則對指定的topics數據應用這些codec。如果壓縮topics列表是空,則將特定的壓縮codec應用於所有topics。如果壓縮的codec是NoCompressionCodec,壓縮對所有topics軍不可用。

message.send.max.retries

3

此項參數將使producer自動重試失敗的發送請求。此項參數將置頂重試的次數。注意:設定非0值將導致重復某些網絡錯誤:引起一條發送並引起確認丟失

retry.backoff.ms

100

在每次重試之前,producer會更新相關topic的metadata,以此進行查看新的leader是否分配好了。因為leader的選擇需要一點時間,此選項指定更新metadata之前producer需要等待的時間。

topic.metadata.refresh.interval.ms

600*1000

producer一般會在某些失敗的情況下(partition missing,leader不可用等)更新topic的metadata。他將會規律的循環。如果你設置為負值,metadata只有在失敗的情況下才更新。如果設置為0,metadata會在每次消息發送后就會更新(不建議這種選擇,系統消耗太大)。重要提示: 更新是有在消息發送后才會發生,因此,如果producer從來不發送消息,則metadata從來也不會更新。

queue.buffering.max.ms

5000

當應用async模式時,用戶緩存數據的最大時間間隔。例如,設置為100時,將會批量處理100ms之內消息。這將改善吞吐率,但是會增加由於緩存產生的延遲。

queue.buffering.max.messages

10000

當使用async模式時,在在producer必須被阻塞或者數據必須丟失之前,可以緩存到隊列中的未發送的最大消息條數

batch.num.messages

200

使用async模式時,可以批量處理消息的最大條數。或者消息數目已到達這個上線或者是queue.buffer.max.ms到達,producer才會處理

send.buffer.bytes

100*1024

socket 寫緩存尺寸

client.id

“”

這個client  id是用戶特定的字符串,在每次請求中包含用來追蹤調用,他應該邏輯上可以確認是那個應用發出了這個請求。



更多細節需要查看  scala類
kafka.producer.ProducerConfig



3、4  New  Producer  Configs
    我們正在努力替換現有的producer。代碼在trunk中是可用的,可以認為beta版本。下面是新producer的配置

Name

Type

Default

Importance

Description

boostrap.servers

list

 

high

用於建立與kafka集群連接的host/port組。數據將會在所有servers上均衡加載,不管哪些server是指定用於bootstrapping。這個列表僅僅影響初始化的hosts(用於發現全部的servers)。這個列表格式:
host1:port1,host2:port2,...
因為這些server僅僅是用於初始化的連接,以發現集群所有成員關系(可能會動態的變化),這個列表不需要包含所有的servers(你可能想要不止一個server,盡管這樣,可能某個server宕機了)。如果沒有server在這個列表出現,則發送數據會一直失敗,直到列表可用。

acks

string

1

high

producer需要server接收到數據之后發出的確認接收的信號,此項配置就是指procuder需要多少個這樣的確認信號。此配置實際上代表了數據備份的可用性。以下設置為常用選項:
(1)acks=0: 設置為0表示producer不需要等待任何確認收到的信息。副本將立即加到socket  buffer並認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收數據,同時重試配置不會發生作用(因為客戶端不知道是否失敗)回饋的offset會總是設置為-1;
(2)acks=1: 這意味着至少要等待leader已經成功將數據寫入本地log,但是並沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。
(3)acks=all: 這意味着leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。
(4)其他的設置,例如acks=2也是可以的,這將需要給定的acks數量,但是這種策略一般很少用。

buffer.memory

long

33554432

high

producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。

這項設置將和producer能夠使用的總內存相關,但並不是一個硬性的限制,因為不是producer使用的所有內存都是用於緩存。一些額外的內存會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。

compression.type

string

none

high

producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。
壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好。

retries

int

0

high

設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。

batch.size

int

16384

medium

producer將試圖批處理消息記錄,以減少請求次數。這將改善client與server之間的性能。這項配置控制默認的批量處理消息字節數。
不會試圖處理大於這個字節數的消息字節數。
發送到brokers的請求將包含多個批量處理,其中會包含對每個partition的一個請求。
較小的批量處理數值比較少用,並且可能降低吞吐量(0則會僅用批量處理)。較大的批量處理數值將會浪費更多內存空間,這樣就需要分配特定批量處理數值的內存大小。

client.id

string

 

medium

當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤

linger.ms

long

0

medium

producer組將會匯總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這只有在記錄產生速度大於發送速度的時候才能發生。然而,在某些條件下,客戶端將希望降低請求的數量,甚至降低到中等負載一下。這項設置將通過增加小的延遲來完成--即,不是立即發送一條記錄,producer將會等待給定的延遲時間以允許其他消息記錄發送,這些消息記錄可以批量處理。這可以認為是TCP種Nagle的算法類似。這項設置設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比這項設置要小的多,我們需要“linger”特定的時間以獲取更多的消息。 這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。

max.request.size

int

1028576

medium

請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。

receive.buffer.bytes

int

32768

medium

TCP receive緩存大小,當閱讀數據時使用

send.buffer.bytes

int

131072

medium

TCP send緩存大小,當發送數據時使用

timeout.ms

int

30000

medium

此配置選項控制server等待來自followers的確認的最大時間。如果確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網絡延遲

block.on.buffer.full

boolean

true

low

當我們內存緩存用盡時,必須停止接收新消息記錄或者拋出錯誤。默認情況下,這個設置為真,然而某些阻塞可能不值得期待,因此立即拋出錯誤更好。設置為false則會這樣:producer會拋出一個異常錯誤:BufferExhaustedException, 如果記錄已經發送同時緩存已滿

metadata.fetch.timeout.ms

long

60000

low

是指我們所獲取的一些元素據的第一個時間數據。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所需要的時間,否則會跑出異常給客戶端。

metadata.max.age.ms

long

300000

low

以微秒為單位的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變。

metric.reporters

list

[]

low

類的列表,用於衡量指標。實現MetricReporter接口,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於注冊JMX統計

metrics.num.samples

int

2

low

用於維護metrics的樣本數

metrics.sample.window.ms

long

30000

low

metrics系統維護可配置的樣本數量,在一個可修正的window  size。這項配置配置了窗口大小,例如。我們可能在30s的期間維護兩個樣本。當一個窗口推出后,我們會擦除並重寫最老的窗口

recoonect.backoff.ms

long

10

low

連接失敗時,當我們重新連接時的等待時間。這避免了客戶端反復重連

retry.backoff.ms

long

100

low

在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死循環中。






翻譯者:beitiandijun


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM