【Kafka】自定义分区策略


自定义分区策略


思路

Command+Option+shift+N 调出查询页面,找到producer包的Partitioner接口
在这里插入图片描述
Partitioner下有一个DefaultPartitioner实现类
在这里插入图片描述
这里就有之前提到kafka数据分区策略
在这里插入图片描述


自定义分区策略

创建一个MyPartitioner类,继承并重新定义上面的Partitioner类

package cn.itcast.kafka.demo1;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {
    /** * 此方法是确定分区规则 * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return 返回的int值为分区 */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    	//return 3 则指定发送数据到3分区
        return 3;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

还需要在MyProducer中添加一行代码

props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");

而且在MyProducer类中不需要指定分区号

producer.send(new ProducerRecord<String, String>("test" , "mykey" + i,"这是第" + i + "条message"));


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM