I: Consumer API
1. Auto-commit example
This approach is not recommended for real-world use.
/**
 * Simple Kafka consumer with auto-commit.
 * Records that have already been consumed will not be delivered again.
 */
public static void helloConsumer() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // poll, waiting up to 1000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
    }
}
2. Manually committing the offset
If processing fails and the offset is not committed, the same data can still be fetched again on the next poll.
/**
 * Manual offset commit.
 */
public static void commitedOffset() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000"); // has no effect while auto-commit is disabled
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // poll, waiting up to 1000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
        }
        // manual commit: runs only after the whole for loop succeeds;
        // if it does not run, the same records are fetched again on the next poll
        consumer.commitAsync();
    }
}
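commitAsync() does not block the poll loop, but it also does not retry a failed commit. A small hedged variant of the commit call above (same consumer as in commitedOffset(); not part of the original example) passes an OffsetCommitCallback so that failed commits are at least logged:
// assumption: this replaces the plain consumer.commitAsync() call in the loop above
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // the commit failed; those records would simply be re-fetched after a restart
        System.err.printf("commit failed for %s: %s%n", offsets, exception.getMessage());
    }
});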
3. ConsumerGroup
A message in a single partition can only be consumed by one Consumer within a ConsumerGroup.
A Consumer reads a partition in order, starting from the beginning by default.
A single ConsumerGroup consumes the messages of all partitions (see the sketch below).
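To make the group behaviour visible, here is a minimal sketch (assuming the same broker address and TOPIC_NAME constant as the examples above): two consumers started with the same group.id split the topic's partitions between them, so each partition is read by only one of them.
public static void consumerGroupDemo() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "192.168.19.129:9092");
    props.setProperty("group.id", "test"); // same group.id => same ConsumerGroup
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    for (int i = 0; i < 2; i++) {
        final int id = i;
        new Thread(() -> {
            // each thread gets its own KafkaConsumer (the consumer is not thread-safe)
            KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
            c.subscribe(Arrays.asList(TOPIC_NAME));
            while (true) {
                for (ConsumerRecord<String, String> record : c.poll(Duration.ofMillis(1000))) {
                    System.out.printf("consumer-%d: partition = %d, offset = %d, value = %s%n",
                            id, record.partition(), record.offset(), record.value());
                }
            }
        }).start();
    }
}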
4. Features

5. Processing per partition
/**
 * Process and commit records per partition.
 */
public static void commitedOffsetWithPartition() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // poll, waiting up to 1000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // work at the partition level: handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // manually commit this partition's offset (note the + 1)
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
6. Consuming only a specific partition
/**
 * Consume only specific partitions of a topic via assign().
 */
public static void commitedOffsetWithTopicPartition() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    // assign only partition 1 to this consumer
    consumer.assign(Arrays.asList(p1));
    while (true) {
        // poll, waiting up to 1000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // work at the partition level: handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // manually commit this partition's offset (note the + 1)
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
II: Multi-threaded processing with the Consumer API
1. First approach

2. Code
package com.jun.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class ConsumerThreadSample {
    private final static String TOPIC_NAME = "caojun-topic";

    /*
     * Classic model: each thread creates its own KafkaConsumer, which keeps the
     * (non-thread-safe) consumer confined to a single thread.
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);
        t1.start();

        Thread.sleep(15000);
        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.19.129:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
            consumer.assign(Arrays.asList(p0, p1));
        }

        public void run() {
            try {
                while (!closed.get()) {
                    // fetch messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // process this partition's records
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.partition(), record.offset(), record.key(), record.value());
                        }
                        // report the new offset back to Kafka (note the + 1)
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } catch (WakeupException e) {
                // ignore the wakeup triggered by shutdown()
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}
3. Second approach
With this approach offsets cannot be committed per record (records are handed off to worker threads while auto-commit is enabled); it is only intended for consuming data as quickly as possible.

4. Code
package com.jun.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumerRecordThreadSample {
    private final static String TOPIC_NAME = "caojun-topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.19.129:9092";
        String groupId = "test";
        int workerNum = 5;

        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        // note: execute() loops forever, so the shutdown below is never reached in this sample
        consumers.execute(workerNum);

        Thread.sleep(1000000);
        consumers.shutdown();
    }

    // the consumer: polls records and hands them to a worker pool
    public static class ConsumerExecutor {
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }

    // the worker: processes a single record
    public static class ConsumerRecordWorker implements Runnable {
        private ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            // e.g. write the record to a database here
            System.out.println("Thread - " + Thread.currentThread().getName());
            System.err.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}
III: Other features
1. Manually controlling the offset
/**
 * Manually set the starting offset and manually commit offsets.
 *
 * Manually specifying the starting offset:
 * 1. gives you full control over where consumption starts
 * 2. lets you re-consume records once if the program fails
 *
 * Steps:
 * 1. the first run usually starts from offset 0
 * 2. after consuming e.g. 100 records, set the offset to 101 and store it in Redis
 * 3. before each poll, read the latest offset from Redis
 * 4. start consuming from that position
 *
 * Recommendation:
 * 1. keep the offsets in Redis
 */
public static void controllerOffset() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    // assign the partition
    consumer.assign(Arrays.asList(p0));
    while (true) {
        // set the starting offset (fixed at 5 here for the demo; in practice read it from Redis)
        consumer.seek(p0, 5);
        // poll, waiting up to 1000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // work at the partition level: handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // manually commit this partition's offset (note the + 1)
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
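The steps above describe keeping the offset in Redis rather than hard-coding it. A rough sketch of that variant (not from the original post; it assumes the Jedis client redis.clients.jedis.Jedis, a local Redis at 127.0.0.1:6379, and a hypothetical key name) could look like this:
public static void controllerOffsetWithRedis() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    consumer.assign(Arrays.asList(p0));
    try (Jedis jedis = new Jedis("127.0.0.1", 6379)) { // hypothetical Redis connection
        String redisKey = "offset:" + TOPIC_NAME + ":0"; // hypothetical key name
        while (true) {
            // 1. before each poll, read the latest offset from Redis (start at 0 if absent)
            String saved = jedis.get(redisKey);
            consumer.seek(p0, saved == null ? 0L : Long.parseLong(saved));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            List<ConsumerRecord<String, String>> pRecords = records.records(p0);
            if (pRecords.isEmpty()) {
                continue;
            }
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
            }
            // 2. after successful processing, store lastOffset + 1 back into Redis
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            jedis.set(redisKey, String.valueOf(lastOffset + 1));
        }
    }
}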
2. Rate limiting
/**
 * Rate limiting (flow control) via pause/resume.
 */
public static void controllerLimit() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("auto.commit.interval.ms", "1000");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
    long totalNum = 100;
    // assign the partitions
    consumer.assign(Arrays.asList(p0, p1));
    while (true) {
        // poll, waiting up to 1000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // work at the partition level: handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecords = records.records(partition);
            long num = 0;
            for (ConsumerRecord<String, String> record : pRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                /*
                 * The idea, using a token bucket:
                 * 1. after receiving a record, try to take a token from the bucket
                 * 2. if a token is obtained, continue with the business logic
                 * 3. if no token is available, pause the partition and wait for tokens
                 * 4. once the bucket has enough tokens again, resume the partition
                 */
                num++;
                if (record.partition() == 0) {
                    if (num >= totalNum) {
                        // partition 0 has delivered enough records: stop fetching from it
                        consumer.pause(Arrays.asList(p0));
                    }
                }
                if (record.partition() == 1) {
                    if (num == 40) {
                        // once partition 1 has delivered 40 records, resume fetching from partition 0
                        consumer.resume(Arrays.asList(p0));
                    }
                }
            }
            long lastOffset = pRecords.get(pRecords.size() - 1).offset();
            // manually commit this partition's offset (note the + 1)
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
        }
    }
}
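The comment in the loop above describes a token-bucket scheme, while the record-count logic only stands in for it. A hedged sketch of the same idea using Guava's RateLimiter (an assumption, not used in the original code) together with pause()/resume():
public static void controllerLimitWithRateLimiter() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.19.129:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("enable.auto.commit", "false");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
    consumer.assign(Arrays.asList(p0));
    // com.google.common.util.concurrent.RateLimiter: a token bucket refilled at ~100 permits per second
    RateLimiter limiter = RateLimiter.create(100);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records.records(p0)) {
            if (!limiter.tryAcquire()) {
                consumer.pause(Arrays.asList(p0)); // out of tokens: stop fetching new records
                limiter.acquire();                 // block until a token is available
                consumer.resume(Arrays.asList(p0)); // tokens replenished: resume fetching
            }
            System.out.printf("partition = %d, offset = %d, value = %s%n",
                    record.partition(), record.offset(), record.value());
        }
        consumer.commitSync();
    }
}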
