概述
這是關於 Swoole 學習的第七篇文章:Swoole RPC 的實現。
- 第六篇:Swoole 整合成一個小框架
- 第五篇:Swoole 多協議 多端口 的應用
- 第四篇:Swoole HTTP 的應用
- 第三篇:Swoole WebSocket 的應用
- 第二篇:Swoole Task 的應用
- 第一篇:Swoole Timer 的應用
有位讀者說 “上篇文章,下載代碼后直接運行成功,代碼簡潔明了,簡直是 Swoole 入門最好的 Demo ”。
“哈哈哈...”
還有讀者說 “有一起學習的組織群嗎,可以在里面進行疑難答疑?”
這個還真沒有,總覺得維護一個微信群不容易,因為自己本身就不愛在群里說話,另外,自己也在很多微信群中,開始氛圍挺好的,大家都聊聊技術,后來技術聊的少了改成聊八卦啦,再后來慢慢就安靜了,還有在群里起沖突的...
當然我也知道維護一個微信群的好處是非常大的,如果有這方面經驗的同學,咱們一起交流交流 ~
還有出版社找我寫書的.
他們也真是放心,我自己肚子里幾滴墨水還是知道的,目前肯定是不行,以后嘛,再說。
還有一些大佬加了微信,可能是出於對晚輩的提攜吧,偷偷告訴你,從大佬的朋友圈能學到很多東西。
我真誠的建議,做技術的應該自己多總結總結,將自己會的東西寫出來分享給大家,先不說給別人帶來太多的價值,反正對自己的幫助是非常非常大的,這方面想交流的同學,可以加我,我可以給你無私分享。
可能都會說時間少,時間只要擠,總會有的,每個人都 24 小時,時間對每個人是最公平的。說到這推薦大家讀一下《暗時間》這本書,這是我這本書的 讀書筆記,大家可以瞅瞅。
開始今天的文章吧,這篇文章實現了一個簡單的 RPC 遠程調用,在實現之前需要先了解什么是 RPC,不清楚的可以看下之前發的這篇文章 《我眼中的 RPC》。
下面的演示代碼主要使用了 Swoole 的 Task 任務池,通過 OnRequest/OnReceive 獲得信息交給 Task 去處理。
舉個工作中的例子吧,在電商系統中的兩個模塊,個人中心模塊和訂單管理模塊,這兩個模塊是獨立部署的,可能不在一個機房,可能不是一個域名,現在個人中心需要通過 用戶ID 和 訂單類型 獲取訂單數據。
實現效果
客戶端
- HTTP 請求
//代碼片段
<?php
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Order',
'method' => 'get_list',
'param' => [
'uid' => 1,
'type' => 2,
],
],
];
$ch = curl_init();
$options = [
CURLOPT_URL => 'http://10.211.55.4:9509/',
CURLOPT_POST => 1,
CURLOPT_POSTFIELDS => json_encode($demo),
];
curl_setopt_array($ch, $options);
curl_exec($ch);
curl_close($ch);
- TCP 請求
//代碼片段
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Order',
'method' => 'get_list',
'param' => [
'uid' => 1,
'type' => 2,
],
],
];
$this->client->send(json_encode($demo));
請求方式
-
SW 單個請求,等待結果
發出請求后,分配給 Task ,並等待 Task 執行完成后,再返回。
-
SN 單個請求,不等待結果
發出請求后,分配給 Task 之后,就直接返回。
發送數據
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Order',
'method' => 'get_list',
'param' => [
'uid' => 1,
'type' => 2,
],
],
];
- type 同步/異步設置
- token 可進行權限驗證
- class 請求的類名
- method 請求的方法名
- uid 參數一
- type 參數二
返回數據
- request_method 請求方式
- request_time 請求開始時間
- response_time 請求結束時間
- code 標識
- msg 標識值
- data 約定數據
- query 請求參數
代碼
OnRequest.php
<?php
if (!defined('SERVER_PATH')) exit("No Access");
class OnRequest
{
private static $query;
private static $code;
private static $msg;
private static $data;
public static function run($serv, $request, $response)
{
try {
$data = decrypt($request->rawContent());
self::$query = $data;
if (empty($data)) {
self::$code = '-1';
self::$msg = '非法請求';
self::end($request, $response);
}
//TODO 驗證Token
switch ($data['type']) {
case 'SW': //單個請求,等待結果
$task = [
'request' => $data,
'server' => 'http'
];
$rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) {
self::$code = '1';
self::$msg = '成功';
self::$data = $rs_data['response'];
self::end($request, $response);
});
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
self::end($request, $response);
}
break;
case 'SN': //單個請求,不等待結果
$task = [
'request' => $data,
'server' => 'http'
];
$rs = $serv->task(json_encode($task));
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
} else {
self::$code = '1';
self::$msg = '成功';
}
self::end($request, $response);
break;
default:
self::$code = '-1';
self::$msg = '非法請求';
self::end($request, $response);
}
} catch(Exception $e) {
}
}
private static function end($request = null, $response = null)
{
$rs['request_method'] = $request->server['request_method'];
$rs['request_time'] = $request->server['request_time'];
$rs['response_time'] = time();
$rs['code'] = self::$code;
$rs['msg'] = self::$msg;
$rs['data'] = self::$data;
$rs['query'] = self::$query;
$response->end(json_encode($rs));
self::$data = [];
return;
}
}
OnReceive.php
<?php
if (!defined('SERVER_PATH')) exit("No Access");
class OnReceive
{
private static $request_time;
private static $query;
private static $code;
private static $msg;
private static $data;
public static function run($serv, $fd, $reactor_id, $data)
{
try {
self::$request_time = time();
$data = decrypt($data);
self::$query = $data;
//TODO 驗證Token
switch ($data['type']) {
case 'SW': //單個請求,等待結果
$task = [
'fd' => $fd,
'request' => $data,
'server' => 'tcp',
'request_time' => self::$request_time,
];
$rs = $serv->task(json_encode($task));
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
self::handlerTask($serv, $fd);
}
break;
case 'SN': //單個請求,不等待結果
$task = [
'fd' => $fd,
'request' => $data,
'server' => 'tcp',
'request_time' => self::$request_time,
];
$rs = $serv->task(json_encode($task));
if ($rs === false) {
self::$code = '-1';
self::$msg = '失敗';
} else {
self::$code = '1';
self::$msg = '成功';
}
self::handlerTask($serv, $fd);
break;
default:
self::$code = '-1';
self::$msg = '非法請求';
self::handlerTask($serv, $fd);
}
} catch(Exception $e) {
}
}
private static function handlerTask($serv, $fd)
{
$rs['request_method'] = 'TCP';
$rs['request_time'] = self::$request_time;
$rs['response_time'] = time();
$rs['code'] = self::$code;
$rs['msg'] = self::$msg;
$rs['data'] = self::$data;
$rs['query'] = self::$query;
$serv->send($fd, json_encode($rs));
}
}
小結
Demo 代碼僅供參考,里面有很多不嚴謹的地方!
服務的調用方與提供方中間需要有一個服務注冊中心,很顯然上面的代碼中沒有,需要自己去實現。
服務注冊中心,負責管理 IP、Port 信息,提供給調用方使用,還要能負載均衡和故障切換。
根據自己的情況,服務注冊中心實現可容易可復雜,用 Redis 也行,用 Zookeeper、Consul 也行。
感興趣的也可以了解下網關 Kong ,包括 身份認證、權限認證、流量控制、監控預警...
再推薦一個 Swoole RPC 框架 Hprose,支持多語言。
就到這了,上面的 Demo 需要源碼的,加我微信。(菜單-> 加我微信-> 掃我)
本文歡迎轉發,轉發請注明作者和出處,謝謝!