進程作為程序執行過程中資源分配的基本單位,擁有獨立的地址空間,同一進程的線程可以共享本進程的全局變量,靜態變量等數據和地址空間,但進程之間資源相互獨立.由於PHP語言不支持多線程,因此Swoole使用多進程模式,再多進程模式下就存在進程內存隔離,進程間通信與數據共享問題.
swoole中master主進程會創建manager管理進程和reactor線程,真正的工作進程為worker進程. manager是創建和管理worker進程,reactor進程測試監聽socket,接受數據任務,發送給worker進程去工作,因此所有業務邏輯最終都是在worker進程中進行的,worker進程之間的數據共享與通信必不可少.
swoole中 設置選項worker_num設置 啟動的worker進程數,默認設置為CPU核數
1 $server = new swoole_server('127.0.0.1',9898); 2 $server->set(array( 3 'worker_num' => 4, //設置啟動的Worker進程數。 4 ));
如上面說描述,進程存在進程隔離:
1 $fds = array(); 2 $server->on('connect', function ($server, $fd){ 3 echo "connection open: {$fd}\n"; 4 global $fds; 5 $fds[] = $fd; 6 var_dump($fds); 7 });
$fds雖然是全局變量,但是只在但前的進程內有效,swoole服務器底層會創建多個worker進程,此處打印出來的只有部分連接的fd,本文講簡述兩種解決方案的簡單示例:
1.外部存儲服務 : Redis
作為內存數據庫redis 無太多IO等待,並且讀寫速度快
示例代碼:以簡易聊天室websocket服務 swoole_websocket_server為例
1 $ws = new swoole_websocket_server("0.0.0.0", 9999); 2 $redis = new \Redis(); 3 $redis->connect('127.0.0.1', 6379); 4 $ws->set(array( 5 'daemonize' => false, 6 'worker_num' => 4, 7 )); 8 //監聽WebSocket連接打開事件 9 $ws->on('open', function ($ws, $request) use($redis) { 10 var_dump($request->fd, $request->get, $request->server); 11 //記錄連接 12 $redis->sAdd('fd', $request->fd); 13 $count = $redis->sCard('fd'); 14 var_dump($count); 15 $ws->push($request->fd, 'hello, welcome ☺ 當前'.$count.'人連接在線'); 16 }); 17 //監聽WebSocket消息事件 18 $ws->on('message', function ($ws, $frame) use($redis) { 19 $fds = $redis->sMembers('fd'); 20 $data = json_decode($frame->data,true); 21 if($data['type'] ==1 ){ 22 $redis->set($frame->fd,json_encode(['fd'=>$frame->fd,'user'=>$data['user']])); 23 //通知所有用戶新用戶上線 24 $fds = $redis->sMembers('fd');$users=[]; 25 $i=0; 26 foreach ($fds as $fd_on){ 27 $info = $redis->get($fd_on); 28 $users[$i]['fd'] = $fd_on; 29 $users[$i]['name'] = json_decode($info,true)['user']; 30 $message = "歡迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 進入聊天室"; 31 $push_data = ['message'=>$message,'users'=>$users]; 32 $ws->push($fd_on,json_encode($push_data)); 33 $i++; 34 } 35 }else if($data['type'] ==2){ 36 if($data['to_user'] == 'all'){ 37 foreach ($fds as $fd){ 38 $message = "<b style='color: crimson'>".$data['from_user']." say:</b> ".$data['msg']; 39 $push_data = ['message'=>$message]; 40 $ws->push($fd,json_encode($push_data)); 41 } 42 } 43 } 44 echo "Message: {$frame->data}\n"; 45 }); 46 //監聽WebSocket連接關閉事件 47 $ws->on('close', function ($ws, $fd) use ($redis){ 48 $redis->sRem('fd',$fd); 49 $fds = $redis->sMembers('fd'); 50 $i=0; 51 foreach ($fds as $fd_on){ 52 $user = json_decode($redis->get($fd),true)['user']; 53 $info = $redis->get($fd_on); 54 $users[$i]['fd'] = $fd_on; 55 $users[$i]['name'] = json_decode($info,true)['user']; 56 $message = "<b style='color: blueviolet'>".$user."</b> 離開聊天室了"; 57 $push_data = ['message'=>$message,'users'=>$users]; 58 $ws->push($fd_on,json_encode($push_data)); 59 $i++; 60 } 61 echo "client-{$fd} is closed\n"; 62 });
2.共享內存拓展:swoole_table
swoole_table是swoole官方提供的基於共享內存和鎖實現的超高性能冰飯數據結構.swoole_table在swoole1.7.5版本后可用.
目前swoole只支持3種類型:
swoole_table::TYPE_INT 整形字段
swoole_table::TYPE_FLOAT浮點字段
swoole_table::TYPE_STRING 字符串字段
函數方法:
column() :給內存表增加一列 參數:字段名,字段類型,字節數
$table->column('id', swoole_table::TYPE_INT, 4);
create():基於前一步對表結構的創建,執行創建表.
set() :設置行的數據(key-value的方式) 參數: 數據的key,數據的值(必須數組,鍵名必須與字段定義的$name相同)
$table->set($fd, ['id'=>1]);
get() :獲取一行數據 參數:數據的key
$table->get($fd);
del() :刪除一行數據 參數:數據的key
$table->del($fd);
lock():鎖定整個表
unlock():釋放鎖
lock/unlock 必須成對出現,否則會發生死鎖.
示例代碼: 還是上面的websocket服務為例
1 class WebSocketServer { 2 private $server; 3 public function __construct() 4 { 5 $this->server = new swoole_websocket_server("0.0.0.0",9988); 6 $this->server->set(array( 7 'daemonize' => false, 8 'worker_num' => 4, 9 )); 10 //內存表 11 $fd_table = new swoole_table( 1024 ); 12 $fd_table->column( "user",swoole_table::TYPE_STRING, 30 ); 13 $fd_table->column( "time", swoole_table::TYPE_STRING, 20 ); 14 $fd_table->create(); 15 16 $user_table = new swoole_table(1024); 17 $user_table->column("fd",swoole_table::TYPE_INT,8); 18 $user_table->create(); 19 20 $this->server->fd = $fd_table; 21 $this->server->user = $user_table; 22 23 //啟動開始 24 $this->server->on('Start',[$this,'onStart']); 25 //與onStart同級 26 $this->server->on('workerStart',[$this,'onWorkerStart']); 27 //webSocket open 連接觸發回調 28 $this->server->on('open',[$this,'onOpen']); 29 //webSocket send 發送觸發回調 30 $this->server->on('message', [$this, 'onMessage']); 31 //webSocket close 關閉觸發回調 32 $this->server->on('Close', [$this, 'onClose']); 33 //tcp連接 觸發 在 webSocket open 之前回調 34 $this->server->on('Connect', [$this, 'onConnect']); 35 //tcp 模式下(eg:telnet ) 發送信息才會觸發 webSocket 模式下沒有觸發 36 $this->server->on('Receive', [$this, 'onReceive']); 37 // 服務開啟 38 $this->server->start(); 39 40 } 41 42 public function onStart( $server) 43 { 44 echo "Start\n"; 45 } 46 47 public function onWorkerStart($server,$worker_id) 48 { 49 //判斷是worker進程還是 task_worker進程 echo 次數 是worker_num+task_worker_num 50 if($worker_id<$server->setting['worker_num']){ 51 echo 'worder'.$worker_id."\n"; 52 }else{ 53 echo 'task_worker'.$worker_id."\n"; 54 } 55 // echo "workerStart{$worker_id}\n"; 56 } 57 58 public function onOpen( $server,$request) 59 { 60 $this->server->fd->set($request->fd,['user'=>'']); 61 echo "server: handshake success with fd{$request->fd}\n"; 62 $count = count($server->connections); 63 $server->push($request->fd, 'hello, welcome ☺ 當前'.$count.'人連接在線'); 64 } 65 66 public function onMessage( $server,$frame) 67 { 68 echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n"; 69 $data = json_decode($frame->data,true); 70 if($data['type'] ==1 ){ 71 $server->fd->set($frame->fd,['user'=>$data['user']]); 72 //通知所有用戶新用戶上線 73 foreach($server->connections as $key => $fd) { 74 $server->push($fd, "歡迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 進入聊天室"); 75 } 76 }else if($data['type'] ==2){ 77 if($data['to_user'] == 'all'){ 78 foreach($server->connections as $key => $fd) { 79 $server->push($fd, "<b style='color: crimson'>".$data['from_user']." say:</b> ".$data['msg']); 80 } 81 } 82 } 83 } 84 85 86 public function onConnect( $server, $fd, $from_id ) { 87 echo "Client {$fd} connect\n"; 88 echo "{$from_id}\n"; 89 } 90 91 public function onReceive( $server, $fd, $from_id, $data ) { 92 echo "Get Message From Client {$fd}:{$data}\n"; 93 } 94 95 96 public function onClose($server, $fd) 97 { 98 echo "Client {$fd} close connection\n"; 99 foreach($server->connections as $key => $on_fd) { 100 $user = $server->fd->get($fd)['user']; 101 $server->push($on_fd, "<b style='color: blueviolet'>".$user."</b> 離開聊天室了"); 102 } 103 } 104 } 105 new WebSocketServer();