自定義分區策略
思路
Command+Option+shift+N 調出查詢頁面,找到producer包的Partitioner接口
Partitioner下有一個DefaultPartitioner實現類
這里就有之前提到kafka數據分區策略
自定義分區策略
創建一個MyPartitioner類,繼承並重新定義上面的Partitioner類
package cn.itcast.kafka.demo1;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
/** * 此方法是確定分區規則 * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return 返回的int值為分區 */
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//return 3 則指定發送數據到3分區
return 3;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
還需要在MyProducer中添加一行代碼
props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");
而且在MyProducer類中不需要指定分區號
producer.send(new ProducerRecord<String, String>("test" , "mykey" + i,"這是第" + i + "條message"));