Netty Websocket協議開發
Netty 基於 HTTP 協議棧實現了 WebSocket 協議棧,利用 Netty 的 WebSocket 協議棧可以方便地開發 WebSocket 客戶端和服務端。
Netty Websocket 服務端
1 Netty Websocket 服務端啟動類
package com.webSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Minimal Netty WebSocket server.
 *
 * <p>Each accepted connection gets an HTTP codec, a chunked-write handler, an HTTP message
 * aggregator, a {@link WebSocketServerProtocolHandler} bound to {@code /ws}, and finally a
 * {@link WebSocketServerHandler}.
 */
public class WebSocketServer {

    /**
     * Upper bound, in bytes, for one aggregated HTTP message. The WebSocket opening handshake
     * is a body-less GET, so a small limit is sufficient and keeps a single connection from
     * making the server buffer an arbitrarily large request body.
     */
    private static final int MAX_HTTP_CONTENT_LENGTH = 64 * 1024;

    /** Maximum WebSocket frame payload length, in bytes, passed to the protocol handler. */
    private static final int MAX_FRAME_SIZE = 65535;

    /** Path on which WebSocket upgrade requests are served. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     * Both event loop groups are shut down before this method returns, whether it
     * completes normally or with an exception.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the calling thread is interrupted while waiting
     */
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // WebSocket starts as an HTTP upgrade, so HTTP encoding/decoding is required.
                            ch.pipeline().addLast("http-codec", new HttpServerCodec());
                            // Supports writing large content in chunks.
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            // Merges the decoded HTTP message parts into one full message,
                            // up to the given maximum content length.
                            ch.pipeline().addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
                            // Arguments: path, sub-protocols (none), allow extensions, max frame size.
                            ch.pipeline().addLast(
                                    new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true, MAX_FRAME_SIZE));
                            ch.pipeline().addLast(new WebSocketServerHandler());
                        }
                    });
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            // Block here until the listening channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on port 9090.
     *
     * @param args ignored
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        new WebSocketServer().bind(9090);
    }
}
2 Netty WebSocket 服務端處理類
package com.webSocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
/**
 * Server-side inbound handler that performs the WebSocket opening handshake for HTTP upgrade
 * requests reaching it and answers the WebSocket frames that follow.
 *
 * <p>The handler stores the handshaker of its connection in a field, so one instance must not
 * be shared between channels.
 */
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {

    /** WebSocket URL handed to the handshaker factory. */
    private static final String WEBSOCKET_URL = "ws://127.0.0.1:9090";

    /** Handshaker created by this handler for the current connection; {@code null} until then. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Dispatches full HTTP requests and WebSocket frames to their handlers and releases them
     * afterwards; any other message type is forwarded to the next handler.
     *
     * @param ctx handler context of the channel
     * @param msg inbound message
     * @throws Exception if handling the message fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // Arrives as an HTTP request but is expected to be a WebSocket upgrade.
            FullHttpRequest request = (FullHttpRequest) msg;
            try {
                handleHttpRequest(ctx, request);
            } finally {
                // ChannelInboundHandlerAdapter does not release messages automatically.
                request.release();
            }
        } else if (msg instanceof WebSocketFrame) {
            // Message sent by a WebSocket client.
            WebSocketFrame frame = (WebSocketFrame) msg;
            try {
                handlerWebSocketFrame(ctx, frame);
            } finally {
                frame.release();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Flushes everything written while the current batch of reads was processed.
     *
     * @param context handler context of the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext context) throws Exception {
        context.flush();
    }

    /**
     * Performs the WebSocket opening handshake for an HTTP upgrade request.
     *
     * <p>Requests without an {@code Upgrade: websocket} header cannot be handshaken and cause
     * the connection to be closed. If the requested WebSocket version is not supported, the
     * standard "unsupported version" response is sent instead.
     *
     * @param ctx handler context of the channel
     * @param req aggregated HTTP request
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            ctx.close();
            return;
        }
        WebSocketServerHandshakerFactory webFactory =
                new WebSocketServerHandshakerFactory(WEBSOCKET_URL, null, false);
        handshaker = webFactory.newHandshaker(req);
        if (handshaker == null) {
            // The factory returns null when it has no handshaker for the requested version.
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Handles a single WebSocket frame: completes the closing handshake, answers pings with
     * pongs, ignores pongs, and replies to text frames with the received text followed by a
     * greeting. Replies are written without flushing; {@link #channelReadComplete} flushes them.
     *
     * @param ctx   handler context of the channel
     * @param frame frame received from the client
     * @throws UnsupportedOperationException for frame types other than close, ping, pong and text
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            if (handshaker == null) {
                // This handler did not perform the handshake, so it has no handshaker to
                // run the closing handshake with; just close the connection.
                ctx.close();
            } else {
                // retain(): the write releases the frame once and channelRead releases it again.
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            // The pong carries the ping payload; retain it because channelRead releases the ping.
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (frame instanceof PongWebSocketFrame) {
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }
        String request = ((TextWebSocketFrame) frame).text();
        // Reply with a frame, not a raw String: the pipeline has no encoder for String.
        ctx.channel().write(new TextWebSocketFrame(request + "歡迎Netty WebSocket"));
    }
}
Netty Websocket 客戶端
1 Netty Websocket 客戶端啟動類
package com.webSocket;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.URI;
/**
 * Minimal Netty WebSocket client: connects to {@code ws://localhost:9090/ws}, performs the
 * opening handshake, sends one text message and then stays connected until the channel closes.
 */
public class WebSocketClients {

    /**
     * Runs the client.
     *
     * <p>The event loop group is shut down in every case, so a failed connect or handshake
     * ends the process instead of leaving its I/O threads running.
     *
     * @param args ignored
     * @throws Exception if connecting, handshaking or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        // Thread group that drives all I/O of this client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap();
            // A Bootstrap keeps a single handler, so the whole pipeline is built in one initializer.
            boot.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // The handshake is an HTTP exchange, so the client-side HTTP codec and
                            // an aggregator for the full handshake response are required.
                            ch.pipeline().addLast(new ChannelHandler[]{new HttpClientCodec(),
                                    new HttpObjectAggregator(1024 * 1024 * 10)});
                            // Supports writing large content in chunks.
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            ch.pipeline().addLast("WebSocketClientHandler", new WebSocketClientHandler());
                        }
                    });

            // WebSocket endpoint; the path is /ws.
            URI websocketURI = new URI("ws://localhost:9090/ws");
            HttpHeaders httpHeaders = new DefaultHttpHeaders();
            // Handshaker for WebSocket protocol version 13.
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);

            // Channel connecting this client to the server.
            final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
            WebSocketClientHandler handler =
                    (WebSocketClientHandler) channel.pipeline().get("WebSocketClientHandler");
            handler.setHandshaker(handshaker);
            handshaker.handshake(channel);
            // Block until the handshake has succeeded or failed.
            handler.handshakeFuture().sync();
            System.out.println("握手成功");

            // Once connected, this method may be called repeatedly to send more messages.
            sengMessage(channel);

            // Stay connected to receive replies until the channel is closed.
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Sends one fixed text message as a text frame and prints whether the write succeeded.
     *
     * @param channel channel on which the WebSocket handshake has completed
     */
    public static void sengMessage(Channel channel) {
        // Payload: plain text.
        String putMessage = "你好,我是客戶端";
        TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
        channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("消息發送成功,發送的消息是:" + putMessage);
                } else {
                    System.out.println("消息發送失敗 " + channelFuture.cause().getMessage());
                }
            }
        });
    }
}
2 Netty WebSocket 客戶端處理類
package com.webSocket;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side handler that finishes the WebSocket opening handshake and then prints the
 * frames received from the server.
 *
 * <p>The outcome of the handshake is published through {@link #handshakeFuture()}. That future
 * is also failed when the connection errors or closes before the handshake has finished, so a
 * caller waiting on it is always released.
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    /** Handshaker that tracks the handshake state; must be set before the first message arrives. */
    WebSocketClientHandshaker handshaker;
    /** Promise completed when the handshake succeeds or fails; created in {@link #handlerAdded}. */
    ChannelPromise handshakeFuture;

    /**
     * Finishes the handshake with the first HTTP response, then handles WebSocket frames.
     *
     * @param ctx handler context of the channel
     * @param msg inbound message: the handshake response or a WebSocket frame
     * @throws Exception if the message has an unexpected type or handling fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("當前握手的狀態" + this.handshaker.isHandshakeComplete());
        Channel ch = ctx.channel();

        // Handshake still pending: this message must be the server's handshake response.
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                FullHttpResponse response = (FullHttpResponse) msg;
                this.handshaker.finishHandshake(ch, response);
                this.handshakeFuture.setSuccess();
                System.out.println("服務端的消息" + response.headers());
            } catch (WebSocketHandshakeException handshakeError) {
                FullHttpResponse res = (FullHttpResponse) msg;
                String errorMsg = String.format("握手失敗,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                // Keep the original exception as the cause so the failure can be diagnosed.
                this.handshakeFuture.setFailure(new Exception(errorMsg, handshakeError));
            }
            return;
        }

        // After the handshake no further HTTP response is expected.
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        // Frame sent by the server.
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            // Text message.
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            System.out.println("客戶端接收的消息是:" + textFrame.text());
        } else if (frame instanceof BinaryWebSocketFrame) {
            // Binary message.
            System.out.println("BinaryWebSocketFrame");
        } else if (frame instanceof PongWebSocketFrame) {
            // Pong message.
            System.out.println("WebSocket Client pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            // Close message: close this side of the connection.
            System.out.println("receive close frame");
            ch.close();
        }
    }

    /**
     * Called when the channel becomes active, i.e. the connection is established.
     *
     * @param ctx handler context of the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("與服務端連接成功");
    }

    /**
     * Called when the channel is no longer connected. Fails a still-pending handshake so that
     * callers waiting on {@link #handshakeFuture()} do not block forever.
     *
     * @param ctx handler context of the channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("主機關閉");
        if (this.handshakeFuture != null && !this.handshakeFuture.isDone()) {
            this.handshakeFuture.tryFailure(
                    new IllegalStateException("connection closed before the WebSocket handshake completed"));
        }
    }

    /**
     * Reports the error, fails a still-pending handshake with it and closes the channel.
     *
     * @param ctx   handler context of the channel
     * @param cause error raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("連接異常:" + cause.getMessage());
        if (this.handshakeFuture != null && !this.handshakeFuture.isDone()) {
            this.handshakeFuture.tryFailure(cause);
        }
        ctx.close();
    }

    /**
     * Creates the handshake promise as soon as the handler is added to a pipeline.
     *
     * @param ctx handler context of the channel
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    /** @return the handshaker currently used by this handler, possibly {@code null} */
    public WebSocketClientHandshaker getHandshaker() {
        return handshaker;
    }

    /** @param handshaker handshaker to use for the opening handshake */
    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /** @return the promise that is completed with the handshake outcome */
    public ChannelPromise getHandshakeFuture() {
        return handshakeFuture;
    }

    /** @param handshakeFuture promise to complete with the handshake outcome */
    public void setHandshakeFuture(ChannelPromise handshakeFuture) {
        this.handshakeFuture = handshakeFuture;
    }

    /** @return the handshake promise as a read-only future to wait on */
    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }
}
結果:
當前握手的狀態false
服務端的消息DefaultHttpHeaders[upgrade: websocket, connection: upgrade, sec-websocket-accept: MVirCoc2+74v5fT6LiEzbDXl7nM=, content-length: 0]
握手成功
消息發送成功,發送的消息是:你好,我是客戶端
當前握手的狀態true
WebSocket Client pong
