Java調用Kafka生產者,消費者Api及相關配置說明


本次的記錄內容包括:

1.Java調用生產者APi流程

2.Kafka生產者Api的使用及說明

3.Kafka消費者Api的使用及說明

4.Kafka消費者自動提交Offset和手動提交Offset

5.自定義生產者的攔截器,分區器

那么接下來我就帶大家熟悉以上Kafka的知識說明

1.Java調用生產者APi流程

首先上一張從網上找的簡單的圖,來描述一下生產者的生產流程。這里這個的圖描述的不是非常精確,稍微有點問題的地方就是省略了攔截器內容,這塊的內容在實際場景中也經常使用

 

 

 那么從圖中我們可以看到。生產者通過調用api的Send方法開始進行一些列生產控制操作,首先進入的是一個叫序列化器的處理結構(這里就先按圖來講了--實際第一步會先經過攔截器),那么這一步主要的操作就是序列化相關數據,保證數據傳輸的穩定准確性,個人理解需要序列化的原因是因為kafka是磁盤文件寫消息,序列化后悔經過分區器,主要就是我們上篇講過的關於如何生產消息分區的策略,主要有三種,1.指定分區,2根據key的hash取余有效分區數分區,3初始化整數,輪訓分區。具體細節請參考上一篇文章(https://www.cnblogs.com/hnusthuyanhua/p/12355216.html)。經過分區后消息將會發送到指定的分區供消費者消費。

那么從圖中我們還可以看到有一個RecordMetaData的存在,這又是干什么的呢?這里就又設計到另一個知識點了。由於在網上未找打相關描述圖,我這里就粗略說明一下

大致的kafka生產者程序一般是有兩類線程進行,一個是主線程,另一個是生產消息的線程,他們質檢有一個RecoderMetaData作為消息存儲緩存,同時也是線程共享變量,當主線程不斷生產消息,本質上就是不斷累積RecoderMetaData的緩存值,當緩存值達到限定時,生產者線程開始講數據發送至kafka.。那么kafka生產者的一個流程大概就是這樣了

2.Kafka生產者Api的使用及說明

大致流程:配置kafka property信息---構建生產者---構建消息---發送消息---關閉資源

@Slf4j
public class KafkaProduce {
public static void main(String[] args) {
Properties properties = new Properties();
//第一步 初始化化kafka服務配置Properties--具體配置可以抽到實際的Property配置文件
//設備地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");
//ack
properties.put(ProducerConfig.ACKS_CONFIG, "all");
//序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//構建生產者
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
for(int i = 0;i< 100;i++)
{
String msg = "------Message " + i;
//構建生產記錄
//第一種方式指定Toppic
ProducerRecord<String,String> producerRecord=new ProducerRecord<String, String>("kafkatest",msg);
//send方法分為有返回值和無返回值兩種
// 無返回值簡單發送消息
//producer.send(producerRecord);
//有返回值的在發送消息確認后返回一個Callback
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
//發送數據返回兩個東西--一個是返回結果 一個是異常 異常為空時即發送操作正常
if (e==null){
//返回結果中可獲取此條消息的相關分區信息
System.out.println(recordMetadata.offset()+recordMetadata.partition()+recordMetadata.topic());
}
}
}
});
log.info("kafka生產者發送消息{}",msg);
}
producer.close();
}
}

 

kafka分區策略方法說明:

 

 

3.Kafka消費者Api的使用及說明

大致流程:配置kafka property信息---構建消費者---訂閱主題--消費消息

 

@Slf4j
public class KafkaConsumerTest {

public static void main(String[] args) {
Properties properties = new Properties();
//第一步 初始化化kafka服務配置Properties--具體配置可以抽到實際的Property配置文件
//設備地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");
//反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
//offset自動提交
//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//重置offset---當團體名發生改變時且消費者保存的初始offset未過期時,消費者會從頭消費
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//初始化消費者
Consumer consumer=new KafkaConsumer(properties);
//初始化消費者訂閱主題
consumer.subscribe(Arrays.asList("kafkatest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//消費完按自動提交時間自動提交消費Offset
log.info("kafka消費者消費分區:{}-消息內容:{}",record.partition(),record.value());
}
//異步提交-即消費某條數據時發送offset更新,但消費繼續運行 不等待提交完成 效率較高 但當消費者異常掛掉時容
//易造成消費重復
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
//如果失敗E不為null 失敗的話E為Null
//對於需要絕對保證消息不丟失的 可在此處重新進行消費提交
}
});
//同步提交-即消費一條數據提交一次offset更新,消費必須等待offset更新完才可繼續運行。通常來講此方法可盡可能
//的減少數據丟失 但效率較低
//consumer.commitSync();
}
}
}

 

4.Kafka消費者自動提交Offset和手動提交Offset

自動提交:即消費者消費后自己提交消費offset標記去kafka更新信息,那么通常是通過時間來控制的,比如每10秒更新一次本地的offset到kafka,  缺點:實際應用場景中難以控制時間,太短容易造成數據丟失(offset已經更新 消費者還沒消費完就掛了),太長容易導致數據重復(offset還未更新,消費者掛了重新從kafka拉取之前的offset).

手動提交:消費完成后自行提交offset,根據同步情況分為兩種方式,syn提交(提交時相當於阻塞主線程,等offset提交完成后方可繼續進行)和asyn提交(異步提交),大致流程:配置kafka property配置文件,將配置文件中的自動提交關閉。--構建消費者訂閱主題並消費--消費完成后手動提交offset.   缺點:同樣還是會有上面自動提交的數據重復問題。但減少了數據丟失的可能性。

 

5.自定義生產者的攔截器,分區器

 

@Slf4j
public class KafkaFilter implements ProducerInterceptor {
    public static int i=0;
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        /**
         * 發送消息的方法 可對消息進行處理  比如加時間戳啥的
         */
        log.info("{}:{}",producerRecord.topic(),producerRecord.partition(),producerRecord.value());
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        /**
         * ack標記回調方法,有點類型Callback回調的方法
         * 可在這統計一下成功發送的條數和失敗發送的條數
         */
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

  

public class KafkaPartion  implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        /**
         * 自定義分區   可通過該接口的默認分區器進行參考  默認為根據訂閱的主題來分區方式
         */
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

  

6.消費者如何消費歷史數據

大致流程:配置kafka property信息,開啟AutoOffset配置---構建消費者---訂閱主題--消費消息

那么每次開啟消費者如果想從頭開始消費,需要滿足以下條件之一:1.消費者的組名改變 2.消費者的初始offset未過期

 

相關參考文章:

https://www.jianshu.com/p/1f9e18e926f6

kafka消費者監聽方式

https://www.jianshu.com/p/a64defb44a23

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM