談談python里面關於任務隊列
- 為什么要做任務隊列
要回答這個問題我們首先看看在流水線上的案列,如果人的速度很慢,機器的速度比人的速度快很多,就會造成,機器生產的東西沒有及時處理,越積越多,造成阻塞,影響生產。
- 任務隊列的意義:
打個比方如果出現人的速度跟不上機器速度怎么辦,這個時候我們就需要第三方,監管人員(任務隊列)把機器生產的東西,放在一個地方,(隊列),然后分配給每個用戶,有條不理的執行。
python 里面的celery 模塊是一個簡單,靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。它是一個專注於實時處理的任務隊列,同時也支持任務調度。
- 關於安裝celery
pip install Celery
關於celery 的概念介紹
消息隊列
消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。如下圖所示:
Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴展能力。
Celery****的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這里我先去了解RabbitMQ,Redis。
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
實戰
環境
- kaillinux 主機兩台(192.168.29.234,192.168.29.198)
- redis (192.168.29.234 )
- flower (192.168.29.234)
- 任務腳本(兩台都必須部署)
任務腳本
- tasks.py (計算加減乘除)
import os
import sys
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from celery import Celery
from celery import chain, group, chord, Task
import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
__all__ = ['add', 'reduce','sum_all', 'other']
####################################
# tas #
####################################
@app.task
def add(x, y):
return x + y
@app.task
def reduce(x, y):
return x - y
@app.task
def sum(values):
return sum([int(value) for value in values])
@app.task
def other(x, y):
return x * y
- celeryconfig.py
!/usr/bin/python
#coding:utf-8
from kombu import Queue
CELERY_TIMEZONE = 'Asia/Shanghai'
####################################
# 一般配置 #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# List of modules to import when celery starts.
CELERY_IMPORTS = ('tasks', )
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每個worker執行了多少任務就會死掉
BROKER_POOL_LIMIT = 10 #默認celery與broker連接池連接數
CELERY_DEFAULT_QUEUE='default'
CELERY_DEFAULT_ROUTING_KEY='task.default'
CELERY_RESULT_BACKEND='redis://192.168.29.234:6379/0'
BROKER_URL='redis://192.168.29.234:6379/0'
#默認隊列
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_DEFAULT_ROUTING_KEY = 'celery'
CELERYD_LOG_FILE="./logs/celery.log"
CELERY_QUEUEs = (
Queue("queue_add", routing_key='queue_add'),
Queue('queue_reduce', routing_key='queue_sum'),
Queue('celery', routing_key='celery'),
)
CELERY_ROUTES = {
'task.add':{'queue':'queue_add', 'routing_key':'queue_add'},
'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'},
}
關於flower 是監控任務信息的web 圖表,默認的配置沒有做驗證,而且當主機重啟時,數據會丟失,所以我們要自定義一個flower 文件
在234 上flower.py 的腳本
#!/usr/bin/env python
#coding:utf-8
broker_api = 'redis://127.0.0.1:6379/0'
logging = 'DEBUG'
address = '0.0.0.0'
port = 5555
#外部訪問密碼
#basic_auth=['root:ybl8651073']
persistent=True #持久化celery tasks(如果為false的話,重啟flower之后,監控的task就消失了)
db="/root/flower_db"
運行
- 在198上啟動
celery worker -A tasks --loglevel=info --queues=celery,queue_add --hostname=celery_worker198
- 在234 上啟動
1. redis服務
2. celery worker -A tasks --loglevel=info --queues=celery,queue_reduce --hostname=celery_worker234
3. celery flower worker -A tasks --config==/root/flower.py
服務驗證
- 在任一台有celeryservice項目代碼的服務器上,運行add、reduce、-
- sum、other任務(測試可簡單使用add.delay(1,2)等)
- add只會在198上運行,
- sum任務,可能會在198或234服務器的worker節點運行
- reduce任務,只會在234上運行。
- other任務可能會在198或者234上運行。
打開監控web 192.168.29.234:5555
- 隨機運行幾個任務
-
分析
-
也可以通過 curl提交任務
curl -X POST -d '{"args":[1,2]}' http://192.168.29.234:5555/api/task/async-apply/tasks.add