Kafka動態增加Topic的副本


一、kafka的副本機制

由於Producer和Consumer都只會與Leader角色的分區副本相連,所以kafka需要以集群的組織形式提供主題下的消息高可用。kafka支持主備復制,所以消息具備高可用和持久性。

    一個分區可以有多個副本,這些副本保存在不同的broker上。每個分區的副本中都會有一個作為Leader。當一個broker失敗時,Leader在這台broker上的分區都會變得不可用,kafka會自動移除Leader,再其他副本中選一個作為新的Leader。

在通常情況下,增加分區可以提供kafka集群的吞吐量。然而,也應該意識到集群的總分區數或是單台服務器上的分區數過多,會增加不可用及延遲的風險。

 

關於副本的更多信息,請參考鏈接:

https://blog.csdn.net/weixin_38750084/article/details/82942564

 

二、概述

目前的kakfa集群有3個節點,server.properties 關於topic的配置為:

offsets.topic.replication.factor=1                                                                                                                                                                                           
transaction.state.log.replication.factor=1                                                                                                                                                                                   
transaction.state.log.min.isr=1    

 

目前的設置為1個副本,這樣不健全。如果有一台服務器掛掉了,那么就會造成數據丟失!

因此,需要將副本數改為3,也就是每台服務器都有一個副本,這樣才是穩妥的!

 

三、動態擴容

kafka-topics.sh 不能用來增加副本因子replication-factor。實際應該使用kafka bin目錄下面的kafka-reassign-partitions.sh

 

查看topic詳情

首先查看kafka的所有topic

/kafka/bin/kafka-topics.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --list

輸出:

test
...

 

查看topic為test的詳細信息

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test

 

輸出:

Topic:test    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
    Topic: test    Partition: 1    Leader: 2    Replicas: 2    Isr: 2
    Topic: test    Partition: 2    Leader: 3    Replicas: 3    Isr: 3

 

可以看到test的副本數為1

 

擴容副本

kafka-reassign-partitions.sh 執行時,依賴一個json文件。

創建 test.json

{
    "version": 1,
    "partitions": [
        {
            "topic": "test",
            "partition": 0,
            "replicas": [
                1,
                2,
                3
            ]
        },
        {
            "topic": "test",
            "partition": 1,
            "replicas": [
                1,
                2,
                3
            ]
        },
        {
            "topic": "test",
            "partition": 2,
            "replicas": [
                1,
                2,
                3
            ]
        }
    ]
}

 

注意:這個json文件和上面查看的test詳情,是有關聯的!否則會導致執行失敗

關系圖

 

 

正式執行腳本

/kafka/bin/kafka-reassign-partitions.sh --zookeeper  zookeeper-1.default.svc.cluster.local:2181 --reassignment-json-file test.json --execute

 

參數解釋:

--reassignment-json-file 帶有分區的JSON文件
--execute 按規定啟動重新分配通過---重新分配JSON文件選擇權。

 

執行輸出:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test","partition":0,"replicas":[2],"log_dirs":["any"]}]}

 

出現 Successfully 表示成功了!

 

再次查看topic為test的partition詳情

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test

 

輸出:

Topic:test    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 2,3,1
    Topic: test    Partition: 1    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2
    Topic: test    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2

 

可以發現,副本已經改為3了!

 

默認配置

在java代碼或者python代碼中,是直接發送生產者消息。topic的名字是動態生成的(當kafka發現topic不存在時,會自動創建),那么它的partitions和replication-factor的數量是由服務端決定的

因為kafka集群有3個節點,所有需要改成3個

offsets.topic.replication.factor=3                                                                                                                                                                                           
transaction.state.log.replication.factor=3                                                                                                                                                                                   
transaction.state.log.min.isr=3
num.partitions=1
default.replication.factor=3

 

參數解釋:

offsets.topic.replication.factor 用於配置offset記錄的topic的partition的副本個數
transaction.state.log.replication.factor 事務主題的復制因子
transaction.state.log.min.isr 覆蓋事務主題的min.insync.replicas配置

num.partitions 新建Topic時默認的分區數

default.replication.factor 自動創建topic時的默認副本的個數

 

注意:這些參數,設置得更高以確保高可用性!

其中 default.replication.factor 是真正決定,topi的副本數量的

 

關於kafka配置文件的更多解釋,請參考鏈接:

https://blog.csdn.net/memoordit/article/details/78850086

 

那么默認參數,如何測試呢?

很簡單,由於在應用代碼,是不會主動創建topic的,由kafka集群自動創建topic。

那么由代碼進行一次,生產者和消費者,就可以了!

 

Python測試

這個腳本是普通版的kafka消息測試,沒有ACL配置!

 

test.py

#!/usr/bin/env python3
# coding: utf-8
import sys
import io

def setup_io():  # 設置默認屏幕輸出為utf-8編碼
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()

import time
from kafka import KafkaProducer
from kafka import KafkaConsumer

class KafkaClient(object):
    def __init__(self, kafka_server, port, topic, content):
        self.kafka_server = kafka_server  # kafka服務器ip地址
        self.port = port  # kafka端口
        self.topic = topic  # topic名
        self.content = content # 內容

    def producer(self):
        producer = KafkaProducer(bootstrap_servers=['%s:%s' % (kafka_server, port)])
        producer.send(topic, content)
        producer.flush()  # flush確保所有meg都傳送給broker
        producer.close()
        return producer

    def consumer(self):
        consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)])
        # consumer.close()
        return consumer

    def main(self):
        startime = time.time()  # 開始時間

        client = KafkaClient(self.kafka_server, self.port, self.topic, self.content)  # 實例化客戶端

        client.producer()  # 執行生產者
        print("已執行生產者")
        consumer = client.consumer()  # 執行消費者
        print("已執行消費者")
        print("等待結果輸出...")
        flag = False
        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            # 判斷生產的消息和消費的消息是否一致
            print(msg.value)
            # print(self.content)
            if msg.value == self.content:
                flag = True
                break

        consumer.close()  # 關閉消費者對象
        endtime = time.time()  # 結束時間

        if flag:
            # %.2f %(xx) 表示保留小數點2位
            return "kafka驗證消息成功,花費時間", '%.2f 秒' % (endtime - startime)
        else:
            return "kafka驗證消息失敗,花費時間", '%.2f 秒' % (endtime - startime)


if __name__ == '__main__':
    kafka_server = "kafka-1.default.svc.cluster.local"
    port = "9092"
    topic = "test_xxx"
    content = "hello honey".encode('utf-8')

    client = KafkaClient(kafka_server,port,topic,content)  # 實例化客戶端
    print(client.main())
View Code

 

這里指定的topic為 test_xxx

執行Python腳本,然后到服務器上面,查看topic為test_xxx的詳細信息

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test_xxx

 

輸出如下:

Topic:test_xxx    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test_xxx    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 2,3,1
    Topic: test_xxx    Partition: 1    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2
    Topic: test_xxx    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2

 

可以發現副本為3,說明默認配置生效了!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM