kafka0.9.0及0.10.0配置屬性


問題導讀

1.borker包含哪些屬性?
2.Producer包含哪些屬性?
3.Consumer如何配置?






borker(0.9.0及0.10.0)配置
Kafka日志本身是由多個日志段組成(log segment)。一個日志是一個FileMessageSet,它包含了日志數據以及OffsetIndex對象,該對象使用位移來讀取日志數據 
* borker配置就是指配置server.properties文件 *


最小配置

通常情況下需要在減壓縮kafka后,修改config/server.properties 配置文件中的以下兩項

[Bash shell]  純文本查看 復制代碼
?
1
2
3
log. dirs = kafka-logs
zookeeper.connect = localhost:9092
listeners = PLAINTEXT: //ip :9092


  • log.dirs 指的是kafka的log Data保存的目錄,默認為Null。如果不指定log Data會保存到log.dir設置的目錄中,log.dir默認為/tmp/kafka-logs。需要保證啟動KafKaServer的用戶對log.dirs或log.dir設置的目錄具有讀與寫的權限。
  • zookeeper.connect 指的是zookeeper集群的地址,可以是多個,多個之間用逗號分割hostname1:port1,hostname2:port2,hostname3:port3
  • listeners 監聽列表(以逗號分隔 不同的協議(如plaintext,trace,ssl、不同的IP和端口))


kafka提供的borker配置


[Bash shell]  純文本查看 復制代碼
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag. time .max.ms=10000
 
controller.socket.timeout.ms=30000
controller.message.queue.size=10
 
# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics. enable = true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
 
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper. sync . time .ms=2000
 
# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100


配置詳細說明


名稱 說明 類型 默認值 有效值 重要性
zookeeper.connect zookeeper集群的地址,
可以是多個,
多個之間用逗號分割
string localhost:
2181
ip1
:port1
,ip2
:port2
zookeeper.connection.timeout.ms 客戶端在建立通zookeeper
連接中的最大等待時間
int null 6000
zookeeper.session.timeout.ms ZooKeeper的session的超時
時間,如果在這段時間內沒
有收到ZK的心跳,則會被認
為該Kafka server掛掉了。
如果把這個值設置得過低可
能被誤認為掛掉,如果設置
得過高,如果真的掛了,則需
要很長時間才能被server得知
int 6000  
zookeeper.sync.time.ms 一個ZK follower能落后leader
的時間
int 2000  
listeners 監聽列表(以逗號分隔 不同的
協議(如plaintext,trace,ssl、
不同的IP和端口)),hostname
如果設置為0.0.0.0則綁定所
有的網卡地址;如果hostname
為空則綁定默認的網卡。
如果
沒有配置則默認為
java.net
.InetAddress
.getCanonicalHostName()
string null 如:PLAINTEXT:
//myhost:
9092,
TRACE://:
9091
或 PLAINTEXT:
//0.0.0.0
:9092,
host.name 。如果設置了它,
會僅綁定這個
地址。
如果沒有設置,則會
綁定所有
的網絡接口,並提交
一個給ZK。
不推薦使用 只有當
listeners沒
有設置時才有必要使用。
string “’ 如:
”localhost”
port server用來接受
client連接的端口。
不推薦使用,使用
listeners配置
項代替;只有在
listeners沒有
配置時才使用。
int 9092  
advertised.host.name 會將hostname通知給
生產者
和消費者,在多網卡
時需要
設置該值為另一個ip地址。
如果沒有設置該值,
則返回 
配置項host.name設置的值,
如果host.name沒有設
置則返回java.net.InetAddress.
getCanonicalHostName()
不推薦使用 只有當
advertised.listeners或
listeners沒有設置時才
有必要使用。
string null  
advertised.listeners 設置不同於listeners配置
的監聽列表即不同於
listeners設置的網卡地址
及端口;如果沒有配置,
會使用listeners的值
string null  
advertised.port 分發這個端口給所有的
producer,consumer和
其他broker來建立連接
。如果此端口跟server
綁定的端口不同,
則才有必要設置。
不推薦使用 只有當
advertised.listeners
或listeners沒有設置
時才有必要使用。
int null  
auto.create.topics.enable 是否允許自動創建topic。
如果設為true,那么
produce,consume
或者fetch metadata
一個不存在的topic時,
就會自動創建一個默認
replication factor和
partition number的topic。
boolean true  
background.threads 一些后台任務處理的
線程數,例如過期消
息文件的刪除等,
一般情況下不需要去
做修改
int 10  
broker.id 每一個broker在集群中
的唯一表示,要求是正數。
當該服務器的IP地址發
生改變時,broker.id沒有
變化,則不會影響
consumers的消息情況。
int -1  
compression.type 指定topic的壓縮類型。
除了支持’gzip’, ‘snappy’,
‘lz4’外,還支持
”uncompressed(不壓縮)
”以及produce
r(由producer來指定)
string producer  
delete.topic.enable 是否啟動刪除topic。
如果設置為false,
你在刪除topic的時
候無法刪除,但是會打
上一個你將刪除該topic
的標記,等到你修改這
一屬性的值為true后重
新啟動Kafka集群的時候
,集群自動將那些標記
刪除的topic刪除掉,對應
的log.dirs目錄下的topic
目錄和數據也會被刪除。
而將這一屬性設置為true之后,
你就能成功刪除你想要
刪除的topic了
boolean false  
auto.leader.rebalance.enable 一個后台線程會周期性
的自動嘗試,為所有的
broker的每個partition
平衡leadership,使kafka
的leader均衡。
boolean true  
leader.imbalance.check.interval.seconds 檢查leader是否均衡的
時間間隔(秒)
long 300  
leader.imbalance.per.broker.percentage 每個broker允許的不平衡
的leader的百分比。
如果每個broker超過
了這個百分比,復制控
制器會重新平衡leadership。
int 10  
log.flush.interval.messages 數據flush(sync)到硬盤
前之前累積的消息條數
,因為磁盤IO操作是一
個慢操作,但又是一個
”數據可靠性”的必要
手段,所以此參數的設置
,需要在”數據可靠性”
與”性能”之間做必要
的權衡.如果此值過大,
將會導致每次”fsync”
的時間較長
(IO阻塞),如果此值過
小,將會導致”fsync”的
次數較多,這也意味着
整體的client請求有一
定的延遲.物理server
故障,將會導致沒有
fsync的消息丟失
long 9223372
0368547
75807
(此為一
個數字)
 
log.flush.interval.ms 當達到下面的時間
(ms)時,執行一次
強制的flush操作。
interval.ms和
interval.messages
無論哪個達到,
都會flush。
long null  
log.flush.offset.checkpoint.interval.ms 記錄上次把log刷
到磁盤的時間點的
頻率,用來日后的
recovery。通常
不需要改變
long 60000  
log.flush.scheduler.interval.ms 檢查是否需要固
化到硬盤的時間
間隔
long 92233720
3685477
5807
(此為一
個數字)
 
log.retention.bytes topic每個分區的
最大文件大小,
一個topic的大小
限制 = 分區數*
log.retention.bytes。
-1沒有大小限
log.retention.bytes和log.retention.minutes
任意一個達到要求,
都會執行刪除,
會被topic創建時
的指定參數覆蓋
loong -1  
log.retention.hours 日志保存時間,
默認為7天(168小時)
。超過這個時間會根
據policy處理數據。
bytes和minutes無論
哪個先達到都會觸發
int 168  
log.retention.minutes 數據存儲的最大時間
超過這個時間會根據
log.cleanup.policy
設置的策略處理數
據,也就是消費端能
夠多久去消費數據
       
log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic創建時的指定參數覆蓋 int null    
log.roll.hous 當達到下面時間,
會強制新建一個segment。
這個參數會在日志
segment沒有達到
log.segment.bytes
設置的大小,也會強制
新建一個segment會
int 168  
log.roll.jitter.{ms,hours} 從logRollTimeMillis抽
離的jitter最大數目
int 0  
log.segment.bytes topic partition的日志存
放在某個目錄下諸多
文件中,這些文件將
partition的日志切分成
一段一段的;這個屬性就
是每個文件的最大尺寸;
當尺寸達到這個數值時,
就會創建新文件。
此設置可以由每個topic
基礎設置時進行覆蓋
long 1G=
1024*
1024*
1024
 
log.segment.delet.delay.ms 刪除文件系統上文件
的等待時間,默認是
1分鍾
long 6000  
message.max.bytes 表示一個服務器能夠
接收處理的消息的最
大字節數,注意這個
值producer和consumer
必須設置一致,且不要大於fetch.message.max.bytes
屬性的值該值默認是
1000012字節,大概900KB
       
int 1000012      
min.insync.replicas 該屬性規定了最小的
ISR數。當producer設置request.required.acks
為all或-1時,指定副
本(replicas)的最小數目
(必須確認每一個repica
的寫數據都是成功的),
如果這個數目沒有達到,
producer會產生異常。
int 1  
num.io.threads 服務器用來處理請求
的I/O線程的數目;
這個線程數目至少要
等於硬盤的個數。
int 8  
num.network.threads 服務器用來處理網絡
請求的網絡線程數目;
一般你不需要更改這
個屬性
int 3  
num.recovery.threads.per.data.dir 每數據目錄用於日志
恢復啟動和關閉沖洗
時的線程數量
int 1  
num.replica.fetchers 從leader進行復制
消息的線程數,增大這個
數值會增加follower的IO
int 1  
offset.metadata.max.bytes 允許client(消費者)保存
它們元數據(offset)的
最大的數據量
int 4096(4kb)    
offsets.commit.required.acks 在offset commit可以接
受之前,需要設置確認的
數目,一般不需要更改
int -1  
offsets.commit.timeout.ms offset commit會延遲
直至此超時或所需的
副本數都收到offset 
commit,
這類似於producer請
求的超時
int 5000  
offsets.load.buffer.size 此設置對應於offset
manager在讀取緩存
offset segment的批量
大小(以字節為單位).
int 5242880  
offsets.retention.check.interval.ms offset管理器檢查陳
舊offsets的頻率
long 600000
(10分鍾)
 
offsets.topic.num.partitions 偏移的提交topic的分
區數目。 由於目前不
支持部署之后改變,
我們建議您使用生產
較高的設置
(例如,100-200)
int 50  
offsets.topic.replication.factor 復制因子的offset提交topic。
較高的設置
(例如三個或四個),
建議以確保更高的
可用性。如果offset topic
創建時,broker比
復制因子少,
offset topic將以較
少的副本創建。
short 3  
offsets.topic.segment.bytes offset topic的Segment
大小。因為它使用
壓縮的topic,所有
Sgment的大小應該
保持小一點,以促進更
快的日志壓實和負載
int 104857600  
queued.max.requests 在網絡線程
(network threads)
停止讀取新請求之前,
可以排隊等待I/O線程
處理的最大請求個數。
若是等待IO的請求超
過這個數值,那么會
停止接受外部消息
int 500  
quota.consumer.default 以clientid或
consumer group區分
的consumer端每秒
可以抓取的最大byte
long 92233720
36854775
807
(此為一
個數字)
 
quota.producer.default producer端每秒可
以產生的最大byte
long 92233720
3685477
5807
(此為一
個數字)
 
replica.fetch.max.bytes replicas每次獲取數
據的最大字節數
int 1048576  
replica.fetch.min.bytes fetch的最小數據尺寸,
如果leader中尚未同步
的數據不足此值,將會
阻塞,直到滿足條件
int 1  
replica.fetch.wait.max.ms replicas同leader之間
通信的最大等待時間,
失敗了會重試。這個值
須小於
replica.lag.time.max.ms,
以防止低吞吐量
主題ISR頻繁收縮
int 500  
replica.high.watermark.checkpoint.interval.ms 每一個replica存儲自己
的high watermark到磁
盤的頻率,用來日后
的recovery
int 5000  
replica.socket.timeout.ms 復制數據過程中,
replica發送給leader的
網絡請求的socket超
時時間,至少等於
replica.fetch.wait.max.ms
int 30000  
replica.socket.receive.buffer.bytes 復制過程leader接
受請求的buffer大小
int 65536
(64*1024)
 
replica.lag.time.max.ms replicas響應partition leader
的最長等待時間,
若是超過這個時間,
就將replicas列入
ISR(in-sync replicas),
並認為它是死的,
不會再加入管理中
long 10000  
replica.lag.max.messages 如果follower落后與
leader太多,將會認
為此follower
[或者說partition relicas]
已經失效。 通常,在
follower與leader通訊時,
因為網絡延遲或者鏈
接斷開,
總會導致replicas中消息
同步滯后如果消息之
后太多,leader將認為
此follower網絡延遲
較大或者消息吞吐
能力有限,將會把此
replicas遷移到其他
follower中.在broker數
量較少,或者網絡不足
的環境中,建議提高此值.
int 4000  
request.timeout.ms producer等待響應的
最長時間,如果超時
將重發幾次,最終報錯
int 30000  
socket.receive.buffer.bytes socket用於接收網
絡請求的緩存大小
int 102400  
socket.request.max.bytes server能接受的請求
的最大的大小,
這是為了防止server
跑光內存,不能大
於Java堆的大小。
int 104857600
(100*1024
*1024)
(此為一
個表達式)
 
socket.send.buffer.bytes server端用來處理
socket連接的
SO_SNDBUFF緩沖大小
int 102400  
controller.socket.timeout.ms partition管理控制
器進行備份時,
socket的超時時間
int 30000  
controller.message.queue.size partition leader與
replicas數據同步時,
消息的隊列大小
int 10  
num.partitions 每個topic的分區個數,
若是在topic創建時候
沒有指定的話會被
topic創建時的指定
參數覆蓋
int 1 推薦
設為8
log.index.interval.bytes 當執行一次fetch后,
需要一定的空間掃描
最近的offset,設置的
越大越好,但是也更耗
內存一般使用默認值就可以
int 4096  
log.index.size.max.bytes 每個log segment的
最大尺寸。注意,
如果log尺寸達到這
個數值,即使尺寸
沒有超過log.segment.bytes
限制,也需要產生新的
log segment。
int 10485760  
fetch.purgatory.purge.interval.requests 非立即答復請求放入
purgatory中,
當到達或超出interval
時認為request complete
int 1000  
producer.purgatory.purge.interval.requests producer請求清除時間 int 1000  
default.replication.factor 一個topic ,默認分區
的replication個數 ,
不能大於集群中
broker的個數。
int 1  
group.max.session.timeout.ms 注冊consumer允許
的最大超時時間
int 300000  
group.min.session.timeout.ms 注冊consumer允許
的最小超時時間
int 6000  
inter.broker.protocol.version broker協議版本 string 0.10.0  
log.cleaner.backoff.ms 檢查log是否需要
clean的時間間隔
long 15000  
log.cleaner.dedupe.buffer.size 日志壓縮去重時候的
緩存空間,在空間允許
的情況下,越大越好
long 134217
728

 
log.cleaner.delete.retention.ms 保存時間;保存壓縮
日志的最長時間;
也是客戶端消費消息的
最長時間,
同log.retention.minutes
的區別在於一個控制未
壓縮數據,一個控制壓
縮后的數據;會被topic
創建時的指定時間覆蓋。
long 86400000
(一天)
 
log.cleaner.enable 是否啟動壓縮日志,
當這個屬性設置為false時
,一旦日志的保存時間或
者大小達到上限時,
就會被刪除;
如果設置為true,
則當保存屬性達到上限時,
就會進行壓縮
boolean false  
log.cleaner.threads 日志壓縮運行的線程數 int 1  
log.cleaner.io.buffer.load.factor 日志清理中hash表的
擴大因子,一般不需
要修改
double 0.9  
log.cleaner.io.buffer.size log cleaner清除過程
中針對日志進行索引
化以及精簡化所用到
的緩存大小。最好設置大
點,以提供充足的內存
int 524288  
log.cleaner.io.max.bytes.per.second 進行log compaction時,
log cleaner可以擁有的
最大I/O數目。這項設置
限制了cleaner,以避免干
擾活動的請求服務。
double 1.79769313
48623157E308
(此為一
個數字)
 
log.cleaner.min.cleanable.ratio 這項配置控制
log compactor
試圖清理日志的頻率
(假定[log compaction]
是打開的)。
默認避免清理壓縮超過
50%的日志。這個比率
綁定了備份日志所消耗
的最大空間(50%的
日志備份時壓縮率為50%)。
更高的比率則意味着浪
費消耗更少,也就可
以更有效的清理更多
的空間。這項設置在
每個topic設置中可以覆蓋
double 0.5  
log.preallocate 是否預創建新文件,windows推薦使用 boolean false  
log.retention.check.interval.ms 檢查日志分段文件的間隔時間,以確定是否文件屬性是否到達刪除要求。 long 300000  
max.connections.per.ip 一個broker允許從每個ip地址連接的最大數目 int 2147483647
=Int.
MaxValue
 
max.connections.per.ip.overrides 每個IP或主機名覆蓋連接的默認最大數量 string “”  
replica.fetch.backoff.ms 復制數據時失敗等待時間 int 1000  
reserved.broker.max.id broker可以使用的最大ID值 int 1000  
topic level 配置
broker級別的參數可以由topic級別的覆寫,不是所有的broker參數在topic級別都有對應值 
以下是topic-level的配置選項。server的默認配置在Server Default Property列下給出了,設定這些默認值不會改變原有的設置

Property Default Server Default Property Description
cleanup.policy delete log.cleanup.policy 要么是”delete“要么是”compact“; 這個字符串指明了針對舊日志部分的利用方式;默認方式(”delete”)將會丟棄舊的部分當他們的回收時間或者尺寸限制到達時。”compact“將會進行日志壓縮
delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms 對於壓縮日志保留的最長時間,也是客戶端消費消息的最長時間,通log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮后的數據。此項配置可以在topic創建時的置頂參數覆蓋
flush.messages none log.flush.interval.messages 此項配置指定時間間隔:強制進行fsync日志。例如,如果這個選項設置為1,那么每條消息之后都需要進行fsync,如果設置為5,則每5條消息就需要進行一次fsync。一般來說,建議你不要設置這個值。此參數的設置,需要在”數據可靠性”與”性能”之間做必要的權衡.如果此值過大,將會導致每次”fsync”的時間較長(IO阻塞),如果此值過小,將會導致”fsync”的次數較多,這也意味着整體的client請求有一定的延遲.物理server故障,將會導致沒有fsync的消息丟失.
flush.ms None log.flush.interval.ms 此項配置用來置頂強制進行fsync日志到磁盤的時間間隔;例如,如果設置為1000,那么每1000ms就需要進行一次fsync。一般不建議使用這個選項
index.interval.bytes 4096 log.index.interval.bytes 默認設置保證了我們每4096個字節就對消息添加一個索引,更多的索引使得閱讀的消息更加靠近,但是索引規模卻會由此增大;一般不需要改變這個選項
max.message.bytes 1000000 max.message.bytes kafka追加消息的最大尺寸。注意如果你增大這個尺寸,你也必須增大你consumer的fetch 尺寸,這樣consumer才能fetch到這些最大尺寸的消息。
min.cleanable.dirty.ratio 0.5 in.cleanable.dirty.ratio 此項配置控制log壓縮器試圖進行清除日志的頻率。默認情況下,將避免清除壓縮率超過50%的日志。這個比率避免了最大的空間浪費
min.insync.replicas 1 min.insync.replicas 當producer設置 acks為-1時,min.insync.replicas指定replicas的最小數目(必須確認每一個repica的寫數據都是成功的),如果這個數目沒有達到,producer會產生異常。
retention.bytes None log.retention.bytes 如果使用“delete”的retention 策略,這項配置就是指在刪除日志之前,日志所能達到的最大尺寸。默認情況下,沒有尺寸限制而只有時間限制
retention.ms 7 days log.retention.minutes 如果使用“delete”的retention策略,這項配置就是指刪除日志前日志保存的時間。
segment.bytes 1GB log.segment.bytes kafka中log日志是分成一塊塊存儲的,此配置是指log日志划分成塊的大小
segment.index.bytes 10MB log.index.size.max.bytes 此配置是有關offsets和文件位置之間映射的索引文件的大小;一般不需要修改這個配置
segment.ms 7 days log.roll.hours 即使log的分塊(segment)文件沒有達到需要刪除、壓縮的大小,一旦log 的時間達到這個上限,就會強制新建一個log分塊文件
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.


Producer配置

名稱 說明 類型 默認值 有效值 重要性
bootstrap.servers 用於建立與kafka集群連接的host/port組。數據將會在所有servers上均衡加載,不管哪些server是指定用於bootstrapping。這個列表僅僅影響初始化的hosts(用於發現全部的servers)。這個列表格式:host1:port1,host2:port2,…因為這些server僅僅是用於初始化的連接,以發現集群所有成員關系(可能會動態的變化),這個列表不需要包含所有的servers(你可能想要不止一個server,盡管這樣,可能某個server宕機了)。如果沒有server在這個列表出現,則發送數據會一直失敗,直到列表可用。 list    
acks producer需要server接收到數據之后發出的確認接收的信號,此項配置就是指procuder需要多少個這樣的確認信號。此配置實際上代表了數據備份的可用性。以下設置為常用選項:(1)acks=0: 設置為0表示producer不需要等待任何確認收到的信息。副本將立即加到socket buffer並認為已經發送。沒有任何保障可以保證此種情況下server已經成功接收數據,同時重試配置不會發生作用(因為客戶端不知道是否失敗)回饋的offset會總是設置為-1;(2)acks=1: 這意味着至少要等待leader已經成功將數據寫入本地log,但是並沒有等待所有follower是否成功寫入。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。(3)acks=all: 這意味着leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。, string -1 [all -1 0 1]
key.serializer key的序列化方式,若是沒有設置,同serializer.class 實現Serializer接口的class    
value.serializer value序列化類方式 實現Serializer接口的class    
buffer.memory producer可以用來緩存數據的內存大小。如果數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。這項設置將和producer能夠使用的總內存相關,但並不是一個硬性的限制,因為不是producer使用的所有內存都是用於緩存。一些額外的內存會用於壓縮(如果引入壓縮機制),同樣還有一些用於維護請求。 long 33554432  
compression.type producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好 string none  
retries 設置大於0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 int 0  
batch.size producer將試圖批處理消息記錄,以減少請求次數。這將改善client與server之間的性能。這項配置控制默認的批量處理消息字節數。不會試圖處理大於這個字節數的消息字節數。發送到brokers的請求將包含多個批量處理,其中會包含對每個partition的一個請求。較小的批量處理數值比較少用,並且可能降低吞吐量(0則會僅用批量處理)。較大的批量處理數值將會浪費更多內存空間,這樣就需要分配特定批量處理數值的內存大小。 int 16384  
client.id 當向server發出請求時,這個字符串會發送給server。目的是能夠追蹤請求源頭,以此來允許ip/port許可列表之外的一些應用可以發送信息。這項應用可以設置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤 string “”  
connections.max.idle.ms 關閉連接空閑時間 long 540000  
linger.ms producer組將會匯總任何在請求與發送之間到達的消息記錄一個單獨批量的請求。通常來說,這只有在記錄產生速度大於發送速度的時候才能發生。然而,在某些條件下,客戶端將希望降低請求的數量,甚至降低到中等負載一下。這項設置將通過增加小的延遲來完成–即,不是立即發送一條記錄,producer將會等待給定的延遲時間以允許其他消息記錄發送,這些消息記錄可以批量處理。這可以認為是TCP種Nagle的算法類似。這項設置設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即發送而不顧這項設置,然而如果我們獲得消息字節數比這項設置要小的多,我們需要“linger”特定的時間以獲取更多的消息。 這個設置默認為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。 long 0  
max.block.ms 控制block的時長,當buffer空間不夠或者metadata丟失時產生block long 60000  
max.request.size 請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具有自己對消息記錄尺寸的覆蓋,這些尺寸和這個設置不同。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。 int 1048576  
partitioner.class 分區類 實現Partitioner 的class class 
org.
apache.
kafka.
clients.
producer.
internals.
DefaultPartitioner
 
receive.buffer.bytes socket的接收緩存空間大小,當閱讀數據時使用 int 32768  
request.timeout.ms 客戶端將等待請求的響應的最大時間,如果在這個時間內沒有收到響應,客戶端將重發請求;超過重試次數將拋異常 int 3000  
send.buffer.bytes 發送數據時的緩存空間大小 int 131072  
timeout.ms 此配置選項控制server等待來自followers的確認的最大時間。如果確認的請求數目在此時間內沒有實現,則會返回一個錯誤。這個超時限制是以server端度量的,沒有包含請求的網絡延遲 int 30000  
max.in.flight.requests.per.connection kafka可以在一個connection中發送多個請求,叫作一個flight,這樣可以減少開銷,但是如果產生錯誤,可能會造成數據的發送順序改變,默認是5 (修改) int 5  
metadata.fetch.timeout.ms 是指我們所獲取的一些元素據的第一個時間數據。元素據包含:topic,host,partitions。此項配置是指當等待元素據fetch成功完成所需要的時間,否則會跑出異常給客戶端 long 60000  
metadata.max.age.ms 以微秒為單位的時間,是在我們強制更新metadata的時間間隔。即使我們沒有看到任何partition leadership改變。 long 300000  
metric.reporters 類的列表,用於衡量指標。實現MetricReporter接口,將允許增加一些類,這些類在新的衡量指標產生時就會改變。JmxReporter總會包含用於注冊JMX統計 list none  
metrics.num.samples 用於維護metrics的樣本數 int 2  
metrics.sample.window.ms metrics系統維護可配置的樣本數量,在一個可修正的window size。這項配置配置了窗口大小,例如。我們可能在30s的期間維護兩個樣本。當一個窗口推出后,我們會擦除並重寫最老的窗口 long 30000  
reconnect.backoff.ms 連接失敗時,當我們重新連接時的等待時間。這避免了客戶端反復重連 long 10  
retry.backoff.ms 在試圖重試失敗的produce請求之前的等待時間。避免陷入發送-失敗的死循環中 long 100  



Consumer配置

Property Default Description
group.id   用來唯一標識consumer進程所在組的字符串,如果設置同樣的group id,表示這些processes都是屬於同一個consumer group
zookeeper.connect   指定zookeeper的連接的字符串,格式是hostname:port,此處host和port都是zookeeper server的host和port,為避免某個zookeeper 機器宕機之后失聯,你可以指定多個hostname:port,使用逗號作為分隔:hostname1:port1,hostname2:port2,hostname3:port3可以在zookeeper連接字符串中加入zookeeper的chroot路徑,此路徑用於存放他自己的數據,方式:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
consumer.id null 不需要設置,一般自動產生
socket.timeout.ms 30*1000 網絡請求的超時限制。真實的超時限制是 max.fetch.wait+socket.timeout.ms
socket.receive.buffer.bytes 64*1024 socket用於接收網絡請求的緩存大小
fetch.message.max.bytes 1024*1024 每次fetch請求中,針對每次fetch消息的最大字節數。這些字節將會督導用於每個partition的內存中,因此,此設置將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server允許的最大消息尺寸相等,否則,producer可能發送的消息尺寸大於consumer所能消耗的尺寸。
num.consumer.fetchers 1 用於fetch數據的fetcher線程數
auto.commit.enable true 如果為真,consumer所fetch的消息的offset將會自動的同步到zookeeper。這項提交的offset將在進程掛掉時,由新的consumer使用
auto.commit.interval.ms 60*1000 consumer向zookeeper提交offset的頻率
queued.max.message.chunks 2 用於緩存消息的最大數目,以供consumption。每個chunk必須和fetch.message.max.bytes相同
rebalance.max.retries 4 當新的consumer加入到consumer group時,consumers集合試圖重新平衡分配到每個consumer的partitions數目。如果consumers集合改變了,當分配正在執行時,這個重新平衡會失敗並重入
fetch.min.bytes 1 每次fetch請求時,server應該返回的最小字節數。如果沒有足夠的數據返回,請求會等待,直到足夠的數據才會返回。
fetch.wait.max.ms 100 如果沒有足夠的數據能夠滿足fetch.min.bytes,則此項配置是指在應答fetch請求之前,server會阻塞的最大時間。
rebalance.backoff.ms 2000 在重試reblance之前backoff時間
refresh.leader.backoff.ms 200 在試圖確定某個partition的leader是否失去他的leader地位之前,需要等待的backoff時間
auto.offset.reset largest zookeeper中沒有初始化的offset時,如果offset是以下值的回應:smallest:自動復位offset為smallest的offsetlargest:自動復位offset為largest的offsetanything else:向consumer拋出異常
consumer.timeout.ms -1 如果沒有消息可用,即使等待特定的時間之后也沒有,則拋出超時異常
exclude.internal.topics true 是否將內部topics的消息暴露給consumer
paritition.assignment.strategy range 選擇向consumer 流分配partitions的策略,可選值:range,roundrobin
client.id group id value 是用戶特定的字符串,用來在每次請求中幫助跟蹤調用。它應該可以邏輯上確認產生這個請求的應用
zookeeper.session.timeout.ms 6000 zookeeper 會話的超時限制。如果consumer在這段時間內沒有向zookeeper發送心跳信息,則它會被認為掛掉了,並且reblance將會產生
zookeeper.connection.timeout.ms 6000 客戶端在建立通zookeeper連接中的最大等待時間
zookeeper.sync.time.ms 2000 ZK follower可以落后ZK leader的最大時間
offsets.storage zookeeper 用於存放offsets的地點: zookeeper或者kafka
offset.channel.backoff.ms 1000 重新連接offsets channel或者是重試失敗的offset的fetch/commit請求的backoff時間
offsets.channel.socket.timeout.ms 10000 當讀取offset的fetch/commit請求回應的socket 超時限制。此超時限制是被consumerMetadata請求用來請求offset管理
offsets.commit.max.retries 5 重試offset commit的次數。這個重試只應用於offset commits在shut-down之間。
dual.commit.enabled true 如果使用“kafka”作為offsets.storage,你可以二次提交offset到zookeeper(還有一次是提交到kafka)。在zookeeper-based的offset storage到kafka-based的offset storage遷移時,這是必須的。對任意給定的consumer group來說,比較安全的建議是當完成遷移之后就關閉這個選項
partition.assignment.strategy range 在“range”和“roundrobin”策略之間選擇一種作為分配partitions給consumer 數據流的策略; 循環的partition分配器分配所有可用的partitions以及所有可用consumer 線程。它會將partition循環的分配到consumer線程上。如果所有consumer實例的訂閱都是確定的,則partitions的划分是確定的分布。循環分配策略只有在以下條件滿足時才可以:(1)每個topic在每個consumer實力上都有同樣數量的數據流。(2)訂閱的topic的集合對於consumer group中每個consumer實例來說都是確定的。
備注:從 0.9.0.0版本后,kafkat添加了新的 消費者API及對應的 consumer配置,有一些較大的變化,這就要求使用對應的kafka-clients(客戶端)版本。






來自:csdn
作者;louisliao_1981


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM