引言
最近在用 sanic 寫東西,所有涉及到IO阻塞的代碼都需要用 aio 的模塊,好在近年來 asyncio 生態圈發展的還算不錯,該有的都有 ~
近期業務中 登錄/注冊 業務涉及的很復雜(涉及到邀請),需要解鎖、發送短信等操作,想來這么個模塊整的很繁瑣,以后加個滑動驗證那還了得。
於是乎,想整一個類似於celery 的模塊,進行任務解耦,但是目前 celery 還目前不支持異步(官方將在 celery5 支持異步)。
所以目前查閱資料發現了一個 python 實現的 arq 模塊,已經應用在了生產環境,效果還算不錯 ~
官方是這么介紹它的:
- 非阻塞
- 延遲執行、定時任務、重試機制
- 快
- 優雅
- 小
首先先安裝一下它:
$ pip install arq
那么接下來,快速了解下它的使用吧 ~
簡單使用
先看下面編寫的這段代碼
# filename: tasks.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Date: 2019/5/23
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
async def say_hello(ctx, name) -> None:
"""任務函數
Parameters
----------
ctx: dict
工作者上下文
name: string
Returns
-------
dict
"""
print(ctx)
print(f"Hello {name}")
async def startup(ctx):
print("starting...")
async def shutdown(ctx):
print("ending...")
async def main():
# 創建
redis = await create_pool(RedisSettings(password="root123456"))
# 分配任務
await redis.enqueue_job('say_hello', name="liuzhichao")
# WorkerSettings定義了創建工作時要使用的設置,
# 它被arq cli使用
class WorkerSettings:
# 隊列使用 `redis` 配置, 可以配置相關參數
# 例如我的密碼是 `rooot123456`
redis_settings = RedisSettings(password="root123456")
# 被監聽的函數
functions = [say_hello]
# 開啟 `worker` 運行
on_startup = startup
# 終止 `worker` 后運行
on_shutdown = shutdown
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
1、接下來看我們怎么運行它
$ arq tasks.WorkerSettings
Maybe you can see
10:56:25: Starting worker for 1 functions: say_hello
10:56:25: redis_version=4.0.1 mem_usage=32.00M clients_connected=6 db_keys=19189
starting...
2、運行 tasks.py 文件
$ python3 tasks.py
Maybe you can see
11:01:04: 0.29s → 5a5ac0edd5ad4b318b9848637b1ae800:say_hello(name='liuzhichao')
{'redis': <ArqRedis <ConnectionsPool [db:0, size:[1:10], free:1]>>, 'job_id': '5a5ac0edd5ad4b318b9848637b1ae800', 'job_try': 1, 'enqueue_time': datetime.datetime(2019, 5, 23, 3, 1, 4, 570000), 'score': 1558580464570}
Hello liuzhichao
11:01:04: 0.00s ← 5a5ac0edd5ad4b318b9848637b1ae800:say_hello ●
3、那么這個簡單任務就執行完成了,是不是特別簡單 ~
定時任務
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Date: 2019/5/23
from arq import cron
from arq.connections import RedisSettings
async def run_regularly(ctx):
# 表示在 10、11、12 分 50秒的時候打印
print('run job at 26:05, 27:05 and 28:05')
class WorkerSettings:
redis_settings = RedisSettings(password="root123456")
cron_jobs = [
cron(run_regularly, minute={10, 11, 12}, second=50)
]
1、運行它
$ arq tasks.WorkerSettings
If run out of the time,maybe you can see
11:10:25: Starting worker for 1 functions: cron:run_regularly
11:10:25: redis_version=4.0.1 mem_usage=32.00M clients_connected=6 db_keys=19190
11:10:51: 0.51s → cron:run_regularly()
run foo job at 26:05, 27:05 and 28:05
11:10:51: 0.00s ← cron:run_regularly ●
11:11:51: 0.51s → cron:run_regularly()
run foo job at 26:05, 27:05 and 28:05
11:11:51: 0.00s ← cron:run_regularly ●
11:12:50: 0.50s → cron:run_regularly()
run foo job at 26:05, 27:05 and 28:05
11:12:50: 0.00s ← cron:run_regularly ●
按照此時間線,然后會一直進行無限循環下去
更多
更多api學習請查看官方文檔 --> https://arq-docs.helpmanual.io
