Netty Websocket使用


Netty Websocket协议开发

  Netty 基于 HTTP 协议栈开发了 WebSocket 协议栈,利用 Netty 的 WebSocket 协议栈可以方便地开发 WebSocket 客户端和服务端。

Netty Websocket 服务端

1 Netty Websocket 服务端启动类

package com.webSocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Minimal Netty WebSocket server.
 *
 * <p>Each accepted connection gets an HTTP codec, a chunked-write handler, an HTTP
 * aggregator, a {@link WebSocketServerProtocolHandler} bound to the {@code /ws} path and
 * finally the application handler {@code WebSocketServerHandler}.
 */
public class WebSocketServer {

    /**
     * Upper bound, in bytes, for one aggregated HTTP message. The WebSocket handshake is a
     * GET without a body, so a small limit is sufficient and keeps a single client from
     * forcing the server to buffer a huge request in memory.
     */
    private static final int MAX_HTTP_CONTENT_LENGTH = 64 * 1024;

    /** Maximum WebSocket frame payload length passed to the protocol handler. */
    private static final int MAX_FRAME_PAYLOAD_LENGTH = 65535;

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // WebSocket starts as an HTTP upgrade, so HTTP encoding/decoding is required.
                            ch.pipeline().addLast("http-codec", new HttpServerCodec());
                            // Supports writing large content in chunks.
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            // Merges the parts of one HTTP message into a single full message;
                            // the argument is the maximum aggregated content length in bytes.
                            ch.pipeline().addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
                            ch.pipeline().addLast(
                                    new WebSocketServerProtocolHandler("/ws", null, true, MAX_FRAME_PAYLOAD_LENGTH));
                            ch.pipeline().addLast(new WebSocketServerHandler());
                        }
                    });

            ChannelFuture bound = bootstrap.bind(port).sync();
            // Block here until the listening channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on port 9090.
     *
     * @param args unused
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        new WebSocketServer().bind(9090);
    }
}

2 Netty Websocket 服务端处理类

 

package com.webSocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;

/**
 * Server-side handler that completes a WebSocket handshake for HTTP upgrade requests that
 * reach it and answers text frames with a greeting.
 *
 * <p>Every inbound message handled here is released after processing, because
 * {@link ChannelInboundHandlerAdapter} does not release messages on its own.
 */
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {

    /** WebSocket location advertised to the handshaker factory. */
    private static final String WEB_SOCKET_URL = "ws://127.0.0.1:9090";

    /** Handshaker created by this handler; stays {@code null} until a handshake is performed here. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Dispatches full HTTP requests and WebSocket frames to their handlers; any other
     * message type is forwarded to the next handler in the pipeline.
     *
     * @param ctx channel context
     * @param msg inbound message
     * @throws Exception if handling the message fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // Arrives as an HTTP request, but is expected to be a WebSocket upgrade.
            FullHttpRequest request = (FullHttpRequest) msg;
            try {
                handleHttpRequest(ctx, request);
            } finally {
                request.release();
            }
        } else if (msg instanceof WebSocketFrame) {
            // Message from an already upgraded WebSocket client.
            WebSocketFrame frame = (WebSocketFrame) msg;
            try {
                handlerWebSocketFrame(ctx, frame);
            } finally {
                frame.release();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Flushes everything written with {@code write(...)} during the current read batch.
     *
     * @param context channel context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext context) throws Exception {
        context.flush();
    }

    /**
     * Performs the WebSocket handshake for an HTTP upgrade request.
     *
     * <p>Requests without an {@code Upgrade: websocket} header are not served: the
     * connection is closed. If the requested WebSocket version is not supported, the
     * standard "unsupported version" response is sent.
     *
     * @param ctx channel context
     * @param req aggregated HTTP request
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            // Plain HTTP is not supported by this handler.
            ctx.close();
            return;
        }

        WebSocketServerHandshakerFactory factory =
                new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        handshaker = factory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Handles one WebSocket frame: completes the closing handshake for close frames,
     * answers ping frames with a pong carrying the same payload, and replies to text frames
     * with the received text followed by a greeting.
     *
     * <p>NOTE(review): when a {@code WebSocketServerProtocolHandler} sits earlier in the
     * pipeline it presumably consumes close and ping frames itself, so those branches act
     * as a fallback — confirm against the pipeline configuration.
     *
     * @param ctx   channel context
     * @param frame received frame; released by the caller, so it is retained where its
     *              content is passed on
     * @throws UnsupportedOperationException if the frame is neither close, ping nor text
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            if (handshaker != null) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            } else {
                // No handshaker owned by this handler; just drop the connection.
                ctx.close();
            }
            return;
        }

        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(
                    frame.getClass().getName() + " frame types not supported");
        }

        String request = ((TextWebSocketFrame) frame).text();
        // The reply must be a WebSocket frame; a bare String cannot be encoded by the pipeline.
        ctx.channel().write(new TextWebSocketFrame(request + "欢迎Netty WebSocket"));
    }

}

Netty Websocket 客户端

1 Netty Websocket 客户端启动类

 

package com.webSocket;

import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.URI;

/**
 * Minimal Netty WebSocket client: connects to {@code ws://localhost:9090/ws}, performs the
 * WebSocket handshake, sends one text message and then waits until the connection closes.
 */
public class WebSocketClients {

    /**
     * Runs the client.
     *
     * @param args unused
     * @throws Exception if connecting or the handshake fails, or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Address of the WebSocket endpoint; the "/ws" path must match the one the server registers.
        URI websocketURI = new URI("ws://localhost:9090/ws");
        HttpHeaders httpHeaders = new DefaultHttpHeaders();
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);

        // Give the handler its handshaker before the channel exists, so no inbound data
        // can ever reach a handler without one.
        final WebSocketClientHandler handler = new WebSocketClientHandler();
        handler.setHandshaker(handshaker);

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap();
            boot.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // The handshake is an HTTP exchange, so the client-side HTTP codec is needed.
                            ch.pipeline().addLast("http-codec", new HttpClientCodec());
                            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(1024 * 1024 * 10));
                            // Supports writing large content in chunks.
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            ch.pipeline().addLast("WebSocketClientHandler", handler);
                        }
                    });

            Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();

            handshaker.handshake(channel);
            // Block until the handshake has succeeded or failed.
            handler.handshakeFuture().sync();
            System.out.println("握手成功");

            // Once connected, this method may be called repeatedly to send more messages.
            sengMessage(channel);

            // Keep the client alive until the connection is closed.
            channel.closeFuture().sync();
        } finally {
            // Always stop the I/O threads, otherwise a failed connect/handshake leaves the JVM hanging.
            group.shutdownGracefully();
        }
    }

    /**
     * Sends a fixed text frame over {@code channel} and prints whether the write succeeded.
     *
     * @param channel an open channel whose WebSocket handshake has completed
     */
    public static void sengMessage(Channel channel) {
        // Payload is a plain text message.
        final String putMessage = "你好,我是客户端";
        TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
        channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("消息发送成功,发送的消息是:" + putMessage);
                } else {
                    System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
                }
            }
        });
    }

}

2 Netty Websocket 客户端处理类

 

package com.webSocket;

import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side handler that finishes the WebSocket handshake and prints the frames received
 * from the server.
 *
 * <p>The handshaker must be supplied through {@link #setHandshaker} before any data is
 * read. The outcome of the handshake is published through {@link #handshakeFuture()}.
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    /** Handshaker tracking the handshake state. */
    WebSocketClientHandshaker handshaker;

    /** Completed when the handshake succeeds or fails; created in {@link #handlerAdded}. */
    ChannelPromise handshakeFuture;

    /**
     * Finishes the handshake while it is incomplete; afterwards handles WebSocket frames.
     *
     * @param ctx channel context
     * @param msg a {@link FullHttpResponse} during the handshake, a {@link WebSocketFrame} after it
     * @throws IllegalStateException if an HTTP response arrives after the handshake completed
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("当前握手的状态" + this.handshaker.isHandshakeComplete());
        Channel ch = ctx.channel();

        if (!this.handshaker.isHandshakeComplete()) {
            FullHttpResponse response = (FullHttpResponse) msg;
            try {
                // The server answered the upgrade request: complete the handshake.
                this.handshaker.finishHandshake(ch, response);
                this.handshakeFuture.setSuccess();
                System.out.println("服务端的消息" + response.headers());
            } catch (WebSocketHandshakeException e) {
                String errorMsg = String.format("握手失败,status:%s,reason:%s",
                        response.status(), response.content().toString(CharsetUtil.UTF_8));
                // Keep the original exception as the cause so the failure can be diagnosed.
                this.handshakeFuture.setFailure(new Exception(errorMsg, e));
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status()
                    + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        // Frame sent by the server.
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            // Text message.
            System.out.println("客户端接收的消息是:" + ((TextWebSocketFrame) frame).text());
        } else if (frame instanceof BinaryWebSocketFrame) {
            // Binary message.
            System.out.println("BinaryWebSocketFrame");
        } else if (frame instanceof PingWebSocketFrame) {
            // A ping must be answered with a pong carrying the same payload. The frame is
            // released by SimpleChannelInboundHandler, hence the retain().
            ch.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        } else if (frame instanceof PongWebSocketFrame) {
            // Pong message.
            System.out.println("WebSocket Client pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            // Close message.
            System.out.println("receive close frame");
            ch.close();
        }
    }

    /**
     * Called when the channel becomes active, i.e. the connection is established.
     *
     * @param ctx channel context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与服务端连接成功");
    }

    /**
     * Called when the channel becomes inactive. If the handshake has not finished yet, its
     * future is failed so that a caller waiting on it does not block forever.
     *
     * @param ctx channel context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("主机关闭");
        failHandshake(new IllegalStateException("connection closed before the WebSocket handshake completed"));
    }

    /**
     * Prints the error, fails a still-pending handshake with it and closes the channel.
     *
     * @param ctx   channel context
     * @param cause the error that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("连接异常:" + cause.getMessage());
        failHandshake(cause);
        ctx.close();
    }

    /**
     * Creates the handshake promise as soon as the handler is added to a pipeline.
     *
     * @param ctx channel context
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    /** Marks the handshake as failed unless it has already completed. */
    private void failHandshake(Throwable cause) {
        if (this.handshakeFuture != null) {
            // tryFailure is a no-op when the promise is already done.
            this.handshakeFuture.tryFailure(cause);
        }
    }

    /** @return the handshaker currently in use, or {@code null} if none was set */
    public WebSocketClientHandshaker getHandshaker() {
        return handshaker;
    }

    /** @param handshaker the handshaker used to finish the handshake */
    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /** @return the handshake promise, or {@code null} before the handler is added to a pipeline */
    public ChannelPromise getHandshakeFuture() {
        return handshakeFuture;
    }

    /** @param handshakeFuture the promise to complete when the handshake ends */
    public void setHandshakeFuture(ChannelPromise handshakeFuture) {
        this.handshakeFuture = handshakeFuture;
    }

    /** @return a future that completes when the handshake succeeds or fails */
    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }

}

 

结果:

当前握手的状态false
服务端的消息DefaultHttpHeaders[upgrade: websocket, connection: upgrade, sec-websocket-accept: MVirCoc2+74v5fT6LiEzbDXl7nM=, content-length: 0]
握手成功
消息发送成功,发送的消息是:你好,我是客户端
当前握手的状态true
WebSocket Client  pong


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM