The Relationship Between Kafka Producers, Consumers, and Partitions


Background

Recently I have been integrating with Hikvision's data feed: we need to take the structured data Hikvision produces and run secondary recognition on it.

The basic flow: Hikvision big data --> Kafka server --> our platform


The Kafka topics

Normal vehicle-pass topic: BAYONET_VEHICLEPASS

Violation vehicle-pass topic: BAYONET_VEHICLEALARM


Foreword

First, we need some familiarity with Kafka's terminology and a bit of hands-on experience. The usual mental model is: producers send messages to a topic, and consumers consume from that topic (that was my understanding when I first started; only later, in practice, did I come to appreciate how important partitions are). Under a topic are partitions, and message data is actually stored in partitions. So in reality a producer sends a message to a topic and it gets stored on one of its partitions, and a consumer reads from some partition of some topic. Which partition does the producer send to, and which partition does the consumer read from?


How to set the number of partitions for a topic

  1. In the config/server.properties configuration file you can set a broker-wide partition count, num.partitions, which is the number of partitions each newly created topic gets by default; it defaults to 1

  2. You can also specify the partition count when creating a topic, using the --partitions flag

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my_topic --partitions 2 --replication-factor 1
    

    To check how many partitions an existing topic has:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic
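
For reference, the broker-wide default mentioned in item 1 is the num.partitions property in config/server.properties:

# config/server.properties
# default partition count for newly created topics (used when --partitions is not given)
num.partitions=1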
    


Producers and partitions

With the default of 1 partition, we can infer that messages go to that single partition. But when multiple partitions exist, is there a pattern to where messages are sent? How is the target partition for a message decided?

The default partitioning strategy, from the Javadoc of org.apache.kafka.clients.producer.internals.DefaultPartitioner:

  • If a partition is specified in the record, use it
  • If no partition is specified but a key is present, choose a partition based on a hash of the key
  • If no partition or key is present, choose a partition in a round-robin fashion

The source code bears this out:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
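
The same three cases can be exercised from the client side. Below is a minimal kafka-python sketch; the broker address and topic name are placeholders, and it assumes the topic has more than one partition:

# coding: utf-8
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Case 1: a partition is specified in the record -- the message goes to
# partition 1 no matter what the key is.
producer.send('my_topic', value=b'explicit', partition=1)

# Case 2: no partition, but a key -- the partitioner hashes the key, so all
# messages with the same key land on the same partition (and stay ordered).
producer.send('my_topic', key=b'plate-ABC123', value=b'keyed')

# Case 3: neither partition nor key -- the client's partitioner spreads
# messages across partitions (round-robin in the Java code above;
# kafka-python's default partitioner makes its own choice among
# available partitions).
producer.send('my_topic', value=b'unkeyed')

producer.flush()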

Consumers and partitions

Back to the same question: with the default single partition, consumption happens from that one partition. With multiple partitions, is there a pattern to consumption? How is it decided which partition a consumer reads from?

First, two things to understand:

  1. Consumers subscribe to topics as part of a consumer group, and a group contains multiple consumer instances.
  2. A topic contains multiple partitions.

How consumer instances are assigned to a topic's partitions

Suppose a Kafka cluster has two nodes and the topic has 4 partitions.

Group A has 2 consumer instances (two consuming threads).

Group B has 4 consumer instances.

Then each consumer in group A handles two partitions on average, while each consumer in group B handles exactly one partition (the ideal state). The conclusion: a message is consumed by only one consumer instance within a consumer group. In other words, two consumers in the same group can never be responsible for the same partition, so consumers within a group do not consume the same message twice.

Wait, we haven't covered every scenario yet. A few more questions:

If the partition count is greater than or equal to the number of consumer instances in the group, there is no problem. But if the number of consumer instances exceeds the number of partitions, then under the default strategy (default, because the strategy is pluggable) some consumers are redundant: they never receive messages and sit idle.

But suppose some consumer instances refused to behave and multiple consumers ended up responsible for the same partition. What would that cause? (Duplicate consumption would be dreadful.)

Kafka was designed to guarantee ordering within a partition: whatever order messages have in a partition is the order consumers see them in. To achieve this, first, messages must be actively pulled by consumers (pull); second, a partition must be owned by exactly one consumer. If two consumers owned the same partition, both would read it at once, and since each consumer controls its own offset, C1 might have read up to offset 2 while C2 has only read offset 1; before C1 commits its offset, C2 reads offset 2 as well. That is effectively multiple threads reading the same message: messages get processed more than once and ordering is lost, which is no better than active push.
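
A quick way to see single-ownership in action: the sketch below (broker, topic, and group names are placeholders) joins a consumer group and prints the partitions this instance was given. Run two copies against a 2-partition topic and each copy ends up owning a different partition:

# coding: utf-8
from kafka import KafkaConsumer

# Join the group; the group coordinator assigns partitions so that no two
# instances in the same group own the same partition.
consumer = KafkaConsumer('my_topic',
                         group_id='demo-group',
                         bootstrap_servers='localhost:9092')

# poll once so the rebalance completes, then inspect this instance's assignment
consumer.poll(timeout_ms=1000)
print(consumer.assignment())  # e.g. {TopicPartition(topic='my_topic', partition=0)}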


Consumer partition assignment strategies (two of them)

range (the default assignment strategy)

The range strategy works on a per-topic basis. For each topic, the available partitions are laid out in numeric order and the consumers in lexicographic order. The partition count is then divided by the total consumer count to determine how many partitions each consumer gets. If the division is not even, the first few consumers get one extra partition.

In short:

  1. The range strategy is applied per topic (that is, "partitions" here means the partitions of one topic, and "consumers" means the consumer instances in the group subscribed to that topic).
  2. First, sort the partitions in numeric order and the consumers in lexicographic order by name.
  3. Then divide the partition count by the consumer count. If it divides evenly, everyone is happy with an equal share; if not, the consumers at the front of the ordering each take one extra partition.

For example, suppose there are two consumers C0 and C1 and two topics t0 and t1, each with 3 partitions: t0p0, t0p1, t0p2, t1p0, t1p1, t1p2.

Based on that, the final assignment of partitions to consumers is:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

This is because, for topic t0, 3 partitions / 2 consumers gives 1 each with a remainder of 1, so C0 (first in order) takes p0 and p1 while C1 takes p2; topic t1 works out exactly the same way, and combining the two gives the result above.


Reading the source code helps to understand this:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
    //    mapping from topic to the consumers subscribed to it
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();    //    the topic
        List<String> consumersForTopic = topicEntry.getValue();    //    its consumers

        //    partitionsPerTopic maps topic -> partition count;
        //    look up how many partitions this topic has
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        //    sort consumers lexicographically
        Collections.sort(consumersForTopic);

        //    partitions per consumer (integer division)
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //    the remainder: that many consumers at the front get one extra partition
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //    assign the contiguous range [start, start + length) to this consumer
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

Python version (the same logic, from kafka-python's RangePartitionAssignor):

"""
    The range assignor works on a per-topic basis. For each topic, we lay out
    the available partitions in numeric order and the consumers in
    lexicographic order. We then divide the number of partitions by the total
    number of consumers to determine the number of partitions to assign to each
    consumer. If it does not evenly divide, then the first few consumers will
    have one extra partition.

    For example, suppose there are two consumers C0 and C1, two topics t0 and
    t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
    t0p2, t1p0, t1p1, and t1p2.

    The assignment will be:
        C0: [t0p0, t0p1, t1p0, t1p1]
        C1: [t0p2, t1p2]
    """
name = 'range'
version = 0

@classmethod
def assign(cls, cluster, member_metadata):
    consumers_per_topic = collections.defaultdict(list)
    for member, metadata in six.iteritems(member_metadata):
        for topic in metadata.subscription:
            consumers_per_topic[topic].append(member)

    # construct {member_id: {topic: [partition, ...]}}
    assignment = collections.defaultdict(dict)

    for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
        partitions = cluster.partitions_for_topic(topic)
        if partitions is None:
            log.warning('No partition metadata for topic %s', topic)
            continue
        partitions = sorted(list(partitions))
        partitions_for_topic = len(partitions)
        consumers_for_topic.sort()

        partitions_per_consumer = len(partitions) // len(consumers_for_topic)
        consumers_with_extra = len(partitions) % len(consumers_for_topic)

        for i in range(len(consumers_for_topic)):
            start = partitions_per_consumer * i
            start += min(i, consumers_with_extra)
            length = partitions_per_consumer
            if not i + 1 > consumers_with_extra:
                length += 1
            member = consumers_for_topic[i]
            assignment[member][topic] = partitions[start:start + length]

    protocol_assignment = {}
    for member_id in member_metadata:
        protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
            cls.version,
            sorted(assignment[member_id].items()),
            b'')
    return protocol_assignment

roundrobin

The roundrobin strategy is implemented by org.apache.kafka.clients.consumer.RoundRobinAssignor. Its documentation:

"""
The roundrobin assignor lays out all the available partitions and all the
available consumers. It then proceeds to do a roundrobin assignment from
partition to consumer. If the subscriptions of all consumer instances are
identical, then the partitions will be uniformly distributed. (i.e., the
partition ownership counts will be within a delta of exactly one across all
consumers.)

For example, suppose there are two consumers C0 and C1, two topics t0 and
t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
t0p2, t1p0, t1p1, and t1p2.

The assignment will be:
    C0: [t0p0, t0p2, t1p1]
    C1: [t0p1, t1p0, t1p2]

When subscriptions differ across consumer instances, the assignment process
still considers each consumer instance in round robin fashion but skips
over an instance if it is not subscribed to the topic. Unlike the case when
subscriptions are identical, this can result in imbalanced assignments.

For example, suppose we have three consumers C0, C1, C2, and three topics
t0, t1, t2, with unbalanced partitions t0p0, t1p0, t1p1, t2p0, t2p1, t2p2,
where C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is
subscribed to t0, t1, t2.

The assignment will be:
    C0: [t0p0]
    C1: [t1p0]
    C2: [t1p1, t2p0, t2p1, t2p2]
"""

The round-robin strategy works across all available consumers and all available partitions.

The biggest difference from the range strategy above is that it is no longer confined to a single topic.

If all consumer instances have identical subscriptions, this is the best case: partitions can be distributed uniformly and evenly.

For example, suppose there are two consumers C0 and C1 and two topics t0 and t1, each with 3 partitions: t0p0, t0p1, t0p2, t1p0, t1p1, t1p2.

The final assignment is:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]



A more complicated scenario:

Suppose the consumers in the group subscribe to different topics. The assignment process still walks the consumer instances round-robin, but skips an instance if it is not subscribed to the topic in question. Naturally, the assignment can then be unbalanced.

What does this mean? A consumer group is a logical concept: being in the same group means a partition can be consumed by only one instance at a time; put differently, a partition is assigned to only one consumer in the group. But same group does not have to mean same subscription: consumers belonging to one group may subscribe to different topics.

For example, suppose there are 3 topics t0, t1, t2 with 1, 2, and 3 partitions respectively (t0 has p0; t1 has p0 and p1; t2 has p0, p1, and p2), and 3 consumers C0, C1, C2 in the same group, where C0 subscribes to t0, C1 subscribes to t0 and t1, and C2 subscribes to t0, t1, and t2. Assigning round-robin, C0 ends up with t0p0, C1 with t1p0, and C2 with everything else.


Why is the final result the following?

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

This is because, walking the consumers round-robin, t0p0 goes to C0 and t1p0 goes to C1; since they are all in the same group, C2 can only take t1p1; and because only C2 subscribes to t2, every partition of t2 goes to C2. Put together, that is the result above.

Thinking it through, in this case the outcome happens to be identical to what the range strategy would produce.
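
To double-check the walk-through, here is a small Python simulation of the round-robin rule for the unbalanced example. It is a sketch of the rule only, not the real assignor code:

# coding: utf-8
import itertools

# partitions laid out in order; consumers sorted by name
partitions = ['t0p0', 't1p0', 't1p1', 't2p0', 't2p1', 't2p2']
subscriptions = {'C0': {'t0'}, 'C1': {'t0', 't1'}, 'C2': {'t0', 't1', 't2'}}

consumers = itertools.cycle(sorted(subscriptions))
assignment = {c: [] for c in subscriptions}

for p in partitions:
    topic = p.split('p')[0]
    # walk the consumers in a circle, skipping any not subscribed to this topic
    consumer = next(consumers)
    while topic not in subscriptions[consumer]:
        consumer = next(consumers)
    assignment[consumer].append(p)

print(assignment)
# {'C0': ['t0p0'], 'C1': ['t1p0'], 'C2': ['t1p1', 't2p0', 't2p1', 't2p2']}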


Test code (Python):

producer.py

# coding: utf-8
import time
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError


class Producer(object):
    """
    Producer module built on kafka-python
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        ))

    def sendjsondata(self, params):
        try:
            params_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, params_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print(e)


def main():
    """
    測試consumer和producer
    :return:
    """
    # 測試生產模塊
    producer = Producer("10.10.4.70", 9092, "push")
    for i in range(10000):
        params = 'test---' + str(i)
        print(params)
        producer.sendjsondata(params)
        time.sleep(1)


if __name__ == '__main__':
    main()

consumer.py

# coding: utf-8
from kafka import KafkaConsumer


class Kafka_consumer(object):
    '''
    Consumer module built on kafka-python
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
                                      bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                          kafka_host=self.kafkaHost,
                                          kafka_port=self.kafkaPort))

    def consume_data(self):
        try:
            for message in self.consumer:
                print(message.value)
                self.consumer.commit()
                continue

        except KeyboardInterrupt as e:
            print(e)


def main():
    '''
    Test the consumer module
    :return:
    '''
    # exercise the consumer
    # each message comes back as a ConsumerRecord, e.g. ConsumerRecord(topic=u'ranktest',
    # partition=0, offset=202, timestamp=None, timestamp_type=None, key=None,
    # value='"{abetst}:{null}---0"', checksum=-1868164195, serialized_key_size=-1,
    # serialized_value_size=21)

    consumer = Kafka_consumer('10.10.4.70', 9092, "push", 'police_seemmo1')
    consumer.consume_data()


if __name__ == '__main__':
    main()
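
To watch the assignment strategies from earlier in this article, run several copies of consumer.py with the same groupid against a multi-partition topic; the partition field on each returned ConsumerRecord shows which partition that instance owns.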


Test code (Java):

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>com.cjs.example</groupId>
   <artifactId>kafka-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <packaging>jar</packaging>

   <name>kafka-demo</name>
   <description></description>

   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.0.5.RELEASE</version>
       <relativePath/> <!-- lookup parent from repository -->
   </parent>

   <properties>
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
       <java.version>1.8</java.version>
   </properties>

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </dependency>

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>

   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
           </plugin>
       </plugins>
   </build>

</project>

HelloProducer.java

package com.cjs.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HelloProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        e.printStackTrace();
                    }else {
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();

    }
}

HelloConsumer.java

package com.cjs.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class HelloConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
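
Note the commented-out partition.assignment.strategy line in HelloConsumer: uncommenting it switches the group from the default range assignor to org.apache.kafka.clients.consumer.RoundRobinAssignor, which makes it easy to compare the two strategies described above.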

References

https://www.cnblogs.com/cjsblog/p/9664536.html

http://kafka.apache.org/documentation/#consumerconfigs

https://blog.csdn.net/feelwing1314/article/details/81097167


ending ~


