Background
In the previous post we walked through the map transformation in Kafka Streams. Today we cover another classic transformation, filter, again by working through a concrete example.
Demo Overview
This post demonstrates filter, which applies a predicate to every incoming message in real time and keeps only the messages that match. The input topic used today carries messages in the following format:
{"name": "George R. R. Martin", "title": "A Song of Ice and Fire"}
{"name": "C.S. Lewis", "title": "The Silver Chair"}
We want to keep only the messages whose name is "George R. R. Martin" and send them to the output topic.
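Conceptually, the whole demo boils down to a single filter() call on a KStream. The snippet below is only a preview sketch: the StreamsBuilder, publicationSerde, and topic names are all wired up properly later in this post.

// Preview only; builder and publicationSerde are constructed later in the main program.
builder.stream("publications", Consumed.with(Serdes.String(), publicationSerde))
        .filter((key, publication) -> "George R. R. Martin".equals(publication.getName()))
        .to("filtered-publications", Produced.with(Serdes.String(), publicationSerde));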
Initializing the Project
Create the project directory:
mkdir filter-streams
cd filter-streams/
Configuring the Project
Create a build.gradle file under the filter-streams directory with the following content:
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
    }
}

plugins {
    id 'java'
    id "com.google.protobuf" version "0.8.10"
}
apply plugin: 'com.github.johnrengelman.shadow'

repositories {
    mavenCentral()
    jcenter()

    maven {
        url 'http://packages.confluent.io/maven'
    }
}

group 'huxihx.kafkastreams'
sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'

dependencies {
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    implementation 'com.google.protobuf:protobuf-java:3.9.1'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
}

jar {
    manifest {
        attributes(
                'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
                'Main-Class': 'huxihx.kafkastreams.FilteredStreamsApp'
        )
    }
}

shadowJar {
    archiveName = "kstreams-transform-standalone-${version}.${extension}"
}
Then run the following command to download the Gradle wrapper:
gradle wrapper
Next, create a folder named configuration under filter-streams to hold our configuration file:
mkdir configuration
Then create a file named dev.properties inside it:
application.id=filtering-app
bootstrap.servers=localhost:9092
input.topic.name=publications
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=filtered-publications
output.topic.partitions=1
output.topic.replication.factor=1
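These entries are plain Java properties that the application reads at startup. A minimal sketch of the loading step (the main program later in this post wraps the same logic in its loadEnvProperties method):

Properties envProps = new Properties();
try (FileInputStream input = new FileInputStream("configuration/dev.properties")) {
    envProps.load(input);
}
String inputTopic = envProps.getProperty("input.topic.name");  // "publications"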
Creating the Message Schema
The next step is to create the schemas for the input and output messages. Since we are only filtering, the input and output share the same format, so a single schema is enough. First, run the following command under filter-streams to create the folder that holds the schema:
mkdir -p src/main/proto
Then create publication.proto with the following content:
syntax = "proto3";
package huxihx.kafkastreams.proto;
message Publication {
string name = 1;
string title = 2;
}
After saving the file, run the following command to compile the corresponding Java class:
./gradlew build
You should now see the generated Java class PublicationOuterClass under src/main/java/huxihx/kafkastreams/proto.
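The generated class follows the standard protobuf builder pattern. As an optional sanity check, you can construct a Publication like this; the test producer later in this post builds its events the same way:

PublicationOuterClass.Publication pub = PublicationOuterClass.Publication.newBuilder()
        .setName("George R. R. Martin")
        .setTitle("A Song of Ice and Fire")
        .build();
System.out.println(pub);  // prints name and title in protobuf text format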
Creating the Serdes
The Serdes in this step are identical to those in the previous post, so we will not repeat the explanation. Straight to the code:
mkdir -p src/main/java/huxihx/kafkastreams/serdes
In the newly created serdes folder, create ProtobufSerializer.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;

public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
    @Override
    public byte[] serialize(String topic, T data) {
        return data == null ? new byte[0] : data.toByteArray();
    }
}
Then create ProtobufDeserializer.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {

    private Parser<T> parser;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        parser = (Parser<T>) configs.get("parser");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }
}
Finally, create ProtobufSerdes.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;
import java.util.Map;

public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
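To use this serde with the generated Publication class, instantiate it with the message's parser; this is exactly how the main program in the next section passes it into the topology:

ProtobufSerdes<PublicationOuterClass.Publication> publicationSerde =
        new ProtobufSerdes<>(PublicationOuterClass.Publication.parser());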
Building the Main Topology
Create FilteredStreamsApp.java under src/main/java/huxihx/kafkastreams:
package huxihx.kafkastreams;

import huxihx.kafkastreams.proto.PublicationOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class FilteredStreamsApp {

    private Properties buildStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Pre-create the input and output topics if they do not exist yet.
    private void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
        try (AdminClient client = AdminClient.create(config)) {
            Set<String> existingTopics = client.listTopics().names().get();

            List<NewTopic> topics = new ArrayList<>();
            String inputTopic = envProps.getProperty("input.topic.name");
            if (!existingTopics.contains(inputTopic)) {
                topics.add(new NewTopic(inputTopic,
                        Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
            }

            String outputTopic = envProps.getProperty("output.topic.name");
            if (!existingTopics.contains(outputTopic)) {
                topics.add(new NewTopic(outputTopic,
                        Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
            }

            client.createTopics(topics);
        }
    }

    private Properties loadEnvProperties(String filePath) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(filePath)) {
            envProps.load(input);
        }
        return envProps;
    }

    // The topology: read from the input topic, keep only George R. R. Martin's
    // publications, and write them to the output topic.
    private Topology buildTopology(Properties envProps, final Serde<PublicationOuterClass.Publication> publicationSerde) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
                .filter((key, publication) -> "George R. R. Martin".equals(publication.getName()))
                .to(outputTopic, Produced.with(Serdes.String(), publicationSerde));
        return builder.build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Environment configuration file must be specified.");
        }

        FilteredStreamsApp app = new FilteredStreamsApp();
        Properties envProps = app.loadEnvProperties(args[0]);
        Properties streamProps = app.buildStreamsProperties(envProps);

        app.preCreateTopics(envProps);

        Topology topology = app.buildTopology(envProps,
                new ProtobufSerdes<>(PublicationOuterClass.Publication.parser()));
        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
Writing the Test Producer and Consumer
Create TestProducer.java and TestConsumer.java under src/main/java/huxihx/kafkastreams/tests with the following contents:
package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.PublicationOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TestProducer {

    // Test input events
    private static final List<PublicationOuterClass.Publication> TEST_PUBLICATIONS = Arrays.asList(
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("A Song of Ice and Fire").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("C.S. Lewis").setTitle("The Silver Chair").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("C.S. Lewis").setTitle("Perelandra").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("Fire & Blood").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("J. R. R. Tolkien").setTitle("The Hobbit").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("J. R. R. Tolkien").setTitle("The Lord of the Rings").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("A Dream of Spring").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("J. R. R. Tolkien").setTitle("The Fellowship of the Ring").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("The Ice Dragon").build());

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", new ProtobufSerializer<PublicationOuterClass.Publication>().getClass());

        try (final Producer<String, PublicationOuterClass.Publication> producer = new KafkaProducer<>(props)) {
            TEST_PUBLICATIONS.stream()
                    .map(publication -> new ProducerRecord<String, PublicationOuterClass.Publication>("publications", publication))
                    .forEach(producer::send);
        }
    }
}
package huxihx.kafkastreams.tests;

import com.google.protobuf.Parser;
import huxihx.kafkastreams.proto.PublicationOuterClass;
import huxihx.kafkastreams.serdes.ProtobufDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TestConsumer {

    public static void main(String[] args) {
        // Build a protobuf deserializer for the output events
        Deserializer<PublicationOuterClass.Publication> deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<PublicationOuterClass.Publication>> config = new HashMap<>();
        config.put("parser", PublicationOuterClass.Publication.parser());
        deserializer.configure(config, false);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, PublicationOuterClass.Publication> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), deserializer);
        consumer.subscribe(Arrays.asList("filtered-publications"));
        while (true) {
            ConsumerRecords<String, PublicationOuterClass.Publication> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, PublicationOuterClass.Publication> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}
Testing
First, build the project:
./gradlew shadowJar
Then start your Kafka cluster and run the Kafka Streams application:
java -jar build/libs/kstreams-transform-standalone-0.0.1.jar configuration/dev.properties
Next, start TestProducer to send the test events:
java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer
Finally, start TestConsumer to verify that the Kafka Streams application kept only the specified Publication messages:
java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestConsumer
.......
offset = 0, key = null, value = name: "George R. R. Martin"
title: "A Song of Ice and Fire"

offset = 1, key = null, value = name: "George R. R. Martin"
title: "Fire & Blood"

offset = 2, key = null, value = name: "George R. R. Martin"
title: "A Dream of Spring"

offset = 3, key = null, value = name: "George R. R. Martin"
title: "The Ice Dragon"
Summary
In the next post we will cover rekey, i.e., changing a message's key in real time.