Python中的使用標准queue模塊就可以建立多進程使用的隊列,但是使用redis和redis-queue(rq)模塊使這一操作更加簡單。
Part 1.
比如首先我們使用隊列來簡單的儲存數據:我們選用redis list類型,其他類型的數據操作可以參考這個文章
在redis_queue.py文件中:
import redis
class RedisQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
# redis的默認參數為:host='localhost', port=6379, db=0, 其中db為定義redis database的數量
self.__db= redis.Redis(**redis_kwargs)
self.key = '%s:%s' %(namespace, name)
def qsize(self):
return self.__db.llen(self.key) # 返回隊列里面list內元素的數量
def put(self, item):
self.__db.rpush(self.key, item) # 添加新元素到隊列最右方
def get_wait(self, timeout=None):
# 返回隊列第一個元素,如果為空則等待至有元素被加入隊列(超時時間閾值為timeout,如果為None則一直等待)
item = self.__db.blpop(self.key, timeout=timeout)
# if item:
# item = item[1] # 返回值為一個tuple
return item
def get_nowait(self):
# 直接返回隊列第一個元素,如果隊列為空返回的是None
item = self.__db.lpop(self.key)
return item
在input.py文件中:
from redis_queue import RedisQueue
import time
q = RedisQueue('rq') # 新建隊列名為rq
for i in range(5):
q.put(i)
print "input.py: data {} enqueue {}".format(i, time.strftime("%c"))
time.sleep(1)
在output.py文件中:
from redis_queue import RedisQueue
import time
q = RedisQueue('rq')
while 1:
result = q.get_nowait()
if not result:
break
print "output.py: data {} out of queue {}".format(result, time.strftime("%c"))
time.sleep(2)
在test_run.sh文件中:
python input.py &
python output.py &
在terminal中運行test.sh的結果如下:
input.py: data 0 enqueue Fri Nov 10 10:51:37 2017
output.py: data 0 out of queue Fri Nov 10 10:51:37 2017
input.py: data 1 enqueue Fri Nov 10 10:51:38 2017
output.py: data 1 out of queue Fri Nov 10 10:51:38 2017
input.py: data 2 enqueue Fri Nov 10 10:51:39 2017
output.py: data 2 out of queue Fri Nov 10 10:51:39 2017
input.py: data 3 enqueue Fri Nov 10 10:51:40 2017
output.py: data ('queue:rq', '3') out of queue Fri Nov 10 10:51:40 2017
input.py: data 4 enqueue Fri Nov 10 10:51:41 2017
output.py: data ('queue:rq', '4') out of queue 1510282301.69
其中lpop返回的是一個隊列名+元素值的tuple,並且返回的數值默認為string
Part 2.
隊列里面可以添加任意object,因此可以添加函數到多個隊列來實現多線程並發的效果。
首先,建立一個rq work進程(寫在worker.py腳本中)來監聽隊列
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = "redis://localhost:6379" # redis server 默認地址
conn = redis.from_url(redis_url)
def square_function(x):
return x*x
if __name__ == '__main__':
with Connection(conn): # 建立與redis server的連接
worker = Worker(list(map(Queue, listen))) # 建立worker監聽給定的隊列
worker.work()
然后python worker.py啟動redis server
在test.py文件中:
from rq import Queue
from rq.job import Job
from worker import square_function, conn
import time
q = Queue(connection=conn)
job = q.enqueue_call(square_function, args=(5, ), result_ttl=5000) # 保存結果5000s
job_id = job.get_id()
print job_id
result1 = Job.fetch(job_id, connection=conn)
print result1.is_finished
time.sleep(1) # 等待隊列里任務完成
result2 = Job.fetch(job_id, connection=conn)
print result2.return_value
上面文件的輸出結果為:
98dc6f58-9333-48f7-88c1-c4de1cc9e5cf # job id
False # 任務尚未完成
25 # 任務完成,輸出結果
注: 調用的square_function不允許和任務發起在同一個腳本,否則會報錯ValueError: Functions from the __main__ module cannot be processed by workers
當與flask一起使用時:
在app.py文件中:
from rq import Queue
from rq.job import Job
from worker import conn, square_function
from flask import Flask, request
app = Flask(__name__)
q = Queue(connection=conn) # 建立與Redis server的連接並初始化一個隊列
@app.route("/", methods=['POST'])
def index():
x = request.get_data() # string 類型
job = q.enqueue_call(square_function, args=(int(x), ), result_ttl=5000) # 最后的參數為RQ保留結果的時間
return job.get_id() # 返回job的id
@app.route('/result/<job_key>', methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn) # 獲取根據job_id獲取任務的返回值
if job.is_finished: # 檢驗是否完成
return str(job.result), 200
else:
return "Wait!", 202
if __name__ == "__main__":
app.run('0.0.0.0', port=5000)
python app.py啟動flask服務
在test.py文件:
import requests
import time
post_url = "http://localhost:5000"
post_result = requests.post(post_url, data={'x': 2})
job_id = post_result.content
print job_id
time.sleep(1)
get_url = "http://localhost:5000/result/{}".format(job_id)
get_result = requests.get(get_url)
print get_result.content
獲得的結果如下:
067306e9-f13b-4b6a-93dc-2f5b457a78b7 # job id
4 # job返回值
