kafka2.5.0生產者與消費者,java普通main方法簡單示例,不包含ack機制


重要知識:

kafka生產者是線程安全的 ,不管啟動多少個線程去執行生產者,都是線程安全的。

1)kafka生產者,有3種發送方式:1、發送並忘記;2、同步發送;3、異步發送

生產者。發送方式:1、發送並忘記;

import cn.enjoyedu.config.BusiConst;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * @author King老師   
 */
public class HelloKafkaProducer {

    public static void main(String[] args) {
        //TODO 生產者三個屬性必須指定(broker地址清單、key和value的序列化器)
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.2.61:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        try {
            ProducerRecord<String,String> record;
            try {
                //  發送4條消息
                for(int i=0;i<4;i++){
// 這里的key值為null,所以kafka會根據分區總數把數據負載均衡到每個分區,如果有值,則根據值來判斷存到哪個分區。 record
= new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC, null,"lison"+i);
            // 生產者有3種發送方式:1、發送並忘記;2、同步發送;3、異步發送 producer.send(record); // 此處是 1、發送並忘記 System.out.println(i
+",message is sent"); } } catch (Exception e) { e.printStackTrace(); } } finally { producer.close(); } } }

 重要知識:

如果該topic的分區大於1,那么生產者生產的數據存放到哪個分區,完全取決於key值,比如key=A,那么存到分區0,key=B,那么存到分區1,如果key為null,那么負載均衡存儲到每個分區!

相關資料參考: 《kafka2.5.0分區再均衡監聽器java例子

生產者。發送方式:2、同步發送;

Future<RecordMetadata> future = producer.send(record);
System.out.println("do other sth");
RecordMetadata recordMetadata = future.get();//有可能阻塞在這個位置
if(null!=recordMetadata){
      System.out.println("offset:"+recordMetadata.offset()+"-" +"partition:"+recordMetadata.partition());
}

 

生產者。發送方式:3、異步發送;

ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    BusiConst.HELLO_TOPIC,"teacher14","deer");
producer.send(record, new Callback() {
      public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(null!=exception){
                        exception.printStackTrace();
                    }
                    if(null!=metadata){
                        System.out.println("offset:"+metadata.offset()+"-"
                                +"partition:"+metadata.partition());
                    }
     }
});

 

2)消費者:

import cn.enjoyedu.config.BusiConst;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author King老師   
 */
public class HelloKafkaConsumer {

    public static void main(String[] args) {
        /* 消費者三個屬性必須指定(broker地址清單、key和value的反序列化器) */
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.2.61:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        //  群組並非完全必須. 重要知識:在同一Topic下,相同的groupID消費群組中,只有一個消費者可以拿到數據。
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            //消費者訂閱主題(可以多個)
            consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
            while(true){
                //TODO 拉取(新版本)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for(ConsumerRecord<String, String> record:records){
                    System.out.println(String.format("topic:%s,分區:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
                            record.offset(),record.key(),record.value()));
                    // TODO
                }
            }

            //通過另外一個線程 consumer. wakeup()
        } finally {
            consumer.close();
        }
    }
}

 

end.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM