先看工程路徑,如下圖
1.pom.xml:
<!-- Maven build descriptor for the Netty + Protocol Buffers demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>netty-demo</groupId>
    <artifactId>com.kingdee</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Shared build settings and dependency versions. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>3.2.5.RELEASE</spring.version>
        <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.23.Final</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- Runtime library required by the Java classes generated from msg.proto. -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
2.msg.proto:利用protoc.exe工具把它轉換成java代碼,再把生成的代碼拷貝到對應的包下
msg.proto:
// Wire messages exchanged between the demo client and the demo server.
// "required" fields are a proto2 feature, so the syntax is declared explicitly
// instead of relying on the compiler's default.
syntax = "proto2";

package com.netty.demo;

// Pin the generated wrapper class name (the Java code refers to Msg.Client and
// Msg.Server) so it does not depend on the name of this .proto file.
option java_outer_classname = "Msg";

// Message sent from the client to the server.
message Client {
  required string head = 1;
  required string body = 2;
}

// Message sent from the server back to the client.
message Server {
  required int32 code = 1;
  required string message = 2;
}
3.客戶端代碼:
Client.java:
package com.netty.demo.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Entry point of the demo client: connects to {@link #host}:{@link #port} and
 * blocks until the connection is closed.
 */
public class Client {

    /** Host name or address of the server to connect to. */
    public static String host = "127.0.0.1";

    /** TCP port of the server to connect to. */
    public static int port = 8787;

    /**
     * Connects to the server, waits for the channel to close and then shuts the
     * event loop group down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // Configuration happens inside the try block so the event loop group
            // is released even when the setup itself fails.
            Bootstrap b = new Bootstrap();
            b.group(worker);
            b.channel(NioSocketChannel.class);
            b.handler(new ClientInitializer());

            // Wait for the connection to be established ...
            ChannelFuture f = b.connect(host, port).sync();
            // ... and then for the channel to be closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
ClientHandler.java(處理客戶端消息發送和收到服務端消息的處理,但一般情況下是不會在這里寫發送消息的邏輯的,只是為了寫demo,所以把發消息寫在這里面)
package com.netty.demo.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import com.google.protobuf.Message; import com.netty.demo.Msg; public class ClientHandler extends SimpleChannelInboundHandler<Message> { /** * */ protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { System.out.println("Server say : " + msg.toString()); } /** * */ public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client active "); Msg.Client msg = Msg.Client.newBuilder().setHead("Content-Type:application/json;charset=UTF-8").setBody("hello world!").build(); ctx.writeAndFlush(msg); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client close "); super.channelInactive(ctx); } }
ClientInitializer.java(初始化Channel,如解碼,編碼等)
package com.netty.demo.client; import com.netty.demo.Msg; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; public class ClientInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel ch) throws Exception { // decoded ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); //這里是收到服務端發過來的消息,所以是對服務端的response解碼 ch.pipeline().addLast(new ProtobufDecoder(Msg.Server.getDefaultInstance())); // encoded ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtobufEncoder()); // 注冊handler ch.pipeline().addLast(new ClientHandler()); } }
4.Server端代碼:
Server.java
package com.netty.demo.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { private static int port = 8787; public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap(); server.group(boss, worker); server.channel(NioServerSocketChannel.class); server.childHandler(new ServerInitializer()); server.option(ChannelOption.SO_BACKLOG, 128); server.childOption(ChannelOption.SO_KEEPALIVE, true); try { //綁定端口 同步等待成功 ChannelFuture f = server.bind(port).sync(); //等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
ServerHandler.java:
package com.netty.demo.server;

import java.net.InetAddress;
import java.net.UnknownHostException;

import com.google.protobuf.Message;
import com.netty.demo.Msg;
import com.netty.demo.Msg.Client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side handler: greets each new connection and acknowledges every
 * message received from a client.
 *
 * @author shizhengchao32677
 */
public class ServerHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * Local host name used in the welcome message. Resolved once when the class
     * is loaded rather than on every new connection.
     */
    private static final String HOST_NAME = resolveHostName();

    /** Returns the local host name, or {@code "localhost"} when it cannot be resolved. */
    private static String resolveHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // A failed lookup must not prevent the welcome message from being sent.
            return "localhost";
        }
    }

    /**
     * Handles a message received from a client and replies with a status message.
     *
     * @param ctx the handler context
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // Print the runtime type of the received message.
        System.out.println(msg.getClass());

        final Msg.Server response;
        if (msg instanceof Msg.Client) {
            Msg.Client clientMsg = (Client) msg;
            System.out.println(ctx.channel().remoteAddress() + " Say : " + clientMsg.getBody());
            response = Msg.Server.newBuilder()
                    .setCode(0)
                    .setMessage("Received client message success")
                    .build();
        } else {
            response = Msg.Server.newBuilder()
                    .setCode(-1)
                    .setMessage("client message is illegal")
                    .build();
            System.out.println("client message is illegal");
        }

        // Tell the client that its message has been received.
        ctx.writeAndFlush(response);
    }

    /**
     * Called when the channel becomes active (a connection has been established);
     * sends a welcome message to the client.
     *
     * @param ctx the handler context
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
        String welcome = "Welcome to " + HOST_NAME + " service!";
        Msg.Server response = Msg.Server.newBuilder().setCode(101).setMessage(welcome).build();
        ctx.writeAndFlush(response);
        super.channelActive(ctx);
    }

    /**
     * Reports a pipeline error and closes the affected connection.
     *
     * @param ctx   the handler context
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
ServerInitializer.java
package com.netty.demo.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import com.netty.demo.Msg; public class ServerInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel ch) throws Exception { // decoded ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); //解碼客戶端發過來的消息 ch.pipeline().addLast(new ProtobufDecoder(Msg.Client.getDefaultInstance())); // encoded ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtobufEncoder()); // 注冊handler ch.pipeline().addLast(new ServerHandler()); } }
運行Server.java和Client.java:
Server輸出:
RamoteAddress : /127.0.0.1:59693 active !
class com.netty.demo.Msg$Client
/127.0.0.1:59693 Say : hello world!
Client輸出:
Client active
Server say : code: 101 message: "Welcome to H4UOJJQSF23HQ91 service!"
Server say : code: 0 message: "Received client message success"