python中利用redis構建任務隊列(queue)


Python中的使用標准queue模塊就可以建立多進程使用的隊列,但是使用redisredis-queue(rq)模塊使這一操作更加簡單。

Part 1.

比如首先我們使用隊列來簡單的儲存數據:我們選用redis list類型,其他類型的數據操作可以參考這個文章
redis_queue.py文件中:

import redis

class RedisQueue(object):
    def __init__(self, name, namespace='queue', **redis_kwargs):
       # redis的默認參數為:host='localhost', port=6379, db=0, 其中db為定義redis database的數量
       self.__db= redis.Redis(**redis_kwargs)
       self.key = '%s:%s' %(namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)  # 返回隊列里面list內元素的數量

    def put(self, item):
        self.__db.rpush(self.key, item)  # 添加新元素到隊列最右方

    def get_wait(self, timeout=None):
        # 返回隊列第一個元素,如果為空則等待至有元素被加入隊列(超時時間閾值為timeout,如果為None則一直等待)
        item = self.__db.blpop(self.key, timeout=timeout)
        # if item:
        #     item = item[1]  # 返回值為一個tuple
        return item

    def get_nowait(self):
        # 直接返回隊列第一個元素,如果隊列為空返回的是None
        item = self.__db.lpop(self.key)  
        return item

input.py文件中:

from redis_queue import RedisQueue
import time

q = RedisQueue('rq')  # 新建隊列名為rq
for i in range(5):
    q.put(i)
    print "input.py: data {} enqueue {}".format(i, time.strftime("%c"))
    time.sleep(1)

output.py文件中:

from redis_queue import RedisQueue
import time

q = RedisQueue('rq')
while 1:
    result = q.get_nowait()
    if not result:
        break
    print "output.py: data {} out of queue {}".format(result, time.strftime("%c"))
    time.sleep(2)

test_run.sh文件中:

python input.py &
python output.py &

在terminal中運行test.sh的結果如下:

input.py: data 0 enqueue Fri Nov 10 10:51:37 2017
output.py: data 0 out of queue Fri Nov 10 10:51:37 2017
input.py: data 1 enqueue Fri Nov 10 10:51:38 2017
output.py: data 1 out of queue Fri Nov 10 10:51:38 2017
input.py: data 2 enqueue Fri Nov 10 10:51:39 2017
output.py: data 2 out of queue Fri Nov 10 10:51:39 2017
input.py: data 3 enqueue Fri Nov 10 10:51:40 2017
output.py: data ('queue:rq', '3') out of queue Fri Nov 10 10:51:40 2017
input.py: data 4 enqueue Fri Nov 10 10:51:41 2017
output.py: data ('queue:rq', '4') out of queue 1510282301.69

其中lpop返回的是一個隊列名+元素值的tuple,並且返回的數值默認為string

Part 2.

隊列里面可以添加任意object,因此可以添加函數到多個隊列來實現多線程並發的效果。
首先,建立一個rq work進程(寫在worker.py腳本中)來監聽隊列

import redis
from rq import Worker, Queue, Connection

listen = ['default']
redis_url = "redis://localhost:6379"  # redis server 默認地址
conn = redis.from_url(redis_url)

def square_function(x):
    return x*x

if __name__ == '__main__':
    with Connection(conn):  # 建立與redis server的連接
        worker = Worker(list(map(Queue, listen)))  # 建立worker監聽給定的隊列
        worker.work()

然后python worker.py啟動redis server

test.py文件中:

from rq import Queue
from rq.job import Job
from worker import square_function, conn 
import time

q = Queue(connection=conn)

job = q.enqueue_call(square_function, args=(5, ), result_ttl=5000)   # 保存結果5000s
job_id = job.get_id()
print job_id

result1 = Job.fetch(job_id, connection=conn)
print result1.is_finished

time.sleep(1)  # 等待隊列里任務完成

result2 = Job.fetch(job_id, connection=conn)
print result2.return_value

上面文件的輸出結果為:

98dc6f58-9333-48f7-88c1-c4de1cc9e5cf  # job id
False # 任務尚未完成
25 # 任務完成,輸出結果

: 調用的square_function不允許和任務發起在同一個腳本,否則會報錯ValueError: Functions from the __main__ module cannot be processed by workers

當與flask一起使用時:
app.py文件中:

from rq import Queue
from rq.job import Job
from worker import conn, square_function

from flask import Flask, request

app = Flask(__name__)

q = Queue(connection=conn) # 建立與Redis server的連接並初始化一個隊列

@app.route("/", methods=['POST'])
def index():
    x = request.get_data()  # string 類型
    job = q.enqueue_call(square_function, args=(int(x), ), result_ttl=5000)  # 最后的參數為RQ保留結果的時間
    return job.get_id()  # 返回job的id

@app.route('/result/<job_key>', methods=['GET'])
def get_results(job_key):
    job = Job.fetch(job_key, connection=conn) # 獲取根據job_id獲取任務的返回值
    if job.is_finished: # 檢驗是否完成
        return str(job.result), 200
    else:
        return "Wait!", 202

if __name__ == "__main__":
    app.run('0.0.0.0', port=5000)

python app.py啟動flask服務

test.py文件:

import requests
import time

post_url = "http://localhost:5000"
post_result = requests.post(post_url, data={'x': 2})
job_id = post_result.content
print job_id

time.sleep(1)

get_url = "http://localhost:5000/result/{}".format(job_id)
get_result = requests.get(get_url)
print get_result.content

獲得的結果如下:

067306e9-f13b-4b6a-93dc-2f5b457a78b7  # job id
4  # job返回值

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM