kafka producer自定義partitioner和consumer多線程


  為了更好的實現負載均衡和消息的順序性,Kafka Producer可以通過分發策略發送給指定的Partition。Kafka Java客戶端有默認的Partitioner,平均的向目標topic的各個Partition中生產數據,如果想要控制消息的分發策略,有兩種方式,一種是在發送前創建ProducerRecord時指定分區(針對單個消息),另一種就是就是根據Key自己寫算法。繼承Partitioner接口,實現其partition方法。並且配置啟動參數 props.put("partitioner.class","com.example.demo.MyPartitioner"),示例代碼如下:

  自定義的partitoner

package com.example.demo;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         if (Integer.parseInt((String)key)%3==1)
                return 0;
         else if (Integer.parseInt((String)key)%3==2)
                return 1;
         else return 2;
    }

    @Override
    public void close() {

    }

}

  producer類中指定partitioner.class

package com.example.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.124:9092");
                props.put("acks", "all");
                props.put("retries", 0);
                props.put("batch.size", 16384);
                props.put("linger.ms", 1);
                props.put("partitioner.class", "com.example.demo.MyPartitioner");
                props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("powerTopic", Integer.toString(i), Integer.toString(i)));

        producer.close();

    }
}

  測試consumer

  

package com.example.demo;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyAutoCommitConsumer {

    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "192.168.1.124:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("powerTopic"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("partition = %d,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
         }
    }
}

  啟動zookeeper和kafka,使用命令行新建一個 3個partition的topic:powerTopic,為了方便查看結果,將producer的循環次數設置為15,運行consumer和producer代碼,效果如下:

  雖然我們有三個分區,但是我們group組中只有一個消費者,所以三個分區的消息被這個消費者順序消費,下面我們實現一個消費者組,示例代碼如下:

  ConsumerThread類

package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerThread implements Runnable {
    private  KafkaConsumer<String,String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers,String groupId,String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers,String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true){
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> item : consumerRecords){
                System.out.println(Thread.currentThread().getName());
                System.out.printf("partition = %d,offset = %d, key = %s, value = %s%n",item.partition(), item.offset(), item.key(), item.value());
            }
        }
    }
}

  ConsumerGroup類

package com.example.demo;

import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers,String groupId,String topic,int consumerNumber){
        for(int i = 0; i< consumerNumber;i++){
            ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start(){
        for (ConsumerThread item : consumerThreadList){
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}

  消費者組啟動類ConsumerGroupMain

package com.example.demo;

public class ConsumerGroupMain {

    public static void main(String[] args){
        String brokers = "192.168.1.124:9092";
        String groupId = "group01";
        String topic = "powerTopic";
        int consumerNumber = 3;
        ConsumerGroup consumerGroup = new ConsumerGroup(brokers,groupId,topic,consumerNumber);
        consumerGroup.start();
    }
}

  啟動消費者和生產者,可以看到不同的分區是不同的線程去執行的效果如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM