記一次使用RedisTeamplate 操作Pipeline


需求背景

當前有個需求,需要將一份過濾出來的數據文件,按照一定的格式導入redis中,之后做數據資源池使用。由於文件數據比較大,有1000w行左右。所以使用redis的pipeline管道去分批寫入redis

什么是Pipeline?

首先先來介紹一下pipeline:
Pipeline指的是管道技術,指的是客戶端允許將多個請求依次發給服務器,過程中而不需要等待請求的回復,在最后再一並讀取結果即可。


下面借鑒一下別人的圖來說明一下為什么pipeline速度會很快。

說白了,普通請求過程就是一次一次去redis-server端,而當client發送請求之后就會阻塞,並等待server響應之后再去處理下一次請求,當數據量大,且網絡波動明顯時,耗時便會非常嚴重。

而pipeline管道則是將多次請求一次性發給server端,server端將多條命令執行完畢,一次性返回,大大減少了多次往返的網絡消耗。

圖片名稱 圖片名稱

redis准備工作

引入redis依賴

這里我用的是springboot

 <!--springboot redis-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--lettuce連接池需要此依賴-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.6.0</version>
    </dependency>

配置文件配置redis參數以及連接設置

這里使用的是lettuce連接池
springboot對連接池的使用非常智能,配置文件中添加lettuce.pool相關配置,則會使用到lettuce連接池,並將相關配置設置為連接池相關參數,這里需要用到上邊的commons-pool2依賴

spring:
redis:
  lettuce:
    pool:
      MaxTotal: 50
      minIdle: 1
      maxWaitMillis: 5000
      maxIdle: 5
      testOnBorrow: true
      testOnReturn: true
      testWhileIdle: true
  mac-resource:
    database: 19
    hostName: r-uf6rnu5b4xxxxxxxapd.redis.rds.aliyuncs.com
    port: 6379
    timeout: 5000
    password: kxohxxxxxxxxyL

配置RedisTemplate

package com.hao.redistest.redistemplatetest.config;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * @author hao
 * @date 2020/4/27
 */
@Configuration
public class RedisConfig {

    //redis連接池參數設置
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
    public GenericObjectPoolConfig redisPoolConfig() {
        return new GenericObjectPoolConfig();
    }

    //根據配置文件的mac-resource讀取 reids資源配置
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.mac-resource")
    public RedisStandaloneConfiguration macResourceConfiguration() {
        return new RedisStandaloneConfiguration();
    }

    //使用lettuceConnectionFactory連接redis
    @Bean
    public LettuceConnectionFactory macResourceFactory() {
        GenericObjectPoolConfig redisPoolConfig = redisPoolConfig();
        LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
                .poolConfig(redisPoolConfig).commandTimeout(Duration.ofMillis(redisPoolConfig.getMaxWaitMillis())).build();
        return new LettuceConnectionFactory(macResourceConfiguration(), clientConfiguration);
    }

    //配置RedisTemplate
    @Bean("macResourceRedisTemplate")
    public RedisTemplate<String,String> macResourceRedisTemplate() {
        LettuceConnectionFactory macResourcePoolFactory = macResourceFactory();
        RedisTemplate<String,String> redisTemplate = new RedisTemplate<>();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        redisTemplate.setConnectionFactory(macResourcePoolFactory);
        return redisTemplate;
    }
}

pipeline使用

batchSave方法為pipeline寫入redis方法

@Service
public class TestServiceImpl implements TestService {

    @Resource(name = "macResourceRedisTemplate")
    private RedisTemplate<String, String> macResourceRedisTemplate;

    /**
     * 讀取文件並寫入redis
     * @param path 要寫入的文件
     */
    @Override
    public void writeToRedis(String path) {
        try {
            //通過參數path讀取要寫入的文件
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            String line = "";
            String[] resourceArr = null;
            AtomicInteger inc = new AtomicInteger();
            //分批執行
            int batchSize = 20000;
            List<Map<String,String>> batch = new ArrayList<>(batchSize);
            while ((line = br.readLine()) != null) {
                //這里是我讀取寫入的規則,大家可以按自己的規則來
                resourceArr = line.split(",");
                String key = resourceArr[0] + "_" + resourceArr[4];
                Map<String,Object> element = new HashMap<>(1);
                element.put("param1", resourceArr[1]);
                element.put("param2", resourceArr[2]);
                element.put("param3", resourceArr[7]);
                Map<String,String> kv = new HashMap<>();
                kv.put(key, JSON.toJSONString(element));
                batch.add(kv)   ;
                if(batch.size() % batchSize == 0 ){
                    List<Map<String,String>> toSaveBatch = new ArrayList<>(batch);
                    try{
                        //到達設定的batchSize進行pipeline寫入
                        batchSave(toSaveBatch,inc);
                        batch = new ArrayList<>(batchSize);
                    }catch (Exception ex ){
                        for(Map<String,String> m :toSaveBatch){
                            for(Map.Entry<String,String> entry : m.entrySet()){
                                FileUtils.writeStringToFile(new File("tmp/mac_error.txt"),entry.getKey() + "@" + entry.getValue() + "\n", StandardCharsets.UTF_8,true);
                            }
                        }
                        throw new RuntimeException(ex);
                    }
                }
            }
            br.close();
            fr.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 分批用pipeline寫入redis
     * @param batch
     * @param inc
     */
    private void batchSave(List<Map<String,String>> batch,AtomicInteger inc ){
        //調用redisTemplate的executePipelined  重新內部的doInRedis方法,這里用lambda語法寫的 隱藏掉了
        macResourceRedisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
            //打開pipeline管道
            redisConnection.openPipeline();
            for(Map<String,String> e : batch){
                for(Map.Entry<String,String> entry : e.entrySet() ){
                    try {
                        //遍歷集合數據,通過pipeline推入redis
                        redisConnection.lPush(entry.getKey().getBytes(),entry.getValue().getBytes());
                    }catch (Exception ex){
                        System.out.println("key:" + entry.getKey() + ",value: " + entry.getValue());
                        throw new RuntimeException(ex);
                    }
                    System.out.println(inc.incrementAndGet());
                }
            }
            return null;
        });
    }
}

啟動項目訪問入口

http://localhost:9999/api/write-redis?path=/Users/hao/Desktop/xxxxx.txt

10秒左右已經寫了20多w

圖片名稱

再看看redis中,已經有了寫入數據

圖片名稱

項目地址

github: redis-pipeline-demo

參考內容

https://www.cnblogs.com/littleatp/p/8419796.html
https://blog.csdn.net/ouyang111222/article/details/50942893


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM