Java-API+Kafka實現自定義分區


目錄章節:

  1.pom.xml導入kafka依賴包;

  2.kafka普通生產者實現方式;

  3.kafka帶回調函數的生產者;

  4.生產者自定義分區;

     4.1使用自定義分區

1.pom.xml導入kafka依賴包:

<!--kafka依賴-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>

PS:kafkaProducer發送數據流程及ACK、重復消費與數據丟失問題:

1.Kafka 的 Producer 發送消息采用的是 異步發送的方式。在消息發送的過程中,涉及到了兩個線程 ——main 線程和Sender線程,以及 一個線程共享變量 ——RecordAccumulator。main 線程將消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取
消息發送到 Kafka broker。
2.異步和ack並不沖突,生產者一直發送數據,不等應答,如果某條數據遲遲沒有應答,生產者會再發一次;
3.acks: -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功 0 代表消息只要發送出去就行,其他不管 1 代表發送消息到leader partition寫入成功就可以;
4.重復消費與數據丟失:
  說明: 已經消費的數據對於kafka來說,會將消費組里面的offset值進行修改,那什么時候進行修改了?是在數據消費 完成之后,比如在控制台打印完后自動提交;
      提交過程:是通過kafka將offset進行移動到下個message所處的offset的位置。拿到數據后,存儲到hbase中或者mysql中,如果hbase或者mysql在這個時候連接不上,就會拋出異常,如果在處理數據的時候已經進行了提交,
      那么kafka上的offset值已經進行了修改了,但是hbase或者mysql中沒有數據,這個時候就會出現數據丟失。什么時候提交offset值?在Consumer將數據處理完成之后,再來進行offset的修改提交。默認情況下offset是 自動提交,
     需要修改為手動提交offset值。如果在處理代碼中正常處理了,但是在提交offset請求的時候,沒有連接到kafka或者出現了故障,那么該次修 改offset的請求是失敗的,那么下次在進行讀取同一個分區中的數據時,會從已經處理掉的offset值再進行處理一 次,
      那么在hbase中或者mysql中就會產生兩條一樣的數據,也就是數據重復。

PS:數據來源:

/**
     * 獲取數據庫數據
     * @param
     * @return
     * @throws SQLException
     */
    public static List<KafKaMyImage> getKafKaMyImages() throws SQLException {
        List<KafKaMyImage> kafKaMyImages=new ArrayList<>(); KafKaMyImage kafKaMyImage=null; String sql="select id,loginip,updatetime,username,loginaddr from adminlogin"; Connection conection = SingleJavaJDBC.getConection(); PreparedStatement preparedStatement = conection.prepareStatement(sql); ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()){ kafKaMyImage=new KafKaMyImage(Integer.parseInt(resultSet.getString("id")), resultSet.getString("loginip"), resultSet.getString("updatetime"), resultSet.getString("username"), resultSet.getString("loginaddr")); kafKaMyImages.add(kafKaMyImage); } // SingleJavaJDBC.close(resultSet,preparedStatement,conection); return kafKaMyImages; } }

 

2.kafka普通生產者實現方式:

public void producerOne() {
 2         Properties props = new Properties(); 3 // Kafka服務端的主機名和端口號 4 props.put("bootstrap.servers", "hadoop01:9092"); 5 // 所有副本都必須應答后再發送 6 props.put("acks", "all"); 7 // 發送失敗后,再重復發送的次數 8 props.put("retries", 0); 9 // 一批消息處理大小 10 props.put("batch.size", 16384); 11 // 請求時間間隔 12 props.put("linger.ms", 1); 13 // 發送緩存區內存大小 14 props.put("buffer.memory", 33554432); 15 // key序列化 16 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 17 // value序列化 18 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 19 //2.定義kafka生產者 20 Producer<String, String> producer = new KafkaProducer<>(props); 21 //3.發送消息 22 for (int i = 0; i < 5; i++) { 23 //top,指定分區,數據 24 //("second",0,key,"");指定分區 25 //("second",key,"");指定key,根據key分區 26 //("second","");不指定,隨機分區,輪詢 27 producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))); 28 } 29 producer.close(); 30 }

3.kafka帶回調函數的生產者:

 /**
     * 創建生產者帶回調函數02
     * @throws SQLException
     */
    public static void producerThree() throws SQLException{
        //step1 配置參數,這些跟優化kafka性能有關系
        Properties props=new Properties(); // props.put("partitioner.class","com.comment.kafka.demo.producer.MyPartitioner"); //1 連接broker props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092"); //2 key和value序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //3 acks // -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功 // 0 代表消息只要發送出去就行,其他不管 // 1 代表發送消息到leader partition寫入成功就可以 props.put("acks","-1"); //4 重試次數 props.put("retries",3);//大部分問題,設置這個就可以解決,生產環境可以設置多些 5-10次 // 5 隔多久重試一次 props.put("retry.backoff.ms",2000); //6 如果要提升kafka的吞吐量,可以指定壓縮類型,如lz4 props.put("compression.type","none"); //7 緩沖區大小,默認是32M props.put("buffer.size",33554432); //8 一個批次batch的大小,默認是16k,需要根據一條消息的大小去調整 props.put("batch.size",323840);//設置為32k //9 如果一個batch沒滿,達到如下的時間也會發送出去 props.put("linger.ms",200); //10 一條消息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯 props.put("max.request.size",1048576); //11 一條消息發送出去后,多久還沒收到響應,就認為是超時 props.put("request.timeout.ms",5000); //step2 創建生產者對象 KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props); //step3 使用消息的封裝形式,注意value一般是json格式 List<KafKaMyImage> kafKaMyImages = getKafKaMyImages(); for (int i = 0; i < kafKaMyImages.size(); i++) { //step4 調用生產者對象的send方法發送消息,有異步和同步兩種選擇 //1 異步發送,一般使用異步,發送后會執行一個回調函數 //top,指定分區,數據 KafKaMyImage kafKaMyImage = kafKaMyImages.get(i); JSONObject jsonObject = JSONObject.fromObject(kafKaMyImage); producer.send(new ProducerRecord<String, String>("topicC","0",jsonObject.toString()), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //判斷是否有異常 if(exception==null){ System.out.println("消息發送到分區"+metadata.partition()+"成功"); }else{ System.out.println("消息發送失敗"); // TODO 可以寫入到redis,或mysql  } } }); } try { Thread.sleep(10*1000); } catch (InterruptedException e) { e.printStackTrace(); } //2 同步發送,需要等待一條消息發送完成,才能發送下一條消息 //RecordMetadata recordMetadata = producer.send(record).get(); //System.out.println("發送到的分區是:"+recordMetadata.partition()); //step5 關閉連接  producer.close(); }

 

 4.生產者自定義分區:

Kafka自定義分區需要實現Partitioner類,這里實現的是根據某個字段的值把數據寫入相應分區

package com.comment.kafka.demo.producer;

import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; /** * @className: MyPartitioner * @description: TODO 類描述 * @author: 東林 * @date: 2022/2/26 **/ public class MyPartitioner implements Partitioner { /** * 主要重寫這個方法,假設有topic country三個分區,producer將key為china、usa和korea的消息分開存儲到不同的分區,否則都放到0號分區 * @param topic 要使用自定義分區的topic * @param key 消息key * @param keyBytes 消息key序列化字節數組 * @param value 消息value * @param valueBytes 消息value序列化字節數組 * @param cluster 集群元信息 * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int partitions=0; String keyStr=(String) key; //獲取分區信息 List<PartitionInfo> partitionInfoList=cluster.availablePartitionsForTopic(topic); //獲取當前topic的分區數 int partitionInfoListSize=partitionInfoList.size(); //判斷是否有三個分區 if(partitionInfoListSize==3){ switch (Integer.parseInt(keyStr)){ case 1: partitions=0; break; case 0: partitions=1; break; default: partitions=2; break; } } //返回分區序號 return partitions; } @Override public void close() {} /** * 文件加載時 * @param map */ @Override public void configure(Map<String, ?> map) {} }

 4.1使用自定義分區

public static void producerPartition() throws SQLException {
        //step1 配置參數,這些跟優化kafka性能有關系
        Properties props=new Properties(); //1 連接broker props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092"); //2 key和value序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //3 acks // -1 代表所有處於isr列表中的follower partition都會同步寫入消息成功 // 0 代表消息只要發送出去就行,其他不管 // 1 代表發送消息到leader partition寫入成功就可以 props.put("acks","-1"); //4 重試次數 props.put("retries",3);//大部分問題,設置這個就可以解決,生產環境可以設置多些 5-10次 // 5 隔多久重試一次 props.put("retry.backoff.ms",2000); //6 如果要提升kafka的吞吐量,可以指定壓縮類型,如lz4 props.put("compression.type","none"); //7 緩沖區大小,默認是32M props.put("buffer.size",33554432); //8 一個批次batch的大小,默認是16k,需要根據一條消息的大小去調整 props.put("batch.size",323840);//設置為32k //9 如果一個batch沒滿,達到如下的時間也會發送出去 props.put("linger.ms",200); //10 一條消息最大的大小,默認是1M,生產環境中一般會修改變大,否則會報錯 props.put("max.request.size",1048576); //11 一條消息發送出去后,多久還沒收到響應,就認為是超時 props.put("request.timeout.ms",5000); //12 使用自定義分區器 props.put("partitioner.class","com.comment.kafka.demo.producer.MyPartitioner");

     //step2 創建生產者對象 KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props); //step3 使用消息的封裝形式,注意value一般是json格式 List<KafKaMyImage> kafKaMyImages = getKafKaMyImages(); for (int i = 0; i < kafKaMyImages.size(); i++) { //step4 調用生產者對象的send方法發送消息,有異步和同步兩種選擇 //1 異步發送,一般使用異步,發送后會執行一個回調函數 //top,指定分區,數據 KafKaMyImage kafKaMyImage = kafKaMyImages.get(i); JSONObject jsonObject = JSONObject.fromObject(kafKaMyImage); producer.send(new ProducerRecord<String, String>("topicD",kafKaMyImages.get(i).getIsdel(),jsonObject.toString()), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //判斷是否有異常 if(exception==null){ System.out.println("消息發送到分區"+metadata.partition()+"成功"); }else{ System.out.println("消息發送失敗"); // TODO 可以寫入到redis,或mysql } } }); } try { Thread.sleep(10*1000); } catch (InterruptedException e) { e.printStackTrace(); } //2 同步發送,需要等待一條消息發送完成,才能發送下一條消息 //RecordMetadata recordMetadata = producer.send(record).get(); //System.out.println("發送到的分區是:"+recordMetadata.partition()); producer.flush(); //step5 關閉連接 producer.close(); }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM