Kafka消費者手動提交消息偏移


生產者每次調用poll()方法時,它總是返回由生產者寫入Kafka但還沒有消費的消息,如果消費者一致處於運行狀態,那么分區消息偏移量就沒什么用處,但是如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之后,每個消費可能分配到新的分區,而不是之前處理的那個,為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量制定的地方開始工作。消費者會往一個__consumer_offser的主題發送消息,消息里包含每個分區的偏移量。

1.同步提交

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by zhangpeiran on 2018/10/9.
 */
public class MyConsumer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        //默認值為latest,當消費者讀取的分區沒有偏移量或偏移量無效時,消費者將從最新的記錄開始讀
        //當一個消費group第一次訂閱主題時,符合這種情況,在Consumer啟動之前,Producer生產的數據不會被讀取
        //置為earliest,表示從分區起始位置讀取消息
        properties.put("auto.offset.reset","earliest");

        //設置手動提交消息偏移
        properties.put("enable.auto.commit","false");

        //一次拉取的最大消息條數
        properties.put("max.poll.records",10);


        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singletonList("Demo3"));

        int count = 0;
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(10);
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    if(count == 50)
                        consumer.commitSync();
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                System.out.println(count);
            }
        } finally {
            consumer.close();
        }
    }
}

說明:在上述例子中,主題Demo3中已經有100條消息,第一次遠行Consumer時,在讀取到50條消息時,提交一次偏移量,輸出的count值為100;第二次不改變消費group,會從51條開始讀取,所以輸出的count值為50

2. 異步提交,同步提交時,在broker回應指,會一直阻塞、重試,限制應用的吞吐量,因此可以采用異步提交,異步提交失敗時不會重試,因為如果提交失敗時因為臨時的問題導致的,那么后續的提交總戶有成功的。

consumer.commitAsync();

3. 同步、異步組合提交,確保消費者在關閉或者再均衡之前提交成功

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by zhangpeiran on 2018/10/9.
 */
public class MyConsumer {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:9092");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id","DemoConsumerGroup");

        //默認值為latest,當消費者讀取的分區沒有偏移量或偏移量無效時,消費者將從最新的記錄開始讀
        //當一個消費group第一次訂閱主題時,符合這種情況,在Consumer啟動之前,Producer生產的數據不會被讀取
        //置為earliest,表示從分區起始位置讀取消息
        properties.put("auto.offset.reset","earliest");

        //設置手動提交消息偏移
        properties.put("enable.auto.commit","false");

        //一次拉取的最大消息條數
        properties.put("max.poll.records",10);


        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singletonList("Demo3"));

        int count = 0;
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(10);
                for(ConsumerRecord<String ,String> record : records){
                    count ++;
                    //if(count == 50)
                        //consumer.commitAsync();
                        //consumer.commitSync();
                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
                }
                consumer.commitAsync();
                //System.out.println(count);
            }
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
            //consumer.close();
        }
    }
}

 5. 提交特定的偏移量。前面提交的是最后一個偏移量,poll可能返回了大批數據,這樣在再均衡時,可能重復處理的消息比較多。消費者API提供了指定分區和偏移量來提交

 
         
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* Created by zhangpeiran on 2018/10/9.
*/
public class MyConsumer {

public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","ip:9092");
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id","DemoConsumerGroup");

//默認值為latest,當消費者讀取的分區沒有偏移量或偏移量無效時,消費者將從最新的記錄開始讀
//當一個消費group第一次訂閱主題時,符合這種情況,在Consumer啟動之前,Producer生產的數據不會被讀取
//置為earliest,表示從分區起始位置讀取消息
properties.put("auto.offset.reset","earliest");

//設置手動提交消息偏移
properties.put("enable.auto.commit","false");

//一次拉取的最大消息條數
properties.put("max.poll.records",1000);


KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

consumer.subscribe(Collections.singletonList("Demo5"));


int cnt = 0;
int count = 0;
Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition,OffsetAndMetadata>();
try {
while (true){
ConsumerRecords<String,String> records = consumer.poll(10);
for(ConsumerRecord<String ,String> record : records){
count ++;
//if(count == 50)
//consumer.commitAsync();
//consumer.commitSync();

//offset + 1,下次消費者從該偏移量開始拉取消息
currentOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"no"));
if ((count / 10 == 1) && (count % 10 == 0)){
System.out.println(count);
consumer.commitSync(currentOffsets);
}
System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
}
//consumer.commitAsync();

cnt ++;
}
} finally {
try {
//consumer.commitSync();
} finally {
consumer.close();
}
//consumer.close();
}
}
}
 

生產者生產了100條消息,上述代碼的結果是:依次啟動-暫停消費者10次,每次讀取100,90,80,...10條消息,原因是每次消費者讀取前10條的時候提交一次偏移量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM