A Detailed Source Code Analysis of Event Flow in Netty's ChannelPipeline


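The official Netty API documentation introduces pipeline event flow with a few examples and diagrams,

meant to show the order in which upstream handlers and downstream handlers process events.

Honestly, the examples and conclusions alone are hard to follow, so I decided to get my hands dirty and step through the code in a debugger.

Here is the example.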

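import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class Server {
	public static void main(String args[]) {
		ServerBootstrap bootstrap = new ServerBootstrap(
				new NioServerSocketChannelFactory(Executors
						.newCachedThreadPool(), Executors.newCachedThreadPool()));
		bootstrap.setPipelineFactory(new PipelineFactoryTest());
		bootstrap.bind(new InetSocketAddress(8888));
	}
}
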
public class PipelineFactoryTest implements ChannelPipelineFactory {

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline pipeline = Channels.pipeline();
		pipeline.addLast("1", new UpstreamHandlerA());
		pipeline.addLast("2", new UpstreamHandlerB());
		pipeline.addLast("3", new DownstreamHandlerA());
		pipeline.addLast("4", new DownstreamHandlerB());
		pipeline.addLast("5", new UpstreamHandlerX());
		return pipeline;
	}
}

public class UpstreamHandlerA extends SimpleChannelUpstreamHandler {
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		Channel ctxchannel = ctx.getChannel();
		Channel echannel = e.getChannel();

		// The handler context and the event share the same channel
		System.out.println(ctxchannel.equals(echannel));
		System.out.println("UpstreamHandlerA.messageReceived:" + e.getMessage());
		ctx.sendUpstream(e); // hand the event to the next upstream handler
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
		System.out.println("UpstreamHandlerA.exceptionCaught:" + e.toString());
		e.getChannel().close();
	}
}

public class UpstreamHandlerB extends SimpleChannelUpstreamHandler {
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		System.out.println("UpstreamHandlerB.messageReceived:" + e.getMessage());
		ctx.sendUpstream(e); // hand the event to the next upstream handler
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
		System.out.println("UpstreamHandlerB.exceptionCaught:" + e.toString());
		e.getChannel().close();
	}
}

public class UpstreamHandlerX extends SimpleChannelUpstreamHandler {
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		System.out.println("UpstreamHandlerX.messageReceived:" + e.getMessage());
		// Writing back to the channel starts the downstream flow
		e.getChannel().write(e.getMessage());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
		System.out.println("UpstreamHandlerX.exceptionCaught");
		e.getChannel().close();
	}
}

public class DownstreamHandlerA extends SimpleChannelDownstreamHandler {
	@Override
	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
			throws Exception {
		System.out.println("DownstreamHandlerA.handleDownstream");
		super.handleDownstream(ctx, e);
	}
}

public class DownstreamHandlerB extends SimpleChannelDownstreamHandler {
	@Override
	public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
			throws Exception {
		System.out.println("DownstreamHandlerB.handleDownstream:");
		super.handleDownstream(ctx, e);
	}
}

Client:



public class AppStoreClinetBootstrap {
	public static void main(String args[]) {
		ExecutorService bossExecutor = Executors.newCachedThreadPool();
		ExecutorService workerExecutor = Executors.newCachedThreadPool();
		ChannelFactory channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
		ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
		bootstrap.setPipelineFactory(new AppClientChannelPipelineFactory());
		ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8888));
		future.awaitUninterruptibly();
		if (future.isSuccess()) {
			String msg = "hello word";
			ChannelBuffer buffer = ChannelBuffers.buffer(msg.length());
			buffer.writeBytes(msg.getBytes());
			future.getChannel().write(buffer);
		}
	}
}

public class AppClientChannelPipelineFactory implements ChannelPipelineFactory {

	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline pipeline = pipeline(); // Channels.pipeline(), via static import
		//pipeline.addLast("encode", new StringEncoder());
		pipeline.addLast("handler", new AppStoreClientHandler());
		return pipeline;
	}
}

public class AppStoreClientHandler extends SimpleChannelUpstreamHandler {

	private static Logger log = Logger.getLogger(AppStoreClientHandler.class);

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		// TODO Auto-generated method stub
		super.exceptionCaught(ctx, e);
	}
}

  

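The example above demonstrates the propagation order of upstream and downstream events.

Upstream: 1 -> 2 -> 5, processed in the order the handlers were added
Downstream: 4 -> 3, processed in reverse order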

========================================================

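Enough of that. Now let's look at the source code to see why it behaves this way.

On the server side, after bind,

if no client connects, the server keeps looping until a new client connection arrives and wakes it up.

The code looks like this: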

 

 
NioServerSocketPipelineSink class
......................
public void run() {
            final Thread currentThread = Thread.currentThread();

            channel.shutdownLock.lock();
            try {
                for (;;) {
                    try {
                        if (selector.select(1000) > 0) {
                            selector.selectedKeys().clear();
                        }
                        // After the server starts, this loop keeps spinning while no client connects
                        SocketChannel acceptedSocket = channel.socket.accept();
                        if (acceptedSocket != null) {
                            registerAcceptedChannel(acceptedSocket, currentThread);
                        }
......................................
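To see the same polling pattern outside Netty, here is a minimal sketch using only java.nio. It is not Netty's boss thread: the class name AcceptLoopSketch, the maxPolls guard, and the loopback demo are assumptions added for illustration. It shows the two steps from the excerpt above: wait on the selector with a timeout, then call a non-blocking accept() that returns null when no client is pending.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain java.nio sketch of a polling accept loop; not Netty's actual boss thread.
final class AcceptLoopSketch {

    // Polls until `expected` clients are accepted or `maxPolls` rounds have passed.
    static int acceptConnections(ServerSocketChannel server, int expected, int maxPolls)
            throws IOException {
        server.configureBlocking(false);
        int accepted = 0;
        try (Selector selector = Selector.open()) {
            server.register(selector, SelectionKey.OP_ACCEPT);
            for (int poll = 0; poll < maxPolls && accepted < expected; poll++) {
                // Wait up to one second for a pending connection, then reset the key set
                if (selector.select(1000) > 0) {
                    selector.selectedKeys().clear();
                }
                // In non-blocking mode accept() returns null when nobody is waiting
                SocketChannel client = server.accept();
                if (client != null) {
                    accepted++;
                    client.close();
                }
            }
        }
        return accepted;
    }

    // Hypothetical demo: binds an ephemeral loopback port and connects one client.
    static int demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                return acceptConnections(server, 1, 10);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted connections: " + demo());
    }
}
```

In the Netty code, the accepted socket is not closed but passed on to registerAcceptedChannel.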

When a client connects, the loop wakes up and registers the accepted socket, as in the following code:

 

 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
            try {
                ChannelPipeline pipeline =channel.getConfig().getPipelineFactory().getPipeline();
                NioWorker worker = nextWorker();
                worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
            } catch (Exception e) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
                try {
                    acceptedSocket.close();
                } catch (IOException e2) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
        }

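The key line is the first statement in the try block, channel.getConfig().getPipelineFactory().getPipeline(). It obtains a pipeline holding all the handlers, that is, the ChannelPipeline built in the PipelineFactoryTest class. The actual I/O is then handled by a NioWorker.

The important part is the pipeline.addLast method: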

 

    public synchronized void addLast(String name, ChannelHandler handler) {
        if (name2ctx.isEmpty()) {
            init(name, handler);
        } else {
            checkDuplicateName(name);
            DefaultChannelHandlerContext oldTail = tail;
            DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);

            callBeforeAdd(newTail);

            oldTail.next = newTail;
            tail = newTail;
            name2ctx.put(name, newTail);

            callAfterAdd(newTail);
        }
    }
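To make the linking in addLast concrete, here is a minimal stand-alone sketch of the same idea. It is a simplified model, not Netty's real class: MiniPipeline, Ctx, namesFromHead, and namesFromTail are hypothetical names, and the callBeforeAdd/callAfterAdd hooks are left out. Each call appends a node, wires prev and next, and moves the tail pointer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the pipeline's context list; not Netty's real classes.
final class MiniPipeline {

    // One node per handler, doubly linked like DefaultChannelHandlerContext.
    static final class Ctx {
        final String name;
        final Ctx prev;
        Ctx next;

        Ctx(Ctx prev, String name) {
            this.prev = prev;
            this.name = name;
        }
    }

    private final Map<String, Ctx> name2ctx = new HashMap<>();
    private Ctx head;
    private Ctx tail;

    synchronized void addLast(String name) {
        if (name2ctx.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        Ctx newTail = new Ctx(tail, name);
        if (tail == null) {
            head = newTail;       // first handler: head and tail are the same node
        } else {
            tail.next = newTail;  // link the old tail forward to the new node
        }
        tail = newTail;
        name2ctx.put(name, newTail);
    }

    // Walks the list through the next pointers.
    List<String> namesFromHead() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            names.add(c.name);
        }
        return names;
    }

    // Walks the list through the prev pointers.
    List<String> namesFromTail() {
        List<String> names = new ArrayList<>();
        for (Ctx c = tail; c != null; c = c.prev) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline();
        for (String name : new String[] {"1", "2", "3", "4", "5"}) {
            pipeline.addLast(name);
        }
        System.out.println("head to tail: " + pipeline.namesFromHead());
        System.out.println("tail to head: " + pipeline.namesFromTail());
    }
}
```

Being able to walk the list in both directions is what later allows upstream and downstream events to travel in opposite orders.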

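DefaultChannelHandlerContext is a node in a linked list: its next and prev fields chain together all the upstream and downstream handlers (this is the key point).

Upstream handlers are responsible for incoming data, so when a client sends data, the upstream handlers in PipelineFactoryTest are invoked one after another, in order.

The code below shows why they run in order. If you look closely, UpstreamHandlerA and UpstreamHandlerB both call

ctx.sendUpstream(e); (ChannelHandlerContext is the context of each handler)

With this method, one upstream handler hands the event over to the next upstream handler (a classic chain of responsibility pattern).

The code is as follows: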

 
        public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
            if (next != null) {
                DefaultChannelPipeline.this.sendUpstream(next, e); // the next upstream handler is invoked immediately
            }
        }

    DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
        if (ctx == null) {
            return null;
        }

        DefaultChannelHandlerContext realCtx = ctx;
        while (!realCtx.canHandleUpstream()) {
            realCtx = realCtx.next;
            if (realCtx == null) {
                return null;
            }
        }

        return realCtx;
    }
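The lookup above can be replayed on the example pipeline with a small stand-alone sketch. It is a simplified model, not Netty code: UpstreamWalk, Ctx, and the upstream flag (standing in for canHandleUpstream()) are hypothetical, and it assumes every upstream handler forwards the event. Starting from the head and skipping nodes that cannot handle upstream events gives the order 1, 2, 5.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of upstream propagation; not Netty's real classes.
final class UpstreamWalk {

    static final class Ctx {
        final String name;
        final boolean upstream; // stands in for canHandleUpstream()
        Ctx next;

        Ctx(String name, boolean upstream) {
            this.name = name;
            this.upstream = upstream;
        }
    }

    // Mirrors getActualUpstreamContext: skip nodes that cannot handle upstream events.
    static Ctx actualUpstream(Ctx ctx) {
        while (ctx != null && !ctx.upstream) {
            ctx = ctx.next;
        }
        return ctx;
    }

    // Builds the example pipeline (1, 2 upstream; 3, 4 downstream; 5 upstream)
    // and records which handlers an upstream event visits.
    static List<String> visitOrder() {
        Ctx[] nodes = {
            new Ctx("1", true), new Ctx("2", true),
            new Ctx("3", false), new Ctx("4", false),
            new Ctx("5", true)
        };
        for (int i = 0; i < nodes.length - 1; i++) {
            nodes[i].next = nodes[i + 1];
        }
        List<String> visited = new ArrayList<>();
        // Each step models a handler calling ctx.sendUpstream(e)
        for (Ctx c = actualUpstream(nodes[0]); c != null; c = actualUpstream(c.next)) {
            visited.add(c.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(visitOrder()); // prints [1, 2, 5]
    }
}
```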

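As mentioned above, DefaultChannelHandlerContext forms a linked list holding the handlers, so every upstream handler is looked up there and the event keeps propagating.

All upstream handlers share the same event, and therefore the same ChannelBuffer. This closely resembles the chain of responsibility pattern, and it can also be used for filter-style processing.

With this in mind, Netty's various encoders (downstream handlers) and decoders (upstream handlers) become easy to understand.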
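As a rough illustration of filter-style processing, the sketch below chains two stages in plain Java: a decoder that turns raw bytes into a String, followed by a stage that trims it. It does not use Netty's decoder API; FilterChainSketch, fireUpstream, and decodeAndTrim are hypothetical names, and each stage is modeled as a simple function that returns what the next stage should receive.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of decoder-style filtering; not Netty's handler API.
final class FilterChainSketch {

    // Each stage receives the previous stage's output and returns what to forward.
    static Object fireUpstream(List<Function<Object, Object>> stages, Object message) {
        Object current = message;
        for (Function<Object, Object> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    // Hypothetical two-stage chain: bytes -> String, then trim whitespace.
    static Object decodeAndTrim(byte[] raw) {
        List<Function<Object, Object>> stages = new ArrayList<>();
        stages.add(m -> new String((byte[]) m, StandardCharsets.UTF_8)); // "decoder"
        stages.add(m -> ((String) m).trim());                           // "filter"
        return fireUpstream(stages, raw);
    }

    public static void main(String[] args) {
        byte[] raw = "  hello word \n".getBytes(StandardCharsets.UTF_8);
        System.out.println("[" + decodeAndTrim(raw) + "]");
    }
}
```

The later stage only ever sees the decoded String, which is the same reason a business handler placed after a decoder does not need to deal with raw buffers.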

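The same analysis applies to downstream handlers.

UpstreamHandlerX calls e.getChannel().write(e.getMessage()). This fires a DownstreamMessageEvent, which enters the pipeline at the tail and therefore reaches DownstreamHandlerB (handler 4) first.

The hand-off from DownstreamHandlerB to DownstreamHandlerA is done by super.handleDownstream(ctx, e);, which matches the 4 -> 3 order observed above.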

 

 public ChannelFuture write(Object message) {
        return Channels.write(this, message);
    }

 public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(
                new DownstreamMessageEvent(channel, future, message, remoteAddress));
        return future;
    }
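The downstream direction can be modeled the same way as the upstream one. The sketch below is a simplified stand-in, not Netty code: DownstreamWalk, Ctx, and the downstream flag are hypothetical, and it assumes the downstream lookup mirrors the upstream one but starts at the tail and follows prev. On the example pipeline this yields 4, then 3.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of downstream propagation; not Netty's real classes.
final class DownstreamWalk {

    static final class Ctx {
        final String name;
        final boolean downstream; // true if the handler can handle downstream events
        Ctx prev;

        Ctx(String name, boolean downstream) {
            this.name = name;
            this.downstream = downstream;
        }
    }

    // Walks backwards until a node that can handle downstream events is found.
    static Ctx actualDownstream(Ctx ctx) {
        while (ctx != null && !ctx.downstream) {
            ctx = ctx.prev;
        }
        return ctx;
    }

    // Builds the example pipeline (1, 2 upstream; 3, 4 downstream; 5 upstream)
    // and records which handlers a write started at the tail visits.
    static List<String> visitOrder() {
        Ctx[] nodes = {
            new Ctx("1", false), new Ctx("2", false),
            new Ctx("3", true), new Ctx("4", true),
            new Ctx("5", false)
        };
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
        }
        List<String> visited = new ArrayList<>();
        Ctx tail = nodes[nodes.length - 1];
        // Each step models a handler passing the event on via super.handleDownstream(ctx, e)
        for (Ctx c = actualDownstream(tail); c != null; c = actualDownstream(c.prev)) {
            visited.add(c.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(visitOrder()); // prints [4, 3]
    }
}
```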

  

 

This post jumps around quite a bit. I spent a day digging through the code and learned a lot, so I am writing it down here.

 

 

  

 

 

 

 

  

 

  

 

 

 

