Kafka 0.8 配置參數解析


http://kafka.apache.org/documentation.html#configuration

 

Broker Configs

4個必填參數,

broker.id
Each broker is uniquely identified by a non-negative integer id
broker唯一標識,broker可以在不同的host或port,但必須保證id唯一

log.dirs (/tmp/kafka-logs)
日志文件存放的目錄
可以用逗號隔開多個目錄,當創建partitions時,會自動挑一個已創建partition最少的目錄創建
因為Kafka必須充分利用磁盤資源,所以要讓partititons均勻分布在所有disks上,至少每個disk創建一個目錄

port (6667)
broker server所在端口

zookeeper.connect
zk的地址,hostname1:port1,hostname2:port2

 

可選的重要參數,

advertised.host.name (null)
設置暴露給其他Consumer,producer,broker的域名
不設置會暴露默認域名,域名解析比較麻煩,這里可以設為ip

message.max.bytes (1000000)
broker可以接收的message的最大size
注意要和consumer的maximum fetch size相匹配

num.io.threads (8)
處理I/O的線程數,即讀寫磁盤的線程數,所以至少等於磁盤數

queued.max.requests (500)
可以queue的最大請求數,包括producer的寫請求和consumer的讀請求

log.segment.bytes (1024 * 1024 * 1024
log.roll.hours
上面2個配置用於設置,何時為partition產生新的segment文件
為了防止partition文件過大,所以partition是由一組segment文件組成
可以通過設置size或時間來決定何時roll

log.retention.{minutes,hours} (24 * 7)
log.retention.bytes (-1)
log.retention.check.interval.ms (5*60*1000, 5分鍾)
默認是7天數據
當然你可以通過partition大小來設置刪除threshold,默認是-1,即關掉的,注意這里設置的是partition大小,如果多個partition需要乘上partition數

log.cleanup.policy (delete or compact, default delete)
log.cleaner.enable (false)
log.cleaner.threads (1)
當前retention支持兩種類型,默認就是刪除舊數據
新加了compact模式,應用場景來源於Linkedin之前的Databus
上面列出主要的相關配置,如果要使用compact
log.cleanup.policy = compact
log.cleaner.enable  = true
http://kafka.apache.org/documentation.html#compaction
這個場景,數據流中的key是有限重復的,只是在不斷的更新value
compact過程就是當觸發retention條件時,刪除相同key的舊值,只保留最新值

image
局限,
a. 無法指定compact的范圍,除了last segment(正在被寫),其他所有的segments都可能被compact
b. 不支持compress topics

 

log.index.size.max.bytes (10*1024*1024)
log.index.interval.bytes (4096)
這個由於0.8版本,做出妥協,用邏輯offset來替換原先的物理offset
那就需要在邏輯offset和物理地址之間創建index
log.index.size.max.bytes,一個segment最大的index的大小,超出會創建新的segment,無論segment本身的配置
log.index.interval.bytes,做index的間隔,也是系統為了找到一個物理offset需要做linear scan的最大值

 

log.flush.interval.messages (None)
log.flush.interval.ms (None)
log.flush.scheduler.interval.ms (3000),多久check
為了效率,broker會緩存部分log,然后再flush到磁盤
這里可以通過message數或時間來控制flush策略

 

replica相關的參數,

default.replication.factor (1), The default replication factor for automatically created topics
默認的副本數

replica.lag.time.max.ms (10000), follower的最大lag時間,即如果leader在這個時間內都沒有收到follower的fetch請求,就認為它dead
If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead.
replica.lag.max.messages (4000), 最大lag消息數,超過這個消息數,leader會認為該follower dead
If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead.

replica.socket.timeout.ms (30 * 1000), The socket timeout for network requests to the leader for replicating data.
replica.socket.receive.buffer.bytes (64 * 1024), The socket receive buffer for network requests to the leader for replicating data.

replica.fetch.max.bytes (1024 * 1024), 一次fetch request最大可以fetch的數據size
The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
replica.fetch.wait.max.ms (500), fetch request的等待超時
The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
replica.fetch.min.bytes (1), Minimum bytes expected for each fetch response for the fetch requests from the replica to the leader. If not enough bytes, wait up to replica.fetch.wait.max.ms for this many bytes to arrive.

num.replica.fetchers (1), follower上用於同步leader數據的線程數
Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.

replica.high.watermark.checkpoint.interval.ms (5000), checkpoint HW的interval
The frequency with which each replica saves its high watermark to disk to handle recovery.

 

failover相關的參數,

controlled.shutdown.enable (false)
用於Graceful shutdown,當設為true時,在用stop命令關閉broker,
會做下面兩點優化,
a. sync all its logs to disk,這樣可以加速重啟,避免checksum校驗等步驟
b. migrate任意partitions的leader replica到其他的brokers,這樣主動的遷移,比發現異常后被動遷移更快
注意,設為true,只有當broker上所有的replica,不是該partition唯一replica時才能stop成功
因為如果直接關掉,會導致該partition不work

auto.leader.rebalance.enable (false)
leader.imbalance.per.broker.percentage (10)
leader.imbalance.check.interval.seconds (300)

broker crash后,partition的leader replica會被遷到其他的broker
但是當broker恢復后,上面的replica會成為follower,但不會自動切成leader
這樣會導致不均衡,所以當設為true時,會自動做leader的rebalance
默認超過10%,認為不均衡,300秒check一次

 

Topic-level configuration

其中一些參數是可以按topic設置的,並且可以修改刪除

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic 
    --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic 
    --deleteConfig max.message.bytes

上面分別是創建,修改和刪除的命令
可以按topic設置的參數包括,參考文檔
http://kafka.apache.org/documentation.html#topic-config

需要注意的是,這里的屬性名和配置文件中的屬性名是不一樣的。。。

 

Producer Configs

參考,Kafka Producer接口

 

Consumer Configs

首先只有在用high level consumer時才需要管這個配置
如果用low level,自己做代碼里面寫,這個配置不起作用的

2個必填參數,

group.id
A string that uniquely identifies the group of consumer processes to which this consumer belongs.

zookeeper.connect
zk地址,hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
可以看到zk地址其實是可以設置到,目錄級別的(比如用於一個zk管理多個kafka集群),但是如果不用默認目錄,需要自己創建和維護這個zk目錄
並且必須要和broker中的zk配置保持一致

 

可選配置,

fetch.message.max.bytes (1024 * 1024)
The number of byes of messages to attempt to fetch for each topic-partition in each fetch request
為了效率,consumer從kafka fetch數據,也是批量fetch的,雖然你處理的時候是onebyone邏輯,但后台是預讀的
這個值至少要等於broker里面設置的maximum message size,否則有可能連一條message都取不下來
默認是1m,設太大會爆內存,而且會增大讀重的可能性,因為當consumer發生變化時,會發生rebalance,這時被新分配到這個partition的consumer仍然會讀到預讀但沒有commit的數據

auto.commit.enable (true), auto.commit.interval.ms (60 * 1000)
默認是會自動commit offset到zk的,如果要自己commit,設為false
默認除非自動commit的時間是1分鍾
用手工commit的簡單的理由是,防止consumer crash丟失數據,可以在確認數據被正確處理后,再手工commit offset

queued.max.message.chunks (10)
上面說的fetch.message.max.bytes是針對一個partition的,但是一個consumer是可以同時讀多個partition,所以對每個partition都可以讀一個這樣的chunk
所以這個配置是,同時可以讀多少大小為fetch.message.max.bytes的chunk
默認是10,個人認為至少要大於讀的partition個數吧,但避免耗盡內存

fetch.wait.max.ms (100), fetch.min.bytes (1)
當取不到fetch.message.max.bytes時,最大block時間,並且返回fetch.min.bytes大小數據

auto.offset.reset
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer.

consumer.timeout.ms (-1)
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
默認是block的,當consumer取不到數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM