redis管道實現批量插入數據(lpush和hset舉例)


項目背景:多線程執行文件ftp傳輸操作,成功一個redis記錄++

文件數據量:百億

線程數:500+

出現問題:並發執行incr redis數據,出現問題,redis的請求數量有限導致無法操作redis

 

示例一:list的leftpush操作

    改造前:單線程一個個排隊插入數據

f.transferFileHistoryList.stream().forEach(x->{
JSONObject file = new JSONObject();
file.put("dataId", x.getDataId());
file.put("fileId", x.getId());
file.put("path", x.getPath());
file.put("name", x.getName());
RedisTaskExecutor.getExecutor("leftpush-%d").execute(() -> {
redisTemplate.opsForList().leftPush(Constant.FILE_TRANSFER_QUEUE, JSON.toJSONString(file));
});
});

改造后:
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i < 3000000; i++) {
JSONObject file = new JSONObject();
file.put("dataId", CommonUtil.newUUID());
file.put("fileId", CommonUtil.newUUID());
file.put("path", CommonUtil.newUUID());
file.put("name", CommonUtil.newUUID());
file.put("id", i);
redisConnection.lPush(Constant.FILE_TRANSFER_QUEUE.getBytes(), JSON.toJSONString(file).getBytes());
}
return null;
}
},serializer);

測試數據量:300萬,耗時:70秒

優點:要么都成功,要么都失敗

示例二:hash的hset操作
private void addHashRedis(TransferBean transferBean, FTPListAllFiles f) {
// 記錄成品庫的redis
String key = Constant.FILE_TRANSFER_PROGRESS + transferBean.getDataId();
// 記錄線程執行情況
String threadKey = Constant.THREAD_PROGRESS + transferBean.getDataId();

RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
redisConnection.hSet(key.getBytes(), "total".getBytes(), (f.totalCount + "").getBytes());
redisConnection.hSet(key.getBytes(), "success".getBytes(), "0".getBytes());
redisConnection.hSet(key.getBytes(), "size".getBytes(), (f.totalSize + "").getBytes());

redisConnection.hSet(threadKey.getBytes(), "total".getBytes(), (f.totalCount + "").getBytes());
redisConnection.hSet(threadKey.getBytes(), "success".getBytes(), "0".getBytes());
return null;
}
},serializer);
}

其它:還可以使用lua腳本方法執行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM