1、【Basics】What benefits Redis brings us
Redis (Remote Dictionary Server) official site: https://redis.io/
Redis commands: https://redis.io/commands
Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes with radius queries and streams. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.
1.1、The history of Redis
- In the beginning we used NativeCache in local process memory, which cannot be shared across machines. As sites grew we needed a distributed cache product, and Memcache was born.
- As memcache caching took off and the internet kept growing (around 2009), applications demanded ever higher performance and covered ever more scenarios, such as in-memory databases and heterogeneous message queues. Memcache as it stood exposed several shortcomings:
- memcache is essentially one giant hash table; its data structure is monolithic, while programming languages offer many structures (List, HashSet, Dictionary, SortedDictionary, BitArray, Queue, Stack, SortedList, ...).
- memcache cannot persist data, so it can only serve as a cache; everything is lost on restart.
- No real clustering at scale: memcache can form a simple cluster via consistent hashing, but that depends entirely on the client implementation and is not lossless.
set username jack: suppose hash(username) = 800 million; walk clockwise around the ring, and the first server node you hit is the node that stores the key.
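The clockwise routing just described can be sketched as follows (a toy consistent-hash ring in Python, for illustration only; real clients add virtual nodes, and the server addresses here are made up):

```python
import hashlib

RING_SIZE = 2**32

def h(s):
    # Stable hash mapped onto the ring [0, RING_SIZE)
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % RING_SIZE

servers = ["10.0.0.1:11211", "10.0.0.2:11211", "10.0.0.3:11211"]
ring = sorted((h(s), s) for s in servers)  # server positions on the ring

def route(key):
    pos = h(key)
    for point, server in ring:  # walk clockwise from the key's position
        if point >= pos:
            return server
    return ring[0][1]           # wrapped past the last node: back to the first

print(route("username"))
```

Because routing depends only on the hash, every client that shares this ring sends the same key to the same server; but when a node is added or removed, the keys between it and its neighbour remap, which is why the scheme is "not lossless".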
So we badly wanted something that solves these three problems; building it ourselves would cost too much time and effort, and Redis was created precisely to cure these headaches.
1.2、What benefits Redis gives us
- Overview
As the official site shows, Redis supports a remarkably rich set of data types:
| Redis type | String | Bitmap | List (double-ended) | Set | Geo | Hash | HyperLogLog | Stream | SortedSet (skip list) |
| C# type | String | BitArray | LinkedList+Stack+Queue+List | HashSet | --- | Dictionary | --- | --- | SortedDictionary (red-black tree) |
- Persistence
With AOF append mode, RDB mode, and a mixed mode, Redis is not merely a cache; it can serve as a memory db.
- AOF: replays the stream of write commands to recover the data.
- RDB: snapshots memory to disk.
- Mixed mode: a hybrid of the two.
- Clustering
Redis ships with its own Cluster mode and Sentinel, and there are third-party options such as Wandoujia's Codis.
2、【Setup】Quick deployment on CentOS and with Docker
CentOS7 VM installation steps: https://www.cnblogs.com/wyt007/p/10295834.html
XShell6 (cracked): link: https://pan.baidu.com/s/1YtnkN4_yAOU5Dc1j69ltrg extraction code: nchp
2.1、Deployment on CentOS 7
- Installation
First grab the latest download link from the Redis site: http://download.redis.io/releases/redis-5.0.3.tar.gz
Then install it on CentOS 7:
mkdir /data
cd /data
wget http://download.redis.io/releases/redis-5.0.3.tar.gz
tar xzf redis-5.0.3.tar.gz
mv redis-5.0.3 redis
cd redis
make
If you see gcc: command not found, install gcc and rerun make:

yum -y install gcc automake autoconf libtool make
# If the command above fails with [Errno 256] No more mirrors to try., run the following and then reinstall gcc
yum clean all
If you see: fatal error: jemalloc/jemalloc.h: No such file or directory, run:

make MALLOC=libc
- Now verify the build succeeded (redis-cli and redis-server should exist under /data/redis/src/) and copy them up one directory:
cd /data/redis/src/
cp redis-cli ../
cp redis-server ../
- Start Redis
[root@localhost src]# cd /data/redis/
[root@localhost redis]# ./redis-server ./redis.conf
- Check the port
netstat -tlnp

- Test storage
[root@localhost ~]# cd /data/redis/
[root@localhost redis]# ./redis-cli
127.0.0.1:6379> set username jack
OK
127.0.0.1:6379> get username
"jack"
127.0.0.1:6379> dbsize
(integer) 1
127.0.0.1:6379> keys *
1) "username"
- Quit the client
quit
- Configure Redis
After startup Redis cannot be reached from outside the machine, so we need to edit redis.conf:
protected-mode: protection mode
bind: the interface to bind; bind 127.0.0.1 => bind 0.0.0.0, protected-mode yes => protected-mode no
In real life: production Redis sits on an internal network and exposes no public port.
- Require a password (optional)
In redis.conf, uncomment and set the default # requirepass foobared
After connecting, authenticate with auth <password>
- Change storage locations: rdb + logfile + aof (optional)
- rdb: in redis.conf, the default dir ./ sets the folder path
- logfile: in redis.conf, the default logfile "" sets the file name, e.g. "redis.log"
- Run in the background
In redis.conf, change the default daemonize no to daemonize yes
A pid file, /var/run/redis_6379.pid, will be created to hold the process id
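Pulling the tweaks above together, the affected redis.conf lines end up looking roughly like this (values are illustrative; leave requirepass commented out if you skipped the password step):

```conf
bind 0.0.0.0
protected-mode no
# requirepass foobared
dir /data/redis
logfile "redis.log"
daemonize yes
```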
[root@localhost redis]# ./redis-server ./redis.conf
[root@localhost redis]# netstat -tlnp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:6379            0.0.0.0:*               LISTEN      66042/./redis-serve
tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd
tcp        0      0 0.0.0.0:6000            0.0.0.0:*               LISTEN      7748/X
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      7604/dnsmasq
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      7215/sshd
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      7217/cupsd
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      7432/master
tcp        0      0 127.0.0.1:6010          0.0.0.0:*               LISTEN      9283/sshd: root@pts
tcp        0      0 127.0.0.1:6011          0.0.0.0:*               LISTEN      11424/sshd: root@pt
tcp        0      0 127.0.0.1:6012          0.0.0.0:*               LISTEN      63727/sshd: root@pt
tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd
tcp6       0      0 :::6000                 :::*                    LISTEN      7748/X
tcp6       0      0 :::21                   :::*                    LISTEN      9406/vsftpd
tcp6       0      0 :::22                   :::*                    LISTEN      7215/sshd
tcp6       0      0 ::1:631                 :::*                    LISTEN      7217/cupsd
tcp6       0      0 ::1:25                  :::*                    LISTEN      7432/master
tcp6       0      0 ::1:6010                :::*                    LISTEN      9283/sshd: root@pts
tcp6       0      0 ::1:6011                :::*                    LISTEN      11424/sshd: root@pt
tcp6       0      0 ::1:6012                :::*                    LISTEN      63727/sshd: root@pt
[root@localhost redis]# tail /var/run/redis_6379.pid
66042
2.2、Deployment on Docker
Docker installation steps: https://www.cnblogs.com/wyt007/p/10295834.html
- Start Docker
service docker start
- List containers
docker ps

The container list is empty, so next we fetch redis from Docker Hub (access may require a proxy in some regions).
- Install Redis and map the port
Since Redis on the VM already occupies port 6379, I map host port 6378 to the container's 6379.
It starts automatically once installed:
docker run --name some-redis -p 6378:6379 -d redis
Now look at the Docker containers again

For any non-trivial configuration, write your own redis.conf and deploy it through docker-compose instead of typing commands by hand.
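A minimal sketch of such a compose file, assuming a redis.conf sits next to it (the service name, port mapping and paths are illustrative):

```yaml
version: "3"
services:
  redis:
    image: redis:5
    container_name: some-redis
    ports:
      - "6378:6379"
    volumes:
      - ./redis.conf:/usr/local/etc/redis/redis.conf
    command: redis-server /usr/local/etc/redis/redis.conf
```

Running docker-compose up -d then brings the container up with your own configuration instead of the image defaults.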
The Dockerfile (or compose file) needs to copy redis.conf in.
- Remove the redis container from Docker
docker kill 90b45b58a571
docker rm 90b45b58a571
3、【SDK】Quick operations with the C# SDK and two GUI tools
3.1、StackExchange.Redis
GitHub: https://github.com/StackExchange/StackExchange.Redis/
Documentation: https://stackexchange.github.io/StackExchange.Redis/
| String use case | store user info on the web site, simulating a session |
| Hash use case | store each shop's database connection string (sharded-database scenario); key: shopid, value: connectionstring |
| Set use case | check whether a user is in the blacklist, O(1) |
| List use case | message queue: client -> SMS queue <- sender worker -> carrier |
- Installation
Install-Package StackExchange.Redis
- Usage example
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        //// cookie(ui, sessionid)
        //// redis(sessionid, userinfo)
        //db.StringSet("sessionid", "jack", TimeSpan.FromSeconds(5));
        //while (true)
        //{
        //    var info = db.StringGet("sessionid");
        //    Console.WriteLine(info);
        //    Thread.Sleep(1000);
        //}

        //// key: shopID  value: connectionstring
        //db.HashSet("connetions", "1", "mysql://192.168.1.1/mydb");
        //db.HashSet("connetions", "2", "mysql://192.168.1.2/mydb");
        //db.HashSet("connetions", "3", "mysql://192.168.1.3/mydb");
        //db.HashSet("connetions", "4", "mysql://192.168.1.4/mydb");
        //db.HashSet("connetions", "5", "mysql://192.168.1.5/mydb");
        //Console.WriteLine(db.HashGet("connetions", "3"));

        //// blacklist
        //db.SetAdd("blacklist", "1");
        //db.SetAdd("blacklist", "2");
        //db.SetAdd("blacklist", "3");
        //db.SetAdd("blacklist", "4");
        //var r = db.SetContains("blacklist", 40);

        //// message queue
        //db.ListLeftPush("sms", "18721073333");
        //db.ListLeftPush("sms", "18521073333");
        //db.ListLeftPush("sms", "18121073033");
        //Console.WriteLine(db.ListRightPop("sms"));
        //Console.WriteLine(db.ListRightPop("sms"));
        //Console.WriteLine(db.ListRightPop("sms"));

        Console.ReadKey();
    }
}
- Using Redis to store session state in asp.net core
Session is an object we use constantly in web development. By default it lives on the local machine, but in ASP.NET Core we can very conveniently switch its backing store to a distributed cache (Redis) or a database (SqlServer). A distributed cache improves the performance and scalability of an ASP.NET Core app, especially when it is hosted in the cloud or a server farm.
- Add the package reference
Microsoft.Extensions.Caching.Redis
- Configure services
public void ConfigureServices(IServiceCollection services)
{
    ...
    // add redis as the distributed cache
    services.AddDistributedRedisCache(option =>
    {
        option.InstanceName = "session";
        option.Configuration = "192.168.181.131:6379";
    });
    // add session
    services.AddSession(options =>
    {
        //options.IdleTimeout = TimeSpan.FromMinutes(10); // session idle timeout
        //options.Cookie.HttpOnly = true;                 // mark the cookie httponly
    });
    ...
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    ...
    // use session
    app.UseSession();
    ...
}
- Set a session value
//using Microsoft.AspNetCore.Http;
HttpContext.Session.SetString("userinfo", "jack");
- Display the data
@using Microsoft.AspNetCore.Http;
@Context.Session.GetString("userinfo")
@Context.Session.Id

3.2、GUI tools
- RedisClient
Site: https://github.com/caoxinyu/RedisClient
- FastoRedis
Site: https://fastoredis.com/
- RedisDesktopManager
Site: https://redisdesktop.com/ (paid after version 0.8)
4、【SDK】StackExchange strongly-typed helpers and a hand-rolled connection pool
4.1、Strongly-typed extensions for StackExchange.Redis
Why use strongly-typed extensions? Look at this code first:
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var userModel = new UserModel()
        {
            UserName = "jack",
            Email = "sdfasdf@qq.com",
            IsVip = true
        };
        db.StringSet("userinfo", JsonConvert.SerializeObject(userModel));
        var info = db.StringGet("userinfo");
        var model = JsonConvert.DeserializeObject<UserModel>(info);
        Console.ReadKey();
    }
}

public class UserModel
{
    public string UserName { get; set; }
    public string Email { get; set; }
    public bool IsVip { get; set; }
}
To store an object you must first serialize it to a string, and deserialize it again on the way out. Is there a better way to handle this? StackExchange.Redis.Extensions provides a very nice wrapper.
StackExchange.Redis.Extensions GitHub: https://github.com/imperugo/StackExchange.Redis.Extensions
- Installation
Install-Package StackExchange.Redis.Extensions.Core
Install-Package StackExchange.Redis.Extensions.Newtonsoft // serialization provider
- Usage
var cacheClient = new StackExchangeRedisCacheClient(redis, new NewtonsoftSerializer());
cacheClient.Add("userinfo", userModel);
var model = cacheClient.Get<UserModel>("userinfo");
- Full example
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var userModel = new UserModel()
        {
            UserName = "jack",
            Email = "sdfasdf@qq.com",
            IsVip = true
        };
        db.StringSet("userinfo", JsonConvert.SerializeObject(userModel));
        var info = db.StringGet("userinfo");
        var model = JsonConvert.DeserializeObject<UserModel>(info);

        var cacheClient = new StackExchangeRedisCacheClient(redis, new NewtonsoftSerializer());
        cacheClient.Add("userinfo", userModel);
        model = cacheClient.Get<UserModel>("userinfo");
        Console.ReadKey();
    }
}

public class UserModel
{
    public string UserName { get; set; }
    public string Email { get; set; }
    public bool IsVip { get; set; }
}
- Drawbacks
Slower than the low-level API and fewer features; no Stream support yet.
4.2、StackExchange.Redis connection issues
4.2.1、Too many sockets can take the sdk down
- Cause: think about what happens when the connection is not shared as an instance variable. If you new a connection on every call, you end up with far too many sockets, opened and closed at high frequency.
- Solutions:
- a single global connection
- a self-managed connection pool
4.2.2、A custom connection pool
- Create the pool, RedisConnectionPool.cs
public class RedisConnectionPool
{
    private static ConcurrentQueue<ConnectionMultiplexer> connectionPoolQueue = new ConcurrentQueue<ConnectionMultiplexer>();
    private static int minConnectionNum;
    private static int maxConnectionNum;
    private static string host;
    private static int port;

    // obtain max/min and host/port via a constructor or from config
    public static void InitializeConnectionPool()
    {
        minConnectionNum = 10;
        maxConnectionNum = 100;
        host = "192.168.181.131";
        port = 6379;
        for (int i = 0; i < minConnectionNum; i++)
        {
            var client = OpenConnection(host, port);
            PushConnection(client);
        }
        Console.WriteLine($"{minConnectionNum} connections initialized!");
    }

    /*
     * 1. If the pool is empty, OpenConnection.
     * 2. If a pooled connection has IsConnected == false, just close it.
     */
    public static ConnectionMultiplexer GetConnection()
    {
        while (connectionPoolQueue.Count > 0)
        {
            connectionPoolQueue.TryDequeue(out ConnectionMultiplexer client);
            if (!client.IsConnected)
            {
                client.Close();
                continue;
            }
            return client;
        }
        return OpenConnection(host, port);
    }

    /// <summary>
    /// 1. If the queue already holds >= max connections, drop the client.
    /// 2. If client.IsConnected is false, close it.
    /// </summary>
    /// <param name="client"></param>
    public static void PushConnection(ConnectionMultiplexer client)
    {
        if (connectionPoolQueue.Count >= maxConnectionNum)
        {
            client.Close();
            return;
        }
        if (!client.IsConnected)
        {
            client.Close();
            return;
        }
        connectionPoolQueue.Enqueue(client);
    }

    public static ConnectionMultiplexer OpenConnection(string host, int port)
    {
        ConnectionMultiplexer client = ConnectionMultiplexer.Connect($"{host}:{port}");
        return client;
    }
}
- Usage
RedisConnectionPool.InitializeConnectionPool();
for (int m = 0; m < 1000000; m++)
{
    ConnectionMultiplexer client = null;
    try
    {
        client = RedisConnectionPool.GetConnection();
        var db = client.GetDatabase(0);
        db.StringSet("username", "jack");
        Console.WriteLine(db.StringGet("username") + " " + m);
    }
    finally
    {
        if (client != null)
        {
            RedisConnectionPool.PushConnection(client);
        }
    }
}
5、【Memory layout】Reading the five basic objects in the Redis source
Download the source from the Redis site, unzip it, and open it with VS2017; the sources live under the src folder. Redis storage layout:
- redisServer
Source: src/server.h
A redisServer holds 16 redisDb instances. In main() in src/server.c, step into void initServer(void) to see the 16 DBs being created.
server.dbnum defaults to 16.

- redisDb
Source: src/server.h
Note the dict *dict data dictionary, the expiry times, the sizes, and so on.
- redisObject
Source: src/server.h
Note the *ptr field, which points at an sds (sds.h), quicklist (quicklist.h), dict (dict.h), or rax (rax.h).
You can inspect redisObject attributes from redis-cli.
- sds
sds is a wrapper around char[] that squeezes memory usage to the limit.
Source: sds.h
typedef char *sds;

/* Note: sdshdr5 is never used, we just access the flags byte directly.
 * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
- redisClient
Source: src/server.h, with three important fields:
- redisDb *db: the database being operated on
- int argc: the number of command arguments
- robj **argv: all of the command's arguments
- Lookup example
set name jack
↓↓↓↓↓↓↓
argv[0]=set argv[1]=name argv[2]=jack
↓↓↓↓↓↓↓
commandTables [ {set => setCommand} {get => getCommand} ]
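The argv/commandTable lookup above can be sketched in Python (a toy model for illustration, not the actual C implementation):

```python
# Toy model of Redis command dispatch: argv[0] selects a handler
# from the command table, and the handler receives the full argv.
store = {}

def set_command(argv):
    store[argv[1]] = argv[2]
    return "OK"

def get_command(argv):
    return store.get(argv[1])

command_table = {"set": set_command, "get": get_command}

def dispatch(line):
    argv = line.split()                  # argv[0]=set argv[1]=name argv[2]=jack
    return command_table[argv[0]](argv)  # argv[0] names the command

print(dispatch("set name jack"))  # OK
print(dispatch("get name"))       # jack
```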

6、【String】String commands, source reading, and SDK practice for flash sales and duplicate-submit protection
6.1、Common String commands explained
String commands in Redis: https://redis.io/commands#string
| Redis command | incr | decr | incrby | decrby |
| C# analogue | ++ | -- | Interlocked.Increment | Interlocked.Decrement |

Command examples:
redis> SET mykey "10"
OK
redis> INCR mykey
(integer) 11
redis> GET mykey
"11"

redis> SET mykey "10"
OK
redis> DECR mykey
(integer) 9
redis> SET mykey "234293482390480948029348230948"
OK
redis> DECR mykey
ERR value is not an integer or out of range

redis> SET mykey "10"
OK
redis> INCRBY mykey 5
(integer) 15

redis> SET mykey "10"
OK
redis> DECRBY mykey 3
(integer) 7
- Where incr shines: a simple answer to the flash-sale (seckill) problem
Stock: 1000. Visitors: 100,000.
Admission: 3000, i.e. let only the first 3000 requests in.
Purchases: 1000.
Whether stock is decremented at "awaiting payment" or at "purchase succeeded" is a business decision!
Use a max counter to filter the crowd.
A simple example:
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        while (true)
        {
            var num = db.StringIncrement("max1");
            if (num > 3000)
            {
                Console.WriteLine("more than 3000 requests");
                break;
            }
            Console.WriteLine(num);
        }
        Console.ReadKey();
    }
}
- SetNx + Expire, Set
Use case: preventing duplicate submissions in an order flow. SetNx = SET if Not eXists: if the key already exists, the value is not assigned.
setnx token 12345 (succeeds)
setnx token 12345 (fails)
EXPIRE sets the time-to-live

Converted into a single SET (with the NX and EX options)

This means a repeated SET within 10 seconds is rejected.
C# example:
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        while (true)
        {
            var b = db.StringSet("token", "123456", TimeSpan.FromSeconds(10), When.NotExists);
            Console.WriteLine(b);
            Thread.Sleep(TimeSpan.FromSeconds(2));
        }
        Console.ReadKey();
    }
}

6.2、Reading the source
This section walks through incr; read the other commands the same way.
First find incr's entry, incrCommand, in the redisCommand table in src/server.c, then jump to t_string.c:
void incrCommand(client *c) {
    incrDecrCommand(c,1); /* here you can see it is a +1 */
}
Then locate the definition of incrDecrCommand:
void incrDecrCommand(client *c, long long incr) {
    long long value, oldvalue;
    robj *o, *new;

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o != NULL && checkType(c,o,OBJ_STRING)) return;
    if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;

    oldvalue = value;
    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }
    value += incr;

    if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
        (value < 0 || value >= OBJ_SHARED_INTEGERS) &&
        value >= LONG_MIN && value <= LONG_MAX)
    {
        new = o;
        o->ptr = (void*)((long)value);
    } else {
        new = createStringObjectFromLongLongForValue(value);
        if (o) {
            dbOverwrite(c->db,c->argv[1],new);
        } else {
            dbAdd(c->db,c->argv[1],new);
        }
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.colon);
    addReply(c,new);
    addReply(c,shared.crlf);
}

7、【String】Bitmap commands and a blacklist use case
7.1、The bitmap idea
- Example scenario
customerids 1 through 32 are all blacklisted users; how do we store that with the least memory?
HashSet<int> hashSet = new HashSet<int>();
hashSet.Add(1);
...
hashSet.Add(32);
- int storage: 32 bits * 32 entries = 1024 bits
- byte storage: 8 bits * 32 entries = 256 bits
- bitmap storage: a single int = 32 bits
Use the customerid as the position in the bit array;
a 0 or 1 at that position marks whether the id is present.
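The position trick can be sketched with a bitmap over a single Python integer, mirroring setbit / getbit / bitcount (an illustration, not Redis's implementation):

```python
# Bitmap over one integer: the customer id is the bit position.
class Bitmap:
    def __init__(self):
        self.bits = 0

    def setbit(self, pos, flag):
        if flag:
            self.bits |= 1 << pos      # turn the bit on
        else:
            self.bits &= ~(1 << pos)   # turn the bit off

    def getbit(self, pos):
        return (self.bits >> pos) & 1

    def bitcount(self):
        return bin(self.bits).count("1")

bm = Bitmap()
bm.setbit(1, 1)   # id 1 blacklisted
bm.setbit(4, 1)   # id 4 blacklisted
print(bm.getbit(1), bm.getbit(2), bm.bitcount())  # 1 0 2
```

One bit per id is why the 32-user example above fits in a single 32-bit int.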
- The important caveat
Suppose there are 5M users, of which 1M are fake-order (brushing) users.
If a shop has fewer than 100k such users, a set is fine.
If a shop has more than 100k, use a bitmap.
- Bitmaps only pay off when ids are dense: if ids reach 2.1 billion, the bitmap needs 2.1B/32, i.e. tens of millions of ints,
whereas the plain approach needs only one int per stored member.
7.2、Using setbit, getbit, bitcount
- setbit: set the bit at a position to 0 or 1
- getbit: read the value of the bit at a position
- bitcount: count how many users are blacklisted
- redis-cli example:
192.168.181.131:0>setbit blacklist 1 1   // id 1 is blacklisted
"0"
192.168.181.131:0>setbit blacklist 2 0   // id 2 is not
"0"
192.168.181.131:0>setbit blacklist 3 0
"0"
192.168.181.131:0>setbit blacklist 4 1
"0"
192.168.181.131:0>getbit blacklist 2     // is id 2 blacklisted?
"0"
192.168.181.131:0>getbit blacklist 4     // is id 4 blacklisted?
"1"
192.168.181.131:0>bitcount blacklist     // how many blacklisted?
"2"
- SDK example:
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        db.StringSetBit("blacklist", 1, true);
        db.StringSetBit("blacklist", 4, true);
        Console.WriteLine(db.StringGetBit("blacklist", 1));
        Console.WriteLine(db.StringGetBit("blacklist", 2));
        Console.WriteLine(db.StringGetBit("blacklist", 3));
        Console.WriteLine(db.StringGetBit("blacklist", 4));
        Console.ReadKey();
    }
}

- The plain C# equivalent (no SDK):
class Program
{
    static void Main(string[] args)
    {
        BitArray bitArray = new BitArray(8);
        bitArray[1] = true;
        bitArray[4] = true;
        Console.WriteLine(bitArray[1]);
        Console.WriteLine(bitArray[2]);
        Console.WriteLine(bitArray[3]);
        Console.WriteLine(bitArray[4]);
        Console.ReadKey();
    }
}

8、【List】Common commands, source reading, and SDK usage
Redis List commands: https://redis.io/commands#list
8.1、List
- The linked-list structure
A Redis List is a non-circular doubly linked list, so moving between adjacent nodes is O(1), as shown below.
- Common methods
lpush (push left), rpop (pop right), rpush, lpop. With these four methods the list can serve as both a stack (Stack) and a linked list (LinkedList).
lpush + rpop gives a queue.
lpush + lpop gives a stack (think bracket matching).
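The queue-vs-stack behaviour can be sketched with a double-ended queue (Python illustration of the same push/pop pairings; the phone numbers are sample data):

```python
from collections import deque

# A Redis List modeled as a double-ended queue: which ends you
# push to and pop from decides queue vs stack behaviour.
lst = deque()

# lpush + rpop behaves like a FIFO queue:
for sms in ["18721073333", "18521073333", "18121073033"]:
    lst.appendleft(sms)                          # lpush
queue_order = [lst.pop() for _ in range(3)]      # rpop: first in, first out

# lpush + lpop behaves like a LIFO stack:
for sms in ["18721073333", "18521073333", "18121073033"]:
    lst.appendleft(sms)                          # lpush
stack_order = [lst.popleft() for _ in range(3)]  # lpop: last in, first out

print(queue_order)
print(stack_order)
```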
8.2、The blocking bxxx variants
Two ways to drain a queue:
- a busy loop with sleep(10ms) polling for messages, which drives CPU up
- if the queue is empty, block the client and keep waiting for data (blocking pop)
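Why the blocking variant is attractive can be shown with a blocking in-process queue (a Python analogy for BLPOP, not the Redis protocol; the producer delay and the message are made up):

```python
import queue
import threading
import time

# q.get() parks the consumer until an item arrives,
# instead of burning CPU in a sleep/poll loop.
q = queue.Queue()

def producer():
    time.sleep(0.1)              # simulate a message arriving later
    q.put("sms:18721073333")

threading.Thread(target=producer).start()
item = q.get()                   # blocks, like BLPOP with timeout 0
print(item)
```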
192.168.181.131:0>lpush sms 1
"1"
192.168.181.131:0>lpush sms 2
"2"
192.168.181.131:0>lpush sms 3
"3"
192.168.181.131:0>llen sms
"3"
192.168.181.131:0>blpop sms 0
1) "sms"
2) "3"
192.168.181.131:0>blpop sms 0
1) "sms"
2) "2"
192.168.181.131:0>blpop sms 0
1) "sms"
2) "1"
192.168.181.131:0>blpop sms 0
Connection error: Execution timeout
8.3、SDK practice
Compare the blocking and the non-blocking approach (the code below uses a non-blocking poll):
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        db.ListLeftPush("sms", 1);
        db.ListLeftPush("sms", 2);
        db.ListLeftPush("sms", 3);
        db.ListLeftPush("sms", 4);
        while (true)
        {
            var info = db.ListLeftPop("sms");
            Console.WriteLine(info);
            Thread.Sleep(1000);
        }
    }
}
8.4、Reading the source
Start with quicklist in src/quicklist.h: a quicklist is built from quicklistNodes and holds head and tail pointers.
typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : 16;              /* fill factor for individual nodes */
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;
Then look at quicklistNode: each node links to its previous and next node.
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10;     /* more bits to steal for future usage */
} quicklistNode;
Next, the LLEN command: find llen's entry, llenCommand, in the redisCommand table in src/server.c, then jump to t_list.c.
void llenCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
    addReplyLongLong(c,listTypeLength(o));
}
Then listTypeLength:
unsigned long listTypeLength(const robj *subject) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        return quicklistCount(subject->ptr);
    } else {
        serverPanic("Unknown list encoding");
    }
}
And quicklistCount:
/* Return cached quicklist count */
unsigned long quicklistCount(const quicklist *ql) {
    return ql->count;
}
So LLEN simply returns the count cached behind the ptr pointer.
Now the LPUSH command: again look up lpush's lpushCommand in src/server.c.
void lpushCommand(client *c) {
    pushGenericCommand(c,LIST_HEAD);
}
Then pushGenericCommand:
void pushGenericCommand(client *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        if (!lobj) {
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
Then listTypePush:
void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
        value = getDecodedObject(value);
        size_t len = sdslen(value->ptr);
        quicklistPush(subject->ptr, value->ptr, len, pos);
        decrRefCount(value);
    } else {
        serverPanic("Unknown list encoding");
    }
}
Then quicklistPush, which pushes onto either the head or the tail:
/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
                   int where) {
    if (where == QUICKLIST_HEAD) {
        quicklistPushHead(quicklist, value, sz);
    } else if (where == QUICKLIST_TAIL) {
        quicklistPushTail(quicklist, value, sz);
    }
}
Finally quicklistPushHead, where you can see count being incremented:
/* Add new entry to head node of quicklist.
 *
 * Returns 0 if used existing head.
 * Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}
9、【Hash】Hash commands, sharded connection-string storage, and source reading
9.1、The underlying structure of Hash
Redis hash objects are stored either as a ziplist (compressed list) or as a hashtable. The hash object uses the ziplist encoding when both of the following conditions hold:
- every key and value string in the hash is shorter than 64 bytes
- the hash holds fewer than 512 key-value pairs
The hash layout itself is a standard hashtable that resolves collisions by chaining.

Analogy in C#:
Dictionary<string, string> dict = new Dictionary<string, string>();
dict.Add("username", "jack");
// suppose hash(username) = 100
// table[100] = dictEntry(username, jack, next)
dict.Add("password", "12345");
// suppose hash(password) = 100; a hash collision, resolved by chaining
// table[100] = dictEntry(password, 12345, next) -> dictEntry(username, jack, next)
var info = dict["username"]; // info == "jack"
As you can see, next chains together the entries whose hashes collide.
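The chaining can be made concrete with a minimal hash table mirroring dict/dictEntry from the Redis source (a Python illustration; the class names echo the C structs but the code is a toy):

```python
# Minimal chained hash table: colliding keys are linked
# through each entry's next pointer, as in Redis's dictEntry.
class DictEntry:
    def __init__(self, key, val, nxt=None):
        self.key, self.val, self.next = key, val, nxt

class MiniDict:
    def __init__(self, size=4):
        self.table = [None] * size
        self.sizemask = size - 1                 # like dictht.sizemask
        self.used = 0

    def add(self, key, val):
        idx = hash(key) & self.sizemask          # dictHashKey & sizemask
        # the newest entry becomes the head of the chain
        self.table[idx] = DictEntry(key, val, self.table[idx])
        self.used += 1

    def find(self, key):
        entry = self.table[hash(key) & self.sizemask]
        while entry:                             # walk the collision chain
            if entry.key == key:
                return entry.val
            entry = entry.next
        return None

d = MiniDict()
d.add("username", "jack")
d.add("password", "12345")
print(d.find("username"), d.find("password"))
```

With only 4 slots collisions are frequent, yet lookups stay correct because find walks the chain; Redis keeps chains short by rehashing into a bigger table.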
9.2、Common hash commands
Hash commands: https://redis.io/commands#hash
Common ones: hset, hget, hdel, hlen, hexists, hkeys, hvals, hgetall
- A quick tour
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> hset conn 1 mysql://1
(integer) 1
127.0.0.1:6379> hset conn 2 mysql://2
(integer) 1
127.0.0.1:6379> hlen conn
(integer) 2
127.0.0.1:6379> hexists conn 2
(integer) 1
127.0.0.1:6379> hexists conn 3
(integer) 0
127.0.0.1:6379> hget conn 2
"mysql://2"
127.0.0.1:6379> hdel conn 2
(integer) 1
127.0.0.1:6379> hlen conn
(integer) 1
127.0.0.1:6379> hset conn 3 mysql://3
(integer) 1
127.0.0.1:6379> hlen conn
(integer) 2
127.0.0.1:6379> hkeys conn
1) "1"
2) "3"
127.0.0.1:6379> hvals conn
1) "mysql://1"
2) "mysql://3"
127.0.0.1:6379> hgetall conn
1) "1"
2) "mysql://1"
3) "3"
4) "mysql://3"
9.3、Using the SDK
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        db.HashSet("conn", 10, "mysql://10");
        var info = db.HashGet("conn", 10);
        Console.WriteLine(info);
        var len = db.HashLength("conn");
        Console.WriteLine(len);
        var arr = db.HashKeys("conn");
        Console.WriteLine(string.Join(",", arr));
        Console.ReadKey();
    }
}
9.4、Reading the source
Start with dict in the src/dict.h header: it is built from two hash tables of type dictht.
typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;
Then dictht, which holds the dictEntry table:
/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;     /* number of allocated slots */
    unsigned long sizemask; /* used to mask the hash into a slot */
    unsigned long used;     /* number of slots actually in use */
} dictht;
Then dictEntry:
typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next; /* used for chaining */
} dictEntry;
Next, the HLEN command: find hlen's hlenCommand in src/server.c.
void hlenCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_HASH)) return;

    addReplyLongLong(c,hashTypeLength(o));
}
Then hashTypeLength, which returns the hash object's length:
/* Return the number of elements in a hash. */
unsigned long hashTypeLength(const robj *o) {
    unsigned long length = ULONG_MAX;

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        length = ziplistLen(o->ptr) / 2;
    } else if (o->encoding == OBJ_ENCODING_HT) {
        length = dictSize((const dict*)o->ptr);
    } else {
        serverPanic("Unknown hash encoding");
    }
    return length;
}
Then dictSize, to see the arithmetic:
#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)
It sums used across the two dictht tables to produce the hlen result.
Now the HSET command: find hset's hsetCommand in src/server.c.
void hsetCommand(client *c) {
    int i, created = 0;
    robj *o;

    if ((c->argc % 2) == 1) {
        addReplyError(c,"wrong number of arguments for HMSET");
        return;
    }

    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,c->argc-1);

    for (i = 2; i < c->argc; i += 2) /* walk the field/value argument pairs */
        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

    /* HMSET (deprecated) and HSET return value is different. */
    char *cmdname = c->argv[0]->ptr;
    if (cmdname[1] == 's' || cmdname[1] == 'S') {
        /* HSET */
        addReplyLongLong(c, created);
    } else {
        /* HMSET */
        addReply(c, shared.ok);
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty++;
}
Then hashTypeSet:
int hashTypeSet(robj *o, sds field, sds value, int flags) {
    int update = 0;

    /* is this a ziplist-encoded hash? */
    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr, *vptr;

        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = ziplistNext(zl, fptr);
                serverAssert(vptr != NULL);
                update = 1;

                /* Delete value */
                zl = ziplistDelete(zl, &vptr);

                /* Insert new value */
                zl = ziplistInsert(zl, vptr, (unsigned char*)value,
                        sdslen(value));
            }
        }

        if (!update) {
            /* Push new field/value pair onto the tail of the ziplist */
            zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
                    ZIPLIST_TAIL);
            zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
                    ZIPLIST_TAIL);
        }
        o->ptr = zl;

        /* Check if the ziplist needs to be converted to a hash table */
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, OBJ_ENCODING_HT);
    } else if (o->encoding == OBJ_ENCODING_HT) {
        dictEntry *de = dictFind(o->ptr,field); /* hash(field): does this position already hold an entry? */
        if (de) {
            sdsfree(dictGetVal(de));
            if (flags & HASH_SET_TAKE_VALUE) {
                dictGetVal(de) = value;
                value = NULL;
            } else {
                dictGetVal(de) = sdsdup(value);
            }
            update = 1;
        } else {
            sds f,v;
            if (flags & HASH_SET_TAKE_FIELD) {
                f = field;
                field = NULL;
            } else {
                f = sdsdup(field);
            }
            if (flags & HASH_SET_TAKE_VALUE) {
                v = value;
                value = NULL;
            } else {
                v = sdsdup(value);
            }
            dictAdd(o->ptr,f,v);
        }
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Free SDS strings we did not referenced elsewhere if the flags
     * want this function to be responsible. */
    if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
    if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
    return update;
}
Then dictFind:
static dictEntry *dictFind(dict *ht, const void *key) {
    dictEntry *he;
    unsigned int h;

    if (ht->size == 0) return NULL;
    h = dictHashKey(ht, key) & ht->sizemask; /* mask the hash into a slot */
    he = ht->table[h];                       /* look the slot up in the table */
    while(he) {                              /* walk the collision chain if present */
        if (dictCompareHashKeys(ht, key, he->key))
            return he;
        he = he->next;
    }
    return NULL;
}
Then dictHashKey:
#define dictHashKey(ht, key) (ht)->type->hashFunction(key)
And dictAdd:
/* Add an element to the target hash table */
static int dictAdd(dict *ht, void *key, void *val) {
    int index;
    dictEntry *entry;

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(ht, key)) == -1)
        return DICT_ERR;

    /* Allocates the memory and stores key */
    entry = malloc(sizeof(*entry));
    entry->next = ht->table[index]; /* the newest entry heads the chain */
    ht->table[index] = entry;       /* put the entry into its table slot */

    /* Set the hash entry fields. */
    dictSetHashKey(ht, entry, key);
    dictSetHashVal(ht, entry, val);
    ht->used++;                     /* finally bump used */
    return DICT_OK;
}
10、【Set, HyperLogLog】Common commands and SDK usage
10.1、Understanding Set's underlying structure
Typical Set use case: a blacklist.
Under the hood, Set reuses dict,
storing [key=xxx, value=null].
10.2、Common set commands
sadd (add), sismember (contains), scard (count), srem (remove), smembers (list members)
server.natappfree.cc:0>sadd blacklist 1
"1"
server.natappfree.cc:0>sadd blacklist 2
"1"
server.natappfree.cc:0>sismember blacklist 2
"1"
server.natappfree.cc:0>sismember blacklist 3
"0"
server.natappfree.cc:0>scard blacklist
"2"
server.natappfree.cc:0>sadd blacklist 30
"1"
server.natappfree.cc:0>scard blacklist
"3"
server.natappfree.cc:0>smembers blacklist
1) "1"
2) "2"
3) "30"
server.natappfree.cc:0>srem blacklist 2
"1"
server.natappfree.cc:0>smembers blacklist
1) "1"
2) "30"
10.3、SDK operations
static void Main(string[] args)
{
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server.natappfree.cc:39767");
    IDatabase db = redis.GetDatabase(0);
    db.SetAdd("blacklist", "2");
    var arr = db.SetMembers("blacklist");
    Console.WriteLine(string.Join(",", arr));
    var len = db.SetLength("blacklist");
    Console.WriteLine($"len={len}");
    db.SetRemove("blacklist", "1");
    Console.WriteLine(string.Join(",", db.SetMembers("blacklist")));
    Console.ReadKey();
}
10.4、源碼閱讀
- scard(統計個數)源碼
我們首先查看 SCARD 命令,我們還是要先查看 src/server.c 中的 scard 對應的 scardCommand 命令
void scardCommand(client *c) { robj *o; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_SET)) return; addReplyLongLong(c,setTypeSize(o)); }
Then look at the setTypeSize function:
unsigned long setTypeSize(const robj *subject) {
    if (subject->encoding == OBJ_ENCODING_HT) {
        return dictSize((const dict*)subject->ptr);
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        return intsetLen((const intset*)subject->ptr);
    } else {
        serverPanic("Unknown set encoding");
    }
}
Then the dictSize macro:
#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

This confirms from another angle that a Set is backed by the dict structure (the sum covers both hash tables because of incremental rehashing).
- sadd (add) source
For the SADD command, again look in src/server.c; sadd maps to saddCommand:
void saddCommand(client *c) {
    robj *set;
    int j, added = 0;

    set = lookupKeyWrite(c->db,c->argv[1]);
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,c->argv[1],set);
    } else {
        if (set->type != OBJ_SET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }

    for (j = 2; j < c->argc; j++) { /* iterate over all values: multiple members can be added at once */
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
    if (added) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    server.dirty += added;
    addReplyLongLong(c,added);
}
Next, the setTypeAdd function:
/* Add the specified value into a set.
 *
 * If the value was already member of the set, nothing is done and 0 is
 * returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
    long long llval;
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            dictSetKey(ht,de,sdsdup(value)); /* set the key */
            dictSetVal(ht,de,NULL);          /* value is NULL: a dict used purely as a set */
            return 1;
        }
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                /* Convert to regular set when the intset contains
                 * too many entries. */
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            /* Failed to get integer from object, convert to regular set. */
            setTypeConvert(subject,OBJ_ENCODING_HT);

            /* The set *was* an intset and this value is not integer
             * encodable, so dictAdd should always work. */
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
            return 1;
        }
    } else {
        serverPanic("Unknown set encoding");
    }
    return 0;
}
10.5、HyperLogLog statistics
命令文檔地址:https://redis.io/commands#hyperloglog
- Overview
For example, given the stored values 3, 3, 1, 5, the cardinality (the count after deduplication) is 3.
Advantage: extremely space-efficient. Redis needs only about 12 KB per HyperLogLog to count up to 2^64 distinct elements (strings or numbers).
Limitation: it can only answer count queries, and it carries a small error; the standard error rate is about 0.81%.
Principle: a probabilistic algorithm. The data itself is not stored; a probability function estimates the cardinality.
For comparison, storing the raw values: 2.5 million ints = 1 MB, 250 million ints = 100 MB.
- pfadd, pfcount command usage
server.natappfree.cc:0>pfadd p1 1
"1"
server.natappfree.cc:0>pfadd p1 2
"1"
server.natappfree.cc:0>pfadd p1 1
"0"
server.natappfree.cc:0>pfcount p1
"2"
11、【SortedSet】Skip list internals and SDK usage in top-K scenarios
Use case: range queries, e.g. how many users score between 10 and 100.
11.1、Understanding SortedSet's underlying structure (skiplist)
Skip list (essentially one answer to the search problem).
Tree-based alternatives: AVL tree, red-black tree, splay tree.
Linked-list approach: a multi-level (layered) linked list.
<1> A sorted linked list, layered to approximate binary search:
level 1: plain traversal is O(N); e.g. going from 10 to 46 takes 4 steps
level 2: the same lookup takes 3 steps
level 3: fewer still
level 1: by bus: Shanghai - Zhenjiang - Nanjing - Shijiazhuang - Beijing (100 stops)
level 2: by high-speed rail: Shanghai - Nanjing - Tianjin - Beijing (10 stops)
level 3: by plane: Shanghai - Beijing (1 stop)
11.2、Source code comparison
- zskiplist
Let's first look at the zskiplist struct, defined in src/server.h:
typedef struct zskiplist {
    struct zskiplistNode *header, *tail; /* head and tail nodes */
    unsigned long length;
    int level;
} zskiplist;
Next, the zskiplistNode struct:
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    sds ele;                           /* the element */
    double score;                      /* score, similar to a weight */
    struct zskiplistNode *backward;    /* backward pointer */
    struct zskiplistLevel {
        struct zskiplistNode *forward; /* forward pointer */
        unsigned long span;            /* number of nodes this link skips */
    } level[];
} zskiplistNode;
11.3、Application scenario and SDK operations
First we initialize the customers' spending points:
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var rand = new Random();
        for (int i = 1; i <= 10000; i++)
        {
            var customerID = i;
            var totalTradeMoney = rand.Next(10, 10000);
            db.SortedSetAdd("shop", customerID, totalTradeMoney);
        }
        Console.WriteLine("Inserted successfully!");
        Console.ReadKey();
    }
}
Count members within a score range:
192.168.181.131:0>zcount shop 500 1000
"514"
Add a user's score:
192.168.181.131:0>zadd shop 1 10001
"1"
Remove a user's score:
192.168.181.131:0>zrem shop 10001
"1"
Count all users:
192.168.181.131:0>zcard shop
"10000"
Query the first 10 in ascending score order:
192.168.181.131:0>zrange shop 0 9
1) "5593"
2) "1459"
3) "2811"
4) "5043"
5) "5750"
6) "6601"
7) "337"
8) "7276"
9) "2917"
10) "6990"
192.168.181.131:0>zrange shop 0 9 withscores
1) "5593"
2) "11"
3) "1459"
4) "13"
5) "2811"
6) "15"
7) "5043"
8) "15"
9) "5750"
10) "15"
11) "6601"
12) "15"
13) "337"
14) "17"
15) "7276"
16) "17"
17) "2917"
18) "18"
19) "6990"
20) "19"
Query the top 10:
192.168.181.131:0>zrevrange shop 0 9
1) "4907"
2) "9796"
3) "6035"
4) "4261"
5) "2028"
6) "4611"
7) "4612"
8) "1399"
9) "2786"
10) "2696"
192.168.181.131:0>zrevrange shop 0 9 withscores
1) "4907"
2) "9999"
3) "9796"
4) "9998"
5) "6035"
6) "9995"
7) "4261"
8) "9995"
9) "2028"
10) "9995"
11) "4611"
12) "9994"
13) "4612"
14) "9992"
15) "1399"
16) "9992"
17) "2786"
18) "9990"
19) "2696"
20) "9989"
Query a member's rank:
192.168.181.131:0>zrank shop 60
"9223"
Business logic: determine whether a given user is in the top 25% by spending; if so, treat them as a premium (long-term) customer.
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var len = db.SortedSetLength("shop");
        var customerRank = len * 0.25; // rank threshold for premium customers

        var customerID = 60;
        var dbRank = db.SortedSetRank("shop", customerID, Order.Descending);
        if (dbRank.HasValue && dbRank.Value < customerRank)
        {
            Console.WriteLine($"customer {customerID} is a premium customer");
        }
        Console.ReadKey();
    }
}
Business logic: get the top 10% of customers for special attention.
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var len = db.SortedSetLength("shop");
        var top10 = (long)(len * 0.1);
        var vals = db.SortedSetRangeByRankWithScores("shop", 0, top10 - 1, Order.Descending);
        Console.ReadKey();
    }
}
12、【Persistence】Understanding Redis's three persistence mechanisms
12.1、RDB (Redis Database)
Default storage file: dump.rdb
A snapshot of the database; fast to load, but data written after the last snapshot is lost if Redis exits unexpectedly.
snapshot: the full data set as it was at some historical point in time.
The triggers for flushing memory to disk can be seen in redis.conf:
# Save the DB on disk:
#
#   save <seconds> <changes>
#
#   Will save the DB if both the given number of seconds and the given
#   number of write operations against the DB occurred.
#
#   In the example below the behaviour will be to save:
#   after 900 sec (15 min) if at least 1 key changed
#   after 300 sec (5 min) if at least 10 keys changed
#   after 60 sec if at least 10000 keys changed
#
#   Note: you can disable saving completely by commenting out all "save" lines.
#
#   It is also possible to remove all the previously configured save
#   points by adding a save directive with a single empty string argument
#   like in the following example:
#
#   save ""

save 900 1
save 300 10
save 60 10000
You can also change the dump file name in redis.conf:
# The filename where to dump the DB dbfilename dump.rdb
Let's simulate losing data:
./redis-server ./redis.conf
set username jack
kill -9 50565
./redis-server ./redis.conf
get username
(nil)
Inspect the dump file:
[root@localhost redis]# od -c ./mydata/dump.rdb
0000000   R   E   D   I   S   0   0   0   9 372  \t   r   e   d   i   s
0000020   -   v   e   r 005   5   .   0   .   3 372  \n   r   e   d   i
0000040   s   -   b   i   t   s 300   @ 372 005   c   t   i   m   e 302
0000060 271 226   N   \ 372  \b   u   s   e   d   -   m   e   m 302   @
0000100 347  \r  \0 372  \f   a   o   f   -   p   r   e   a   m   b   l
0000120   e 300  \0 377   t 344 312   Z   Y 363 323 231
0000134
Inspect the dump file with the bundled Redis tool:
[root@localhost redis]# ./src/redis-check-rdb ./mydata/dump.rdb
[offset 0] Checking RDB file ./mydata/dump.rdb
[offset 26] AUX FIELD redis-ver = '5.0.3'
[offset 40] AUX FIELD redis-bits = '64'
[offset 52] AUX FIELD ctime = '1548654265'
[offset 67] AUX FIELD used-mem = '911168'
[offset 83] AUX FIELD aof-preamble = '0'
[offset 92] Checksum OK
[offset 92] \o/ RDB looks OK! \o/
[info] 0 keys read
[info] 0 expires
[info] 0 already expired
12.2、AOF (Append Only File)
The fsync policy for AOF appends can also be seen in redis.conf:
# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log. Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec".

# appendfsync always    <- append each command to disk as it arrives
appendfsync everysec
# appendfsync no        <- let the operating system decide
Next, disable RDB and enable AOF:
#save 900 1
#save 300 10
#save 60 10000

appendonly yes
Then store some data:
127.0.0.1:6379> set username jack
OK
127.0.0.1:6379> set password 12345
OK
Check the generated appendonly.aof file:
[root@localhost mydata]# cat appendonly.aof
*2
$6
SELECT
$1
0
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345
AOF: slow to load, but loses little data.
RDB: fast to load, but can lose more data.
12.3、Hybrid mode: RDB + AOF
It guarantees both fast loading and minimal data loss.
How do we enable it? Look for this setting in redis.conf:
# When rewriting the AOF file, Redis is able to use an RDB preamble in the # AOF file for faster rewrites and recoveries. When this option is turned # on the rewritten AOF file is composed of two different stanzas: # # [RDB file][AOF tail] # # When loading Redis recognizes that the AOF file starts with the "REDIS" # string and loads the prefixed RDB file, and continues loading the AOF # tail. aof-use-rdb-preamble yes
Next we store some data:
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> set username jack
OK
127.0.0.1:6379> set password 12345
OK
At this point appendonly.aof keeps accumulating the command log:
[root@localhost redis]# cat appendonly.aof
*2
$6
SELECT
$1
0
*1
$8
flushall
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345
*1
$8
flushall
*3
$3
set
$8
username
$4
jack
*3
$3
set
$8
password
$5
12345
Next, run the bgrewriteaof command, which rewrites the AOF file with an RDB-format preamble:
127.0.0.1:6379> bgrewriteaof
Background append only file rewriting started
Looking at appendonly.aof again, it now starts with binary RDB content:
[root@localhost redis]# cat appendonly.aof REDIS0009 redis-ver5.0.3 redis-bitse·Nused-memÈ 𮤭preamblepasswordusernamejackÿȵ
12.4、Source code analysis
- RDB source
Look at the rdbSaveRio function, located in src/rdb.c:
/* Produces a dump of the database in RDB format sending it to the specified
 * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
 * is returned and part of the output, or all the output, can be
 * missing because of I/O errors.
 *
 * When the function returns C_ERR and if 'error' is not NULL, the
 * integer pointed by 'error' is set to the value of errno just after the I/O
 * error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;

    for (j = 0; j < server.dbnum; j++) { /* loop over the (by default 16) DBs */
        redisDb *db = server.db+j;
        dict *d = db->dict; /* all the keys of this DB */
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

        /* Write the SELECT DB opcode */
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. We trim the size to UINT32_MAX, which
         * is currently the largest type we are able to represent in RDB sizes.
         * However this does not limit the actual size of the DB to load since
         * these sizes are just hints to resize the hash tables. */
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

            /* When this RDB is produced as part of an AOF rewrite, move
             * accumulated diff from parent to child while rewriting in
             * order to have a smaller final write. */
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    /* EOF opcode */
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}
- AOF source
Look at the rewriteAppendOnlyFile function, located in src/aof.c:
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);

    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

    if (server.aof_use_rdb_preamble) { /* was aof-use-rdb-preamble yes configured? */
        int error;
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
     * faster than it is able to send data to the child), so we try to read
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}
13、【PubSub】Publish/subscribe commands and SDK practice
13.1、Overview
The publish/subscribe pattern is similar to the observer pattern: for example, after a user places an order, pubsub delivers the message to every subscriber of that channel.

13.2、Commands
命令地址:https://redis.io/commands#pubsub
Common commands: publish, subscribe, psubscribe (pattern subscribe)
- subscribe (subscribe from 2 clients)
127.0.0.1:6379> subscribe order
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "order"
3) (integer) 1
- publish (send from a third client)
[root@localhost redis]# ./redis-cli
127.0.0.1:6379> publish order trade1
(integer) 2

The reply 2 indicates the message was delivered to both subscribers.
- Now check the subscribing clients; the message has arrived:
127.0.0.1:6379> subscribe order
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "order"
3) (integer) 1
1) "message"
2) "order"
3) "trade1"
- psubscribe supports three kinds of pattern matching:
- * : ord* matches anything beginning with ord
- [] : orde[er] matches exactly order and ordee
- ? : orde? matches orde followed by any single character
- Example 1:
Subscriber:
127.0.0.1:6379> psubscribe s*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "s*"
3) (integer) 1
Publisher:
127.0.0.1:6379> publish shop shop1
(integer) 2
127.0.0.1:6379> publish order trade1
(integer) 0
Subscriber output:
127.0.0.1:6379> psubscribe s*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "s*"
3) (integer) 1
1) "pmessage"
2) "s*"
3) "shop"
4) "shop1"
13.3、SDK implementation
First, have our 2 CLI clients subscribe to the trade channel:
127.0.0.1:6379> subscribe trade
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "trade"
3) (integer) 1
Then write C# code as a third client:
- Plain (non-pattern) subscribe
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var subscriber = redis.GetSubscriber();

        // Subscribe to channel "trade".
        // Any channel name containing '*' is treated as a pattern subscription.
        subscriber.Subscribe("trade", (channel, redisValue) =>
        {
            Console.WriteLine($"message={redisValue}");
        });
        Console.ReadKey();
    }
}

- publish
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var subscriber = redis.GetSubscriber();
        for (int i = 0; i < 100; i++)
        {
            subscriber.Publish("trade", "t11111111111111");
        }
        Console.ReadKey();
    }
}
- Pattern subscribe
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var subscriber = redis.GetSubscriber();

        // A pattern channel: matches both "trada" and "trade".
        var redisChannel = new RedisChannel("trad[ae]", RedisChannel.PatternMode.Pattern);
        subscriber.Subscribe(redisChannel, (channel, redisValue) =>
        {
            Console.WriteLine($"message={redisValue}");
        });
        Console.ReadKey();
    }
}

13.4、Source code analysis
First look at the pubsub-related fields of the server struct, defined in src/server.h:
/* Pubsub */
dict *pubsub_channels;  /* Map channels to list of subscribed clients */
list *pubsub_patterns;  /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                               xor of NOTIFY_... flags. */
Then the publish command: in src/server.c, publish maps to publishCommand:
void publishCommand(client *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); /* push the message out */
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}
Next, the pubsubPublishMessage function:
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel); /* get every client subscribed to this channel */
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) { /* iterate over the clients, sending the message to each */
            client *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) { /* find clients whose patterns match this channel */
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;

            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}
Then the subscribeCommand function:
void subscribeCommand(client *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}
Next, the pubsubSubscribeChannel function:
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { /* record the channel in the client's own dict */
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) { /* first subscriber: create the client list for this channel */
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c); /* append the current client to the tail of the list */
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}
14、【Transaction】Transaction commands and source code reading
14.1、Commands
命令地址:https://redis.io/commands#transactions
Common commands: multi (begin), exec (execute), discard (abandon)
Command examples:
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username jack
QUEUED
127.0.0.1:6379> set password 12345
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
127.0.0.1:6379> keys *
1) "password"
2) "username"
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username jack
QUEUED
127.0.0.1:6379> set password 12345
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> keys *
(empty list or set)
14.2、Transaction pitfalls
Example:
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username mary
QUEUED
127.0.0.1:6379> lpush username 1 2 3
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
Here one command failed while the other succeeded: Redis transactions do not roll back, so this breaks the atomicity you might expect.
14.3、watch: protecting transaction safety
The purpose of watch is to make exec fail if a watched key was modified while the transaction was being queued.
Example:
//client 1
127.0.0.1:6379> watch username
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set username jack
QUEUED

//client 2
127.0.0.1:6379> set username mary
OK

//client 1
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379> get username
"mary"
While client 1's transaction was still open, client 2 modified a key that the transaction depends on, so exec returned (nil) and none of the queued commands ran.
14.4、SDK usage
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var transaction = db.CreateTransaction();
        transaction.StringSetAsync("username", "jack");
        transaction.StringSetAsync("password", "1234512345123451234512345123451234512345123451234512345123451234512345123451234512345123451234512345");
        transaction.Execute();
        Console.WriteLine("Committed!");
        Console.ReadKey();
    }
}
Use Wireshark to capture the network traffic and analyze the Redis transaction:

14.5、Source code analysis
- client
Look at the client struct, defined in src/server.h:
/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* Client flags: CLIENT_* macros. */
    int authenticated;      /* When requirepass is non-NULL. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */ /* holds all queued commands */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    listNode *client_list_node; /* list node in client list */

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
Next, the multiState struct:
typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */ /* the queued commands */
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
Next, the multiCmd struct:
/* Client MULTI/EXEC state */
typedef struct multiCmd {
    robj **argv;              /* the concrete argument values */
    int argc;                 /* number of arguments */
    struct redisCommand *cmd; /* which command to execute */
} multiCmd;
Look at a relevant client flag, in src/server.h:
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
- multi
In src/server.c, the multi command maps to multiCommand:
void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= CLIENT_MULTI; /* mark the client as being in a MULTI context */
    addReply(c,shared.ok);
}
- exec
In src/server.c, the exec command maps to execCommand:
void execCommand(client *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    int was_master = server.masterhost == NULL;

    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { /* a watched key changed (CAS dirty): cancel the transaction */
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                   shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }

    /* If there are write commands inside the transaction, and this is a read
     * only slave, we want to send an error. This happens when the transaction
     * was initiated when the instance was a master or a writable replica and
     * then the configuration changed (for example instance was turned into
     * a replica). */
    if (!server.loading && server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
    {
        addReplyError(c,
            "Transaction contains write commands but instance "
            "is now a read-only slave. EXEC aborted.");
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
                       /* release every key held by WATCH */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
            execCommandPropagateMulti(c); /* propagate the MULTI before the first write */
            must_propagate = 1;
        }

        call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        /* If inside the MULTI/EXEC block this instance was suddenly
         * switched from master to slave (using the SLAVEOF command), the
         * initial MULTI was propagated into the replication backlog, but the
         * rest was not. We need to make sure to at least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
- watch
查看 redisDb 方法,該方法位於 src/server.h 中
/* Redis database representation. There are multiple databases identified
 * by integers from 0 (the default database) up to the max configured
 * database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */ //watched_keys存放MULTI/EXEC的watch key,使之變成CAS狀態
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    list *defrag_later;         /* List of key names to attempt to defrag
                                   one by one, gradually. */
} redisDb;
這個地方的處理邏輯在於 t_string.c 中的 setCommand 方法中的 setGenericCommand 方法中的 setKey 方法, setKey 方法位於 db.c 中
/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
 *
 * 1) The ref count of the value object is incremented.
 * 2) clients WATCHing for the destination key notified.
 * 3) The expire time of the key is reset (the key is made persistent).
 *
 * All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
        dbOverwrite(db,key,val);
    }
    incrRefCount(val);
    removeExpire(db,key);
    signalModifiedKey(db,key); //通知修改key
}
我們接下來查看 signalModifiedKey 方法
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/
void signalModifiedKey(redisDb *db, robj *key) {
    //鈎子函數,所有key的修改都能監控到
    touchWatchedKey(db,key);
}
我們接下來查看 touchWatchedKey 方法,位於 multi.c 文件中
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    //拿出watched_keys中watch了該key的所有客戶端,類似結構["username":{client1, client2, client3}]
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS; //把這些客戶端的狀態全部設置成CAS狀態,后面執行exec的時候會進行該狀態判斷
    }
}
15、【Scan】億級key的刪除困惑之理解利器scan
15.1、Keys 硬遍歷的困惑
keys命令介紹:https://redis.io/commands#generic
背景介紹:最近有一個redis大概有1億個key,隨着有些店鋪的過期(一年一個周期),我需要找到這些key並刪除,以減少redis內存的膨脹。如果直接使用 keys * 命令,則會造成redis卡死。
數據存儲格式:key 為 s1c1(shopid=1,customerid=1),value 為總交易金額、總交易次數。所以獲取到的key形如 s1c2, s1c3, s2c1, s2c2。
困惑原因:由於redis是單線程的,遍歷37w條數據大約需要4s的時間,上億級的數據會更加耗時,期間其他命令全部被阻塞,所以數據量比較大的時候不建議使用keys。
模擬數據插入:
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var rand = new Random();
        for (int i = 1; i < int.MaxValue; i++)
        {
            var customerID = rand.Next(1, 10000);
            var key = $"s{i}c{customerID}";
            var value = ""; //統計信息
            db.StringSet(key, value);
        }
        Console.WriteLine("提交成功!");
        Console.ReadKey();
    }
}
15.2、Scan 軟遍歷
采用cursor游標的模式,增量返回,而不是像keys一樣一次性返回全部。游標從0開始,當服務端再次返回0時表示遍歷結束。
語法結構:count 表示每次迭代掃描的數量上限【maxcount】,先讀取后匹配,因此實際返回的條數可能少於count
SCAN cursor [MATCH pattern] [COUNT count]
- SCAN:遍歷當前庫的所有key
- HSCAN:遍歷hash
- ZSCAN:遍歷SortedSet
- SSCAN:遍歷Set
示例:
127.0.0.1:6379> scan 0 match s* count 10          //先從游標0開始按模式匹配10個
1) "6553600"                                      //獲取到當前游標6553600
2)  1) "s6320142c103"
    2) "s719732c3086"
    3) "s4214422c4224"
    4) "s6107971c7924"
    5) "s571181c6966"
    6) "s750494c9526"
    7) "s527442c5164"
    8) "s6580725c8456"
    9) "s4791604c5206"
   10) "s1556977c9206"
(1.95s)
127.0.0.1:6379> scan 6553600 match s* count 10    //從游標6553600繼續按模式匹配10個
1) "6422528"                                      //獲取到當前游標6422528
2)  1) "s4304862c8240"
    2) "s3414324c2227"
    3) "s4356115c2908"
    4) "s236939c720"
    5) "s3866928c1421"
    6) "s4228406c6939"
    7) "s5128352c6328"
    8) "s3357175c9411"
    9) "s2312242c5901"
   10) "s5774711c106"
127.0.0.1:6379> scan 6422528 match s* count 10    //從游標6422528繼續按模式匹配10個
1) "7995392"                                      //獲取到當前游標7995392
2)  1) "s1823975c1611"
    2) "s244495c2589"
    3) "s1786203c9731"
    4) "s6120152c2581"
    5) "s3939227c1146"
    6) "s2551230c1949"
    7) "s2603224c341"
    8) "s5598259c625"
    9) "s5823184c9255"
   10) "s3871444c9972"
(0.52s)
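上面手工反覆輸入游標的過程,客戶端代碼裡就是一個"循環直到游標回到0"的模式。下面用一段 Python 小示例本地模擬這個調用模式(`fake_scan` 是假設的樁函數,用列表下標模擬游標,不連接真實 redis;真實游標並不是下標):

```python
# 模擬 SCAN 的客戶端調用模式:反覆傳入上一次返回的游標,直到游標回到 0。
KEYS = [f"s{i}c{i}" for i in range(1, 26)] + ["other1", "other2"]

def fake_scan(cursor, match_prefix, count):
    """以列表下標模擬游標,每次最多掃描 count 個 key,先讀取后匹配。"""
    batch = KEYS[cursor:cursor + count]
    next_cursor = cursor + count
    if next_cursor >= len(KEYS):
        next_cursor = 0          # 游標回到 0 表示遍歷結束
    matched = [k for k in batch if k.startswith(match_prefix)]
    return next_cursor, matched

def scan_all(match_prefix, count=10):
    cursor, result = 0, set()    # 用 set 去重:SCAN 允許同一元素返回多次
    while True:
        cursor, keys = fake_scan(cursor, match_prefix, count)
        result.update(keys)
        if cursor == 0:
            break
    return result

print(len(scan_all("s")))        # 25
```

注意每次迭代返回的條數可能少於count(先讀取后匹配),所以判斷結束只能看游標是否為0,不能看返回是否為空。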
15.3、SDK實現
SDK中將keys 和 scan 合二為一了:
- 如果你的sdk版本比較低,或者server不支持scan,那就使用keys
- 如果你的key的個數比較少,也可能直接使用keys
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var server = redis.GetServer("192.168.181.131:6379");
        var list = server.Keys(0, "s*", 10);
        // 底層幫你每次從server獲取10條,上層不用關心這個。
        // 自動幫你執行了 list = server.Keys(cursor, "s*", 10);
        var index = 1;
        foreach (var item in list)
        {
            Console.WriteLine(item);
            Console.WriteLine(index++);
        }
        Console.ReadKey();
    }
}
wireshark抓包

15.4、源碼簡要研究
我們首先查看 SCAN 命令,我們還是要先查看 src/server.c 中的 scan 對應的 scanCommand 命令
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
    unsigned long cursor;
    if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
    scanGenericCommand(c,NULL,cursor);
}
我們接下來查看 scanGenericCommand 方法
/* This command implements SCAN, HSCAN and SSCAN commands. * If object 'o' is passed, then it must be a Hash or Set object, otherwise * if 'o' is NULL the command will operate on the dictionary associated with * the current database. * * When 'o' is not NULL the function assumes that the first argument in * the client arguments vector is a key so it skips it before iterating * in order to parse options. * * In the case of a Hash object the function returns both the field and value * of every element on the Hash. */ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { int i, j; list *keys = listCreate(); listNode *node, *nextnode; long count = 10; sds pat = NULL; int patlen = 0, use_pattern = 0; dict *ht; /* Object must be NULL (to iterate keys names), or the type of the object * must be Set, Sorted Set, or Hash. */ serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH || o->type == OBJ_ZSET); /* Set i to the first option argument. The previous one is the cursor. */ i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */ /* Step 1: Parse options. */ //第一步,轉換options,驗證count、match、*不能錯 while (i < c->argc) { j = c->argc - i; if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) { if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL) != C_OK) { goto cleanup; } if (count < 1) { addReply(c,shared.syntaxerr); goto cleanup; } i += 2; } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) { pat = c->argv[i+1]->ptr; patlen = sdslen(pat); /* The pattern always matches if it is exactly "*", so it is * equivalent to disabling it. */ use_pattern = !(pat[0] == '*' && patlen == 1); i += 2; } else { addReply(c,shared.syntaxerr); goto cleanup; } } /* Step 2: Iterate the collection. * * Note that if the object is encoded with a ziplist, intset, or any other * representation that is not a hash table, we are sure that it is also * composed of a small number of elements. 
So to avoid taking state we * just return everything inside the object in a single call, setting the * cursor to zero to signal the end of the iteration. */ /* Handle the case of a hash table. */ //第二步,迭代集合 ht = NULL; if (o == NULL) { ht = c->db->dict; } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { ht = o->ptr; } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { ht = o->ptr; count *= 2; /* We return key / value for this type. */ } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; ht = zs->dict; count *= 2; /* We return key / value for this type. */ } if (ht) { void *privdata[2]; /* We set the max number of iterations to ten times the specified * COUNT, so if the hash table is in a pathological state (very * sparsely populated) we avoid to block too much time at the cost * of returning no or very few elements. */ long maxiterations = count*10; /* We pass two pointers to the callback: the list to which it will * add new elements, and the object containing the dictionary so that * it is possible to fetch more data in a type-dependent way. */ privdata[0] = keys; privdata[1] = o; do { cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); } while (cursor && maxiterations-- && listLength(keys) < (unsigned long)count); } else if (o->type == OBJ_SET) { int pos = 0; int64_t ll; while(intsetGet(o->ptr,pos++,&ll)) listAddNodeTail(keys,createStringObjectFromLongLong(ll)); cursor = 0; } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) { unsigned char *p = ziplistIndex(o->ptr,0); unsigned char *vstr; unsigned int vlen; long long vll; while(p) { ziplistGet(p,&vstr,&vlen,&vll); listAddNodeTail(keys, (vstr != NULL) ? createStringObject((char*)vstr,vlen) : createStringObjectFromLongLong(vll)); p = ziplistNext(o->ptr,p); } cursor = 0; } else { serverPanic("Not handled encoding in SCAN."); } /* Step 3: Filter elements. 
*/ //第三步,過濾元素 node = listFirst(keys); while (node) { robj *kobj = listNodeValue(node); nextnode = listNextNode(node); int filter = 0; /* Filter element if it does not match the pattern. */ //使用模式去匹配 if (!filter && use_pattern) { if (sdsEncodedObject(kobj)) { if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0)) filter = 1; } else { char buf[LONG_STR_SIZE]; int len; serverAssert(kobj->encoding == OBJ_ENCODING_INT); len = ll2string(buf,sizeof(buf),(long)kobj->ptr); if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1; } } /* Filter element if it is an expired key. */ //判斷key有沒有過期,過期也進行刪除 if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1; /* Remove the element and its associted value if needed. */ //刪除元素關聯的值 if (filter) { decrRefCount(kobj); listDelNode(keys, node); } /* If this is a hash or a sorted set, we have a flat list of * key-value elements, so if this element was filtered, remove the * value, or skip it if it was not filtered: we only match keys. */ if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) { node = nextnode; nextnode = listNextNode(node); if (filter) { kobj = listNodeValue(node); decrRefCount(kobj); listDelNode(keys, node); } } node = nextnode; } /* Step 4: Reply to the client. */ //第四部,響應客戶端 addReplyMultiBulkLen(c, 2); addReplyBulkLongLong(c,cursor); addReplyMultiBulkLen(c, listLength(keys)); while ((node = listFirst(keys)) != NULL) { robj *kobj = listNodeValue(node); addReplyBulk(c, kobj); decrRefCount(kobj); listDelNode(keys, node); } cleanup: listSetFreeMethod(keys,decrRefCountVoid); listRelease(keys); }
我們重點查看一下第二步的 dictScan ,在 dict.c 文件中
/* dictScan() is used to iterate over the elements of a dictionary. * dictscan()用於迭代字典的元素。 * * Iterating works the following way: * 迭代的工作方式如下: * * 1) Initially you call the function using a cursor (v) value of 0. * 1)最初使用光標(v)值0調用函數。 * 2) The function performs one step of the iteration, and returns the * new cursor value you must use in the next call. * 2)函數執行迭代的一個步驟,並返回下一次調用中必須使用的新光標值。 * 3) When the returned cursor is 0, the iteration is complete. * 3)當返回的光標為0時,迭代完成。 * * The function guarantees all elements present in the * dictionary get returned between the start and end of the iteration. * However it is possible some elements get returned multiple times. * 函數確保在迭代的開始和結束之間返回字典中的所有元素。但是,某些元素可能會多次返回。 * * For every element returned, the callback argument 'fn' is * called with 'privdata' as first argument and the dictionary entry * 'de' as second argument. * 對於返回的每個元素,調用回調參數“fn”,第一個參數為“privdata”,第二個參數為字典條目“de”。 * * HOW IT WORKS. * 它是如何工作的。 * * The iteration algorithm was designed by Pieter Noordhuis. * The main idea is to increment a cursor starting from the higher order * bits. That is, instead of incrementing the cursor normally, the bits * of the cursor are reversed, then the cursor is incremented, and finally * the bits are reversed again. * 迭代算法由Pieter Noordhuis設計。主要思想是從高階位開始增加光標。也就是說,不是通常遞增光標,而是反轉光標的位,然后遞增光標,最后再次反轉位。 * * This strategy is needed because the hash table may be resized between * iteration calls. * 需要使用此策略,因為哈希表可能在迭代調用之間調整大小。 * * dict.c hash tables are always power of two in size, and they * use chaining, so the position of an element in a given table is given * by computing the bitwise AND between Hash(key) and SIZE-1 * (where SIZE-1 is always the mask that is equivalent to taking the rest * of the division between the Hash of the key and SIZE). * dict.c散列表的大小總是2的冪,它們使用鏈接,因此通過計算散列(鍵)和大小-1之間的位和(其中,大小-1始終是等同於在鍵的散列和大小之間進行其余除法的掩碼)來給出給定表中元素的位置。 * * For example if the current hash table size is 16, the mask is * (in binary) 1111. 
The position of a key in the hash table will always be * the last four bits of the hash output, and so forth. * 例如,如果當前哈希表大小為16,則掩碼為(二進制)1111。鍵在哈希表中的位置始終是哈希輸出的最后四位,以此類推。 * * WHAT HAPPENS IF THE TABLE CHANGES IN SIZE? * 如果表的大小發生了變化,會發生什么? * * If the hash table grows, elements can go anywhere in one multiple of * the old bucket: for example let's say we already iterated with * a 4 bit cursor 1100 (the mask is 1111 because hash table size = 16). * 如果散列表增長,元素可以在舊bucket的一個倍數中移動到任何地方:例如,假設我們已經使用4位光標1100進行了迭代(掩碼為1111,因為散列表大小=16)。 * * If the hash table will be resized to 64 elements, then the new mask will * be 111111. The new buckets you obtain by substituting in ??1100 * with either 0 or 1 can be targeted only by keys we already visited * when scanning the bucket 1100 in the smaller hash table. * 如果哈希表將被調整為64個元素,那么新的掩碼將是111111。你用替換的方法得到的新桶??只有在掃描較小哈希表中的bucket 1100時,我們已經訪問過的鍵才能針對0或1的1100。 * * By iterating the higher bits first, because of the inverted counter, the * cursor does not need to restart if the table size gets bigger. It will * continue iterating using cursors without '1100' at the end, and also * without any other combination of the final 4 bits already explored. * 通過首先迭代更高的位,由於計數器是反向的,如果表的大小變大,光標就不需要重新啟動。它將繼續使用光標進行迭代,結尾不帶“1100”,也不包含已探索的最后4位的任何其他組合。 * * Similarly when the table size shrinks over time, for example going from * 16 to 8, if a combination of the lower three bits (the mask for size 8 * is 111) were already completely explored, it would not be visited again * because we are sure we tried, for example, both 0111 and 1111 (all the * variations of the higher bit) so we don't need to test it again. * 同樣地,當表大小隨着時間而縮小時,例如從16到8,如果已經完全探索了較低的三位(8大小的掩碼是111)的組合,則不會再次訪問它,因為我們確定已嘗試過,例如,0111和1111(較高位的所有變化),因此我們不需要再次測試它。 * * WAIT... YOU HAVE *TWO* TABLES DURING REHASHING! * * * Yes, this is true, but we always iterate the smaller table first, then * we test all the expansions of the current cursor into the larger * table. 
For example if the current cursor is 101 and we also have a * larger table of size 16, we also test (0)101 and (1)101 inside the larger * table. This reduces the problem back to having only one table, where * the larger one, if it exists, is just an expansion of the smaller one. * 是的,這是正確的,但我們總是先迭代較小的表,然后將當前光標的所有擴展測試到較大的表中。例如,如果當前光標是101,並且我們還有一個更大的表,大小為16,那么我們還將在更大的表中測試(0)101和(1)101。這將問題減少到只有一個表,其中較大的表(如果存在)只是較小表的擴展。 * * LIMITATIONS * 局限性 * * This iterator is completely stateless, and this is a huge advantage, * including no additional memory used. * 這個迭代器是完全無狀態的,這是一個巨大的優勢,包括沒有使用額外的內存。 * * The disadvantages resulting from this design are: * 這種設計的缺點是: * * 1) It is possible we return elements more than once. However this is usually * easy to deal with in the application level. * 2) The iterator must return multiple elements per call, as it needs to always * return all the keys chained in a given bucket, and all the expansions, so * we are sure we don't miss keys moving during rehashing. * 3) The reverse cursor is somewhat hard to understand at first, but this * comment is supposed to help. 
* 1)我們可能會多次返回元素。然而,這通常在應用程序級別很容易處理。 * 2)迭代器每次調用必須返回多個元素,因為它需要始終返回一個給定bucket中鏈接的所有鍵以及所有擴展,因此我們確信在重新刷新期間不會錯過鍵的移動。 * 3)反向光標一開始有點難理解,但是這個注釋應該有幫助。 */ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction* bucketfn, void *privdata) { dictht *t0, *t1; const dictEntry *de, *next; unsigned long m0, m1; if (dictSize(d) == 0) return 0; if (!dictIsRehashing(d)) { t0 = &(d->ht[0]); m0 = t0->sizemask; /* Emit entries at cursor */ if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; while (de) { next = de->next; fn(privdata, de); de = next; } /* Set unmasked bits so incrementing the reversed cursor * operates on the masked bits */ v |= ~m0; /* Increment the reverse cursor */ v = rev(v); v++; v = rev(v); } else { t0 = &d->ht[0]; t1 = &d->ht[1]; /* Make sure t0 is the smaller and t1 is the bigger table */ if (t0->size > t1->size) { t0 = &d->ht[1]; t1 = &d->ht[0]; } m0 = t0->sizemask; m1 = t1->sizemask; /* Emit entries at cursor */ if (bucketfn) bucketfn(privdata, &t0->table[v & m0]); de = t0->table[v & m0]; while (de) { next = de->next; fn(privdata, de); de = next; } /* Iterate over indices in larger table that are the expansion * of the index pointed to by the cursor in the smaller table */ do { /* Emit entries at cursor */ if (bucketfn) bucketfn(privdata, &t1->table[v & m1]); de = t1->table[v & m1]; while (de) { next = de->next; fn(privdata, de); de = next; } /* Increment the reverse cursor not covered by the smaller mask.*/ v |= ~m1; v = rev(v); v++; v = rev(v); /* Continue while bits covered by mask difference is non-zero */ } while (v & (m0 ^ m1)); } return v; }
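上面注釋裡的"反轉光標、加一、再反轉"可以用下面的 Python 小示例直觀驗證(假設游標只有 8 位,真實實現是 unsigned long;mask 取 7 對應大小為 8 的哈希表,僅為示意):

```python
WORD = 8                         # 為演示取 8 位游標(真實實現是 unsigned long)

def rev(v):
    """按位反轉 WORD 位整數,對應 dict.c 中的 rev()。"""
    r = 0
    for _ in range(WORD):
        r = (r << 1) | (v & 1)
        v >>= 1
    return r

def next_cursor(v, mask):
    """對應 dictScan 中的 v |= ~m0; v = rev(v); v++; v = rev(v);"""
    v |= (~mask) & ((1 << WORD) - 1)   # 把掩碼外的高位全部置 1
    v = rev(v)
    v = (v + 1) & ((1 << WORD) - 1)    # 反轉后相當於從高位開始進位
    return rev(v)

mask = 7                         # 哈希表大小 8,sizemask = 7
order, v = [], 0
while True:
    order.append(v & mask)       # 訪問當前游標指向的桶
    v = next_cursor(v, mask)
    if v == 0:
        break
print(order)                     # [0, 4, 2, 6, 1, 5, 3, 7]
```

可以看到桶的訪問順序是 0, 4, 2, 6, 1, 5, 3, 7:從高位開始遞增,所以表擴容或縮容后,已訪問過的桶前綴仍然成立,這正是 SCAN 能在 rehash 期間保證不漏key的原因。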
16、【Lua】腳本的幾個案例介紹及對scan的優化
16.1、Lua簡介
Lua 腳本功能是 Redis 2.6 版本的最大亮點,通過內嵌對 Lua 環境的支持,Redis 解決了長久以來不能高效地處理 CAS(check-and-set)命令的缺點,並且可以通過組合使用多個命令,輕松實現以前很難實現或者不能高效實現的模式。(其實它就相當於關系數據庫的存儲過程)
假設我們存儲了userinfo age1 20 age2 25 age3 28,如果我們要找到hash中小於指定age的所有kv,只能先用 HGETALL 命令取出全部數據,或者用 HKEYS 取出所有field后再逐一查詢,客戶端與服務端之間要往返多次,Lua腳本就是來解決這一問題的。
16.2、常用命令介紹
命令地址:https://redis.io/commands#scripting
常用命令:EVAL,EVALSHA, SCRIPT LOAD, SCRIPT FLUSH
- EVAL
語法:EVAL script numkeys key [key ...] arg [arg ...]
示例:
//KEYS表示鍵,ARGV表示值
127.0.0.1:6379> EVAL "return KEYS[1]+KEYS[2]" 2 1 5
(integer) 6
(0.57s)
127.0.0.1:6379> EVAL "return KEYS[1]+ARGV[1]+ARGV[2]" 1 1 10 20
(integer) 31
我們還可以將其寫成lua腳本,使用 file.lua 形式灌入
//創建test.lua文件
vim test.lua
//文件中寫入
return KEYS[1]+ARGV[1]+ARGV[2];
//然后執行(注意使用lua腳本時,key和value用 , 號進行分隔,逗號兩邊還應有空格)
[root@localhost redis]# ./redis-cli --eval ./test.lua 1 , 10 20
(integer) 31
- SCRIPT LOAD + EVALSHA
把腳本緩存在redis server中,這樣每次使用的時候就不用再重新傳輸和編譯腳本了
127.0.0.1:6379> SCRIPT LOAD "return KEYS[1]+KEYS[2]"
"7b23d2a5829679ac50baf7c8e105904a3e9e69bb"
127.0.0.1:6379> EVALSHA 7b23d2a5829679ac50baf7c8e105904a3e9e69bb 2 1 5
(integer) 6
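SCRIPT LOAD 返回的那串標識其實就是腳本內容的 SHA1 摘要(十六進制),客戶端完全可以在本地先算出來,再直接用 EVALSHA 調用。下面用 Python 演示一下(具體摘要值以 SCRIPT LOAD 的實際返回為準):

```python
import hashlib

# SCRIPT LOAD 返回的標識就是腳本內容的 SHA1 摘要(40 位十六進制)
script = "return KEYS[1]+KEYS[2]"
sha1 = hashlib.sha1(script.encode("utf-8")).hexdigest()
print(sha1)   # 應與 SCRIPT LOAD 對同一腳本返回的串一致
```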
16.3、LUA腳本
情景描述:首先我們優化之前的SCAN查找:原來是先進行篩選查找,將數據存放到List&lt;string&gt;集合中,然后再遍歷刪除,這樣會造成客戶端與服務端之間頻繁的數據往返。
那么我們可以通過Lua腳本的方式解決這個問題。
16.3.1、按模式刪除數據
初始化測試數據:
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> set s1c1 1
OK
127.0.0.1:6379> set s1c2 2
OK
127.0.0.1:6379> set s1c3 3
OK
127.0.0.1:6379> set s1c4 4
OK
127.0.0.1:6379> set s1c5 5
OK
127.0.0.1:6379> set s1c6 6
OK
127.0.0.1:6379> set s1c7 7
OK
127.0.0.1:6379> set s1c8 8
OK
127.0.0.1:6379> set s1c9 9
OK
127.0.0.1:6379> set s1c10 10
OK
127.0.0.1:6379> set s2c1 1
OK
測試查詢數據:
127.0.0.1:6379> scan 0 match s1c* count 5
1) "10"
2) 1) "s1c8"
   2) "s1c10"
   3) "s1c3"
   4) "s1c2"
   5) "s1c1"
test.lua腳本內容:
local pattern=KEYS[1];
local result={};
local cursor=0;
while (true)
do
    -- 匹配s1c*
    -- redis.call :相當於在server端調用redis的相應命令
    -- redis.call返回table結構 => 相當於c#中的dictionary字典
    local dict=redis.call("scan",cursor,"match",pattern);
    -- 1.獲取cursor
    cursor=dict[1];
    -- 2.獲取返回的keys的table
    local keyslist=dict[2];
    -- 3.逐個刪除匹配到的keys
    for idx,value in pairs(keyslist) do
        local isSuccess=redis.call("del",value);
        if(isSuccess==1)then
            table.insert(result,isSuccess); -- 插入到result中
        end
    end
    print(cursor);
    if(cursor=="0")then
        break;
    end
end
return result;
我們可以在本地寫好腳本,然后在 linux 系統中使用 rz/sz 命令接收和發送文件(注意:用rz命令從客戶端發送時,要確保接收路徑下沒有重名文件,否則會傳輸失敗;或者使用 rz -y 強制覆蓋)
sz:將選定的文件發送(send)到本地機器
rz:運行該命令會彈出一個文件選擇窗口,從本地選擇文件上傳到Linux服務器
安裝命令:
yum install lrzsz
從服務端發送文件到客戶端:
sz filename
從客戶端上傳文件到服務端:
rz
在彈出的框中選擇文件,上傳文件的用戶和組是當前登錄的用戶
Xshell設置默認路徑:
右鍵會話 -> 屬性 -> ZMODEM -> 接收文件夾
然后執行命令刪除"s1c"開頭的所有數據,刪除成功后只會剩一條數據
[root@localhost redis]# ./redis-cli --eval ./test.lua "s1c*"
 1) (integer) 1
 2) (integer) 1
 3) (integer) 1
 4) (integer) 1
 5) (integer) 1
 6) (integer) 1
 7) (integer) 1
 8) (integer) 1
 9) (integer) 1
10) (integer) 1
127.0.0.1:6379> keys *
1) "s2c1"
16.3.2、刪除hash中age大於指定值的所有kv
目標:刪除age大於25的kv
初始化數據:
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> hset userinfo age1 20
(integer) 1
127.0.0.1:6379> hset userinfo age2 25
(integer) 1
127.0.0.1:6379> hset userinfo age3 28
(integer) 1
127.0.0.1:6379> hset userinfo age4 30
(integer) 1
測試查詢數據:
127.0.0.1:6379> hkeys userinfo
1) "age1"
2) "age2"
3) "age3"
4) "age4"
hash.lua腳本內容:
local userinfo=KEYS[1]; --db 的 key
local age=KEYS[2];
local hkeys=redis.call("hkeys",userinfo);
for k,v in pairs(hkeys) do
    local hval=redis.call("hget",userinfo,v);
    -- 如果hval 大於指定的 age,直接刪除
    if(tonumber(hval) > tonumber(age)) then
        redis.call("hdel",userinfo,v);
        print(v .. " del ok");
    end
end
return 1;
然后執行命令刪除userinfo中age大於25的所有數據,刪除成功后只會剩2條數據
[root@localhost redis]# ./redis-cli --eval ./hash.lua userinfo 25 (integer) 1 [root@localhost redis]# ./redis-cli 127.0.0.1:6379> hkeys userinfo 1) "age1" 2) "age2"
16.4、SDK實現
sdk中的實現邏輯是讀取本地lua文件中的腳本信息,然后提交到redis-server中去執行
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        var script = File.ReadAllText(@"hash.lua", Encoding.Default);
        var result = db.ScriptEvaluate(script, new RedisKey[2] { "userinfo", "25" });
        Console.WriteLine("執行成功");
        Console.ReadKey();
    }
}
17、【性能優化】介紹使用四種方式實現大批量數據急速插入
17.1、場景介紹
如何短時間內向redis灌入大量數據,源於千人千面場景,存儲s*c*(針對shop和customer統計信息進行存儲)。
- 普通模式 的龜速插入
10w條: 50s左右
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);
        Stopwatch sw = new Stopwatch();
        sw.Start();
        for (int i = 0; i < 100000; i++)
        {
            db.StringSet(i.ToString(), i.ToString());
        }
        sw.Stop();
        Console.WriteLine(sw.ElapsedMilliseconds);
        Console.WriteLine("執行成功");
        Console.ReadKey();
    }
}

- 原因分析及優化 (Round-Trip)
優化思路:減少round-trip,10萬次請求就是10萬次round-trip
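可以做一個粗略的估算感受一下差距(RTT 取 0.5ms 只是假設值,實際取決於網絡環境;計算本身與具體redis無關):

```python
# 粗略估算:逐條寫入時,耗時幾乎全部花在網絡往返上,而不是redis本身。
rtt_ms = 0.5                              # 假設單次往返 0.5ms
n = 100_000                               # 10w 條寫入

normal = n * rtt_ms / 1000                # 每條命令一次往返
batched = (n / 10_000) * rtt_ms / 1000    # 1w 條打包成一次往返
print(normal, batched)                    # 50.0 0.005
```

逐條寫入光網絡往返就要約50s,與上面實測的"10w條50s左右"是同一個量級;打包后往返只剩10次,瓶頸就轉移到了序列化和服務端執行上。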
17.2、SDK演示速度大比拼
batch.lua 腳本
-- KEYS[1] 轉化為json數組
local str=KEYS[1];
local arr=cjson.decode(str);
local result={};
for idx,v in pairs(arr) do
    local isSuccess=redis.call("set",v.k,v.v);
    table.insert(result,isSuccess);
end
return result;
c# SDK 代碼
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.181.131:6379");
        IDatabase db = redis.GetDatabase(0);

        var dict = new Dictionary<int, List<KeyValuePair<RedisKey, RedisValue>>>();
        // 11個批次(i: 0~10)
        for (int i = 0; i <= 10; i++)
        {
            // 每批 1w 個 key
            var smallList = Enumerable.Range(0, 10000)
                .Select(m => KeyValuePair.Create<RedisKey, RedisValue>(Guid.NewGuid().ToString(), Guid.NewGuid().ToString()))
                .ToList();
            dict.Add(i, smallList);
        }

        var stopwatch = Stopwatch.StartNew();

        //1. transaction (1w條插入一次)
        foreach (var item in dict)
        {
            var transaction = db.CreateTransaction();
            foreach (var model in item.Value)
            {
                transaction.StringSetAsync(model.Key, model.Value);
            }
            transaction.Execute();
            Console.WriteLine($"transaction {item.Key} 批次執行完畢");
        }
        Console.WriteLine($"transaction 耗費的時間:{stopwatch.ElapsedMilliseconds}");
        stopwatch.Restart();

        //2. mset (1w條插入一次)
        foreach (var item in dict)
        {
            db.StringSet(item.Value.ToArray());
            Console.WriteLine($"mset {item.Key} 批次執行完畢");
        }
        Console.WriteLine($"mset耗費的時間:{stopwatch.ElapsedMilliseconds}");
        stopwatch.Restart();

        //3. pipeline (1w條插入一次)
        foreach (var item in dict)
        {
            var batch = db.CreateBatch();
            foreach (var model in item.Value)
            {
                batch.StringSetAsync(model.Key, model.Value);
            }
            batch.Execute();
            Console.WriteLine($"batch {item.Key} 批次執行完畢");
        }
        Console.WriteLine($"batch 耗費的時間:{stopwatch.ElapsedMilliseconds}");
        stopwatch.Restart();

        //4. lua腳本 (1w條插入一次)
        foreach (var item in dict)
        {
            var list = item.Value.Select(i => new model() { k = i.Key, v = i.Value });
            db.ScriptEvaluate(File.ReadAllText(@"batch.lua", Encoding.Default),
                new RedisKey[] { JsonConvert.SerializeObject(list) });
            Console.WriteLine($"lua {item.Key} 批次執行完畢");
        }
        Console.WriteLine($"lua 耗費的時間:{stopwatch.ElapsedMilliseconds}");
        stopwatch.Restart();

        //5. normal (一條一次)
        foreach (var item in dict)
        {
            foreach (var model in item.Value)
            {
                db.StringSet(model.Key, model.Value);
            }
            Console.WriteLine($"normal {item.Key} 批次執行完畢");
        }
        Console.WriteLine($"normal 耗費的時間:{stopwatch.ElapsedMilliseconds}");
        Console.ReadKey();
    }
}

public class model
{
    public string k { get; set; }
    public string v { get; set; }
}
時間統計
...
transaction 耗費的時間:1060
...
mset耗費的時間:511
...
batch 耗費的時間:819
...
lua 耗費的時間:1504
...
normal 耗費的時間:61657
18、【限制內存】限制redis的最大內存介紹及代碼測試
有些人會把Redis單純當做緩存來使用。因為緩存的使用是無止境的,所以通常會配置一個 maxmemory 來限制redis最大內存。
18.1、設置最大內存(maxmemory)、內存超出使用策略(maxmemory-policy)
修改redis.conf默認參數(maxmemory、maxmemory-policy)
############################## MEMORY MANAGEMENT ################################ # Set a memory usage limit to the specified amount of bytes. # When the memory limit is reached Redis will try to remove keys # according to the eviction policy selected (see maxmemory-policy). # # If Redis can't remove keys according to the policy, or if the policy is # set to 'noeviction', Redis will start to reply with errors to commands # that would use more memory, like SET, LPUSH, and so on, and will continue # to reply to read-only commands like GET. # # This option is usually useful when using Redis as an LRU or LFU cache, or to # set a hard memory limit for an instance (using the 'noeviction' policy). # # WARNING: If you have replicas attached to an instance with maxmemory on, # the size of the output buffers needed to feed the replicas are subtracted # from the used memory count, so that network problems / resyncs will # not trigger a loop where keys are evicted, and in turn the output # buffer of replicas is full with DELs of keys evicted triggering the deletion # of more keys, and so forth until the database is completely emptied. # # In short... if you have replicas attached it is suggested that you set a lower # limit for maxmemory so that there is some free RAM on the system for replica # output buffers (but this is not needed if the policy is 'noeviction'). # # maxmemory <bytes> maxmemory 104857600 //100M=1024*1024*100 最大內存設置100M # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: # # volatile-lru -> Evict using approximated LRU among the keys with an expire set. # allkeys-lru -> Evict any key using approximated LRU. # volatile-lfu -> Evict using approximated LFU among the keys with an expire set. # allkeys-lfu -> Evict any key using approximated LFU. # volatile-random -> Remove a random key among the ones with an expire set. # allkeys-random -> Remove a random key, any key. 
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL) # noeviction -> Don't evict anything, just return an error on write operations. # # LRU means Least Recently Used # LFU means Least Frequently Used # # Both LRU, LFU and volatile-ttl are implemented using approximated # randomized algorithms. # # Note: with any of the above policies, Redis will return an error on write # operations, when there are no suitable keys for eviction. # # At the date of writing these commands are: set setnx setex append # incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd # sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby # zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby # getset mset msetnx exec sort # # The default is: # # maxmemory-policy noeviction maxmemory-policy allkeys-lru //maxmemory-policy 配置內存不足時使用的策略。
內存不足時使用的策略使用說明
- LRU(Least Recently Used,最近最少使用的優先剔除):

- LFU(Least Frequently Used訪問次數最少的優先剔除):
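allkeys-lru 的剔除行為可以用下面的 Python 小示例直觀感受一下(用 key 個數代替字節數、用精確 LRU 代替 Redis 實際採用的近似採樣 LRU,僅為示意):

```python
from collections import OrderedDict

# allkeys-lru 行為的極簡示意:超過"maxmemory"(這裡用 key 個數代替字節數)時,
# 剔除最久未被訪問的 key。
class LruCache:
    def __init__(self, max_keys):
        self.max_keys = max_keys
        self.data = OrderedDict()       # 按訪問時間排序,隊首最舊

    def set(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)  # 寫也算一次訪問
        self.data[key] = value
        while len(self.data) > self.max_keys:
            self.data.popitem(last=False)   # 剔除最久未使用的

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)      # 讀取時更新"訪問時鍾"
        return self.data[key]

c = LruCache(3)
for k in ("a", "b", "c"):
    c.set(k, k)
c.get("a")          # a 變成最近使用
c.set("d", "d")     # 超限,剔除最久未用的 b
print(list(c.data)) # ['c', 'a', 'd']
```

真實的 Redis 為了省內存並不維護這樣一條全局鏈表,而是隨機採樣若干key、剔除其中 lru 時鍾最舊的那個,效果上是近似的。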
我們可以通過使用 info memory 命令查看內存使用情況
127.0.0.1:6379> info memory
# Memory
used_memory:911136
used_memory_human:889.78K      //已經使用內存
used_memory_rss:2678784
used_memory_rss_human:2.55M
used_memory_peak:911248
used_memory_peak_human:889.89K
used_memory_peak_perc:99.99%
used_memory_overhead:910574
used_memory_startup:860880
used_memory_dataset:562
used_memory_dataset_perc:1.12%
allocator_allocated:878296
allocator_active:2640896
allocator_resident:2640896
total_system_memory:1907941376
total_system_memory_human:1.78G
used_memory_lua:37888
used_memory_lua_human:37.00K
used_memory_scripts:0
used_memory_scripts_human:0B
number_of_cached_scripts:0
maxmemory:104857600            //最大內存
maxmemory_human:100.00M        //最大內存
maxmemory_policy:allkeys-lru   //最大內存策略,使用lru策略
allocator_frag_ratio:3.01
allocator_frag_bytes:1762600
allocator_rss_ratio:1.00
allocator_rss_bytes:0
rss_overhead_ratio:1.01
rss_overhead_bytes:37888
mem_fragmentation_ratio:3.05
mem_fragmentation_bytes:1800488
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_clients_slaves:0
mem_clients_normal:49694
mem_aof_buffer:0
mem_allocator:libc
active_defrag_running:0
lazyfree_pending_objects:0
18.2、sdk演示
sdk插入數據進行觀察內存變化,可以看到始終保持100M的內存
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.43.62:6379");
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            var val = string.Join(",", Enumerable.Range(0, 10000));
            db.StringSet(i.ToString(), val);
            Console.WriteLine($"當前{i} 塞入成功!");
        }
        Console.ReadKey();
    }
}
使用redis-cli查看
# Memory
127.0.0.1:6379> info memory
# Memory
used_memory:104808424
used_memory_human:99.95M
used_memory_rss:171360256
used_memory_rss_human:163.42M
used_memory_peak:104857232
used_memory_peak_human:100.00M
used_memory_peak_perc:99.95%
used_memory_overhead:1110857
used_memory_startup:860896
used_memory_dataset:103697567
used_memory_dataset_perc:99.76%
allocator_allocated:104775568
allocator_active:171322368
allocator_resident:171322368
total_system_memory:1907941376
total_system_memory_human:1.78G
used_memory_lua:37888
used_memory_lua_human:37.00K
used_memory_scripts:0
used_memory_scripts_human:0B
number_of_cached_scripts:0
maxmemory:104857600
maxmemory_human:100.00M
maxmemory_policy:allkeys-lru
allocator_frag_ratio:1.64
allocator_frag_bytes:66546800
allocator_rss_ratio:1.00
allocator_rss_bytes:0
rss_overhead_ratio:1.00
rss_overhead_bytes:37888
mem_fragmentation_ratio:1.64
mem_fragmentation_bytes:66584688
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_clients_slaves:0
mem_clients_normal:132433
mem_aof_buffer:0
mem_allocator:libc
active_defrag_running:0
lazyfree_pending_objects:0
18.3、源碼簡要研究
redis 怎么知道該剔除哪個key呢?它是根據訪問時間(或訪問頻率)來控制的。那么時間信息是怎么記錄到對象上的?我們首先要查看 redisObject 對象中的 lru 字段
#define OBJ_SHARED_REFCOUNT INT_MAX
typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock 自動記錄系統時鍾) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time
                            * 低8位記錄訪問頻率,高16位記錄訪問時間).
                            * 執行get命令的時候會給lru賦值 */
    int refcount;
    void *ptr;
} robj;
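lru 字段在 LFU 模式下低 8 位存的是一個對數計數器:counter 越大,繼續加 1 的概率越低,8 位就能覆蓋海量訪問。下面用 Python 模擬這個概率遞增邏輯(LFU_INIT_VAL、lfu-log-factor 取 Redis 默認值 5 和 10,僅為示意,不是對源碼的逐行翻譯):

```python
import random

LFU_INIT_VAL = 5       # 新對象計數器初值(對應 Redis 源碼中的 LFU_INIT_VAL)
LFU_LOG_FACTOR = 10    # 對應配置 lfu-log-factor,這裡假設取默認值

def lfu_incr(counter):
    """對數計數器:counter 越大,加 1 的概率越低,最大 255(8 位)。"""
    if counter >= 255:
        return 255
    baseval = max(counter - LFU_INIT_VAL, 0)
    p = 1.0 / (baseval * LFU_LOG_FACTOR + 1)
    return counter + 1 if random.random() < p else counter

random.seed(42)                # 固定種子,讓結果可復現
counter = LFU_INIT_VAL
for _ in range(10_000):        # 模擬 1w 次訪問
    counter = lfu_incr(counter)
print(counter)                 # 遠小於 10000,但大於初值 5
```

可以看到 1w 次訪問之后計數器只漲到很小的值,這就是 8 位計數器也能區分冷熱key的原因。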
我們接下來查看一下 getCommand 源碼
void getCommand(client *c) {
    getGenericCommand(c);
}
我們接下來查看一下 getGenericCommand 源碼
int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;

    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}
我們接下來查看一下 lookupKeyReadOrReply 源碼
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}
我們接下來查看一下 lookupKeyRead 源碼
/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
 * common case. */
robj *lookupKeyRead(redisDb *db, robj *key) {
    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}
我們接下來查看一下 lookupKeyReadWithFlags 源碼
/* Lookup a key for read operations, or return NULL if the key is not found
 * in the specified DB.
 *
 * As a side effect of calling this function:
 * 1. A key gets expired if it reached it's TTL.
 * 2. The key last access time is updated.
 * 3. The global keys hits/misses stats are updated (reported in INFO).
 *
 * This API should not be used when we write to the key after obtaining
 * the object linked to the key, but only for read only operations.
 *
 * Flags change the behavior of this command:
 *
 *  LOOKUP_NONE (or zero): no special flags are passed.
 *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
 *
 * Note: this function also returns NULL if the key is logically expired
 * but still existing, in case this is a slave, since this API is called only
 * for read operations. Even if the key expiry is master-driven, we can
 * correctly report a key is expired on slaves even if the master is lagging
 * expiring our key via DELs in the replication link. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;

    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's safe
         * to return NULL ASAP. */
        if (server.masterhost == NULL) {
            server.stat_keyspace_misses++;
            return NULL;
        }

        /* However if we are in the context of a slave, expireIfNeeded() will
         * not really try to expire the key, it only returns information
         * about the "logical" status of the key: key expiring is up to the
         * master in order to have a consistent view of master's data set.
         *
         * However, if the command caller is not the master, and as additional
         * safety measure, the command invoked is a read-only command, we can
         * safely return NULL here, and provide a more consistent behavior
         * to clients accessign expired values in a read-only fashion, that
         * will say the key as non existing.
         *
         * Notably this covers GETs when slaves are used to scale reads. */
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & CMD_READONLY)
        {
            server.stat_keyspace_misses++;
            return NULL;
        }
    }
    val = lookupKey(db,key,flags);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
Next, let's look at the source of lookupKey:
/* Low level key lookup API, not actually called directly from commands
 * implementations that should instead rely on lookupKeyRead(),
 * lookupKeyWrite() and lookupKeyReadWithFlags(). */
robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            !(flags & LOOKUP_NOTOUCH))
        {
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                /* Under LFU, update the frequency counter. */
                updateLFU(val);
            } else {
                /* Under LRU, stamp the object with the system LRU clock. */
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}
Next, let's look at the source of createObject:
robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o));
    o->type = type;
    o->encoding = OBJ_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /* Set the LRU to the current lruclock (minutes resolution), or
     * alternatively the LFU counter. */
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        o->lru = LRU_CLOCK();
    }
    return o;
}
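To make the bit-packing above concrete, here is a minimal Python sketch of how the 24-bit lru field is laid out in LFU mode: the high 16 bits hold a minute-resolution clock and the low 8 bits hold the access counter, which starts at LFU_INIT_VAL (5) rather than 0 so that freshly created keys are not evicted immediately. The helper names below are ours; only the constants and the shift mirror the C code.

```python
LFU_INIT_VAL = 5    # new objects start with counter 5, not 0
LRU_BITS = 24       # robj.lru is a 24-bit field: 16 bits clock + 8 bits counter

def lfu_get_time_in_minutes(unix_time):
    # Redis keeps only the low 16 bits of the minute-resolution clock.
    return (unix_time // 60) & 0xFFFF

def pack_lfu(unix_time):
    # Mirrors: o->lru = (LFUGetTimeInMinutes() << 8) | LFU_INIT_VAL
    return (lfu_get_time_in_minutes(unix_time) << 8) | LFU_INIT_VAL

def unpack_lfu(lru_field):
    # -> (minute clock, access counter)
    return lru_field >> 8, lru_field & 0xFF

minutes, counter = unpack_lfu(pack_lfu(3_600_000))
print(minutes, counter)   # 60000 5
```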
Next, let's look at the source of freeMemoryIfNeeded:
/* This function is periodically called to see if there is memory to free
 * according to the current "maxmemory" settings. In case we are over the
 * memory limit, the function will try to free some memory to return back
 * under the limit.
 *
 * The function returns C_OK if we are under the memory limit or if we
 * were over the limit, but the attempt to free memory was successful.
 * Otherwise if we are over the memory limit, but not enough memory
 * was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) {
    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;

    size_t mem_reported, mem_tofree, mem_freed;
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return C_OK;
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;

    mem_freed = 0;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    latencyStartMonitor(latency);
    while (mem_freed < mem_tofree) {
        int j, k, i, keys_freed = 0;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) { /* Walk every database. */
                    db = server.db+i;
                    /* Under an allkeys-* policy sample from all keys,
                     * otherwise only from keys that carry a TTL. */
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict: exit. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            latencyRemoveNestedEvent(latency,eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            /* When the memory to free starts to be big enough, we may
             * start spending so much time here that is impossible to
             * deliver data to the slaves fast enough, so we force the
             * transmission here inside the loop. */
            if (slaves) flushSlavesOutputBuffers();

            /* Normally our stop condition is the ability to release
             * a fixed, pre-computed amount of memory. However when we
             * are deleting objects in another thread, it's better to
             * check, from time to time, if we already reached our target
             * memory, since the "mem_freed" amount is computed only
             * across the dbAsyncDelete() call, while the thread can
             * release the memory all the time. */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                    /* Let's satisfy our stop condition. */
                    mem_freed = mem_tofree;
                }
            }
        }

        if (!keys_freed) {
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("eviction-cycle",latency);
            goto cant_free; /* nothing to free... */
        }
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
}
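The core of the eviction loop above — sample a handful of random keys, rank the idlest candidates in a small pool, evict the idlest — can be sketched in a few lines of Python. This is only an illustration of the approximated-LRU idea, not Redis's actual implementation (which reuses a persistent pool across cycles and mixes in new samples each time); EVPOOL_SIZE and the sample count of 5 match Redis's defaults.

```python
import random

EVPOOL_SIZE = 16   # size of Redis's eviction pool
SAMPLES = 5        # default maxmemory-samples

def evict_one(store, last_access, now):
    """Approximated LRU: sample a few random keys, rank them by idle
    time in a small pool, and evict the idlest one."""
    pool = []
    for key in random.sample(list(store), min(SAMPLES, len(store))):
        pool.append((now - last_access[key], key))
    pool.sort(reverse=True)          # idlest candidates first
    del pool[EVPOOL_SIZE:]           # cap the pool size
    victim = pool[0][1]
    del store[victim]
    return victim

store = {"a": 1, "b": 2, "c": 3}
last_access = {"a": 100, "b": 950, "c": 990}   # "a" is by far the idlest
print(evict_one(store, last_access, now=1000)) # "a" (all 3 keys get sampled)
```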
19、[Rate limiting & distributed locks] Scenarios for rate limiting and distributed locks, with SDK code demos
19.1、Rate limiting with Redis
- Scenarios
  - Anti-crawler: limit how often an endpoint can be called (e.g. 10 requests/s).
  - Limit and meter API calls, billed per 10,000 calls.
- Approaches
  - Leaky bucket algorithm
    Effectively protects the downstream system.
  - Token bucket algorithm
  - The difference:
    - A leaky bucket drains at a strictly constant output rate.
    - A token bucket can absorb short, sudden bursts of traffic.
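A minimal token-bucket sketch in Python makes the difference concrete: the bucket refills at a constant rate, but a caller may spend up to `capacity` tokens at once — exactly the short burst a leaky bucket would smooth away. The class and parameter names are illustrative; `now` can be injected so the behavior is deterministic.

```python
import time

class TokenBucket:
    """Token bucket: refills at a fixed rate, but allows bursts up to
    `capacity` -- unlike a leaky bucket, whose output rate is constant."""
    def __init__(self, rate, capacity, now=None):
        self.rate = rate                  # tokens added per second
        self.capacity = capacity          # burst ceiling
        self.tokens = float(capacity)
        self.last = time.monotonic() if now is None else now

    def allow(self, now=None):
        now = time.monotonic() if now is None else now
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=10, capacity=10, now=0.0)
results = [bucket.allow(now=0.0) for _ in range(12)]
print(results.count(True))   # 10 requests burst through at once, 2 rejected
```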
- Implementations
  - Using string incr, or set/get (as a Lua script)
    Lua script based on string incr (strictly speaking this is a fixed-window counter rather than a true token bucket):
-- KEYS[1]: the caller's ip
local rediskey = "rate.limit."..KEYS[1];
local limit = redis.call("incr", rediskey);  -- each incoming request increments the counter
local isOk = 1;
if limit == 1 then
    -- first request in this window: the key lives for 1 second only
    redis.call("expire", rediskey, 1);
elseif limit > 10 then
    -- more than 10 requests within the second: reject
    isOk = 0;
end
return isOk;
Lua script based on get/set (again a fixed-window counter):
local seconds = redis.call("time")[1];
local rediskey = "rate.limit."..seconds.."."..KEYS[1];
local limit = redis.call("get", rediskey);
local isOk = 1;
if limit == false then
    -- first request in this second: create the counter with a 1s TTL
    redis.call("set", rediskey, 1, "EX", 1);
elseif tonumber(limit) > 10 then
    isOk = 0;
else
    redis.call("incr", rediskey);
end
return isOk;
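Both Lua scripts above are fixed-window counters: one counter per (second, key), rejecting once it crosses the limit. The same logic in plain Python, for local experimentation (names are illustrative; a real deployment would also need the TTL that the scripts set, which this in-memory stand-in omits):

```python
import time
from collections import defaultdict

LIMIT = 10   # requests allowed per 1-second window

counters = defaultdict(int)   # stands in for the redis keys; no TTL here

def allow(ip, now=None):
    """Fixed-window limiter, mirroring the get/set Lua script:
    one counter per (second, ip), rejected once it exceeds LIMIT."""
    second = int(time.time() if now is None else now)
    key = "rate.limit.%d.%s" % (second, ip)
    counters[key] += 1
    return counters[key] <= LIMIT

hits = [allow("192.168.1.1", now=1000) for _ in range(15)]
print(hits.count(True))   # 10
```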
Running the script from the SDK
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        var script = File.ReadAllText("api.lua");
        while (true)
        {
            var result = db.ScriptEvaluate(script, new RedisKey[1] { "192.168.1.1" });
            Console.WriteLine(result);
            // roughly 12 requests per second
            Thread.Sleep(80);
        }
    }
}
- Using list lpush
  Lua script:
-- KEYS[1]: the caller's ip
local ip = KEYS[1];
local rediskey = "rate.limit."..ip;
local limit = redis.call("llen", rediskey);
local isOk = 1;
if limit > 10 then
    isOk = 0;
else
    redis.call("lpush", rediskey, ip);
    if limit == 0 then
        -- first push in this window: the key lives for 1 second only
        redis.call("expire", rediskey, 1);
    end
end
return isOk;
Running the script from the SDK
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        var script = File.ReadAllText("api2.lua");
        while (true)
        {
            var result = db.ScriptEvaluate(script, new RedisKey[1] { "192.168.1.1" });
            Console.WriteLine(result);
            // roughly 12 requests per second
            Thread.Sleep(80);
        }
    }
}
19.2、Distributed locks in the SDK
Official reference: https://redis.io/topics/distlock
SDK implementation

class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        // Acquire the lock; under the hood this runs: SET lock 12345 EX 600 NX
        var isTake = db.LockTake("lock", "12345", TimeSpan.FromMinutes(10));
        if (isTake)
        {
            //TODO: do the protected work here

            /*
             * Release the lock; the key and value must match the ones used to acquire it.
             * Under the hood the SDK runs:
             *   WATCH lock
             *   GET lock    -- verify the value is still ours, because before DEL
             *               -- we must be sure this lock is the one we created
             *   MULTI
             *   DEL lock
             *   EXEC
             */
            var isRelease = db.LockRelease("lock", "12345");
        }
        Console.ReadKey();
    }
}
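The ownership check described in the comments above — only delete the lock if it still holds your value — can be modelled without a server. The sketch below is an in-memory stand-in for SET NX plus the compare-and-delete release step; on a real server that release must be atomic (a Lua script, or the WATCH/MULTI sequence the SDK uses). All names here are illustrative.

```python
import uuid

class FakeRedis:
    """In-memory stand-in for the two operations the lock needs."""
    def __init__(self):
        self.data = {}

    def set_nx(self, key, value):
        # Like SET key value NX: only succeeds if the key does not exist yet.
        if key in self.data:
            return False
        self.data[key] = value
        return True

    def compare_and_delete(self, key, value):
        # Only delete the lock if it still holds *our* value. On a real
        # server this check-then-delete must run atomically.
        if self.data.get(key) == value:
            del self.data[key]
            return True
        return False

r = FakeRedis()
token = uuid.uuid4().hex                 # unique value proves ownership
acquired = r.set_nx("lock", token)
blocked = r.set_nx("lock", "other")      # a second client cannot acquire it
released = r.compare_and_delete("lock", token)
print(acquired, blocked, released)       # True False True
```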
20、[Stream] Understanding stream processing and common stream commands
20.1、What is stream processing?
- Scenarios
  The value of a lot of data decays quickly as time passes, so it has to be computed in real time.
- Characteristics of stream processing
  - The computation model is defined up front. [e.g. machine learning]
  - Data is input continuously, and results are output continuously.
  - Example 1: financial risk control. For the person applying for a loan right now: gender, age, number of properties, household registration, bank statements, bad records, and so on. The result must be returned in real time, and it affects how much can be lent.
  - Example 2: real-time prediction. In systems like ours, push to the user as soon as the order is placed: based on order amount, contribution, region, blacklists and so on, send a real-time "you may also like" to drive a possible repeat purchase.
20.2、Understanding the Stream structure in Redis (5.0)
Official docs: https://redis.io/topics/streams-intro
Command reference: https://redis.io/commands#stream
The Stream is a new data type introduced with Redis 5.0, which models a log data structure in a more abstract way, however the essence of the log is still intact: like a log file, often implemented as a file open in append only mode, Redis streams are primarily an append only data structure. At least conceptually, because being Redis Streams an abstract data type represented in memory, they implement more powerful operations, to overcome the limits of the log file itself.
Understanding the stream model in Redis

As you can see, writes always append at the tail, while reads walk forward from a given ID.
127.0.0.1:6379> xadd logs * c1 c1
"1549195730591-0"            // ID = millisecond timestamp + sequence number
127.0.0.1:6379> xadd logs * c2 c2
"1549197993824-0"
127.0.0.1:6379> xadd logs * c3 c3
"1549198001356-0"
127.0.0.1:6379> xread streams logs 1549197993824-0    // read the entries after this ID
1) 1) "logs"
   2) 1) 1) "1549198001356-0"
         2) 1) "c3"
            2) "c3"
127.0.0.1:6379> xread streams logs 0                  // read everything, i.e. all entries after 0
1) 1) "logs"
   2) 1) 1) "1549195730591-0"
         2) 1) "c1"
            2) "c1"
      2) 1) "1549197993824-0"
         2) 1) "c2"
            2) "c2"
      3) 1) "1549198001356-0"
         2) 1) "c3"
            2) "c3"
127.0.0.1:6379> xread streams logs $                  // only entries after the last one, so nothing
(nil)
127.0.0.1:6379> xread count 1 streams logs 0          // read the first entry
1) 1) "logs"
   2) 1) 1) "1549195730591-0"
         2) 1) "c1"
            2) "c1"
127.0.0.1:6379> xread count 2 streams logs 0          // read the first two entries
1) 1) "logs"
   2) 1) 1) "1549195730591-0"
         2) 1) "c1"
            2) "c1"
      2) 1) "1549197993824-0"
         2) 1) "c2"
            2) "c2"
20.3、Implementing publish/subscribe with data retention via the SDK
- Use XADD + XREAD to implement publish/subscribe:
  - redis publish/subscribe: does not store data.
  - stream: stores the data.
- SDK implementation
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        // milliseconds since the Unix epoch, used as the starting stream ID
        RedisValue nextID = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000;
        while (true)
        {
            var list = db.StreamRead("logs", nextID, 1);
            if (list.Length == 0)
            {
                Thread.Sleep(10);
                continue;
            }
            var single = list[0];
            // works a bit like a cursor: remember the last ID we saw
            nextID = single.Id;
            Console.WriteLine(single.Values[0]);
        }
    }
}
20.4、Implementing multiple consumer groups with XGROUP and XREADGROUP via the SDK
127.0.0.1:6379> xrange logs - +
1) 1) "1549195730591-0"
   2) 1) "c1"
      2) "c1"
2) 1) "1549197993824-0"
   2) 1) "c2"
      2) "c2"
3) 1) "1549198001356-0"
   2) 1) "c3"
      2) "c3"
4) 1) "1549199460391-0"
   2) 1) "c4"
      2) "c4"
5) 1) "1549199719371-0"
   2) 1) "c5"
      2) "c5"
127.0.0.1:6379> xgroup create logs ctrip 1549197993824-0   // create group "ctrip", starting after c2
OK
127.0.0.1:6379> xreadgroup group ctrip jack streams logs > // read onward as group ctrip, consumer jack
1) 1) "logs"
   2) 1) 1) "1549198001356-0"
         2) 1) "c3"
            2) "c3"
      2) 1) "1549199460391-0"
         2) 1) "c4"
            2) "c4"
      3) 1) "1549199719371-0"
         2) 1) "c5"
            2) "c5"
127.0.0.1:6379> xinfo groups logs                          // inspect all the groups
1) 1) "name"
   2) "ctrip"
   3) "consumers"
   4) (integer) 1
   5) "pending"       // 3 pending: stream consumers must ack, much like RabbitMQ's ack mechanism
   6) (integer) 3
   7) "last-delivered-id"
   8) "1549199719371-0"
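The pending count and last-delivered-id shown by xinfo can be modelled with a toy consumer group: delivery advances last-delivered-id and parks the entry in the pending entries list (PEL) until it is acked. This is a deliberate simplification of the real semantics, and the string IDs here compare lexicographically, which only works because they share a width.

```python
class StreamGroup:
    """Toy model of a consumer group: last_delivered advances as entries
    are handed out, and each entry stays in the pending list (PEL) until
    the consumer acks it."""
    def __init__(self, entries, start_id):
        self.entries = entries            # list of (id, payload), ascending
        self.last_delivered = start_id
        self.pending = {}                 # id -> consumer that received it

    def read(self, consumer):
        # Like XREADGROUP ... >: deliver the next entry past last_delivered.
        for entry_id, payload in self.entries:
            if entry_id > self.last_delivered:
                self.last_delivered = entry_id
                self.pending[entry_id] = consumer
                return entry_id, payload
        return None

    def ack(self, entry_id):
        # Like XACK: drop the entry from the pending list.
        return self.pending.pop(entry_id, None) is not None

g = StreamGroup([("1-0", "c1"), ("2-0", "c2"), ("3-0", "c3")], start_id="1-0")
print(g.read("jack"))       # ('2-0', 'c2')
print(g.read("mary"))       # ('3-0', 'c3')
print(len(g.pending))       # 2 -- both entries stay pending until acked
g.ack("2-0")
print(len(g.pending))       # 1
```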
SDK implementation
Simulating consumer 1 (mary)
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        while (true)
        {
            var entrylist = db.StreamReadGroup("logs", "ctrip", "mary", ">", count: 1);
            if (entrylist.Length == 0)
            {
                Console.WriteLine("no data yet!");
                Thread.Sleep(1000);
                continue;
            }
            var single = entrylist[0];
            Console.WriteLine(single.Values[0]);
            // report the entry back to redis as processed (ack)
            db.StreamAcknowledge("logs", "ctrip", single.Id);
        }
    }
}
Simulating consumer 2 (jack)
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        while (true)
        {
            var entrylist = db.StreamReadGroup("logs", "ctrip", "jack", ">", count: 1);
            if (entrylist.Length == 0)
            {
                Console.WriteLine("no data yet!");
                Thread.Sleep(1000);
                continue;
            }
            var single = entrylist[0];
            Console.WriteLine(single.Values[0]);
            // report the entry back to redis as processed (ack)
            db.StreamAcknowledge("logs", "ctrip", single.Id);
        }
    }
}
Simulating the producer
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.2.107:6379");
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            db.StreamAdd("logs", i, i);
            Thread.Sleep(200);
        }
        Console.ReadKey();
    }
}
Result: contention between the two consumers is handled cleanly — each entry is delivered to only one of them.
21、[Module] Understanding the module concept, and RediSQL for SQL on Redis
21.1、What is a module?
Since Redis 4.0, modules are supported: third parties can develop their own modules on top of Redis and use them to add new data types (bloom filter, SQL on Redis, and so on).
This is a list of Redis modules, for Redis v4.0 or greater, ordered by Github stars. This list contains two set of modules: modules under an OSI approved license, and modules that are under some proprietary license. Non OSI modules are clearly flagged as not open source. Also to have the source code hosted at Github is currently mandatory. To add your module here please send a pull request for the modules.json file in the Redis-doc repository.
21.2、Implementing a simple module
- Include the redismodule.h header and the init function RedisModule_Init (they live in the redis source tree; copy them into your project)
View Code#ifndef REDISMODULE_H #define REDISMODULE_H #include <sys/types.h> #include <stdint.h> #include <stdio.h> /* ---------------- Defines common between core and modules --------------- */ /* Error status return values. */ #define REDISMODULE_OK 0 #define REDISMODULE_ERR 1 /* API versions. */ #define REDISMODULE_APIVER_1 1 /* API flags and constants */ #define REDISMODULE_READ (1<<0) #define REDISMODULE_WRITE (1<<1) #define REDISMODULE_LIST_HEAD 0 #define REDISMODULE_LIST_TAIL 1 /* Key types. */ #define REDISMODULE_KEYTYPE_EMPTY 0 #define REDISMODULE_KEYTYPE_STRING 1 #define REDISMODULE_KEYTYPE_LIST 2 #define REDISMODULE_KEYTYPE_HASH 3 #define REDISMODULE_KEYTYPE_SET 4 #define REDISMODULE_KEYTYPE_ZSET 5 #define REDISMODULE_KEYTYPE_MODULE 6 /* Reply types. */ #define REDISMODULE_REPLY_UNKNOWN -1 #define REDISMODULE_REPLY_STRING 0 #define REDISMODULE_REPLY_ERROR 1 #define REDISMODULE_REPLY_INTEGER 2 #define REDISMODULE_REPLY_ARRAY 3 #define REDISMODULE_REPLY_NULL 4 /* Postponed array length. */ #define REDISMODULE_POSTPONED_ARRAY_LEN -1 /* Expire */ #define REDISMODULE_NO_EXPIRE -1 /* Sorted set API flags. */ #define REDISMODULE_ZADD_XX (1<<0) #define REDISMODULE_ZADD_NX (1<<1) #define REDISMODULE_ZADD_ADDED (1<<2) #define REDISMODULE_ZADD_UPDATED (1<<3) #define REDISMODULE_ZADD_NOP (1<<4) /* Hash API flags. */ #define REDISMODULE_HASH_NONE 0 #define REDISMODULE_HASH_NX (1<<0) #define REDISMODULE_HASH_XX (1<<1) #define REDISMODULE_HASH_CFIELDS (1<<2) #define REDISMODULE_HASH_EXISTS (1<<3) /* Context Flags: Info about the current context returned by * RM_GetContextFlags(). 
*/ /* The command is running in the context of a Lua script */ #define REDISMODULE_CTX_FLAGS_LUA (1<<0) /* The command is running inside a Redis transaction */ #define REDISMODULE_CTX_FLAGS_MULTI (1<<1) /* The instance is a master */ #define REDISMODULE_CTX_FLAGS_MASTER (1<<2) /* The instance is a slave */ #define REDISMODULE_CTX_FLAGS_SLAVE (1<<3) /* The instance is read-only (usually meaning it's a slave as well) */ #define REDISMODULE_CTX_FLAGS_READONLY (1<<4) /* The instance is running in cluster mode */ #define REDISMODULE_CTX_FLAGS_CLUSTER (1<<5) /* The instance has AOF enabled */ #define REDISMODULE_CTX_FLAGS_AOF (1<<6) /* The instance has RDB enabled */ #define REDISMODULE_CTX_FLAGS_RDB (1<<7) /* The instance has Maxmemory set */ #define REDISMODULE_CTX_FLAGS_MAXMEMORY (1<<8) /* Maxmemory is set and has an eviction policy that may delete keys */ #define REDISMODULE_CTX_FLAGS_EVICT (1<<9) /* Redis is out of memory according to the maxmemory flag. */ #define REDISMODULE_CTX_FLAGS_OOM (1<<10) /* Less than 25% of memory available according to maxmemory. */ #define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11) #define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ #define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */ #define REDISMODULE_NOTIFY_LIST (1<<4) /* l */ #define REDISMODULE_NOTIFY_SET (1<<5) /* s */ #define REDISMODULE_NOTIFY_HASH (1<<6) /* h */ #define REDISMODULE_NOTIFY_ZSET (1<<7) /* z */ #define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ #define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ #define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ /* A special pointer that we can use between the core and the module to signal * field deletion, and that is impossible to be a valid pointer. 
*/ #define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1) /* Error messages. */ #define REDISMODULE_ERRORMSG_WRONGTYPE "WRONGTYPE Operation against a key holding the wrong kind of value" #define REDISMODULE_POSITIVE_INFINITE (1.0/0.0) #define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0) /* Cluster API defines. */ #define REDISMODULE_NODE_ID_LEN 40 #define REDISMODULE_NODE_MYSELF (1<<0) #define REDISMODULE_NODE_MASTER (1<<1) #define REDISMODULE_NODE_SLAVE (1<<2) #define REDISMODULE_NODE_PFAIL (1<<3) #define REDISMODULE_NODE_FAIL (1<<4) #define REDISMODULE_NODE_NOFAILOVER (1<<5) #define REDISMODULE_CLUSTER_FLAG_NONE 0 #define REDISMODULE_CLUSTER_FLAG_NO_FAILOVER (1<<1) #define REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION (1<<2) #define REDISMODULE_NOT_USED(V) ((void) V) /* This type represents a timer handle, and is returned when a timer is * registered and used in order to invalidate a timer. It's just a 64 bit * number, because this is how each timer is represented inside the radix tree * of timers that are going to expire, sorted by expire time. */ typedef uint64_t RedisModuleTimerID; /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE typedef long long mstime_t; /* Incomplete structures for compiler checks but opaque access. 
*/ typedef struct RedisModuleCtx RedisModuleCtx; typedef struct RedisModuleKey RedisModuleKey; typedef struct RedisModuleString RedisModuleString; typedef struct RedisModuleCallReply RedisModuleCallReply; typedef struct RedisModuleIO RedisModuleIO; typedef struct RedisModuleType RedisModuleType; typedef struct RedisModuleDigest RedisModuleDigest; typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; typedef struct RedisModuleDict RedisModuleDict; typedef struct RedisModuleDictIter RedisModuleDictIter; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { uint64_t version; RedisModuleTypeLoadFunc rdb_load; RedisModuleTypeSaveFunc rdb_save; RedisModuleTypeRewriteFunc aof_rewrite; RedisModuleTypeMemUsageFunc mem_usage; RedisModuleTypeDigestFunc digest; RedisModuleTypeFreeFunc free; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ 
## name)) #define REDISMODULE_API_FUNC(x) (*x) void *REDISMODULE_API_FUNC(RedisModule_Alloc)(size_t bytes); void *REDISMODULE_API_FUNC(RedisModule_Realloc)(void *ptr, size_t bytes); void REDISMODULE_API_FUNC(RedisModule_Free)(void *ptr); void *REDISMODULE_API_FUNC(RedisModule_Calloc)(size_t nmemb, size_t size); char *REDISMODULE_API_FUNC(RedisModule_Strdup)(const char *str); int REDISMODULE_API_FUNC(RedisModule_GetApi)(const char *, void *); int REDISMODULE_API_FUNC(RedisModule_CreateCommand)(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep); void REDISMODULE_API_FUNC(RedisModule_SetModuleAttribs)(RedisModuleCtx *ctx, const char *name, int ver, int apiver); int REDISMODULE_API_FUNC(RedisModule_IsModuleNameBusy)(const char *name); int REDISMODULE_API_FUNC(RedisModule_WrongArity)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithLongLong)(RedisModuleCtx *ctx, long long ll); int REDISMODULE_API_FUNC(RedisModule_GetSelectedDb)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid); void *REDISMODULE_API_FUNC(RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode); void REDISMODULE_API_FUNC(RedisModule_CloseKey)(RedisModuleKey *kp); int REDISMODULE_API_FUNC(RedisModule_KeyType)(RedisModuleKey *kp); size_t REDISMODULE_API_FUNC(RedisModule_ValueLength)(RedisModuleKey *kp); int REDISMODULE_API_FUNC(RedisModule_ListPush)(RedisModuleKey *kp, int where, RedisModuleString *ele); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ListPop)(RedisModuleKey *key, int where); RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_Call)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...); const char *REDISMODULE_API_FUNC(RedisModule_CallReplyProto)(RedisModuleCallReply *reply, size_t *len); void REDISMODULE_API_FUNC(RedisModule_FreeCallReply)(RedisModuleCallReply *reply); int 
REDISMODULE_API_FUNC(RedisModule_CallReplyType)(RedisModuleCallReply *reply); long long REDISMODULE_API_FUNC(RedisModule_CallReplyInteger)(RedisModuleCallReply *reply); size_t REDISMODULE_API_FUNC(RedisModule_CallReplyLength)(RedisModuleCallReply *reply); RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str); const char *REDISMODULE_API_FUNC(RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithError)(RedisModuleCtx *ctx, const char *err); int REDISMODULE_API_FUNC(RedisModule_ReplyWithSimpleString)(RedisModuleCtx *ctx, const char *msg); int REDISMODULE_API_FUNC(RedisModule_ReplyWithArray)(RedisModuleCtx *ctx, long len); void REDISMODULE_API_FUNC(RedisModule_ReplySetArrayLength)(RedisModuleCtx *ctx, long len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithStringBuffer)(RedisModuleCtx *ctx, const char *buf, size_t len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_ReplyWithNull)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ReplyWithDouble)(RedisModuleCtx *ctx, double d); int REDISMODULE_API_FUNC(RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, RedisModuleCallReply *reply); int REDISMODULE_API_FUNC(RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll); 
int REDISMODULE_API_FUNC(RedisModule_StringToDouble)(const RedisModuleString *str, double *d); void REDISMODULE_API_FUNC(RedisModule_AutoMemory)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...); int REDISMODULE_API_FUNC(RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx); const char *REDISMODULE_API_FUNC(RedisModule_CallReplyStringPtr)(RedisModuleCallReply *reply, size_t *len); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromCallReply)(RedisModuleCallReply *reply); int REDISMODULE_API_FUNC(RedisModule_DeleteKey)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_UnlinkKey)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_StringSet)(RedisModuleKey *key, RedisModuleString *str); char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *len, int mode); int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen); mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire); int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr); int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore); int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score); int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted); void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInScoreRange)(RedisModuleKey *key, double min, double max, int minex, int maxex); int REDISMODULE_API_FUNC(RedisModule_ZsetLastInScoreRange)(RedisModuleKey *key, double min, double max, int minex, int maxex); int 
REDISMODULE_API_FUNC(RedisModule_ZsetFirstInLexRange)(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max); int REDISMODULE_API_FUNC(RedisModule_ZsetLastInLexRange)(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score); int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_ZsetRangePrev)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_HashSet)(RedisModuleKey *key, int flags, ...); int REDISMODULE_API_FUNC(RedisModule_HashGet)(RedisModuleKey *key, int flags, ...); int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos); unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); int64_t REDISMODULE_API_FUNC(RedisModule_LoadSigned)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_EmitAOF)(RedisModuleIO *io, 
const char *cmdname, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_SaveString)(RedisModuleIO *io, RedisModuleString *s); void REDISMODULE_API_FUNC(RedisModule_SaveStringBuffer)(RedisModuleIO *io, const char *str, size_t len); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_LoadString)(RedisModuleIO *io); char *REDISMODULE_API_FUNC(RedisModule_LoadStringBuffer)(RedisModuleIO *io, size_t *lenptr); void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double value); double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value); float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len); void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b); RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io); long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void); void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len); void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele); void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md); RedisModuleDict *REDISMODULE_API_FUNC(RedisModule_CreateDict)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_FreeDict)(RedisModuleCtx *ctx, RedisModuleDict *d); uint64_t REDISMODULE_API_FUNC(RedisModule_DictSize)(RedisModuleDict *d); int 
REDISMODULE_API_FUNC(RedisModule_DictSetC)(RedisModuleDict *d, void *key, size_t keylen, void *ptr); int REDISMODULE_API_FUNC(RedisModule_DictReplaceC)(RedisModuleDict *d, void *key, size_t keylen, void *ptr); int REDISMODULE_API_FUNC(RedisModule_DictSet)(RedisModuleDict *d, RedisModuleString *key, void *ptr); int REDISMODULE_API_FUNC(RedisModule_DictReplace)(RedisModuleDict *d, RedisModuleString *key, void *ptr); void *REDISMODULE_API_FUNC(RedisModule_DictGetC)(RedisModuleDict *d, void *key, size_t keylen, int *nokey); void *REDISMODULE_API_FUNC(RedisModule_DictGet)(RedisModuleDict *d, RedisModuleString *key, int *nokey); int REDISMODULE_API_FUNC(RedisModule_DictDelC)(RedisModuleDict *d, void *key, size_t keylen, void *oldval); int REDISMODULE_API_FUNC(RedisModule_DictDel)(RedisModuleDict *d, RedisModuleString *key, void *oldval); RedisModuleDictIter *REDISMODULE_API_FUNC(RedisModule_DictIteratorStartC)(RedisModuleDict *d, const char *op, void *key, size_t keylen); RedisModuleDictIter *REDISMODULE_API_FUNC(RedisModule_DictIteratorStart)(RedisModuleDict *d, const char *op, RedisModuleString *key); void REDISMODULE_API_FUNC(RedisModule_DictIteratorStop)(RedisModuleDictIter *di); int REDISMODULE_API_FUNC(RedisModule_DictIteratorReseekC)(RedisModuleDictIter *di, const char *op, void *key, size_t keylen); int REDISMODULE_API_FUNC(RedisModule_DictIteratorReseek)(RedisModuleDictIter *di, const char *op, RedisModuleString *key); void *REDISMODULE_API_FUNC(RedisModule_DictNextC)(RedisModuleDictIter *di, size_t *keylen, void **dataptr); void *REDISMODULE_API_FUNC(RedisModule_DictPrevC)(RedisModuleDictIter *di, size_t *keylen, void **dataptr); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_DictNext)(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_DictPrev)(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr); int REDISMODULE_API_FUNC(RedisModule_DictCompareC)(RedisModuleDictIter *di, const char 
*op, void *key, size_t keylen); int REDISMODULE_API_FUNC(RedisModule_DictCompare)(RedisModuleDictIter *di, const char *op, RedisModuleString *key); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API #define REDISMODULE_EXPERIMENTAL_API_VERSION 3 RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms); int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata); int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx); RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_AbortBlock)(RedisModuleBlockedClient *bc); RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc); void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb); int REDISMODULE_API_FUNC(RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback); int REDISMODULE_API_FUNC(RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len); int REDISMODULE_API_FUNC(RedisModule_GetClusterNodeInfo)(RedisModuleCtx *ctx, const char *id, char *ip, char 
*master_id, int *port, int *flags); char **REDISMODULE_API_FUNC(RedisModule_GetClusterNodesList)(RedisModuleCtx *ctx, size_t *numnodes); void REDISMODULE_API_FUNC(RedisModule_FreeClusterNodesList)(char **ids); RedisModuleTimerID REDISMODULE_API_FUNC(RedisModule_CreateTimer)(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data); int REDISMODULE_API_FUNC(RedisModule_StopTimer)(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data); int REDISMODULE_API_FUNC(RedisModule_GetTimerInfo)(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data); const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void); size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void); void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len); void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len); void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback); void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); #endif /* This is included inline inside each Redis module. 
*/ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused)); static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) { void *getapifuncptr = ((void**)ctx)[0]; RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr; REDISMODULE_GET_API(Alloc); REDISMODULE_GET_API(Calloc); REDISMODULE_GET_API(Free); REDISMODULE_GET_API(Realloc); REDISMODULE_GET_API(Strdup); REDISMODULE_GET_API(CreateCommand); REDISMODULE_GET_API(SetModuleAttribs); REDISMODULE_GET_API(IsModuleNameBusy); REDISMODULE_GET_API(WrongArity); REDISMODULE_GET_API(ReplyWithLongLong); REDISMODULE_GET_API(ReplyWithError); REDISMODULE_GET_API(ReplyWithSimpleString); REDISMODULE_GET_API(ReplyWithArray); REDISMODULE_GET_API(ReplySetArrayLength); REDISMODULE_GET_API(ReplyWithStringBuffer); REDISMODULE_GET_API(ReplyWithString); REDISMODULE_GET_API(ReplyWithNull); REDISMODULE_GET_API(ReplyWithCallReply); REDISMODULE_GET_API(ReplyWithDouble); REDISMODULE_GET_API(ReplySetArrayLength); REDISMODULE_GET_API(GetSelectedDb); REDISMODULE_GET_API(SelectDb); REDISMODULE_GET_API(OpenKey); REDISMODULE_GET_API(CloseKey); REDISMODULE_GET_API(KeyType); REDISMODULE_GET_API(ValueLength); REDISMODULE_GET_API(ListPush); REDISMODULE_GET_API(ListPop); REDISMODULE_GET_API(StringToLongLong); REDISMODULE_GET_API(StringToDouble); REDISMODULE_GET_API(Call); REDISMODULE_GET_API(CallReplyProto); REDISMODULE_GET_API(FreeCallReply); REDISMODULE_GET_API(CallReplyInteger); REDISMODULE_GET_API(CallReplyType); REDISMODULE_GET_API(CallReplyLength); REDISMODULE_GET_API(CallReplyArrayElement); REDISMODULE_GET_API(CallReplyStringPtr); REDISMODULE_GET_API(CreateStringFromCallReply); REDISMODULE_GET_API(CreateString); REDISMODULE_GET_API(CreateStringFromLongLong); REDISMODULE_GET_API(CreateStringFromString); REDISMODULE_GET_API(CreateStringPrintf); REDISMODULE_GET_API(FreeString); REDISMODULE_GET_API(StringPtrLen); 
REDISMODULE_GET_API(AutoMemory); REDISMODULE_GET_API(Replicate); REDISMODULE_GET_API(ReplicateVerbatim); REDISMODULE_GET_API(DeleteKey); REDISMODULE_GET_API(UnlinkKey); REDISMODULE_GET_API(StringSet); REDISMODULE_GET_API(StringDMA); REDISMODULE_GET_API(StringTruncate); REDISMODULE_GET_API(GetExpire); REDISMODULE_GET_API(SetExpire); REDISMODULE_GET_API(ZsetAdd); REDISMODULE_GET_API(ZsetIncrby); REDISMODULE_GET_API(ZsetScore); REDISMODULE_GET_API(ZsetRem); REDISMODULE_GET_API(ZsetRangeStop); REDISMODULE_GET_API(ZsetFirstInScoreRange); REDISMODULE_GET_API(ZsetLastInScoreRange); REDISMODULE_GET_API(ZsetFirstInLexRange); REDISMODULE_GET_API(ZsetLastInLexRange); REDISMODULE_GET_API(ZsetRangeCurrentElement); REDISMODULE_GET_API(ZsetRangeNext); REDISMODULE_GET_API(ZsetRangePrev); REDISMODULE_GET_API(ZsetRangeEndReached); REDISMODULE_GET_API(HashSet); REDISMODULE_GET_API(HashGet); REDISMODULE_GET_API(IsKeysPositionRequest); REDISMODULE_GET_API(KeyAtPos); REDISMODULE_GET_API(GetClientId); REDISMODULE_GET_API(GetContextFlags); REDISMODULE_GET_API(PoolAlloc); REDISMODULE_GET_API(CreateDataType); REDISMODULE_GET_API(ModuleTypeSetValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); REDISMODULE_GET_API(SaveUnsigned); REDISMODULE_GET_API(LoadUnsigned); REDISMODULE_GET_API(SaveSigned); REDISMODULE_GET_API(LoadSigned); REDISMODULE_GET_API(SaveString); REDISMODULE_GET_API(SaveStringBuffer); REDISMODULE_GET_API(LoadString); REDISMODULE_GET_API(LoadStringBuffer); REDISMODULE_GET_API(SaveDouble); REDISMODULE_GET_API(LoadDouble); REDISMODULE_GET_API(SaveFloat); REDISMODULE_GET_API(LoadFloat); REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); REDISMODULE_GET_API(StringAppendBuffer); REDISMODULE_GET_API(RetainString); REDISMODULE_GET_API(StringCompare); REDISMODULE_GET_API(GetContextFromIO); REDISMODULE_GET_API(Milliseconds); REDISMODULE_GET_API(DigestAddStringBuffer); REDISMODULE_GET_API(DigestAddLongLong); 
REDISMODULE_GET_API(DigestEndSequence); REDISMODULE_GET_API(CreateDict); REDISMODULE_GET_API(FreeDict); REDISMODULE_GET_API(DictSize); REDISMODULE_GET_API(DictSetC); REDISMODULE_GET_API(DictReplaceC); REDISMODULE_GET_API(DictSet); REDISMODULE_GET_API(DictReplace); REDISMODULE_GET_API(DictGetC); REDISMODULE_GET_API(DictGet); REDISMODULE_GET_API(DictDelC); REDISMODULE_GET_API(DictDel); REDISMODULE_GET_API(DictIteratorStartC); REDISMODULE_GET_API(DictIteratorStart); REDISMODULE_GET_API(DictIteratorStop); REDISMODULE_GET_API(DictIteratorReseekC); REDISMODULE_GET_API(DictIteratorReseek); REDISMODULE_GET_API(DictNextC); REDISMODULE_GET_API(DictPrevC); REDISMODULE_GET_API(DictNext); REDISMODULE_GET_API(DictPrev); REDISMODULE_GET_API(DictCompare); REDISMODULE_GET_API(DictCompareC); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); REDISMODULE_GET_API(FreeThreadSafeContext); REDISMODULE_GET_API(ThreadSafeContextLock); REDISMODULE_GET_API(ThreadSafeContextUnlock); REDISMODULE_GET_API(BlockClient); REDISMODULE_GET_API(UnblockClient); REDISMODULE_GET_API(IsBlockedReplyRequest); REDISMODULE_GET_API(IsBlockedTimeoutRequest); REDISMODULE_GET_API(GetBlockedClientPrivateData); REDISMODULE_GET_API(GetBlockedClientHandle); REDISMODULE_GET_API(AbortBlock); REDISMODULE_GET_API(SetDisconnectCallback); REDISMODULE_GET_API(SubscribeToKeyspaceEvents); REDISMODULE_GET_API(BlockedClientDisconnected); REDISMODULE_GET_API(RegisterClusterMessageReceiver); REDISMODULE_GET_API(SendClusterMessage); REDISMODULE_GET_API(GetClusterNodeInfo); REDISMODULE_GET_API(GetClusterNodesList); REDISMODULE_GET_API(FreeClusterNodesList); REDISMODULE_GET_API(CreateTimer); REDISMODULE_GET_API(StopTimer); REDISMODULE_GET_API(GetTimerInfo); REDISMODULE_GET_API(GetMyClusterID); REDISMODULE_GET_API(GetClusterSize); REDISMODULE_GET_API(GetRandomBytes); REDISMODULE_GET_API(GetRandomHexChars); REDISMODULE_GET_API(SetClusterFlags); #endif if (RedisModule_IsModuleNameBusy && 
RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; RedisModule_SetModuleAttribs(ctx,name,ver,apiver); return REDISMODULE_OK; } #else /* Things only defined for the modules core, not exported to modules * including this file. */ #define RedisModuleString robj #endif /* REDISMODULE_CORE */ #endif /* REDISMOUDLE_H */
- Implement the entry function RedisModule_OnLoad and your own command function (here MyRand_RedisCommand) in module.c
#include "redismodule.h"
#include <stdlib.h>

// Your business logic: reply with a random integer
int MyRand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    RedisModule_ReplyWithLongLong(ctx, rand());
    return REDISMODULE_OK;
}

// Entry function redis calls when loading the module
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    // Register the module's name and version
    if (RedisModule_Init(ctx, "ctrip", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    // Register the "ctrip.rand" command, handled by MyRand_RedisCommand
    if (RedisModule_CreateCommand(ctx, "ctrip.rand", MyRand_RedisCommand, "", 1, 1, 1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    return REDISMODULE_OK;
}
- Write the build file (Makefile)
DEBUGFLAGS = -g -ggdb -O2
ifeq ($(DEBUG), 1)
DEBUGFLAGS = -g -ggdb -O0
endif

# find the OS
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')

CFLAGS = -Wall -Wno-unused-function $(DEBUGFLAGS) -fPIC -std=gnu99 -D_GNU_SOURCE
CC:=$(shell sh -c 'type $(CC) >/dev/null 2>/dev/null && echo $(CC) || echo gcc')

# Compile flags for linux / osx
ifeq ($(uname_S),Linux)
	SHOBJ_CFLAGS ?= -fno-common -g -ggdb
	SHOBJ_LDFLAGS ?= -shared -Bsymbolic -Bsymbolic-functions
else
	CFLAGS += -mmacosx-version-min=10.6
	SHOBJ_CFLAGS ?= -dynamic -fno-common -g -ggdb
	SHOBJ_LDFLAGS ?= -dylib -exported_symbol _RedisModule_OnLoad -macosx_version_min 10.6
endif

SOURCEDIR=$(shell pwd -P)
CC_SOURCES = $(wildcard $(SOURCEDIR)/*.c) $(wildcard $(SOURCEDIR)/dep/*.c)
CC_OBJECTS = $(patsubst $(SOURCEDIR)/%.c, $(SOURCEDIR)/%.o, $(CC_SOURCES))

all: ctrip.so

ctrip.so: $(CC_OBJECTS)
	$(LD) -o $@ $(CC_OBJECTS) $(SHOBJ_LDFLAGS) -lc

clean:
	rm -rvf *.xo *.so *.o *.a
- Run the rz command to upload redismodule.h, module.c, and the Makefile into a new module folder under the redis directory, then run make; this produces the ctrip.so shared library
[root@localhost redis]# mkdir module
[root@localhost redis]# cd module/
[root@localhost module]# rz
[root@localhost module]# ls
Makefile  module.c  redismodule.h
[root@localhost module]# make
cc -Wall -Wno-unused-function -g -ggdb -O2 -fPIC -std=gnu99 -D_GNU_SOURCE -c -o /data/redis/module/module.o /data/redis/module/module.c
ld -o ctrip.so /data/redis/module/module.o -shared -Bsymbolic -Bsymbolic-functions -lc
[root@localhost module]# ls
ctrip.so  Makefile  module.c  module.o  redismodule.h
[root@localhost module]# pwd
/data/redis/module
- Load the module when redis starts
./redis-server ./redis.conf --loadmodule ./module/ctrip.so
- Calling the command from a client
[root@localhost redis]# ./redis-cli
127.0.0.1:6379> ctrip.rand
(integer) 1445136292
127.0.0.1:6379> ctrip.rand
(integer) 475099848
127.0.0.1:6379> ctrip.rand
(integer) 2031241573
- Other module commands

127.0.0.1:6379> module list                       # list loaded modules
1) 1) "name"
   2) "ctrip"
   3) "ver"
   4) (integer) 1
127.0.0.1:6379> module unload ctrip               # unload a module
OK
127.0.0.1:6379> module load /data/redis/ctrip.so  # load a module (wrong path here)
(error) ERR Error loading the extension. Please check the server logs.
127.0.0.1:6379> module load /data/redis/module/ctrip.so
OK
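Once loaded, a module command behaves like any built-in one, so ordinary client libraries can call it. A minimal sketch using the redis-py client (an assumption; any client with a generic command entry point works the same way):

```python
# Sketch: invoking a custom module command from a client.
# redis-py's execute_command() sends any command name verbatim,
# so module commands like "ctrip.rand" need no special support.

def call_module_command(client, name, *args):
    """Forward a (module) command to the server via the generic entry point."""
    return client.execute_command(name, *args)

# Usage against a live server (requires `pip install redis`):
#   import redis
#   r = redis.Redis(host="127.0.0.1", port=6379)
#   call_module_command(r, "ctrip.rand")
#   call_module_command(r, "MODULE", "LIST")
```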
21.3、Installing and using RediSQL
Official repository: https://github.com/RedBeardLab/rediSQL
Download v0.7.1: https://github.com/RedBeardLab/rediSQL/releases/download/v0.7.1/rediSQL_0.7.1.so
Copy the .so file onto the server and load it
127.0.0.1:6379> module load /data/redis/rediSQL_0.7.1.so
OK
Run SQL statements from the client
127.0.0.1:6379> REDISQL.CREATE_DB DB                                            # create a database
OK
127.0.0.1:6379> REDISQL.EXEC DB "CREATE TABLE person(id int, username text);"   # create the person table
1) DONE
2) (integer) 0
127.0.0.1:6379> REDISQL.EXEC DB "INSERT INTO person VALUES(1,'jack');"          # insert a row
1) DONE
2) (integer) 1
127.0.0.1:6379> REDISQL.EXEC DB "INSERT INTO person VALUES(1,'mary');"
1) DONE
2) (integer) 1
127.0.0.1:6379> REDISQL.EXEC DB "select * from person;"                         # query the table
1) 1) (integer) 1
   2) "jack"
2) 1) (integer) 1
   2) "mary"
22、【Monitoring】Monitoring redis with elasticsearch + kibana + metricbeat
22.1、How to monitor redis
- The info command: https://redis.io/commands/info
info returns a full status report:
127.0.0.1:6379> info
# Server
redis_version:5.0.3
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:7e97aa5c23979213
redis_mode:standalone
os:Linux 3.10.0-957.el7.x86_64 x86_64
arch_bits:64
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:4.8.5
process_id:19834
run_id:d50dc9ef71d45e3ee56bf5bd20d6a55e539fd476
tcp_port:6379
uptime_in_seconds:1546
uptime_in_days:0
hz:10
configured_hz:10
lru_clock:5845738
executable:/data/redis/./redis-server
config_file:/data/redis/./redis.conf

# Clients
connected_clients:1
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0

# Memory
used_memory:913320
used_memory_human:891.91K
used_memory_rss:3870720
used_memory_rss_human:3.69M
used_memory_peak:913320
used_memory_peak_human:891.91K
used_memory_peak_perc:100.11%
used_memory_overhead:910838
used_memory_startup:861072
used_memory_dataset:2482
used_memory_dataset_perc:4.75%
allocator_allocated:879528
allocator_active:3832832
allocator_resident:3832832
total_system_memory:1907941376
total_system_memory_human:1.78G
used_memory_lua:37888
used_memory_lua_human:37.00K
used_memory_scripts:0
used_memory_scripts_human:0B
number_of_cached_scripts:0
maxmemory:104857600
maxmemory_human:100.00M
maxmemory_policy:allkeys-lru
allocator_frag_ratio:4.36
allocator_frag_bytes:2953304
allocator_rss_ratio:1.00
allocator_rss_bytes:0
rss_overhead_ratio:1.01
rss_overhead_bytes:37888
mem_fragmentation_ratio:4.40
mem_fragmentation_bytes:2991192
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_clients_slaves:0
mem_clients_normal:49694
mem_aof_buffer:0
mem_allocator:libc
active_defrag_running:0
lazyfree_pending_objects:0

# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1549349088
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:0
rdb_current_bgsave_time_sec:-1
rdb_last_cow_size:557056
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_last_cow_size:0

# Stats
total_connections_received:3
total_commands_processed:17
instantaneous_ops_per_sec:0
total_net_input_bytes:772
total_net_output_bytes:34922
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
sync_full:0
sync_partial_ok:0
sync_partial_err:0
expired_keys:0
expired_stale_perc:0.00
expired_time_cap_reached_count:0
evicted_keys:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:0
pubsub_patterns:0
latest_fork_usec:172
migrate_cached_sockets:0
slave_expires_tracked_keys:0
active_defrag_hits:0
active_defrag_misses:0
active_defrag_key_hits:0
active_defrag_key_misses:0

# Replication
role:master
connected_slaves:0
master_replid:343fc546128e3dc41ecb5f178fae6d5e02e390ea
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

# CPU
used_cpu_sys:1.949254
used_cpu_user:1.224226
used_cpu_sys_children:0.003597
used_cpu_user_children:0.000000

# Cluster
cluster_enabled:0

# Keyspace
db0:keys=1,expires=0,avg_ttl=0
- server: General information about the Redis server
- clients: Client connections section
- memory: Memory consumption related information
- persistence: RDB and AOF related information
- stats: General statistics
- replication: Master/replica replication information
- cpu: CPU consumption statistics
- commandstats: Redis command statistics
- cluster: Redis Cluster section
- keyspace: Database related statistics
It can also take the following values:
- all: Return all sections
- default: Return only the default set of sections
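The info reply is plain text: "# Section" headers followed by key:value lines. A small helper (a sketch, not part of redis or any client library) turns it into nested dictionaries for programmatic checks:

```python
def parse_info(text):
    """Parse redis INFO output into {section: {key: value}}."""
    sections, current = {}, None
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.startswith("#"):              # section header, e.g. "# Replication"
            current = line[1:].strip().lower()
            sections[current] = {}
        elif ":" in line and current is not None:
            key, _, value = line.partition(":")
            sections[current][key] = value
    return sections

# A fragment of the output shown above:
sample = """# Server
redis_version:5.0.3
# Replication
role:master
connected_slaves:0
"""
info = parse_info(sample)   # info["replication"]["role"] is "master"
```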
- The monitor command: https://redis.io/commands/monitor
monitor streams back, in real time, every command the server processes from any client
// client 1
127.0.0.1:6379> monitor
OK

// client 2
[root@localhost redis]# ./redis-cli

// client 1 sees the connection handshake
1549349998.266239 [0 127.0.0.1:46604] "COMMAND"

// client 2
127.0.0.1:6379> keys *
1) "DB"

// client 1 sees the command
1549350008.437912 [0 127.0.0.1:46604] "keys" "*"
- The ./redis-cli --stat command
[root@localhost redis]# ./redis-cli --stat
------- data ------ --------------------- load -------------------- - child -
keys       mem      clients blocked requests            connections
1          909.06K  2       0       21 (+0)             5
1          909.06K  2       0       22 (+1)             5
1          909.06K  2       0       23 (+1)             5
1          909.06K  2       0       24 (+1)             5
1          909.06K  2       0       25 (+1)             5
1          909.06K  2       0       26 (+1)             5
1          909.06K  2       0       27 (+1)             5
1          909.06K  2       0       28 (+1)             5
22.2、Professional monitoring of redis with elasticsearch + kibana + metricbeat
- Setup
Install es + kibana via docker, and download metricbeat from https://www.elastic.co/downloads

metricbeat: the collector that ships the metrics
elasticsearch: analytics and search, built on Lucene
kibana: the web UI for exploring the data
- Installing es + kibana + redis via docker
docker-es (5.6.14): https://hub.docker.com/_/elasticsearch
docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.14
docker-kibana (5.6.14): https://hub.docker.com/_/kibana
docker pull docker.elastic.co/kibana/kibana:5.6.14
docker-redis:
docker run --name some-redis -p 6379:6379 -d redis
- List the installed docker images
[root@localhost ~]# docker images
REPOSITORY                                      TAG       IMAGE ID       CREATED       SIZE
docker.io/redis                                 latest    82629e941a38   13 days ago   95 MB
docker.elastic.co/kibana/kibana                 5.6.14    b5d65e1bd763   7 weeks ago   659 MB
docker.elastic.co/elasticsearch/elasticsearch   5.6.14    cbf18c3f8c43   7 weeks ago   663 MB
- Install metricbeat: create an elastic folder under the redis directory, then download the metricbeat package

[root@localhost redis]# mkdir elastic
[root@localhost redis]# cd elastic/
[root@localhost elastic]# wget https://artifacts.elastic.co/downloads/beats/metricbeat/metricbeat-5.6.14-linux-x86_64.tar.gz

- Write a docker-compose.yml file (or upload one with the rz command)

version: '3.0'
services:
  elasticsearch:
    image: elasticsearch:5.6.14
    ports:
      - 9200:9200
      - 9300:9300
  kibana:
    image: kibana:5.6.14
    ports:
      - 5601:5601
    links:
      - elasticsearch
- Install the docker-compose tool
Official releases: https://github.com/docker/compose/releases

# install
sudo curl -L https://github.com/docker/compose/releases/download/1.24.0-rc1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
# make it executable
sudo chmod +x /usr/local/bin/docker-compose
# verify the installation
docker-compose

- Bring the stack up (images are downloaded automatically according to docker-compose.yml)
docker-compose up --build
- Check the running containers

[root@localhost ~]# docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED         STATUS         PORTS                                            NAMES
0ea66ecacfa6   kibana:5.6.14          "/docker-entrypoin..."   8 minutes ago   Up 8 minutes   0.0.0.0:5601->5601/tcp                           elastic_kibana_1
a6e1ea5adadf   elasticsearch:5.6.14   "/docker-entrypoin..."   8 minutes ago   Up 8 minutes   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   elastic_elasticsearch_1
Both kibana and elasticsearch are now up.
- Check kibana on port 5601
- Check elasticsearch on ports 9200 and 9300
- Install and start metricbeat
Unpack the archive first

# unpack
[root@localhost elastic]# tar -xzvf metricbeat-5.6.14-linux-x86_64.tar.gz
# rename the extracted folder
[root@localhost elastic]# mv metricbeat-5.6.14-linux-x86_64 metricbeat
[root@localhost elastic]# cd metricbeat/
# regenerate metricbeat.yml from the full reference config
[root@localhost metricbeat]# rm -rf metricbeat.yml
[root@localhost metricbeat]# cp metricbeat.full.yml metricbeat.yml

Edit the metricbeat.yml configuration

# Redis module
- module: redis
  metricsets: ["info", "keyspace"]
  enabled: true
  #period: 10s
  # Redis hosts
  hosts: ["0.0.0.0:6379"]

# Template
# Set to false to disable template loading.
template.enabled: true
# Template name. By default the template name is metricbeat.
template.name: "metricbeat"
# Path to template file
template.path: "${path.config}/metricbeat.template.json"
# Overwrite existing template
template.overwrite: false

# Dashboards
dashboards.enabled: true

Start metricbeat
./metricbeat -e -c metricbeat.yml
- Configure kibana
Configure the kibana index pattern: the index name is the template.name value from metricbeat.yml (metricbeat), and the pattern must end with * (i.e. metricbeat-*). For time filtering, choose the timestamp field.
You can then browse the collected documents in Discover

The dashboards offer many prebuilt visualizations

The Redis dashboard is the one to focus on

You can also create your own templates to collect information

23、【Cluster】Read/write splitting and a twemproxy distributed cache
23.1、A basic cluster (multi-machine deployment)

23.1.1、Redis master-replica mode
Relieves the read/write pressure on a single-machine redis.
- master: handles writes and a small share of reads
- slave: handles reads
Problems it solves
- A degree of high availability
- Spreading the read/write load
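A client can exploit this split by routing each command to the right node: read-only commands to the slave, everything else to the master. A sketch of the routing rule (the command set is deliberately small and illustrative, and the two addresses are the write/read nodes used in this section's examples):

```python
# Illustrative, not exhaustive, set of read-only commands
READ_COMMANDS = {"GET", "MGET", "EXISTS", "STRLEN", "LRANGE",
                 "SMEMBERS", "ZRANGE", "HGETALL", "TTL"}

def route(command,
          master="192.168.43.62:6379",      # write node
          replica="192.168.132.128:6379"):  # read node
    """Send reads to the replica; writes (and anything unknown) to the master."""
    return replica if command.upper() in READ_COMMANDS else master

# route("GET") picks the replica; route("SET") picks the master
```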
23.1.2、Setup
- Via the slaveof command (reverts after a restart)
Write node IP: 192.168.43.62
Read node IP: 192.168.132.128
Run slaveof on the read node
slaveof 192.168.43.62 6379
Now data written on the write node can be read from the read node

- Via the replicaof directive in the config file (persists across restarts)
Write node IP: 192.168.43.62
Read node IP: 192.168.132.128
Edit redis.conf on the read node

# replicaof <masterip> <masterport>
replicaof 192.168.43.62 6379

23.1.3、SDK usage
- write
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.43.62:6379,192.168.132.128:6379");
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                db.StringSet(i.ToString(), "");
                Console.WriteLine($"{i} done");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }
        Console.ReadKey();
    }
}

If the master goes down, writes throw an exception
- read
class Program
{
    static void Main(string[] args)
    {
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.43.62:6379,192.168.132.128:6379");
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                db.StringGet(i.ToString());
                Console.WriteLine($"{i} done");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }
        Console.ReadKey();
    }
}

Reads still succeed even when the master is down
23.2、redis + twemproxy mode (load balancing)
twemproxy official repository: https://github.com/twitter/twemproxy (plays the same role nginx does for web servers)

Download and install it on the main server

# download
[root@localhost data]# wget https://doc-0o-08-docs.googleusercontent.com/docs/securesc/ldrgc4v8l6s4mmt863tsvf16k3mhbph4/uqjpged7j8k0kucos05i7pvj52n3fv21/1549418400000/15183709450983481674/04994642584830366907/0B6pVMMV5F5dfb1YwcThnaVZXbjg?e=download
# unpack
[root@localhost data]# tar -xzvf nutcracker-0.4.1.tar.gz
# rename
[root@localhost data]# mv nutcracker-0.4.1 twemproxy
[root@localhost data]# cd twemproxy/

Configure the build to install into /data/proxy
[root@localhost twemproxy]# ./configure -prefix=/data/proxy
Run make to compile and install

[root@localhost twemproxy]# make && make install

After compiling, the nutcracker binary sits under /data/proxy/sbin/; its help output summarizes the options

[root@localhost sbin]# ./nutcracker --help
This is nutcracker-0.4.1

Usage: nutcracker [-?hVdDt] [-v verbosity level] [-o output file]
                  [-c conf file] [-s stats port] [-a stats addr]
                  [-i stats interval] [-p pid file] [-m mbuf size]

Options:
  -h, --help             : this help
  -V, --version          : show version and exit
  -t, --test-conf        : test configuration for syntax errors and exit
  -d, --daemonize        : run as a daemon
  -D, --describe-stats   : print stats description and exit
  -v, --verbose=N        : set logging level (default: 5, min: 0, max: 11)
  -o, --output=S         : set logging file (default: stderr)
  -c, --conf-file=S      : set configuration file (default: conf/nutcracker.yml)
  -s, --stats-port=N     : set stats monitoring port (default: 22222)
  -a, --stats-addr=S     : set stats monitoring ip (default: 0.0.0.0)
  -i, --stats-interval=N : set stats aggregation interval in msec (default: 30000 msec)
  -p, --pid-file=S       : set pid file (default: off)
  -m, --mbuf-size=N      : set size of mbuf chunk in bytes (default: 16384 bytes)

Next, look at the sample configuration, the nutcracker.yml file under /data/twemproxy/conf/
alpha:
  listen: 127.0.0.1:22121
  hash: fnv1a_64
  distribution: ketama
  auto_eject_hosts: true
  redis: true
  server_retry_timeout: 2000
  server_failure_limit: 1
  servers:
   - 127.0.0.1:6379:1

beta:
  listen: 127.0.0.1:22122
  hash: fnv1a_64
  hash_tag: "{}"
  distribution: ketama
  auto_eject_hosts: false
  timeout: 400
  redis: true
  servers:
   - 127.0.0.1:6380:1 server1
   - 127.0.0.1:6381:1 server2
   - 127.0.0.1:6382:1 server3
   - 127.0.0.1:6383:1 server4

gamma:
  listen: 127.0.0.1:22123
  hash: fnv1a_64
  distribution: ketama
  timeout: 400
  backlog: 1024
  preconnect: true
  auto_eject_hosts: true
  server_retry_timeout: 2000
  server_failure_limit: 3
  servers:
   - 127.0.0.1:11212:1
   - 127.0.0.1:11213:1

delta:
  listen: 127.0.0.1:22124
  hash: fnv1a_64
  distribution: ketama
  timeout: 100
  auto_eject_hosts: true
  server_retry_timeout: 2000
  server_failure_limit: 1
  servers:
   - 127.0.0.1:11214:1
   - 127.0.0.1:11215:1
   - 127.0.0.1:11216:1
   - 127.0.0.1:11217:1
   - 127.0.0.1:11218:1
   - 127.0.0.1:11219:1
   - 127.0.0.1:11220:1
   - 127.0.0.1:11221:1
   - 127.0.0.1:11222:1
   - 127.0.0.1:11223:1

omega:
  listen: /tmp/gamma
  hash: hsieh
  distribution: ketama
  auto_eject_hosts: false
  servers:
   - 127.0.0.1:11214:100000
   - 127.0.0.1:11215:1
Then, under /data/proxy/sbin/, write a custom config file kp.yml modeled on nutcracker.yml

alpha:
  listen: 192.168.132.130:22121
  hash: fnv1a_64
  distribution: ketama
  auto_eject_hosts: true
  redis: true
  server_retry_timeout: 2000
  server_failure_limit: 1
  servers:
   - 192.168.132.130:6379:1
   - 192.168.132.129:6379:2

Then start it (-d daemonizes, -c sets the config file)
[root@localhost sbin]# ./nutcracker -d -c kp.yml
[root@localhost sbin]# netstat -tlnp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address   State    PID/Program name
tcp        0      0 192.168.132.130:22121   0.0.0.0:*         LISTEN   61912/./nutcracker
tcp        0      0 0.0.0.0:22222           0.0.0.0:*         LISTEN   61912/./nutcracker
tcp        0      0 0.0.0.0:111             0.0.0.0:*         LISTEN   1/systemd
tcp        0      0 0.0.0.0:6000            0.0.0.0:*         LISTEN   7196/X
tcp        0      0 192.168.122.1:53        0.0.0.0:*         LISTEN   7643/dnsmasq
tcp        0      0 0.0.0.0:22              0.0.0.0:*         LISTEN   6974/sshd
tcp        0      0 127.0.0.1:631           0.0.0.0:*         LISTEN   6976/cupsd
tcp6       0      0 :::111                  :::*              LISTEN   1/systemd
tcp6       0      0 :::6000                 :::*              LISTEN   7196/X
tcp6       0      0 :::22                   :::*              LISTEN   6974/sshd
tcp6       0      0 ::1:631                 :::*              LISTEN   6976/cupsd

Then start redis on both backends (192.168.132.130 and 192.168.132.129) and test through the proxy
[root@bogon redis]# ./redis-cli -h 192.168.132.130 -p 22121
192.168.132.130:22121> set username jack
OK
192.168.132.130:22121> set password 12345
OK
192.168.132.130:22121> set email 786744873@qq.com
OK
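The keys above are spread across the two backends by twemproxy's ketama consistent hashing: each server owns points on a hash ring in proportion to its weight (the trailing :1 / :2 in kp.yml), and a key belongs to the first server point found clockwise from the key's hash. A toy sketch of the mechanism (real ketama uses a specific MD5-based point layout; the point count here is illustrative):

```python
import hashlib
from bisect import bisect

def build_ring(servers, points_per_weight=40):
    """servers: {'host:port': weight} -> sorted list of (hash, server) ring points."""
    ring = []
    for server, weight in servers.items():
        for i in range(points_per_weight * weight):
            h = int(hashlib.md5(f"{server}-{i}".encode()).hexdigest(), 16)
            ring.append((h, server))
    ring.sort()
    return ring

def lookup(ring, key):
    """Walk clockwise from hash(key) to the first server point."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    idx = bisect(ring, (h,)) % len(ring)   # wrap around at the end of the ring
    return ring[idx][1]

ring = build_ring({"192.168.132.130:6379": 1, "192.168.132.129:6379": 2})
node = lookup(ring, "username")   # the same key always maps to the same node
```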
Result

23.3、SDK usage

class Program
{
    static void Main(string[] args)
    {
        var conf = new ConfigurationOptions() { Proxy = Proxy.Twemproxy };
        conf.EndPoints.Add("192.168.132.130:22121"); // the proxy server
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(conf);
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var info = db.StringSet(i.ToString(), i.ToString());
                Console.WriteLine($"{i} {info} done");
                Thread.Sleep(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }
        Console.ReadKey();
    }
}

Result

24、【Cluster】High availability with Redis Sentinel: introduction, setup, and SDK usage
24.1、Sentinel
In master-slave mode, when the master dies we still want the deployment to keep serving reads and writes. Sentinel solves this master-slave high-availability problem by introducing a voting body that elects one of the slaves as the new master.

Process:
- The master goes down.
- Once more than half of the sentinels (the observers) agree the master is down, they elect a leader sentinel, and the leader picks the most suitable slave, based on slave priority, to become the new master.
- Sentinel repoints the remaining slaves at the new master, and they automatically re-sync as its slaves.
- The failed master stays under the sentinels' watch; when the old master restarts, it rejoins as a slave of the new master.
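The quorum check and the election above can be sketched as two pure functions; the ranking mirrors the criteria sentinel uses (slave priority first, then the most up-to-date replication offset), though real sentinel also considers link state and run IDs:

```python
def master_objectively_down(down_votes, total_sentinels):
    """The master counts as down once more than half of the sentinels agree."""
    return down_votes > total_sentinels // 2

def pick_new_master(slaves):
    """slaves: dicts with 'priority' (lower wins; 0 means never promote)
    and 'repl_offset' (higher wins: the slave with the freshest data)."""
    candidates = [s for s in slaves if s.get("priority", 100) > 0]
    return min(candidates, key=lambda s: (s["priority"], -s["repl_offset"]))

slaves = [
    {"addr": "192.168.132.130:6379", "priority": 100, "repl_offset": 5000},
    {"addr": "192.168.132.128:6379", "priority": 100, "repl_offset": 4800},
]
# master_objectively_down(2, 3) is True; pick_new_master picks the :130 slave
```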
24.2、Setup
Master: 192.168.132.129:6379
Slave: 192.168.132.130:6379
Sentinels: 192.168.132.131
- First establish the master-slave relationship with replicaof, then start both nodes
- On the sentinel machine, create a sentinelnel folder under /data/, with one subfolder per sentinel
[root@localhost data]# ls
redis  redis-5.0.3.tar.gz
[root@localhost data]# mkdir sentinelnel
[root@localhost data]# cd sentinelnel/
[root@localhost sentinelnel]# mkdir s1
[root@localhost sentinelnel]# mkdir s2
[root@localhost sentinelnel]# mkdir s3
[root@localhost sentinelnel]# ls
s1  s2  s3
- Copy sentinel.conf from the redis folder and the redis-sentinel binary from redis/src into each of /data/sentinelnel/s1/, /data/sentinelnel/s2/, and /data/sentinelnel/s3/
[root@localhost redis]# cp ./sentinel.conf ../sentinelnel/s1/
[root@localhost redis]# cp ./sentinel.conf ../sentinelnel/s2/
[root@localhost redis]# cp ./sentinel.conf ../sentinelnel/s3/
[root@localhost redis]# cd src
[root@localhost src]# cp ./redis-sentinel ../../sentinelnel/s1/
[root@localhost src]# cp ./redis-sentinel ../../sentinelnel/s2/
[root@localhost src]# cp ./redis-sentinel ../../sentinelnel/s3/
- Edit the config file under each of s1, s2, s3 (the default port is 26379, so use s1: 26379, s2: 26380, s3: 26381)

# s1 => sentinel.conf
port 26379
sentinel monitor mymaster 192.168.132.129 6379 2   # fail over only when 2 sentinels agree the master is down

# s2 => sentinel.conf
port 26380
sentinel monitor mymaster 192.168.132.129 6379 2

# s3 => sentinel.conf
port 26381
sentinel monitor mymaster 192.168.132.129 6379 2

- Start the three sentinels

[root@localhost s1]# ./redis-sentinel ./sentinel.conf
[root@localhost s2]# ./redis-sentinel ./sentinel.conf
[root@localhost s3]# ./redis-sentinel ./sentinel.conf
...
72173:X 07 Feb 2019 10:09:26.046 # Sentinel ID is a4b0184b08224fccf8c9eb9d8073a6197bb15fcc
72173:X 07 Feb 2019 10:09:26.046 # +monitor master mymaster 192.168.132.129 6379 quorum 2
72173:X 07 Feb 2019 10:09:26.047 * +slave slave 192.168.132.130:6379 192.168.132.130 6379 @ mymaster 192.168.132.129 6379
72173:X 07 Feb 2019 10:09:26.883 * +sentinel sentinel bee674ea947117df4af66228229a6c565a3e051b 192.168.132.131 26379 @ mymaster 192.168.132.129 6379   # the 26379 sentinel
72173:X 07 Feb 2019 10:09:26.996 * +sentinel sentinel 14998349ae4f22ea83e22a9f847db2385009ddba 192.168.132.131 26380 @ mymaster 192.168.132.129 6379   # the 26380 sentinel
How can the sentinels detect one another?

[root@localhost redis]# ./redis-cli
127.0.0.1:6379> pubsub channels
1) "__sentinel__:hello"

The master carries a __sentinel__:hello pub/sub channel; each sentinel announces itself there, and the others discover it by watching that channel
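Each sentinel periodically PUBLISHes its own address plus its view of the master on that channel, and every sentinel SUBSCRIBEs to it. The payload is a single comma-separated record; a parsing sketch (field layout as used by current sentinel versions: sentinel ip, port, runid, epoch, then master name, ip, port, epoch):

```python
def parse_hello(payload):
    """Split a __sentinel__:hello message into its announced addresses."""
    (s_ip, s_port, s_runid, s_epoch,
     m_name, m_ip, m_port, m_epoch) = payload.split(",")
    return {
        "sentinel": (s_ip, int(s_port)),
        "master": (m_name, m_ip, int(m_port)),
    }

# A message shaped like what the sentinels in this setup would publish:
msg = ("192.168.132.131,26379,bee674ea947117df4af66228229a6c565a3e051b,1,"
       "mymaster,192.168.132.129,6379,1")
hello = parse_hello(msg)   # hello["master"] is ("mymaster", "192.168.132.129", 6379)
```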
- Testing the result
Use the info command on each node to check its current role

// 192.168.132.129
# Replication
role:master

// 192.168.132.130
# Replication
role:slave

Pause the redis service on 192.168.132.129; within 30 seconds the sentinels detect the change and hold a new election
[root@localhost s1]# ./redis-sentinel ./sentinel.conf 71856:X 07 Feb 2019 10:00:43.022 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 71856:X 07 Feb 2019 10:00:43.022 # Redis version=5.0.3, bits=64, commit=00000000, modified=0, pid=71856, just started 71856:X 07 Feb 2019 10:00:43.022 # Configuration loaded 71856:X 07 Feb 2019 10:00:43.023 * Increased maximum number of open files to 10032 (it was originally set to 1024). _._ _.-``__ ''-._ _.-`` `. `_. ''-._ Redis 5.0.3 (00000000/0) 64 bit .-`` .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in sentinel mode |`-._`-...-` __...-.``-._|'` _.-'| Port: 26379 | `-._ `._ / _.-' | PID: 71856 `-._ `-._ `-./ _.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | http://redis.io `-._ `-._`-.__.-'_.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | `-._ `-._`-.__.-'_.-' _.-' `-._ `-.__.-' _.-' `-._ _.-' `-.__.-' 71856:X 07 Feb 2019 10:00:43.025 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128. 
```
71856:X 07 Feb 2019 10:00:43.026 # Sentinel ID is bee674ea947117df4af66228229a6c565a3e051b
71856:X 07 Feb 2019 10:00:43.026 # +monitor master mymaster 192.168.132.129 6379 quorum 2
71856:X 07 Feb 2019 10:00:43.028 * +slave slave 192.168.132.130:6379 192.168.132.130 6379 @ mymaster 192.168.132.129 6379
71856:X 07 Feb 2019 10:06:39.686 * +sentinel sentinel 14998349ae4f22ea83e22a9f847db2385009ddba 192.168.132.131 26380 @ mymaster 192.168.132.129 6379
71856:X 07 Feb 2019 10:09:28.105 * +sentinel sentinel a4b0184b08224fccf8c9eb9d8073a6197bb15fcc 192.168.132.131 26381 @ mymaster 192.168.132.129 6379
71856:X 07 Feb 2019 12:10:54.893 # +new-epoch 1
71856:X 07 Feb 2019 12:10:54.895 # +vote-for-leader a4b0184b08224fccf8c9eb9d8073a6197bb15fcc 1
71856:X 07 Feb 2019 12:10:54.904 # +sdown master mymaster 192.168.132.129 6379
71856:X 07 Feb 2019 12:10:54.962 # +odown master mymaster 192.168.132.129 6379 #quorum 3/2
71856:X 07 Feb 2019 12:10:54.962 # Next failover delay: I will not start a failover before Thu Feb 7 12:16:55 2019
71856:X 07 Feb 2019 12:10:55.704 # +config-update-from sentinel a4b0184b08224fccf8c9eb9d8073a6197bb15fcc 192.168.132.131 26381 @ mymaster 192.168.132.129 6379
71856:X 07 Feb 2019 12:10:55.704 # +switch-master mymaster 192.168.132.129 6379 192.168.132.130 6379
71856:X 07 Feb 2019 12:10:55.705 * +slave slave 192.168.132.129:6379 192.168.132.129 6379 @ mymaster 192.168.132.130 6379
71856:X 07 Feb 2019 12:11:25.779 # +sdown slave 192.168.132.129:6379 192.168.132.129 6379 @ mymaster 192.168.132.130 6379
```
Running `info` against the previous slave (192.168.132.130), we can see it has become the master.

Now we start the paused old master again; it comes back as a slave:
```
72173:X 07 Feb 2019 12:17:36.159 * +convert-to-slave slave 192.168.132.129:6379 192.168.132.129 6379 @ mymaster 192.168.132.130 6379
```

24.3、SDK demo
```csharp
class Program
{
    static void Main(string[] args)
    {
        // List both master and slave so the client can find the current master after a failover
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.132.129:6379,192.168.132.130:6379");
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var info = db.StringSet(i.ToString(), i.ToString());
                Console.WriteLine($"{i} {info} done");
                Thread.Sleep(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }
        Console.ReadKey();
    }
}
```
25、【Cluster】Building a cluster, and the shortcomings of the earlier setups
25.1、The cluster approaches we know so far
- twemproxy

  Solves data sharding, but the proxy itself is a single point that is easily overloaded (the capacity problem).

  ```
            -> redis1
  client -> proxy -> redis2
            -> redis3
  ```

- master + slave + sentinel

  Solves high availability by switching the master/slave roles, but still does not shard the data.
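The memcache/proxy-era setups above shard data on the client or proxy side, typically with consistent hashing. Below is a minimal Python sketch of a hash ring with hypothetical node names (`redis1`…`redis3`), just to illustrate that technique; redis cluster itself shards differently, with fixed hash slots.

```python
import bisect
import hashlib

def _hash(s: str) -> int:
    # Any stable hash works; md5 keeps the example deterministic.
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class HashRing:
    """A minimal consistent-hash ring with virtual nodes."""

    def __init__(self, nodes, vnodes=100):
        # Place each physical node on the ring at `vnodes` points.
        self._ring = sorted((_hash(f"{n}#{v}"), n) for n in nodes for v in range(vnodes))
        self._points = [h for h, _ in self._ring]

    def get(self, key: str) -> str:
        # Walk clockwise to the first node point at or after hash(key).
        i = bisect.bisect(self._points, _hash(key)) % len(self._ring)
        return self._ring[i][1]

old = HashRing(["redis1", "redis2", "redis3"])
new = HashRing(["redis1", "redis3"])  # redis2 removed
keys = [f"user:{i}" for i in range(1000)]
moved = sum(1 for k in keys if old.get(k) != new.get(k))
```

The point of the ring: removing a node only remaps the keys that lived on that node, while everything else stays put, which is why memcache-style clusters lean on it.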
25.2、The cluster model
- High availability

  The sentinel-style monitoring and failover logic is built into the master nodes themselves, so no separate sentinel processes are needed.

- Data sharding

  The key space is divided into 16384 slots, assigned by `CRC16(key) mod 16384` (a fixed hash-slot scheme, not classic consistent hashing); with three masters, each one holds roughly 5461 slots.
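The even split of the 16384 slots can be reproduced with simple arithmetic. A small sketch (the rounding rule here is my approximation of what redis-cli does, not taken from its source):

```python
def slot_ranges(n_masters: int, total_slots: int = 16384):
    """Split the cluster key space into n_masters contiguous, near-equal ranges."""
    return [
        (round(i * total_slots / n_masters),
         round((i + 1) * total_slots / n_masters) - 1)
        for i in range(n_masters)
    ]

# For three masters this matches the ranges redis-cli prints during cluster creation:
# 0-5460, 5461-10922, 10923-16383
ranges = slot_ranges(3)
```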
25.3、Cluster setup
官方文檔:https://redis.io/topics/cluster-tutorial
This walkthrough runs the whole cluster on a single machine using multiple ports, planned as follows:
| Role   | Port | Port | Port |
|--------|------|------|------|
| master | 6379 | 6380 | 6381 |
| slave  | 6382 | 6383 | 6384 |
We first prepare six redis instances (copying redis-server and redis.conf into each directory) and edit the config files as follows:
```
/cluster/6379/redis-server   /cluster/6379/redis.conf
/cluster/6380/redis-server   /cluster/6380/redis.conf
/cluster/6381/redis-server   /cluster/6381/redis.conf
/cluster/6382/redis-server   /cluster/6382/redis.conf
/cluster/6383/redis-server   /cluster/6383/redis.conf
/cluster/6384/redis-server   /cluster/6384/redis.conf
```

Each instance's redis.conf is identical except for `port` and `cluster-config-file` (shown here for 6379; substitute the matching port in the other five):

```
bind 0.0.0.0
port 6379
protected-mode no
cluster-enabled yes
cluster-config-file nodes-6379.conf
```
Then use FileZilla to copy the cluster folder to the /data directory.

```
[root@localhost ~]# cd /data/
[root@localhost data]# ls
cluster  redis  redis-5.0.3.tar.gz
[root@localhost data]# cd cluster/
[root@localhost cluster]# ls
6379  6380  6381  6382  6383  6384
[root@localhost cluster]# cd 6379/
[root@localhost 6379]# ls
redis.conf  redis-server
```
Then fix the permissions and start each instance:
```
[root@localhost 6379]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6380]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6381]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6382]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6383]# chmod 777 ./redis-server && ./redis-server ./redis.conf
[root@localhost 6384]# chmod 777 ./redis-server && ./redis-server ./redis.conf
```
Check that they started successfully:
```
[root@localhost ~]# netstat -tlnp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address    Foreign Address    State     PID/Program name
tcp        0      0 0.0.0.0:6379     0.0.0.0:*          LISTEN    85807/./redis-serve
tcp        0      0 0.0.0.0:6380     0.0.0.0:*          LISTEN    85846/./redis-serve
tcp        0      0 0.0.0.0:6381     0.0.0.0:*          LISTEN    85851/./redis-serve
tcp        0      0 0.0.0.0:6382     0.0.0.0:*          LISTEN    85864/./redis-serve
tcp        0      0 0.0.0.0:6383     0.0.0.0:*          LISTEN    85869/./redis-serve
tcp        0      0 0.0.0.0:111      0.0.0.0:*          LISTEN    1/systemd
tcp        0      0 0.0.0.0:6384     0.0.0.0:*          LISTEN    85875/./redis-serve
```
Create the cluster (`--cluster-replicas 1` means every master is followed by one slave):
```
./redis-cli --cluster create 192.168.132.129:6379 192.168.132.129:6380 192.168.132.129:6381 192.168.132.129:6382 192.168.132.129:6383 192.168.132.129:6384 --cluster-replicas 1
```
The output:
```
[root@localhost redis]# ./redis-cli --cluster create 192.168.132.129:6379 192.168.132.129:6380 192.168.132.129:6381 192.168.132.129:6382 192.168.132.129:6383 192.168.132.129:6384 --cluster-replicas 1
>>> Performing hash slots allocation on 6 nodes...
Master[0] -> Slots 0 - 5460
Master[1] -> Slots 5461 - 10922
Master[2] -> Slots 10923 - 16383
Adding replica 192.168.132.129:6382 to 192.168.132.129:6379
Adding replica 192.168.132.129:6383 to 192.168.132.129:6380
Adding replica 192.168.132.129:6384 to 192.168.132.129:6381
>>> Trying to optimize slaves allocation for anti-affinity
[WARNING] Some slaves are in the same host as their master
M: a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379   // master
   slots:[0-5460] (5461 slots) master
M: a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380   // master
   slots:[5461-10922] (5462 slots) master
M: 56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381   // master
   slots:[10923-16383] (5461 slots) master
S: e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382   // slave
   replicates 56f79b7711110f2a6cc9bd52d5a218345ab23a77
S: dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383   // slave
   replicates a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5
S: b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384   // slave
   replicates a13cd2f8005286fd7bed260f1463b8cf5da1b91f
Can I set the above configuration? (type 'yes' to accept): yes
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join
.........
```
```
>>> Performing Cluster Check (using node 192.168.132.129:6379)
M: a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379   // master
   slots:[0-5460] (5461 slots) master
   1 additional replica(s)
M: 56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381   // master
   slots:[10923-16383] (5461 slots) master
   1 additional replica(s)
S: b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384   // slave
   slots: (0 slots) slave
   replicates a13cd2f8005286fd7bed260f1463b8cf5da1b91f
S: dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383   // slave
   slots: (0 slots) slave
   replicates a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5
M: a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380   // master
   slots:[5461-10922] (5462 slots) master
   1 additional replica(s)
S: e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382   // slave
   slots: (0 slots) slave
   replicates 56f79b7711110f2a6cc9bd52d5a218345ab23a77
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.
```
Use the `cluster nodes` command to inspect the cluster state:
```
[root@localhost redis]# ./redis-cli
127.0.0.1:6379> cluster nodes
56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381@16381 master - 0 1549521186000 3 connected 10923-16383
a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379@16379 myself,master - 0 1549521183000 1 connected 0-5460
b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384@16384 slave a13cd2f8005286fd7bed260f1463b8cf5da1b91f 0 1549521187004 6 connected
dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383@16383 slave a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 0 1549521184000 5 connected
a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380@16380 master - 0 1549521185000 2 connected 5461-10922
e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382@16382 slave 56f79b7711110f2a6cc9bd52d5a218345ab23a77 0 1549521185997 4 connected
```
Demonstrating high availability
Now we shut 6379 down; all the other redis nodes detect that 6379 has failed:
```
85846:M 07 Feb 2019 14:39:51.059 * Marking node a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 as failing (quorum reached).
85846:M 07 Feb 2019 14:39:51.060 # Cluster state changed: fail
85846:M 07 Feb 2019 14:39:51.968 # Failover auth granted to dc13912412a5c8f38a7ee24234aad47aa269c593 for epoch 7
85846:M 07 Feb 2019 14:39:52.010 # Cluster state changed: ok
```
Checking 6383 now, we can see that this slave has been promoted to master.

Looking at the cluster info at this point, there are now 3 masters and 2 slaves:
```
127.0.0.1:6383> cluster nodes
a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380@16380 master - 0 1549521863000 2 connected 5461-10922
dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383@16383 myself,master - 0 1549521864000 7 connected 0-5460
e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382@16382 slave 56f79b7711110f2a6cc9bd52d5a218345ab23a77 0 1549521865027 4 connected
b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384@16384 slave a13cd2f8005286fd7bed260f1463b8cf5da1b91f 0 1549521864021 6 connected
a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379@16379 master,fail - 1549521573625 1549521572519 1 disconnected
56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381@16381 master - 0 1549521863013 3 connected 10923-16383
```
Now we restart 6379 and check the cluster info again; it has rejoined as a slave:
```
127.0.0.1:6383> cluster nodes
a13cd2f8005286fd7bed260f1463b8cf5da1b91f 192.168.132.129:6380@16380 master - 0 1549521999113 2 connected 5461-10922
dc13912412a5c8f38a7ee24234aad47aa269c593 192.168.132.129:6383@16383 myself,master - 0 1549522000000 7 connected 0-5460
e243acc761d12f1a0f1d5e05c28a2d2a6b7b9db9 192.168.132.129:6382@16382 slave 56f79b7711110f2a6cc9bd52d5a218345ab23a77 0 1549521999000 4 connected
b408b915098d0a4725a6bce2ce375e5f9d690bdf 192.168.132.129:6384@16384 slave a13cd2f8005286fd7bed260f1463b8cf5da1b91f 0 1549522001129 6 connected
a9ef70b01534cae6bed200ea6ba3f0c73ff9d1f5 192.168.132.129:6379@16379 slave dc13912412a5c8f38a7ee24234aad47aa269c593 0 1549521997098 7 connected
56f79b7711110f2a6cc9bd52d5a218345ab23a77 192.168.132.129:6381@16381 master - 0 1549522000121 3 connected 10923-16383
```
Demonstrating data sharding

Connect to the service in cluster mode using the `-c` flag — here to 6380, a master, so we can also flush the database. The redirect messages show clearly which server and which slot each key lands in:
```
[root@localhost redis]# ./redis-cli -c -p 6380
127.0.0.1:6380> flushall
OK
127.0.0.1:6380> set username jack
-> Redirected to slot [14315] located at 192.168.132.129:6381
OK
192.168.132.129:6381> set password 12345
-> Redirected to slot [9540] located at 192.168.132.129:6380
OK
192.168.132.129:6380> set email 786744873@qq.com
OK
```
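The slot numbers in those redirects are not random: redis cluster computes `CRC16(key) mod 16384` using the CRC16-CCITT (XMODEM) variant, honoring `{hash tags}`. A from-scratch Python sketch reproducing the slots seen in this session:

```python
def crc16_xmodem(data: bytes) -> int:
    """Bitwise CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0x0000."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) & 0xFFFF if crc & 0x8000 else (crc << 1) & 0xFFFF
    return crc

def key_slot(key: str) -> int:
    """Map a key to one of the 16384 cluster slots, hashing only a non-empty {tag} if present."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end != -1 and end != start + 1:  # non-empty tag: hash only the tag
            key = key[start + 1:end]
    return crc16_xmodem(key.encode()) % 16384

print(key_slot("username"), key_slot("password"))
```

Hash tags are how you force related keys into the same slot (and thus onto the same master), e.g. `{user}:name` and `{user}:age` hash identically.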
25.4、SDK implementation
```csharp
class Program
{
    static void Main(string[] args)
    {
        // List all six nodes; StackExchange.Redis discovers the cluster topology from them
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.132.129:6379,192.168.132.129:6380,192.168.132.129:6381,192.168.132.129:6382,192.168.132.129:6383,192.168.132.129:6384");
        var db = redis.GetDatabase(0);
        for (int i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var info = db.StringSet(i.ToString(), i.ToString());
                Console.WriteLine($"{i} {info} done");
                Thread.Sleep(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Thread.Sleep(10);
            }
        }
        Console.ReadKey();
    }
}
```
The result:

A closing note: sticking with this series to the end was genuinely hard — I was still writing on New Year's Day. Thank you all, and happy new year!
