消息隊列介紹、RabbitMQ、Redis
一、什么是消息隊列
這個概念我們百度Google能查到一大堆文章,所以我就通俗的講下消息隊列的基本思路。
還記得原來寫過Queue的文章,不管是線程queue還是進程queue他都是一種消息隊列。他都是基於生產者消費者模型來處理消息。
Python中的進程queue,是用於父進程與子進程,或者同屬於一個父進程下的多個子進程之間進行信息交互。注意這種queue只能在同一個python程序下才能用,如果兩個python程序,或者Python和別的什么程序,他是不能共用的。
那么問題來了,那怎么辦那??我就是要用我的QQ程序來調用我的Word文檔,怎么辦??
這時候跨程序的消息隊列工具就出現了,現在主流的有:RabbitQM,ZeroMQ(saltstack就是用的他),Kafka,Redis的消息隊列等等
具體什么原理呢?
我想要用QQ調用word,QQ先鏈接消息隊列(broker),把調用word的消息發給broker,如果word和QQ在一個頻道,好像是我在微博關注了你,你一發消息,我這里就能收到你的信息了。word一看QQ要啟動我,好啊,那來啊,互相傷害啊,然后就啟動了。
有的大神就說了,感覺好麻煩,還得經過一個中間商,我直接QQ給word發socket請求不行嗎,多直接啊~~我微微一笑,從原理上確實是可以,可是實際操作卻太難了,消息隊列有完善、專業的消息處理機制,可以同時處理大量應用發來的海量消息,並且幾乎沒有丟消息的可能,而且,關鍵一點,用着很方便,不用管人家怎么處理你的消息,丟給他就好啦。
二、RabbitMQ介紹與簡單應用
1、安裝
RabbitMQ是用erlang語言開發的,所以先裝語言環境
erlang安裝好后,就裝RabbitMQ,上官網下rpm,源碼,或者yum裝都行。
對了,Python開發環境需要pika模塊支持
pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
2、RabbitMQ基本架構及原理
下面為RabbitMQ的基本使用原理的架構圖:
這個系統架構圖版權屬於sunjun041640。
RabbitMQ Server: 也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對於普通的應用來說這已經足夠了。當然對於商業系統來說,可以再做一層數據一致性的guard,就可以徹底保證系統的一致性了。
Client A & B: 也叫Producer,數據的發送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標簽)。payload顧名思義就是傳輸的數據。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。
Client 1,2,3:也叫Consumer,數據的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱后,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來說,它是不知道誰發送的這個信息的。就是協議本身不支持。但是當然了如果Producer發送的payload包含了Producer的信息就另當別論了。
對於一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。
Exchanges are where producers publish their messages.
Queuesare where the messages end up and are received by consumers
Bindings are how the messages get routed from the exchange to particular queues.
還有幾個概念是上述圖中沒有標明的,那就是Connection(連接),Channel(通道,頻道)。
Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
那么,為什么使用Channel,而不是直接使用TCP連接?
對於OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高並發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數據可以Publish10K的數據包。當然對於不同的硬件環境,不同的數據包大小這個數據肯定不一樣,但是我只想說明,對於普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。
3、基本原理搞清了,下面來個簡單的RabbitMQ的使用栗子:
一對一的發送與接收
發送端:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #聲明queue channel.queue_declare(queue='hello') # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
接收端:
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
一個生產者,多個消費者,實現消息輪詢分發:
生產者:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 聲明queue channel.queue_declare(queue='task_queue') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. import sys message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent %r" % message) connection.close()
消費者:
#_*_coding:utf-8_*_ import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] Done") print("method.delivery_tag",method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) #告訴發送端我已經處理完了 channel.basic_consume(callback, #回調函數 queue='task_queue', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
當有多個消費者消費時,如果其中一個在處理消息時掛啦,那么消息會不會丟失呢?
我們可以看到在消費者代碼有一個參數,no_ack=true,他的作用是如果為true,不管消費者有沒有處理消息,都不給生產者反饋,這樣的話,如果真的出現消費者異常掛掉了,那么正在處理的消息就真的丟了。如果no_ack=false,消費者在處理完消息后會給生產者反饋,我處理完了,這時生產者才把消息刪掉,如果生產者發現消費者沒正常反饋信息,就會把這條消息發給別的消費者,保證消息不丟失。
4、消息持久化
上面說的是客戶端掛了,服務端還能保存消息不丟失,那服務端掛了咋弄,答案是如果服務端不進行數據持久化,服務掛了連channel都不會給你留的。。。。。
我們在聲明隊列時需要加句話:
channel.queue_declare(queue='hello', durable=True)
聲明隊列持久化。服務端和客戶端都要寫。
寫了這句就行了么??不行,這只是把咱們的隊列持久化了,隊列里的消息依然會丟,然並卵啊。。。
所以要想不丟數據,還得再加代碼:
我們需要在生產者這一段加入下面的代碼,主要是那個properties
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
我們看到delivery_mode這個參數,delivery_mode=2就是消息持久化的作用。
5、消息的公平分發
當消費者的處理性能有差異時,可能會出現性能好的機器處理消息速度比接受速度還快,性能不好的機器由於處理的慢會出現消息堵塞,最后就是好機器閑着,一般機器忙不過來的情況,造成資源浪費。這里RabbitMQ會提供一個實現消息公平分發的機制,使機器有多大能力收多少消息。
這個機制的配置就是perfetch=1
channel.basic_qos(prefetch_count=1)
帶消息持久化+公平分發的完整示例
生產者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消費者
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(10) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
上面栗子用sleep模擬處理時間,我們可以多起幾個不同sleep的消費者,觀察他們接受消息的規律,我們就會發現,處理的快的消費者接受的消息多,慢的就少。
6、Publish\Subscribe(消息發布\訂閱)
如果我們不想一對一的發送消息,而是想讓所有channel都收到,就可以用所謂的廣播來進行消息發布。這時我們就用到了exchange
Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
表達式符號說明:#代表一個或多個字符,*代表任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout
headers: 通過headers 來決定把消息發給哪些queue
消息publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消息subscriber
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
7、有選擇的接收消息(exchange type=direct)
可以指定某些更細致的過濾規則來進行指定channel發送
publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
8、更細致的消息過濾(exchange type=topic)
可以動態過濾,如圖
可以進行范圍過濾,而不是寫死的過濾條件
publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
9、Remote procedure call (RPC)
RabbitMQ的RPC模式,可以實現發布者發布消息給消費者后,還能接收消費者處理消息后的結果,也就是兩端既是生產者又是消費者。
RPC server
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
RPC client
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response,
no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
三、redis的介紹與應用
1、緩存數據庫介紹
NoSQL(NoSQL = Not Only SQL ),意即“不僅僅是SQL”,泛指非關系型的數據庫,隨着互聯網web2.0網站的興起,傳統的關系數據庫在應付web2.0網站,特別是超大規模和高並發的SNS類型的web2.0純動態網站已經顯得力不從心,暴露了很多難以克服的問題,而非關系型的數據庫則由於其本身的特點得到了非常迅速的發展。NoSQL數據庫的產生就是為了解決大規模數據集合多重數據種類帶來的挑戰,尤其是大數據應用難題。
NoSQL數據庫的四大分類
鍵值(Key-Value)存儲數據庫
NoSQL數據庫的四大分類表格分析
2、redis介紹
redis是業界主流的key-value nosql 數據庫之一。和Memcached類似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。在此基礎上,redis支持各種不同方式的排序。與memcached一樣,為了保證效率,數據都是緩存在內存中。區別的是redis會周期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。
Redis優點
-
異常快速 : Redis是非常快的,每秒可以執行大約110000設置操作,81000個/每秒的讀取操作。
-
支持豐富的數據類型 : Redis支持最大多數開發人員已經知道如列表,集合,可排序集合,哈希等數據類型。
這使得在應用中很容易解決的各種問題,因為我們知道哪些問題處理使用哪種數據類型更好解決。 -
操作都是原子的 : 所有 Redis 的操作都是原子,從而確保當兩個客戶同時訪問 Redis 服務器得到的是更新后的值(最新值)。
-
MultiUtility工具:Redis是一個多功能實用工具,可以在很多如:緩存,消息傳遞隊列中使用(Redis原生支持發布/訂閱),在應用程序中,如:Web應用程序會話,網站頁面點擊數等任何短暫的數據;
3、Redis API使用
redis-py 的API的使用可以分類為:
- 連接方式
- 連接池
- 操作
- String 操作
- Hash 操作
- List 操作
- Set 操作
- Sort Set 操作
- 管道
- 發布訂閱
連接方式
1、操作模式
redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向后兼容舊版本的redis-py。
import redis r = redis.Redis(host='127.0.0.1', port=6379) r.set('foo', 'Bar') print r.get('foo')
2、連接池
redis-py使用connection pool來管理對一個redis server的所有連接,避免每次建立、釋放連接的開銷。默認,每個Redis實例都會維護一個自己的連接池。可以直接建立一個連接池,然后作為參數Redis,這樣就可以實現多個Redis實例共享一個連接池。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='127.0.0.1', port=6379) r = redis.Redis(connection_pool=pool) r.set('foo', 'Bar') print r.get('foo')
3、操作
String操作,redis中的String在在內存中按照一個name對應一個value來存儲。
set(name, value, ex=None, px=None, nx=False, xx=False)
在Redis中設置值,默認,不存在則創建,存在則修改 參數: ex,過期時間(秒) px,過期時間(毫秒) nx,如果設置為True,則只有name不存在時,當前set操作才執行 xx,如果設置為True,則只有name存在時,崗前set操作才執行setnx(name, value)
設置值,只有name不存在時,執行設置操作(添加)setex(name, value, time)
# 設置值 # 參數: # time,過期時間(數字秒 或 timedelta對象)psetex(name, time_ms, value)
# 設置值 # 參數: # time_ms,過期時間(數字毫秒 或 timedelta對象)mset(*args, **kwargs)
批量設置值 如: mset(k1='v1', k2='v2') 或 mget({'k1': 'v1', 'k2': 'v2'})get(name)
獲取值mget(keys, *args)
批量獲取 如: mget('ylr', 'wupeiqi') 或 r.mget(['ylr', 'wupeiqi'])getset(name, value)
設置新值並獲取原來的值getrange(key, start, end)
# 獲取子序列(根據字節獲取,非字符) # 參數: # name,Redis 的 name # start,起始位置(字節) # end,結束位置(字節) # 如: "武藤蘭" ,0-3表示 "武"setrange(name, offset, value)
# 修改字符串內容,從指定字符串索引開始向后替換(新值太長時,則向后添加) # 參數: # offset,字符串的索引,字節(一個漢字三個字節) # value,要設置的值setbit(name, offset, value)
# 對name對應值的二進制表示的位進行操作 # 參數: # name,redis的name # offset,位的索引(將值變換成二進制后再進行索引) # value,值只能是 1 或 0 # 注:如果在Redis中有一個對應: n1 = "foo", 那么字符串foo的二進制表示為:01100110 01101111 01101111 所以,如果執行 setbit('n1', 7, 1),則就會將第7位設置為1, 那么最終二進制則變成 01100111 01101111 01101111,即:"goo" # 擴展,轉換二進制表示: # source = "蒼老師" source = "foo" for i in source: num = ord(i) print bin(num).replace('b','') 特別的,如果source是漢字 "蒼老師"怎么辦? 答:對於utf-8,每一個漢字占 3 個字節,那么 "蒼老師" 則有 9個字節 對於漢字,for循環時候會按照 字節 迭代,那么在迭代時,將每一個字節轉換 十進制數,然后再將十進制數轉換成二進制 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000 -------------------------- -------------------------- -------------------------- 蒼 老 師*用途舉例,用最省空間的方式,存儲在線用戶數及分別是哪些用戶在線
getbit(name, offset)
# 獲取name對應的值的二進制表示中的某位的值 (0或1)bitcount(key, start=None, end=None)
# 獲取name對應的值的二進制表示中 1 的個數 # 參數: # key,Redis的name # start,位起始位置 # end,位結束位置bitop(operation, dest, *keys)
# 獲取多個值,並將值做位運算,將最后的結果保存至新的name對應的值 # 參數: # operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或) # dest, 新的Redis的name # *keys,要查找的Redis的name # 如: bitop("AND", 'new_name', 'n1', 'n2', 'n3') # 獲取Redis中n1,n2,n3對應的值,然后講所有的值做位運算(求並集),然后將結果保存 new_name 對應的值中strlen(name)
# 返回name對應值的字節長度(一個漢字3個字節)incr(self, name, amount=1)
# 自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。 # 參數: # name,Redis的name # amount,自增數(必須是整數) # 注:同incrbyincrbyfloat(self, name, amount=1.0)
# 自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。 # 參數: # name,Redis的name # amount,自增數(浮點型)decr(self, name, amount=1)
# 自減 name對應的值,當name不存在時,則創建name=amount,否則,則自減。 # 參數: # name,Redis的name # amount,自減數(整數)append(key, value)
# 在redis name對應的值后面追加內容 # 參數: key, redis的name value, 要追加的字符串
Hash操作,redis中Hash在內存中的存儲格式如下圖:
hset(name, key, value)
# name對應的hash中設置一個鍵值對(不存在,則創建;否則,修改) # 參數: # name,redis的name # key,name對應的hash中的key # value,name對應的hash中的value # 注: # hsetnx(name, key, value),當name對應的hash中不存在當前key時則創建(相當於添加)hmset(name, mapping)
# 在name對應的hash中批量設置鍵值對 # 參數: # name,redis的name # mapping,字典,如:{'k1':'v1', 'k2': 'v2'} # 如: # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})hget(name,key)
# 在name對應的hash中獲取根據key獲取valuehmget(name, keys, *args)
# 在name對應的hash中獲取多個key的值 # 參數: # name,reids對應的name # keys,要獲取key集合,如:['k1', 'k2', 'k3'] # *args,要獲取的key,如:k1,k2,k3 # 如: # r.mget('xx', ['k1', 'k2']) # 或 # print r.hmget('xx', 'k1', 'k2')hgetall(name)
獲取name對應hash的所有鍵值hlen(name)
# 獲取name對應的hash中鍵值對的個數hkeys(name)
# 獲取name對應的hash中所有的key的值hvals(name)
# 獲取name對應的hash中所有的value的值hexists(name, key)
# 檢查name對應的hash是否存在當前傳入的keyhdel(name,*keys)
# 將name對應的hash中指定key的鍵值對刪除hincrby(name, key, amount=1)
# 自增name對應的hash中的指定key的值,不存在則創建key=amount # 參數: # name,redis中的name # key, hash對應的key # amount,自增數(整數)hincrbyfloat(name, key, amount=1.0)
# 自增name對應的hash中的指定key的值,不存在則創建key=amount # 參數: # name,redis中的name # key, hash對應的key # amount,自增數(浮點數) # 自增name對應的hash中的指定key的值,不存在則創建key=amounthscan(name, cursor=0, match=None, count=None)
# 增量式迭代獲取,對於數據大的數據非常有用,hscan可以實現分片的獲取數據,並非一次性將數據全部獲取完,從而放置內存被撐爆 # 參數: # name,redis的name # cursor,游標(基於游標分批取獲取數據) # match,匹配指定key,默認None 表示所有的key # count,每次分片最少獲取個數,默認None表示采用Redis的默認分片個數 # 如: # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None) # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None) # ... # 直到返回值cursor的值為0時,表示數據已經通過分片獲取完畢hscan_iter(name, match=None, count=None)
# 利用yield封裝hscan創建生成器,實現分批去redis中獲取數據 # 參數: # match,匹配指定key,默認None 表示所有的key # count,每次分片最少獲取個數,默認None表示采用Redis的默認分片個數 # 如: # for item in r.hscan_iter('xx'): # print item
List操作,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:
lpush(name,values)
# 在name對應的list中添加元素,每個新的元素都添加到列表的最左邊 # 如: # r.lpush('oo', 11,22,33) # 保存順序為: 33,22,11 # 擴展: # rpush(name, values) 表示從右向左操作lpushx(name,value)
# 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊 # 更多: # rpushx(name, value) 表示從右向左操作llen(name)
# name對應的list元素的個數linsert(name, where, refvalue, value))
# 在name對應的列表的某一個值前或后插入一個新值 # 參數: # name,redis的name # where,BEFORE或AFTER # refvalue,標桿值,即:在它前后插入數據 # value,要插入的數據r.lset(name, index, value)
# 對name對應的list中的某一個索引位置重新賦值 # 參數: # name,redis的name # index,list的索引位置 # value,要設置的值r.lrem(name, value, num)
# 在name對應的list中刪除指定的值 # 參數: # name,redis的name # value,要刪除的值 # num, num=0,刪除列表中所有的指定值; # num=2,從前到后,刪除2個; # num=-2,從后向前,刪除2個lpop(name)
# 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素 # 更多: # rpop(name) 表示從右向左操作lindex(name, index)
# 在name對應的列表中根據索引獲取列表元素lrange(name, start, end)
# 在name對應的列表分片獲取數據 # 參數: # name,redis的name # start,索引的起始位置 # end,索引結束位置ltrim(name, start, end)
# 在name對應的列表中移除沒有在start-end索引之間的值 # 參數: # name,redis的name # start,索引的起始位置 # end,索引結束位置rpoplpush(src, dst)
# 從一個列表取出最右邊的元素,同時將其添加至另一個列表的最左邊 # 參數: # src,要取數據的列表的name # dst,要添加數據的列表的nameblpop(keys, timeout)
# 將多個列表排列,按照從左到右去pop對應列表的元素 # 參數: # keys,redis的name的集合 # timeout,超時時間,當元素所有列表的元素獲取完之后,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞 # 更多: # r.brpop(keys, timeout),從右向左獲取數據brpoplpush(src, dst, timeout=0)
# 從一個列表的右側移除一個元素並將其添加到另一個列表的左側 # 參數: # src,取出並要移除元素的列表對應的name # dst,要插入元素的列表對應的name # timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞自定義增量迭代
# 由於redis類庫中沒有提供對列表元素的增量迭代,如果想要循環name對應的列表的所有元素,那么就需要: # 1、獲取name對應的所有列表 # 2、循環列表 # 但是,如果列表非常大,那么就有可能在第一步時就將程序的內容撐爆,所有有必要自定義一個增量迭代的功能: def list_iter(name): """ 自定義redis列表增量迭代 :param name: redis中的name,即:迭代name對應的列表 :return: yield 返回 列表元素 """ list_count = r.llen(name) for index in xrange(list_count): yield r.lindex(name, index) # 使用 for item in list_iter('pp'): print itemSet操作,Set集合就是不允許重復的列表
sadd(name,values)
# name對應的集合中添加元素scard(name)
獲取name對應的集合中元素個數sdiff(keys, *args)
# 在第一個name對應的集合中且不在其他name對應的集合的元素集合sdiffstore(dest, keys, *args)
# 獲取第一個name對應的集合中且不在其他name對應的集合,再將其新加入到dest對應的集合中sinter(keys, *args)
# 獲取多個name對應集合的並集sinterstore(dest, keys, *args)
# 獲取多個name對應集合的並集,再講其加入到dest對應的集合中sismember(name, value)
# 檢查value是否是name對應的集合的成員smembers(name)
# 獲取name對應的集合的所有成員smove(src, dst, value)
# 將某個成員從一個集合中移動到另外一個集合spop(name)
# 從集合的右側(尾部)移除一個成員,並將其返回srandmember(name, numbers)
# 從name對應的集合中隨機獲取 numbers 個元素srem(name, values)
# 在name對應的集合中刪除某些值sunion(keys, *args)
# 獲取多個name對應的集合的並集sunionstore(dest,keys, *args)
# 獲取多個name對應的集合的並集,並將結果保存到dest對應的集合中sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)# 同字符串的操作,用於增量迭代分批獲取元素,避免內存消耗太大
有序集合,在集合的基礎上,為每元素排序;元素的排序需要根據另外一個值來進行比較,所以,對於有序集合,每一個元素有兩個值,即:值和分數,分數專門用來做排序。
zadd(name, *args, **kwargs)
# 在name對應的有序集合中添加元素 # 如: # zadd('zz', 'n1', 1, 'n2', 2) # 或 # zadd('zz', n1=11, n2=22)zcard(name)
# 獲取name對應的有序集合元素的數量zcount(name, min, max)
# 獲取name對應的有序集合中分數 在 [min,max] 之間的個數zincrby(name, value, amount)
# 自增name對應的有序集合的 name 對應的分數r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
# 按照索引范圍獲取name對應的有序集合的元素 # 參數: # name,redis的name # start,有序集合索引起始位置(非分數) # end,有序集合索引結束位置(非分數) # desc,排序規則,默認按照分數從小到大排序 # withscores,是否獲取元素的分數,默認只獲取元素的值 # score_cast_func,對分數進行數據轉換的函數 # 更多: # 從大到小排序 # zrevrange(name, start, end, withscores=False, score_cast_func=float) # 按照分數范圍獲取name對應的有序集合的元素 # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float) # 從大到小排序 # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)zrank(name, value)
# 獲取某個值在 name對應的有序集合中的排行(從 0 開始) # 更多: # zrevrank(name, value),從大到小排序zrangebylex(name, min, max, start=None, num=None)
# 當有序集合的所有成員都具有相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則可以返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員 # 對集合中的每個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序后的集合成員。 如果兩個字符串有一部分內容是相同的話, 那么命令會認為較長的字符串比較短的字符串要大 # 參數: # name,redis的name # min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間 # min,右區間(值) # start,對結果進行分片處理,索引位置 # num,對結果進行分片處理,索引后面的num個元素 # 如: # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga # r.zrangebylex('myzset', "-", "[ca") 結果為:['aa', 'ba', 'ca'] # 更多: # 從大到小排序 # zrevrangebylex(name, max, min, start=None, num=None)zrem(name, values)
# 刪除name對應的有序集合中值是values的成員 # 如:zrem('zz', ['s1', 's2'])zremrangebyrank(name, min, max)
# 根據排行范圍刪除zremrangebyscore(name, min, max)
# 根據分數范圍刪除zremrangebylex(name, min, max)
# 根據值返回刪除zscore(name, value)
# 獲取name對應有序集合中 value 對應的分數zinterstore(dest, keys, aggregate=None)
# 獲取兩個有序集合的交集,如果遇到相同值不同分數,則按照aggregate進行操作 # aggregate的值為: SUM MIN MAXzunionstore(dest, keys, aggregate=None)
獲取兩個有序集合的並集,如果遇到相同值不同分數,則按照aggregate進行操作 # aggregate的值為: SUM MIN MAXzscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)# 同字符串相似,相較於字符串新增score_cast_func,用來對分數進行操作
其他常用操作
delete(*names)
# 根據刪除redis中的任意數據類型exists(name)
# 檢測redis的name是否存在keys(pattern='*')
# 根據模型獲取redis的name # 更多: # KEYS * 匹配數據庫中所有 key 。 # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 # KEYS h*llo 匹配 hllo 和 heeeeello 等。 # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hilloexpire(name ,time)
# 為某個redis的某個name設置超時時間rename(src, dst)
# 對redis的name重命名為move(name, db))
# 將redis的某個值移動到指定的db下randomkey()
# 隨機獲取一個redis的name(不刪除)type(name)
# 獲取name對應值的類型scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)# 同字符串操作,用於增量迭代獲取key
4、管道
redis-py默認在執行每次請求都會創建(連接池申請連接)和斷開(歸還連接池)一次連接操作,如果想要在一次請求中指定多個命令,則可以使用pipline實現一次請求指定多個命令,並且默認情況下一次pipline 是原子性操作。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='127.0.0.1', port=6379) r = redis.Redis(connection_pool=pool) # pipe = r.pipeline(transaction=False) pipe = r.pipeline(transaction=True) pipe.set('name', 'chengyiqiang') pipe.set('role', 'coder') pipe.execute()
5、發布訂閱
redis能通過頻道進行簡單的發布訂閱功能
demo如下:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis 5 6 7 class RedisHelper: 8 9 def __init__(self): 10 self.__conn = redis.Redis(host='10.211.55.4') 11 self.chan_sub = 'fm104.5' 12 self.chan_pub = 'fm104.5' 13 14 def public(self, msg): 15 self.__conn.publish(self.chan_pub, msg) 16 return True 17 18 def subscribe(self): 19 pub = self.__conn.pubsub() 20 pub.subscribe(self.chan_sub) 21 pub.parse_response() 22 return pub
訂閱者:
#!/usr/bin/env python # -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: msg= redis_sub.parse_response() print msg
發布者:
#!/usr/bin/env python # -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper obj = RedisHelper() obj.public('hello')