一、
1、Kafka的消費並行度依賴Topic配置的分區數,如分區數為10,那么最多10台機器來並行消費(每台機器只能開啟一個線程),或者一台機器消費(10個線程並行消費)。即消費並行度和分區數一致。
2、(1)如果指定了某個分區,會只講消息發到這個分區上
(2)如果同時指定了某個分區和key,則也會將消息發送到指定分區上,key不起作用
(3)如果沒有指定分區和key,那么將會隨機發送到topic的分區中
(4)如果指定了key,那么將會以hash<key>的方式發送到分區中
二、多線程消費實例
paritition 為3,broker為3,節點為3
1、生產者隨機分區提交數據
這也是一個比較關鍵步驟,只有隨機提交到不同的分區才能實現多分區消費;
自定義隨機分區:
public class MyPartition implements Partitioner{ private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); @Override public void configure(Map<String, ?> arg0) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int partitionNum = 0; try { partitionNum = Integer.parseInt((String) key); } catch (Exception e) { partitionNum = key.hashCode() ; } // System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value); return Math.abs(partitionNum % numPartitions); } }
然后在初始化kafka生產者配置的時候修改如下配置
props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));
這樣就實現了kafka生產者隨機分區提交數據。
2、消費者
最后一步就是消費者,修改單線程模式為多線程,這里的多線程實現方式有很多,本例知識用了最簡單的固定線程模式:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { fixedThreadPool.execute(new Runnable() { @Override public void run() { kafkaConsumerService.getInstance(); } }); }
在消費方面得注意,這里得遍歷所有分區,否則還是只消費了一個區:
ConsumerRecords<String, String> records = consumer.poll(1000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records .records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println( "message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset() + " 分區:" + record.partition()); if (record.value() == null || record.key() == null) { consumer.commitSync(); } else { // dealMessage KafkaServer.dealMessage(record.key(),record.value(),consumer); // consumer.commitSync(); } } }
注意上面的線程為啥只有3個,這里得跟上面kafka的分區個數相對應起來,否則如果線程超過分區數量,那么只會浪費線程,因為即使使用3個以上的線程也只會消費三個分區,而少了則無法消費完全。所以這里必須更上面的對應起來。