基於swoole+Redis的消息實時推送通知


swoole+Redis將實時數據的推送

一 實現功能

設計師訂單如果設計師未搶單,超時(5分鍾)設計訂單時時給設計師派送,
設計師公眾號中收到派單信息
設計發布者收到派單成功信息

環境

centos6.10
redis-4.0.2
swoole-src-4.4.12
php-7.1.5
MYsyql5.7

在centos6默認是gcc-4.7,安裝swoole的時候需要升級到gcc-4.8

二 實現流程

1.開啟swoole server端監聽
2.開啟swoole client連接執行定時執行
3.使用swoole task 異步執行推送邏輯

開始監聽

服務端窗口

# php71 pushServer.php

client連接執行開始任務

客戶端窗口

# php71 pushClient.php start

默認start開啟5個client tcp鏈接,每個鏈接開啟一個1s定時器

開啟后服務端窗口的變化

[root@111111 swoole_server]# php71 pushServer.php 
Client-1: 連接成功
reactor-7 Client-1 接受數據: data=start 
Client-1: 連接結束
Client-2: 連接成功
reactor-0 Client-2 接受數據: data=start 
Client-3: 連接成功
Client-2: 連接結束
reactor-7 Client-3 接受數據: data=start 
Client-3: 連接結束
Client-4: 連接成功
reactor-0 Client-4 接受數據: data=start 
Client-4: 連接結束
Client-5: 連接成功
reactor-7 Client-5 接受數據: data=start 
Client-5: 連接結束
2019-12-14 01:29:15reactor-7 client-1 timer-2: 定時器第1 次執行

redis添加數據

向order_designer_list隊列中添加數據

127.0.0.1:6379> LPUSH order_designer_list 1912136916313##150481##1576140373##oLvqQwo25p5myELvO5VXj0k-7ngk##李偉測試##13926
(integer) 1
127.0.0.1:6379> LRANGE order_designer_list 0 10
1) "1912136916313##150481##1576140373##oLvqQwo25p5myELvO5VXj0k-7ngk##李偉測試##13926"

值為:
訂單號##orderId##分配到期時間(uninx時間戳)##微信openid##設計師用戶名##設計師userid

服務端窗口的變化

2019-12-14 01:29:15reactor-7 client-1 timer-2 taskid=0 :投遞異步任務 data=1912136916313_150481_1576140373_oLvqQwo25p5myELvO5VXj0k-7ngk_李偉測試_13926
2019-12-14 01:29:15執行任務 id=0  data= 1912136916313_150481_1576140373_oLvqQwo25p5myELvO5VXj0k-7ngk_李偉測試_13926 
任務完成 id=0 結果=1912136916313_150481_1576140373_oLvqQwo25p5myELvO5VXj0k-7ngk_李偉測試_13926已經派單過

task任務執行邏輯

數據分析
數據通過微信消息模板接口,將信息內容發送到客戶微信公眾號上
將監控日志和錯誤日志寫入mysql數據庫

三 項目代碼結構

├── composer.json
├── composer.lock
├── swoole_server
│   ├── 2019-12-13-swoole.log
│   ├── 2019-12-14-swoole.log
│   ├── DispatchOrder.php
│   ├── pushClient.php
│   ├── pushServer.php
│   ├── Runtime.php
└── vendor
│   ├── 忽略php類庫文件
│   ├── ....

conposer.json

{
    "require": {        
            "predis/predis": "^1.1",
        "catfan/medoo": "1.7.*"
    }
}

pushServer.php

<?php
require __DIR__ . '/../vendor/autoload.php';
require './DispatchOrder.php';

class swoolePush
{

    private $timerId;
    private $timeCount = 0;
    //微信消息模板id
    private $templateid = "C3HSuUJdS86p_4gj4xxth943DdE2zkE3IxnlrK5MFTI";

    public function receiveHandle($serv, $fd, $reactor_id, $data)
    {
        echo "reactor-{$reactor_id} Client-$fd 接受數據: data=$data " . PHP_EOL;
        if ('start' == trim($data)) {//開啟定時器
            $redisServ = new Swoole\Coroutine\Redis;
            $redisServ->connect('127.0.0.1', 6379);
            //每隔5000ms觸發一次
            $this->timerId = Swoole\Timer::tick(5000, function ($timer_id) use ($redisServ, $serv, $reactor_id, $fd) {
                $this->timeCount++;
                echo date("Y-m-d H:i:s") . "reactor-{$reactor_id} client-{$fd} timer-{$timer_id}: 定時器第{$this->timeCount} 次執行" . PHP_EOL;
                $item = $redisServ->lIndex("order_designer_list", -1);
                if ($item) {

                    //訂單號_orderId_分配到期時間(uninx時間戳)_openid_設計師用戶名_設計師userid
                    $order = explode('##', $item);
                    if ($order[2] < time()) {
                        //投遞異步任務
                        $task_id = $serv->task($item);
                        echo date("Y-m-d H:i:s") . "reactor-{$reactor_id} client-{$fd} timer-{$timer_id} taskid={$task_id} :投遞異步任務 data=$item" . PHP_EOL;
                        $redisServ->rPop("order_designer_list");
                    }
                }
            });
        } elseif ('stop' == trim($data)) {//清除定時器
            echo "reactor-{$reactor_id} client-{$fd} timer-{$this->timerId} 定時器結束" . PHP_EOL;;
            Swoole\Timer::clear($this->timerId);
        }

    }

    /**
     * 任務處理
     * @param $serv
     * @param $fd
     */
    public function taskHandle($serv, $task_id, $from_id, $data)
    {
        echo date("Y-m-d H:i:s") . "執行任務 id=$task_id  data= $data " . PHP_EOL;

        //訂單號_orderId_分配到期時間(uninx時間戳)_openid_設計師用戶名_設計師userid
        $order = explode('##', $data);
        $returnMsg = "";

        //派單邏輯
        $dispatchOrderObj = new DispatchOrder();
        if ($dispatchOrderObj->isOrderNeedGive($order[1])) {

            //開始派單 orderGive($orderId, $designerUserId,$otherInfo=[])
            $result = $dispatchOrderObj->orderGive($order[1], $order[5], [
                'design_username' => $order[4], 'order_no' => $order[0], 'design_openid' => $order[3],
                'design_templdateid' => "C3HSuUJdS86p_4gj4xxth943DdE2zkE3IxnlrK5MFTI"]);
            if ($result['state'] != 1) {
                //派單失敗重新加入redis
                $redisServ = new Swoole\Coroutine\Redis;
                $redisServ->connect('127.0.0.1', 6379);
                $redisServ->lPush("order_designer_list", $data);
                $returnMsg = "{$order[0]}派單失敗, errmsg:" . $result['message'];
            } else {
                $returnMsg = "{$order[0]}派單成功, errmsg:" . $result['message'];
            }

        } else {
            $returnMsg = "{$order[0]}已經派單過";
        }
        //返回任務執行的結果
        $serv->finish($returnMsg);
    }

    /**
     * 任務完成通知
     * @param $serv
     * @param $fd
     */
    public function finishHandle($serv, $task_id, $data)
    {
        echo "任務完成 id=$task_id 結果=$data" . PHP_EOL;
    }

    /**
     * 連接開始
     * @param $serv
     * @param $fd
     */
    public function connectHandle($serv, $fd)
    {
        echo "Client-$fd: 連接成功" . PHP_EOL;
    }

    /**
     * 連接結束
     * @param $serv
     * @param $fd
     */
    public function closeHandle($serv, $fd)
    {
        echo "Client-$fd: 連接結束" . PHP_EOL;
    }
}

//創建Server對象,監聽 127.0.0.1:9501端口
$serv = new Swoole\Server("127.0.0.1", 9501);
//設置異步任務的工作進程數量
$serv->set([
    'task_worker_num' => 4 * 5,
    'log_file' => date("Y-m-d") . '-swoole.log',
]);
$object = new swoolePush();
//監聽連接進入事件
$serv->on('Connect', [$object, 'connectHandle']);
//監聽數據接收事件
$serv->on('Receive', [$object, 'receiveHandle']);
//監聽連接關閉事件
$serv->on('Close', [$object, 'closeHandle']);
//處理異步任務
$serv->on('task', [$object, 'taskHandle']);
//處理異步任務的結果
$serv->on('finish', [$object, 'finishHandle']);
//啟動服務器
$serv->start();

pushClient.php

<?php

class pushClient
{
    private $client;

    public function __construct()
    {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP);
        if (!$this->client->connect('127.0.0.1', 9501, -1)) {
            exit("connect failed. Error: {$this->client->errCode}" . PHP_EOL);
        }
    }

    /**
     * client發送開啟定時器的指令
     */
    public function startTimer()
    {

        $this->client->send("start");
    }

    /**
     * client發送結束定時器的指令
     */
    public function stopTimer()
    {

        $this->client->send("stop");
    }

    /**
     * 自動銷毀client
     */
    public function __destruct()
    {
        $this->client->close();
    }
}

//client連接開始
if (empty($argv[1])) {
    exit("缺少參數, 參數:(string 'start|getinfo', int [num]) ");
}

$num = empty($argv[2]) ? 5 : intval($argv[2]);
//不同操作返回
//開啟多個client連接,並發送start命令
if (trim($argv[1]) == 'start') {
    for ($i = 0; $i < $num; $i++) {
        $pushClient = new pushClient();
        $pushClient->startTimer();
        sleep(1);
    }
} elseif (trim($argv[1]) == 'getinfo') {

    //獲取當前swoole的信息
    $server = new Swoole\Server("127.0.0.1", 9501);
    $arr = [
        'client_connection_num' => count($server->connections)

    ];
    var_dump($arr);
}

DispatchOrder.php

<?php
require __DIR__ . '/../vendor/autoload.php';
require './Runtime.php';

use Medoo\Medoo;

class DispatchOrder
{
    private $configWchat = [
        'app_id' => 'wx888888888888',
        'secret' => '6d6e4f19888888888888888888',
    ];
    private $configMysql = [
        'database_type' => 'mysql',
        'server' => '10.0.0.0',
        'database_name' => 'db_test',
        'username' => 'root',
        'password' => '123456'
    ];
    private $tokenWchat;
    private $severMysql = null;
    private $severRedis = null;
    private $severRuntime = null;
    private $result = ['state' => 0, 'message' => ''];


    public function __construct()
    {
        //mysql
        if ($this->severMysql == null) {

            $this->severMysql = new Medoo($this->configMysql);
        }
        //日志
        if ($this->severRuntime == null) {

            $this->severRuntime = new Runtime(1, 1, 1, $this->severMysql);
        }
    }

    /**
     * 是否需要分配訂單
     * @param $orderId
     * @return int 1需要分配 0不需要
     */
    public function isOrderNeedGive($orderId)
    {
        $data = $this->severMysql->select('tg_order_task', [
            'status',
        ], [
            'order_id' => $orderId
        ]);
        //1需要分配 0不需要
        return (empty($data) || $data[0]['status'] == 1) ? 0 : 1;
    }

    /**
     * @return Medoo
     */
    public function orderGive($orderId, $designerUserId, $otherInfo = [])
    {
        //微信access_token
        if ($this->severRedis == null) {
            $redisServ = new Predis\Client([
                'scheme' => 'tcp',
                'host' => '127.0.0.1',
                'port' => 6379]);
            $this->severRedis = $redisServ;
        }
        $accessToken = $this->severRedis->get('swoole_order_dispatch_token');
        $accessTokenArr = explode("##", $accessToken);
        if (empty($accessToken) || $accessTokenArr[1] < time()) {
            //設置token
            $urlToken = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=" . $this->configWchat['app_id'] . "&secret=" . $this->configWchat['secret'];
            $res = $this->httpRequest($urlToken, "get");
            $arr = json_decode($res, true);
            if (empty($arr['access_token'])) {
                $this->severRuntime->log("請求微信token接口失敗,返回:" . $res);
            }
            $expiresTime = (string)(time() + 7200);
            $value = (string)($arr['access_token'] . "##" . $expiresTime);
            $this->severRedis->set('swoole_order_dispatch_token', $value);
            $this->tokenWchat = $arr['access_token'];
        } else {

            $this->tokenWchat = $accessTokenArr[0];
        }
//        var_dump($this->tokenWchat);
        //1.更新訂單狀態
        $updateData = [
            'design_userid' => $designerUserId,
            'design_username' => $otherInfo['design_username'],
            'design_time' => date("Y-m-d H:i:s"),
            'status' => 1,//0 待搶單 1待設計 2已設計 3已完成4已取消5駁回
        ];

        $res = $this->severMysql->update('tg_order_task', [
            'status' => 1,
        ], [
            'order_id' => $orderId
        ]);
//        var_dump($res);
        if (0 == $res->rowCount()) {
            $this->result['message'] = "更新任務訂單失敗,訂單號:" . $otherInfo['order_no'];
            $this->severRuntime->log($this->result['message']);
            return $this->result;
        }

        //2.微信模板信息推送
        $data = [
            'touser' => $otherInfo['design_openid'],
            'template_id' => $otherInfo['design_templdateid'],
            'url' => '',
            'data' => [
                'keyword1' => ['value' => $otherInfo['order_no']],
                'keyword2' => ['value' => date("Y-m-d H:i:s")]
            ],
        ];
        $url = 'https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=' . $this->tokenWchat;
        $res = $this->httpRequest($url, "post", json_encode($data));
//        var_dump($res);
        $resObj = json_decode($res);
        if (0 != $resObj->errcode) {
            $this->result['message'] = "{$otherInfo['order_no']}微信推送失敗,$res";
            $this->severRuntime->log($this->result['message']);
            return $this->result;
        }
        $this->result['state'] = 1;
        $this->result['message'] = "執行成功,訂單號:" . $otherInfo['order_no'];
        return $this->result;

    }

    /**
     * CURL請求
     *
     * @param string  請求url地址
     * @param method 請求方法 get post
     * @param null $postfields post數據數組
     * @param array $headers 請求header信息
     * @param bool|false $debug 調試開啟 默認false
     * @return mixed
     */
    private function httpRequest($url, $method, $postfields = null, $headers = [], $debug = false)
    {
        $method = strtoupper($method);
        $ci = curl_init();
        /* Curl settings */
        curl_setopt($ci, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
        curl_setopt($ci, CURLOPT_USERAGENT, "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0");
        curl_setopt($ci, CURLOPT_CONNECTTIMEOUT, 60); /* 在發起連接前等待的時間,如果設置為0,則無限等待 */
        curl_setopt($ci, CURLOPT_TIMEOUT, 7); /* 設置cURL允許執行的最長秒數 */
        curl_setopt($ci, CURLOPT_RETURNTRANSFER, true);
        switch ($method) {
            case "POST":
                curl_setopt($ci, CURLOPT_POST, true);
                if (!empty($postfields)) {
                    $tmpdatastr = is_array($postfields) ? http_build_query($postfields) : $postfields;
                    curl_setopt($ci, CURLOPT_POSTFIELDS, $tmpdatastr);
                }
                break;
            default:
                curl_setopt($ci, CURLOPT_CUSTOMREQUEST, $method); /* //設置請求方式 */
                break;
        }
        $ssl = preg_match('/^https:\/\//i', $url) ? TRUE : FALSE;
        curl_setopt($ci, CURLOPT_URL, $url);
        if ($ssl) {
            curl_setopt($ci, CURLOPT_SSL_VERIFYPEER, FALSE); // https請求 不驗證證書和hosts
            curl_setopt($ci, CURLOPT_SSL_VERIFYHOST, FALSE); // 不從證書中檢查SSL加密算法是否存在
        }
        curl_setopt($ci, CURLOPT_MAXREDIRS, 2); /* 指定最多的HTTP重定向的數量,這個選項是和CURLOPT_FOLLOWLOCATION一起使用的 */
        curl_setopt($ci, CURLOPT_HTTPHEADER, $headers);
        curl_setopt($ci, CURLINFO_HEADER_OUT, true);
        $response = curl_exec($ci);
        $requestinfo = curl_getinfo($ci);
        $http_code = curl_getinfo($ci, CURLINFO_HTTP_CODE);
        if ($debug) {
            echo "=====post data======\r\n";
            var_dump($postfields);
            echo "=====info===== \r\n";
            print_r($requestinfo);
            echo "=====response=====\r\n";
            print_r($response);
        }
        curl_close($ci);
        return $response;
    }

    /**
     * 關閉資源鏈接
     */
    public function __destruct()
    {
    }
}

Runtime.php

<?php
require __DIR__ . '/../vendor/autoload.php';

use Medoo\Medoo;

class Runtime
{
    //isMysql是否寫入數據  isFile是否寫入文件  isConsole是否console輸入
    private $isMysql;
    private $isFile;
    private $isConsole;
    private $startTime = 0;
    private $stopTime = 0;
    private $severMysql = null;

    public function __construct($isMysql, $isFile, $isConsole, $severMysql)
    {
        $this->isMysql = $isMysql;
        $this->isFile = $isFile;
        $this->isConsole = $isConsole;
        //mysql
        if ($this->severMysql == null) {
            $this->severMysql = $severMysql;
        }
    }

    /**
     * 日志記錄
     * @param $content
     * @param $runtime
     */
    public function log($content, $runtime = 0, $mode = [])
    {
        //數據庫
        if ($this->isMysql) {
            $data = [
                'content' => $content,
                'runtime' => $runtime,
                'w_time' => date("Y-m-d H:i:s"),
            ];
            $this->severMysql->insert('tg_log_order_dispatch', $data);
        }
        //寫入文件
        if ($this->isFile) {
            file_put_contents(date("Y-m-d").'-runtime.log', $content."\n", FILE_APPEND);
        }
        //命令窗口
        if ($this->isConsole) {
            echo date("Y-m-d H:i:s") . " " . $content . PHP_EOL;
        }

    }

    //開始運行時間
    public function start()
    {
        $this->startTime = $this->getMicrotime();
    }

    //結束時間
    public function stop()
    {
        $this->stopTime = $this->getMicrotime();
    }

    //開始和結束之間總時長
    public function spent()
    {
        return ($this->stopTime - $this->startTime);
    }

    private function getMicrotime()
    {
        list($usec, $sec) = explode(' ', microtime());
        return ((float)$usec + (float)$sec);
    }
}

歡迎留言交流


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM