Kafka平滑滾動升級2.4.0指南


今天測試了下kafka從2.0.0滾動升級至2.4.0,下面做一下記錄。這個鏈接是Kafka官網對升級2.4.0的指南,可以參考  http://kafka.apache.org/24/documentation.html#upgrade
好了,步入正題吧!
首先,線上環境,在對kafka滾動升級的過程中,一定是不能影響業務運行的吧,否則一頓操作猛如虎,業務罵你二百五。
所以,我這里搭建了三台節點的Kafka2.0.0集群,寫了一個生產者、一個消費者,來模擬業務運行。附一下客戶端代碼:

Producer

package org.bigdata.kafka.eye.web.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

/**
 * @author Oliver Shen
 * @date 2020/4/22 11:00
 */
public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "1");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            while (true) {
                Random random = new Random();
                int i = random.nextInt(10000);
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i)));
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            producer.close();
        }
    }
}

Consumer

package org.bigdata.kafka.eye.web.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author Oliver Shen
 * @date 2020/4/22 11:12
 */
public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "bing-test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}

我是將客戶端打成jar包上傳到服務器執行的,並后台運行,日志打到文件里,可以用來查看升級過程中對客戶端是否有影響。

java -jar producer.jar > producer.log 2>&1 &

java -jar consumer.jar > consumer.log 2>&1 &

Kafka滾動升級,對0.11.0版本之后的版本,不需要修改消息格式,如果你是從0.11.0之前升級到2.4.0,你還需要升級消息格式:配置該參數log.message.format.version,2.0.0和2.4.0消息格式同步,這個參數可以不做。

第一步:修改server.properties

向所有節點的server.properties增加一個屬性:

inter.broker.protocol.version=2.0

修改保存后,依次重啟所有節點,注意觀察日志,保證Kafka正常重啟。

重啟過程中,觀察isr同步,最好等isr同步完成再進行下一個節點的重啟操作。

 

第二步:上傳最新的2.4.0包

上傳新包到三台節點,並將原包里的所有配置文件復制到新包的配置下。

 

第三步:滾動重啟

各節點依次:停止舊服務,將日志文件備份,並清空日志目錄 ,啟動新服務,觀察日志,確保沒有任何問題再進行下一個節點操作。

 

第四步:修改協議版本

修改所有新包的server.properties:將下面的配置改成2.4

inter.broker.protocol.version=2.4

依次滾動重啟個節點,滾動重啟過程,觀察日志,可以確保下isr同步。

 

至此,滾動升級完成。過程中,並沒有發現客戶端受到了影響,或許是因為客戶端內容過於簡單。

這里只是在測試環境進行的一次升級測試,如有錯誤,還望指教。

歡迎關注我的微信公眾號《小沈干貨》獲取更多學習內容。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM