場景
目前做了一個接口:邀請用戶成為某課程的管理員,於是我感覺有能在用戶被邀請之后能有個立馬通知他本人的機(類似微博、朋友圈被點贊后就有立馬能收到通知一樣),於是就琢磨琢磨搞了一套。
涉及技術棧
- Springboot
- Websocket 協議
- JWT
- (非必要)RabbitMQ 消息中間件
Websocket 協議
⭐推薦閱讀:Websocket 協議簡介
WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。
為什么使用Websocket?
因為普通的http協議一個最大的問題就是:通信只能由客戶端發起,服務器響應(半雙工),而我們希望可以全雙工通信。
因此一句話總結就是:建立websocket(以下簡稱為ws)連接是為了讓服務器主動向前端發消息,而無需等待前端的發起請求調用接口。
業務邏輯
我們現在有:
用戶A用戶BSpringboot服務器- 場景:
用戶A調用接口邀請用戶B成為課程成員 - 涉及數據庫
MySQL的數據表:course_member_invitation,記錄課程邀請記錄,其形式如下(忽略時間等列):
| id | course_id | account_id | admin_id | is_accepted | bind_message_id |
|---|---|---|---|---|---|
| 邀請id | 課程id | 受邀用戶id | 邀請人id(因其本身為課程管理員) | 受邀用戶是否接受了邀請 | 綁定的消息id |
course_message,記錄消息記錄,其形式如下(忽略時間等列):
| id | type | account_id | source_id | is_read | is_ignored |
|---|---|---|---|---|---|
| 消息id | 消息類型 | 收信人用戶id | 發信人用戶id | 是否已讀 | 收信人是否忽略 |
- (圖中沒有體現)
course_message_type,記錄消息類型,其形式如下
| id | name | description |
|---|---|---|
| 消息類型id | 消息類型名稱 | 描述 |
- 涉及
RabbitMQ(因不是重點,所以此處暫不討論,最后一章敘述)

業務步驟主要涉及兩個方法addCourseMemberInvitation與sendMessage和一個組件CourseMemberInvitationListener,分別做:
addCourseMemberInvitation:
用戶A調用接口,邀請用戶B成為某門課程的管理員Springboot服務器收到請求,將這一請求生成邀請記錄、消息記錄,寫入下表:course_member_invitationcourse_message
- 寫入DB后,調用
sendMessage處理發送消息的業務。 - 將執行的結果返回給
用戶A
sendMessage:
- 將消息記錄放入
RabbitMQ中對應的消息隊列。
CourseMemberInvitationListener:
- 持續監聽其綁定的消息隊列
- 一旦消息隊列中有新消息,就嘗試通過ws連接發送消息。
- 若
用戶B在線,則可發送。 - 否則,則消費掉該消息,待用戶上線后從DB中讀入。
- 若
在Springboot中配置Websocket
pom.xml文件
<!-- WebSocket相關 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Websocket Server組件配置初步:com.xxxxx.course.webSocket.WebSocketServer
/**
* 進行前后端即時通信
* https://blog.csdn.net/qq_33833327/article/details/105415393
* session: https://www.codeleading.com/article/6950456772/
* @author jojo
*/
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //響應路徑為 /ws/{uid} 的連接請求
@Component
public class WebSocketServer {
/**
* 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的
*/
private static int onlineCount = 0;
/**
* concurrent 包的線程安全Set,用來存放每個客戶端對應的 myWebSocket對象
* 根據 用戶id 來獲取對應的 WebSocketServer 示例
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 與某個客戶端的連接會話,需要通過它來給客戶端發送數據
*/
private Session session;
/**
* 用戶id
*/
private String accountId ="";
/**
* logger
*/
private static Logger LOGGER = LoggerUtil.getLogger();
/**
* 連接建立成功調用的方法
*
* @param session
* @param uid 用戶id
*/
@OnOpen
public void onOpen(Session session, @PathParam("uid") String uid) {
this.session = session;
//設置超時,同httpSession
session.setMaxIdleTimeout(3600000);
this.accountId = uid;
//存儲websocket連接,存在內存中,若有同一個用戶同時在線,也會存,不會覆蓋原有記錄
webSocketMap.put(accountId, this);
LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
addOnlineCount(); // 在線數 +1
LOGGER.info("有新窗口開始監聽:" + accountId + ",當前在線人數為" + getOnlineCount());
try {
sendMessage(JSON.toJSONString("連接成功"));
} catch (IOException e) {
e.printStackTrace();
throw new ApiException("websocket IO異常!!!!");
}
}
/**
* 關閉連接
*/
@OnClose
public void onClose() {
if (webSocketMap.get(this.accountId) != null) {
webSocketMap.remove(this.accountId);
subOnlineCount(); // 人數 -1
LOGGER.info("有一連接關閉,當前在線人數為:" + getOnlineCount());
}
}
/**
* 收到客戶端消息后調用的方法
* 這段代碼尚未有在使用,可以先不看,在哪天有需求時再改寫啟用
* @param message 客戶端發送過來的消息
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
LOGGER.info("收到來自用戶 [" + this.accountId + "] 的信息:" + message);
if (!StringTools.isNullOrEmpty(message)) {
try {
// 解析發送的報文
JSONObject jsonObject = JSON.parseObject(message);
// 追加發送人(防竄改)
jsonObject.put("fromUserId", this.accountId);
String toUserId = jsonObject.getString("toUserId");
// 傳送給對應 toUserId 用戶的 WebSocket
if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
} else {
// 否則不在這個服務器上,發送到 MySQL 或者 Redis
LOGGER.info("請求的userId:" + toUserId + "不在該服務器上");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
LOGGER.error("用戶錯誤:" + this.accountId + ",原因:" + error);
}
/**
* 實現服務器主動推送
*
* @param message 消息字符串
* @throws IOException
*/
public void sendMessage(String message) throws IOException {
//需要使用同步機制,否則多並發時會因阻塞而報錯
synchronized(this.session) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOGGER.error("發送給用戶 ["+this.accountId +"] 的消息出現錯誤",e.getMessage());
throw e;
}
}
}
/**
* 點對點發送
* 指定用戶id
* @param message 消息字符串
* @param userId 目標用戶id
* @throws IOException
*/
public static void sendInfo(String message, String userId) throws Exception {
Iterator entrys = webSocketMap.entrySet().iterator();
while (entrys.hasNext()) {
Map.Entry entry = (Map.Entry) entrys.next();
if (entry.getKey().toString().equals(userId)) {
webSocketMap.get(entry.getKey()).sendMessage(message);
LOGGER.info("發送消息到用戶id為 [" + userId + "] ,消息:" + message);
return;
}
}
//錯誤說明用戶沒有在線,不用記錄log
throw new Exception("用戶沒有在線");
}
private static synchronized int getOnlineCount() {
return onlineCount;
}
private static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
private static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
幾點說明:
- onOpen方法:服務器與前端建立ws連接成功時自動調用。
- sendInfo方法:是服務器通過用戶id向指定用戶發送消息的方法,其為靜態公有方法,因此可供各service調用。調用的例子:
// WebSocket 通知前端
try {
//調用WebsocketServer向目標用戶推送消息
WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
}
@ServerEndpoint注解:
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //響應路徑為 /ws/{uid} 的連接請求
這么注解之后,前端只用發起 ws://xxx.xxx:xxxx/ws/{uid} 即可開啟ws連接(或者wss協議,增加TLS), 比如前端js代碼這么寫:
<script>
var socket;
/* 啟動ws連接 */
function openSocket() {
if(typeof(WebSocket) == "undefined") {
console.log("您的瀏覽器不支持WebSocket");
}else{
console.log("您的瀏覽器支持WebSocket");
//實現化WebSocket對象,指定要連接的服務器地址與端口 建立連接
var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //轉換成ws協議
console.log("正在連接:"+socketUrl);
if(socket!=null){
socket.close();
socket=null;
}
socket = new WebSocket(socketUrl);
/* websocket 基本方法 */
//打開事件
socket.onopen = function() {
console.log(new Date()+"websocket已打開,正在連接...");
//socket.send("這是來自客戶端的消息" + location.href + new Date());
};
//獲得消息事件
socket.onmessage = function(msg) {
console.log(msg.data);
//發現消息進入 開始處理前端觸發邏輯
};
//關閉事件
socket.onclose = function() {
console.log(new Date()+"websocket已關閉,連接失敗...");
//重新請求token
};
//發生了錯誤事件
socket.onerror = function() {
console.log("websocket連接發生發生了錯誤");
}
}
}
/* 發送消息 */
function sendMessage() {
if(typeof(WebSocket) == "undefined") {
console.log("您的瀏覽器不支持WebSocket");
}else {
console.log("您的瀏覽器支持WebSocket");
console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
}
}
</script>
存在的問題
一切看起來很順利,我只要放個用戶id進去,就可以想跟誰通訊就跟誰通訊咯!
但設想一個場景, 我是小明,uid為250,我想找uid為520的小花聊天,理論上我只要發起ws://xxx.xxx:xxxx/ws/250請求與服務器連接,小花也發起ws://xxx.xxx:xxxx/ws/520與服務器建立ws連接,我們就能互發消息了吧!
這時候出現了uid為1的小黃,他竟然想挖牆腳!?他竟然學過js,自己發了ws://xxx.xxx:xxxx/ws/520跟服務器建立ws連接,而小花根本不想和我發消息,所以實際上是小黃冒充了小花,把小花NTR了(實際上人家並不在乎😥),跟我愉快地聊天?!
那怎么辦啊?我怎么才能知道在跟我Websocket的究竟是美女小花還是黃毛小黃啊??!
這就引入了JWT!
JWT——JSON WEB TOKEN
可以看到后端會響應/ws/{token}的連接請求,前端可以發/ws/{token}的連接請求,一開始寫的時候看網上的都是用/ws/{userId}來建立該id的用戶與服務器的ws連接,但這樣的話可能就很不安全,無法保證使用某個id建立的ws確實就是真實用戶發起的連接。(小花被小黃NTR的悲慘故事)
所以在調研了很多公開的解決方案,看到有改用令牌(token)來建立ws連接的說法,同時驗證用戶身份(事實上一些其他接口也可以用令牌(token)來保證接口安全性),於是打算自己試試看,未必是最好的,甚至可能有點頭痛醫頭,腳痛醫腳,但總歸是個經驗,記錄一下。。
//Websocket Server
@ServerEndpoint(value = "/ws/{token}",configurator = WebSocketConfig.class) //響應路徑為 /ws/{token} 的連接請求
@Component
public class WebSocketServer {
...
}
js:
var socketUrl="http://xxx.xxx.xxx.xxx:xxxx/ws/"+$("#token").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //轉換成ws協議
....
socket = new WebSocket(socketUrl);
業務邏輯

為什么用JWT
最初考慮的是用/ws/{userId}來建立ws連接,然后在后台拿session中的user來對比用戶id,判斷合法性。
結果發現ws的session和http的session是不同的,還不好拿,可能得想辦法把http的session存到redis或者DB(也可以存在內存中,只是可能又要消耗內存資源),在建立ws連接之前去拿出來驗證合法性。后面查到了還有JWT這種好東西。
JWT好在哪里?
⭐推薦閱讀:什么是 JWT -- JSON WEB TOKEN
我的總結:
- token可以過期
- 驗證token可以不用存在redis或者DB或者內存,完全依賴算法即可
- 只要從前端請求中拿到token,后端就可以根據封裝好的算法驗證這個token合不合法,有沒有被篡改過(這點很重要),過期了沒有
- 可以將用戶id、用戶名等非敏感數據一同封裝到token中,后端拿到token后可以解碼拿到數據,只要這個token合法,這些發來的數據就是可信的(小黃就算自己發明了token也不作數),是沒有被篡改的(小黃就算把我小花的token偷走把用戶id改成自己的也沒用,后台可以算出來被改過),可以建立ws連接,調用websocket server進行通訊。 教程
JWT教程 整合到本項目中
pom.xml
<!-- JWT 相關 -->
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.12.1</version>
</dependency>
<!-- base64 -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.12</version>
</dependency>
token的前兩個部分是由base64編碼的,所以需要codec進行解碼。
實現一個JWT工具類
目前基本當作工具使用
com.xxxx.course.util.JWTUtil
/**
* @author jojo
* JWT 令牌工具類
*/
public class JWTUtil {
/**
* 默認本地密鑰
* @notice: 非常重要,請勿泄露
*/
private static final String SECRET = "doyoulikevanyouxi?" //亂打的
/**
* 默認有效時間單位,為分鍾
*/
private static final int TIME_TYPE = Calendar.MINUTE;
/**
* 默認有效時間長度,同http Session時長,為60分鍾
*/
private static final int TIME_AMOUNT = 600;
/**
* 全自定生成令牌
* @param payload payload部分
* @param secret 本地密鑰
* @param timeType 時間類型:按Calender類中的常量傳入:
* Calendar.YEAR;
* Calendar.MONTH;
* Calendar.HOUR;
* Calendar.MINUTE;
* Calendar.SECOND;等
* @param expiredTime 過期時間,單位由 timeType 決定
* @return 令牌
*/
public static String generateToken(Map<String,String> payload,String secret,int timeType,int expiredTime){
JWTCreator.Builder builder = JWT.create();
//payload部分
payload.forEach((k,v)->{
builder.withClaim(k,v);
});
Calendar instance = Calendar.getInstance();
instance.add(timeType,expiredTime);
//設置超時時間
builder.withExpiresAt(instance.getTime());
//簽名
return builder.sign(Algorithm.HMAC256(secret)).toString();
}
/**
* 生成token
* @param payload payload部分
* @return 令牌
*/
public static String generateToken(Map<String,String> payload){
return generateToken(payload,SECRET,TIME_TYPE,TIME_AMOUNT);
}
省略了重載方法....
/**
* 驗證令牌合法性
* @param token 令牌
* @return
*/
public static void verify(String token) {
//如果有任何驗證異常,此處都會拋出異常
JWT.require(Algorithm.HMAC256(SECRET)).build().verify(token);
}
/**
* 自定義密鑰解析
* @param token 令牌
* @param secret 密鑰
* @return 結果
*/
public static DecodedJWT parseToken(String token,String secret) {
DecodedJWT decodedJWT = JWT.require(Algorithm.HMAC256(secret)).build().verify(token);
return decodedJWT;
}
/**
* 解析令牌
* 當令牌不合法將拋出錯誤
* @param token
* @return
*/
public static DecodedJWT parseToken(String token) {
return parseToken(token,SECRET);
}
/**
* 解析令牌獲得payload,值為claims形式
* @param token
* @param secret
* @return
*/
public static Map<String,Claim> getPayloadClaims(String token,String secret){
DecodedJWT decodedJWT = parseToken(token,secret);
return decodedJWT.getClaims();
}
/**
* 默認解析令牌獲得payload,值為claims形式
* @param token 令牌
* @return
*/
public static Map<String,Claim> getPayloadClaims(String token){
return getPayloadClaims(token,SECRET);
}
/**
* 解析令牌獲得payload,值為String形式
* @param token 令牌
* @return
*/
public static Map<String,String> getPayloadString(String token,String secret){
Map<String, Claim> claims = getPayloadClaims(token,secret);
Map<String,String> payload = new HashMap<>();
claims.forEach((k,v)->{
if("exp".equals(k)){
payload.put(k,v.asDate().toString());
}
else {
payload.put(k, v.asString());
}
});
return payload;
}
/**
* 默認解析令牌獲得payload,值為String形式
* @param token 令牌
* @return
*/
public static Map<String,String> getPayloadString(String token){
return getPayloadString(token,SECRET);
}
/**
* 通過用戶實體生成令牌
* @param user 用戶實體
* @return
*/
public static String generateUserToken(Account user){
return generateUserToken(user.getId());
}
/**
* 通過用戶id生成令牌
* @param accountId 用戶id
* @return
*/
public static String generateUserToken(Integer accountId){
return generateUserToken(accountId.toString());
}
/**
* 通過用戶id生成令牌
* @param accountId 用戶id
* @return
*/
public static String generateUserToken(String accountId){
Map<String,String> payload = new HashMap<>();
payload.put("accountId",accountId);
return generateToken(payload);
}
/**
* 從令牌中解析出用戶id
* @param token 令牌
* @return
*/
public static String parseUserToken(String token){
Map<String, String> payload = getPayloadString(token);
return payload.get("accountId");
}
}
調整登陸 service 中,登陸時返回一個token
com.xxxx.course.service.impl.AccountServiceImpl
public JSONObject login(){
登陸成功...
...
//生成並放入通信令牌token,令牌中帶有用戶id,用以鑒別身份
String token = JWTUtil.generateUserToken(user);
jsonObject.put("token",token);
...
后續操作...
return jsonObject;
}
WebSocket 連接握手時進行身份驗證
之后前端只要攜帶token進行ws連接即可,寫了個ws的配置類,繼承了一個websocket連接的監聽器ServerEndpointConfig.Configurator,進行token的驗證。
com.XXXXX.course.config.webSocket.WebSocketConfig
/**
* 開啟 WebSocket 支持,進行前后端即時通訊
* https://blog.csdn.net/qq_33833327/article/details/105415393
* session配置:https://www.codeleading.com/article/6950456772/
* @author jojo
*/
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator implements WebSocketConfigurer {
/**
* logger
*/
private static final Logger LOGGER = LoggerUtil.getLogger();
/**
* 監聽websocket連接,處理握手前行為
* @param sec
* @param request
* @param response
*/
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
String[] path = request.getRequestURI().getPath().split("/");
String token = path[path.length-1];
//todo 驗證用戶令牌是否有效
try {
JWTUtil.verify(token);
} catch (Exception e) {
LOGGER.info("攔截了非法連接",e.getMessage());
return;
}
super.modifyHandshake(sec, request, response);
}
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
...
}
這樣,每次服務器建立ws連接前,都要驗證token的合法性,僅僅通過JWTUtil.verify(token);即可!當token不合法,就會拋出異常。
再配合重寫websocket server的onOpen方法,應該就能進行身份可信的通信了!
/**
* 連接建立成功調用的方法
*
* @param session
* @param token 用戶令牌
*/
@OnOpen
public void onOpen(Session session, @PathParam("token") String token) {
this.session = session;
this.token = token;
//設置超時,同httpSession
session.setMaxIdleTimeout(3600000);
//解析令牌,拿取用戶信息
Map<String, String> payload = JWTUtil.getPayloadString(token);
String accountId = payload.get("accountId");
this.accountId = accountId;
//存儲websocket連接,存在內存中,若有同一個用戶同時在線,也會存,不會覆蓋原有記錄
webSocketMap.put(accountId, this);
LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));
addOnlineCount(); // 在線數 +1
LOGGER.info("有新窗口開始監聽:" + accountId + ",當前在線人數為" + getOnlineCount());
...
(非必須)RabbitMQ消息中間件
教程
- RabbitMQ 基本介紹+入門使用 : P14-P19。看完這個視頻 基本上能知道 RabbitMQ 是什么、怎么部署(推薦使用docker,學習時若使用centOS推薦使用7.x版本,
8.x我真的不會用)、怎么整合到springboot。
為什么我要用RabbitMQ
- 正經理由:
- 可以將寫DB與發送消息兩件事情異步處理,這樣響應會更快。
- 未來可以拓展為集群
- 真正理由:
- 人傻
- 閑的
整合到本項目中
pom.xml
<!-- rabbitMQ 相關-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ 配置類
com.xxxx.course.config.rabbitMQ.RabbitMQConfig
/**
* @author jojo
*/
@Configuration
public class RabbitMQConfig {
/**
* 指定環境
*/
@Value("${spring.profiles.active}")
private String env;
/**
* logger
*/
public static final Logger LOGGER = LoggerUtil.getLogger();
/**
* 交換機名
*/
public String MEMBER_INVITATION_EXCHANGE = RabbitMQConst.MEMBER_INVITATION_EXCHANGE;
/**
* 交換機隊列
*/
public String MEMBER_INVITATION_QUEUE = RabbitMQConst.MEMBER_INVITATION_QUEUE;
/**
* 聲明 課程成員邀請消息 交換機
* @return
*/
@Bean("memberInvitationDirectExchange")
public Exchange memberInvitationDirectExchange(){
//根據項目環境起名,比如開發環境會帶dev字樣
String exchangeName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_EXCHANGE);
return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
}
/**
* 聲明 課程成員邀請消息 隊列
* @return
*/
@Bean("memberInvitationQueue")
public Queue memberInvitationQueue(){
//同上
String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
return QueueBuilder.durable(queueName).build();
}
/**
* 課程成員邀請消息的隊列與交換機綁定
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding memberInvitationBinding(@Qualifier("memberInvitationQueue") Queue queue,@Qualifier("memberInvitationDirectExchange") Exchange exchange){
String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
return BindingBuilder.bind(queue).to(exchange).with(queueName).noargs();
}
/**
* Springboot啟動時, 驗證隊列名根據環境命名正確
*/
@Bean
public void verify(){
Queue memberInvitationQueue = SpringUtil.getBean("memberInvitationQueue", Queue.class);
Exchange memberInvitationDirectExchange = SpringUtil.getBean("memberInvitationDirectExchange", Exchange.class);
LOGGER.info("消息隊列 ["+memberInvitationQueue.getName()+"] 創建成功");
LOGGER.info("消息交換器 ["+memberInvitationDirectExchange.getName()+"] 創建成功");
//放入映射中存儲
RabbitMQConst.QUEUE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationQueue.getName());
RabbitMQConst.EXCHANGE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationDirectExchange.getName());
}
/**
* 自定義messageConverter使得消息中攜帶的pojo序列化成json格式
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
項目運行后,在 RabbitMQ服務器 中就會出現剛剛注冊的隊列與交換器(圖是舊的,沒有體現根據環境命名隊列,但是其實做到了):

課程管理成員邀請接口
com.xxxx.course.service.impl.CourseMemberInvitationServiceImpl- 首先在接口中注入操作rabbitMQ的Bean
@Autowired
RabbitTemplate rabbitTemplate;
- 在課程管理員業務代碼中加入向rabbitMQ發送消息的邏輯
CourseMemberInvitationServiceImpl
@Override
@Transactional(rollbackFor = Exception.class) //開啟事務,以防萬一
public JSONObject addCourseMemberInvitation(Integer courseId, String username, String requestIp) {
//檢查課程是否存在
courseService.hasCourse(courseId);
//檢查用戶是否已加入課程平台
accountService.hasAccount(username);
/* 若存在則查看邀請記錄是否已經存在 */
//獲取用戶id
Account account = accountService.getAccountByUsernameOrEmail(username);
//檢查用戶名是否存在
if(account==null){
JSONObject result = new JSONObject();
result.put(RESULT,FAILED);
result.put(MSG,"用戶不存在");
return result;
}
Integer accountId = account.getId();
//獲得發出邀請人的id
Account user = (Account) SecurityUtils.getSubject().getSession().getAttribute("user");
Integer adminId = user.getId();
//檢查是否自己邀請自己,是則不再執行
hasInvitedOneself(accountId,adminId);
//檢查是否已經邀請過,是則不再執行
hasInvited(courseId,accountId,adminId);
/* 若不存在則新建邀請記錄 */
CourseMemberInvitation courseMemberInvitation = new CourseMemberInvitation();
courseMemberInvitation.setCourseId(courseId);
courseMemberInvitation.setAccountId(accountId);
courseMemberInvitation.setAdminId(adminId);
courseMemberInvitation.setCreateTime(new Date());
courseMemberInvitation.setCreateIp(requestIp);
//新建消息
CourseMessage courseMessage = courseMessageService.newMessage(MessageConst.MEMBER_INVITATION, accountId, adminId);
//綁定邀請記錄與消息記錄
courseMemberInvitation.setBindMessageId(courseMessage.getId());
//插入數據庫(這里用的是MybatisPlus)
int insertResult = courseMemberInvitationDao.insert(courseMemberInvitation);
//根據數據庫插入返回值封裝json
JSONObject result = insertCourseMemberInvitationResult(insertResult, courseMemberInvitation);
if(result.get(RESULT).equals(FAILED)){
//若數據庫操作沒有成功,則直接返回json
return result;
}
/* 發送消息 */
courseMessageService.sendMessage(courseMessage);
//根據插入情況返回json
return result;
}
courseMessageService中實現的sendMessage方法:
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(CourseMessage courseMessage) {
//嘗試發送
//將消息放入rabbitMQ
storeInRabbitMQ(courseMessage);
}
private void storeInRabbitMQ(CourseMessage courseMessage){
//將消息放入rabbitMQ
String exchangeName = (String) RabbitMQConst.EXCHANGE_MAP.get(courseMessage.getType());
String routeKey = (String) RabbitMQConst.QUEUE_MAP.get(courseMessage.getType());
try {
//送到rabbitMQ隊列中
rabbitTemplate.convertAndSend(exchangeName,routeKey,courseMessage);
}
catch (Exception e){
LOGGER.error("插入rabbitMQ失敗",e);
}
}
com.xxxx.course.rabbitMQ.listener.CourseMemberInvitationListener
該類是用以監聽_課程成員邀請_消息的,即是在rabbitMQ服務器建立的member_invitation隊列。
/**
* @author jojo
*/
@Component
public class CourseMemberInvitationListener {
@Autowired
MessageHandler messageHandler;
/**
* logger
*/
public static final Logger LOGGER = LoggerUtil.getLogger();
/**
* spEL表達式
* 一旦隊列中有新消息,這個方法就會被觸發
*/
@RabbitListener(queues = "#{memberInvitationQueue.name}")
public void listenCourseMemberInvitation(Message message){
messageHandler.handleMessage(message);
}
}
com.xxxx.course.rabbitMQ.MessageHandler, 該類是用來處理監聽事件的:
@Service
public class MessageHandler {
@Autowired
MessageConverter messageConverter;
@Autowired
RabbitTemplate rabbitTemplate;
/**
* logger
*/
public static final Logger LOGGER = LoggerUtil.getLogger();
/**
* 隊列消息處理業務
* @param message
*/
public void handleMessage(Message message){
CourseMessage courseMessage = (CourseMessage) messageConverter.fromMessage(message);
// WebSocket 通知前端
try {
//將消息發給指定用戶
WebSocketServer.sendInfo(JSON.toJSONString(courseMessage),courseMessage.getAccountId().toString());
} catch (Exception e) {
//消息存在數據庫中了,待用戶上線后再獲取
LOGGER.info("發送消息id為 ["+courseMessage.getId()+"] 的消息給->消息待收方id為 ["+courseMessage.getAccountId().toString()+"] 的用戶,但其不在線上。");
}
}
}
這樣做應該就可以用RabbitMQ了。
為什么不用HttpSession來驗證用戶真實性?
如果要驗證一個用戶的真實性,為什么不直接用HttpSession來驗證?比如在登錄時,將用戶的id存入HttpSession中,然后在Websocket握手連接之前,驗明前端通過/ws/{userId}中的userId與HttpSession中的用戶id進行比較就行了
這屬實是靈魂拷問了
經過自己實驗之后,發現按照很多方案,都沒辦法在ws連接的時候中,拿到HttpSession,比如在WebSocketConfig中按如下代碼:
@Override
20 public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
21 ...
//嘗試獲取HttpSession
HttpSession httpSession = (HttpSession)request.getHttpSession();
22 sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
...
23 }
調試一番發現httpSession都是null,於是跟前端佬聊了聊,得到結論是ws跟http連接是兩個連接,當然拿不到啦
哎,我太菜了,就先這么認為好了
那或者能否把sessionId對應的userid存數據庫(mysql或者redis),每次ws連接時再驗證?
那就不如JWT了,JWT不需要存儲。
更理想的情況是,我能在WebSocketServer這個模塊中拿到httpSession的實體,這樣子驗證用戶真實性就很好做了,但是暫時還沒找到方法,需要再研究研究。
暫時想到這些,如果有更好的做法會再更新
總結
本文的難點是ws的認證問題,雖然用超級好用的JWT解決了,但是隨之而來的還有很多問題,比如:
- 注銷登錄等場景下 token 還有效
與之類似的具體相關場景有:- 退出登錄;
- 修改密碼;
- 服務端修改了某個用戶具有的權限或者角色;
- 用戶的帳戶被刪除/暫停。
- 用戶由管理員注銷;
- token 的續簽問題
token 有效期一般都建議設置的不太長,那么 token 過期后如何認證,如何實現動態刷新 token,避免用戶經常需要重新登錄?
這些問題還是有待解決是😂
本文就當記錄一下自己的胡作非為吧😂
總之,至少小花再也不怕被小黃NTR了

