1. Maven 依賴
<!-- Netty dependency (netty-all, version 4.1.10.Final); it supplies the io.netty.* classes imported by the NettyServer and MyWebSocketHandler code in this document. -->
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency>
2. Spring Boot 入口啟動類
import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.support.SpringBootServletInitializer; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; import com.xxxxx.netty.NettyServer; /** * Spring Boot 應用啟動類 * * Created by bysocket on 16/4/26. */ // Spring Boot 應用的標識 @SpringBootApplication // mapper 接口類掃描包配置 @EnableTransactionManagement @EnableScheduling @EnableAspectJAutoProxy(proxyTargetClass = true) //開啟AspectJ代理,並將proxyTargetClass置為true,表示啟用cglib對Class也進行代理 @MapperScan("com.xxxxx.dao") public class Application extends SpringBootServletInitializer { public static void main(String[] args) { // 程序啟動入口 // 啟動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件 SpringApplication.run(Application.class, args); try { new NettyServer(8091).start(); }catch(Exception e) { System.out.println("NettyServerError:"+e.getMessage()); } } }
3. NettyServer
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * NettyServer Netty服務器配置 * @author * @date */ public class NettyServer { private static Logger logger = LoggerFactory.getLogger(NettyServer.class); private final int port; public NettyServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); sb.group(group, bossGroup) // 綁定線程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(this.port)// 綁定監聽端口 .childHandler(new ChannelInitializer<SocketChannel>() { // 綁定客戶端連接時候觸發操作 @Override protected void initChannel(SocketChannel ch) throws Exception { logger.info("收到新的客戶端連接: {}",ch.toString()); //websocket協議本身是基於http協議的,所以這邊也要使用http解編碼器 ch.pipeline().addLast(new HttpServerCodec()); //以塊的方式來寫的處理器 ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); ch.pipeline().addLast(new MyWebSocketHandler()); } }); ChannelFuture cf = sb.bind().sync(); // 服務器異步創建綁定 System.out.println(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress()); cf.channel().closeFuture().sync(); // 關閉服務器通道 } finally { 
group.shutdownGracefully().sync(); // 釋放線程池資源 bossGroup.shutdownGracefully().sync(); } } }
4. MyWebSocketHandler
import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject; import com.github.pagehelper.Page; import com.xxxxx.service.IServiceXfzhQz; import com.xxxxx.util.SpringUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * MyWebSocketHandler * WebSocket處理器,處理websocket連接相關 * @author * @date */ public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ private static Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class); public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //用戶id=>channel示例 //可以通過用戶的唯一標識保存用戶的channel //這樣就可以發送給指定的用戶 public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); //由於@Autowired注解注入不進去,所以取巧了 static IServiceXfzhQz serviceXfzhQz; static { serviceXfzhQz = SpringUtil.getBean(IServiceXfzhQz.class); } /** * 每當服務端收到新的客戶端連接時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //獲取連接的channel Channel incomming = ctx.channel(); //通知所有已經連接到服務器的客戶端,有一個新的通道加入 /*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n"); }*/ channelGroup.add(ctx.channel()); } /** *每當服務端斷開客戶端連接時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //獲取連接的channel /*Channel incomming = 
ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"離開\n"); }*/ //從服務端的channelGroup中移除當前離開的客戶端 channelGroup.remove(ctx.channel()); //從服務端的channelMap中移除當前離開的客戶端 Collection<Channel> col = channelMap.values(); while(true == col.contains(ctx.channel())) { col.remove(ctx.channel()); logger.info("netty客戶端連接刪除成功!"); } } /** * 服務端監聽到客戶端活動 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("netty與客戶端建立連接,通道開啟!"); //添加到channelGroup通道組 //channelGroup.add(ctx.channel()); } /** * 服務端監聽到客戶端不活動 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info("netty與客戶端斷開連接,通道關閉!"); //添加到channelGroup 通道組 //channelGroup.remove(ctx.channel()); } /** * 每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的Channel. * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { logger.info("netty客戶端收到服務器數據: {}" , msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息處理類 message(ctx,msg.text(),date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 當服務端的IO 拋出異常時被調用 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:" + incoming.remoteAddress()+"異常"); //異常出現就關閉連接 cause.printStackTrace(); ctx.close(); } //消息處理類 public void message(ChannelHandlerContext ctx,String msg,String date) { try { Map<String,Object> resultmap = (Map<String,Object>)JSONObject.parse(msg); resultmap.put("CREATEDATE", date); //這里需要用戶信息跟channel通道綁定 //所以每當一個客戶端連接成功時,第一時間傳一條登錄信息 //該字段用來判斷是登錄綁定信息,還是發送信息 String msgtype = (String)resultmap.get("MSGTYPE"); 
if(msgtype.equals("DL")){//用戶登錄信息綁定 Channel channel = ctx.channel(); channelMap.put((String) resultmap.get("USERID"), channel); resultmap.put("success", true); resultmap.put("message", "用戶鏈接綁定成功!"); channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString())); logger.info("netty用戶: {} 連接綁定成功!" , (String) resultmap.get("USERID")); }else if(msgtype.equals("DH")){//消息對話 //這里根據群組ID查詢出來所有的用戶信息並循環發送消息 //如果是單聊 可以直接獲取channelMap中用戶channel並發送 Page<Map<String, Object>> list = serviceXfzhQz.selectXfzhQzByQzId((String)resultmap.get("QZID")); for (Map<String, Object> map : list) { String userid = (String) map.get("USERID"); //判斷該用戶ID是否綁定通道 if(channelMap.containsKey(userid)){ Channel channel = channelMap.get(userid); channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString())); } } } } catch (Exception e) { e.printStackTrace(); } } }
5. 測試頁面
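向服務端發送消息時,數據格式必須是 JSON 字符串。連接成功後先發送一條登錄綁定消息,例如 {"MSGTYPE":"DL","USERID":"1001"};之後發送對話消息時使用 MSGTYPE 為 "DH" 並帶上群組 ID,例如 {"MSGTYPE":"DH","QZID":"1"},消息中的其它字段會原樣轉發給群組內已綁定的用戶。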
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Netty-Websocket</title>
<script type="text/javascript">
    // Manual test client for the Netty WebSocket endpoint at ws://127.0.0.1:8091/ws.
    var socket;

    // Fall back to the prefixed implementation shipped by old Firefox versions.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }

    if (window.WebSocket) {
        socket = new WebSocket("ws://127.0.0.1:8091/ws");

        // Append every server message to the response area.
        socket.onmessage = function (event) {
            var ta = document.getElementById('responseText');
            ta.value += event.data + "\r\n";
        };

        // A new connection starts with a fresh response area.
        socket.onopen = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "Netty-WebSocket服務器。。。。。。連接 \r\n";
        };

        // Append (do not overwrite) so messages received before the close stay visible.
        socket.onclose = function (event) {
            var ta = document.getElementById('responseText');
            ta.value += "Netty-WebSocket服務器。。。。。。關閉 \r\n";
        };
    } else {
        alert("您的瀏覽器不支持WebSocket協議!");
    }

    // Sends the given text to the server if the connection is open.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("WebSocket 連接沒有建立成功!");
        }
    }
</script>
</head>
<body>
<form onSubmit="return false;">
    <label>TEXT</label><input type="text" name="message" value="這里輸入消息" style="width: 1024px;height: 100px;"/>
    <br />
    <br />
    <input type="button" value="發送ws消息" onClick="send(this.form.message.value)" />
    <hr color="black" />
    <h3>服務端返回的應答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>