GatewayWorker是基於Workerman開發的一個可分布式部署的TCP長連接框架,專門用於快速開發TCP長連接應用,例如app推送服務端、即時IM服務端、游戲服務端、物聯網、智能家居等等
文檔地址:http://www.workerman.net/gatewaydoc/
一、測試官方DEMO(Windows 版本)
1、下載demo
2、解壓到任意位置,我這里為:D:\phpStudy\PHPTutorial\WWW\GatewayWorker
3、進入GatewayWorker目錄
4、雙擊start_for_win.bat啟動。(如果出現錯誤請參考這里設置php環境變量),效果如下
5、命令行窗口運行 telnet 127.0.0.1 8282
,輸入任意字符即可聊天(非本機測試請將127.0.0.1替換成實際ip)。
PS:以上表示TCP連接測試成功
二、修改測試websocket
1、需要修改 start_gateway.php 指定websocket協議,像這樣
$gateway = new Gateway(websocket://0.0.0.0:7272);
2、重新啟動 start_for_win.bat
3、測試js
小結:只需要改動一個文件( start_gateway.php
)的協議和端口即可,別的不需用改動。
三、與ThinkPHP5.1框架結合
(一)服務端主動推送消息到客戶端
原則:
1、TP5.1框架項目與GatewayWorker獨立部署互不干擾
2、所有的業務邏輯都由網站(websocket連接的)頁面以post/get請求到TP5.1框架的控制器中完成
3、GatewayWorker不接受客戶端發來的數據,即GatewayWorker不處理任何業務邏輯,GatewayWorker僅僅當做一個單向的推送通道
4、僅當TP5.1框架需要向瀏覽器主動推送數據時才在TP5.1框架中調用Gateway的API(GatewayClient)完成推送
具體實現步驟
1、網站頁面建立與GatewayWorker的websocket連接
ws = new WebSocket("ws://127.0.0.1:7272");
2、GatewayWorker發現有頁面發起連接時,將對應連接的client_id發給網站頁面
Event.php 內容
public static function onConnect($client_id) { $resData = [ 'type' => 'init', 'client_id' => $client_id, 'msg' => 'connect is success' // 初始化房間信息 ]; Gateway::sendToClient($client_id, json_encode($resData)); }
index.html 內容
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>GatewayWorker的websocket連接</title> </head> <body> <h1>GatewayWorker的websocket連接</h1> <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script> <script type="text/javascript"> ws = new WebSocket("ws://127.0.0.1:7272"); // 服務端主動推送消息時會觸發這里的onmessage ws.onmessage = function(e){ // json數據轉換成js對象 var data = JSON.parse(e.data); console.log(data); var type = data.type || ''; switch(type){ // Events.php中返回的init類型的消息,將client_id發給后台進行uid綁定 case 'init': // 利用jquery發起ajax請求,將client_id發給后端進行uid綁定 $.post( "{:url('index/chat_room/bind')}", {client_id: data.client_id}, function(data) { console.log(data); }, 'json' ); break; case 'say': console.log('TP5 msg'+e.data); break; // 當mvc框架調用GatewayClient發消息時直接alert出來 default : alert(e.data); } }; </script> </body> </html>
3、網站頁面收到client_id后觸發一個ajax請求(index/chat_room/bind)將client_id發到TP5.0后端,bind方法
/*
* 用戶登錄后初始化以及綁定client_id
*/
public function bind()
{
// 設置GatewayWorker服務的Register服務ip和端口,請根據實際情況改成實際值
Gateway::$registerAddress = '127.0.0.1:1238';
$uid = $this->userId;
$group_id = $this->groupId;
$client_id = request()->param('client_id');
// client_id與uid綁定
Gateway::bindUid($client_id, $uid);
// 加入某個群組(可調用多次加入多個群組)
Gateway::joinGroup($client_id, $group_id);
}
4、后端收到client_id后利用GatewayClient調用Gateway::bindUid($client_id, $uid)
將client_id與當前uid(用戶id或者客戶端唯一標識)綁定。如果有群組、群發功能,也可以利用Gateway::joinGroup($client_id, $group_id)
將client_id加入到對應分組
連接成功后返回值
PS:以上返回值為 GatewayWorker服務 連接成功后返回的json數據
5、頁面發起的所有請求都直接post/get到mvc框架統一處理,包括發送消息
通過sendMessage發送消息(服務端主動推送消息到客戶端)
// mvc后端發消息 利用GatewayClient發送 Events.php
public function sendMessage()
{
// stream_socket_client(): unable to connect to tcp://127.0.0.1:1236
$uid = $this->userId;
$group = $this->groupId;
$message = json_encode([
'type'=>'say',
'msg'=>'Hello ThinkPHP5'
]);
// 設置GatewayWorker服務的Register服務ip和端口,請根據實際情況改成實際值
Gateway::$registerAddress = '127.0.0.1:1238';
// 向任意uid的網站頁面發送數據
Gateway::sendToUid($uid, $message);
// 向任意群組的網站頁面發送數據,如果開啟,則會向頁面發送兩條一樣的消息
//Gateway::sendToGroup($group, $message);
}
6、mvc框架處理業務過程中需要向某個uid或者某個群組發送數據時,直接調用GatewayClient的接口Gateway::sendToUid Gateway::sendToGroup
等發送即可
通過瀏覽器訪問sendMessage操作,測試結果
PS:以上的消息是TP5.0 通過 GatewayClient\Gateway 發送寫消息,和GatewayWorker服務沒有直接關系
以上為 服務端主動推送消息到客戶端
注意區分:
1、服務端主動推送消息到客戶端
2、客戶端推送消息到客戶端
(二)客戶端推送消息到客戶端
修改客戶端到客戶端的消息發送和接受,下面修改 GatewayWorker 的 Events.php(開發者只需要關注這個文件)
public static function onConnect($client_id)
{
$resData = [
'type' => 'init',
'client_id' => $client_id,
'msg' => 'connect is success' // 初始化房間信息
];
Gateway::sendToClient($client_id, json_encode($resData));
}
/**
* 當客戶端發來消息時觸發
* @param int $client_id 連接id
* @param mixed $message 具體消息
*/
public static function onMessage($client_id, $message)
{
// 服務端console輸出
//echo "msg : $message \r\n";
// 解析數據
$resData = json_decode($message, true);
$type = $resData['type'];
$roomId = $resData['roomId'];
$userId = $resData['userId']; // 未登錄,則傳遞一個隨機
$userName = $resData['userName']; // 未登錄,則傳遞一個隨機
$content = isset($resData['content']) ? $resData['content'] : 'default content';
//將時間全部置為服務器時間
$serverTime = date('Y-m-d H:i:s', time());
switch ($type) {
case 'join': // 用戶進入直播間
//將客戶端加入到某一直播間
Gateway::joinGroup($client_id, $roomId);
$resData = [
'type' => 'join',
'roomId' => $roomId,
'userName' => $userName,
'msg' => "enters the Room", // 發送給客戶端的消息,而不是聊天發送的內容
'joinTime' => $serverTime // 加入時間
];
// 廣播給直播間內所有人,誰?什么時候?加入了那個房間?
Gateway::sendToGroup($roomId, json_encode($resData));
break;
case 'say': // 用戶發表評論
$resData = [
'type' => 'say',
'roomId' => $roomId,
'userName' => $userName,
'content' => $content,
'commentTime' => $serverTime // 發表評論時間
];
// 廣播給直播間內所有人
Gateway::sendToGroup($roomId, json_encode($resData));
break;
case 'pong':
break; // 接收心跳
default:
//Gateway::sendToAll($client_id,$json_encode($resData));
break;
}
}
index.html 聊天室頁面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>GatewayWorker的websocket連接</title>
</head>
<body>
<h1>GatewayWorker的websocket連接</h1>
<div class="row">
websocket send content:<input type="text" style="height: 50px; width: 100%;" name="data" id="data">
<p></p>
<button id="submit" onclick="sub()">send info</button>
<p></p>
<div id="output"></div>
</div>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/reconnecting-websocket/1.0.0/reconnecting-websocket.min.js"></script>
<script language="javascript" type="text/javascript">
var wsUri = "ws://notes.env:7272/";
var outputContent;
var roomId = 'L06777';
var userId = 4840043;
var userName = 'Tinywan' + Math.random();
// 把當新鏈接的客戶端加入到當前直播間,消息類型:{"type":"join","roomId":"1002","userId":"88","userName":"userName"}
var joinContent = {
"type": "join",
"roomId": roomId,
"userId": userId,
"userName": userName
};
// 初始化頁面操作
function init() {
outputContent = document.getElementById("output");
initWebSocket();
}
function initWebSocket() {
websocket = new ReconnectingWebSocket(wsUri);
websocket.onopen = function (evt) {
onOpen(evt)
};
websocket.onclose = function (evt) {
onClose(evt)
};
websocket.onmessage = function (evt) {
onMessage(evt)
};
websocket.onerror = function (evt) {
onError(evt)
};
}
function onOpen(evt) {
console.log("CONNECTED");
}
// 接收數據
function onMessage(evt) {
var data = eval("(" + evt.data + ")");
var type = data.type || '';
switch (type) {
case 'init':
// 把當新鏈接的客戶端加入到當前直播間
console.log('-------init--------' + data);
websocket.send(JSON.stringify(joinContent));
writeToScreen('<span style="color: blue;">RESPONSE: ' + evt.data + '</span>');
break;
case 'join':
console.log('-------join--------' + data);
writeToScreen(
'<span style="color: blue;"> ' + ' 新用戶: ' + '</span>' +
'<span style="color: red;"> ' + data.userName + '</span>' +
'<span style="color: green;"> ' + data.joinTime + '</span>' +
'<span style="color: black;"> ' + data.msg + '</span>'
);
break;
case 'say':
console.log('say======' + data);
writeToScreen(
'<span style="color: blue;"> ' + ' Chat: ' + '</span>' +
'<span style="color: red;"> ' + data.userName + '</span>' +
'<span style="color: #D2691E;"> ' + data.commentTime + '</span>' +
'<span style="color: black;"> ' + data.content + '</span>'
);
break;
default :
console.log(data);
break;
}
}
function onError(evt) {
console.log('<span style="color: red;">ERROR:</span> ' + evt.data);
}
function onClose(evt) {
console.log("DISCONNECTED");
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
outputContent.appendChild(pre);
}
function sub() {
var text = document.getElementById('data').value;
// {"type":"say",,"msg":"Welcome 111111111111Live Room"}
var sayContent = {
"type": "say",
"roomId": roomId,
"userId": userId,
"userName": userName,
"content": text
};
websocket.send(JSON.stringify(sayContent));
}
window.addEventListener("load", init, false);
</script>
</body>
</html>
重啟開啟服務
測試結果
擴展:
可以把消息存儲的Redis中,通過Redis統計直播間的PV
$redis = new \Redis;
$redis->connect('127.0.0.1',6379);
$key = "PV:ROOM:".$roomId;
$field = "ROOM_TOTAL_PV";
// 進入房間的人數增長,自增 ,增加PV統計
$redis->hIncrBy($key,$field,1);