Java 集成WebSocket实现实时通讯


去年独立负责开发了一个小程序拼单的功能,要求两个及两个以上的设备同时在线点单时,单个设备加入购物车的商品要实时显示在所有设备的购物车上面,最后再由拼单发起人进行结算和支付。当时小程序额外还加了一个拼单发起人可以向参与人发起群收款功能,这个功能以后再介绍。

刚写代码的时候用PHP集成Swoole写过视频直播的聊天功能,所以当时看到拼单购物车共享功能就想到实时性要求肯定很高,脑袋里就浮现出了Websocket,但技术选型因为某些原因被否决了,最后我只能采用每秒轮询购物车版本号的方式来实现这个功能了。但是在面对实时性要求很高的功能,我坚信Websocket依然是很好的选择。

 

这篇文章就将Websocket集成到SpringBoot中,简单实现聊天房间在线用户和消息列表。

首先在SpringBoot的pom.xml文件里面加入Websocket整合包:

<dependencies>
    ...
    <dependency>
    <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    ...
</dependencies>

 然后创建一个配置文件,注入ServerEndpointExporter,简单来说就是让SpringBoot识别Websocket的注解

package com.demo.www.config.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket服务配置
 * @author AnYuan
 */

@Configuration
public class WebsocketConfig {

    /**
     * 注入一个ServerEndpointExporter
     * 该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

 再做一个前置事情,这边构建一个统一的消息模版类,用来统一接受和发送消息的数据字段和类型:

package com.demo.www.config.websocket;

import lombok.Data;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * 消息模版
 * @author AnYuan
 */

@Data
public class WebsocketMsgDTO {

    /**
     * 发送消息用户
     */
    private String uid;
    /**
     * 接收消息用户
     */
    private String toUId;
    /**
     * 消息内容
     */
    private String content;
    /**
     * 消息时间
     */
    private String dateTime;
    /**
     * 用户列表
     */
    private List<String> onlineUser;

    /**
     * 统一消息模版
     * @param uid 发送消息用户
     * @param content 消息内容
     * @param onlineUser 在线用户列表
     */
    public WebsocketMsgDTO(String uid, String content, List<String> onlineUser) {
        this.uid = uid;
        this.content = content;
        this.onlineUser = onlineUser;
        this.dateTime = localDateTimeToString();
    }


    /**
     * 获取当前时间
     * @return String 12:00:00
     */
    private String localDateTimeToString() {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
        return dateTimeFormatter.format( LocalDateTime.now());
    }
}

最后上服务端逻辑代码:@ServerEndpoint(value="") 这个是Websocket服务url前缀,{uid}类似于ResutFul风格的参数

package com.demo.www.config.websocket;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocketServer服务
 * @author AnYuan
 */

@ServerEndpoint(value = "/webSocket/{uid}")
@Component
@Slf4j
public class WebSocketServer {

    /**
     * 机器人发言名称
     */
    private static final String SPOKESMAN_ADMIN = "机器人";

    /**
     * concurrent包的线程安全Set
     * 用来存放每个客户端对应的Session对象
     */
    private static final ConcurrentHashMap<String, Session> SESSION_POOLS = new ConcurrentHashMap<>();

    /**
     * 静态变量,用来记录当前在线连接数。
     * 应该把它设计成线程安全的。
     */
    private static final AtomicInteger ONLINE_NUM = new AtomicInteger();

    /**
     * 获取在线用户列表
     * @return List<String>
     */
    private List<String> getOnlineUsers() {
        return new ArrayList<>(SESSION_POOLS.keySet());
    }

    /**
     * 用户建立连接成功调用
     * @param session 用户集合
     * @param uid     用户标志
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "uid") String uid) {
        // 将加入连接的用户加入SESSION_POOLS集合
        SESSION_POOLS.put(uid, session);
        // 在线用户+1
        ONLINE_NUM.incrementAndGet();
        sendToAll(new WebsocketMsgDTO(SPOKESMAN_ADMIN, uid + " 加入连接!", getOnlineUsers()));
    }

    /**
     * 用户关闭连接时调用
     * @param uid 用户标志
     */
    @OnClose
    public void onClose(@PathParam(value = "uid") String uid) {
        // 将加入连接的用户移除SESSION_POOLS集合
        SESSION_POOLS.remove(uid);
        // 在线用户-1
        ONLINE_NUM.decrementAndGet();
        sendToAll(new WebsocketMsgDTO(SPOKESMAN_ADMIN, uid + " 断开连接!", getOnlineUsers()));
    }

    /**
     * 服务端收到客户端信息
     * @param message 客户端发来的string
     * @param uid     uid 用户标志
     */
    @OnMessage
    public void onMessage(String message, @PathParam(value = "uid") String uid) {
        log.info("Client:[{}], Message: [{}]", uid, message);

        // 接收并解析前端消息并加上时间,最后根据是否有接收用户,区别发送所有用户还是单个用户
        WebsocketMsgDTO msgDTO = JSONObject.parseObject(message, WebsocketMsgDTO.class);
        msgDTO.setDateTime(localDateTimeToString());

        // 如果有接收用户就发送单个用户
        if (Strings.isNotBlank(msgDTO.getToUId())) {
            sendMsgByUid(msgDTO);
            return;
        }
        // 否则发送所有人
        sendToAll(msgDTO);
    }

    /**
     * 给所有人发送消息
     * @param msgDTO msgDTO
     */
    private void sendToAll(WebsocketMsgDTO msgDTO) {
        //构建json消息体
        String content = JSONObject.toJSONString(msgDTO);
        // 遍历发送所有在线用户
        SESSION_POOLS.forEach((k, session) ->  sendMessage(session, content));
    }

    /**
     * 给指定用户发送信息
     */
    private void sendMsgByUid(WebsocketMsgDTO msgDTO) {
        sendMessage(SESSION_POOLS.get(msgDTO.getToUId()), JSONObject.toJSONString(msgDTO));
    }

    /**
     * 发送消息方法
     * @param session 用户
     * @param content 消息
     */
    private void sendMessage(Session session, String content){
        try {
            if (Objects.nonNull(session)) {
                // 使用Synchronized锁防止多次发送消息
                synchronized (session) {
                    // 发送消息
                    session.getBasicRemote().sendText(content);
                }
            }
        } catch (IOException ioException) {
            log.info("发送消息失败:{}", ioException.getMessage());
            ioException.printStackTrace();
        }
    }

    /**
     * 获取当前时间
     * @return String 12:00:00
     */
    private String localDateTimeToString() {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
        return dateTimeFormatter.format( LocalDateTime.now());
    }
}

以上启动后,就开启了一个WebSocket服务了,只需要前端接入就可以了。这里提一下,[获取当前时间]的方法两个地方用到了,建议用一个时间工具类将其提取出来,这里为了减少文件就写了两次。

那接下来我们简单写一下前端样式和Js代码,前端样式是我在网上下载后进行修改的,重点是Js部分, 下面将创建一个Admin用户,一个user用户,同时连接这个WebSocket服务,实现展现在线用户和通告列表的功能。

第一个文件:admin.html

<!DOCTYPE html>
<html>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<head>
    <title>Admin Hello WebSocket</title>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.4.1/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.2.1/jquery.js"></script>
    <script src="app.js"></script>
    <style>
        body {
            background-color: #f5f5f5;
        }
        #main-content {
            max-width: 940px;
            padding: 2em 3em;
            margin: 0 auto 20px;
            background-color: #fff;
            border: 1px solid #e5e5e5;
            -webkit-border-radius: 5px;
            -moz-border-radius: 5px;
            border-radius: 5px;
        }
    </style>
</head>
<body>
<div id="main-content" class="container">
    <div class="row">
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <input id="userId" value="Admin" hidden>
                    <label for="connect">建立连接通道:</label>
                    <button id="connect" class="btn btn-default" type="submit">Connect</button>
                    <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
                    </button>
                </div>
            </form>
        </div>
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label>发布新公告</label>
                    <input type="text" id="content" class="form-control" value="" placeholder="发言框..">
                </div>
                <button id="send" class="btn btn-default" type="submit">发布</button>
            </form>
        </div>
    </div>
    <div class="row" style="margin-top: 30px">
        <div class="col-md-12">
            <table id="userlist" class="table table-striped">
                <thead>
                <tr>
                    <th>实时在线用户列表<span id="onLineUserCount"></span></th>
                </tr>
                </thead>
                <tbody id='online'>
                </tbody>
            </table>
        </div>
        <div class="col-md-12">
            <table id="conversation" class="table table-striped">
                <thead>
                <tr>
                    <th>游戏公告内容</th>
                </tr>
                </thead>
                <tbody id="notice">
                </tbody>
            </table>
        </div>
    </div>
</div>
</body>
</html>

第二个文件:user.html

<!DOCTYPE html>
<html>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<head>
    <title>User1 Hello WebSocket</title>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.4.1/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.2.1/jquery.js"></script>
    <script src="app.js"></script>
    <style>
        body {
            background-color: #f5f5f5;
        }
        #main-content {
            max-width: 940px;
            padding: 2em 3em;
            margin: 0 auto 20px;
            background-color: #fff;
            border: 1px solid #e5e5e5;
            -webkit-border-radius: 5px;
            -moz-border-radius: 5px;
            border-radius: 5px;
        }
    </style>
</head>
<body>
<div id="main-content" class="container">
    <div class="row">
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <input id="userId" value="user1" hidden>
                    <label for="connect">建立连接通道:</label>
                    <button id="connect" class="btn btn-default" type="submit">Connect</button>
                    <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
                    </button>
                </div>
            </form>
        </div>

    </div>
    <div class="row" style="margin-top: 30px">
        <div class="col-md-12">
            <table id="userlist" class="table table-striped">
                <thead>
                <tr>
                    <th>实时在线用户列表<span id="onLineUserCount"></span></th>
                </tr>
                </thead>
                <tbody id='online'>
                </tbody>
            </table>
        </div>
        <div class="col-md-12">
            <table id="conversation" class="table table-striped">
                <thead>
                <tr>
                    <th>游戏公告内容</th>
                </tr>
                </thead>
                <tbody id="notice">
                </tbody>
            </table>
        </div>
    </div>
</div>
</body>
</html>

最后重点的Js文件:app.js。 将其与admin.html、user.html放在同一个目录下即可引用

var socket;

function setConnected(connected) {
    $("#connect").prop("disabled", connected);
    $("#disconnect").prop("disabled", !connected);
    if (connected) {
        $("#conversation").show();
    } else {
        $("#conversation").hide();
    }
    $("#notice").html("");
}
// WebSocket 服务操作 
function openSocket() {
    if (typeof (WebSocket) == "undefined") {
        console.log("浏览器不支持WebSocket");
    } else {
        console.log("浏览器支持WebSocket");
        //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
        if (socket != null) {
            socket.close();
            socket = null;
        }
        // ws 为websocket连接标识,localhost:9999 为SpringBoot的连接地址,webSocket 为后端配置的前缀, userId 则是参数
        socket = new WebSocket("ws://localhost:9999/webSocket/" + $("#userId").val());
        //打开事件
        socket.onopen = function () {
            console.log("websocket已打开");
            setConnected(true)
        };
        //获得消息事件
        socket.onmessage = function (msg) {
            const msgDto = JSON.parse(msg.data);
            console.log(msg)
            showContent(msgDto);
            showOnlineUser(msgDto.onlineUser);
        };
        //关闭事件
        socket.onclose = function () {
            console.log("websocket已关闭");
            setConnected(false)
            removeOnlineUser();
        };
        //发生了错误事件
        socket.onerror = function () {
            setConnected(false)
            console.log("websocket发生了错误");
        }
    }
}

//2、关闭连接
function disconnect() {
    if (socket !== null) {
        socket.close();
    }
    setConnected(false);
    console.log("Disconnected");
}

function sendMessage() {
    if (typeof (WebSocket) == "undefined") {
        console.log("您的浏览器不支持WebSocket");
    } else {
        var msg = '{"uid":"' + $("#userId").val() + '", "toUId": null, "content":"' + $("#content").val() + '"}';
        console.log("向服务端发送消息体:" + msg);
        socket.send(msg);
    }
}

// 订阅的消息显示在客户端指定位置
function showContent(serverMsg) {
    $("#notice").html("<tr><td>" + serverMsg.uid + ": </td> <td>" + serverMsg.content + "</td><td>" + serverMsg.dateTime + "</td></tr>" +  $("#notice").html())
}

//显示实时在线用户
function showOnlineUser(serverMsg) {
    if (null != serverMsg) {
        let html = '';
        for (let i = 0; i < serverMsg.length; i++) {
            html += "<tr><td>" + serverMsg[i] + "</td></tr>";
        }
        $("#online").html(html);
        $("#onLineUserCount").html(" ( " + serverMsg.length + " )");
    }
}

//显示实时在线用户
function removeOnlineUser() {
    $("#online").html("");
    $("#onLineUserCount").html("");
}

$(function () {
    $("form").on('submit', function (e) {
        e.preventDefault();
    });
    $("#connect").click(function () {
        openSocket();
    });
    $("#disconnect").click(function () {
        disconnect();
    });
    $("#send").click(function () {
        sendMessage();
    });
});

打开admin.html和user.html页面:

分别点击Connect连接Websocket服务,然后使用admin页面的[发布新通告]进行消息发布:

这里只实现了管理员群发消息,没有实现一对一的聊天,可以通过修改用户列表的样式,增加一对一聊天的功能。在app.js里,发送消息时指定发送对象字段 [toUId] 就可以实现一对一聊天了。

到此SpringBoot集成Websocket就已经完成了,如果要实现聊天或者拼单等一些实时性要求比较高的功能,可以通过更改WebSocketServer文件的逻辑,接入业务代码。

最后建议将WebSocket服务与业务代码运行环境分开,使用不同的容器,最后再根据流量大小决定是否需要扩容。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM