1. Background
The previous post showed how to use Kafka Streams to find and filter out duplicate messages in a real-time stream. In this post we will aggregate a stream by summing a specific field in the messages.
2. Demo Overview
Suppose the events we want to aggregate have the following format:
{"title":"Die Hard","sale_ts":"2019-07-18T10:00:00Z","ticket_total_value":12}
Each event represents a movie ticket sale, and ticket_total_value is the ticket price. Our goal is to compute a real-time running box-office total for each movie.
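To make the goal concrete, here is a small hand-worked illustration (not program output): the application keeps a per-title running sum and emits an updated total after every sale.
{"title":"Die Hard","sale_ts":"2019-07-18T10:00:00Z","ticket_total_value":12}        -> Die Hard: 12
{"title":"Die Hard","sale_ts":"2019-07-18T10:01:00Z","ticket_total_value":12}        -> Die Hard: 24
{"title":"The Godfather","sale_ts":"2019-07-18T10:01:31Z","ticket_total_value":12}   -> The Godfather: 12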
3. Setting Up the Project
First, create the project directory:
$ mkdir aggregate-sum
$ cd aggregate-sum
Then create the Gradle build file build.gradle in the aggregate-sum directory with the following content:
// Plugin setup is required for the protobuf and shadowJar blocks below.
// The plugin versions here are illustrative; any recent shadow 4.x and protobuf-gradle-plugin 0.8.x release should work.
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.10'
    }
}

plugins {
    id 'java'
}

apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'com.google.protobuf'

repositories {
    mavenCentral()
    jcenter()
    maven { url 'http://packages.confluent.io/maven' }
}

group 'huxihx.kafkastreams'
sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'

dependencies {
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    implementation 'com.google.protobuf:protobuf-java:3.9.1'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
}

jar {
    manifest {
        attributes(
                'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
                'Main-Class': 'huxihx.kafkastreams.AggregatingSum'
        )
    }
}

shadowJar {
    archiveName = "kstreams-aggregating-sum-standalone-${version}.${extension}"
}
Note that the main class is set to huxihx.kafkastreams.AggregatingSum.
Save the file, then run the following command to download the Gradle wrapper:
$ gradle wrapper
After that, create a subdirectory named configuration under aggregate-sum to hold our configuration file dev.properties:
$ mkdir configuration
Then create configuration/dev.properties with the following content:
application.id=agg-sum-app
bootstrap.servers=localhost:9092
input.topic.name=movie-ticket-sales
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=movie-revenue
output.topic.partitions=1
output.topic.replication.factor=1
Here we configure one input topic and one output topic: the former holds the incoming stream of sale events, the latter the aggregated running totals.
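The application pre-creates these topics on startup (see the preCreateTopics method later), so no manual setup is strictly required; but if you prefer to create them by hand, the standard kafka-topics.sh commands would look roughly like this:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic movie-ticket-sales --partitions 1 --replication-factor 1
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic movie-revenue --partitions 1 --replication-factor 1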
4. Creating the Message Schema
Next, create the schema for the topic messages. Under aggregate-sum, run the following command to create the folder that will hold the schema:
$ mkdir -p src/main/proto
Then create a file named ticket-sale.proto under the proto folder with the following content:
syntax = "proto3";
package huxihx.kafkastreams.proto;
message TicketSale {
string title = 1;
string sale_ts = 2;
int32 ticket_total_value = 3;
}
After saving it, run the gradlew build command in the aggregate-sum directory:
$ ./gradlew build
You should now see the generated Java class TicketSaleOuterClass under aggregate-sum/src/main/java/huxihx/kafkastreams/proto.
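As a quick illustration (this snippet is not part of the project), the generated class exposes the standard protobuf builder and parser, which we will rely on later when writing test events and Serdes:
TicketSaleOuterClass.TicketSale sale = TicketSaleOuterClass.TicketSale.newBuilder()
        .setTitle("Die Hard")
        .setSaleTs("2019-07-18T10:00:00Z")
        .setTicketTotalValue(12)
        .build();
byte[] bytes = sale.toByteArray();   // serialize to a protobuf byte array
// parseFrom declares InvalidProtocolBufferException
TicketSaleOuterClass.TicketSale copy = TicketSaleOuterClass.TicketSale.parseFrom(bytes);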
5. Creating the Serdes
In this step we create the Serdes for the topic messages. First, run the following command in the aggregate-sum directory to create the folder:
$ mkdir -p src/main/java/huxihx/kafkastreams/serdes
Then create ProtobufSerializer.java in the newly created serdes folder:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;

public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T data) {
        // A protobuf message serializes itself; null values become an empty byte array.
        return data == null ? new byte[0] : data.toByteArray();
    }
}
Next, ProtobufDeserializer.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {

    private Parser<T> parser;

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The parser for the concrete message type is passed in via the config map.
        parser = (Parser<T>) configs.get("parser");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }
}
And finally, ProtobufSerdes.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;
import java.util.Map;

public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        // Hand the message parser to the deserializer through its configure() hook.
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
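With these three classes in place, building a Serde for the generated TicketSale type takes a single line (this is exactly what the ticketSaleProtobufSerdes() helper in the next section does):
ProtobufSerdes<TicketSaleOuterClass.TicketSale> ticketSaleSerde =
        new ProtobufSerdes<>(TicketSaleOuterClass.TicketSale.parser());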
6. Developing the Main Application
First, create AggregatingSum.java under src/main/java/huxihx/kafkastreams with the following code:
package huxihx.kafkastreams;

import huxihx.kafkastreams.proto.TicketSaleOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

public class AggregatingSum {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException(
                    "This program takes one argument: the path to an environment configuration file.");
        }
        new AggregatingSum().runRecipe(args[0]);
    }

    private void runRecipe(final String configPath) throws Exception {
        Properties envProps = this.loadEnvProperties(configPath);
        Properties streamProps = this.createStreamsProperties(envProps);
        Topology topology = this.buildTopology(envProps, ticketSaleProtobufSerdes());

        this.preCreateTopics(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    private Properties createStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every update to the running total is emitted downstream.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }

    private void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        String inputTopic = envProps.getProperty("input.topic.name");
        String outputTopic = envProps.getProperty("output.topic.name");
        try (AdminClient client = AdminClient.create(config)) {
            Collection<TopicListing> existingTopics = client.listTopics().listings().get();

            List<NewTopic> topics = new ArrayList<>();
            List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
            if (!topicNames.contains(inputTopic))
                topics.add(new NewTopic(
                        inputTopic,
                        Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

            if (!topicNames.contains(outputTopic))
                topics.add(new NewTopic(
                        outputTopic,
                        Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));

            if (!topics.isEmpty())
                client.createTopics(topics).all().get();
        }
    }

    private Properties loadEnvProperties(String fileName) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(fileName)) {
            envProps.load(input);
        }
        return envProps;
    }

    private Topology buildTopology(Properties envProps,
                                   final ProtobufSerdes<TicketSaleOuterClass.TicketSale> ticketSaleSerde) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), ticketSaleSerde))
                // Re-key each sale by movie title and keep only the ticket value.
                .map((k, v) -> new KeyValue<>(v.getTitle(), v.getTicketTotalValue()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                // Sum the ticket values per title; the result is a KTable of running totals.
                .reduce(Integer::sum)
                .toStream()
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
        return builder.build();
    }

    private static ProtobufSerdes<TicketSaleOuterClass.TicketSale> ticketSaleProtobufSerdes() {
        return new ProtobufSerdes<>(TicketSaleOuterClass.TicketSale.parser());
    }
}
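The heart of the topology is the map -> groupByKey -> reduce(Integer::sum) chain in buildTopology: each sale is re-keyed by movie title, the stream is grouped by that key, and the ticket values are summed into a continuously updated KTable whose changelog is written to the output topic. If you want to verify this logic without a running cluster, a minimal TopologyTestDriver sketch could look like the following. This class is not part of the original project: it assumes you add org.apache.kafka:kafka-streams-test-utils:2.3.0 as a test dependency, and it rebuilds the same topology inline rather than calling the private buildTopology method.
package huxihx.kafkastreams;

import huxihx.kafkastreams.proto.TicketSaleOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

import java.util.Properties;

public class AggregatingSumSketchTest {

    public static void main(String[] args) {
        // Build the same map -> groupByKey -> reduce topology as AggregatingSum.
        ProtobufSerdes<TicketSaleOuterClass.TicketSale> serde =
                new ProtobufSerdes<>(TicketSaleOuterClass.TicketSale.parser());
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("movie-ticket-sales", Consumed.with(Serdes.String(), serde))
                .map((k, v) -> new KeyValue<>(v.getTitle(), v.getTicketTotalValue()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                .reduce(Integer::sum)
                .toStream()
                .to("movie-revenue", Produced.with(Serdes.String(), Serdes.Integer()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-sum-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            ConsumerRecordFactory<String, TicketSaleOuterClass.TicketSale> factory =
                    new ConsumerRecordFactory<>("movie-ticket-sales", new StringSerializer(), serde.serializer());

            driver.pipeInput(factory.create(TicketSaleOuterClass.TicketSale.newBuilder()
                    .setTitle("Die Hard").setSaleTs("2019-07-18T10:00:00Z").setTicketTotalValue(12).build()));
            driver.pipeInput(factory.create(TicketSaleOuterClass.TicketSale.newBuilder()
                    .setTitle("Die Hard").setSaleTs("2019-07-18T10:01:00Z").setTicketTotalValue(12).build()));

            // Each input should produce an updated running total for "Die Hard": 12, then 24.
            System.out.println(driver.readOutput("movie-revenue", new StringDeserializer(), new IntegerDeserializer()));
            System.out.println(driver.readOutput("movie-revenue", new StringDeserializer(), new IntegerDeserializer()));
        }
    }
}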
7. Writing a Test Producer
As in the earlier posts of this series, we write a TestProducer class at src/main/java/huxihx/kafkastreams/tests/TestProducer.java with the following content:
package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.TicketSaleOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TestProducer {

    // Test events: several sales per movie so the running totals are easy to verify.
    private static final List<TicketSaleOuterClass.TicketSale> TEST_EVENTS = Arrays.asList(
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("Die Hard").setSaleTs("2019-07-18T10:00:00Z").setTicketTotalValue(12).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("Die Hard").setSaleTs("2019-07-18T10:01:00Z").setTicketTotalValue(12).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:01:31Z").setTicketTotalValue(12).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("Die Hard").setSaleTs("2019-07-18T10:01:36Z").setTicketTotalValue(24).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:00:00Z").setTicketTotalValue(18).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Big Lebowski").setSaleTs("2019-07-18T10:40:00Z").setTicketTotalValue(12).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Big Lebowski").setSaleTs("2019-07-18T10:50:00Z").setTicketTotalValue(12).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:00:55Z").setTicketTotalValue(36).build(),
            TicketSaleOuterClass.TicketSale.newBuilder().setTitle("The Godfather").setSaleTs("2019-07-18T10:00:34Z").setTicketTotalValue(18).build()
    );

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ProtobufSerializer<TicketSaleOuterClass.TicketSale>().getClass());

        try (final Producer<String, TicketSaleOuterClass.TicketSale> producer = new KafkaProducer<>(props)) {
            TEST_EVENTS.stream()
                    .map(event -> new ProducerRecord<String, TicketSaleOuterClass.TicketSale>("movie-ticket-sales", event))
                    .forEach(producer::send);
        }
    }
}
8. Testing
First, build the project:
$ ./gradlew shadowJar
Then start your Kafka cluster and launch the Kafka Streams application:
$ java -jar build/libs/kstreams-aggregating-sum-standalone-0.0.1.jar configuration/dev.properties
Now open a terminal and run the test Producer:
$ java -cp build/libs/kstreams-aggregating-sum-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer
Then open another terminal and run the console consumer to check the aggregated stream:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic movie-revenue --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
Die Hard	12
Die Hard	24
The Godfather	12
Die Hard	48
The Godfather	30
The Big Lebowski	12
The Big Lebowski	24
The Godfather	66
The Godfather	84
If everything works, you should see the output above: the Kafka Streams application maintains a real-time running box-office total for each movie. Because we set CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, every update to a movie's total is forwarded to the output topic, which is why each intermediate sum appears rather than only the final values.