pom
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.Beta2</version> </dependency>
處理對象的工具類 MarshallingCodeCFactory
package com.jiagoushi.lzh.網絡編程.netty1.serial; import io.netty.handler.codec.marshalling.*; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工廠 * @author(liuzhonghua) * @since 2018-9-9 */ public final class MarshallingCodeCFactory { /** * 創建Jboss Marshalling解碼器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識創建的是java序列化工廠對象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //創建了MarshallingConfiguration對象,配置了版本號為5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據marshallerFactory和configuration創建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingDecoder對象,倆個參數分別為provider和單個消息序列化后的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } /** * 創建Jboss Marshalling編碼器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化為二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
實體類
req
package com.jiagoushi.lzh.網絡編程.netty1.serial; import lombok.Data; import java.io.Serializable; @Data public class Req implements Serializable{ private static final long SerialVersionUID = 1L; private String id ; private String name ; private String requestMessage ; private byte[] attachment; }
resp
package com.jiagoushi.lzh.網絡編程.netty1.serial; import lombok.Data; import java.io.Serializable; @Data public class Resp implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; }
server服務端
package com.jiagoushi.lzh.網絡編程.netty1.serial; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
服務端處理類
package com.jiagoushi.lzh.網絡編程.netty1.serial; import com.jiagoushi.lzh.網絡編程.utils.GzipUtils; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.io.File; import java.io.FileOutputStream; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Req req = (Req)msg; System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage()); byte[] attachment = GzipUtils.ungzip(req.getAttachment()); String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "00"+req.getId()+".jpg"; FileOutputStream fos = new FileOutputStream(path); fos.write(attachment); fos.close(); Resp resp = new Resp(); resp.setId(req.getId()); resp.setName("resp" + req.getId()); resp.setResponseMessage("響應內容" + req.getId()); ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
client客戶端
package com.jiagoushi.lzh.網絡編程.netty1.serial; import com.jiagoushi.lzh.網絡編程.utils.GzipUtils; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.File; import java.io.FileInputStream; public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for(int i = 0; i < 5; i++ ){ Req req = new Req(); req.setId("" + i); req.setName("pro" + i); req.setRequestMessage("數據信息" + i); String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg"; File file = new File(path); FileInputStream in = new FileInputStream(file); byte[] data = new byte[in.available()]; in.read(data); in.close(); req.setAttachment(GzipUtils.gzip(data)); cf.channel().writeAndFlush(req); } cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客戶端處理類
package com.jiagoushi.lzh.網絡編程.netty1.serial; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Resp resp = (Resp)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }