kafka線上滾動升級方案記錄


 正文前先來一波福利推薦:

福利一:

百萬年薪架構師視頻,該視頻可以學到很多東西,是本人花錢買的VIP課程,學習消化了一年,為了支持一下女朋友公眾號也方便大家學習,共享給大家。

福利二:

畢業答辯以及工作上各種答辯,平時積累了不少精品PPT,現在共享給大家,大大小小加起來有幾千套,總有適合你的一款,很多是網上是下載不到。

獲取方式:

微信關注 精品3分鍾 ,id為 jingpin3mins,關注后回復   百萬年薪架構師 ,精品收藏PPT  獲取雲盤鏈接,謝謝大家支持!

 

------------------------正文開始---------------------------

kafka升級方案

為什么進行kafka升級

一、修改unclean.leader.election.enabled默認值
Kafka社區終於下定決心要把這個參數的默認值改成false,即不再允許出現unclean leader選舉的情況,在正確性和高可用性之間選擇了前者。如果依然要啟用它,用戶需要顯式地在server.properties中設置這個參數=true

二、確保offsets.topic.replication.factor參數被正確應用
__consumer_offsets這個topic是Kafka自動創建的,在創建的時候如果集群broker數<offsets.topic.replication.factor,原先的版本取其小者,但這會違背用戶設置該參數的初衷。因此在0.11版本中這個參數會被強制遵守,如果不滿足該參數設定的值,會拋出GROUP_COORDINATOR_NOT_AVAILABLE。

三、優化了對Snappy壓縮的支持
之前由於源代碼中硬編碼了block size,使得producer使用Snappy時的表現比LZ4相差很多,但其實Snappy和LZ4兩者之差距不應該很大。故此0.11版本中對Snappy的默認block size做了調整。不過這一點需要詳盡的性能測試報告來證明此改動是有效的。

四、消息增加頭部信息(Header)
Record增加了Header,每個header是一個KV存儲。具體的header設計參見KIP-82

五、空消費者組延時rebalance
為了縮短多consumer首次rebalance的時間,增加了“group.initial.rebalance.delay.ms”用於設置group開啟rebalance的延時時間。這段延時期間允許更多的consumer加入組,避免不必要的JoinGroup與SyncGroup之間的切換。當然凡事都是trade-off,引入這個必然帶來消費延時。

六、消息格式變更
增加最新的magic值:2,還增加了header信息。同時為了支持冪等producer和EOS,增加一些與事務相關的字段,使得單個record數據結構體積增加。但因為優化了RecordBatch使得整個batch所占體積反而減少,進一步降低了網絡IO開銷。

七、新的分配算法:StickyAssignor
比range和round-robin更加平衡的分配算法。指定partition.assignment.strategy = org.apache.kafka.clients.consumer.StickyAssignor可以嘗嘗鮮。不過根據我的經驗,分配不均勻的情況通常發生在每個consumer訂閱topic差別很大的時候。比如consumer1訂閱topic1, topic2, topic4, consumer2訂閱topic3, topic4這種情況

八、controller重設計
Controller原來的設計非常復雜,使得社區里面的人幾乎不敢改動controller代碼。老版本controller的主要問題在我看來有2個:1. controller需要執行1,2,3,4,5,6步操作,倘若第3步出錯了,無法回滾前兩步的操作;2. 多線程訪問,多個線程同時訪問Controller上下文信息。0.11版本部分重構了controller,采用了單線程+基於事件隊列的方式。具體效果咱們拭目以待吧~~

九、支持EOS
0.11最重要的功能,沒有之一!EOS是流式處理實現正確性的基石。主流的流式處理框架基本都支持EOS(如Storm Trident, Spark Streaming, Flink),Kafka streams肯定也要支持的。0.11版本通過3個大的改動支持EOS:1.冪等的producer(這也是千呼萬喚始出來的功能);2. 支持事務;3. 支持EOS的流式處理(保證讀-處理-寫全鏈路的EOS)

方案一:

接受停機升級,關閉0.9.0.1版本的kafka,然后按照正常步驟啟動kafka0.11.0.3
版本,然后升級后台所有涉及kafka的模塊;
優點:過程簡單,無突發異常,只有正常啟動新版本即可使用;
不足:關閉老版本,啟動新版本的過程中,存在部分線上數據丟失的情況,此種情況推薦在凌晨數據量少的時候使用;

方案二:

使用滾動升級方案,方案步驟如下:

 

升級步驟.png
升級步驟.png

 

注意:由於引入了新的協議,要在升級客戶端之前先升級kafka集群(即,0.10.1.x僅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持舊版本的客戶端)
第一步:
先把新版安裝包拷貝到對應機器上,並解壓。

第二步:
更新所有broker(新版本)上的配置文件config/server.properties
inter.broker.protocol.version=0.9.0.1 (舊版本號)
log.message.format.version=0.9.0.1 (現正在使用client端版本號)
其他配置保持不變,特別是數據存儲目錄,沒改變,注意對應修改broker id號、ip、zookeeper

第三步:
先停一台舊版本broker,更改環境變量指向新版,啟動新版broker

第四步:
循環執行第三步,直到集群中所有有borker都更新到新版。
注意:替換新版broker后,注意查看新版broker是否已經注冊到zookeeper,所在機器上的的副本是否已經可用。確定可用之后再更新下一台broker。

第五步:
確定上訴步驟已經執行完畢,並且集群一切正常后,修改所有新版配置文件server.properties
inter.broker.protocol.version=0.11.0.3 (新版本號)
log.message.format.version=0.9.0.1 (現正在使用client端版本號)
注意:log.message.format.version這里要等client端的版本升級后再做修改。如果之前的消息格式是0.10.0,則將log.message.format.version更改為0.10.1(這無影響,因為0.10.0和0.10.1的消息格式是相同的)。 如果之前的消息格式版本低於.10.0,還不能更改log.message.format.version - 一旦所有的消費者都已升級到 0.10.0.0 或更高版本時,才能更改此參數。

第六步:
逐個重啟borker。
如果log.message.format.version低於0.10.0,請等待,知道所有消費者升級到0.10.0或更新的版本,然后將每個broker的log.message.format.version更改為0.10.1。然后逐個重啟。

注意:變換協議版本和重啟啟動可以在broker升級完成后的任何時間去做,不必馬上做。

方案二過程示例:

啟動0.9.0.1版本

 

guxiaoyong1.png
guxiaoyong1.png
guxiaoyong2.png
guxiaoyong2.png
guxiaoyong3.png
guxiaoyong3.png

創建topc,生產者,消費者,觀察收發情況:
在guxiaoyong3創建topic

 

topic.png
topic.png

 

在 guxiaoyong3創建生產者

 

生產者.png
生產者.png

 

發送消息.png
發送消息.png

guxiaoyong1和guxiaoyong2上進行消費;

 

消費 topic.png
消費 topic.png

 

消費topic.png
消費topic.png

可以看到收發正常,現在進行guxiaoyong1上進行版本升級,修改service.conf協議版本:

 

image.png
image.png

 

關閉之前的舊版本kafka,啟動新的kafka:

 

image.png
image.png

 

image.png
image.png

測試其他的兩個kafka是收發正常的:

 

image.png
image.png
image.png
image.png

升級guxiaoyong2的kafka:

 

image.png
image.png
image.png
image.png

測試guxiaoyong1 guxiaoyong2是否收發正常

 

image.png
image.png

 

image.png
image.png

 

image.png
image.png

更新guxiaoyong3,檢測收發是否正常:

 

image.png
image.png
image.png
image.png
image.png
image.png

 

可以看到全部收到正常;

接下來更改所有配置文件中的inter.broker.protocol.version=0.11.0.3,依次重啟kafka,完成升級;

 

image.png
image.png
image.png
image.png
image.png
image.png

可以看到所有的消息收到正常;
接下來,把項目項目代碼中的消費者更新到0.11.0.3,進行項目灰度發布,然后重新修改kafka配置文件中log.message.format.version=0.9.0.1,進行依次重啟。完成最終升級。

項目代碼修改

修改客戶端的版本:

 

image.png
image.png

注意spring與kafka版本的關聯關系:

 

image.png
image.png

修改代碼中部分配置:

 

image.png
image.png

驗證是否開啟了壓縮功能:

 

image.png
image.png

還可以使用DumpLogSegments工具,並替換您的目錄位置/日志文件名稱;
使用
./kafka-run-class.sh kafka.tools.DumpLogSegments -files ../../kafkalogs/risk_api_msg_test-2/00000000000000000000.log -print-data-log
查看topic下的數據屬性:

 

image.png
image.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM