java版本的Kafka消息寫入與讀取


安裝zookeeper:  https://www.cnblogs.com/guoyansi19900907/p/9954864.html

並啟動zookeeper

安裝kafka https://www.cnblogs.com/guoyansi19900907/p/9961143.html

並啟動kafka.

1.創建maven  java項目

2.添加依賴

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

3.創建生產者:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties kafkaProps=new Properties();
        /**
         * acks指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入成功。
         * acks=0,生產者在寫入消息之前不會等待任何來自服務器的響應;就算發送失敗了,生產者也不知道。
         * acks=1,只要集群首領收到消息,生產者就會收到一個來自服務器的成功消息
         * acks=all,所有參與復制的節點都收到消息,生產者才會收到一個來自服務器的成功響應。
         */
        kafkaProps.put("acks", "all");
        /**
         * 發送失敗后重發的次數,最終還不成功表示發送徹底的失敗
         */
        kafkaProps.put("retries", 0);
        /**
         * 默認情況下,消息發送時不會被壓縮。
         * snappy:壓縮算法由Google發明,它占用較少的CPU,卻能提供較好的性能和相當可觀的壓縮比
         * gzip:占用較多的CPU,但是提供更高的壓縮比,帶寬比較有限,可以考慮這個壓縮算法。
         * 使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往時向kafka發送消息的瓶頸
         */
        kafkaProps.put("compression.type", "snappy");
        /**
         * 一個批次可以使用的內存大小;當批次被填滿,批次里的所有消息會被發送;不過生產者並不一定等批次被填滿才發送;
         * 所以批次大小設置得很大,也不會造成延遲,只是會占用更多得內存而已。但是設置得太小,
         * 因為生產者需要更頻繁的發送消息,會增加額外的開銷。
         */
        kafkaProps.put("batch.size", 100);
        /**
         * 指定了生產者在發送批次之前等待更多消息加入批次的時間。
         * KafkaProducer會在批次填滿或liner.ms達到上限時把批次發送出去。
         * 這樣做雖然會出現一些延時,但是會提高吞吐量。
         */
        kafkaProps.put("linger.ms", 1);
        /**
         * 生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。
         * 如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足,
         * 這個時候send()方法要么被阻塞,要么拋出異常。
         */
        kafkaProps.put("buffer.memory", 33554432);
        /**
         * 生產者在收到服務器響應之前可以發送多少個消息。
         * 值越高就會占用越多的內存,不過也會提升吞吐量。
         * 設為1可以保證消息是按照發送順序填寫入服務器的,即使發生了重試。
         */
        kafkaProps.put("max.in.flight.requests.per.connection", 1);
        //kafkaProps.put("bootstrap.servers","192.168.123.128:9092,192.168.123.129:9092,192.168.123.130:9092");
        //主機信息(broker)
        kafkaProps.put("bootstrap.servers","192.168.123.128:9092");
        //鍵為字符串類型
        kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //值為字符串類型
        kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");


        Producer<String,String> producer=new KafkaProducer<String, String>(kafkaProps);
        String msg = "abc";
        producer.send(new ProducerRecord<String, String>("guo", msg));
        System.out.println("Sent:" + msg);
        producer.close();
    }
}

 

4.創建消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args)  throws Exception{
        Properties properties=new Properties();
        //主機信息
        properties.put("bootstrap.servers","192.168.123.128:9092");
        //群組id
        properties.put("group.id", "group-1");
        /**
         *消費者是否自動提交偏移量,默認是true
         * 為了經量避免重復數據和數據丟失,可以把它設為true,
         * 由自己控制核實提交偏移量。
         * 如果設置為true,可以通過auto.commit.interval.ms屬性來設置提交頻率
         */
        properties.put("enable.auto.commit", "true");
        /**
         * 自動提交偏移量的提交頻率
         */
        properties.put("auto.commit.interval.ms", "1000");
        /**
         * 默認值latest.
         * latest:在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據
         * erliest:偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。
         */
        properties.put("auto.offset.reset", "earliest");
        /**
         * 消費者在指定的時間內沒有發送心跳給群組協調器,就被認為已經死亡,
         * 協調器就會觸發再均衡,把它的分區分配給其他消費者。
         */
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        /**
         * 訂閱主題,這個地方只傳了一個主題:gys.
         * 這個地方也可以有正則表達式。
         */
        kafkaConsumer.subscribe(Arrays.asList("guo"));
        //無限循環輪詢
        while (true) {
            /**
             * 消費者必須持續對Kafka進行輪詢,否則會被認為已經死亡,他的分區會被移交給群組里的其他消費者。
             * poll返回一個記錄列表,每個記錄包含了記錄所屬主題的信息,
             * 記錄所在分區的信息,記錄在分區里的偏移量,以及鍵值對。
             * poll需要一個指定的超時參數,指定了方法在多久后可以返回。
             * 發送心跳的頻率,告訴群組協調器自己還活着。
             */
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                //Thread.sleep(1000);
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}

 

 

5.先運行消費者,然后會出現一個監聽的控制台,運行生產者。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM