Netty Websocket协议开发
Netty基于HTTP协议栈开发了WebSocket 协议栈,利用Netty的WebSocket 协议栈可以方便开发WebSocket 客户端和服务端
Netty Websocket 服务端
1 Netty Websocket 服务端启动类
package com.webSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class WebSocketServer {
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//websocket协议本身是基于Http协议的,所以需要Http解码器
ch.pipeline().addLast("http-codec",new HttpServerCodec());
//以块的方式来写的处理器
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//websocket协议本身是基于http协议的,所以这边也要使用http解编码器
//netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
ch.pipeline().addLast(new HttpObjectAggregator(1024*1024*1024));
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws",null,true,65535));
// ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
ch.pipeline().addLast(new WebSocketServerHandler());
}
});
ChannelFuture sync = bootstrap.bind(port).sync();
sync.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new WebSocketServer().bind(9090);
}
}
2 Netty Websocket 服务端启动处理类
package com.webSocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handshaker;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
//以http请求形式接入,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
//处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
public void channelReadComplete(ChannelHandlerContext context) throws Exception
{
context.flush();
}
/*
http
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
WebSocketServerHandshakerFactory webFactory=new WebSocketServerHandshakerFactory("ws://127.0.0.1:9090",null,false);
if(req instanceof CloseWebSocketFrame)
{
handshaker.close(ctx.channel(),(CloseWebSocketFrame) req.retain());
if(handshaker==null)
{
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}
else
{
handshaker.handshake(ctx.channel(),req);
}
}
}
/*
webSocket
*/
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame)
{
handshaker.close(ctx.channel(),(CloseWebSocketFrame)frame.retain());
return;
}
if(frame instanceof TextWebSocketFrame)
{
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if(!(frame instanceof TextWebSocketFrame))
{
}
String request=((TextWebSocketFrame) frame).text();
ctx.channel().write(request+"欢迎Netty WebSocket");
}
}
Netty Websocket 客户端
1 Netty Websocket 客户端启动类
package com.webSocket;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.URI;
public class WebSocketClients {
public static void main(String[] args) throws Exception {
//netty基本操作,线程组
EventLoopGroup group = new NioEventLoopGroup();
//netty基本操作,启动类
Bootstrap boot = new Bootstrap();
boot.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandler[]{new HttpClientCodec(),
new HttpObjectAggregator(1024*1024*10)});
//websocket协议本身是基于Http协议的,所以需要Http解码器
ch.pipeline().addLast("http-codec",new HttpServerCodec());
//以块的方式来写的处理器
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
// ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
ch.pipeline().addLast("WebSocketClientHandler",new WebSocketClientHandler());
}
});
//websocke连接的地址,/hello是因为在服务端的websockethandler设置的
URI websocketURI = new URI("ws://localhost:9090/ws");
HttpHeaders httpHeaders = new DefaultHttpHeaders();
//进行握手
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
//客户端与服务端连接的通道,final修饰表示只会有一个
final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("WebSocketClientHandler");
handler.setHandshaker(handshaker);
handshaker.handshake(channel);
//阻塞等待是否握手成功
handler.handshakeFuture().sync();
System.out.println("握手成功");
//给服务端发送的内容,如果客户端与服务端连接成功后,可以多次掉用这个方法发送消息
sengMessage(channel);
}
public static void sengMessage(Channel channel){
//发送的内容,是一个文本格式的内容
String putMessage="你好,我是客户端";
TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("消息发送成功,发送的消息是:"+putMessage);
} else {
System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
}
}
});
}
}
2 Netty Websocket 客户端启动处理类
package com.webSocket;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.channel.SimpleChannelInboundHandler;
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
//握手的状态信息
WebSocketClientHandshaker handshaker;
//netty自带的异步处理
ChannelPromise handshakeFuture;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete());
Channel ch = ctx.channel();
FullHttpResponse response;
//进行握手操作
if (!this.handshaker.isHandshakeComplete()) {
try {
response = (FullHttpResponse)msg;
//握手协议返回,设置结束握手
this.handshaker.finishHandshake(ch, response);
//设置成功
this.handshakeFuture.setSuccess();
System.out.println("服务端的消息"+response.headers());
} catch (WebSocketHandshakeException var7) {
FullHttpResponse res = (FullHttpResponse)msg;
String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
}
} else if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse)msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else {
//接收服务端的消息
WebSocketFrame frame = (WebSocketFrame)msg;
//文本信息
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
System.out.println("客户端接收的消息是:"+textFrame.text());
}
//二进制信息
if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
System.out.println("BinaryWebSocketFrame");
}
//ping信息
if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client pong");
}
//关闭消息
if (frame instanceof CloseWebSocketFrame) {
System.out.println("receive close frame");
ch.close();
}
}
}
/**
* Handler活跃状态,表示连接成功
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("与服务端连接成功");
}
/**
* 非活跃状态,没有连接远程主机的时候。
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("主机关闭");
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接异常:"+cause.getMessage());
ctx.close();
}
public void handlerAdded(ChannelHandlerContext ctx) {
this.handshakeFuture = ctx.newPromise();
}
public WebSocketClientHandshaker getHandshaker() {
return handshaker;
}
public void setHandshaker(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelPromise getHandshakeFuture() {
return handshakeFuture;
}
public void setHandshakeFuture(ChannelPromise handshakeFuture) {
this.handshakeFuture = handshakeFuture;
}
public ChannelFuture handshakeFuture() {
return this.handshakeFuture;
}
}
结果:
当前握手的状态false
服务端的消息DefaultHttpHeaders[upgrade: websocket, connection: upgrade, sec-websocket-accept: MVirCoc2+74v5fT6LiEzbDXl7nM=, content-length: 0]
握手成功
消息发送成功,发送的消息是:你好,我是客户端
当前握手的状态true
WebSocket Client pong