實現基於netty的web框架,了解一下


上一篇寫了,基於netty實現的rpc的微框架,其中詳細介紹netty的原理及組件,這篇就不過多介紹

這篇實現基於netty的web框架,你說netty強不強,文中有不對的地方,歡迎大牛指正

先普及幾個知識點

@sharable

標注一個channel handler可以被多個channel安全地共享。
ChannelHandlerAdapter還提供了實用方法isSharable()。如果其對應的實現被標注為Sharable,那么這個方法將返回true,
表示它可以被添加到多個ChannelPipeline中。 因為一個ChannelHandler可以從屬於多個ChannelPipeline,所以它也可以綁定到多個ChannelHandlerContext實例。
用於這種用法的ChannelHandler必須要使用@Sharable注解標注;否則,試圖將它添加到多個ChannelPipeline時將會觸發異常。
顯而易見,為了安全地被用於多個並發的Channel(即連接),這樣的ChannelHandler必須是線程安全的。

AtomicInteger:這個類的存在是為了滿足在高並發的情況下,原生的整形數值自增線程不安全的問題,在Java語言中,++i和i++操作並不是線程安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則通過一種線程安全的加減操作接口。AtomicInteger為什么能夠達到多而不亂,處理高並發應付自如呢?

  這是由硬件提供原子操作指令實現的,這里面用到了一種並發技術:CAS。在非激烈競爭的情況下,開銷更小,速度更快

TimeUnit: 

TimeUnit是Java.util.concurrent包下面的一個類。它提供了兩大功能:

1)提供了可讀性更好的線程暫停操作,通常用來替換Thread.sleep(); 

2)提供了便捷方法用於把時間轉換成不同單位,如把秒轉換成毫秒;

TimeUnit.MINUTES.sleep(4);  // sleeping for 4 minutes

Thread.sleep(4*60*1000);

項目的目錄結構

 

上代碼,分享一些關鍵的代碼,后續的giuhub上的demo的注釋很詳細

//Netty 事件回調類
@Sharable
public class MessageCollector extends ChannelInboundHandlerAdapter {
    private final static Logger LOG = LoggerFactory.getLogger(MessageCollector.class);
    //業務線程池
    private ThreadPoolExecutor[] executors;
    private RequestDispatch requestDispatch;
    //業務隊列最大值
    private int requestsMaxInflight=1000;

    public MessageCollector(int workerThreads,RequestDispatch dispatch){
        //給業務線程命名
        ThreadFactory factory =new ThreadFactory() {
            AtomicInteger seq=new AtomicInteger();
            @Override
            public Thread newThread(Runnable r) {
                Thread thread =new Thread(r);
                thread.setName("http-"+seq.getAndIncrement());
                return thread;
            }
        };
        this.executors=new ThreadPoolExecutor[workerThreads];
        for(int i=0;i<workerThreads;i++){
            ArrayBlockingQueue queue=new ArrayBlockingQueue<Runnable>(requestsMaxInflight);
            ////閑置時間超過30秒的線程就自動銷毀
            this.executors[i]=new ThreadPoolExecutor(1,1,
                    30, TimeUnit.SECONDS, queue,factory,new CallerRunsPolicy());
        }

        this.requestDispatch=dispatch;
    }

    public  void  closeGracefully(){
        //優雅一點關閉,先通知,再等待,最后強制關閉
        for (int i=0;i<executors.length;i++){
            ThreadPoolExecutor executor=executors[i];
            try {
                executor.awaitTermination(10,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executor.shutdownNow();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //客戶端來了一個新的連接
       LOG.info("connection comes {}",ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //客戶端走了一個
        LOG.info("connection leaves {}",ctx.channel().remoteAddress());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest){
            FullHttpRequest req= (FullHttpRequest) msg;
            CRC32 crc32=new CRC32();
            crc32.update(ctx.hashCode());
            int idx =(int) (crc32.getValue()%executors.length);
            //用業務線程處理消息
            this.executors[idx].execute(() ->{
                requestDispatch.dispatch(ctx,req);
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //此處可能因為客戶機器突發重啟
        //也可能客戶端連接時間超時,后面的REadTimeoutHandle拋出異常
        //也可能消息協議錯誤,序列化異常
        ctx.close();
    }
}
HttpServer
public class HttpServer {
    private final static Logger LOG= LoggerFactory.getLogger(HttpServer.class);
    private String ip;
    private int port;  //端口
    private int ioThreads;  //IO線程數,用於處理套接字讀寫,由Netty內部管理
    private int workerThreads;  //業務線程數,專門處理http請求,由我們本省框架管理
    private RequestDispatch requestDispatch;//請求配發器對象

    public HttpServer() {
    }

    public HttpServer(String ip, int port, int ioThreads,
                      int workerThreads, RequestDispatch requestDispatch) {
        this.ip = ip;
        this.port = port;
        this.ioThreads = ioThreads;
        this.workerThreads = workerThreads;
        this.requestDispatch = requestDispatch;
    }
    //用於服務端,使用一個ServerChannel接收客戶端的連接,
    // 並創建對應的子Channel
    private ServerBootstrap bootstrap;
    //包含多個EventLoop
    private EventLoopGroup group;
    //代表一個Socket連接
    private Channel serverChannel;
    //
    private MessageCollector collector;

    public  void start(){
        bootstrap=new ServerBootstrap();
        group=new NioEventLoopGroup(ioThreads);
        bootstrap.group(group);
        collector=new MessageCollector(workerThreads,requestDispatch);
        bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline=socketChannel.pipeline();
                //如果客戶端60秒沒有任何請求,就關閉客戶端連接
                pipeline.addLast(new ReadTimeoutHandler(10));
                //客戶端和服務器簡單的編解碼器:HttpClientCodec和HttpServerCodec。
                //ChannelPipelien中有解碼器和編碼器(或編解碼器)后就可以操作不同的HttpObject消息了;但是HTTP請求和響應可以有很多消息數據,
                // 你需要處理不同的部分,可能也需要聚合這些消息數據
                pipeline.addLast(new HttpServerCodec());
                //通過HttpObjectAggregator,Netty可以聚合HTTP消息,
                // 使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一個ChannelHandler,這就消除了斷裂消息,保證了消息的完整
                pipeline.addLast(new HttpObjectAggregator(1 << 30)); // max_size = 1g
                //允許通過處理ChunkedInput來寫大的數據塊
                pipeline.addLast(new ChunkedWriteHandler());
                //將業務處理器放到最后
                pipeline.addLast(collector);
            }
        });
    }

    public void stop() {
        // 先關閉服務端套件字
        serverChannel.close();
        // 再斬斷消息來源,停止io線程池
        group.shutdownGracefully();
        // 最后停止業務線程
        collector.closeGracefully();
    }

}

 

RequestDispatcherImpl 是請求派發器,用於將收到的HTTP請求對象扔給響應的RequestHandler進行處理。
public class RequestDispatcherImpl implements RequestDispatch {
    private final static Logger LOG = LoggerFactory.getLogger(RequestDispatcherImpl.class);

    private String contextRoot;
    private Router router;
    private Map<Integer, WebExceptionHandler> exceptionHandlers = new HashMap<>();
    private WebExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();

    private WebTemplateEngine templateEngine = new WebTemplateEngine() {
    };

    static class DefaultExceptionHandler implements WebExceptionHandler {

        @Override
        public void handle(ApplicationContext ctx, AbortException e) {
            if (e.getStatus().code() == 500) {
                LOG.error("Internal Server Error", e);
            }
            ctx.error(e.getContent(), e.getStatus().code());
        }

    }

    public RequestDispatcherImpl(Router router) {
        this("/", router);
    }

    public RequestDispatcherImpl(String contextRoot, Router router) {
        this.contextRoot = CurrentUtil.normalize(contextRoot);
        this.router = router;
    }

    public RequestDispatcherImpl templateRoot(String templateRoot) {
        this.templateEngine = new FreemarkerEngine(templateRoot);
        return this;
    }

    public String root() {
        return contextRoot;
    }

    public RequestDispatcherImpl exception(int code, WebExceptionHandler handler) {
        this.exceptionHandlers.put(code, handler);
        return this;
    }

    public RequestDispatcherImpl exception(WebExceptionHandler handler) {
        this.defaultExceptionHandler = handler;
        return this;
    }
    @Override
    public void dispatch(ChannelHandlerContext channelCtx, FullHttpRequest req) {
        ApplicationContext ctx = new ApplicationContext(channelCtx, contextRoot, templateEngine);
        try {
            this.handleImpl(ctx, new Request(req));
        } catch (AbortException e) {
            this.handleException(ctx, e);
        } catch (Exception e) {
            this.handleException(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, e));
        } finally {
            req.release();
        }
    }

    private void handleException(ApplicationContext ctx, AbortException e) {
        WebExceptionHandler handler = this.exceptionHandlers.getOrDefault(e.getStatus().code(), defaultExceptionHandler);
        try {
            handler.handle(ctx, e);
        } catch (Exception ex) {
            this.defaultExceptionHandler.handle(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, ex));
        }
    }

    private void handleImpl(ApplicationContext ctx, Request req) throws Exception {
        if (req.decoderResult().isFailure()) {
            ctx.abort(400, "http protocol decode failed");
        }
        if (req.relativeUri().contains("./") || req.relativeUri().contains(".\\")) {
            ctx.abort(400, "unsecure url not allowed");
        }
        if (!req.relativeUri().startsWith(contextRoot)) {
            throw new AbortException(HttpResponseStatus.NOT_FOUND);
        }
        req.popRootUri(contextRoot);
        router.handle(ctx, req);
    }
}

 

項目github位置

https://github.com/developerxiaofeng/WebFrameByNetty.git  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM