Kafka Java API+自定義分區


kafka的API

第一步:導入kafka的開發jar包

 


<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <!-- Kafka Java client; keep this version in sync with the broker version in use. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>


 

 

Kafka生產者


@Test

   public void kafkaProducer() throws Exception {

      //1、准備配置文件

       Properties props = new Properties();

       props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

       props.put("acks", "all");

       props.put("retries", 0);

       props.put("batch.size", 16384);

       props.put("linger.ms", 1);

       props.put("buffer.memory", 33554432);

       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       //2、創建KafkaProducer

       KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

       for (int i=0;i<100;i++){

           //3、發送數據

           kafkaProducer.send(new ProducerRecord<String, String>("yun01","num"+i,"value"+i));

       }

 

      kafkaProducer.close();

   }


 

 

Kafka消費者


@Test

   public void kafkaConsum() throws Exception {

        // 1、准備配置文件

       Properties props = new Properties();

       props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

       props.put("group.id", "test");

       props.put("enable.auto.commit", "true");

       props.put("auto.commit.interval.ms", "1000");

       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 

       // 2、創建KafkaConsumer

       KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);

       // 3、訂閱數據,這里的topic可以是多個

       kafkaConsumer.subscribe(Arrays.asList("yun01"));

       // 4、獲取數據

       while (true) {

           ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

           for (ConsumerRecord<String, String> record : records) {

               System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());

           }

 

       }

   }

 


 

 

 

kafka的自定義分區

第一種方式:直接指定分區


// Explicitly target partition 1 of topic "testpart" via the
// (topic, partition, key, value) ProducerRecord overload.
kafkaProducer.send(new ProducerRecord<String, String>("testpart",1,"0","value"+i));

 

第二種方式:自定義分區

public class KafkaCustomPartitioner implements Partitioner {

   @Override

   public void configure(Map<String, ?> configs) {

   }

 

   @Override

   public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {

      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

       int partitionNum = partitions.size();

      Random random = new Random();

      int partition = random.nextInt(partitionNum);

       return partition;

   }

 

   @Override

   public void close() {

     

   }

 

}


 

 

 

主代碼中添加配置

 


@Test

   public void kafkaProducer() throws Exception {

      //1、准備配置文件

       Properties props = new Properties();

       props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

       props.put("acks", "all");

       props.put("retries", 0);

       props.put("batch.size", 16384);

       props.put("linger.ms", 1);

       props.put("buffer.memory", 33554432);

       props.put("partitioner.class", "com.gec.kafkaclient.MyCustomerPartitons");

       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       //2、創建KafkaProducer

       KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

       for (int i=0;i<100;i++){

           //3、發送數據

           kafkaProducer.send(new ProducerRecord<String, String>("testpart","0","value"+i));

       }

 

      kafkaProducer.close();

   }


 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM