Spring - Netty (整合)


 瘋狂創客圈,一個Java 高並發研習社群 【博客園 總入口 】


 

 

   寫在前面 

大家好,我是作者尼恩。目前和幾個小伙伴一起,組織了一個高並發的實戰社群【瘋狂創客圈】。正在開始 高並發、億級流程的 IM 聊天程序 學習和實戰,此文是:  瘋狂創客圈 Java 分布式聊天室【 億級流量】實戰系列之 -30

 

順便說明下:
本文的內容只是一個初稿、初稿,本文的知識,在《Netty Zookeeper Redis 高並發實戰》一書時,進行大篇幅的完善和更新,並且進行的源碼的升級。 博客和書不一樣,書的內容更加系統化、全面化,更加層層升入、層次分明、更多次的錯誤排查,請大家以書的內容為准。
本文的最終內容, 具體請參考瘋狂創客圈 傾力編著,機械工業出版社出版的 《Netty Zookeeper Redis 高並發實戰》一書 。

 書籍 

  

Spring Netty 整合實戰

瘋狂創客圈 死磕Netty 系列之11

主要介紹的是SpringBoot整合Netty。在使用Netty之前,建議先了解Netty的基本原理,請參閱瘋狂創客圈。

這里僅僅是使用Netty的第一步,這里介紹一個最簡單的Demo——EchoServer,也就是回寫服務器。就是無論客戶端發啥字符串到服務器端,服務器端接收字符串后直接回寫到客戶端。

 

源碼下載鏈接:

點擊下載

 

本篇內容綱要

  • 環境要求

  • Spring +netty 服務器端

  • Spring +netty 客戶端

  • Spring讀取配置文件中的屬性值

環境要求

  • JDK::1.8

  • Netty::4.0或以上(不包括5)

 
        

<java.version>1.8</java.version>
<springboot>1.5.9.RELEASE</springboot>
<netty.version>4.0.33.Final</netty.version>

Spring +netty 服務器端

 

回寫服務器 Echo Server 程序主要由兩部分組成:

  • ServerBootstrap:服務器啟動引導器。負責配置服務器端基本信息,並且完成服務器的啟動

  • EchoServerHandler:回寫的業務邏輯處理器

ServerBootstrap

首先是編寫服務端的啟動類,代碼中相應的注釋在寫得很詳細。主要的步驟如下:

  1. 創建一個ServerBootstrap實例

  2. 創建一個EventLoopGroup來處理各種事件,如處理鏈接請求,發送接收數據等。

  3. 設置本地監聽端口 InetSocketAddress( port)

  4. 設置 childHandler 來設置通道初始化類。並且在通道初始化時,加入回寫的業務邏輯處理器EchoServerHandler到服務器通道的pipeline中 。childHandler 在通道初始化時,會被執行一次。

  5. 所有准備好之后調用ServerBootstrap.bind() 方法綁定 Server

不過需要注意的是,在不使用Spring的環境中,是通過main方法直接啟動服務端,因此是直接new一個處理器echoServerHandler 對象。而在和Spring 整合之后,我們需要將 echoServerHandler 處理器交給springBoot去管理。

ServerBootstrap 代碼如下:
@Service("EchoServer") public class EchoServer { // 服務器端口 @Value("${server.port}") private int port; // 通過nio方式來接收連接和處理連接 private static EventLoopGroup boss = new NioEventLoopGroup(); private static EventLoopGroup work = new NioEventLoopGroup(); ​ // 啟動引導器 private static ServerBootstrap b = new ServerBootstrap(); @Autowired private EchoServerHandler echoServerHandler; ​ public void run() { try { b.group(boss, work); // 設置nio類型的channel b.channel(NioServerSocketChannel.class); // 設置監聽端口 b.localAddress(new InetSocketAddress(port)); // 設置通道初始化 b.childHandler(new ChannelInitializer<SocketChannel>() { //有連接到達時會創建一個channel protected void initChannel(SocketChannel ch) throws Exception { // pipeline管理channel中的Handler // 在channel隊列中添加一個handler來處理業務 ch.pipeline().addLast("echoServerHandler",echoServerHandler); } }); // 配置完成,開始綁定server // 通過調用sync同步方法阻塞直到綁定成功 ​ ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress()); ​ // 監聽服務器關閉事件 // 應用程序會一直等待,直到channel關閉 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉EventLoopGroup,釋放掉所有資源包括創建的線程 work.shutdownGracefully(); boss.shutdownGracefully(); } ​ } }
業務邏輯ServerHandler:

要想處理接收到的數據,我們必須繼承ChannelInboundHandlerAdapter接口,重寫里面的channelRead方法,每當有數據到達,此方法就會被調用(一般是Byte類型數組),我們就在這里寫我們的業務邏輯:

 
        

@Service("echoServerHandler") public class EchoServerHandler extends ChannelInboundHandlerAdapter { ​ /** * 建立連接時,發送一條消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("連接的客戶端地址:" + ctx.channel().remoteAddress()); super.channelActive(ctx); } ​ public void channelRead(ChannelHandlerContext ctx, Object msg) { try { System.out.println("server received data :" + msg); ctx.write(msg);//寫回數據, ​ } finally { ReferenceCountUtil.release(msg); } } ​ public void channelReadComplete(ChannelHandlerContext ctx) { //flush掉所有寫回的數據 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); //當flush完成后關閉channel } ​ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //捕捉異常信息 cause.printStackTrace(); //出現異常時關閉channel ctx.close(); } } ​

關於異常處理:

我們在上面程序中也重寫了exceptionCaught方法,這里就是對當異常出現時的處理。

Spring +netty 客戶端

EchoClient 扮演如下角色:

  • 連接到Server

  • 向Server寫數據,

  • 等待Server返回數據

回寫客戶端程序EchoClient 主要由兩部分組成:

  • Bootstrap:客戶端啟動引導器。負責配置客戶端基本信息,並且完成客戶端的啟動

  • EchoClientHandler :客戶端業務邏輯處理器

EchoClient Bootstrap的過程:

和Server端類似,只不過Client端要同時指定連接主機的IP和Port。

  1. 創建一個Bootstrap實例

  2. 創建一個EventLoopGroup 來處理各種事件,如處理鏈接請求,發送接收數據等。

  3. 定義需要連接到的遠程服務器的InetSocketAddress,包含了IP+端口

  4. 設置 childHandler 來設置通道初始化類。並且在通道初始化時,加入客戶端的業務邏輯處理器echoClientHandler 到服務器通道的pipeline中 。當連接完成之后,childHandler 會被執行一次 。

  5. 所有准備好之后調用 ServerBootstrap.connect() 方法連接Server

EchoClient Bootstrap的代碼:
@Service("EchoClient") public class EchoClient { // 服務器ip地址 @Value("${server.ip}") private String host; // 服務器端口 @Value("${server.port}") private int port; ​ // 通過nio方式來接收連接和處理連接 private EventLoopGroup group = new NioEventLoopGroup(); ​ @Autowired private EchoClientHandler echoClientHandler; ​ /** * 唯一標記 */ private boolean initFalg = true; ​ /** * 客戶端的是Bootstrap,服務端的則是 ServerBootstrap。 * 都是AbstractBootstrap的子類。 **/ public void run() { doConnect(new Bootstrap(), group); } ​ /** * 重連 */ public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { ChannelFuture f = null; try { if (bootstrap != null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.remoteAddress(host, port); ​ // 設置通道初始化 bootstrap.handler( new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(echoClientHandler); } } ); f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { System.out.println("與服務端斷開連接!在10s之后准備嘗試重連!"); eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); } }); if (initFalg) { System.out.println("EchoClient客戶端連接成功!"); initFalg = false; } // 阻塞 f.channel().closeFuture().sync(); } } catch (Exception e) { System.out.println("客戶端連接失敗!" + e.getMessage()); } ​ } }
EchoClientHandler 客戶端業務邏輯處理器

要想處理接收到的數據,我們必須繼承ChannelInboundHandlerAdapter基類,重寫里面的channelRead方法,每當有數據到達,此方法就會被調用(一般是Byte類型數組),我們就在這里寫我們的業務邏輯:

 
        

@Service("echoClientHandler") public class EchoClientHandler extends ChannelInboundHandlerAdapter { /** * 此方法會在連接到服務器后被調用 */ public void channelActive(ChannelHandlerContext ctx) { ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } ​ /** * 業務邏輯處理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 如果不是protobuf類型的數據 if (!(msg instanceof ByteBuf)) { System.out.println("未知數據!" + msg); return; } try { ByteBuf in = (ByteBuf) msg; System.out.println("Client received: " + ByteBufUtil.hexDump(in.readBytes(in.readableBytes()))); } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } } /** * 捕捉到異常 */ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }​

除了繼承ChannelInboundHandlerAdapter基類,我們的業務Handler還可以繼承 SimpleChannelInboundHandler 基類。

那么這兩個有什么區別呢?

  • SimpleChannelInboundHandler在接收到數據后會自動release掉數據占用的Bytebuffer資源(自動調用Bytebuffer.release())。如果在channelRead方法返回前還沒有寫完數據,也就是當不能讓它自動release時,就不能繼承 SimpleChannelInboundHandler 基類。而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動調用ReferenceCountUtil.release()等方法進行釋放。

  • SimpleChannelInboundHandler還有一個好處,可以在泛型參數中,可以直接指定好傳輸的數據格式。所以繼承該類,在處理數據時,不需要判斷數據格式。而繼承ChannelInboundHandlerAdapter則需要進行數據格式的判斷和轉換。

  • 推薦在服務端去繼承ChannelInboundHandlerAdapter,建議手動進行釋放,防止數據未處理完就自動釋放了。

Spring讀取配置文件中的屬性值

在Netty 的程序中,一般需要用到服務器ip和端口,最好的方式是放在配置文件中,方便修改。

Spring Boot 默認的配置文件名稱為 application.properties,SpringApplication將從以下位置加載此文件:

  • 當前目錄下的/config子目錄,

  • 當前目錄

  • 一個classpath下的/config包

  • classpath 根路徑(root)

一般情況下,工程在編譯之后,application.properties 放在classpath 根路徑下。

配置文件 application.properties
#端口號 server.port=8081 IPserver.ip=127.0.0.1

注意:文件名字不能錯哦,是application.properties

關聯配置項到類屬性

在類域屬性上通過@Value("${配置項}")指定關聯屬性,Spring Application會自動加載。

public class EchoServer { // 服務器端口 @Value("${server.port}") private int port; //... }
啟動配置項自動掃描

使用 @Configuration、@EnableAutoConfiguration 啟動配置項的自動掃描。

//自動加載配置信息 @Configuration @EnableAutoConfiguration //使包路徑下帶有@Value的注解自動注入 //使包路徑下帶有@Autowired的類可以自動注入 @ComponentScan("com.crazymakercircle.nettydemo.server") @SpringBootApplication public class ServerApp { ​ // ........ }

瘋狂創客圈 實戰計划
  • Netty 億級流量 高並發  IM后台 開源項目實戰

  • Netty 源碼、原理、JAVA NIO 原理

  • Java 面試題 一網打盡

  • 瘋狂創客圈 【 博客園 總入口 】


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM