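Implementing a partitioning strategy for auditing: suppose we have two kinds of messages. Messages whose key contains audit are used for auditing and must all go to the last partition. All other messages are distributed randomly across the remaining partitions.
First, create a topic named audit-test with three partitions and three replicas: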
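Before involving Kafka at all, the routing rule can be sketched as a plain function. This is a minimal sketch that assumes the partition count is already known and that partitions are numbered 0 to n-1. The class and method names (AuditRouting, choosePartition) are hypothetical and are not part of the Kafka API.

```java
import java.util.Random;

public class AuditRouting {

    // Hypothetical helper: picks a partition for a record key.
    // Keys containing "audit" go to the last partition (n - 1);
    // every other key, including null, goes to a random partition in [0, n - 2].
    static int choosePartition(String key, int partitionCount, Random random) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        int auditPartition = partitionCount - 1;
        boolean isAudit = key != null && key.contains("audit");
        // With a single partition there is nowhere else to put non-audit messages.
        if (isAudit || partitionCount == 1) {
            return auditPartition;
        }
        // nextInt(bound) returns 0..bound-1, so auditPartition is never chosen here.
        return random.nextInt(partitionCount - 1);
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int[] counts = new int[3];
        String[] keys = {"audit", "other", null, "other", "audit-2024"};
        for (String key : keys) {
            counts[choosePartition(key, 3, random)]++;
        }
        // Only the two keys containing "audit" land in partition 2.
        System.out.println("partition 2 count: " + counts[2]); // prints "partition 2 count: 2"
    }
}
```

The single-partition guard matters because `Random.nextInt(0)` throws `IllegalArgumentException`. With only one partition, audit and non-audit messages have to share it.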
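bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic audit-test --partitions 3 --replication-factor 3
# Check the topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic audit-test
(Kafka 2.2 and later also accept --bootstrap-server localhost:9092 in place of --zookeeper, and Kafka 3.0 removed the --zookeeper option from kafka-topics.sh.)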
Next, implement the Partitioner interface provided by the Kafka client:
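import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Custom partitioning strategy:
 * messages whose key contains "audit" are sent to the last partition;
 * all other messages are spread randomly over the remaining partitions.
 */
public class PartitionerImpl implements Partitioner {

    private Random random;

    @Override
    public void configure(Map<String, ?> configs) {
        // One-time initialization
        random = new Random();
    }

    // Partitioning strategy
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String keyObj = (String) key;
        // Count all partitions, not only those that currently have a leader:
        // availablePartitionsForTopic() shrinks when a broker is down, which
        // would make "partitionCount - 1" point at a different partition.
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int partitionCount = partitionInfoList.size();
        System.out.println("partition size: " + partitionCount);
        int auditPartition = partitionCount - 1;
        boolean isAudit = keyObj != null && keyObj.contains("audit");
        // With one partition, nextInt(0) would throw, so everything goes to partition 0
        if (isAudit || partitionCount == 1) {
            return auditPartition;
        }
        // nextInt(n - 1) returns 0..n-2, so the audit partition is never picked here
        return random.nextInt(partitionCount - 1);
    }

    @Override
    public void close() {
        // Cleanup
    }
}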
Then configure the producer in the main (launcher) class:
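import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// properties already holds bootstrap.servers and the String key/value serializers
// Register the custom partitioner by its fully qualified class name
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.org.fubin.PartitionerImpl");
String topic = "audit-test";
Producer<String, String> producer = new KafkaProducer<>(properties);
// No key: routed randomly to a non-audit partition
ProducerRecord<String, String> nonKeyRecord = new ProducerRecord<>(topic, "non-key record");
// This message must go to the last partition
ProducerRecord<String, String> auditRecord = new ProducerRecord<>(topic, "audit", "audit record");
ProducerRecord<String, String> nonAuditRecord = new ProducerRecord<>(topic, "other", "non-audit record");
try {
    producer.send(nonKeyRecord).get();
    producer.send(nonAuditRecord).get();
    producer.send(auditRecord).get();
    producer.send(nonAuditRecord).get();
    producer.send(nonAuditRecord).get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} finally {
    producer.close();
}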
Finally, verify the result: send the messages several times, then check how many messages each partition holds:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic audit-test
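GetOffsetShell prints one line per partition in the form topic:partition:offset. Offsets start at 0, so the latest offset of a partition equals the number of messages written to it, as long as nothing has been deleted by retention or compaction and no transactions are in use. The following is a minimal sketch for tallying that output, assuming this line format. The class name OffsetTally and the sample values are hypothetical and are not real command output.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetTally {

    // Parses lines such as "audit-test:2:5" into partition -> latest offset.
    // Assumes the topic:partition:offset format printed by GetOffsetShell.
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : output.split("\\R")) {
            line = line.trim();
            if (line.isEmpty()) {
                continue;
            }
            // Kafka topic names cannot contain ':', so there are exactly three fields
            String[] parts = line.split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Unexpected line: " + line);
            }
            offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Hypothetical sample output for a three-partition topic
        String sample = "audit-test:0:4\naudit-test:1:3\naudit-test:2:2\n";
        Map<Integer, Long> offsets = parse(sample);
        long total = offsets.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(offsets + " total=" + total); // prints "{0=4, 1=3, 2=2} total=9"
    }
}
```

If the partitioner works as intended, the count for the last partition should grow only when audit messages are sent.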