學習T-io框架,從寫一個Redis客戶端開始


前言

  了解T-io框架有些日子了,並且還將它應用於實戰,例如 tio-websocket-server,tio-http-server等。但是由於上述兩個server已經封裝好,直接應用就可以。所以對於整個數據流通的過程不是很明朗,甚至對於hello-world例子中的encode,decode作用並不理解。於是乎想寫一個更貼近實際應用的redis-client來作為學習切入點,雖然編碼過程中困難重重,不過最后還是實現了一個粗糙的客戶端。由於代碼中大量參考了Jedis源碼,所以,我給這個客戶端起名T-io+Redis=Tedis.哈哈,這些都不重要,下文中將會記錄出我的學習和開發歷程。

Redis通信協議

  Redis Protocol

  在開發之前,首先要去了解客戶端和服務端的通信協議,那么我們開發Redis客戶端,就要去看看Redis協議了。所以,下面要做的就是:

  • 明確客戶端發送給服務端的消息格式
  • 明確服務端返回給客戶端的消息格式
    在此呢,我只簡單舉一個 GET,SET的例子,其他的內容大家可以去看參考文檔。
//SET命令
set mykey myvalue
//GET命令
get mykey

上述兩個簡單的命令,根據Redis協議可以解析成如下內容

//SET命令
*3\r\n$3\r\nset\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n
//GET命令
*2\r\n$3\r\nget\r\n$5\r\nmykey\r\n

其中 *3代表有三段內容,即 SET,mykey,myvalue.每一段內容之間由 CRLF(\r\n)隔開.$符號后邊跟的數字就是數據字節數。引用官方的一個圖:

Jedis源碼中,對於消息體的構造比較麻煩,我看的也是雲里霧里的,所以在Tedis的實現中我才用了最簡單的拼接方式。即StringBuilder根據規則拼接字符串,然后調用getBytes方法獲取byte[]。示例代碼如下:

public static byte[] buildCommandBody(final ProtocolCommand cmd,String... args) {
        StringBuilder builder = new StringBuilder();
        //*[num]
        builder.append('*')
                //命令數(1) + 參數的個數
                .append(1 + args.length);
        appendCrLf(builder)
                //命令長度  $[cmd_length]
                .append("$")

                .append(cmd.getName().length());
        appendCrLf(builder)
                //命令內容 cmd
                .append(cmd.getName());
        appendCrLf(builder);

        //遍歷參數,按照 $[num]\r\n[content]\r\n的格式拼接
        for (String arg : args) {
            builder.append("$")
                    .append(arg.length());
            appendCrLf(builder)
                    .append(arg);
            appendCrLf(builder);
        }
        //最后轉換為 byte[],此處使用  Jedis 中的 SafeEncoder
        return SafeEncoder.encode(builder.toString());

    }

調用示例:

 public static void main(String[] args){
        Protocol.buildCommandBody(Protocol.Command.SET,"key","value");
  }

打印結果:

*3
$3
SET
$3
key
$5
value

  那么到此為止,我們已經了解了如何構造發送給服務端的消息,那么如何解析服務端返回的消息呢?
Redis 命令會返回多種不同類型的回復。
通過檢查服務器發回數據的第一個字節, 可以確定這個回復是什么類型:

  • 狀態回復(status reply)的第一個字節是 "+"
  • 錯誤回復(error reply)的第一個字節是 "-"
  • 整數回復(integer reply)的第一個字節是 ":"
  • 批量回復(bulk reply)的第一個字節是 "$"
  • 多條批量回復(multi bulk reply)的第一個字節是 "*"
      時間有限,我也只是完成了狀態回復和批量回復的部分功能,下文中將以這兩種回復作為講解示例。

T-io登場

  由於只是客戶端的開發,所以這里我們只會用到TioClient。所以,我們先把Redis-Server連接上。ClientAioHandler,ClientAioListener,ClientGroupContext自然是少不了的啦,直接上代碼吧。

  • 初始化一個 ServerNode
 Node serverNode = new Node("127.0.0.1",6379);
  • 初始化一個ClientGroupContext,它依賴於ClientAioHandler,ClientAioListener
 ClientGroupContext clientGroupContext = new ClientGroupContext(tioClientHandler, aioListener, null);
  • 初始化一個TioClient
 TioClient tioClient = new TioClient(clientGroupContext);
  • 最后連接服務器,如果沒有什么異常打印的話,就連接成功啦
//返回的ClientChannelContext 用於發送消息使用
ClientChannelContext clientChannelContext = tioClient.connect(serverNode);

  恭喜你,一個Redis客戶端寶寶就此誕生,只不過它還不會說話。結合上文協議部分的內容,我們發送一條消息給服務器。首先定義消息包:

public class TedisPacket extends Packet {
    private byte[] body;
    //getter setter
}

然后調用Tio.send方法就可以啦。

     Tio.send(clientChannelContext, packet);

如果你已經看懂了上半部分,那么你就會知道這里 TedisPacket中的body的值就是通過 Protocol.buildCommandBody(Protocol.Command.SET,"key","value");來生成的。不要忘了 `ClientAioHandler.encode’方法哦。

 @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        TedisPacket tedisPacket = (TedisPacket) packet;
        byte[] body = tedisPacket.getBody();
        int bodyLen = 0;
        if (body != null) {
            bodyLen = body.length;
        }
        //只是簡單將 body 放入 ByteBuffer 。
        ByteBuffer buffer = ByteBuffer.allocate(bodyLen);
        buffer.put(body);
        return buffer;
    }

到此為止,客戶端向服務器發送消息的內容已經寫完了。下面將介紹如何解析服務端的響應。

  當服務器正常,並且發送到服務器的消息格式符合RESP協議的話,那么服務器會返回你相應的內容,比如我們發送SET命令,服務器的正常響應是+OK\r\n.下面我們看ClientAioHandler.decode方法。當我批量向服務器發送消息時,服務器給我的響應也是批量接收到的。打印結果如下:

那么問題來了,我們只想要每一次發送對應一個OK.所以,原諒我這個菜鳥,我才明白decode方法的目的。那么,我們就去解析這個內容。解析過程有幾個需要關注的地方:

  • 遇到第一個 \r的時候,下一個字節一定是'\n'否則,作為解析失敗處理。
  • \r\n之后停止本輪解析,返回解析結果。
    基於上述注意事項,解析代碼如下:(應該會有更優秀的方法)
    先獲取第一個字節,它應該是 + - $ : *的其中一個,如果不是的話,說明消息可能是上一次不完整導致的,等待下次解析。
byte first = buffer.get();

+OK\r\n舉例:

 private TedisPacket readSingleLinePacket(ByteBuffer buffer,int limit,int position) throws AioDecodeException {
        byte[] body = new byte[limit - position];
        int i = 0;
        //結束標志
        boolean endFlag = false;

        while (buffer.position() <= limit) {
            byte b = buffer.get();
            //如果是\r
            if (BufferReader.isCr(b)) {
                byte c = buffer.get();
                //如果不是\n拋出異常
                if (!BufferReader.isLf(c)) {
                    throw new AioDecodeException("unexpected redis server response");
                }
                //結束解析
                endFlag = true;
                break;
            } else {
                body[i++] = b;
            }
        }
        //如果此次解析一直沒有遇到\r\n,則返回null,等待下次解析
        if (!endFlag) {
            return null;
        }
        TedisPacket packet = new TedisPacket();
        packet.setBody(body);
        return packet;
    }

寫完解析代碼之后,再一次調試結果如下,可以看到數據以5個字節減少,說明數據包被正確解析了。打印內容來自Tio:DecodeRunnable.java.

到此為止,我們完成了消息的發送和接收,但是問題來了,由於消息是異步接收,那我們如何才能讓客戶端知道命令調用是否成功呢?注意,下文中的內容僅為個人理解,錯誤之處懇請指正
既然redis是單線程處理的,那么我是否可以理解為,消息的處理就是先到先處理,后到后處理呢?所以,我的解決方式是通過 LinkedBlockingQueue。當解析完一個包之后,將這個包放入阻塞隊列中。

 @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        TedisPacket responsePacket = (TedisPacket) packet;
        if (responsePacket != null) {
            QueueFactory.get(clientName).put(responsePacket);
        }
    }

同步接收返回消息:

   private String getReponse() {
        for (; ; ) {
            try {
                TedisPacket packet = QueueFactory.get(clientName).take();
                return packet.hasBody() ? SafeEncoder.encode(packet.getBody()) : null;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

所以set代碼就變成這樣:

  @Override
    public String set(String key, String value) {
       client.set(key,value);
       return client.getStatusCodeReply();
    }

OK,消息接收這塊是基於我的理解,我也不知道對不對,而且,其中的BUG肯定也是多的數不勝數,沒關系,抱着學習的心態慢慢去完善就好了。Jedis也不是一次兩次就寫成的對吧。

Tedis 與 Jedis

  在開發過程中,我閱讀了很多Jedis的源代碼,大體思路能看懂,可是很多細節處理對我來說就比較難了,大神的代碼只可膜拜。不過也給了我很多啟發。最后不知天高地厚的和人家做一下對比吧。

 public static void main(String[] args) {

        Jedis tedis = new Jedis("192.168.1.225", 6379);
        long start = SystemTimer.currentTimeMillis();
        for (int i = 0; i < 200; i++) {
            tedis.set("tedis", "tedis");
        }
        tedis.get("tedis");
        long end = SystemTimer.currentTimeMillis();

        System.out.println("總共用時:" + (end - start) + "ms,平均用時:" + ((end - start) / 100) + "ms");
    }

Jedis結果:總共用時:262ms,平均用時:2ms
Tedis結果:總共用時:390ms,平均用時:3ms

那么這一毫秒差在哪里呢?

總結

  一篇博客簡單介紹了Redis客戶端的開發過程,當然對於成熟的客戶端Jedis來說,也就是一個HelloWorld,不過這有什么關系呢?知其然,更要知其所以然。看了大神的代碼才知道自己有多渺小哦。繼續加油~~

源碼地址:https://github.com/fanpan26/tedis


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM