kafka搭建二、集群搭建


系列導航

一、kafka搭建-單機版

二、kafka搭建-集群搭建

三、kafka集群增加密碼驗證

四、kafka集群權限增加ACL

五、kafka集群__consumer_offsets副本數修改

六、java操作kafka(沒有密碼驗證)

七、java操作kafka(有密碼驗證)

kafka集群搭建


本章講解如何安裝一個由三台機器組成的kafka集群,搭建完成該集群就可以在生產環境上使用了
三台服務器的ip 192.168.0.104,192.168.0.105,192.168.0.106

需要的軟件包:
1、jdk1.8安裝包       jdk-8u211-linux-x64.tar
2、zookeeper的安裝包  zookeeper-3.4.14.tar
3、kafka的安裝包      kafka_2.11-2.1.1.tgz    

(一)zookeeper 搭建(三台機器都要操作)
        1、軟件環境
          192.168.0.104 server1
          192.168.0.105 server2
          192.168.0.106 server3
          
    相關的安裝包拷入/opt/kafka下
    cd  /opt
    mkdir kafka
    
        2、安裝jdk1.8
           (1)解壓安裝包
             tar -xvf jdk-8u211-linux-x64.tar
                                     
           (2)移動到安裝目錄
                        mv jdk1.8.0_211  /usr/local

        (3)設置環境變量
            vi  /etc/profile
 
            export JAVA_HOME=/usr/local/jdk1.8.0_211
            export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
            export PATH=$JAVA_HOME/bin:$PATH

            source /etc/profile //讓配置生效

        (4)測試是否安裝成功
            cd  /
            echo $JAVA_HOME
            echo $PATH
            echo $CLASSPATH
            java -version 查看版本
 
        3、創建目錄
            mkdir /opt/zookeeper   #項目目錄
            mkdir /opt/zookeeper/zkdata      #存放快照日志
            mkdir /opt/zookeeper/zkdatalog   #存放事物日志
 
            將zookeeper-3.4.14.tar放到/opt/zookeeper/目錄下
            cp zookeeper-3.4.14.tar /opt/zookeeper/
            
                    tar  -xvf zookeeper-3.4.14.tar
            
        4、修改配置信息
          cd /opt/zookeeper/zookeeper-3.4.14/conf        
           cp zoo_sample.cfg  zoo.cfg
           vi zoo.cfg
            #添加如下內容
            tickTime=2000
            initLimit=10
            syncLimit=5
            dataDir=/opt/zookeeper/zkdata
            dataLogDir=/opt/zookeeper/zkdatalog
            clientPort=12181
            server.1=192.168.0.104:12888:13888
            server.2=192.168.0.105:12888:13888
            server.3=192.168.0.106:12888:13888
            
        5、創建myid文件    
            #server1
            echo "1" > /opt/zookeeper/zkdata/myid
            #server2
            echo "2" > /opt/zookeeper/zkdata/myid
            #server3
            echo "3" > /opt/zookeeper/zkdata/myid
            
        6、主要的shell
        /opt/zookeeper/zookeeper-3.4.14/bin
        zkServer.sh 主的管理程序文件
        zkEnv.sh 是主要配置,zookeeper集群啟動時配置環境變量的文件
           
        7、啟動服務並查看   
            cd /opt/zookeeper/zookeeper-3.4.14/bin
            #啟動服務(3台都需要操作)
            ./zkServer.sh start
            #檢查服務器狀態
            ./zkServer.sh status
            ZooKeeper JMX enabled by default
            Using config: /opt/zookeeper/zookeeper-3.4.14/bin/../conf/zoo.cfg
            Mode: follower   #他是否為領導
                
            可以用“jps”查看zk的進程,這個是zk的整個工程的main
            #執行命令jps
            [root@zhu bin]# jps
            27813 QuorumPeerMain
            27909 Jps    
 

        8、安裝scala 2.11
         tar -xvf scala-2.11.12.tar

             vi  /etc/profile 添加如下內容:
                 export SCALA_HOME=/opt/kafka/scala-2.11.12
                 export PATH=$SCALA_HOME/bin:$PATH    
         source /etc/profile //讓配置生效
        
        
        
-------------------------------kafka集群安裝-------------------------------------------------------------    
    
 kafka的日志位置:/opt/kafka/kafka_2.11-2.1.1/logs    
    
(二)kafka集群 搭建(三台機器)    
    1、創建目錄
           mkdir -p /opt/kafka/kafkalogs
        
    2、解壓安裝包
        cd /opt/kafka
        tar -zxvf kafka_2.11-2.1.1.tgz    

        3、修改配置文件
           cd /opt/kafka/kafka_2.11-2.1.1/config
           vi     server.properties  #根據正式庫的配置修改該文件
           #添加如下內容 注意不同機器的ip不同
       #192.168.0.104主機
        broker.id=104
        listeners=PLAINTEXT://192.168.0.104:9092
        port=9092
        advertised.listeners=PLAINTEXT://192.168.0.104:9092
        advertised.port=9092
        host.name=192.168.0.104
        advertised.host.name=192.168.0.104
        num.network.threads=3
        num.io.threads=8
        socket.send.buffer.bytes=102400
        socket.receive.buffer.bytes=102400
        socket.request.max.bytes=104857600
        log.dirs=/opt/kafka/kafkalogs/
        num.partitions=2
        num.recovery.threads.per.data.dir=1
        offsets.topic.replication.factor=1
        transaction.state.log.replication.factor=1
        transaction.state.log.min.isr=1
        log.retention.hours=168
        log.segment.bytes=1073741824
        log.retention.check.interval.ms=300000
        zookeeper.connect=192.168.0.104:12181,192.168.0.105:12181,192.168.0.106:12181
        zookeeper.connection.timeout.ms=60000
        group.initial.rebalance.delay.ms=0
        #inter.broker.protocol.version=0.9.0
        #log.message.format.version=0.9.0
        unclean.leader.election.enable=true
        auto.create.topics.enable=true
            default.replication.factor=3
            
 
            
        #192.168.0.105主機
        broker.id=105
        listeners=PLAINTEXT://192.168.0.105:9092
        port=9092
        advertised.listeners=PLAINTEXT://192.168.0.105:9092
        advertised.port=9092
        host.name=192.168.0.105
        advertised.host.name=192.168.0.105
        num.network.threads=3
        num.io.threads=8
        socket.send.buffer.bytes=102400
        socket.receive.buffer.bytes=102400
        socket.request.max.bytes=104857600
        log.dirs=/opt/kafka/kafkalogs/
        num.partitions=2
        num.recovery.threads.per.data.dir=1
        offsets.topic.replication.factor=1
        transaction.state.log.replication.factor=1
        transaction.state.log.min.isr=1
        log.retention.hours=168
        log.segment.bytes=1073741824
        log.retention.check.interval.ms=300000
        zookeeper.connect=192.168.0.104:12181,192.168.0.105:12181,192.168.0.106:12181
        zookeeper.connection.timeout.ms=60000
        group.initial.rebalance.delay.ms=0
        #inter.broker.protocol.version=0.9.0
        #log.message.format.version=0.9.0
        unclean.leader.election.enable=true
        auto.create.topics.enable=true
            default.replication.factor=3
            
        #192.168.0.106主機
        broker.id=106
        listeners=PLAINTEXT://192.168.0.106:9092
        port=9092
        advertised.listeners=PLAINTEXT://192.168.0.106:9092
        advertised.port=9092
        host.name=192.168.0.106
        advertised.host.name=192.168.0.106
        num.network.threads=3
        num.io.threads=8
        socket.send.buffer.bytes=102400
        socket.receive.buffer.bytes=102400
        socket.request.max.bytes=104857600
        log.dirs=/opt/kafka/kafkalogs/
        num.partitions=2
        num.recovery.threads.per.data.dir=1
        offsets.topic.replication.factor=1
        transaction.state.log.replication.factor=1
        transaction.state.log.min.isr=1
        log.retention.hours=168
        log.segment.bytes=1073741824
        log.retention.check.interval.ms=300000
        zookeeper.connect=192.168.0.104:12181,192.168.0.105:12181,192.168.0.106:12181
        zookeeper.connection.timeout.ms=60000
        group.initial.rebalance.delay.ms=0
        #inter.broker.protocol.version=0.9.0
        #log.message.format.version=0.9.0
        unclean.leader.election.enable=true
        auto.create.topics.enable=true
            default.replication.factor=3
           
    4、啟動服務並測試
        
         設置環境變量
         vi  /etc/profile 添加如下內容:
                 export KAFKA_HOME=/opt/kafka/kafka_2.11-2.1.1
                 export PATH=$KAFKA_HOME/bin:$PATH
         source /etc/profile //讓配置生效
        
          
         #啟動
            [root@zhu bin]#  kafka-server-start.sh -daemon ../config/server.properties
            [root@zhu bin]# jps
                27813 QuorumPeerMain
                28847 Kafka
                28882 Jps
            
        #關閉    
            [root@zhu bin]# jps
                27813 QuorumPeerMain
                28847 Kafka
     
            
-------------------------------常用操作-----------------------------------------------        
    5、 創建Topic來驗證是否創建成功
        #創建Topic
        kafka-topics.sh --create --zookeeper 192.168.0.104:12181 --replication-factor 2 --partitions 2 --topic yc
            #解釋
        --replication-factor 2   #復制兩份
        --partitions 1 #創建1個分區
        --topic #主題為yc
 
 
        刪除 topic
            kafka-topics.sh --delete --zookeeper 192.168.0.104:12181   --topic yc
            
    6、相關命令
            (1)查看topic
        kafka-topics.sh --list --zookeeper localhost:12181或者 kafka-topics.sh --list --zookeeper 192.168.0.104:12181
        #就會顯示我們創建的所有topic         
     
 
       (2)查看topic狀態
            [root@zhu bin]#  kafka-topics.sh --describe --zookeeper localhost:12181 --topic yc
                Topic:yc    PartitionCount:2    ReplicationFactor:2    Configs:
                            Topic: yc    Partition: 0    Leader: 104    Replicas: 104,105    Isr: 104,105
                            Topic: yc    Partition: 1    Leader: 105    Replicas: 105,106    Isr: 105,106
                #分區為為2  復制因子為2   他的  yc的分區為0和1
                
                #說明
                partiton: partion id
                leader:當前負責讀寫的lead broker id
                replicas:當前partition的所有replication broker  list
                isr:relicas的子集,只包含出於活動狀態的broker
                
    
      
        (3)重新分配partition
             bin/kafka-reassign-partitions.sh

                        --zookeeper <urls> 指定zookeeper的連接地址,格式host:port
                        --broker-list<brokerlist> 指定partition需要重新分配到哪些節點,格式為”0,1,2”
                        --topics-to-move-json-file <topics to reassign json file path>  指定JSON文件的地址,文件內容是需要重新分配的topic列表。這個選項和manual-assignment-json-file選項需要指定其中的一個。
                文件內容的格式為 {"topics": [{"topic": "test"},{"topic": "test1"}], "version":1  }
            --manual-assignment-json-file<manual assignment json file path> 指定JSON文件的地址,文件內容是手動分配的策略。這個選項和topics-to-move-json-file選項需要指定其中的一個。
            文件內容的格式為{"partitions": [{"topic": "test", "partition": 1, "replicas": [1,2,3] }], "version":1 }
            --status-check-json-file<partition reassignment json file path> 指定JSON文件的地址,文件內容是partition和partition需要分配到的新的replica的列表。這個JSON文件可以從模擬執行的結果得到。
            --execute 如果使用這個選項,那么會執行真實的重新分配分區的操作。如果不指定這個選項,默認會進行模擬執行。
            例子:
            <1>將test和test1 topic遷移到新的編號為3,4的broker上
            ./kafka-reassign-partitions.sh --zookeeper 172.1.1.1:2181 --broder-list "3,4" --topics-to-move-json-filetopicMove.json -execute
            topicMove.json 的內容是:{"topics":[{"topic":"test"},{"topic","test1"}],"version":1}
            <2>將test topic的partition 1 遷移到 broker 1 2 4 上
            ./kafka-reassign-partitions.sh --zookeeper 172.1.1.1:2181 --broder-list "1,2,4" --manual-assignment-json-file manualAssignment.json --execute
            manualAssignment.json的內容為:
            {"partitions":[{"topic":"test","partition":1,"replicas":[1,2,4]}],"version":1}
            
       (4)增加Topic的partition數量,命令為:
            通過kafka-topics.sh 的alter選項 ,將topic1的partitions從1增加到6;
            ./kafka-topics.sh --alter --topic topic1 --zookeeper localhost:12181 --partitions 6
          
      (5)手動均衡Topic,讓partition選擇preferred replica作為leader
            ./kafka-preferred-replica-election.sh

            --zookeeper 指定zookeeper的連接地址,格式host:port
            --path-to-json-file 指定需要重新進行leader選舉的partition列表文件所在的地址,文件內容的格式為
            {“partitions”: [{“topic”: “test”,“partitions”: 1},{“topic”: “test”, “partitions”: 2}]}
            默認值為所有存在的partition
            例如:
            <1>  ./kafka-preferred-replica-election.sh   --zookeeper 172.1.1.1:2181
            <2>  ./kafka-preferred-replica-election.sh   --zookeeper 172.1.1.1:2181 --path-to-json-file partitionList.json
                 partitionList.json 文件內容為{"partitions":[{"topic":"test","partition":1},{"topic":"test","partition":2}]}

            (6)    查看Consumer的消費和積壓信息
            ./kafka-consumer-groups.sh --bootstrap-server plaintext://192.168.0.104:9092,plaintext://192.168.0.105:9092,plaintext://192.168.0.106:9092 --new-consumer --describe --group group_test1
        
        
        (7)動態增加topic副本
            1、generate模式,給定需要重新分配的Topic,自動生成reassign plan(並不執行)
                     2、execute模式,根據指定的reassign plan重新分配Partition
                     3、verify模式,驗證重新分配Partition是否成功
            
            ./bin/kafka-reassign-partitions.sh --zookeeper localhost:12181 --reassignment-json-file replication.json --verify
            # replication.json內容為:(書寫的時候寫在一行要不會有問題)
            {"partitions":[{"topic":"topic_test1","partition":0,"replicas":[104,105,106]},{"topic":"topic_test1","partition":1,"replicas":[104,105,106]},{"topic":"topic_test1","partition":2,"replicas":[104,105,106]},{"topic":"topic_test1","partition":3,"replicas":[104,105,106]}],"version":1}

                                                    
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM