本次的記錄內容包括:
1.Java調用生產者APi流程
2.Kafka生產者Api的使用及說明
3.Kafka消費者Api的使用及說明
4.Kafka消費者自動提交Offset和手動提交Offset
5.自定義生產者的攔截器,分區器
那么接下來我就帶大家熟悉以上Kafka的知識說明
1.Java調用生產者APi流程
首先上一張從網上找的簡單的圖,來描述一下生產者的生產流程。這里這個的圖描述的不是非常精確,稍微有點問題的地方就是省略了攔截器內容,這塊的內容在實際場景中也經常使用

那么從圖中我們可以看到。生產者通過調用api的Send方法開始進行一些列生產控制操作,首先進入的是一個叫序列化器的處理結構(這里就先按圖來講了--實際第一步會先經過攔截器),那么這一步主要的操作就是序列化相關數據,保證數據傳輸的穩定准確性,個人理解需要序列化的原因是因為kafka是磁盤文件寫消息,序列化后悔經過分區器,主要就是我們上篇講過的關於如何生產消息分區的策略,主要有三種,1.指定分區,2根據key的hash取余有效分區數分區,3初始化整數,輪訓分區。具體細節請參考上一篇文章(https://www.cnblogs.com/hnusthuyanhua/p/12355216.html)。經過分區后消息將會發送到指定的分區供消費者消費。
那么從圖中我們還可以看到有一個RecordMetaData的存在,這又是干什么的呢?這里就又設計到另一個知識點了。由於在網上未找打相關描述圖,我這里就粗略說明一下
大致的kafka生產者程序一般是有兩類線程進行,一個是主線程,另一個是生產消息的線程,他們質檢有一個RecoderMetaData作為消息存儲緩存,同時也是線程共享變量,當主線程不斷生產消息,本質上就是不斷累積RecoderMetaData的緩存值,當緩存值達到限定時,生產者線程開始講數據發送至kafka.。那么kafka生產者的一個流程大概就是這樣了
2.Kafka生產者Api的使用及說明
大致流程:配置kafka property信息---構建生產者---構建消息---發送消息---關閉資源
@Slf4j
public class KafkaProduce {
public static void main(String[] args) {
Properties properties = new Properties();
//第一步 初始化化kafka服務配置Properties--具體配置可以抽到實際的Property配置文件
//設備地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");
//ack
properties.put(ProducerConfig.ACKS_CONFIG, "all");
//序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//構建生產者
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
for(int i = 0;i< 100;i++)
{
String msg = "------Message " + i;
//構建生產記錄
//第一種方式指定Toppic
ProducerRecord<String,String> producerRecord=new ProducerRecord<String, String>("kafkatest",msg);
//send方法分為有返回值和無返回值兩種
// 無返回值簡單發送消息
//producer.send(producerRecord);
//有返回值的在發送消息確認后返回一個Callback
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
//發送數據返回兩個東西--一個是返回結果 一個是異常 異常為空時即發送操作正常
if (e==null){
//返回結果中可獲取此條消息的相關分區信息
System.out.println(recordMetadata.offset()+recordMetadata.partition()+recordMetadata.topic());
}
}
}
});
log.info("kafka生產者發送消息{}",msg);
}
producer.close();
}
}
kafka分區策略方法說明:

3.Kafka消費者Api的使用及說明
大致流程:配置kafka property信息---構建消費者---訂閱主題--消費消息
@Slf4j
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
//第一步 初始化化kafka服務配置Properties--具體配置可以抽到實際的Property配置文件
//設備地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");
//反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
//offset自動提交
//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//重置offset---當團體名發生改變時且消費者保存的初始offset未過期時,消費者會從頭消費
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//初始化消費者
Consumer consumer=new KafkaConsumer(properties);
//初始化消費者訂閱主題
consumer.subscribe(Arrays.asList("kafkatest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//消費完按自動提交時間自動提交消費Offset
log.info("kafka消費者消費分區:{}-消息內容:{}",record.partition(),record.value());
}
//異步提交-即消費某條數據時發送offset更新,但消費繼續運行 不等待提交完成 效率較高 但當消費者異常掛掉時容
//易造成消費重復
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
//如果失敗E不為null 失敗的話E為Null
//對於需要絕對保證消息不丟失的 可在此處重新進行消費提交
}
});
//同步提交-即消費一條數據提交一次offset更新,消費必須等待offset更新完才可繼續運行。通常來講此方法可盡可能
//的減少數據丟失 但效率較低
//consumer.commitSync();
}
}
}
4.Kafka消費者自動提交Offset和手動提交Offset
自動提交:即消費者消費后自己提交消費offset標記去kafka更新信息,那么通常是通過時間來控制的,比如每10秒更新一次本地的offset到kafka, 缺點:實際應用場景中難以控制時間,太短容易造成數據丟失(offset已經更新 消費者還沒消費完就掛了),太長容易導致數據重復(offset還未更新,消費者掛了重新從kafka拉取之前的offset).
手動提交:消費完成后自行提交offset,根據同步情況分為兩種方式,syn提交(提交時相當於阻塞主線程,等offset提交完成后方可繼續進行)和asyn提交(異步提交),大致流程:配置kafka property配置文件,將配置文件中的自動提交關閉。--構建消費者訂閱主題並消費--消費完成后手動提交offset. 缺點:同樣還是會有上面自動提交的數據重復問題。但減少了數據丟失的可能性。
5.自定義生產者的攔截器,分區器
@Slf4j
public class KafkaFilter implements ProducerInterceptor {
public static int i=0;
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
/**
* 發送消息的方法 可對消息進行處理 比如加時間戳啥的
*/
log.info("{}:{}",producerRecord.topic(),producerRecord.partition(),producerRecord.value());
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
/**
* ack標記回調方法,有點類型Callback回調的方法
* 可在這統計一下成功發送的條數和失敗發送的條數
*/
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
public class KafkaPartion implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
/**
* 自定義分區 可通過該接口的默認分區器進行參考 默認為根據訂閱的主題來分區方式
*/
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
6.消費者如何消費歷史數據
大致流程:配置kafka property信息,開啟AutoOffset配置---構建消費者---訂閱主題--消費消息
那么每次開啟消費者如果想從頭開始消費,需要滿足以下條件之一:1.消費者的組名改變 2.消費者的初始offset未過期
相關參考文章:
https://www.jianshu.com/p/1f9e18e926f6
kafka消費者監聽方式
https://www.jianshu.com/p/a64defb44a23
