Kafka入門之生產者消費者測試


目錄:

kafka啟動腳本以及關閉腳本

1. 同一個生產者同一個Topic,兩個相同的消費者相同的Group

2. 同一個生產者同一個Topic,兩個消費者不同Group

 3. 兩個生產者同一個Topic,生產不同的消息,一個消費者

 

運行的前提是有kafka,並啟動kafka,這里我寫了個kafka啟動腳本:

#!/bin/sh
#創建啟動腳本
#啟動zookeeper
/user/kafka_2.11-2.0.0/bin/zookeeper-server-start.sh /user/kafka_2.11-2.0.0/config/zookeeper.properties &
sleep 3 #等3秒后執行

#啟動kafka
/user/kafka_2.11-2.0.0/bin/kafka-server-start.sh /user/kafka_2.11-2.0.0/config/server.properties &

 kafka關閉腳本:

#!/bin/sh
#創建關閉腳本
#關閉kafka
/user/kafka_2.11-2.0.0/bin/kafka-server-stop.sh /user/kafka_2.11-2.0.0/config/server.properties &
sleep 3 #等3秒后執行

#關閉zookeeper
/user/kafka_2.11-2.0.0/bin/zookeeper-server-stop.sh /user/kafka_2.11-2.0.0/config/zookeeper.properties &

 

1. 同一個生產者同一個Topic,兩個相同的消費者相同的Group

新建一個生產者TestKafkaProducer,需要引入kafka的lib中的jar包,主要包括兩個類,如下所示:

kafka生產者:

package com.zc.kafka.producer.main;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Kafka生產者
 * 先啟動生產者,發送消息到broker,這里簡單發送了10條從0-9的消息,再啟動消費者,控制台輸出如下:
 */
public class SimpleKafkaProducer {

    private static long i = 0;
    
    public void send(String str) {
        // TODO Auto-generated method stub

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "localhost:9092");

        //請求時候需要驗證
        props.put("acks", "all");

        //請求失敗時候需要重試
        props.put("retries", 0);

        //內存緩存區大小
        props.put("buffer.memory", 33554432);

        //指定消息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //指定消息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        //for (int i = 0; i < 10; i++) {  //i < 10
            // 生產一條消息的時間有點長
            //producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            //System.out.println(i);
        //}
        // 這里的“test“是topic
        producer.send(new ProducerRecord<>("test", String.valueOf(i), str));
        i++;
        System.out.println("Message sent successfully");
        producer.close();
    }

}

生產數據:

package com.zc.kafka.producer.test;

import com.zc.kafka.producer.main.SimpleKafkaProducer;

public class TestSimpleKafkaProducer {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        long i=0;
        SimpleKafkaProducer skp = new SimpleKafkaProducer();
        while(true) {
            skp.send("Hello: "+ String.valueOf(i));
            i++;
            try {
                Thread.sleep(10000);  //ms
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

新建兩個消費者,引入kafka中lib中的jar包,分別是TestKafkaConsumer和TestKafkaConsumer2,他們有一個相同的類,如下所示:

消息消費者:

package com.zc.kafka.consumer.main;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * kafka消費者
 */
public class SimpleKafkaConsumer {

    @SuppressWarnings({ "deprecation", "resource" })
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        //每個消費者分配獨立的組號,這里的“test”是group
        props.put("group.id", "test");

        //如果value合法,則自動提交偏移量
        props.put("enable.auto.commit", "true");

        //設置多久一次更新被消費消息的偏移量
        props.put("auto.commit.interval.ms", "1000");

        //設置會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息
        props.put("session.timeout.ms", "30000");
        
        //
        //props.put("auto.offset.reset", "earliest");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("test"));  //核心函數1:訂閱topic

        System.out.println("Subscribed to topic " + "test");
        //int i = 0;

        while (true) {
            //System.out.println(i++);
            //核心函數2:long poll,一次拉取回來多個消息
            /* 讀取數據,讀取超時時間為100ms */
            ConsumerRecords<String, String> records = consumer.poll(100);  
            //System.out.println(records.count());
            for (ConsumerRecord<String, String> record : records)
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
        }
    }

}

啟動生產者,並啟動兩個消費者。(我的生產者和兩個消費者都在同一主機上)

結果是:

第一個啟動的消費者消費消息,第二個消費者沒有消費消息;我關閉掉第一個消費者,第二個消費者就會消費消息; (因為Group相同)

同時只會有一個消費者在消費消息,並且消費消息沒有重疊。

消費者1:

Subscribed to topic test
offset = 4451, key = 25, value = Hello: 25
offset = 4452, key = 26, value = Hello: 26
offset = 4453, key = 27, value = Hello: 27
offset = 4454, key = 28, value = Hello: 28
offset = 4455, key = 29, value = Hello: 29
offset = 4456, key = 30, value = Hello: 30
offset = 4457, key = 31, value = Hello: 31
offset = 4458, key = 32, value = Hello: 32

消費者2:

Subscribed to topic test
offset = 4459, key = 33, value = Hello: 33
offset = 4460, key = 34, value = Hello: 34
offset = 4461, key = 35, value = Hello: 35
offset = 4462, key = 36, value = Hello: 36

 

2. 同一個生產者同一個Topic,兩個消費者不同Group

這里只是修改了TestKafkaConsumer2的源碼,修改了組,具體如下所示:

package com.zc.kafka.consumer.main;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * kafka消費者
 */
public class SimpleKafkaConsumer {

    @SuppressWarnings({ "deprecation", "resource" })
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        //每個消費者分配獨立的組號
        props.put("group.id", "Consumer2");  //修改了組

        //如果value合法,則自動提交偏移量
        props.put("enable.auto.commit", "true");

        //設置多久一次更新被消費消息的偏移量
        props.put("auto.commit.interval.ms", "1000");

        //設置會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息
        props.put("session.timeout.ms", "30000");
        
        //
        //props.put("auto.offset.reset", "earliest");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("test"));  //核心函數1:訂閱topic

        System.out.println("Subscribed to topic " + "test");
        //int i = 0;

        while (true) {
            //System.out.println(i++);
            //核心函數2:long poll,一次拉取回來多個消息
            /* 讀取數據,讀取超時時間為100ms */
            ConsumerRecords<String, String> records = consumer.poll(100);  
            //System.out.println(records.count());
            for (ConsumerRecord<String, String> record : records)
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
        }
    }

}

啟動生產者,並啟動兩個消費者。(我的生產者和兩個消費者都在同一主機上)

結果是:

第一個啟動的消費者消費消息,第二個消費者也再消費消息;(因為Group不相同)

同時兩個消費者都在消費消息,並且消費消息重疊。

消費者1:

Subscribed to topic test
offset = 4463, key = 0, value = Hello: 0
offset = 4464, key = 1, value = Hello: 1
offset = 4465, key = 2, value = Hello: 2
offset = 4466, key = 3, value = Hello: 3
offset = 4467, key = 4, value = Hello: 4
offset = 4468, key = 5, value = Hello: 5
offset = 4469, key = 6, value = Hello: 6
offset = 4470, key = 7, value = Hello: 7
offset = 4471, key = 8, value = Hello: 8
offset = 4472, key = 9, value = Hello: 9

消費者2:

Subscribed to topic test
offset = 4466, key = 3, value = Hello: 3
offset = 4467, key = 4, value = Hello: 4
offset = 4468, key = 5, value = Hello: 5
offset = 4469, key = 6, value = Hello: 6
offset = 4470, key = 7, value = Hello: 7
offset = 4471, key = 8, value = Hello: 8
offset = 4472, key = 9, value = Hello: 9

 

3. 兩個生產者同一個Topic,生產不同的消息,一個消費者

生產的數據和第一個生產者不同:

package com.zc.kafka.producer.test;

import com.zc.kafka.producer.main.SimpleKafkaProducer;

public class TestSimpleKafkaProducer2 {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        long i=0;
        SimpleKafkaProducer skp = new SimpleKafkaProducer();
        while(true) {
            skp.send("Kafka: "+ String.valueOf(i));   //生產的數據不同
            i++;
            try {
                Thread.sleep(10000);  //ms
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

啟動兩個生產者,並啟動消費者。(我的生產者和消費者都在同一主機上)

結果是:

消費者同時收到了兩個生產者的消息; (因為Topic相同)

消費者:

Subscribed to topic test
offset = 4473, key = 0, value = Hello: 0
offset = 4474, key = 0, value = Kafka: 0
offset = 4475, key = 1, value = Hello: 1
offset = 4476, key = 1, value = Kafka: 1
offset = 4477, key = 2, value = Hello: 2
offset = 4478, key = 2, value = Kafka: 2
offset = 4479, key = 3, value = Hello: 3
offset = 4480, key = 3, value = Kafka: 3
offset = 4481, key = 4, value = Hello: 4
offset = 4482, key = 4, value = Kafka: 4

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM