关于easyswoole实现websocket聊天室的步骤解析


在去年,我们公司内部实现了一个聊天室系统,实现了一个即时在线聊天室功能,可以进行群组,私聊,发图片,文字,语音等功能,那么,这个聊天室是怎么实现的呢?后端又是怎么实现的呢?

后端框架

在后端框架上,我选用了php的easyswoole,easyswoole作为swoole中最简单易学的框架,上手简单,文档齐全,社区活跃

仙士可博客

 

直接通过easyswoole官方文档的例子,即可实现一个websocket服务器,并且还实现了对控制器的转发等:

https://www.easyswoole.com/Cn/Socket/webSocket.html

 

前后端通信协议

由于考虑到聊天室的业务逻辑复杂,我们使用了http+websocket 2种协议,分别用在以下几个地方:

登录注册,个人信息修改,好友申请等,使用http 接口实现

私聊,群聊消息推送,系统消息申请等,使用websocket即时推送

 

websocket即时推送封包方式

在websocket中,为了区分客户端不同的操作(发送群消息,发送私聊消息等),我们定义了一个数据格式:

1
2
3
4
5
op  命令
- args 额外参数
- msg 消息内容
- msgType 消息类型(默认为1)
- flagId 消息标识符(前端随机生成一个标识符,后台处理完该消息之后,会返回相同的标识符给与前端确认)

使用json字符串方式传递

同样,为了区分服务端不同的推送,我们定义了服务端的响应格式:

1
2
3
4
5
op  命令(响应类型)
- args 额外参数
- msg 消息内容(成功时为OK)
- msgType 消息类型(默认为1)
- flagId 将返回和前端一致的标识符,告知前端该次请求 成功/失败

 

例如:

1
2
3
4
5
6
7
## 发送消息
私聊消息:
`{ "op" :1001, "args" :{ "userId" :12}, "msg" : "test" , "flagId" :10086}`
将回复:
`{ "op" :1000, "args" :[], "msg" : "ok" , "flagId" :10086}`
目标用户将收到:
`{ "op" :1101, "args" :{ "fromUserId" : "12" , "msgId" :16}, "msg" : "test" }`

下文有许多op:xxx的数据,可以忽略xxx的数据,直接联系上下文获得op的命令类型

 

聊天记录存储

根据消息的类型,我们区分了 私聊消息,群消息,系统消息 3种消息,设计了3个表
为了使得客户端能够正常显示群消息,我们对群成员做了软删除处理,确保可以获取到群成员头像

用户可通过http接口,获得历史聊天记录

 

语音,图片,视频聊天

在上面我们可以看到,有一个msgType字段,它将决定了这条数据是文字消息,还是语音,视频

当msgType为语音类型时,msg将附带一个语音文件的地址(通过http接口上传文件,到oss或者服务器)

客户端进行判断,如果是语音,则下载文件,点击即可播放,视频,图片同理

 

心跳设置

由于tcp的特性,在长时间没有通信时,操作系统可能会自动对tcp连接进行销毁并且可能没有close事件提示,所以我们在websocket中提供了ping的命令,该命令发起后,服务器将响应pong,完成一次通信:

1
2
3
4
## ping
发送:直接给客户端发送  "ping" 即可
返回:
`{ "op" :1000, "args" :null, "msg" : "PONG" }`

 

网络不稳定推送问题

当服务端推送消息时,为了确保用户已经收到,提供了isRecv字段,默认为0
当用户A向用户B发送消息,服务器向B推送时,该条消息记录初始isRecv为0,只有当B客户端接收到消息,并且向服务器发送已接收命令时,才会置为1:

1
2
3
4
5
### 消息接收状态
`{ "op" :4002, "args" :{ "msgId" :42}, "msg" : "" , "flagId" :111}`
 
服务器将响应:
`{ "op" :1000, "args" :[], "msg" : "ok" , "msgType" :1, "flagId" :111}`

每次重新连接websocket服务时,可通过发起好友未读消息推送的命令,向服务器获得之前的未读消息(网络不稳定断线重连)

1
2
3
4
5
6
当ws连接成功时,可通过该命令获取所有的未读好友消息:
`{ "op" :4001, "args" :{ "userId" :null, "size" :5}, "msg" : "" , "flagId" :111}`
其中`userId` 为限制单独一个好友的未读消息,可不传
其中`size`为每次响应条数,默认为5,可不传
服务器将响应:
`{ "op" :4101, "args" :{ "total" :0, "list" :[]}, "msg" : "ok" , "msgType" :1, "flagId" :111}

每次推送完,都需要客户端遍历list,进行上面的已接收推送

 

聊天室流程讲解

整个聊天室流程为:

- 用户http接口登录获得授权

- 通过授权请求http接口获得好友列表,不同好友的最后一条未读消息以及未读消息数(用于首页显示)

- 通过授权请求获得群列表(群消息为了节省存储空间没有做已读未读)

- 建立ws链接

- 注册断线重连机制,当触发close事件时,重连ws

- 建立ping定时器,每隔30秒进行一次ping

- 通过ws接口,获得所有未读消息,客户端进行处理,推送到通知栏等

- 接收新消息推送,并显示到消息列表

- 当点击进某个群/好友消息界面时,自动获取最新n条消息,用户上拉时继续获取n条

 

不同设备数据同步

为了服务端性能问题,所有消息记录,好友消息,群成员消息将缓存到客户端,当用户登录成功时
直接显示之前登录时的所有状态(消息列表,最后一条消息显示等)
当新设备登录时,只获取未读消息列表,其他消息需要点击某个好友/群,才会进行显示

 

fd->userId对应

当用户登录成功时,我们使用了swoole的Table进行存储fd->userId以及userId->fd的对应

通过这2者对应的存储,我们可以通过userId找到fd进行推送数据,也可以通过fd找到userId获取用户消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<?php
 
 
namespace  App\Utility;
 
 
use  EasySwoole\Component\Singleton;
use  Swoole\Table;
 
class  FdManager
{
     use  Singleton;
 
     private  $fdUserId ; //fd=>userId
     private  $userIdFd ; //userId=>fd
 
     function  __construct(int  $size  = 1024*256)
     {
         $this ->fdUserId =  new  Table( $size );
         $this ->fdUserId->column( 'userId' ,Table::TYPE_STRING,25);
         $this ->fdUserId->create();
         $this ->userIdFd =  new  Table( $size );
         $this ->userIdFd->column( 'fd' ,Table::TYPE_INT,10);
         $this ->userIdFd->create();
     }
 
     function  bind(int  $fd ,int  $userId )
     {
         $this ->fdUserId->set( $fd ,[ 'userId' => $userId ]);
         $this ->userIdFd->set( $userId ,[ 'fd' => $fd ]);
     }
 
     function  delete (int  $fd )
     {
         $userId  $this ->fdUserId( $fd );
         if ( $userId ){
             $this ->userIdFd->del( $userId );
         }
         $this ->fdUserId->del( $fd );
     }
 
     function  fdUserId(int  $fd ):?string
     {
         $ret  $this ->fdUserId->get( $fd );
         if ( $ret ){
             return  $ret [ 'userId' ];
         } else {
             return  null;
         }
     }
 
     function  userIdFd(int  $userId ):?int
     {
         $ret  $this ->userIdFd->get( $userId );
         if ( $ret ){
             return  $ret [ 'fd' ];
         } else {
             return  null;
         }
     }
}

 

同理,当需要群发消息时,只需要获得群成员的userId,即可获得当前所有在线成员的fd,进行遍历推送

服务端推送问题

当A客户端在群发送一条消息时,由于群成员可能有很多,如果直接同步推送给所有群成员,会造成A客户端等待响应时间过长的情况
所以需要使用task做异步推送:

当A客户端发送一条消息,先存入数据库,并调用task进行异步群发推送,同时给A客户端响应ok,代表接收到此消息

通过easyswoole的task组件,进行推送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
namespace  App\Task;
 
 
use  App\HttpController\Api\User\Message\GroupMessage;
use  App\HttpController\Api\User\Message\SystemMessage;
use  App\Model\Group\GroupUserModel;
use  App\Model\Message\GroupMessageModel;
use  App\Model\Message\SystemMessageModel;
use  App\Model\Message\UserMessageModel;
use  App\Utility\FdManager;
use  App\WebSocket\Command;
use  EasySwoole\EasySwoole\ServerManager;
use  EasySwoole\Task\AbstractInterface\TaskInterface;
 
//消息异步推送
class  WebSocketPush  implements  TaskInterface
{
     protected  $messageModel ;
 
     function  __construct( $messageModel )
     {
         $this ->messageModel =  $messageModel ;
     }
 
     function  run(int  $taskId , int  $workerIndex )
     {
         $message  $this ->messageModel;
         $result  = false;
         //好友消息
         if  ( $message  instanceof  UserMessageModel) {
             $result  $this ->friendMsg( $message );
         }
         //群组消息
         if  ( $message  instanceof  GroupMessageModel) {
             $result  $this ->groupMsg( $message );
         }
         //系统消息
         if  ( $message  instanceof  SystemMessageModel) {
             $result  $this ->systemMsg( $message );
         }
         return  $result ;
     }
}

 

websocket验权,提下线功能

用户在连接ws服务时,需要带上token进行验权,
服务端在onopen事件时,会进行token验权,如果验证失败则响应一条消息表示登录过期:

1
2
3
4
5
6
7
{
     "op" : -1003,
     "args" : [],
     "msg" "登陆状态失效" ,
     "msgType" : 1,
     "flagId" : null
}

 

当A用户在客户端1登录成功后,又在客户端2登录时,将给客户端1发送一条已被踢下线消息::

1
2
3
4
5
6
7
{
     "op" : -1002,
     "args" : [],
     "msg" "你的账号在其他设备登陆,你已被强制下线" ,
     "msgType" : 1,
     "flagId" : null
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    static  function  onOpen(Server $server, \Swoole\Http\Request $request)
     {
         $session = $request->get[ 'userSession' ] ?? null;
         $user = new UserModel();
         if  (!empty($session)) {
             $user->userSession = $session;
             $info = $user->getOneBySession();
             if  (empty($info)) {
                 self::pushSessionError($request->fd);
                 ServerManager::getInstance()->getSwooleServer()->close($request->fd);
                 return  true ;
             }
             // 如果已经有设备登陆,则强制退出
             self::userClose($info->userId);
             FdManager::getInstance()->bind($request->fd, $info->userId);
             // 推送消息
//                 self::pushMessage($request->fd,$info->userId);
         else  {
             self::pushSessionError($request->fd);
             ServerManager::getInstance()->getSwooleServer()->close($request->fd);
         }
     }

 

关于客户端网络不稳定时候的情况解析

当客户端发送一条消息之前,需要生成一个flagId,发送消息时附带flagId

服务端响应消息时,会附带flagId

因此,当客户端发送消息时,新增一个flagId的定时器,当定时器到期却没有接收到服务端响应消息时,判断该条消息发送失败,显示红色感叹号,提示用户重发

当服务端响应成功时,将取消这个定时器,并直接将消息置为发送成功状态

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM