上一篇寫了,基於netty實現的rpc的微框架,其中詳細介紹netty的原理及組件,這篇就不過多介紹
這篇實現基於netty的web框架,你說netty強不強,文中有不對的地方,歡迎大牛指正
先普及幾個知識點
@sharable
標注一個channel handler可以被多個channel安全地共享。
ChannelHandlerAdapter還提供了實用方法isSharable()。如果其對應的實現被標注為Sharable,那么這個方法將返回true,
表示它可以被添加到多個ChannelPipeline中。
因為一個ChannelHandler可以從屬於多個ChannelPipeline,所以它也可以綁定到多個ChannelHandlerContext實例。
用於這種用法的ChannelHandler必須要使用@Sharable注解標注;否則,試圖將它添加到多個ChannelPipeline時將會觸發異常。
顯而易見,為了安全地被用於多個並發的Channel(即連接),這樣的ChannelHandler必須是線程安全的。
AtomicInteger:這個類的存在是為了滿足在高並發的情況下,原生的整形數值自增線程不安全的問題,在Java語言中,++i和i++操作並不是線程安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則通過一種線程安全的加減操作接口。AtomicInteger為什么能夠達到多而不亂,處理高並發應付自如呢?
這是由硬件提供原子操作指令實現的,這里面用到了一種並發技術:CAS。在非激烈競爭的情況下,開銷更小,速度更快
TimeUnit:
TimeUnit是Java.util.concurrent包下面的一個類。它提供了兩大功能:
1)提供了可讀性更好的線程暫停操作,通常用來替換Thread.sleep();
2)提供了便捷方法用於把時間轉換成不同單位,如把秒轉換成毫秒;
TimeUnit.MINUTES.sleep(4); // sleeping for 4 minutes Thread.sleep(4*60*1000);
項目的目錄結構
上代碼,分享一些關鍵的代碼,后續的giuhub上的demo的注釋很詳細
//Netty 事件回調類 @Sharable public class MessageCollector extends ChannelInboundHandlerAdapter { private final static Logger LOG = LoggerFactory.getLogger(MessageCollector.class); //業務線程池 private ThreadPoolExecutor[] executors; private RequestDispatch requestDispatch; //業務隊列最大值 private int requestsMaxInflight=1000; public MessageCollector(int workerThreads,RequestDispatch dispatch){ //給業務線程命名 ThreadFactory factory =new ThreadFactory() { AtomicInteger seq=new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread =new Thread(r); thread.setName("http-"+seq.getAndIncrement()); return thread; } }; this.executors=new ThreadPoolExecutor[workerThreads]; for(int i=0;i<workerThreads;i++){ ArrayBlockingQueue queue=new ArrayBlockingQueue<Runnable>(requestsMaxInflight); ////閑置時間超過30秒的線程就自動銷毀 this.executors[i]=new ThreadPoolExecutor(1,1, 30, TimeUnit.SECONDS, queue,factory,new CallerRunsPolicy()); } this.requestDispatch=dispatch; } public void closeGracefully(){ //優雅一點關閉,先通知,再等待,最后強制關閉 for (int i=0;i<executors.length;i++){ ThreadPoolExecutor executor=executors[i]; try { executor.awaitTermination(10,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdownNow(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //客戶端來了一個新的連接 LOG.info("connection comes {}",ctx.channel().remoteAddress()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //客戶端走了一個 LOG.info("connection leaves {}",ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest){ FullHttpRequest req= (FullHttpRequest) msg; CRC32 crc32=new CRC32(); crc32.update(ctx.hashCode()); int idx =(int) (crc32.getValue()%executors.length); //用業務線程處理消息 this.executors[idx].execute(() ->{ requestDispatch.dispatch(ctx,req); }); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //此處可能因為客戶機器突發重啟 //也可能客戶端連接時間超時,后面的REadTimeoutHandle拋出異常 //也可能消息協議錯誤,序列化異常 ctx.close(); } }
HttpServer
public class HttpServer { private final static Logger LOG= LoggerFactory.getLogger(HttpServer.class); private String ip; private int port; //端口 private int ioThreads; //IO線程數,用於處理套接字讀寫,由Netty內部管理 private int workerThreads; //業務線程數,專門處理http請求,由我們本省框架管理 private RequestDispatch requestDispatch;//請求配發器對象 public HttpServer() { } public HttpServer(String ip, int port, int ioThreads, int workerThreads, RequestDispatch requestDispatch) { this.ip = ip; this.port = port; this.ioThreads = ioThreads; this.workerThreads = workerThreads; this.requestDispatch = requestDispatch; } //用於服務端,使用一個ServerChannel接收客戶端的連接, // 並創建對應的子Channel private ServerBootstrap bootstrap; //包含多個EventLoop private EventLoopGroup group; //代表一個Socket連接 private Channel serverChannel; // private MessageCollector collector; public void start(){ bootstrap=new ServerBootstrap(); group=new NioEventLoopGroup(ioThreads); bootstrap.group(group); collector=new MessageCollector(workerThreads,requestDispatch); bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline=socketChannel.pipeline(); //如果客戶端60秒沒有任何請求,就關閉客戶端連接 pipeline.addLast(new ReadTimeoutHandler(10)); //客戶端和服務器簡單的編解碼器:HttpClientCodec和HttpServerCodec。 //ChannelPipelien中有解碼器和編碼器(或編解碼器)后就可以操作不同的HttpObject消息了;但是HTTP請求和響應可以有很多消息數據, // 你需要處理不同的部分,可能也需要聚合這些消息數據 pipeline.addLast(new HttpServerCodec()); //通過HttpObjectAggregator,Netty可以聚合HTTP消息, // 使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一個ChannelHandler,這就消除了斷裂消息,保證了消息的完整 pipeline.addLast(new HttpObjectAggregator(1 << 30)); // max_size = 1g //允許通過處理ChunkedInput來寫大的數據塊 pipeline.addLast(new ChunkedWriteHandler()); //將業務處理器放到最后 pipeline.addLast(collector); } }); } public void stop() { // 先關閉服務端套件字 serverChannel.close(); // 再斬斷消息來源,停止io線程池 group.shutdownGracefully(); // 最后停止業務線程 collector.closeGracefully(); } }
RequestDispatcherImpl 是請求派發器,用於將收到的HTTP請求對象扔給響應的RequestHandler進行處理。
public class RequestDispatcherImpl implements RequestDispatch { private final static Logger LOG = LoggerFactory.getLogger(RequestDispatcherImpl.class); private String contextRoot; private Router router; private Map<Integer, WebExceptionHandler> exceptionHandlers = new HashMap<>(); private WebExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler(); private WebTemplateEngine templateEngine = new WebTemplateEngine() { }; static class DefaultExceptionHandler implements WebExceptionHandler { @Override public void handle(ApplicationContext ctx, AbortException e) { if (e.getStatus().code() == 500) { LOG.error("Internal Server Error", e); } ctx.error(e.getContent(), e.getStatus().code()); } } public RequestDispatcherImpl(Router router) { this("/", router); } public RequestDispatcherImpl(String contextRoot, Router router) { this.contextRoot = CurrentUtil.normalize(contextRoot); this.router = router; } public RequestDispatcherImpl templateRoot(String templateRoot) { this.templateEngine = new FreemarkerEngine(templateRoot); return this; } public String root() { return contextRoot; } public RequestDispatcherImpl exception(int code, WebExceptionHandler handler) { this.exceptionHandlers.put(code, handler); return this; } public RequestDispatcherImpl exception(WebExceptionHandler handler) { this.defaultExceptionHandler = handler; return this; } @Override public void dispatch(ChannelHandlerContext channelCtx, FullHttpRequest req) { ApplicationContext ctx = new ApplicationContext(channelCtx, contextRoot, templateEngine); try { this.handleImpl(ctx, new Request(req)); } catch (AbortException e) { this.handleException(ctx, e); } catch (Exception e) { this.handleException(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, e)); } finally { req.release(); } } private void handleException(ApplicationContext ctx, AbortException e) { WebExceptionHandler handler = this.exceptionHandlers.getOrDefault(e.getStatus().code(), defaultExceptionHandler); try { handler.handle(ctx, e); } catch (Exception ex) { this.defaultExceptionHandler.handle(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, ex)); } } private void handleImpl(ApplicationContext ctx, Request req) throws Exception { if (req.decoderResult().isFailure()) { ctx.abort(400, "http protocol decode failed"); } if (req.relativeUri().contains("./") || req.relativeUri().contains(".\\")) { ctx.abort(400, "unsecure url not allowed"); } if (!req.relativeUri().startsWith(contextRoot)) { throw new AbortException(HttpResponseStatus.NOT_FOUND); } req.popRootUri(contextRoot); router.handle(ctx, req); } }
項目github位置
https://github.com/developerxiaofeng/WebFrameByNetty.git