NettyServer
package com.youxiong.netty.server; import com.youxiong.netty.handler.MyChannelHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.TimeUnit; @Component public class NettyServer { private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class); @Value("${netty.server.port}") public Integer port; public Integer getPort() { return port; } public void setPort(Integer port) { this.port = port; } private void startServer(){ //服務端需要2個線程組 boss處理客戶端連接 work進行客服端連接之后的處理 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); //服務器 配置 bootstrap.group(boss,work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // HttpServerCodec:將請求和應答消息解碼為HTTP消息 socketChannel.pipeline().addLast("http-codec",new HttpServerCodec()); // HttpObjectAggregator:將HTTP消息的多個部分合成一條完整的HTTP消息 socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536)); // ChunkedWriteHandler:向客戶端發送HTML5文件 socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); // 進行設置心跳檢測 socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS)); // 配置通道處理 來進行業務處理 socketChannel.pipeline().addLast(new MyChannelHandler()); } }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true); //綁定端口 開啟事件驅動 LOGGER.info("【服務器啟動成功========端口:"+port+"】"); Channel channel = bootstrap.bind(port).sync().channel(); channel.closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { //關閉資源 boss.shutdownGracefully(); work.shutdownGracefully(); } } @PostConstruct() public void init(){ //需要開啟一個新的線程來執行netty server 服務器 new Thread(new Runnable() { public void run() { startServer(); } }).start(); } }
handler
package com.youxiong.netty.handler; import com.youxiong.netty.util.GlobalUserUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyChannelHandler extends SimpleChannelInboundHandler<Object> { private static final Logger LOGGER = LoggerFactory.getLogger(MyChannelHandler.class); private static final String URI = "websocket"; private WebSocketServerHandshaker handshaker ; /** * 連接上服務器 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { LOGGER.info("【handlerAdded】====>"+ctx.channel().id()); GlobalUserUtil.channels.add(ctx.channel()); } /** * 斷開連接 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { LOGGER.info("【handlerRemoved】====>"+ctx.channel().id()); GlobalUserUtil.channels.remove(ctx); } /** * 連接異常 需要關閉相關資源 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOGGER.error("【系統異常】======>"+cause.toString()); ctx.close(); ctx.channel().close(); } /** * 活躍的通道 也可以當作用戶連接上客戶端進行使用 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("【channelActive】=====>"+ctx.channel()); } /** * 不活躍的通道 就說明用戶失去連接 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } /** * 這里只要完成 flush * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * 這里是保持服務器與客戶端長連接 進行心跳檢測 避免連接斷開 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent stateEvent = (IdleStateEvent) evt; PingWebSocketFrame ping = new PingWebSocketFrame(); switch (stateEvent.state()){ //讀空閑(服務器端) case READER_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】讀空閑(服務器端)"); ctx.writeAndFlush(ping); break; //寫空閑(客戶端) case WRITER_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】寫空閑(客戶端)"); ctx.writeAndFlush(ping); break; case ALL_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】讀寫空閑"); break; } } } /** * 收發消息處理 * @param ctx * @param msg * @throws Exception */ protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof HttpRequest){ doHandlerHttpRequest(ctx,(HttpRequest) msg); }else if(msg instanceof WebSocketFrame){ doHandlerWebSocketFrame(ctx,(WebSocketFrame) msg); } } /** * websocket消息處理 * @param ctx * @param msg */ private void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) { //判斷msg 是哪一種類型 分別做出不同的反應 if(msg instanceof CloseWebSocketFrame){ LOGGER.info("【關閉】"); handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg); return ; } if(msg instanceof PingWebSocketFrame){ LOGGER.info("【ping】"); PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain()); ctx.channel().writeAndFlush(pong); return ; } if(msg instanceof PongWebSocketFrame){ LOGGER.info("【pong】"); PingWebSocketFrame ping = new PingWebSocketFrame(msg.content().retain()); ctx.channel().writeAndFlush(ping); return ; } if(!(msg instanceof TextWebSocketFrame)){ LOGGER.info("【不支持二進制】"); throw new UnsupportedOperationException("不支持二進制"); } //可以對消息進行處理 //群發 for (Channel channel : GlobalUserUtil.channels) { channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text())); } } /** * wetsocket第一次連接握手 * @param ctx * @param msg */ private void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg) { // http 解碼失敗 if(!msg.getDecoderResult().isSuccess() || (!"websocket".equals(msg.headers().get("Upgrade")))){ sendHttpResponse(ctx, (FullHttpRequest) msg,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST)); } //可以獲取msg的uri來判斷 String uri = msg.getUri(); if(!uri.substring(1).equals(URI)){ ctx.close(); } ctx.attr(AttributeKey.valueOf("type")).set(uri); //可以通過url獲取其他參數 WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( "ws://"+msg.headers().get("Host")+"/"+URI+"",null,false ); handshaker = factory.newHandshaker(msg); if(handshaker == null){ WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } //進行連接 handshaker.handshake(ctx.channel(), (FullHttpRequest) msg); //可以做其他處理 } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,關閉連接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } } package com.youxiong.netty.util; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class GlobalUserUtil { //保存全局的 連接上服務器的客戶 public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor .INSTANCE); }
NettyServer
package com.youxiong.netty.server;
import com.youxiong.netty.handler.MyChannelHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.IdleStateHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;import java.util.concurrent.TimeUnit;
@Componentpublic class NettyServer {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
@Value("${netty.server.port}") public Integer port;
public Integer getPort() { return port; }
public void setPort(Integer port) { this.port = port; }
private void startServer(){ //服務端需要2個線程組 boss處理客戶端連接 work進行客服端連接之后的處理 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); //服務器 配置 bootstrap.group(boss,work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // HttpServerCodec:將請求和應答消息解碼為HTTP消息 socketChannel.pipeline().addLast("http-codec",new HttpServerCodec()); // HttpObjectAggregator:將HTTP消息的多個部分合成一條完整的HTTP消息 socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536)); // ChunkedWriteHandler:向客戶端發送HTML5文件 socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); // 進行設置心跳檢測 socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS)); // 配置通道處理 來進行業務處理 socketChannel.pipeline().addLast(new MyChannelHandler()); } }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true); //綁定端口 開啟事件驅動 LOGGER.info("【服務器啟動成功========端口:"+port+"】"); Channel channel = bootstrap.bind(port).sync().channel(); channel.closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { //關閉資源 boss.shutdownGracefully(); work.shutdownGracefully(); } }
@PostConstruct() public void init(){ //需要開啟一個新的線程來執行netty server 服務器 new Thread(new Runnable() { public void run() { startServer(); } }).start(); }}handler
package com.youxiong.netty.handler;
import com.youxiong.netty.util.GlobalUserUtil;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.websocketx.*;import io.netty.handler.timeout.IdleStateEvent;import io.netty.util.AttributeKey;import io.netty.util.CharsetUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
public class MyChannelHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyChannelHandler.class);
private static final String URI = "websocket";
private WebSocketServerHandshaker handshaker ;
/** * 連接上服務器 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { LOGGER.info("【handlerAdded】====>"+ctx.channel().id()); GlobalUserUtil.channels.add(ctx.channel()); }
/** * 斷開連接 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { LOGGER.info("【handlerRemoved】====>"+ctx.channel().id()); GlobalUserUtil.channels.remove(ctx); }
/** * 連接異常 需要關閉相關資源 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOGGER.error("【系統異常】======>"+cause.toString()); ctx.close(); ctx.channel().close(); }
/** * 活躍的通道 也可以當作用戶連接上客戶端進行使用 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("【channelActive】=====>"+ctx.channel()); }
/** * 不活躍的通道 就說明用戶失去連接 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
/** * 這里只要完成 flush * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
/** * 這里是保持服務器與客戶端長連接 進行心跳檢測 避免連接斷開 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent stateEvent = (IdleStateEvent) evt; PingWebSocketFrame ping = new PingWebSocketFrame(); switch (stateEvent.state()){ //讀空閑(服務器端) case READER_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】讀空閑(服務器端)"); ctx.writeAndFlush(ping); break; //寫空閑(客戶端) case WRITER_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】寫空閑(客戶端)"); ctx.writeAndFlush(ping); break; case ALL_IDLE: LOGGER.info("【"+ctx.channel().remoteAddress()+"】讀寫空閑"); break; } } }
/** * 收發消息處理 * @param ctx * @param msg * @throws Exception */ protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof HttpRequest){ doHandlerHttpRequest(ctx,(HttpRequest) msg); }else if(msg instanceof WebSocketFrame){ doHandlerWebSocketFrame(ctx,(WebSocketFrame) msg); } }
/** * websocket消息處理 * @param ctx * @param msg */ private void doHandlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) { //判斷msg 是哪一種類型 分別做出不同的反應 if(msg instanceof CloseWebSocketFrame){ LOGGER.info("【關閉】"); handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg); return ; } if(msg instanceof PingWebSocketFrame){ LOGGER.info("【ping】"); PongWebSocketFrame pong = new PongWebSocketFrame(msg.content().retain()); ctx.channel().writeAndFlush(pong); return ; } if(msg instanceof PongWebSocketFrame){ LOGGER.info("【pong】"); PingWebSocketFrame ping = new PingWebSocketFrame(msg.content().retain()); ctx.channel().writeAndFlush(ping); return ; } if(!(msg instanceof TextWebSocketFrame)){ LOGGER.info("【不支持二進制】"); throw new UnsupportedOperationException("不支持二進制"); } //可以對消息進行處理 //群發 for (Channel channel : GlobalUserUtil.channels) { channel.writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) msg).text())); }
}
/** * wetsocket第一次連接握手 * @param ctx * @param msg */ private void doHandlerHttpRequest(ChannelHandlerContext ctx, HttpRequest msg) { // http 解碼失敗 if(!msg.getDecoderResult().isSuccess() || (!"websocket".equals(msg.headers().get("Upgrade")))){ sendHttpResponse(ctx, (FullHttpRequest) msg,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST)); } //可以獲取msg的uri來判斷 String uri = msg.getUri(); if(!uri.substring(1).equals(URI)){ ctx.close(); } ctx.attr(AttributeKey.valueOf("type")).set(uri); //可以通過url獲取其他參數 WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( "ws://"+msg.headers().get("Host")+"/"+URI+"",null,false ); handshaker = factory.newHandshaker(msg); if(handshaker == null){ WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } //進行連接 handshaker.handshake(ctx.channel(), (FullHttpRequest) msg); //可以做其他處理 }
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,關閉連接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } }}package com.youxiong.netty.util;
import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;
public class GlobalUserUtil {
//保存全局的 連接上服務器的客戶 public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor .INSTANCE);}--------------------- 作者:yx726843014 來源:CSDN 原文:https://blog.csdn.net/xieliaowa9231/article/details/80151446 版權聲明:本文為博主原創文章,轉載請附上博文鏈接!