java kafka 生產者消費者 高級API


Java中提供高級的API,相對於低級API(更小的粒度控制消費)使用起來非常方便。

 

一、修改kafka   server.porperties的ip是你kafka服務的ip

listeners=PLAINTEXT://192.168.111.130:9092

 

二、生產者的例子

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerDemo {
    private final Producer<String, String> kafkaProdcer;
    public final static String TOPIC = "JAVA_TOPIC";

    private KafkaProducerDemo() {
        kafkaProdcer = createKafkaProducer();
    }

    private Producer<String, String> createKafkaProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.111.130:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        return kafkaProducer;
    }

    void produce() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            final String key = "key" + i;
            String data = "hello kafka message:" + key;
            kafkaProdcer.send(new ProducerRecord<String, String>(TOPIC, key, data), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("發送key" + key + "成功");
                }
            });
        }
    }

    public static void main(String[] args) {
        KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo();
        kafkaProducerDemo.produce();
    }

}

用properties構造一個Producer的實例,然后調用send方法,傳入數據,還有一個回調函數。

可以看到數據已經進來了。

注意:kafka producer支持同步發送、異步發送、異步發送+回調函數方式。

1、同步方式會按順序發送,打印出來的結果是按發送的順序:

for (int i = 0; i < 1000; i++) {
    RecordMetadata test = producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello world-" + i)).get();
    System.out.println(test);
}

2、回調函數里面可以對成功或者失敗,分支判斷,進行業務上的進一步處理。甚至可以把失敗的消息存儲下來。

for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<String, String>("test", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println("發送成功");
            }
        }
    });
}

注:回調函數里面onCompletion方法其實是阻塞的! 如果進行延時,會逐個執行,不會同時並發跑,但是發送數據任然是異步的。

 

三、消費者例子

 

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {
    private final KafkaConsumer<String, String> consumer;
    private KafkaConsumerDemo(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.111.130:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
    }
    void consume(){
        consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.println("I'm coming");
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
    public static void main(String[] args) {
        KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo();
        kafkaConsumerDemo.consume();
    }
}

正常啟動是看不到東西的, 兩個同時啟動才有。消費者只看接下來有哪些生產者發來新的消息。

props.put("enable.auto.commit", "true");

這個的意思是,消費后自動改變偏移量。如果不添加這個,就會在服務器存的offset開始消費,並且不會改變offset的值。

如果為false, 可以看到不管消費幾次,服務端存儲的始終是offset的值都不會改變,需要手動提交offset。

 

如果想讓consumer從頭開始消費,可以設置:

props.put("auto.offset.reset", "earliest");

這個只對新建的組有效,如果一個組已經消費過,offset的值已經存在服務端了,這樣設置不起作用的,只會從服務端存儲的offset開始消費。不設置默認是latest,就是從最新的開始消費。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM