一、
1、Kafka的消費並行度依賴Topic配置的分區數,如分區數為10,那么最多10台機器來並行消費(每台機器只能開啟一個線程),或者一台機器消費(10個線程並行消費)。即消費並行度和分區數一致。
2、(1)如果指定了某個分區,會只講消息發到這個分區上
(2)如果同時指定了某個分區和key,則也會將消息發送到指定分區上,key不起作用
(3)如果沒有指定分區和key,那么將會隨機發送到topic的分區中
(4)如果指定了key,那么將會以hash<key>的方式發送到分區中
二、多線程消費實例
paritition 為3,broker為3,節點為3
1、生產者隨機分區提交數據
這也是一個比較關鍵步驟,只有隨機提交到不同的分區才能實現多分區消費;
自定義隨機分區:
public class MyPartition implements Partitioner{
private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
@Override
public void configure(Map<String, ?> arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int partitionNum = 0;
try {
partitionNum = Integer.parseInt((String) key);
} catch (Exception e) {
partitionNum = key.hashCode() ;
}
// System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value);
return Math.abs(partitionNum % numPartitions);
}
}
然后在初始化kafka生產者配置的時候修改如下配置
props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));
這樣就實現了kafka生產者隨機分區提交數據。
2、消費者
最后一步就是消費者,修改單線程模式為多線程,這里的多線程實現方式有很多,本例知識用了最簡單的固定線程模式:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
kafkaConsumerService.getInstance();
}
});
}
在消費方面得注意,這里得遍歷所有分區,否則還是只消費了一個區:
ConsumerRecords<String, String> records = consumer.poll(1000);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records
.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(
"message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset()
+ " 分區:" + record.partition());
if (record.value() == null || record.key() == null) {
consumer.commitSync();
} else {
// dealMessage
KafkaServer.dealMessage(record.key(),record.value(),consumer);
// consumer.commitSync();
}
}
}
注意上面的線程為啥只有3個,這里得跟上面kafka的分區個數相對應起來,否則如果線程超過分區數量,那么只會浪費線程,因為即使使用3個以上的線程也只會消費三個分區,而少了則無法消費完全。所以這里必須更上面的對應起來。
