Flink 連接器 - 流處理 - 讀寫 Redis


寫入redis

// Attach a Redis sink built from the sentinel configuration and MyRedisMapper.
// The diamond operator replaces the raw RedisSink type so the compiler checks
// that the mapper's element type matches the stream's element type.
resultStream.addSink(
        new RedisSink<>(FlinkUtils.getRedisSinkConfig(parameters), new MyRedisMapper()));

getRedisSinkConfig

 /**
  * Builds the Redis Sentinel connection configuration from the job parameters.
  *
  * <p>The sentinel addresses are read as a single comma-separated string. Each
  * entry is trimmed and blank entries are dropped, so a value such as
  * {@code "a:26379, b:26379,"} yields exactly the two addresses. Validation of
  * the resulting set (for example, rejecting an empty set) is left to the builder.
  *
  * @param parameterTool source of the Redis settings (hosts, master name, password,
  *                      pool limits and connection timeout)
  * @return the sentinel configuration produced by the builder
  * @throws NullPointerException if {@code parameterTool.get} returns {@code null}
  *                              for the sentinel host list
  */
 public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool) {
     String redisHosts = parameterTool.get(PropertiesUtil.REDIS_HOSTS);
     if (redisHosts == null) {
         // Fail with an explicit message instead of an anonymous NPE from split().
         throw new NullPointerException(
                 "Missing Redis sentinel hosts parameter: " + PropertiesUtil.REDIS_HOSTS);
     }

     // Tolerate whitespace around commas and stray/trailing commas.
     Set<String> hosts = new HashSet<String>();
     for (String host : redisHosts.split(",")) {
         String trimmed = host.trim();
         if (!trimmed.isEmpty()) {
             hosts.add(trimmed);
         }
     }

     return new FlinkJedisSentinelConfig.Builder()
             .setSentinels(hosts)
             .setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
             .setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
             .setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
             .setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
             .setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT))
             .build();
 }

MyRedisMapper

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Maps {@code Tuple2<String, String>} records onto the Redis command, key and
 * value used when writing them to Redis.
 *
 * <p>Field {@code f0} of each tuple supplies the key and field {@code f1} the value.
 *
 * @author WeiJiQian
 */
public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

    /**
     * Describes the Redis command issued for every record: {@code SETEX}, with
     * {@code Constant.REDIS_KEY_TTL} passed as the command's additional argument
     * (presumably the key expiry; confirm the unit against the connector docs).
     *
     * @return the command descriptor
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        final RedisCommandDescription description =
                new RedisCommandDescription(RedisCommand.SETEX, Constant.REDIS_KEY_TTL);
        return description;
    }

    /**
     * Extracts the Redis key from a record.
     *
     * @param stringStringTuple2 the record being written
     * @return the tuple's first field, {@code f0}
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
        final String key = stringStringTuple2.f0;
        return key;
    }

    /**
     * Extracts the Redis value from a record.
     *
     * @param tuple2 the record being written
     * @return the tuple's second field, {@code f1}
     */
    @Override
    public String getValueFromData(Tuple2<String, String> tuple2) {
        final String value = tuple2.f1;
        return value;
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用，本站對版權不負任何法律責任。如果侵犯了您的隱私權益，請聯繫本站郵箱 yoyou2525@163.com 刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM