源碼地址:https://github.com/Tinywan/PHP_Experience
問題分析
- 問題一:要求日志最好入庫;但是,直接入庫mysql確實扛不住,批量入庫沒有問題,done。【批量入庫和直接入庫性能差異】
- 問題二:批量入庫就需要有高並發的消息隊列,決定采用redis list 仿真實現,而且方便回滾。
- 問題三:日志量畢竟大,保存最近30條足矣,決定用php寫個離線統計和清理腳本。
一、設計數據庫表和存儲
- 考慮到log系統對數據庫的性能更多一些,穩定性和安全性沒有那么高,
存儲引擎自然是只支持select insert 沒有索引的archive
。如果確實有update需求,也可以采用myISAM。 - 考慮到log是實時記錄的所有數據,數量可能巨大,
主鍵采用bigint,自增即可
。 - 考慮到log系統
以寫為主,統計采用離線計算,字段均不要出現索引
,因為一方面可能會影響插入數據效率,另外讀時候會造成死鎖,影響寫數據。
二、redis存儲數據形成消息隊列
/** * 使用隊列生成reids測試數據 * 成功:執行 RPUSH操作后,返回列表的長度:8 */ public function createRedisList($listKey = 'message01') { $redis = RedisInstance::MasterInstance(); $redis->select(1); $message = [ 'type' => 'say', 'userId' => $redis->incr('user_id'), 'userName' => 'Tinywan' . mt_rand(100, 9999), //是否正在錄像 'userImage' => '/res/pub/user-default-w.png', //是否正在錄像 'openId' => 'openId' . mt_rand(100000, 9999999999999999), 'roomId' => 'openId' . mt_rand(30, 50), 'createTime' => date('Y-m-d H:i:s', time()), 'content' => $redis->incr('content') //當前是否正在打流狀態 ]; $rPushResul = $redis->rPush($listKey, json_encode($message)); //執行成功后返回當前列表的長度 9 return $rPushResul; }
三、讀取redis消息隊列里面的數據,批量入庫
第一種思路:
/** * 消息Redis方法保存到Mysql數據庫 * @param string $liveKey */ public function RedisSaveToMysql($listKey = 'message01') { if (empty($listKey)) { $result = ["errcode" => 500, "errmsg" => "this parameter is empty!"]; exit(json_encode($result)); } $redis = RedisInstance::MasterInstance(); $redis->select(1); $redisInfo = $redis->lRange($listKey, 0, 5); $dataLength = $redis->lLen($listKey); $model = M("User"); while ($dataLength > 65970) { try { $model->startTrans(); $redis->watch($listKey); $arrList = []; foreach ($redisInfo as $key => $val) { $arrList[] = array( 'username' => json_decode($val, true)['userName'], 'logintime' => json_decode($val, true)['createTime'], 'description' => json_decode($val, true)['content'], 'pido' => json_decode($val, true)['content'] ); } $insertResult = $model->addAll($arrList); if (!$insertResult) { $model->rollback(); $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!", 'data' => 'dataLength:' . $dataLength); exit(json_encode($result)); } $model->commit(); $redis->lTrim($listKey, 6, -1); $redisInfo = $redis->lRange($listKey, 0, 5); $dataLength = $redis->lLen($listKey); } catch (Exception $e) { $model->rollback(); $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!"); exit(json_encode($result)); } } $result = array("errcode" => 200, "errmsg" => "Data Insert into Success!", 'data' => 'dataLength:' . $dataLength . 'liveKey:' . $listKey); exit(json_encode($result)); }
第二種思路(供參考,非框架)
<?php $redis_xx = new Redis(); $redis_xx->connect('ip', port); $redis_xx->auth("password"); // 獲取現有消息隊列的長度 $count = 0; $max = $redis_xx->lLen("call_log"); // 獲取消息隊列的內容,拼接sql $insert_sql = "insert into fb_call_log (`interface_name`, `createtime`) values "; // 回滾數組 $roll_back_arr = array(); while ($count < $max) { $log_info = $redis_cq01->lPop("call_log"); $roll_back_arr = $log_info; if ($log_info == 'nil' || !isset($log_info)) { $insert_sql .= ";"; break; } // 切割出時間和info $log_info_arr = explode("%", $log_info); $insert_sql .= " ('" . $log_info_arr[0] . "','" . $log_info_arr[1] . "'),"; $count++; } // 判定存在數據,批量入庫 if ($count != 0) { $link_2004 = mysql_connect('ip:port', 'user', 'password'); if (!$link_2004) { die("Could not connect:" . mysql_error()); } $crowd_db = mysql_select_db('fb_log', $link_2004); $insert_sql = rtrim($insert_sql, ",") . ";"; $res = mysql_query($insert_sql); // 輸出入庫log和入庫結果; echo date("Y-m-d H:i:s") . "insert " . $count . " log info result:"; echo json_encode($res); echo "</br>\n"; // 數據庫插入失敗回滾 if (!$res) { foreach ($roll_back_arr as $k) { $redis_xx->rPush("call_log", $k); } } // 釋放連接 mysql_free_result($res); mysql_close($link_2004); } $redis_cq01->close(); ?>
四、獲取Redis數據緩存數據
/** * [0]檢查當前Redis是否連接成功 * [1]獲取數據,首先從Redis中去獲取,沒有的話再從數據庫中去獲取 */ public function findDataRedisOrMysql($listKey = 'message01') { //Check the current connection status 查看服務是否運行 if (RedisInstance::MasterInstance() != false) { $redis = RedisInstance::MasterInstance(); $redis->select(2); /** * 首先從Redis中去獲取數據 * lRange 獲取為空的話,則表示沒有數據,否則返回一個非空數組 */ $redisData = $redis->lRange($listKey, 0, 9); $resultData = []; if (!empty($redisData)) { $resultData['status_code'] = 200; $resultData['msg'] = 'Data Source from Redis Cache'; foreach ($redisData as $key => $val) { $resultData['listData'][] = json_decode($val, true); } } else { $resultData['redis_msg'] = 'Redis is Expire'; $conditions = array('status' => ':status'); $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select(); if ($mysqlData) { $resultData['status_code'] = 200; $resultData['mysql_msg'] = 'Data Source from Mysql is Success'; $redis->select(2); foreach ($mysqlData as $key => $val) { $resultData['listData'][] = $val; //寫入Redis作為緩存 $redis->rPush($listKey, json_encode($val)); } //同時設置一個過期時間 $redis->expire($listKey,30); } else { $resultData['status_code'] = 500; $resultData['mysql_msg'] = 'Data Source from Mysql is Fail'; } } } else { $resultData['redis_msg'] = 'Redis server went away'; $resultData['mysql_msg'] = 'Mysql Data2'; $conditions = array('status' => ':status'); $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select(); foreach ($mysqlData as $key => $val) { $resultData['listData'][] = $val; } } homePrint($resultData); }
四、離線天級統計和清理數據腳本
<?php /** * static log :每天離線統計代碼日志和刪除五天前的日志 * */ // 離線統計 $link_2004 = mysql_connect('ip:port', 'user', 'pwd'); if (!$link_2004) { die("Could not connect:" . mysql_error()); } $crowd_db = mysql_select_db('fb_log', $link_2004); // 統計昨天的數據 $day_time = date("Y-m-d", time() - 60 * 60 * 24 * 1); $static_sql = "get sql"; $res = mysql_query($static_sql, $link_2004); // 獲取結果入庫略 // 清理15天之前的數據 $before_15_day = date("Y-m-d", time() - 60 * 60 * 24 * 15); $delete_sql = "delete from xxx where createtime < '" . $before_15_day . "'"; try { $res = mysql_query($delete_sql); }catch(Exception $e){ echo json_encode($e)."\n"; echo "delete result:".json_encode($res)."\n"; } mysql_close($link_2004); ?>
五:代碼部署
主要是部署,批量入庫腳本的調用和天級統計腳本,crontab例行運行。
# 批量入庫腳本 */2 * * * * /home/cuihuan/xxx/lamp/php5/bin/php /home/cuihuan/xxx/batchLog.php >>/home/cuihuan/xxx/batchlog.log # 天級統計腳本 0 5 * * * /home/cuihuan/xxx/php5/bin/php /home/cuihuan/xxx/staticLog.php >>/home/cuihuan/xxx/staticLog.log
總結:相對於其他復雜的方式處理高並發,這個解決方案簡單有效:通過redis緩存抗壓,mysql批量入庫解決數據庫瓶頸,離線計算解決統計數據,通過定期清理保證庫的大小。