diff --git a/cinder/context.py b/cinder/context.py index 9118312ef78..f354b0d0e7c 100644 --- a/cinder/context.py +++ b/cinder/context.py @@ -18,6 +18,7 @@ """RequestContext: context for requests that persist through all of cinder.""" import copy +from typing import Any, Dict, Optional # noqa: H301 from keystoneauth1.access import service_catalog as ksa_service_catalog from keystoneauth1 import plugin @@ -79,10 +80,18 @@ class RequestContext(context.RequestContext): Represents the user taking a given action within the system. """ - def __init__(self, user_id=None, project_id=None, is_admin=None, - read_deleted="no", project_name=None, remote_address=None, - timestamp=None, quota_class=None, service_catalog=None, - user_auth_plugin=None, **kwargs): + def __init__(self, + user_id: Optional[str] = None, + project_id: Optional[str] = None, + is_admin: Optional[bool] = None, + read_deleted: Optional[str] = "no", + project_name: Optional[str] = None, + remote_address: Optional[str] = None, + timestamp=None, + quota_class=None, + service_catalog: Optional[dict] = None, + user_auth_plugin=None, + **kwargs): """Initialize RequestContext. :param read_deleted: 'no' indicates deleted records are hidden, 'yes' @@ -122,7 +131,8 @@ class RequestContext(context.RequestContext): # We need to have RequestContext attributes defined # when policy.check_is_admin invokes request logging # to make it loggable. - if self.is_admin is None: # type: ignore + self.is_admin: Optional[bool] + if self.is_admin is None: self.is_admin = policy.check_is_admin(self) elif self.is_admin and 'admin' not in self.roles: self.roles.append('admin') @@ -134,22 +144,22 @@ class RequestContext(context.RequestContext): else: return _ContextAuthPlugin(self.auth_token, self.service_catalog) - def _get_read_deleted(self): + def _get_read_deleted(self) -> str: return self._read_deleted - def _set_read_deleted(self, read_deleted): + def _set_read_deleted(self, read_deleted: str) -> None: if read_deleted not in ('no', 'yes', 'only'): raise ValueError(_("read_deleted can only be one of 'no', " "'yes' or 'only', not %r") % read_deleted) self._read_deleted = read_deleted - def _del_read_deleted(self): + def _del_read_deleted(self) -> None: del self._read_deleted read_deleted = property(_get_read_deleted, _set_read_deleted, _del_read_deleted) - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: result = super(RequestContext, self).to_dict() result['user_id'] = self.user_id result['project_id'] = self.project_id @@ -164,7 +174,7 @@ class RequestContext(context.RequestContext): return result @classmethod - def from_dict(cls, values): + def from_dict(cls, values: dict) -> 'RequestContext': return cls(user_id=values.get('user_id'), project_id=values.get('project_id'), project_name=values.get('project_name'), @@ -183,7 +193,11 @@ class RequestContext(context.RequestContext): project_domain_id=values.get('project_domain_id'), ) - def authorize(self, action, target=None, target_obj=None, fatal=True): + def authorize(self, + action: str, + target: Optional[dict] = None, + target_obj: Optional[dict] = None, + fatal: bool = True): """Verify that the given action is valid on the target in this context. :param action: string representing the action to be checked. @@ -216,14 +230,16 @@ class RequestContext(context.RequestContext): return policy.authorize(self, action, target, do_raise=fatal, exc=exception.PolicyNotAuthorized) - def to_policy_values(self): + def to_policy_values(self) -> dict: policy = super(RequestContext, self).to_policy_values() policy['is_admin'] = self.is_admin return policy - def elevated(self, read_deleted=None, overwrite=False): + def elevated(self, + read_deleted: Optional[str] = None, + overwrite: bool = False) -> 'RequestContext': """Return a version of this context with admin flag set.""" context = self.deepcopy() context.is_admin = True @@ -236,11 +252,11 @@ class RequestContext(context.RequestContext): return context - def deepcopy(self): + def deepcopy(self) -> 'RequestContext': return copy.deepcopy(self) -def get_admin_context(read_deleted="no"): +def get_admin_context(read_deleted: Optional[str] = "no") -> RequestContext: return RequestContext(user_id=None, project_id=None, is_admin=True, @@ -248,7 +264,7 @@ def get_admin_context(read_deleted="no"): overwrite=False) -def get_internal_tenant_context(): +def get_internal_tenant_context() -> Optional[RequestContext]: """Build and return the Cinder internal tenant context object This request context will only work for internal Cinder operations. It will diff --git a/cinder/manager.py b/cinder/manager.py index 6689b3d0c34..15c4c17276c 100644 --- a/cinder/manager.py +++ b/cinder/manager.py @@ -106,7 +106,7 @@ class Manager(base.Base, PeriodicTasks): def service_topic_queue(self): return self.cluster or self.host - def init_host(self, service_id=None, added_to_cluster=None): + def init_host(self, service_id, added_to_cluster=None): """Handle initialization if this is a standalone service. A hook point for services to execute tasks before the services are made @@ -222,7 +222,9 @@ class SchedulerDependentManager(ThreadPoolManager): class CleanableManager(object): - def do_cleanup(self, context, cleanup_request) -> None: + def do_cleanup(self, + context: context.RequestContext, + cleanup_request: objects.CleanupRequest) -> None: LOG.info('Initiating service %s cleanup', cleanup_request.service_id) @@ -305,10 +307,10 @@ class CleanableManager(object): LOG.info('Service %s cleanup completed.', cleanup_request.service_id) - def _do_cleanup(self, ctxt, vo_resource) -> bool: + def _do_cleanup(self, ctxt: context.RequestContext, vo_resource) -> bool: return False - def init_host(self, service_id, **kwargs) -> None: + def init_host(self, service_id, added_to_cluster=None, **kwargs): ctxt = context.get_admin_context() self.service_id = service_id # TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove diff --git a/cinder/rpc.py b/cinder/rpc.py index d1f485417e7..f499f9d5089 100644 --- a/cinder/rpc.py +++ b/cinder/rpc.py @@ -26,6 +26,7 @@ __all__ = [ ] import functools +from typing import Tuple, Union # noqa: H301 from oslo_config import cfg from oslo_log import log as logging @@ -53,7 +54,7 @@ ALLOWED_EXMODS = [ EXTRA_EXMODS = [] -def init(conf): +def init(conf) -> None: global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER exmods = get_allowed_exmods() TRANSPORT = messaging.get_rpc_transport(conf, @@ -73,7 +74,7 @@ def init(conf): NOTIFIER = utils.DO_NOTHING -def initialized(): +def initialized() -> bool: return None not in [TRANSPORT, NOTIFIER] @@ -139,7 +140,9 @@ class RequestContextSerializer(messaging.Serializer): return cinder.context.RequestContext.from_dict(context) -def get_client(target, version_cap=None, serializer=None): +def get_client(target, + version_cap=None, + serializer=None) -> messaging.RPCClient: if TRANSPORT is None: raise AssertionError('RPC transport is not initialized.') serializer = RequestContextSerializer(serializer) @@ -149,7 +152,9 @@ def get_client(target, version_cap=None, serializer=None): serializer=serializer) -def get_server(target, endpoints, serializer=None): +def get_server(target, + endpoints, + serializer=None) -> messaging.rpc.server.RPCServer: if TRANSPORT is None: raise AssertionError('RPC transport is not initialized.') serializer = RequestContextSerializer(serializer) @@ -163,7 +168,9 @@ def get_server(target, endpoints, serializer=None): @utils.if_notifications_enabled -def get_notifier(service=None, host=None, publisher_id=None): +def get_notifier(service: str = None, + host: str = None, + publisher_id: str = None) -> messaging.Notifier: if NOTIFIER is None: raise AssertionError('RPC Notifier is not initialized.') if not publisher_id: @@ -222,7 +229,9 @@ class RPCAPI(object): return version return versions[-1] - def _get_cctxt(self, version=None, **kwargs): + def _get_cctxt(self, + version: Union[str, Tuple[str, ...]] = None, + **kwargs): """Prepare client context Version parameter accepts single version string or tuple of strings. diff --git a/cinder/scheduler/filter_scheduler.py b/cinder/scheduler/filter_scheduler.py index 001065f5039..a60026e500c 100644 --- a/cinder/scheduler/filter_scheduler.py +++ b/cinder/scheduler/filter_scheduler.py @@ -24,6 +24,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils +from cinder import context from cinder import exception from cinder.i18n import _ from cinder.scheduler import driver @@ -46,7 +47,9 @@ class FilterScheduler(driver.Scheduler): """Fetch options dictionary. Broken out for testing.""" return self.options.get_configuration() - def populate_filter_properties(self, request_spec, filter_properties): + def populate_filter_properties(self, + request_spec: dict, + filter_properties: dict) -> None: """Stuff things into filter_properties. Can be overridden in a subclass to add more data. @@ -58,11 +61,13 @@ class FilterScheduler(driver.Scheduler): filter_properties['metadata'] = vol.get('metadata') filter_properties['qos_specs'] = vol.get('qos_specs') - def schedule_create_group(self, context, group, + def schedule_create_group(self, + context: context.RequestContext, + group, group_spec, request_spec_list, group_filter_properties, - filter_properties_list): + filter_properties_list) -> None: weighed_backend = self._schedule_generic_group( context, group_spec, @@ -82,7 +87,10 @@ class FilterScheduler(driver.Scheduler): self.volume_rpcapi.create_group(context, updated_group) - def schedule_create_volume(self, context, request_spec, filter_properties): + def schedule_create_volume(self, + context: context.RequestContext, + request_spec: dict, + filter_properties: dict) -> None: backend = self._schedule(context, request_spec, filter_properties) if not backend: @@ -107,8 +115,11 @@ class FilterScheduler(driver.Scheduler): filter_properties, allow_reschedule=True) - def backend_passes_filters(self, context, backend, request_spec, - filter_properties): + def backend_passes_filters(self, + context: context.RequestContext, + backend, + request_spec: dict, + filter_properties: dict): """Check if the specified backend passes the filters.""" weighed_backends = self._get_weighted_candidates(context, request_spec, filter_properties) @@ -132,8 +143,11 @@ class FilterScheduler(driver.Scheduler): raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s ' 'on %(backend)s.') % reason_param) - def find_retype_backend(self, context, request_spec, - filter_properties=None, migration_policy='never'): + def find_retype_backend(self, + context: context.RequestContext, + request_spec: dict, + filter_properties: dict = None, + migration_policy: str = 'never'): """Find a backend that can accept the volume with its new type.""" filter_properties = filter_properties or {} backend = (request_spec['volume_properties'].get('cluster_name') @@ -186,8 +200,8 @@ class FilterScheduler(driver.Scheduler): def get_pools(self, context, filters): return self.host_manager.get_pools(context, filters) - def _post_select_populate_filter_properties(self, filter_properties, - backend_state): + def _post_select_populate_filter_properties(self, filter_properties: dict, + backend_state) -> None: """Populate filter properties with additional information. Add additional information to the filter properties after a backend has @@ -196,7 +210,7 @@ class FilterScheduler(driver.Scheduler): # Add a retry entry for the selected volume backend: self._add_retry_backend(filter_properties, backend_state.backend_id) - def _add_retry_backend(self, filter_properties, backend): + def _add_retry_backend(self, filter_properties: dict, backend) -> None: """Add a retry entry for the selected volume backend. In the event that the request gets re-scheduled, this entry will signal @@ -211,7 +225,7 @@ class FilterScheduler(driver.Scheduler): if backends is not None: backends.append(backend) - def _max_attempts(self): + def _max_attempts(self) -> int: max_attempts = CONF.scheduler_max_attempts if max_attempts < 1: raise exception.InvalidParameterValue( @@ -271,8 +285,10 @@ class FilterScheduler(driver.Scheduler): {'max_attempts': max_attempts, 'resource_id': resource_id}) - def _get_weighted_candidates(self, context, request_spec, - filter_properties=None): + def _get_weighted_candidates(self, + context: context.RequestContext, + request_spec: dict, + filter_properties: dict = None) -> list: """Return a list of backends that meet required specs. Returned list is ordered by their fitness. @@ -351,7 +367,7 @@ class FilterScheduler(driver.Scheduler): def _get_weighted_candidates_generic_group( self, context, group_spec, request_spec_list, group_filter_properties=None, - filter_properties_list=None): + filter_properties_list=None) -> list: """Finds backends that supports the group. Returns a list of backends that meet the required specs, @@ -443,7 +459,8 @@ class FilterScheduler(driver.Scheduler): return weighed_backends - def _find_valid_backends(self, backend_list1, backend_list2): + def _find_valid_backends(self, + backend_list1: list, backend_list2: list) -> list: new_backends = [] for backend1 in backend_list1: for backend2 in backend_list2: @@ -458,7 +475,7 @@ class FilterScheduler(driver.Scheduler): def _get_weighted_candidates_by_group_type( self, context, group_spec, - group_filter_properties=None): + group_filter_properties=None) -> list: """Finds backends that supports the group type. Returns a list of backends that meet the required specs, @@ -559,7 +576,7 @@ class FilterScheduler(driver.Scheduler): return None return self._choose_top_backend_generic_group(weighed_backends) - def _choose_top_backend(self, weighed_backends, request_spec): + def _choose_top_backend(self, weighed_backends: list, request_spec: dict): top_backend = weighed_backends[0] backend_state = top_backend.obj LOG.debug("Choosing %s", backend_state.backend_id) diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index d16385685f1..37cb97c6290 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -253,7 +253,9 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume, snapshot) - def _do_cleanup(self, ctxt, vo_resource): + def _do_cleanup(self, + ctxt: context.RequestContext, + vo_resource: 'objects.base.CinderObject'): # We can only receive cleanup requests for volumes, but we check anyway # We need to cleanup the volume status for cases where the scheduler # died while scheduling the volume creation. @@ -262,7 +264,8 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): vo_resource.status = 'error' vo_resource.save() - def request_service_capabilities(self, context): + def request_service_capabilities(self, + context: context.RequestContext) -> None: volume_rpcapi.VolumeAPI().publish_service_capabilities(context) try: self.backup_api.publish_service_capabilities(context) @@ -275,8 +278,11 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): LOG.warning(msg, {'host': self.host, 'e': e}) @append_operation_type() - def migrate_volume(self, context, volume, backend, force_copy, - request_spec, filter_properties): + def migrate_volume(self, + context: context.RequestContext, + volume: objects.Volume, + backend: str, force_copy: bool, + request_spec, filter_properties) -> None: """Ensure that the backend exists and can accept the volume.""" self._wait_for_scheduler() @@ -597,7 +603,7 @@ class SchedulerManager(manager.CleanableManager, manager.Manager): not_requested = [] # To reduce DB queries we'll cache the clusters data - clusters = collections.defaultdict(dict) + clusters: collections.defaultdict = collections.defaultdict(dict) for service in services: cleanup_request.cluster_name = service.cluster_name diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 11b6319fbc6..21589ce15cf 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -37,8 +37,8 @@ intact. import functools import time -import typing as ty -from typing import Optional +import typing +from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: H301 from castellan import key_manager from oslo_config import cfg @@ -238,14 +238,14 @@ class VolumeManager(manager.CleanableManager, 'consistencygroup', 'volume_attachment', 'group', 'snapshots'} def _get_service(self, - host: str = None, + host: Optional[str] = None, binary: str = constants.VOLUME_BINARY) -> objects.Service: host = host or self.host ctxt = context.get_admin_context() svc_host = volume_utils.extract_host(host, 'backend') return objects.Service.get_by_args(ctxt, svc_host, binary) - def __init__(self, volume_driver=None, service_name: str = None, + def __init__(self, volume_driver=None, service_name: Optional[str] = None, *args, **kwargs): """Load the driver from the one specified in args, or from flags.""" # update_service_capabilities needs service_name to be volume @@ -262,6 +262,7 @@ class VolumeManager(manager.CleanableManager, self.service_uuid = None self.cluster: str + self.host: str self.image_volume_cache: Optional[image_cache.ImageVolumeCache] if not volume_driver: @@ -424,7 +425,7 @@ class VolumeManager(manager.CleanableManager, updates, snapshot_updates = self.driver.update_provider_info( volumes, snapshots) - update: ty.Any + update: Any if updates: for volume in volumes: # NOTE(JDG): Make sure returned item is in this hosts volumes @@ -533,7 +534,7 @@ class VolumeManager(manager.CleanableManager, num_vols: int = 0 num_snaps: int = 0 max_objs_num: int = 0 - req_range: ty.Union[ty.List[int], range] = [0] + req_range: Union[List[int], range] = [0] req_limit = CONF.init_host_max_objects_retrieval or 0 use_batch_objects_retrieval: bool = req_limit > 0 @@ -544,7 +545,7 @@ class VolumeManager(manager.CleanableManager, num_snaps, __ = self._get_my_snapshots_summary(ctxt) # Calculate highest number of the objects (volumes or snapshots) max_objs_num = max(num_vols, num_snaps) - max_objs_num = ty.cast(int, max_objs_num) + max_objs_num = typing.cast(int, max_objs_num) # Make batch request loop counter req_range = range(0, max_objs_num, req_limit) @@ -679,7 +680,9 @@ class VolumeManager(manager.CleanableManager, resource={'type': 'driver', 'id': self.driver.__class__.__name__}) - def _do_cleanup(self, ctxt, vo_resource) -> bool: + def _do_cleanup(self, + ctxt: context.RequestContext, + vo_resource: 'objects.base.CinderObject') -> bool: if isinstance(vo_resource, objects.Volume): if vo_resource.status == 'downloading': self.driver.clear_download(ctxt, vo_resource) @@ -721,7 +724,8 @@ class VolumeManager(manager.CleanableManager, """ return self.driver.initialized - def _set_resource_host(self, resource) -> None: + def _set_resource_host(self, resource: Union[objects.Volume, + objects.Group]) -> None: """Set the host field on the DB to our own when we are clustered.""" if (resource.is_clustered and not volume_utils.hosts_are_equivalent(resource.host, @@ -779,7 +783,7 @@ class VolumeManager(manager.CleanableManager, snapshot_id = request_spec.get('snapshot_id') source_volid = request_spec.get('source_volid') - locked_action: ty.Optional[str] + locked_action: Optional[str] if snapshot_id is not None: # Make sure the snapshot is not deleted until we are done with it. locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot') @@ -877,7 +881,7 @@ class VolumeManager(manager.CleanableManager, context: context.RequestContext, volume: objects.volume.Volume, unmanage_only=False, - cascade=False): + cascade=False) -> Optional[bool]: """Deletes and unexports volume. 1. Delete a volume(normal case) @@ -900,7 +904,7 @@ class VolumeManager(manager.CleanableManager, # NOTE(thingee): It could be possible for a volume to # be deleted when resuming deletes from init_host(). LOG.debug("Attempted delete of non-existent volume: %s", volume.id) - return + return None if context.project_id != volume.project_id: project_id = volume.project_id @@ -1031,6 +1035,7 @@ class VolumeManager(manager.CleanableManager, if unmanage_only: msg = "Unmanaged volume successfully." LOG.info(msg, resource=volume) + return None def _clear_db(self, is_migrating_dest, volume_ref, status) -> None: # This method is called when driver.unmanage() or @@ -1279,7 +1284,7 @@ class VolumeManager(manager.CleanableManager, context: context.RequestContext, snapshot: objects.Snapshot, unmanage_only: bool = False, - handle_quota: bool = True): + handle_quota: bool = True) -> Optional[bool]: """Deletes and unexports snapshot.""" context = context.elevated() snapshot._context = context @@ -1358,6 +1363,7 @@ class VolumeManager(manager.CleanableManager, if unmanage_only: msg = "Unmanage snapshot completed successfully." LOG.info(msg, resource=snapshot) + return None @coordination.synchronized('{volume_id}') def attach_volume(self, context, volume_id, instance_uuid, host_name, @@ -1594,8 +1600,7 @@ class VolumeManager(manager.CleanableManager, def _clone_image_volume(self, ctx: context.RequestContext, volume, - image_meta: dict) -> ty.Union[None, - objects.Volume]: + image_meta: dict) -> Optional[objects.Volume]: # TODO: should this return None? volume_type_id: str = volume.get('volume_type_id') reserve_opts: dict = {'volumes': 1, 'gigabytes': volume.size} @@ -1603,7 +1608,7 @@ class VolumeManager(manager.CleanableManager, reservations = QUOTAS.reserve(ctx, **reserve_opts) # NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid # creating tmp img vol from wrong snapshot or wrong source vol. - skip: ty.Set[str] = {'snapshot_id', 'source_volid'} + skip: Set[str] = {'snapshot_id', 'source_volid'} skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES) try: new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} @@ -2188,7 +2193,7 @@ class VolumeManager(manager.CleanableManager, self._detach_volume(ctxt, attach_info, volume, properties, force=True, remote=remote) - attach_info = ty.cast(dict, attach_info) + attach_info = typing.cast(dict, attach_info) return attach_info def _detach_volume(self, ctxt, attach_info, volume, properties, @@ -2829,26 +2834,28 @@ class VolumeManager(manager.CleanableManager, def _notify_about_volume_usage(self, context: context.RequestContext, - volume, - event_suffix, - extra_usage_info=None) -> None: + volume: objects.Volume, + event_suffix: str, + extra_usage_info: Optional[dict] = None) \ + -> None: volume_utils.notify_about_volume_usage( context, volume, event_suffix, extra_usage_info=extra_usage_info, host=self.host) def _notify_about_snapshot_usage(self, - context, - snapshot, - event_suffix, - extra_usage_info=None) -> None: + context: context.RequestContext, + snapshot: objects.Snapshot, + event_suffix: str, + extra_usage_info: Optional[dict] = None) \ + -> None: volume_utils.notify_about_snapshot_usage( context, snapshot, event_suffix, extra_usage_info=extra_usage_info, host=self.host) def _notify_about_group_usage(self, - context, - group, - event_suffix, + context: context.RequestContext, + group: objects.Group, + event_suffix: str, volumes=None, extra_usage_info=None) -> None: volume_utils.notify_about_group_usage( @@ -2864,12 +2871,13 @@ class VolumeManager(manager.CleanableManager, context, volume, event_suffix, extra_usage_info=extra_usage_info, host=self.host) - def _notify_about_group_snapshot_usage(self, - context, - group_snapshot, - event_suffix, - snapshots=None, - extra_usage_info=None) -> None: + def _notify_about_group_snapshot_usage( + self, + context: context.RequestContext, + group_snapshot: objects.GroupSnapshot, + event_suffix: str, + snapshots: Optional[list] = None, + extra_usage_info=None) -> None: volume_utils.notify_about_group_snapshot_usage( context, group_snapshot, event_suffix, extra_usage_info=extra_usage_info, host=self.host) @@ -2884,7 +2892,7 @@ class VolumeManager(manager.CleanableManager, extra_usage_info=extra_usage_info, host=self.host) def extend_volume(self, - context, + context: context.RequestContext, volume: objects.Volume, new_size: int, reservations) -> None: @@ -3137,7 +3145,10 @@ class VolumeManager(manager.CleanableManager, replication_status = fields.ReplicationStatus.DISABLED model_update['replication_status'] = replication_status - def manage_existing(self, ctxt, volume, ref=None) -> ovo_fields.UUIDField: + def manage_existing(self, + ctxt: context.RequestContext, + volume: objects.Volume, + ref=None) -> ovo_fields.UUIDField: vol_ref = self._run_manage_existing_flow_engine( ctxt, volume, ref) @@ -3165,7 +3176,7 @@ class VolumeManager(manager.CleanableManager, allocated_capacity_gb=volume_reference.size) def _run_manage_existing_flow_engine(self, - ctxt, + ctxt: context.RequestContext, volume: objects.Volume, ref): try: @@ -3190,7 +3201,7 @@ class VolumeManager(manager.CleanableManager, return vol_ref - def _get_cluster_or_host_filters(self) -> ty.Dict[str, ty.Any]: + def _get_cluster_or_host_filters(self) -> Dict[str, Any]: if self.cluster: filters = {'cluster_name': self.cluster} else: @@ -3199,31 +3210,48 @@ class VolumeManager(manager.CleanableManager, def _get_my_volumes_summary( self, - ctxt: context.RequestContext): + ctxt: context.RequestContext) -> objects.VolumeList: filters = self._get_cluster_or_host_filters() return objects.VolumeList.get_volume_summary(ctxt, False, filters) - def _get_my_snapshots_summary(self, ctxt): + def _get_my_snapshots_summary( + self, + ctxt: context.RequestContext) -> objects.SnapshotList: filters = self._get_cluster_or_host_filters() return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters) - def _get_my_resources(self, ctxt, ovo_class_list, limit=None, offset=None): + def _get_my_resources(self, + ctxt: context.RequestContext, + ovo_class_list, + limit: Optional[int] = None, + offset: Optional[int] = None) -> list: filters = self._get_cluster_or_host_filters() return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters, limit=limit, offset=offset) def _get_my_volumes(self, - ctxt, limit=None, offset=None) -> objects.VolumeList: + ctxt: context.RequestContext, + limit: Optional[int] = None, + offset: Optional[int] = None) -> objects.VolumeList: return self._get_my_resources(ctxt, objects.VolumeList, limit, offset) - def _get_my_snapshots(self, ctxt, limit=None, offset=None): + def _get_my_snapshots( + self, + ctxt: context.RequestContext, + limit: Optional[int] = None, + offset: Optional[int] = None) -> objects.SnapshotList: return self._get_my_resources(ctxt, objects.SnapshotList, limit, offset) - def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys, - sort_dirs, want_objects=False): + def get_manageable_volumes(self, + ctxt: context.RequestContext, + marker, + limit: Optional[int], + offset: Optional[int], + sort_keys, + sort_dirs, want_objects=False) -> list: try: volume_utils.require_driver_initialized(self.driver) except exception.DriverNotInitialized: @@ -3307,9 +3335,12 @@ class VolumeManager(manager.CleanableManager, 'id': group.id}) return group - def create_group_from_src(self, context, group, - group_snapshot=None, - source_group=None) -> objects.Group: + def create_group_from_src( + self, + context: context.RequestContext, + group: objects.Group, + group_snapshot: Optional[objects.GroupSnapshot] = None, + source_group=None) -> objects.Group: """Creates the group from source. The source can be a group snapshot or a source group. @@ -3468,11 +3499,15 @@ class VolumeManager(manager.CleanableManager, return group def _create_group_from_src_generic( - self, context, group, volumes, - group_snapshot=None, snapshots=None, - source_group=None, - source_vols=None) -> ty.Tuple[ty.Dict[str, str], - ty.List[ty.Dict[str, str]]]: + self, + context: context.RequestContext, + group: objects.Group, + volumes: List[objects.Volume], + group_snapshot: Optional[objects.GroupSnapshot] = None, + snapshots: Optional[List[objects.Snapshot]] = None, + source_group: Optional[objects.Group] = None, + source_vols: Optional[List[objects.Volume]] = None) \ + -> Tuple[Dict[str, str], List[Dict[str, str]]]: """Creates a group from source. :param context: the context of the caller. @@ -3485,7 +3520,7 @@ class VolumeManager(manager.CleanableManager, :returns: model_update, volumes_model_update """ model_update = {'status': 'available'} - volumes_model_update: list = [] + volumes_model_update: List[dict] = [] for vol in volumes: if snapshots: for snapshot in snapshots: @@ -3548,7 +3583,9 @@ class VolumeManager(manager.CleanableManager, return sorted_snapshots - def _sort_source_vols(self, volumes, source_vols) -> list: + def _sort_source_vols(self, + volumes, + source_vols: objects.VolumeList) -> list: # Sort source volumes so that they are in the same order as their # corresponding target volumes. Each source volume in the source_vols # list should have a corresponding target volume in the volumes list. @@ -3572,7 +3609,8 @@ class VolumeManager(manager.CleanableManager, return sorted_source_vols def _update_volume_from_src(self, - context, vol, update, group=None) -> None: + context: context.RequestContext, + vol, update, group=None) -> None: try: snapshot_id = vol.get('snapshot_id') source_volid = vol.get('source_volid') @@ -3628,9 +3666,9 @@ class VolumeManager(manager.CleanableManager, self.db.volume_update(context, vol['id'], update) def _update_allocated_capacity(self, - vol, - decrement=False, - host: str = None) -> None: + vol: objects.Volume, + decrement: bool = False, + host: Optional[str] = None) -> None: # Update allocated capacity in volume stats host = host or vol['host'] pool = volume_utils.extract_host(host, 'pool') @@ -3648,7 +3686,9 @@ class VolumeManager(manager.CleanableManager, self.stats['pools'][pool] = dict( allocated_capacity_gb=max(vol_size, 0)) - def delete_group(self, context, group: objects.Group) -> None: + def delete_group(self, + context: context.RequestContext, + group: objects.Group) -> None: """Deletes group and the volumes in the group.""" context = context.elevated() project_id = group.project_id @@ -3725,6 +3765,7 @@ class VolumeManager(manager.CleanableManager, vol_obj.save() # Get reservations for group + grpreservations: Optional[list] try: reserve_opts = {'groups': -1} grpreservations = GROUP_QUOTAS.reserve(context, @@ -3739,6 +3780,7 @@ class VolumeManager(manager.CleanableManager, for vol in volumes: # Get reservations for volume + reservations: Optional[list] try: reserve_opts = {'volumes': -1, 'gigabytes': -vol.size} @@ -3779,8 +3821,8 @@ class VolumeManager(manager.CleanableManager, def _convert_group_to_cg( self, group: objects.Group, - volumes: objects.VolumeList) -> ty.Tuple[objects.Group, - objects.VolumeList]: + volumes: objects.VolumeList) -> Tuple[objects.Group, + objects.VolumeList]: if not group: return None, None cg = consistencygroup.ConsistencyGroup() @@ -3791,7 +3833,8 @@ class VolumeManager(manager.CleanableManager, return cg, volumes - def _remove_consistencygroup_id_from_volumes(self, volumes) -> None: + def _remove_consistencygroup_id_from_volumes( + self, volumes: Optional[List[objects.Volume]]) -> None: if not volumes: return for vol in volumes: @@ -3802,8 +3845,8 @@ class VolumeManager(manager.CleanableManager, self, group_snapshot: objects.GroupSnapshot, snapshots: objects.SnapshotList, - ctxt) -> ty.Tuple[objects.CGSnapshot, - objects.SnapshotList]: + ctxt) -> Tuple[objects.CGSnapshot, + objects.SnapshotList]: if not group_snapshot: return None, None cgsnap = cgsnapshot.CGSnapshot() @@ -3820,21 +3863,27 @@ class VolumeManager(manager.CleanableManager, return cgsnap, snapshots - def _remove_cgsnapshot_id_from_snapshots(self, snapshots) -> None: + def _remove_cgsnapshot_id_from_snapshots( + self, snapshots: Optional[list]) -> None: if not snapshots: return for snap in snapshots: snap.cgsnapshot_id = None snap.cgsnapshot = None - def _create_group_generic(self, context, group) -> dict: + def _create_group_generic(self, + context: context.RequestContext, + group) -> dict: """Creates a group.""" # A group entry is already created in db. Just returns a status here. model_update = {'status': fields.GroupStatus.AVAILABLE, 'created_at': timeutils.utcnow()} return model_update - def _delete_group_generic(self, context, group, volumes) -> ty.Tuple: + def _delete_group_generic(self, + context: context.RequestContext, + group: objects.Group, + volumes) -> Tuple: """Deletes a group and volumes in the group.""" model_update = {'status': group.status} volume_model_updates = [] @@ -3854,9 +3903,9 @@ class VolumeManager(manager.CleanableManager, return model_update, volume_model_updates def _update_group_generic( - self, context, group, + self, context: context.RequestContext, group, add_volumes=None, - remove_volumes=None) -> ty.Tuple[None, None, None]: + remove_volumes=None) -> Tuple[None, None, None]: """Updates a group.""" # NOTE(xyang): The volume manager adds/removes the volume to/from the # group in the database. This default implementation does not do @@ -3864,8 +3913,12 @@ class VolumeManager(manager.CleanableManager, return None, None, None def _collect_volumes_for_group( - self, context, group, volumes, add=True) -> list: - valid_status: ty.Tuple[str, ...] + self, + context: context.RequestContext, + group, + volumes: Optional[str], + add: bool = True) -> list: + valid_status: Tuple[str, ...] if add: valid_status = VALID_ADD_VOL_TO_GROUP_STATUS else: @@ -3900,8 +3953,11 @@ class VolumeManager(manager.CleanableManager, volumes_ref.append(add_vol_ref) return volumes_ref - def update_group(self, context, group, - add_volumes=None, remove_volumes=None) -> None: + def update_group(self, + context: context.RequestContext, + group, + add_volumes: Optional[str] = None, + remove_volumes: Optional[str] = None) -> None: """Updates group. Update group by adding volumes to the group, @@ -4002,7 +4058,7 @@ class VolumeManager(manager.CleanableManager, def create_group_snapshot( self, - context, + context: context.RequestContext, group_snapshot: objects.GroupSnapshot) -> objects.GroupSnapshot: """Creates the group_snapshot.""" caller_context = context @@ -4125,8 +4181,10 @@ class VolumeManager(manager.CleanableManager, return group_snapshot def _create_group_snapshot_generic( - self, context, group_snapshot, - snapshots) -> ty.Tuple[dict, ty.List[dict]]: + self, + context: context.RequestContext, + group_snapshot: objects.GroupSnapshot, + snapshots: list) -> Tuple[dict, List[dict]]: """Creates a group_snapshot.""" model_update = {'status': 'available'} snapshot_model_updates = [] @@ -4148,9 +4206,11 @@ class VolumeManager(manager.CleanableManager, return model_update, snapshot_model_updates - def _delete_group_snapshot_generic(self, context, group_snapshot, - snapshots) -> ty.Tuple[dict, - ty.List[dict]]: + def _delete_group_snapshot_generic( + self, + context: context.RequestContext, + group_snapshot: objects.GroupSnapshot, + snapshots: list) -> Tuple[dict, List[dict]]: """Deletes a group_snapshot.""" model_update = {'status': group_snapshot.status} snapshot_model_updates = [] @@ -4171,7 +4231,9 @@ class VolumeManager(manager.CleanableManager, return model_update, snapshot_model_updates - def delete_group_snapshot(self, context, group_snapshot) -> None: + def delete_group_snapshot(self, + context: context.RequestContext, + group_snapshot: objects.GroupSnapshot) -> None: """Deletes group_snapshot.""" caller_context = context context = context.elevated() @@ -4260,6 +4322,7 @@ class VolumeManager(manager.CleanableManager, for snapshot in snapshots: # Get reservations + reservations: Optional[list] try: reserve_opts = {'snapshots': -1} if not CONF.no_snapshot_gb_quota: @@ -4292,7 +4355,11 @@ class VolumeManager(manager.CleanableManager, "delete.end", snapshots) - def update_migrated_volume(self, ctxt, volume, new_volume, volume_status): + def update_migrated_volume(self, + ctxt: context.RequestContext, + volume: objects.Volume, + new_volume: objects.Volume, + volume_status) -> None: """Finalize migration process on backend device.""" model_update = None model_update_default = {'_name_id': new_volume.name_id, @@ -4499,7 +4566,9 @@ class VolumeManager(manager.CleanableManager, # TODO(geguileo): In P - remove this failover_host = failover - def finish_failover(self, context, service, updates) -> None: + def finish_failover(self, + context: context.RequestContext, + service, updates) -> None: """Completion of the failover locally or via RPC.""" # If the service is clustered, broadcast the service changes to all # volume services, including this one. @@ -4516,7 +4585,9 @@ class VolumeManager(manager.CleanableManager, service.update(updates) service.save() - def failover_completed(self, context, updates) -> None: + def failover_completed(self, + context: context.RequestContext, + updates) -> None: """Finalize failover of this backend. When a service is clustered and replicated the failover has 2 stages, @@ -4541,7 +4612,7 @@ class VolumeManager(manager.CleanableManager, fields.ReplicationStatus.ERROR) service.save() - def freeze_host(self, context) -> bool: + def freeze_host(self, context: context.RequestContext) -> bool: """Freeze management plane on this backend. Basically puts the control/management plane into a @@ -4571,7 +4642,7 @@ class VolumeManager(manager.CleanableManager, LOG.info("Set backend status to frozen successfully.") return True - def thaw_host(self, context) -> bool: + def thaw_host(self, context: context.RequestContext) -> bool: """UnFreeze management plane on this backend. Basically puts the control/management plane back into @@ -4601,8 +4672,8 @@ class VolumeManager(manager.CleanableManager, return True def manage_existing_snapshot(self, - ctxt, - snapshot, + ctxt: context.RequestContext, + snapshot: objects.Snapshot, ref=None) -> ovo_fields.UUIDField: LOG.debug('manage_existing_snapshot: managing %s.', ref) try: @@ -4625,7 +4696,11 @@ class VolumeManager(manager.CleanableManager, flow_engine.run() return snapshot.id - def get_manageable_snapshots(self, ctxt, marker, limit, offset, + def get_manageable_snapshots(self, + ctxt: context.RequestContext, + marker, + limit: Optional[int], + offset: Optional[int], sort_keys, sort_dirs, want_objects=False): try: volume_utils.require_driver_initialized(self.driver) @@ -4650,7 +4725,9 @@ class VolumeManager(manager.CleanableManager, "to driver error.") return driver_entries - def get_capabilities(self, context, discover): + def get_capabilities(self, + context: context.RequestContext, + discover: bool): """Get capabilities of backend storage.""" if discover: self.driver.init_capabilities() @@ -4658,7 +4735,10 @@ class VolumeManager(manager.CleanableManager, LOG.debug("Obtained capabilities list: %s.", capabilities) return capabilities - def get_backup_device(self, ctxt, backup, want_objects=False): + def get_backup_device(self, + ctxt: context.RequestContext, + backup: objects.Backup, + want_objects: bool = False): (backup_device, is_snapshot) = ( self.driver.get_backup_device(ctxt, backup)) secure_enabled = self.driver.secure_file_operations_enabled() @@ -4671,17 +4751,18 @@ class VolumeManager(manager.CleanableManager, ctxt) if want_objects else backup_device_dict) - def secure_file_operations_enabled(self, - ctxt: context.RequestContext, - volume): + def secure_file_operations_enabled( + self, + ctxt: context.RequestContext, + volume: Optional[objects.Volume]) -> bool: secure_enabled = self.driver.secure_file_operations_enabled() return secure_enabled def _connection_create(self, ctxt: context.RequestContext, - volume, - attachment, - connector) -> dict: + volume: objects.Volume, + attachment: objects.VolumeAttachment, + connector: dict) -> Dict[str, Any]: try: self.driver.validate_connector(connector) except exception.InvalidConnectorException as err: @@ -4734,9 +4815,9 @@ class VolumeManager(manager.CleanableManager, def attachment_update(self, context: context.RequestContext, - vref, + vref: objects.Volume, connector: dict, - attachment_id: str) -> dict: + attachment_id: str) -> Dict[str, Any]: """Update/Finalize an attachment. This call updates a valid attachment record to associate with a volume @@ -4803,7 +4884,7 @@ class VolumeManager(manager.CleanableManager, context: context.RequestContext, volume, attachment, - force: bool = False) -> ty.Union[None, bool]: + force: bool = False) -> Optional[bool]: """Remove a volume connection, but leave attachment. Exits early if the attachment does not have a connector and returns @@ -4845,8 +4926,8 @@ class VolumeManager(manager.CleanableManager, def attachment_delete(self, context: context.RequestContext, - attachment_id, - vref) -> None: + attachment_id: str, + vref: objects.Volume) -> None: """Delete/Detach the specified attachment. Notifies the backend device that we're detaching the specified @@ -4972,7 +5053,9 @@ class VolumeManager(manager.CleanableManager, 'id': group.id}) # Replication group API (Tiramisu) - def disable_replication(self, ctxt: context.RequestContext, group) -> None: + def disable_replication(self, + ctxt: context.RequestContext, + group: objects.Group) -> None: """Disable replication.""" group.refresh() if group.replication_status != fields.ReplicationStatus.DISABLING: @@ -5057,7 +5140,8 @@ class VolumeManager(manager.CleanableManager, # Replication group API (Tiramisu) def failover_replication(self, ctxt: context.RequestContext, - group, allow_attached_volume=False, + group: objects.Group, + allow_attached_volume: bool = False, secondary_backend_id=None) -> None: """Failover replication.""" group.refresh() @@ -5154,7 +5238,9 @@ class VolumeManager(manager.CleanableManager, resource={'type': 'group', 'id': group.id}) - def list_replication_targets(self, ctxt, group) -> ty.Dict[str, list]: + def list_replication_targets(self, + ctxt: context.RequestContext, + group: objects.Group) -> Dict[str, list]: """Provide a means to obtain replication targets for a group. This method is used to find the replication_device config diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index e5cdb4c546b..85935c89605 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -12,8 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. +from typing import Optional, Tuple, Union # noqa: H301 from cinder.common import constants +from cinder import context from cinder import objects from cinder import quota from cinder import rpc @@ -141,7 +143,10 @@ class VolumeAPI(rpc.RPCAPI): TOPIC = constants.VOLUME_TOPIC BINARY = constants.VOLUME_BINARY - def _get_cctxt(self, host=None, version=None, **kwargs): + def _get_cctxt(self, + host: str = None, + version: Union[str, Tuple[str, ...]] = None, + **kwargs) -> rpc.RPCAPI: if host: server = volume_utils.extract_host(host) @@ -158,8 +163,12 @@ class VolumeAPI(rpc.RPCAPI): return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs) - def create_volume(self, ctxt, volume, request_spec, filter_properties, - allow_reschedule=True): + def create_volume(self, + ctxt: context.RequestContext, + volume: 'objects.Volume', + request_spec: Optional[dict], + filter_properties: Optional[dict], + allow_reschedule: bool = True) -> None: cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'create_volume', request_spec=request_spec, @@ -174,7 +183,11 @@ class VolumeAPI(rpc.RPCAPI): cctxt.cast(ctxt, 'revert_to_snapshot', volume=volume, snapshot=snapshot) - def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False): + def delete_volume(self, + ctxt: context.RequestContext, + volume: 'objects.Volume', + unmanage_only: bool = False, + cascade: bool = False) -> None: volume.create_worker() cctxt = self._get_cctxt(volume.service_topic_queue) msg_args = { @@ -184,7 +197,10 @@ class VolumeAPI(rpc.RPCAPI): cctxt.cast(ctxt, 'delete_volume', **msg_args) - def create_snapshot(self, ctxt, volume, snapshot): + def create_snapshot(self, + ctxt: context.RequestContext, + volume: 'objects.Volume', + snapshot: 'objects.Snapshot') -> None: snapshot.create_worker() cctxt = self._get_cctxt(volume.service_topic_queue) cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot) @@ -393,7 +409,9 @@ class VolumeAPI(rpc.RPCAPI): return cctxt.call(ctxt, 'get_manageable_snapshots', **msg_args) - def create_group(self, ctxt, group): + def create_group(self, + ctxt: context.RequestContext, + group: 'objects.Group') -> None: cctxt = self._get_cctxt(group.service_topic_queue) cctxt.cast(ctxt, 'create_group', group=group) diff --git a/mypy-files.txt b/mypy-files.txt index 6d71d5d7314..134b5228cf4 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -1,3 +1,5 @@ +cinder/backup/manager.py +cinder/common/constants.py cinder/context.py cinder/i18n.py cinder/image/cache.py @@ -5,10 +7,12 @@ cinder/image/glance.py cinder/image/image_utils.py cinder/exception.py cinder/manager.py +cinder/scheduler/manager.py cinder/utils.py cinder/volume/__init__.py cinder/volume/flows/api/create_volume.py cinder/volume/flows/manager/create_volume.py cinder/volume/manager.py +cinder/volume/rpcapi.py cinder/volume/volume_types.py cinder/volume/volume_utils.py