一、pipeline出現的背景:
redis客戶端執行一條命令分4個過程:
發送命令-〉命令排隊-〉命令執行-〉返回結果
這個過程稱為Round trip time(簡稱RTT, 往返時間),mget mset有效節約了RTT,但大部分命令(如hgetall,並沒有mhgetall)不支持批量操作,需要消耗N次RTT ,這個時候需要pipeline來解決這個問題
二、pipeline的性能
1、未使用pipeline執行N條命令
2、使用了pipeline執行N條命令
3、兩者性能對比
小結:這是一組統計數據出來的數據,使用Pipeline執行速度比逐條執行要快,特別是客戶端與服務端的網絡延遲越大,性能體能越明顯。
下面貼出測試代碼分析兩者的性能差異:
@Test
public void pipeCompare() {
Jedis redis = new Jedis("192.168.1.111", 6379);
redis.auth("12345678");//授權密碼 對應redis.conf的requirepass密碼
Map<String, String> data = new HashMap<String, String>();
redis.select(8);//使用第8個庫
redis.flushDB();//清空第8個庫所有數據
// hmset
long start = System.currentTimeMillis();
// 直接hmset
for (int i = 0; i < 10000; i++) {
data.clear(); //清空map
data.put("k_" + i, "v_" + i);
redis.hmset("key_" + i, data); //循環執行10000條數據插入redis
}
long end = System.currentTimeMillis();
System.out.println(" 共插入:[" + redis.dbSize() + "]條 .. ");
System.out.println("1,未使用PIPE批量設值耗時" + (end - start) / 1000 + "秒..");
redis.select(8);
redis.flushDB();
// 使用pipeline hmset
Pipeline pipe = redis.pipelined();
start = System.currentTimeMillis();
//
for (int i = 0; i < 10000; i++) {
data.clear();
data.put("k_" + i, "v_" + i);
pipe.hmset("key_" + i, data); //將值封裝到PIPE對象,此時並未執行,還停留在客戶端
}
pipe.sync(); //將封裝后的PIPE一次性發給redis
end = System.currentTimeMillis();
System.out.println(" PIPE共插入:[" + redis.dbSize() + "]條 .. ");
System.out.println("2,使用PIPE批量設值耗時" + (end - start) / 1000 + "秒 ..");
//--------------------------------------------------------------------------------------------------
// hmget
Set<String> keys = redis.keys("key_*"); //將上面設值所有結果鍵查詢出來
// 直接使用Jedis hgetall
start = System.currentTimeMillis();
Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
for (String key : keys) {
//此處keys根據以上的設值結果,共有10000個,循環10000次
result.put(key, redis.hgetAll(key)); //使用redis對象根據鍵值去取值,將結果放入result對象
}
end = System.currentTimeMillis();
System.out.println(" 共取值:[" + redis.dbSize() + "]條 .. ");
System.out.println("3,未使用PIPE批量取值耗時 " + (end - start) / 1000 + "秒 ..");
// 使用pipeline hgetall
result.clear();
start = System.currentTimeMillis();
for (String key : keys) {
pipe.hgetAll(key); //使用PIPE封裝需要取值的key,此時還停留在客戶端,並未真正執行查詢請求
}
pipe.sync(); //提交到redis進行查詢
end = System.currentTimeMillis();
System.out.println(" PIPE共取值:[" + redis.dbSize() + "]條 .. ");
System.out.println("4,使用PIPE批量取值耗時" + (end - start) / 1000 + "秒 ..");
redis.disconnect();
}
三、原生批命令(mset, mget)與Pipeline對比
1、原生批命令是原子性,pipeline是非原子性
(原子性概念:一個事務是一個不可分割的最小工作單位,要么都成功要么都失敗。原子操作是指你的一個業務邏輯必須是不可拆分的. 處理一件事情要么都成功,要么都失敗,原子不可拆分)
2、原生批命令一命令多個key, 但pipeline支持多命令(存在事務),非原子性
3、原生批命令是服務端實現,而pipeline需要服務端與客戶端共同完成
四、Pipeline正確使用方式
使用pipeline組裝的命令個數不能太多,不然數據量過大,增加客戶端的等待時間,還可能造成網絡阻塞,可以將大量命令的拆分多個小的pipeline命令完成。
1、Jedis中的pipeline使用方式
大家知道redis提供了mset、mget方法,但沒有提供mdel方法,如果想實現,可以借助pipeline實現。
2、Jedis中的pipeline使用步驟:
- 獲取jedis對象(一般從連接池中獲取)
- 獲取jedis對象的pipeline對象
- 添加指令
- 執行指令
測試類方法:
@Test
public void testCommond() {
// 工具類初始化
JedisUtils jedis = new JedisUtils("192.168.1.111", 6379, "12345678");
for (int i = 0; i < 100; i++) {
// 設值
jedis.set("n" + i, String.valueOf(i));
}
System.out.println("keys from redis return =======" + jedis.keys("*"));
}
// 使用pipeline批量刪除
@Test
public void testPipelineMdel() {
// 工具類初始化
JedisUtils jedis = new JedisUtils("192.168.1.111", 6379, "12345678");
List<String> keys = new ArrayList<String>();
for (int i = 0; i < 100; i++) {
keys.add("n" + i);
}
jedis.mdel(keys);
System.out.println("after mdel the redis return ---------" + jedis.keys("*"));
}
JedisUtils下的mdel方法:
/**
* 刪除多個字符串key 並釋放連接
*
* @param keys*
* @return 成功返回value 失敗返回null
*/
public boolean mdel(List<String> keys) {
Jedis jedis = null;
boolean flag = false;
try {
jedis = pool.getResource();//從連接借用Jedis對象
Pipeline pipe = jedis.pipelined();//獲取jedis對象的pipeline對象
for(String key:keys){
pipe.del(key); //將多個key放入pipe刪除指令中
}
pipe.sync(); //執行命令,完全此時pipeline對象的遠程調用
flag = true;
} catch (Exception e) {
pool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
returnResource(pool, jedis);
}
return flag;
}
使用pipeline提交所有操作並返回執行結果:
@Test
public void testPipelineSyncAll() {
// 工具類初始化
Jedis jedis = new Jedis("192.168.1.111", 6379);
jedis.auth("12345678");
// 獲取pipeline對象
Pipeline pipe = jedis.pipelined();
pipe.multi();
pipe.set("name", "james"); // 調值
pipe.incr("age");// 自增
pipe.get("name");
pipe.discard();
// 將不同類型的操作命令合並提交,並將操作操作以list返回
List<Object> list = pipe.syncAndReturnAll();
for (Object obj : list) {
// 將操作結果打印出來
System.out.println(obj);
}
// 斷開連接,釋放資源
jedis.disconnect();
}
五、redis事務
pipeline是多條命令的組合,為了保證它的原子性,redis提供了簡單的事務。
1、redis的簡單事務,
一組需要一起執行的命令放到multi和exec兩個命令之間,其中multi代表事務開始,exec代表事務結束。
2、停止事務discard
3、命令錯誤,語法不正確,導致事務不能正常結束
4、運行錯誤,語法正確,但類型錯誤,事務可以正常結束
5、watch命令:
使用watch后, multi失效,事務失效
WATCH的機制是:在事務EXEC命令執行時,Redis會檢查被WATCH的key,只有被WATCH的key從WATCH起始時至今沒有發生過變更,EXEC才會被執行。如果WATCH的key在WATCH命令到EXEC命令之間發生過變化,則EXEC命令會返回失敗。
小結:redis提供了簡單的事務,不支持事務回滾