如果你有跟進Web技術的最新進展,你很可能就遇到過“實時Web”這個短語,這里並不是指所謂的硬實時服務質量(QoS),硬實時服務質量是保證計算結果將在指定的時間間隔內被遞交。僅HTTP的請求/響應模式設計就使得其很難被支持。
實時Web利用技術和實踐,使用戶在信息的作者發布信息之后就能夠立即收到信息,而不需要他們或者他們的軟件周期性地檢查信息源以及獲取更新。
1、WebSocket簡介
WebSocket協議是完全重新設計的協議,旨在為Web上的雙向數據傳輸問題提供一個切實可行的解決方案,使得客戶端和服務器之間可以在任意時刻傳輸信息,因此,這也就要求他們異步地處理消息回執(作為HTML5客戶端API的一部分,大部分最新的瀏覽器都已經支持了WebSocket)
Netty對於WebSocket的支持包含了所有正在使用中的主要實現,因此在你的下一個應用程序中采用它將是簡單直接的。和往常使用Netty一樣,你可以完全使用該協議,而無需關心它內部的實現細節,我們將通過創建一個基於WbeSocket的實時聊天應用程序來演示。
2、WebSocket示例應用程序
為了讓示例應用程序展示它的實時功能,我們將通過使用WebSocket協議來實現一個基於瀏覽器的聊天應用程序,就像你可能在FaceBook的文本消息功能中見到過的那樣。我們將通過使用多個用戶之間可以同時進行相互通信,從而更進一步。
下圖說明應用邏輯:
——客戶端發送一個消息
——該消息將被廣播到所有其他鏈接的客戶端 這正如你可能會預期的一個聊天室應當的工作方式:所有的人都可以和其他的人聊天。在示例中,我們將只實現服務器端,而客戶端則是通過Web頁面訪問該聊天室的瀏覽器。正如同你將在接下來的幾頁中所看到的,WebSocket簡化了編寫這樣的服務器的過程。
3、添加WebSocket支持
在從標准的HTTP或者HTTPS協議切換到WebSocket時,將會使用一種稱為升級握手的機制。因此,使用WebSocket的應用程序將始終以HTTP/S作為開始,然后再執行升級。這個升級動作發生的確切時刻特定於應用程序;他可能會發生在啟動時,也可能會發生在請求了某個特定的URL之后。
我們的應用程序將采用下面的約定:如果被請求的URL以/ws結尾,那么我們將會把該協議升級為WebSocket;否則,服務器將使用基本的HTTP/S。在連接已經升級完成之后,所有數據都將會使用WebSocket進行傳輸。下圖說明了該服務器邏輯,一如在Netty中一樣,它由一組ChannelHandler實現。
4、處理HTTP請求
首先,我們將實現該處理HTTP請求的組件。這個組件將提供用於訪問聊天室並顯示由連接的客戶端發送的消息的網頁。如下代碼給出了這個HttpRequestHandler對應的代碼,其擴展了SimpleChannelInboundHandler以處理FullHttpRequest消息。需要注意是,channelRead0()方法的實現是如何轉發任何目標URI為/ws的請求的。
//擴展SimpleChannelInboundHandler以處理FullHttpReuqest消息 public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest>{ private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain() .getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); }catch (URISyntaxException e){ throw new IllegalStateException("Unable to locate index.html",e); } } public HttpRequestHandler(String wsUri){ this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { //如果請求了WebSocket協議升級,則增加引用技術,並將它傳遞給下一個ChannelInboundHandler if (wsUri.equalsIgnoreCase(request.getUri())){ ctx.fireChannelRead(request.retain()); } else { //處理100Continue請求以符合HTTP1.1規范 if (HttpHeaders.is100ContinueExpected(request)){ send100Continue(ctx); } //讀取“index.html” RandomAccessFile file = new RandomAccessFile(INDEX,"r"); HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(),HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE,"text/html;charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); //如果請求了keep-alive,則添加所需要的HTTP頭信息 if (keepAlive){ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,file.length()); response.headers().set(HttpHeaders.Names.CONNECTION,HttpHeaders.Values.KEEP_ALIVE); } //將HttpResponse寫到客戶端 ctx.write(response); //將index.html寫到客戶端 if (ctx.pipeline().get(SslHandler.class) == null){ ctx.write(new DefaultFileRegion(file.getChannel(),0,file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } //寫LastHttpContent並沖刷至客戶端 ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); if (!keepAlive){ //如果沒有請求keep-alive,則在寫操作完成后關閉Channel future.addListener(ChannelFutureListener.CLOSE); } } } private static void send100Continue(ChannelHandlerContext ctx){ FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
如果該HTTP請求指向了地址為/ws的URI,那么HttpRequestHandler將調用FullHttpRequest對象上的retain()方法。並通過調用fireChannelRead(msg)方法將它轉發給下一個ChannelInboundHandler。之所以需要調用retain()方法,是因為調用channelRead()方法完成之后,它將調用FullHttpRequest對象上的release()方法以釋放它的資源。
如果客戶端發送了HTTP1.1的HTTP頭信息Expect:100-continue,那么HttpRequestHandler將會發送一個100Continue響應。在該HTTP頭信息被設置之后,HttpRequestHandler將會寫回一個HttpResponse給客戶端。這不是一個FullHttpResponse,因為它只是響應的第一部分。此外,這里也不會調用writeAndFlush()方法,在結束的時候才會調用。
如果不需要加密和壓縮,那么可以通過將index.html的內容存儲到DefaultFileRegion中來達到最佳效率。這將會利用零拷貝特性來進行內容的傳輸。為此,你可以檢查一下,是否有SslHandler存在於在ChannelPipeline中。否則,你可以使用ChunkedNioFile。
HttpRequestHandler將寫一個LastHttpContent來標記響應的結束。如果沒有請求keep-alive,那么HttpRequestHandler將會添加一個ChannelFutureListener到最后一次寫出動作的ChannelFuture,並關閉該連接。在這里,你將調用writeAndFlush()方法以沖刷所有之前寫入的消息。
這部分代碼代表了聊天服務器的第一個部分,它管理純粹的HTTP請求和響應。接下來,我們將處理傳輸實際聊天消息的WebSocket幀。
WEBSOCKET幀:WebSocket以幀的方式傳輸數據,每一幀代表消息的一部分。一個完整的消息可能會包含許多幀。
5、處理WebSocket幀
有IETF發布的WebSocket RFC,定義了6種幀,Netty為它們都提供了一個POJO實現。
BinaryWebSocketFrame——包含了二進制數據
TextWebSocketFrame——包含了文本數據
ContinuationWebSocketFrame——包含屬於上一個BinaryWebSocketFrame或TextWebSocketFrame的文本數據或者二進制數據
CloseWebSocketFrame——表示一個CLOSE請求,包含一個關閉的狀態碼和關閉的原因
PingWebSocketFrame——請求傳輸一個PongWebSocketFrame
PongWebSocketFrame——作為一個對於PingWebSocketFrame的響應被發送
TextWebSocketFrame是我們唯一真正需要處理的幀類型。為了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler來處理其他類型的幀。
以下代碼展示了我們用於處理TextWebSocketFrame的ChannelInboundHandler,其還將在它的ChannelGroup中跟蹤所有活動的WebSocket連接。
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ private final ChannelGroup group; public TextWebSocketFrameHandler(ChannelGroup group){ this.group = group; } //重寫userEventTriggered方法以處理自定義事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){ //如果該事件表示握手成功,則從該ChannelPipeline中移除HttpRequestHandler,因為將不會接收到任何HTTP消息了 ctx.pipeline().remove(HttpRequestHandler.class); //通知所有已經連接的WebSocket客戶端新的客戶端連接上了 group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined")); //將新的WebSocket Channel添加到ChannelGroup中,以便它可以接收到所有的消息 group.add(ctx.channel()); } else { super.userEventTriggered(ctx,evt); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception { //增加消息的引用計數,並將它寫到ChannelGroup中所有已經連接的客戶端 group.writeAndFlush(msg.retain()); } }
TextWebSocketFrameHandler只有一組非常少量的責任。當和新客戶端的WebSocket握手成功完成之后,它將通過把通知消息寫到ChannelGroup中的所有Channel來通知所有已經連接的客戶端,然后它將把這個新Channel加入到該ChannelGroup中。
如果接收到了TextWebSocketFrame消息,TextWebSocketFrameHandler將調用TextWebSocketFrame消息上的retain()方法,並使用writeAndFlush()方法來將它傳輸給ChannelGroup,以便所有已經連接的WebSocket Channel都將接收到它。
和之前一樣,對於retain()方法的調用時必需的。因為當ChannelRead0()方法返回時,TextWebSocketFrame的引用技術將會被減少。由於所有的操作都是異步的,因此,writeAndFlush()方法可能會在channelRead0()方法返回之后完成,而且它絕對不能訪問一個已經失效的引用。
因為Netty在內部處理了大部分剩下的功能,所有現在剩下唯一需要做的事情就是為每個新創建的Channel初始化其ChannelPipeline。為此,我們需要一個ChannelInitializer。
6、初始化ChannelPipeline
以下代碼展示了生成的ChatServerInitializer。
public class ChatServerInitializer extends ChannelInitializer<Channel>{ private final ChannelGroup group; public ChatServerInitializer(ChannelGroup group) { this.group = group; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); pipeline.addLast(new HttpRequestHandler("/ws")); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler(group)); } }
對於initChannel()方法調用,通過安裝所有必須的ChannelHandler來設置該新注冊的Channel的ChannelPipeline。
Netty的WebSocketServerProtocolHandler處理了所有委托管理的WebSocket幀類型以及升級握手本身。如果握手成功,那么所需的ChannelHandler將會被添加到ChannelPipeline中,而那些不再需要的ChannelHandler則將會被移除。
WebSocket協議升級之前的ChannelPipeline的狀態如下圖,這代表了剛剛被ChatServerInitializer初始化之后的ChannelPipeline。 當WebSocket協議升級完成之后,WebSocketServerProtocolHandler將會把HttpRequestDecoder替換為WebSocketFrameDecoder,把HttpResponseEncoder替換為WebSocketFrameEncoder。為了性能最大化,它將移除任何不再被WebSocket連接所需要的ChannelHandler。這也包括上圖所示的HttpObjectAggregator和HttpRequestHandler。
下圖展示了這些操作完成之后的ChannelPipeline。需要注意的是,Netty目前支持4個版本的WebSocket協議,他們每個都具有自己的實現類。Netty將會根據客戶端(這里指瀏覽器)所支持的版本,自動地選擇正確版本的WebSocketFrameDecoder和WebSocketFrameEncoder。
7、引導
這幅拼圖最后的一部分是引導該服務器,並安裝ChatSererInitializer的代碼。這將有ChatServer類處理,如下代碼所示。
public class ChatServer { //創建DefaultChannelGroup,其將保存所有已經連接的WebSocket Channel private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address){ //引導服務器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(createInitializer(channelGroup)); ChannelFuture future = bootstrap.bind(address); future.syncUninterruptibly(); channel = future.channel(); return future; } //創建ChatServerInitializer protected ChannelInitializer<Channel> createInitializer(ChannelGroup group){ return new ChatServerInitializer(group); } //處理服務器關閉,並釋放所有的資源 public void destroy(){ if (channel != null){ channel.close(); } channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); final ChatServer endpoint = new ChatServer(); ChannelFuture future = endpoint.start( new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
8、如何進行加密
在真實世界的場景中,你將很快就會被要求向該服務器添加加密。使用Netty,這不過是將一個SslHandler添加到ChannelPipeline中,並配置它的問題。以下代碼展示了如何通過擴展我們的ChatServerInitializer來創建一個SecureChatServerInitializer以完成需求。
//擴展ChatServerInitializer以添加加密 public class SecureChatServerInitializer extends ChatServerInitializer{ private final SslContext context; public SecureChatServerInitializer(ChannelGroup group,SslContext context) { super(group); this.context = context; } @Override protected void initChannel(Channel channel) throws Exception { //調用父類的initChannel()方法 super.initChannel(channel); SSLEngine engine = context.newEngine(channel.alloc()); engine.setUseClientMode(false); //將SslHandler添加到ChannelPipeline中 channel.pipeline().addFirst(new SslHandler(engine)); } }
最后一步是調整ChatServer以使用SecureChatServerInitializer,以便在ChannelPipeline中安裝SslHandler。
public class SecureChatServer extends ChatServer{ private final SslContext context; public SecureChatServer(SslContext context) { this.context = context; } @Override protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) { //返回之前創建的SecureChatServerInitializer以啟用加密 return new SecureChatServerInitializer(group,context); } public static void main(String[] args) throws Exception{ if (args.length != 1){ System.out.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); SelfSignedCertificate cert = new SelfSignedCertificate(); SslContext context = SslContext.newServerContext(cert.certificate(),cert.privateKey()); final SecureChatServer endpoint = new SecureChatServer(context); ChannelFuture future = endpoint.start( new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
這就是為所有的通信啟用SSL/TLS加密需要做的全部。
