mysql與redis數據同步(c/c++)(寫mysql同步到redis,並且以json格式保存)


系統開發中時常會需要緩存來提升並發讀的能力,這時可以通過mysql的UDF和hiredis來進行同步 

原理: 

通過mysql自動同步redis

在服務端開發過程中,一般會使用MySQL等關系型數據庫作為最終的存儲引擎,Redis其實也可以作為一種鍵值對型的數據庫,但在一些實際場景中,特別是關系型結構並不適合使用Redis直接作為數據庫。這倆家伙簡直可以用“男女搭配,干活不累”來形容,搭配起來使用才能事半功倍。本篇我們就這兩者如何合理搭配以及他們之間數據如何進行同步展開。

一般地,Redis可以用來作為MySQL的緩存層。為什么MySQL最好有緩存層呢?想象一下這樣的場景:在一個多人在線的游戲里,排行榜、好友關系、隊列等直接關系數據的情景下,如果直接和MySQL正面交手,大量的數據請求可能會讓MySQL疲憊不堪,甚至過量的請求將會擊穿數據庫,導致整個數據服務中斷,數據庫性能的瓶頸將掣肘業務的開發;那么如果通過Redis來做數據緩存,將大大減小查詢數據的壓力。在這種架子里,當我們在業務層有數據查詢需求時,先到Redis緩存中查詢,如果查不到,再到MySQL數據庫中查詢,同時將查到的數據更新到Redis里;當我們在業務層有修改插入數據需求時,直接向MySQL發起請求,同時更新Redis緩存。

在上面這種架子中,有一個關鍵點,就是MySQL的CRUD發生后自動地更新到Redis里,這需要通過MySQL UDF來實現。具體來說,我們把更新Redis的邏輯放到MySQL中去做,即定義一個觸發器Trigger,監聽CRUD這些操作,當操作發生后,調用對應的UDF函數,遠程寫回Redis,所以業務邏輯只需要負責更新MySQL就行了,剩下的交給MySQL UDF去完成。

 

那 什么是UDF呢?

UDF,是User Defined Function的縮寫,用戶定義函數。MySQL支持函數,也支持自定義的函數。UDF比存儲方法有更高的執行效率,並且支持聚集函數。

UDF定義了5個API:xxx_init()、xxx_deinit()、xxx()、xxx_add()、xxx_clear()。官方文檔(http://dev.mysql.com/doc/refman/5.7/en/adding-udf.html)給出了這些API的說明。相關的結構體定義在mysql_com.h里,它又被mysql.h包含,使用時只需#include<mysql.h>即可。他們之間的關系和執行順序可以以下圖來表示:

1. xxx()

這是主函數,5個函數至少需要xxx(),對MySQL操作的結果在此返回。函數的聲明如下:

char *xxx(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error);

long long xxx(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error);

double xxx(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error);

SQL的類型和C/C++類型的映射:

SQL Type C/C++ Type
STRING char *
INTEGER long long
REAL double

2. xxx_init()

xxx()主函數的初始化,如果定義了,則用來檢查傳入xxx()的參數數量、類型、分配內存空間等初始化操作。函數的聲明如下:

my_bool xxx_init(UDF_INIT *initid, UDF_ARGS *args, char *message);

3. xxx_deinit()

xxx()主函數的反初始化,如果定義了,則用來釋放初始化時分配的內存空間。函數的聲明如下:

void xxx_deinit(UDF_INIT *initid);

4. xxx_add()

在聚合UDF中反復調用,將參數加入聚合參數中。函數的聲明如下:

void xxx_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null,char *error);

5. xxx_clear()

在聚合UDF中反復調用,重置聚合參數,為下一行數據的操作做准備。函數的聲明如下:

void xxx_clear(UDF_INIT *initid, char *is_null, char *error);

(二.) UDF函數的基本使用

在此之前,需要先安裝mysql的開發包:

[root@localhost zhxilin]# yum install mysql-devel -y

我們定義一個最簡單的UDF主函數:

復制代碼
復制代碼
 1 /*simple.cpp*/
 2 #include <mysql.h>
 3 
 4 extern "C" long long simple_add(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error)
 5 {
 6     int a = *((long long *)args->args[0]);
 7     int b = *((long long *)args->args[1]);
 8     return a + b;
 9 }
10 
11 extern "C" my_bool simple_add_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
12 {
13     return 0;
14 }
復制代碼
復制代碼

由於mysql提供的接口是C實現的,我們在C++中使用時需要添加:

extern "C" { ... }

接下來編譯成動態庫.so:

[zhxilin@localhost mysql-redis-test]$ g++ -shared -fPIC -I /usr/include/mysql -o simple_add.so simple.cpp

-shared 表示編譯和鏈接時使用的是全局共享的類庫;

-fPIC編譯器輸出位置無關的目標代碼,適用於動態庫;

-I /usr/include/mysql 指明包含的頭文件mysql.h所在的位置。

編譯出simple_add.so后用root拷貝到/usr/lib64/mysql/plugin下:

[root@localhost mysql-redis-test]# cp simple_add.so /usr/lib64/mysql/plugin/

緊接着可以在MySQL中創建函數執行了。登錄MySQL,創建關聯函數:

mysql> CREATE FUNCTION simple_add RETURNS INTEGER SONAME 'simple_add.so';
Query OK, 0 rows affected (0.04 sec)

測試UDF函數:

復制代碼
復制代碼
mysql> select simple_add(10, 5);
+-------------------+
| simple_add(10, 5) |
+-------------------+
|                15 |
+-------------------+
1 row in set (0.00 sec)
復制代碼
復制代碼

可以看到,UDF正確執行了加法。

創建UDF函數的語法是 CREATE FUNCTION xxx RETURNS [INTEGER/STRING/REAL] SONAME '[so name]';

刪除UDF函數的語法是 DROP FUNCTION simple_add;

mysql> DROP FUNCTION simple_add;
Query OK, 0 rows affected (0.03 sec)

 

(三). 在UDF中訪問Redis

跟上述做法一樣,只需在UDF里調用Redis提供的接口函數。Redis官方給出了Redis C++ Client (https://github.com/mrpi/redis-cplusplus-client),封裝了Redis的基本操作。

源碼是依賴boost,需要先安裝boost:

[root@localhost dev]# yum install boost boost-devel

然后下載redis cpp client源碼:

[root@localhost dev]# git clone https://github.com/mrpi/redis-cplusplus-client

使用時需要把redisclient.h、anet.h、fmacros.h、anet.c 這4個文件考到目錄下,開始編寫關於Redis的UDF。我們定義了redis_hset作為主函數,連接Redis並調用hset插入哈希表,redis_hset_init作為初始化,檢查參數個數和類型。

復制代碼
復制代碼
 1 /* test.cpp */
 2 #include <stdio.h>
 3 #include <mysql.h>
 4 #include "redisclient.h"
 5 using namespace boost;
 6 using namespace std;
 7 
 8 static redis::client *m_client = NULL;
 9 
10 extern "C" char *redis_hset(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {
11     try {
12         // 連接Redis
13         if(NULL == m_client) {
14             const char* c_host = getenv("REDIS_HOST");
15             string host = "127.0.0.1";
16             if(c_host) {
17                 host = c_host;
18             }
19             m_client = new redis::client(host);
20         }        
21 
22         if(!(args->args && args->args[0] && args->args[1] && args->args[2])) {
23             *is_null = 1;
24             return result;
25         }
26 
27         // 調用hset插入一個哈希表
28         if(m_client->hset(args->args[0], args->args[1], args->args[2])) {
29             return result;
30         } else {
31             *error = 1;
32             return result;
33         }
34     } catch (const redis::redis_error& e) {
35         return result;
36     }
37 }
38 
39 extern "C" my_bool redis_hset_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
40     if (3 != args->arg_count) {
41         // hset(key, field, value) 需要三個參數
42         strncpy(message, "Please input 3 args for: hset('key', 'field', 'value');", MYSQL_ERRMSG_SIZE);
43         return -1;
44     }
45     if (args->arg_type[0] != STRING_RESULT  || 
46         args->arg_type[1] != STRING_RESULT  || 
47         args->arg_type[2] != STRING_RESULT) { 
48         // 檢查參數類型
49         strncpy(message, "Args type error: hset('key', 'field', 'value');", MYSQL_ERRMSG_SIZE);
50         return -1;
51     }
52 
53     args->arg_type[0] = STRING_RESULT;
54     args->arg_type[1] = STRING_RESULT;
55     args->arg_type[2] = STRING_RESULT;
56 
57     initid->ptr = NULL;
58     return 0;
59 }
復制代碼
復制代碼

編譯鏈接:

[zhxilin@localhost mysql-redis-test]$ g++ -shared -fPIC -I /usr/include/mysql -lboost_serialization -lboost_system -lboost_thread -o libmyredis.so anet.c test.cpp

編譯時需要加上-lboost_serialization -lboost_system -lboost_thread, 表示需要鏈接三個動態庫:libboost_serialization.so、libboost_system.so、libboost_thread.so,否則在運行時會報缺少函數定義的錯誤。

編譯出libmyredis.so之后,將其拷貝到mysql的插件目錄下並提權:

[root@localhost mysql-redis-test]# cp libmyredis.so /usr/lib64/mysql/plugin/ & chmod 777 /usr/lib64/mysql/plugin/libmyredis.so 

完成之后登錄MySQL,創建關聯函數測試一下:

mysql> DROP FUNCTION IF EXISTS `redis_hset`;
Query OK, 0 rows affected (0.16 sec)

mysql> CREATE FUNCTION redis_hset RETURNS STRING SONAME 'libmyredis.so';
Query OK, 0 rows affected (0.02 sec)

先刪除老的UDF,注意函數名加反引號(``)。調用UDF測試,返回0,執行成功:

復制代碼
復制代碼
mysql> SELECT redis_hset('zhxilin', 'id', '09388334');
+-----------------------------------------+
| redis_hset('zhxilin', 'id', '09388334') |
+-----------------------------------------+
| 0                                                     |
+-----------------------------------------+
1 row in set (0.00 sec)
復制代碼
復制代碼

打開redis-cli,查看結果:

127.0.0.1:6379> HGETALL zhxilin
1) "id"
2) "09388334"

四. 通過MySQL觸發器刷新Redis

 在上一節的基礎上,我們想讓MySQL在增刪改查的時候自動調用UDF,還需要借助MySQL觸發器。觸發器可以監聽INSERT、UPDATE、DELETE等基本操作。在MySQL中,創建觸發器的基本語法如下:

CREATE TRIGGER trigger_name
trigger_time
trigger_event ON table_name
FOR EACH ROW
trigger_statement

trigger_time表示觸發時機,值為AFTERBEFORE

trigger_event表示觸發的事件,值為INSERTUPDATEDELETE等;

trigger_statement表示觸發器的程序體,可以是一句SQL語句或者調用UDF。

在trigger_statement中,如果有多條SQL語句,需要用BEGIN...END包含起來:

BEGIN
[statement_list]
END

由於MySQL默認的結束分隔符是分號(;),如果我們在BEGIN...END中出現了分號,將被標記成結束,此時沒法完成觸發器的定義。有一個辦法,可以調用DELIMITER命令來暫時修改結束分隔符,用完再改會分號即可。比如改成$:

mysql> DELIMITER $

我們開始定義一個觸發器,監聽對Student表的插入操作,Student表在上一篇文章中創建的,可以查看上一篇文章。

復制代碼
復制代碼
mysql > DELIMITER $
      > CREATE TRIGGER tg_student 
      > AFTER INSERT on Student 
      > FOR EACH ROW 
      > BEGIN
      > SET @id = (SELECT redis_hset(CONCAT('stu_', new.Sid), 'id', CAST(new.Sid AS CHAR(8))));
      > SET @name = (SELECT redis_hset(CONCAT('stu_', new.Sid), 'name', CAST(new.Sname AS CHAR(20))));
      > Set @age = (SELECT redis_hset(CONCAT('stu_', new.Sid), 'age', CAST(new.Sage AS CHAR))); 
      > Set @gender = (SELECT redis_hset(CONCAT('stu_', new.Sid), 'gender', CAST(new.Sgen AS CHAR))); 
      > Set @dept = (SELECT redis_hset(CONCAT('stu_', new.Sid), 'department', CAST(new.Sdept AS CHAR(10))));    
      > END $
復制代碼
復制代碼

創建完觸發器可以通過show查看,或者drop刪除:

mysql> SHOW TRIGGERS;
mysql> DROP TRIGGER tg_student;

接下來我們調用一句插入語句,然后觀察Redis和MySQL數據的變化:

mysql> INSERT INTO Student VALUES('09388165', 'Rose', 19, 'F', 'SS3-205');
Query OK, 1 row affected (0.27 sec)

MySQL的結果:

復制代碼
復制代碼
mysql> SELECT * FROM Student;
+----------+---------+------+------+---------+
| Sid      | Sname   | Sage | Sgen | Sdept   |
+----------+---------+------+------+---------+
| 09388123 | Lucy    |   18 | F    | AS2-123 |
| 09388165 | Rose    |   19 | F    | SS3-205 |
| 09388308 | zhsuiy  |   19 | F    | MD8-208 |
| 09388318 | daemon  |   18 | M    | ZS4-630 |
| 09388321 | David   |   20 | M    | ZS4-731 |
| 09388334 | zhxilin |   20 | M    | ZS4-722 |
+----------+---------+------+------+---------+
6 rows in set (0.00 sec)
復制代碼
復制代碼

Redis的結果:

復制代碼
復制代碼
127.0.0.1:6379> HGETALL stu_09388165
 1) "id"
 2) "09388165"
 3) "name"
 4) "Rose"
 5) "age"
 6) "19"
 7) "gender"
 8) "F"
 9) "department"
10) "SS3-205"
復制代碼
復制代碼

以上結果表明,當MySQL插入數據時,通過觸發器調用UDF,實現了自動刷新Redis的數據。另外,調用MySQL插入的命令,可以通過C++實現,進而就實現了在C++的業務邏輯里,只需調用MySQL++的接口就能實現MySQL數據庫和Redis緩存的更新,這部分內容在上一篇文章已經介紹過了。

 

總結

通過實踐,能體會到MySQL和Redis是多么相親相愛吧!^_^

本篇文章講了從最基礎的UDF開始,再到通過UDF連接Redis插入數據,再進一步介紹通過MySQL Trigger自動更新Redis數據的整個思路,實現了一個目標,即只在業務代碼中更新MySQL數據庫,進而Redis能夠自動同步刷新。

MySQL對UDF函數和觸發器的支持,使得實現Redis數據和MySQL自動同步成了可能。當然UDF畢竟是通過插件的形式運行在MySQL中的,並沒有過多的安全干預,一旦插件發生致命性崩潰,有可能MySQL也會掛,所以在編寫UDF的時候需要非常謹慎!

 

說了原理,接下來看如何使用,其實有很多已經實現的比較好的,我們可以直接使用

 

A high performance mysql udf to sync the newly modified/inserted data from mysql to redis cache.

https://github.com/dawnbreaks/mysql2redis(推薦使用這個,下面的例子以這個為主,具體原理請看上面)

 

A UDF(user defined functions) plugin for MySQL, which can be used for pushing data to Redis

https://github.com/jackeylu/mysql2redis

 

This is used to move the mysql data to redis or from redis to mysql.

https://github.com/zhangjg/mysql2redis

 

MySQL Syncer is a project which parse mysql binlog and sync to other datases, such as redis, mongodb and any other databases..

https://github.com/Terry-Mao/MySQL-Syncer

 

前題:安裝了mysql5.5(以上)和client 

下面依次安裝
1 安裝apr-1.4.6

   1. wget   https://www-us.apache.org/dist//apr/apr-1.7.0.tar.gz

   2.     tar -xvf   apr-1.7.0.tar.gz

   3   ./configure --prefix=/usr/local/apr //配置到指定目錄,如果此時報錯是關於aprd ,則需要安裝gcc,只需sudo apt-get install gcc即可,然后再次執行上一步
      mak&&make install //需要再root模式下運行?前面加sudo

1.2.安裝apr-util

  wget https://www-us.apache.org/dist//apr/apr-util-1.6.1.tar.gz
  cd ../apr-util/
  ./configure   --with-apr=/usr/local/apr
  make
  make install //需要root權限 前面加sudo

安裝后請檢驗生成庫是否的路徑可以訪問:(usr/local/apr/lib/)

  不能的話拷貝庫到/lib或者/usr/lib

或者配置

  export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/apr/lib/

2、安裝mysql2redis 

Shell代碼    收藏代碼
  1. git clone https://github.com/dawnbreaks/mysql2redis  
  2. cd mysql2redis  
  3. make  

如果出現關於off64_t錯誤(在Makefile中加入 -Doff64_t=__off64_t)

避免把警告當成錯誤,可以去掉-Werr
3、安裝hiredis 

Shell代碼    收藏代碼
  1. git clone http://github.com/redis/hiredis  
  2. cd hiredis  
  3. make && make install  



3、安裝mysql json udf 

Shell代碼    收藏代碼
  1. git clone https://github.com/mysqludf/lib_mysqludf_json.git  
  2. cd lib_mysqludf_json  
  3. gcc $(/usr/local/mariamysql/bin/mysql_config --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c(如果已經存在則可以直接使用)  

然后將lib_mysqludf_json.so拷貝到mysql的plugin (查看插件目錄:mysql_config --plugindir)
在mysql里執行lib_mysqludf_json.sql 

執行mysql2redis目錄下的:兩個install*.sql文件

測試使用test*.sql;

其中install_redis_udf.sql

DROP FUNCTION IF EXISTS redis_servers_set_v2;
DROP FUNCTION IF EXISTS redis_command_v2;
DROP FUNCTION IF EXISTS free_resources;

CREATE FUNCTION redis_servers_set_v2 RETURNS int SONAME "lib_mysqludf_redis_v2.so";
CREATE FUNCTION redis_command_v2 RETURNS int SONAME "lib_mysqludf_redis_v2.so";
CREATE FUNCTION free_resources RETURNS int SONAME "lib_mysqludf_redis_v2.so";

  其實就是把函數生成庫到mysql,使用mysql的udf進行調用redis接口訪問redis

lib_mysqludf_json(https://github.com/mysqludf/lib_mysqludf_json)

A UDF library of functions to map relational data to the JSON format

 

 

就是把mysql的數據轉換成json格式存儲在redis中

 

 

 

 

 

 

 

select 
redis_command_v2("lpush","crmInboxEvents11",json_object(json_members("op","insert","value","valuettt")));

select redis_servers_set_v2("192.168.0.118",6379);
select redis_command_v2("lpush","crmInboxEvents11",json_object(json_members("op","insert","value","valuettt")));

select redis_command_v2("hset","hkey","hfield",json_object(json_members("op","insert","value","valuettt")));


select free_resources();
select redis_servers_set_v2("192.168.0.118",6379);

  

  • json_members - maps a variable number of name-value pairs to a list of JSON object members.
  • json_object - maps a variable number of arguments to a JSON object

例子如下:

表結構

創建mysql觸發器,每插入一條數據,更新redis,以json風格,程序可以直接讀取redis下的存儲的json

DELIMITER $$
CREATE TRIGGER xb_ai AFTER INSERT ON xb
FOR EACH ROW
BEGIN
DECLARE ret INT DEFAULT 999;  
 SET ret=
  	 redis_command_v2("set",concat("user:","id:",cast(NEW.id as CHAR)),json_object(NEW.id as "id",NEW.name as "name"));
if ret>0 then  
  SIGNAL sqlstate '45001' set message_text = "Redis error! ";  
end if ;  
 
 END$$
DELIMITER ;

  

就是json_object(NEW.Idas"id",NEW.name as"type")//新增加的行id,name各種生成key-value json對

 新增一條數據:

進入redis-cli可以看到

 

json如何使用請看下面(

參考:、

lib_mysqludf_json(https://github.com/mysqludf/lib_mysqludf_json)

A UDF library of functions to map relational data to the JSON format

To obtain nested objects, use this function as an argument for json_object

select      json_object(
                f.last_update
            ,   json_members(
                    'film'
                ,   json_object(
                        f.film_id
                    ,   f.title
                    ,   f.last_update
                    )
                ,   'category'
                ,   json_object(
                        c.category_id
                    ,   c.name
                    ,   c.last_update
                    )
                )
            ) as film_category
from        film_category fc 
inner join  film f
on          fc.film_id = f.film_id
inner join  category c
on          fc.category_id = c.category_id
where       f.film_id =1;

yields a string representing the following JSON object (indentation added for readability):

{
    last_update:"2006-02-15 05:03:42"
,   film:{
        "film_id":1
    ,   "title":"ACADEMY DINOSAUR"
    ,   "last_update":"2006-02-15 05:03:42"
    }
,   category:{
        "category_id":6
    ,   "name":"Documentary"
    ,   "last_update":"2006-02-15 04:46:27"
    }
}

例子2:

在mysql里創建table,trigger 

Sql代碼    收藏代碼
CREATE  TABLE IF NOT EXISTS `test`.`t_users` (  
  `user_name` VARCHAR(50) NOT NULL ,  
  `nick_name` VARCHAR(100) NOT NULL ,  
  `password` VARCHAR(32) NOT NULL ,  
  `age` INT NULL ,  
  `create_time` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ,  
  PRIMARY KEY (`user_name`) ,  
  UNIQUE INDEX `un_t_users_nick_name` (`nick_name` ASC) )  
ENGINE = InnoDB;  

  

 

Sql代碼    收藏代碼
DELIMITER |  
create trigger tri_users_redis_json BEFORE INSERT on t_users  
  For EACH ROW   
BEGIN  
DECLARE done INT DEFAULT 999;  
  
set done = redis_command_v2("set",concat("user::",cast(NEW.user_name as CHAR),
,json_object(NEW.user_name as userName,NEW.nick_name as nickName,NEW.age as age)));
if done>0
then
  SIGNAL sqlstate '45001' set message_text = "Redis error! ";
end if ;
END; |
DELIMITER ;

  

 

Sql代碼    收藏代碼
  1. insert into t_users(user_name,nick_name,password,age,create_time) values('Sally','雪莉','123456',25,CURRENT_TIMESTAMP)  



mysql的 


  

redis的 


 

使用mysql的udf和trigger可以保證mysql和redis的數據一致性,SIGNAL sqlstate '45001'會在redis失敗時回滾事物。 

問題:hiredis里遞交帶空格數據需要這樣使用 

C代碼    收藏代碼
  1. reply = redisCommand(context, "SET key:%s %s", myid, value);  


那么對於mysql2redis的redis_command是無法工作的,這部分可以為不同命令單寫函數。如redis_command_set。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM