Redis使用總結(3):實現簡單的消息隊列


參考Redis實現簡單消息隊列

Redis提供了兩種方式來作消息隊列。一個是使用生產者消費模式模式,另外一個方法就是發布訂閱者模式。前者會讓一個或者多個客戶端監聽消息隊列,一旦消息到達,消費者馬上消費,誰先搶到算誰的,如果隊列里沒有消息,則消費者繼續監聽。后者也是一個或多個客戶端訂閱消息頻道,只要發布者發布消息,所有訂閱者都能收到消息,訂閱者都是ping的。

生產消費模式

主要使用了redis提供的blpop獲取隊列數據,如果隊列沒有數據則阻塞等待,也就是監聽。

#coding:utf-8
"""
生產者-消費者模式中的消費者
"""
import redis

class Task(object):
    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.queue = 'task:prodcons:queue'

    def listen_task(self):
        while True:
            task = self.rcon.blpop(self.queue, 0)[1]
            print "Task get", task

if __name__ == '__main__':
    print '監聽任務隊列'
    Task().listen_task()

發布訂閱模式

使用redis的pubsub功能,訂閱者訂閱頻道,發布者發布消息到頻道了,頻道就是一個消息隊列。

#coding:utf-8
"""
發布訂閱模式中的訂閱者
"""
import redis

class Task(object):

    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.ps = self.rcon.pubsub()
        self.ps.subscribe('task:pubsub:channel')

    def listen_task(self):
        for i in self.ps.listen():
            if i['type'] == 'message':
                print "Task get", i['data']

if __name__ == '__main__':
    print '監聽任務頻道'
    Task().listen_task()

提供生產者和消息發布者

這里使用Flask做一個簡單的頁面作為生產者和消息發布者

# coding:utf-8
"""
提供一個頁面作為生產者/消息發布者
"""
import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=1)
prodcons_queue = 'task:prodcons:queue'  #生產者消費者隊列名稱
pubsub_channel = 'task:pubsub:channel' #發布訂閱頻道名稱

@app.route('/')
def index():

    html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生產消費者模式</a>
<br>
<br>
<a href="/pubsub">發布訂閱者模式</a>
</center>
"""
    return html

#生產者
@app.route('/prodcons')
def prodcons():
    elem = random.randrange(10)
    rcon.lpush(prodcons_queue, elem)
    logging.info("lpush {} -- {}".format(prodcons_queue, elem))
    return redirect('/')
#消息發布者
@app.route('/pubsub')
def pubsub():
    ps = rcon.pubsub()
    ps.subscribe(pubsub_channel)
    elem = random.randrange(10)
    rcon.publish(pubsub_channel, elem)
    return redirect('/')

if __name__ == '__main__':
    app.run(debug=True)

測試

將三短代碼分別保存在三個腳本文件中啟動.

打開瀏覽器,會看到如下頁面:

點擊其中的兩個超鏈接,兩個一直在監聽的消費者和訂閱者會立即得到消息

消費者

訂閱者


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM