swoole 多進程共享數據


 

進程作為程序執行過程中資源分配的基本單位,擁有獨立的地址空間,同一進程的線程可以共享本進程的全局變量,靜態變量等數據和地址空間,但進程之間資源相互獨立.由於PHP語言不支持多線程,因此Swoole使用多進程模式,再多進程模式下就存在進程內存隔離,進程間通信與數據共享問題.

swoole中master主進程會創建manager管理進程和reactor線程,真正的工作進程為worker進程.  manager是創建和管理worker進程,reactor進程測試監聽socket,接受數據任務,發送給worker進程去工作,因此所有業務邏輯最終都是在worker進程中進行的,worker進程之間的數據共享與通信必不可少.

 swoole中 設置選項worker_num設置 啟動的worker進程數,默認設置為CPU核數

1 $server = new swoole_server('127.0.0.1',9898);
2 $server->set(array(
3     'worker_num' => 4,   //設置啟動的Worker進程數。
4 ));

如上面說描述,進程存在進程隔離:

1 $fds = array();
2 $server->on('connect', function ($server, $fd){
3     echo "connection open: {$fd}\n";
4     global $fds;
5     $fds[] = $fd;
6     var_dump($fds);
7 });

$fds雖然是全局變量,但是只在但前的進程內有效,swoole服務器底層會創建多個worker進程,此處打印出來的只有部分連接的fd,本文講簡述兩種解決方案的簡單示例:

1.外部存儲服務 : Redis

作為內存數據庫redis 無太多IO等待,並且讀寫速度快

示例代碼:以簡易聊天室websocket服務 swoole_websocket_server為例


1
$ws = new swoole_websocket_server("0.0.0.0", 9999); 2 $redis = new \Redis(); 3 $redis->connect('127.0.0.1', 6379); 4 $ws->set(array( 5 'daemonize' => false, 6 'worker_num' => 4, 7 )); 8 //監聽WebSocket連接打開事件 9 $ws->on('open', function ($ws, $request) use($redis) { 10 var_dump($request->fd, $request->get, $request->server); 11 //記錄連接 12 $redis->sAdd('fd', $request->fd); 13 $count = $redis->sCard('fd'); 14 var_dump($count); 15 $ws->push($request->fd, 'hello, welcome ☺ 當前'.$count.'人連接在線'); 16 }); 17 //監聽WebSocket消息事件 18 $ws->on('message', function ($ws, $frame) use($redis) { 19 $fds = $redis->sMembers('fd'); 20 $data = json_decode($frame->data,true); 21 if($data['type'] ==1 ){ 22 $redis->set($frame->fd,json_encode(['fd'=>$frame->fd,'user'=>$data['user']])); 23 //通知所有用戶新用戶上線 24 $fds = $redis->sMembers('fd');$users=[]; 25 $i=0; 26 foreach ($fds as $fd_on){ 27 $info = $redis->get($fd_on); 28 $users[$i]['fd'] = $fd_on; 29 $users[$i]['name'] = json_decode($info,true)['user']; 30 $message = "歡迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 進入聊天室"; 31 $push_data = ['message'=>$message,'users'=>$users]; 32 $ws->push($fd_on,json_encode($push_data)); 33 $i++; 34 } 35 }else if($data['type'] ==2){ 36 if($data['to_user'] == 'all'){ 37 foreach ($fds as $fd){ 38 $message = "<b style='color: crimson'>".$data['from_user']." say:</b> ".$data['msg']; 39 $push_data = ['message'=>$message]; 40 $ws->push($fd,json_encode($push_data)); 41 } 42 } 43 } 44 echo "Message: {$frame->data}\n"; 45 }); 46 //監聽WebSocket連接關閉事件 47 $ws->on('close', function ($ws, $fd) use ($redis){ 48 $redis->sRem('fd',$fd); 49 $fds = $redis->sMembers('fd'); 50 $i=0; 51 foreach ($fds as $fd_on){ 52 $user = json_decode($redis->get($fd),true)['user']; 53 $info = $redis->get($fd_on); 54 $users[$i]['fd'] = $fd_on; 55 $users[$i]['name'] = json_decode($info,true)['user']; 56 $message = "<b style='color: blueviolet'>".$user."</b> 離開聊天室了"; 57 $push_data = ['message'=>$message,'users'=>$users]; 58 $ws->push($fd_on,json_encode($push_data)); 59 $i++; 60 } 61 echo "client-{$fd} is closed\n"; 62 });

 

2.共享內存拓展:swoole_table

swoole_table是swoole官方提供的基於共享內存和鎖實現的超高性能冰飯數據結構.swoole_table在swoole1.7.5版本后可用.

目前swoole只支持3種類型:

swoole_table::TYPE_INT 整形字段

swoole_table::TYPE_FLOAT浮點字段

swoole_table::TYPE_STRING 字符串字段

 

函數方法:

column() :給內存表增加一列 參數:字段名,字段類型,字節數

$table->column('id', swoole_table::TYPE_INT, 4);

create():基於前一步對表結構的創建,執行創建表.

set() :設置行的數據(key-value的方式) 參數: 數據的key,數據的值(必須數組,鍵名必須與字段定義的$name相同)

$table->set($fd, ['id'=>1]);

get() :獲取一行數據  參數:數據的key

$table->get($fd);

del() :刪除一行數據 參數:數據的key

$table->del($fd);

lock():鎖定整個表

unlock():釋放鎖

lock/unlock 必須成對出現,否則會發生死鎖.

示例代碼: 還是上面的websocket服務為例

  1 class WebSocketServer {
  2     private $server;
  3     public function __construct()
  4     {
  5         $this->server = new swoole_websocket_server("0.0.0.0",9988);
  6         $this->server->set(array(
  7             'daemonize'       => false,
  8             'worker_num'      => 4,
  9         ));
 10         //內存表
 11         $fd_table = new swoole_table( 1024 );
 12         $fd_table->column( "user",swoole_table::TYPE_STRING, 30 );
 13         $fd_table->column( "time", swoole_table::TYPE_STRING, 20 );
 14         $fd_table->create();
 15 
 16         $user_table = new swoole_table(1024);
 17         $user_table->column("fd",swoole_table::TYPE_INT,8);
 18         $user_table->create();
 19 
 20         $this->server->fd = $fd_table;
 21         $this->server->user = $user_table;
 22 
 23         //啟動開始
 24         $this->server->on('Start',[$this,'onStart']);
 25         //與onStart同級
 26         $this->server->on('workerStart',[$this,'onWorkerStart']);
 27         //webSocket open 連接觸發回調
 28         $this->server->on('open',[$this,'onOpen']);
 29         //webSocket send 發送觸發回調
 30         $this->server->on('message', [$this, 'onMessage']);
 31         //webSocket close 關閉觸發回調
 32         $this->server->on('Close', [$this, 'onClose']);
 33         //tcp連接 觸發 在 webSocket open 之前回調
 34         $this->server->on('Connect', [$this, 'onConnect']);
 35         //tcp 模式下(eg:telnet ) 發送信息才會觸發  webSocket 模式下沒有觸發
 36         $this->server->on('Receive', [$this, 'onReceive']);
 37         // 服務開啟
 38         $this->server->start();
 39 
 40     }
 41 
 42     public function onStart( $server)
 43     {
 44         echo "Start\n";
 45     }
 46 
 47     public function onWorkerStart($server,$worker_id)
 48     {
 49         //判斷是worker進程還是 task_worker進程 echo 次數 是worker_num+task_worker_num
 50         if($worker_id<$server->setting['worker_num']){
 51             echo  'worder'.$worker_id."\n";
 52         }else{
 53             echo  'task_worker'.$worker_id."\n";
 54         }
 55         //     echo "workerStart{$worker_id}\n";
 56     }
 57 
 58     public function onOpen( $server,$request)
 59     {
 60         $this->server->fd->set($request->fd,['user'=>'']);
 61         echo "server: handshake success with fd{$request->fd}\n";
 62         $count = count($server->connections);
 63         $server->push($request->fd, 'hello, welcome ☺                     當前'.$count.'人連接在線');
 64     }
 65 
 66     public function onMessage( $server,$frame)
 67     {
 68         echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
 69         $data = json_decode($frame->data,true);
 70         if($data['type'] ==1 ){
 71             $server->fd->set($frame->fd,['user'=>$data['user']]);
 72             //通知所有用戶新用戶上線
 73             foreach($server->connections as $key => $fd) {
 74                 $server->push($fd, "歡迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 進入聊天室");
 75             }
 76         }else if($data['type'] ==2){
 77             if($data['to_user'] == 'all'){
 78                 foreach($server->connections as $key => $fd) {
 79                     $server->push($fd, "<b style='color: crimson'>".$data['from_user']." say:</b>  ".$data['msg']);
 80                 }
 81             }
 82         }
 83     }
 84 
 85 
 86     public function onConnect( $server, $fd, $from_id ) {
 87         echo "Client {$fd} connect\n";
 88         echo "{$from_id}\n";
 89     }
 90 
 91     public function onReceive( $server, $fd, $from_id, $data ) {
 92         echo "Get Message From Client {$fd}:{$data}\n";
 93     }
 94 
 95 
 96     public function onClose($server, $fd)
 97     {
 98         echo "Client {$fd} close connection\n";
 99         foreach($server->connections as $key => $on_fd) {
100             $user = $server->fd->get($fd)['user'];
101             $server->push($on_fd, "<b style='color: blueviolet'>".$user."</b> 離開聊天室了");
102         }
103     }
104 }
105 new WebSocketServer();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM