關鍵字:Netty開發redis客戶端,Netty發送redis命令,netty解析redis消息, netty redis ,redis RESP協議。redis客戶端,netty redis協議
我們可以使用redis-cli這個客戶端來操作redis,也可以使用window的命令行telnet連接redis。本文,我們的目標是使用netty來實現redis客戶端,實現目標為:
1. 啟動netty程序
2. 在命令行輸入 set mykey hello,由netty發送給redis服務器
3. 在命令行輸入 get mykey hello,得到結果:hello
4. 在命令行輸入 quit,程序退出
前言
Redis在TCP端口6379(默認,可修改端口)上監聽到來的連接,客戶端連接到來時,Redis服務器為此創建一個TCP連接。在客戶端與服務器端之間傳輸的每個Redis命令或者數據都以\r\n結尾。當redis服務啟動之后,我們可以使用TCP與之鏈接,連接之后便可以發消息,也會受到redis服務器的消息。而這個消息是有格式的,這個格式是事先商量好的,我們稱之為協議,redis的協議叫做RESP
,比方說我們有一條redis命令 set hello 123
,這條命令我們知道它是一條設置命令,通過RESP
協議“翻譯”一下,他就是這樣的:
*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$3\r\n123\r\n
然后,這條協議通過網絡傳輸(二進制形式),傳到redis服務器,被redis服務器解析,最后完成設置。關於RESP
協議的詳細介紹可以看這里.
思路
上面我們介紹了redis是基於TCP傳輸,並使用了其自己的協議——RESP。RESP其實是數據交換可解析的協議,你可以理解為數據交換的格式,按照此格式組裝好要傳輸的命令,並以二進制的形式由client端發往redis服務端。服務端接收這個消息之后,解析消息,執行命令,並將結果以協議好的格式組裝好,傳輸給client端。client端接收到響應,解釋成人類可以看懂的結果展示。
因此,我們可以整理一下思路:
1. 我們需要連接redis服務端,因此需要編寫一個netty client端(此處聯想一下netty client端的樣板代碼)。
2. 我們需要向redis服務端發送redis命令,很簡單,獲取channel,然后write。即channel.write(...)
3. 我們所編寫的直白的命令,如set xx,get xx之類的需要編碼之后才能傳輸給redis服務器。
因此,我們需要 **編碼器**。很榮幸netty自帶了,可以直接使用。
這里是 【輸出】 所以要有 outbound handler.
4. redis會響應結果給我們,因此我們需要在 chanelRead方法中處理數據。
這里是 【輸入】 所以要有 inbound handler.
編寫代碼
上的思路整理好了之后,我們可以寫代碼了。得益於netty的良好設計,我們只需要把netty client的“樣板代碼”拷貝過來生成一個client端代碼即可。剩下的就是 handler ,decoder ,encoder 。我們需要編寫的類有:
- RedisClient 見名知義,我們的主類,包含client bootstrap信息。 接收用戶控制台輸入redis命令。
- RedisClientInitializer 初始化器,在此添加 handler,decoder,encoder
- RedisClientHandler 核心邏輯,需要處理 inbound ,outbound 兩種類型事件。
RedisClient 代碼如下:
public class RedisClient {
String host; // 目標主機
int port; // 目標主機端口
public RedisClient(String host,int port){
this.host = host;
this.port = port;
}
public void start() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new RedisClientInitializer());
Channel channel = bootstrap.connect(host, port).sync().channel();
System.out.println(" connected to host : " + host + ", port : " + port);
System.out.println(" type redis's command to communicate with redis-server or type 'quit' to shutdown ");
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
ChannelFuture lastWriteFuture = null;
for (;;) {
String s = in.readLine();
if(s.equalsIgnoreCase("quit")) {
break;
}
System.out.print(">");
lastWriteFuture = channel.writeAndFlush(s);
lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.err.print("write failed: ");
future.cause().printStackTrace(System.err);
}
}
});
}
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
System.out.println(" bye ");
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
RedisClient client = new RedisClient("redis-cache2.228",5001);
client.start();
}
}
上面代碼很長,但是,我們要熟悉netty的套路,它的樣板代碼就是如此。我們只需要看handler(new RedisClientInitializer());
這一行,下面的就是一個 for(;;)
循環,用來接收我們在控制台輸入的redis命令。
RedisClientInitializer
代碼如下:
public class RedisClientInitializer extends ChannelInitializer<Channel>{
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RedisDecoder());
pipeline.addLast(new RedisBulkStringAggregator());
pipeline.addLast(new RedisArrayAggregator());
pipeline.addLast(new RedisEncoder());
pipeline.addLast(new RedisClientHandler());
}
}
這個類,很簡單,上面的幾個addLast
方法,除了最后一個外,其他都是netty自帶的redis協議實現相關的編解碼。最后一個是我們自定義的業務邏輯處理器。源碼如下:
public class RedisClientHandler extends ChannelDuplexHandler {
// 發送 redis 命令
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
String[] commands = ((String) msg).split("\\s+");
List<RedisMessage> children = new ArrayList<>(commands.length);
for (String cmdString : commands) {
children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
}
RedisMessage request = new ArrayRedisMessage(children);
ctx.write(request, promise);
}
// 接收 redis 響應數據
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RedisMessage redisMessage = (RedisMessage) msg;
// 打印響應消息
printAggregatedRedisResponse(redisMessage);
// 是否資源
ReferenceCountUtil.release(redisMessage);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.print("exceptionCaught: ");
cause.printStackTrace(System.err);
ctx.close();
}
private static void printAggregatedRedisResponse(RedisMessage msg) {
if (msg instanceof SimpleStringRedisMessage) {
System.out.println(((SimpleStringRedisMessage) msg).content());
} else if (msg instanceof ErrorRedisMessage) {
System.out.println(((ErrorRedisMessage) msg).content());
} else if (msg instanceof IntegerRedisMessage) {
System.out.println(((IntegerRedisMessage) msg).value());
} else if (msg instanceof FullBulkStringRedisMessage) {
System.out.println(getString((FullBulkStringRedisMessage) msg));
} else if (msg instanceof ArrayRedisMessage) {
for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
printAggregatedRedisResponse(child);
}
} else {
throw new CodecException("unknown message type: " + msg);
}
}
private static String getString(FullBulkStringRedisMessage msg) {
if (msg.isNull()) {
return "(null)";
}
return msg.content().toString(CharsetUtil.UTF_8);
}
}
注意,上面我們討論過,我們需要兩個handler,分別是inbound handler 和outbound handler 。這里我們使用的是ChannelDuplexHandler
。這個ChannelDuplexHandler
支持處理 inbound 和 outbound,其定義如下:
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
....
}
運行演示
按照開篇的思路分析,上面我們已經編寫好了netty redis client所需要的代碼。下面我們需要運行看看。main函數如下:
public static void main(String[] args) throws Exception{
RedisClient client = new RedisClient("your-redis-server-ip",6379);
client.start();
}
我在本地運行了一下,演示了一些命令:
- get ,set 以及 錯誤的命令
- expire命令設置超時時間,及 ttl 命令查看超時時間
- del 命令刪除可以
- quit退出程序。
結果如下:
connected to host : 192.168.2.120, port : 6379
type redis's command to communicate with redis-server or type 'quit' to shutdown
get hello
>(null)
set hello
>ERR wrong number of arguments for 'set' command
set hello 123
>OK
expire hello 10
>1
ttl hello
>6
ttl hello
>4
get hello
>(null)
set hello world
>OK
get hello
>world
del hello
>1
quit
bye
Process finished with exit code 0
如此,我們便用netty實現了redis的client端。代碼下載
如果你覺得還可以,給點個推薦吧!
spring如何啟動的?這里結合spring源碼描述了啟動過程
SpringMVC是怎么工作的,SpringMVC的工作原理
spring 異常處理。結合spring源碼分析400異常處理流程及解決方法
Mybatis Mapper接口是如何找到實現類的-源碼分析