openstack Rocky系列之Cinder:(二)Cinder 創建一個卷


  上一篇文章寫了cinder服務的啟動,下面講一下openstack是如何通過openstack創建一個卷

  通過查看cinder的api-paste.ini文件,並且現在是v3版本的API,可以得知目前API的router文件是cinder/api/v3/router.py文件

  

  通過查看router.py文件,可以得知,對於volume的操作都會通過mapper重定向到cinder/api/v3/volume.py文件中進行處理

  

 

   看一下創建volume的源碼

 1     def create(self, req, body):
 2         ........
 3         ........
 4         new_volume = self.volume_api.create(context,
 5                                             size,
 6                                             volume.get('display_name'),
 7                                             volume.get('display_description'),
 8                                             **kwargs)
 9 
10         retval = self._view_builder.detail(req, new_volume)
11 
12         return retval    

  此處調用了self.volume_api.create()去創建卷,self.volume_api 這個變量是VolumeController從V2的api繼承過來來的,在初始話的時候被初始化為cinder.volume.api.API(),所以其create方法為cinder/volume/api.py中API類下的create方法

 1     def create(self, context, size, name, description, snapshot=None,
 2                image_id=None, volume_type=None, metadata=None,
 3                availability_zone=None, source_volume=None,
 4                scheduler_hints=None,
 5                source_replica=None, consistencygroup=None,
 6                cgsnapshot=None, multiattach=False, source_cg=None,
 7                group=None, group_snapshot=None, source_group=None,
 8                backup=None):
 9          .........   
10          .........
11         create_what = {
12             'context': context,
13             'raw_size': size,
14             'name': name,
15             'description': description,
16             'snapshot': snapshot,
17             'image_id': image_id,
18             'raw_volume_type': volume_type,
19             'metadata': metadata or {},
20             'raw_availability_zone': availability_zone,
21             'source_volume': source_volume,
22             'scheduler_hints': scheduler_hints,
23             'key_manager': self.key_manager,
24             'optional_args': {'is_quota_committed': False},
25             'consistencygroup': consistencygroup,
26             'cgsnapshot': cgsnapshot,
27             'raw_multiattach': multiattach,
28             'group': group,
29             'group_snapshot': group_snapshot,
30             'source_group': source_group,
31             'backup': backup,
32         }
33         try:
34             sched_rpcapi = (self.scheduler_rpcapi if (
35                             not cgsnapshot and not source_cg and
36                             not group_snapshot and not source_group)
37                             else None)
38             volume_rpcapi = (self.volume_rpcapi if (
39                              not cgsnapshot and not source_cg and
40                              not group_snapshot and not source_group)
41                              else None)
42             flow_engine = create_volume.get_flow(self.db,
43                                                  self.image_service,
44                                                  availability_zones,
45                                                  create_what,
46                                                  sched_rpcapi,
47                                                  volume_rpcapi)
48         except Exception:
49             msg = _('Failed to create api volume flow.')
50             LOG.exception(msg)
51             raise exception.CinderException(msg)    

     此處調用了create_flow中的get_flow方法,進行傳參和並創建,get_flow采用了taskflow,使用了taskflow中的線性流程,依次添加了ExtractVolumeRequestTesk(), QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask() 以及VolumeCastTask()五個步驟

 1 def get_flow(db_api, image_service_api, availability_zones, create_what,
 2              scheduler_rpcapi=None, volume_rpcapi=None):
 3     """Constructs and returns the api entrypoint flow.
 4 
 5     This flow will do the following:
 6 
 7     1. Inject keys & values for dependent tasks.
 8     2. Extracts and validates the input keys & values.
 9     3. Reserves the quota (reverts quota on any failures).
10     4. Creates the database entry.
11     5. Commits the quota.
12     6. Casts to volume manager or scheduler for further processing.
13     """
14 
15     flow_name = ACTION.replace(":", "_") + "_api"
16     api_flow = linear_flow.Flow(flow_name)
17 
18     api_flow.add(ExtractVolumeRequestTask(
19         image_service_api,
20         availability_zones,
21         rebind={'size': 'raw_size',
22                 'availability_zone': 'raw_availability_zone',
23                 'volume_type': 'raw_volume_type',
24                 'multiattach': 'raw_multiattach'}))
25     api_flow.add(QuotaReserveTask(),
26                  EntryCreateTask(),
27                  QuotaCommitTask())
28 
29     if scheduler_rpcapi and volume_rpcapi:
30         # This will cast it out to either the scheduler or volume manager via
31         # the rpc apis provided.
32         api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
33 
34     # Now load (but do not run) the flow using the provided initial data.
35     return taskflow.engines.load(api_flow, store=create_what)

  taskflow會調用添加的每個步驟類的execute方法,taskflow是openstack中的一個重要組建,用於構建邏輯需要精准步驟的業務,涉及的東西比較多,暫時不在這里記錄

       ExcuactVolumeRequestTask類主要對傳過來的參數進行校驗,提取各類參數,並根據參數進行zone、鏡像等選取的操作,並為QuotaReserveTask 類傳遞參數

       QuotaReserveTask類進行配額檢查以及占用

  EntryCreateTask類主要是是調用cinder.objects.volume.Volume.create()方法在database中創建記錄

       QuotaCommitTask類在數據庫中進行配額的確認

       VolumeCastTask類通過rpc對任務進行投遞投遞的對象為schduler_rpcapi

 

 

 scheduler_rpcapi在調用get_flow時已經指定

 

 

    此時cinder-scheduler接收到cinder-api傳過來的請求,發送請求的代碼部分為 cinder/scheduler/rpcapi.py

 1     def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
 2                       request_spec=None, filter_properties=None,
 3                       backup_id=None):
 4         volume.create_worker()
 5         cctxt = self._get_cctxt()
 6         msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
 7                     'request_spec': request_spec,
 8                     'filter_properties': filter_properties,
 9                     'volume': volume, 'backup_id': backup_id}
10         if not self.client.can_send_version('3.10'):
11             msg_args.pop('backup_id')
12         return cctxt.cast(ctxt, 'create_volume', **msg_args)

  此處同樣,cinder-scheduler接收為cinder-cherduler的cinder/scheduler/manager.SchedulerManager

 1     @objects.Volume.set_workers
 2     @append_operation_type()
 3     def create_volume(self, context, volume, snapshot_id=None, image_id=None,
 4                       request_spec=None, filter_properties=None,
 5                       backup_id=None):
 6         self._wait_for_scheduler()
 7 
 8         try:
 9             flow_engine = create_volume.get_flow(context,
10                                                  self.driver,
11                                                  request_spec,
12                                                  filter_properties,
13                                                  volume,
14                                                  snapshot_id,
15                                                  image_id,
16                                                  backup_id)
17         except Exception:
18             msg = _("Failed to create scheduler manager volume flow")
19             LOG.exception(msg)
20             raise exception.CinderException(msg)
21 
22         with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
23             flow_engine.run()

  此處cinder-scheduler同樣使用了taskflow模型對磁盤進行創建,看一下這個get_flow中包含的幾個task類

 1 def get_flow(context, driver_api, request_spec=None,
 2              filter_properties=None,
 3              volume=None, snapshot_id=None, image_id=None, backup_id=None):
 4 
 5     create_what = {
 6         'context': context,
 7         'raw_request_spec': request_spec,
 8         'filter_properties': filter_properties,
 9         'volume': volume,
10         'snapshot_id': snapshot_id,
11         'image_id': image_id,
12         'backup_id': backup_id,
13     }
14 
15     flow_name = ACTION.replace(":", "_") + "_scheduler"
16     scheduler_flow = linear_flow.Flow(flow_name)
17 
18     # This will extract and clean the spec from the starting values.
19     scheduler_flow.add(ExtractSchedulerSpecTask(
20         rebind={'request_spec': 'raw_request_spec'}))
21 
22     # This will activate the desired scheduler driver (and handle any
23     # driver related failures appropriately).
24     scheduler_flow.add(ScheduleCreateVolumeTask(driver_api))
25 
26     # Now load (but do not run) the flow using the provided initial data.
27     return taskflow.engines.load(scheduler_flow, store=create_what)

  ExtractSchedulerSpecTask 同樣為對請求參數進行提取加工以供后續調用

      ScheduleCreateVolumeTask中execute中有兩個操作,1調用drvier_api進行volume的創建,2.如果創建過程中出現失敗,則通過message將消息發送給scheduler

 1     def execute(self, context, request_spec, filter_properties, volume):
 2         try:
 3             self.driver_api.schedule_create_volume(context, request_spec,
 4                                                    filter_properties)
 5         except Exception as e:
 6             self.message_api.create(
 7                 context,
 8                 message_field.Action.SCHEDULE_ALLOCATE_VOLUME,
 9                 resource_uuid=request_spec['volume_id'],
10                 exception=e)
11             with excutils.save_and_reraise_exception(
12                     reraise=not isinstance(e, exception.NoValidBackend)):
13                 try:
14                     self._handle_failure(context, request_spec, e)
15                 finally:
16                     common.error_out(volume, reason=e)

  此時driver_api為SchedulerManager初始化時的scheduler_driver

 

 

   可知driver_api為cinder.cheduler.filter_scheduler.FilterScheduler

 1     def schedule_create_volume(self, context, request_spec, filter_properties):
 2         backend = self._schedule(context, request_spec, filter_properties)
 3 
 4         if not backend:
 5             raise exception.NoValidBackend(reason=_("No weighed backends "
 6                                                     "available"))
 7 
 8         backend = backend.obj
 9         volume_id = request_spec['volume_id']
10 
11         updated_volume = driver.volume_update_db(
12             context, volume_id,
13             backend.host,
14             backend.cluster_name,
15             availability_zone=backend.service['availability_zone'])
16         self._post_select_populate_filter_properties(filter_properties,
17                                                      backend)
18 
19         # context is not serializable
20         filter_properties.pop('context', None)
21 
22         self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
23                                          filter_properties,
24                                          allow_reschedule=True)

  self._schedule 通過傳入的參數對后端的進行選擇(多個后端的情況下)

  最后調用self.volume_rpcapi.create_volume進行volume的創建,volume_api為volume_rpcapi.VolumeAPI()

  

1     def create_volume(self, ctxt, volume, request_spec, filter_properties,
2                       allow_reschedule=True):
3         cctxt = self._get_cctxt(volume.service_topic_queue)
4         cctxt.cast(ctxt, 'create_volume',
5                    request_spec=request_spec,
6                    filter_properties=filter_properties,
7                    allow_reschedule=allow_reschedule,
8                    volume=volume)

  此時cinder-schedule通過rpc對cinder-volume發送創建volume的消息,接收消息的是cinder-volume的VolumeManager

  1     @objects.Volume.set_workers
  2     def create_volume(self, context, volume, request_spec=None,
  3                       filter_properties=None, allow_reschedule=True):
  4         """Creates the volume."""
  5         utils.log_unsupported_driver_warning(self.driver)
  6 
  7         self._set_resource_host(volume)
  8 
  9         self._update_allocated_capacity(volume)
 10         # We lose the host value if we reschedule, so keep it here
 11         original_host = volume.host
 12 
 13         context_elevated = context.elevated()
 14         if filter_properties is None:
 15             filter_properties = {}
 16 
 17         if request_spec is None:
 18             request_spec = objects.RequestSpec()
 19 
 20         try:
 21             # NOTE(flaper87): Driver initialization is
 22             # verified by the task itself.
 23             flow_engine = create_volume.get_flow(
 24                 context_elevated,
 25                 self,
 26                 self.db,
 27                 self.driver,
 28                 self.scheduler_rpcapi,
 29                 self.host,
 30                 volume,
 31                 allow_reschedule,
 32                 context,
 33                 request_spec,
 34                 filter_properties,
 35                 image_volume_cache=self.image_volume_cache,
 36             )
 37         except Exception:
 38             msg = _("Create manager volume flow failed.")
 39             LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
 40             raise exception.CinderException(msg)
 41 
 42         snapshot_id = request_spec.get('snapshot_id')
 43         source_volid = request_spec.get('source_volid')
 44 
 45         if snapshot_id is not None:
 46             # Make sure the snapshot is not deleted until we are done with it.
 47             locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
 48         elif source_volid is not None:
 49             # Make sure the volume is not deleted until we are done with it.
 50             locked_action = "%s-%s" % (source_volid, 'delete_volume')
 51         else:
 52             locked_action = None
 53 
 54         def _run_flow():
 55             # This code executes create volume flow. If something goes wrong,
 56             # flow reverts all job that was done and reraises an exception.
 57             # Otherwise, all data that was generated by flow becomes available
 58             # in flow engine's storage.
 59             with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
 60                 flow_engine.run()
 61 
 62         # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
 63         # decide if allocated_capacity should be incremented.
 64         rescheduled = False
 65 
 66         try:
 67             if locked_action is None:
 68                 _run_flow()
 69             else:
 70                 with coordination.COORDINATOR.get_lock(locked_action):
 71                     _run_flow()
 72         finally:
 73             try:
 74                 flow_engine.storage.fetch('refreshed')
 75             except tfe.NotFound:
 76                 # If there's no vol_ref, then flow is reverted. Lets check out
 77                 # if rescheduling occurred.
 78                 try:
 79                     rescheduled = flow_engine.storage.get_revert_result(
 80                         create_volume.OnFailureRescheduleTask.make_name(
 81                             [create_volume.ACTION]))
 82                 except tfe.NotFound:
 83                     pass
 84 
 85             if rescheduled:
 86                 # NOTE(geguileo): Volume was rescheduled so we need to update
 87                 # volume stats because the volume wasn't created here.
 88                 # Volume.host is None now, so we pass the original host value.
 89                 self._update_allocated_capacity(volume, decrement=True,
 90                                                 host=original_host)
 91 
 92         # Shared targets is only relevant for iSCSI connections.
 93         # We default to True to be on the safe side.
 94         volume.shared_targets = (
 95             self.driver.capabilities.get('storage_protocol') == 'iSCSI' and
 96             self.driver.capabilities.get('shared_targets', True))
 97         # TODO(geguileo): service_uuid won't be enough on Active/Active
 98         # deployments. There can be 2 services handling volumes from the same
 99         # backend.
100         volume.service_uuid = self.service_uuid
101         volume.save()
102 
103         LOG.info("Created volume successfully.", resource=volume)
104         return volume.id

  上述代碼中,同樣使用了taskflow, 

  ExtractVolumeRefTask為提取數據庫中volume的具體信息

  OnFailureRescheduleTask中execute並無操作,但是revert中有操作,是為了以后的步驟出現錯誤進行回滾進行部分操作。

  ExtractVolumeSpecTask 提取spec信息

  NotifyVolumeActionTask 廣播volume開始創建的消息

 1 def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume,
 2              allow_reschedule, reschedule_context, request_spec,
 3              filter_properties, image_volume_cache=None):
 4 
 5     flow_name = ACTION.replace(":", "_") + "_manager"
 6     volume_flow = linear_flow.Flow(flow_name)
 7 
 8     # This injects the initial starting flow values into the workflow so that
 9     # the dependency order of the tasks provides/requires can be correctly
10     # determined.
11     create_what = {
12         'context': context,
13         'filter_properties': filter_properties,
14         'request_spec': request_spec,
15         'volume': volume,
16     }
17 
18     volume_flow.add(ExtractVolumeRefTask(db, host, set_error=False))
19 
20     retry = filter_properties.get('retry', None)
21 
22     # Always add OnFailureRescheduleTask and we handle the change of volume's
23     # status when reverting the flow. Meanwhile, no need to revert process of
24     # ExtractVolumeRefTask.
25     do_reschedule = allow_reschedule and request_spec and retry
26     volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, driver,
27                                             scheduler_rpcapi, do_reschedule))
28 
29     LOG.debug("Volume reschedule parameters: %(allow)s "
30               "retry: %(retry)s", {'allow': allow_reschedule, 'retry': retry})
31 
32     volume_flow.add(ExtractVolumeSpecTask(db),
33                     NotifyVolumeActionTask(db, "create.start"),
34                     CreateVolumeFromSpecTask(manager,
35                                              db,
36                                              driver,
37                                              image_volume_cache),
38                     CreateVolumeOnFinishTask(db, "create.end"))
39 
40     # Now load (but do not run) the flow using the provided initial data.
41     return taskflow.engines.load(volume_flow, store=create_what)

  CreateVolumeFromSpecTask此處通過傳入的create_type不同,調用不同的接口進行卷的創建,以裸磁盤為例(create_type為raw)

  CreateVolumeOnFinishTask廣播創建磁盤完成

 1         if create_type == 'raw':
 2             model_update = self._create_raw_volume(volume, **volume_spec)
 3         elif create_type == 'snap':
 4             model_update = self._create_from_snapshot(context, volume,
 5                                                       **volume_spec)
 6         elif create_type == 'source_vol':
 7             model_update = self._create_from_source_volume(
 8                 context, volume, **volume_spec)
 9         elif create_type == 'image':
10             model_update = self._create_from_image(context,
11                                                    volume,
12                                                    **volume_spec)
13         elif create_type == 'backup':
14             model_update, need_update_volume = self._create_from_backup(
15                 context, volume, **volume_spec)
16             volume_spec.update({'need_update_volume': need_update_volume})
17         else:
18             raise exception.VolumeTypeNotFound(volume_type_id=create_type)
1     def _create_raw_volume(self, volume, **kwargs):
2         try:
3             ret = self.driver.create_volume(volume)
4         finally:
5             self._cleanup_cg_in_volume(volume)
6         return ret

  此處self.driver為VolumeManager初始化時進行初始化的,可以看出driver是從配置文件中讀取的

 1 self.configuration = config.Configuration(volume_backend_opts,
 2                                                   config_group=service_name)
 3         self._set_tpool_size(
 4             self.configuration.backend_native_threads_pool_size)
 5         self.stats = {}
 6         self.service_uuid = None
 7 
 8         if not volume_driver:
 9             # Get from configuration, which will get the default
10             # if its not using the multi backend
11             volume_driver = self.configuration.volume_driver
12         if volume_driver in MAPPING:
13             LOG.warning("Driver path %s is deprecated, update your "
14                         "configuration to the new path.", volume_driver)
15             volume_driver = MAPPING[volume_driver]

  配置文件中有寫

 

   

 1     def create_volume(self, volume):
 2         """Creates a logical volume."""
 3         mirror_count = 0
 4         if self.configuration.lvm_mirrors:
 5             mirror_count = self.configuration.lvm_mirrors
 6 
 7         self._create_volume(volume['name'],
 8                             self._sizestr(volume['size']),
 9                             self.configuration.lvm_type,
10                             mirror_count)
    def _create_volume(self, name, size, lvm_type, mirror_count, vg=None):
        vg_ref = self.vg
        if vg is not None:
            vg_ref = vg

        vg_ref.create_volume(name, size, lvm_type, mirror_count)

  此處self.vg是什么?全局查找一下,具體初始化時間,可以查看上一篇,cinder服務啟動中的cinder-volume啟動部分

   此時創建卷調用的底層代碼就可以得知,調用的是lvcreate對卷進行創建。

 1     def create_volume(self, name, size_str, lv_type='default', mirror_count=0):
 2         """Creates a logical volume on the object's VG.
 3 
 4         :param name: Name to use when creating Logical Volume
 5         :param size_str: Size to use when creating Logical Volume
 6         :param lv_type: Type of Volume (default or thin)
 7         :param mirror_count: Use LVM mirroring with specified count
 8 
 9         """
10 
11         if lv_type == 'thin':
12             pool_path = '%s/%s' % (self.vg_name, self.vg_thin_pool)
13             cmd = LVM.LVM_CMD_PREFIX + ['lvcreate', '-T', '-V', size_str, '-n',
14                                         name, pool_path]
15         else:
16             cmd = LVM.LVM_CMD_PREFIX + ['lvcreate', '-n', name, self.vg_name,
17                                         '-L', size_str]
18 
19         if mirror_count > 0:
20             cmd.extend(['--type=mirror', '-m', mirror_count, '--nosync',
21                         '--mirrorlog', 'mirrored'])
22             terras = int(size_str[:-1]) / 1024.0
23             if terras >= 1.5:
24                 rsize = int(2 ** math.ceil(math.log(terras) / math.log(2)))
25                 # NOTE(vish): Next power of two for region size. See:
26                 #             http://red.ht/U2BPOD
27                 cmd.extend(['-R', str(rsize)])
28 
29         try:
30             self._execute(*cmd,
31                           root_helper=self._root_helper,
32                           run_as_root=True)
33         except putils.ProcessExecutionError as err:
34             LOG.exception('Error creating Volume')
35             LOG.error('Cmd     :%s', err.cmd)
36             LOG.error('StdOut  :%s', err.stdout)
37             LOG.error('StdErr  :%s', err.stderr)
38             LOG.error('Current state: %s',
39                       self.get_all_volume_groups(self._root_helper))
40             raise

  后續就是一系列回調和通知啦


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM