緩存,隊列(Redis,RabbitMQ)


一、Redis

1、簡介

Redis 與其他 key - value 緩存產品有以下三個特點:

  • Redis支持數據的持久化,可以將內存中的數據保存在磁盤中,重啟的時候可以再次加載進行使用。
  • Redis不僅僅支持簡單的key-value類型的數據,同時還提供list,set,zset,hash等數據結構的存儲。
  • Redis支持數據的備份,即master-slave模式的數據備份。

2、優勢

  • 性能極高 – Redis能讀的速度是110000次/s,寫的速度是81000次/s 。
  • 豐富的數據類型 – Redis支持二進制案例的 Strings, Lists, Hashes, Sets 及 Ordered Sets 數據類型操作。
  • 原子 – Redis的所有操作都是原子性的,意思就是要么成功執行要么失敗完全不執行。單個操作是原子性的。多個操作也支持事務,即原子性,通過MULTI和EXEC指令包起來。
  • 豐富的特性 – Redis還支持 publish/subscribe, 通知, key 過期等等特性。

3、Redis與其他key-value存儲有什么不同?

  • Redis有着更為復雜的數據結構並且提供對他們的原子性操作,這是一個不同於其他數據庫的進化路徑。Redis的數據類型都是基於基本數據結構的同時對程序員透明,無需進行額外的抽象。

  • Redis運行在內存中但是可以持久化到磁盤,所以在對不同數據集進行高速讀寫時需要權衡內存,因為數據量不能大於硬件內存。在內存數據庫方面的另一個優點是,相比在磁盤上相同的復雜的數據結構,在內存中操作起來非常簡單,這樣Redis可以做很多內部復雜性很強的事情。同時,在磁盤格式方面他們是緊湊的以追加的方式產生的,因為他們並不需要進行隨機訪問。

4、Redis 安裝

下載地址:https://github.com/MSOpenTech/redis/releases

Redis 支持 32 位和 64 位。這個需要根據你系統平台的實際情況選擇,這里我們下載 Redis-x64-xxx.zip壓縮包到 C 盤,解壓后,將文件夾重新命名為 redis

打開一個 cmd 窗口 使用cd命令切換目錄到 C:\redis 運行 redis-server.exe redis.windows.conf 。

如果想方便的話,可以把 redis 的路徑加到系統的環境變量里,這樣就省得再輸路徑了,后面的那個 redis.windows.conf 可以省略,如果省略,會啟用默認的。輸入之后,會顯示如下界面:

這時候另啟一個cmd窗口,原來的不要關閉,不然就無法訪問服務端了。

切換到redis目錄下運行 redis-cli.exe -h 127.0.0.1 -p 6379 。

設置鍵值對 set myKey abc

取出鍵值對 get myKey

5、安裝目錄的文件

redis-server : 服務器
redis-cli :命令行客戶端
redis-benchmark 性能工具測試
redis-check-aof ADF文件修復工具
redis-check-dump RDB文件檢測工具

redis.conf是redis的配置文件

將配置文件中的daemonize yes 以守護進程的方式來使用

cd到redis的安裝目錄下

啟動和停止

啟動 : redis-server 
停止:shutdown
命令返回值
1)狀態回復 pong 
set test "this is a test"

2)錯誤回復
(error) ERR unknown command 'testerror'

3)整數回復:
(integer) 4

4)字符串回復
get 'test' 

(nil) 代表空的結果

5)多行字符串回復
KEYS * 得到當前數據庫中的存在的鍵值名

 

6、Redis配置選項相關的內容

1>動態設置/獲取配置選項的值
		獲取
			CONFIG GET port
			1)"post"
			2)"6379"
		動態設置
			CONFIG GET warning
	2>Redis配置文件redis.conf選項相關
	
		---- 連接選項 ----
		
		port 6379 默認端口
		
		bind 127.0.01 默認綁定的主機地址
		
		timeout 0 客戶端閑置多久后自動關閉連接  0 代表沒有啟動這個選項
		
		loglevel notice 日志的記錄級別
			debug  	調試的很詳細的信息:適合開發和測試
			verbose 包含很多不太有用的信息
			notice	生產環境
			warning 警告信息
			
		logfile stdout 指定日志的記錄方式 默認是標准輸出
		
		database 16 設置默認數據庫的數量,默認是16個
			SETECT 1 選擇數據庫 默認編號是0
			
		----- 快照相關 ------
		
		多少秒有多少次改變將其同步到磁盤中的數據文件中
			save 900 1       900代表秒數  900秒內有一個更改就記錄到磁盤中
			save 300 10		 
			save 60 10000
		
 		rdbcompression yes   存儲本地數據庫是是否啟動壓縮, 默認yes
		
		dbfilename dump.db 指定本地數據庫的文件名

7、Redis的數據類型 

1》String類型

  1 一個鍵最多存儲512MB
  2         1》SET 設置key 對應的值為value
  3         
  4             語法:SET key value [EX seconds] [PX milliseconds] [NX|XX]
  5             
  6             EX seconds 設置key的過期時間 SET key value EX seconds == SETEX
  7             PX milliseconds 以毫秒的形式設置過期時間 SET key value PX milliseconds -- PSETEX
  8             NX :只有鍵不存在的時候才會設置成功 --SETNX
  9             XX :只有key已經存在的時候才可以設置
 10                     
 11             SET test16 'this is a test' EX 100
 12             SET test17 'this is a test17' PX 20000
 13             SET test18 'this is a test18' NX
 14             SET test18 'this is a test18888' XX
 15             
 16             SET test19 'this is a test19' EX 100 NX
 17             
 18             SET test20 'this is a test20' EX 100 PX 300000 NX
 19             
 20             
 21             注意: 如果key存在,同名會產生覆蓋
 22             
 23                 SET testStr1 "this is a test"
 24         
 25         2》GET key 根據key找到對應的值
 26         
 27             語法: GET key
 28             
 29             注意:如果key不存在,返回nil
 30                   如果key不是字符串,會報錯
 31               
 32         3》GETGANGE: 返回字符串中的一部分
 33             
 34             語法:GETRANGE key start end
 35             
 36             GETGANGET testStr2 0 4
 37             GETGANGET testStr2 0 -3
 38             GETGANGET testStr2 -4 -2
 39             GETGANGET testStr2 0 1000
 40         
 41         4》GETSET:設定指定可以的值,返回原來的值
 42             計數器的清零效果
 43         
 44             語法:GETSET key value
 45             SET testStr3 'king'
 46             
 47             GET testStr3
 48             
 49             GETSET testStr3 'queen' 
 50             
 51             注意:當key不存在返回nil 如果key不是字符串會報錯
 52         
 53         5》MSET 一次設置多個
 54             
 55             語法:MSET key value [key value...]
 56                 MSET testStr5 'king' testStr6 'sk' testStr7 'queen'
 57             
 58         6》MGET 一次獲得多個
 59         
 60             語法:MGET key key 
 61                 MGET testStr5 testStr6 testStr7
 62         
 63         7》STRLEN:獲取key的字符串的長度
 64             
 65             語法:STRLEN key
 66             
 67             注意:對於不存在的key獲取其長度返回的是0
 68             
 69         8》SETRANGE:相當於字符串替換的效果
 70             
 71             語法:8》SETRANGE key  offset value
 72             
 73             注意:如果設置的key原來的字符串長度要比偏移量小,就會以零字節(\x00)來填充
 74             
 75             SET testStr9 'hello king'
 76             
 77             SETRANGE testStr9 6 'queen'
 78             
 79             對於不存在的key使用SETRANGE
 80             
 81             EXISTS testStr10 檢測key是否存在
 82             
 83             SETRANGE testStr10 5 'king'
 84             
 85         9》SETNX 只有key不存在才能設置成功
 86         
 87             語法:SETNX key value 
 88             
 89         10》SETEX:key 設置key並且設置過期時間,以秒為單位
 90         
 91             語法:SETEX key  seconds value 原子性操作
 92                 TTL 得到鍵的生存周期
 93             
 94             注意:SETEX 是原子性操作,相當於執行了SET key value ,又對這個key設置了過期時間 EXPIRE key seconds
 95                 
 96             
 97             SET expireStr1 'test1'
 98             
 99             EXPIRE expireStr1 10
100             
101             SETEX test12 1000 'a'
102             
103             GET test12
104             
105         11》 MSETNX 一次設置多個key-value ,只有所有的key都不存在的時候才會成功
106         
107             語法 MSETNX key value [key value]
108             
109                 MSETNX test13 'a' test14 'b' test15 'c'
110         
111         12》PSETEX:以毫秒為單位設置key的生存周期
112             
113             語法:PSETEX key milliseconds value
114             
115                 PSETEX test16 2000 'hell0 king'
116         
117                 PTTL
118         13》INCR 對key中存儲的數字+1
119             
120             語法:INCR key
121             
122             SET counter 1
123                 
124             INCR counter
125             
126             注意:key如果不存在會先初始化為0,在進行INCR操作,
127                   對於不是數值的值會報錯
128                   
129         14》INCR BY : 將key中存儲的數字加上指定增量
130             
131             語法:INCRBY key INCREMENT
132                 
133                 SET counter2 10
134                 INCRBY counter2 5
135                 INCRBY counter2 1.2 不能用浮點數
136         15》INCRBYFLOAT : 給key中存儲的數字加上指定的浮點數
137             
138             語法:INCRBYFLOAT key increment
139 
140                 SET counter3 1
141                 
142                 INCRBYFLOAT counter3 1.2
143         
144         16》DECR:給將key中存儲的數字減1
145             
146             語法:DECR key
147         
148         17》DECYBY:將key中存儲的數值減去指定的值
149             
150             語法:DECRBY key decrement
151             
152         18》APPEND:將值追加到字符串的末尾
153             
154             語法:APPEND key value
string 類型

2》Hash類型

 1 Hash類型(散列表)
 2         在配置文件中可以通過配置
 3         hash-max-ziplist-entries 512 512個字節
 4         hash-max-ziplist-value 64 字段數目
 5         
 6         Hash相關命令
 7         1》HSET : 將哈希表key中域field設置成指定的value
 8         
 9             語法 : HSET key field value 
10             
11             HSET userInfo1 username 'king'
12             
13             HSET userInfo1 password '123456'
14             
15             HSET userInfo1 email '18368827317@163.com'
16             
17             HSET username 'queen'
18             
19             注意:如果哈希表中key中field不存在相當於新建field,設置成功返回1
20                   如果哈希表中key中field存在,相當於重新賦值,成功返回0
21         
22         2》HGET :返回哈希表中給定field的值
23         
24             語法:HGET key field
25             
26             HGET userInfo1 username
27             
28             注意:如果key中field不存在,返回的是nil
29             
30         3》HSETNX: 將哈希表中的field設定成指定的值,只有field不在的時候才能成功,如果filed存在,操作無效
31         
32             語法:HSETNX key filed value
33             
34             HSETNX testHash1 test 'a'
35             
36         4》    HMSET :通過將多個field-value設置到hash表key中
37             
38             語法:HMSET key value filed value ...
39             
40             HMSET userInfo2 username(域) 'king'(值) kickname 'freeyman'
41                 
42         5》HMGET : 一次獲得hash表key中多個filed的值
43             
44             語法:HMGET key field field
45             
46             注意:如果hash表中field不存在,會返回nil
47             
48         6》HGETALL:返回哈希表key中所有的field和value
49             
50             語法:HGETALL key 
51         
52         7》HKEYS:返回hash中key的所有的filed 
53             
54             語法: HKEYS key 
55             
56         8》HVALS :返回所有hash中key中field所有的對應的值
57             
58             語法:HVALS key 
59         
60         9》HEXISTS :檢測hash表中key的field是否存在
61         
62             語法:HEXISTS key field 
63             
64             HEXISTS userInfo2 username
65             
66             HEXISTS userInfo2 notExists
67             
68         10》HLEN: 返回hash表中keyfield的數量
69             
70             語法:HLEN key
71             
72         11》HINCRBY:給hash中key的field做增量操作
73             
74             語法:HINCRBY key field increment
75             
76         12》HINCRBYFLOAT:    給hash中key的field做增量操作 家浮點數
77             
78             語法:HINCRBYFLOAT key field increment
79                 
80             HSET userInfo3 salary '12343.341'
81             
82             HINCRBYFLOAT userInfo3 salary '0.214'
83             
84         13》HDEL :刪除hash中key的指定域,可以刪除一個也可以刪除多個
85             
86             語法:HDEL key field field
87             
88             HGETALL userInfo2 
89             
90             HDEL userInfo2 username 
Hash類型(散列表)

3》List類型

  1 雙向鏈表實現的兩邊的獲取速度快)
  2         
  3         1》LPUSH:向列表左端添加元素
  4             
  5             語法:LPUSH key value... 從左到右依次插入
  6             
  7             LPUSH myList1 a b c 
  8             
  9         2》RPUSH :向列表右端添加元素
 10             
 11             語法:RPUSH key value...
 12             
 13             RPUSH myList1 test1 test2 test3
 14             
 15         3》LPUSHX:向列表頭部添加元素,只有key存在在來添加 只能添加一個值,
 16         
 17             語法:LPUSHX key value
 18             
 19             LPUSH myList2 test4 
 20             
 21         4》RPUSHX:向列表尾部添加元素,只有key存在在來添加
 22             
 23             語法:RPUSHX key value
 24             
 25             RPUSH myList2 test4 
 26             
 27         5》LPOP :將列表頭部的元素彈出
 28             
 29             語法:LPOP myList1
 30             
 31         6》RPOP : 將列表尾部的元素彈出
 32         
 33             語法:RPOP myList1
 34         
 35         7》LLEN : 得到列表的長度
 36         
 37             語法:LLEN key 
 38         
 39         8》LRANGE : 獲取列表片段
 40             
 41             語法:LRANGE key start stop
 42         
 43             LRANGE myList1 0 -1
 44             
 45             注意:如果start下標比列表的最大的下標end大,返回空列表
 46                 如果stop比列表長度大,返回列表的末尾
 47             
 48         9》LREM:刪除列表中指定的值
 49             
 50             語法:LRME key count value
 51             
 52             count>0 從列表的頭開始,向尾部搜索,移除與value相等的元素,移動count個
 53             
 54             count<0 從列表的尾部向頭搜索,移除與value相等的元素,移除count個
 55             
 56             count=0 移除列表所有與count相等的值
 57             
 58             LPUSH myList3 a b c d a b c d b e f b g e b  
 59             
 60             LREM myList3 2 b 
 61             
 62             LREM myList3 -1 a 
 63             
 64             LREM myList3 0 e 
 65             
 66         10》LINDEX:獲取指定索引元素的值
 67         
 68             語法:LINDEX key index
 69             
 70             LINDEX myList3 3
 71             
 72             LINDEX myList3 -2
 73             
 74         11》LSET :設置指定索引元素的值
 75         
 76             LSET key index value
 77         
 78         12》LTRIM :只保留列表的片段
 79         
 80             語法:LTRIN key start stop
 81             
 82             LPUSH myList4 log1 log2 log3 log4 log5
 83             
 84             LRTIM myList4 0 1
 85             
 86             LPUSH myList4 a b c d e f g 
 87             
 88             LTRIM myList4 1 -1
 89             
 90             LTRIM myList4 1
 91         
 92         13》LINSERT 向列表插入元素    
 93             
 94             語法:LINSERT key BEFORE|AFTER pivot value 
 95             
 96             LPUSH myList6 a b c d
 97             
 98             LINSERT myList6 BEFORE "b" "king"
 99             
100             如果沒有成功返回-1 成功返回當前列表的長度 對於空列表返回0 不成功
101             
102         14》RPOPLPUSH:將一個元素從一個列表轉移到另一個列表(原子性操作)
103             
104             語法:RPOPLPUSH source destination 
105             
106                 RPOPLPUSH myList1 myList6 
107         
108         15》BLPOP:BLPOP是LPOP 的一個阻塞版本
109             
110             語法: BLPOP key [key...] timeout 
111             
112             LPUSH myList9 a b c 
113             
114             LPUSH myList10 d e f 
115             
116             BLPOP myList8 myList9 myList10 0
117             
118             BLPOP myList8 8 0
List類型

4》Set類型

 1 sns 和 博客系統 可以通過集合類型實現
 2         
 3         1》SADD :向集合中添加元素
 4             
 5             語法:SADD key member [,...]
 6             
 7             SADD web sunkai.clog.com
 8         
 9         2》SMEMBERS :返回指定集合中的元素
10             
11             語法:SMEMBERS key 
12             
13         3》SISMEMBER : 檢測member是否是集合中的成員
14         
15             語法:SISMEMBER key member 
16             
17         4》SREM :刪除
18         
19             語法:SREM key member
20             
21         5》SPOP :隨機刪除並返回集合中的刪除的元素
22             
23             語法:SPOP key
24         
25         6》SRANDMEMBER :隨機返回集合中的元素
26             
27             語法:SRANDMEMBER key counter
28             
29             注意:count 為正數,而且小於集合中的元素,返回的一個包含隨機元素的集合數組;count如果大於集合中的元素,這個時候會返回整個集合
30                   count 為負數,返回一個數組,數組中的成員可能出現重復,數組的長度是count取絕對值
31         
32         7》SDIFF 返回集合間的差集
33             
34             語法:SDIFF key key 
35             
36             SADD  couser2 java PHP js jq Python
37             
38             SADD  couser1 iOS anzhuo Python    
39                 
40             SDIFF couser2 couser1
41             
42         8》SINTER 返回集合間的交集
43             
44             語法:SINTER key key ...
45             
46             SINTER couser2 couser1
47             
48         9》SUNION :返回集合間的並集
49             
50             語法:SUNION key key
51                 
52         10》SCARD :返回集合的長度
53             
54             語法:SCARD
55         
56         11》SDIFFSTORE :講差集結果保存到指定集合中
57             
58             語法:SDIFFSTORE destination key key ...
59              
60             SDIFFSTORE diffSET couser2 couser1
61             
62         12》SINTERSTORE
63         
64         13》SUNIONSTORE
65         
66         14》SMOVE 將集合中的元素移動到另一個集合中(原子性操作)
67             
68             語法:SMOVE source destination member
Set集合類型(元素不可重復)

5》Zset(sorted set)有序集合類型

 1 1》ZADD :將元素及其分數添加到集合中
 2         
 3             語法: ZADD key score member [score member]
 4             
 5             ZADD PYTHONcourse 100 king
 6             
 7             ZADD PYTHONcourse 98 queen 98 test 78 test1
 8             
 9             ZADD PYTHONcourse +inf maxInt -inf minInx 正無窮大,負無窮大
10     
11         2》ZSCORE :獲得集合中的指定元素的分數
12             
13             語法:ZSCORE key member 
14             
15         3》ZRANGE :按照元素分數從小到大的順序返回指定索引start到stop之間的所有元素(包含兩端)
16             
17             語法:ZRANGE key start stop WITHSCORES 帶分數
18             
19             注意:當元素的兩個元素的分數相同的時候,redis在排序按照字典的順序排列
20             
21         4》ZREVRANGE: 和ZRANGE 相反,按照從大到小的順序返回
22             
23             語法:ZREVRANGE key start stop [WITHSCORES]
24             
25         5》ZRANGEBYSCORE :獲取指定分數范圍內的元素,按照從小到大的順序,返回的是分數在指定的min到max之間
26             
27             語法:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
28             
29             獲得分數80~90之間的所有元素
30             
31             ZRANGEBYSCORE PYTHONcourse 80 90 
32             
33             ZRANGEBYSCORE PYTHONcourse 80 (90 不包含90 
34             
35             注意:通過左括號代表不包含端點
36             
37         6:ZREVRANGEBYSCORE 和上面的相反    
38             
39             語法:ZREVRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
40             
41         7》ZINCRBY : 操作某個元素的分數,返回操作之后的分數
42             
43             語法:ZINCRBY key increment member
44         
45         8》ZCARD :獲得集合中元素的數量
46             
47             語法:ZCARD key 
48         
49         9》ZCOUNT 獲得指定分數內元素的數量
50             
51             語法:ZCOUNT PYTHONcourse 80 90 
52             
53         10》ZREM :刪除一個或多個元素,返回刪除元素的個數
54             
55             語法:ZREM key member ...
56             
57         11》ZREMRANGEBYRANK: 按照排名范圍刪除元素,按照分數從小到大的順序刪除所有指定的排名范圍內的所有元素
58             
59             語法:ZREMRANGEBYRANK key start stop 
60         
61         12》ZREMRANGEBYSCORE:按照分數范圍內刪除元素
62             
63             語法:ZREMRANGEBYSCORE key min max 
64             
65         13》ZRANK :獲得指定元素的排名,根據分數從小到大的順序
66             
67             語法:ZRANK    key member 
68             
69         14》ZREVRANK:獲得指定元素的排名,根據分數從大到小的順序
70             
71             語法:ZREVRANK key member
72             
73         15》ZINTERSTORE: 計算有序集合的交集,並將結果保存起來
74             
75             語法:ZINTERSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX]
76             
77                 ZADD testSortedSet1 1 a 2 b 3 c
78                 
79                 ZADD testSortedSet2 10 a 20 b 30 c
80                 
81                 ZINTERSTORE testSorted1 2 testSortedSet1 testSortedSet2
82                 
83                 ZRANGE testSorted1 0 -1 WITHSCORES 
84                 
85                 ZINTERSTORE testSorted2 2 testSortedSet1 testSortedSet2 AGGREGATE SUM
86                 
87                 ZINTERSTORE testSorted3 2 testSortedSet1 testSortedSet2 AGGREGATE MIN
88                 
89         16》ZUNIONSTORE    :計算有序集合並集,並將結果保存起來
90                 
91             語法:ZUNIONSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX]
Zset(sorted set)有序集合類型  

未完待續。。。。。。  

二、RabbitMQ

1、簡介

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。RabbitMQ可以,多個程序同時使用RabbitMQ ,但是必須隊列名稱不一樣。采用erlang語言,屬於愛立信公司開發的。

消息中間件 --->就是消息隊列

異步方式:不需要立馬得到結果,需要排隊

同步方式:需要實時獲得數據,堅決不能排隊

subprocess 的Q也提供不同進程之間的溝通

應用場景:

  電商秒殺活動

  搶購小米手機

  堡壘機批量發送文件

2、Centos6.x系統編譯安裝RabbitMQ

一、安裝erlang
依賴包:
yum -y install gcc ncurses ncurses-base ncurses-devel ncurses-libs ncurses-static ncurses-term ocaml-curses ocaml-curses-devel openssl-devel zlib-devel openssl-devel perl xz xmlto kernel-devel m4  這是一行


1、下載otp_src_19.3.tar.gz
2、tar xvf otp_src_19.3.tar.gz
3、cd opt_src_19.3.tar.gz
4、./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
5、make && make install

5、配置erlang環境:

vi /etc/profile

export PATH=$PATH:/usr/local/erlang/bin

source /etc/profile # 環境變量重啟生效


二、安裝rabbitmq
1、下載rabbitmq-server-generic-unix-3.6.5.tar.xz
2、tar xvf rabbitmq-server-generic-unix-3.6.5.tar.xz
3、mv rabbitmq_server-3.6.5/ /usr/local/rabbitmq
4、啟動:
	#啟動rabbitmq服務
	/usr/local/rabbitmq/sbin/rabbitmq-server
	#后台啟動
	/usr/local/rabbitmq/sbin/rabbitmq-server -detached
	#關閉rabbitmq服務
	/usr/local/rabbitmq/sbin/rabbitmqctl stop
	或
	ps -ef | grep rabbit 和 kill -9 xxx  殺死服務

	#開啟插件管理頁面
	/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management

	#創建用戶
	/usr/local/rabbitmq/sbin/rabbitmqctl add_user rabbitadmin 123456
	/usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags rabbitadmin administrator
       ./rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*" 為這個用戶授權
        
    
5、登錄
	#WEB登錄
	http://10.10.3.63:15672  自己的IP地址
	用戶名:rabbitadmin
	密碼:123456

三、幾種隊列通信

1、實現最簡單的隊列通信

sender

import pika
# 認證
credentials = pika.PlainCredentials('rabbitadmin', '123456')  # 一定要認證
# 連接這台機器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.14.38',credentials=credentials))    #主機IP 和驗證
channel = connection.channel() # 建立了rabbitmq的協議通道

# 聲明queue隊列
channel.queue_declare(queue='hello')

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# 發送消息
channel.basic_publish(exchange='', exchange表示交換器,能精確指定消息應該發送到哪個隊列,
                      routing_key='hello', # 
                      body='Hello World!')#routing_key設置為隊列的名稱,body就是發送的內容
print(" [x] Sent 'Hello World!'")
connection.close()
import pika
import time

credentials = pika.PlainCredentials('rabbitadmin', '123456')
# 連接這台機器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.14.38',credentials=credentials))
channel = connection.channel() # 建立了rabbitmq的協議通道


channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):

    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....",body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2、Work Queues (一個發消息,兩個收消息,收消息是公平的依次分發)

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差不多。

消息提供者代碼

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
 
# 聲明queue
channel.queue_declare(queue='task_queue')
 
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()
import pika, time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
 
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True
                      )
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

此時,先啟動消息生產者,然后再分別啟動3個消費者,通過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上  

3、消息持久化

雖然有了消息反饋機制,但是如果rabbitmq自身掛掉的話,那么任務還是會丟失。所以需要將任務持久化存儲起來。聲明持久化存儲:

將隊列(Queue)與消息(Message)都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果需要對這種小概率事件也要管理起來,那么要用到事務。由於這里僅為RabbitMQ的簡單介紹,所以不講解RabbitMQ相關的事務。

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # 建立了rabbit 協議的通道

# durable=True 聲明持久化存儲
channel.queue_declare(queue='task_queue', durable=True)


channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='Hello World!',
                      # 在發送任務的時候,用delivery_mode=2來標記消息為持久化存儲
                      properties=pika.BasicProperties(
                          delivery_mode=2,  
                      ))

print(" [x] Sent 'Hello World!'")
connection.close()

sender.py
import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    callback,
    queue='task_queue',
    no_ack=False  # 默認為False
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

4、消息公平分發

如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)

帶消息持久化+公平分發的完整代碼

生產者端

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

消費者端  

import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

5、Publish\Subscribe(消息發布\訂閱) 

之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,

交流是一個非常簡單的事情。一方面它收到消息從生產者和另一邊推他們隊列。交換必須知道如何處理接收到的消息。應該是附加到一個特定的隊列嗎?應該是附加到多隊列?或者應該丟棄。交換的規則定義的類型。  

Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息


fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

   表達式符號說明:#代表一個或多個字符,*代表任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 

headers: 通過headers 來決定把消息發給哪些queue

消息publisher

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

6、有選擇的接收消息(exchange type=direct)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

7、更細致的消息過濾

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

 

topi: 意思是話題

To receive all the logs run:

python receive_logs_topic.py "#"  #綁定#號,就是收所有消息,相當於廣播

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"   #以kern開頭

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"  #以critical結尾

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" #收kern開頭並且以critical結尾(相當於收兩個) 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error" #發消息到kern.critical里,內容是:
A critical kernel error

示例:

rabbit_topic_send.py (生產者是發送端)

1 import pika
 2 import sys
 3 
 4 credentials = pika.PlainCredentials('nulige', '123456')
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6     host='192.168.1.118',credentials=credentials))
 7 
 8 channel = connection.channel()
 9 
10 channel.exchange_declare(exchange='topic_logs',type='topic') #指定類型
11 
12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
13 
14 message = ' '.join(sys.argv[2:]) or 'Hello World!'  #消息
15 
16 channel.basic_publish(exchange='topic_logs',
17                       routing_key=routing_key,
18                       body=message)
19 print(" [x] Sent %r:%r" % (routing_key, message))
20 connection.close()

rabbit_topic_recv.py (消費者是接收端)單向的

 1 import pika
 2 import sys
 3 
 4 credentials = pika.PlainCredentials('nulige', '123456')
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6     host='192.168.1.118',credentials=credentials))
 7 
 8 channel = connection.channel()
 9 channel.exchange_declare(exchange='topic_logs',type='topic')
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18 
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23 
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 channel.basic_consume(callback,queue=queue_name)
30 
31 channel.start_consuming()

執行結果:  

 1 #接收端
 2 D:\python\day42>python3 rabbit_topic_recv.py error
 3  [*] Waiting for logs. To exit press CTRL+C
 4  [x] 'error':b'mysql has error'
 5 
 6 
 7 D:\python\day42>python3 rabbit_topic_recv.py *.warning mysql.*
 8  [*] Waiting for logs. To exit press CTRL+C
 9  [x] 'mysql.error':b'mysql has error'
10 
11 
12 D:\python\day42>python3 rabbit_topic_send.py mysql.info "mysql has error"
13  [x] Sent 'mysql.info':'mysql has error'
14 
15 
16 D:\python\day42>python3 rabbit_topic_recv.py *.error.*
17  [*] Waiting for logs. To exit press CTRL+C
18  [x] 'mysql.error.':b'mysql has error'
19 
20 
21 #發送端                                指定類型:error      消息內容
22 D:\python\day42>python3 rabbit_topic_send.py error "mysql has error"
23  [x] Sent 'error':'mysql has error'
24 
25 
26 D:\python\day42>python3 rabbit_topic_send.py mysql.error "mysql has error"
27  [x] Sent 'mysql.error':'mysql has error'
28  [x] 'mysql.info':b'mysql has error'
29 
30 
31 D:\python\day42>python3 rabbit_topic_send.py mysql.error. "mysql has error"
32  [x] Sent 'mysql.error.':'mysql has error'

8、Remote procedure call (RPC) 雙向的

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)  
 

應用場景:

示例:實現RPC服務功能

rabbit_rpc_send.py(生產者是發送端)

 1 import pika
 2 import uuid
 3 
 4 class SSHRpcClient(object):
 5     def __init__(self):
 6         credentials = pika.PlainCredentials('nulige', '123456')
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 8                             host='192.168.1.118',credentials=credentials))
 9 
10         self.channel = self.connection.channel()
11 
12         result = self.channel.queue_declare(exclusive=True) #客戶端的結果必須要返回到這個queue
13         self.callback_queue = result.method.queue
14 
15         self.channel.basic_consume(self.on_response,queue=self.callback_queue) #聲明從這個queue里收結果
16 
17     def on_response(self, ch, method, props, body):
18         if self.corr_id == props.correlation_id: #任務標識符
19             self.response = body
20             print(body)
21 
22     # 返回的結果,放在callback_queue中
23     def call(self, n):
24         self.response = None
25         self.corr_id = str(uuid.uuid4()) #唯一標識符
26         self.channel.basic_publish(exchange='',
27                                    routing_key='rpc_queue3',  #聲明一個Q
28                                    properties=pika.BasicProperties(
29                                        reply_to=self.callback_queue,
30                                        correlation_id=self.corr_id,
31                                    ),
32                                    body=str(n))
33 
34         print("start waiting for cmd result ")
35         count = 0
36         while self.response is None: #如果命令沒返回結果
37             print("loop ",count)
38             count +=1
39             self.connection.process_data_events() #以不阻塞的形式去檢測有沒有新事件
40             #如果沒事件,那就什么也不做, 如果有事件,就觸發on_response事件
41         return self.response
42 
43 ssh_rpc = SSHRpcClient()
44 
45 print(" [x] sending cmd")
46 response = ssh_rpc.call("ipconfig")
47 
48 print(" [.] Got result ")
49 print(response.decode("gbk"))
rabbit_rpc_recv.py(消費端是接收端)

 1 import pika
 2 import time
 3 import subprocess
 4 
 5 credentials = pika.PlainCredentials('nulige', '123456')
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7     host='192.168.1.118', credentials=credentials))
 8 
 9 channel = connection.channel()
10 channel.queue_declare(queue='rpc_queue3')
11 
12 def SSHRPCServer(cmd):
13 
14     print("recv cmd:",cmd)
15     cmd_obj = subprocess.Popen(cmd.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
16 
17     result = cmd_obj.stdout.read() or cmd_obj.stderr.read()
18     return result
19 
20 def on_request(ch, method, props, body):
21 
22     print(" [.] fib(%s)" % body)
23     response = SSHRPCServer(body)
24 
25     ch.basic_publish(exchange='',
26                      routing_key=props.reply_to,
27                      properties=pika.BasicProperties(correlation_id= \
28                                                          props.correlation_id),
29                      body=response)
30 
31 channel.basic_consume(on_request, queue='rpc_queue3')
32 print(" [x] Awaiting RPC requests")
33 channel.start_consuming()

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM