Kafka 分區備份實戰


1.概述

  在 Kafka 集群中,我們可以對每個 Topic 進行一個或是多個分區,並為該 Topic 指定備份數。這部分元數據信息都是存放在 Zookeeper 上,我們可以使用 zkCli 客戶端,通過 ls 和 get 命令來查看元數據信息。通過 log.dirs 屬性控制消息存放路徑,每個分區對應一個文件夾,文件夾命名方式為:TopicName-PartitionIndex,該文件夾下存放這該分區的所有消息和索引文件,如下圖所示:

2.內容

  Kafka 集群在生產消息入庫的時候,通過 Key 來進行分區存儲,按照相應的算法,生產分區規則,讓所生產的消息按照該規則分布到不同的分區中,以達到水平擴展和負載均衡。而我們在消費這些消息的時候,可以使用多線程來消費該 Topic 下的所有分區中的消息。

  分區規則的制定,通過實現 kafka.producer.Partitioner 接口,該接口我們可以進行重寫,按照自己的方式去實現分區規則。如下,我們按照 Key 的 Hash 值,然后取模得到分區索引,代碼如下所示:

package cn.hadoop.hdfs.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note 先 Hash 再取模,得到分區索引
 */
public class CustomerPartitioner implements Partitioner {

    public CustomerPartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String k = (String) key;
        partition = Math.abs(k.hashCode()) % numPartitions;
        return partition;
    }

}

  在創建 Topic 的時候,若按照上述規則創建分區,分區數最后為 Brokers 的整數倍,這樣才能發揮其負載均衡的作用,比如:當前我們集群節點由 3 個 Broker 組成,如下圖所示:

2.1 創建分區

  我們在創建分區的時候,可以通過 Kafka 提供的客戶端命令進行創建,如下,我們創建一個6分區,3備份的一個 Topic,命令如下所示:

./kafka-topics.sh --create --zookeeper k1:2181,k2:2181,k3:2181 --replication-factor 3 --partitions 6 --topic ke_test

  這里需要注意的是,指定備份數的時候,備份數要小於等於 Brokers 數。否則創建失敗。在創建分區的時候,假設,我們只創建 2 個分區,而我們上述圖中, Brokers 有 3 個,會造成有一個 Broker 上沒有該 Topic 的分區,以致分布不均。

2.2 分區入庫

  一般,我們在入庫消息的時候,都有使用 Kafka 的 API,如下,我們使用生產 API ,按照上述的 Hash 取模規則,進行分區入庫,代碼如下所示:

package cn.hadoop.hdfs.kafka.partition;

import java.util.List;
import java.util.Properties;

import cn.hadoop.hdfs.kafka.partition.data.FileRead;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note 按照先 Hash 再取模的規則,進行分區入庫
 */
public class PartitionerProducer {
    public static void main(String[] args) {
        producerData();
    }

    private static void producerData() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "k1:9092,k2:9092,k3:9092");
        props.put("partitioner.class", "cn.hadoop.hdfs.kafka.partition.CustomerPartitioner");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        String topic = "ke_test";
        List<String> list = FileRead.readData();
        for (int i = 0; i < list.size(); i++) {
            String k = "key" + i;
            String v = new String(list.get(i));
            producer.send(new KeyedMessage<String, String>(topic, k, v));
            if (i == (list.size() - 1)) {
                return;
            }
        }
        producer.close();
    }
}

  這里,我們分析發現,生產者在生產消息入庫時,會按照 CustomerPartitioner 的規則,進行分區入庫,在入庫時,將 Key 先做 Hash,然后分區數取模(這里分區數是 6).我們計算可以得到一下信息:

hashCode("key0") % 6 = 1
hashCode("key1") % 6 = 2
hashCode("key2") % 6 = 3
hashCode("key3") % 6 = 4
hashCode("key4") % 6 = 5
hashCode("key5") % 6 = 0
// ... 以此循環

  按照該表述規則進行分區入庫。

2.3 分區入庫驗證

  接下里,我們通過 Kafka 的消費者 API 來驗證,在消費時,消費 Topic 各分區的詳情,代碼如下所示:

package cn.hadoop.hdfs.kafka.partition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note 通過 Kafka 的消費者 API 驗證分區入庫的消息
 */
public class PartitionerConsumer {
    public static void main(String[] args) {
        String topic = "ke_test";
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message())
                    + "] ..");
        }

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("group.id", "group1");
        props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }
}

  這里筆者只是驗證消費數據,若在實際生產線上,需將上述單線程消費改造成多線程消費,來提升處理消息的能力。

2.4 驗證結果

  這里,我們線運行生產者,讓其生產消息,並分區入庫;然后,在啟動消費者,消費消息驗證其結果,如下圖所示:

3.總結

  需要注意的是,分區數建議為 Brokers 的整數倍,讓其達到均勻分布;備份數必須小於等於 Brokers。以及,多線程消費的控制,其線程數建議和分區數相等。

4.結束語

  這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM