選擇自動提交還是手動提交方式和業務場景相關,可以查看前面的博客,根據原理進行選擇。
單線程消費
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
consumerConfig
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* 單線程-單條消費
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> stringKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic");
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
return factory;
}
}
consumer
@Component
public class KafkaReceiver {
private static Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
//@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
public void receiveString(String message) {
logger.info("Message : %s" +message);
}
/**
* 注解方式獲取消息頭及消息體
*
* @Payload:獲取的是消息的消息體,也就是發送內容
* @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發送消息的key
* @Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當前消息是從哪個分區中監聽到的
* @Header(KafkaHeaders.RECEIVED_TOPIC):獲取監聽的TopicName
* @Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳
*
*/
//@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
public void receive(@Payload String message,
//@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
logger.info("topic : " +topic);
logger.info("partition : " +partition);
//logger.info("key : " +key.toString());
logger.info("TIMESTAMP : " +ts);
logger.info("message : " +message);
}
/**
* 指定消費分區和初始偏移量
*
* @TopicPartition:topic--需要監聽的Topic的名稱,partitions --需要監聽Topic的分區id,partitionOffsets --可以設置從某個偏移量開始監聽
* @PartitionOffset:partition --分區Id,非數組,initialOffset --初始偏移量
*
*/
@KafkaListener(containerFactory = "stringKafkaListenerContainerFactory",
topicPartitions = @TopicPartition(topic = "hello", partitionOffsets = @PartitionOffset(partition = "0" , initialOffset = "2")))
public void receiveFromBegin(@Payload String payload,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(String.format("Read all message from partition %d : %s", partition, payload));
}
/**
* ConsumerRecord 接收
*
* @param record
*/
//@KafkaListener(topics = "hello", containerFactory = "stringKafkaListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> record) {
System.out.println("Message is :" + record.toString());
}
}
批量消費
開啟批量消費需要3步
1、消費者設置 max.poll.records/
2、消費者 開啟批量消費 factory.setBatchListener(true);
3、消費者批量接收 public void consumerBatch(List<ConsumerRecord> records)
javaConfig
@Configuration
@EnableKafka
public class BatchConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* 多線程-批量消費
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> batchFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 控制多線程消費,並發數(如果topic有3各分區。設置成3,並發數就是3個線程,加快消費), 不設置setConcurrency就會變成單線程配置, MAX_POLL_RECORDS_CONFIG也會失效,接收的消息列表也不會是ConsumerRecord
factory.setConcurrency(10);
// poll超時時間
factory.getContainerProperties().setPollTimeout(1500);
// 控制批量消費
// 設置為批量消費,每個批次數量在Kafka配置參數中設置(max.poll.records)
factory.setBatchListener(true);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
* 消費者配置
* @return
*/
public Map<String, Object> consumerConfigs() {
Map<String, Object> configProps = new HashMap<>();
// 不用指定全部的broker,它將自動發現集群中的其余的borker, 最好指定多個,萬一有服務器故障
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// key序列化方式
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// value序列化方式
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// GroupID
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// 批量消費消息數量
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
// 自動提交偏移量
// 如果設置成true,偏移量由auto.commit.interval.ms控制自動提交的頻率
// 如果設置成false,不需要定時的提交offset,可以自己控制offset,當消息認為已消費過了,這個時候再去提交它們的偏移量。
// 這個很有用的,當消費的消息結合了一些處理邏輯,這個消息就不應該認為是已經消費的,直到它完成了整個處理。
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動提交的頻率
configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Session超時設置
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
// latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)
// earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return configProps;
}
}
消費者 BatchConsumer
@Component
@Slf4j
public class BatchConsumer {
/**
* 批量消息
* @param records
*/
@KafkaListener(topics = "hello", containerFactory="batchFactory")
public void consumerBatch(List<ConsumerRecord<?, ?>> records){
log.info("接收到消息數量:{}",records.size());
for(ConsumerRecord record: records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
System.out.println("接收到消息:" + message);
}
}
}
}
參考博客:https://blog.csdn.net/yy756127197/article/details/103895413