Redis 6
課程來源尚硅谷Redis 6課程
Redis 是典型的 NoSQL 數據庫,支持多種數據結構類型。設計思想是:單線程+多路IO復用技術

上圖的例子說明:內存中只有一個單線程,無需進行線程切換等操作,保證了redis在內存處理中的效率,同時使用多個socket鏈接復用,一旦需要哪個鏈接的數據准備就緒之后,就將這個線程與這個鏈接進行IO操作。總結一句話就是:通過一個線程,進行撥開關的方式,來同時傳輸多個IO流。
redis官網:https://redis.io/download
Redis 是一個開源的 key-value 存儲系統。
和 Memcached 類似,它支持存儲的 value 類型相對更多,包括 string、list、set、zset、sorted set、hash。
這些數據類型都支持 push/pop、add/remove 及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。
在此基礎上,Redis 支持各種不同方式的排序。
與 memcached一樣,為了保證效率,數據都是緩存在內存中。
區別的是 Redis 會周期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件。
並且在此基礎上實現了master-slave (主從)同步。
單線程 + IO 多路復用。
安裝和啟動
-
windows下:
-
linux下:
安裝 C 語言的編譯環境
yum install centos-release-scl scl-utils-build yum install -y devtoolset-8-toolchain scl enable devtoolset-8 bash
通過 wget 下載
wget https://download.redis.io/releases/redis-6.2.6.tar.gz // 下載路徑:/opt
解壓至當前目錄
tar -zxvf redis-6.2.6.tar.gz
解壓完成后進入目錄
cd redis-6.2.6
在當前目錄下執行 make
make && make install
默認安裝在
/usr/local/bin
![]()
redis-benchmark:性能測試工具,可以在自己本子運行,看看自己本子性能如何 redis-check-aof:修復有問題的AOF文件,rdb和aof后面講 redis-check-dump:修復有問題的dump.rdb文件 redis-sentinel:Redis集群使用 redis-server:Redis服務器啟動命令 redis-cli:客戶端,操作入口
前台啟動:/usr/local/bin 目錄下啟動 redis
redis-server(前台啟動)
后台啟動:
安裝 redis 的目錄 /opt/redis-6.2.6 中將 redis.conf 復制到任意一個文件夾下
cp redis.conf /etc/redis.conf // 將redis.conf復制到/etc/下
修改 /etc/redis.conf 配置文件
vim redis.conf # daemonize no 修改為 daemonize yes
![]()
/usr/local/bin 目錄下啟動 redis
redis-server /etc/redis.conf
關閉 redis
- kill 進程:kill掉進程號
- 命令 shutdown:redis-cli shutdown
默認端口號:6379
NoSQL數據庫
web2.0時代,訪問量太大,CPUI和內存有巨大壓力,如果還是按照單個服務器進行響應的話,顯然性能是不夠的,於是引出分布式架構的服務,也就是多台服務器來響應請求,使用過NoSQL數據庫進行解決。
NoSQL數據庫的作用:
-
解決 CPU 及內存壓力
進行分布式服務的時候,可能同一台電腦的請求是由不同的服務器進行響應的,那么如何存放session?也就是session的共享問題。
-
解決 IO 壓力
當做高速緩存使用,緩解訪問數據庫的壓力
NoSQL數據庫的特點:
NoSQL( NoSQL = Not Only SQL ),意即 “不僅僅是 SQL” ,泛指非關系型的數據庫。
NoSQL 不依賴業務邏輯方式存儲,而以簡單的 key-value 模式存儲。因此大大的增加了數據庫的擴展能力。
- 不遵循 SQL 標准。
- 不支持 ACID。
- 遠超於 SQL 的性能。
適用於的場景
- 對數據高並發的讀寫;
- 海量數據的讀寫;
- 對數據高可擴展性的。
不適用的場景
- 需要事務支持;
- 基於 sql 的結構化查詢存儲,處理復雜的關系,需要即席查詢。
- 用不着sql的情況以及用了sql也不行的情況下,考慮NoSQL
常見的 NoSQL 數據庫
-
Redis
-
MongoDB
大數據時代常用的數據庫類型
-
行式數據庫
-
列式數據庫
配置文件
**Redis 的配置文件位於 Redis 安裝目錄下,文件名為 ** redis.conf (Windows 名為 redis.windows.conf)。
-
端口號:6379
-
默認16個數據庫,類似數組下標從0開始,初始默認使用0號庫
- select 8:切換到8號庫
- 所有庫擁有統一的密碼
- dbsize:查看當前數據庫的key的數量
- flushdb:清空當前庫
- flushall:通殺全部庫
Units 單位
單位,配置大小單位,開頭定義了一些基本的度量單位,只支持 bytes,不支持 bit。
大小寫不敏感。

INCLUDES 包含
包含,多實例的情況可以把公用的配置文件提取出來。

NETWORK 網絡
網絡相關配置。
bind
默認情況
bind=127.0.0.1
只能接受本機的訪問請求。不寫的情況下,無限制接受任何 ip 地址的訪問。
生產環境肯定要寫你應用服務器的地址,服務器是需要遠程訪問的,所以需要將其注釋掉。
如果開啟了protected-mode,那么在沒有設定 bind ip 且沒有設密碼的情況下,Redis 只允許接受本機的響應。
![]()
protected-mode
將本機訪問保護模式設置 no。
![]()
port
端口號,默認 6379。
![]()
tcp-backlog
設置 tcp 的 backlog,backlog 其實是一個連接隊列,backlog 隊列總和 $=$ 未完成三次握手隊列 $+$ 已經完成三次握手隊列。
在高並發環境下你需要一個高 backlog 值來避免慢客戶端連接問題。
![]()
timeout
一個空閑的客戶端維持多少秒會關閉,0 表示關閉該功能。即永不關閉。
![]()
tcp-keepalive
對訪問客戶端的一種心跳檢測,每個 n 秒檢測一次。(默認是300s檢測一次)
單位為秒,如果設置為 0,則不會進行 Keepalive 檢測,建議設置成 60。
![]()
GENERAL
通用。
daemonize
是否為后台進程,設置為 yes。
守護進程,后台啟動。
![]()
pidfile
存放 pid 文件的位置,每個實例會產生一個不同的 pid 文件。
![]()
loglevel
指定日志記錄級別,Redis 總共支持四個級別:debug、verbose、notice、warning,默認為 notice。
![]()
logfile
日志文件名稱。
![]()
database
設定庫的數量 默認16,默認數據庫為 0,可以使用
SELECT <dbid>
命令在連接上指定數據庫 id。![]()
SECURITY
安全。
訪問密碼的查看、設置和取消。
在命令中設置密碼,只是臨時的。重啟 redis 服務器,密碼就還原了。
永久設置,需要在配置文件中進行設置。
LIMITS
限制。
maxclients
設置 redis 同時可以與多少個客戶端進行連接。
默認情況下為 10000 個客戶端。
如果達到了此限制,redis 則會拒絕新的連接請求,並且向這些連接請求方發出 max number of clients reached 以作回應。
![]()
maxmemory
建議必須設置,否則,將內存占滿,造成服務器宕機。
設置 redis 可以使用的內存量。一旦到達內存使用上限,redis 將會試圖移除內部數據,移除規則可以通過 maxmemory-policy 來指定。
如果 redis 無法根據移除規則來移除內存中的數據,或者設置了不允許移除,那么 redis 則會針對那些需要申請內存的指令返回錯誤信息,比如 SET、LPUSH 等。
但是對於無內存申請的指令,仍然會正常響應,比如 GET 等。如果你的 redis 是主 redis( 說明你的 redis 有從 redis ),那么在設置內存使用上限時,需要在系統中留出一些內存空間給同步隊列緩存,只有在你設置的是“不移除”的情況下,才不用考慮這個因素。
![]()
maxmemory-policy
volatile-lru:使用 LRU 算法移除 key,只對設置了過期時間的鍵(最近最少使用)。
allkeys-lru:在所有集合 key 中,使用 LRU 算法移除 key。
volatile-random:在過期集合中移除隨機的 key,只對設置了過期時間的鍵。
allkeys-random:在所有集合 key 中,移除隨機的 key。
volatile-ttl:移除那些 TTL 值最小的 key,即那些最近要過期的 key。
noeviction:不進行移除。針對寫操作,只是返回錯誤信息。
![]()
maxmemory-samples
設置樣本數量,LRU 算法和最小 TTL 算法都並非是精確的算法,而是估算值,所以你可以設置樣本的大小,redis 默認會檢查這么多個 key 並選擇其中 LRU 的那個。
一般設置 3 到 7 的數字,數值越小樣本越不准確,但性能消耗越小。
![]()
常用五大基本數據類型
前言:Redis鍵(key操作)
keys *
:查看當前庫所有 key
set key value
:添加一組 k-v
exists key
:判斷某個 key 是否存在
type key
:查看你的 key 是什么類型
del key
:刪除指定的 key 數據(直接刪除,而不是異步刪除)
unlink key
:根據 value 選擇非阻塞刪除,僅將 keys 從 keyspace 元數據中刪除,真正的刪除會在后續異步操作
expire key 10
:為給定的 key 設置過期時間
ttl key
:查看還有多少秒過期,-1表示永不過期,-2表示已過期
select
:命令切換數據庫
dbsize
:查看當前數據庫的 key 的數量
flushdb
:清空當前庫
flushall
:通殺全部庫
字符串(String)
String 類型是最基本的類型,是二進制安全的。意味着 Redis 的 string 可以包含任何數據,比如 jpg 圖片或者序列化的對象,只要數據能夠存儲為字符串類型,redis都能通過k-v的形式存儲。
String 類型是 Redis 最基本的數據類型,一個 Redis 中字符串 value 最多可以是 512M。
命令操作:
set <key> <value>
:添加鍵值對,也包含對一個key下的value進行覆蓋
get <key>
:查詢對應鍵值
append <key> <value>
:將給定的 <value> 追加到原值的末尾,返回添加后的字符串長度值
strlen <key>
:獲得值的長度
setnx <key> <value>
:只有在 key 不存在時,才能設置 key 的值(注意與set命令的區別)
incr <key>
:將 key 中儲存的數字值增 1,只能對數字值操作,如果為空,新增值為 1(具有原子性)
decr <key>
:將 key 中儲存的數字值減 1,只能對數字值操作,如果為空,新增值為 -1
incrby/decrby <key><步長>
:將 key 中儲存的數字值增減。自定義步長
mset <key1> <value1> <key2> <value2>
:同時設置一個或多個 key-value 對
mget <key1> <key2> <key3>...
:同時獲取一個或多個 value
msetnx <key1> <value1> <key2> <value2>...
:同時設置一個或多個 key-value 對,當且僅當所有給定 key 都不存在(原子性操作,有一個失敗那么就全部失敗)
getrange <key><起始位置><結束位置>
:獲得值的范圍(將字符串理解成一個數組,按數組序號進行取子字符串的操作)
setrange <key><起始位置><value>
:用 <value> 覆寫 <key> 所儲存的字符串值
setex <key><過期時間><value>
:設置鍵值的同時,設置過期時間,單位秒。(可以通過ttl key的命令,查看過期時間)
getset <key><value>
:以新換舊,設置了新值同時獲得舊值。
原子性
所謂 原子 操作是指不會被線程調度機制打斷的操作;
這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。
-
在單線程中, 能夠在單條指令中完成的操作都可以認為是"原子操作",因為中斷只能發生於指令之間。(而redis是單線程的,所以redis中的操作是不會被打斷的)
-
在多線程中,不能被其它進程(線程)打斷的操作就叫原子操作。
Redis 單命令的原子性主要得益於 Redis 的單線程。
數據結構
內部結構實現上類似於 Java 的 ArrayList,采用預分配冗余空間的方式來減少內存的頻繁分配,字符串大小小於1M時,擴容都是加倍現有的空間大小,如果空間大小一旦超過1M,每次擴容只會多擴容1M的空間,注意字符串最大長度是512M。

列表(List)
單鍵多值
Redis 列表是簡單的字符串列表,按照插入順序排序。你可以添加一個元素到列表的頭部(左邊)或者尾部(右邊)。
它的底層實際是個雙向鏈表,對兩端的操作性能很高,通過索引下標的操作中間的節點性能會較差。

lpush/rpush <key><value1><value2><value3> ....
: 從左邊/右邊插入一個或多個值。lpush k1 v1 v2 v3 (頭插法) lrange k1 0 -1 輸出:v3 v2 v1 rpush k1 v1 v2 v3 (尾插法) rrange k1 0 -1 輸出:v1 v2 v3
lpop/rpop <key>
:從左邊/右邊吐出一個值。值在鍵在,值光鍵亡。
rpoplpush <key1><key2>
:從 <key1> 列表右邊吐出一個值,插到 <key2> 列表左邊。
lrange <key><start><stop>
:按照索引下標獲得元素(從左到右)
lrange <key> 0 -1
:左邊第一個,-1右邊第一個,(0 -1表示獲取所有)
lindex <key><index>
:按照索引下標獲得元素(從左到右,從0開始到index索引值)
llen <key>
:獲得列表長度
linsert <key> before/after <value> <newvalue>
:在 <value> 的前面/后面插入 <newvalue> 插入值
lrem <key><n><value>
:從左邊刪除 n 個 value(從左到右)
lset <key><index><value>
:將列表 key 下標為 index 的值替換成 value
數據結構
List 的數據結構為快速鏈表 quickList。
-
首先在列表元素較少的情況下會使用一塊連續的內存存儲,這個結構是 ziplist,也即是壓縮列表。
- 它將所有的元素緊挨着一起存儲,分配的是一塊連續的內存。
-
當數據量比較多的時候才會改成 quicklist。
-
因為普通的鏈表需要的附加指針空間太大,會比較浪費空間。比如這個列表里存的只是 int 類型的數據,結構上還需要兩個額外的指針 prev 和 next。
-
-
Redis 將鏈表和 ziplist 結合起來組成了 quicklist。也就是將多個 ziplist 使用雙向指針串起來使用。這樣既滿足了快速的插入刪除性能,又不會出現太大的空間冗余。quicklist結構圖如下:

Set(集合)
Set 對外提供的功能與 List 類似列表的功能。(一個key,對應一個set集合)
-
特殊之處在於 Set 是可以 自動排重 的
-
當需要存儲一個列表數據,又不希望出現重復數據時,Set 是一個很好的選擇,並且 Set 提供了判斷某個成員是否在一個 Set 集合內的重要接口,這個也是 List 所不能提供的。
-
Redis 的 Set 是 String 類型的無序集合。它底層其實是一個 value 為 null 的 hash 表,所以添加,刪除,查找的復雜度都是 O(1)。
一個算法,隨着數據的增加,執行時間的長短,如果是 O(1),數據增加,查找數據的時間不變。
命令操作:
sadd <key><value1><value2> .....
:將一個或多個 member 元素加入到集合 key 中,已經存在的 member 元素將被忽略
smembers <key>
:取出該集合的所有值。
sismember <key><value>
:判斷集合 <key> 是否為含有該 <value> 值,有返回 1,沒有返回 0
scard<key>
:返回該集合的元素個數。
srem <key><value1><value2> ....
:刪除集合中的某個元素
spop <key>
:隨機從該集合中吐出一個值
srandmember <key><n>
:隨機從該集合中取出 n 個值,不會從集合中刪除
smove <source><destination>value
:把集合中一個值從一個集合移動到另一個集合
sinter <key1><key2>
:返回兩個集合的交集元素
sunion <key1><key2>
:返回兩個集合的並集元素
sdiff <key1><key2>
:返回兩個集合的差集元素(key1 中的,不包含 key2 中的)
數據結構
Set 數據結構是字典,字典是用哈希表實現的。
Hash(哈希)
Redis hash 是一個鍵值對集合。
Redis hash 是一個 String 類型的 field 和 value 的映射表,hash 特別適合用於存儲對象。( 可以理解為Java中的Map<String,Object> )
hset <key><field><value>
:給 <key> 集合中的 <field> 鍵賦值 <value>
- hset user:1001 id 1,redis允許key帶 :,可以理解為key值是 user:1001,這是為了存很多對象時,用來區分不同對象
- hset user:1001 name zhanggsan,給 user:1001 這個key對應的對象中,添加一個name字段,值為zhangsan
hget <key1><field>
:從 <key1> 集合 <field> 取出 value
- hget user:1001 id,打印結果為“1”
- hget user:1001 name,打印結果為“zhangsan”
hmset <key1><field1><value1><field2><value2>...
: 批量設置 hash 的值(新版本這項命令,已經可以用hset進行實現了)
hexists <key1><field>
:查看哈希表 key 中,給定域 field 是否存在(1:存在,0:不存在)
hkeys <key>
:列出該 hash 集合的所有 field
- hkeys user:1001,列出這個key下所有的字段
hvals <key>
:列出該 hash 集合的所有 value
hincrby <key> <field> <increment>
:為哈希表 key 中的域 field 的值加上增量 1 -1
hsetnx <key> <field> <value>
:將哈希表 key 中的域 field 的值設置為 value ,當且僅當域 field 不存在
數據結構
Hash 類型對應的數據結構是兩種:ziplist(壓縮列表),hashtable(哈希表)。
當 field-value 長度較短且個數較少時,使用 ziplist,否則使用 hashtable。
Zset(有序集合)
Redis 有序集合 zset 與普通集合 set 非常相似,是一個沒有重復元素的字符串有序集合。
不同之處是有序集合的每個成員都關聯了一個評分(score),這個評分(score)被用來按照從最低分到最高分的方式排序集合中的成員。集合的成員是唯一的,但是評分可以是重復的。
因為元素是有序的,所以可以很快的根據評分(score)或者次序(position)來獲取一個范圍的元素。
訪問有序集合的中間元素也是非常快的,因此能夠使用有序集合作為一個沒有重復成員的智能列表。
操作命令:
zadd <key> <score1> <value1> <score2> <value2> …
:將一個或多個 member 元素及其 score 值加入到有序集 key 當中
zrange <key> <start> <stop> [WITHSCORES]
:返回有序集 key 中,下標在 <start><stop> 之間的元素( 0 -1 ,表示所有)
- 當帶上 WITHSCORES 時,可以讓分數一起和值返回到結果集
zrangebyscore key min max [withscores] [limit offset count]
:返回有序集 key 中,所有 score 值介於 min 和 max 之間(包括等於 min 或 max )的成員。有序集成員按 score 值遞增(從小到大)次序排列。
zrevrangebyscore key max min [withscores] [limit offset count]
:同上,改為從大到小排列
zincrby <key> <increment> <value>
:為元素的 score 加上增量
zrem <key> <value>
:刪除該集合下,指定值的元素
zcount <key> <min> <max>
:統計該集合,分數區間內的元素個數
zrank <key> <value>
:返回該值在集合中的排名,從 0 開始。
數據結構
SortedSet(zset)是 Redis 提供的一個非常特別的數據結構,一方面它等價於 Java 的數據結構 Map<String, Double>,可以給每一個元素 value 賦予一個權重 score,另一方面它又類似於 TreeSet,內部的元素會按照權重 score 進行排序,可以得到每個元素的名次,還可以通過 score 的范圍來獲取元素的列表。
zset 底層使用了兩個數據結構
-
hash,hash 的作用就是關聯元素 value 和權重 score(通過key值,能夠找到關聯的value和score),保障元素 value 的唯一性,可以通過元素 value 找到相應的 score 值
-
跳躍表,跳躍表的目的在於給元素 value 排序,根據 score 的范圍獲取元素列表

Redis6 新數據類型
Bitmaps

- 將 Bitmaps 數據類型理解為一個數組,每個單位只存儲0和1
實例:

- getbit
:獲取Bitmaps中某個偏移量的值 - bitcount
[start end]:統計字符串被設置為1的bit數,start end可以指定范圍,且可以使用負數值,例如:-1表示最后一個位,-2表示倒數第二個位置(從0開始....) - bitop and(or/not/xor)
[key...]:復合操作,可以做多個Bitmaps的交集、並集等操作,並將結果保存在destkey中 - 例如:bitop and users:1 users:2 users:3,將users:2與users:3的交集結果存放到key為users:1的值中
Bitmaps與set對比



HyperLogLog



命令操作:
-
pfadd
[element...]:添加指定元素到HyperLogLog中,執行命令后,若基數發生變化則返回1,否則返回0 -
pfcount
[key...]:計算基數值 -
pfmerge
[其中,sourcekey可以為多個]:將多個HyperLogLog數據類型進行合並,例子比如將月活躍用戶數與日活躍用戶數進行合並,就可以使用pfcount進行統計基數
Geospatial






Redis的發布與訂閱
Redis 發布訂閱( pub/sub )是一種消息通信模式:發送者( pub )發送消息,訂閱者( sub )接收消息。
Redis 客戶端可以訂閱任意數量的頻道。

發布者可以建立許多個頻道進行消息的發送(如上圖頻道1、頻道2、頻道3),供訂閱者進行接收和監聽消息。
- 客戶端可以訂閱頻道

- 當給這個頻道發布消息后,消息就會發送給訂閱的客戶端

發布訂閱命令行實現
- 打開一個客戶端訂閱channel1
- subscribe channel1
- 打開另一個客戶端,給channel1發布消息hello
-
publish channel1 hello
-
返回的數字表示:訂閱者的數量、
- 打開第一個客戶端可以看到發送的信息

注:發布的消息如果沒有持久化,那么在訂閱的客戶端是接收不到消息的,只能收到訂閱后發布的消息
事務和鎖機制
Redis 事務是一個單獨的隔離操作:事務中的所有命令都會序列化、按順序地執行。事務在執行的過程中,不會被其他客戶端發送來的命令請求所打斷。
Redis 事務的主要作用就是串聯多個命令防止別的命令插隊。
Multi、Exec、Discard

Multi
Exec
Discard
從輸入 Multi 命令開始,輸入的命令都會依次進入命令隊列中,但不會執行,直到輸入 Exec 后,Redis 會將之前的命令隊列中的命令依次執行。
組隊的過程中可以通過 Discard 來放棄組隊。
Redis的事務就是:先使用multi進行命令的添加(組隊過程),組隊完畢后,使用exec進行執行,這個執行過程不能被其他命令打斷(相當於事務執行的過程),如果要中止,就使用discard命令(類似於回滾操作)。
-
multi開啟組隊,輸入命令;組隊成功,exec執行事務,執行完畢,事務結束
-
放棄組隊
-
組隊中有命令錯誤,不會執行
-
組隊中不報錯,執行時報錯
悲觀鎖
悲觀鎖(Pessimistic Lock),即每次去拿數據的時候都認為有其他線程會修改,所以每次在拿數據的時候都會上鎖,這樣其他線程想要拿到這個數據就會被 block 直到釋放鎖后,成功拿到鎖。(效率低,操作之前先上鎖)
樂觀鎖
樂觀鎖(Optimistic Lock),即每次去拿數據的時候都認為其他線程不會修改,所以不會上鎖,但是在更新的時候會判斷,在此期間有沒有其他線程去更新這個數據,可以使用版本號控制等機制。
樂觀鎖適用於多讀的應用類型,這樣可以提高吞吐量。
Redis 就是利用這種 check-and-set 機制實現事務的。
Watch、unwatch
在執行 multi 之前,先執行 watch key1 [key.....],可以監視一個(或多個 )key 。如果在事務執行之前,這個 key 被其他命令所改動,那么事務將被打斷。(類似於上鎖,一旦發現watch的這個key被修改了,那么自己的exec操作就會中斷)
取消 WATCH 命令對所有 key 的監視:如果在執行 WATCH 命令之后,EXEC 命令或 DISCARD 命令先被執行,那么就不需要再執行 UNWATCH 。
事務三特性
-
單獨的隔離操作
事務中的所有命令都會序列化、按順序地執行。事務在執行的過程中,不會被其他客戶端發送來的命令請求所打斷。
-
沒有隔離級別的概念
隊列中的命令沒有提交之前都不會實際被執行,因為事務提交前任何指令都不會被實際執行。
-
不保證原子性
事務中如果有一條命令執行失敗,其后的命令仍然會被執行,沒有回滾 。
模擬秒殺
基本實現
核心的邏輯代碼:
public class SecKill_redis {
public static void main(String[] args) {
Jedis jedis =new Jedis("192.168.242.110",6379);
System.out.println(jedis.ping());
jedis.close();
}
//秒殺過程
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判斷
if(uid == null || prodid == null){
return false;
}
//2 連接redis
Jedis jedis =new Jedis("192.168.xx.xxx",6379);
//3 拼接key
// 3.1 庫存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒殺成功用戶key
String userKey = "sk:"+prodid+":user";
//4 獲取庫存,如果庫存null,秒殺還沒有開始
String kc = jedis.get(kcKey);
if(kc == null){
System.out.println("秒殺還沒開始,請稍等");
jedis.close();
return false;
}
// 5 判斷用戶是否重復秒殺操作
if(jedis.sismember(userKey, uid)){
System.out.println("每個用戶只能秒殺成功一次,請下次再來");
jedis.close();
return false;
}
//6 判斷如果商品數量,庫存數量小於1,秒殺結束
if(Integer.parseInt(kc) < 1){
System.out.println("秒殺結束,請下次參與");
jedis.close();
return false;
}
//7 秒殺過程
//7.1庫存-1
jedis.decr(kcKey);
//7.2 把秒殺成功的用戶添加到清單里面
jedis.sadd(userKey,uid);
System.out.println("用戶" + uid + "秒殺成功");
jedis.close();
return true;
}
}
使用ab工具模擬並發以及暴露出的問題
CentOS 6 默認安裝
CentOS 7 手動安裝(yum -y install httpd-tools)
-
通過ab命令發送並發操作
ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://192.168.0.43:8080/Seckill/doseckill
-n:測試會話中所執行的請求個數
-c:一次產生的請求個數
-
並發暴露出來的問題
-
會出現超賣問題:賣完了商品,但還存在繼續購買,即庫存變為負數
-
解決方案:使用樂觀鎖,進行版本控制(redis事務+watch)
代碼修改:
//秒殺過程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判斷 if(uid == null || prodid == null){ return false; } //2 連接redis //Jedis jedis =new Jedis("192.168.xx.xxx",6379); //通過連接池獲取連接redis的對象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource(); //3 拼接key // 3.1 庫存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒殺成功用戶key String userKey = "sk:"+prodid+":user"; //監視庫存 jedis.watch(kcKey); //4 獲取庫存,如果庫存null,秒殺還沒有開始 String kc = jedis.get(kcKey); if(kc == null){ System.out.println("秒殺還沒開始,請稍等"); jedis.close(); return false; } // 5 判斷用戶是否重復秒殺操作 if(jedis.sismember(userKey, uid)){ System.out.println("每個用戶只能秒殺成功一次,請下次再來"); jedis.close(); return false; } //6 判斷如果商品數量,庫存數量小於1,秒殺結束 if(Integer.parseInt(kc) < 1){ System.out.println("秒殺結束,請下次參與"); jedis.close(); return false; } //7 秒殺過程 //使用事務 Transaction multi = jedis.multi(); //組隊操作 multi.decr(kcKey); multi.sadd(userKey,uid); //執行 List<Object> results = multi.exec(); if(results == null || results.size()==0) { System.out.println("秒殺失敗了...."); jedis.close(); return false; } // //7.1庫存-1 // jedis.decr(kcKey); // //7.2 把秒殺成功的用戶添加到清單里面 // jedis.sadd(userKey,uid); System.out.println("用戶" + uid + "秒殺成功"); jedis.close(); return true; }
-
-
連接超時問題
- 解決方案:采用連接池
// 創建工具類 public class JedisPoolUtil { private static volatile JedisPool jedisPool = null; private JedisPoolUtil() { } public static JedisPool getJedisPoolInstance() { if (null == jedisPool) { synchronized (JedisPoolUtil.class) { if (null == jedisPool) { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(200); poolConfig.setMaxIdle(32); poolConfig.setMaxWaitMillis(100*1000); poolConfig.setBlockWhenExhausted(true); poolConfig.setTestOnBorrow(true); // ping PONG jedisPool = new JedisPool(poolConfig, "192.168.xx.xxx", 6379, 60000 ); } } } return jedisPool; } public static void release(JedisPool jedisPool, Jedis jedis) { if (null != jedis) { jedisPool.returnResource(jedis); } } }
修改代碼,主要是針對前面基本實現中的核心代碼,對獲取redis對象進行修改:
//2 連接redis //Jedis jedis =new Jedis("192.168.xx.xxx",6379); //通過連接池獲取連接redis的對象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource();
-
商品遺留問題,即秒殺已經結束了,卻還有商品庫存
- 解決方案:使用Lua腳本
Lua 是一個小巧的腳本語言,Lua腳本可以很容易的被C/C++ 代碼調用,也可以反過來調用C/C++的函數,Lua並沒有提供強大的庫,一個完整的Lua解釋器不過200k,所以Lua不適合作為開發獨立應用程序的語言,而是作為嵌入式腳本語言。很多應用程序、游戲使用LUA作為自己的嵌入式腳本語言,以此來實現可配置性、可擴展性。
將復雜的或者多步的redis操作,寫為一個腳本,一次提交給redis執行,減少反復連接redis的次數。提升性能。
Lua腳本是類似redis事務,有一定的原子性,不會被其他命令插隊,可以完成一些redis事務性的操作。
但是注意redis的lua腳本功能,只有在Redis 2.6以上的版本才可以使用。
利用Lua腳本淘汰用戶,解決超賣問題。
redis 2.6版本以后,通過lua腳本解決爭搶問題,實際上是redis 利用其單線程的特性,用任務隊列的方式解決多任務並發問題。
-
Redis中兩種持久化機制
兩種持久化機制,RDB和AOF,簡單來說就是存數據使用的哪種機制,默認是使用RDB(Redis DataBase)。
RDB
Redis DataBase
在指定的時間間隔內將內存中的 數據集快照 寫入磁盤,即 Snapshot 快照,恢復時是將快照文件直接讀到內存里。
周期性地進行持久化的操作

Redis 會單獨創建一個子進程(fork)來進行持久化。
底層執行過程:先將數據寫入到一個臨時文件中,待持久化過程完成后(同步過程完成),再將這個臨時文件內容覆蓋到 dump.rdb(持久化文件)。
- 整個過程中,主進程是不進行任何 IO 操作的,這就確保了極高的性能。
- 為什么要先同步到文件臨時區域而不直接同步到rdb中?若同步過程中發生異常情況中斷,不會導致數據庫中的數據發生損壞,待同步過程完成后,用臨時文件替代這個持久化的文件,保證了數據的完整性和一致性。
- 如果需要進行大規模數據的恢復,且對於數據恢復的完整性不是非常敏感,那 RDB 方式要比 AOF 方式更加的高效。
RDB 的缺點是最后一次持久化后的數據可能丟失。
Fork
-
Redis進程執行fork操作創建子進程,RDB持久化過程由子進程負責,完成后自動結束。阻塞只發生在fork階段,一般時間很短。
-
作用是復制一個與當前進程一樣的進程。新進程的所有數據(變量、環境變量、程序計數器等) 數值都和原進程一致,但是是一個全新的進程,並作為原進程的子進程
-
在 Linux 程序中,fork() 會產生一個和父進程完全相同的子進程,但子進程在此后多會 exec 系統調用,出於效率考慮,Linux 中引入了 寫時復制技術
-
一般情況父進程和子進程會共用同一段物理內存,只有進程空間的各段的內容要發生變化時,才會將父進程的內容復制一份給子進程
配置
dump 文件名字
在 redis.conf 中配置文件名稱,默認為 dump.rdb。
![]()
dump 保存位置
rdb 文件的保存路徑可以修改。默認為 Redis 啟動時命令行所在的目錄下。
![]()
stop-writes-on-bgsave-error
即當 redis 無法寫入磁盤,關閉 redis 的寫入操作。推薦yes
![]()
rdbcompression
持久化的文件是否進行壓縮存儲。
![]()
rdbchecksum
完整性的檢查,即數據是否完整性、准確性。
![]()
save
表示寫操作的次數。
格式:save 秒 寫操作次數
![]()
優點
- 適合大規模的數據恢復;
- 對數據完整性和一致性要求不高更適合使用;
- 節省磁盤空間;
- 恢復速度快。
- redis主進程會fork()一個子進程來處理所有保存工作,主進程不需要進行任何磁盤IO操作
缺點
- Fork 的時候,內存中的數據被克隆了一份,大致 2 倍的膨脹性需要考慮;
- 雖然 Redis 在 fork 時使用了寫時拷貝技術,但是如果數據龐大時還是比較消耗性能;
- 在備份周期在一定間隔時間做一次備份,所以如果還沒到達指定備份時間間隔的時候,Redis 意外 down 掉的話,就會丟失最后一次快照后的所有修改,數據發生了丟失。
AOF
Append Only File
以日志的形式來記錄每個寫操作(增量保存),將 Redis 執行過的所有寫指令記錄下來(讀操作不記錄), 只許追加文件但不可以改寫文件(可能是使用的redo和undo日志恢復?)。Redis 啟動之初會讀取該文件重新構建數據,換言之,如果 Redis 重啟就會根據日志文件的內容將寫指令從前到后執行一次,以完成數據的恢復工作。
一種使用追加方式記錄數據的方法
執行流程
-
客戶端的請求寫命令會被 append 追加到 AOF 緩沖區內;
-
AOF 緩沖區根據 AOF 持久化策略
[always,everysec,no]
將操作 sync 同步到磁盤的 AOF 文件中; -
AOF 文件大小超過重寫策略或手動重寫時,會對 AOF 文件 Rewrite 重寫,壓縮 AOF 文件容量;
-
Redis 服務重啟時,會重新 load 加載 AOF 文件中的寫操作達到數據恢復的目的。
AOF 和 RDB 同時開啟時,系統默認讀取 AOF 的數據(數據不會存在丟失)
配置
redis.conf中配置開啟aof,默認生成的aof配置文件為appendonly.aof,與rdb文件路徑一致(啟動路徑)。
AOF 默認不開啟 (RDB默認開啟)
![]()
- 若AOF和RDB同時開啟,系統默認讀取AOF的數據(數據不會存在丟失)。
文件名字
![]()
AOF 同步頻率設置
![]()
appendfsync always
始終同步,每次 Redis 的寫入都會立刻記入日志;
性能較差但數據完整性比較好。
appendfsync everysec
每秒同步,每秒記入日志一次,如果宕機,本秒的數據可能丟失。
appendfsync no
Redis 不主動進行同步,把同步時機交給操作系統。
Rewrite 壓縮
當 AOF 文件的大小超過所設定的閾值時,Redis 就會啟動 AOF 文件的內容壓縮,只保留可以恢復數據的最小指令集。可以使用命令 bgrewriteaof。
![]()
優點
- 備份機制更穩健,丟失數據概率更低;
- 可讀的日志文本,通過操作 AOF 穩健,可以處理誤操作。
缺點
- 比起 RDB 占用更多的磁盤空間(不僅記錄數據還要記錄操作);
- 恢復備份速度要慢;
- 每次讀寫都同步的話,有一定的性能壓力;
- 存在個別 Bug,造成不能恢復。
總結
官方推薦兩個都啟用。
如果對數據不敏感(允許數據有部分丟失),可以選單獨用 RDB。
不建議單獨用 AOF,因為可能會出現 Bug。
如果只是做純內存緩存,可以都不用。
主從復制
基本介紹
主機數據更新后根據配置和策略, 自動同步到備機的 master/slaver 機制,Master 以寫為主,Slaver 以讀為主,即主服務器承擔寫操作,復制的若干 從服務器 則承擔讀操作。

特點:
-
讀寫分離,性能擴展
-
容災快速恢復
- 某個從服務器發生故障,那么會快速切換到另一個從服務器中,不影響讀操作的進行
-
一主多從
- 只有一台主服務器,供其他從服務器進行復制
搭建一主兩從
- 創建文件目錄
/opt/etc
- 將 redis.conf 復制到當前目錄
cp /etc/redis.conf /opt/etc/
- 創建 3 個 redis.conf 配置文件
redis6379.conf
redis6380.conf
redis6381.conf
# redis6379.conf
include /opt/etc/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
# redis6380.conf
include /opt/etc/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb
# redis6381.conf
include /opt/etc/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb
- 啟動 3 台 redis 服務器

- 查看主機運行情況
info replication

-
配從不配主
在從機中進行設置,成為誰的從機
slaveof <ip> <port>
# 成為某個實例的從服務器


- 再次查看主機運行情況

成功搭建。
一主二從
特點:
主機 6379,從機 6380 和 6381。
假設從機 6380 掛掉。(從機掛掉)
- 當6380重啟后,6380不再是6379的從機,而是作為新的master;(從機重啟后,不再是某個主機的從機,其自身就是一個主機)
- 當再次把6380作為6379的從機加入后,從機才會把數據從頭到尾復制。(從機重啟后,需要再輸入成為從機的指令)
假設主機 6379 掛掉。(主機掛掉)
- 6380和6381仍然是6379的從機,不會做任何事;(從機不會改變)
- 當6379重啟后,既然是主服務器。(主機重啟后,還是主機)
主從復制原理
完整版:
-
slave 啟動成功連接到 master 后會發送一個 sync 命令(同步命令)。
-
master 接到命令啟動后台的存盤進程,對數據進行持久化操作,同時收集所有接收到的用於修改數據集命令,在后台進程執行完畢之后,master 將傳送整個數據文件(rdb)到 slave,以完成一次完全同步。
-
當主服務進行寫操作后,和從服務器進行數據同步。
-
全量復制:而 slave 服務在接收到數據庫文件數據后,將其存盤並加載到內存中。
-
增量復制:master 繼續將新的所有收集到的修改命令依次傳給 slave,完成同步。
-
只要是重新連接 master,一次完全同步(全量復制)將被自動執行。
全量復制:是從機主動去請求主機進行同步操作,是一開始連接的時候
增量復制:主機進行一次寫操作之后,就主動同步從機
簡潔版:

薪火相傳

上一個 slave 可以是下一個 slave 的 master(從機是另一個從機的主機,並由這個擔任主機的從機,進行數據同步),slave 同樣可以接收其他 slave的連接和同步請求,那么該 slave 作為了鏈條中下一個的 master,可以有效減輕 master 的寫壓力,去中心化降低風險。
slaveof <ip> <port>
-
特點與一主二從類似
-
中途變更轉向:會清除之前的數據,重新建立拷貝最新的。
-
當某個擔任主機的 slave 宕機,其掛在后面的 slave 都沒法備份。
- 即當主機掛掉,從機還是從機,但是無法繼續寫數據。
反客為主
當一個 master 宕機后,后面的 slave 可以立刻升為 master,其后面的 slave 不用做任何修改。(需要手動完成,如果不手動執行的話,那么從機沒有任何動作,主機重新啟動后,還依舊是主機)
使用命令:將從機變為主機
slaveof no one
哨兵模式
基本介紹
反客為主的自動版,即能夠后台監控主機是否故障,如果故障了根據投票數自動將從庫轉換為主庫。
- 創建 sentinel.conf 文件
/opt/etc/sentinel.conf
- 配置哨兵
sentinel monitor mymaster 172.16.xx.xxx 6379 1
# mymaster:給監控對象起的服務器名稱
# 1:至少有多少個哨兵同意遷移的數量
- 啟動哨兵
redis-sentinel /opt/etc/sentinel.conf

主機掛掉,哨兵監控到之后,會按照選舉規則,從 從機 中選舉中產生新的主機,原來掛掉的主機會變成新主機的從機。

選舉規則
選擇條件依次為:
-
根據優先級別,slave-priority/replica-priority,優先選擇優先級靠前的。(越小優先級越高)
-
根據偏移量,優先選擇偏移量大的。(偏移量是指獲得原主機數據最全的)
-
若前兩個條件相同,那么選擇 runid 最小的,優先選擇最小的服務
- 每個redis實例啟動后,都會隨機生成一個40位的runid
復制延時
由於所有的寫操作都是先在 master 上操作,然后同步更新到 slave 上,所以從 master 同步到 slave 從機有一定的延遲,當系統很繁忙的時候,延遲問題會更加嚴重,slave 機器數量的增加也會使這個問題更加嚴重。
集群
基本介紹
容量不夠,redis 如何進行擴容?
並發寫操作, redis 如何分攤?
另外,主從模式、薪火相傳模式,主機宕機,導致 ip 地址發生變化,應用程序中配置需要修改對應的主機地址、端口等信息。
解決方法:
-
代理主機( 之前 )
-
無中心化集群配置( redis3.0 )
- 服務之間可以進行相互連通
- 任何一個服務模塊都可以作為集群的入口

什么是集群:
Redis 集群實現了對 Redis 的水平擴容,即啟動 N 個 Redis 節點,將整個數據庫分布存儲在這 N 個節點中,每個節點存儲總數據的 1/N 。
Redis 集群通過分區(partition)來提供一定程度的可用性(availability),即使集群中有一部分節點失效或者無法進行通訊, 集群也可以繼續處理命令請求。
搭建 Redis 集群
- 創建配置文件
# 以redis6379.conf為例
include /opt/etc/redis.conf
pidfile /var/run/redis_6379.pid # 更改
port 6379 # 更改
dbfilename dump6379.rdb # 更改
cluster-enabled yes # 打開集群模式
cluster-config-file nodes-6379.conf # 設置節點配置文件名稱,需要更改
cluster-node-timeout 15000 # 設置節點失聯事件,超過該時間(ms),集群自動進行主從切換

- 啟動

- 將 6 個節點合成一個集群
# 組合之前請確保所有redis實例啟動后,nodes-xxxx.conf文件都生成正常。

# 進入redis安裝目錄
/opt/redis-6.2.6/src
# 執行
redis-cli --cluster create --cluster-replicas 1 172.16.88.168:6379 172.16.88.168:6380 172.16.88.168:6381 172.16.88.168:6389 172.16.88.168:6390 172.16.88.168:6391

- 采用集群策略連接
redis-cli -c -p PORT
cluster nodes # 命令查看集群信息

問題
redis cluster 如何分配這六個節點?
一個集群至少要有三個主節點。
選項
--cluster-replicas 1
,表示希望為集群中的每個主節點創建一個從節點。分配原則盡量保證每個主數據庫運行在不同的 IP 地址,每個從庫和主庫不在一個 IP 地址上。保證出故障的時候,能夠有替換的,繼續提供服務。
![]()
什么是 slots?
插槽


一個 Redis 集群包含 16384 個插槽(hash slot), 數據庫中的每個鍵都屬於這 16384 個插槽的其中一個。
集群使用公式 CRC16(key) % 16384 來計算鍵 key 屬於哪個槽, 其中 CRC16(key) 語句用於計算鍵 key 的 CRC16 校驗和 。
集群中的每個節點負責處理一部分插槽,分擔壓力。 例如, 如果一個集群可以有主節點, 其中:
- 節點 A 負責處理 0 號至 5460 號插槽。
- 節點 B 負責處理 5461 號至 10922 號插槽。
- 節點 C 負責處理 10923 號至 16383 號插槽。
如何在集群中錄入值?
在 redis-cli 每次錄入、查詢鍵值,redis 都會計算出該 key 應該送往的插槽,如果不是該客戶端對應服務器的插槽,redis 會報錯,並告知應前往的 redis 實例地址和端口。
redis-cli 客戶端提供了 –c 參數實現自動重定向。
例如 redis-cli -c –p 6379 登入后,再錄入、查詢鍵值對可以自動重定向。
如何查詢集群中的值?
每個主機只能查詢自己范圍內部的插槽。
cluster keyslot <key>
:查詢某個 key 的 **slot **。
cluster countkeysinslot <slot>
:查詢某個 slot 是否有值。
CLUSTER GETKEYSINSLOT <slot><count>
:返回 count 個 slot 槽中的鍵。
集群故障恢復?
如果主節點下線?從節點能否自動升為主節點?注意:15 秒超時(一旦超時,從機升為主機)
![]()
- 當 6379 掛掉后,6389 成為新的主機。
- 集群中,主機掛掉后,從機會自動變成主節點,擔任主節點功能(有哨兵功能)
主節點恢復后,主從關系會如何?主節點回來變成從機。(類似於一主多從中的哨兵模式)
- 當 6379 重啟后,6379 成為 6389 的從機。
如果所有某一段插槽的 主、從節點 都宕掉,redis 服務是否還能繼續?
- 如果某一段插槽的主從都掛掉,而 cluster-require-full-coverage=yes,那么 ,整個集群都掛掉。
- 如果某一段插槽的主從都掛掉,而 cluster-require-full-coverage=no,那么,該插槽數據全都不能使用,也無法存儲。
redis.conf
中的參數cluster-require-full-coverage
優點
- 實現redis擴容;
- 使用插槽,分攤壓力;
- 無中心配置,相對簡單。
缺點
- 多鍵操作是不被支持的,但可以用組,不方便;
- 多鍵的 Redis 事務是不被支持的。lua 腳本不被支持;
- 由於集群方案出現較晚,很多公司已經采用了其他的集群方案,而代理或者客戶端分片的方案想要遷移至redis cluster,需要整體遷移而不是逐步過渡,復雜度較大。
Jedis (Java操作Redis)
基本操作
- 依賴
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
- 連接 Redis
public class JedisDemo {
public static void main(String[] args) {
// 創建Jedis對象
Jedis jedis = new Jedis("192.168.xx.xxx", 6379);
// 測試,能夠連接上的話,ping通,會返回一個值
String ping = jedis.ping();
System.out.println("連接成功:" + ping);
jedis.close();
}
}
注意:使用Jedis進行操作,需要對Redis的網絡相關配置文件進行修改:
- bind:默認是bind綁定本機,不進行修改的情況下,只能接受本機的訪問請求,不寫的情況下,能夠無限制接受任何ip地址的訪問。
- protected-mode:將本機訪問保護模式設置為 no
如果出現 connet timed out 錯誤,檢查兩塊,第一是否配置文件進行了修改,第二防火牆是否關閉。
Key
jedis.set("k1", "v1"); jedis.set("k2", "v2"); jedis.set("k3", "v3"); Set<String> keys = jedis.keys("*"); // 返回所有key System.out.println(keys.size()); for (String key : keys) { System.out.println(key); } System.out.println(jedis.exists("k1")); // 是否存在 System.out.println(jedis.ttl("k1")); // 過期時間 System.out.println(jedis.get("k1")); // 獲取key對應value值
String
jedis.mset("str1","v1","str2","v2","str3","v3"); System.out.println(jedis.mget("str1","str2","str3"));
List
// 可以使用lpush或者rpush添加k-v List<String> list = jedis.lrange("mylist",0,-1); for (String element : list) { System.out.println(element); }
Set
jedis.sadd("orders", "order01"); jedis.sadd("orders", "order02"); jedis.sadd("orders", "order03"); jedis.sadd("orders", "order04"); Set<String> smembers = jedis.smembers("orders"); for (String order : smembers) { System.out.println(order); } jedis.srem("orders", "order02");
Hash
jedis.hset("hash1","userName","lisi"); System.out.println(jedis.hget("hash1","userName")); Map<String,String> map = new HashMap<String,String>(); map.put("telphone","13810169999"); map.put("address","atguigu"); map.put("email","abc@163.com"); jedis.hmset("hash2",map); List<String> result = jedis.hmget("hash2", "telphone","email"); for (String element : result) { System.out.println(element); }
zset
jedis.zadd("zset01", 100d, "z3"); jedis.zadd("zset01", 90d, "l4"); jedis.zadd("zset01", 80d, "w5"); jedis.zadd("zset01", 70d, "z6"); Set<String> zrange = jedis.zrange("zset01", 0, -1); for (String e : zrange) { System.out.println(e); }
模擬驗證碼發送

代碼實現:
public class PhoneCode {
public static void main(String[] args) {
// 模擬驗證碼發送
// verifyCode("123456789");
getRedisCode("123456789", "123456");
}
// 1. 生成6位數字驗證碼
public static String getCode(){
Random random = new Random();
String code = "";
for (int i = 0; i < 6; i++) {
int nextInt = random.nextInt(10);
code += nextInt;
}
return code;
}
// 2. 每個手機每天只能發送三次驗證碼請求,驗證碼放到redis中,並設置過期時間
public static void verifyCode(String phone) {
// 連接redis
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 拼接key
// 手機發送次數的key
String countKey = "VerifyCode-" + phone + ":count";
// 驗證碼的key
String phoneKey = "VerifyCode-" + phone + ":code";
// 每個手機每天只能發送三次驗證碼
String count = jedis.get(countKey);
if(count == null) {
// 之前還沒發送過,這次是第一次發送,設置發送次數為1
jedis.setex(countKey, 24*60*60, "1"); // 設置過期時間為一天
} else if (Integer.parseInt(count) < 3) {
// 發送次數加1
jedis.incr(countKey);
} else if (Integer.parseInt(count) >= 3) {
// 發送已經有三次了,不能再發送了
System.out.println("今天發送驗證碼的次數已經達到三次,無法再發送!");
jedis.close(); // 關閉連接
return; // 不執行下面的代碼
}
// 驗證碼放到redis中
String code1 = getCode();
jedis.setex(phoneKey, 120, code1); // 設置驗證碼的過期時間為兩分鍾,會進行覆蓋
jedis.close();
}
// 3. 驗證碼校驗
public static void getRedisCode(String phone, String code){
// 連接redis
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 拼接key
// 驗證碼的key
String phoneKey = "VerifyCode-" + phone + ":code";
// 判斷
String codePhone = jedis.get(phoneKey);
if(codePhone.equals(code)) {
System.out.println("成功!");
} else {
System.out.println("失敗!");
}
jedis.close();
}
}
Jedis 主從復制
private static JedisSentinelPool jedisSentinelPool=null;
public static Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
Set<String> sentinelSet=new HashSet<>();
sentinelSet.add("172.16.88.168:26379"); // 端口為sentinal
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); // 最大可用連接數
jedisPoolConfig.setMaxIdle(5); // 最大閑置連接數
jedisPoolConfig.setMinIdle(5); // 最小閑置連接數
jedisPoolConfig.setBlockWhenExhausted(true); // 連接耗盡是否等待
jedisPoolConfig.setMaxWaitMillis(2000); // 等待時間
jedisPoolConfig.setTestOnBorrow(true); // 取連接的時候進行測試
jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig); // 服務主機名
return jedisSentinelPool.getResource();
}
else {
return jedisSentinelPool.getResource();
}
}
集群的 Jedis 開發
即使連接的不是主機,集群會自動切換主機存儲。主機寫,從機讀。
無中心化主從集群。無論從哪台主機寫的數據,其他主機上都能讀到數據。
public class JedisClusterTest {
public static void main(String[] args) {
// 創建對象
Set<HostAndPort> set = new HashSet<HostAndPort>();
set.add(new HostAndPort("172.16.xx.xxx",6379)); // 任何一個端口
JedisCluster jedisCluster = new JedisCluster(set);
// 操作
jedisCluster.set("k1", "v1");
System.out.println(jedisCluster.get("k1"));
jedisCluster.close();
}
}
SpringBoot整合Redis
- 依賴
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 連接池:spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
- 配置文件配置 Redis
#Redis服務器地址
spring.redis.host= ip地址
#Redis服務器連接端口
spring.redis.port=6379
#Redis數據庫索引(默認為0,一共有16個)
spring.redis.database= 0
#連接超時時間(毫秒)
spring.redis.timeout=1800000
#連接池最大連接數(使用負值表示沒有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待時間(負數表示沒限制)
spring.redis.lettuce.pool.max-wait=-1
#連接池中的最大空閑連接
spring.redis.lettuce.pool.max-idle=5
#連接池中的最小空閑連接
spring.redis.lettuce.pool.min-idle=0
- Redis 配置類(需要繼承 CachingConfigurerSupport)
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
// key序列化方式
template.setKeySerializer(redisSerializer);
// value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
// value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
// 解決查詢緩存轉換異常的問題
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解決亂碼的問題),過期時間600秒
RedisCacheConfiguration config =
RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
- 測試

應用問題解決(面試重點)
緩存穿透

現象
key 對應的數據在數據源並不存在,每次針對此 key 的請求從緩存獲取不到,請求都會壓到數據源,從而可能壓垮數據源。
redis查詢不到數據庫,出現了很多非正常url訪問。黑客攻擊就是通過查詢一個不存在的值,緩存里面沒有,那么就會查數據庫,大量類似的請求發生后,導致數據庫崩潰。若黑客利用此漏洞進行攻擊可能壓垮數據庫。
造成的條件:
- 應用服務器壓力變大,訪問請求增光
- redis 命中率下降(重點)
- 導致一直訪問查詢數據庫
服務器壓力變大,請求太多,導致redis緩存命中率開始下降,對數據庫的訪問越來越多,數據庫最終承受不住壓力,崩潰了。
如何解決
-
對空值緩存
如果一個查詢返回的數據為空(不管是數據是否不存在),仍然把這個空結果(null)進行緩存,設置空結果的過期時間會很短,最長不超過五分鍾。
-
設置可訪問的名單(白名單):
使用 bitmaps 類型定義一個可以訪問的名單,名單 id 作為 bitmaps 的偏移量,每次訪問和 bitmap 里面的 id 進行比較,如果訪問 id 不在 bitmaps 里面,進行攔截,則不允許訪問。
-
采用布隆過濾器
布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實際上是一個很長的二進制向量(位圖)和一系列隨機映射函數(哈希函數)。(跟bitmaps類似,不過效率更高)
布隆過濾器可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的算法,缺點是有一定的誤識別率和刪除困難,命中率不一定高。
將所有可能存在的數據哈希到一個足夠大的 bitmaps 中,一個一定不存在的數據會被這個 bitmaps 攔截掉,從而避免了對底層存儲系統的查詢壓力。
-
進行實時監控
當發現 Redis 的命中率開始急速降低,需要排查訪問對象和訪問的數據,和運維人員配合,可以設置黑名單限制服務。
緩存擊穿

注意與緩存穿透的區別:
- 緩存穿透
- redis命中率下降,導致數據庫訪問量激增
- 緩存擊穿
- redis正常訪問,但某個熱點key突然失效,導致瞬間數據庫的訪問量激增
現象
key 對應的數據存在,但在 redis 中過期,此時若有大量並發請求過來,這些請求發現緩存過期一般都會從后端DB 加載數據並回設到緩存,這個時候大並發的請求可能會瞬間把后端 DB 壓垮。
- 數據庫訪問壓力瞬間增大
- redis 中沒有出現大量 key 過期,redis 正常運行(與緩存穿透的區別)
- 某個經常訪問的 key,即十分熱點的key,不停地被大量訪問,當這個key過期的瞬間,持續的高並發就擊穿了緩存,大量請求數據庫,導致數據庫奔潰
如何解決
-
預先設置熱門數據
在 redis 高峰訪問之前,把一些熱門數據提前存入到 redis 里面,加大這些熱門數據 key 的時長。
-
實時調整
現場監控哪些數據熱門,實時調整 key 的過期時長。
-
使用鎖
緩存雪崩

現象
key 對應的數據存在,但在 redis 中過期,此時若有大量並發請求過來,這些請求發現緩存過期后,一般都會從后端 DB 加載數據並回設到緩存,這個時候大並發的請求可能會瞬間把后端 DB 壓垮。
緩存雪崩與緩存擊穿的區別在於這里針對很多 key 緩存,前者則是某一個 key。
- 數據庫壓力變大
- 極少的時間段,查詢大量 key 的集中過期情況(大量key集中過期,而緩存擊穿是熱點key過期)
如何解決
-
構建多級緩存架構
nginx 緩存 + redis 緩存 + 其他緩存(ehcache等)
-
使用鎖或隊列:
用加鎖或者隊列的方式保證來保證不會有大量的線程對數據庫一次性進行讀寫,從而避免失效時大量的並發請求落到底層存儲系統上。不適用高並發情況。
-
設置過期標志更新緩存:
記錄緩存數據是否過期(設置提前量),快過期的時候,提前進行一個緩存。如果過期會觸發通知另外的線程在后台去更新實際 key 的緩存。
-
將緩存失效時間分散開:
比如我們可以在原有的失效時間基礎上增加一個隨機值,比如 1~5 分鍾隨機,這樣每一個緩存的過期時間的重復率就會降低,就很難引發集體失效的事件。
分布式鎖
基本介紹
隨着業務發展的需要,原單體單機部署的系統被演化成分布式集群系統后,由於分布式系統多線程、多進程並且分布在不同機器上,這將使原單機部署情況下的並發控制鎖策略失效,單純的Java API並不能提供分布式鎖的能力。為了解決這個問題就需要一種跨JVM的互斥機制來控制共享資源的訪問,這就是分布式鎖要解決的問題。
分布式鎖主流的實現方案:
- 基於數據庫實現分布式鎖
- 基於緩存(Redis等)
- 基於Zookeeper
每一種分布式鎖解決方案都有各自的優缺點:
- 性能:redis最高
- 可靠性:zookeeper最高
設置鎖以及過期時間
- 設置鎖的命令
SETNX KEY VALUE # 設置鎖
del key # 刪除鎖
- 給鎖設置過期時間
expire users 30 # 給users上鎖30s


- 優化:上鎖的同時設置過期時間
set key value nx ex time # nx 上鎖;ex 設置過期時間
- Java實現
@GetMapping("testLock")
public void testLock(){
//1獲取鎖,setne ,順便設置過期時間
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS); // key, value, 過期時間,時間單位
//2獲取鎖成功、查詢num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判斷num為空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就轉成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4釋放鎖,del
redisTemplate.delete("lock");
}else{
//3獲取鎖失敗、每隔0.1秒再獲取
try {
Thread.sleep(100); // 休眠,等一會
testLock(); // 再去嘗試獲取鎖
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
分布式鎖產生的問題
使用UUID防止誤刪鎖

現象
a先上鎖后,在執行操作的過程中,服務器卡頓,而10秒過期后,b搶到鎖進行具體操作,然而此時a的服務器恢復正常,a繼續執行操作並結束,此時有一個釋放鎖的操作,那么此時釋放的鎖是b的鎖,這就是導致誤刪除鎖的現象發生。
解決方案

Java實現修改版:
@GetMapping("testLock")
public void testLock(){
// 設置UUID
String uuid = UUID.randomUUID().toString();
.....
if(lock){
...
// 判斷UUID值是否一樣
String lockUuid = (String)redisTemplate.opsForValue().get("lock");
if(uuid.equals(lockUuid)){ // UUID一樣時,才釋放鎖
//2.4釋放鎖,del
redisTemplate.delete("lock");
}
}else{
...
}
}
Lua保證刪除原子性
問題:刪除操作缺乏原子性,即uuid的比較操作和刪除操作不是原子操作

Java實現修改:
@GetMapping("testLockLua")
public void testLockLua() {
//1 聲明一個uuid ,將做為一個value 放入我們的key所對應的值中
String uuid = UUID.randomUUID().toString();
//2 定義一個鎖:lua 腳本可以使用同一把鎖,來實現刪除!
String skuId = "25"; // 訪問skuId 為25號的商品 100008348542
String locKey = "lock:" + skuId; // 鎖住的是每個商品的數據
// 3 獲取鎖
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一種: lock 與過期時間中間不寫任何的代碼。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//設置過期時間
// 如果true
if (lock) {
// 執行的業務邏輯開始
// 獲取緩存中的num 數據
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果說在這出現了異常! 那么delete 就刪除失敗! 也就是說鎖永遠存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入緩存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua腳本來鎖*/
// 定義lua 腳本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis執行lua執行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 設置一下返回值類型 為Long
// 因為刪除判斷的時候,返回的0,給其封裝為數據類型。如果不封裝那么默認返回String 類型,
// 那么返回字符串與0 會有發生錯誤。
redisScript.setResultType(Long.class);
// 第一個要是script 腳本 ,第二個需要判斷的key,第三個就是key所對應的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
.....
}
}
總結
為了確保分布式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:
-
互斥性。在任意時刻,只有一個客戶端能持有鎖。
-
不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
-
解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
-
加鎖和解鎖必須具有原子性。