Netty中,通訊的雙方建立連接後,會把數據按照ByteBuf的方式進行傳輸,例如http協議中,就是通過HttpRequestDecoder對ByteBuf數據流進行處理,轉換成http的對象。基於這個思路,我自定義一種通訊協議:Server和客戶端直接傳輸java對象。
實現的原理是通過Encoder把java對象轉換成ByteBuf流進行傳輸,通過Decoder把ByteBuf轉換成java對象進行處理,處理邏輯如下圖所示:
傳輸的java bean為Person:
- package com.guowl.testobjcoder;
- import java.io.Serializable;
/**
 * Simple data bean that is sent from the client to the server.
 *
 * <p>The class must implement {@link Serializable}; otherwise it cannot be
 * written with Java object serialization.
 */
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private String sex;
    private int age;

    /** Returns the person's name, or {@code null} if it was never set. */
    public String getName() {
        return this.name;
    }

    /** Stores the person's name. */
    public void setName(String name) {
        this.name = name;
    }

    /** Returns the person's sex, or {@code null} if it was never set. */
    public String getSex() {
        return this.sex;
    }

    /** Stores the person's sex. */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** Returns the person's age ({@code 0} if it was never set). */
    public int getAge() {
        return this.age;
    }

    /** Stores the person's age. */
    public void setAge(int age) {
        this.age = age;
    }

    /** Renders the bean as {@code name:<name> sex:<sex> age:<age>}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("name:").append(name);
        text.append(" sex:").append(sex);
        text.append(" age:").append(age);
        return text.toString();
    }
}
Server端類:Server PersonDecoder BusinessHandler
1、Server:啟動netty服務
- package com.guowl.testobjcoder;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- public class Server {
- public void start(int port) throws Exception {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new PersonDecoder());
- ch.pipeline().addLast(new BusinessHandler());
- }
- }).option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture f = b.bind(port).sync();
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- public static void main(String[] args) throws Exception {
- Server server = new Server();
- server.start(8000);
- }
- }
2、PersonDecoder:把ByteBuf流轉換成Person對象,其中ByteBufToBytes是讀取ByteBuf的工具類,上一篇文章中提到過,在此不再詳述。ByteObjConverter是byte和obj的互相轉換的工具。
- package com.guowl.testobjcoder;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import java.util.List;
- import com.guowl.utils.ByteBufToBytes;
- import com.guowl.utils.ByteObjConverter;
- public class PersonDecoder extends ByteToMessageDecoder {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- ByteBufToBytes read = new ByteBufToBytes();
- Object obj = ByteObjConverter.ByteToObject(read.read(in));
- out.add(obj);
- }
- }
3、BusinessHandler 讀取Person信息,並打印
- package com.guowl.testobjcoder;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class BusinessHandler extends ChannelInboundHandlerAdapter {
- private Logger logger = LoggerFactory.getLogger(BusinessHandler.class);
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- Person person = (Person) msg;
- logger.info("BusinessHandler read msg from client :" + person);
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- }
- }
Client端的類:Client ClientInitHandler PersonEncoder
1、Client 建立與Server的連接
- package com.guowl.testobjcoder;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class Client {
- public void connect(String host, int port) throws Exception {
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new PersonEncoder());
- ch.pipeline().addLast(new ClientInitHandler());
- }
- });
- ChannelFuture f = b.connect(host, port).sync();
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- }
- }
- public static void main(String[] args) throws Exception {
- Client client = new Client();
- client.connect("127.0.0.1", 8000);
- }
- }
2、ClientInitHandler 向Server發送Person對象
- package com.guowl.testobjcoder;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class ClientInitHandler extends ChannelInboundHandlerAdapter {
- private static Logger logger = LoggerFactory.getLogger(ClientInitHandler.class);
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- logger.info("HelloClientIntHandler.channelActive");
- Person person = new Person();
- person.setName("guowl");
- person.setSex("man");
- person.setAge(30);
- ctx.write(person);
- ctx.flush();
- }
- }
3、PersonEncoder 把Person對象轉換成ByteBuf進行傳送
- package com.guowl.testobjcoder;
- import com.guowl.utils.ByteObjConverter;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- public class PersonEncoder extends MessageToByteEncoder<Person> {
- @Override
- protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
- byte[] datas = ByteObjConverter.ObjectToByte(msg);
- out.writeBytes(datas);
- ctx.flush();
- }
- }
工具類:ByteObjConverter
- package com.guowl.utils;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
/**
 * Converts between objects and byte arrays using Java object serialization.
 *
 * <p>Both methods report failure by printing the stack trace and returning
 * {@code null}; they do not throw.
 *
 * <p>SECURITY NOTE(review): {@link #ByteToObject(byte[])} performs native Java
 * deserialization. Doing this on bytes received from an untrusted peer is unsafe;
 * consider an {@code ObjectInputFilter} or a data-only format (JSON, protobuf).
 */
public class ByteObjConverter {

    /**
     * Deserializes one object from {@code bytes}.
     *
     * @param bytes serialized form produced by Java object serialization; may be {@code null}
     * @return the deserialized object, or {@code null} if {@code bytes} is {@code null},
     *         truncated, or otherwise cannot be deserialized
     */
    public static Object ByteToObject(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ObjectInputStream oi = null;
        try {
            // The constructor already reads the stream header and throws on short input,
            // in which case `oi` stays null.
            oi = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return oi.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            closeQuietly(oi);
        }
    }

    /**
     * Serializes {@code obj} into a byte array.
     *
     * @param obj object to serialize
     * @return the serialized bytes, or {@code null} if serialization fails
     *         (for example because {@code obj} is not serializable)
     */
    public static byte[] ObjectToByte(Object obj) {
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = null;
        try {
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            // Make sure everything buffered by the object stream reached `bo`.
            oo.flush();
            return bo.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            closeQuietly(oo);
        }
    }

    /** Closes {@code stream} if it was ever opened; a close failure is only reported. */
    private static void closeQuietly(java.io.Closeable stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
通過上述代碼,實現了Server端與Client端直接使用person對象進行通信的目的。基於此,可以構建更為複雜的場景:Server端同時支撐多種協議,不同的協議採用不同的Decoder進行解析,解析結果保持統一,這樣業務處理類可以保持接口一致。下一節將編寫這樣一個案例。
本例中需要注意的事項是:
1、Person對象必須實現Serializable接口,否則不能進行序列化。
2、PersonDecoder讀取ByteBuf數據的時候,並沒有對多次流式數據進行處理,而是簡單的一次性接收,在數據量大的情況下,可能會出現數據不完整,這個問題會在後續的學習中解決。