談談python里面關於任務隊列


談談python里面關於任務隊列

  • 為什么要做任務隊列

要回答這個問題我們首先看看在流水線上的案列,如果人的速度很慢,機器的速度比人的速度快很多,就會造成,機器生產的東西沒有及時處理,越積越多,造成阻塞,影響生產。

  • 任務隊列的意義:

打個比方如果出現人的速度跟不上機器速度怎么辦,這個時候我們就需要第三方,監管人員(任務隊列)把機器生產的東西,放在一個地方,(隊列),然后分配給每個用戶,有條不理的執行。

python 里面的celery 模塊是一個簡單,靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。它是一個專注於實時處理的任務隊列,同時也支持任務調度。

  • 關於安裝celery
pip  install Celery

關於celery 的概念介紹

消息隊列
消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。如下圖所示:
 
Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴展能力。
Celery****的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這里我先去了解RabbitMQ,Redis
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。

實戰

環境

  • kaillinux 主機兩台(192.168.29.234,192.168.29.198)
  • redis (192.168.29.234 )
  • flower (192.168.29.234)
  • 任務腳本(兩台都必須部署)

任務腳本

  • tasks.py (計算加減乘除)
import os
import sys
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from celery import Celery
from celery import chain, group, chord, Task
import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
__all__ = ['add', 'reduce','sum_all', 'other']
####################################
# tas #
####################################
@app.task
def add(x, y):
    return x + y
@app.task
def reduce(x, y):
    return x - y
@app.task
def sum(values):
    return sum([int(value) for value in values])
@app.task
def other(x, y):
    return x * y

  • celeryconfig.py
!/usr/bin/python
#coding:utf-8
from kombu import Queue
CELERY_TIMEZONE = 'Asia/Shanghai'
####################################
# 一般配置 #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# List of modules to import when celery starts.
CELERY_IMPORTS = ('tasks', )
CELERYD_MAX_TASKS_PER_CHILD = 40 #  每個worker執行了多少任務就會死掉
BROKER_POOL_LIMIT = 10 #默認celery與broker連接池連接數
CELERY_DEFAULT_QUEUE='default'
CELERY_DEFAULT_ROUTING_KEY='task.default'
CELERY_RESULT_BACKEND='redis://192.168.29.234:6379/0'  
BROKER_URL='redis://192.168.29.234:6379/0'  
#默認隊列
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_DEFAULT_ROUTING_KEY = 'celery'
CELERYD_LOG_FILE="./logs/celery.log"
CELERY_QUEUEs = (
    Queue("queue_add", routing_key='queue_add'),
    Queue('queue_reduce', routing_key='queue_sum'),
    Queue('celery', routing_key='celery'),
    )
CELERY_ROUTES = {
    'task.add':{'queue':'queue_add', 'routing_key':'queue_add'},
    'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'},
}

關於flower 是監控任務信息的web 圖表,默認的配置沒有做驗證,而且當主機重啟時,數據會丟失,所以我們要自定義一個flower 文件

flower github

在234 上flower.py 的腳本

#!/usr/bin/env python
#coding:utf-8
broker_api = 'redis://127.0.0.1:6379/0'
logging = 'DEBUG'
address = '0.0.0.0'
port = 5555
#外部訪問密碼
#basic_auth=['root:ybl8651073']
persistent=True  #持久化celery tasks(如果為false的話,重啟flower之后,監控的task就消失了)
db="/root/flower_db"

運行

  • 在198上啟動
celery worker -A  tasks --loglevel=info --queues=celery,queue_add --hostname=celery_worker198
  • 在234 上啟動
1.  redis服務
2.  celery worker -A  tasks --loglevel=info --queues=celery,queue_reduce --hostname=celery_worker234
3.  celery  flower worker -A  tasks  --config==/root/flower.py 

服務驗證

  • 在任一台有celeryservice項目代碼的服務器上,運行add、reduce、-
  • sum、other任務(測試可簡單使用add.delay(1,2)等)
  • add只會在198上運行,
  • sum任務,可能會在198或234服務器的worker節點運行
  • reduce任務,只會在234上運行。
  • other任務可能會在198或者234上運行。

打開監控web 192.168.29.234:5555

 兩台上線workers

  • 隨機運行幾個任務

image.png

  • 分析
    運行結果

  • 也可以通過 curl提交任務

curl -X POST -d '{"args":[1,2]}' http://192.168.29.234:5555/api/task/async-apply/tasks.add


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM