先看工程路径,如下图
1.pom.xml:
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3 <modelVersion>4.0.0</modelVersion> 4 <groupId>netty-demo</groupId> 5 <artifactId>com.kingdee</artifactId> 6 <version>0.0.1-SNAPSHOT</version> 7 <properties> 8 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 9 <spring.version>3.2.5.RELEASE</spring.version> 10 <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version> 11 </properties> 12 <dependencies> 13 <dependency> 14 <groupId>org.springframework</groupId> 15 <artifactId>spring-context</artifactId> 16 <version>${spring.version}</version> 17 </dependency> 18 19 <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> 20 <dependency> 21 <groupId>io.netty</groupId> 22 <artifactId>netty-all</artifactId> 23 <version>4.0.23.Final</version> 24 </dependency> 25 26 <!-- https://mvnrepository.com/artifact/log4j/log4j --> 27 <dependency> 28 <groupId>log4j</groupId> 29 <artifactId>log4j</artifactId> 30 <version>1.2.17</version> 31 </dependency> 32 33 <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging --> 34 <dependency> 35 <groupId>commons-logging</groupId> 36 <artifactId>commons-logging</artifactId> 37 <version>1.1.1</version> 38 </dependency> 39 40 <dependency> 41 <groupId>com.google.protobuf</groupId> 42 <artifactId>protobuf-java</artifactId> 43 <version>3.0.0</version> 44 </dependency> 45 </dependencies> 46 </project>
2.msg.proto,把它转换成java代码,再拷贝到对应的包下,利用proto.exe工具生成
mgs.proto:
package com.netty.demo; message Client { required string head = 1; required string body = 2; } message Server { required int32 code=1; required string message=2; }
3.客户端代码:
Client.java:
package com.netty.demo.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static String host = "127.0.0.1"; public static int port = 8787; public static void main(String[] args) { EventLoopGroup worker = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(worker); b.channel(NioSocketChannel.class); b.handler(new ClientInitializer()); try { ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { worker.shutdownGracefully(); } } }
ClientHandler.java(处理客户端消息发送和收到服务端消息的处理,但一般情况下是不会在这里写发送消息的逻辑的,只是为了写demo,所以把发消息写在这里面)
package com.netty.demo.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import com.google.protobuf.Message; import com.netty.demo.Msg; public class ClientHandler extends SimpleChannelInboundHandler<Message> { /** * */ protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { System.out.println("Server say : " + msg.toString()); } /** * */ public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client active "); Msg.Client msg = Msg.Client.newBuilder().setHead("Content-Type:application/json;charset=UTF-8").setBody("hello world!").build(); ctx.writeAndFlush(msg); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client close "); super.channelInactive(ctx); } }
ClientInitializer.java(初始化Chanel,如解码,加密等)
package com.netty.demo.client; import com.netty.demo.Msg; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; public class ClientInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel ch) throws Exception { // decoded ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); //这里是收到服务端发过来的消息,所以是对服务端的response解码 ch.pipeline().addLast(new ProtobufDecoder(Msg.Server.getDefaultInstance())); // encoded ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtobufEncoder()); // 注册handler ch.pipeline().addLast(new ClientHandler()); } }
4.Server端代码:
Server.java
package com.netty.demo.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { private static int port = 8787; public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap(); server.group(boss, worker); server.channel(NioServerSocketChannel.class); server.childHandler(new ServerInitializer()); server.option(ChannelOption.SO_BACKLOG, 128); server.childOption(ChannelOption.SO_KEEPALIVE, true); try { //绑定端口 同步等待成功 ChannelFuture f = server.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
ServerHandler.java:
package com.netty.demo.server; import java.net.InetAddress; import com.google.protobuf.Message; import com.netty.demo.Msg; import com.netty.demo.Msg.Client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 处理客户端连接时的handler * * @author shizhengchao32677 * */ public class ServerHandler extends SimpleChannelInboundHandler<Message> { /** * 收到客户端发过来的消息 */ protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { // 收到消息直接打印输出 System.out.println(msg.getClass()); Msg.Server response = null; if(msg instanceof Msg.Client) { Msg.Client clientMsg = (Client) msg; System.out.println(ctx.channel().remoteAddress() + " Say : " + clientMsg.getBody()); response = Msg.Server.newBuilder().setCode(0).setMessage("Received client message success").build(); } else { response = Msg.Server.newBuilder().setCode(-1).setMessage("client message is illegal").build(); System.out.println("client message is illegal"); } // 返回客户端消息 - 我已经接收到了你的消息 ctx.writeAndFlush(response); } /* * 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候) */ public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !"); String welcome = "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!"; Msg.Server response = Msg.Server.newBuilder().setCode(101).setMessage(welcome).build(); ctx.writeAndFlush(response); super.channelActive(ctx); } }
ServerInitializer.java
package com.netty.demo.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import com.netty.demo.Msg; public class ServerInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel ch) throws Exception { // decoded ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); //解码客户端发过来的消息 ch.pipeline().addLast(new ProtobufDecoder(Msg.Client.getDefaultInstance())); // encoded ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtobufEncoder()); // 注册handler ch.pipeline().addLast(new ServerHandler()); } }
运行Server.java和Client.java:
Server输出: RamoteAddress : /127.0.0.1:59693 active ! class com.netty.demo.Msg$Client /127.0.0.1:59693 Say : hello world! Clientl输出: Client active Server say : code: 101 message: "Welcome to H4UOJJQSF23HQ91 service!" Server say : code: 0 message: "Received client message success"