Getting Started with Kafka (2) - Using Kafka from Java


This article shows how to work with Kafka through the Java API. Software versions used: Java 1.8.0_191, Kafka 2.13-2.4.1, kafka-clients 2.5.0, JUnit 4.13.

1. Adding the Dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.13</version>
</dependency>

2. Producer

package com.inspur.demo.general.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

/**
 * Kafka producer example
 */
public class ProducerCase {
    private Properties props;
    private Producer<String, String> producer;

    @Before
    public void before() {
        props = new Properties();
        props.put("bootstrap.servers", "10.49.196.10:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");//所有follower都響應了才認為消息提交成功,即"committed"
        //props.put("retries", "10");//連接失敗重試次數
        //props.put("batch.size", "16384");//每個分區緩沖區大小,當數據大小達到設定值后,就會立即發送,不顧下面的linger.ms
        //props.put("linger.ms", "1");//producer將會等待給定的延遲時間以允許其他消息記錄一起發送,默認為0
        //props.put("buffer.memory", "33554432");//producer緩沖區大小
        //props.put("max.block.ms", "60000");//當緩沖區滿了,發送消息時最大等待時間
    }

    @After
    public void after() throws Exception {
        producer.close();
    }

    /**
     * Basic usage
     */
    @Test
    public void simpleUse() throws Exception {
        producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            //send a record with a key and a value
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "message-" + i));
            //send a record with a value only
            producer.send(new ProducerRecord<String, String>("test", "message2-" + i));
        }
    }

    /**
     * Sending messages within a transaction
     * @throws Exception
     */
    @Test
    public void transactional() throws Exception {
        props.put("transactional.id", "tid123");//必須設置,不同的生產者需要設置不同的事務id
        producer = new KafkaProducer<>(props);

        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                //send a record with a key and a value
                producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "message-" + i));
                //send a record with a value only
                producer.send(new ProducerRecord<String, String>("test", "message2-" + i));
            }

            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
    }

}
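send() is asynchronous: it returns a Future immediately and the record is delivered in the background. When delivery needs to be confirmed, you can block on the returned Future or register a Callback. The following test method is a minimal sketch (not part of the original examples) that could be added to ProducerCase above; it assumes one extra import, org.apache.kafka.clients.producer.RecordMetadata. Note also that records sent inside a transaction only become visible to consumers that set isolation.level=read_committed.

    /**
     * A minimal sketch: confirming delivery by blocking on the returned Future
     * and by registering an asynchronous Callback.
     */
    @Test
    public void sendWithConfirmation() throws Exception {
        producer = new KafkaProducer<>(props);

        //option 1: block until the broker acknowledges the record
        RecordMetadata metadata = producer.send(new ProducerRecord<>("test", "key-0", "value-0")).get();
        System.out.printf("topic=%s, partition=%d, offset=%d%n", metadata.topic(), metadata.partition(), metadata.offset());

        //option 2: register a callback that runs when the send completes (or fails)
        producer.send(new ProducerRecord<>("test", "key-1", "value-1"), (meta, exception) -> {
            if (exception != null) {
                exception.printStackTrace();//delivery failed after all retries
            } else {
                System.out.printf("acked at offset %d%n", meta.offset());
            }
        });
        producer.flush();//block until buffered records (and their callbacks) have completed
    }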

3. Consumer

package com.inspur.demo.general.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka consumer example
 */
public class ConsumerCase {
    private Properties props;
    private Consumer<String, String> consumer;

    @Before
    public void before() {
        props = new Properties();
        props.put("bootstrap.servers", "10.49.196.10:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "test");
    }

    @After
    public void after() throws Exception {
        consumer.close();
    }

    /**
     * Automatic offset commit
     */
    @Test
    public void automatic() throws Exception {
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        consumer = new KafkaConsumer<>(props);//assign to the field so that after() can close it
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }

    /**
     * Manual offset commit
     * @throws Exception
     */
    @Test
    public void manual() throws Exception {
        props.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<>(props);//assign to the field so that after() can close it
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
            //insertIntoDb(records);//business-specific processing would go here
            consumer.commitSync();
        }
    }

}
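commitSync() with no arguments commits the latest offsets returned by the previous poll for all assigned partitions at once. For finer-grained control, offsets can be committed per partition after that partition's batch has been processed. The following test method is a minimal sketch (not part of the original examples) that could be added to ConsumerCase above; it assumes extra imports of org.apache.kafka.clients.consumer.OffsetAndMetadata, org.apache.kafka.common.TopicPartition, java.util.Collections and java.util.List.

    /**
     * A minimal sketch: committing offsets per partition after processing
     * each partition's batch of records.
     */
    @Test
    public void manualPerPartition() throws Exception {
        props.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n", partition.partition(), record.offset(), record.value());
                }
                //the committed offset is the offset of the next record that will be read
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }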

4. Stream Processing

package com.inspur.demo.general.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * Word count
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))//split each message into individual words
                .groupBy((key, value) -> value)//group by word
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))//count each word and materialize the counts in a KeyValueStore named "counts-store"
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));//write to the output topic with String keys and Long values
        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        //close the streams client cleanly when the JVM shuts down (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        streams.start();
        latch.await();
    }
}

The program reads messages from streams-plaintext-input, splits each message into words and counts the occurrences of each word; the counts are written to streams-wordcount-output. The output topic can be inspected with the following command:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true
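The input topic must exist before the application can consume from it (unless automatic topic creation is enabled on the broker). One way to create it and to feed it some test text is with the command-line tools that ship with Kafka, for example (broker address matching the consumer command above):

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
./kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input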

Each line typed into the console producer is split into words, and for every word an updated count appears on the output topic, with the word as the key and the count as the value.

5. Admin

package com.inspur.demo.general.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class AdminCase {
    private AdminClient adminClient;

    @Before
    public void before() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092");
        adminClient = AdminClient.create(props);
    }

    @After
    public void after() {
        adminClient.close();
    }

    /**
     * Create a topic
     */
    @Test
    public void createTopics() throws Exception {
        NewTopic topic = new NewTopic("admin-test", 4, (short) 1);//4 partitions, replication factor 1
        Collection<NewTopic> topicList = new ArrayList<>();
        topicList.add(topic);
        adminClient.createTopics(topicList).all().get();//wait for the creation to complete
    }

    /**
     * List topics
     * @throws Exception
     */
    @Test
    public void listTopics() throws Exception {
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(true);//whether to include internal topics
        ListTopicsResult result = adminClient.listTopics(listTopicsOptions);
        Collection<TopicListing> list = result.listings().get();
        System.out.println(list);
    }

    /**
     * Describe a topic
     * @throws Exception
     */
    @Test
    public void describeTopics() throws Exception {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("admin-test"));
        System.out.println(result.all().get());
    }

    /**
     * Delete a topic
     * @throws Exception
     */
    @Test
    public void deleteTopics() throws Exception {
        adminClient.deleteTopics(Arrays.asList("admin-test")).all().get();//wait for the deletion to complete
    }

    /**
     * Describe the cluster
     * @throws Exception
     */
    @Test
    public void describeCluster() throws Exception {
        DescribeClusterResult result = adminClient.describeCluster();
        System.out.println(result.nodes().get());
    }

    /**
     * Describe configuration
     * @throws Exception
     */
    @Test
    public void describeConfigs() throws Exception {
        DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(new ConfigResource(ConfigResource.Type.TOPIC, "admin-test")));
        System.out.println(result.all().get());
    }

    /**
     * Describe a broker's log directories
     * @throws Exception
     */
    @Test
    public void describeLogDirs() throws Exception {
        DescribeLogDirsResult result = adminClient.describeLogDirs(Arrays.asList(0));//query the broker with broker.id 0
        System.out.println(result.all().get());
    }

    /**
     * Describe replica log directories
     * @throws Exception
     */
    @Test
    public void describeReplicaLogDirs() throws Exception {
        DescribeReplicaLogDirsResult result = adminClient.describeReplicaLogDirs(Arrays.asList(new TopicPartitionReplica("admin-test", 0, 0)));
        System.out.println(result.all().get());
    }
}
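Beyond the read-only queries above, AdminClient can also change topic configuration at runtime through incrementalAlterConfigs (available since Kafka 2.3). The following test method is a minimal sketch (not part of the original examples) that could be added to AdminCase above; it sets retention.ms on the admin-test topic and assumes extra imports of java.util.HashMap and java.util.Map.

    /**
     * A minimal sketch: updating a topic configuration with incrementalAlterConfigs.
     */
    @Test
    public void alterTopicConfig() throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "admin-test");
        AlterConfigOp op = new AlterConfigOp(new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET);//retain messages for one day
        Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
        updates.put(resource, Arrays.asList(op));
        adminClient.incrementalAlterConfigs(updates).all().get();//wait for the update to be applied
    }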

 

