國產celery簡單使用


官方文檔

Welcome to function_scheduling_distributed_framework_docs’s documentation! — function_scheduling_distributed_framework v0.1 文檔 (function-scheduling-distributed-framework.readthedocs.io)
一、安裝方式

pip install function_scheduling_distributed_framework 

二、簡單使用

import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco("task_queue_name1", qps=5, broker_kind=BrokerEnum.PERSISTQUEUE)  # 入參包括20種,運行控制方式非常多,想得到的控制都會有。
def task_fun(x, y):
    print(f'{x} + {y} = {x + y}')
    time.sleep(3)  # 框架會自動並發繞開這個阻塞,無論函數內部隨機耗時多久都能自動調節並發達到每秒運行 5 次 這個 task_fun 函數的目的。


if __name__ == "__main__":
    for i in range(100):
        task_fun.push(i, y=i * 2)  # 發布者發布任務
    task_fun.consume()  # 消費者啟動循環調度並發消費任務

對於消費函數,框架內部會生成發布者(生產者)和消費者。
1.推送。 task_fun.push(1,y=2) 會把 {"x":1,"y":2} (消息也自動包含一些其他輔助信息) 發送到中間件的 task_queue_name1 隊列中。
2.消費。 task_fun.consume() 開始自動從中間件拉取消息,並發的調度運行函數,task_fun(**{"x":1,"y":2}),每秒運行5次
整個過程只有這兩步,清晰明了,其他的控制方式需要看 task_deco 的中文入參解釋,全都參數都很有用。


這個是單個腳本實現了發布和消費,一般都是分離成兩個文件的,任務發布和任務消費無需在同一個進程的解釋器內部,
因為是使用了中間件解耦消息和持久化消息,不要被例子誤導成了,以為發布和消費必須放在同一個腳本里面


使用方式只需要這一個例子就行了,其他舉得例子只是改了下broker_kind和其他參數而已,
而且裝飾器的入參已近解釋得非常詳細了,框架濃縮到了一個裝飾器,並沒有用戶需要從框架里面要繼承什么組合什么的復雜寫法。
 

三、框架詳細介紹

1.各種中間件選擇的場景和優勢

RABBITMQ_AMQPSTORM = 0  # 使用 amqpstorm 包操作rabbitmq  作為 分布式消息隊列,支持消費確認.推薦這個。

RABBITMQ_RABBITPY = 1  # 使用 rabbitpy 包操作rabbitmq  作為 分布式消息隊列,支持消費確認。

REDIS = 2  # 使用 redis 的 list結構,brpop 作為分布式消息隊列。隨意重啟和關閉會丟失大量消息任務,不支持消費確認。

LOCAL_PYTHON_QUEUE = 3  # 使用python queue.Queue實現的基於當前python進程的消息隊列,不支持跨進程 跨腳本 跨機器共享任務,不支持持久化,適合一次性短期簡單任務。

RABBITMQ_PIKA = 4  # 使用pika包操作rabbitmq  作為 分布式消息隊列。

MONGOMQ = 5  # 使用mongo的表中的行模擬的 作為分布式消息隊列,支持消費確認。

PERSISTQUEUE = 6  # 使用基於sqlute3模擬消息隊列,支持消費確認和持久化,但不支持跨機器共享任務,可以基於本機單機跨腳本和跨進程共享任務,好處是不需要安裝中間件。

NSQ = 7  # 基於nsq作為分布式消息隊列,支持消費確認。

KAFKA = 8  # 基於kafka作為分布式消息隊列,建議使用BrokerEnum.CONFLUENT_KAFKA。

REDIS_ACK_ABLE = 9  # 基於redis的 list + 臨時unack的set隊列,采用了 lua腳本操持了取任務和加到pengding為原子性,隨意重啟和掉線不會丟失任務。

SQLACHEMY = 10  # 基於SQLACHEMY 的連接作為分布式消息隊列中間件支持持久化和消費確認。支持mysql oracle sqlserver等5種數據庫。

ROCKETMQ = 11  # 基於 rocketmq 作為分布式消息隊列,這個中間件必須在linux下運行,win不支持。

REDIS_STREAM = 12  # 基於redis 5.0 版本以后,使用 stream 數據結構作為分布式消息隊列,支持消費確認和持久化和分組消費,是redis官方推薦的消息隊列形式,比list結構更適合。

ZEROMQ = 13  # 基於zeromq作為分布式消息隊列,不需要安裝中間件,可以支持跨機器但不支持持久化。

RedisBrpopLpush = 14  # 基於redis的list結構但是采用brpoplpush 雙隊列形式,和 redis_ack_able的實現差不多,實現上采用了原生命令就不需要lua腳本來實現取出和加入unack了。

"""
操作 kombu 包,這個包也是celery的中間件依賴包,這個包可以操作10種中間件(例如rabbitmq redis),但沒包括分布式函數調度框架的kafka nsq zeromq 等。
同時 kombu 包的性能非常差,可以用原生redis的lpush和kombu的publish測試發布,使用brpop 和 kombu 的 drain_events測試消費,對比差距相差了5到10倍。
由於性能差,除非是分布式函數調度框架沒實現的中間件才選kombu方式(例如kombu支持亞馬遜隊列  qpid pyro 隊列),否則強烈建議使用此框架的操作中間件方式而不是使用kombu。
"""
KOMBU = 15

"""基於confluent-kafka包,包的性能比kafka-python提升10倍。同時應對反復隨意重啟部署消費代碼的場景,此消費者實現至少消費一次,第8種BrokerEnum.KAFKA是最多消費一次。"""
CONFLUENT_KAFKA = 16

""" 基於emq作為中間件的。這個和上面的中間件有很大不同,服務端不存儲消息。所以不能先發布幾十萬個消息,然后再啟動消費。mqtt優點是web前后端能交互,
前端不能操作redis rabbitmq kafka,但很方便操作mqtt。這種使用場景是高實時的互聯網接口。
"""
MQTT = 17

HTTPSQS = 18  # 基於httpsqs的

PULSAR = 20   # 下一代分布式消息系統。5年后會同時取代rabbitmq和kafka。

2.框架支持的函數調度並發模式種類詳細介紹

1、threading 多線程,使用自定義的可縮小、節制開啟新線程的自定義線程池,不是直接用官方內置concurrent.futures.ThreadpoolExecutor
   此線程池非常智能,配合qps參數,任何場景可以無腦開500線程,真正的做到智能擴張,智能自動縮小。
   這線程池是智能線程池,由於非常好用,為這個線程池做了獨立的pypi包,可以單獨用於沒有使用此框架的項目。

2、gevent    需要在運行起點的腳本首行打 gevent 猴子補丁。

3、eventlet  需要在運行起點的腳本首行打 eventlet 猴子補丁。

4、asyncio  async異步,主要是針對消費函數已經定義成了   async def fun(x)  這種情況,這種情況不能直接使用多線程,
   因為執行  fun(1)  后得到的並不是所想象的函數最終結果,而是得到的一個協程對象,所以針對已經定義成異步函數了的,需要使用此種並發模式。
   框架不鼓勵用戶定義異步函數,你就用同步的直觀方式思維定義函數就行了,其余的並發調度交給框架就行了。

5、開啟多進程啟動多個consumer,此模式是 多進程  + 上面4種的其中一種並發方式,充分利用多核和充分利用io,用法如下。可以實現 多進程 疊加 協程並發。
# 這種是多進程方式,一次編寫能夠兼容win和linux的運行。

from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum
import os

@task_deco('test_multi_process_queue',broker_kind=BrokerEnum.REDIS_ACK_ABLE,
           concurrent_mode=ConcurrentModeEnum.THREADING,)
def fff(x):
    print(x * 10,os.getpid())

if __name__ == '__main__':
    fff.multi_process_consume(6)  # 一次性啟動6進程疊加多線程。

3 、框架最重要的task_deco裝飾器參數說明

:param queue_name: 隊列名字,只有這個參數是必填的,而且每個任務函數都要使用唯一的隊列名字,其他參數都是可選的。
    :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。為0是不限制。
    :param concurrent_num:並發數量,
    :param specify_concurrent_pool:使用指定的線程池(協程池),可以多個消費者共使用一個線程池,不為None時候。threads_num失效
    :param concurrent_mode:並發模式,1線程 2gevent 3eventlet 4 asyncio
    :param max_retry_times: 最大自動重試次數,當函數發生錯誤,立即自動重試運行n次,對一些特殊不穩定情況會有效果。
           可以在函數中主動拋出重試的異常ExceptionForRetry,框架也會立即自動重試。
           主動拋出ExceptionForRequeue異常,則當前消息會重返中間件。
    :param log_level:框架的日志級別,默認是debug級別,可以看到詳細的執行信息,如果不想看到太多詳細的日志,可以設置為logging.INFO常量(20) 或者數字20。
    :param is_print_detail_exception:是否打印詳細的堆棧錯誤。為0則打印簡略的錯誤占用控制台屏幕行數少。
    :param qps:指定1秒內的函數執行次數,qps會覆蓋msg_schedule_time_intercal,以后廢棄msg_schedule_time_intercal這個參數。
    :param msg_expire_senconds:消息過期時間,為0永不過期,為10則代表,10秒之前發布的任務如果現在才輪到消費則丟棄任務。
    :param is_using_distributed_frequency_control: 是否使用分布式空頻(依賴redis計數),默認只對當前實例化的消費者空頻有效。假如實例化了2個qps為10的使用同一隊列名的消費者,
               並且都啟動,則每秒運行次數會達到20。如果使用分布式空頻則所有消費者加起來的總運行次數是10。
    :param is_send_consumer_hearbeat_to_redis   時候將發布者的心跳發送到redis,有些功能的實現需要統計活躍消費者。因為有的中間件不是真mq。
    :param logger_prefix: 日志前綴,可使不同的消費者生成不同的日志
    :param create_logger_file : 是否創建文件日志
    :param do_task_filtering :是否執行基於函數參數的任務過濾
    :param task_filtering_expire_seconds:任務過濾的失效期,為0則永久性過濾任務。例如設置過濾過期時間是1800秒 ,
           30分鍾前發布過1 + 2 的任務,現在仍然執行,
           如果是30分鍾以內發布過這個任務,則不執行1 + 2,現在把這個邏輯集成到框架,一般用於接口價格緩存。
    :param is_consuming_function_use_multi_params  函數的參數是否是傳統的多參數,不為單個body字典表示多個參數。
    :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效
    :param do_not_run_by_specify_time   :不運行的時間段
    :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啟兩個消費者。
    :param function_result_status_persistance_conf   :配置。是否保存函數的入參,運行結果和運行狀態到mongodb。
           這一步用於后續的參數追溯,任務統計和web展示,需要安裝mongo。
    :param is_using_rpc_mode 是否使用rpc模式,可以在發布端獲取消費端的結果回調,但消耗一定性能,使用async_result.result時候會等待阻塞住當前線程。。
    :param broker_kind:中間件種類,。 0 使用pika鏈接rabbitmqmq,1使用rabbitpy包實現的操作rabbitmnq,2使用redis,
           3使用python內置Queue,4使用amqpstorm包實現的操作rabbitmq,5使用mongo,6使用本機磁盤持久化。
           7使用nsq,8使用kafka,9也是使用redis但支持消費確認。10為sqlachemy,支持mysql sqlite postgre oracel sqlserver
           11使用rocketmq. 12使用redis的 stream 數據結作為中間件,這個也支持消費確認 。13 zeromq

3.1 task_deco裝飾器的concurrent_num和qps之間的關系

concurrent_num:並發數量。
    qps qps是有個很有趣的參數,能精確控制函數每秒運行多少次。
    concurrent_num和qps存在着一定的關系。
    
    例如對於下面這個函數
    
    def func(x):
           time.sleep(2)
           print(x)

    1)如果設置 concurrent_num = 1000(或100萬)  qps = 10
    那么一秒鍾會執行10次func函數。如果不指定qps的值,則不進行控頻,消費框架會平均每秒鍾會執行50次函數func。

    如果設置concurrent_num = 1000  qps = 5   
    那么一秒鍾會執行5次func函數。所以可以看到,當你不知道要開多少並發合適的時候,可以粗暴開1000個線程,但要設置一個qps。
   
    那為什么次框架,可以讓你粗暴的設置1000設置100萬線程呢,並不是做了數字截取,判斷線程設置大於多少就自動調小了,此消費框架並沒有這樣去實現。
    而是次框架使用的非concurrent.tutures.ThreadpoolExecutor,是使用的自定義的  ThreadPoolExecutorShrinkAble 線程池,
    此線程池其中之一的功能就是節制開更多的線程,因為對於上面的休眠2秒的func函數,如果設置concurrent_num = 1000000  qps = 5,
    正常來說開10個線程足夠實現每秒執行5次了,此框架在調節線程新增線程時候進行了更多的判斷,所以原生線程池不可以設置100萬大小,
    而ThreadPoolExecutorShrinkAble可以設置為100萬大小。

    此外ThreadPoolExecutorShrinkAble 實現了線程池自動縮小的功能,這也是原生concurrent.tutures.ThreadpoolExecutor沒有的功能。
    自動縮小是什么意思呢,比如一段時間任務非常密集1秒鍾來了幾百個任務,所以當時開啟了很多線程來應付,但一段時間后每分鍾只來了個把任務,
    此時 ThreadPoolExecutorShrinkAble 能夠自動縮小線程池,
    ThreadPoolExecutorShrinkAble實現了java ThreadpoolExecutor的KeepAliveTime參數的功能,
    原生concurrent.tutures.ThreadpoolExecutor線程池即使以后永久不來新任務,之前開的線程數量一致保持這。

    關於 ThreadPoolExecutorShrinkAble 的厲害之處,可以參考 https://github.com/ydf0509/threadpool_executor_shrink_able
    
    最終關於 concurrent_num 大小設置為多少,看自己需求,上面說的100萬是舉個例子,
    實際這個參數還被用作為線程池的任務隊列的有界隊列的大小,所以一定要設置為1000以下,否則如果設置為100萬,
    從消息中間件預取出的消息過多,造成python內存大、單個消費者掏空消息隊列中間件造成別的新啟動的消費者無任務可消費、
    對於不支持消費確認類型的中間件的隨意重啟會丟失大量正在運行的任務 等不利影響。

    2)上面的func函數,設置 concurrent_num = 1  qps = 100,那會如何呢?
       由於你設置的並發是1,對於一個需要2秒運行完成的函數,顯然平均每2秒才能執行1次,就是框架真正的只能達到0.5個qps。
       所以 concurrent_num 和 qps,既有關系,也不是絕對的關系。
    
    在對一個隨機消耗時間的函數進行並發控制時候,如果函數的運行時間是0.5到20秒任意時間不確定的徘徊,你可以設置 concurrent_num = 100,
    如果合作方要求了只能1秒鍾能讓你使用多少次,例如需要精確控頻10次,可以設置qps =10,concurrent_num隨便搞個 一兩百 兩三百就行了,
    因為是智能的克制的調節線程池大小的,所以不會真的達到concurrent_num的值。

    3)qps是個小數可以小於1,如果要設置10秒執行一次函數,則設置qps=0.1

    這主要是介紹了 concurrent_num 和qps的關系和設置值,qps是優先,但受到concurrent_num的約束。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM