- Posted by 微博@Yangsc_o
- 原創文章,版權聲明:自由轉載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0
# 摘要
在前兩篇《快速理解Linux網絡I_O》、《java的I_O模型-BIO&NIO&AIO》兩邊中介紹了Linux下的I/O模型和java中的I/O模型,今天我們介紹Reactor模型,並探究Netty的實現
高性能服務器
在互聯網時代,我們使用的軟件基本上全是C/S架構,C/S架構的軟件一個明顯的好處就是:只要有網絡,你可以在任何地方干同一件事。C/S架構可以抽象為如下模型:
- C就是Client(客戶端),上面的B是Browser(瀏覽器)
- S就是Server(服務器):服務器管理某種資源,並且通過操作這種資源來為它的客戶端提供某種服務
那服務器如何能快速的處理用戶的請求呢?在我看來高性能服務器至少要滿足如下幾個需求:
- 效率高:既然是高性能,那處理客戶端請求的效率當然要很高了
- 高可用:不能隨便就掛掉了
- 編程簡單:基於此服務器進行業務開發需要足夠簡單
- 可擴展:可方便的擴展功能
- 可伸縮:可簡單的通過部署的方式進行容量的伸縮,也就是服務需要無狀態
而滿足如上需求的一個基礎就是高性能的IO!
Reactor模式
什么是Reactor模式?
兩種I/O多路復用模式:Reactor和Proactor,兩個與事件分離器有關的模式是Reactor和Proactor。Reactor模式采用同步IO,而Proactor采用異步IO。
在Reactor中,事件分離器負責等待文件描述符或socket為讀寫操作准備就緒,然后將就緒事件傳遞給對應的處理器,最后由處理器負責完成實際的讀寫工作。
在Proactor模式中,處理器--或者兼任處理器的事件分離器,只負責發起異步讀寫操作。IO操作本身由操作系統來完成。傳遞給操作系統的參數需要包括用戶定義的數據緩沖區地址和數據大小,操作系統才能從中得到寫出操作所需數據,或寫入從socket讀到的數據。事件分離器捕獲IO操作完成事件,然后將事件傳遞給對應處理器。
說人話的方式理解:
- reactor:能收了你跟俺說一聲。
- proactor: 你給我收十個字節,收好了跟俺說一聲。
Doug Lea是這樣類比的
- Reactor通過調度適當的處理程序來響應IO事件;
- 處理程序執行非阻塞操作
- 通過將處理程序綁定到事件來管理;
Reactor單線程模型設計
單線程版本Java NIO的支持:
-
Channels:與支持非阻塞讀取的文件,套接字等的連接
-
Buffers:類似於數組的對象,可由Channels直接讀取或寫入
-
Selectors:通知一組通道中哪一個有IO事件
-
SelectionKeys:維護IO事件狀態和綁定
-
Reactor 代碼如下
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(new Acceptor());
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
dispatch(selectionKey);
}
selectionKeys.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey selectionKey) {
Runnable run = (Runnable) selectionKey.attachment();
if (run != null) {
run.run();
}
}
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = serverSocketChannel.accept();
if (channel != null) {
new Handler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(
new Reactor(1234)
).start();
}
}
- Handler代碼如下:
public class Handler implements Runnable{
private final static int DEFAULT_SIZE = 1024;
private final SocketChannel socketChannel;
private final SelectionKey seletionKey;
private static final int READING = 0;
private static final int SENDING = 1;
private int state = READING;
ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
public Handler(Selector selector, SocketChannel channel) throws IOException {
this.socketChannel = channel;
socketChannel.configureBlocking(false);
this.seletionKey = socketChannel.register(selector, 0);
seletionKey.attach(this);
seletionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == SENDING) {
write();
}
}
private void write() {
try {
socketChannel.write(outputBuffer);
} catch (IOException e) {
e.printStackTrace();
}
while (outIsComplete()) {
seletionKey.cancel();
}
}
private void read() {
try {
socketChannel.read(inputBuffer);
if (inputIsComplete()) {
process();
System.out.println("接收到來自客戶端(" + socketChannel.socket().getInetAddress().getHostAddress()
+ ")的消息:" + new String(inputBuffer.array()));
seletionKey.attach(new Sender());
seletionKey.interestOps(SelectionKey.OP_WRITE);
seletionKey.selector().wakeup();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public boolean inputIsComplete() {
return true;
}
public boolean outIsComplete() {
return true;
}
public void process() {
// do something...
}
class Sender implements Runnable {
@Override
public void run() {
try {
socketChannel.write(outputBuffer);
} catch (IOException e) {
e.printStackTrace();
}
if (outIsComplete()) {
seletionKey.cancel();
}
}
}
}
這個模型和上面的NIO流程很類似,只是將消息相關處理獨立到了Handler中去了!雖然說到NIO一個線程就可以支持所有的IO處理。但是瓶頸也是顯而易見的!如果這個客戶端多次進行請求,如果在Handler中的處理速度較慢,那么后續的客戶端請求都會被積壓,導致響應變慢!所以引入了Reactor多線程模型!
Reactor多線程模型設計
Reactor多線程模型就是將Handler中的IO操作和非IO操作分開,操作IO的線程稱為IO線程,非IO操作的線程稱為工作線程!這樣的話,客戶端的請求會直接被丟到線程池中,客戶端發送請求就不會堵塞!
Reactor保持不變,僅需要改動Handler代碼:
public class Handler implements Runnable{
private final static int DEFAULT_SIZE = 1024;
private final SocketChannel socketChannel;
private final SelectionKey seletionKey;
private static final int READING = 0;
private static final int SENDING = 1;
private int state = READING;
ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
private static final int PROCESSING = 3;
private Selector selector;
public Handler(Selector selector, SocketChannel channel) throws IOException {
this.selector = selector;
this.socketChannel = channel;
socketChannel.configureBlocking(false);
this.seletionKey = socketChannel.register(selector, 0);
seletionKey.attach(this);
seletionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == SENDING) {
write();
}
}
private void write() {
try {
socketChannel.write(outputBuffer);
} catch (IOException e) {
e.printStackTrace();
}
while (outIsComplete()) {
seletionKey.cancel();
}
}
private void read() {
try {
socketChannel.read(inputBuffer);
if (inputIsComplete()) {
process();
executorService.execute(new Processer());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public boolean inputIsComplete() {
return true;
}
public boolean outIsComplete() {
return true;
}
public void process() {
// do something...
}
class Sender implements Runnable {
@Override
public void run() {
try {
socketChannel.write(outputBuffer);
} catch (IOException e) {
e.printStackTrace();
}
if (outIsComplete()) {
seletionKey.cancel();
}
}
}
synchronized void processAndHandOff() {
process();
// or rebind attachment
state = SENDING;
seletionKey.interestOps(SelectionKey.OP_WRITE);
selector.wakeup();
}
class Processer implements Runnable {
@Override
public void run() {
processAndHandOff();
}
}
}
主從Reactor多線程模型設計
主從Reactor多線程模型是將Reactor分成兩部分,mainReactor負責監聽server socket,accept新連接,並將建立的socket分派給subReactor。subReactor負責多路分離已連接的socket,讀寫網絡數據,對業務處理功能,其扔給worker線程池完成。通常,subReactor個數上可與CPU個數等同:
Handler保持不變,僅需要改動Reactor代碼:
public class Reactor {
// also create threads
Selector[] selectors;
AtomicInteger next = new AtomicInteger(0);
final ServerSocketChannel serverSocketChannel;
private static ExecutorService sunReactors = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
private static final int PROCESSING = 3;
public Reactor(int port) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
selectors = new Selector[4];
for (int i = 0; i < selectors.length; i++) {
Selector selector = selectors[i];
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(new Acceptor());
new Thread(()->{
while (!Thread.interrupted()) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
dispatch(selectionKey);
}
selectionKeys.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
private void dispatch(SelectionKey selectionKey) {
Runnable run = (Runnable) selectionKey.attachment();
if (run != null) {
run.run();
}
}
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = serverSocketChannel.accept();
if (channel != null) {
sunReactors.execute(new Handler(selectors[next.getAndIncrement() % selectors.length], channel));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Reactor(1234);
}
}
以上是三種不同的設計思路,接下來看一下Netty這個一個高性能NIO框架,其是如何實現Reactor模型的!
Netty Reactor模型設計
- 看一個最簡單的Netty服務端代碼
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- Netty Server Handler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
我們從Netty服務器代碼來看,與Reactor模型進行對應!
- EventLoopGroup就相當於是Reactor,bossGroup對應主Reactor,workerGroup對應從Reactor
- TimeServerHandler就是Handler
- child開頭的方法配置的是客戶端channel,非child開頭的方法配置的是服務端channel
當然Netty的線程模型並不是固定的,它支持Reactor單線程模型、Reactor多線程模型、主從模型,上面的例子是一個主從模型的,下面進行詳細的分析,如圖所示:
服務啟動時,創建了兩個EventLoopGroup,它們實際上是兩個Reactor線程池,一個用於接收TCP連接、一個用於處理I/O相關的讀寫操作、或者執行系統task、定時task等;
- Netty用於接收客戶端請求連接池職責如下:
- 接收客戶端請求並初始化channel參數;
- 講鏈路變更事件通知給ChannelPipiline;
- Netty用於處理I/O連接池職責如下:
- 異步讀取通信對端的數據報,發送讀事件到ChannelPipiline;
- 異步發送消息對端的數據報,調用ChannelPipiline的消息發送接口;
- 執行系統調用task;
- 執行系統定時任務task,例如鏈路空閑狀態檢測定時任務;
參考
《Netty 權威指南》第二版 -- 李林峰