1 采集模塊整體架構
采集模塊主要分為三大塊。
Ceilometer:用於采集數據並處理數據后發送到gnocchi服務去存儲
Gnocchi:用於將采集數據進行計算合並和存儲並提供rest api方式接收和查詢監控數據
Aodh:主要負責告警功能
1.1 Ceilometer架構
Ceilometer-polling服務:通過調用多個采集插件(采集插件在setup.cfg中有定義,ceilometer.poll.compute對應的就是采集插件)收集信息,這個服務收集的是有關虛擬機資源使用情況相關的數據,比如cpu、內存占用率等,通過libvirt獲取這些信息,並發送到notifications.sample隊列中保存。
Ceilometer-central服務:也是采集數據的服務,但它是通過API的輪詢方式去獲取一些服務的信息,采集的是Ceilometer-polling服務采集以外的信息,比如磁盤服務的狀態和總共使用了多少。
Ceilometer-agent-notification:polling服務采集到的原始數據稱為Meter。Meter是資源使用的計量項,它的屬性包括:名稱(name)、單位 (unit)、類型(cumulative:累計值,delta:變化值、gauge:離散或者波動值)以及對應的資源屬性等,不符合相關格式,因此可以在發送數據前進行一些轉換,這個轉換稱為Transformer,一條Meter數據可以經過多個Transformer處理后再由publisher發送。
這個是處理數據並進行數據轉換的服務,該服務先是從notifications.sample隊列中取出消息,
然后經過處理和轉換成measure結構后發送數據到gnocchi服務中去。
Ceilometer-collector服務:該服務在以前的版本中是用來獲取監控數據消息然后統一處理后發給Gnocchi-api服務的,但現在通過配置pipeline.yaml配置文件的publisher為gnocchi,則不經過ceilometer-collector中轉了,直接發送到gnocchi-api服務里去處理。
Gnocchi服務:對監控數據進行聚合計算並存儲到后端存儲並提供獲取監控數據的rest api接口。
1.2 Gnocchi架構
從圖可以看出Gnocchi的服務主要包含兩大服務,API和Metricd服務。同時可以看到有三個存儲,Measure Storage、Aggregate Storage和Index。
Measure Storage:是經過ceilometer-agent-notification服務處理后發送過來的數據,是實際的監控數據,但這些數據還需要經過gnocchi服務處理,處理后就會刪除掉。比如這部分數據就可以保存到file中,當然也支持保存到ceph,但這屬於臨時數據,所以用file保存就可以了。
Aggregate Storage:Aggreate是總數、合計的意思,gnocchi服務采用的是一種獨特的時間序列存儲方法,這個存儲存放的是按照預定義的策略進行聚合計算后的數據,這樣在獲取監控數據展示時速度就會很快,因為已經計算過了。用戶看到的是這層數據。后端存儲包括file、swift、ceph,influxdb,默認使用file。可以保存到ceph中,這樣可在任意一個節點上獲取,但由於存儲的都是大量小文件,大量的小文件對ceph來說並不友好。
Index:通常是一個關系型數據庫(比如MYSQL),是監控數據的元數據,用以索引取出resources和metrics,使得可以快速的從Measure Storage和Aggregate Storage中取出所需要的數據。
API:gnocchi-api服務進程,可以托管到httpd服務一起啟動,通過Indexer和Storage的driver,提供查詢和操作ArchivePolicy,Resource,Metric,Measure的接口,並將新到來的Measure(也就是ceilometer-agent-notification發送到gnocchi-api服務的數據)存入Measure Storage。
Metricd:gnocchi-metricd服務進程,根據Metric定義的ArchivePolicy規則周期性的從Measure Storage中獲取未處理的Measure數據並進行處理,將處理結果保存到Aggregate Storage中,同時也對Aggregate Storage中的數據進行聚合計算和清理過期的數據。
API和Metricd服務都是設計成了無狀態的服務,可以橫向拓展來加快數據的處理。
Gnocchi中有三層數據,resources -> metric -> measure
Resource:是gnocchi對openstack監控數據的一個大體的划分,比如虛擬機的磁盤的所有監控資源作為一個resource,可用命令gnocchi resource list查看
Metric:是gnocchi對openstack監控數據的第二層划分,歸屬於resource,代表一個較具體的資源,比如cpu值,可用命令gnocchi metric list查看
Measure:是gnocchi對openstack監控數據的第三層划分,歸屬於metric,表示在某個時間戳對應資源的值,可用命令gnocchi measures show metric_id
如果沒找到,可采用另外一種方式查看:
gnocchi measures show --resource-id uuid cpu_util
這里舉一個查看某個雲主機cpu使用率監控值例子:
先使用openstack server list找到要查的雲主機的uuid,比如找到是3e968827-8f1b-4d51-b76e-31dde39b34d9
然后查看與該雲主機相關的metric:
gnocchi metric list | grep 3e968827-8f1b-4d51-b76e-31dde39b34d9
可以看到這個雲主機的cpu_util這個metric的id為a89734c5-8e17-4905-a402-9b739f53c42c
查看計算后的數值:
gnocchi measures show a89734c5-8e17-4905-a402-9b739f53c42c
中間那個granularity值表示采集時間間隔,60表示60秒,1800表示30分鍾,這個取決於我們自己設定的聚合計算策略,我這邊設的策略如下:
這個策略是表示保存1小時(只保存60個點,每個點是1分鍾)的數據和1天的數據(只保存48個點,每個點是30分鍾),最后一列是計算方法,比如有min,max等方法,我這里只用了mean方法即可,只保存這些間隔數據。
另外一種通過resource-id找到對應的metric的id方法:
gnocchi resource show resource-id
同樣可以反過來查找:
gnocchi metric show metric-id
聚合計算策略介紹:
聚合計算策略表示最后數據存儲到后端時是什么形態,間隔多少,保存多久
gnocchi archive-policy create -d points:60,granularity:0:01:00 -d points:48,granularity:0:30:00 -m mean thin4
points:60,granularity:0:01:00表示只保存60個點,每個點間隔是1分鍾,也就是只保存最新1小時的數據
points:48,granularity:0:30:00表示只保存48個點,每個點間隔是30分鍾,也就是只保存最新一天的數據
一些resource type和要計算的metric都定義在/etc/ceilometer/gnocchi_resources.yaml該文件中
2 代碼流程解析
2.1 Ceilometer-polling服務采集數據流程
首先看該程序的入口函數:
File:ceilometer/cmd/polling.py
def main(): conf = cfg.ConfigOpts() conf.register_cli_opts(CLI_OPTS) service.prepare_service(conf=conf) # cotyledon庫是用來替代oslo.service庫的,提供跟oslo.service類似的管理服務功能 sm = cotyledon.ServiceManager() # 添加一個AgentManager管理類來運行 # 初始化該實例會進行插件加載和消息發送實例初始化 sm.add(create_polling_service, args=(conf,)) oslo_config_glue.setup(sm, conf) # 調用AgentManager類實例的run方法啟動服務 sm.run()
可以看到這里進行了一些配置文件獲取的初始化,最重要的是代用create_polling_service函數進行AgentManager類初始化。查看該類初始化代碼:
File:ceilometer/agent/manager.py
class AgentManager(service_base.PipelineBasedService): def __init__(self, worker_id, conf, namespaces=None, pollster_list=None, ): # ceilometer-compute服務傳入的是namespaces是compute namespaces = namespaces or ['compute', 'central'] pollster_list = pollster_list or [] ...... # we'll have default ['compute', 'central'] here if no namespaces will # be passed ''' 加載采集插件 # 根據namespance加載setup.cfg文件下的ceilometer.poll.compute、ceilometer.poll.central或 ceilometer.builder.poll.central 這三個namespaces下對應的插件 ''' extensions = (self._extensions('poll', namespace, self.conf).extensions for namespace in namespaces) # get the extensions from pollster builder extensions_fb = (self._extensions_from_builder('poll', namespace) for namespace in namespaces) if pollster_list: extensions = (moves.filter(_match, exts) for exts in extensions) extensions_fb = (moves.filter(_match, exts) for exts in extensions_fb) self.extensions = list(itertools.chain(*list(extensions))) + list( itertools.chain(*list(extensions_fb))) if self.extensions == []: raise EmptyPollstersList() # 加載ceilometer.discover.compute的插件,從setup.cfg中來看只有 # ceilometer.compute.discovery:InstanceDiscovery用來發現宿主機下的雲主機 discoveries = (self._extensions('discover', namespace, self.conf).extensions for namespace in namespaces) self.discoveries = list(itertools.chain(*list(discoveries))) self.polling_periodics = None # 獲取一個工作負載分區協調類實例,用來協調多個采集程序worker時的分工處理 self.partition_coordinator = coordination.PartitionCoordinator( self.conf) self.heartbeat_timer = utils.create_periodic( target=self.partition_coordinator.heartbeat, spacing=self.conf.coordination.heartbeat, run_immediately=True) # Compose coordination group prefix. # We'll use namespaces as the basement for this partitioning. namespace_prefix = '-'.join(sorted(namespaces)) self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix) if group_prefix else namespace_prefix) # 從該服務發送出去的消息都會攜帶ceilometer.polling這個publisher_id # notifier是oslo_messaging/notifier.py的Notifier類實例 # 從配置文件中可以看到默認的driver是messagingv2,對應的是oslo_messageing/notify/messaging.py的MessagingV2Driver類實例 # telemetry_driver = messagingv2 self.notifier = oslo_messaging.Notifier( messaging.get_transport(self.conf), driver=self.conf.publisher_notifier.telemetry_driver, publisher_id="ceilometer.polling") self._keystone = None self._keystone_last_exception = None
這段代碼主要做了以下幾件事:
(1)從setup.cfg中加載ceilometer.poll.compute對應的采集插件
(2)獲取用來發送采集數據樣本到消息隊列的notifier對象
接着是執行了sm.run()代碼,則是調用到了AgentManager類實例的run方法:
File:ceilometer/agent/manager.py:AgentManager.run
def run(self): super(AgentManager, self).run() # 獲取polling.yaml文件內容保存到列表中 # 該文件中保存了需要監控的項目,默認設計*所有,但其實是可以自定義下,從而減少監控項,減少不必要的監控項 # 調用ceilometer/pipeline.py的setup_polling函數 self.polling_manager = pipeline.setup_polling(self.conf) self.join_partitioning_groups() # 啟動polling采集任務 self.start_polling_tasks() self.init_pipeline_refresh()
這段代碼主要功能:
(1)調用setup_polling方法解析/etc/ceilometer/polling.yaml文件,返回一個資源采集列表管理實例(PollingManager類實例),就是把文件定義的要監控的資源信息保存到一個列表(PollingManager類實例的sources列表)中來管理
(2)調用start_pollings_tasks方法啟動polling采集任務,調用各插件采集方法進行采集
查看start_pollings_tasks方法實現:
File:ceilometer/agent/manager.py:AgentManager.start_polling_tasks
def start_polling_tasks(self): ...... # 把要監測的資源全加載到polling_tasks中去 # 返回的就是polling_tasks data = self.setup_polling_tasks() # Don't start useless threads if no task will run if not data: return # One thread per polling tasks is enough # 對於相同時間間隔的共用一個線程即可 self.polling_periodics = periodics.PeriodicWorker.create( [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=len(data))) # 對於每個時間間隔的polling_task都調用一次interval_task執行 # interval_task又調用到了poll_and_notify方法 for interval, polling_task in data.items(): delay_time = (interval + delay_polling_time if delay_start else delay_polling_time) @periodics.periodic(spacing=interval, run_immediately=False) def task(running_task): self.interval_task(running_task) # spawn_thread就是開一個線程,以daemon方式運行 utils.spawn_thread(utils.delayed, delay_time, self.polling_periodics.add, task, polling_task) utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
這段代碼主要功能:
(1)調用setup_polling_tasks方法將要監測的資源以時間間隔為分類保存到polling_tasks列表中去,列表中的每一項是一個PollingTask類實例,每個PollingTask里包含了相同采集時間間隔的插件
(2)對於每一個polling_task開啟一個線程運行interval_task方法,其中又調用了poll_and_notify方法來輪詢每個插件的采集方法並發送
查看poll_and_notify方法實現:
File:ceilometer/agent/manager.py:PollingTask.poll_and_notify
# 將這個時間間隔的polling_task里的插件進行插件里的get_samples調用來采集數據並進行發送 def poll_and_notify(self): """Polling sample and notify.""" ..... # 對於每個source進行輪詢 for source_name in self.pollster_matches: # 對於每個插件進行輪詢采集 for pollster in self.pollster_matches[source_name]: key = Resources.key(source_name, pollster) candidate_res = list( self.resources[key].get(discovery_cache)) if not candidate_res and pollster.obj.default_discovery: candidate_res = self.manager.discover( [pollster.obj.default_discovery], discovery_cache) # 將要在該插件中采集的資源加入到polling_resources列表中 # 比如對於cpu插件來說,這里的resources單元就是虛擬機 polling_resources = [] ...... try: polling_timestamp = timeutils.utcnow().isoformat() # 調用插件的get_samples方法進行數據采集保存到samples變量中 # 比如當前是采集cpu的插件,則是調用ceilometer.compute.pollsters.cpu:CPUPollster類的get_samples方法 samples = pollster.obj.get_samples( manager=self.manager, cache=cache, resources=polling_resources ) sample_batch = [] for sample in samples: # Note(yuywz): Unify the timestamp of polled samples sample.set_timestamp(polling_timestamp) sample_dict = ( publisher_utils.meter_message_from_counter( sample, self._telemetry_secret )) # 判斷是否可以進行批量發送,默認是批量的,可以提高效率 if self._batch: sample_batch.append(sample_dict) else: self._send_notification([sample_dict]) if sample_batch: # 批量發送采集到的數據 self._send_notification(sample_batch) except plugin_base.PollsterPermanentError as err: ...... except Exception as err: ......
這段代碼的主要功能:
(1)對每個插件進行輪詢調用,通過調用插件的采集方法獲取sample數據
(2)通過調用_send_notification方法發送樣本數據到消息隊列(發送到了notifications.sample隊列中)中
查看_send_notification函數實現:
File:ceilometer/agent/manager.py:PollingTask._send_notification
def _send_notification(self, samples): # <class 'oslo_messaging.notify.notifier.Notifier'> # 'telemetry.polling'是定義了event_type,消息事件類型 self.manager.notifier.sample( {}, 'telemetry.polling', {'samples': samples} )
這里調用到了oslo_messaging庫的sample函數里去發送消息,sample方法又調用了_notify方法,該方法對采集數據進行序列化並調用do_notify方法進行發送。
查看do_notify方法實現:
File:oslo_messaging/notify/notifier.py:Notifier.do_notify
def do_notify(ext): try: # oslo_messaging.notify.messaging.MessagingV2Driver ext.obj.notify(ctxt, msg, priority, retry or self.retry) except Exception as e: .....
查看notify實現:
File:oslo_messaging/notify/messaging.py:MessagingV2Driver.notify
def notify(self, ctxt, message, priority, retry): priority = priority.lower() for topic in self.topics: target = oslo_messaging.Target(topic='%s.%s' % (topic, priority)) try: # 該transport是oslo_messaging/transport.pyTransport實例 # 調用該實例的_send_notification方法其實又調用到它引用的driver的send_notification方法 # 該driver根據配置文件定義是oslo_messaging/_drivers/impl_rabbit.py的RabbitDriver類實例 # 調用了該父類的的send_notification方法進行消息發送 self.transport._send_notification(target, ctxt, message, version=self.version, retry=retry) except Exception: .....
由此采集的數據就發送到消息隊列上了,我們再來看下采集插件采集數據的代碼,以采集cpu的插件為例。
比如從setup.cfg我們可以看到這個采集插件:
cpu = ceilometer.compute.pollsters.cpu:CPUPollster
找到對應的采集方法(由上面可以知道都會調用插件的get_samples方法):
File:ceilometer.compute.pollsters.cpu:CPUPollster.get_samples
def get_samples(self, manager, cache, resources): for instance in resources: LOG.debug('checking instance %s', instance.id) try: # inspector是LibvirtInspector類實例 # inspect_cpus方法其實是調用libvirt的接口去獲取cpu的 cpu_info = self.inspector.inspect_cpus(instance) LOG.debug("CPUTIME USAGE: %(instance)s %(time)d", {'instance': instance, 'time': cpu_info.time}) cpu_num = {'cpu_number': cpu_info.number} # 將cpu數據封裝為一個sample類對象並返回 # ceilometer/sample.py文件的sample類 yield util.make_sample_from_instance( self.conf, instance, name='cpu', type=sample.TYPE_CUMULATIVE, unit='ns', volume=cpu_info.time, additional_metadata=cpu_num, monotonic_time=monotonic.monotonic() ) except virt_inspector.InstanceNotFoundException as err: ......
(1)inspector是在加載插件時通過配置文件取得該實例的,該實例是LibvirtInspector類實例
(2)調用該實例方法從而調用到libvirt的api去獲取cpu信息
2.2 Ceilometer-agent-notification服務采集數據流程
首先看程序的入口:
File:ceilometer/cmd/notification.py
def main(): conf = service.prepare_service() sm = cotyledon.ServiceManager() # 初始化一個ceilometer/notification.py的NotificationService類實例 sm.add(notification.NotificationService, workers=conf.notification.workers, args=(conf,)) oslo_config_glue.setup(sm, conf) # 執行NotificationService類實例的run方法開始運行 sm.run()
這段代碼主要功能:
(1)初始化一個NotificationService類實例,加入管理服務中
(2)調用該實例的run方法啟動服務
查看run方法實現:
File:ceilometer/notification.py:NotificationService.run
def run(self): ...... self.pipeline_listener = None # pipeline、tranformer和publisher相關插件加載 # 返回一個PipelineManager類實例 self.pipeline_manager = pipeline.setup_pipeline(self.conf) # 跟setup_pipeline類似 self.event_pipeline_manager = pipeline.setup_event_pipeline(self.conf) # 加載對應的driver實現,這里是oslo_message/_drivers/impl_rabbit.py的RabbitDriver類實現 self.transport = messaging.get_transport(self.conf) # 如果是workload_partitioning設置為true,則需要初始化一個PartitionCoordinator類實例來協調 if self.conf.notification.workload_partitioning: self.group_id = self.NOTIFICATION_NAMESPACE self.partition_coordinator = coordination.PartitionCoordinator( self.conf) # coordinator協調器 self.partition_coordinator.start() else: ...... # 該函數里判斷了是否支持工作負載,如果是則返回SamplePipelineTransportManager類實例替換掉 # PipelineManager類實例,否則還是PipelineManager類實例 # 這兩者的區別在於publisher函數實現是不一樣的 # SamplePipelineTransportManager的在調用publisher函數時會再發到消息隊列中去保存,之后會再取出來處理再發到gnocchi-api上 # PipelineManager的則直接去發到gnocchi-api服務上去了 self.pipe_manager = self._get_pipe_manager(self.transport, self.pipeline_manager) self.event_pipe_manager = self._get_event_pipeline_manager( self.transport) # 獲取ceilometer.notification的插件,並通過這些插件獲取對應的target,再根據targets來找到要監聽的 # 隊列在上面建立消費者獲取消息並處理消息 self._configure_main_queue_listeners(self.pipe_manager, self.event_pipe_manager) if self.conf.notification.workload_partitioning: ....... # configure pipelines after all coordination is configured. with self.coord_lock: # 監聽比如ceilometer-pipe-cpu_source:cpu_delta_sink-0.sample這樣的隊列的消費者 # 因為配置了支持workload_partitioning后,經過轉換的消息並沒有直接發到gnocchi-api中去,而是 # 分類保存到這些隊列里了,再由這些隊列接收到消息后再發送出去 self._configure_pipeline_listener() self.init_pipeline_refresh()
這段代碼的主要功能:
(1)解析pipeline.yaml配置文件獲取監控的項的transforms和publishers,保存到PipelineManager類實例中
(2)調用_get_pipe_manager函數以根據workload_partitioning是否為true獲取一個pipe_manager對象,當workload_partitioning為true時是SamplePipelineTransportManager類對象,否則是PipelineManager類對象,區別上面代碼注釋有說
(3)獲取ceilometer.notification的插件,並通過這些插件獲取對應的target,並對這些target上的隊列創建消費者進行監聽
(4)如果workload_partitioning為true,則還需要創建一些監聽用於多個agent協作時進行IPC通信的隊列
先看setup_event_pipeline函數的實現:
File:ceilometer/pipeline.py
def setup_pipeline(conf, transformer_manager=None): """Setup pipeline manager according to yaml config file.""" # 加載setup.cfg文件中的ceilometer.transformer對應的模塊 default = extension.ExtensionManager('ceilometer.transformer') # 該配置文件對應的是/etc/ceilometer/pipeline.yaml cfg_file = conf.pipeline_cfg_file # SAMPLE_TYPE是一個字典,包含了SamplePipeline、SampleSource和SampleSink等類 # 返回一個PipelineManager實例,該實例管理了SamplePipeline類實例數組,SamplePipeline又是管理了 # SampleSource和SampleSink,用SampleSource中標志的sink來對應SampleSink中具體制定的transform和publisher return PipelineManager(conf, cfg_file, transformer_manager or default, SAMPLE_TYPE)
這段代碼主要功能:
(1)加載setup.cfg文件中的ceilometer.transformer對應的模塊以用以PipelineManager類實例初始化
(2)初始化PipelineManager類實例
查看初始化PipelineManager類實例代碼:
File:ceilometer/pipeline.py:PipelineManager.__init__
class PipelineManager(ConfigManagerBase): def __init__(self, conf, cfg_file, transformer_manager, p_type=SAMPLE_TYPE): ...... # p_type['name']比如是sample # 這里只是獲取一個PublisherManager類實例,還沒有進行加載 # 當需要獲取時會通過調用該類實例的get方法動態獲取對應的publisher publisher_manager = PublisherManager(self.conf, p_type['name']) unique_names = set() sources = [] # 解析sources,封裝為SimpleSource for s in cfg.get('sources'): name = s.get('name') if name in unique_names: raise PipelineException("Duplicated source names: %s" % name, self) else: unique_names.add(name) # p_type['source']對應的是SampleSource sources.append(p_type['source'](s)) unique_names.clear() # 解析sinks,封裝為SimpleSink sinks = {} for s in cfg.get('sinks'): name = s.get('name') if name in unique_names: raise PipelineException("Duplicated sink names: %s" % name, self) else: unique_names.add(name) # p_type['sink']對應的是SampleSink sinks[s['name']] = p_type['sink'](self.conf, s, transformer_manager, publisher_manager) unique_names.clear() # 將加載的SampleSource和SampleSink封裝成SamplePipeline for source in sources: source.check_sinks(sinks) for target in source.sinks: # p_type['pipeline']對應的是SamplePipeline pipe = p_type['pipeline'](self.conf, source, sinks[target]) if pipe.name in unique_names: raise PipelineException( "Duplicate pipeline name: %s. Ensure pipeline" " names are unique. (name is the source and sink" " names combined)" % pipe.name, cfg) else: unique_names.add(pipe.name) # 所以最后的結果都是保存到self.pipelines里 self.pipelines.append(pipe) unique_names.clear()
可以看到整段代碼其實都是在把pipeline.yaml中定義的內容轉換成相對應的類,最后封裝成SamplePipeline類實例為單元進行保存
再來看run方法中的_get_pipe_manager方法實現:
File:ceilometer/notification.py:NotificationService._get_pipe_manager
def _get_pipe_manager(self, transport, pipeline_manager): if self.conf.notification.workload_partitioning: pipe_manager = pipeline.SamplePipelineTransportManager(self.conf) for pipe in pipeline_manager.pipelines: key = pipeline.get_pipeline_grouping_key(pipe) pipe_manager.add_transporter( (pipe.source.support_meter, key or ['resource_id'], self._get_notifiers(transport, pipe))) else: pipe_manager = pipeline_manager return pipe_manager
從這段代碼我們可以看出,主要是檢測workload_partitioning參數是否設置為true,如果是則重現構建一個SamplePipelineTransportManager類實例並返回,否則還是返回之前的PipelineManager類實例。
這兩個類的最大不同之處在於publisher方法實現的不同,查看PipelineManager類實例的publisher方法實現:
File:ceilometer/pipeline.py:PipelineManager.publisher
def publisher(self): return PublishContext(self.pipelines) class PublishContext(object): ..... def __enter__(self): def p(data): for p in self.pipelines: p.publish_data(data) return p .....
File:ceilometer/pipeline.py:SamplePipeline.publish_data
def publish_data(self, samples): ..... supported = [s for s in samples if self.source.support_meter(s.name) and self._validate_volume(s)] # 調用SinkSample類實例的publish_samples方法將監控數據通過_transform_sample處理后發送到gnocchi-api中去 self.sink.publish_samples(supported)
由上面的代碼可以看出publisher方法實現就是直接將監控數據通過轉換后發送到gnocchi-api服務中去保存
看SamplePipelineTransportManager類實例的publisher方法實現:
File:ceilometer/pipeline.py:SamplePipelineTransportManager.publisher
def publisher(self): ...... class PipelinePublishContext(object): def __enter__(self): def p(data): data = [data] if not isinstance(data, list) else data for datapoint in data: serialized_data = serializer(datapoint) for d_filter, grouping_keys, notifiers in transporters: if d_filter(serialized_data[filter_attr]): key = (hash_grouping(serialized_data, grouping_keys) % len(notifiers)) notifier = notifiers[key] # <class 'oslo_messaging.notify.notifier.Notifier'> # 這里的event_type是ceilometer.pipeline,不是ceilometer.polling了 # 發送到類似ceilometer-pipe-xxx_source:xxx_sink-x.sample這樣的隊列去 notifier.sample({}, event_type=event_type, payload=[serialized_data]) return p def __exit__(self, exc_type, exc_value, traceback): pass return PipelinePublishContext()
由這個方法我們可以看出消息時通過notifier對象發送到其它隊列中去的,不是直接發到gnocchi-api中去的,但它會再從對應隊列里收集到然后再統一發送到gnocchi-api服務中去。這是因為在run方法中有調用_configure_pipeline_listener方法來創建消費者監聽這些隊列,並且這些監聽的收到消息是調用SamplePipelineEndpoint類的sample方法的,這是由endpoints決定的。可在_configure_pipeline_listener方法中看到endpoints對象的獲取代碼:
endpoints.append(pipeline.SamplePipelineEndpoint(pipe)),所以在消費者監聽到消息進行處理時會回調到該類的sample方法。
查看sample方法:
# 對於類似ceilometer-pipe-cpu_source:cpu_delta_sink-0.sample這樣的隊列來的消息會調用這里的sample方法進行處理
File:ceilometer/pipeline.py:SamplePipelineEndpoint.sample
def sample(self, messages): ..... with self.publish_context as p: # 這里的p就又回到了ceilometer.pipeline.PipelineManager.publisher.PublishContext.p這個方法了 # 將數據發送到gnocchi-api服務上去 p(sorted(samples, key=methodcaller('get_iso_timestamp')))
所以這里總結下workload_partitioning如果設置為true,則監控數據在發到gnocchi-api之前多做了層分類處理,將消息根據類型分派到更細分的隊列中,方便多個agent協同工作。
接着再看run方法的_configure_main_queue_listeners方法的實現:
File:ceilometer/notification.py:NotificationService._configure_main_queue_listeners
def _configure_main_queue_listeners(self, pipe_manager, event_pipe_manager): # 獲取ceilometer.notification命令空間里的插件 notification_manager = self._get_notifications_manager(pipe_manager) ack_on_error = self.conf.notification.ack_on_event_error # 這里的endpoints是很關鍵的,因為當收到消息進行處理時會分派消息,分派時就是根據消息的類型分派到指定的插件類來處理消息 endpoints = [] endpoints.append( event_endpoint.EventsNotificationEndpoint(event_pipe_manager)) targets = [] for ext in notification_manager: handler = ext.obj # 針對每一個加載的插件獲取它們的targets # 其中_sample的target為1個,<Target exchange=ceilometer, topic=notifications> # 比如http.request的則有多個,比如<Target exchange=nova, topic=notifications>,<Target exchange=glance, topic=notifications> for new_tar in handler.get_targets(self.conf): if new_tar not in targets: targets.append(new_tar) # 每個插件的對象實例保存到endpoints endpoints.append(handler) urls = self.conf.notification.messaging_urls or [None] for url in urls: transport = messaging.get_transport(self.conf, url) # 初始化一個solo_messaging/notify/listener.py文件的BatchNotificationServer類實例 listener = messaging.get_batch_notification_listener( transport, targets, endpoints) # 啟動監聽,創建消費者,綁定收到消息時的回調函數 listener.start() self.listeners.append(listener)
這段代碼的主要功能:
(1)獲取ceilometer.notification命令空間里的插件對象
(2)從插件對象里獲取target
(3)根據獲取到的targets創建對應隊列的消費者進行監聽
這里比較關鍵的代碼是初始化一個listener並調用start方法。
先看get_batch_notification_listener方法實現,該方法又直接調用到oslo_messaging/notify/listener.py的get_batch_notification_listener方法:
File:oslo_messaging/notify/listener.py
def get_batch_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None, allow_requeue=False, pool=None, batch_size=None, batch_timeout=None): # 初始化一個oslo_messaging/notify/dispatcher.py的BatchNotificationDispatcher類實例 dispatcher = notify_dispatcher.BatchNotificationDispatcher( endpoints, serializer) return BatchNotificationServer( transport, targets, dispatcher, executor, allow_requeue, pool, batch_size, batch_timeout )
這段代碼的功能:
(1)初始化一個dispatcher對象,該對象的功能是當接收到消息時將消息分配到對應插件進行處理
(2)初始化BatchNotificationServer類實例並返回該對象
現在看listener調用start方法,調用該start方法主要是調用了_create_listener方法獲取PollStyleListenerAdapter類對象並調用start方法啟動,查看_create_listener方法的實現:
File:oslo_messaging/notify/listener.py:BatchNotificationServer._create_listene
# 會被start方法調用 def _create_listener(self): return self.transport._listen_for_notifications( self._targets_priorities, self._pool, self._batch_size, self._batch_timeout )
Transport對象其實又是通過driver調用_listen_for_notifications方法:
File:oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase.listen_for_notifications
def listen_for_notifications(self, targets_and_priorities, pool, batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) conn.connection.rabbit_qos_prefetch_count = batch_size listener = AMQPListener(self, conn) # 根據每個target創建對應隊列上的消費者 for target, priority in targets_and_priorities: conn.declare_topic_consumer( exchange_name=self._get_exchange(target), topic='%s.%s' % (target.topic, priority), callback=listener, queue_name=pool) return base.PollStyleListenerAdapter(listener, batch_size, batch_timeout)
這段代碼的主要功能:
(1)在要監聽的隊列上創建消費者進行監聽
(2)初始化PollStyleListenerAdapter類實例並返回
PollStyleListenerAdapter類對象初始化時會生成一個線程對象:
File:oslo_messaging/_drivers/base.py:PollStyleListenerAdapter.__init__
self._listen_thread = threading.Thread(target=self._runner)
然后調用start后就會生成一個線程運行_runner函數,該函數主要功能是不斷的去獲取消息,並通過調用_process_incoming函數來處理消息:
File:oslo_messaging/notify/listener.py:NotificationServer._process_incoming
def _process_incoming(self, incoming): message = incoming[0] try: res = self.dispatcher.dispatch(message) except Exception: ......
可以看到該處理函數會調用dispatcher對象來分派消息,在查看dispatch方法實現前,先來看下該實例初始化時做的事情:
File:oslo_messaging/notifydispatcher.py:NotificationDispatcher
class NotificationDispatcher(dispatcher.DispatcherBase): def __init__(self, endpoints, serializer): # endpoints:Notification插件集合 # 對應ceilometer.notification加載的插件,該插件會接受它對應的消息並進行處理且最后調用publisher將處理后的消息發送出去 self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() self._callbacks_by_priority = {} for endpoint, prio in itertools.product(endpoints, PRIORITIES): if hasattr(endpoint, prio): # 這里的方法就是sample、info、debug等,在ceilometer/agent/plugin_base.py中的NotificationBase類都會對應名字的方法實現 # 注意了如果是ceilometer-pipe-cpu_source:cpu_delta_sink-0.sample類似這些隊列來的則處理函數就不是sample、info、debug那些了 # 而是SamplePipelineEndpoint類的sample方法了,因為endpoints不一樣了 method = getattr(endpoint, prio) screen = getattr(endpoint, 'filter_rule', None) self._callbacks_by_priority.setdefault(prio, []).append( (screen, method))
這段代碼的主要功能是初始化_callbacks_by_priority字典對象,使得可以根據消息類型找到對應的插件進行對應方法的調用。
現在看dispatch方法實現:
File:oslo_messaging/notify/dispatcher.py:NotificationDispatcher.dispatch
def dispatch(self, incoming): """Dispatch notification messages to the appropriate endpoint method. """ # 將資源分配到合理的處理插件上去處理消息 priority, raw_message, message = self._extract_user_message(incoming) if priority not in PRIORITIES: LOG.warning(_LW('Unknown priority "%s"'), priority) return for screen, callback in self._callbacks_by_priority.get(priority, []): if screen and not screen.match(message["ctxt"], message["publisher_id"], message["event_type"], message["metadata"], message["payload"]): continue ret = self._exec_callback(callback, message) if ret == NotificationResult.REQUEUE: return ret return NotificationResult.HANDLED
這里重點看通過消息的priority字段查找self._callbacks_by_priority字典里匹配的插件的對應方法,也即是獲取到callback函數,然后進行調用。
舉例子:比如notifications.sample上的消息會匹配到TelemetryIpc類的sample方法去處理
_sample = ceilometer.telemetry.notifications:TelemetryIpc
但該類實例是調用了它父類的sample方法:
File:ceilometer/agent/plugin_base.py:NotificationBase.sample
def sample(self, notifications): self._process_notifications('sample', notifications) _process_notifications方法又調用了to_samples_and_publish方法: File:ceilometer/agent/plugin_base.py:NotificationBase.to_samples_and_publish def to_samples_and_publish(self, notification): with self.manager.publisher() as p: # 這里處理好的sample的格式類似於 # <name: bandwidth, volume: 252, resource_id: a16d3949-29a6-4d88-b9a1-c7fdc97d0e4f, timestamp: 2019-07-04 03:02:25.181893> # p是ceilometer.pipeline.SamplePipelineTransportManager.publisher.PipelinePublishContext.p方法,這個會再發到消息隊列上去 # 如類似ceilometer-pipe-cpu_source:cpu_delta_sink-0.sample這種隊列 # 如果workload_partitioning設置為false則調用的是ceilometer.pipeline.PipelineManager.publisher.PublishContext.p方法,這個是 # 直接發送gnocchi-api上去的 # 這個process_notification是調用對應的插件的process_notification方法對監控數據進行處理 p(list(self.process_notification(notification)))
接着我們查看調用p函數的實現:
File:ceilometer/pipeline.py:PublishContext.__enter__.p
def p(data): for p in self.pipelines: p.publish_data(data)
查看publish_data實現:
File:ceilometer/pipeline.py:SamplePipeline.publish_data
def publish_data(self, samples): ...... # 調用SampleSink類實例的publish_samples方法將監控數據通過_transform_sample處理后發送到gnocchi-api中去 self.sink.publish_samples(supported)
這個sink對象是之前解析pipeline.yaml文件后每種類型資源生成的SampleSink類對象,publish_samples又調用了_publish_samples方法:
File:ceilometer/pipeline.py:SampleSink._publish_samples
def _publish_samples(self, start, samples): transformed_samples = [] if not self.transformers: transformed_samples = samples else: for sample in samples: sample = self._transform_sample(start, sample) if sample: transformed_samples.append(sample) if transformed_samples: # 配置文件配的是gnocchi,根據setup.cfg中的指定 # gnocchi = ceilometer.publisher.direct:DirectPublisher # 所以這里其實是調用到ceilometer/publisher/direct.py文件的DirectPublisher類實例的publish_samples方法 for p in self.publishers: try: # 所以這里其實是調用到ceilometer/publisher/direct.py文件的DirectPublisher類實例的publish_samples方法 p.publish_samples(transformed_samples) except Exception: .....
這段代碼的主要功能是:
(1)調用對應的transform插件對samples數據進行轉換處理
(2)將轉換后的數據通過publisher插件發送出去
這里再講下_transform_sample主要是在干嘛,舉個例子,libvirt中並不能采集到cpu的使用率,只能采集到cpu時鍾時間,這時就只采集到了cpu info的信息,處理這個sample的時候,就可以通過計算來取得cpu_util的值,所以這樣就轉換成了cpu_util的監控值了。
我們在/etc/ceilometer/pipeline.yaml配置文件中可以看到具體的轉換函數和定義計算規則,如下圖的rate_of_change(在setup.cfg文件中可以看到該轉換函數實現在哪個文件)就是計算cpu使用率的轉換函數,target定義轉換后的單位和計算方法:
計算公式:
首先得到一個周期差:cpu_time_diff = cpu_time_now - cpu_time_before
這個周期差所花費的時間:time_diff = time_now - time_before
計算實際使用率:%cpu = cpu_time_diff / (time_diff * cpu_cores * 10^9) * 100%
cpu_time_now和time_now值都可以在libvirt接口中獲得,cpu_time_before和time_before則是保存的上一次的值。
在openstack的ceilometer項目中cpu_time_now是count_volume(由cpu_time賦值的)值,time_now是monotonic_time值(表示虛擬機從開機到現在的時間)
查看DirectPublisher類實例的publish_samples方法:
File:ceilometer/publisher/direct.py:DirectPublisher.publish_samples
def publish_samples(self, samples): ..... # 這里是調用到ceilometer/dipatcher/gnocchi.py里的record_metering_data self.get_sample_dispatcher().record_metering_data([ utils.meter_message_from_counter(sample, secret=None) for sample in samples])
查看record_metering_data方法實現:
File:ceilometer/dispatcher/gnocchi.py:GnocchiDispatcher.record_metering_data
# 將sample進行加工后發送到gnocchi中去 def record_metering_data(self, data): # 處理data ....... try: # 連接gnocchiclient發送數據到gnocchi中去保存數據 self.batch_measures(measures, gnocchi_data, stats) except (gnocchi_exc.ClientException, ka_exceptions.ConnectFailure) as e: ......
查看batch_measures方法實現:
def batch_measures(self, measures, resource_infos, stats): try: # 調用gnocchiclient/v1/metric.py的batch_resources_metrics_measures方法發起http的post請求 self._gnocchi.metric.batch_resources_metrics_measures( measures, create_metrics=True) except gnocchi_exc.BadRequest as e: ......
這里發送出去后,數據流就到達到gnocchi-api服務上了,交由gnocchi服務來進行計算處理和存儲
2.3 Gnocchi-api服務接收數據流程
該服務現在在比較新的版本中默認配置為是由httpd服務拉起的,拉起時執行的代碼:
File:gnocchi/rest/app.wsgi
from gnocchi.rest import app application = app.build_wsgi_app()
build_wsgi_app又調用了load_app函數:
File:gnocchi/rest/app.py
def load_app(conf, indexer=None, storage=None, not_implemented_middleware=True): global APPCONFIGS # NOTE(sileht): We load config, storage and indexer, # so all if not storage: storage = gnocchi_storage.get_driver(conf) if not indexer: indexer = gnocchi_indexer.get_driver(conf) indexer.connect() # Build the WSGI app cfg_path = conf.api.paste_config if not os.path.isabs(cfg_path): cfg_path = conf.find_file(cfg_path) if cfg_path is None or not os.path.exists(cfg_path): raise cfg.ConfigFilesNotFoundError([conf.api.paste_config]) config = dict(conf=conf, indexer=indexer, storage=storage, not_implemented_middleware=not_implemented_middleware) configkey = str(uuid.uuid4()) APPCONFIGS[configkey] = config LOG.info("WSGI config used: %s", cfg_path) appname = "gnocchi+" + conf.api.auth_mode app = deploy.loadapp("config:" + cfg_path, name=appname, global_conf={'configkey': configkey}) return cors.CORS(app, conf=conf)
這段代碼的主要功能:
(1)根據配置文件加載對應存儲driver插件,對應的是measure storage和Index的插件
(2)建立api服務並運行起來,是使用pecan框架進行建立的
我們這里看一個接收采集數據的過程,數據是從notification-polling->notification-agent-notification->gnocchiclient->gnocchi-api的。
我們看下在gnocchiclient這邊的發送數據代碼邏輯:
File:gnocchiclient/v1/metric.py:MetricManager. batch_resources_metrics_measures
def batch_resources_metrics_measures(self, measures, create_metrics=False): # v1/batch/resources/metrics/measures # 對應gnocchi服務的pecan url路徑查找可知識調用了gnocchi/rest/__init__.py的ResourcesMetricsMeasuresBatchController類的post方法 return self._post( self.resources_batch_url, headers={'Content-Type': "application/json"}, data=jsonutils.dumps(measures), params=dict(create_metrics=create_metrics))
可以看到是通過post的方式發送一個http的請求到gnocchi-api服務的,這里關鍵的是看url路徑,可以通過該url路徑找到在gnocchi-api對應的接收方法,這里我們按照pecan框架的路徑查找方法找到了是在gnocchi/rest/__init__.py的ResourcesMetricsMeasuresBatchController類的post方法:
File:gnocchi/rest/__init__.py:ResourcesMetricsMeasuresBatchController.post
class ResourcesMetricsMeasuresBatchController(rest.RestController): @pecan.expose('json') def post(self, create_metrics=False): # 判斷該監控名字是否已存在和封裝數據等邏輯 ...... # 保存監控數據,等待gnocchi-metricd服務對它異步處理 # pecan.request.storage這個對象是<class 'gnocchi.storage.ceph.CephStorage'> # pecan.request.storage.incoming這個是<class 'gnocchi.storage.incoming.file.FileStorage'> # 這個應該就是我們配置文件中配置的臨時的先放file里(可查看配置文件的[incoming]項,存到/var/lib/gnocchi/該目錄下),最終的聚合計算后的數據放ceph里 pecan.request.storage.incoming.add_measures_batch( dict((metric, body_by_rid[metric.resource_id][metric.name]) for metric in known_metrics)) ....
add_measures_batch方法通過格式化數據后調用_store_new_measures方法來保存:
File:gnocchi/storage/incoming/file.py:FileStorage._store_new_measures
def _store_new_measures(self, metric, data): tmpfile = tempfile.NamedTemporaryFile( prefix='gnocchi', dir=self.basepath_tmp, delete=False) tmpfile.write(data) tmpfile.close() # 這個目錄下/var/lib/gnocchi/ path = self._build_measure_path(metric.id, True) while True: try: os.rename(tmpfile.name, path) break except OSError as e: ......
2.4 Gnocchi-metricd服務存儲數據流程
首先看該服務的程序入口:
File:gnocchi/cli.py
# 程序啟動時入口點 def metricd(): conf = cfg.ConfigOpts() conf.register_cli_opts([ cfg.IntOpt("stop-after-processing-metrics", default=0, min=0, help="Number of metrics to process without workers, " "for testing purpose"), ]) conf = service.prepare_service(conf=conf) if conf.stop_after_processing_metrics: metricd_tester(conf) else: MetricdServiceManager(conf).run()
入口代碼的關鍵代碼在於最后一行的MetricdServiceManager類初始化和調用run方法開始運行服務。
我們先看MetricdServiceManager類初始化:
File:gnocchi/cli.py:MetricdServiceManager
class MetricdServiceManager(cotyledon.ServiceManager): def __init__(self, conf): super(MetricdServiceManager, self).__init__() oslo_config_glue.setup(self, conf) self.conf = conf self.queue = multiprocessing.Manager().Queue() # 從measure storage中取出監控數據存放到queue隊列中 self.add(MetricScheduler, args=(self.conf, self.queue)) # 不斷的從queue隊列中獲取數據並進行聚合計算並保存到aggregate storage中去 self.metric_processor_id = self.add( MetricProcessor, args=(self.conf, self.queue), workers=conf.metricd.workers) if self.conf.metricd.metric_reporting_delay >= 0: self.add(MetricReporting, args=(self.conf,)) # 用來刪除在index數據庫中資源標志為delete的監控數據 self.add(MetricJanitor, args=(self.conf,)) self.register_hooks(on_reload=self.on_reload)
這段代碼的主要功能:
(1)獲取一個queue對象用於保存從measure storage取出的監控數據
(2)初始化一個MetricScheduler類實例並添加到待啟動功能對象中,運行服務后會開啟線程不斷的調用該對象的_run_job方法以不斷的從measure storage中取出數據放到queue隊列中
(3)初始化一個MetricProcessor類實例並添加到待啟動功能對象中,運行服務后開啟線程不斷的調用該對象的_run_job方法以不斷的從queue取出數據根據聚合計算規則進行計算並將結果存儲到aggregate storage中
(4)MetricReporting類添加流程類似如上,它的作用是報告measure storage中有多少條待處理數據需要處理
(5)MetricJanitor類添加流程如上,它的作用是刪除在index數據庫中資源標志為delete的監控數據,比如刪除掉measure storage已經處理過的數據或者aggregate storage中已經過期的數據
初始化完后就會調用run方法把上面添加好的功能類都運行起來,從而整個處理流程就跑起來了:
def run(self): # run方法會一直運行 super(MetricdServiceManager, self).run() # 在程序結束或停止時關閉queue self.queue.close()