在kafka的安裝目錄下,config目錄下有個名字叫做producer.properties的配置文件
#指定kafka節點列表,用於獲取metadata,不必全部指定 #需要kafka的服務器地址,來獲取每一個topic的分片數等元數據信息。 metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092 #生產者生產的消息被發送到哪個block,需要一個分組策略。 #指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區 #partitioner.class=kafka.producer.DefaultPartitioner #生產者生產的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮后發送到broker集群, #而broker集群是不會進行解壓縮的,broker集群只會把消息發送到消費者集群,然后由消費者來解壓縮。 #是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。 #壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。 #文本數據會以1比10或者更高的壓縮比進行壓縮。 compression.codec=none #指定序列化處理類,消息在網絡上傳輸就需要序列化,它有String、數組等許多種實現。 serializer.class=kafka.serializer.DefaultEncoder #如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮。 #如果上面啟用了壓縮,那么這里就需要設置 #compressed.topics= #這是消息的確認機制,默認值是0。在面試中常被問到。 #producer有個ack參數,有三個值,分別代表: #(1)不在乎是否寫入成功; #(2)寫入leader成功; #(3)寫入leader和所有副本都成功; #要求非常可靠的話可以犧牲性能設置成最后一種。 #為了保證消息不丟失,至少要設置為1,也就 #是說至少保證leader將消息保存成功。 #設置發送數據是否需要服務端的反饋,有三個值0,1,-1,分別代表3種狀態: #0: producer不會等待broker發送ack。生產者只要把消息發送給broker之后,就認為發送成功了,這是第1種情況; #1: 當leader接收到消息之后發送ack。生產者把消息發送到broker之后,並且消息被寫入到本地文件,才認為發送成功,這是第二種情況;#-1: 當所有的follower都同步消息成功后發送ack。不僅是主的分區將消息保存成功了, #而且其所有的分區的副本數也都同步好了,才會被認為發動成功,這是第3種情況。 request.required.acks=0 #broker必須在該時間范圍之內給出反饋,否則失敗。 #在向producer發送ack之前,broker允許等待的最大時間 ,如果超時, #broker將會向producer發送一個error ACK.意味着上一次消息因為某種原因 #未能成功(比如follower未能同步成功) request.timeout.ms=10000 #生產者將消息發送到broker,有兩種方式,一種是同步,表示生產者發送一條,broker就接收一條; #還有一種是異步,表示生產者積累到一批的消息,裝到一個池子里面緩存起來,再發送給broker, #這個池子不會無限緩存消息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的參數供我們來設置。 #一般我們會選擇異步。 #同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量, #也意味着消息將會在本地buffer中,並適時批量發送,但是也可能導致丟失未發送過去的消息 producer.type=sync #在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker, #默認為5000ms #此值和batch.num.messages協同工作. queue.buffering.max.ms = 5000 #異步情況下,緩存中允許存放消息數量的大小。 #在async模式下,producer端允許buffer的最大消息量 #無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積 #此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000條消息。 queue.buffering.max.messages=20000 #如果是異步,指定每次批量發送數據量,默認為200 batch.num.messages=500 #在生產端的緩沖池中,消息發送出去之后,在沒有收到確認之前,該緩沖池中的消息是不能被刪除的, #但是生產者一直在生產消息,這個時候緩沖池可能會被撐爆,所以這就需要有一個處理的策略。 #有兩種處理方式,一種是讓生產者先別生產那么快,阻塞一下,等會再生產;另一種是將緩沖池中的消息清空。 #當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后阻塞一定時間后, #隊列仍然沒有enqueue(producer仍然沒有發送出任何消息) #此時producer可以繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間 #-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄 #0: 立即清空隊列,消息被拋棄 queue.enqueue.timeout.ms=-1 #當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數 #因為broker並沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失) #有可能導致broker接收到重復的消息,默認值為3. message.send.max.retries=3 #producer刷新topic metada的時間間隔,producer需要知道partition leader #的位置,以及當前topic的情況 #因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時, #將會立即刷新 #(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置 #額外的刷新機制,默認值600000 topic.metadata.refresh.interval.ms=60000
kafka的消費者配置(路徑和生產者配置文件路徑相同),名字叫做.consumer.properties
#消費者集群通過連接Zookeeper來找到broker。 #zookeeper連接服務器地址 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 #zookeeper的session過期時間,默認5000ms,用於檢測消費者是否掛掉 zookeeper.session.timeout.ms=5000 #當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡 zookeeper.connection.timeout.ms=10000 #這是一個時間閾值。 #指定多久消費者更新offset到zookeeper中。 #注意offset更新時基於time而不是每次獲得的消息。 #一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000 #指定消費 group.id=xxxxx #這是一個數量閾值,經測試是500條。 #當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息#注意offset信息並不是每消費一次消息就向zk提交 #一次,而是現在本地保存(內存),並定期提交,默認為true auto.commit.enable=true # 自動更新時間。默認60 * 1000 auto.commit.interval.ms=1000 # 當前consumer的標識,可以設定,也可以有系統生成, #主要用來跟蹤消息消費情況,便於觀察 conusmer.id=xxx # 消費者客戶端編號,用於區分不同客戶端,默認客戶端程序自動產生 client.id=xxxx # 最大取多少塊緩存到消費者(默認10) queued.max.message.chunks=50 # 當有新的consumer加入到group時,將會reblance,此后將會 #有partitions的消費端遷移到新 的consumer上,如果一個 #consumer獲得了某個partition的消費權限,那么它將會向zk #注冊 "Partition Owner registry"節點信息,但是有可能 #此時舊的consumer尚沒有釋放此節點, 此值用於控制, #注冊節點的重試次數. rebalance.max.retries=5 #每拉取一批消息的最大字節數 #獲取消息的最大尺寸,broker不會像consumer輸出大於 #此值的消息chunk 每次feth將得到多條消息,此值為總大小, #提升此值,將會消耗更多的consumer端內存 fetch.min.bytes=6553600 #當消息的尺寸不足時,server阻塞的時間,如果超時, #消息將立即發送給consumer #數據一批一批到達,如果每一批是10條消息,如果某一批還 #不到10條,但是超時了,也會立即發送給consumer。 fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 如果zookeeper沒有offset值或offset值超出范圍。 #那么就給個初始的offset。有smallest、largest、 #anything可選,分別表示給當前最小的offset、 #當前最大的offset、拋異常。默認largest auto.offset.reset=smallest # 指定序列化處理類 derializer.class=kafka.serializer.DefaultDecoder
以上內容轉載自:https://www.cnblogs.com/jun1019/p/6256371.html
硬件的選擇
1.磁盤吞吐量
生產者客戶端的性能直接受到服務器端磁盤吞吐量的影響。生產者生成的消息必須被提交到服務器保存,大多數客戶端在發送消息后會一直等待,直到至少有一個服務器確認消息已經提交成功為止。也就是說,磁盤寫入速度越快,生成消息的延遲就越低。機械硬盤(HDD)和固態盤(SSD)。固態盤的查找和訪問速度都很快,提供了最好的性能。機械盤更便宜,單塊容量也更大。在同一個服務器上使用多個機械盤,可以設置多個數據目錄,或者把它們設置成磁盤陣列,這樣可以提升機械硬盤的性能。
2.磁盤容量
磁盤容量要多大取決於需要保存的消息的數量。
3.內存
服務器端的內存容量是影響客戶端性能的主要因素,磁盤性能影響生產者,而內存影響消費者。消費者一般從分區尾部讀取消息,如果有生產者存在,就緊跟在生產者后面。這種情況下,消費者讀的消息會直接存放在系統的頁面緩存里,這比從磁盤上重新讀取要快。不建議把kafka同其他重要的應用部署在一起,因為它們需要共享頁面緩存,最終會降低kafka消費者的性能。
4.網絡
網絡吞吐量決定了kafka能夠處理的最大數據流量。它和磁盤性能是制約kafka擴展規模的主要因素。kafka支持多個消費者,造成流入和流出的網絡流量不平衡,從而讓情況變得更加復雜。對於給定的主題,一個生產者可能每秒中寫入1MB數據,但可能同時有多個消費者瓜分網絡流量。其它操作也會占用網絡流量。
5.CPU
kafka對計算處理能力的要求較低,不過他在一定程度上還是會影響整體性能。客戶端為了優化網絡和磁盤空間,會對消息進行壓縮。服務器需要對消息進行批量解壓,設置偏移量,然后重新進行批量壓縮,再保存到磁盤上。這就是kafka對計算能力有所要求的地方。
使用kafka集群的最大好處就是可以跨服務器進行負載均衡,再則就是可以使用復制功能來避免因單點故障造成的數據丟失。在維護kafka或底層系統時,使用集群可以確保為客戶端提供高可用性。