redis管道实现批量插入数据(lpush和hset举例)


项目背景:多线程执行文件ftp传输操作,成功一个redis记录++

文件数据量:百亿

线程数:500+

出现问题:并发执行incr redis数据,出现问题,redis的请求数量有限导致无法操作redis

 

示例一:list的leftpush操作

    改造前:单线程一个个排队插入数据

f.transferFileHistoryList.stream().forEach(x->{
JSONObject file = new JSONObject();
file.put("dataId", x.getDataId());
file.put("fileId", x.getId());
file.put("path", x.getPath());
file.put("name", x.getName());
RedisTaskExecutor.getExecutor("leftpush-%d").execute(() -> {
redisTemplate.opsForList().leftPush(Constant.FILE_TRANSFER_QUEUE, JSON.toJSONString(file));
});
});

改造后:
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i < 3000000; i++) {
JSONObject file = new JSONObject();
file.put("dataId", CommonUtil.newUUID());
file.put("fileId", CommonUtil.newUUID());
file.put("path", CommonUtil.newUUID());
file.put("name", CommonUtil.newUUID());
file.put("id", i);
redisConnection.lPush(Constant.FILE_TRANSFER_QUEUE.getBytes(), JSON.toJSONString(file).getBytes());
}
return null;
}
},serializer);

测试数据量:300万,耗时:70秒

优点:要么都成功,要么都失败

示例二:hash的hset操作
private void addHashRedis(TransferBean transferBean, FTPListAllFiles f) {
// 记录成品库的redis
String key = Constant.FILE_TRANSFER_PROGRESS + transferBean.getDataId();
// 记录线程执行情况
String threadKey = Constant.THREAD_PROGRESS + transferBean.getDataId();

RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
redisConnection.hSet(key.getBytes(), "total".getBytes(), (f.totalCount + "").getBytes());
redisConnection.hSet(key.getBytes(), "success".getBytes(), "0".getBytes());
redisConnection.hSet(key.getBytes(), "size".getBytes(), (f.totalSize + "").getBytes());

redisConnection.hSet(threadKey.getBytes(), "total".getBytes(), (f.totalCount + "").getBytes());
redisConnection.hSet(threadKey.getBytes(), "success".getBytes(), "0".getBytes());
return null;
}
},serializer);
}

其它:还可以使用lua脚本方法执行


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM