接下來我們將展示如何構建一個基於Netty的客戶端和服務器,程序很簡單:客戶端將消息發送給服務器,而服務器再將消息回送給客戶端,這將是一個對你而言很重要的第一個netty的實踐經驗。
1、設置開發環境
編譯和運行,我們需要准備JDK和Apache Maven工具,這里建議大家使用Java的集成開發環境(IDE)。
如果你已經安裝了JDK,那么可以略過此步。
否則,請從http://java.com/en/download/manual.jsp 處獲取JDK第8版,請下載JDK,而不是Java運行環境(JRE),其僅僅可以運行Java應用程序,但不夠編譯它們。
有關安裝說明:
——將環境變量JAVA_HOME設置為你的JDK安裝位置
——將%JAVA_HOME%\bin添加到你的執行路徑
下面是使用最廣泛的Java IDE,都可以免費獲取
——Eclipse——www.eclipse.org
——NetBeans——https://netbeans.org
——Intellij IDEA Community Edition——www.jetbrains.com
有關MAVEN的安裝也與Java JDK安裝類似
2、Netty客戶端/服務器概覽
圖2-1展示了我們將要編寫的Echo客戶端和服務器應用程序,即使可能我們要編寫基於Web的用於被瀏覽器訪問的應用程序,但是通過同時實現客戶端和服務器,你一定能更加全面地理解Netty的API。
雖然圖中也展示了我們一開始所說的多個客戶端,所能夠支持的客戶端數量,在理論上,僅受限於系統的可用資源(以及所使用的JDK版本可能會施加的限制)。
Echo客戶端和服務器之間的交互非常簡單,其本身也充分體現了客戶端/服務器系統中典型的請求-響應交互模式。
3、編寫Echo服務器
所有的Netty服務器都需要以下兩個部分:
——至少一個ChannelHandler——該組件實現了服務器對從客戶端接收的數據的處理,即它的業務邏輯。
——引導——配置服務器的啟動代碼,將服務器綁定到它要監聽連接請求的端口上。
我們的服務器會響應傳入的消息,需要實現ChannelInboundHandler接口,用來定義響應入站事件的方法,對於此應用而言只需要用到少量的這些方法,所以繼承ChannelInboundHandlerAdapter類就足夠了,它提供了ChannelInboundHandler的默認實現。
——channelRead():對於每個傳入的消息都要調用
——channelReadComplete():通知ChannelInboundHandler最后一次對channelRead()的調用時當前批量讀取中的最后一條消息
——exceptionCaught():在讀取操作期間,有異常跑出會調用
代碼清單2-1,展示Echo服務器的ChannelHandler實現EchoServerHandler。
@ChannelHandler.Sharable //標示一個ChannelHandler可以被多個Channel安全地共享 public class EchoServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));//將消息記錄到控制台 ctx.write(in);//將接受到的消息寫給發送者,而不沖刷出站消息· } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //將未決消息沖刷到遠程節點,並且關閉該Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace();//打印異常棧跟蹤 ctx.close();//關閉該Channel } }
ChannelInboundHandlerAdapter有一個直觀的API,並且它的每個方法都可以被重寫以掛鈎到事件生命周期的恰當點上。因為需要處理所有接收到的數據,所以重寫channelRead()方法
重寫exceptionCaught()方法允許你對Throwable的任何子類做出反應,我們代碼中記錄了異常並關閉了連接。
如果不捕獲異常,會發生什么呢?
每個Channel都擁有一個與之相關聯的ChannelPipeline,其持有一個ChannelHandler的實例鏈,在默認情況下,ChannelHandler會把對它的方法的調用轉發給鏈中的下一個ChannelHandler,因此,如果exceptionCaught()方法沒有被該鏈中的某處實現,那么所接收的異常將會被傳遞到ChannelPipeline的尾端並被記錄,為此,你的應用程序應該提供至少有一個實現了exceptionCaught()方法的ChannelHandler。
除了ChannelInboundHandlerAdapter之外,還有很多需要學習的ChannelHandler的子類型和實現。
——針對不同類型的事件調用ChannelHandler
——應用程序通過實現或者擴展ChannelHandler來掛鈎到事件的生命周期,並且提供自定義的應用程序邏輯
——在架構上,ChannelHandler有助於保持業務邏輯與網絡處理代碼的分離,這簡化了開發過程,因為代碼必須不斷地演化以響應不斷變化的需求
在討論過EchoServerHandler實現的核心業務邏輯之后,我們現在可以討論引導服務器本身的過程:
——綁定到服務器將在其上監聽並接受請求的端口
——配置Channel,以將有關的入站消息通知給EchoServerHandler實例
傳輸:
在網絡協議的標准多層視圖中,傳輸層提供了端到端的或者主機到主機的通信服務。
因特網通信是建立在TCP傳輸之上的,除了一些由Java NIO實現提供的服務器端性能增強之外,NIO傳輸大多數時候指的就是TCP傳輸。
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Usage: " + EchoServer.class.getSimpleName() + " <port>"); return; } int port = Integer.parseInt(args[0]);//設置端口值(如果端口參數的格式不正確,則拋出一個NumberFormatException) new EchoServer(port).start();//調用服務器的start()方法 } public void start() throws Exception{ final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup();//創建EventLoopGroup try { ServerBootstrap b = new ServerBootstrap();//創建ServerBootstrap b.group(group) .channel(NioServerSocketChannel.class)//指定所使用的NIO傳輸Channel .localAddress(new InetSocketAddress(port))//使用指定的端口設置套接字地址 .childHandler(new ChannelInitializer<SocketChannel>() {//添加一個EchoServerHandler到子Channel的ChannelPipeline @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //EchoServerHandler被標注為@Shareable,所以我們可以總是使用同樣的實例 socketChannel.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync();//異步地綁定服務器,調用sync()方法阻塞等待直到綁定完成 f.channel().closeFuture().sync();//獲取Channel的closeFuture,並且阻塞當前線程直到它完成 }finally { group.shutdownGracefully().sync();//關閉EventLoopGroup釋放所有的資源 } } }
我們創建了一個ServerBootstrap實例,因為正在使用NIO傳輸,指定NioEventLoopGroup來接收和處理新的連接,並且將Channel的類型指定為NioServerSocketChannel。在此之后,將本地地址設置為一個具有選定端口的InetSocketAddress,服務器將綁定到這個地址以監聽新的連接請求。
使用一個特殊的類——ChannelInitializer。當一個新的連接被接受時,一個新的子Channel將會被創建,而ChannelInitializer將會把一個你的EchoServerHandler的實例添加到該Channel的ChannelPipeline中,即這個ChannelHandler將會收到有關入站消息的通知。
雖然NIO是可伸縮的,但是其關於多線程處理的配置並不簡單。Netty的設計封裝了大部分的復雜性。
綁定服務器,並等待綁定完成。(對sync()方法的調用將導致當前Thread阻塞,一直到綁定操作完成為止)該應用程序將會阻塞等待直到服務器的Channel關閉(因為我的Channel的CloseFuture上調用sync()方法),之后我們可以關閉EventLoopGroup,並釋放所有的資源,包括所有被創建的線程。
使用了NIO,因為得益於它的可擴展性和徹底的異步性,它是目前使用最廣泛的傳輸,可以使用一個不同的傳輸實現,當然如果你想要在自己的服務器中使用OIO傳輸,將需要指定OioServerSocketChanne和OioEventLoopGroup。
讓我們回顧一下服務器中的重要步驟:
——EchoServerHandler實現了業務邏輯
——main()方法引導了服務器
引導過程中所需的步驟:
——創建一個ServerBootstrap的實例以引導和綁定服務器
——創建並分配一個NioEventLoopGroup實例以進行事件的處理,如接受新連接以及讀/寫數據
——指定服務器綁定的本地的InetSocketAddress
——使用一個EchoServerHandler的實例初始化每一個新的Channel
——調用ServerBootstrap.bing()方法以綁定服務器
4、編寫Echo客戶端
1、連接到服務器 2、發送一個或者多個消息 3、對於每個消息,等待並接收從服務器發回的相同的消息 4、關閉連接
編寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導
客戶端將擁有一個用來處理數據的ChannelInboundHandler,在這個場景下,將擴展SimpleChannelInboundHandler類以處理所有必須的任務。如代碼清單2-3,要求重寫下面的方法:
——channelActive():在到服務器的連接已經建立之后將被調用
——channelRead():當從服務器接收到一條消息時被調用
——exceptionCaught():在處理過程中引發異常時被調用
@ChannelHandler.Sharable //標記該類的實例可以被多個Channel共享 public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //當被通知Channel是活躍的時候,發送一條消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { //記錄已接收消息的轉儲 System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 在發生異常時,記錄錯誤並關閉Channel * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
重寫了channelActive()方法,其將在一個連接建立時被調用,這確保了數據將會被盡可能快地寫入服務器,其在這個場景下是一個編碼了字符串“Netty rocks!”的字符串緩存區。
重寫了channelRead0()方法,每當接收數據時,都會調用這個方法。需要注意的是,由服務器發送的消息可能會被分塊接收。如果服務器發送了5字節,那么不能保證這5字節會被一次性接收。
即使是對於這么少量的數據,channelRead0()方法也可能會被調用兩次。作為一個面向流的協議,TCP保證了字節數組將會按照服務器發送它們的順序被接收
重寫了exceptionCaught()。如同在EchoServerHandler(見代碼清單2-2)中所示,記錄Throwable,關閉Channel,在這個場景下,終止到服務器的連接。
SimpleChannelInboundHandler與ChannelLnboundHandler
為什么我們在客戶端使用的是SimpleChannelInboundHandler,而不是在EchoServerHandler中所使用的ChannelInboundHandlerAdapter呢?這兩個因素的相互作用有關:業務邏輯如何處理消息以及Netty如何管理資源
在客戶端,當channelRead()方法完成時,你已經有了傳入消息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler負責釋放指向保存該消息的ByteBuf的內存引用。
在EchoServerHandler中,你仍然需要將傳入消息回送給發送者,而write()操作時異步的,直到channelRead()方法返回后可能仍然沒有完成,為此,EchoServerHandler擴展了ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。
消息在EchoServerHandler的channelReadComplete()方法中,當writeAndFlush()方法被調用時被釋放。
引導客戶端類似於引導服務器,不同的是,客戶端是使用主機和端口參數來連接遠程地址,也就是這里的Echo服務器的地址,而不是綁定到一個一直被監聽的端口。
public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap();//創建Bootstrap b.group(group)//指定EventLoopGroup以處理客戶端事件,需要適用於NIO的實現 .channel(NioSocketChannel.class)//適用於NIO傳輸的Channel類型 .remoteAddress(new InetSocketAddress(host,port))//設置服務器的InetSocketAddress .handler(new ChannelInitializer<SocketChannel>() {//在創建Channel時,向ChannelPipeline中添加一個EchoClientHandler實例 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync();//連接到遠程節點,阻塞等待直到連接完成 f.channel().closeFuture().sync();//阻塞,直到Channel關閉 }finally { group.shutdownGracefully().sync();//關閉線程池並且釋放所有的資源 } } public static void main(String[] args) throws Exception{ if (args.length != 2){ System.out.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host,port).start(); } }
注意,你可以在客戶端和服務器上分別使用不同的傳輸。在服務器端使用NIO傳輸,而在客戶端使用OIO傳輸。
——為初始化客戶端,創建了一個Bootstrap實例
——為進行事件處理分配了一個NioEventLoopGroup實例,其中事件處理包括創建新的連接以及處理入站和出站數據
——為服務器連接創建了一個InetSocketAddress實例
——當連接被建立時,一個EchoClientHandler實例會被安裝到(該Channel的)ChannelPipeline中
——在一切都設置完成后,調用Bootstrap.connect()方法連接到遠程節點
在本節中雖然只是一個簡單的應用程序, 但是它可以伸縮到支持數千並發連接——每秒可以比普通的基於套接字的Java應用程序處理多得多的消息。
深入地了解Netty對於關注點分離的架構原則的支持,通過提供正確的抽象來解耦業務邏輯和網絡編程邏輯。
