Java Redis Pipeline 使用示例


1. 參考的優秀文章

2. 來源

原來,系統中一個樹結構的數據來源是Redis,由於數據增多、業務復雜,查詢速度並不快。究其原因,是單次查詢的數量太多了,一個樹結構,大概要幾萬次Redis的交互。於是,嘗試用Redis的Pipelining特性。

3. 測試Pipelining使用與否的差別

3.1. 不使用pipelining

首先,不使用pipelining,插入10w條記錄,再刪除10w條記錄,看看需要多久。

首先來個小程序,用於計算程序消耗的時間:

import java.util.Date;
import java.util.concurrent.TimeUnit;


public class TimeLag {
    
    private Date start;
    private Date end;
    
    public TimeLag() {
        start = new Date();
    }
    
    public String cost() {
        end = new Date();
        long c = end.getTime() - start.getTime();
        
        String s = new StringBuffer().append("cost ").append(c).append(" milliseconds (").append(c / 1000).append(" seconds).").toString();
        return s;
    }
    
    public static void main(String[] args) throws InterruptedException {
        TimeLag t = new TimeLag();
        TimeUnit.SECONDS.sleep(2);
        System.out.println(t.cost());
    }

}

  

package com.nicchagil.study.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;


public class HowToTest {

    public static void main(String[] args) {
        // 連接池
        JedisPool jedisPool = new JedisPool("192.168.1.9", 6379);
        
        /* 操作Redis */
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            
            TimeLag t = new TimeLag();
            System.out.println("操作前,全部Key值:" + jedis.keys("*"));
            /* 插入多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                jedis.set(i.toString(), i.toString());
            }
            
            /* 刪除多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                jedis.del(i.toString());
            }
            System.out.println("操作前,全部Key值:" + jedis.keys("*"));
            System.out.println(t.cost());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

}

  

日志,Key值“user_001”是我的Redis存量的值,忽略即可:

操作前,全部Key值:[user_001]
操作前,全部Key值:[user_001]
cost 35997 milliseconds (35 seconds).

  

3.2. 使用pipelining

package com.nicchagil.study.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;


public class HowToTest {

    public static void main(String[] args) {
        // 連接池
        JedisPool jedisPool = new JedisPool("192.168.1.9", 6379);
        
        /* 操作Redis */
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            
            TimeLag t = new TimeLag();
            
            System.out.println("操作前,全部Key值:" + jedis.keys("*"));
            Pipeline p = jedis.pipelined();
            /* 插入多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                p.set(i.toString(), i.toString());
            }
            
            /* 刪除多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                p.del(i.toString());
            }
            p.sync();
            System.out.println("操作前,全部Key值:" + jedis.keys("*"));
            
            System.out.println(t.cost());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

}

  

日志:

操作前,全部Key值:[user_001]
操作前,全部Key值:[user_001]
cost 629 milliseconds (0 seconds).

  

4. 為什么Pipelining這么快?

先看看原來的多條命令,是如何執行的:

sequenceDiagram
Redis Client->>Redis Server: 發送第1個命令
Redis Server->>Redis Client: 響應第1個命令
Redis Client->>Redis Server: 發送第2個命令
Redis Server->>Redis Client: 響應第2個命令
Redis Client->>Redis Server: 發送第n個命令
Redis Server->>Redis Client: 響應第n個命令

  

Pipeling機制是怎樣的呢:

sequenceDiagram
Redis Client->>Redis Server: 發送第1個命令(緩存在Redis Client,未即時發送)
Redis Client->>Redis Server: 發送第2個命令(緩存在Redis Client,未即時發送)
Redis Client->>Redis Server: 發送第n個命令(緩存在Redis Client,未即時發送)
Redis Client->>Redis Server: 發送累積的命令
Redis Server->>Redis Client: 響應第1、2、n個命令

  

5. Pipelining的局限性(重要!)

基於其特性,它有兩個明顯的局限性:

  • 鑒於Pipepining發送命令的特性,Redis服務器是以隊列來存儲准備執行的命令,而隊列是存放在有限的內存中的,所以不宜一次性發送過多的命令。如果需要大量的命令,可分批進行,效率不會相差太遠滴,總好過內存溢出嘛~~
  • 由於pipeline的原理是收集需執行的命令,到最后才一次性執行。所以無法在中途立即查得數據的結果(需待pipelining完畢后才能查得結果),這樣會使得無法立即查得數據進行條件判斷(比如判斷是非繼續插入記錄)。

比如,以下代碼中,response.get()p.sync();完畢前無法執行,否則,會報異常

redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method.

  

package com.nicchagil.study.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;


public class HowToTest {

    public static void main(String[] args) {
        // 連接池
        JedisPool jedisPool = new JedisPool("192.168.1.9", 6379);
        
        /* 操作Redis */
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            
            TimeLag t = new TimeLag();
            
            System.out.println("操作前,全部Key值:" + jedis.keys("*"));
            Pipeline p = jedis.pipelined();
            /* 插入多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                p.set(i.toString(), i.toString());
            }
            
            Response<String> response = p.get("999");
            // System.out.println(response.get()); // 執行報異常:redis.clients.jedis.exceptions.JedisDataException: Please close pipeline or multi block before calling this method.
            
            /* 刪除多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                p.del(i.toString());
            }
            p.sync();
            
            System.out.println(response.get());
            System.out.println("操作前,全部Key值:" + jedis.keys("*"));
            
            System.out.println(t.cost());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        
    }
}

  

6. 如何使用Pipelining查詢大量數據

Map<String, Response<String>>先將Response緩存起來再使用就OK了:

package com.nicchagil.study.jedis;

import java.util.HashMap;
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;


public class GetMultiRecordWithPipelining {

    public static void main(String[] args) {
        // 連接池
        JedisPool jedisPool = new JedisPool("192.168.1.9", 6379);
        
        /* 操作Redis */
        Jedis jedis = null;
        Map<String, Response<String>> map = new HashMap<String, Response<String>>();
        try {
            jedis = jedisPool.getResource();
            
            TimeLag t = new TimeLag(); // 開始計算時間
            
            Pipeline p = jedis.pipelined();
            /* 插入多條數據 */
            for(Integer i = 0; i < 100000; i++) {
                if (i % 2 == 1) {
                    map.put(i.toString(), p.get(i.toString()));
                }
            }
            p.sync();
            
            /* 由Response對象獲取對應的值 */
            Map<String, String> resultMap = new HashMap<String, String>();
            String result = null;
            for (String key : map.keySet()) {
                result = map.get(key).get();
                if (result != null && result.length() > 0) {
                    resultMap.put(key, result);
                }
            }
            System.out.println("get record num : " + resultMap.size());
            
            System.out.println(t.cost()); // 計時結束
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM