Java操作Kafka創建主題、生產者、消費者


環境

JDK 1.8
Zookeeper 3.6.1
Kafka 2.6.0

引入依賴

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.6.0</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-nop</artifactId>
  <version>1.7.25</version>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.9.5</version>
</dependency>

創建主題

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.25.132:9092");  //kafka服務地址
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    AdminClient client = KafkaAdminClient.create(props);//創建操作客戶端
    //創建名稱為test1的topic,有5個分區
    NewTopic topic = new NewTopic("test1", 5, (short) 1); 
    client.createTopics(Arrays.asList(topic));
    client.close();//關閉
}

查看結果,在zookeeper路徑/brokers/topics下新增了節點test1,就是剛才創建的topic主題
在這里插入圖片描述

生產者

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    //異步發送20條消息
    for (int i = 1; i <= 20; i++){
        ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i);
        producer.send(record);
    }

    producer.close();
}

先啟動控制台的消費者,監聽topic=test1的數據

kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1

然后運行生產者程序,這時消費端會輸出消息信息

[root@cluster01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.25.132:9092 --topic test1
message1
message2
message3
message4
message5
message6
message7
message8
message9
message10
message11
message12
message13
message14
message15
message16
message17
message18
message19
message20

帶回調函數的生產者

回調函數是在生產者收到ack時調用,把上面的producer.send(record)加上回調即可

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null){
            System.out.println("success:" + recordMetadata.offset());
        }
        else{
            e.printStackTrace();
        }
    }
});

同步發送:消息發送后,會堵塞當前線程直到收到ack為止,才繼續發送

//在后面加上get()即可
producer.send(record).get();

消費者

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//groupid相同的屬於同一個消費者組
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自動提交offset
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    //消費test1主題
    consumer.subscribe(Arrays.asList("test1"));
    while (true){
        System.out.println("consumer is polling");
        //5秒等待
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format("offset=%d,key=%s,value=%s",
                    record.offset(), record.key(), record.value()));
        }
        //同步提交,失敗會重試
        consumer.commitSync();
        //異步提交,失敗不會重試
        //consumer.commitAsync();
    }
}

運行消費者,看到輸出“consumer is polling”之后,再運行生產者,結果如下:

D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\Idea\IntelliJ IDEA 2019.1.1\lib\idea_rt.jar=52238:D:\Idea\IntelliJ IDEA 2019.1.1\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk1.8.0_131\jre\lib\charsets.jar;D:\Java\jdk1.8.0_131\jre\lib\deploy.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_131\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_131\jre\lib\javaws.jar;D:\Java\jdk1.8.0_131\jre\lib\jce.jar;D:\Java\jdk1.8.0_131\jre\lib\jfr.jar;D:\Java\jdk1.8.0_131\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_131\jre\lib\jsse.jar;D:\Java\jdk1.8.0_131\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_131\jre\lib\plugin.jar;D:\Java\jdk1.8.0_131\jre\lib\resources.jar;D:\Java\jdk1.8.0_131\jre\lib\rt.jar;F:\JavaTest\ConsoleTrain\kafka\target\classes;E:\.m2\repository\org\apache\kafka\kafka-clients\2.6.0\kafka-clients-2.6.0.jar;E:\.m2\repository\com\github\luben\zstd-jni\1.4.4-7\zstd-jni-1.4.4-7.jar;E:\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;E:\.m2\repository\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;E:\.m2\repository\org\slf4j\slf4j-api\1.7.30\slf4j-api-1.7.30.jar;E:\.m2\repository\org\slf4j\slf4j-nop\1.7.25\slf4j-nop-1.7.25.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.9.5\jackson-databind-2.9.5.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.9.5\jackson-core-2.9.5.jar com.train.consumer.SimpleConsumer
consumer is polling
consumer is polling
partition=1,offset=15,key=key2,value=message2
partition=1,offset=16,key=key6,value=message6
partition=1,offset=17,key=key12,value=message12
consumer is polling
partition=0,offset=25,key=key1,value=message1
partition=0,offset=26,key=key3,value=message3
partition=0,offset=27,key=key10,value=message10
partition=0,offset=28,key=key15,value=message15
partition=0,offset=29,key=key19,value=message19
partition=3,offset=20,key=key5,value=message5
partition=3,offset=21,key=key11,value=message11
partition=3,offset=22,key=key14,value=message14
partition=3,offset=23,key=key18,value=message18
partition=2,offset=15,key=key7,value=message7
partition=2,offset=16,key=key8,value=message8
partition=2,offset=17,key=key16,value=message16
partition=4,offset=25,key=key4,value=message4
partition=4,offset=26,key=key9,value=message9
partition=4,offset=27,key=key13,value=message13
partition=4,offset=28,key=key17,value=message17
partition=4,offset=29,key=key20,value=message20
consumer is polling

注意觀察上圖的數據:consumer消費數據,只保證同一個分區內的數據是有序的。當1個consumer去消費不同分區的數據時,分區之間的message不能保證順序。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM