為了更好的實現負載均衡和消息的順序性,Kafka Producer可以通過分發策略將消息發送給指定的Partition。Kafka Java客戶端有默認的Partitioner,平均的向目標topic的各個Partition中生產數據,如果想要控制消息的分發策略,有兩種方式,一種是在發送前創建ProducerRecord時指定分區(針對單個消息),另一種就是根據Key自己寫算法:實現Partitioner接口及其partition方法,並且配置啟動參數 props.put("partitioner.class","com.example.demo.MyPartitioner"),示例代碼如下:
自定義的partitioner
package com.example.demo;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Partitioner that routes a record to one of three partitions based on the
 * remainder of its integer key divided by three.
 *
 * <p>Mapping: remainder 1 goes to partition 0, remainder 2 goes to partition 1,
 * and every other remainder (0, or a negative remainder produced by a negative
 * key) goes to partition 2. The returned indexes are fixed, so the target topic
 * must have at least three partitions.
 */
public class MyPartitioner implements Partitioner {

    /** Number of buckets the key is divided into. */
    private static final int BUCKETS = 3;

    /** No configuration is consumed by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Selects the partition for a record from its key.
     *
     * @param topic      topic the record is being sent to (used only in the error message)
     * @param key        record key; must be a {@link String} holding a decimal integer
     * @param keyBytes   serialized key (unused)
     * @param value      record value (unused)
     * @param valueBytes serialized value (unused)
     * @param cluster    cluster metadata (unused)
     * @return 0, 1 or 2 according to the mapping described on the class
     * @throws NumberFormatException if the key is null or is not a parsable integer
     * @throws ClassCastException    if the key is not a {@code String}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) {
            // Same exception type Integer.parseInt(null) raises, but with a message
            // that tells the caller what went wrong.
            throw new NumberFormatException(
                    "MyPartitioner requires a non-null numeric key for topic " + topic);
        }

        // Parse the key once instead of once per branch.
        final int remainder = Integer.parseInt((String) key) % BUCKETS;
        switch (remainder) {
            case 1:
                return 0;
            case 2:
                return 1;
            default:
                return 2;
        }
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
producer類中指定partitioner.class
package com.example.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Demo producer that sends 100 records to the {@code powerTopic} topic, using
 * the loop index (as a string) for both key and value and
 * {@code com.example.demo.MyPartitioner} to choose the partition.
 */
public class MyProducer {

    /**
     * Builds the producer configuration, sends the records and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.124:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // Route records with the custom partitioner instead of the client default.
        props.put("partitioner.class", "com.example.demo.MyPartitioner");
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if send() throws,
        // e.g. when the partitioner rejects a key.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String text = Integer.toString(i);
                producer.send(new ProducerRecord<String, String>("powerTopic", text, text));
            }
        }
    }
}
測試consumer
package com.example.demo;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Demo consumer in group {@code test} that subscribes to {@code powerTopic}
 * with auto-commit enabled and prints every record it receives.
 */
public class MyAutoCommitConsumer {

    /**
     * Polls the topic forever and prints partition, offset, key and value of
     * each record. The loop only ends when an exception is thrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.124:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources replaces the former @SuppressWarnings("resource"):
        // the consumer is now closed when the poll loop exits with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("powerTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %d,offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
啟動zookeeper和kafka,使用命令行新建一個 3個partition的topic:powerTopic,為了方便查看結果,將producer的循環次數設置為15,運行consumer和producer代碼,效果如下:

雖然我們有三個分區,但是我們group組中只有一個消費者,所以三個分區的消息被這個消費者順序消費,下面我們實現一個消費者組,示例代碼如下:
ConsumerThread類
package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Runnable that owns one {@link KafkaConsumer} subscribed to a single topic
 * and prints every record it polls, together with the name of the thread
 * that is running it.
 */
public class ConsumerThread implements Runnable {

    private final KafkaConsumer<String, String> kafkaConsumer;
    private final String topic;

    /**
     * Creates the consumer and subscribes it to the topic.
     *
     * @param brokers value for {@code bootstrap.servers}
     * @param groupId value for {@code group.id}
     * @param topic   topic to subscribe to
     */
    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Builds the consumer configuration: auto-commit every second, a 30 s
     * session timeout, {@code earliest} offset reset and String deserializers.
     */
    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    /**
     * Polls forever, printing the current thread name followed by partition,
     * offset, key and value for each record. The loop only ends when an
     * exception is thrown; the consumer is closed in that case.
     */
    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> item : consumerRecords) {
                    System.out.println(Thread.currentThread().getName());
                    System.out.printf("partition = %d,offset = %d, key = %s, value = %s%n",
                            item.partition(), item.offset(), item.key(), item.value());
                }
            }
        } finally {
            // Release the consumer if polling or printing fails; previously it was never closed.
            kafkaConsumer.close();
        }
    }
}
ConsumerGroup類
package com.example.demo;

import java.util.ArrayList;
import java.util.List;

/**
 * Holds a fixed number of {@link ConsumerThread} workers that share the same
 * brokers, group id and topic, and starts each of them on its own thread.
 */
public class ConsumerGroup {

    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    /**
     * Creates {@code consumerNumber} workers; none of them is started yet.
     *
     * @param brokers        broker list passed to every worker
     * @param groupId        consumer group id passed to every worker
     * @param topic          topic passed to every worker
     * @param consumerNumber how many workers to create
     */
    public ConsumerGroup(String brokers, String groupId, String topic, int consumerNumber) {
        int created = 0;
        while (created < consumerNumber) {
            consumerThreadList.add(new ConsumerThread(brokers, groupId, topic));
            created++;
        }
    }

    /** Starts one new thread per worker, in creation order. */
    public void start() {
        for (ConsumerThread worker : consumerThreadList) {
            new Thread(worker).start();
        }
    }
}
消費者組啟動類ConsumerGroupMain
package com.example.demo; public class ConsumerGroupMain { public static void main(String[] args){ String brokers = "192.168.1.124:9092"; String groupId = "group01"; String topic = "powerTopic"; int consumerNumber = 3; ConsumerGroup consumerGroup = new ConsumerGroup(brokers,groupId,topic,consumerNumber); consumerGroup.start(); } }
啟動消費者和生產者,可以看到不同的分區由不同的線程消費,效果如下:

