數據庫應用之--Redis+mysql實現大量數據的讀寫,以及高並發


一、開發背景

在項目開發過程中中遇到了以下三個需求:

  1. 多個用戶同時上傳數據;

  2. 數據庫需要支持同時讀寫;

  3. 1分鍾內存儲上萬條數據;

根據對Mysql的測試情況,遇到以下問題:

  1. 最先遇到壓力的是服務器,在寫入2500-3000條數據時,服務器崩潰了;

  2. 當數據庫寫入時,耗時太長,10000條數據,大概需要505.887s,相當於8分鍾,如下:

  a. 表結構:

  

 

   b. 數據庫Procedure:

DROP PROCEDURE IF EXISTS my_insert;
CREATE PROCEDURE my_insert()
BEGIN
   DECLARE n int DEFAULT 1;
        loopname:LOOP
            INSERT INTO car_pathinfo_driver_cpy(id, linkphone,cartype,carcolor,carnumber,drivername,pubtimes)VALUES(n+500,'18838325709','雪弗蘭','','豫A190XS','siker','3');
            SET n=n+1;
        IF n=10000 THEN
            LEAVE loopname;
        END IF;
        END LOOP loopname;
END;
CALL my_insert();

  c. 運行結果如下:

  

  3. 不斷的數據庫寫入導致數據庫壓力過大;

出現以上問題,是由於mysql是基於磁盤的IO,基於服務響應性能考慮,就需要給數據做緩存,所以決定使用Mysql+redis緩存的解決方案,將業務熱數據寫入Redis緩存,使得高頻業務數據可以直接從內存讀取,提高系統整體響應速度。

二、使用Redis+Mysql需要考慮的問題
  使用redis緩存+mysql數據庫存儲能解決:

  1. 數據讀寫的速度

  2. 服務器的壓力問題

  同時,就需要考慮同步問題了,Redis和Mysql的同步問題

三、Redis+mysql同步解決方案

  1.寫Redis->redis寫mysql,讀Mysql。

  以下是一個Redis+mysql同步的示例,該示例測試了寫入100000條數據的效率,先向Redis寫入100000條數據,再將數據讀出,寫入Mysql。

    批量寫入緩解了服務器的壓力。

stdafx.h

// stdafx.h : 標准系統包含文件的包含文件,
// 或是經常使用但不常更改的
// 特定於項目的包含文件
//

#pragma once

#include "targetver.h"

#include <stdio.h>
#include <tchar.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <assert.h>
#include <vector>
#include "hiredis.h"
#include <Windows.h>
#include "mysql.h"

#ifdef _DEBUG
#pragma comment(lib, "hiredis_d.lib")
#pragma comment(lib, "Win32_Interop_d.lib")
#else
#pragma comment(lib, "hiredis.lib")
#pragma comment(lib, "Win32_Interop.lib")

#endif
#pragma comment(lib, "AdvAPI32.Lib")
#pragma comment(lib, "DbgHelp.Lib")
#pragma comment (lib, "Shlwapi.lib")
#pragma comment(lib,"libmysql.lib")

using namespace std;

typedef struct testData
{
    int iHeight;
    int iWidth;
    char szValue[64];
    char szHValue[64];
}stTestData, *pstTestData;

test.h

#include "stdafx.h"
#include "DBHandle.h"

int main()
{

    DBHandle *dbHandle = new DBHandle();
    thread tWriteDataToRedis(&DBHandle::writeHsetToRedis, *dbHandle);
    tWriteDataToRedis.join();

    return 0;
}

DBHandle.h

#pragma once
#include <mutex>
#include <thread>

class DBHandle
{
public:
    DBHandle();
    ~DBHandle();

    bool connectRedis(string strIp, int iPort, string strPwd);
    void freeRedis();

    int getRedisDBSize();

    bool writeHsetToRedis();
    bool readDataFromRedis();

    bool connectMysql();
    void FreeMysqlConnect();

    bool insertDataToMysql(string strData);

    redisContext* m_pRedisContext;
    MYSQL m_mysql;
    MYSQL_RES *res;     //行的一個查詢結果集
    

};

DBHandle.cpp

#include "stdafx.h"
#include "DBHandle.h"


DBHandle::DBHandle()
{
    m_pRedisContext = NULL;
}

DBHandle::~DBHandle()
{ 
    if (m_pRedisContext != NULL)
    {
        m_pRedisContext = NULL;
    } 
}

bool DBHandle::connectRedis(string strIp, int iPort, string strPwd)
{
    //redis默認監聽端口為6387 可以再配置文件中修改 
    char szBuf[32] = {};
    strcpy_s(szBuf, sizeof(strIp), strIp.c_str());
    m_pRedisContext = redisConnect(szBuf, iPort);
    if (NULL == m_pRedisContext || m_pRedisContext->err)
    {
        return false;
    }

    //輸入Redis密碼
    strcpy_s(szBuf, sizeof(strPwd), strPwd.c_str());
    redisReply *pRedisReply = (redisReply*)redisCommand(m_pRedisContext, "AUTH %s", szBuf);
    if (NULL != pRedisReply)
    {
        freeReplyObject(pRedisReply);
    }
    if (NULL == pRedisReply->str)
    {
        return false;
    }
    return true;

}

void DBHandle::freeRedis()
{
    redisFree(m_pRedisContext);
    if (m_pRedisContext != NULL)
    {
        m_pRedisContext = NULL;
    }
}

int DBHandle::getRedisDBSize()
{
    //查看list長度
    int iListLen = 0;
    //redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, "LLen datalist");
    redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, "DBSIZE");
    if (NULL != pRedisReply)
    {
        if (NULL == pRedisReply->integer)
        {
            return false;
        }
        iListLen = pRedisReply->integer;
        freeReplyObject(pRedisReply);
    }
    if (NULL == pRedisReply)
    {
        printf("%s \r\n", m_pRedisContext->errstr);
        return false;
    }
    
    return iListLen;
}

bool DBHandle::writeHsetToRedis()
{
    bool bFlag = connectRedis("127.0.0.1", 6379, "123456");
    if (false == bFlag)
    {
        return false;
    }

    time_t st = time(NULL);//
    stTestData data = {};
    int i = 1;
    while (i<100000)
    {

        data.iHeight = i;
        data.iWidth = 30;
        char szBuf[64] = {};
        sprintf_s(szBuf, "width%d", i);
        strcpy_s(data.szValue, 64, szBuf);
        sprintf_s(data.szHValue, "%s%d", "heighttest", i);

        //向Redis寫入數據hset location (interger)1 "width"
        sprintf_s(szBuf, "hset location%d value %s", i, data.szValue);
        redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, szBuf);
        if (NULL != pRedisReply)
        {
            freeReplyObject(pRedisReply);
        }
        i++;
    }

    printf("write finish");
    readDataFromRedis();

    time_t et = time(NULL);
    int iUsed = st - et;
    printf("used time is %d", iUsed);
    freeRedis();
    return true;

}


bool DBHandle::readDataFromRedis()
{
    /*bool bFlag = connectRedis("127.0.0.1", 6379, "123456");
    if (false == bFlag)
    {
        return false;
    }*/

    printf("read start");
    
    int iSize = getListSize();
    if (iSize <= 0)
    {
        return false;
    }
    bool bSuc = connectMysql();
    if (bSuc == false)
    {
        return false;
    }

    int iCount = iSize;//計數
    while (iCount > 0)
    {
        //用get命令獲取數據
        redisReply *pRedisReply = (redisReply*)redisCommand(m_pRedisContext, "RPOP datalist");
        if (NULL == pRedisReply)
        {
            return false;
        }
        if (NULL != pRedisReply->str)
        {
            string str = pRedisReply->str;
            insertDataToMysql(str);
            freeReplyObject(pRedisReply);
        }
        iCount--;
    }
    
    printf("read finish");
    
    return true;

}

bool DBHandle::connectMysql()
{
    mysql_init(&m_mysql);

    // Connects to a MySQL server
    const char host[] = "192.168.4.8";
    const char user[] = "root";
    const char passwd[] = "123456";
    const char db[] = "topproductline";
    unsigned int port = 3306;
    const char *unix_socket = NULL;
    unsigned long client_flag = 0;

    /*A MYSQL* connection handler if the connection was successful,
    NULL if the connection was unsuccessful. For a successful connection,
    the return value is the same as the value of the first parameter.*/
    if (mysql_real_connect(&m_mysql, host, user, passwd, db, port, unix_socket, client_flag)) {
        printf("The connection was successful.\n");
        return true;
    }
    else {
        printf("Error connecting to database:%s\n", mysql_error(&m_mysql));
        return false;
    }
}

void DBHandle::FreeMysqlConnect()
{
    mysql_free_result(res);
    mysql_close(&m_mysql);
}

bool DBHandle::insertDataToMysql(string strData)
{
    char szQuery[256] = {0};
    sprintf_s(szQuery, "insert into a_test (type) values ('%s');", strData.c_str());
    if (mysql_query(&m_mysql, szQuery)) {
        printf("Query failed (%s)\n", mysql_error(&m_mysql));
        return false;
    }
    else {
        printf("Insert success\n");
        return true;
    }
}

   測試結果:

 

 

  2.寫redis->寫mysql,讀Redis->未找到->讀Mysql

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM