Kafka版本升級 ( 0.10.0 -> 0.10.2 )


升級Kafka集群的版本其實很簡單,核心步驟只需要4步,但是我們需要在升級的過程中確保每一步操作都不會“打擾”到producer和consumer的正常運轉。為此,筆者在本機搭了一個測試環境進行實際的版本升級實驗。在開始之前,簡要介紹一下測試環境的部署情況及目標:Kafka 0.10.0.0 雙broker測試環境,而目標是把該集群升級到0.10.2版本

 

兩個broker啟動時分別讀取server.properties和server2.properties。

一、啟動測試環境
打開兩個終端,分別執行startBroker1.sh和startBroker2.sh。startBroker*.sh內容很簡單就是:

CURRENT_PATH=<your_path>/kafka_2.11-0.10.0.0
cd $CURRENT_PATH
JMX_PORT=9997 bin/kafka-server-start.sh ../configs/server.properties

二、創建測試topic
創建一個雙分區,replication-factor=2的topic:test,然后使用kafka-topics工具describe一下:

okay,目前一切正常。

三、啟動producer

很簡單的producer程序,每1秒發送一條消息,然后打印成功提交的消息數和提交失敗的消息數。特別注意提交失敗的消息數,后續我們依賴此值來確保升級流程不會影響到producer。 主要程序代碼如下:

 1 Properties props = new Properties();
 2         props.put("bootstrap.servers", "localhost:9092,localhost:9093");
 3         props.put("acks", "all");
 4         props.put("retries", Integer.MAX_VALUE);
 5         props.put("batch.size", 16384);
 6         props.put("linger.ms", 1);
 7         props.put("buffer.memory", 33554432);
 8         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 9         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
10 
11         Producer<String, String> producer = new KafkaProducer<>(props);
12         final AtomicInteger success = new AtomicInteger(0);
13         final AtomicInteger failed = new AtomicInteger(0);
14         try {
15             while (true) {
16                 producer.send(new ProducerRecord<String, String>("test", "a message"), new Callback() {
17                     @Override
18                     public void onCompletion(RecordMetadata metadata, Exception exception) {
19                         if (exception != null) {
20                             System.out.println("Current failed count: " + failed.incrementAndGet());
21                         } else
22                             System.out.println("Current success count: " + success.incrementAndGet()
23                                     + ", failed: " + failed.get());
24                     }
25                 });
26                 Thread.sleep(2000);
27 
28             }
29         } finally {
30             producer.close();
31

四、啟動consumer

為簡單起見,我使用了console-consumer,如下所示。另外, 是Kafka 0.10.0.0版本,所以一定要加上`--new-consumer`才能使用新版本consumer!
bin/kafka-console-consumer.sh --topic test --from-beginning --new-consumer --bootstrap-server localhost:9092,localhost:9093

此時,你應該可以看到producer和consumer都可以正常地工作。

 

----------------------------- 升級的流程正式開始 -----------------------------

切記: 每做完一步都要觀察producer和consumer是否出現嚴重錯誤!

五、 更新broker間通訊版本號和消息格式版本
向所有broker的server.properties文件中增加下面兩行:

inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

六、依次更新代碼,重啟所有broker
注意一定要依次重啟,即先重啟broker1,然后再重啟broker2


七、再次更新broker間通訊版本和消息格式版本

inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2

注意,這次要更新成你要升級到的目標版本。比如我們要升級到0.10.2,那么就更新為0.10.2

八、再次依次重啟broker
依然要依次重啟

 

好了,當前集群版本已經升級完畢了。值得一提的是,在整個升級過程中producer應該是可以正常工作的,但consumer可能會出現位移提交失敗的警告,因此有可能會造成重復消費,而broker端可能會出現“org.apache.kafka.common.errors.NotLeaderForPartitionException”異常,因為__consumers_offsets各分區的leader有可能會發生瞬時的變化,因此通常也是不必在意的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM