前言
了解T-io
框架有些日子了,並且還將它應用於實戰,例如 tio-websocket-server
,tio-http-server
等。但是由於上述兩個server
已經封裝好,直接應用就可以。所以對於整個數據流通的過程不是很明朗,甚至對於hello-world
例子中的encode
,decode
作用並不理解。於是乎想寫一個更貼近實際應用的redis-client
來作為學習切入點,雖然編碼過程中困難重重,不過最后還是實現了一個粗糙的客戶端。由於代碼中大量參考了Jedis
源碼,所以,我給這個客戶端起名T-io
+Redis
=Tedis
.哈哈,這些都不重要,下文中將會記錄出我的學習和開發歷程。
Redis通信協議
在開發之前,首先要去了解客戶端和服務端的通信協議,那么我們開發Redis
客戶端,就要去看看Redis協議了。所以,下面要做的就是:
- 明確客戶端發送給服務端的消息格式
- 明確服務端返回給客戶端的消息格式
在此呢,我只簡單舉一個GET
,SET
的例子,其他的內容大家可以去看參考文檔。
//SET命令
set mykey myvalue
//GET命令
get mykey
上述兩個簡單的命令,根據Redis
協議可以解析成如下內容
//SET命令
*3\r\n$3\r\nset\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n
//GET命令
*2\r\n$3\r\nget\r\n$5\r\nmykey\r\n
其中 *3
代表有三段內容,即 SET
,mykey
,myvalue
.每一段內容之間由 CRLF(\r\n)
隔開.$
符號后邊跟的數字就是數據字節數。引用官方的一個圖:
在Jedis
源碼中,對於消息體的構造比較麻煩,我看的也是雲里霧里的,所以在Tedis
的實現中我才用了最簡單的拼接方式。即StringBuilder
根據規則拼接字符串,然后調用getBytes
方法獲取byte[]
。示例代碼如下:
public static byte[] buildCommandBody(final ProtocolCommand cmd,String... args) {
StringBuilder builder = new StringBuilder();
//*[num]
builder.append('*')
//命令數(1) + 參數的個數
.append(1 + args.length);
appendCrLf(builder)
//命令長度 $[cmd_length]
.append("$")
.append(cmd.getName().length());
appendCrLf(builder)
//命令內容 cmd
.append(cmd.getName());
appendCrLf(builder);
//遍歷參數,按照 $[num]\r\n[content]\r\n的格式拼接
for (String arg : args) {
builder.append("$")
.append(arg.length());
appendCrLf(builder)
.append(arg);
appendCrLf(builder);
}
//最后轉換為 byte[],此處使用 Jedis 中的 SafeEncoder
return SafeEncoder.encode(builder.toString());
}
調用示例:
public static void main(String[] args){
Protocol.buildCommandBody(Protocol.Command.SET,"key","value");
}
打印結果:
*3
$3
SET
$3
key
$5
value
那么到此為止,我們已經了解了如何構造發送給服務端的消息,那么如何解析服務端返回的消息呢?
Redis 命令會返回多種不同類型的回復。
通過檢查服務器發回數據的第一個字節, 可以確定這個回復是什么類型:
- 狀態回復(status reply)的第一個字節是 "+"
- 錯誤回復(error reply)的第一個字節是 "-"
- 整數回復(integer reply)的第一個字節是 ":"
- 批量回復(bulk reply)的第一個字節是 "$"
- 多條批量回復(multi bulk reply)的第一個字節是 "*"
時間有限,我也只是完成了狀態回復和批量回復的部分功能,下文中將以這兩種回復作為講解示例。
T-io登場
由於只是客戶端的開發,所以這里我們只會用到TioClient
。所以,我們先把Redis-Server
連接上。ClientAioHandler
,ClientAioListener
,ClientGroupContext
自然是少不了的啦,直接上代碼吧。
- 初始化一個
ServerNode
Node serverNode = new Node("127.0.0.1",6379);
- 初始化一個
ClientGroupContext
,它依賴於ClientAioHandler
,ClientAioListener
ClientGroupContext clientGroupContext = new ClientGroupContext(tioClientHandler, aioListener, null);
- 初始化一個
TioClient
TioClient tioClient = new TioClient(clientGroupContext);
- 最后連接服務器,如果沒有什么異常打印的話,就連接成功啦
//返回的ClientChannelContext 用於發送消息使用
ClientChannelContext clientChannelContext = tioClient.connect(serverNode);
恭喜你,一個Redis
客戶端寶寶就此誕生,只不過它還不會說話。結合上文協議部分的內容,我們發送一條消息給服務器。首先定義消息包:
public class TedisPacket extends Packet {
private byte[] body;
//getter setter
}
然后調用Tio.send
方法就可以啦。
Tio.send(clientChannelContext, packet);
如果你已經看懂了上半部分,那么你就會知道這里 TedisPacket
中的body
的值就是通過 Protocol.buildCommandBody(Protocol.Command.SET,"key","value");
來生成的。不要忘了 `ClientAioHandler.encode’方法哦。
@Override
public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
TedisPacket tedisPacket = (TedisPacket) packet;
byte[] body = tedisPacket.getBody();
int bodyLen = 0;
if (body != null) {
bodyLen = body.length;
}
//只是簡單將 body 放入 ByteBuffer 。
ByteBuffer buffer = ByteBuffer.allocate(bodyLen);
buffer.put(body);
return buffer;
}
到此為止,客戶端向服務器發送消息的內容已經寫完了。下面將介紹如何解析服務端的響應。
當服務器正常,並且發送到服務器的消息格式符合RESP
協議的話,那么服務器會返回你相應的內容,比如我們發送SET
命令,服務器的正常響應是+OK\r\n
.下面我們看ClientAioHandler.decode
方法。當我批量向服務器發送消息時,服務器給我的響應也是批量接收到的。打印結果如下:
那么問題來了,我們只想要每一次發送對應一個OK
.所以,原諒我這個菜鳥,我才明白decode
方法的目的。那么,我們就去解析這個內容。解析過程有幾個需要關注的地方:
- 遇到第一個
\r
的時候,下一個字節一定是'\n'否則,作為解析失敗處理。 \r\n
之后停止本輪解析,返回解析結果。
基於上述注意事項,解析代碼如下:(應該會有更優秀的方法)
先獲取第一個字節,它應該是+ - $ : *
的其中一個,如果不是的話,說明消息可能是上一次不完整導致的,等待下次解析。
byte first = buffer.get();
以 +OK\r\n
舉例:
private TedisPacket readSingleLinePacket(ByteBuffer buffer,int limit,int position) throws AioDecodeException {
byte[] body = new byte[limit - position];
int i = 0;
//結束標志
boolean endFlag = false;
while (buffer.position() <= limit) {
byte b = buffer.get();
//如果是\r
if (BufferReader.isCr(b)) {
byte c = buffer.get();
//如果不是\n拋出異常
if (!BufferReader.isLf(c)) {
throw new AioDecodeException("unexpected redis server response");
}
//結束解析
endFlag = true;
break;
} else {
body[i++] = b;
}
}
//如果此次解析一直沒有遇到\r\n,則返回null,等待下次解析
if (!endFlag) {
return null;
}
TedisPacket packet = new TedisPacket();
packet.setBody(body);
return packet;
}
寫完解析代碼之后,再一次調試結果如下,可以看到數據以5個字節減少,說明數據包被正確解析了。打印內容來自Tio:DecodeRunnable.java
.
到此為止,我們完成了消息的發送和接收,但是問題來了,由於消息是異步接收,那我們如何才能讓客戶端知道命令調用是否成功呢?注意,下文中的內容僅為個人理解,錯誤之處懇請指正
既然redis是單線程處理的,那么我是否可以理解為,消息的處理就是先到先處理,后到后處理呢?所以,我的解決方式是通過 LinkedBlockingQueue
。當解析完一個包之后,將這個包放入阻塞隊列中。
@Override
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
TedisPacket responsePacket = (TedisPacket) packet;
if (responsePacket != null) {
QueueFactory.get(clientName).put(responsePacket);
}
}
同步接收返回消息:
private String getReponse() {
for (; ; ) {
try {
TedisPacket packet = QueueFactory.get(clientName).take();
return packet.hasBody() ? SafeEncoder.encode(packet.getBody()) : null;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
}
所以set代碼就變成這樣:
@Override
public String set(String key, String value) {
client.set(key,value);
return client.getStatusCodeReply();
}
OK,消息接收這塊是基於我的理解,我也不知道對不對,而且,其中的BUG肯定也是多的數不勝數,沒關系,抱着學習的心態慢慢去完善就好了。Jedis
也不是一次兩次就寫成的對吧。
Tedis 與 Jedis
在開發過程中,我閱讀了很多Jedis
的源代碼,大體思路能看懂,可是很多細節處理對我來說就比較難了,大神的代碼只可膜拜。不過也給了我很多啟發。最后不知天高地厚的和人家做一下對比吧。
public static void main(String[] args) {
Jedis tedis = new Jedis("192.168.1.225", 6379);
long start = SystemTimer.currentTimeMillis();
for (int i = 0; i < 200; i++) {
tedis.set("tedis", "tedis");
}
tedis.get("tedis");
long end = SystemTimer.currentTimeMillis();
System.out.println("總共用時:" + (end - start) + "ms,平均用時:" + ((end - start) / 100) + "ms");
}
Jedis結果:總共用時:262ms,平均用時:2ms
Tedis結果:總共用時:390ms,平均用時:3ms
那么這一毫秒差在哪里呢?
總結
一篇博客簡單介紹了Redis
客戶端的開發過程,當然對於成熟的客戶端Jedis
來說,也就是一個HelloWorld,不過這有什么關系呢?知其然,更要知其所以然。看了大神的代碼才知道自己有多渺小哦。繼續加油~~