項目背景:多線程執行文件ftp傳輸操作,成功一個redis記錄++
文件數據量:百億
線程數:500+
出現問題:並發執行incr redis數據,出現問題,redis的請求數量有限導致無法操作redis
示例一:list的leftpush操作
改造前:單線程一個個排隊插入數據
f.transferFileHistoryList.stream().forEach(x->{
JSONObject file = new JSONObject();
file.put("dataId", x.getDataId());
file.put("fileId", x.getId());
file.put("path", x.getPath());
file.put("name", x.getName());
RedisTaskExecutor.getExecutor("leftpush-%d").execute(() -> {
redisTemplate.opsForList().leftPush(Constant.FILE_TRANSFER_QUEUE, JSON.toJSONString(file));
});
});
改造后:
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i < 3000000; i++) {
JSONObject file = new JSONObject();
file.put("dataId", CommonUtil.newUUID());
file.put("fileId", CommonUtil.newUUID());
file.put("path", CommonUtil.newUUID());
file.put("name", CommonUtil.newUUID());
file.put("id", i);
redisConnection.lPush(Constant.FILE_TRANSFER_QUEUE.getBytes(), JSON.toJSONString(file).getBytes());
}
return null;
}
},serializer);
測試數據量:300萬,耗時:70秒
優點:要么都成功,要么都失敗
示例二:hash的hset操作
private void addHashRedis(TransferBean transferBean, FTPListAllFiles f) {
// 記錄成品庫的redis
String key = Constant.FILE_TRANSFER_PROGRESS + transferBean.getDataId();
// 記錄線程執行情況
String threadKey = Constant.THREAD_PROGRESS + transferBean.getDataId();
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
redisConnection.hSet(key.getBytes(), "total".getBytes(), (f.totalCount + "").getBytes());
redisConnection.hSet(key.getBytes(), "success".getBytes(), "0".getBytes());
redisConnection.hSet(key.getBytes(), "size".getBytes(), (f.totalSize + "").getBytes());
redisConnection.hSet(threadKey.getBytes(), "total".getBytes(), (f.totalCount + "").getBytes());
redisConnection.hSet(threadKey.getBytes(), "success".getBytes(), "0".getBytes());
return null;
}
},serializer);
}
其它:還可以使用lua腳本方法執行