一、Netty框架介紹
什么是netty?先看下百度百科的解釋:
為什么好多大公司都在使用netty框架?主要是基於netty框架的以下幾個特點決定的:
1)健壯性,2)功能齊全,3)可定制,4)擴展性
二、框架優點
傳統的RPC性能差,主要是由於客戶端和遠程調用采用了同步阻塞IO線程,當客戶端的並發壓力增大后,同步阻塞會由於頻繁的等待導致I/O線程堵塞,線程無法高效的工作,IO處理能力自然會降低。影響性能的三個因素:第一,IO模型,IO模型在一定程度上決定了框架的性能。第二、協議,如:HTTP、TCP/IP等,協議選擇的不同,性能模型也不同,通常情況下,內部私有協議的性能比較優,這是由於內部設計決定的。第三、線程,數據報文的接收、讀取、編碼、解碼等,線程模型的不同,性能也不同。相比於傳統的RPC框架,netty的優點主要體現在以下幾個方面:
- API使用簡單,封裝非常完善,開發門檻低
- 功能上強大,預置了多種編碼解碼功能,多種協議支持
- 定制能力強,可以對ChannelHandler對通信框架靈活擴展
- 性能高,Reactor線程模型調度,ChannelFuture-Listener,通過Listener機制主動推送結果
- 版本成熟穩定,社區活躍,版本更新快,出現的Bug會被很快的修復,同時,有心功能的加入,經歷了大規模的商業應用考驗,質量的到了充分的驗證。已經廣泛應用到互聯網、大數據、企業應用、電信軟件、網絡游戲等熱門行業,他可以滿足不同的商業標准。
三、Netty架構分析
Netty是一個基於三層網絡架構模型的框架,三層網絡架構分析包括調度層、鏈條傳遞層以及業務邏輯層。
- Reactor通信調度層,是一個模型,
NIO線程池組件{
監聽網絡讀寫連接
業務調度處理
NIO,AIO,配合NIO通道NioSocketChannel組件
}
Netty通過內部select巡查機制,能夠實現IO多路復用,通過把多個IO阻塞復用到同一個select的阻塞上,從而能夠使系統即使在單線程的情況下,也能夠同時處理多個請求。這樣就使得netty實現了IO多路復用的優勢,與傳統多線程相比,大大減少了系統的開銷,因為系統不必創建新的線程和銷毀線程了,減少了系統的維護難度,節省了資源。
ByteBuffer池化支持,不用手動切換標志位,實現零拷貝。傳統的Socket讀寫,基本是使用堆內存進行,即jvm事先會把堆內存拷貝到內存中,然后再寫入Socket,而netty采用的是DIRECT BUFFERS,不需要經過jvm內存拷貝,在堆外內存直接進行Socket讀寫,這樣就少了一次緩沖區的內存拷貝,從而實現零拷貝。
2.Pipleline職責鏈條傳遞
攔截處理向前向后事件,外部傳入的消息包對象,有POJO信息抽象,上層也只需要處理邏輯,類似SpringIOC處理BeanDefince。不同的Handler節點的功能也不同,通常情況下需要編碼解碼等,它可以完成外部協議到內部POJO對象的轉化,這樣上層只需要關注業務邏輯,不需要知道底層的協議和線程模型,從而實現解耦。
3.構建邏輯業務處理單元
底層的協議隔離,上層處理邏輯框架並不需要關心底層協議是什么。Netty框架的分層設計使得開發人員不需要關注協議框架的實現,只需要關注服務層的業務邏輯開發即可,實現了簡單化。
之前有個項目是基於傳統Socket和線程池的技術實現的,但是在高並發的時候發現並發能力不足,壓測的時候發現TPS達不到理想值,所以經過考慮,決定使用netty框架來解決此問題。同樣,netty框架也分為客戶端和服務端,經過整理,先寫一個demo初探netty框架,下面是代碼的實現過程。
首先是服務端,服務端包含兩個方面,第一、服務端Server的主要作用就是通過輔助引導程序,設置NIO的連接方式處理客戶端請求,通過綁定特定端口、設定解碼方式以及監聽來實現整個線程的處理請求;第二、服務端Handler需要繼承ChannelInboundHandlerAdapter類,handler類的主要作用是讀取客戶端數據,處理業務,拋出異常,響應客戶端請求。代碼如下:
服務端Server:

public class Server { private static Log logger = LogFactory.getLog(Server.class); private int port; public Server(int port) { super(); this.port = port; } public void start(){ ServerBootstrap b = new ServerBootstrap();//引導輔助程序 EventLoopGroup group = new NioEventLoopGroup();//通過nio方式來接收連接和處理請求 try { b.group(group); b.channel(NioServerSocketChannel.class);//設置nio類型的channnel b.localAddress(new InetSocketAddress(port));//設置監聽端口 //b.option(ChannelOption.SO_BACKLOG, 2048); b.childHandler(new ChannelInitializer<SocketChannel>() {//有連接到達時會創建一個channel @Override protected void initChannel(SocketChannel ch) throws Exception { //注冊handler ch.pipeline().addLast(new ByteArrayDecoder()); ch.pipeline().addLast(new ByteArrayEncoder()); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new ServerHandler()); } });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind().sync();//配置完成,開始綁定server,通過調用sync同步方法阻塞直到綁定成功 logger.info(Server.class.getName()+"開始監聽:"+f.channel().localAddress()); f.channel().closeFuture().sync();//應用程序會一直等待直到channel關閉 } catch (Exception e) { e.printStackTrace(); } finally { try { //關閉EventLoopGroup,釋放掉所有資源包括創建的線程 group.shutdownGracefully().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
服務端Handler

public class ServerHandler extends ChannelInboundHandlerAdapter { private static Log logger=LogFactory.getLog(ServerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx){ logger.info(ctx.channel().localAddress().toString()+"通道活躍...."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.error(ctx.channel().localAddress().toString()+"通道不活躍...."); } /** * * 讀取客戶端傳過來的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //業務處理類 logger.info("開始業務處理...."); new SocketController(ctx,msg).run(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //出現異常,關閉連 logger.error("服務端出現異常:"+cause.getMessage(),cause); ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("服務端完成請求!"); ctx.flush(); } }
客戶端代碼
客戶端主要是用來向服務端發送數據,同樣包含兩個方面,第一、Client主要通過設定端口和IP和服務器建立連接,進行數據包的編碼;第二、ClientHandler 需要繼承 SimpleChannelInboundHandler<ByteBuf>類,針對不同的傳輸方式,繼承不同的類,handler類同樣處理業務請求,響應服務端的請求。代碼如下:
客戶端Client:

public class Client { private static Log logger=LogFactory.getLog(Client.class); private String host; private int port; public Client(String host, int port) { super(); this.host = host; this.port = port; } public void connect(){ EventLoopGroup workGroup=new NioEventLoopGroup(); Bootstrap bootstrap=new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { logger.info("客戶端觸發連接......"); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new ClientHandler()); } }); //客戶端開始連接 try { logger.info("連接到服務器......"); ChannelFuture future=bootstrap.connect(host,port).sync(); //等待連接關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workGroup.shutdownGracefully(); } } }
客戶端Handler:

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static Log logger=LogFactory.getLog(ClientHandler.class); /** * 向服務端發送消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().localAddress().toString()+"客戶點活躍..."); //向服務端寫字符串 logger.info("客戶端連接服務端,開始發送數據....."); String string ="hello server!"; System.out.println("發送數據為:"+string); ByteBuf buf=ctx.alloc().buffer(4*string.length()); buf.writeBytes(string.getBytes()); ctx.writeAndFlush(buf); logger.info("發送完畢..."); } /** * 讀取服務端返回來的消息 */ @Override protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception { logger.info("開始接受服務端數據"); byte[] b=new byte[in.readableBytes()]; in.readBytes(b); String string=new String(b); logger.info("服務端發送的數據為:"+string); in.release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.info("客戶端異常:"+cause.getMessage(),cause); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("客戶端完成請求...."); ctx.flush(); } }
服務端啟動:

public class ServerMain { private static Log logger=LogFactory.getLog(ServerMain.class); private static Server server =new Server(55550); public static void main(String[] args) { logger.info("服務端啟動......."); server.start(); } }
客戶端啟動類:

public class Test { private static Client client = new Client("127.0.0.1", 55550); public static void main(String[] args) throws UnknownHostException, IOException { client.connect(); } }
測試結果:
服務端:
客戶端:
總結:
以上只是一個netty框架初探的小Demo,學習使用netty框架的開始,這里面涉及到了很多的技術以及非常多的組件,比如:Channels、Callbacks、Futures、Events和handlers等等,需要進一步的學習,另外,消息的編碼解碼、粘包、拆包的方式方法、消息格式的轉換以及報文格式大小限制都需要進一步的研究學習。