swoole 消息隊列


 

<?php
/**
 * 場景:
 * 監控訂單表狀態 隊列通信
 * 一個進程向隊列發布消息 另外兩個進程爭搶
 */

//設置主進程名
echo '主進程id:' . posix_getpid() . PHP_EOL;
cli_set_process_title('php_main');

//1、此子進程用於監聽數據的改變
$process1 = new \Swoole\Process(function (\Swoole\Process $process) {
//    cli_set_process_title('php_child');
    $process->name('php_child1');
    $pdo = new \PDO('mysql:host=mysql;dbname=test', 'root', 'csh.aptx4869#');

    while (true) {
        $statement = $pdo->query('select * from `order` where is_pay=1 and is_notice=0 order by id desc');
        $data = $statement->fetch(PDO::FETCH_ASSOC);
        if ($data) {
            //投遞數據到消息隊列中
            $process->push('php_child1發送' . $data['client_name'] . '已支付 ');
            $pdo->exec('update `order` set is_notice=1 where id=' . $data['id']);
        }
        sleep(3);
    }
}, false, SOCK_STREAM, true);
//啟動消息隊列作為進程間通信
$process1->useQueue(2);
echo '子進程1 id:' . $process1->start() . PHP_EOL;

//2、此子進程用於發送郵件
$process2 = new \Swoole\Process(function (\Swoole\Process $process) {
    $process->name('php_child2');
    while (true) {
        //從隊列中提取數據
        $orderInfo = $process->pop();
        if ($orderInfo) {
            echo $orderInfo;
            echo 'php_child2發送郵件' . PHP_EOL;
        }
        sleep(3);
    }
}, false, SOCK_STREAM, true);
$process2->useQueue(2);
echo '子進程2 id:' . $process2->start() . PHP_EOL;

//3、此子進程用於發送郵件
$process3 = new \Swoole\Process(function (\Swoole\Process $process) {
    $process->name('php_child3');
    while (true) {
        //從隊列中提取數據
        $orderInfo = $process->pop();
        if ($orderInfo) {
            echo $orderInfo;
            echo 'php_child3發送郵件' . PHP_EOL;
        }
        sleep(3);
    }
}, false, SOCK_STREAM, true);
$process3->useQueue(2);
echo '子進程3 id:' . $process3->start() . PHP_EOL;

while (true) {
    sleep(3);
}

//\Swoole\Process::wait();
//使用Process作為監控父進程,創建管理子進程時,父類必須注冊信號SIGCHLD對退出的進程執行wait,否則子進程退出時會變成僵屍進程
Swoole\Process::signal(SIGCHLD, function ($signo) {
    //false 非阻塞模式
    while ($ret = \Swoole\Process::wait(false)) {
        var_dump($ret);
    }
});

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM