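1. Background
The previous article showed how to use Kafka Streams to compute, in real time, the highest-grossing and lowest-grossing movies of a given year, mainly by implementing max/min operators with the aggregate method that Streams provides. This article covers time windows. Kafka Streams has three kinds of time window: the fixed time window (Tumbling Window), the sliding time window (Sliding Window), and the session window (Session Window). Rather than discussing their definitions and differences in detail, we go straight to a project that uses a Tumbling Window to periodically count the number of ratings each movie receives.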
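2. What the Demo Does
In this article we create a Kafka topic that carries movie rating events, then write a program that counts the ratings each movie receives in each time window. As before, message events are serialized with Protocol Buffers. In JSON form, an event looks like this: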
{"title": "Die Hard", "release_year": 1998, "rating": 8.2, "timestamp": "2019-04-25T18:00:00-0700"}
The fields are self-explanatory.
The program counts movie ratings per time window in real time. For example, the output looks like this:
[Die Hard@1556186400000/1556187000000] 1
[Die Hard@1556186400000/1556187000000] 2
[Die Hard@1556186400000/1556187000000] 3
[Die Hard@1556186400000/1556187000000] 4
[Die Hard@1556188200000/1556188800000] 1
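The output above shows that Die Hard received four ratings, one after another, in the first window and one rating in the second window. Each line is the running count for that window at the moment the rating arrived.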
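To make the window boundaries in that output concrete, here is a minimal sketch of how a timestamp maps to a 10-minute tumbling window. It assumes windows are aligned to the epoch, which is consistent with the start/end values shown above. The class and method names are hypothetical and are not part of Kafka Streams.

```java
import java.time.Duration;

class TumblingWindowBounds {
    // Start of the epoch-aligned tumbling window containing the timestamp
    // (assumes a non-negative timestamp in milliseconds).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // End of the window; the end bound is exclusive.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofMinutes(10).toMillis();
        long ts = 1556186580000L; // three minutes after 1556186400000
        System.out.println(windowStart(ts, sizeMs) + "/" + windowEnd(ts, sizeMs));
    }
}
```

A rating stamped three minutes into the window therefore lands in the same window as one stamped exactly at its start, which is why the first four Die Hard lines share one key.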
3. Configuring the Project
Step 1 is to create the project directory:
$ mkdir tumbling-windows && cd tumbling-windows
Then create the Gradle build file build.gradle in the new tumbling-windows directory, with the following contents:
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
    }
}
plugins {
    id 'java'
    id "com.google.protobuf" version "0.8.10"
}
apply plugin: 'com.github.johnrengelman.shadow'
repositories {
    mavenCentral()
    jcenter()
    maven {
        url 'http://packages.confluent.io/maven'
    }
}
group 'huxihx.kafkastreams'
sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'
dependencies {
    implementation 'com.google.protobuf:protobuf-java:3.11.4'
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.4.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.11.4'
    }
}
jar {
    manifest {
        attributes(
            'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
            'Main-Class': 'huxihx.kafkastreams.TumblingWindow'
        )
    }
}
shadowJar {
    archiveName = "kstreams-tumbling-windows-standalone-${version}.${extension}"
}
The main class of the project is huxihx.kafkastreams.TumblingWindow.
Save the file above, then run the following command to generate the Gradle wrapper:
$ gradle wrapper
Next, create a subdirectory named configuration under tumbling-windows to hold the parameter file dev.properties:
$ mkdir configuration
$ cd configuration
$ vi dev.properties
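dev.properties has the following contents. The main program reads application.id when it builds the Streams configuration, so that key has to be present; the value used here is only an example name:
application.id=tumbling-window-app
bootstrap.servers=localhost:9092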
rating.topic.name=ratings
rating.topic.partitions=1
rating.topic.replication.factor=1
rating.count.topic.name=rating-counts
rating.count.topic.partitions=1
rating.count.topic.replication.factor=1
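Here we define one input topic, ratings, and one output topic, rating-counts. The former carries movie rating events; the latter stores the number of ratings each movie received in each time window.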
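The main program later reads these keys with getProperty, which returns null for a key that is absent. As a hedged sketch, a small check like the one below could report missing keys before the application starts. The class RequiredKeys and its method are hypothetical helpers, not part of this project or of Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class RequiredKeys {
    // Returns the required keys that are absent or blank in the given properties.
    static List<String> missing(Properties props, List<String> required) {
        List<String> result = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative subset of the keys that the code in this article reads.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("rating.topic.name", "ratings");
        List<String> required = Arrays.asList(
                "application.id", "bootstrap.servers",
                "rating.topic.name", "rating.count.topic.name");
        System.out.println("Missing keys: " + missing(props, required));
    }
}
```

Failing fast with a list of missing keys tends to be easier to diagnose than a NullPointerException raised later while the configuration is being assembled.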
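4. Creating the Message Schema
Because we serialize with Protocol Buffers, we need to generate the Java classes that model the message entities ahead of time. From the tumbling-windows directory, run the following command to create the folder that holds the schema: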
$ mkdir -p src/main/proto && cd src/main/proto
Then create a file named rating.proto in the proto folder with the following contents:
syntax = "proto3";
package huxihx.kafkastreams.proto;
message Rating {
    string title = 1;
    int32 release_year = 2;
    double rating = 3;
    string timestamp = 4;
}
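Because the output format is simple, we do not generate a separate schema class for the output topic this time. After saving the file above, run gradlew from the tumbling-windows directory:
$ ./gradlew build
At this point you should see the generated Java class, RatingOuterClass, under src/main/java/huxihx/kafkastreams/proto in tumbling-windows.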
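5. Creating the Serdes
In this step we create the Serdes for the topic messages. First, from the tumbling-windows directory, run the following command to create the package directory: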
$ mkdir -p src/main/java/huxihx/kafkastreams/serdes
In the new serdes folder, create ProtobufSerializer.java with the following contents:
package huxihx.kafkastreams.serdes;
import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;
public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
    @Override
    public byte[] serialize(String topic, T data) {
        return data == null ? new byte[0] : data.toByteArray();
    }
}
Next, create ProtobufDeserializer.java:
package huxihx.kafkastreams.serdes;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {
    private Parser<T> parser;
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        parser = (Parser<T>) configs.get("parser");
    }
    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }
}
Finally, ProtobufSerdes.java:
package huxihx.kafkastreams.serdes;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.util.HashMap;
import java.util.Map;
public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {
    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;
    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }
    @Override
    public Serializer<T> serializer() {
        return serializer;
    }
    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
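6. Developing the Main Flow
For time windows you need a TimestampExtractor that tells Kafka Streams how to determine the time dimension. In this example we create a TimestampExtractor that extracts the time carried in the event itself. Under huxihx.kafkastreams, create a file named RatingTimestampExtractor.java: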
package huxihx.kafkastreams;
import huxihx.kafkastreams.proto.RatingOuterClass;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class RatingTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
        String eventTime = ((RatingOuterClass.Rating)record.value()).getTimestamp();
        try {
            return sdf.parse(eventTime).getTime();
        } catch (ParseException e) {
            return 0;
        }
    }
}
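The code above uses SimpleDateFormat to convert the time string into an epoch timestamp in milliseconds, returning 0 if the string cannot be parsed. Note that the pattern has no time-zone field, so the trailing offset in the event (-0700) is ignored and the time is interpreted in the JVM's default time zone. The window boundaries in the sample output of this article correspond to a JVM running in UTC+8.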
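The following standalone sketch illustrates how the pattern affects the resulting timestamp. It compares the pattern used in the extractor with a variant that adds a `Z` field so the offset in the string is honored. The class and method names are hypothetical, and the fixed GMT+08:00 zone is an assumption used only to make the first result independent of the machine it runs on.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

class TimestampParsing {
    // Same pattern as the extractor: the trailing offset is not parsed,
    // so the wall-clock time is interpreted in the supplied zone.
    static long parseIgnoringOffset(String text, TimeZone zone) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
        sdf.setTimeZone(zone);
        try {
            return sdf.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable time: " + text, e);
        }
    }

    // Variant with a zone field: the offset written in the string is used.
    static long parseWithOffset(String text) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
        try {
            return sdf.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable time: " + text, e);
        }
    }

    public static void main(String[] args) {
        String eventTime = "2019-04-25T18:00:00-0700";
        System.out.println(parseIgnoringOffset(eventTime, TimeZone.getTimeZone("GMT+08:00")));
        System.out.println(parseWithOffset(eventTime));
    }
}
```

With the offset ignored and a UTC+8 zone, the first value is 1556186400000, matching the first window start in the sample output. With the offset honored the value is 1556240400000, so switching patterns would shift every window boundary shown in this article.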
Next, we write the main program, TumblingWindow.java:
package huxihx.kafkastreams;
import huxihx.kafkastreams.proto.RatingOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }
        new TumblingWindow().runRecipe(args[0]);
    }
    private Properties loadEnvProperties(String fileName) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(fileName)) {
            envProps.load(input);
        }
        return envProps;
    }
    private void runRecipe(final String configPath) throws Exception {
        Properties envProps = this.loadEnvProperties(configPath);
        Properties streamProps = this.createStreamsProperties(envProps);
        Topology topology = this.buildTopology(envProps);
        this.preCreateTopics(envProps);
        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);
        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
    private Topology buildTopology(final Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String ratingTopic = envProps.getProperty("rating.topic.name");
        final String ratingCountTopic = envProps.getProperty("rating.count.topic.name");
        builder.stream(ratingTopic, Consumed.with(Serdes.String(), ratingProtobufSerdes()))
                .map((key, rating) -> new KeyValue<>(rating.getTitle(), rating))
                .groupByKey(Grouped.with(Serdes.String(), ratingProtobufSerdes()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
                .count()
                .toStream()
                .<String, String>map((Windowed<String> key, Long count) -> new KeyValue(windowedKeyToString(key), count.toString()))
                .to(ratingCountTopic);
        return builder.build();
    }
    private static void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        String inputTopic = envProps.getProperty("rating.topic.name");
        String outputTopic = envProps.getProperty("rating.count.topic.name");
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("retention.ms", Long.toString(Long.MAX_VALUE));
        try (AdminClient client = AdminClient.create(config)) {
            Collection<TopicListing> existingTopics = client.listTopics().listings().get();
            List<NewTopic> topics = new ArrayList<>();
            List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
            if (!topicNames.contains(inputTopic))
                topics.add(new NewTopic(
                        inputTopic,
                        Integer.parseInt(envProps.getProperty("rating.topic.partitions")),
                        Short.parseShort(envProps.getProperty("rating.topic.replication.factor"))).configs(topicConfigs));
            if (!topicNames.contains(outputTopic))
                topics.add(new NewTopic(
                        outputTopic,
                        Integer.parseInt(envProps.getProperty("rating.count.topic.partitions")),
                        Short.parseShort(envProps.getProperty("rating.count.topic.replication.factor"))).configs(topicConfigs));
            if (!topics.isEmpty())
                client.createTopics(topics).all().get();
        }
    }
    private Properties createStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, RatingTimestampExtractor.class.getName());
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        try {
            props.put(StreamsConfig.STATE_DIR_CONFIG,
                    Files.createTempDirectory("tumbling-windows").toAbsolutePath().toString());
        } catch (IOException ignored) {
        }
        return props;
    }
    private String windowedKeyToString(Windowed<String> key) {
        return String.format("[%s@%s/%s]", key.key(), key.window().start(), key.window().end());
    }
    private static ProtobufSerdes<RatingOuterClass.Rating> ratingProtobufSerdes() {
        return new ProtobufSerdes<>(RatingOuterClass.Rating.parser());
    }
}
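To see what the topology in buildTopology computes without running Kafka, the sketch below imitates it in plain Java: re-key by title, assign each event to a 10-minute window, and emit the running count after every event, which mirrors the per-update output you get when the record cache is set to 0. This is only an approximation under stated assumptions. It uses none of the Kafka Streams API, and it ignores state stores, grace periods and late records. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowedCountSketch {
    static final long WINDOW_MS = 10 * 60 * 1000L;

    // Minimal stand-in for a rating event: only the fields the count needs.
    static final class Event {
        final String title;
        final long timestampMs;
        Event(String title, long timestampMs) {
            this.title = title;
            this.timestampMs = timestampMs;
        }
    }

    // Emits one "[title@start/end] count" line per input event.
    static List<String> run(List<Event> events) {
        Map<String, Long> counts = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            long start = e.timestampMs - (e.timestampMs % WINDOW_MS);
            String key = String.format("[%s@%d/%d]", e.title, start, start + WINDOW_MS);
            long count = counts.merge(key, 1L, Long::sum);
            output.add(key + " " + count);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("Die Hard", 1556186400000L),
                new Event("Die Hard", 1556186580000L),
                new Event("Die Hard", 1556188320000L));
        run(events).forEach(System.out::println);
    }
}
```

The first two events fall into the same window and produce counts 1 and 2; the third falls into a later window and starts again at 1.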
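7. Writing a Test Producer
Now create src/main/java/huxihx/kafkastreams/tests/TestProducer.java for testing, with the following contents (the output is verified later with the console consumer, so no separate consumer class is needed):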
package huxihx.kafkastreams.tests;
import huxihx.kafkastreams.proto.RatingOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class TestProducer {
    private static final List<RatingOuterClass.Rating> TEST_EVENTS = Arrays.asList(
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(8.2)
                    .setTimestamp("2019-04-25T18:00:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(4.5)
                    .setTimestamp("2019-04-25T18:03:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(5.1)
                    .setTimestamp("2019-04-25T18:04:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(2.0)
                    .setTimestamp("2019-04-25T18:07:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(8.3)
                    .setTimestamp("2019-04-25T18:32:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(3.4)
                    .setTimestamp("2019-04-25T18:36:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(4.2)
                    .setTimestamp("2019-04-25T18:43:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Die Hard").setReleaseYear(1998).setRating(7.6)
                    .setTimestamp("2019-04-25T18:44:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Tree of Life").setReleaseYear(2011).setRating(4.9)
                    .setTimestamp("2019-04-25T20:01:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Tree of Life").setReleaseYear(2011).setRating(5.6)
                    .setTimestamp("2019-04-25T20:02:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Tree of Life").setReleaseYear(2011).setRating(9.0)
                    .setTimestamp("2019-04-25T20:03:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Tree of Life").setReleaseYear(2011).setRating(6.5)
                    .setTimestamp("2019-04-25T20:12:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Tree of Life").setReleaseYear(2011).setRating(2.1)
                    .setTimestamp("2019-04-25T20:13:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1995).setRating(3.6)
                    .setTimestamp("2019-04-25T22:20:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1995).setRating(6.0)
                    .setTimestamp("2019-04-25T22:21:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1995).setRating(7.0)
                    .setTimestamp("2019-04-25T22:22:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1995).setRating(4.6)
                    .setTimestamp("2019-04-25T22:23:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1995).setRating(7.1)
                    .setTimestamp("2019-04-25T22:24:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1998).setRating(9.9)
                    .setTimestamp("2019-04-25T21:15:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1998).setRating(8.9)
                    .setTimestamp("2019-04-25T21:16:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1998).setRating(7.9)
                    .setTimestamp("2019-04-25T21:17:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1998).setRating(8.9)
                    .setTimestamp("2019-04-25T21:18:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1998).setRating(9.9)
                    .setTimestamp("2019-04-25T21:19:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("A Walk in the Clouds").setReleaseYear(1998).setRating(9.9)
                    .setTimestamp("2019-04-25T21:20:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Super Mario Bros.").setReleaseYear(1993).setRating(3.5)
                    .setTimestamp("2019-04-25T13:00:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Super Mario Bros.").setReleaseYear(1993).setRating(4.5)
                    .setTimestamp("2019-04-25T13:07:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Super Mario Bros.").setReleaseYear(1993).setRating(5.5)
                    .setTimestamp("2019-04-25T13:30:00-0700").build(),
            RatingOuterClass.Rating.newBuilder().setTitle("Super Mario Bros.").setReleaseYear(1993).setRating(6.5)
                    .setTimestamp("2019-04-25T13:34:00-0700").build());
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new ProtobufSerializer<RatingOuterClass.Rating>().getClass());
        try (final Producer<String, RatingOuterClass.Rating> producer = new KafkaProducer<>(props)) {
            TEST_EVENTS.stream().map(event ->
                    new ProducerRecord<String, RatingOuterClass.Rating>("ratings", event)).forEach(producer::send);
        }
    }
}
8. Testing
First, build the project with the following command:
$ ./gradlew shadowJar
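Then start the Kafka cluster and run the Kafka Streams application. The jar name follows the archiveName set in the shadowJar block of build.gradle:
$ java -jar build/libs/kstreams-tumbling-windows-standalone-0.0.1.jar configuration/dev.properties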
Now open a terminal and run the test Producer:
$ java -cp build/libs/kstreams-tumbling-windows-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer
Then open another terminal and run a ConsoleConsumer command to verify the output:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rating-counts --from-beginning --property print.key=true
If everything works, you should see output like the following:
[Die Hard@1556186400000/1556187000000] 1
[Die Hard@1556186400000/1556187000000] 2
[Die Hard@1556186400000/1556187000000] 3
[Die Hard@1556186400000/1556187000000] 4
[Die Hard@1556188200000/1556188800000] 1
[Die Hard@1556188200000/1556188800000] 2
[Die Hard@1556188800000/1556189400000] 1
[Die Hard@1556188800000/1556189400000] 2
[Tree of Life@1556193600000/1556194200000] 1
[Tree of Life@1556193600000/1556194200000] 2
[Tree of Life@1556193600000/1556194200000] 3
[Tree of Life@1556194200000/1556194800000] 1
[Tree of Life@1556194200000/1556194800000] 2
[A Walk in the Clouds@1556202000000/1556202600000] 1
[A Walk in the Clouds@1556202000000/1556202600000] 2
[A Walk in the Clouds@1556202000000/1556202600000] 3
[A Walk in the Clouds@1556202000000/1556202600000] 4
[A Walk in the Clouds@1556202000000/1556202600000] 5
[A Walk in the Clouds@1556197800000/1556198400000] 1
[A Walk in the Clouds@1556197800000/1556198400000] 2
[A Walk in the Clouds@1556197800000/1556198400000] 3
[A Walk in the Clouds@1556197800000/1556198400000] 4
[A Walk in the Clouds@1556197800000/1556198400000] 5
[A Walk in the Clouds@1556198400000/1556199000000] 1
[Super Mario Bros.@1556168400000/1556169000000] 1
[Super Mario Bros.@1556168400000/1556169000000] 2
[Super Mario Bros.@1556170200000/1556170800000] 1
[Super Mario Bros.@1556170200000/1556170800000] 2
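The text inside the square brackets is the movie title. The first timestamp is the window start time and the second is the window end time, both in epoch milliseconds.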
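Epoch milliseconds are hard to read at a glance, so here is a small sketch that parses a key in the format produced by windowedKeyToString and renders the window bounds as UTC instants. The class WindowKeyFormatter is a hypothetical helper for inspecting output and is not part of the project.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class WindowKeyFormatter {
    // Matches keys of the form [title@startMs/endMs].
    private static final Pattern KEY = Pattern.compile("\\[(.+)@(\\d+)/(\\d+)\\]");

    // Converts a windowed key into a readable description with UTC instants.
    static String describe(String windowedKey) {
        Matcher m = KEY.matcher(windowedKey);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected key format: " + windowedKey);
        }
        Instant start = Instant.ofEpochMilli(Long.parseLong(m.group(2)));
        Instant end = Instant.ofEpochMilli(Long.parseLong(m.group(3)));
        return m.group(1) + " from " + start + " to " + end;
    }

    public static void main(String[] args) {
        System.out.println(describe("[Die Hard@1556186400000/1556187000000]"));
    }
}
```

For the first key in the output this gives a window from 2019-04-25T10:00:00Z to 2019-04-25T10:10:00Z, that is, a 10-minute window.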
