__consumer_offsets
consumer默認將offset保存在Kafka一個內置的topic中,該topic為__consumer_offsets
__consumer_offsets 為kafka中的topic, 那就可以通過消費者進行消費.
大概思路:

1.先啟動一個生產者:
offset_Producer
package Look_offset;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/*簡單一個生產者 給offset_Consumer提供數據消費的*/
/* A simple producer that feeds the "otto" topic so offset_Consumer has data to consume. */
public class offset_Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Broker connection
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zzz01:9092");
        // Batching: 16 KB batches, linger up to 1 ms, 32 MB send buffer
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Wait for all in-sync replicas to persist each record before acking
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // FIX: use the ProducerConfig constant instead of the raw "retries" string,
        // consistent with every other config line above
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources guarantees the producer is closed (and buffered records
        // flushed) even if send() or sleep() throws mid-loop
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 1000; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("otto", "害羞小向晚" + i + "次");
                // Callback fires after all replicas have acked (acks=all);
                // Callback is a functional interface, so a lambda replaces the anonymous class
                kafkaProducer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println(metadata.topic() + " 數據:" + producerRecord.value() + " "
                                + "分區: " + metadata.partition() + " " + "offset:" + metadata.offset());
                    }
                });
                // Throttle sends slightly. Note: send() is async; since it returns a
                // Future, calling get() on that Future would give synchronous-send
                // semantics (block until the ack arrives).
                Thread.sleep(5);
            }
        }
    }
}
2. 在kafka上啟動腳本
消費__consumer_offsets的腳本:
#將結果輸出到文件 方便查看
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server zzz01:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning >>kafka_offset.txt
3.啟動消費者
既有消費者又有生產者:產生的offset寫入__consumer_offsets後,會被在kafka中用腳本啟動的消費者消費
offset_Consumer
package Look_offset;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/*
* 既有消費者又有生產者:產生的offset寫入__consumer_offsets後,由kafka腳本啟動的消費者消費*/
/*
 * Consumer for the "otto" topic. The offsets it commits land in the internal
 * __consumer_offsets topic, which the console-consumer script then reads.
 */
public class offset_Consumer {
    public static void main(String[] args) {
        // 1. Build the configuration
        Properties properties = new Properties();
        // Broker connection
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zzz01:9092");
        // Deserializers are mandatory
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Ava");
        // Alternative partition-assignment strategy (kept disabled, as in the original)
        // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // Do not exclude internal topics, otherwise __consumer_offsets stays invisible
        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");
        // 2. Create the consumer; FIX: try-with-resources closes it on any exit path
        // (the original leaked the consumer if poll() or printing threw)
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // 3. Subscribe — subscribe() takes a Collection; program to the List interface
            List<String> topics = new ArrayList<>();
            topics.add("otto");
            consumer.subscribe(topics);
            // 4. Poll forever and print each record's value with its offset
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value() + " " + "offset: " + consumerRecord.offset());
                }
            }
        }
    }
}
