最近做一個統計項目,數據量非常大,之前使用scan命令對redis中指定key進行掃描,一次100條,執行穩定、效率低,同時tcp關閉連接的time-wait增速相當的快,對性能造成了極大的浪費同時執行時間也很慢,而且當數據量進一步增大可能會影響其他服務。為了減少tcp連接數量,將redis的scan修改為pipeLine操作。
一、為什么要使用Pipeline?
Redis是采用基於C/S模式的請求/響應協議的TCP服務器。
性能問題一:redis客戶端發送多條請求,后面的請求需要等待前面的請求處理完后,才能進行處理,而且每個請求都存在往返時間RRT(Round Trip Time),即使redis性能極高,當數據量足夠大,也會極大影響性能,還可能會引起其他意外情況。
性能問題二:性能問題一,我們可以通過scan命令來解決,如何來設置count又是一個問題,設置不好,同樣會有大量請求存在,即使設置到1w(推薦最大值),如果掃描的數據量太大,這個問題同樣不能避免。每個請求都會經歷三次握手、四次揮手,在處理大量連接時,處理完后,揮手會產生大量time-wait,如果該服務器提供其他服務,可能對其他服務造成影響。
使用Pipeline可以解決以上問題。
二、如何在使用Pipeline?
本文使用的是RedisTemplate調用execute,connection使用的redis原生的操作,這里簡單使用了zCount來計算keys中集合數據的長度。獲取結果為result,這里要使用Pipeline需要調用Connection.openPipeline()。Connection.closePipeline()返回值為List<Object>是執行后的結果,相當簡單。
redisTemplate.execute(new RedisCallback<Long>() {
@Nullable
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
for (int i = 0; i < 1000000; i++) {
String key = "123" + i;
connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
}
List<Object> result=connection.closePipeline();
return null;
}
});
這個list是放在匿名類內部,對於數據處理不太友好,代碼會看起來相當難受,想取出來使用還是不可變的。如果要獲取返回值,我們可以調用如下代碼executePipelined,這樣就可以返回我們需要的結果,下面我們可以對得到list進行操作。
List<Long> List = redisTemplate.executePipelined(new RedisCallback<Long>() {
@Nullable
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
for (int i = 0; i < 1000000; i++) {
String key = "123" + i;
connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
}
return null;
}
});
在這里需要注意4點內容:
1.這里的connect是redis原生鏈接,所以connection的返回結果是基本上是byte數組,如果需要存儲的數據,需要對byte[]數組反序列化。
2.在doInRedis中返回值必須返回為null,為什么返回為空?可以定位到內部代碼去查看詳情,這里不再贅述。3.connection.openPipeline()可以調用,也可以不調用,但是connection.closePipeline()不能調用,調用了拿不到返回值。因為調用的時候會直接將結果返回,同時也不會對代碼進行反序列化。
4.反序列化需要傳入反序列化對象,這些對象都可以進行相應的實例化,如下圖所示。
根據你的項目需求選擇合適的反序列化對象。比如我在項目中key使用的是StringRedisSerializer,而值通常使用的是GenerJackson2JsonRedisSerializer。所以在初始化redisTemplate的時候會這樣做,代碼如下,將序列化的實例化對象放入redisTemplate中,當使用的時候就可以直接redis.getKeySerializer()或者redis.getValueSerializer(),這樣就不用在實例化一個對象,造成浪費和冗余。
public class MyRedisUtil {
private RedisTemplate redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
redisTemplate.setKeySerializer(keySerializer);
redisTemplate.setValueSerializer(valueSerializer);
this.redisTemplate = redisTemplate;
}
public RedisTemplate getRedisTemplate() {
return redisTemplate;
}
}
通過這樣的掉用方式,我們就可以不用進行強制轉換,直接獲得我們想要的對象了。
List<User> List = redisTemplate.executePipelined(new RedisCallback<User>() {
@Nullable
@Override
public User doInRedis(RedisConnection connection) throws DataAccessException {
connection.openPipeline();
for (int i = 0; i < 1000000; i++) {
String key = "123" + i;
connection.zCount(key.getBytes(), 0,Integer.MAX_VALUE);
}
return null;
}
}, myRedisComponent.getRedisTemplate().getValueSerializer());