前提
最近學習Netty
的時候想做一個基於Redis
服務協議的編碼解碼模塊,過程中順便閱讀了Redis
服務序列化協議RESP
,結合自己的理解對文檔進行了翻譯並且簡單實現了RESP
基於Java
語言的解析。編寫本文的使用使用的JDK
版本為[8+]
。
RESP簡介
Redis
客戶端與Redis
服務端基於一個稱作RESP
的協議進行通信,RESP
全稱為Redis Serialization Protocol
,也就是Redis
序列化協議。雖然RESP
為Redis
設計,但是它也可以應用在其他客戶端-服務端(Client-Server
)的軟件項目中。RESP
在設計的時候折中考慮了如下幾點:
- 易於實現。
- 快速解析。
- 可讀性高。
RESP
可以序列化不同的數據類型,如整型、字符串、數組還有一種特殊的Error
類型。需要執行的Redis
命令會封裝為類似於字符串數組的請求然后通過Redis
客戶端發送到Redis
服務端。Redis
服務端會基於特定的命令類型選擇對應的一種數據類型進行回復(這一句是意譯,原文是:Redis replies with a command-specific data type
)。
RESP
是二進制安全的(binary-safe
),並且在RESP
下不需要處理從一個進程傳輸到另一個進程的批量數據,因為它使用了前綴長度(prefixed-length
,后面會分析,就是在每個數據塊的前綴已經定義好數據塊的個數,類似於Netty
里面的定長編碼解碼)來傳輸批量數據。
注意:此處概述的協議僅僅使用在客戶端-服務端通信,Redis Cluster
使用不同的二進制協議在多個節點之間交換消息(也就是Redis
集群中的節點之間並不使用RESP
通信)。
網絡層
Redis
客戶端通過創建一個在6379
端口的TCP
連接,連接到Redis
服務端。
雖然RESP
在底層通信協議技術上是非TCP
特定的,但在Redis
的上下文中,RESP
僅用於TCP
連接(或類似的面向流的連接,如Unix
套接字)。
請求-響應模型
Redis
服務端接收由不同參數組成的命令,接收到命令並將其處理之后會把回復發送回Redis
客戶端。這是最簡單的模型,但是有兩種例外的情況:
Redis
支持管道(Pipelining
,流水線,多數情況下習慣稱為管道)操作。使用管道的情況下,Redis
客戶端可以一次發送多個命令,然后等待一次性的回復(文中的回復是replies
,理解為Redis
服務端會一次性返回一個批量回復結果)。- 當
Redis
客戶端訂閱Pub/Sub
信道時,該協議會更改語義並成為推送協議(push protocol
),也就是說,客戶端不再需要發送命令,因為Redis
服務端將自動向客戶端(訂閱了改信道的客戶端)發送新消息(這里的意思是:在訂閱/發布模式下,消息是由Redis
服務端主動推送給訂閱了特定信道的Redis
客戶端)。
除了上述兩個特例之外,Redis
協議是一種簡單的請求-響應協議。
RESP支持的數據類型
RESP
在Redis 1.2
中引入,在Redis 2.0
,RESP
正式成為與Redis
服務端通信的標准方案。也就是如果需要編寫Redis
客戶端,你就必須在客戶端中實現此協議。
RESP
本質上是一種序列化協議,它支持的數據類型如下:單行字符串、錯誤消息、整型數字、定長字符串和RESP
數組。
RESP
在Redis
中用作請求-響應協議的方式如下:
Redis
客戶端將命令封裝為RESP
的數組類型(數組元素都是定長字符串類型,注意這一點,很重要)發送到Redis
服務器。Redis
服務端根據命令實現選擇對應的RESP
數據類型之一進行回復。
在RESP
中,數據類型取決於數據報的第一個字節:
- 單行字符串的第一個字節為
+
。 - 錯誤消息的第一個字節為
-
。 - 整型數字的第一個字節為
:
。 - 定長字符串的第一個字節為
$
。 RESP
數組的第一個字節為*
。
另外,在RESP
中可以使用定長字符串或者數組的特殊變體來表示Null
值,后面會提及。在RESP
中,協議的不同部分始終以\r\n
(CRLF
)終止。
目前RESP
中5種數據類型的小結如下:
數據類型 | 本文翻譯名稱 | 基本特征 | 例子 |
---|---|---|---|
Simple String |
單行字符串 | 第一個字節是+ ,最后兩個字節是\r\n ,其他字節是字符串內容 |
+OK\r\n |
Error |
錯誤消息 | 第一個字節是- ,最后兩個字節是\r\n ,其他字節是異常消息的文本內容 |
-ERR\r\n |
Integer |
整型數字 | 第一個字節是: ,最后兩個字節是\r\n ,其他字節是數字的文本內容 |
:100\r\n |
Bulk String |
定長字符串 | 第一個字節是$ ,緊接着的字節是內容字符串長度\r\n ,最后兩個字節是\r\n ,其他字節是字符串內容 |
$4\r\ndoge\r\n |
Array |
RESP 數組 |
第一個字節是* ,緊接着的字節是元素個數\r\n ,最后兩個字節是\r\n ,其他字節是各個元素的內容,每個元素可以是任意一種數據類型 |
*2\r\n:100\r\n$4\r\ndoge\r\n |
下面的小節是對每種數據類型的更細致的分析。
RESP簡單字符串-Simple String
簡單字符串的編碼方式如下:
- (1)第一個字節為
+
。 - (2)緊接着的是一個不能包含
CR
或者LF
字符的字符串。 - (3)以
CRLF
終止。
簡單字符串能夠保證在最小開銷的前提下傳輸非二進制安全的字符串。例如很多Redis
命令執行成功后服務端需要回復OK
字符串,此時通過簡單字符串編碼為5字節的數據報如下:
+OK\r\n
如果需要發送二進制安全的字符串,那么需要使用定長字符串。
當Redis
服務端用簡單字符串響應時,Redis
客戶端庫應該向調用者返回一個字符串,該響應到調用者的字符串由+
之后直到字符串內容末尾的字符組成(其實就是上面提到的第(2)部分的內容),不包括最后的CRLF
字節。
RESP錯誤消息-Error
錯誤消息類型是RESP
特定的數據類型。實際上,錯誤消息類型和簡單字符串類型基本一致,只是其第一個字節為-
。錯誤消息類型跟簡單字符串類型的最大區別是:錯誤消息作為Redis
服務端響應的時候,對於客戶端而言應該感知為異常,而錯誤消息中的字符串內容應該感知為Redis
服務端返回的錯誤信息。錯誤消息的編碼方式如下:
- (1)第一個字節為
-
。 - (2)緊接着的是一個不能包含
CR
或者LF
字符的字符串。 - (3)以
CRLF
終止。
一個簡單的例子如下:
-Error message\r\n
Redis
服務端只有在真正發生錯誤或者感知錯誤的時候才會回復錯誤消息,例如嘗試對錯誤的數據類型執行操作或者命令不存在等等。Redis
客戶端接收到錯誤消息的時候,應該觸發異常(一般情況就是直接拋出異常,可以根據錯誤消息的內容進行異常分類)。下面是錯誤消息響應的一些例子:
-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value
-
之后的第一個單詞到第一個空格或換行符之間的內容,代表返回的錯誤類型。這只是Redis
使用的約定,不是RESP
錯誤消息格式的一部分。
例如,ERR
是通用錯誤,WRONGTYPE
則是更具體的錯誤,表示客戶端試圖針對錯誤的數據類型執行操作。這種定義方式稱為錯誤前綴,是一種使客戶端能夠理解服務器返回的錯誤類型的方法,而不必依賴於所給出的確切消息定義,該消息可能會隨時間而變化。
客戶端實現可以針對不同的錯誤類型返回不同種類的異常,或者可以通過將錯誤類型的名稱作為字符串直接提供給調用方來提供捕獲錯誤的通用方法。
但是,不應該將錯誤消息分類處理的功能視為至關重要的功能,因為它作用並不巨大,並且有些的客戶端實現可能會簡單地返回特定值去屏蔽錯誤消息作為通用的異常處理,例如直接返回false
。
RESP整型數字-Integer
整型數字的編碼方式如下:
- (1)第一個字節為
:
。 - (2)緊接着的是一個不能包含
CR
或者LF
字符的字符串,也就是數字要先轉換為字符序列,最終要輸出為字節。 - (3)以
CRLF
終止。
例如:
:0\r\n
:1000\r\n
許多Redis
命令返回整型數字,像INCR
,LLEN
和LASTSAVE
命令等等。
返回的整型數字沒有特殊的含義,像INCR
返回的是增量的總量,而LASTSAVE
是UNIX
時間戳。但是Redis
服務端保證返回的整型數字在帶符號的64位整數范圍內。
有些情況下,返回的整型數字會指代true
或者false
。如EXISTS
或者SISMEMBER
命令執行返回1代表true
,0代表false
。
有些情況下,返回的整型數字會指代命令是否真正產生了效果。如SADD
,SREM
和SETNX
命令執行返回1代表命令執行生效,0代表命令執行不生效(等價於命令沒有執行)。
下面的一組命令執行后都是返回整型數字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD
。
RESP定長字符串-Bulk String
定長字符串用於表示一個最大長度為512MB
的二進制安全的字符串(Bulk
,本身有體積大的含義)。定長字符串的編碼方式如下:
- (1)第一個字節為
$
。 - (2)緊接着的是組成字符串的字節數長度(稱為
prefixed length
,也就是前綴長度),前綴長度分塊以CRLF
終止。 - (3)然后是一個不能包含
CR
或者LF
字符的字符串,也就是數字要先轉換為字符序列,最終要輸出為字節。 - (4)以
CRLF
終止。
舉個例子,doge
使用定長字符串編碼如下:
第一個字節 | 前綴長度 | CRLF |
字符串內容 | CRLF |
定長字符串 | |
---|---|---|---|---|---|---|
$ |
4 |
\r\n |
doge |
\r\n |
===> |
$4\r\ndoge\r\n |
foobar
使用定長字符串編碼如下:
第一個字節 | 前綴長度 | CRLF |
字符串內容 | CRLF |
定長字符串 | |
---|---|---|---|---|---|---|
$ |
6 |
\r\n |
foobar |
\r\n |
===> |
$6\r\nfoobar\r\n |
表示空字符串(Empty String
,對應Java中的""
) 的時候,使用定長字符串編碼如下:
第一個字節 | 前綴長度 | CRLF |
CRLF |
定長字符串 | |
---|---|---|---|---|---|
$ |
0 |
\r\n |
\r\n |
===> |
$0\r\n\r\n |
定長字符串也可以使用特殊的格式來表示Null
值,指代值不存在。在這種特殊格式中,前綴長度為-1,並且沒有數據,因此使用定長字符串對Null
值進行編碼如下:
第一個字節 | 前綴長度 | CRLF |
定長字符串 | |
---|---|---|---|---|
$ |
-1 |
\r\n |
===> |
$-1\r\n |
當Redis
服務端返回定長字符串編碼的Null
值的時候,客戶端不應該返回空字符串,而應該返回對應編程語言中的Null
對象。例如Ruby
中對應nil
,C
語言中對應NULL
,Java
中對應null
,以此類推。
RESP數組-Array
Redis
客戶端使用RESP
數組發送命令到Redis
服務端。與此相似,某些Redis
命令執行完畢后服務端需要使用RESP
數組類型將元素集合返回給客戶端,如返回一個元素列表的LRANGE
命令。RESP
數組和我們認知中的數組並不完全一致,它的編碼格式如下:
- (1)第一個字節為
*
。 - (2)緊接着的是組成
RESP
數組的元素個數(十進制數,但是最終需要轉換為字節序列,如10需要轉換為1
和0
兩個相鄰的字節),元素個數分塊以CRLF
終止。 - (3)
RESP
數組的每個元素內容,每個元素可以是任意的RESP
數據類型。
一個空的RESP
數組的編碼如下:
*0\r\n
一個包含2個定長字符串元素內容分別為foo
和bar
的RESP
數組的編碼如下:
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
通用格式就是:*<count>CRLF
作為RESP
數組的前綴部分,而組成RESP
數組的其他數據類型的元素只是一個接一個地串聯在一起。例如一個包含3個整數類型元素的RESP
數組的編碼如下:
*3\r\n:1\r\n:2\r\n:3\r\n
RESP
數組的元素不一定是同一種數據類型,可以包含混合類型的元素。例如下面是一個包含4個整數類型元素和1個定長字符串類型元素(一共有5個元素)的RESP
數組的編碼(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
# 元素個數
*5\r\n
# 第1個整型類型的元素
:1\r\n
# 第2個整型類型的元素
:2\r\n
# 第3個整型類型的元素
:3\r\n
# 第4個整型類型的元素
:4\r\n
# 定長字符串類型的元素
$6\r\n
foobar\r\n
Redis
服務端響應報的首行*5\r\n
定義了后面會緊跟着5個回復數據,然后每個回復數據分別作元素項,構成了用於傳輸的多元素定長回復(Multi Bulk Reply
,感覺比較難翻譯,這里的大概意思就是每個回復行都是整個回復報中的一個項)。
這里可以類比為Java
中的ArrayList
(泛型擦除),有點類似於下面的偽代碼:
List encode = new ArrayList();
// 添加元素個數
encode.add(elementCount);
encode.add(CRLF);
// 添加第1個整型類型的元素 - 1
encode.add(':');
encode.add(1);
encode.add(CRLF);
// 添加第2個整型類型的元素 - 2
encode.add(':');
encode.add(2);
encode.add(CRLF);
// 添加第3個整型類型的元素 - 3
encode.add(':');
encode.add(3);
encode.add(CRLF);
// 添加第4個整型類型的元素 - 4
encode.add(':');
encode.add(4);
encode.add(CRLF);
// 添加定長字符串類型的元素
encode.add('$');
// 前綴長度
encode.add(6);
// 字符串內容
encode.add("foobar");
encode.add(CRLF);
RESP
數組中也存在Null
值的概念,下面稱為RESP Null Array
。處於歷史原因,RESP
數組中采用了另一種特殊的編碼格式定義Null
值,區別於定長字符串中的Null
值字符串。例如,BLPOP
命令執行超時的時候,就會返回一個RESP Null Array
類型的響應。RESP Null Array
的編碼如下:
*-1\r\n
當Redis
服務端的回復是RESP Null Array
類型的時候,客戶端應該返回一個Null
對象,而不是一個空數組或者空列表。這一點比較重要,它是區分回復是空數組(也就是命令正確執行完畢,返回結果正常)或者其他原因(如BLPOP
命令的超時等)的關鍵。
RESP
數組的元素也可以是RESP
數組,下面是一個包含2個RESP
數組類型的元素的RESP
數組,編碼如下(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
# 元素個數
*2\r\n
# 第1個RESP數組元素
*3\r\n
:1\r\n
:2\r\n
:3\r\n
# 第2個RESP數組元素
*2\r\n
+Foo\r\n
-Bar\r\n
上面的RESP
數組的包含2個RESP
數組類型的元素,第1個RESP
數組元素包含3個整型類型的元素,而第2個RESP
數組元素包含1個簡單字符串類型的元素和1個錯誤消息類型的元素。
RESP
數組中的Null元素
RESP
數組中的單個元素也有Null
值的概念,下面稱為Null
元素。Redis
服務端回復如果是RESP
數組類型,並且RESP
數組中存在Null
元素,那么意味着元素丟失,絕對不能用空字符串替代。缺少指定鍵的前提下,當與GET
模式選項一起使用時,SORT
命令可能會發生這種情況。
下面是一個包含Null
元素的RESP
數組的例子(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
*3\r\n
$3\r\n
foo\r\n
$-1\r\n
$3\r\n
bar\r\n
RESP
數組中的第2個元素是Null
元素,客戶端API
最終返回的內容應該是:
# Ruby
["foo",nil,"bar"]
# Java
["foo",null,"bar"]
RESP其他相關內容
主要包括:
- 將命令發送到Redis服務端的示例。
- 批量命令與管道。
- 內聯命令(
Inline Commands
)。
其實文檔中還有一節使用C
語言編寫高性能RESP
解析器,這里不做翻譯,因為掌握RESP
的相關內容后,可以基於任何語言編寫解析器。
將命令發送到Redis服務端
如果已經相對熟悉RESP
中的序列化格式,那么編寫Redis
客戶端類庫就會變得很容易。我們可以進一步指定客戶端和服務器之間的交互方式:
Redis
客戶端向Redis
服務端發送僅僅包含定長字符串類型元素的RESP
數組。Redis
服務端可以采用任意一種RESP
數據類型向Redis
客戶端進行回復,具體的數據類型一般取決於命令類型。
下面是典型的交互例子:Redis
客戶端發送命令LLEN mylist
以獲得KEY
為mylist
的長度,Redis
服務端將以整數類型進行回復,如以下示例所示(C
是客戶端,S
服務器),偽代碼如下:
C: *2\r\n
C: $4\r\n
C: LLEN\r\n
C: $6\r\n
C: mylist\r\n
S: :48293\r\n
為了簡單起見,我們使用換行符來分隔協議的不同部分(這里指上面的代碼分行展示),但是實際交互的時候Redis
客戶端在發送*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
的時候是整體發送的。
批量命令與管道
Redis
客戶端可以使用相同的連接發送批量命令。Redis
支持管道特性,因此Redis
客戶端可以通過一次寫操作發送多個命令,而無需在發送下一個命令之前讀取Redis
服務端對上一個命令的回復。批量發送命令之后,所有的回復可以在最后得到(合並為一個回復)。更多相關信息可以查看Using pipelining to speedup Redis queries。
內聯命令
有些場景下,我們可能只有telnet
命令可以使用,在這種條件下,我們需要發送命令到Redis
服務端。盡管Redis
協議易於實現,但在交互式會話中並不理想,並且redis-cli
有些情況下不一定可用。處於這類原因,Redis
設計了一種專為人類設計的命令格式,稱為內聯命令(Inline Command
格式。
以下是服務器/客戶端使用內聯命令進行聊天的示例(S代表服務端,C代表客戶端):
C: PING
S: +PONG
以下是使用內聯命令返回整數的另一個示例:
C: EXISTS somekey
S: :0
基本上只需在telnet
會話中編寫以空格分隔的參數。由於除了統一的請求協議之外沒有命令會以*
開頭,Redis
能夠檢測到這種情況並解析輸入的命令。
基於RESP編寫高性能解析器
因為JDK
原生提供的字節緩沖區java.nio.ByteBuffer
存在不能自動擴容、需要切換讀寫模式等等問題,這里直接引入Netty
並且使用Netty
提供的ByteBuf
進行RESP
數據類型解析。編寫本文的時候(2019-10-09)Netty
的最新版本為4.1.42.Final
。引入依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.42.Final</version>
</dependency>
定義解碼器接口:
public interface RespDecoder<V>{
V decode(ByteBuf buffer);
}
定義常量:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) '\r';
public static final byte LF = (byte) '\n';
public static final byte[] CRLF = "\r\n".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
下面的章節中解析模塊的實現已經忽略第一個字節的解析,因為第一個字節是決定具體的數據類型。
解析簡單字符串
簡單字符串類型就是單行字符串,它的解析結果對應的就是Java
中的String
類型。解碼器實現如下:
// 解析單行字符串
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 計算字節長度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置讀游標為\r\n之后的第一個字節
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
這里抽取出一個類LineStringDecoder
用於解析單行字符串,這樣在解析錯誤消息的時候可以做一次繼承即可。測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// +OK\r\n
buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:OK
解析錯誤消息
錯誤消息的本質也是單行字符串,所以其解碼的實現可以和簡單字符串的解碼實現一致。錯誤消息數據類型的解碼器如下:
public class RespErrorDecoder extends LineStringDecoder {
}
測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// -ERR unknown command 'foobar'\r\n
buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'
解析整型數字
整型數字類型,本質就是需要從字節序列中還原出帶符號的64bit
的長整型,因為是帶符號的,類型標識位:
后的第一個字節需要判斷是否負數字符-
,因為是從左向右解析,然后每解析出一個新的位,當前的數字值要乘10
。其解碼器的實現如下:
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 沒有行尾,異常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 負數
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置讀游標為\r\n之后的第一個字節
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
整型數字類型的解析相對復雜,一定要注意負數判斷。測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// :-1000\r\n
buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
Long value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:-1000
解析定長字符串
定長字符串類型解析的關鍵是先讀取類型標識符$
后的第一個字節序列分塊解析成64bit
帶符號的整數,用來確定后面需要解析的字符串內容的字節長度,然后再按照該長度讀取后面的字節。其解碼器實現如下:
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 使用RespIntegerDecoder讀取長度
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真實字節內容的長度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置讀游標為\r\n之后的第一個字節
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
測試一下:
public static void main(String[] args) throws Exception{
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// $6\r\nthrowable\r\n
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:throwable
解析RESP數組
RESP
數組類型解析的關鍵:
- 先讀取類型標識符
*
后的第一個字節序列分塊解析成64bit
帶符號的整數,確定數組中的元素個數。 - 遞歸解析每個元素。
參考過不少Redis
協議解析框架,不少是用棧或者狀態機實現,這里先簡單點用遞歸實現,解碼器代碼如下:
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素個數
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 遞歸
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
List value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]
小結
對RESP
的內容和其編碼解碼的過程有相對深刻的認識后,就可以基於Netty
編寫Redis
服務的編碼解碼模塊,作為Netty
入門的十分有意義的例子。本文的最后一節只演示了RESP
的解碼部分,編碼模塊和更多細節會在另一篇用Netty
實現Redis
客戶端的文章中展示。
參考資料:
鏈接
希望你能讀到這里,然后發現我:
- Github Page:http://www.throwable.club/2019/10/09/redis-serialization-protocol-decode-guide
- Coding Page:http://throwable.coding.me/2019/10/09/redis-serialization-protocol-decode-guide
附錄
本文涉及的所有代碼:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) '\r';
public static final byte LF = (byte) '\n';
public static final byte[] CRLF = "\r\n".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 計算字節長度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置讀游標為\r\n之后的第一個字節
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public interface RespCodec {
RespCodec X = DefaultRespCodec.X;
<IN, OUT> OUT decode(ByteBuf buffer);
<IN, OUT> ByteBuf encode(IN in);
}
public enum DefaultRespCodec implements RespCodec {
X;
static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();
static {
DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
}
@SuppressWarnings("unchecked")
@Override
public <IN, OUT> OUT decode(ByteBuf buffer) {
return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
}
private ReplyType determineReplyType(ByteBuf buffer) {
byte firstByte = buffer.readByte();
ReplyType replyType;
switch (firstByte) {
case RespConstants.PLUS_BYTE:
replyType = ReplyType.SIMPLE_STRING;
break;
case RespConstants.MINUS_BYTE:
replyType = ReplyType.ERROR;
break;
case RespConstants.COLON_BYTE:
replyType = ReplyType.INTEGER;
break;
case RespConstants.DOLLAR_BYTE:
replyType = ReplyType.BULK_STRING;
break;
case RespConstants.ASTERISK_BYTE:
replyType = ReplyType.RESP_ARRAY;
break;
default: {
throw new IllegalArgumentException("first byte:" + firstByte);
}
}
return replyType;
}
@Override
public <IN, OUT> ByteBuf encode(IN in) {
// TODO
throw new UnsupportedOperationException("encode");
}
}
public interface RespDecoder<V> {
V decode(ByteBuf buffer);
}
public class DefaultRespDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
throw new IllegalStateException("decoder");
}
}
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
public class RespErrorDecoder extends LineStringDecoder {
}
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 沒有行尾,異常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 負數
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置讀游標為\r\n之后的第一個字節
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真實字節內容的長度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置讀游標為\r\n之后的第一個字節
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素個數
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 遞歸
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
(本文完 e-a-20191009 c-2-d)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):
娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力: