netty是java的高性能socket框架,linux下基於epoll,這裡不對它多牛逼作分析,網上資料很多,這裡針對一般socket的業務作個例子
幾個基本概念:
channel類似於socket句柄的抽象
pipeline是每個socket裡面的eventHandler的處理響應鏈
每個socket(channel)綁定一個pipeline,每個pipeline綁定若干個handler。netty裡面的handler專門用來處理和業務有關的東西,handler有upHandler和downHandler,down用來處理發包,up用來處理收包,大概的示例圖看這裡

注意上面的123的順序,很重要,在netty裡面,處理順序如圖,對於up類的收包處理,最靠近收包層的順序越靠前;對於down類的發包處理,最靠近收包層的順序越靠後
還有一些encoder和decoder,encoder用來在發包之前進行編碼,decoder在收包以後進行解碼,然後業務數據跳到事件處理流程。
下面具體上代碼,版本是netty3.6
MessageClientHandler.java
package com.netty.test.client;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/**
 * Client-side upstream handler for the echo demo.
 *
 * <p>Writes a greeting when the channel connects, then writes every received
 * message back to the channel after a fixed delay. Any exception reported to
 * this handler is logged and the channel is closed.
 */
public class MessageClientHandler extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(
            MessageClientHandler.class.getName());

    /** Pause between receiving a message and writing it back. */
    private static final long ECHO_DELAY_MILLIS = 1000 * 3;

    /**
     * Writes the initial greeting as soon as the channel is connected.
     *
     * @param ctx the handler context (unused)
     * @param e   the state event whose channel receives the greeting
     */
    @Override
    public void channelConnected(
            ChannelHandlerContext ctx, ChannelStateEvent e) {
        String message = "hello kafka0102";
        e.getChannel().write(message);
    }

    /**
     * Prints the received message, waits {@link #ECHO_DELAY_MILLIS} and then
     * writes the same message back to the channel.
     *
     * @param ctx the handler context (unused)
     * @param e   the event carrying the received message
     */
    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        System.err.println("client messageReceived send message "+e.getMessage());
        // NOTE(review): this sleep blocks whichever thread delivers the event
        // (presumably a Netty I/O worker); acceptable for a demo only.
        try {
            Thread.sleep(ECHO_DELAY_MILLIS);
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to the owner of this thread instead
            // of silently discarding it.
            Thread.currentThread().interrupt();
            logger.log(Level.WARNING, "Interrupted while delaying the echo.", ex);
        }
        // Send back the received message to the remote peer.
        e.getChannel().write(e.getMessage());
    }

    /**
     * Logs the reported cause and closes the channel.
     *
     * @param ctx the handler context (unused)
     * @param e   the event describing the exception
     */
    @Override
    public void exceptionCaught(
            ChannelHandlerContext ctx, ExceptionEvent e) {
        // Close the connection when an exception is raised.
        logger.log(
                Level.WARNING,
                "Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
    }
}
MessageDecoder.java
package com.netty.test.client;

import java.io.StreamCorruptedException;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

/**
 * Decodes length-prefixed frames into {@link String} messages.
 *
 * <p>Each frame is a 4-byte length (read with {@code getInt}) followed by that
 * many payload bytes.
 */
public class MessageDecoder extends FrameDecoder {

    /** Number of bytes occupied by the length prefix. */
    private static final int HEADER_LENGTH = 4;

    /**
     * Tries to extract one complete frame from {@code buffer}.
     *
     * @param ctx     the handler context (unused)
     * @param channel the channel the bytes arrived on (unused)
     * @param buffer  the accumulated, not yet decoded bytes
     * @return the decoded payload as a {@code String}, or {@code null} when
     *         the buffer does not yet hold a complete frame
     * @throws StreamCorruptedException if the length prefix is negative
     */
    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        // (1) The length prefix itself has not fully arrived yet.
        if (buffer.readableBytes() < HEADER_LENGTH) {
            return null;
        }

        // Peek at the length without moving the reader index, so nothing is
        // consumed if the payload turns out to be incomplete.
        int dataLength = buffer.getInt(buffer.readerIndex());
        if (dataLength < 0) {
            // Drop the bad header before failing so the same bytes are not
            // examined again, then report the corruption explicitly instead
            // of failing later with NegativeArraySizeException.
            buffer.skipBytes(HEADER_LENGTH);
            throw new StreamCorruptedException(
                    "negative frame length: " + dataLength);
        }

        // (2) Payload incomplete. Subtract on the left rather than computing
        // dataLength + 4, which overflows for lengths near Integer.MAX_VALUE
        // and would let a tiny packet trigger a huge array allocation.
        if (buffer.readableBytes() - HEADER_LENGTH < dataLength) {
            return null;
        }

        // (3) Consume the header, then the payload.
        buffer.skipBytes(HEADER_LENGTH);
        byte[] decoded = new byte[dataLength];
        buffer.readBytes(decoded);

        // (4) NOTE(review): decodes with the platform default charset; this
        // must match whatever charset the sending side used to encode.
        return new String(decoded);
    }
}
MessageEncoder.java
1 package com.netty.test.client; 2 import org.jboss.netty.buffer.ChannelBuffer; 3 import org.jboss.netty.buffer.ChannelBuffers; 4 import org.jboss.netty.channel.Channel; 5 import org.jboss.netty.channel.ChannelHandlerContext; 6 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; 7 8 public class MessageEncoder extends OneToOneEncoder { 9 10 @Override 11 protected Object encode( 12 ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { 13 if (!(msg instanceof String)) { 14 return msg;//(1) 15 } 16 17 String res = (String)msg; 18 byte[] data = res.getBytes(); 19 int dataLength = data.length; 20 ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2) 21 buf.writeInt(dataLength); 22 buf.writeBytes(data); 23 return buf;//(3) 24 } 25 }
用來測試的TestClientDownHandlerA.java
1 package com.netty.test.client; 2 3 import org.jboss.netty.channel.ChannelEvent; 4 import org.jboss.netty.channel.ChannelHandlerContext; 5 import org.jboss.netty.channel.SimpleChannelDownstreamHandler; 6 7 public class TestClientDownHandlerA extends SimpleChannelDownstreamHandler { 8 9 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) 10 throws Exception { 11 System.err.println("test client Down handlerA "); 12 13 super.handleDownstream(ctx, e); 14 } 15 }
用來測試的TestClientDownHandlerB.java
1 package com.netty.test.client; 2 3 import org.jboss.netty.channel.ChannelEvent; 4 import org.jboss.netty.channel.ChannelHandlerContext; 5 import org.jboss.netty.channel.SimpleChannelDownstreamHandler; 6 7 public class TestClientDownHandlerB extends SimpleChannelDownstreamHandler { 8 9 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) 10 throws Exception { 11 System.err.println("test client Down handlerB "); 12 13 super.handleDownstream(ctx, e); 14 } 15 }
MessageClientPipelineFactory.java
1 package com.netty.test.client; 2 3 import org.jboss.netty.channel.ChannelPipeline; 4 import org.jboss.netty.channel.ChannelPipelineFactory; 5 import org.jboss.netty.channel.Channels; 6 7 import com.netty.test.client.MessageDecoder; 8 import com.netty.test.client.MessageEncoder; 9 10 public class MessageClientPipelineFactory implements ChannelPipelineFactory { 11 12 public ChannelPipeline getPipeline() throws Exception { 13 ChannelPipeline pipeline = Channels.pipeline(); 14 15 pipeline.addLast("decoder", new MessageDecoder()); 16 pipeline.addLast("encoder", new MessageEncoder()); 17 pipeline.addLast("handler", new MessageClientHandler()); 18 19 pipeline.addFirst("testClientDownHandlerA", new TestClientDownHandlerA()); 20 pipeline.addFirst("testClientDownHandlerB", new TestClientDownHandlerB()); 21 22 return pipeline; 23 } 24 }
MessageClient.java
package com.netty.test.client;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
 * Entry point of the demo client: connects to the server, then blocks until
 * the channel is closed.
 */
public class MessageClient {

    /** Host used when no host argument is given. */
    private static final String DEFAULT_HOST = "127.0.0.1";

    /** Port used when no port argument is given. */
    private static final int DEFAULT_PORT = 8888;

    /**
     * Runs the client.
     *
     * @param args optional {@code [host [port]]}; missing values fall back to
     *             {@code 127.0.0.1} and {@code 8888}
     * @throws NumberFormatException if the port argument is not an integer
     */
    public static void main(String[] args) throws Exception {
        // Parse options.
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // Configure the client.
        ClientBootstrap bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        try {
            // Set up the event pipeline factory.
            bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
            // Start the connection attempt.
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
            // Wait until the connection is closed or the connection attempt fails.
            future.getChannel().getCloseFuture().awaitUninterruptibly();
        } finally {
            // Shut down thread pools to exit, even if setup above threw.
            bootstrap.releaseExternalResources();
        }
    }
}
客戶端代碼完成
以下為服務端測試代碼
MessageDecoder和MessageEncoder照抄,這個都是一樣的
MessageServerHandler.java
package com.netty.test.server;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/**
 * Server-side business handler: prints each received {@code String} message
 * and writes it back to the same channel.
 */
public class MessageServerHandler extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(
            MessageServerHandler.class.getName());

    /**
     * Echoes a received {@code String}; messages of any other type are ignored.
     *
     * @param ctx the handler context (unused)
     * @param e   the event carrying the received message
     */
    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        Object payload = e.getMessage();
        if (payload instanceof String) {
            String text = (String) payload;
            System.err.println("got msg:"+text);
            // Write the same text back to the sender.
            e.getChannel().write(text);
        }
    }

    /**
     * Logs the reported cause at WARNING level and closes the channel.
     *
     * @param ctx the handler context (unused)
     * @param e   the event describing the exception
     */
    @Override
    public void exceptionCaught(
            ChannelHandlerContext ctx, ExceptionEvent e) {
        Throwable cause = e.getCause();
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
        e.getChannel().close();
    }
}
TestServerUpHandlerA.java
package com.netty.test.server;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/**
 * Test-only upstream handler that prints every received message to stderr and
 * then hands the event to the superclass implementation, making the handler
 * invocation order visible.
 */
public class TestServerUpHandlerA extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(
            TestServerUpHandlerA.class.getName());

    /**
     * Prints the message and delegates to the superclass implementation.
     *
     * @param ctx the handler context
     * @param e   the event carrying the received message
     */
    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        // Trace only: this handler does not answer the peer itself.
        System.err.println("server test upHandlerA get message "+e.getMessage());

        try {
            super.messageReceived(ctx, e);
        } catch (Exception e1) {
            // The signature cannot propagate checked exceptions, so record
            // the failure with its cause instead of dumping a stack trace.
            logger.log(Level.WARNING,
                    "upHandlerA failed to pass the message on.", e1);
        }
    }

    /**
     * Logs the reported cause and closes the channel.
     *
     * @param ctx the handler context (unused)
     * @param e   the event describing the exception
     */
    @Override
    public void exceptionCaught(
            ChannelHandlerContext ctx, ExceptionEvent e) {
        // Record why the channel is being closed; closing silently hides the
        // root cause.
        logger.log(Level.WARNING,
                "Unexpected exception in upHandlerA; closing channel.",
                e.getCause());
        e.getChannel().close();
    }
}
TestServerUpHandlerB.java
package com.netty.test.server;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/**
 * Test-only upstream handler that prints every received message to stderr and
 * then hands the event to the superclass implementation, making the handler
 * invocation order visible.
 */
public class TestServerUpHandlerB extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(
            TestServerUpHandlerB.class.getName());

    /**
     * Prints the message and delegates to the superclass implementation.
     *
     * @param ctx the handler context
     * @param e   the event carrying the received message
     */
    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        // Trace only: this handler does not answer the peer itself.
        System.err.println("server test upHandlerB get message "+e.getMessage());

        try {
            super.messageReceived(ctx, e);
        } catch (Exception e1) {
            // The signature cannot propagate checked exceptions, so record
            // the failure with its cause instead of dumping a stack trace.
            logger.log(Level.WARNING,
                    "upHandlerB failed to pass the message on.", e1);
        }
    }

    /**
     * Logs the reported cause and closes the channel.
     *
     * @param ctx the handler context (unused)
     * @param e   the event describing the exception
     */
    @Override
    public void exceptionCaught(
            ChannelHandlerContext ctx, ExceptionEvent e) {
        // Record why the channel is being closed; closing silently hides the
        // root cause.
        logger.log(Level.WARNING,
                "Unexpected exception in upHandlerB; closing channel.",
                e.getCause());
        e.getChannel().close();
    }
}
MessageServerPipelineFactory.java
1 package com.netty.test.server; 2 3 import org.jboss.netty.channel.ChannelPipeline; 4 import org.jboss.netty.channel.ChannelPipelineFactory; 5 import org.jboss.netty.channel.Channels; 6 7 public class MessageServerPipelineFactory implements 8 ChannelPipelineFactory { 9 10 public ChannelPipeline getPipeline() throws Exception { 11 ChannelPipeline pipeline = Channels.pipeline(); 12 13 pipeline.addLast("decoder", new MessageDecoder()); 14 pipeline.addLast("encoder", new MessageEncoder()); 15 pipeline.addLast("handler", new MessageServerHandler()); 16 17 // pipeline.addFirst("testServerUpHandlerA", new TestServerUpHandlerA()); 18 // pipeline.addFirst("testServerUpHandlerB", new TestServerUpHandlerB()); 19 20 return pipeline; 21 } 22 }
MessageServer.java
package com.netty.test.server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

/**
 * Entry point of the demo server: binds a listening socket and installs the
 * server pipeline factory for accepted connections.
 */
public class MessageServer {

    /** Port used when no port argument is given. */
    private static final int DEFAULT_PORT = 8888;

    /**
     * Starts the server.
     *
     * @param args optional {@code [port]}; defaults to {@code 8888}
     * @throws NumberFormatException if the port argument is not an integer
     */
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : DEFAULT_PORT;

        // Configure the server.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        // Set up the default event pipeline.
        bootstrap.setPipelineFactory(new MessageServerPipelineFactory());

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(port));
    }
}
通過測試,可以發現handler的處理順序,和圖上面的是一致的;還有可以參考下encoder和decoder的寫法,改改,直接用於項目。
