精通並發與 Netty
Netty 是一個異步的,事件驅動的網絡通信框架,用於高性能的基於協議的客戶端和服務端的開發。
異步指的是會立即返回,並不知道到底發送過去沒有,成功沒有,一般都會使用監聽器來監聽返回。
事件驅動是指開發者只需要關注事件對應的回調方法即可,比如 channel active,inactive,read 等等。
網絡通信框架就不用解釋了,很多你非常熟悉的組件都使用了 netty,比如 spark,dubbo 等等。
初步了解 Netty
第一個簡單的例子,使用 Netty 實現一個 http 服務器,客戶端調用一個沒有參數的方法,服務端返回一個 hello world。
Netty 里面大量的代碼都是對線程的處理和 IO 的異步的操作。
package com.paul;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Server {
public static void main(String[] args) throws InterruptedException {
//定義兩個線程組,事件循環組,可以類比與 Tomcat 就是死循環,不斷接收客戶端的連接
// boss 線程組不斷從客戶端接受連接,但不處理,由 worker 線程組對連接進行真正的處理
// 一個線程組其實也能完成,推薦使用兩個
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服務端啟動器,可以輕松的啟動服務端的 channel
ServerBootstrap serverBootstrap = new ServerBootstrap();
//group 方法有兩個,一個接收一個參數,另一個接收兩個參數
// childhandler 是我們自己寫的請求處理器
serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class)
.childHandler(new ServerInitializer());
//綁定端口
ChannelFuture future = serverBootstrap.bind(8011).sync();
//channel 關閉的監聽
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.paul;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler,一層層過濾的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
//HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的組合,編碼和解碼的 h handler
pipeline.addLast("httpServerCodec", new HttpServerCodec());
pipeline.addLast("handler", new ServerHandler());
}
}
package com.paul;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
if(httpObject instanceof HttpRequest) {
ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//單純的調用 write 只會放到緩存區,不會真的發送
channelHandlerContext.writeAndFlush(response);
}
}
}
我們在 SimpleChannelInboundHandler 里分析一下,先看它繼承的 ChannelInboundHandlerAdapter 里面的事件回調方法,包括通道注冊,解除注冊,Active,InActive等等。
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
執行順序為 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。
Netty 本身並不是遵循 servlet 規范的。Http 是基於請求和響應的無狀態協議。Http 1.1 是有 keep-alived 參數的,如果3秒沒有返回,則服務端主動關閉了解,Http 1.0 則是請求完成直接返回。
Netty 的連接會被一直保持,我們需要自己去處理這個功能。
在服務端發送完畢數據后,可以在服務端關閉 Channel。
ctx.channel.close();
Netty 能做什么
- 可以當作一個 http 服務器,但是他並沒有實現 servelt 規范。雖然 Tomcat 底層本身也使用 NIO,但是 Netty 本身的特點決定了它比 Tomcat 的吞吐量更高。相比於 SpringMVC 等框架,Netty 沒提供路由等功能,這也契合和 Netty 的設計思路,它更貼近底層。
- Socket 開發,也是應用最為廣泛的領域,底層傳輸的最基礎框架,RPC 框架底層多數采用 Netty。直接采用 Http 當然也可以,但是效率就低了很多了。
- 支持長連接的開發,消息推送,聊天,服務端向客戶端推送等等都會采用 WebSocket 協議,就是長連接。
Netty 對 Socket 的實現
對於 Http 編程來說,我們實現了服務端就可以了,客戶端完全可以使用瀏覽器或者 CURL 工具來充當。但是對於 Socket 編程來說,客戶端也得我們自己實現。
服務器端:
Server 類於上面 Http 服務器那個一樣,在 ServerInitoalizer 有一些變化
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler,一層層過濾的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
// TCP 粘包 拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字符串編碼,解碼
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ServerHandler());
}
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客戶端:
public class Client {
public static void main(String[] args) throws InterruptedException {
//客戶端不需要兩個 group,只需要一個就夠了,直接連接服務端發送數據就可以了
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
//服務器端既可以使用 handler 也可以使用 childhandler, 客戶端一般使用 handler
//對於 服務端,handler 是針對 bossgroup的,childhandler 是針對 workergorup 的
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler,一層層過濾的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
// TCP 粘包 拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字符串編碼,解碼
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ClientHandler());
}
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
System.out.println("client output:"+ msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("23123");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty 長連接實現一個聊天室
Server 端:
public class ServerHandler extends SimpleChannelInboundHandler<String> {
//定義 channel group 來管理所有 channel
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "加入\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服務器]-" + channel.remoteAddress() + "離開\n");
//這個 channel 會被自動從 channelGroup 里移除
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "上線");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "離開");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Client 端:
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(;;){
channel.writeAndFlush(br.readLine() + "\r\n");
}
Netty 心跳
集群之間各個節點的通信,主從節點之間需要進行數據同步,每當主節點的數據發生變化時,通過異步的方式將數據同步到從節點,同步方式可以用日志等等,因此主從節點之間不是實時一致性而是最終一致性。
節點與節點之間如何進行通信那?這種主從模式是需要互相之間有長連接的,這樣來確定對方還活着,實現方式是互相之間定時發送心跳數據包。如果發送幾次后對方還是沒有響應的話,就可以認為對方已經掛掉了。
回到客戶端與服務端的模式,有人可能會想,客戶端斷開連接后服務端的 handlerRemoved 等方法不是能感知嗎?還要心跳干什么哪?
真實情況其實非常復雜,比如手機客戶端和服務端進行一個長連接,客戶端沒有退出應用,客戶端開了飛行模型,或者強制關機,此時雙方是感知不到連接已經斷掉了,或者說需要非常長的時間才能感知到,這是我們不想看到的,這時就需要心跳了。
來看一個示例:
其他的代碼還是和上面的一樣,我們就不列出來了,直接進入主題,看不同的地方:
服務端
// Netty 為了支持心跳的 IdleStateHandler,空閑狀態監測處理器。
pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));
來看看 IdleStateHandler 的說明
/*
* Triggers an IdleStateEvent when a Channel has not performed read, write, or both
* operation for a while
* 當一個 channel 一斷時間沒有進行 read,write 就觸發一個 IdleStateEvent
*/
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
//三個參數分別為多長時間沒進行讀,寫或者讀寫操作則觸發 event。
}
觸發 event 后我們編寫這個 event 對應的處理器。
public class MyHandler extends ChannelInboundHandlerAdapter{
//觸發某個事件后這個方法就會被調用
//一個 channelhandlerContext 上下文對象,另一個是事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch(event.state()){
case READER_IDLE:
eventType = "讀空閑";
case WRITER_IDLE:
eventType = "寫空閑";
case ALL_IDLE:
eventType = "讀寫空閑";
}
}else{
//繼續將事件向下一個 handler 傳遞
ctx.
}
}
}
WebSocket 實現與原理分析
WebSocket 是一種規范,是 HTML5 規范的一部分,主要是解決 Http 協議本身存在的問題。可以實現瀏覽器和服務端的長連接,連接頭信息只在建立連接時發送一次。是在 Http 協議之上構建的,比如請求連接其實是一個 Http 請求,只不過里面加了一些 WebSocket 信息。也可以用在非瀏覽器場合,比如 app 上。
Http 是一種無狀態的基於請求和響應的協議,意思是一定是客戶端想服務端發送一個請求,服務端給客戶端一個響應。Http 1.0 在服務端給客戶端響應后連接就斷了。Http 1.1 增加可 keep-alive,服務端可以和客戶端在短時間之內保持一個連接,某個事件之內服務端和客戶端可以復用這個鏈接。在這種情況下,網頁聊天就是實現不了的,服務端的數據推送是無法實現的。
以前有一些假的長連接技術,比如輪詢,缺點和明顯,這里就不細說了。
Http 2.0 實現了長連接,但是這不在我們討論范圍之內。
針對服務端,Tomcat 新版本,Spring,和 Netty 都實現了對 Websocket 的支持。
使用 Netty 對 WebSocket 的支持來實現長連接
其他的部分還是一樣的,先來看服務端的 WebSocketChannelInitializer。
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
//需要支持 websocket,我們在 initChannel 是做一點改動
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
//因為 websocket 是基於 http 的,所以要加入 http 相應的編解碼器
pipeline.addLast(new HttpServerCodec());
//以塊的方式進行寫的處理器
pipeline.addLast(new ChunkedWriteHandler());
// 進行 http 聚合的處理器,將 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者
// FullHttpResponse
//HttpObjectAggregator 在基於 netty 的 http 編程使用的非常多,粘包拆包。
pipeline.addLast(new HttpObjectAggregator(8192));
// 針對 websocket 的類,完成 websocket 構建的所有繁重工作,負責握手,以及心跳(close,ping,
// pong)的處理, websocket 通過 frame 幀來傳遞數據。
// BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame,
// PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。
// /ws 是 context_path,websocket 協議標准,ws://server:port/context_path
pipeline.addLast(new WebSocketServerProcotolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
// websocket 協議需要用幀來傳遞參數
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
System.out.println("收到消息:"+ msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器返回"));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
System.out.println("handlerAdded" + ctx.channel().id.asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
System.out.println("handlerRemoved" + ctx.channel().id.asLongText());
}
}
客戶端我們直接通過瀏覽器的原聲 JS 來寫
<script type="text/javascript">
var socket;
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8899/ws");
socket.onmessage = function(event){
alert(event.data);
}
socket.onopen = function(event){
alert("連接開啟");
}
socket.onclose = function(event){
alert("連接關閉");
}
}else{
alert("瀏覽器不支持 WebSocket");
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}
}
</script>
我們在瀏覽器中通過 F12 看看 Http 協議升級為 WebSocket 協議的過程。
如果想自己實現一個 RPC 框架,可以參考我的博客:傳送門-> RPC框架