A quick look at Kafka Streams, based on the examples from the official documentation.

Basic usage of Kafka
1. Start the Kafka server
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-server-start.sh config/server.properties
2. Create the two topics streams-plaintext-input and streams-wordcount-output
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
You can run bin/kafka-topics.sh --zookeeper localhost:2181 --describe to view the descriptions of the topics you created.
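If you prefer to create the topics from Java instead of the CLI, Kafka's AdminClient can do the same thing. The following is only a minimal sketch and not part of the original walkthrough; the CreateTopics class name and the localhost:9092 broker address are assumptions to adjust for your environment.

package com.linux.huhx.stream;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

// Minimal sketch (not from the original post): create the two topics with AdminClient
// instead of kafka-topics.sh. The class name and broker address are assumptions.
public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // one partition and replication factor 1, matching the CLI commands above
            NewTopic input = new NewTopic("streams-plaintext-input", 1, (short) 1);
            NewTopic output = new NewTopic("streams-wordcount-output", 1, (short) 1)
                    .configs(Collections.singletonMap("cleanup.policy", "compact"));
            admin.createTopics(Arrays.asList(input, output)).all().get(); // block until both topics exist
        }
    }
}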
3. Run the WordCount demo that ships with Kafka
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Start a console producer to write some records, and a console consumer to receive the processed messages.
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Type some data into the kafka-console-producer.sh window:
all streams lead to kafka
You should then see the following output in the kafka-console-consumer.sh window:
all       1
streams   1
lead      1
to        1
kafka     1
Keep typing data into the producer and the corresponding counts appear in the consumer window. Press Ctrl-C to stop either program.
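For reference, the WordCountDemo class bundled with Kafka builds a topology roughly like the sketch below, written in the same style as the Pipe and LineSplit examples in the following sections. This is an illustrative reconstruction rather than the demo's exact source; the class name, store name, and broker address are assumptions.

package com.linux.huhx.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

// Illustrative sketch of a word-count topology; the bundled WordCountDemo may differ in details.
public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, word) -> word)                      // re-key each record by the word itself
              .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
              .toStream()
              // counts are Longs, which is why the console consumer above uses a LongDeserializer
              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}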
4. A Pipe example
package com.linux.huhx.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * user: huxhu
 * date: 2018/8/12 8:59 PM
 **/
public class PipeStream {
    public static void main(String[] args) throws Exception {
        // basic Streams configuration: application id, broker address, default key/value serdes
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // topology: copy every record from streams-plaintext-input to streams-pipe-output unchanged
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
Run the following commands to build and execute the program:
mvn clean package
mvn exec:java -Dexec.mainClass=com.linux.huhx.stream.PipeStream
Output like the following means the program started correctly. Note that it does not exit on its own; press Ctrl-C to terminate it.
[WARNING]
[WARNING] Some problems were encountered while building the effective settings
[WARNING] Unrecognised tag: 'snapshotPolicy' (position: START_TAG seen ...</layout>\n <snapshotPolicy>... @267:27) @ /usr/local/Cellar/maven/3.5.4/libexec/conf/settings.xml, line 267, column 27
[WARNING] Unrecognised tag: 'snapshotPolicy' (position: START_TAG seen ...\n <snapshotPolicy>... @203:27) @ /Users/huxhu/.m2/settings.xml, line 203, column 27
[WARNING]
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.linux.huhx:KafkaLearn:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ line 42, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ---------------------< com.linux.huhx:KafkaLearn >----------------------
[INFO] Building KafkaLearn 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ KafkaLearn ---
Now subscribe to the streams-pipe-output topic declared above.
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-pipe-output --from-beginning
Type data into the streams-plaintext-input (producer) window and the same records appear in the streams-pipe-output consumer window.
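Instead of typing into the console producer, you can also push a test record from Java with a plain KafkaProducer. This is only a minimal sketch, not part of the original post; the TestProducer class name is made up, and the broker address simply reuses the one from the Streams examples.

package com.linux.huhx.stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Minimal sketch: send one test record to the input topic used by the pipe example.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092"); // assumed, same broker as above
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("streams-plaintext-input", "all streams lead to kafka"));
            producer.flush(); // make sure the record is delivered before the producer closes
        }
    }
}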
5. A LineSplit example
package com.linux.huhx.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // topology: split each input line on non-word characters and emit one record per word
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
Run the above program directly from IDEA, then subscribe to the topic it writes to:
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-linesplit-output --from-beginning
In the producer and consumer windows you can see the corresponding input and output: each line typed into the producer comes back as one record per word, so all streams lead to kafka shows up as five separate records in the consumer.
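For completeness, the console consumer above could also be replaced with a small Java consumer. Again this is just an illustrative sketch that is not part of the original post; the class name, group id, and broker address are assumptions.

package com.linux.huhx.stream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

// Minimal sketch: read the split words from streams-linesplit-output in Java
// instead of the console consumer.
public class LineSplitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "linesplit-reader");             // assumed group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");             // same effect as --from-beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("streams-linesplit-output"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " : " + record.value());
                }
            }
        }
    }
}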