Kafka 生產者和消費者入門代碼基礎


這篇博文講解Kafka 的生產者和消費者實例。

基礎版本一

生產者

ProducerFastStart.java
package com.xingyun.tutorial_1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerFastStart {

    public static final String brokerList="192.168.10.137:9092";

    // public static final String topic="topic-demo";
    public static final String topic="ODS-PSR-P.*";

    public static void main(String args[]){

       //配置生產者客戶端參數
        //將配置序列化
        Properties properties=new Properties();
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers",brokerList);

        //創建KafkaProducer 實例
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);

        //構建待發送的消息
        ProducerRecord<String,String> record=new ProducerRecord<String, String>(topic,"hello Kafka!");
        try {
            //嘗試發送消息
            producer.send(record);
            //打印發送成功
            System.out.println("send success from producer");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //關閉生產者客戶端實例
            producer.close();
        }
    }
}
消費者
ConsumerFastStart.java
package com.xingyun.tutorial_1;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart {

  //  public static final String brokerList="192.168.10.137:9092";
    public static final String brokerList="10.221.148.217:9092 ,10.221.148.217:9093 ,10.221.148.217:9094";

   // public static final String topic="topic-demo";
    public static final String topic="ODS-PSR-P.*";


    public static final String groupId="group.demo";

    public static void main(String args[]){

        //設置消費組的名稱
        //將屬性值反序列化
        Properties properties=new Properties();
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers",brokerList);
        properties.put("group.id",groupId);

        //創建一個消費者客戶端實例
        KafkaConsumer<String,String> consumer=new KafkaConsumer<>(properties);

        //訂閱主題
        consumer.subscribe(Collections.singletonList(topic));

        //循環消費消息
        while (true){
            ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records){
                System.out.println("receiver a message from consumer client:"+record.value());
            }
        }
    }
}

升級版本一:
生產者

KafkaProducerAnalysis.java
package com.xingyun.tutorial_2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAnalysis {

    public static final String brokerList="192.168.10.137:9092";

    public static final String topic="topic-demo";

    public static void main(String args[]){

        //配置生產者客戶端參數
        Properties properties=initConfig();

        //創建相應的生產者實例
        KafkaProducer<String,String> producer=new KafkaProducer<>(properties);

        //構建待發送的消息  topic 和value 是必填項
        ProducerRecord<String,String> record=new ProducerRecord<>(topic,"hello,Kafka!");

        try {
            //發送消息
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //關閉生產者實例
            producer.close();
        }

    }


    /**
     * 配置生產者客戶端參數
     * */
    private static Properties initConfig() {
        Properties properties=new Properties();
       // properties.put("bootstrap.servers",brokerList);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        // properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //properties.put("client.id","producer.client.id.demo");//指定客戶端ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");//指定客戶端ID
        return properties;
    }


}
消費者
KafkaConsumerAnalysis.java
package com.xingyun.tutorial_2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerAnalysis {

    public static final String brokerList="";
    public static final String topic="topic-demo";
    public static final String groupId="group.demo";

    public static final AtomicBoolean isRunning=new AtomicBoolean(true);

    public static void main(String[] args){

        //配置消費者客戶端參數
        Properties properties=initConfig();

        //創建相應的消費者實例
        KafkaConsumer<String,String> consumer=new KafkaConsumer<>(properties);

        //訂閱主題
        consumer.subscribe(Arrays.asList(topic));

        try {
            //拉取消息並消費
            while(isRunning.get()){

                ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String,String> record:records){

                   System.out.println("topic="+record.topic()+",partition="+record.partition()+",offset="+record.offset());
                   System.out.println("key="+record.key()+",value="+record.value());
                   //do something to processor record.
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }


    /**
     * 配置消費者客戶端參數
     * */
    private static Properties initConfig() {
        Properties properties=new Properties();

//        properties.put("key.deserializer","org.apache.kafka.common.serialization.Deserializer");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class.getName());

//        properties.put("value.deserializer","org.apache.kafka.common.serialization.Deserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,Deserializer.class.getName());

        //指定連接Kafka集群所需的broker地址清單,中間用逗號隔開,默認值為""
//        properties.put("bootstrap.server",brokerList);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);

        //消費者所屬的消費組的名稱,默認值為""
//        properties.put("group.id",groupId);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);

//        properties.put("client.id","consumer.client.id.demo");
        //指定消費者客戶端Id,如果不設置,則自動生成consumer-1,consumer-2
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");

        return properties;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM