從零到一搭建一個群聊系統


 

前言

IM(Instant Messaging),也就是即時通訊。幾乎所有對實時性要求高的應用場景都需要IM技術的運用。比如聊天、直播、彈幕、實時位置共享、協同編輯/在線文檔、股票基金報價等。

本篇將帶大家從零開始搭建實現一個輕量群聊的完整閉環。客戶端用到的是vue+websocket通信,服務端用到是node的ws模塊通信,redis用於便於快速讀取在線狀態等數據的存取,MongoDB聊天消息等持久數據存儲。

已經實現的功能:進入聊天室,輸入臨時昵稱用於聊天區分(前端是暫時是存在瀏覽器的sessionStorage里,后端作為唯一用戶名存到了數據庫,輸入昵稱保存會校驗昵稱是否存在,后面可以根據需要通過擴展加上登錄等流程操作)

1. 正常群聊,保存消息記錄到數據庫

2. 用戶進入離開以及發送消息都有廣播提示

3. 消息發送失敗的重試機制

 

了解Socket和Websocket

Socket 是IM技術的重要組成部分,Socket是為了方便使用TCP或UDP而抽象出來的一層,是對TCP/IP協議的封裝,是位於應用層和傳輸控制層之間的一組接口

Websocket是為了滿足基於Web 的日益增長的實時通信需求而產生的,模仿socket的通信能力。但是和Socket不同的是,Websocket是基於TCP的應用層協議

WebSocket是一種在單個TCP連接上進行全雙工通信的協議。WebSocket 協議在2008年誕生,2011年成為國際標准。所有瀏覽器都已經支持了。WebSocket使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。

在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。

傳統http通信只能由客戶端發起,做不到服務器主動推送消息,如果服務器有連續的狀態變化,只能用輪詢,而輪詢非常低效,浪費資源,所以才有了websocket

它的的最大特點就是,服務器可以主動向客戶端推送信息,客戶端也可以主動向服務器發送信息,是真正的雙向平等對話,屬於服務器推送技術的一種。其他特點包括:

  • 建立在 TCP 協議之上,服務器端的實現比較容易。
  • 與 HTTP 協議有着良好的兼容性。默認端口也是 80 和 443 ,並且握手階段采用 HTTP 協議,因此握手時不容易屏蔽,能通過各種 HTTP 代理服務器。
  • 數據格式比較輕量,性能開銷小,通信高效。
  • 可以發送文本,也可以發送二進制數據。
  • 沒有同源限制,客戶端可以與任意服務器通信。
  • 協議標識符是ws(如果加密,則為wss),服務器網址就是 URL。

總結:Socket是抽象接口,WebSocket與HTTP都是一樣基於TCP的應用層協議。WebSocket是雙向通信協議,模擬Socket,可以雙向發送或接受信息。HTTP是單向的。WebSocket在建立握手時,數據是通過HTTP傳輸的。但是建立之后,在真正傳輸時候不需要HTTP協議。

 

Websocket和Node ws的用法

WebSocket 的用法比較簡單 Websocket Api

var ws = new WebSocket("ws://localhost:3000"); 

ws.onopen = function(evt) { // onopen 連接成功后的回調函數
  console.log("Connection open ..."); // socket連接
  ws.send("Hello WebSockets!"); // send()方法用於向服務器發送數據
}; ws.onmessage = function(evt) { // 指定收到服務器數據后的回調函數 console.log( "Received Message: " + evt.data); // 接收服務端的信息 ws.close(); // 主動斷開和服務器的連接 }; ws.onclose = function(evt) {// onclose 連接關閉后的回調函數 console.log("Connection closed."); // socket連接斷開 }; ws.onerror = function(evt) {// onclose 連接錯誤的回調函數 console.log("Connection errored."); // socket因錯誤接斷開連接,例如有些信息不能發送 }; // 如果要指定多個回調函數可以用addEventListener ws.addEventListener('open', function (event) { ws.send('Hello Server!'); });

readyState屬性返回實例對象的當前狀態,共有四種

  • CONNECTING:值為0,表示正在連接。
  • OPEN:值為1,表示連接成功,可以通信了。
  • CLOSING:值為2,表示連接正在關閉。
  • CLOSED:值為3,表示連接已經關閉,或者打開連接失敗。
switch (ws.readyState) {
  case WebSocket.CONNECTING: // do something break; case WebSocket.OPEN: // do something break; case WebSocket.CLOSING: // do something break; case WebSocket.CLOSED: // do something break; default: // this never happens break; }

 node ws 模塊用起來也不難,參照 ws Api 文檔 很快就能搭起server啦

/**
 * Create HTTP server.
 */
const server = http.createServer(app)

/**
 * new WebSocket.Server(options[, callback]) 
 * @param {Object} options
 * @param {String} options.host 要綁定的服務器主機名
 *  @param {Number} options.port 要綁定的服務器端口
 *  @param  {Number} options.backlog 掛起連接隊列的最大長度.
 *  @param {http.Server|https.Server} options.server一個預創建的HTTP/S服務器  
 *  @param {Function} options.verifyClient 驗證傳入連接的函數。
 *  @param {Function} options.handleProtocols 處理子協議的函數。
 *  @param {String} options.path 只接受與此路徑匹配的連接
 *  @param {Boolean} options.noServer 啟用無服務器模式
 *  @param {Boolean} options.clientTracking 是否記錄連接clients
 *  @param {Boolean|Object} options.perMessageDeflate 開啟關閉zlib壓縮(配置)
 *  @param {Number} options.maxPayload 最大消息載荷大小(bytes)
 *  @param callback {Function}
 * 1. 創建一個新的服務器實例。必須提供端口、服務器或NoServer中的一個,否則會引發錯誤。
 * 2. 如果端口被設置,則自動創建、啟動和使用HTTP服務器。
 * 3. 要使用外部HTTP/S服務器,只指定服務器或NoServer。此時,必須手動啟動HTTP/S服務器。
 * 4. NoSver模式允許WS服務器與HTTP/S服務器完全分離。這使得,可以在多個WS服務器之間共享一個HTTP/S服務器
 */
const WebSocket = require('ws')
const wss = new WebSocket.Server({
  server
})

 

IM系統

IM按場景一般分為單聊和群聊。

單聊 

一個靠譜的IM系統,其核心就是消息的可靠性,及時觸達,以及系統安全性。安全性方面考慮的話需要對會話內容進行加密。

消息的可靠性,即消息的不丟失、不重復和不亂序,是IM系統中的一個難點。滿足這三點,才能有一個良好的聊天體驗。

1. 普通消息投遞流程

舉個例子: 用戶A給用戶B發送“你好”的文本消息,流程圖如下

 

  • 用戶A向服務端發送一個消息請求包msg: R
  • 服務端在成功處理后,回復用戶A一個ack消息響應包msg:A
  • 如果此時用戶B在線,則服務調研主動向用戶B發送一個消息通知包msg:N,如果用戶B不在線,則消息會存儲離線

 

客戶端發送消息的消息體一般包括幾個部分:

  • 消息類型(必填)對於IM系統來說消息類型肯定不止一種,前后端可統一定下消息類型,便於通信接收和識別
  • 消息唯一id(必填)前端生成消息id,后端存儲的時候以及接收去重等都要根據唯一id來判斷
  • 消息內容(非必填)可以是用戶通過輸入框或者上傳文件等用戶交互的方式發送消息,也可以是對用戶進入離開行為的監測
  • 發送時間(非必填)以服務端時間為准
  • 發送方唯一標識(必填)
  • 接收方唯一標識(如果是單聊必填)

消息體R的格式

{
  msg_type: 'TEXT', // 消息類型 文本消息
  msg_content: '你好', // 消息內容
  send_time: new Date().valueOf(),
  msg_id: uuid(), // 前端生成唯一id作為消息id
  from_id: 'liuyifei',
  to_id: 'pengyuyan'
}

服務端返回的消息體格式msg:A和通知給B的消息體格式msg:N

{
    code: 0,  // 狀態碼
    message: '接收成功',
    results: {
        msg_type: 'ACK',
        msg_content: '你好',
        msg_id: 'xxx',
        send_time:new Date().valueOf(),
        from_id: 'liuyifei',
        to_id: 'pengyuyan'
    } 
}

 

用戶A收到msg:A的響應的時候,把響應的msg_id從待發送隊列移除

 

2. 上述消息投遞流程容易出現的問題

從流程圖中可以看到,發送方用戶A收到msg:A后,只能說明服務端成功接收到了消息,並不能說明用戶B接收到了消息。在有些場景下,可能出現msg:N通知到B的包丟失,且發送方用戶A完全不知道,例如:

  • 服務器崩潰,msg:N包未發出
  • 網絡抖動,msg:N包被網絡設備丟棄
  • 客戶端B崩潰,msg:N包未接收

要想實現應用層的消息可靠投遞,必須加入應用層的確認機制,即:要想讓發送方用戶A確保接收方用戶B收到了消息,必須讓接收方用戶B給一個消息的確認,這個應用層的確認的流程,與消息的發送流程類似:

 

  • 客戶端B向服務端發送一個ack請求包,即ack:R
  • 服務端在成功處理后,回復用戶B一個ack響應包,即ack:A
  • 服務端主動向用戶A發送一個ack通知包,即ack:N
  • 圖中 msg:A是對msg:R的響應 ack:R是對ack:A的響應 ack:A是對ack:R的響應
用戶A: “你好” (msg:R)
服務器:我收到消息了(msg:A),我轉告給B (msg:N)
用戶B: 收到了消息,並回應服務器說我知道了 (ack:R)
服務器:告訴B,好了我知道你收到消息了(ack:A)
服務器:通知A,B已經收到消息了(ack:N)
 

這樣完成一個閉環,服務器作為中間人 確保發送方和接收方消息收發的正常,准確的觸達。

這是用戶B在線的情況,如果用戶B不在線的話,服務器會假裝B收到消息,返回ack給A,同時把離線消息存儲,下次B上線的時候會拉取離線消息。

 

如果一切都正常,這樣就完成一個完整的可靠的消息觸達機制,但是現實總是有很多不穩定因素存在的,容易丟失各種消息

1. msg:R,msg:A 可能丟失:
此時直接提示“發送失敗”即可,問題不大;

2. msg:N,ack:R,ack:A,ack:N這四個報文都可能丟失,此時用戶A都收不到期待的ack:N報文,即用戶A不能確認用戶B是否收到“你好”。

一個解決辦法:用戶A發出了msg:R,收到了msg:A之后,在一個期待的時間內,如果沒有收到ack:N,用戶A會嘗試將msg:R重發。可能用戶A同時發出了很多消息,故用戶A需要在本地維護一個等待ack隊列,並配合超時機制,來記錄哪些消息沒有收到ack:N,以定時重發。一旦收到了ack:N,說明用戶B收到了“你好”消息,對應的消息將從“等待ack隊列”中移除。

或者是:讓用戶主動點擊重發,重發的時候要確認匹配msg_id以便服務端識別去重,以免消息重復。也還是消息隊列方式實現。收到msg:A的響應即從等待隊列中移除。

 

群聊

一個im群聊的交互流程

 

 和單聊不同的是,群聊不存在一對一的關系,但要復雜的多,一個用戶發送消息,要廣播給其他所有用戶。這就意味着對於一個千人群來說,一個人發送的消息要推送給群里的1000個人,這里就要涉及到對高並發的處理了。

 

聊天室🌰

 1. 流程圖

 

臨時聊天室/直播間 就不存在用戶是否是離線,需要展示的是當前在線的人數,進入+1和離開/斷開-1

 那么就會遇到

第一個問題:如何知道用戶是在線還是離開了呢?

 一般來說都會實現心跳機制,客戶端一定時間間隔(例如5s)一次向服務端發送一個心跳PING,來告訴服務器我還在線。服務器收到心跳也會返回一個PONG的消息,告訴客戶端我收到了你的消息並且我服務器運轉正常,如果是持續的有來有回,服務器就知道有哪些用戶是在線的。

這個問題可以通過心跳機制配合Redis來解決,下面消息存儲會說到。

第二個問題就是如果消息發送失敗,該如何處理,也就是如何保證消息的可靠觸達?

提示用戶消息發送失敗只是第一步,但如果只是提示用戶發送消息失敗了,沒有一個重試機制的話,體驗不夠好。所以要解決的第二個問題就是如果消息發送失敗,可以點擊重發這條消息,這個時候消息id就起作用了,客戶端根據id匹配重發,服務端需要根據消息id來去重,保證消息得到准確的觸達。

 

2. 消息存儲機制

(1)對於在線人數的存儲

在線人數這種實時性較高,查詢頻率高的,就存到Redis緩存里。

本項目用到的是存儲一個集合 在用戶進入或者是刷新頁面重新進入的時候把用戶名存進去,離開檢測到close事件后把該用戶從集合中移除,這樣就能夠比較及時的獲取到實時在線人數了。

const userEnter = (type) => {
    if (type === 'enter') { // 用戶進入
      redisClient.sadd('users', user)
// 獲取集合的成員數 賦值給全局變量onlineCount 在每次初始化和發心跳包等消息的時候把它帶過去 客戶端收到后展示
      redisClient.scard('users', (err, data) => { 
        if (!err) {
          onlineCount = data
        }
      })
    }
    if (type === 'leave') { // 用戶離開
      redisClient.srem('users', user)
      redisClient.scard('users', (err, data) => {
        if (!err) {
          onlineCount = data
        }
      })
    }
  }

 (2) 在線離線狀態存儲--如何知道某個用戶是離開的

 客戶端的websocket斷聯觸發onclose事件,服務端監測到的close事件,會有幾種情況

  • 關閉窗口,正常斷開
  • 頁面刷新的時候也會斷開重連
  • 因為網絡和設備問題的異常斷開

這三種情況下,關閉窗口的離開和網絡異常的離開可以算作是用戶離開通知到其他用戶,但是頁面刷新重連並不能算作用戶離開的。

因為服務端是不能夠區分是正常斷聯還是異常斷聯,所以如果直接只在服務端close事件里處理用戶離開的話是不准確的

這里用到了時間戳比對,用的是redis hash來存儲,根據客戶端每次發來的心跳包記錄用戶最后連接的時間,如果最后連接時間大於某個時間間隔(比如6s)就判定該用戶是離線的狀態

const cb = (err, data) => {
  console.log('err: ', err, ' data: ', data, ' data type: ', typeof data)
}
redisClient.hset('myhash', user, new Date().valueOf(), cb)
    redisClient.hkeys('myhash', (err, replies) => {
      console.log(replies)
      replies.forEach(item => {
        redisClient.hget('myhash', item, (err, data) => {
          if (data) {
            const time = new Date().valueOf() - Number(data)
            if (time > 6000 && replies.indexOf(item) !== -1) {
              results = {
                msg_type: 'LEAVE',
                send_time: (new Date()).valueOf(),
                user_name: item
              }
              redisClient.hdel('myhash', item, cb)
              redisClient.hkeys('myhash', (err, res) => {
                if(res.indexOf(item)===-1){
                  boardCast(results)
                }
              })
              
            }
          }
        })
      })
    })

 

 (3)消息的永久存儲

進入離開消息通知和用戶發送的消息,會存儲到MongoDB數據庫,在刷新頁面或者新用戶進來的時候會拉取一遍歷史消息接口,這樣每個人都能夠

看到這個聊天室的歷史消息。(單聊群聊都可以用這種方式存儲離線消息) 

const boardCast = (results) => {
  wss.clients.forEach((client) => { // 廣播消息給所有客戶端 斷聯了 主動發消息給客戶端
    if (client.readyState === WebSocket.OPEN) {
      if (results && results.msg_type !== 'PING') {
        client.send(
          JSON.stringify({
            code: 0,
            message: '成功',
            results,
            onlineCount,
          })
        )
      }
    }
  })
  var message = new Message(results) // 歷史消息存數據庫
  message.save((err, res) => {
    if (err) {
      console.log("保存失敗:" + err)
      results = null
    } else {
      console.log("保存成功:" + res)
      results = res
    }
  })
}

 

 

代碼地址:

前端 https://github.com/leitingting08/im-vue

后端 https://github.com/leitingting08/im-node

 

參考:

阮一峰-Websocket教程

MDN-WebsocketAPI

IM消息送達保證機制實現(一):保證在線實時消息的可靠投遞

IM單聊和群聊中的在線狀態同步應該用“推”還是“拉”?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM