一、Redis
1、簡介
Redis 與其他 key - value 緩存產品有以下三個特點:
- Redis支持數據的持久化,可以將內存中的數據保存在磁盤中,重啟的時候可以再次加載進行使用。
- Redis不僅僅支持簡單的key-value類型的數據,同時還提供list,set,zset,hash等數據結構的存儲。
- Redis支持數據的備份,即master-slave模式的數據備份。
2、優勢
- 性能極高 – Redis能讀的速度是110000次/s,寫的速度是81000次/s 。
- 豐富的數據類型 – Redis支持二進制案例的 Strings, Lists, Hashes, Sets 及 Ordered Sets 數據類型操作。
- 原子 – Redis的所有操作都是原子性的,意思就是要么成功執行要么失敗完全不執行。單個操作是原子性的。多個操作也支持事務,即原子性,通過MULTI和EXEC指令包起來。
- 豐富的特性 – Redis還支持 publish/subscribe, 通知, key 過期等等特性。
3、Redis與其他key-value存儲有什么不同?
-
Redis有着更為復雜的數據結構並且提供對他們的原子性操作,這是一個不同於其他數據庫的進化路徑。Redis的數據類型都是基於基本數據結構的同時對程序員透明,無需進行額外的抽象。
-
Redis運行在內存中但是可以持久化到磁盤,所以在對不同數據集進行高速讀寫時需要權衡內存,因為數據量不能大於硬件內存。在內存數據庫方面的另一個優點是,相比在磁盤上相同的復雜的數據結構,在內存中操作起來非常簡單,這樣Redis可以做很多內部復雜性很強的事情。同時,在磁盤格式方面他們是緊湊的以追加的方式產生的,因為他們並不需要進行隨機訪問。
4、Redis 安裝
下載地址:https://github.com/MSOpenTech/redis/releases。
Redis 支持 32 位和 64 位。這個需要根據你系統平台的實際情況選擇,這里我們下載 Redis-x64-xxx.zip壓縮包到 C 盤,解壓后,將文件夾重新命名為 redis
打開一個 cmd 窗口 使用cd命令切換目錄到 C:\redis 運行 redis-server.exe redis.windows.conf 。
如果想方便的話,可以把 redis 的路徑加到系統的環境變量里,這樣就省得再輸路徑了,后面的那個 redis.windows.conf 可以省略,如果省略,會啟用默認的。輸入之后,會顯示如下界面:
這時候另啟一個cmd窗口,原來的不要關閉,不然就無法訪問服務端了。
切換到redis目錄下運行 redis-cli.exe -h 127.0.0.1 -p 6379 。
設置鍵值對 set myKey abc
取出鍵值對 get myKey
5、安裝目錄的文件
redis-server : 服務器
redis-cli :命令行客戶端
redis-benchmark 性能工具測試
redis-check-aof ADF文件修復工具
redis-check-dump RDB文件檢測工具
redis.conf是redis的配置文件
將配置文件中的daemonize yes 以守護進程的方式來使用 cd到redis的安裝目錄下 啟動和停止 啟動 : redis-server 停止:shutdown 命令返回值 1)狀態回復 pong set test "this is a test" 2)錯誤回復 (error) ERR unknown command 'testerror' 3)整數回復: (integer) 4 4)字符串回復 get 'test' (nil) 代表空的結果 5)多行字符串回復 KEYS * 得到當前數據庫中的存在的鍵值名
6、Redis配置選項相關的內容
1>動態設置/獲取配置選項的值 獲取 CONFIG GET port 1)"post" 2)"6379" 動態設置 CONFIG GET warning 2>Redis配置文件redis.conf選項相關 ---- 連接選項 ---- port 6379 默認端口 bind 127.0.01 默認綁定的主機地址 timeout 0 客戶端閑置多久后自動關閉連接 0 代表沒有啟動這個選項 loglevel notice 日志的記錄級別 debug 調試的很詳細的信息:適合開發和測試 verbose 包含很多不太有用的信息 notice 生產環境 warning 警告信息 logfile stdout 指定日志的記錄方式 默認是標准輸出 database 16 設置默認數據庫的數量,默認是16個 SETECT 1 選擇數據庫 默認編號是0 ----- 快照相關 ------ 多少秒有多少次改變將其同步到磁盤中的數據文件中 save 900 1 900代表秒數 900秒內有一個更改就記錄到磁盤中 save 300 10 save 60 10000 rdbcompression yes 存儲本地數據庫是是否啟動壓縮, 默認yes dbfilename dump.db 指定本地數據庫的文件名
7、Redis的數據類型
1》String類型

1 一個鍵最多存儲512MB 2 1》SET 設置key 對應的值為value 3 4 語法:SET key value [EX seconds] [PX milliseconds] [NX|XX] 5 6 EX seconds 設置key的過期時間 SET key value EX seconds == SETEX 7 PX milliseconds 以毫秒的形式設置過期時間 SET key value PX milliseconds -- PSETEX 8 NX :只有鍵不存在的時候才會設置成功 --SETNX 9 XX :只有key已經存在的時候才可以設置 10 11 SET test16 'this is a test' EX 100 12 SET test17 'this is a test17' PX 20000 13 SET test18 'this is a test18' NX 14 SET test18 'this is a test18888' XX 15 16 SET test19 'this is a test19' EX 100 NX 17 18 SET test20 'this is a test20' EX 100 PX 300000 NX 19 20 21 注意: 如果key存在,同名會產生覆蓋 22 23 SET testStr1 "this is a test" 24 25 2》GET key 根據key找到對應的值 26 27 語法: GET key 28 29 注意:如果key不存在,返回nil 30 如果key不是字符串,會報錯 31 32 3》GETGANGE: 返回字符串中的一部分 33 34 語法:GETRANGE key start end 35 36 GETGANGET testStr2 0 4 37 GETGANGET testStr2 0 -3 38 GETGANGET testStr2 -4 -2 39 GETGANGET testStr2 0 1000 40 41 4》GETSET:設定指定可以的值,返回原來的值 42 計數器的清零效果 43 44 語法:GETSET key value 45 SET testStr3 'king' 46 47 GET testStr3 48 49 GETSET testStr3 'queen' 50 51 注意:當key不存在返回nil 如果key不是字符串會報錯 52 53 5》MSET 一次設置多個 54 55 語法:MSET key value [key value...] 56 MSET testStr5 'king' testStr6 'sk' testStr7 'queen' 57 58 6》MGET 一次獲得多個 59 60 語法:MGET key key 61 MGET testStr5 testStr6 testStr7 62 63 7》STRLEN:獲取key的字符串的長度 64 65 語法:STRLEN key 66 67 注意:對於不存在的key獲取其長度返回的是0 68 69 8》SETRANGE:相當於字符串替換的效果 70 71 語法:8》SETRANGE key offset value 72 73 注意:如果設置的key原來的字符串長度要比偏移量小,就會以零字節(\x00)來填充 74 75 SET testStr9 'hello king' 76 77 SETRANGE testStr9 6 'queen' 78 79 對於不存在的key使用SETRANGE 80 81 EXISTS testStr10 檢測key是否存在 82 83 SETRANGE testStr10 5 'king' 84 85 9》SETNX 只有key不存在才能設置成功 86 87 語法:SETNX key value 88 89 10》SETEX:key 設置key並且設置過期時間,以秒為單位 90 91 語法:SETEX key seconds value 原子性操作 92 TTL 得到鍵的生存周期 93 94 注意:SETEX 是原子性操作,相當於執行了SET key value ,又對這個key設置了過期時間 EXPIRE key seconds 95 96 97 SET expireStr1 'test1' 98 99 EXPIRE expireStr1 10 100 101 SETEX test12 1000 'a' 102 103 GET test12 104 105 11》 MSETNX 一次設置多個key-value ,只有所有的key都不存在的時候才會成功 106 107 語法 MSETNX key value [key value] 108 109 MSETNX test13 'a' test14 'b' test15 'c' 110 111 12》PSETEX:以毫秒為單位設置key的生存周期 112 113 語法:PSETEX key milliseconds value 114 115 PSETEX test16 2000 'hell0 king' 116 117 PTTL 118 13》INCR 對key中存儲的數字+1 119 120 語法:INCR key 121 122 SET counter 1 123 124 INCR counter 125 126 注意:key如果不存在會先初始化為0,在進行INCR操作, 127 對於不是數值的值會報錯 128 129 14》INCR BY : 將key中存儲的數字加上指定增量 130 131 語法:INCRBY key INCREMENT 132 133 SET counter2 10 134 INCRBY counter2 5 135 INCRBY counter2 1.2 不能用浮點數 136 15》INCRBYFLOAT : 給key中存儲的數字加上指定的浮點數 137 138 語法:INCRBYFLOAT key increment 139 140 SET counter3 1 141 142 INCRBYFLOAT counter3 1.2 143 144 16》DECR:給將key中存儲的數字減1 145 146 語法:DECR key 147 148 17》DECYBY:將key中存儲的數值減去指定的值 149 150 語法:DECRBY key decrement 151 152 18》APPEND:將值追加到字符串的末尾 153 154 語法:APPEND key value
2》Hash類型

1 Hash類型(散列表) 2 在配置文件中可以通過配置 3 hash-max-ziplist-entries 512 512個字節 4 hash-max-ziplist-value 64 字段數目 5 6 Hash相關命令 7 1》HSET : 將哈希表key中域field設置成指定的value 8 9 語法 : HSET key field value 10 11 HSET userInfo1 username 'king' 12 13 HSET userInfo1 password '123456' 14 15 HSET userInfo1 email '18368827317@163.com' 16 17 HSET username 'queen' 18 19 注意:如果哈希表中key中field不存在相當於新建field,設置成功返回1 20 如果哈希表中key中field存在,相當於重新賦值,成功返回0 21 22 2》HGET :返回哈希表中給定field的值 23 24 語法:HGET key field 25 26 HGET userInfo1 username 27 28 注意:如果key中field不存在,返回的是nil 29 30 3》HSETNX: 將哈希表中的field設定成指定的值,只有field不在的時候才能成功,如果filed存在,操作無效 31 32 語法:HSETNX key filed value 33 34 HSETNX testHash1 test 'a' 35 36 4》 HMSET :通過將多個field-value設置到hash表key中 37 38 語法:HMSET key value filed value ... 39 40 HMSET userInfo2 username(域) 'king'(值) kickname 'freeyman' 41 42 5》HMGET : 一次獲得hash表key中多個filed的值 43 44 語法:HMGET key field field 45 46 注意:如果hash表中field不存在,會返回nil 47 48 6》HGETALL:返回哈希表key中所有的field和value 49 50 語法:HGETALL key 51 52 7》HKEYS:返回hash中key的所有的filed 53 54 語法: HKEYS key 55 56 8》HVALS :返回所有hash中key中field所有的對應的值 57 58 語法:HVALS key 59 60 9》HEXISTS :檢測hash表中key的field是否存在 61 62 語法:HEXISTS key field 63 64 HEXISTS userInfo2 username 65 66 HEXISTS userInfo2 notExists 67 68 10》HLEN: 返回hash表中keyfield的數量 69 70 語法:HLEN key 71 72 11》HINCRBY:給hash中key的field做增量操作 73 74 語法:HINCRBY key field increment 75 76 12》HINCRBYFLOAT: 給hash中key的field做增量操作 家浮點數 77 78 語法:HINCRBYFLOAT key field increment 79 80 HSET userInfo3 salary '12343.341' 81 82 HINCRBYFLOAT userInfo3 salary '0.214' 83 84 13》HDEL :刪除hash中key的指定域,可以刪除一個也可以刪除多個 85 86 語法:HDEL key field field 87 88 HGETALL userInfo2 89 90 HDEL userInfo2 username
3》List類型

1 雙向鏈表實現的兩邊的獲取速度快) 2 3 1》LPUSH:向列表左端添加元素 4 5 語法:LPUSH key value... 從左到右依次插入 6 7 LPUSH myList1 a b c 8 9 2》RPUSH :向列表右端添加元素 10 11 語法:RPUSH key value... 12 13 RPUSH myList1 test1 test2 test3 14 15 3》LPUSHX:向列表頭部添加元素,只有key存在在來添加 只能添加一個值, 16 17 語法:LPUSHX key value 18 19 LPUSH myList2 test4 20 21 4》RPUSHX:向列表尾部添加元素,只有key存在在來添加 22 23 語法:RPUSHX key value 24 25 RPUSH myList2 test4 26 27 5》LPOP :將列表頭部的元素彈出 28 29 語法:LPOP myList1 30 31 6》RPOP : 將列表尾部的元素彈出 32 33 語法:RPOP myList1 34 35 7》LLEN : 得到列表的長度 36 37 語法:LLEN key 38 39 8》LRANGE : 獲取列表片段 40 41 語法:LRANGE key start stop 42 43 LRANGE myList1 0 -1 44 45 注意:如果start下標比列表的最大的下標end大,返回空列表 46 如果stop比列表長度大,返回列表的末尾 47 48 9》LREM:刪除列表中指定的值 49 50 語法:LRME key count value 51 52 count>0 從列表的頭開始,向尾部搜索,移除與value相等的元素,移動count個 53 54 count<0 從列表的尾部向頭搜索,移除與value相等的元素,移除count個 55 56 count=0 移除列表所有與count相等的值 57 58 LPUSH myList3 a b c d a b c d b e f b g e b 59 60 LREM myList3 2 b 61 62 LREM myList3 -1 a 63 64 LREM myList3 0 e 65 66 10》LINDEX:獲取指定索引元素的值 67 68 語法:LINDEX key index 69 70 LINDEX myList3 3 71 72 LINDEX myList3 -2 73 74 11》LSET :設置指定索引元素的值 75 76 LSET key index value 77 78 12》LTRIM :只保留列表的片段 79 80 語法:LTRIN key start stop 81 82 LPUSH myList4 log1 log2 log3 log4 log5 83 84 LRTIM myList4 0 1 85 86 LPUSH myList4 a b c d e f g 87 88 LTRIM myList4 1 -1 89 90 LTRIM myList4 1 91 92 13》LINSERT 向列表插入元素 93 94 語法:LINSERT key BEFORE|AFTER pivot value 95 96 LPUSH myList6 a b c d 97 98 LINSERT myList6 BEFORE "b" "king" 99 100 如果沒有成功返回-1 成功返回當前列表的長度 對於空列表返回0 不成功 101 102 14》RPOPLPUSH:將一個元素從一個列表轉移到另一個列表(原子性操作) 103 104 語法:RPOPLPUSH source destination 105 106 RPOPLPUSH myList1 myList6 107 108 15》BLPOP:BLPOP是LPOP 的一個阻塞版本 109 110 語法: BLPOP key [key...] timeout 111 112 LPUSH myList9 a b c 113 114 LPUSH myList10 d e f 115 116 BLPOP myList8 myList9 myList10 0 117 118 BLPOP myList8 8 0
4》Set類型

1 sns 和 博客系統 可以通過集合類型實現 2 3 1》SADD :向集合中添加元素 4 5 語法:SADD key member [,...] 6 7 SADD web sunkai.clog.com 8 9 2》SMEMBERS :返回指定集合中的元素 10 11 語法:SMEMBERS key 12 13 3》SISMEMBER : 檢測member是否是集合中的成員 14 15 語法:SISMEMBER key member 16 17 4》SREM :刪除 18 19 語法:SREM key member 20 21 5》SPOP :隨機刪除並返回集合中的刪除的元素 22 23 語法:SPOP key 24 25 6》SRANDMEMBER :隨機返回集合中的元素 26 27 語法:SRANDMEMBER key counter 28 29 注意:count 為正數,而且小於集合中的元素,返回的一個包含隨機元素的集合數組;count如果大於集合中的元素,這個時候會返回整個集合 30 count 為負數,返回一個數組,數組中的成員可能出現重復,數組的長度是count取絕對值 31 32 7》SDIFF 返回集合間的差集 33 34 語法:SDIFF key key 35 36 SADD couser2 java PHP js jq Python 37 38 SADD couser1 iOS anzhuo Python 39 40 SDIFF couser2 couser1 41 42 8》SINTER 返回集合間的交集 43 44 語法:SINTER key key ... 45 46 SINTER couser2 couser1 47 48 9》SUNION :返回集合間的並集 49 50 語法:SUNION key key 51 52 10》SCARD :返回集合的長度 53 54 語法:SCARD 55 56 11》SDIFFSTORE :講差集結果保存到指定集合中 57 58 語法:SDIFFSTORE destination key key ... 59 60 SDIFFSTORE diffSET couser2 couser1 61 62 12》SINTERSTORE 63 64 13》SUNIONSTORE 65 66 14》SMOVE 將集合中的元素移動到另一個集合中(原子性操作) 67 68 語法:SMOVE source destination member
5》Zset(sorted set)有序集合類型

1 1》ZADD :將元素及其分數添加到集合中 2 3 語法: ZADD key score member [score member] 4 5 ZADD PYTHONcourse 100 king 6 7 ZADD PYTHONcourse 98 queen 98 test 78 test1 8 9 ZADD PYTHONcourse +inf maxInt -inf minInx 正無窮大,負無窮大 10 11 2》ZSCORE :獲得集合中的指定元素的分數 12 13 語法:ZSCORE key member 14 15 3》ZRANGE :按照元素分數從小到大的順序返回指定索引start到stop之間的所有元素(包含兩端) 16 17 語法:ZRANGE key start stop WITHSCORES 帶分數 18 19 注意:當元素的兩個元素的分數相同的時候,redis在排序按照字典的順序排列 20 21 4》ZREVRANGE: 和ZRANGE 相反,按照從大到小的順序返回 22 23 語法:ZREVRANGE key start stop [WITHSCORES] 24 25 5》ZRANGEBYSCORE :獲取指定分數范圍內的元素,按照從小到大的順序,返回的是分數在指定的min到max之間 26 27 語法:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] 28 29 獲得分數80~90之間的所有元素 30 31 ZRANGEBYSCORE PYTHONcourse 80 90 32 33 ZRANGEBYSCORE PYTHONcourse 80 (90 不包含90 34 35 注意:通過左括號代表不包含端點 36 37 6:ZREVRANGEBYSCORE 和上面的相反 38 39 語法:ZREVRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] 40 41 7》ZINCRBY : 操作某個元素的分數,返回操作之后的分數 42 43 語法:ZINCRBY key increment member 44 45 8》ZCARD :獲得集合中元素的數量 46 47 語法:ZCARD key 48 49 9》ZCOUNT 獲得指定分數內元素的數量 50 51 語法:ZCOUNT PYTHONcourse 80 90 52 53 10》ZREM :刪除一個或多個元素,返回刪除元素的個數 54 55 語法:ZREM key member ... 56 57 11》ZREMRANGEBYRANK: 按照排名范圍刪除元素,按照分數從小到大的順序刪除所有指定的排名范圍內的所有元素 58 59 語法:ZREMRANGEBYRANK key start stop 60 61 12》ZREMRANGEBYSCORE:按照分數范圍內刪除元素 62 63 語法:ZREMRANGEBYSCORE key min max 64 65 13》ZRANK :獲得指定元素的排名,根據分數從小到大的順序 66 67 語法:ZRANK key member 68 69 14》ZREVRANK:獲得指定元素的排名,根據分數從大到小的順序 70 71 語法:ZREVRANK key member 72 73 15》ZINTERSTORE: 計算有序集合的交集,並將結果保存起來 74 75 語法:ZINTERSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX] 76 77 ZADD testSortedSet1 1 a 2 b 3 c 78 79 ZADD testSortedSet2 10 a 20 b 30 c 80 81 ZINTERSTORE testSorted1 2 testSortedSet1 testSortedSet2 82 83 ZRANGE testSorted1 0 -1 WITHSCORES 84 85 ZINTERSTORE testSorted2 2 testSortedSet1 testSortedSet2 AGGREGATE SUM 86 87 ZINTERSTORE testSorted3 2 testSortedSet1 testSortedSet2 AGGREGATE MIN 88 89 16》ZUNIONSTORE :計算有序集合並集,並將結果保存起來 90 91 語法:ZUNIONSTORE destination numkeys key key ... WEIGHTS weight weight AGGREGATE [SUM|MIN|MAX]
未完待續。。。。。。
二、RabbitMQ
1、簡介
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。RabbitMQ可以,多個程序同時使用RabbitMQ ,但是必須隊列名稱不一樣。采用erlang語言,屬於愛立信公司開發的。
消息中間件 --->就是消息隊列
異步方式:不需要立馬得到結果,需要排隊
同步方式:需要實時獲得數據,堅決不能排隊
subprocess 的Q也提供不同進程之間的溝通
應用場景:
電商秒殺活動
搶購小米手機
堡壘機批量發送文件
2、Centos6.x系統編譯安裝RabbitMQ
一、安裝erlang 依賴包: yum -y install gcc ncurses ncurses-base ncurses-devel ncurses-libs ncurses-static ncurses-term ocaml-curses ocaml-curses-devel openssl-devel zlib-devel openssl-devel perl xz xmlto kernel-devel m4 這是一行 1、下載otp_src_19.3.tar.gz 2、tar xvf otp_src_19.3.tar.gz 3、cd opt_src_19.3.tar.gz 4、./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac 5、make && make install 5、配置erlang環境: vi /etc/profile export PATH=$PATH:/usr/local/erlang/bin source /etc/profile # 環境變量重啟生效 二、安裝rabbitmq 1、下載rabbitmq-server-generic-unix-3.6.5.tar.xz 2、tar xvf rabbitmq-server-generic-unix-3.6.5.tar.xz 3、mv rabbitmq_server-3.6.5/ /usr/local/rabbitmq 4、啟動: #啟動rabbitmq服務 /usr/local/rabbitmq/sbin/rabbitmq-server #后台啟動 /usr/local/rabbitmq/sbin/rabbitmq-server -detached #關閉rabbitmq服務 /usr/local/rabbitmq/sbin/rabbitmqctl stop 或 ps -ef | grep rabbit 和 kill -9 xxx 殺死服務 #開啟插件管理頁面 /usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management #創建用戶 /usr/local/rabbitmq/sbin/rabbitmqctl add_user rabbitadmin 123456 /usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags rabbitadmin administrator ./rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*" 為這個用戶授權 5、登錄 #WEB登錄 http://10.10.3.63:15672 自己的IP地址 用戶名:rabbitadmin 密碼:123456
三、幾種隊列通信
1、實現最簡單的隊列通信
sender
import pika
# 認證
credentials = pika.PlainCredentials('rabbitadmin', '123456') # 一定要認證
# 連接這台機器
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.14.38',credentials=credentials)) #主機IP 和驗證
channel = connection.channel() # 建立了rabbitmq的協議通道
# 聲明queue隊列
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# 發送消息
channel.basic_publish(exchange='', exchange表示交換器,能精確指定消息應該發送到哪個隊列,
routing_key='hello', #
body='Hello World!')#routing_key設置為隊列的名稱,body就是發送的內容
print(" [x] Sent 'Hello World!'")
connection.close()
import pika import time credentials = pika.PlainCredentials('rabbitadmin', '123456') # 連接這台機器 connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.14.38',credentials=credentials)) channel = connection.channel() # 建立了rabbitmq的協議通道 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("received msg...start processing....",body) time.sleep(20) print(" [x] msg process done....",body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
2、Work Queues (一個發消息,兩個收消息,收消息是公平的依次分發)
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差不多。
消息提供者代碼
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 聲明queue channel.queue_declare(queue='task_queue') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent %r" % message) connection.close()
import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='task_queue', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
此時,先啟動消息生產者,然后再分別啟動3個消費者,通過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上
3、消息持久化
雖然有了消息反饋機制,但是如果rabbitmq自身掛掉的話,那么任務還是會丟失。所以需要將任務持久化存儲起來。聲明持久化存儲:
將隊列(Queue)與消息(Message)都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果需要對這種小概率事件也要管理起來,那么要用到事務。由於這里僅為RabbitMQ的簡單介紹,所以不講解RabbitMQ相關的事務。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 建立了rabbit 協議的通道 # durable=True 聲明持久化存儲 channel.queue_declare(queue='task_queue', durable=True) channel.basic_publish(exchange='', routing_key='task_queue', body='Hello World!', # 在發送任務的時候,用delivery_mode=2來標記消息為持久化存儲 properties=pika.BasicProperties( delivery_mode=2, )) print(" [x] Sent 'Hello World!'") connection.close() sender.py
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) def callback(ch, method, properties, body): print("received msg...start processing....",body) time.sleep(20) print(" [x] msg process done....", body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume( callback, queue='task_queue', no_ack=False # 默認為False ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
4、消息公平分發
如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count
=
1
)
帶消息持久化+公平分發的完整代碼
生產者端
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消費者端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
5、Publish\Subscribe(消息發布\訂閱)
之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,
交流是一個非常簡單的事情。一方面它收到消息從生產者和另一邊推他們隊列。交換必須知道如何處理接收到的消息。應該是附加到一個特定的隊列嗎?應該是附加到多隊列?或者應該丟棄。交換的規則定義的類型。
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout
headers: 通過headers 來決定把消息發給哪些queue
消息publisher
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
消息subscriber
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
6、有選擇的接收消息(exchange type=direct)
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
7、更細致的消息過濾
Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
topi: 意思是話題
To receive all the logs run:
python receive_logs_topic.py "#" #綁定#號,就是收所有消息,相當於廣播
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*" #以kern開頭
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical" #以critical結尾
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical" #收kern開頭並且以critical結尾(相當於收兩個)
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error" #發消息到kern.critical里,內容是:
A critical kernel error
示例:
rabbit_topic_send.py (生產者是發送端)
1 import pika 2 import sys 3 4 credentials = pika.PlainCredentials('nulige', '123456') 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='192.168.1.118',credentials=credentials)) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='topic_logs',type='topic') #指定類型 11 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 13 14 message = ' '.join(sys.argv[2:]) or 'Hello World!' #消息 15 16 channel.basic_publish(exchange='topic_logs', 17 routing_key=routing_key, 18 body=message) 19 print(" [x] Sent %r:%r" % (routing_key, message)) 20 connection.close()
rabbit_topic_recv.py (消費者是接收端)單向的
1 import pika 2 import sys 3 4 credentials = pika.PlainCredentials('nulige', '123456') 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='192.168.1.118',credentials=credentials)) 7 8 channel = connection.channel() 9 channel.exchange_declare(exchange='topic_logs',type='topic') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 binding_keys = sys.argv[1:] 15 if not binding_keys: 16 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 17 sys.exit(1) 18 19 for binding_key in binding_keys: 20 channel.queue_bind(exchange='topic_logs', 21 queue=queue_name, 22 routing_key=binding_key) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback,queue=queue_name) 30 31 channel.start_consuming()
執行結果:
1 #接收端 2 D:\python\day42>python3 rabbit_topic_recv.py error 3 [*] Waiting for logs. To exit press CTRL+C 4 [x] 'error':b'mysql has error' 5 6 7 D:\python\day42>python3 rabbit_topic_recv.py *.warning mysql.* 8 [*] Waiting for logs. To exit press CTRL+C 9 [x] 'mysql.error':b'mysql has error' 10 11 12 D:\python\day42>python3 rabbit_topic_send.py mysql.info "mysql has error" 13 [x] Sent 'mysql.info':'mysql has error' 14 15 16 D:\python\day42>python3 rabbit_topic_recv.py *.error.* 17 [*] Waiting for logs. To exit press CTRL+C 18 [x] 'mysql.error.':b'mysql has error' 19 20 21 #發送端 指定類型:error 消息內容 22 D:\python\day42>python3 rabbit_topic_send.py error "mysql has error" 23 [x] Sent 'error':'mysql has error' 24 25 26 D:\python\day42>python3 rabbit_topic_send.py mysql.error "mysql has error" 27 [x] Sent 'mysql.error':'mysql has error' 28 [x] 'mysql.info':b'mysql has error' 29 30 31 D:\python\day42>python3 rabbit_topic_send.py mysql.error. "mysql has error" 32 [x] Sent 'mysql.error.':'mysql has error'
8、Remote procedure call (RPC) 雙向的
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)

應用場景:
示例:實現RPC服務功能
rabbit_rpc_send.py(生產者是發送端) 1 import pika 2 import uuid 3 4 class SSHRpcClient(object): 5 def __init__(self): 6 credentials = pika.PlainCredentials('nulige', '123456') 7 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 8 host='192.168.1.118',credentials=credentials)) 9 10 self.channel = self.connection.channel() 11 12 result = self.channel.queue_declare(exclusive=True) #客戶端的結果必須要返回到這個queue 13 self.callback_queue = result.method.queue 14 15 self.channel.basic_consume(self.on_response,queue=self.callback_queue) #聲明從這個queue里收結果 16 17 def on_response(self, ch, method, props, body): 18 if self.corr_id == props.correlation_id: #任務標識符 19 self.response = body 20 print(body) 21 22 # 返回的結果,放在callback_queue中 23 def call(self, n): 24 self.response = None 25 self.corr_id = str(uuid.uuid4()) #唯一標識符 26 self.channel.basic_publish(exchange='', 27 routing_key='rpc_queue3', #聲明一個Q 28 properties=pika.BasicProperties( 29 reply_to=self.callback_queue, 30 correlation_id=self.corr_id, 31 ), 32 body=str(n)) 33 34 print("start waiting for cmd result ") 35 count = 0 36 while self.response is None: #如果命令沒返回結果 37 print("loop ",count) 38 count +=1 39 self.connection.process_data_events() #以不阻塞的形式去檢測有沒有新事件 40 #如果沒事件,那就什么也不做, 如果有事件,就觸發on_response事件 41 return self.response 42 43 ssh_rpc = SSHRpcClient() 44 45 print(" [x] sending cmd") 46 response = ssh_rpc.call("ipconfig") 47 48 print(" [.] Got result ") 49 print(response.decode("gbk"))
rabbit_rpc_recv.py(消費端是接收端) 1 import pika 2 import time 3 import subprocess 4 5 credentials = pika.PlainCredentials('nulige', '123456') 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='192.168.1.118', credentials=credentials)) 8 9 channel = connection.channel() 10 channel.queue_declare(queue='rpc_queue3') 11 12 def SSHRPCServer(cmd): 13 14 print("recv cmd:",cmd) 15 cmd_obj = subprocess.Popen(cmd.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 16 17 result = cmd_obj.stdout.read() or cmd_obj.stderr.read() 18 return result 19 20 def on_request(ch, method, props, body): 21 22 print(" [.] fib(%s)" % body) 23 response = SSHRPCServer(body) 24 25 ch.basic_publish(exchange='', 26 routing_key=props.reply_to, 27 properties=pika.BasicProperties(correlation_id= \ 28 props.correlation_id), 29 body=response) 30 31 channel.basic_consume(on_request, queue='rpc_queue3') 32 print(" [x] Awaiting RPC requests") 33 channel.start_consuming()