This article shows how to use the Java API to work with Kafka. Software versions used: Java 1.8.0_191, Kafka 2.4.1 (the kafka_2.13-2.4.1 build), kafka-clients 2.5.0, JUnit 4.13.
1. Dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>
2. Producer
package com.inspur.demo.general.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

/**
 * Kafka producer examples
 */
public class ProducerCase {
    private Properties props;
    private Producer<String, String> producer;

    @Before
    public void before() {
        props = new Properties();
        props.put("bootstrap.servers", "10.49.196.10:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // a message counts as "committed" only after all followers have acknowledged it
        //props.put("retries", "10"); // number of retries on send failure
        //props.put("batch.size", "16384"); // per-partition batch size; a full batch is sent immediately, regardless of linger.ms
        //props.put("linger.ms", "1"); // how long the producer waits for additional records to batch together, default 0
        //props.put("buffer.memory", "33554432"); // total producer buffer size
        //props.put("max.block.ms", "60000"); // maximum time send() blocks when the buffer is full
    }

    @After
    public void after() throws Exception {
        producer.close();
    }

    /**
     * Basic usage
     */
    @Test
    public void simpleUse() throws Exception {
        producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // send key and value
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "message-" + i));
            // send value only
            producer.send(new ProducerRecord<String, String>("test", "message2-" + i));
        }
    }

    /**
     * Send messages within a transaction
     * @throws Exception
     */
    @Test
    public void transactional() throws Exception {
        props.put("transactional.id", "tid123"); // required; each producer instance needs its own transactional id
        producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                // send key and value
                producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "message-" + i));
                // send value only
                producer.send(new ProducerRecord<String, String>("test", "message2-" + i));
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
    }
}
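The tests above send records without looking at the result. A common variant is to pass a Callback to send() so the assigned partition/offset or any failure can be observed asynchronously. The sketch below is an addition (not part of the original class); the method name sendWithCallback is ours, and it assumes it is placed inside ProducerCase and reuses the same props and "test" topic.

    /**
     * Sketch: send with a Callback that logs each record's partition/offset or the failure.
     * Assumes the same props and "test" topic configured in before().
     */
    @Test
    public void sendWithCallback() throws Exception {
        producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test", Integer.toString(i), "message-" + i),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(); // the send failed
                        } else {
                            System.out.printf("partition=%d, offset=%d%n",
                                    metadata.partition(), metadata.offset());
                        }
                    });
        }
        producer.flush(); // wait for outstanding sends so the callbacks can fire before the test ends
    }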
3. Consumer
package com.inspur.demo.general.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka consumer examples
 */
public class ConsumerCase {
    private Properties props;
    private Consumer<String, String> consumer;

    @Before
    public void before() {
        props = new Properties();
        props.put("bootstrap.servers", "10.49.196.10:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "test");
    }

    @After
    public void after() throws Exception {
        consumer.close();
    }

    /**
     * Automatic offset commit
     */
    @Test
    public void automatic() throws Exception {
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        consumer = new KafkaConsumer<>(props); // assign to the field so @After can close it
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }
    }

    /**
     * Manual offset commit
     * @throws Exception
     */
    @Test
    public void manual() throws Exception {
        props.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
            //insertIntoDb(records); // business logic, e.g. persist the records
            consumer.commitSync(); // commit offsets only after the records have been processed
        }
    }
}
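Both tests above rely on subscribe() and consumer-group management. As a complement, the sketch below (an addition, not from the original code) assigns a partition manually and reads it from the beginning; it assumes it is added to ConsumerCase, and it needs one extra import, org.apache.kafka.common.TopicPartition.

    /**
     * Sketch: manual partition assignment instead of subscribe(), reading from the earliest offset.
     * Assumes the "test" topic and the props configured in before(); requires import of TopicPartition.
     */
    @Test
    public void manualAssign() throws Exception {
        props.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("test", 0); // read only partition 0
        consumer.assign(Arrays.asList(partition));
        consumer.seekToBeginning(Arrays.asList(partition)); // start from the earliest offset
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }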
4. Stream Processing
package com.inspur.demo.general.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * Word count
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) // split each message into words
                .groupBy((key, value) -> value) // group by word
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) // count each word, materialized in a KeyValueStore named "counts-store"
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // output key is String, value is Long

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);
        streams.start();
        latch.await();
    }
}
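As written, main() blocks on the latch forever and the streams instance is never closed. A common variant, sketched below under the assumption that it replaces the last four lines of main(), registers a shutdown hook so Ctrl+C shuts the application down cleanly.

        // Sketch (addition, not from the original article): close the streams
        // instance cleanly on Ctrl+C instead of blocking forever.
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();   // stop the stream threads and release resources
                latch.countDown(); // let main() return
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);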
The program reads messages from streams-plaintext-input, splits each message into words, and counts the occurrences of each word; the counts are written to streams-wordcount-output. The output topic can be inspected with the following command:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true
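Input lines can be sent to streams-plaintext-input with the console producer that ships with Kafka (assuming the same broker address as above; each line typed becomes one message):

./kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input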
Input, for example, typed into the console producer:

hello kafka streams
hello kafka

Output printed by the console consumer is each word paired with its running count; with the input above it looks roughly like the following (the exact batching and ordering of updates may vary):

hello 1
kafka 1
streams 1
hello 2
kafka 2
5. Admin
package com.inspur.demo.general.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class AdminCase {
    private AdminClient adminClient;

    @Before
    public void before() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092");
        adminClient = AdminClient.create(props);
    }

    @After
    public void after() {
        adminClient.close();
    }

    /**
     * Create a topic
     */
    @Test
    public void createTopics() {
        NewTopic topic = new NewTopic("admin-test", 4, (short) 1); // 4 partitions, replication factor 1
        Collection<NewTopic> topicList = new ArrayList<>();
        topicList.add(topic);
        adminClient.createTopics(topicList);
    }

    /**
     * List topics
     * @throws Exception
     */
    @Test
    public void listTopics() throws Exception {
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(true); // whether to include internal topics
        ListTopicsResult result = adminClient.listTopics(listTopicsOptions);
        Collection<TopicListing> list = result.listings().get();
        System.out.println(list);
    }

    /**
     * Describe a topic
     * @throws Exception
     */
    @Test
    public void describeTopics() throws Exception {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("admin-test"));
        System.out.println(result.all().get());
    }

    /**
     * Delete a topic
     * @throws Exception
     */
    @Test
    public void deleteTopics() throws Exception {
        adminClient.deleteTopics(Arrays.asList("admin-test"));
    }

    /**
     * Describe the cluster
     * @throws Exception
     */
    @Test
    public void describeCluster() throws Exception {
        DescribeClusterResult result = adminClient.describeCluster();
        System.out.println(result.nodes().get());
    }

    /**
     * Query configuration
     * @throws Exception
     */
    @Test
    public void describeConfigs() throws Exception {
        DescribeConfigsResult result = adminClient.describeConfigs(
                Arrays.asList(new ConfigResource(ConfigResource.Type.TOPIC, "admin-test")));
        System.out.println(result.all().get());
    }

    /**
     * Query the log directories of a broker
     * @throws Exception
     */
    @Test
    public void describeLogDirs() throws Exception {
        DescribeLogDirsResult result = adminClient.describeLogDirs(Arrays.asList(0)); // query the broker with broker.id 0
        System.out.println(result.all().get());
    }

    /**
     * Query the log directory of a replica
     * @throws Exception
     */
    @Test
    public void describeReplicaLogDirs() throws Exception {
        DescribeReplicaLogDirsResult result = adminClient.describeReplicaLogDirs(
                Arrays.asList(new TopicPartitionReplica("admin-test", 0, 0)));
        System.out.println(result.all().get());
    }
}
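AdminClient can also grow the partition count of an existing topic. The sketch below is an addition (not from the original class); it assumes the admin-test topic created above and, if added to AdminCase, needs extra imports of java.util.HashMap and java.util.Map (NewPartitions is already covered by the admin.* import).

    /**
     * Sketch: increase the partition count of an existing topic.
     * Assumes the "admin-test" topic created earlier; requires java.util.Map and java.util.HashMap.
     */
    @Test
    public void createPartitions() throws Exception {
        Map<String, NewPartitions> newPartitions = new HashMap<>();
        newPartitions.put("admin-test", NewPartitions.increaseTo(8)); // grow from 4 to 8 partitions
        adminClient.createPartitions(newPartitions).all().get();
    }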