Flink Async I/O Lookup on Redis

A fellow developer asked about querying Redis with Flink async I/O; I helped work it out, so here is a quick write-up (a Table Source / Lookup Source implementation should follow later).

An earlier post on async I/O against MySQL: [Flink async I/O access to external data (MySQL)](https://www.cnblogs.com/Springmoon-venn/p/11103558.html)

Flink version here: 1.14.3 (the MySQL post dates from June 2019, presumably against Flink 1.7.2).

This is fairly straightforward, so let's go straight to the code.

Main class


public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // kafka source
    KafkaSource<KafkaSimpleStringRecord> kafkaSource = KafkaSource
            .<KafkaSimpleStringRecord>builder()
            .setBootstrapServers(bootstrapServer)
            .setDeserializer(new SimpleKafkaRecordDeserializationSchema())
            .setStartingOffsets(OffsetsInitializer.latest())
            .setTopics(topic)
            .build();

    // get value
    SingleOutputStreamOperator<String> source = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
            .map((MapFunction<KafkaSimpleStringRecord, String>) value -> value.getValue());

    // async redis
    AsyncRedisFunction asyncRedisFunction = new AsyncRedisFunction(uri);
    SingleOutputStreamOperator<String> asyncStream = AsyncDataStream
            .unorderedWait(source, asyncRedisFunction, 5L, TimeUnit.SECONDS);

    // print result
    asyncStream
            .print("match redis");

    env.execute("kafkaJoinRedis");
}
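`unorderedWait` emits each record as soon as its future completes, so the output order can differ from the input order (use `orderedWait` instead if input order must be preserved, at the cost of extra latency). A minimal stdlib sketch of that completion behavior, with hand-completed futures standing in for Redis replies (no Flink or Lettuce involved; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class UnorderedCompletionDemo {

    // Returns the order in which the simulated lookups completed.
    static List<String> run() {
        List<String> completionOrder = new ArrayList<>();

        // Two pending "lookups", created in input order.
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        first.thenAccept(completionOrder::add);
        second.thenAccept(completionOrder::add);

        // The second lookup finishes before the first one.
        second.complete("record-2");
        first.complete("record-1");

        return completionOrder;
    }

    public static void main(String[] args) {
        // Emission follows completion order, not input order.
        System.out.println(run()); // [record-2, record-1]
    }
}
```

This is exactly why an unordered async join can shuffle record order relative to the source topic.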

Async I/O Redis function

The function extends RichAsyncFunction<String, String> and keeps the Lettuce handles as (transient) fields, initialized once per subtask in open():

private transient RedisClient redisClient;
private transient StatefulRedisConnection<String, String> connection;
private transient RedisAsyncCommands<String, String> async;

@Override
public void open(Configuration parameters) throws Exception {
    // redis standalone
    redisClient = RedisClient.create(url);
    connection = redisClient.connect();

    // async command interface
    async = connection.async();
}


// per-record processing
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {

    String userId = jsonParser.parse(input).getAsJsonObject().get("user_id").getAsString();

    // query a string value, keyed by user_id (not by the whole JSON record)
    RedisFuture<String> redisFuture = async.get(userId);
    // query a single hash field
//        RedisFuture<String> redisFuture = async.hget("key", userId);
    // get all fields of a hash
//        async.hgetall(userId);

    // async query and get result
    CompletableFuture.supplyAsync(() -> {
        try {
            return redisFuture.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // if get exception
        return "exception";
    }).thenAccept(new Consumer<String>() {
        @Override
        public void accept(String result) {
            if (result == null) {
                result = "nothing";
            }
            // return result
            resultFuture.complete(Collections.singleton(input + " - " + result));
        }
    });
}
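One caveat about the pattern above: `CompletableFuture.supplyAsync(() -> redisFuture.get())` parks a `ForkJoinPool` thread on the blocking `get()` for every in-flight record. Lettuce's `RedisFuture` itself implements `CompletionStage`, so the callbacks can be chained on it directly with no blocked thread. A minimal stdlib sketch of that chaining (a plain `CompletableFuture` stands in for the `RedisFuture`; names and values are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class NonBlockingChainDemo {

    // Registers non-blocking callbacks on the future, then simulates Redis replying.
    static String handle(CompletableFuture<String> redisFuture, String input) {
        AtomicReference<String> emitted = new AtomicReference<>();

        // Chain directly on the future: no thread ever blocks on get().
        redisFuture
                .exceptionally(t -> "exception")            // error fallback
                .thenApply(r -> r == null ? "nothing" : r)  // missing-key fallback
                .thenAccept(r -> emitted.set(input + " - " + r));

        redisFuture.complete("value-7"); // simulate the Redis reply arriving
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println(handle(new CompletableFuture<>(), "user_7"));
        // user_7 - value-7
    }
}
```

In the real function the `complete(...)` call does not exist (Redis completes the future), and the final callback would call `resultFuture.complete(Collections.singleton(...))` instead of setting a reference.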

Writing test data into Redis


// write hash-type data
private static void writeHash(String key) {

    RedisClient redisClient = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisClusterCommands<String, String> command = connection.sync();

    for (int i = 0; i < 10000; ++i) {
        command.hset(key, "" + i, "hash-" + i);
    }

    // release the connection and client resources
    connection.close();
    redisClient.shutdown();
    System.out.println("finish");
}
// write string-type data
private static void writeString() {
    RedisClient redisClient = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisClusterCommands<String, String> command = connection.sync();

    for (int i = 0; i < 100000; ++i) {
        command.set("" + i, "value-" + i);
    }

    // release the connection and client resources
    connection.close();
    redisClient.shutdown();
    System.out.println("finish");
}

Testing

Start the job

Write data into Kafka

send topic : user_log, message : {"category_id":35,"user_id":"1","item_id":"35173","behavior":"pv","ts":"2022-04-18 16:11:03.469"}
send topic : user_log, message : {"category_id":32,"user_id":"2","item_id":"32556","behavior":"pv","ts":"2022-04-18 16:11:04.469"}
send topic : user_log, message : {"category_id":58,"user_id":"3","item_id":"58508","behavior":"pv","ts":"2022-04-18 16:11:05.469"}
send topic : user_log, message : {"category_id":60,"user_id":"4","item_id":"60712","behavior":"pv","ts":"2022-04-18 16:11:06.469"}
send topic : user_log, message : {"category_id":42,"user_id":"5","item_id":"42957","behavior":"pv","ts":"2022-04-18 16:11:07.469"}
send topic : user_log, message : {"category_id":44,"user_id":"6","item_id":"44459","behavior":"pv","ts":"2022-04-18 16:11:08.469"}
send topic : user_log, message : {"category_id":39,"user_id":"7","item_id":"39188","behavior":"pv","ts":"2022-04-18 16:11:09.469"}
send topic : user_log, message : {"category_id":58,"user_id":"8","item_id":"58703","behavior":"pv","ts":"2022-04-18 16:11:10.469"}
send topic : user_log, message : {"category_id":30,"user_id":"9","item_id":"30993","behavior":"pv","ts":"2022-04-18 16:11:11.469"}
send topic : user_log, message : {"category_id":98,"user_id":"10","item_id":"9818","behavior":"pv","ts":"2022-04-18 16:11:12.469"}


Job output

match redis> {"category_id":39,"user_id":"7","item_id":"39188","behavior":"pv","ts":"2022-04-18 16:11:09.469"} - key_7
match redis> {"category_id":58,"user_id":"8","item_id":"58703","behavior":"pv","ts":"2022-04-18 16:11:10.469"} - key_8
match redis> {"category_id":35,"user_id":"1","item_id":"35173","behavior":"pv","ts":"2022-04-18 16:11:03.469"} - key_1
match redis> {"category_id":33,"user_id":"11","item_id":"33807","behavior":"pv","ts":"2022-04-18 16:11:13.469"} - key_11
match redis> {"category_id":44,"user_id":"6","item_id":"44459","behavior":"pv","ts":"2022-04-18 16:11:08.469"} - key_6
match redis> {"category_id":98,"user_id":"10","item_id":"9818","behavior":"pv","ts":"2022-04-18 16:11:12.469"} - key_10
match redis> {"category_id":32,"user_id":"2","item_id":"32556","behavior":"pv","ts":"2022-04-18 16:11:04.469"} - key_2
match redis> {"category_id":30,"user_id":"9","item_id":"30993","behavior":"pv","ts":"2022-04-18 16:11:11.469"} - key_9
match redis> {"category_id":60,"user_id":"4","item_id":"60712","behavior":"pv","ts":"2022-04-18 16:11:06.469"} - key_4
match redis> {"category_id":42,"user_id":"5","item_id":"42957","behavior":"pv","ts":"2022-04-18 16:11:07.469"} - key_5
match redis> {"category_id":58,"user_id":"3","item_id":"58508","behavior":"pv","ts":"2022-04-18 16:11:05.469"} - key_3

Done.

The complete code is in the flink-rookie repository.

Follow the Flink菜鳥 WeChat official account for occasional posts on Flink development.

