Redis提供了兩種方式來作消息隊列。一個是使用生產者消費模式模式,另外一個方法就是發布訂閱者模式。前者會讓一個或者多個客戶端監聽消息隊列,一旦消息到達,消費者馬上消費,誰先搶到算誰的,如果隊列里沒有消息,則消費者繼續監聽。后者也是一個或多個客戶端訂閱消息頻道,只要發布者發布消息,所有訂閱者都能收到消息,訂閱者都是ping的。
生產消費模式
主要使用了redis提供的blpop獲取隊列數據,如果隊列沒有數據則阻塞等待,也就是監聽。
#coding:utf-8
"""
生產者-消費者模式中的消費者
"""
import redis
class Task(object):
def __init__(self):
self.rcon = redis.StrictRedis(host='localhost', db=5)
self.queue = 'task:prodcons:queue'
def listen_task(self):
while True:
task = self.rcon.blpop(self.queue, 0)[1]
print "Task get", task
if __name__ == '__main__':
print '監聽任務隊列'
Task().listen_task()
發布訂閱模式
使用redis的pubsub功能,訂閱者訂閱頻道,發布者發布消息到頻道了,頻道就是一個消息隊列。
#coding:utf-8
"""
發布訂閱模式中的訂閱者
"""
import redis
class Task(object):
def __init__(self):
self.rcon = redis.StrictRedis(host='localhost', db=5)
self.ps = self.rcon.pubsub()
self.ps.subscribe('task:pubsub:channel')
def listen_task(self):
for i in self.ps.listen():
if i['type'] == 'message':
print "Task get", i['data']
if __name__ == '__main__':
print '監聽任務頻道'
Task().listen_task()
提供生產者和消息發布者
這里使用Flask做一個簡單的頁面作為生產者和消息發布者
# coding:utf-8
"""
提供一個頁面作為生產者/消息發布者
"""
import redis
import random
import logging
from flask import Flask, redirect
app = Flask(__name__)
rcon = redis.StrictRedis(host='localhost', db=1)
prodcons_queue = 'task:prodcons:queue' #生產者消費者隊列名稱
pubsub_channel = 'task:pubsub:channel' #發布訂閱頻道名稱
@app.route('/')
def index():
html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生產消費者模式</a>
<br>
<br>
<a href="/pubsub">發布訂閱者模式</a>
</center>
"""
return html
#生產者
@app.route('/prodcons')
def prodcons():
elem = random.randrange(10)
rcon.lpush(prodcons_queue, elem)
logging.info("lpush {} -- {}".format(prodcons_queue, elem))
return redirect('/')
#消息發布者
@app.route('/pubsub')
def pubsub():
ps = rcon.pubsub()
ps.subscribe(pubsub_channel)
elem = random.randrange(10)
rcon.publish(pubsub_channel, elem)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)
測試
將三短代碼分別保存在三個腳本文件中啟動.
打開瀏覽器,會看到如下頁面:

點擊其中的兩個超鏈接,兩個一直在監聽的消費者和訂閱者會立即得到消息
消費者

訂閱者

