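I had already written a TCP service with Netty at work and it went well, so I also wrote a simple UDP service in case I need one later. That way I won't have to dig around for it and can just reuse it O(∩_∩)O~
Note: I'm using Netty 3.5.3, the latest stable 3.x release as of this writing. The official site already has a 4.0 alpha, but 4.0 is not compatible with 3.x and the changes are fairly large, so I'll try it once 4.0 is stable.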
    /**
     * @author Jadic
     * @created 2012-8-10
     */
    package com.jadic;

    import java.net.InetSocketAddress;

    import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.socket.DatagramChannelFactory;
    import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;

    /**
     * Minimal UDP server built on Netty 3.x's ConnectionlessBootstrap.
     *
     * @author Jadic
     */
    public class UdpServer {

        private ConnectionlessBootstrap udpBootstrap;

        public UdpServer(int port) {
            DatagramChannelFactory channelFactory = new NioDatagramChannelFactory();
            udpBootstrap = new ConnectionlessBootstrap(channelFactory);
            // Every channel gets a pipeline containing only our handler.
            udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() throws Exception {
                    return Channels.pipeline(new UdpEventHandler());
                }
            });
            // Binds to one specific local address; change it to match your machine.
            udpBootstrap.bind(new InetSocketAddress("192.168.6.19", port));
            System.out.println("udp server started, listening on port:" + port);
        }

        public static void main(String[] args) {
            new UdpServer(6803);
        }

    }
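To try the server out you need something that sends it a datagram. The following is a minimal sketch of a test client using only the JDK; the class name `UdpTestClient` and its `send` method are my own illustration and not part of the original code, and the address and port are simply copied from the server above.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for manual testing; not part of the original post.
class UdpTestClient {

    /** Sends one datagram carrying the payload to the target address. */
    static void send(InetSocketAddress target, byte[] payload) throws Exception {
        // An unbound DatagramSocket picks any free local port.
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length, target));
        }
    }

    public static void main(String[] args) throws Exception {
        // Address and port taken from UdpServer above; adjust to your setup.
        send(new InetSocketAddress("192.168.6.19", 6803),
             "hello".getBytes(StandardCharsets.US_ASCII));
    }
}
```

If the server is running and reachable, its handler should log one received message of 5 bytes.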
The UDP data handler. It mainly just needs to override the messageReceived and exceptionCaught methods.
    /**
     * @author Jadic
     * @created 2012-8-10
     */
    package com.jadic;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

    /**
     * Logs every datagram received and any exception raised on the channel.
     *
     * @author Jadic
     */
    public class UdpEventHandler extends SimpleChannelUpstreamHandler {

        private void log(Object msg) {
            System.out.println(msg);
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            log("messageReceived");
            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
            // Only readableBytes() bytes of the buffer are real data.
            log("recvd " + buffer.readableBytes() + " bytes [" + KKTool.channelBufferToHexStr(buffer) + "]");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
                throws Exception {
            // Include the cause so the failure is not silently swallowed.
            log("exceptionCaught: " + e.getCause());
        }

    }
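The handler calls `KKTool.channelBufferToHexStr`, whose source is not shown here. As a rough idea of what such a helper might do, here is a sketch that formats a byte range as hex. The class `HexDump`, the method `toHex` and the space-separated upper-case output format are all assumptions of mine, not the actual KKTool code.

```java
// Hypothetical stand-in for KKTool.channelBufferToHexStr; the real helper is not shown in the post.
class HexDump {

    private static final char[] DIGITS = "0123456789ABCDEF".toCharArray();

    /** Formats length bytes of data, starting at offset, as hex pairs separated by spaces. */
    static String toHex(byte[] data, int offset, int length) {
        StringBuilder sb = new StringBuilder(length * 3);
        for (int i = offset; i < offset + length; i++) {
            if (i > offset) {
                sb.append(' ');
            }
            int b = data[i] & 0xFF; // treat the byte as unsigned
            sb.append(DIGITS[b >>> 4]).append(DIGITS[b & 0x0F]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] packet = {0x01, (byte) 0xAB, 0x7F};
        System.out.println(toHex(packet, 0, packet.length)); // prints "01 AB 7F"
    }
}
```

Taking an explicit offset and length, rather than a whole array, is deliberate: it lets the caller pass only the bytes that were actually received. In the handler one could, for example, first copy `buffer.readableBytes()` bytes out of the ChannelBuffer into a byte array and pass that array in.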
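One small detail: when printing the UDP data log I used ChannelBuffer.array() to get the bytes. In the TCP service, channelBuffer.array() returns exactly the data that was received, but in the UDP service it does not: the array is as large as the receive buffer, so the real data is followed by unused bytes and only the first readableBytes() bytes are valid. I traced through the source. When receiving data, the TCP service creates a new ChannelBuffer sized to the number of bytes actually read, whereas the UDP service allocates a ByteBuffer sized by the default receive buffer size and wraps it as is.
Below is the source of the low-level read methods in Netty's UDP and TCP services; the two classes are NioDatagramWorker and NioWorker respectively. First, the UDP one (NioDatagramWorker):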
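The practical consequence is that the backing array should never be used as "the message" without taking the valid range into account. Here is a small sketch of that idea using a plain java.nio.ByteBuffer, where position and remaining() play roughly the role that readerIndex and readableBytes() play on a ChannelBuffer. The class `ValidBytes` and the method `copyValid` are names I made up for the illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: shows why array() can be larger than the received data.
class ValidBytes {

    /** Copies only the bytes between position and limit out of an array-backed buffer. */
    static byte[] copyValid(ByteBuffer buf) {
        int start = buf.arrayOffset() + buf.position();
        return Arrays.copyOfRange(buf.array(), start, start + buf.remaining());
    }

    public static void main(String[] args) {
        // 768 mirrors the default UDP receive buffer size mentioned in this post.
        ByteBuffer buf = ByteBuffer.allocate(768);
        buf.put(new byte[] {1, 2, 3, 4, 5}); // pretend a 5-byte datagram arrived
        buf.flip();

        System.out.println(buf.array().length);    // 768: the whole backing array
        System.out.println(copyValid(buf).length); // 5: only the received bytes
    }
}
```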
    @Override
    protected boolean read(final SelectionKey key) {
        final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
        ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
        final DatagramChannel nioChannel = (DatagramChannel) key.channel();

        // Allocating a non-direct buffer with a max udp packge size.
        // Would using a direct buffer be more efficient or would this negatively
        // effect performance, as direct buffer allocation has a higher upfront cost
        // where as a ByteBuffer is heap allocated.
        // (My note: the ByteBuffer is sized by the maximum size of a single
        // received UDP packet, which is 768 by default.)
        final ByteBuffer byteBuffer = ByteBuffer.allocate(
                predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder());

        boolean failure = true;
        SocketAddress remoteAddress = null;
        try {
            // Receive from the channel in a non blocking mode. We have already been notified that
            // the channel is ready to receive.
            remoteAddress = nioChannel.receive(byteBuffer);
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (remoteAddress != null) {
            // Flip the buffer so that we can wrap it.
            byteBuffer.flip();

            int readBytes = byteBuffer.remaining();
            if (readBytes > 0) {
                // Update the predictor.
                predictor.previousReceiveBufferSize(readBytes);

                // Notify the interested parties about the newly arrived message.
                // (My note: the ByteBuffer is wrapped in a ChannelBuffer here,
                // without copying, and the event is fired onward.)
                fireMessageReceived(
                        channel, bufferFactory.getBuffer(byteBuffer), remoteAddress);
            }
        }
        // ... (the rest of the method is omitted)
The TCP one (NioWorker):
    @Override
    protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();

        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;

        ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
        try {
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (readBytes > 0) {
            bb.flip();

            final ChannelBufferFactory bufferFactory =
                channel.getConfig().getBufferFactory();
            // (My note: this is the ChannelBuffer the TCP service hands to the
            // handler; its size is exactly the number of bytes read, readBytes.)
            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(0, bb);
            buffer.writerIndex(readBytes);

            recvBufferPool.release(bb);

            // Update the predictor.
            predictor.previousReceiveBufferSize(readBytes);

            // Fire the event.
            fireMessageReceived(channel, buffer);
        } else {
            recvBufferPool.release(bb);
        }

        if (ret < 0 || failure) {
            k.cancel(); // Some JDK implementations run into an infinite loop without this.
            close(channel, succeededFuture(channel));
            return false;
        }

        return true;
    }
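To make the difference between the two read paths concrete, here is a simplified simulation using plain java.nio buffers instead of Netty's classes. It is only a sketch of the two allocation strategies, not Netty's actual code: `ReadPaths`, `readLikeUdp` and `readLikeTcp` are hypothetical names, and the incoming data is passed in as a byte array rather than read from a socket.

```java
import java.nio.ByteBuffer;

// Simplified model of the two strategies; names are illustrative, not Netty's.
class ReadPaths {

    /** UDP style: allocate at the predicted size and pass that same buffer on. */
    static ByteBuffer readLikeUdp(byte[] incoming, int predictedSize) {
        ByteBuffer buf = ByteBuffer.allocate(predictedSize);
        // Anything beyond the buffer size is dropped in this model.
        buf.put(incoming, 0, Math.min(incoming.length, predictedSize));
        buf.flip();
        return buf; // backing array still has predictedSize bytes
    }

    /** TCP style: read into a scratch buffer, then copy into one sized to readBytes. */
    static ByteBuffer readLikeTcp(byte[] incoming, int predictedSize) {
        ByteBuffer scratch = ByteBuffer.allocate(predictedSize);
        scratch.put(incoming, 0, Math.min(incoming.length, predictedSize));
        scratch.flip();

        int readBytes = scratch.remaining();
        ByteBuffer exact = ByteBuffer.allocate(readBytes);
        exact.put(scratch);
        exact.flip();
        return exact; // backing array has exactly readBytes bytes
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};
        System.out.println(readLikeUdp(data, 768).array().length); // 768
        System.out.println(readLikeTcp(data, 768).array().length); // 5
    }
}
```

Both buffers report 5 readable bytes through remaining(); only the size of the backing array differs, which is exactly what showed up in the UDP log output.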