php Swoole實現毫秒級定時任務


項目開發中,如果有定時任務的業務要求,我們會使用linux的crontab來解決,但是它的最小粒度是分鍾級別,如果要求粒度是秒級別的,甚至毫秒級別的,crontab就無法滿足,值得慶幸的是swoole提供的強大的毫秒定時器。

應用場景舉例
我們可能會遇到這樣的場景:

  • 場景一:每隔30秒獲取一次本機內存使用率
  • 場景二:2分鍾后執行報表發送任務
  • 場景三:每天凌晨2點鍾定時請求第三方接口,如果接口有數據返回則停止任務,如果接口由於某種原因沒有響應或者沒有數據返回則5分鍾后繼續嘗試請求該接口,嘗試5次后仍然失敗則停止該任務

以上的三個場景我們都可以歸納為定時任務的范疇。

Swoole毫秒定時器
Swoole提供了異步毫秒定時器函數:

swoole_timer_tick(int $msec, callable $callback):設置一個間隔時鍾定時器,每隔$msec毫秒執行一次$callback,類似於javascript中的setInterval()

swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的時間$after_time_ms后執行$callback_function,類似於javascript的setTimeout()

swoole_timer_clear(int $timer_id):刪除指定id的定時器,類似於javascript的clearInterval()

解決方案

對於場景一,經常用在系統檢測統計方面,實時性要求比較高,但又能控制好頻率,多用於后台服務器性能監控,可以生成可視化圖表。可以是30秒獲取一次內存使用率,也可以是10秒,而crontab最小粒度只能設置為1分鍾。

1 swoole_timer_tick(30000, function($timer) use ($task_id) { // 啟用定時器,每30秒執行一次
2     $memPercent = $this->getMemoryUsage(); //計算內存使用率
3     echo date('Y-m-d H:i:s') . '當前內存使用率:'.$memPercent."\n";
4 });

 

 

對於場景二,直接定義xx時間后執行某項任務的話,貌似crontab比較困難,而使用swoole的swoole_timer_after可以實現:

1 swoole_timer_after(120000, function() use ($str) { //2分鍾后執行
2     $this->sendReport(); //發送報表
3     echo "send report, $str\n";
4 });

 

對於場景三,用來作嘗試請求,請求失敗后繼續,如果成功則停止請求。用crontab也能解決,但是比較傻,比如設置每隔5分鍾請求一次,不管成功會失敗都會去執行一次。而用swoole定時器則智能多了。

 1 swoole_timer_tick(5*60*1000, function($timer) use ($url) { // 啟用定時器,每5分鍾執行一次
 2     $rs = $this->postUrl($url);
 3 
 4     if ($rs) {
 5         //業務代碼...
 6         swoole_timer_clear($timer); // 停止定時器
 7         echo date('Y-m-d H:i:s'). "請求接口任務執行成功\n";
 8     } else {
 9         echo date('Y-m-d H:i:s'). "請求接口失敗,5分鍾后再次嘗試\n";
10     }
11 });

 

示例代碼

新建文件\src\App\Task.php:

  1 <?php 
  2 namespace Helloweba\Swoole;
  3 
  4 use swoole_server;
  5 
  6 /**
  7 * 任務調度
  8 */
  9 class Task
 10 {
 11     protected $serv;
 12     protected $host = '127.0.0.1';
 13     protected $port = 9506;
 14     // 進程名稱
 15     protected $taskName = 'swooleTask';
 16     // PID路徑
 17     protected $pidPath = '/run/swooletask.pid';
 18     // 設置運行時參數
 19     protected $options = [
 20         'worker_num' => 4, //worker進程數,一般設置為CPU數的1-4倍  
 21         'daemonize' => true, //啟用守護進程
 22         'log_file' => '/data/log/swoole-task.log', //指定swoole錯誤日志文件
 23         'log_level' => 0, //日志級別 范圍是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
 24         'dispatch_mode' => 1, //數據包分發策略,1-輪詢模式
 25         'task_worker_num' => 4, //task進程的數量
 26         'task_ipc_mode' => 3, //使用消息隊列通信,並設置為爭搶模式
 27     ];
 28 
 29     public function __construct($options = [])
 30     {
 31         date_default_timezone_set('PRC'); 
 32         // 構建Server對象,監聽127.0.0.1:9506端口
 33         $this->serv = new swoole_server($this->host, $this->port);
 34 
 35         if (!empty($options)) {
 36             $this->options = array_merge($this->options, $options);
 37         }
 38         $this->serv->set($this->options);
 39 
 40         // 注冊事件
 41         $this->serv->on('Start', [$this, 'onStart']);
 42         $this->serv->on('Connect', [$this, 'onConnect']);
 43         $this->serv->on('Receive', [$this, 'onReceive']);
 44         $this->serv->on('Task', [$this, 'onTask']);  
 45         $this->serv->on('Finish', [$this, 'onFinish']);
 46         $this->serv->on('Close', [$this, 'onClose']);
 47     }
 48 
 49     public function start()
 50     {
 51         // Run worker
 52         $this->serv->start();
 53     }
 54 
 55     public function onStart($serv)
 56     {
 57         // 設置進程名
 58         cli_set_process_title($this->taskName);
 59         //記錄進程id,腳本實現自動重啟
 60         $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
 61         file_put_contents($this->pidPath, $pid);
 62     }
 63 
 64     //監聽連接進入事件
 65     public function onConnect($serv, $fd, $from_id)
 66     {
 67         $serv->send( $fd, "Hello {$fd}!" );
 68     }
 69 
 70     // 監聽數據接收事件
 71     public function onReceive(swoole_server $serv, $fd, $from_id, $data)
 72     {
 73         echo "Get Message From Client {$fd}:{$data}\n";
 74         //$this->writeLog('接收客戶端參數:'.$fd .'-'.$data);
 75         $res['result'] = 'success';
 76         $serv->send($fd, json_encode($res)); // 同步返回消息給客戶端
 77         $serv->task($data);  // 執行異步任務
 78     }
 79 
 80     /**
 81     * @param $serv swoole_server swoole_server對象
 82     * @param $task_id int 任務id
 83     * @param $from_id int 投遞任務的worker_id
 84     * @param $data string 投遞的數據
 85     */
 86     public function onTask(swoole_server $serv, $task_id, $from_id, $data)
 87     {
 88         swoole_timer_tick(30000, function($timer) use ($task_id) { // 啟用定時器,每30秒執行一次
 89             $memPercent = $this->getMemoryUsage();
 90             echo date('Y-m-d H:i:s') . '當前內存使用率:'.$memPercent."\n";
 91         });
 92     }
 93 
 94 
 95     /**
 96     * @param $serv swoole_server swoole_server對象
 97     * @param $task_id int 任務id
 98     * @param $data string 任務返回的數據
 99     */
100     public function onFinish(swoole_server $serv, $task_id, $data)
101     {
102         //
103     }
104 
105 
106     // 監聽連接關閉事件
107     public function onClose($serv, $fd, $from_id) {
108         echo "Client {$fd} close connection\n";
109     }
110 
111     public function stop()
112     {
113         $this->serv->stop();
114     }
115 
116     private function getMemoryUsage()
117     {
118         // MEMORY
119         if (false === ($str = @file("/proc/meminfo"))) return false;
120         $str = implode("", $str);
121         preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf);
122         //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers);
123 
124         $memTotal = round($buf[1][0]/1024, 2);
125         $memFree = round($buf[2][0]/1024, 2);
126         $memUsed = $memTotal - $memFree;
127         $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;
128 
129         return $memPercent;
130     }
131 }

 

 

我們以場景一為例,在onTask啟用定時任務,每隔30秒計算一次內存使用率。實際應用中可以把計算好的內存按時間寫入數據庫等存儲中,然后可以根據前端需求用來渲染成統計圖表,如:



接着服務端代碼 public\taskServer.php :

<?php 
require dirname(__DIR__) . '/vendor/autoload.php';

use Helloweba\Swoole\Task;

$opt = [
    'daemonize' => false
];
$ser = new Task($opt);
$ser->start();

 

 

客戶端代碼 public\taskClient.php :

<?php 
class Client
{
    private $client;

    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect() {
        if( !$this->client->connect("127.0.0.1", 9506 , 1) ) {
            echo "Error: {$this->client->errMsg}[{$this->client->errCode}]\n";
        }
        fwrite(STDOUT, "請輸入消息 Please input msg:");
        $msg = trim(fgets(STDIN));
        $this->client->send( $msg );
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";
    }
}

$client = new Client();
$client->connect();

 

驗證效果

1.啟動服務端:

php taskServer.php

 

2.客戶端輸入:

另開命令行窗口,執行

[root@localhost public]# php taskClient.php 
請輸入消息 Please input msg:hello
Get Message From Server:{"result":"success"}
[root@localhost public]# 

 

3.服務端返回:

如果返回上圖中的結果,則定時任務正常運行,我們會發現每隔30秒會輸出一條信息。

 

多PHPer在進階的時候總會遇到一些問題和瓶頸,業務代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限於:分布式架構、高可擴展、高性能、高並發、服務器性能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階干貨需要的可以免費分享給大家,需要的加群(點擊→)677079770


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM