kafka---->kafka stream的使用(一)


  kafka stream的簡單使用,這里是官方文檔上面的例子。

 

kafka的簡單使用

一、啟動Kafka server

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-server-start.sh config/server.properties

 

二、創建兩個主題streams-plaintext-input與streams-wordcount-output

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact

可以使用bin/kafka-topics.sh --zookeeper localhost:2181 --describe查看創建的主題描述。

 

三、開啟kafka里面自帶的WordCount程序

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

啟動console producer去寫入一些record,啟動console consumer去接受處理之后的消息。

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

向kafka-console-producer.sh的窗口輸入一些數據

all streams lead to kafka

可以在kafka-console-consumer.sh的窗口里面看到如下的輸出

all     1
streams 1
lead    1
to      1
kafka   1

繼續在producer中輸入數據,可以在consumer的窗口看到相應的輸出。程序的結束可以按鍵Ctrl-C。

 

四、一個關於pipe的例子

package com.linux.huhx.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * user: huxhu
 * date: 2018/8/12 8:59 PM
 **/
public class PipeStream {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

運行以下命令執行程序

mvn clean package
mvn exec:java -Dexec.mainClass=com.linux.huhx.stream.PipeStream

看到以下的輸出,說明正確啟動了程序。注意程序沒有結束,可以按ctrl+C終止程序。

[WARNING]
[WARNING] Some problems were encountered while building the effective settings
[WARNING] Unrecognised tag: 'snapshotPolicy' (position: START_TAG seen ...</layout>\n          <snapshotPolicy>... @267:27)  @ /usr/local/Cellar/maven/3.5.4/libexec/conf/settings.xml, line 267, column 27
[WARNING] Unrecognised tag: 'snapshotPolicy' (position: START_TAG seen ...\n          <snapshotPolicy>... @203:27)  @ /Users/huxhu/.m2/settings.xml, line 203, column 27
[WARNING]
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.linux.huhx:KafkaLearn:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ line 42, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ---------------------< com.linux.huhx:KafkaLearn >----------------------
[INFO] Building KafkaLearn 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ KafkaLearn ---

我們需要訂閱上面聲明的主題streams-pipe-output。

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic streams-pipe-output     --from-beginning

streams-plaintext-input窗口輸入數據,可以在streams-pipe-output窗口看到相應的輸出。

 

 五、一個關於LineSplit的例子

package com.linux.huhx.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
 
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");
 
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

在idea中直接運行上述程序,然后訂閱上面聲明的主題

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic streams-linesplit-output     --from-beginning 

在producer和consumer的窗口,可以看到對應的操作如下:

 

友情鏈接

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM