springboot 集成Netty+websocket實現簡單的聊天功能


1.maven依賴

    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
    </dependency>

 

2.springboot入口啟動類

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import com.xxxxx.netty.NettyServer;

/**
 * Spring Boot 應用啟動類
 *
 * Created by bysocket on 16/4/26.
 */
// Spring Boot 應用的標識
@SpringBootApplication
// mapper 接口類掃描包配置
@EnableTransactionManagement
@EnableScheduling
@EnableAspectJAutoProxy(proxyTargetClass = true) //開啟AspectJ代理,並將proxyTargetClass置為true,表示啟用cglib對Class也進行代理
@MapperScan("com.xxxxx.dao")
public class Application extends SpringBootServletInitializer {

    public static void main(String[] args) {
        // 程序啟動入口
        // 啟動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件
        SpringApplication.run(Application.class, args);
        
        try {
            new NettyServer(8091).start();
        }catch(Exception e) {
            System.out.println("NettyServerError:"+e.getMessage());
        }
    }
}

 

3. NettyServer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * NettyServer Netty服務器配置
 * @author
 * @date
 */
public class NettyServer {
    
    private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
    
    private final int port;
 
    public NettyServer(int port) {
        this.port = port;
    }
 
    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
 
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(group, bossGroup) // 綁定線程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 綁定監聽端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 綁定客戶端連接時候觸發操作
 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            logger.info("收到新的客戶端連接: {}",ch.toString());
                            //websocket協議本身是基於http協議的,所以這邊也要使用http解編碼器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以塊的方式來寫的處理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
                            ch.pipeline().addLast(new MyWebSocketHandler());
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // 服務器異步創建綁定
            System.out.println(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // 關閉服務器通道
        } finally {
            group.shutdownGracefully().sync(); // 釋放線程池資源
            bossGroup.shutdownGracefully().sync();
        }
    }
}

 

4.MyWebSocketHandler

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.Page;
import com.xxxxx.service.IServiceXfzhQz;
import com.xxxxx.util.SpringUtil;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * MyWebSocketHandler
 * WebSocket處理器,處理websocket連接相關
 * @author 
 * @date 
 */
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    
    private static Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class);
    
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    //用戶id=>channel示例
    //可以通過用戶的唯一標識保存用戶的channel
    //這樣就可以發送給指定的用戶
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    
    //由於@Autowired注解注入不進去,所以取巧了
    static IServiceXfzhQz serviceXfzhQz;
    static {
        serviceXfzhQz = SpringUtil.getBean(IServiceXfzhQz.class);
    }
    
    
    /**
     * 每當服務端收到新的客戶端連接時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //獲取連接的channel
        Channel incomming = ctx.channel();
        //通知所有已經連接到服務器的客戶端,有一個新的通道加入
        /*for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n");
        }*/
        channelGroup.add(ctx.channel());
    }

    /**
     *每當服務端斷開客戶端連接時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //獲取連接的channel
        /*Channel incomming = ctx.channel();
        for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"離開\n");
        }*/
        //從服務端的channelGroup中移除當前離開的客戶端
        channelGroup.remove(ctx.channel());
        
        //從服務端的channelMap中移除當前離開的客戶端
        Collection<Channel> col = channelMap.values();
        while(true == col.contains(ctx.channel())) {
            col.remove(ctx.channel());
            logger.info("netty客戶端連接刪除成功!");
        }

    }

    /**
     * 服務端監聽到客戶端活動
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("netty與客戶端建立連接,通道開啟!");

        //添加到channelGroup通道組
        //channelGroup.add(ctx.channel());
    }
 
    /**
     * 服務端監聽到客戶端不活動
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("netty與客戶端斷開連接,通道關閉!");
        //添加到channelGroup 通道組
        //channelGroup.remove(ctx.channel());
    }
 
    /**
     * 每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的Channel.
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        logger.info("netty客戶端收到服務器數據: {}" , msg.text());
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        //消息處理類
        message(ctx,msg.text(),date);
        
        //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text()));
    }

    /**
     * 當服務端的IO 拋出異常時被調用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //super.exceptionCaught(ctx, cause);
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:" + incoming.remoteAddress()+"異常");
        //異常出現就關閉連接
        cause.printStackTrace();
        ctx.close();
    }

    //消息處理類
    public void message(ChannelHandlerContext ctx,String msg,String date) {
        try {
            Map<String,Object> resultmap = (Map<String,Object>)JSONObject.parse(msg);
            resultmap.put("CREATEDATE", date);

                //這里需要用戶信息跟channel通道綁定
                //所以每當一個客戶端連接成功時,第一時間傳一條登錄信息
                //該字段用來判斷是登錄綁定信息,還是發送信息
                String msgtype = (String)resultmap.get("MSGTYPE");
            
            if(msgtype.equals("DL")){//用戶登錄信息綁定
                
                Channel channel = ctx.channel();
                channelMap.put((String) resultmap.get("USERID"), channel);
                
                resultmap.put("success", true);
                resultmap.put("message", "用戶鏈接綁定成功!");
                channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString()));
                logger.info("netty用戶: {} 連接綁定成功!" , (String) resultmap.get("USERID"));

            }else if(msgtype.equals("DH")){//消息對話
                
                //這里根據群組ID查詢出來所有的用戶信息並循環發送消息
                //如果是單聊 可以直接獲取channelMap中用戶channel並發送
                Page<Map<String, Object>> list =  serviceXfzhQz.selectXfzhQzByQzId((String)resultmap.get("QZID"));
                for (Map<String, Object> map : list) {
                    
                    String userid = (String) map.get("USERID");
                    //判斷該用戶ID是否綁定通道
                    if(channelMap.containsKey(userid)){
                        Channel channel = channelMap.get(userid);
                        channel.writeAndFlush(new TextWebSocketFrame(resultmap.toString()));
                    }
                }
                
                
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }

    } 
    
}

 5.測試頁面

   向服務端發送消息時,數據格式是json字符串.

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){
            socket = new WebSocket("ws://127.0.0.1:8091/ws");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseText');
                ta.value += event.data+"\r\n";
            };
            socket.onopen = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。連接  \r\n";
            };
            socket.onclose = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。關閉 \r\n";
            };
        }else{
            alert("您的瀏覽器不支持WebSocket協議!");
        }
        function send(message){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket 連接沒有建立成功!");
            }
 
        }
 
    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>TEXT</label><input type="text" name="message" value="這里輸入消息" 
    style="width: 1024px;height: 100px;"/> <br />
    <br /> <input type="button" value="發送ws消息"
                  onClick="send(this.form.message.value)" />
    <hr color="black" />
    <h3>服務端返回的應答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM