Flink 異步 IO 查 Redis
遇到有同學問 Flink 異步 IO 查 Redis 的問題,幫忙解決了一下,剛好水一篇(后續應該會實現 Table Source、Lookup Source)
以前寫的異步 IO 查 MySQL: [Flink 異步IO訪問外部數據(mysql篇)](https://www.cnblogs.com/Springmoon-venn/p/11103558.html)
Flink 版本 1.14.3 (看了一下,Mysql 的是 2019-06 寫的,目測版本是 Flink 1.7.2)
這個東西比較簡單,直接上代碼了
主類
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// kafka source
KafkaSource<KafkaSimpleStringRecord> kafkaSource = KafkaSource
.<KafkaSimpleStringRecord>builder()
.setBootstrapServers(bootstrapServer)
.setDeserializer(new SimpleKafkaRecordDeserializationSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.setTopics(topic)
.build();
// get value
SingleOutputStreamOperator<String> source = env
.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
.map((MapFunction<KafkaSimpleStringRecord, String>) value -> value.getValue());
// async redis
AsyncRedisFunction asyncRedisFunction = new AsyncRedisFunction(uri);
SingleOutputStreamOperator<String> asyncStream = AsyncDataStream
.unorderedWait(source, asyncRedisFunction, 5L, TimeUnit.SECONDS);
// print result
asyncStream
.print("match redis");
env.execute("kafkaJoinRedis");
}
異步 IO Redis
@Override
public void open(Configuration parameters) throws Exception {
// redis standalone
redisClient = RedisClient.create(url);
connection = redisClient.connect();
// async
async = connection.async();
}
//數據處理的方法
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
String userId = jsonParser.parse(input).getAsJsonObject().get("user_id").getAsString();
// query string
RedisFuture<String> redisFuture = async.get(input);
// query hash
// RedisFuture<String> redisFuture = async.hget("key", input);
// get all
// async.hgetall(input);
// async query and get result
CompletableFuture.supplyAsync(() -> {
try {
return redisFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// if get exception
return "exception";
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String result) {
if (result == null) {
result = "nothing";
}
// return result
resultFuture.complete(Collections.singleton(input + " - " + result));
}
});
}
往 redis 寫數據
// 寫 hash 類型的數據
private static void writeHash(String key) {
RedisClient redisClient = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisClusterCommands<String, String> command = connection.sync();
for (int i = 0; i < 10000; ++i) {
command.hset(key, "" + i, "hash-" + i);
}
System.out.println("finish");
}
// 寫 string 類型
private static void writeString() {
RedisClient redisClient = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisClusterCommands<String, String> command = connection.sync();
for (int i = 0; i < 100000; ++i) {
command.set("" + i, "value-" + i);
}
System.out.println("finish");
}
測試
啟動任務
kafka 寫入數據
send topic : user_log, message : {"category_id":35,"user_id":"1","item_id":"35173","behavior":"pv","ts":"2022-04-18 16:11:03.469"}
send topic : user_log, message : {"category_id":32,"user_id":"2","item_id":"32556","behavior":"pv","ts":"2022-04-18 16:11:04.469"}
send topic : user_log, message : {"category_id":58,"user_id":"3","item_id":"58508","behavior":"pv","ts":"2022-04-18 16:11:05.469"}
send topic : user_log, message : {"category_id":60,"user_id":"4","item_id":"60712","behavior":"pv","ts":"2022-04-18 16:11:06.469"}
send topic : user_log, message : {"category_id":42,"user_id":"5","item_id":"42957","behavior":"pv","ts":"2022-04-18 16:11:07.469"}
send topic : user_log, message : {"category_id":44,"user_id":"6","item_id":"44459","behavior":"pv","ts":"2022-04-18 16:11:08.469"}
send topic : user_log, message : {"category_id":39,"user_id":"7","item_id":"39188","behavior":"pv","ts":"2022-04-18 16:11:09.469"}
send topic : user_log, message : {"category_id":58,"user_id":"8","item_id":"58703","behavior":"pv","ts":"2022-04-18 16:11:10.469"}
send topic : user_log, message : {"category_id":30,"user_id":"9","item_id":"30993","behavior":"pv","ts":"2022-04-18 16:11:11.469"}
send topic : user_log, message : {"category_id":98,"user_id":"10","item_id":"9818","behavior":"pv","ts":"2022-04-18 16:11:12.469"}
任務輸出
match redis> {"category_id":39,"user_id":"7","item_id":"39188","behavior":"pv","ts":"2022-04-18 16:11:09.469"} - key_7
match redis> {"category_id":58,"user_id":"8","item_id":"58703","behavior":"pv","ts":"2022-04-18 16:11:10.469"} - key_8
match redis> {"category_id":35,"user_id":"1","item_id":"35173","behavior":"pv","ts":"2022-04-18 16:11:03.469"} - key_1
match redis> {"category_id":33,"user_id":"11","item_id":"33807","behavior":"pv","ts":"2022-04-18 16:11:13.469"} - key_11
match redis> {"category_id":44,"user_id":"6","item_id":"44459","behavior":"pv","ts":"2022-04-18 16:11:08.469"} - key_6
match redis> {"category_id":98,"user_id":"10","item_id":"9818","behavior":"pv","ts":"2022-04-18 16:11:12.469"} - key_10
match redis> {"category_id":32,"user_id":"2","item_id":"32556","behavior":"pv","ts":"2022-04-18 16:11:04.469"} - key_2
match redis> {"category_id":30,"user_id":"9","item_id":"30993","behavior":"pv","ts":"2022-04-18 16:11:11.469"} - key_9
match redis> {"category_id":60,"user_id":"4","item_id":"60712","behavior":"pv","ts":"2022-04-18 16:11:06.469"} - key_4
match redis> {"category_id":42,"user_id":"5","item_id":"42957","behavior":"pv","ts":"2022-04-18 16:11:07.469"} - key_5
match redis> {"category_id":58,"user_id":"3","item_id":"58508","behavior":"pv","ts":"2022-04-18 16:11:05.469"} - key_3
搞定
完整代碼參考:flink-rookie
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文