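1. A brief introduction to ceilometer
Long story short: ceilometer collects the value of each OpenStack resource at a given moment, for example the size of a cloud disk (volume). Below is the current architecture diagram from the official site.
Besides ceilometer itself, the diagram shows three other components:
- Panko stores events. Using it to implement per-second, event-based billing in cloudkitty is also part of my work; part of that is done so far, and I will write a separate post when I have time.
- gnocchi stores ceilometer's metering data. Earlier versions kept it in mongo, but as the metering data accumulated, query performance became very poor, so OpenStack later introduced the gnocchi project. gnocchi's storage backends include redis, file, ceph and others. I am responsible for this part as well and it is already implemented; I may write an article about it when I have time.
- Aodh handles alarming.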
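Note that ceilometer has two main agents:
- polling, which periodically calls the relevant OpenStack APIs to fetch metering data,
- notification, which listens for OpenStack event messages and converts them into the corresponding metering data.
The metering data from both agents finally passes through the defined pipeline to the REST API exposed by gnocchi, which then takes care of aggregation and storage.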
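To make the flow above concrete, here is a minimal sketch of my own. It is not ceilometer code: the names `polling_agent`, `notification_agent`, `add_unit` and the sample fields are all made up for illustration, an in-process `queue.Queue` stands in for the message bus, and a plain list stands in for gnocchi.

```python
import queue


def polling_agent(volumes, bus):
    """Pretend to poll an API and put one raw sample per volume on the bus."""
    for vol in volumes:
        bus.put({"name": "volume.size",
                 "resource_id": vol["id"],
                 "volume": vol["size_gb"]})


def notification_agent(bus, pipeline, publish):
    """Drain the bus, run every sample through the pipeline, publish it."""
    while not bus.empty():
        sample = bus.get()
        for transform in pipeline:
            sample = transform(sample)
        publish(sample)


def add_unit(sample):
    """A stand-in pipeline step that annotates the sample with a unit."""
    return dict(sample, unit="GB")


bus = queue.Queue()   # stands in for the message queue
stored = []           # stands in for the storage backend

polling_agent([{"id": "vol-1", "size_gb": 10},
               {"id": "vol-2", "size_gb": 40}], bus)
notification_agent(bus, [add_unit], stored.append)
```

The point is only the direction of travel: the polling side produces samples, and the notification side consumes, transforms, and publishes them.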
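Next, let's look at the official architecture diagram for data collection, processing, and forwarding.
Then let's look at how the data is processed.
I think the diagrams in the official documentation describe this very well, so I won't say more here.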
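2. Source code analysis
To be honest, OpenStack is probably the largest Python project around, a real behemoth. The first time I touched it I was completely lost. It got a bit better after reading more, although many places still baffle me. When reading an OpenStack project, a few files matter a lot, for example setup.cfg, which declares the project's entry points. In this post I mainly analyze how polling is implemented; the other parts are similar.
- Where the ceilometer polling-agent starts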
# ceilometer/cmd/polling.py
def main():
    conf = cfg.ConfigOpts()
    conf.register_cli_opts(CLI_OPTS)
    service.prepare_service(conf=conf)
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service, args=(conf,))
    oslo_config_glue.setup(sm, conf)
    sm.run()

# The first few lines read the configuration. A polling service is then added
# through the cotyledon library and finally run. From a quick look, cotyledon
# is a library for launching process-based tasks.

def create_polling_service(worker_id, conf):
    return manager.AgentManager(worker_id,
                                conf,
                                conf.polling_namespaces,
                                conf.pollster_list)

# create_polling_service returns a polling agent. The allowed values for
# polling-namespaces are choices=['compute', 'central', 'ipmi']; when none is
# given, AgentManager falls back to ['compute', 'central'].
- polling AgentManager
# ceilometer/agent/manager.py
class AgentManager(service_base.PipelineBasedService):

    def __init__(self, worker_id, conf, namespaces=None, pollster_list=None):

        namespaces = namespaces or ['compute', 'central']
        pollster_list = pollster_list or []
        group_prefix = conf.polling.partitioning_group_prefix

        # features of using coordination and pollster-list are exclusive, and
        # cannot be used at one moment to avoid both samples duplication and
        # samples being lost
        if pollster_list and conf.coordination.backend_url:
            raise PollsterListForbidden()

        super(AgentManager, self).__init__(worker_id, conf)

        def _match(pollster):
            """Find out if pollster name matches to one of the list."""
            return any(fnmatch.fnmatch(pollster.name, pattern) for
                       pattern in pollster_list)

        if type(namespaces) is not list:
            namespaces = [namespaces]

        # we'll have default ['compute', 'central'] here if no namespaces will
        # be passed
        extensions = (self._extensions('poll', namespace, self.conf).extensions
                      for namespace in namespaces)
        # get the extensions from pollster builder
        extensions_fb = (self._extensions_from_builder('poll', namespace)
                         for namespace in namespaces)
        if pollster_list:
            extensions = (moves.filter(_match, exts)
                          for exts in extensions)
            extensions_fb = (moves.filter(_match, exts)
                             for exts in extensions_fb)

        self.extensions = list(itertools.chain(*list(extensions))) + list(
            itertools.chain(*list(extensions_fb)))

        if self.extensions == []:
            raise EmptyPollstersList()

        discoveries = (self._extensions('discover', namespace,
                                        self.conf).extensions
                       for namespace in namespaces)
        self.discoveries = list(itertools.chain(*list(discoveries)))
        self.polling_periodics = None

        self.partition_coordinator = coordination.PartitionCoordinator(
            self.conf)
        self.heartbeat_timer = utils.create_periodic(
            target=self.partition_coordinator.heartbeat,
            spacing=self.conf.coordination.heartbeat,
            run_immediately=True)

        # Compose coordination group prefix.
        # We'll use namespaces as the basement for this partitioning.
        namespace_prefix = '-'.join(sorted(namespaces))
        self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
                             if group_prefix else namespace_prefix)

        self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(self.conf),
            driver=self.conf.publisher_notifier.telemetry_driver,
            publisher_id="ceilometer.polling")

        self._keystone = None
        self._keystone_last_exception = None

    def run(self):
        super(AgentManager, self).run()
        self.polling_manager = pipeline.setup_polling(self.conf)
        self.join_partitioning_groups()
        self.start_polling_tasks()
        self.init_pipeline_refresh()
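1 The initializer uses ExtensionManager to load the entry points declared for each meter in setup.cfg, covering both discover and poll:
discover calls the OpenStack APIs to get the resources,
poll converts the resources returned by discover into the corresponding samples (the value of a meter at a given moment)
2 A periodic timer is also created for the coordination heartbeat, which matters when several agents share the work
3 It defines where the collected data is sent through the message queue (oslo_messaging.Notifier)
4 Finally, the run method starts the polling agent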
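The `_match` helper in `__init__` filters pollsters by shell-style patterns from `pollster_list`. Here is a small standalone sketch of that idea using only the standard library; `filter_pollsters` and the sample names are my own, and unlike the real code it works on plain name strings rather than extension objects.

```python
import fnmatch


def filter_pollsters(names, pollster_list):
    """Keep only the names matching at least one pattern.

    An empty pollster_list means "no filtering", mirroring the
    `if pollster_list:` guard in AgentManager.__init__.
    """
    if not pollster_list:
        return list(names)
    return [name for name in names
            if any(fnmatch.fnmatch(name, pattern)
                   for pattern in pollster_list)]


names = ["disk.read.requests", "disk.write.requests",
         "disk.read.bytes", "cpu"]
selected = filter_pollsters(names, ["disk.read.*"])
unfiltered = filter_pollsters(names, [])
```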
# setup.cfg
ceilometer.discover.compute =
    local_instances = ceilometer.compute.discovery:InstanceDiscovery
ceilometer.poll.compute =
    disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster
    disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster
    disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster
    disk.write.bytes = ceilometer.compute.pollsters.disk:WriteBytesPollster
    disk.read.requests.rate = ceilometer.compute.pollsters.disk:ReadRequestsRatePollster
    ......
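Each entry above has the form `name = module:attribute`. The real loading is done by ExtensionManager; the sketch below only illustrates what resolving such a line means, using `importlib` from the standard library. `parse_entry_point` and `load_entry_point` are hypothetical helpers of mine, and I load a standard-library class because ceilometer may not be installed where you try this.

```python
import importlib


def parse_entry_point(line):
    """Split 'name = module:attr' into (name, module, attr)."""
    name, _, target = line.partition("=")
    module_name, _, attr = target.strip().partition(":")
    return name.strip(), module_name.strip(), attr.strip()


def load_entry_point(line):
    """Import the module named in the entry and return (name, object)."""
    name, module_name, attr = parse_entry_point(line)
    module = importlib.import_module(module_name)
    return name, getattr(module, attr)


# Parsing works on any entry, even when the target module is not importable.
parsed = parse_entry_point(
    "disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster")

# Loading needs a module that really exists, so use the standard library.
loaded_name, loaded_cls = load_entry_point("ordered = collections:OrderedDict")
```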
- Set up polling, for example how often each resource's meters are fetched
def setup_polling(conf):
    """Setup polling manager according to yaml config file."""
    cfg_file = conf.polling.cfg_file
    return PollingManager(conf, cfg_file)
class PollingManager(ConfigManagerBase):
    """Polling Manager

    Polling manager sets up polling according to config file.
    """

    def __init__(self, conf, cfg_file):
        """Setup the polling according to config.

        The configuration is supported as follows:

        {"sources": [{"name": source_1,
                      "interval": interval_time,
                      "meters" : ["meter_1", "meter_2"],
                      "resources": ["resource_uri1", "resource_uri2"],
                     },
                     {"name": source_2,
                      "interval": interval_time,
                      "meters" : ["meter_3"],
                     },
                    ]}
        }

        The interval determines the cadence of sample polling

        Valid meter format is '*', '!meter_name', or 'meter_name'.
        '*' is wildcard symbol means any meters; '!meter_name' means
        "meter_name" will be excluded; 'meter_name' means 'meter_name'
        will be included.

        Valid meters definition is all "included meter names", all
        "excluded meter names", wildcard and "excluded meter names", or
        only wildcard.

        The resources is list of URI indicating the resources from where
        the meters should be polled. It's optional and it's up to the
        specific pollster to decide how to use it.

        """
        super(PollingManager, self).__init__(conf)
        try:
            cfg = self.load_config(cfg_file)
        except (TypeError, IOError):
            LOG.warning(_LW('Unable to locate polling configuration, falling '
                            'back to pipeline configuration.'))
            cfg = self.load_config(conf.pipeline_cfg_file)
        self.sources = []
        if 'sources' not in cfg:
            raise PollingException("sources required", cfg)
        for s in cfg.get('sources'):
            self.sources.append(PollingSource(s))
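The docstring above describes the meter selection rules: `'*'`, `'!meter_name'`, and `'meter_name'`. As a rough illustration of those rules, and not the actual ceilometer implementation, here is a sketch of my own; the function name `meter_is_selected` is hypothetical.

```python
import fnmatch


def meter_is_selected(meters, meter_name):
    """Decide whether meter_name is selected by a source's 'meters' list.

    Entries starting with '!' exclude a meter; all other entries
    (including the '*' wildcard) include it. Exclusions win.
    """
    excluded = [m[1:] for m in meters if m.startswith("!")]
    included = [m for m in meters if not m.startswith("!")]

    if any(fnmatch.fnmatch(meter_name, pattern) for pattern in excluded):
        return False
    if included:
        return any(fnmatch.fnmatch(meter_name, pattern)
                   for pattern in included)
    # Only exclusions were listed, so every other meter is accepted.
    return True


only_wildcard = meter_is_selected(["*"], "cpu")
explicit_miss = meter_is_selected(["cpu", "disk.read.bytes"], "memory")
wildcard_minus_cpu = meter_is_selected(["*", "!cpu"], "cpu")
```

This covers the four valid shapes listed in the docstring: only included names, only excluded names, wildcard plus excluded names, and wildcard alone.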
# Initialized from the configuration file etc/ceilometer/polling.yaml shown below
---
sources:
    - name: all_pollsters
      interval: 600
      meters:
        - "*"
- Join the partitioning group that corresponds to each discovery's group id
def join_partitioning_groups(self):
    self.groups = set([self.construct_group_id(d.obj.group_id)
                       for d in self.discoveries])
    # let each set of statically-defined resources have its own group
    static_resource_groups = set([
        self.construct_group_id(utils.hash_of_set(p.resources))
        for p in self.polling_manager.sources
        if p.resources
    ])
    self.groups.update(static_resource_groups)

    if not self.groups and self.partition_coordinator.is_active():
        self.partition_coordinator.stop()
        self.heartbeat_timer.stop()

    if self.groups and not self.partition_coordinator.is_active():
        self.partition_coordinator.start()
        utils.spawn_thread(self.heartbeat_timer.start)

    for group in self.groups:
        self.partition_coordinator.join_group(group)
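Why join groups at all? With several agents in the same group, each one should poll only its own share of the resources, so samples are neither duplicated nor lost. The sketch below shows the idea with a simple hash-modulo split. This is a deliberate simplification of mine and not how PartitionCoordinator actually assigns resources; `my_share` and the agent names are hypothetical.

```python
import hashlib


def my_share(resources, members, me):
    """Return the resources that agent `me` is responsible for.

    Every agent sorts the member list the same way and hashes each
    resource with a stable hash, so all agents agree on the split
    without talking to each other.
    """
    ordered = sorted(members)
    my_index = ordered.index(me)

    def bucket(resource):
        digest = hashlib.md5(resource.encode("utf-8")).hexdigest()
        return int(digest, 16) % len(ordered)

    return [r for r in resources if bucket(r) == my_index]


resources = ["vm-%d" % i for i in range(20)]
members = ["agent-a", "agent-b", "agent-c"]
shares = {m: my_share(resources, members, m) for m in members}
```

Together the shares cover every resource exactly once. A modulo split reshuffles most resources whenever an agent joins or leaves, which is one reason real systems tend to prefer a hash ring.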
- Start the polling tasks
def start_polling_tasks(self):
    # allow time for coordination if necessary
    delay_start = self.partition_coordinator.is_active()

    # set shuffle time before polling task if necessary
    delay_polling_time = random.randint(
        0, self.conf.shuffle_time_before_polling_task)

    data = self.setup_polling_tasks()

    # Don't start useless threads if no task will run
    if not data:
        return

    # One thread per polling tasks is enough
    self.polling_periodics = periodics.PeriodicWorker.create(
        [], executor_factory=lambda:
        futures.ThreadPoolExecutor(max_workers=len(data)))

    for interval, polling_task in data.items():
        delay_time = (interval + delay_polling_time if delay_start
                      else delay_polling_time)

        @periodics.periodic(spacing=interval, run_immediately=False)
        def task(running_task):
            self.interval_task(running_task)

        utils.spawn_thread(utils.delayed, delay_time,
                           self.polling_periodics.add, task, polling_task)

    utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
# Build the tasks from the polling.yaml loaded earlier and the extensions loaded dynamically from setup.cfg
def setup_polling_tasks(self):
    polling_tasks = {}
    for source in self.polling_manager.sources:
        polling_task = None
        for pollster in self.extensions:
            if source.support_meter(pollster.name):
                polling_task = polling_tasks.get(source.get_interval())
                if not polling_task:
                    polling_task = self.create_polling_task()
                    polling_tasks[source.get_interval()] = polling_task
                polling_task.add(pollster, source)
    return polling_tasks
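The key point of setup_polling_tasks is that there is one task per distinct interval, shared by every source that uses that interval. Below is a simplified sketch of mine that uses plain dicts and name strings instead of PollingSource and extension objects; `group_by_interval` and `start_delay` are hypothetical names, and `start_delay` just restates the `delay_time` expression from start_polling_tasks.

```python
import fnmatch
from collections import defaultdict


def group_by_interval(sources, pollster_names):
    """Map each interval to the (pollster, source) pairs polled at it."""
    tasks = defaultdict(list)
    for source in sources:
        for name in pollster_names:
            if any(fnmatch.fnmatch(name, pattern)
                   for pattern in source["meters"]):
                tasks[source["interval"]].append((name, source["name"]))
    return dict(tasks)


def start_delay(interval, shuffle, coordinated):
    """Initial delay before a task is added to the periodic worker."""
    return interval + shuffle if coordinated else shuffle


sources = [
    {"name": "fast", "interval": 60, "meters": ["cpu"]},
    {"name": "slow", "interval": 600, "meters": ["disk.*"]},
    {"name": "also_fast", "interval": 60, "meters": ["memory"]},
]
tasks = group_by_interval(sources, ["cpu", "disk.read.bytes", "memory"])
```

Here the two sources with a 60 second interval end up in the same task, so a single periodic callback serves both.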
After that, periodics runs the tasks periodically at the interval defined in polling.yaml
def interval_task(self, task):
    # NOTE(sileht): remove the previous keystone client
    # and exception to get a new one in this polling cycle.
    self._keystone = None
    self._keystone_last_exception = None

    task.poll_and_notify()

def poll_and_notify(self):
    """Polling sample and notify."""
    cache = {}
    discovery_cache = {}
    poll_history = {}
    for source_name in self.pollster_matches:
        for pollster in self.pollster_matches[source_name]:
            key = Resources.key(source_name, pollster)
            candidate_res = list(
                self.resources[key].get(discovery_cache))
            if not candidate_res and pollster.obj.default_discovery:
                candidate_res = self.manager.discover(
                    [pollster.obj.default_discovery], discovery_cache)

            # Remove duplicated resources and black resources. Using
            # set() requires well defined __hash__ for each resource.
            # Since __eq__ is defined, 'not in' is safe here.
            polling_resources = []
            black_res = self.resources[key].blacklist
            history = poll_history.get(pollster.name, [])
            for x in candidate_res:
                if x not in history:
                    history.append(x)
                    if x not in black_res:
                        polling_resources.append(x)
            poll_history[pollster.name] = history

            # If no resources, skip for this pollster
            if not polling_resources:
                p_context = 'new ' if history else ''
                LOG.info(_LI("Skip pollster %(name)s, no %(p_context)s"
                             "resources found this cycle"),
                         {'name': pollster.name, 'p_context': p_context})
                continue

            LOG.info(_LI("Polling pollster %(poll)s in the context of "
                         "%(src)s"),
                     dict(poll=pollster.name, src=source_name))
            try:
                polling_timestamp = timeutils.utcnow().isoformat()
                samples = pollster.obj.get_samples(
                    manager=self.manager,
                    cache=cache,
                    resources=polling_resources
                )
                sample_batch = []

                for sample in samples:
                    # Note(yuywz): Unify the timestamp of polled samples
                    sample.set_timestamp(polling_timestamp)
                    sample_dict = (
                        publisher_utils.meter_message_from_counter(
                            sample, self._telemetry_secret
                        ))
                    if self._batch:
                        sample_batch.append(sample_dict)
                    else:
                        self._send_notification([sample_dict])

                if sample_batch:
                    self._send_notification(sample_batch)

            except plugin_base.PollsterPermanentError as err:
                LOG.error(_LE(
                    'Prevent pollster %(name)s from '
                    'polling %(res_list)s on source %(source)s anymore!')
                    % ({'name': pollster.name, 'source': source_name,
                        'res_list': err.fail_res_list}))
                self.resources[key].blacklist.extend(err.fail_res_list)
            except Exception as err:
                LOG.error(_LE(
                    'Continue after error from %(name)s: %(error)s')
                    % ({'name': pollster.name, 'error': err}),
                    exc_info=True)

# The loop calls the discover method of the discovery extensions to obtain the
# resources, then calls get_samples on the polling extensions to convert those
# resources into the corresponding sample objects.
# The samples are then sent to the message queue, where ceilometer's
# notification agent picks them up, transforms them further, and sends them on
# to gnocchi.
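The resource filtering in the middle of poll_and_notify is easy to miss: within one cycle a resource is polled at most once per pollster name, and blacklisted resources are skipped. Here it is pulled out into a standalone function as a sketch; `select_resources` is my own name, and plain strings stand in for resource objects.

```python
def select_resources(candidates, history, blacklist):
    """Return the candidates that should be polled now.

    `history` is updated in place with every resource seen so far, so a
    later call with the same list skips anything already handled.
    """
    selected = []
    for resource in candidates:
        if resource not in history:
            history.append(resource)
            if resource not in blacklist:
                selected.append(resource)
    return selected


history = []
blacklist = ["vm-3"]

first = select_resources(["vm-1", "vm-2", "vm-1", "vm-3"], history, blacklist)
second = select_resources(["vm-2", "vm-4"], history, blacklist)
```

The first call drops the duplicate `vm-1` and the blacklisted `vm-3`; the second call only yields `vm-4` because `vm-2` is already in the history.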
That is the basic flow of the polling agent; what comes after it is the processing done by the notification agent.