👉寫在前邊
- Redis入門的整合篇。本篇也算是把2021年redis留下來的坑填上去,重新整合了一翻,點擊這里,回顧我的2020與2021~一名大二后台練習生
NoSQL
NoSQL(NoSQL = Not Only SQL ),意即“不僅僅是SQL”,泛指非關系型的數據庫。
NoSQL 不依賴業務邏輯方式存儲,而以簡單的key-value模式存儲。因此大大的增加了數據庫的擴展能力。
- 不遵循SQL標准。
- 不支持ACID。
- 遠超於SQL的性能。
適用場景
- 對數據高並發的讀寫
- 海量數據的讀寫
- 對數據高可擴展性的
不適用場景
- 需要事務支持
- 基於sql的結構化查詢存儲,處理復雜的關系,需要即席查詢。
- (用不着sql的和用了sql也不行的情況,請考慮用NoSql)
概述
Redis(Remote Dictionary Server ),即遠程字典服務 !
是一個開源的使用ANSI C語言編寫、支持網絡、可基於內存亦可持久化的日志型、Key-Value數據庫,並提供多種語言的API。
- redis會周期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。
用途
1、內存存儲、持久化,內存中是斷電即失、所以說持久化很重要(rdb、aof)
2、效率高,可以用於高速緩存
3、發布訂閱系統
4、地圖信息分析
5、計時器、計數器(瀏覽量!)
特性
1、多樣的數據類型
2、持久化
3、集群
4、事務
歷史發展
一開始數據量很少,只需要單表處理讀和寫
90年代,一個基本的網站訪問量一般不會太大,單個數據庫完全足夠!
那個時候,更多的去使用靜態網頁 Html ~ 服務器根本沒有太大的壓力!
思考一下,這種情況下:整個網站的瓶頸是什么?
1、數據量如果太大、一個機器放不下了!
2、數據的索引 (B+ Tree),一個機器內存也放不下
3、訪問量(讀寫混合),一個服務器承受不了
Memcached(緩存)+ MySQL + 垂直拆分 (讀寫分離)
可以一台服務器負責寫,如何同步給前台服務器去讀?
緩存也解決讀的問題
網站80%的情況都是在讀,每次都要去查詢數據庫的話就十分的麻煩!所以說我們希望減輕數據的壓力,我們可以使用緩存來保證效率!
發展過程: 優化數據結構和索引--> 文件緩存(IO)---> Memcached(當時最熱門的技術!)
分庫分表,MySQL集群 + 水平拆分
閱讀量也不是實時寫到mysql的,是先到緩存,再一定時間統一寫進去
早些年MyISAM: 表鎖,十分影響效率!高並發下就會出現嚴重的鎖問題
轉戰Innodb:行鎖
慢慢的就開始使用分庫分表來解決寫的壓力! MySQL 在哪個年代推出了表分區!這個並沒有多少公司使用!
MySQL 的集群,很好滿足那個年代的所有需求!
Linux安裝
1、官網下載安裝包!
2、解壓Redis的安裝包到你要的目錄!
tar -zxvf
3、基本環境安裝
yum install gcc-c++
4、進入解壓后的目錄
make
4、進入src目錄
make install
- 此處如果有問題的話可以自行搜索相關安裝教程,根據自己需求下載window版或者linux版
Redis運行
先設置daemonize no改成yes,開啟后台啟動
修改redis.conf(128行)文件將里面的daemonize no 改成yes,讓服務在后台啟動
protected-mode 設置成no
默認是設置成yes的, 防止了遠程訪問,在redis3.2.3版本后
找到# requirepass foobared
刪除前面的注釋符號#,並把foobared修改成自己的密碼
或者另起一行 requirepass 自己的密碼
- 一定要設置密碼阿,之前被黑客通過redis黑進服務器了,植入了挖抗病毒麻了🙊
要注意是在啟動server的時候選擇conf!!!!!
./redis-server ../redis.conf
此處要選擇conf來啟動
./redis-cli -p 端口號
服務器連接
進入后 auth 你設置的密碼
驗證通過后才能成功操作
Redis關閉與退出
SHUTDOWN
- 然后exit
性能分析
redis-benchmark 是一個壓力測試工具!
官方自帶的性能測試工具!
redis-benchmark 命令參數!
我們來簡單測試下:
如何查看這些分析呢?
基礎知識
默認16個數據庫,可以用select切換
Redis是很快的,官方表示,Redis是基於內存操作,CPU不是Redis性能瓶頸,Redis的瓶頸是根據
機器的內存和網絡帶寬,既然可以使用單線程來實現,就使用單線程了!
Redis 是C 語言寫的,官方提供的數據為 100000+ 的QPS,完全不比同樣是使用 key-vale的Memecache差!
Redis 為什么單線程還這么快?
1、誤區1:高性能的服務器一定是多線程的?
2、誤區2:多線程(CPU上下文會切換!)一定比單線程效率高!
核心:redis 是將所有的數據全部放在內存中的,所以說使用單線程去操作效率就是最高的,多線程(CPU上下文會切換:耗時的操作!!!),對於內存系統來說,如果沒有上下文切換效率就是最高的!多次讀寫都是在一個CPU上的,在內存情況下,這個就是最佳的方案!
命令大全:
Redis-Key
EXPIRE key10
10s就過期
ttl key
查詢過期剩余時間
type key
查看類型
move name 1(1表示當前數據庫)
移除當前的key
大雜燴
127.0.0.1:6379> keys * # 查看所有的key
(empty list or set)
127.0.0.1:6379> set name kuangshen # set key
OK
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> set age 1
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> EXISTS name # 判斷當前的key是否存在
(integer) 1
127.0.0.1:6379> EXISTS name1
(integer) 0
127.0.0.1:6379> move name 1 # 移除當前的key
(integer) 1
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> set name qinjiang
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> clear
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> get name
"qinjiang"
127.0.0.1:6379> EXPIRE name 10 # 設置key的過期時間,單位是秒
(integer) 1
127.0.0.1:6379> ttl name # 查看當前key的剩余時間
(integer) 4
127.0.0.1:6379> ttl name
(integer) 3
127.0.0.1:6379> ttl name
(integer) 2
127.0.0.1:6379> ttl name
(integer) 1
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> type name # 查看當前key的一個類型!
string
127.0.0.1:6379> type age
string
String
appen和strlen
127.0.0.1:6379> set key1 v1 # 設置值
OK
127.0.0.1:6379> get key1 # 獲得值
"v1"
127.0.0.1:6379> keys * # 獲得所有的key
1) "key1"
127.0.0.1:6379> EXISTS key1 # 判斷某一個key是否存在
(integer) 1
127.0.0.1:6379> APPEND key1 "hello" # 追加字符串,如果當前key不存在,就相當於setkey
(integer) 7
127.0.0.1:6379> get key1
"v1hello"
127.0.0.1:6379> STRLEN key1 # 獲取字符串的長度!
(integer) 7
127.0.0.1:6379> APPEND key1 ",kaungshen"
(integer) 17
127.0.0.1:6379> STRLEN key1
(integer) 17
127.0.0.1:6379> get key1
"v1hello,kaungshen"
incr和decr 自增和自減
# i++
# 步長 i+=
127.0.0.1:6379> set views 0 # 初始瀏覽量為0
OK
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incr views # 自增1 瀏覽量變為1
(integer) 1
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> decr views # 自減1 瀏覽量-1
(integer) 1
127.0.0.1:6379> decr views
(integer) 0
127.0.0.1:6379> decr views
(integer) -1
127.0.0.1:6379> get views
"-1"
127.0.0.1:6379> INCRBY views 10 # 可以設置步長,指定增量!
(integer) 9
127.0.0.1:6379> INCRBY views 10
(integer) 19
127.0.0.1:6379> DECRBY views 5
getRange字符串范圍 setRange替換指定位置開始的字符串
# 字符串范圍 range
127.0.0.1:6379> set key1 "hello,kuangshen" # 設置 key1 的值
OK
127.0.0.1:6379> get key1
"hello,kuangshen"
127.0.0.1:6379> GETRANGE key1 0 3 # 截取字符串 [0,3]
"hell"
127.0.0.1:6379> GETRANGE key1 0 -1 # 獲取全部的字符串 和 get key是一樣的
"hello,kuangshen"
# 替換!
127.0.0.1:6379> set key2 abcdefg
OK
127.0.0.1:6379> get key2
"abcdefg"
127.0.0.1:6379> SETRANGE key2 1 xx # 替換指定位置開始的字符串!
(integer) 7
127.0.0.1:6379> get key2
"axxdefg"
設置過期時間setex setnx(不存在才設置,存在時會失敗)
# setex (set with expire) # 設置過期時間
# setnx (set if not exist) # 不存在時再設置(在分布式鎖中會常常使用!)
127.0.0.1:6379> setex key3 30 "hello" # 設置key3的值為 hello,30秒后過期
OK
127.0.0.1:6379> ttl key3
(integer) 26
127.0.0.1:6379> get key3
"hello"
127.0.0.1:6379> setnx mykey "redis" # 如果mykey 不存在,創建mykey
(integer) 1
127.0.0.1:6379> keys *
1) "key2"
2) "mykey"
3) "key1"
127.0.0.1:6379> ttl key3
(integer) -2
127.0.0.1:6379> setnx mykey "MongoDB" # 如果mykey存在,創建失敗!
(integer) 0
127.0.0.1:6379> get mykey
mset/get 同時設置/獲取多個值
msetnx 是一個原子性的操作,要么一起成功,要么一起
mset
mget
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 # 同時設置多個值
OK
127.0.0.1:6379> keys *
1) "k1"
2) "k2"
3) "k3"
127.0.0.1:6379> mget k1 k2 k3 # 同時獲取多個值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4 # msetnx 是一個原子性的操作,要么一起成功,要么一起失敗!
(integer) 0
127.0.0.1:6379> get k4
(nil)
對象 set user:1:name xxx user:1:age xxx
其實相當於key=user:1:name這一串很長的,然后:后邊可以更換屬性
# 對象
set user:1 {name:zhangsan,age:3} # 設置一個user:1 對象 值為 json字符來保存一個對象!
# 這里的key是一個巧妙的設計: user:{id}:{filed} , 如此設計在Redis中是完全OK了!
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "2"
get同時set
getset # 先get然后在set
127.0.0.1:6379> getset db redis # 如果不存在值,則返回 nil
(nil)
127.0.0.1:6379> get db
"redis
127.0.0.1:6379> getset db mongodb # 如果存在值,獲取原來的值,並設置新的值
"redis"
127.0.0.1:6379> get db
"mongodb"
使用場景
String類似的使用場景:value除了是我們的字符串還可以是我們的數字!
計數器
統計多單位的數量
粉絲數
對象緩存存儲!
Hash(跟String類似的其實)
Map集合,key-map! 時候這個值是一個map集合! 本質和String類型沒有太大區別,還是一個簡單的 key-vlaue!
hash存儲經常變更的數據 user name age,尤其是是用戶信息之類的,經常變動的信息! hash 更適合於對象的存儲,String更加適合字符串存儲!
hset/get
hdel(類似String的del)
hkeys/hvals(只獲得所有key/val)
同樣可以指定增量,可以hsetnx
127.0.0.1:6379> hset myhash field1 kuangshen # set一個具體 key-vlaue
(integer) 1
127.0.0.1:6379> hget myhash field1 # 獲取一個字段值
"kuangshen"
127.0.0.1:6379> hmset myhash field1 hello field2 world # set多個 key-vlaue
OK
127.0.0.1:6379> hmget myhash field1 field2 # 獲取多個字段值
1) "hello"
2) "world"
127.0.0.1:6379> hgetall myhash # 獲取全部的數據,
1) "field1"
2) "hello"
3) "field2"
4) "world"
127.0.0.1:6379> hdel myhash field1 # 刪除hash指定key字段!對應的value值也就消失了!
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "world"
##########################################################################
hlen
127.0.0.1:6379> hmset myhash field1 hello field2 world
OK
127.0.0.1:6379> HGETALL myhash
1) "field2"
2) "world"
3) "field1"
4) "hello"
127.0.0.1:6379> hlen myhash # 獲取hash表的字段數量!
(integer) 2
##########################################################################
127.0.0.1:6379> HEXISTS myhash field1 # 判斷hash中指定字段是否存在!
(integer) 1
127.0.0.1:6379> HEXISTS myhash field3
(integer) 0
##########################################################################
# 只獲得所有field
# 只獲得所有value
127.0.0.1:6379> hkeys myhash # 只獲得所有field
1) "field2"
2) "field1"
127.0.0.1:6379> hvals myhash # 只獲得所有value
1) "world"
2) "hello"
##########################################################################
incr decr
127.0.0.1:6379> hset myhash field3 5 #指定增量!
(integer) 1
127.0.0.1:6379> HINCRBY myhash field3 1
(integer) 6
127.0.0.1:6379> HINCRBY myhash field3 -1
(integer) 5
127.0.0.1:6379> hsetnx myhash field4 hello # 如果不存在則可以設置
(integer) 1
127.0.0.1:6379> hsetnx myhash field4 world # 如果存在則不能設置
(integer) 0
List
可以玩成雙向隊列,棧等
- 底層是一個雙向鏈表,對於兩段的插入刪除效率比較高,而對於中間索引值的插入和更改就相對慢了,因為查詢慢
Lpush和Rpush LRange 查看區間值(0 -1就會查所有)
只在左邊進的話就類似棧,
127.0.0.1:6379> LPUSH list one # 將一個值或者多個值,插入到列表頭部 (左)
(integer) 1
127.0.0.1:6379> LPUSH list two
(integer) 2
127.0.0.1:6379> LPUSH list three
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1 # 獲取list中值!
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> LRANGE list 0 1 # 通過區間獲取具體的值!
1) "three"
2) "two"
127.0.0.1:6379> Rpush list righr # 將一個值或者多個值,插入到列表尾部 (右)
(integer) 4
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
3) "one"
4) "righr"
Lpop和Rpop
LPOP
RPOP
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
3) "one"
4) "righr"
127.0.0.1:6379> Lpop list # 移除list的第一個元素
"three"
127.0.0.1:6379> Rpop list # 移除list的最后一個元素
"righr"
127.0.0.1:6379> LRANGE list 0 -1
1) "two"
2) "one"
Lindex(通過下標獲得 list 中的某一個值)
Lindex
127.0.0.1:6379> LRANGE list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> lindex list 1 # 通過下標獲得 list 中的某一個值!
"one"
127.0.0.1:6379> lindex list 0
"two"
Llen 獲取長度
Llen
127.0.0.1:6379> Lpush list one
(integer) 1
127.0.0.1:6379> Lpush list two
bilibili:狂神說Java(integer) 2
127.0.0.1:6379> Lpush list three
(integer) 3
127.0.0.1:6379> Llen list # 返回列表的長度
(integer) 3
Lrem 移除count個指定value的值
注意是從左邊開始刪掉
移除指定的值!
取關 uid
Lrem
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "three"
3) "two"
4) "one"
127.0.0.1:6379> lrem list 1 one # 移除list集合中指定個數的value,精確匹配
(integer) 1
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "three"
3) "two"
127.0.0.1:6379> lrem list 1 three
(integer) 1
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
127.0.0.1:6379> Lpush list three
(integer) 3
127.0.0.1:6379> lrem list 2 three
(integer) 2
127.0.0.1:6379> LRANGE list 0 -1
1) "two"
trim 截取
通過下標截取指定的長度,可以理解為只保留截取區間的元素
trim 修剪。; list 截斷!
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> Rpush mylist "hello"
(integer) 1
127.0.0.1:6379> Rpush mylist "hello1"
(integer) 2
127.0.0.1:6379> Rpush mylist "hello2"
(integer) 3
127.0.0.1:6379> Rpush mylist "hello3"
(integer) 4
127.0.0.1:6379> ltrim mylist 1 2 # 通過下標截取指定的長度,這個list已經被改變了,截斷了,只剩下截取的元素!
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello1"
2) "hello2"
rpoplpush 移除指定的值,移動到新的list中
rpoplpush # 移除列表的最后一個元素,將他移動到新的列表中!
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "hello1"
(integer) 2
127.0.0.1:6379> rpush mylist "hello2"
(integer) 3
127.0.0.1:6379> rpoplpush mylist myotherlist # 移除列表的最后一個元素,將他移動到新的列表中!
"hello2"
127.0.0.1:6379> lrange mylist 0 -1 # 查看原來的列表
1) "hello"
2) "hello1"
127.0.0.1:6379> lrange myotherlist 0 -1 # 查看目標列表中,確實存在改值!
1) "hello2"
Lset 修改指定下標的值
lset #將列表中指定下標的值替換為另外一個值,更新操作
127.0.0.1:6379> EXISTS list # 判斷這個列表是否存在
(integer) 0
127.0.0.1:6379> lset list 0 item # 如果不存在列表我們去更新就會報錯
(error) ERR no such key
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> LRANGE list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 item # 如果存在,更新當前下標的值
OK
127.0.0.1:6379> LRANGE list 0 0
1) "item"
127.0.0.1:6379> lset list 1 other # 如果不存在,則會報錯!
(error) ERR index out of range
Insert 在指定value前邊或者后邊插入值
linsert # 將某個具體的value插入到列把你中某個元素的前面或者后面!
127.0.0.1:6379> Rpush mylist "hello"
(integer) 1
127.0.0.1:6379> Rpush mylist "world"
(integer) 2
127.0.0.1:6379> LINSERT mylist before "world" "other"
(integer) 3
127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello"
2) "other"
3) "world"
127.0.0.1:6379> LINSERT mylist after world new
(integer) 4
127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello"
2) "other"
3) "world"
4) "new"
Set
底層是一個哈希表,查詢是O(1)
成員相關
Sadd 添加成員
Smember 查看所有成員
SIsMember 查看是否是成員之一
Scard 獲取元素個數
srem 刪除指定value
SrandMember 隨機獲取成員(不會pop)
Spop 隨機pop出來
大雜燴
127.0.0.1:6379> sadd myset "hello" # set集合中添加勻速
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset "lovekuangshen"
(integer) 1
127.0.0.1:6379> SMEMBERS myset # 查看指定set的所有值
1) "hello"
2) "lovekuangshen"
3) "kuangshen"
127.0.0.1:6379> SISMEMBER myset hello # 判斷某一個值是不是在set集合中!
(integer) 1
127.0.0.1:6379> SISMEMBER myset world
(integer) 0
##########################################################################
127.0.0.1:6379> scard myset # 獲取set集合中的內容元素個數!
(integer) 4
##########################################################################
rem
127.0.0.1:6379> srem myset hello # 移除set集合中的指定元素
(integer) 1
127.0.0.1:6379> scard myset
(integer) 3
127.0.0.1:6379> SMEMBERS myset
1) "lovekuangshen2"
2) "lovekuangshen"
3) "kuangshen"
##########################################################################
set 無序不重復集合。抽隨機!
127.0.0.1:6379> SMEMBERS myset
1) "lovekuangshen2"
2) "lovekuangshen"
3) "kuangshen"
127.0.0.1:6379> SRANDMEMBER myset # 隨機抽選出一個元素
"kuangshen"
127.0.0.1:6379> SRANDMEMBER myset
"kuangshen"
127.0.0.1:6379> SRANDMEMBER myset
"kuangshen"
127.0.0.1:6379> SRANDMEMBER myset
"kuangshen"
127.0.0.1:6379> SRANDMEMBER myset 2 # 隨機抽選出指定個數的元素
1) "lovekuangshen"
2) "lovekuangshen2"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "lovekuangshen"
2) "lovekuangshen2"
127.0.0.1:6379> SRANDMEMBER myset # 隨機抽選出一個元素
"lovekuangshen2"
##########################################################################
刪除定的key,隨機刪除key!
127.0.0.1:6379> SMEMBERS myset
1) "lovekuangshen2"
2) "lovekuangshen"
3) "kuangshen"
127.0.0.1:6379> spop myset # 隨機刪除一些set集合中的元素!
"lovekuangshen2"
127.0.0.1:6379> spop myset
"lovekuangshen"
127.0.0.1:6379> SMEMBERS myset
1) "kuangshen"
##########################################################################
將一個指定的值,移動到另外一個set集合!
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "world"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset2 "set2"
(integer) 1
127.0.0.1:6379> smove myset myset2 "kuangshen" # 將一個指定的值,移動到另外一個set集
合!
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "world"
2) "hello"
127.0.0.1:6379> SMEMBERS myset2
1) "kuangshen"
2) "set2"
##########################################################################
微博,B站,共同關注!(並集)
數字集合類:
- 差集 SDIFF
- 交集
- 並集
127.0.0.1:6379> SDIFF key1 key2 # 差集
1) "b"
2) "a"
127.0.0.1:6379> SINTER key1 key2 # 交集 共同好友就可以這樣實現
1) "c"
127.0.0.1:6379> SUNION key1 key2 # 並集
1) "b"
2) "c"
3) "e"
4) "a"
5) "d"
ZSet(有序集合)
在set的基礎上,增加了一個值
127.0.0.1:6379> zadd myset 1 one # 添加一個值
(integer) 1
127.0.0.1:6379> zadd myset 2 two 3 three # 添加多個值
(integer) 2
127.0.0.1:6379> ZRANGE myset 0 -1
1) "one"
2) "two"
3) "three"
##########################################################################
排序如何實現
127.0.0.1:6379> zadd salary 2500 xiaohong # 添加三個用戶
(integer) 1
127.0.0.1:6379> zadd salary 5000 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 500 kaungshen
(integer) 1
# ZRANGEBYSCORE key min max
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf # 顯示全部的用戶 從小到大!
1) "kaungshen"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> ZREVRANGE salary 0 -1 # 從大到進行排序!
1) "zhangsan"
2) "kaungshen"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores # 顯示全部的用戶並且附帶成
績 1)
"kaungshen"
2) "500"
3) "xiaohong"
4) "2500"
5) "zhangsan"
6) "5000"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 withscores # 顯示工資小於2500員工的升
序排序!
1) "kaungshen"
2) "500"
3) "xiaohong"
4) "2500"
##########################################################################
# 移除rem中的元素
127.0.0.1:6379> zrange salary 0 -1
1) "kaungshen"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrem salary xiaohong # 移除有序集合中的指定元素
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "kaungshen"
2) "zhangsan"
127.0.0.1:6379> zcard salary # 獲取有序集合中的個數
(integer) 2
##########################################################################
127.0.0.1:6379> zadd myset 1 hello
(integer) 1
127.0.0.1:6379> zadd myset 2 world 3 kuangshen
(integer) 2
127.0.0.1:6379> zcount myset 1 3 # 獲取指定區間的成員數量!
(integer) 3
127.0.0.1:6379> zcount myset 1 2
(integer) 2
Redis.conf配置
Units單位
配置大小單位,開頭定義了一些基本的度量單位,只支持bytes,不支持bit
大小寫不敏感
INCLUDES包含
類似jsp中的include,多實例的情況可以把公用的配置文件提取出來
讓外網連接
可以設置bind 為對應主機的ip
- 若想讓所有ip都能訪問,注釋掉這一行就好了,即不寫的情況下,無限制接受任何ip地址的訪問
同時protected-mode 也得設置成no
Port
端口號,默認 6379
tcp-backlog
設置tcp的backlog,backlog其實是一個連接隊列,backlog隊列總和=未完成三次握手隊列+ 已經完成三次握手隊列。
在高並發環境下你需要一個高backlog值來避免慢客戶端連接問題。
注意Linux內核會將這個值減小到/proc/sys/net/core/somaxconn的值(128),所以需要確認增大/proc/sys/net/core/somaxconn和/proc/sys/net/ipv4/tcp_max_syn_backlog(128)兩個值來達到想要的效果
timeout
一個空閑的客戶端維持多少秒會關閉,0表示關閉該功能。即永不關閉。
tcp-keepalive
對訪問客戶端的一種心跳檢測,每個n秒檢測一次。
單位為秒,如果設置為0,則不會進行Keepalive檢測,建議設置成60
pidfile
存放pid文件的位置,每個實例會產生一個不同的pid文件
loglevel
指定日志記錄級別,Redis總共支持四個級別:debug、verbose、notice、warning,默認為notice
四個級別根據使用階段來選擇,生產環境選擇notice 或者warning
databases 16
設定庫的數量默認16,默認數據庫為0,可以使用SELECT
Limt限制
maxclients
Ø 設置redis同時可以與多少個客戶端進行連接。
Ø 默認情況下為10000個客戶端。
Ø 如果達到了此限制,redis則會拒絕新的連接請求,並且向這些連接請求方發出“max number of clients reached”以作回應。
maxmemory
Ø 建議必須設置,否則,將內存占滿,造成服務器宕機
Ø 設置redis可以使用的內存量。一旦到達內存使用上限,redis將會試圖移除內部數據,移除規則可以通過maxmemory-policy來指定。
Ø 如果redis無法根據移除規則來移除內存中的數據,或者設置了“不允許移除”,那么redis則會針對那些需要申請內存的指令返回錯誤信息,比如SET、LPUSH等。
Ø 但是對於無內存申請的指令,仍然會正常響應,比如GET等。如果你的redis是主redis(說明你的redis有從redis),那么在設置內存使用上限時,需要在系統中留出一些內存空間給同步隊列緩存,只有在你設置的是“不移除”的情況下,才不用考慮這個因素。
maxmemory-policy
Ø volatile-lru:使用LRU算法移除key,只對設置了過期時間的鍵;(最近最少使用)
Ø allkeys-lru:在所有集合key中,使用LRU算法移除key
Ø volatile-random:在過期集合中移除隨機的key,只對設置了過期時間的鍵
Ø allkeys-random:在所有集合key中,移除隨機的key
Ø volatile-ttl:移除那些TTL值最小的key,即那些最近要過期的key
Ø noeviction:不進行移除。針對寫操作,只是返回錯誤信息
maxmemory-samples
Ø 設置樣本數量,LRU算法和最小TTL算法都並非是精確的算法,而是估算值,所以你可以設置樣本的大小,redis默認會檢查這么多個key並選擇其中LRU的那個。
Ø 一般設置3到7的數字,數值越小樣本越不准確,但性能消耗越小。
daemonize
Redis 默認不是以守護進程的方式運行,可以通過該配置項修改,使用 yes 啟用守護進程
持久化配置
rdb
aof
設置密碼
通過命令行修改(不推薦,重啟就沒了)
修改redis.conf
vim進入后,直接輸入/requirepass 找到被注釋掉的那一欄,自行設置密碼
密碼設置之后,當你退出再次連上redis的時候,就需要輸入密碼了,不然是無法操作的。這里有兩種方式輸入密碼,一是連接的時候直接輸入密碼,而是連接上之后再輸入密碼
連接時輸入密碼
-a 后邊加密碼
連接后輸入密碼
限制client(一般不用管)
SpringBoot整合
SpringBoot 操作數據:spring-data jpa jdbc mongodb redis!
說明: 在 SpringBoot2.x 之后,原來使用的jedis 被替換為了 lettuce
jedis : 采用的直連,多個線程操作的話,是不安全的,如果想要避免不安全的,使用 jedis pool 連接池! 更像 BIO 模式
lettuce : 采用netty,實例可以在多個線程中進行共享,不存在線程不安全的情況!可以減少線程數據了,更像 NIO 模式
自定義RedisTemplate
package com.example.redis.conf;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.net.UnknownHostException;
@Configuration
public class RedisConfig {
/**
* 編寫自己的redisTemplate----固定模板
* @param redisConnectionFactory
* @return
* @throws UnknownHostException
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
//為了開發方便,一般直接使用<String,object>
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//json序列化配置
Jackson2JsonRedisSerializer Jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
Jackson2JsonRedisSerializer.setObjectMapper(om);
//string序列化配置
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
/*配置具體的序列化方式*/
//key采用string的序列化方式
template.setKeySerializer(stringRedisSerializer);
//hash的key采用string的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
//value序列化方式采用jackson
template.setValueSerializer(Jackson2JsonRedisSerializer);
//hash的value序列化方式采用jackson
template.setHashValueSerializer(Jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
Redis工具類
package com.example.redis.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
//在真實開發中,經常使用
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定緩存失效時間
*
* @param key 鍵
* @param time 時間(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根據key 獲取過期時間
*
* @param key 鍵 不能為null
* @return 時間(秒) 返回0代表為永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判斷key是否存在
*
* @param key 鍵
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 刪除緩存
*
* @param key 可以傳一個值 或多個
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通緩存獲取
*
* @param key 鍵
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通緩存放入
*
* @param key 鍵
* @param value 值
* @return true成功 false失敗
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通緩存放入並設置時間
*
* @param key 鍵
* @param value 值
* @param time 時間(秒) time要大於0 如果time小於等於0 將設置無限期
* @return true成功 false 失敗
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time,
TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 遞增
*
* @param key 鍵
* @param delta 要增加幾(大於0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("遞增因子必須大於0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 遞減
*
* @param key 鍵
* @param delta 要減少幾(小於0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("遞減因子必須大於0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
*
* @param key 鍵 不能為null
* @param item 項 不能為null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 獲取hashKey對應的所有鍵值
*
* @param key 鍵
* @return 對應的多個鍵值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 鍵
* @param map 對應多個鍵值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 並設置時間
*
* @param key 鍵
* @param map 對應多個鍵值
* @param time 時間(秒)
* @return true成功 false失敗
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一張hash表中放入數據,如果不存在將創建
*
* @param key 鍵
* @param item 項
* @param value 值
* @return true 成功 false失敗
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一張hash表中放入數據,如果不存在將創建
*
* @param key 鍵
* @param item 項
* @param value 值
* @param time 時間(秒) 注意:如果已存在的hash表有時間,這里將會替換原有的時間
* @return true 成功 false失敗
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 刪除hash表中的值
*
* @param key 鍵 不能為null
* @param item 項 可以使多個 不能為null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判斷hash表中是否有該項的值
*
* @param key 鍵 不能為null
* @param item 項 不能為null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash遞增 如果不存在,就會創建一個 並把新增后的值返回
*
* @param key 鍵
* @param item 項
* @param by 要增加幾(大於0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash遞減
*
* @param key 鍵
* @param item 項
* @param by 要減少記(小於0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根據key獲取Set中的所有值
*
* @param key 鍵
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根據value從一個set中查詢,是否存在
*
* @param key 鍵
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 將數據放入set緩存
*
* @param key 鍵
* @param values 值 可以是多個
* @return 成功個數
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 將set數據放入緩存
*
* @param key 鍵
* @param time 時間(秒)
* @param values 值 可以是多個
* @return 成功個數
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 獲取set緩存的長度
*
* @param key 鍵
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值為value的
*
* @param key 鍵
* @param values 值 可以是多個
* @return 移除的個數
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 獲取list緩存的內容
*
* @param key 鍵
* @param start 開始
* @param end 結束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 獲取list緩存的長度
*
* @param key 鍵
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通過索引 獲取list中的值
*
* @param key 鍵
* @param index 索引 index>=0時, 0 表頭,1 第二個元素,依次類推;index<0
* 時,-1,表尾,-2倒數第二個元素,依次類推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 將list放入緩存
*
* @param key 鍵
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 將list放入緩存
*
* @param key 鍵
* @param value 值
* @param time 時間(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 將list放入緩存
*
* @param key 鍵
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 將list放入緩存
*
* @param key 鍵
* @param value 值
* @param time 時間(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根據索引修改list中的某條數據
*
* @param key 鍵
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N個值為value
*
* @param key 鍵
* @param count 移除多少個
* @param value 值
* @return 移除的個數
*/
public long lRemove (String key,long count, Object value){
try {
Long remove = redisTemplate.opsForList().remove(key, count,value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
Redis持久化
Redis 是內存數據庫,如果不將內存中的數據庫狀態保存到磁盤,那么一旦服務器進程退出,服務器中的數據庫狀態也會消失。所以 Redis 提供了持久化功能!
rdb (Redis DataBase)
在指定的時間間隔內將內存中的數據集快照寫入磁盤,也就是行話講的Snapshot快照,它恢復時是將快照文件直接讀到內存里。
Redis會單獨創建(fork)一個子進程來進行持久化,會先將數據寫入到一個臨時文件中,待持久化過程都結束了,再用這個臨時文件替換上次持久化好的文件。整個過程中,主進程是不進行任何IO操作的!!!!
這就確保了極高的性能。如果需要進行大規模數據的恢復,且對於數據恢復的完整性不是非常敏感,RDB方式要比AOF方式更加的高效。
流程圖
相關配置回顧
save
保存的文件是dump.rdb
stop-writes-on-bgsave-error
當Redis無法寫入磁盤的話,直接關掉Redis的寫操作。
推薦yes
rdbcompression 壓縮文件
對於存儲到磁盤中的快照,可以設置是否進行壓縮存儲。如果是的話,redis會采用LZF算法進行壓縮。
- 如果你不想消耗CPU來進行壓縮的話,可以設置為關閉此功能。
推薦yes
rdbchecksum 檢查完整性
在存儲快照后,還可以讓redis使用CRC64算法來進行數據校驗
- 但是這樣做會增加大約10%的性能消耗,如果希望獲取到最大的性能提升,可以關閉此功能
推薦yes
redis備份
在服務器啟動的時候,先備份一下dump.rdb
cp dump.rdb dump2.rdb
然后刪除掉dump.rdb 模擬一下數據丟失的實際場景
關閉redis服務器
接下來要怎么恢復原來的數據呢,我們只需要 cp dump2.rdb dump.rdb
啟動Redis,備份數據會直接加載
- 簡單說就是先備份一下rdb文件 , 然后需要重新加載的時候再改回原來的默認名字就好了 , redis就會自動重新加載
優點
- 適合大規模的數據恢復
- 對數據完整性和一致性要求不高更適合使用
- 節省磁盤空間
- 恢復速度快
缺點
最后一次持久化后的數據可能丟失。
比如說 我們設置了save 20 3 --> 20秒內如果有3個key發生變化就會去持久化操作寫入文件
那假如說,現在我們第一個20秒已經設置3個了,存進去了
而第二個20秒,我們設置了2個,這時服務器突然掛掉了,那是不是就不會進行持久化了(因為還沒有達到3個的條件),然后我們剛才設置的那兩個也就自然而然沒有進行持久化,就造成了數據丟失
AOF(Append Only File)(流程還沒看)
默認不開啟
以日志的形式來記錄每個寫操作(增量保存),將Redis執行過的所有寫指令記錄下來(讀操作不記錄),只許追加文件但不可以改寫文件,redis啟動之初會讀取該文件重新構建數據,換言之,redis 重啟的話就根據日志文件的內容將寫指令從前到后執行一次以完成數據的恢復工作
同步頻率設置
appendfsync always
始終同步,每次Redis的寫入都會立刻記入日志;性能較差但數據完整性比較好
appendfsync everysec
每秒同步,每秒記入日志一次,如果宕機,本秒的數據可能丟失。
appendfsync no
redis不主動進行同步,把同步時機交給操作系統。
重寫
重寫只關心最終的結果,不關心你的過程,把兩條語句壓縮成一條了
auto-aof-rewrite-percentage:
設置重寫的基准值,(文件是原來重寫后文件的2倍時觸發)
auto-aof-rewrite-min-size:
設置重寫的基准值,最小文件64MB。達到這個值開始重寫。
例如:文件達到70MB開始重寫,降到50MB,下次什么時候開始重寫?100MB
系統載入時或者上次重寫完畢時,Redis會記錄此時AOF大小,設為base_size,
如果Redis的AOF當前大小>= base_size +base_size*100% (默認)且當前大小>=64mb(默認)的情況下,Redis會對AOF進行重寫。
缺點
- 比起RDB占用更多的磁盤空間。
- 恢復備份速度要慢。
- 每次讀寫都同步的話,有一定的性能壓力。
- 存在個別Bug,造成恢復不能。
建議
官方推薦兩個都啟用。
在這種情況下,當redis重啟的時候會優先載入AOF文件來恢復原始的數據, 因為在通常情況下AOF文件保存的數據集要比RDB文件保存的數據集要完整.
RDB的數據不實時,同時使用兩者時服務器重啟也只會找AOF文件。那要不要只使用AOF呢?
- 建議不要,因為RDB更適合用於備份數據庫(AOF在不斷變化不好備份),快速重啟,而且不會有AOF可能潛在的bug,留着作為一個萬一的手段
如果對數據不敏感,可以單獨用RDB。
如果只是做純內存緩存,可以都不用。
發布與訂閱(原理狂神待看,菜鳥命令)
Redis 發布訂閱 (pub/sub) 是一種消息通信模式:發送者 (pub) 發送消息,訂閱者 (sub) 接收消息。
Redis 客戶端可以訂閱任意數量的頻道。
- 打開一個客戶端訂閱channel1
SUBSCRIBE channel1
- 打開另一個客戶端,給channel1發布消息hello
返回的1是訂閱者數量
- 打開第一個客戶端可以看到發送的消息
注:發布的消息沒有持久化,如果在訂閱的客戶端收不到hello,只能收到訂閱后發布的消息
訂閱
127.0.0.1:6379> SUBSCRIBE kuangshenshuo # 訂閱一個頻道 kuangshenshuo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "kuangshenshuo"
3) (integer) 1
# 等待讀取推送的信息
1) "message" # 消息
2) "kuangshenshuo" # 那個頻道的消息
3) "hello,kuangshen" # 消息的具體內容
1) "message"
2) "kuangshenshuo"
3) "hello,redis"
發送端
127.0.0.1:6379> PUBLISH kuangshenshuo "hello,kuangshen" # 發布者發布消息到頻道!
(integer) 1
127.0.0.1:6379> PUBLISH kuangshenshuo "hello,redis" # 發布者發布消息到頻道!
(integer) 1
使用場景:
1、實時消息系統!
2、實時聊天!(頻道當做聊天室,將信息回顯給所有人即可!)
3、訂閱,關注系統都是可以的!
稍微復雜的場景我們就會使用 ---> 消息中間件MQ
Redis事務
定義
Redis事務是一個單獨的隔離操作:事務中的所有命令都會序列化、按順序地執行。事務在執行的過程中,不會被其他客戶端發送來的命令請求所打斷。
Redis事務的主要作用就是串聯多個命令防止別的命令插隊。
Multi、Exec、discard?(SpringBoot怎么實現)
從輸入Multi命令開始,輸入的命令都會依次進入命令隊列中,但不會執行,直到輸入Exec后,Redis會將之前的命令隊列中的命令依次執行。
組隊的過程中可以通過discard來放棄組隊。
//增加樂觀鎖
jedis.watch(qtkey);
//3.判斷庫存
String qtkeystr = jedis.get(qtkey);
if(qtkeystr==null || "".equals(qtkeystr.trim())) {
System.out.println("未初始化庫存");
jedis.close();
return false ;
}
int qt = Integer.parseInt(qtkeystr);
if(qt<=0) {
System.err.println("已經秒光");
jedis.close();
return false;
}
//增加事務
Transaction multi = jedis.multi();
//4.減少庫存
//jedis.decr(qtkey);
multi.decr(qtkey);
//5.加人
//jedis.sadd(usrkey, uid);
multi.sadd(usrkey, uid);
//執行事務
List<Object> list = multi.exec();
//判斷事務提交是否失敗
if(list==null || list.size()==0) {
System.out.println("秒殺失敗");
jedis.close();
return false;
}
System.err.println("秒殺成功");
jedis.close();
錯誤處理
組隊中某個命令出現了報告錯誤,執行時整個的所有隊列都會被取消。
此處在multi過程中,exec之前,我們對一個v1去increase 導致發生了錯誤,組隊期間的所有東西都會不成功
***與MySQL的區別!!!
如果執行階段某個命令報出了錯誤,則只有報錯的命令不會被執行,而其他的命令都會執行,不會回滾。
總結
在組隊期間發生錯誤,就會回滾。
- 而在執行階段發生錯誤了,則不會回滾。
悲觀鎖
樂觀鎖
版本機制,類似CAS中的ABA問題解決
但這里不會循環去查詢,而是直接就失敗了? 想一想這里會不會引發其他的問題呢?
對比
秒殺庫存變成負數問題
- 樂觀鎖和悲觀鎖都能解決,因為在每次購買下單之前,都會先去檢查一下
- 樂觀鎖會去檢查版本,發現版本號不一樣了,直接就失敗了!(那肯定不會出現負數的情況)
- 而悲觀鎖呢? 悲觀鎖是等待上一個人執行完了,再來操作.這樣可以保證不會出現負數的情況,同時也能夠繼續進行購買,不會說就此失敗了!
樂觀鎖引發的庫存遺留問題
解決--Lua腳本
將復雜的或者多步的redis操作,寫為一個腳本,一次提交給redis執行,減少反復連接redis的次數。提升性能。
LUA腳本是類似redis事務,有一定的原子性,不會被其他命令插隊,可以完成一些redis事務性的操作。
但是注意redis的lua腳本功能,只有在Redis 2.6以上的版本才可以使用。
利用lua腳本淘汰用戶,解決超賣問題。
redis 2.6版本以后,通過lua腳本解決爭搶問題,實際上是redis 利用其單線程的特性,用任務隊列的方式解決多任務並發問題。
watch
在執行multi之前,先執行watch key1 [key2],可以監視一個(或多個) key ,如果在事務執行之前這個(或這些) key 被其他命令所改動,那么事務將被打斷。
事務三特性
- 單獨的隔離操作
事務中的所有命令都會序列化、按順序地執行。事務在執行的過程中,不會被其他客戶端發送來的命令請求所打斷。
- 沒有隔離級別的概念
隊列中的命令沒有提交之前都不會實際被執行,因為事務提交前任何指令都不會被實際執行
- 不保證原子性
事務中如果有一條命令執行失敗,其后的命令仍然會被執行,沒有回滾
主從復制
主(寫)從(讀)
**從機只能讀,不能寫
一主二仆(附步驟)
拷貝多個redis.conf文件include(寫絕對路徑) ??
對應不同服務器不同配置文件
新建redis_7001.conf 修改conf
注意要開啟安全組端口
#引入我們原來的redis.conf文件(根據目錄引入)
include ../redis.conf
#在引入的基礎上,修改必要的選項
pidfile /var/run/redis_7001.pid
port 7001
dbfilename dump7001.rdb
appendonly no
daemonize yes
同理創建另外兩個conf,修改相應的pid文件,port,dufilename名字
#引入我們原來的redis.conf文件(根據目錄引入)
include ../redis.conf
#在引入的基礎上,修改必要的選項
pidfile /var/run/redis_7002.pid
port 7002
dbfilename dump7002.rdb
appendonly no
daemonize yes
#引入我們原來的redis.conf文件(根據目錄引入)
include ../redis.conf
#在引入的基礎上,修改必要的選項
pidfile /var/run/redis_7003.pid
port 7003
dbfilename dump7003.rdb
appendonly no
daemonize yes
查看三台主機運行情況
info replication
slaveof ip地址 端口
成為某個實例的從服務器
從機掛掉后,重啟沒辦法恢復成xx的從機
從機重啟需重設:slaveof 127.0.0.1 6379
但掛掉期間主服務的操作,重啟后還是可以看到的
薪火相傳
- 上一個Slave可以是下一個slave的Master,Slave同樣可以接收其他slaves的連接和同步請求,那么該slave作為了鏈條中下一個的master, 可以有效減輕master的寫壓力,去中心化降低風險
中途變更轉向:會清除之前的數據,重新建立拷貝最新的
風險是一旦某個slave宕機,后面的slave都沒法備份
主機掛了,從機還是從機,無法寫數據了
反客為主
小弟上位
主服務器掛掉后,從服務器只需要用 slaveof no one 就可以上位,變成主服務器了!
但是這里畢竟得自己手動輸入命令,才能上位,還是不太方便的在實際的業務場景中,那么如何實現更好的方法呢,得來看看后邊要講到的哨兵模式
復制原理
- 當從服務器連接上主服務器后,從服務器會主動向主服務器請求數據同步的消息
從服務器主動
- 主服務器接受到后,就會進行持久化操作,然后把rdb文件發送給從服務器,從服務器讀取rdb文件以達到同步的效果
- 每次主服務器進行了寫操作后,也會跟從服務器進行數據同步
主服務器主動
全量復制:slave服務在接收到數據庫文件數據后,將其存盤並加載到內存中。
增量復制:Master繼續將新的所有收集到的修改命令依次傳給slave,完成同步
但是只要是重新連接master,全量復制將被自動執行
哨兵模式
反客為主的自動版,能夠后台監控主機是否故障,如果故障了根據投票數自動將從庫轉換為主庫
- 哨兵模式是一種特殊的模式,首先Redis提供了哨兵的命令,哨兵是一個獨立的進程。其原理是哨兵通過發送命令,等待Redis服務器響應,從而監控運行的多個Redis實例 。
自定義的/myredis目錄下新建sentinel.conf文件,名字絕不能錯
配置哨兵,填寫內容
sentinel monitor mymaster 127.0.0.1 6379 1
其中mymaster為監控對象起的服務器名稱,1 為至少有多少個哨兵同意遷移的數量。
設置為1的話,只需要一個哨兵同意就可以切換,設置成2則需要有兩個哨兵都同意才進行切換(將從庫轉換為主庫)
啟動哨兵
執行redis-sentinel /myredis/sentinel.conf
當主機掛掉,從機選舉中產生新的主機
(大概10秒左右可以看到哨兵窗口日志,切換了新的主機)
哪個從機會被選舉為主機呢?根據優先級別:slave-priority
原主機重啟后會變為從機。
新皇登基了已經
復制延時
由於所有的寫操作都是先在Master上操作,然后同步更新到Slave上,所以從Master同步到Slave機器有一定的延遲,當系統很繁忙的時候,延遲問題會更加嚴重,Slave機器數量的增加也會使這個問題更加嚴重。
故障恢復
優先級在redis.conf中默認:slave-priority 100,值越小優先級越高
目前最新版本已經改名為 : replica-priority 默認是100
- 偏移量是指獲得原主機數據最全的
每個redis實例啟動后都會隨機生成一個40位的runid
集群(無中心化)
***好處
主從模式,薪火相傳模式,主機宕機,導致ip地址發生變化,應用程序中配置需要修改對應的主機地址、端口等信息。
之前通過代理主機來解決,但是redis3.0中提供了解決方案。就是無中心化集群配置。
代理
每一個又都需要從機,總共就需要8台
無中心化
只需要6台,每一台都能作為入口,然后再去轉移(互相訪問連通)
Redis 集群實現了對Redis的水平擴容,即啟動N個redis節點,將整個數據庫分布存儲在這N個節點中,每個節點存儲總數據的1/N。
- Redis 集群通過分區(partition)來提供一定程度的可用性(availability):即使集群中有一部分節點失效或者無法進行通訊,集群也可以繼續處理命令請求。
搭載集群
配置文件
同理拷貝出其他5個,總共是7001-7006(要開放相應端口,修改文件中的對應各種數字)
include ../redis.conf
pidfile /var/run/redis_7001.pid
port 7001
dbfilename dump7001.rdb
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 15000
快速替換的小技巧
:%s/7001/7002g
帝皇俠合體
- redis-cli --cluster create --cluster-replicas 1 10.0.8.13:7001 10.0.8.13:7002 10.0.8.13:7003 10.0.8.13:7004 10.0.8.13:7005 10.0.8.13:7006
redis-cli --cluster create --cluster-replicas 1 后邊是 host:port host:port
- 注意如果是阿里雲或者騰訊雲的,這里得用內網ip!!!!
有設置密碼的還得在最后補充 -a 密碼
--replicas 1 采用最簡單的方式配置集群,一台主機,一台從機,正好三組。
輸入yes
成功界面
2223凄凄切切凄凄切切前驅群群群群群暈暈暈暈暈00
普通方式登錄
- 可能直接進入讀主機,存儲數據時,會出現MOVED重定向操作。所以,應該以集群方式登錄。
-c 采用集群策略連接
- ./redis-cli -c -p 7001
選擇任意一個端口連接都可以,設置數據會自動切換到相應的寫主機
查看節點信息
- CLUSTER NODES
分配原則
一個集群至少要有三個主節點。
選項--cluster-replicas 1 表示我們希望為集群中的每個主節點創建一個從節點。
分配原則盡量保證每個主數據庫運行在不同的IP地址,每個從庫和主庫不在一個IP地址上。
slot
一個 Redis 集群包含16384 個插槽(hash slot),數據庫中的每個鍵都屬於這16384 個插槽的其中一個,
集群使用公式 CRC16(key) % 16384
- 來計算鍵key 屬於哪個槽,其中CRC16(key) 語句用於計算鍵key 的CRC16 校驗和。
集群中的每個節點負責處理一部分插槽。舉個例子,如果一個集群可以有主節點,其中:
- 節點 A 負責處理0號至5460號插槽。
- 節點 B 負責處理5461號至10922號插槽。
- 節點 C 負責處理10923號至16383號插槽。
這里的分配有點類似哈希函數,主要是為了能夠盡量平均分配,讓三個主節點能夠平均分擔壓力
- 不過這里如果計算出來的值一樣(並不會像哈希一樣出現沖突),而是讓一個插槽放多個數據就好了
不在一個slot下的鍵值,是不能使用mget,mset等多鍵操作。
- 可以通過{}來定義組的概念,從而使key中{}內相同內容的鍵值對放到一個slot中去。
此時是根據{}內user計算,{}里的內容一樣,自然會放到同一個slot中去
計算重定向(無中心化的實現方式)
- 通過計算,然后去重定向到相應的節點上,這樣就可以實現各個節點能夠互相連通,自然而然每個節點都能作為入口了,因為他們可以重定向到別的節點,互相連通互相訪問!
主機掛掉了???
- 此時7003 fail掉了,此處還有待進一步驗證,現在服務器上掛着幾個項目不敢亂搭,寒假再拿另外一個服務器來試試。
JRedis集群開發
即使連接的不是主機,集群會自動切換主機存儲。主機寫,從機讀。
無中心化主從集群。無論從哪台主機寫的數據,其他主機上都能讀到數據。
public class JedisClusterTest {
public static void main(String[] args) {
Set<HostAndPort>set =new HashSet<HostAndPort>();
set.add(new HostAndPort("192.168.31.211",6379));
JedisCluster jedisCluster=new JedisCluster(set);
jedisCluster.set("k1", "v1");
System.out.println(jedisCluster.get("k1"));
}
}
另一種版本
public static void main(String[] args) {
Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
//Jedis Cluster will attempt to discover cluster nodes automatically
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 6371));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 6372));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 6373));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 6374));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 6375));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 6376));
JedisCluster jc = new JedisCluster(jedisClusterNodes);
jc.set("foo", "bar");
String value = jc.get("foo");
System.out.println(" ===> " + value);
}
💠整合篇預告
- 到這里,redis的入門知識篇算是整合完畢了,剩下的還有面試篇的三種緩存問題,包括如何解決,應用篇redis在項目中的實際應用場景,以及高級篇redis原理,有機會我們再來聊一聊。
參考
- 尚硅谷Redis6視頻
- 狂神說Redis視頻