一文徹底理解Redis序列化協議,你也可以編寫Redis客戶端


前提

最近學習Netty的時候想做一個基於Redis服務協議的編碼解碼模塊,過程中順便閱讀了Redis服務序列化協議RESP,結合自己的理解對文檔進行了翻譯並且簡單實現了RESP基於Java語言的解析。編寫本文的使用使用的JDK版本為[8+]

RESP簡介

Redis客戶端與Redis服務端基於一個稱作RESP的協議進行通信,RESP全稱為Redis Serialization Protocol,也就是Redis序列化協議。雖然RESPRedis設計,但是它也可以應用在其他客戶端-服務端(Client-Server)的軟件項目中。RESP在設計的時候折中考慮了如下幾點:

  • 易於實現。
  • 快速解析。
  • 可讀性高。

RESP可以序列化不同的數據類型,如整型、字符串、數組還有一種特殊的Error類型。需要執行的Redis命令會封裝為類似於字符串數組的請求然后通過Redis客戶端發送到Redis服務端。Redis服務端會基於特定的命令類型選擇對應的一種數據類型進行回復(這一句是意譯,原文是:Redis replies with a command-specific data type)。

RESP是二進制安全的(binary-safe),並且在RESP下不需要處理從一個進程傳輸到另一個進程的批量數據,因為它使用了前綴長度(prefixed-length,后面會分析,就是在每個數據塊的前綴已經定義好數據塊的個數,類似於Netty里面的定長編碼解碼)來傳輸批量數據。

注意:此處概述的協議僅僅使用在客戶端-服務端通信,Redis Cluster使用不同的二進制協議在多個節點之間交換消息(也就是Redis集群中的節點之間並不使用RESP通信)。

網絡層

Redis客戶端通過創建一個在6379端口的TCP連接,連接到Redis服務端。

雖然RESP在底層通信協議技術上是非TCP特定的,但在Redis的上下文中,RESP僅用於TCP連接(或類似的面向流的連接,如Unix套接字)。

請求-響應模型

Redis服務端接收由不同參數組成的命令,接收到命令並將其處理之后會把回復發送回Redis客戶端。這是最簡單的模型,但是有兩種例外的情況:

  • Redis支持管道(Pipelining,流水線,多數情況下習慣稱為管道)操作。使用管道的情況下,Redis客戶端可以一次發送多個命令,然后等待一次性的回復(文中的回復是replies,理解為Redis服務端會一次性返回一個批量回復結果)。
  • Redis客戶端訂閱Pub/Sub信道時,該協議會更改語義並成為推送協議(push protocol),也就是說,客戶端不再需要發送命令,因為Redis服務端將自動向客戶端(訂閱了改信道的客戶端)發送新消息(這里的意思是:在訂閱/發布模式下,消息是由Redis服務端主動推送給訂閱了特定信道的Redis客戶端)。

除了上述兩個特例之外,Redis協議是一種簡單的請求-響應協議。

RESP支持的數據類型

RESPRedis 1.2中引入,在Redis 2.0RESP正式成為與Redis服務端通信的標准方案。也就是如果需要編寫Redis客戶端,你就必須在客戶端中實現此協議。

RESP本質上是一種序列化協議,它支持的數據類型如下:單行字符串、錯誤消息、整型數字、定長字符串和RESP數組。

RESPRedis中用作請求-響應協議的方式如下:

  • Redis客戶端將命令封裝為RESP的數組類型(數組元素都是定長字符串類型,注意這一點,很重要)發送到Redis服務器。
  • Redis服務端根據命令實現選擇對應的RESP數據類型之一進行回復。

RESP中,數據類型取決於數據報的第一個字節:

  • 單行字符串的第一個字節為+
  • 錯誤消息的第一個字節為-
  • 整型數字的第一個字節為:
  • 定長字符串的第一個字節為$
  • RESP數組的第一個字節為*

另外,在RESP中可以使用定長字符串或者數組的特殊變體來表示Null值,后面會提及。在RESP中,協議的不同部分始終以\r\nCRLF)終止

目前RESP中5種數據類型的小結如下:

數據類型 本文翻譯名稱 基本特征 例子
Simple String 單行字符串 第一個字節是+,最后兩個字節是\r\n,其他字節是字符串內容 +OK\r\n
Error 錯誤消息 第一個字節是-,最后兩個字節是\r\n,其他字節是異常消息的文本內容 -ERR\r\n
Integer 整型數字 第一個字節是:,最后兩個字節是\r\n,其他字節是數字的文本內容 :100\r\n
Bulk String 定長字符串 第一個字節是$,緊接着的字節是內容字符串長度\r\n,最后兩個字節是\r\n,其他字節是字符串內容 $4\r\ndoge\r\n
Array RESP數組 第一個字節是*,緊接着的字節是元素個數\r\n,最后兩個字節是\r\n,其他字節是各個元素的內容,每個元素可以是任意一種數據類型 *2\r\n:100\r\n$4\r\ndoge\r\n

下面的小節是對每種數據類型的更細致的分析。

RESP簡單字符串-Simple String

簡單字符串的編碼方式如下:

  • (1)第一個字節為+
  • (2)緊接着的是一個不能包含CR或者LF字符的字符串。
  • (3)以CRLF終止。

簡單字符串能夠保證在最小開銷的前提下傳輸非二進制安全的字符串。例如很多Redis命令執行成功后服務端需要回復OK字符串,此時通過簡單字符串編碼為5字節的數據報如下:

+OK\r\n

如果需要發送二進制安全的字符串,那么需要使用定長字符串。

Redis服務端用簡單字符串響應時,Redis客戶端庫應該向調用者返回一個字符串,該響應到調用者的字符串由+之后直到字符串內容末尾的字符組成(其實就是上面提到的第(2)部分的內容),不包括最后的CRLF字節。

RESP錯誤消息-Error

錯誤消息類型是RESP特定的數據類型。實際上,錯誤消息類型和簡單字符串類型基本一致,只是其第一個字節為-。錯誤消息類型跟簡單字符串類型的最大區別是:錯誤消息作為Redis服務端響應的時候,對於客戶端而言應該感知為異常,而錯誤消息中的字符串內容應該感知為Redis服務端返回的錯誤信息。錯誤消息的編碼方式如下:

  • (1)第一個字節為-
  • (2)緊接着的是一個不能包含CR或者LF字符的字符串。
  • (3)以CRLF終止。

一個簡單的例子如下:

-Error message\r\n

Redis服務端只有在真正發生錯誤或者感知錯誤的時候才會回復錯誤消息,例如嘗試對錯誤的數據類型執行操作或者命令不存在等等。Redis客戶端接收到錯誤消息的時候,應該觸發異常(一般情況就是直接拋出異常,可以根據錯誤消息的內容進行異常分類)。下面是錯誤消息響應的一些例子:

-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value

-之后的第一個單詞到第一個空格或換行符之間的內容,代表返回的錯誤類型。這只是Redis使用的約定,不是RESP錯誤消息格式的一部分。

例如,ERR是通用錯誤,WRONGTYPE則是更具體的錯誤,表示客戶端試圖針對錯誤的數據類型執行操作。這種定義方式稱為錯誤前綴,是一種使客戶端能夠理解服務器返回的錯誤類型的方法,而不必依賴於所給出的確切消息定義,該消息可能會隨時間而變化。

客戶端實現可以針對不同的錯誤類型返回不同種類的異常,或者可以通過將錯誤類型的名稱作為字符串直接提供給調用方來提供捕獲錯誤的通用方法。

但是,不應該將錯誤消息分類處理的功能視為至關重要的功能,因為它作用並不巨大,並且有些的客戶端實現可能會簡單地返回特定值去屏蔽錯誤消息作為通用的異常處理,例如直接返回false

RESP整型數字-Integer

整型數字的編碼方式如下:

  • (1)第一個字節為
  • (2)緊接着的是一個不能包含CR或者LF字符的字符串,也就是數字要先轉換為字符序列,最終要輸出為字節。
  • (3)以CRLF終止。

例如:

:0\r\n
:1000\r\n

許多Redis命令返回整型數字,像INCRLLENLASTSAVE命令等等。

返回的整型數字沒有特殊的含義,像INCR返回的是增量的總量,而LASTSAVEUNIX時間戳。但是Redis服務端保證返回的整型數字在帶符號的64位整數范圍內。

有些情況下,返回的整型數字會指代true或者false。如EXISTS或者SISMEMBER命令執行返回1代表true,0代表false

有些情況下,返回的整型數字會指代命令是否真正產生了效果。如SADDSREMSETNX命令執行返回1代表命令執行生效,0代表命令執行不生效(等價於命令沒有執行)。

下面的一組命令執行后都是返回整型數字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD

RESP定長字符串-Bulk String

定長字符串用於表示一個最大長度為512MB的二進制安全的字符串(Bulk,本身有體積大的含義)。定長字符串的編碼方式如下:

  • (1)第一個字節為$
  • (2)緊接着的是組成字符串的字節數長度(稱為prefixed length,也就是前綴長度),前綴長度分塊以CRLF終止。
  • (3)然后是一個不能包含CR或者LF字符的字符串,也就是數字要先轉換為字符序列,最終要輸出為字節。
  • (4)以CRLF終止。

舉個例子,doge使用定長字符串編碼如下:

第一個字節 前綴長度 CRLF 字符串內容 CRLF 定長字符串
$ 4 \r\n doge \r\n ===> $4\r\ndoge\r\n

foobar使用定長字符串編碼如下:

第一個字節 前綴長度 CRLF 字符串內容 CRLF 定長字符串
$ 6 \r\n foobar \r\n ===> $6\r\nfoobar\r\n

表示空字符串(Empty String,對應Java中的"" 的時候,使用定長字符串編碼如下:

第一個字節 前綴長度 CRLF CRLF 定長字符串
$ 0 \r\n \r\n ===> $0\r\n\r\n

定長字符串也可以使用特殊的格式來表示Null值,指代值不存在。在這種特殊格式中,前綴長度為-1,並且沒有數據,因此使用定長字符串對Null值進行編碼如下:

第一個字節 前綴長度 CRLF 定長字符串
$ -1 \r\n ===> $-1\r\n

Redis服務端返回定長字符串編碼的Null值的時候,客戶端不應該返回空字符串,而應該返回對應編程語言中的Null對象。例如Ruby中對應nilC語言中對應NULLJava中對應null,以此類推。

RESP數組-Array

Redis客戶端使用RESP數組發送命令到Redis服務端。與此相似,某些Redis命令執行完畢后服務端需要使用RESP數組類型將元素集合返回給客戶端,如返回一個元素列表的LRANGE命令。RESP數組和我們認知中的數組並不完全一致,它的編碼格式如下:

  • (1)第一個字節為*
  • (2)緊接着的是組成RESP數組的元素個數(十進制數,但是最終需要轉換為字節序列,如10需要轉換為10兩個相鄰的字節),元素個數分塊以CRLF終止。
  • (3)RESP數組的每個元素內容,每個元素可以是任意的RESP數據類型。

一個空的RESP數組的編碼如下:

*0\r\n

一個包含2個定長字符串元素內容分別為foobarRESP數組的編碼如下:

*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n

通用格式就是:*<count>CRLF作為RESP數組的前綴部分,而組成RESP數組的其他數據類型的元素只是一個接一個地串聯在一起。例如一個包含3個整數類型元素的RESP數組的編碼如下:

*3\r\n:1\r\n:2\r\n:3\r\n

RESP數組的元素不一定是同一種數據類型,可以包含混合類型的元素。例如下面是一個包含4個整數類型元素和1個定長字符串類型元素(一共有5個元素)的RESP數組的編碼(為了看得更清楚,分多行進行編碼,實際上不能這樣做):

# 元素個數
*5\r\n
# 第1個整型類型的元素
:1\r\n
# 第2個整型類型的元素
:2\r\n
# 第3個整型類型的元素
:3\r\n
# 第4個整型類型的元素
:4\r\n
# 定長字符串類型的元素
$6\r\n
foobar\r\n

Redis服務端響應報的首行*5\r\n定義了后面會緊跟着5個回復數據,然后每個回復數據分別作元素項,構成了用於傳輸的多元素定長回復(Multi Bulk Reply,感覺比較難翻譯,這里的大概意思就是每個回復行都是整個回復報中的一個項)。

這里可以類比為Java中的ArrayList(泛型擦除),有點類似於下面的偽代碼:

List encode = new ArrayList();
// 添加元素個數
encode.add(elementCount);
encode.add(CRLF);
// 添加第1個整型類型的元素 - 1
encode.add(':');
encode.add(1);
encode.add(CRLF);
// 添加第2個整型類型的元素 - 2
encode.add(':');
encode.add(2);
encode.add(CRLF);
// 添加第3個整型類型的元素 - 3
encode.add(':');
encode.add(3);
encode.add(CRLF);
// 添加第4個整型類型的元素 - 4
encode.add(':');
encode.add(4);
encode.add(CRLF);
// 添加定長字符串類型的元素
encode.add('$');
// 前綴長度
encode.add(6);
// 字符串內容
encode.add("foobar");
encode.add(CRLF);

RESP數組中也存在Null值的概念,下面稱為RESP Null Array。處於歷史原因,RESP數組中采用了另一種特殊的編碼格式定義Null值,區別於定長字符串中的Null值字符串。例如,BLPOP命令執行超時的時候,就會返回一個RESP Null Array類型的響應。RESP Null Array的編碼如下:

*-1\r\n

Redis服務端的回復是RESP Null Array類型的時候,客戶端應該返回一個Null對象,而不是一個空數組或者空列表。這一點比較重要,它是區分回復是空數組(也就是命令正確執行完畢,返回結果正常)或者其他原因(如BLPOP命令的超時等)的關鍵。

RESP數組的元素也可以是RESP數組,下面是一個包含2個RESP數組類型的元素的RESP數組,編碼如下(為了看得更清楚,分多行進行編碼,實際上不能這樣做):

# 元素個數
*2\r\n
# 第1個RESP數組元素
*3\r\n
:1\r\n
:2\r\n
:3\r\n
# 第2個RESP數組元素
*2\r\n
+Foo\r\n
-Bar\r\n

上面的RESP數組的包含2個RESP數組類型的元素,第1個RESP數組元素包含3個整型類型的元素,而第2個RESP數組元素包含1個簡單字符串類型的元素和1個錯誤消息類型的元素。

RESP數組中的Null元素

RESP數組中的單個元素也有Null值的概念,下面稱為Null元素。Redis服務端回復如果是RESP數組類型,並且RESP數組中存在Null元素,那么意味着元素丟失,絕對不能用空字符串替代。缺少指定鍵的前提下,當與GET模式選項一起使用時,SORT命令可能會發生這種情況。

下面是一個包含Null元素的RESP數組的例子(為了看得更清楚,分多行進行編碼,實際上不能這樣做):

*3\r\n
$3\r\n
foo\r\n
$-1\r\n
$3\r\n
bar\r\n

RESP數組中的第2個元素是Null元素,客戶端API最終返回的內容應該是:

# Ruby
["foo",nil,"bar"]
# Java
["foo",null,"bar"]

RESP其他相關內容

主要包括:

  • 將命令發送到Redis服務端的示例。
  • 批量命令與管道。
  • 內聯命令(Inline Commands)。

其實文檔中還有一節使用C語言編寫高性能RESP解析器,這里不做翻譯,因為掌握RESP的相關內容后,可以基於任何語言編寫解析器。

將命令發送到Redis服務端

如果已經相對熟悉RESP中的序列化格式,那么編寫Redis客戶端類庫就會變得很容易。我們可以進一步指定客戶端和服務器之間的交互方式:

  • Redis客戶端向Redis服務端發送僅僅包含定長字符串類型元素的RESP數組。
  • Redis服務端可以采用任意一種RESP數據類型向Redis客戶端進行回復,具體的數據類型一般取決於命令類型。

下面是典型的交互例子:Redis客戶端發送命令LLEN mylist以獲得KEYmylist的長度,Redis服務端將以整數類型進行回復,如以下示例所示(C是客戶端,S服務器),偽代碼如下:

C: *2\r\n
C: $4\r\n
C: LLEN\r\n
C: $6\r\n
C: mylist\r\n

S: :48293\r\n

為了簡單起見,我們使用換行符來分隔協議的不同部分(這里指上面的代碼分行展示),但是實際交互的時候Redis客戶端在發送*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n的時候是整體發送的。

批量命令與管道

Redis客戶端可以使用相同的連接發送批量命令。Redis支持管道特性,因此Redis客戶端可以通過一次寫操作發送多個命令,而無需在發送下一個命令之前讀取Redis服務端對上一個命令的回復。批量發送命令之后,所有的回復可以在最后得到(合並為一個回復)。更多相關信息可以查看Using pipelining to speedup Redis queries

內聯命令

有些場景下,我們可能只有telnet命令可以使用,在這種條件下,我們需要發送命令到Redis服務端。盡管Redis協議易於實現,但在交互式會話中並不理想,並且redis-cli有些情況下不一定可用。處於這類原因,Redis設計了一種專為人類設計的命令格式,稱為內聯命令(Inline Command格式。

以下是服務器/客戶端使用內聯命令進行聊天的示例(S代表服務端,C代表客戶端):

C: PING
S: +PONG

以下是使用內聯命令返回整數的另一個示例:

C: EXISTS somekey
S: :0

基本上只需在telnet會話中編寫以空格分隔的參數。由於除了統一的請求協議之外沒有命令會以*開頭,Redis能夠檢測到這種情況並解析輸入的命令。

基於RESP編寫高性能解析器

因為JDK原生提供的字節緩沖區java.nio.ByteBuffer存在不能自動擴容、需要切換讀寫模式等等問題,這里直接引入Netty並且使用Netty提供的ByteBuf進行RESP數據類型解析。編寫本文的時候(2019-10-09)Netty的最新版本為4.1.42.Final。引入依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-buffer</artifactId>
    <version>4.1.42.Final</version>
</dependency>

定義解碼器接口:

public interface RespDecoder<V>{
    
    V decode(ByteBuf buffer);
}

定義常量:

public class RespConstants {

    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';

    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);

    public enum ReplyType {

        SIMPLE_STRING,

        ERROR,

        INTEGER,

        BULK_STRING,

        RESP_ARRAY
    }
}

下面的章節中解析模塊的實現已經忽略第一個字節的解析,因為第一個字節是決定具體的數據類型。

解析簡單字符串

簡單字符串類型就是單行字符串,它的解析結果對應的就是Java中的String類型。解碼器實現如下:

// 解析單行字符串
public class LineStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}

public enum CodecUtils {

    X;

    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }

    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 計算字節長度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置讀游標為\r\n之后的第一個字節
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

public class RespSimpleStringDecoder extends LineStringDecoder {
    
}

這里抽取出一個類LineStringDecoder用於解析單行字符串,這樣在解析錯誤消息的時候可以做一次繼承即可。測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // +OK\r\n
    buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:OK

解析錯誤消息

錯誤消息的本質也是單行字符串,所以其解碼的實現可以和簡單字符串的解碼實現一致。錯誤消息數據類型的解碼器如下:

public class RespErrorDecoder extends LineStringDecoder {

}

測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // -ERR unknown command 'foobar'\r\n
    buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'

解析整型數字

整型數字類型,本質就是需要從字節序列中還原出帶符號的64bit的長整型,因為是帶符號的,類型標識位:后的第一個字節需要判斷是否負數字符-,因為是從左向右解析,然后每解析出一個新的位,當前的數字值要乘10。其解碼器的實現如下:

public class RespIntegerDecoder implements RespDecoder<Long> {

    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 沒有行尾,異常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 負數
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置讀游標為\r\n之后的第一個字節
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}

整型數字類型的解析相對復雜,一定要注意負數判斷。測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // :-1000\r\n
    buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    Long value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:-1000

解析定長字符串

定長字符串類型解析的關鍵是先讀取類型標識符$后的第一個字節序列分塊解析成64bit帶符號的整數,用來確定后面需要解析的字符串內容的字節長度,然后再按照該長度讀取后面的字節。其解碼器實現如下:

public class RespBulkStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 使用RespIntegerDecoder讀取長度
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真實字節內容的長度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置讀游標為\r\n之后的第一個字節
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

測試一下:

public static void main(String[] args) throws Exception{
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // $6\r\nthrowable\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:throwable

解析RESP數組

RESP數組類型解析的關鍵:

  • 先讀取類型標識符*后的第一個字節序列分塊解析成64bit帶符號的整數,確定數組中的元素個數。
  • 遞歸解析每個元素。

參考過不少Redis協議解析框架,不少是用棧或者狀態機實現,這里先簡單點用遞歸實現,解碼器代碼如下:

public class RespArrayDecoder implements RespDecoder {

    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素個數
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 遞歸
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}

測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    //*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    List value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]

小結

RESP的內容和其編碼解碼的過程有相對深刻的認識后,就可以基於Netty編寫Redis服務的編碼解碼模塊,作為Netty入門的十分有意義的例子。本文的最后一節只演示了RESP的解碼部分,編碼模塊和更多細節會在另一篇用Netty實現Redis客戶端的文章中展示。

參考資料:

鏈接

希望你能讀到這里,然后發現我:

附錄

本文涉及的所有代碼:

public class RespConstants {

    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';

    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);

    public enum ReplyType {

        SIMPLE_STRING,

        ERROR,

        INTEGER,

        BULK_STRING,

        RESP_ARRAY
    }
}

public enum CodecUtils {

    X;

    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }

    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 計算字節長度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置讀游標為\r\n之后的第一個字節
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

public interface RespCodec {

    RespCodec X = DefaultRespCodec.X;

    <IN, OUT> OUT decode(ByteBuf buffer);

    <IN, OUT> ByteBuf encode(IN in);
}

public enum DefaultRespCodec implements RespCodec {

    X;

    static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
    private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();

    static {
        DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
        DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
        DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
        DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
        DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
    }

    @SuppressWarnings("unchecked")
    @Override
    public <IN, OUT> OUT decode(ByteBuf buffer) {
        return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
    }

    private ReplyType determineReplyType(ByteBuf buffer) {
        byte firstByte = buffer.readByte();
        ReplyType replyType;
        switch (firstByte) {
            case RespConstants.PLUS_BYTE:
                replyType = ReplyType.SIMPLE_STRING;
                break;
            case RespConstants.MINUS_BYTE:
                replyType = ReplyType.ERROR;
                break;
            case RespConstants.COLON_BYTE:
                replyType = ReplyType.INTEGER;
                break;
            case RespConstants.DOLLAR_BYTE:
                replyType = ReplyType.BULK_STRING;
                break;
            case RespConstants.ASTERISK_BYTE:
                replyType = ReplyType.RESP_ARRAY;
                break;
            default: {
                throw new IllegalArgumentException("first byte:" + firstByte);
            }
        }
        return replyType;
    }

    @Override
    public <IN, OUT> ByteBuf encode(IN in) {
        // TODO
        throw new UnsupportedOperationException("encode");
    }
}

public interface RespDecoder<V> {

    V decode(ByteBuf buffer);
}

public class DefaultRespDecoder implements RespDecoder {

    @Override
    public Object decode(ByteBuf buffer) {
        throw new IllegalStateException("decoder");
    }
}

public class LineStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}

public class RespSimpleStringDecoder extends LineStringDecoder {

}

public class RespErrorDecoder extends LineStringDecoder {

}

public class RespIntegerDecoder implements RespDecoder<Long> {

    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 沒有行尾,異常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 負數
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置讀游標為\r\n之后的第一個字節
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}

public class RespBulkStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真實字節內容的長度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置讀游標為\r\n之后的第一個字節
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

public class RespArrayDecoder implements RespDecoder {

    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素個數
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 遞歸
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}

(本文完 e-a-20191009 c-2-d)

技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):

娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM