1、我們在使用netty的是有都會存在將channelBuffer的數據處理成相應的String或者自定義數據。而這里主要是介紹管道里面存在的上行和下行的數據處理方式
2、通過一張圖片來看一下具體管道中的實現過程
一個Channel中包括一個Socket、一個ChannelPipeline。一個ChannelPipeline中有一個ChannelSink和多個ChannelHandler。ChannelHandler分為兩種:UpstremHandler、DownstreamHandler。
不論是讀數據還是寫數據都要經過Channel中的ChannelPipeline。讀數據的過程是從Socket到ChannelPipeline,由ChannelPipeline交給里面的UpstreamHandler(或者叫做InBoundHandler)從下到上依次處理 。寫數據時,由要經過ChannelPipeline里面在DownStreamHandler(或者是OutBoundHandler)由上到下依次處理。
3、因為UpstremHandler與DownstreamHandler的實現方式大同小異,我這里寫的例子是UpstremHandler的例子
package com.troy.application.upstream; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.timeout.IdleStateHandler; import org.jboss.netty.util.HashedWheelTimer; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server { public static void main(String[] args) { //聲明服務類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //設定線程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService work = Executors.newCachedThreadPool(); //設置工廠 serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,work)); //設置管道流 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); //添加處理方式 channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),5,5,10)); channelPipeline.addLast("handler1",new Handler1()); channelPipeline.addLast("handler2",new Handler2()); return channelPipeline; } }); //設置端口 serverBootstrap.bind(new InetSocketAddress(9000)); } }
package com.troy.application.upstream; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.UpstreamMessageEvent; public class Handler1 extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) e.getMessage(); byte[] array = channelBuffer.array(); String message = new String(array); System.out.println("handler1"+message); ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(),message,e.getRemoteAddress())); } }
package com.troy.application.upstream; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class Handler2 extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String message = (String) e.getMessage(); System.out.println("handler2"+message); } }
說明:這里最重要的兩個方法是sendUpstream和sendDownstream。這兩個方式在上行和下行的處理基本上是一樣的。在源碼里面handler的處理都會存在sendUpstream和sendDownstream。這個兩個方法也是多重處理的基礎。