更新 python萬能消費框架,新增7種中間件(或操作mq的包)和三種並發模式。
框架目的是分布式調度起一切任何函數(當然也包括調度起一切任何方法)。
之前寫的是基於rabbitmq的,作為專用的消息隊列好處比redis的list結構好很多。但有的人還是強烈喜歡用redis,以及rabbitmq安裝比redis麻煩點。
現在加入reids作為中間件的方式。(支持僅僅修改一個字母就達到全局切換使用何種中間件,其余代碼不需要做任何一處修改就可以 正常運行)
使用 模板模式 加工廠模式 加策略模式(消費者調用的函數,用戶自己寫的每一個被消費的函數單元都是策略函數)
解釋下為什么不直接用celery呢?
1、寫法過於麻煩了,運行方式也麻煩一些。
2、對文件的位置要求高一點,可以設置main來解決。但還是沒太方便。這個可以把消費代碼隨意移動文件到任何層級的文件夾或者移到任何地方。
3、配置項高達200個,復雜,英文文檔2000頁,我是看了5遍,太復雜了公司推廣很麻煩,不是每個人都喜歡反復幾千頁看好幾次。
這是之前的,運行方式很古怪,不喜歡命令行方式,ide不能補全愛出錯。
celery_main = 'app.apis.list_page_live_price.live_price_celery_app'
# -*- coding: utf-8 -*- # @Author : ydf """ 類celery的worker模式,可用於一切需要分布式並發的地方,最好是io類型的。可以分布式調度起一切函數。 rabbitmq生產者和消費者框架。完全實現了celery worker模式的全部功能,使用更簡單。支持自動重試指定次數,消費確認,指定數量的並發線程,和指定頻率控制1秒鍾只運行幾次, 同時對mongodb類型的異常做了特殊處理 最開始寫得是使用pika包,非線程安全,后來加入rabbitpy,rabbitpy包推送會丟失部分數據,推薦pika包使用 單下划線代表保護,雙下划線代表私有。只要關注公有方法就可以,其余是類內部自調用方法。 3月15日 1)、新增RedisConsumer 是基於redis中間件的消費框架,不支持隨意暫停程序或者斷點,會丟失一部分正在運行中的任務,推薦使用rabbitmq的方式。 get_consumer是使用工廠模式來生成基於rabbit和reids的消費者,使用不同中間件的消費框架更靈活一點點,只需要修改一個數字。 3月20日 2)、增加支持函數參數過濾的功能,可以隨時放心多次推送相同的任務到中間件,會先檢查該任務是否需要執行,避免浪費cpu和流量,加快處理速度。 基於函數參數值的過濾,需要設置 do_task_filtering 參數為True才生效,默認為False。 3)、新增支持了函數的參數是多個參數,需要設置is_consuming_function_use_multi_params 為True生效,為了兼容老代碼默認為False。 區別是消費函數原來需要 def f(body): # 函數有且只能有一個參數,是字典的多個鍵值對來表示參數的值。 print(body['a']) print(body['b']) 現在可以 def f(a,b): print(a) print(b) 對於推送的部分,都是一樣的,都是推送 {"a":1,"b":2} 6月3日 1) 增加了RedisPublisher類,和增加get_publisher工廠模式 方法同mqpublisher一樣,這是為了增強一致性,以后每個業務的推送和消費,如果不直接使用RedisPublisher RedisConsumerer RabbitmqPublisher RabbitMQConsumer這些類,而是使用get_publisher和get_consumer來獲取發布和消費對象,支持修改一個全局變量的broker_kind數字來切換所有平台消費和推送的中間件種類。 2)增加指定不運行的時間的配置。例如可以白天不運行,只在晚上運行。 3)增加了函數超時的配置,當函數運行時間超過n秒后,自動殺死函數,拋出異常。 4) 增加每分鍾函數運行次數統計,和按照最近一分鍾運行函數次數來預估多久可以運行完成當前隊列剩余的任務。 5) 增加一個判斷函數,阻塞判斷連續多少分鍾隊列里面是空的。判斷任務疑似完成。 6)增加一個終止消費者的標志,設置標志后終止循環調度消息。 7) consumer對象增加內置一個屬性,表示相同隊列名的publisher實例。 """ # import functools import abc import copy import traceback import typing import json from collections import Callable, OrderedDict import time from concurrent.futures import ThreadPoolExecutor from functools import wraps from threading import Lock, Thread import unittest import rabbitpy from pika import BasicProperties # noinspection PyUnresolvedReferences from pika.exceptions import ChannelClosed, AMQPError # from rabbitpy.message import Properties import pika from pika.adapters.blocking_connection import BlockingChannel from pymongo.errors import PyMongoError from app.utils_ydf import (LogManager, LoggerMixin, RedisMixin, BoundedThreadPoolExecutor, RedisBulkWriteHelper, RedisOperation, decorators, time_util, LoggerLevelSetterMixin, nb_print) from app import config as app_config # LogManager('pika').get_logger_and_add_handlers(10) # LogManager('pika.heartbeat').get_logger_and_add_handlers(10) # LogManager('rabbitpy').get_logger_and_add_handlers(10) # LogManager('rabbitpy.base').get_logger_and_add_handlers(10) def delete_keys_from_dict(dictx: dict, keys: list): for dict_key in keys: dictx.pop(dict_key) class ExceptionForRetry(Exception): """為了重試的,拋出錯誤。只是定義了一個子類,用不用都可以""" class ExceptionForRequeue(Exception): """框架檢測到此錯誤,重新放回隊列中""" class ExceptionForRabbitmqRequeue(ExceptionForRequeue): # 以后去掉這個異常,拋出上面那個異常就可以了。 """遇到此錯誤,重新放回隊列中""" class RabbitmqClientRabbitPy: """ 使用rabbitpy包。 """ # noinspection PyUnusedLocal def __init__(self, username, password, host, port, virtual_host, heartbeat=0): rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}?heartbeat={heartbeat}' self.connection = rabbitpy.Connection(rabbit_url) def creat_a_channel(self) -> rabbitpy.AMQP: return rabbitpy.AMQP(self.connection.channel()) # 使用適配器,使rabbitpy包的公有方法幾乎接近pika包的channel的方法。 class RabbitmqClientPika: """ 使用pika包,多線程不安全的包。 """ def __init__(self, username, password, host, port, virtual_host, heartbeat=0): """ parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F') connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open) :param username: :param password: :param host: :param port: :param virtual_host: :param heartbeat: """ credentials = pika.PlainCredentials(username, password) self.connection = pika.BlockingConnection(pika.ConnectionParameters( host, port, virtual_host, credentials, heartbeat=heartbeat)) def creat_a_channel(self) -> BlockingChannel: return self.connection.channel() class RabbitMqFactory: def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10, is_use_rabbitpy=0): """ :param username: :param password: :param port: :param virtual_host: :param heartbeat: :param is_use_rabbitpy: 為0使用pika,多線程不安全。為1使用rabbitpy,多線程安全的包。 """ if is_use_rabbitpy: self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat) else: self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat) def get_rabbit_cleint(self): return self.rabbit_client class AbstractPublisher(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ): def __init__(self, queue_name, log_level_int=10, logger_prefix='', is_add_file_handler=True, clear_queue_within_init=False): """ :param queue_name: :param log_level_int: :param logger_prefix: :param is_add_file_handler: :param clear_queue_within_init: """ self._queue_name = queue_name if logger_prefix != '': logger_prefix += '--' logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}' self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level_int, log_filename=f'{logger_name}.log' if is_add_file_handler else None) # # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint() # self.channel = self.rabbit_client.creat_a_channel() # self.queue = self.channel.queue_declare(queue=queue_name, durable=True) self._lock_for_pika = Lock() self._lock_for_count = Lock() self._current_time = None self.count_per_minute = None self._init_count() self.init_broker() self.logger.info(f'{self.__class__} 被實例化了') self.publish_msg_num_total = 0 if clear_queue_within_init: self.clear() def _init_count(self): with self._lock_for_count: self._current_time = time.time() self.count_per_minute = 0 @abc.abstractmethod def init_broker(self): pass def publish(self, msg: typing.Union[str, dict]): if isinstance(msg, dict): msg = json.dumps(msg) t_start = time.time() decorators.handle_exception(retry_times=10, is_throw_error=True, time_sleep=0.1)(self.concrete_realization_of_publish)(msg) self.logger.debug(f'向{self._queue_name} 隊列,推送消息 耗時{round(time.time() - t_start, 5)}秒 {msg}') with self._lock_for_count: self.count_per_minute += 1 self.publish_msg_num_total += 1 if time.time() - self._current_time > 10: self.logger.info(f'10秒內推送了 {self.count_per_minute} 條消息,累計推送了 {self.publish_msg_num_total} 條消息到 {self._queue_name} 中') self._init_count() @abc.abstractmethod def concrete_realization_of_publish(self, msg): raise NotImplementedError @abc.abstractmethod def clear(self): raise NotImplementedError @abc.abstractmethod def get_message_count(self): raise NotImplementedError @abc.abstractmethod def close(self): raise NotImplementedError def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() self.logger.warning(f'with中自動關閉publisher連接,累計推送了 {self.publish_msg_num_total} 條消息 ') def deco_mq_conn_error(f): def _inner(self, *args, **kwargs): try: return f(self, *args, **kwargs) except AMQPError as e: self.logger.error(f'rabbitmq鏈接出錯 ,方法 {f.__name__} 出錯 ,{e}') self.init_broker() return f(self, *args, **kwargs) return _inner class RabbitmqPublisher(AbstractPublisher): """ 使用pika實現的。 """ # noinspection PyAttributeOutsideInit def init_broker(self): self.logger.warning(f'使用pika 鏈接mq') self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint() self.channel = self.rabbit_client.creat_a_channel() self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True) # noinspection PyAttributeOutsideInit @deco_mq_conn_error def concrete_realization_of_publish(self, msg): with self._lock_for_pika: # 親測pika多線程publish會出錯。 # if self.channel.connection.is_closed or self.channel.is_closed: # 有時候斷了。 # self.logger.critical('發布消息,pika鏈接斷了 “self.channel.connection.is_closed or self.channel.is_closed ”') # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint() # self.channel = self.rabbit_client.creat_a_channel() # self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True) # import random # if random.randint(0, 3) != 1: # raise AMQPError self.channel.basic_publish(exchange='', routing_key=self._queue_name, body=msg, properties=BasicProperties( delivery_mode=2, # make message persistent 2(1是非持久化) ) ) @deco_mq_conn_error def clear(self): self.channel.queue_purge(self._queue_name) self.logger.warning(f'清除 {self._queue_name} 隊列中的消息成功') @deco_mq_conn_error def get_message_count(self): queue = self.channel.queue_declare(queue=self._queue_name, durable=True) return queue.method.message_count # @deco_mq_conn_error def close(self): self.channel.close() self.rabbit_client.connection.close() self.logger.warning('關閉pika包 鏈接') class RabbitmqPublisherUsingRabbitpy(AbstractPublisher): """ 使用rabbitpy包實現的。 """ # noinspection PyAttributeOutsideInit def init_broker(self): self.logger.warning(f'使用rabbitpy包 鏈接mq') self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint() self.channel = self.rabbit_client.creat_a_channel() self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True) @deco_mq_conn_error def concrete_realization_of_publish(self, msg): # noinspection PyTypeChecker self.channel.basic_publish( exchange='', routing_key=self._queue_name, body=msg, properties={'delivery_mode': 2}, ) @deco_mq_conn_error def clear(self): self.channel.queue_purge(self._queue_name) self.logger.warning(f'清除 {self._queue_name} 隊列中的消息成功') @deco_mq_conn_error def get_message_count(self): # noinspection PyUnresolvedReferences ch_raw_rabbity = self.channel.channel return rabbitpy.amqp_queue.Queue(ch_raw_rabbity, self._queue_name, durable=True) # @deco_mq_conn_error def close(self): self.channel.close() self.rabbit_client.connection.close() self.logger.warning('關閉rabbitpy包 鏈接mq') class RedisPublisher(AbstractPublisher, RedisMixin): """ 使用redis作為中間件 """ def init_broker(self): pass def concrete_realization_of_publish(self, msg): # noinspection PyTypeChecker self.redis_db7.rpush(self._queue_name, msg) def clear(self): self.redis_db7.delete(self._queue_name) self.logger.warning(f'清除 {self._queue_name} 隊列中的消息成功') def get_message_count(self): return self.redis_db7.llen(self._queue_name) def close(self): # self.redis_db7.connection_pool.disconnect() pass class RedisFilter(RedisMixin): def __init__(self, redis_key_name): self._redis_key_name = redis_key_name @staticmethod def _get_ordered_str(value): """對json的鍵值對在redis中進行過濾,需要先把鍵值對排序,否則過濾會不准確如 {"a":1,"b":2} 和 {"b":2,"a":1}""" if isinstance(value, str): value = json.loads(value) ordered_dict = OrderedDict() for k in sorted(value): ordered_dict[k] = value[k] return json.dumps(ordered_dict) def add_a_value(self, value: typing.Union[str, dict]): self.redis_db7.sadd(self._redis_key_name, self._get_ordered_str(value)) def check_value_exists(self, value): return self.redis_db7.sismember(self._redis_key_name, self._get_ordered_str(value)) class AbstractConsumer(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ): shedual_task_thread_for_join_on_linux_multiprocessing = list() time_interval_for_check_do_not_run_time = 60 BROKER_KIND = None @property @decorators.synchronized def publisher_of_same_queue(self): if not self._publisher_of_same_queue: self._publisher_of_same_queue = get_publisher(self._queue_name, broker_kind=self.BROKER_KIND) return self._publisher_of_same_queue @classmethod def join_shedual_task_thread(cls): """ :return: """ """ def ff(): RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平台消費', is_consuming_function_use_multi_params=True).start_consuming_message() RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平台消費', is_consuming_function_use_multi_params=True).start_consuming_message() AbstractConsumer.join_shedual_task_thread() # 如果開多進程啟動消費者,在linux上需要這樣寫下這一行。 if __name__ == '__main__': [Process(target=ff).start() for _ in range(4)] """ for t in cls.shedual_task_thread_for_join_on_linux_multiprocessing: t.join() def __init__(self, queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool: ThreadPoolExecutor = None, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True, is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False): """ :param queue_name: :param consuming_function: 處理消息的函數。 :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。為0是不限制。 :param threads_num: :param specify_threadpool:使用指定的線程池,可以多個消費者共使用一個線程池,不為None時候。threads_num失效 :param max_retry_times: :param log_level: :param is_print_detail_exception: :param msg_schedule_time_intercal:消息調度的時間間隔,用於控頻 :param logger_prefix: 日志前綴,可使不同的消費者生成不同的日志 :param create_logger_file : 是否創建文件日志 :param do_task_filtering :是否執行基於函數參數的任務過濾 :is_consuming_function_use_multi_params 函數的參數是否是傳統的多參數,不為單個body字典表示多個參數。 :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效 :param do_not_run_by_specify_time :不運行的時間段 :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啟兩個消費者。 """ self._queue_name = queue_name self.consuming_function = consuming_function self._function_timeout = function_timeout self._threads_num = threads_num self.threadpool = specify_threadpool if specify_threadpool else BoundedThreadPoolExecutor(threads_num + 1) # 單獨加一個檢測消息數量和心跳的線程 self._max_retry_times = max_retry_times self._is_print_detail_exception = is_print_detail_exception self._msg_schedule_time_intercal = msg_schedule_time_intercal self._logger_prefix = logger_prefix self._log_level = log_level if logger_prefix != '': logger_prefix += '--' logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}' self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level, log_filename=f'{logger_name}.log' if create_logger_file else None) self.logger.info(f'{self.__class__} 被實例化') self._do_task_filtering = do_task_filtering self._redis_filter_key_name = f'filter:{queue_name}' self._redis_filter = RedisFilter(self._redis_filter_key_name) self._is_consuming_function_use_multi_params = is_consuming_function_use_multi_params self._lock_for_pika = Lock() self._execute_task_times_every_minute = 0 # 每分鍾執行了多少次任務。 self._lock_for_count_execute_task_times_every_minute = Lock() self._current_time_for_execute_task_times_every_minute = time.time() self._msg_num_in_broker = 0 self._last_timestamp_when_has_task_in_queue = 0 self._last_timestamp_print_msg_num = 0 self._is_do_not_run_by_specify_time_effect = is_do_not_run_by_specify_time_effect self._do_not_run_by_specify_time = do_not_run_by_specify_time # 可以設置在指定的時間段不運行。 self._schedule_tasks_on_main_thread = schedule_tasks_on_main_thread self.stop_flag = False self._publisher_of_same_queue = None def keep_circulating(self, time_sleep=0.001, exit_if_function_run_sucsess=False, is_display_detail_exception=True): """間隔一段時間,一直循環運行某個方法的裝飾器 :param time_sleep :循環的間隔時間 :param is_display_detail_exception :param exit_if_function_run_sucsess :如果成功了就退出循環 """ def _keep_circulating(func): # noinspection PyBroadException @wraps(func) def __keep_circulating(*args, **kwargs): while 1: if self.stop_flag: break try: result = func(*args, **kwargs) if exit_if_function_run_sucsess: return result except Exception as e: msg = func.__name__ + ' 運行出錯\n ' + traceback.format_exc(limit=10) if is_display_detail_exception else str(e) self.logger.error(msg) finally: time.sleep(time_sleep) return __keep_circulating return _keep_circulating def start_consuming_message(self): # self.threadpool.submit(decorators.keep_circulating(20)(self.check_heartbeat_and_message_count)) self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count)) if self._schedule_tasks_on_main_thread: # decorators.keep_circulating(1)(self._shedual_task)() self.keep_circulating(1)(self._shedual_task)() else: # t = Thread(target=decorators.keep_circulating(1)(self._shedual_task)) t = Thread(target=self.keep_circulating(1)(self._shedual_task)) self.__class__.shedual_task_thread_for_join_on_linux_multiprocessing.append(t) t.start() @abc.abstractmethod def _shedual_task(self): raise NotImplementedError def _run_consuming_function_with_confirm_and_retry(self, kw: dict, current_retry_times=0): if self._do_task_filtering and self._redis_filter.check_value_exists(kw['body']): # 對函數的參數進行檢查,過濾已經執行過並且成功的任務。 self.logger.info(f'redis的 [{self._redis_filter_key_name}] 鍵 中 過濾任務 {kw["body"]}') self._confirm_consume(kw) return with self._lock_for_count_execute_task_times_every_minute: self._execute_task_times_every_minute += 1 if time.time() - self._current_time_for_execute_task_times_every_minute > 60: self.logger.info( f'一分鍾內執行了 {self._execute_task_times_every_minute} 次函數 [ {self.consuming_function.__name__} ] ,預計' f'還需要 {time_util.seconds_to_hour_minute_second(self._msg_num_in_broker / self._execute_task_times_every_minute * 60)} 時間' f'才能執行完成 {self._msg_num_in_broker}個剩余的任務 ') self._current_time_for_execute_task_times_every_minute = time.time() self._execute_task_times_every_minute = 0 if current_retry_times < self._max_retry_times + 1: # noinspection PyBroadException t_start = time.time() try: function_run = self.consuming_function if self._function_timeout == 0 else decorators.timeout(self._function_timeout)(self.consuming_function) if self._is_consuming_function_use_multi_params: # 消費函數使用傳統的多參數形式 function_run(**kw['body']) else: function_run(kw['body']) # 消費函數使用單個參數,參數自身是一個字典,由鍵值對表示各個參數。 self._confirm_consume(kw) if self._do_task_filtering: self._redis_filter.add_a_value(kw['body']) # 函數執行成功后,添加函數的參數排序后的鍵值對字符串到set中。 self.logger.debug(f'函數 {self.consuming_function.__name__} 第{current_retry_times + 1}次 運行, 正確了,函數運行時間是 {round(time.time() - t_start, 2)} 秒,入參是 【 {kw["body"]} 】') except Exception as e: if isinstance(e, (PyMongoError, ExceptionForRequeue)): # mongo經常維護備份時候插入不了或掛了,或者自己主動拋出一個ExceptionForRequeue類型的錯誤會重新入隊,不受指定重試次數逇約束。 self.logger.critical(f'函數 [{self.consuming_function.__name__}] 中發生錯誤 {type(e)} {e}') return self._requeue(kw) self.logger.error(f'函數 {self.consuming_function.__name__} 第{current_retry_times + 1}次發生錯誤,函數運行時間是 {round(time.time() - t_start, 2)} 秒,\n 入參是 【 {kw["body"]} 】 \n 原因是 {type(e)} ', exc_info=self._is_print_detail_exception) self._run_consuming_function_with_confirm_and_retry(kw, current_retry_times + 1) else: self.logger.critical(f'函數 {self.consuming_function.__name__} 達到最大重試次數 {self._max_retry_times} 后,仍然失敗, 入參是 【 {kw["body"]} 】') # 錯得超過指定的次數了,就確認消費了。 self._confirm_consume(kw) @abc.abstractmethod def _confirm_consume(self, kw): """確認消費""" raise NotImplementedError # noinspection PyUnusedLocal def check_heartbeat_and_message_count(self): self._msg_num_in_broker = self.publisher_of_same_queue.get_message_count() if time.time() - self._last_timestamp_print_msg_num > 60: self.logger.info(f'[{self._queue_name}] 隊列中還有 [{self._msg_num_in_broker}] 個任務') self._last_timestamp_print_msg_num = time.time() if self._msg_num_in_broker != 0: self._last_timestamp_when_has_task_in_queue = time.time() return self._msg_num_in_broker @abc.abstractmethod def _requeue(self, kw): """重新入隊""" raise NotImplementedError def _submit_task(self, kw): if self._judge_is_daylight(): self._requeue(kw) time.sleep(self.time_interval_for_check_do_not_run_time) return self.threadpool.submit(self._run_consuming_function_with_confirm_and_retry, kw) def _judge_is_daylight(self): if self._is_do_not_run_by_specify_time_effect and self._do_not_run_by_specify_time[0] < time_util.DatetimeConverter().time_str < self._do_not_run_by_specify_time[1]: self.logger.warning(f'現在時間是 {time_util.DatetimeConverter()} ,現在時間是在 {self._do_not_run_by_specify_time} 之間,不運行') return True def wait_for_possible_has_finish_all_tasks(self, minutes: int, mannu_call_check_heartbeat_and_message_count=False, stop_flag=0): """ 由於是異步消費,和存在隊列一邊被消費,一邊在推送,或者還有結尾少量任務還在確認消費者實際還沒徹底運行完成。 但有時候需要判斷 所有任務,務是否完成,提供一個不精確的判斷,要搞清楚原因和場景后再慎用。 :param minutes 連續多少分鍾沒任務就判斷為消費已完成 :param mannu_call_check_heartbeat_and_message_count 如果消費者沒有執行startconsuming,需要手動調用這個方法 :param stop_flag 設置停止標志。停止當前實例無限循環調度消息。 :return: """ if minutes <= 1: raise ValueError('疑似完成任務,判斷時間最少需要設置為2分鍾內,每隔20秒檢測一次都是0個任務,') if mannu_call_check_heartbeat_and_message_count: self.threadpool = BoundedThreadPoolExecutor(2) self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count)) while True: if minutes * 60 < time.time() - self._last_timestamp_when_has_task_in_queue < 3650 * 24 * 60 * 60: # 初次時間戳是0,確保不是無限大。 # print(self._last_timestamp_print_msg_num) self.logger.warning(f'最后一次有任務的時間是{time_util.DatetimeConverter(self._last_timestamp_when_has_task_in_queue)},已經有 {minutes} 分鍾沒有任務了,疑似完成。') self.stop_flag = stop_flag if self.stop_flag: self.logger.warning('當前實例退出循環調度消息') break else: time.sleep(30) """ continuou_no_task_times = 0 check_interval_time = 10 while True: try: msg_num_in_broker = self.check_heartbeat_and_message_count() except Exception: msg_num_in_broker = 9999 if msg_num_in_broker == 0: continuou_no_task_times += 1 else: continuou_no_task_times = 0 if continuou_no_task_times >= minutes * (60//check_interval_time): break time.sleep(check_interval_time) """ class RabbitmqConsumer(AbstractConsumer): """ 使用pika包實現的。 """ BROKER_KIND = 0 def _shedual_task_old(self): channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) def callback(ch, method, properties, body): body = body.decode() self.logger.debug(f'從rabbitmq的 [{self._queue_name}] 隊列中 取出的消息是: {body}') time.sleep(self._msg_schedule_time_intercal) body = json.loads(body) kw = {'ch': ch, 'method': method, 'properties': properties, 'body': body} self._submit_task(kw) if self.stop_flag: ch.close() # 使start_consuming結束。 channel.basic_consume(callback, queue=self._queue_name, # no_ack=True ) channel.start_consuming() def _shedual_task(self): channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) while True: if self.stop_flag: return method, properties, body = channel.basic_get(self._queue_name, no_ack=False) if body is None: time.sleep(0.001) else: body = body.decode() self.logger.debug(f'從rabbitmq的 [{self._queue_name}] 隊列中 取出的消息是: {body}') body = json.loads(body) kw = {'ch': channel, 'method': method, 'properties': properties, 'body': body} self._submit_task(kw) time.sleep(self._msg_schedule_time_intercal) def _confirm_consume(self, kw): with self._lock_for_pika: kw['ch'].basic_ack(delivery_tag=kw['method'].delivery_tag) # 確認消費 def _requeue(self, kw): with self._lock_for_pika: # ch.connection.add_callback_threadsafe(functools.partial(self.__ack_message_pika, ch, method.delivery_tag)) return kw['ch'].basic_nack(delivery_tag=kw['method'].delivery_tag) # 立即重新入隊。 @staticmethod def __ack_message_pika(channelx, delivery_tagx): """Note that `channel` must be the same pika channel instance via which the message being ACKed was retrieved (AMQP protocol constraint). """ if channelx.is_open: channelx.basic_ack(delivery_tagx) else: # Channel is already closed, so we can't ACK this message; # log and/or do something that makes sense for your app in this case. pass class RabbitmqConsumerRabbitpy(AbstractConsumer): """ 使用rabbitpy實現的 """ BROKER_KIND = 1 def _shedual_task(self): # noinspection PyTypeChecker channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel() # type: rabbitpy.AMQP # channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) for message in channel.basic_consume(self._queue_name, no_ack=False): body = message.body.decode() self.logger.debug(f'從rabbitmq {self._queue_name} 隊列中 取出的消息是: {body}') time.sleep(self._msg_schedule_time_intercal) kw = {'message': message, 'body': json.loads(message.body.decode())} if self.stop_flag: return # channel.channel.close() self._submit_task(kw) def _confirm_consume(self, kw): kw['message'].ack() def _requeue(self, kw): kw['message'].nack(requeue=True) class RedisConsumer(AbstractConsumer, RedisMixin): """ redis作為中間件實現的。 """ BROKER_KIND = 2 def _shedual_task_old(self): while True: t_start = time.time() task_bytes = self.redis_db7.blpop(self._queue_name)[1] # 使用db7 if task_bytes: task_dict = json.loads(task_bytes) # noinspection PyProtectedMember self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 2)} 消息是: {task_bytes.decode()} ') time.sleep(self._msg_schedule_time_intercal) kw = {'body': task_dict} if self.stop_flag: return self._submit_task(kw) def _shedual_task(self): # 這樣容易控制退出消費循環。 while True: if self.stop_flag: return t_start = time.time() task_bytes = self.redis_db7.lpop(self._queue_name) # 使用db7 if task_bytes: task_dict = json.loads(task_bytes) # noinspection PyProtectedMember self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 2)} 消息是: {task_bytes.decode()} ') kw = {'body': task_dict} self._submit_task(kw) else: time.sleep(0.001) time.sleep(self._msg_schedule_time_intercal) def _confirm_consume(self, kw): pass # redis沒有確認消費的功能。 def _requeue(self, kw): self.redis_db7.rpush(self._queue_name, json.dumps(kw['body'])) def get_publisher(queue_name, *, log_level_int=10, logger_prefix='', is_add_file_handler=False, clear_queue_within_init=False, broker_kind=0): """ :param queue_name: :param log_level_int: :param logger_prefix: :param is_add_file_handler: :param clear_queue_within_init: :param broker_kind: 中間件或使用包的種類。 :return: """ all_kwargs = copy.deepcopy(locals()) all_kwargs.pop('broker_kind') if broker_kind == 0: return RabbitmqPublisher(**all_kwargs) elif broker_kind == 1: return RabbitmqPublisherUsingRabbitpy(**all_kwargs) elif broker_kind == 2: return RedisPublisher(**all_kwargs) else: raise ValueError('設置的中間件種類數字不正確') def get_consumer(queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool: ThreadPoolExecutor = None, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True, is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False, broker_kind=0): """ 使用工廠模式再包一層,通過設置數字來生成基於不同中間件或包的consumer。 :param queue_name: :param consuming_function: 處理消息的函數。 :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。為0是不限制。 :param threads_num: :param specify_threadpool:使用指定的線程池,可以多個消費者共使用一個線程池,不為None時候。threads_num失效 :param max_retry_times: :param log_level: :param is_print_detail_exception: :param msg_schedule_time_intercal:消息調度的時間間隔,用於控頻 :param logger_prefix: 日志前綴,可使不同的消費者生成不同的日志 :param create_logger_file : 是否創建文件日志 :param do_task_filtering :是否執行基於函數參數的任務過濾 :param is_consuming_function_use_multi_params 函數的參數是否是傳統的多參數,不為單個body字典表示多個參數。 :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效 :param do_not_run_by_specify_time :不運行的時間段 :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啟兩個消費者。 :param broker_kind:中間件種類 :return """ all_kwargs = copy.copy(locals()) all_kwargs.pop('broker_kind') if broker_kind == 0: return RabbitmqConsumer(**all_kwargs) elif broker_kind == 1: return RabbitmqConsumerRabbitpy(**all_kwargs) elif broker_kind == 2: return RedisConsumer(**all_kwargs) else: raise ValueError('設置的中間件種類數字不正確') # noinspection PyMethodMayBeStatic,PyShadowingNames class _Test(unittest.TestCase, LoggerMixin, RedisMixin): """ 演示一個簡單求和的例子。 """ @unittest.skip def test_publisher_with(self): """ 測試上下文管理器。 :return: """ with RabbitmqPublisher('queue_test') as rp: for i in range(1000): rp.publish(str(i)) @unittest.skip def test_publish_rabbit(self): """ 測試mq推送 :return: """ rabbitmq_publisher = RabbitmqPublisher('queue_test', log_level_int=10, logger_prefix='yy平台推送') rabbitmq_publisher.clear() for i in range(500000): try: time.sleep(1) rabbitmq_publisher.publish({'a': i, 'b': 2 * i}) except Exception as e: print(e) rabbitmq_publisher = RabbitmqPublisher('queue_test2', log_level_int=20, logger_prefix='zz平台推送') rabbitmq_publisher.clear() [rabbitmq_publisher.publish({'somestr_to_be_print': str(i)}) for i in range(500000)] @unittest.skip def test_publish_redis(self): # 如果需要批量推送 for i in range(10007): # 最犀利的批量操作方式,自動聚合多條redis命令,支持多種redis混合命令批量操作。 RedisBulkWriteHelper(self.redis_db7, 1000).add_task(RedisOperation('lpush', 'queue_test', json.dumps({'a': i, 'b': 2 * i}))) [self.redis_db7.lpush('queue_test', json.dumps({'a': j, 'b': 2 * j})) for j in range(500)] print('推送完畢') @unittest.skip def test_consume(self): """ 單參數代表所有傳參 :return: """ def f(body): self.logger.info(f'消費此消息 {body}') # print(body['a'] + body['b']) time.sleep(5) # 模擬做某事需要阻塞10秒種,必須用並發。 # 把消費的函數名傳給consuming_function,就這么簡單。 rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20, msg_schedule_time_intercal=0.5, log_level=10, logger_prefix='yy平台消費', is_consuming_function_use_multi_params=False) rabbitmq_consumer.start_consuming_message() @unittest.skip def test_consume2(self): """ 測試支持傳統參數形式,不是用一個字典里面包含所有參數。 :return: """ def f2(a, b): self.logger.debug(f'a的值是 {a}') self.logger.debug(f'b的值是 {b}') print(f'{a} + {b} 的和是 {a + b}') time.sleep(3) # 模擬做某事需要阻塞10秒種,必須用並發。 # 把消費的函數名傳給consuming_function,就這么簡單。 RabbitmqConsumer('queue_test', consuming_function=f2, threads_num=60, msg_schedule_time_intercal=5, log_level=10, logger_prefix='yy平台消費', is_consuming_function_use_multi_params=True).start_consuming_message() @unittest.skip def test_redis_filter(self): """ 測試基於redis set結構的過濾器。 :return: """ redis_filter = RedisFilter('abcd') redis_filter.add_a_value({'a': 1, 'c': 3, 'b': 2}) redis_filter.check_value_exists({'a': 1, 'c': 3, 'b': 2}) redis_filter.check_value_exists({'a': 1, 'b': 2, 'c': 3}) with decorators.TimerContextManager(): print(redis_filter.check_value_exists('{"a": 1, "b": 2, "c": 3}')) with decorators.TimerContextManager(): # 實測百萬元素的set,過濾檢查不需要1毫秒,一般最多100萬個酒店。 print(RedisFilter('filter:mafengwo-detail_task').check_value_exists({"_id": "69873340"})) @unittest.skip def test_run_two_function(self): # 演示連續運行兩個consumer def f3(a, b): print(f'{a} + {b} = {a + b}') time.sleep(10) # 模擬做某事需要阻塞10秒種,必須用並發。 def f4(somestr_to_be_print): print(f'打印 {somestr_to_be_print}') time.sleep(20) # 模擬做某事需要阻塞10秒種,必須用並發。 RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平台消費', is_consuming_function_use_multi_params=True).start_consuming_message() RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平台消費', is_consuming_function_use_multi_params=True).start_consuming_message() # AbstractConsumer.join_shedual_task_thread() # @unittest.skip def test_factory_pattern_consumer(self): """ 測試工廠模式來生成消費者 :return: """ def f2(a, b): # body_dict = json.loads(body) self.logger.info(f'消費此消息 {a} {b} ,結果是 {a+b}') # print(body_dict['a'] + body_dict['b']) time.sleep(2) # 模擬做某事需要阻塞10秒種,必須用並發。 # 把消費的函數名傳給consuming_function,就這么簡單。 consumer = get_consumer('queue_test5', consuming_function=f2, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='zz平台消費', function_timeout=20, is_print_detail_exception=True, broker_kind=0) # 通過設置broker_kind,一鍵切換中間件為mq或redis consumer.publisher_of_same_queue.clear() [consumer.publisher_of_same_queue.publish({'a': i, 'b': 2 * i}) for i in range(80)] consumer.start_consuming_message() # consumer.stop_flag = 1 # 原則是不需要關閉消費,一直在后台等待任務,循環調度消息。如果需要關閉可以使用下面。 nb_print('判斷完成阻塞中。。。') consumer.wait_for_possible_has_finish_all_tasks(2, stop_flag=1) nb_print('這一行要等疑似結束判斷,才能運行。。。') if __name__ == '__main__': # noinspection PyArgumentList unittest.main(sleep_time=1)
1、雖然實現這樣的萬能異步分布式框架代碼很長,代碼看起來有點復雜(如果真正的懂oop,看起來就不復雜,里面用了大量 模板模式 工廠模式 裝飾器 等)。但使用卻極其簡單。核心就是定義了一個函數,只需要把函數傳給這個Consumer類的初始化方法,並和隊列名綁定,就可以一行代碼實現分布式消費了。Consumer類的實例初始化參數只有2個是最本質核心,分別是隊列名字和函數,其余的參數全是輔助功能。目前框架已用於多個平台以及線上生產項目中,相當穩定。
我的寫代碼理念是做任何事,最好盡可能先設計好想好,然后抽取可復用流程或框架。我希望只麻煩、 很復雜一次,而不是使用無限復制粘貼扣字的做法來麻煩 復雜無數次。
2、使用這個框架,能大大簡化一切需要分布式的代碼,使你在寫任何需要分布式的項目和平台時候,都不需要關心分布式本身,只需要專注於寫好函數,寫完后,直接把函數和隊列名綁定,實例化一個consumer實例,然后執行start_consuming_message方法就可以。
3、有人有疑惑為什么反復強調的是函數?類行不行?
這個調度的本質是從中間件隊列中取到一個消息,消息是json形式,例如取到的消息是 {"a":1,"b":2},
一個函數是 def add(a,b):
print(a + b)
那么框架自動使用 add(** {"a":1,"b":2}) 的參數來調用 add函數。如果是類,很難判斷到底哪些參數傳給實例的初始化方法,哪些傳給其他方法。
如果是這樣的形式 Aclass(x).fun(y), 那再用函數包裝一層就可以了,
例如 def ffff(x,y):
Aclass(x).fun(y)
然后把ffff作為函數傳給consumer的初始化方法就可以了。
雖然實現框架用了很多類,但消費的不使用類的原因,
1)使用類,那就是有狀態的,分布式最好是無狀態的,函數更好。
2)再者一般這里面的consuming_function要簡單,每個函數只要做一件簡單的事,然后由框架無限次循環調度。如果一個函數做的事情太大,一個函數內部啥都干了,一個函數運行需要持續幾十分鍾幾個小時,那分布式就成了廢物,可以吧這個巨大的函數任務弄成邊消費邊推送,分解成很多細粒度的任務。分布式就是要消費大量細粒度的任務,使每台機器都有機會消費),弄錯一個細粒度的任務,重試時候不至於造成巨大代價。所以簡單細粒度的任務一般也不需要用類。
3)再者 celery裝飾器也是加在函數上,celery的任務也是函數單元,所以是不需要類的。
4、分布式為什么重要?
即使是只有一台機器,做分布式也很重要
a/ 這可以保存未消費的消息,停止腳本可以繼續接着運行未消耗的任務。
b/並且支持使用多進程而不需要考慮進程間通信,
c/而且支持重復啟動同一個腳本10次,使這十個運行中的腳本都有機會消費任務
d/python有個垃圾的地方是只能使用單核,如果只使用單進程,那32核linux電腦本來可以達到3200%的cpu使用率,但python由於設計的原因,python程序繁忙得到天了也只能把cpu消耗到100%,這一點在linux使用top命令查看可以證實,一個python腳本即使再忙碌運算量再大,這個linux進程絕對不會超過120%cpu使用率,java能把cpu消耗到3000%都可以,所以需要使用多進程或多次啟動來充分使用機器cpu。如果沒有分布式,一個腳本在消耗任務,別的腳本又不知道這個腳本還需要做哪些任務,大大的浪費cpu空閑資源,python是解釋性語言性能本來就垃圾再加上單進程不能充分使用多核優勢,造成了py性能雪上加霜,比java性能差了50倍,比c語言速度差了100倍。所以python比其他語言更需要分布式了。
如果是winwods用戶也可以證實,比如你電腦是i5 四核的,windows的cpu總數是100%使用率(和linux的統計不一樣),即使你寫個腳本反復計算運行100次1加到10億,這毫無疑問會造成cpu很忙碌,但python的設計原因,你的python消耗的cpu會是25%。總之就是無論 在linux還是windows,python都不能 充分利用cpu,所以需要分布式,便於多進程消費。
這個框架最好是用於io任務非純cpu計算的,但即使是io任務,由於python性能很差勁,做同樣的事情要消耗比c語言更高的cpu使用率更長的運行時間,即使是io任務python也會消耗很多cpu,所以io任務通常也需要使用多進程來充分使用cpu,所以分布式很重要。這個Conusmer類不僅是可以分布式,還提供了標題中的另外10種功能。
運行結果是這樣。
如果設置msg_schedule_time_intervel 為0.2和0.5,可以發現,的確是做到了控頻。精確地控制了每秒執行5次和2次的速度。
測試任務過期
也可以使用java發消息,py來運行。默認使用json來序列化和反序列化消息。所以推送的消息必須是簡單的,不要把一個自定義類型的對象作為消費函數的入參,json鍵的值必須是簡單類型,例如 數字 字符串 數組 字典這種。不可以是不可被json序列化的自定義類型的對象。用json序列化已經滿足所有場景了, picke序列化更強,但仍然有一些自定義類型的對象的實例屬性由於是一個不可被序列化的東西,picke解決不了,這種東西例如self.r = Redis() ,不可以序列化,就算能序列化也是要用一串很長的東西來表示這種屬性,導致中間件要存儲很大的東西傳輸效率會降低,這種完全可以使用json來解決,例如指定ip 和端口,在消費函數內部來使用redis。所以用json一定可以滿足一切傳參場景。
如果是使用celery,由於推送時候要讀取項目配置,java和python基本不能配合。這一點可以從消息里面的結構可以證實,因為celery的消息包括了函數參數、celery項目的配置、裝飾器的參數配置。
celery主要核心使用理念是在函數上加入裝飾器,裝飾器指定任務的路由,或者在獨立的配置中指定路由。然后調用 函數名.delay(x,y),這樣消費和發布都是自動使用同一個隊列了。很魔術,但作用也不是很大,黑魔法實在python的ide里面代碼是不能自動補全提示的,因為用了元編程,是一種動態的,pycharm只能解析死語法。