一、開發背景
在項目開發過程中中遇到了以下三個需求:
1. 多個用戶同時上傳數據;
2. 數據庫需要支持同時讀寫;
3. 1分鍾內存儲上萬條數據;
根據對Mysql的測試情況,遇到以下問題:
1. 最先遇到壓力的是服務器,在寫入2500-3000條數據時,服務器崩潰了;
2. 當數據庫寫入時,耗時太長,10000條數據,大概需要505.887s,相當於8分鍾,如下:
a. 表結構:
b. 數據庫Procedure:
DROP PROCEDURE IF EXISTS my_insert; CREATE PROCEDURE my_insert() BEGIN DECLARE n int DEFAULT 1; loopname:LOOP INSERT INTO car_pathinfo_driver_cpy(id, linkphone,cartype,carcolor,carnumber,drivername,pubtimes)VALUES(n+500,'18838325709','雪弗蘭','白','豫A190XS','siker','3'); SET n=n+1; IF n=10000 THEN LEAVE loopname; END IF; END LOOP loopname; END; CALL my_insert();
c. 運行結果如下:
3. 不斷的數據庫寫入導致數據庫壓力過大;
出現以上問題,是由於mysql是基於磁盤的IO,基於服務響應性能考慮,就需要給數據做緩存,所以決定使用Mysql+redis緩存的解決方案,將業務熱數據寫入Redis緩存,使得高頻業務數據可以直接從內存讀取,提高系統整體響應速度。
二、使用Redis+Mysql需要考慮的問題
使用redis緩存+mysql數據庫存儲能解決:
1. 數據讀寫的速度
2. 服務器的壓力問題
同時,就需要考慮同步問題了,Redis和Mysql的同步問題
三、Redis+mysql同步解決方案
1.寫Redis->redis寫mysql,讀Mysql。
以下是一個Redis+mysql同步的示例,該示例測試了寫入100000條數據的效率,先向Redis寫入100000條數據,再將數據讀出,寫入Mysql。
批量寫入緩解了服務器的壓力。
stdafx.h
// stdafx.h : 標准系統包含文件的包含文件, // 或是經常使用但不常更改的 // 特定於項目的包含文件 // #pragma once #include "targetver.h" #include <stdio.h> #include <tchar.h> #include <stdlib.h> #include <string.h> #include <iostream> #include <assert.h> #include <vector> #include "hiredis.h" #include <Windows.h> #include "mysql.h" #ifdef _DEBUG #pragma comment(lib, "hiredis_d.lib") #pragma comment(lib, "Win32_Interop_d.lib") #else #pragma comment(lib, "hiredis.lib") #pragma comment(lib, "Win32_Interop.lib") #endif #pragma comment(lib, "AdvAPI32.Lib") #pragma comment(lib, "DbgHelp.Lib") #pragma comment (lib, "Shlwapi.lib") #pragma comment(lib,"libmysql.lib") using namespace std; typedef struct testData { int iHeight; int iWidth; char szValue[64]; char szHValue[64]; }stTestData, *pstTestData;
test.h
#include "stdafx.h" #include "DBHandle.h" int main() { DBHandle *dbHandle = new DBHandle(); thread tWriteDataToRedis(&DBHandle::writeHsetToRedis, *dbHandle); tWriteDataToRedis.join(); return 0; }
DBHandle.h
#pragma once #include <mutex> #include <thread> class DBHandle { public: DBHandle(); ~DBHandle(); bool connectRedis(string strIp, int iPort, string strPwd); void freeRedis(); int getRedisDBSize(); bool writeHsetToRedis(); bool readDataFromRedis(); bool connectMysql(); void FreeMysqlConnect(); bool insertDataToMysql(string strData); redisContext* m_pRedisContext; MYSQL m_mysql; MYSQL_RES *res; //行的一個查詢結果集 };
DBHandle.cpp
#include "stdafx.h" #include "DBHandle.h" DBHandle::DBHandle() { m_pRedisContext = NULL; } DBHandle::~DBHandle() { if (m_pRedisContext != NULL) { m_pRedisContext = NULL; } } bool DBHandle::connectRedis(string strIp, int iPort, string strPwd) { //redis默認監聽端口為6387 可以再配置文件中修改 char szBuf[32] = {}; strcpy_s(szBuf, sizeof(strIp), strIp.c_str()); m_pRedisContext = redisConnect(szBuf, iPort); if (NULL == m_pRedisContext || m_pRedisContext->err) { return false; } //輸入Redis密碼 strcpy_s(szBuf, sizeof(strPwd), strPwd.c_str()); redisReply *pRedisReply = (redisReply*)redisCommand(m_pRedisContext, "AUTH %s", szBuf); if (NULL != pRedisReply) { freeReplyObject(pRedisReply); } if (NULL == pRedisReply->str) { return false; } return true; } void DBHandle::freeRedis() { redisFree(m_pRedisContext); if (m_pRedisContext != NULL) { m_pRedisContext = NULL; } } int DBHandle::getRedisDBSize() { //查看list長度 int iListLen = 0; //redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, "LLen datalist"); redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, "DBSIZE"); if (NULL != pRedisReply) { if (NULL == pRedisReply->integer) { return false; } iListLen = pRedisReply->integer; freeReplyObject(pRedisReply); } if (NULL == pRedisReply) { printf("%s \r\n", m_pRedisContext->errstr); return false; } return iListLen; } bool DBHandle::writeHsetToRedis() { bool bFlag = connectRedis("127.0.0.1", 6379, "123456"); if (false == bFlag) { return false; } time_t st = time(NULL);//秒 stTestData data = {}; int i = 1; while (i<100000) { data.iHeight = i; data.iWidth = 30; char szBuf[64] = {}; sprintf_s(szBuf, "width%d", i); strcpy_s(data.szValue, 64, szBuf); sprintf_s(data.szHValue, "%s%d", "heighttest", i); //向Redis寫入數據hset location (interger)1 "width" sprintf_s(szBuf, "hset location%d value %s", i, data.szValue); redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, szBuf); if (NULL != pRedisReply) { freeReplyObject(pRedisReply); } i++; } printf("write finish"); readDataFromRedis(); time_t et = time(NULL); int iUsed = st - et; printf("used time is %d", iUsed); freeRedis(); return true; } bool DBHandle::readDataFromRedis() { /*bool bFlag = connectRedis("127.0.0.1", 6379, "123456"); if (false == bFlag) { return false; }*/ printf("read start"); int iSize = getListSize(); if (iSize <= 0) { return false; } bool bSuc = connectMysql(); if (bSuc == false) { return false; } int iCount = iSize;//計數 while (iCount > 0) { //用get命令獲取數據 redisReply *pRedisReply = (redisReply*)redisCommand(m_pRedisContext, "RPOP datalist"); if (NULL == pRedisReply) { return false; } if (NULL != pRedisReply->str) { string str = pRedisReply->str; insertDataToMysql(str); freeReplyObject(pRedisReply); } iCount--; } printf("read finish"); return true; } bool DBHandle::connectMysql() { mysql_init(&m_mysql); // Connects to a MySQL server const char host[] = "192.168.4.8"; const char user[] = "root"; const char passwd[] = "123456"; const char db[] = "topproductline"; unsigned int port = 3306; const char *unix_socket = NULL; unsigned long client_flag = 0; /*A MYSQL* connection handler if the connection was successful, NULL if the connection was unsuccessful. For a successful connection, the return value is the same as the value of the first parameter.*/ if (mysql_real_connect(&m_mysql, host, user, passwd, db, port, unix_socket, client_flag)) { printf("The connection was successful.\n"); return true; } else { printf("Error connecting to database:%s\n", mysql_error(&m_mysql)); return false; } } void DBHandle::FreeMysqlConnect() { mysql_free_result(res); mysql_close(&m_mysql); } bool DBHandle::insertDataToMysql(string strData) { char szQuery[256] = {0}; sprintf_s(szQuery, "insert into a_test (type) values ('%s');", strData.c_str()); if (mysql_query(&m_mysql, szQuery)) { printf("Query failed (%s)\n", mysql_error(&m_mysql)); return false; } else { printf("Insert success\n"); return true; } }
測試結果:
2.寫redis->寫mysql,讀Redis->未找到->讀Mysql