【redis】pipeline - 管道模型


redis-pipeline

2020-02-10:
因為我把github相關的wiki刪了,所以導致破圖...待解決。(講真github-wiki跟project是2個url,真的不好用)
因為用的是github的外鏈地址https://raw.githubusercontent.com/不一定可以訪問得到導致破圖~~

github-wiki地址:

參考:

pipeline概念

redis-pipeline 技術可以在服務端未響應時,客戶端可以繼續向服務端發送請求,並最終一次性讀取所有服務端的響應。
(摘自 https://www.runoob.com/redis/redis-pipelining.html

Redis provides support for pipelining, which involves sending multiple commands to the server without waiting for the replies and then reading the replies in a single step.
Pipelining can improve performance when you need to send several commands in a row, such as adding many elements to the same List.
(摘自 https://docs.spring.io/spring-data/redis/docs/2.1.8.RELEASE/reference/html/#pipeline

pipeline特點:

  1. 需要 客戶端 和 服務端 同時提供支持。

  2. 有效的減少了RTT和redis連接數,同時也減少了IO調用次數(IO調用涉及到用戶態到內核態之間的切換)。

  3. pipeline並未保證原子性。即同一個pipeline中的命令在redis-server被執行的順序可以保證,但不保證其中不會穿插其余的客戶端請求的命令。
    可以通過redis的slowlog查看命令的執行順序,以驗證此特點。

  4. 每次Pipeline組裝的命令個數不能沒有節制,否則一次組裝Pipeline數據量過大,一方面會增加客戶端的等待時間,另一方面會造成一定的網絡阻塞,可以將一次包含大量命令的Pipeline拆分成多次較小的Pipeline來完成。
    例如,jedis-pipeline每次最大發送字節數為8192。

  5. 使用管道發送命令時,服務器將被迫回復一個隊列答復,占用很多內存。所以,如果你需要發送大量的命令,最好是把他們按照合理數量分批次的處理。
    例如10K的命令,讀回復,然后再發送另一個10k的命令,等等。這樣速度幾乎是相同的,但是在回復這10k命令隊列需要非常大量的內存用來組織返回數據內容。
    例如jedis的邏輯是,jedis的默認的接收緩沖區大小是2619648字節,當超過此值,jedis會通知不能再接收redis的響應結果。此時redis會將響應結果維護在redis端的輸出緩沖區中。

對redis-pipeline的理解

普通請求模型:
redis 普通請求模型

Pipeline請求模型:
redis pipeline請求模型

以上圖片來至:redis通過pipeline提升吞吐量
普通請求模型沒有疑問。但是,個人覺得pipeline請求模型這樣描述並不完整,或者說我對此理解出現了歧義。

直到看到了以下blog:Redis Pipeline原理分析
其中對普通請求模型與上篇blog中的描述是一致的,但是對pipeline請求模型描述更加詳細。
redis pipeline 詳細請求模型

client這邊首先將執行的命令寫入到緩沖中(client-buffer),最后再一次性發送Redis。
但是有一種情況是,緩沖區(client-buffer)的大小是有限制的,比如Jedis,限制為8192,超過了,則刷緩存,發送請求到Redis,但是jedis不會去處理Redis的應答,如上圖所示那樣。

Redis的Pipeline和Transaction(Multi)不同,Transaction會存儲客戶端的命令,最后一次性執行,而Pipeline則是處理一條響應一條。
但是這里卻有一點,就是客戶端會並不會調用read去讀取socket里面的緩沖數據。
這也就造就了,如果Redis應答的數據填滿了該接收緩沖(SO_RECVBUF),那么客戶端會通過ACK,WIN=0(接收窗口)來控制服務端不能再發送數據,
那樣子,數據就會緩沖在Redis的客戶端應答列表里面。所以需要注意控制Pipeline的大小。如下圖:
WIN=0
​​

個人閱讀后對blog中的理解:
blog中提到的客戶端會並不會調用read去讀取socket里面的緩沖數據
對此我的理解是,例如圖中的6:value1, value2, value3,假設client接收到了此response,(假設)並保存在client-receive-buffer(客戶端 接收緩沖區),且剛好占滿client-receive-buffer。
但是,client並不會去read,且 client-receive-buffer 被占滿,client會通過ACK, WIN=0通知redis-server不要再發送響應數據。
即,8: value4 會保存在redis-server端的output-buffer(redis的輸出緩沖區)中。直到client開始read client-receive-buffer。

測試 pipeline

os windows10
redis-server 3.2.100
jedis 2.9.3
spring-data-redis 2.1.6.RELEASE
lettuce 5.1.6.RELEASE
netty 4.1.34.Final
spring-boot 2.1.4.RELEASE

jedis

假設,現在有2個客戶端/線程同時向redis發起命令。
client-01利用pipeline調用1000次 incr key ,client-02利用pipeline調用1000次get key

/**
 * 針對`get jedis` 大概 341 條命令為一組相同結果。
 * 其單條RESP:`*2\r\n$3\r\nget\r\n$5\r\njedis\r\n` len=24
 * jedis-pipeline的client-buffer限制:8192。8192 / 24 ≈ 341
 *
 * @see RedisOutputStream#RedisOutputStream(java.io.OutputStream) jedis默認限制output-buffer=8192。
 */
@Test(dataProvider = "pipelineData", threadPoolSize = 2, invocationCount = 1)
public void jedisPipeline(boolean flag){
    String key = "jedis";

    Jedis jedis = jedisPool.getResource();
    Pipeline pipeline = jedis.pipelined();
    int num = 0, limit = 1000;
    List<Object> list;

    if (flag){
        while (num++ < limit) {
            pipeline.incr(key);
        }

        list = pipeline.syncAndReturnAll();

        log.info("exec: incr, key: {}, result: {} \r\n", key, StringUtils.join(list, ","));
    }else {
        while (num++ < limit) {
            pipeline.get(key);
        }

        list = pipeline.syncAndReturnAll();
        log.info("exec: get, key: {}, result: {} \r\n", key, StringUtils.join(list, ","));
    }

    jedis.close();
}

@DataProvider(name = "pipelineData", parallel = true)
private Object[][] pipelineData(){
    return new Object[][]{{true}, {false}};
}

jedis最終發送命令都是通過源碼:

// redis.clients.jedis.Protocol#sendCommand(redis.clients.util.RedisOutputStream, byte[], byte[]...)
private static void sendCommand(final RedisOutputStream os, final byte[] command,
    final byte[]... args) {
  try {
    os.write(ASTERISK_BYTE);
    os.writeIntCrLf(args.length + 1);
    os.write(DOLLAR_BYTE);
    os.writeIntCrLf(command.length);
    os.write(command);
    os.writeCrLf();

    for (final byte[] arg : args) {
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(arg.length);
      os.write(arg);
      os.writeCrLf();
    }
  } catch (IOException e) {
    throw new JedisConnectionException(e);
  }
}

然后通過查看 redis.clients.util.RedisOutputStream 源碼:

public final class RedisOutputStream extends FilterOutputStream {
private final static int[] sizeTable = { 9, 99, 999, 9999, 99999, 999999, 9999999, 99999999,
        999999999, Integer.MAX_VALUE };

public RedisOutputStream(final OutputStream out) {
  		this(out, 8192);
}

public void write(final byte b) throws IOException {
  if (count == buf.length) {
    flushBuffer();
  }
  buf[count++] = b;
}

private void flushBuffer() throws IOException {
  if (count > 0) {
    out.write(buf, 0, count);
    count = 0;
  }
}

public void writeIntCrLf(int value) throws IOException {
  if (value < 0) {
    write((byte) '-');
    value = -value;
  }

  int size = 0;
  while (value > sizeTable[size])
    size++;

  size++;
  if (size >= buf.length - count) {
    flushBuffer();
  }

  //  省略其余源碼
}

//  省略其余源碼
}

其中8192即RedisOutputBuffer(jedis輸出緩沖區)大小限制,當pipeline中堆積的命令超過此值時,jedis-client會將緩沖區中的值發送給redis-server。
RedisOutputStream中所有的write操作都有檢測此限制,當達到時,會調用flushBuffer()
(另外,可以通過redis的slowlog命令查看執行順序)

1. jedis 是以完整的 RESP命令組 發送的請求數據嗎?

會有此疑問的原因是,sendCommand()中第一行代碼是os.write(ASTERISK_BYTE),而相應的源碼檢測是count == buf.length

通過debug和wireshark,發現jedis會拆分成多個數據包后發送給redis。
並且,不保證8192個字節最后包含完整的redis命令組(RESP被拆包發送),通過源碼也可以得出這個結論。
例如wireshark結果:342條 get jedis命令,會被jedis-pipeline拆分成2個數據包發送。


​​
可以看到No.161最后是*2..$4..INCR..$5並不是一個完整命令的RESP,其剩余部分包含在No.165中..GET..$5..jedis.. (..表示\r\n)。

pipeline需要客戶端和服務端支持,那么redis源碼中是如何處理pipeline請求的?
c語言看不懂了,可能是hiredis.c -> redisvFormatCommand(...)

很有意思的源碼,如何知道一個int類型的(string)字節數:

private final static int[] sizeTable = { 9, 99, 999, 9999, 99999, 999999, 9999999, 99999999,
        999999999, Integer.MAX_VALUE };

int size = 0;
while (value > sizeTable[size]) size++;

size++;

我可能是會是Integer.toString(it).length(),但對比分析發現,redis源碼中的這種方式實在好太多。

2. jedis-pipeline 什么時候會去read redis響應的結果?

通過對pipeline技術的理解,jedis在發送請求命令結束前,是不會去read redis響應的結果(但是會接收redis的響應結果)。
此時redis響應的結果,保存在jedis端的接收緩沖區中。
如果當jedis端的接收緩沖區占滿,jedis會通知redis win=0,此時redis不會再發送結果給jedis端,轉而把響應結果保存在redis端的輸出緩沖區中(相關:redis.conf -> client-output-buffer-limit)。

當jedis請求命令發送完畢后,jedis會去read jedis端接收緩沖區的數據。

例如 發送400條get jedis,其結果是342

​​
get jedis --RESP--> *2\r\n$3\r\nget\r\n$5\r\njedis\r\n, len=24,24 * 400 = 8190 + 1410。
redis --response RESP--> $3\r\n342\r\n,len=9,9 * 400 = 3069 + 531。

No.238是建立連接后(3次握手完成),jedis通知redis win=2619648,這個值就是jedis的默認接收緩沖區大小。
每次jedis接收到redis的響應后,都會通知redis 自己剩余的接收緩沖區大小,例如 No.248,2616576 = 2619648 - 3069 - 3(不知道為什么差 3)。

  1. 如何驗證 jedis 在 發送請求 結束前,不會read redis的相應結果?
    通過查看jedis源代碼。

  2. 為什么redis響應最大len=3069?
    其實這個len不是redis響應最大len。
    因為一次flush包含的完整命令個數是8190 / 24 = 341.25,所以一次flush能獲取到的完整結果是341 * 9 = 3069

  3. 為什么jedis默認的接收緩沖區是2619648,以及如何改變此值?
    (並未去詳細了解,簡單看了幾篇blog)
    貌似跟MSS有關,或者取操作系統的某個配置參數。

  4. 如何測試當jedis 接收緩沖區滿時的情況?
    假設調用get jedis 300,000次(2619648 / 9 = 291,072 個結果才會占滿jedis的接收緩沖區)。
    但是,實際測試並不會發送win=0通知redis,但會發送Tcp Window Update
    原因,jedis在發送請求結束時,會開始read 接收緩沖區中的數據,並通知redis TCP Winodw Update且更新Win=xxxxxxx
    所以,需要增大發送請求的次數,在發送請求結束前,redis響應結果占滿jedis的接收緩沖區。

    ​​

spring-data-redis (lettuce)

摘自: https://blog.csdn.net/valada/article/details/80871709
不可否認,Jedis 是一個優秀的基於 Java 語言的 Redis 客戶端。
但是,其不足也很明顯:Jedis 在實現上是直接連接 Redis-Server,在多個線程間共享一個 Jedis 實例時是線程不安全的,
如果想要在多線程場景下使用 Jedis,需要使用連接池,每個線程都使用自己的 Jedis 實例,當連接數量增多時,會消耗較多的物理資源。

與 Jedis 相比,Lettuce 則完全克服了其線程不安全的缺點:
Lettuce 是一個可伸縮的線程安全的 Redis 客戶端,支持同步、異步和響應式模式。
多個線程可以共享一個連接實例,而不必擔心多線程並發問題。
它基於優秀 Netty NIO 框架構建,支持 Redis 的高級功能,如 Sentinel,集群,流水線,自動重新連接和 Redis 數據模型。

假設,現在有2個客戶端/線程同時向redis發起命令,client-01利用pipeline調用5次 incr key ,client-02利用pipeline調用5次get key
通過wireshark抓包,發現並未同jedis-pipeline一樣會打包發送。而是有多少個命令,發送了多少次tcp。
不知道是寫法有問題,還是lettuce的問題?(感覺是寫法問題,並未使用到pipeline)

/**
 * 代碼參考的是:https://docs.spring.io/spring-data/redis/docs/2.1.8.RELEASE/reference/html/#pipeline
 * 感覺是寫法問題,也可能是lettuce的問題。
 * 跟jedis-pipeline表現出來並不同,lettuce還是一個完整的命令RESP為一次TCP請求。
 */
@Test
public void lettucePipelineGet(){
    String key = "lettuce";
    int limit = 400;

    List<Object> list = stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        StringRedisConnection stringRedisConn = (StringRedisConnection)connection;

        int num = 0;
        while (num++ < limit) {
            stringRedisConn.get(key);
        }
        return null;
    });

    log.info("exec: get, key: {}, result: {} \r\n", key, StringUtils.join(list, ","));
}

假設 get lettuce = 20
get lettuce --RESP--> *2\r\n$3\r\nGET\r\n$7\r\nlettuce\r\n, len=26;
redis --response RESP--> $2\r\n20\r\n, len=8.

通過wireshark發現,spring-data-redis 中的pipeline並未如jedis一樣,會將命令打包發送給redis,而是一條命令發送一次。
但是spring-data-redis官方文檔中說支持pipeline,並且測試代碼就是用的官方文檔的寫法。
通過跟蹤代碼org.springframework.data.redis.connection.lettuce.LettuceStringCommands,發現確實開啟了lettuce-pipeline。

通過查看lettuce pipeline修改測試代碼:

@Test
public void lettucePipeline(){
    String key = "lettuce";
    int limit = 400;

    LettuceConnection lettuceConnection = (LettuceConnection) stringRedisTemplate.getConnectionFactory().getConnection();
    RedisClusterAsyncCommands<byte[], byte[]> commands = lettuceConnection.getNativeConnection();

    // 如果想達到打包發送請求的效果(類似jedis-pipeline),需要設置`autoFlushCommands=false`
    // disable auto-flushing
    commands.setAutoFlushCommands(false);
    commands.setTimeout(Duration.ofMinutes(10));

    // perform a series of independent calls
    List<RedisFuture<byte[]>> futures = Lists.newArrayList();
    for (int i = 0; i < limit; i++) {
        futures.add(commands.get(key.getBytes()));
    }

    // 因為`autoFlushCommands=false`,所以需要手動提交命令
    // write all commands to the transport layer
    commands.flushCommands();

    List<Object> result = Lists.newArrayList();
    futures.forEach(e -> {
        try {
            result.add(new String(e.get()));
        } catch (InterruptedException | ExecutionException ex) {
            ex.printStackTrace();
        }
    });

    // later
    lettuceConnection.close();

    log.info("exec: lettuce-get, key: {}, result: {} \r\n", key, StringUtils.join(result, ","));

}

通過以上測試代碼,發現可以達到jedis-pipeline的 打包發送命令集合的效果。

例如 get lettuce,其結果是20
lettuce --RESP--> *2..$3..GET..$7..lettuce.., len=26。
redis --response RESP--> $2..20.., len=8。

所以,lettuce一次flush的限制是416 = 26 * 16,這可以理解成是lettuce或netty(lettuce基於netty)對pipeline實現的控制。
然后128 = 8 * 16,剛好是一次flush的所有結果。

另外,通過wireshark可得出,lettuce與jedis不同點:
lettuce每批次(每個TCP請求中)的命令都是包含完整的RESP命令組,而jedis可能一個完整的RESP命令組是被2個數據包分開發送的(估計由redis組合解析命令)。
(暫時未了解lettuce/netty對pipeline的實現邏輯。)

  1. 為什么lettuce每組的字節數是416?
    不一定是416,測試中416是RESP的整數倍。
    所以,lettuce更可能是以多少個命令為一個批次(比如測試中的 16個命令為一批次)。待閱讀源代碼驗證此結論。

  2. lettuce 一個TCP請求中包含了完整的RESP命令集合,不像jedis可能是2個數據包組成一個完整RESP。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM