kafka默認的配置詳情,配置文件路徑在kafka/config下面
一.server.properties:服務端的配置文件
#broker的全局唯一編號,不能重復
broker.id=0
#用來監聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092
#處理網絡請求的線程數量,也就是接收消息的線程數。
#接收線程會將接收到的消息放到內存中,然后再從內存中寫入磁盤。
num.network.threads=3
#消息從內存中寫入磁盤是時候使用的線程數量。
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka運行日志存放的路徑
log.dirs=/export/servers/logs/kafka
#topic在當前broker上的分片個數
num.partitions=2
#我們知道segment文件默認會被保留7天的時間,超時的話就
#會被清理,那么清理這件事情就需要有一些線程來做。這里就是
#用來設置恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,默認保留7天(168小時),
#超時將被刪除,也就是說7天之前的數據將被清理掉。
log.retention.hours=168
#滾動生成新的segment文件的最大時間
log.roll.hours=168
#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824
#上面的參數設置了每一個segment文件的大小是1G,那么
#就需要有一個東西去定期檢查segment文件有沒有達到1G,
#多長時間去檢查一次,就需要設置一個周期性檢查文件大小
#的時間(單位是毫秒)。
log.retention.check.interval.ms=300000
#日志清理是否打開
log.cleaner.enable=true
#broker需要使用zookeeper保存meta數據
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000
#上面我們說過接收線程會將接收到的消息放到內存中,然后再從內存
#寫到磁盤上,那么什么時候將消息從內存中寫入磁盤,就有一個
#時間限制(時間閾值)和一個數量限制(數量閾值),這里設置的是
#數量閾值,下一個參數設置的則是時間閾值。
#partion buffer中,消息的條數達到閾值,將觸發flush到磁盤。
log.flush.interval.messages=10000
#消息buffer的時間,達到閾值,將觸發將消息從內存flush到磁盤,
#單位是毫秒。
log.flush.interval.ms=3000
#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:
#Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01
advertised.host.name=loclahost
二.producer.properties:生產端的配置文件
#指定kafka節點列表,用於獲取metadata,不必全部指定
#需要kafka的服務器地址,來獲取每一個topic的分片數等元數據信息。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
#生產者生產的消息被發送到哪個block,需要一個分組策略。
#指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區
#partitioner.class=kafka.producer.DefaultPartitioner
#生產者生產的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮后發送到broker集群,
#而broker集群是不會進行解壓縮的,broker集群只會把消息發送到消費者集群,然后由消費者來解壓縮。
#是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。
#壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
#文本數據會以1比10或者更高的壓縮比進行壓縮。
compression.codec=none
#指定序列化處理類,消息在網絡上傳輸就需要序列化,它有String、數組等許多種實現。
serializer.class=kafka.serializer.DefaultEncoder
#如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮。
#如果上面啟用了壓縮,那么這里就需要設置
#compressed.topics=
#這是消息的確認機制,默認值是0。在面試中常被問到。
#producer有個ack參數,有三個值,分別代表:
#(1)不在乎是否寫入成功;
#(2)寫入leader成功;
#(3)寫入leader和所有副本都成功;
#要求非常可靠的話可以犧牲性能設置成最后一種。
#為了保證消息不丟失,至少要設置為1,也就
#是說至少保證leader將消息保存成功。
#設置發送數據是否需要服務端的反饋,有三個值0,1,-1,分別代表3種狀態:
#0: producer不會等待broker發送ack。生產者只要把消息發送給broker之后,就認為發送成功了,這是第1種情況;
#1: 當leader接收到消息之后發送ack。生產者把消息發送到broker之后,並且消息被寫入到本地文件,才認為發送成功,這是第二種情況;#-1: 當所有的follower都同步消息成功后發送ack。不僅是主的分區將消息保存成功了,
#而且其所有的分區的副本數也都同步好了,才會被認為發動成功,這是第3種情況。
request.required.acks=0
#broker必須在該時間范圍之內給出反饋,否則失敗。
#在向producer發送ack之前,broker允許等待的最大時間 ,如果超時,
#broker將會向producer發送一個error ACK.意味着上一次消息因為某種原因
#未能成功(比如follower未能同步成功)
request.timeout.ms=10000
#生產者將消息發送到broker,有兩種方式,一種是同步,表示生產者發送一條,broker就接收一條;
#還有一種是異步,表示生產者積累到一批的消息,裝到一個池子里面緩存起來,再發送給broker,
#這個池子不會無限緩存消息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的參數供我們來設置。
#一般我們會選擇異步。
#同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量,
#也意味着消息將會在本地buffer中,並適時批量發送,但是也可能導致丟失未發送過去的消息
producer.type=sync
#在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker,
#默認為5000ms
#此值和batch.num.messages協同工作.
queue.buffering.max.ms = 5000
#異步情況下,緩存中允許存放消息數量的大小。
#在async模式下,producer端允許buffer的最大消息量
#無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積
#此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000條消息。
queue.buffering.max.messages=20000
#如果是異步,指定每次批量發送數據量,默認為200
batch.num.messages=500
#在生產端的緩沖池中,消息發送出去之后,在沒有收到確認之前,該緩沖池中的消息是不能被刪除的,
#但是生產者一直在生產消息,這個時候緩沖池可能會被撐爆,所以這就需要有一個處理的策略。
#有兩種處理方式,一種是讓生產者先別生產那么快,阻塞一下,等會再生產;另一種是將緩沖池中的消息清空。
#當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后阻塞一定時間后,
#隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
#此時producer可以繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間
#-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄
#0: 立即清空隊列,消息被拋棄
queue.enqueue.timeout.ms=-1
#當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數
#因為broker並沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失)
#有可能導致broker接收到重復的消息,默認值為3.
message.send.max.retries=3
#producer刷新topic metada的時間間隔,producer需要知道partition leader
#的位置,以及當前topic的情況
#因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,
#將會立即刷新
#(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置
#額外的刷新機制,默認值600000
topic.metadata.refresh.interval.ms=60000
三.consumer.properties:消費端的配置文件
#消費者集群通過連接Zookeeper來找到broker。
#zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#zookeeper的session過期時間,默認5000ms,用於檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000
#當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000
#這是一個時間閾值。
#指定多久消費者更新offset到zookeeper中。
#注意offset更新時基於time而不是每次獲得的消息。
#一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000
#指定消費
group.id=xxxxx
#這是一個數量閾值,經測試是500條。
#當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息#注意offset信息並不是每消費一次消息就向zk提交
#一次,而是現在本地保存(內存),並定期提交,默認為true
auto.commit.enable=true
# 自動更新時間。默認60 * 1000
auto.commit.interval.ms=1000
# 當前consumer的標識,可以設定,也可以有系統生成,
#主要用來跟蹤消息消費情況,便於觀察
conusmer.id=xxx
# 消費者客戶端編號,用於區分不同客戶端,默認客戶端程序自動產生
client.id=xxxx
# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50
# 當有新的consumer加入到group時,將會reblance,此后將會
#有partitions的消費端遷移到新 的consumer上,如果一個
#consumer獲得了某個partition的消費權限,那么它將會向zk
#注冊 "Partition Owner registry"節點信息,但是有可能
#此時舊的consumer尚沒有釋放此節點, 此值用於控制,
#注冊節點的重試次數.
rebalance.max.retries=5
#每拉取一批消息的最大字節數
#獲取消息的最大尺寸,broker不會像consumer輸出大於
#此值的消息chunk 每次feth將得到多條消息,此值為總大小,
#提升此值,將會消耗更多的consumer端內存
fetch.min.bytes=6553600
#當消息的尺寸不足時,server阻塞的時間,如果超時,
#消息將立即發送給consumer
#數據一批一批到達,如果每一批是10條消息,如果某一批還
#不到10條,但是超時了,也會立即發送給consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出范圍。
#那么就給個初始的offset。有smallest、largest、
#anything可選,分別表示給當前最小的offset、
#當前最大的offset、拋異常。默認largest
auto.offset.reset=smallest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder
————————————————
原文鏈接:https://blog.csdn.net/xionglangs/article/details/81507617