Background
There is a requirement to import a filtered data file into Redis in a specific format and then use it as a data resource pool. The file is fairly large, around 10 million lines, so Redis pipelining is used to write it into Redis in batches.
What is a Pipeline?
First, a quick introduction to pipelining:
A pipeline lets the client send multiple requests to the server one after another without waiting for each reply, and then read back all the results in one go at the end.
The diagram below (borrowed from one of the reference articles) illustrates why a pipeline is so much faster.
In short, with ordinary requests the client sends one command at a time to redis-server, blocks after each send, and waits for the server's response before issuing the next command. When the data volume is large and the network is unstable, this becomes very slow.
A pipeline, by contrast, sends many requests to the server at once; the server executes all the commands and returns the results in a single response, greatly reducing the cost of repeated network round trips.
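To make the round-trip difference concrete, here is a minimal sketch using Spring Data Redis. The PipelineSketch class, the injected template parameter, and the key/value names are illustrative only and not part of this project; the template actually used in this post is configured further below.
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class PipelineSketch {

    // Rough comparison of per-command round trips vs. one pipelined batch
    public void compare(StringRedisTemplate template) {
        // Without a pipeline: each SET is its own round trip, and the client blocks on every reply
        for (int i = 0; i < 1000; i++) {
            template.opsForValue().set("key:" + i, "value-" + i);
        }

        // With a pipeline: commands are queued and flushed together, and all replies come back in one batch
        List<Object> replies = template.executePipelined((RedisCallback<Object>) connection -> {
            for (int i = 0; i < 1000; i++) {
                connection.stringCommands().set(("key:" + i).getBytes(StandardCharsets.UTF_8),
                        ("value-" + i).getBytes(StandardCharsets.UTF_8));
            }
            return null;  // executePipelined requires the callback itself to return null
        });
        System.out.println("pipelined replies: " + replies.size());
    }
}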


Redis preparation
Add the Redis dependencies
I'm using Spring Boot here.
<!-- Spring Boot Redis starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Required by the Lettuce connection pool -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>
Configure the Redis parameters and connection settings in the configuration file
The Lettuce connection pool is used here.
Spring Boot makes connection pooling very convenient: once lettuce.pool settings are added to the configuration file, the Lettuce pool is used and those settings are bound as the pool parameters. This is why the commons-pool2 dependency above is needed.
spring:
  redis:
    lettuce:
      pool:
        maxTotal: 50
        minIdle: 1
        maxWaitMillis: 5000
        maxIdle: 5
        testOnBorrow: true
        testOnReturn: true
        testWhileIdle: true
    mac-resource:
      database: 19
      hostName: r-uf6rnu5b4xxxxxxxapd.redis.rds.aliyuncs.com
      port: 6379
      timeout: 5000
      password: kxohxxxxxxxxyL
Configure the RedisTemplate
package com.hao.redistest.redistemplatetest.config;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * @author hao
 * @date 2020/4/27
 */
@Configuration
public class RedisConfig {

    // Connection pool parameters, bound from spring.redis.lettuce.pool
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
    public GenericObjectPoolConfig redisPoolConfig() {
        return new GenericObjectPoolConfig();
    }

    // Redis server settings, bound from spring.redis.mac-resource in the configuration file
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.mac-resource")
    public RedisStandaloneConfiguration macResourceConfiguration() {
        return new RedisStandaloneConfiguration();
    }

    // Connect to Redis through a pooled LettuceConnectionFactory
    @Bean
    public LettuceConnectionFactory macResourceFactory() {
        GenericObjectPoolConfig redisPoolConfig = redisPoolConfig();
        LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
                .poolConfig(redisPoolConfig)
                .commandTimeout(Duration.ofMillis(redisPoolConfig.getMaxWaitMillis()))
                .build();
        return new LettuceConnectionFactory(macResourceConfiguration(), clientConfiguration);
    }

    // Configure the RedisTemplate with String serializers for keys and values
    @Bean("macResourceRedisTemplate")
    public RedisTemplate<String, String> macResourceRedisTemplate() {
        LettuceConnectionFactory macResourcePoolFactory = macResourceFactory();
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        redisTemplate.setConnectionFactory(macResourcePoolFactory);
        return redisTemplate;
    }
}
Using the pipeline
The batchSave method below does the pipelined write into Redis.
// Imports needed by this class (JSON here is assumed to be Alibaba fastjson's com.alibaba.fastjson.JSON; adjust to your JSON library)
import com.alibaba.fastjson.JSON;
import org.apache.commons.io.FileUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class TestServiceImpl implements TestService {

    @Resource(name = "macResourceRedisTemplate")
    private RedisTemplate<String, String> macResourceRedisTemplate;

    /**
     * Read the file and write it into Redis
     * @param path the file to import
     */
    @Override
    public void writeToRedis(String path) {
        try {
            // Read the file to import from the path parameter
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            String line;
            String[] resourceArr;
            AtomicInteger inc = new AtomicInteger();
            // Write in batches of batchSize
            int batchSize = 20000;
            List<Map<String, String>> batch = new ArrayList<>(batchSize);
            while ((line = br.readLine()) != null) {
                // This is my own parsing rule; adapt it to your file format
                resourceArr = line.split(",");
                String key = resourceArr[0] + "_" + resourceArr[4];
                Map<String, String> element = new HashMap<>(3);
                element.put("param1", resourceArr[1]);
                element.put("param2", resourceArr[2]);
                element.put("param3", resourceArr[7]);
                Map<String, String> kv = new HashMap<>();
                kv.put(key, JSON.toJSONString(element));
                batch.add(kv);
                if (batch.size() % batchSize == 0) {
                    List<Map<String, String>> toSaveBatch = new ArrayList<>(batch);
                    try {
                        // Once the batch reaches batchSize, write it through the pipeline
                        batchSave(toSaveBatch, inc);
                        batch = new ArrayList<>(batchSize);
                    } catch (Exception ex) {
                        // Dump the failed batch to a file so it can be replayed later
                        for (Map<String, String> m : toSaveBatch) {
                            for (Map.Entry<String, String> entry : m.entrySet()) {
                                FileUtils.writeStringToFile(new File("tmp/mac_error.txt"),
                                        entry.getKey() + "@" + entry.getValue() + "\n",
                                        StandardCharsets.UTF_8, true);
                            }
                        }
                        throw new RuntimeException(ex);
                    }
                }
            }
            // Flush the final partial batch, otherwise the last few lines (fewer than batchSize) would never be written
            if (!batch.isEmpty()) {
                batchSave(batch, inc);
            }
            br.close();
            fr.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Write one batch into Redis through a pipeline
     * @param batch the key/value pairs to write
     * @param inc   running counter of written entries
     */
    private void batchSave(List<Map<String, String>> batch, AtomicInteger inc) {
        // executePipelined runs the RedisCallback (written as a lambda here, so doInRedis is hidden) inside a pipeline
        macResourceRedisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
            // executePipelined already opens and closes the pipeline, so this explicit call is redundant but harmless
            redisConnection.openPipeline();
            for (Map<String, String> e : batch) {
                for (Map.Entry<String, String> entry : e.entrySet()) {
                    try {
                        // Queue each element onto the pipeline as an LPUSH
                        redisConnection.lPush(entry.getKey().getBytes(), entry.getValue().getBytes());
                    } catch (Exception ex) {
                        System.out.println("key:" + entry.getKey() + ",value: " + entry.getValue());
                        throw new RuntimeException(ex);
                    }
                    System.out.println(inc.incrementAndGet());
                }
            }
            return null;
        });
    }
}
Start the project and hit the entry point
http://localhost:9999/api/write-redis?path=/Users/hao/Desktop/xxxxx.txt
In about 10 seconds it has already written 200,000+ entries.

Looking in Redis, the imported data is there as well.
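For a quick spot-check from code rather than a Redis client, something like the following can be used. Note that sampleKey is only a placeholder for whichever resourceArr[0] + "_" + resourceArr[4] key your file actually produced.
// Spot-check one imported key through the macResourceRedisTemplate configured above
String sampleKey = "someMac_someField";  // placeholder key; replace with a real one from your data
Long length = macResourceRedisTemplate.opsForList().size(sampleKey);
List<String> firstValues = macResourceRedisTemplate.opsForList().range(sampleKey, 0, 9);
System.out.println("LLEN " + sampleKey + " = " + length + ", first 10 values: " + firstValues);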

Project repository
GitHub: redis-pipeline-demo
References
https://www.cnblogs.com/littleatp/p/8419796.html
https://blog.csdn.net/ouyang111222/article/details/50942893