//暫停kafka的消費 暫停分區的分配
consumer.unsubscribe();//此處不取消訂閱暫停太久會出現訂閱超時的錯誤
consumer.pause(consumer.assignment());
//重新消費分區,此處不重新分配會出錯
this.open(null,null,null);
if (null == consumer) {
Properties props = new Properties();
props.put("bootstrap.servers", PropertiesUtil.getValue("bootstrap.servers"));
// 消費者的組id
props.put("group.id", constant.kafka_groupName);//Spider2
props.put("enable.auto.commit", "false");
// max.poll.interval.ms(官網給得默認值為3000)的意思為,當我們從kafkaServer端poll消息時,poll()的調用之間的最大延遲。
// 這提供了消費者在獲取更多記錄之前可以空閑的時間量的上限。 如果在此超時到期之前未調用poll(),則認為使用者失敗,並且消費
// 者組將重新平衡以便將分區重新分配給其他消費者,而恰好這里我們設置了Thread.sleep(6000) > max.poll.interval.ms值,
// 也就是我們在手動提交的時候,實際上分區信息已經被分配到整個消費者組里面的其它消費者了
props.put("auto.commit.interval.ms", "3000");
// 從poll(拉)的回話處理時長
props.put("session.timeout.ms", "100000");
props.put("request.timeout.ms", "200000");
props.put("max.poll.records", "2");
// poll的數量限制
// props.put("max.poll.records", "100");
/* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.name", UUID.randomUUID().toString().replaceAll("-", ""));
consumer = new KafkaConsumer<String, String>(props);
// 訂閱主題列表topic
//consumer.subscribe(Arrays.asList("test_input"));
}
//注冊kafka rebalanceListener
//consumer.subscribe(Arrays.asList("test_etl"), new ConsumerRebalanceListener(){
listener = new ConsumerRebalanceListener(){
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId());
consumer.commitSync(offsetsMap);
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(
Collection<TopicPartition> partitions) {
System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId());
consumer.commitSync();
offsetsMap.clear();
}};
consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), listener);
consumer.resume(consumer.assignment());