作為架構師必須知道的Netty原理和使用


本文首發於微信公眾號【猿燈塔】,轉載引用請說明出處

 

今天呢!燈塔君跟大家講:

Netty應用

 

一.Netty簡介

1、Netty下載

官網:https://netty.io/downloads.html

 

2、Netty簡介

Netty is *an asynchronous event-driven network application framework* for 

rapid development of maintainable high performance protocol servers & 

clients. 

是一個異步事件驅動的網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端 

Netty is a NIO client server framework which enables quick and easy 

development of network applications such as protocol servers and clients. It 

greatly simplifies and streamlines network programming such as TCP and UDP 

socket server. 

'Quick and easy' doesn't mean that a resulting application will suffer from 

a maintainability or a performance issue. Netty has been designed carefully 

with the experiences earned from the implementation of a lot of protocols 

such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. 

As a result, Netty has succeeded to find a way to achieve ease of 

development, performance, stability, and flexibility without a compromise. 

Netty是一個NIO客戶端服務器框架,可以快速輕松地開發網絡應用程序,例如協議服務器和客戶 

端。它極大地簡化和簡化了網絡編程,例如TCP和UDP套接字服務器。“快速簡便”並不意味着最終的應用 

程序將遭受可維護性或性能問題的困擾。Netty經過精心設計,結合了許多協議(例如FTP,SMTP, 

HTTP以及各種基於二進制和文本的舊式協議)的實施經驗。結果,Netty成功地找到了一種無需妥協即 

可輕松實現開發,性能,穩定性和靈活性的方法。 

Features 

Design 設計 

Unified API for various transport types - blocking and non-blocking 

socket

Based on a flexible and extensible event model which allows clear 

separation of concerns Highly customizable thread model - single thread, 

one or more thread 

pools such as SEDA True connectionless datagram socket support (since 

3.1)

Ease of use 使用方便 

Well-documented Javadoc, user guide and examples 

No additional dependencies, JDK 5 (Netty 3.x) or 6 (Netty 4.x) is enough 

Note: Some components such as HTTP/2 might have more requirements. 

Please refer to the Requirements page for more information. 

Performance 性能 

Better throughput, lower latency 

Less resource consumption 

Minimized unnecessary memory copy 

Security 安全 

Complete SSL/TLS and StartTLS support 

Community 社區 

Release early, release often 

The author has been writing similar frameworks since 2003 and he still 

findsyourfeed back precious! 

 

3、常用網絡通信框架

a、Mina

Mina出身於開源界的大牛Apache組織。是 Apache 組織一個較新的項目,它為開發高性能和高可用性的網 絡應用程序提 供了非常便利的框架。當前發行的 Mina 版本2.04支持基於 Java NIO 技術的 TCP/UDP 應用程序開發、串口通訊程 序,Mina 所支持的功能也在進一步的擴展中。目前,正在使用Mina的應用包 括:Apache Directory Project、AsyncWeb,AMQP(Advanced Message Queuing Protocol),RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、 Openfire等等

 

b、Netty

Netty是一款異步的事件驅動的網絡應用框架和工具,用於快速開發可維護的高性能、高擴展性協議服務器和 客戶端。也就是說,Netty是一個NIO客戶端/服務器框架,支持快速、簡單地開發網絡應用,如協議服務器和 客戶端。它極大簡化了網絡編程,如TCP和UDP套接字服務器 

 

c、Grizzly 

Grizzly是一種應用程序框架,專門解決編寫成千上萬用戶訪問服務器時候產生的各種問題。使用JAVA NIO 作為基礎,並隱藏其編程的復雜性。容易使用的高性能的API。帶來非阻塞socketd到協議處理層。利用高性 能的緩沖和緩沖管理使用高性能的線程池。從設計的理念上來看,Mina的設計理念是最為優雅的。當然,由於 Netty的主導作者與Mina的主導作者是同一人,出自同一人之手的Netty在設計理念上與Mina基本上是一致 的。而Grizzly在設計理念上就較差了點,幾乎是JavaNIO的簡單封裝

 

4、使用領域 

a、互聯網行業

阿里分布式服務框架 Dubbo 的 RPC 框架使用 Dubbo 協議進行節點間通信,Dubbo 協議默認使用 

Netty 作為基礎通信組件,用於實現各進程節點之間的內部通信。除了 Dubbo 之外,淘寶的消息中間 

件 RocketMQ 的消息生產者和消息消費者之間,也采用 Netty 進行高性能、異步通信。 

除了阿里系和淘寶系之外,很多其它的大型互聯網公司或者電商內部也已經大量使用 Netty 構建高性 能、分布式的網絡服務器

 

b、游戲行業

無論是手游服務端、還是大型的網絡游戲,Java 語言得到了越來越廣泛的應用。Netty 作為高性能的基礎通信組件,它本身提供了 TCP/UDP 和 HTTP 協議棧,非常方便定制和開發私有協議棧。賬號登陸服務器、地圖服務器之間可以方便的通過 Netty 進行高性能的通信 

 

c、大數據領域

經典的 Hadoop 的高性能通信和序列化組件 Avro 的 RPC 框架,默認采用 Netty 進行跨節點通信,它 

的 Netty Service 基於 Netty 框架二次封裝實現。大數據計算往往采用多個計算節點和一個/N個匯總節 

點進行分布式部署,各節點之間存在海量的數據交換。由於 Netty 的綜合性能是目前各個成熟 NIO 框 

架中最高的,因此,往往會被選中用作大數據各節點間的通信。 

 

d、企業軟件 

企業和 IT 集成需要 ESB,Netty 對多協議支持、私有協議定制的簡潔性和高性能是 ESB RPC 框架的首 

選通信組件。事實上,很多企業總線廠商會選擇Netty 作為基礎通信組件,用於企業的 IT 集成。 

 

e、通信行業

Netty 的異步高性能、高可靠性和高成熟度的優點,使它在通信行業得到了大量的應用。 

 

 

 

二.Netty核心組件

1、Channel 

Channel 是 Netty 網絡操作抽象類,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之 

外,還包括了 Netty 框架相關的一些功能,如獲取該 Channe l的 EventLoop。 

在傳統的網絡編程中,作為核心類的 Socket ,它對程序員來說並不是那么友好,直接使用其成本還是 

稍微高了點。而Netty 的 Channel 則提供的一系列的 API ,它大大降低了直接與 Socket 進行操作的復 

雜性。而相對於原生 NIO 的 Channel,Netty 的 Channel 具有如下優勢(摘自《Netty權威指南(第二 版)》): 

在 Channel 接口層,采用 Facade 模式進行統一封裝,將網絡 I/O 操作、網絡 I/O 相關聯的其他操作封 

裝起來,統一對外提供。 

Channel 接口的定義盡量大而全,為SocketChannel 和 ServerSocketChannel 提供統一的視圖,由不 同子類實現不同的功能,公共功能在抽象父類中實現,最大程度地實現功能和接口的重用。 

具體實現采用聚合而非包含的方式,將相關的功能類聚合在 Channel 中,有 Channel 統一負責和調 

度,功能實現更加靈活。

 

2、ChannelFuture

Netty 為異步非阻塞,即所有的 I/O 操作都為異步的,因此,我們不能立刻得知消息是否已經被處理了。 Netty 提供了 ChannelFuture 接口,通過該接口的 addListener() 方法注冊一個 

ChannelFutureListener,當操作執行成功或者失敗時,監聽就會自動觸發返回結果

 

3、EventLoop

Netty 基於事件驅動模型,使用不同的事件來通知我們狀態的改變或者操作狀態的改變。它定義了在整個連 

接的生命周期里當有事件發生的時候處理的核心抽象。Channel 為Netty 網絡操作抽象類,EventLoop 主 

要是為Channel 處理 I/O 操作,兩者配合參與 I/O 操作。 

下圖是Channel、EventLoop、Thread、EventLoopGroup之間的關系(摘自《Netty In 

Action》):

一個 EventLoopGroup 包含一個或多個 EventLoop。 

一個 EventLoop 在它的生命周期內只能與一個Thread綁定。 

所有EnventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理。 

一個 Channel 在它的生命周期內只能注冊與一個 EventLoop。 

一個 EventLoop 可被分配至一個或多個 Channel 。

當一個連接到達時,Netty 就會注冊一個 Channel,然后從 EventLoopGroup 中分配一個 EventLoop 

綁定到 這個Channel上,在該Channel的整個生命周期中都是有這個綁定的 EventLoop 來服務的。 

 

4、ChannelHandler

ChannelHandler 為 Netty 中最核心的組件,它充當了所有處理入站和出站數據的應用程序邏輯的容器。 

ChannelHandler 主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數據接收、異常、數據轉 換等。 

ChannelHandler 有兩個核心子類 ChannelInboundHandler 和 ChannelOutboundHandler,其中 

ChannelInboundHandler 用於接收、處理入站數據和事件,而 ChannelOutboundHandler 則相反。

5、ChannelPipeline

ChannelPipeline 為 ChannelHandler 鏈提供了一個容器並定義了用於沿着鏈傳播入站和出站事件流的 API。一個數據或者事件可能會被多個 Handler 處理,在這個過程中,數據或者事件經流 ChannelPipeline,由 ChannelHandler 處理。在這個處理過程中,一個 ChannelHandler 接收數據 后處理完成后交給下一個 ChannelHandler,或者什么都不做直接交給下一個 ChannelHandler。

6、Channel的生命周期

ChannelUnregistered Channel已經被創建,但是還未注冊到EventLoop; 

ChannelRegistered Channel已經被注冊到EventLoop; 

ChannelActive Channel處於活躍狀態(已經連接到遠程節點),可以進行接收和發送數據 

ChannelInactive Channel沒有連接到遠程節點 

 

 

三.Netty核心組件關系

 

04.Netty簡單通信

服務器端:

import io.netty.bootstrap.ServerBootstrap; 

import io.netty.channel.*; 

import io.netty.channel.nio.NioEventLoopGroup; 

import io.netty.channel.socket.SocketChannel; 

import io.netty.channel.socket.nio.

NioServerSocketChannel; 

public class Server { 

public static void main(String[] args) { 

int port = 9898; 

new Server().bind(port); 

}

public void bind(int port) { 

 

/** * interface EventLoopGroup extends EventExecutorGroup extends 

ScheduledExecutorService extends 

ExecutorService 

* 配置服務端的 NIO 線程池,用於網絡事件處理,

實質上他們就是 Reactor 線程組

* bossGroup 用於服務端接受客戶端連接,

workerGroup 用於進行 SocketChannel 網絡 讀寫*/ 

EventLoopGroup bossGroup = new NioEventLoopGroup(); 

EventLoopGroup workerGroup = new NioEventLoopGroup(); 

try {

/** ServerBootstrap 是 Netty 用於啟動 NIO 

服務端的輔助啟動類,用於降低開發 難度 * */ 

ServerBootstrap b = new ServerBootstrap(); 

b.group(bossGroup, workerGroup) 

.channel(NioServerSocketChannel.class) 

.option(ChannelOption.SO_BACKLOG, 1024) 

.childHandler(new ChildChannelHandler()); 

/**服務器啟動輔助類配置完成后,調用 bind 

方法綁定監聽端口,調用 sync 方法同步等待綁 

定操作完成*/ 

ChannelFuture f = b.bind(port).sync(); 

System.out.println("服務器開始監聽端口,

等待客戶端連接........."); 

/**下面會進行阻塞,等待服務器連接關閉之后 

main 方法退出,程序結束*/ 

f.channel().closeFuture().sync(); 

} catch (InterruptedException e) { 

e.printStackTrace(); 

} finally { 

/**優雅退出,釋放線程池資源*/ 

bossGroup.shutdownGracefully(); 

workerGroup.shutdownGracefully(); 

}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> 

@Override 

protected void initChannel(SocketChannel arg0) throws Exception { 

arg0.pipeline().addLast(new ServerHandler()); 

}

 

服務器處理類:

import io.netty.buffer.ByteBuf; 

import io.netty.buffer.Unpooled; 

import io.netty.channel.ChannelHandler; 

import io.netty.channel.ChannelHandlerAdapter; 

import io.netty.channel.ChannelHandlerContext; 

import io.netty.channel.ChannelInboundHandlerAdapter; 

 

/**ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用於對網絡事件進行讀寫 

操作*/ 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

/** * 收到客戶端消息,自動觸發 

* @param ctx 

* @param msg 

* @throws Exception */ 

@Override 

public void channelRead(ChannelHandlerContext ctx, Object msg) throws 

Exception { 

/** 將 msg 轉為 Netty 的 ByteBuf 對象,類似 JDK 中的 java.nio.ByteBuffer,不 

過 ButeBuf 功能更強,更靈活 */ 

 

ByteBuf buf = (ByteBuf) msg; 

/**readableBytes:獲取緩沖區可讀字節數,

然后創建字節數組 * 從而避免了像 java.nio.ByteBuffer

 時,只能盲目的創建特定大小的字節數組比如 1024  */ 

byte[] reg = new byte[buf.readableBytes()]; 

/*readBytes:將緩沖區字節數組復制到新建的 byte

數組中然后將字節數組轉為字符串*/ 

buf.readBytes(reg); 

String body = new String(reg, "UTF-8"); 

System.out.println(Thread.currentThread().getName() +",The server 

receive order : " + body); 

/**回復消息 

* copiedBuffer:創建一個新的緩沖區,

內容為里面的參數 

* 通過 ChannelHandlerContext 的 write 方法將消息異步發送給客戶端 * */ 

String respMsg = "I am Server,

消息接收 success!"; 

ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes()); 

ctx.write(respByteBuf); 

}

@Override 

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception 

/**flush:將消息發送隊列中的消息

寫入到 SocketChannel 中發送給對方,為了頻繁的喚醒 

Selector 進行消息發送 

* Netty 的 write 方法並不直接將消息寫如 SocketChannel 中,調用 write 只是把待 

發送的消息放到發送緩存數組中, 

* 再通過調用 flush方法,將發送緩沖區的消息全部寫入到 SocketChannel中 * */ 

ctx.flush(); 

}

@Override 

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

throws Exception { 

/**當發生異常時,關閉 ChannelHandlerContext,

釋放和它相關聯的句柄等資源 */ 

ctx.close(); 

}

客戶端:

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup; 

import io.netty.channel.nio.NioEventLoopGroup; 

import io.netty.channel.socket.SocketChannel; 

import io.netty.channel.socket.nio.

NioSocketChannel; 

public class Client { 

 

 /** * 使用 3 個線程模擬三個客戶端 

* @param args */ 

 

public static void main(String[] args) { 

for (int i = 0; i < 3; i++) { 

new Thread(new MyThread()).start(); 

}

static class MyThread implements Runnable { 

@Override 

public void run() { 

connect("127.0.0.1", 9898); 

}

public void connect(String host, int port) { 

/**配置客戶端 NIO 線程組/池*/ 

EventLoopGroup group = 

new NioEventLoopGroup(); 

try {

/**Bootstrap 與 ServerBootstrap 

都繼承(extends)於 AbstractBootstrap

* 創建客戶端輔助啟動類,並對其配置,與服務器稍微不同,

這里的 Channel 設置為 

NioSocketChannel 

* 然后為其添加 Handler,這里直接使用匿名內部類,

實現 initChannel 方法 

* 作用是當創建 NioSocketChannel 成功后,

在進行初始化時,將它的 

ChannelHandler設置到ChannelPipeline中, 

* 用於處理網絡I/O事件*/ 

Bootstrap b = new Bootstrap(); 

b.group(group).channel(NioSocketChannel.class) 

.option(ChannelOption.TCP_NODELAY, true) 

.handler(new ChannelInitializer<

SocketChannel>() { 

@Override 

public void initChannel(SocketChannel ch) throws 

Exception { 

ch.pipeline().addLast(new ClientHandler()); 

}); 

/**connect:發起異步連接操作,

調用同步方法 sync 等待連接成功*/ 

ChannelFuture channelFuture = b.connect(host, port).sync(); 

System.out.println(Thread.currentThread().

getName() + ",客戶端發起 

異步連接.........."); 

/**等待客戶端鏈路關閉*/ 

channelFuture.channel().closeFuture().sync(); 

} catch (InterruptedException e) { 

e.printStackTrace(); 

} finally { 

/**優雅退出,釋放NIO線程組*/ 

group.shutdownGracefully();

    } 

   } 

 } 

}

import io.netty.buffer.ByteBuf; 

import io.netty.buffer.Unpooled; 

import io.netty.channel.ChannelHandlerContext; 

import io.netty.channel.ChannelInboundHandlerAdapter; 

import java.util.logging.Logger; 

/** 用於對網絡事件進行讀寫操作 */ 

public class ClientHandler extends ChannelInboundHandlerAdapter { 

// private static final Logger logger = 

Logger.getLogger(TimeClientHandler.class.getName()); 

/** 當客戶端和服務端 TCP 鏈路建立成功之后,

Netty 的 NIO 線程會調用 channelActive 方法 */ 

@Override 

public void channelActive(ChannelHandlerContext ctx) throws Exception { 

String reqMsg = "我是客戶端 " + Thread.currentThread().getName(); 

byte[] reqMsgByte = reqMsg.getBytes("UTF-8"); 

ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length); 

/** * writeBytes:將指定的源數組的數據傳輸到緩沖區 

* 調用 ChannelHandlerContext 的 

  writeAndFlush 方法將消息發送給服務器 */ 

reqByteBuf.writeBytes(reqMsgByte); 

ctx.writeAndFlush(reqByteBuf); 

}

/** * 當服務端返回應答消息時,channelRead 

方法被調用,從 Netty 的 ByteBuf 中

讀取並打印應 答消息*/ 

@Override 

public void channelRead(ChannelHandlerContext ctx, Object msg) throws 

Exception { 

ByteBuf buf = (ByteBuf) msg; 

byte[] req = new byte[buf.readableBytes()]; 

buf.readBytes(req); 

String body = new String(req, "UTF-8"); 

System.out.println(Thread.currentThread().getName() + ",Server return 

Message:" + body); 

ctx.close(); 

}

/** * 當發生異常時,打印異常 日志,釋放客戶端資源 */ 

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 

throws Exception { 

/**釋放資源*/ 

//logger.warning("Unexpected exception from downstream : " + 

cause.getMessage()); 

ctx.close(); 

}

 

365天干貨不斷,可以微信搜索「 猿燈塔」第一時間閱讀,回復【資料】【面試】【簡歷】有我准備的一線大廠面試資料和簡歷模板

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM