mypy: continued manager, scheduler, rpcapi

Change-Id: I9a8d24ac27af8fe4864934d1b9bc5b66da6d2c1e
This commit is contained in:
Eric Harney 2021-04-01 13:36:00 -04:00
parent 8c46c09ad5
commit b5ac2af0c2
8 changed files with 317 additions and 159 deletions

View File

@ -18,6 +18,7 @@
"""RequestContext: context for requests that persist through all of cinder.""" """RequestContext: context for requests that persist through all of cinder."""
import copy import copy
from typing import Any, Dict, Optional # noqa: H301
from keystoneauth1.access import service_catalog as ksa_service_catalog from keystoneauth1.access import service_catalog as ksa_service_catalog
from keystoneauth1 import plugin from keystoneauth1 import plugin
@ -79,10 +80,18 @@ class RequestContext(context.RequestContext):
Represents the user taking a given action within the system. Represents the user taking a given action within the system.
""" """
def __init__(self, user_id=None, project_id=None, is_admin=None, def __init__(self,
read_deleted="no", project_name=None, remote_address=None, user_id: Optional[str] = None,
timestamp=None, quota_class=None, service_catalog=None, project_id: Optional[str] = None,
user_auth_plugin=None, **kwargs): is_admin: Optional[bool] = None,
read_deleted: Optional[str] = "no",
project_name: Optional[str] = None,
remote_address: Optional[str] = None,
timestamp=None,
quota_class=None,
service_catalog: Optional[dict] = None,
user_auth_plugin=None,
**kwargs):
"""Initialize RequestContext. """Initialize RequestContext.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes' :param read_deleted: 'no' indicates deleted records are hidden, 'yes'
@ -122,7 +131,8 @@ class RequestContext(context.RequestContext):
# We need to have RequestContext attributes defined # We need to have RequestContext attributes defined
# when policy.check_is_admin invokes request logging # when policy.check_is_admin invokes request logging
# to make it loggable. # to make it loggable.
if self.is_admin is None: # type: ignore self.is_admin: Optional[bool]
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self) self.is_admin = policy.check_is_admin(self)
elif self.is_admin and 'admin' not in self.roles: elif self.is_admin and 'admin' not in self.roles:
self.roles.append('admin') self.roles.append('admin')
@ -134,22 +144,22 @@ class RequestContext(context.RequestContext):
else: else:
return _ContextAuthPlugin(self.auth_token, self.service_catalog) return _ContextAuthPlugin(self.auth_token, self.service_catalog)
def _get_read_deleted(self): def _get_read_deleted(self) -> str:
return self._read_deleted return self._read_deleted
def _set_read_deleted(self, read_deleted): def _set_read_deleted(self, read_deleted: str) -> None:
if read_deleted not in ('no', 'yes', 'only'): if read_deleted not in ('no', 'yes', 'only'):
raise ValueError(_("read_deleted can only be one of 'no', " raise ValueError(_("read_deleted can only be one of 'no', "
"'yes' or 'only', not %r") % read_deleted) "'yes' or 'only', not %r") % read_deleted)
self._read_deleted = read_deleted self._read_deleted = read_deleted
def _del_read_deleted(self): def _del_read_deleted(self) -> None:
del self._read_deleted del self._read_deleted
read_deleted = property(_get_read_deleted, _set_read_deleted, read_deleted = property(_get_read_deleted, _set_read_deleted,
_del_read_deleted) _del_read_deleted)
def to_dict(self): def to_dict(self) -> Dict[str, Any]:
result = super(RequestContext, self).to_dict() result = super(RequestContext, self).to_dict()
result['user_id'] = self.user_id result['user_id'] = self.user_id
result['project_id'] = self.project_id result['project_id'] = self.project_id
@ -164,7 +174,7 @@ class RequestContext(context.RequestContext):
return result return result
@classmethod @classmethod
def from_dict(cls, values): def from_dict(cls, values: dict) -> 'RequestContext':
return cls(user_id=values.get('user_id'), return cls(user_id=values.get('user_id'),
project_id=values.get('project_id'), project_id=values.get('project_id'),
project_name=values.get('project_name'), project_name=values.get('project_name'),
@ -183,7 +193,11 @@ class RequestContext(context.RequestContext):
project_domain_id=values.get('project_domain_id'), project_domain_id=values.get('project_domain_id'),
) )
def authorize(self, action, target=None, target_obj=None, fatal=True): def authorize(self,
action: str,
target: Optional[dict] = None,
target_obj: Optional[dict] = None,
fatal: bool = True):
"""Verify that the given action is valid on the target in this context. """Verify that the given action is valid on the target in this context.
:param action: string representing the action to be checked. :param action: string representing the action to be checked.
@ -216,14 +230,16 @@ class RequestContext(context.RequestContext):
return policy.authorize(self, action, target, do_raise=fatal, return policy.authorize(self, action, target, do_raise=fatal,
exc=exception.PolicyNotAuthorized) exc=exception.PolicyNotAuthorized)
def to_policy_values(self): def to_policy_values(self) -> dict:
policy = super(RequestContext, self).to_policy_values() policy = super(RequestContext, self).to_policy_values()
policy['is_admin'] = self.is_admin policy['is_admin'] = self.is_admin
return policy return policy
def elevated(self, read_deleted=None, overwrite=False): def elevated(self,
read_deleted: Optional[str] = None,
overwrite: bool = False) -> 'RequestContext':
"""Return a version of this context with admin flag set.""" """Return a version of this context with admin flag set."""
context = self.deepcopy() context = self.deepcopy()
context.is_admin = True context.is_admin = True
@ -236,11 +252,11 @@ class RequestContext(context.RequestContext):
return context return context
def deepcopy(self): def deepcopy(self) -> 'RequestContext':
return copy.deepcopy(self) return copy.deepcopy(self)
def get_admin_context(read_deleted="no"): def get_admin_context(read_deleted: Optional[str] = "no") -> RequestContext:
return RequestContext(user_id=None, return RequestContext(user_id=None,
project_id=None, project_id=None,
is_admin=True, is_admin=True,
@ -248,7 +264,7 @@ def get_admin_context(read_deleted="no"):
overwrite=False) overwrite=False)
def get_internal_tenant_context(): def get_internal_tenant_context() -> Optional[RequestContext]:
"""Build and return the Cinder internal tenant context object """Build and return the Cinder internal tenant context object
This request context will only work for internal Cinder operations. It will This request context will only work for internal Cinder operations. It will

View File

@ -106,7 +106,7 @@ class Manager(base.Base, PeriodicTasks):
def service_topic_queue(self): def service_topic_queue(self):
return self.cluster or self.host return self.cluster or self.host
def init_host(self, service_id=None, added_to_cluster=None): def init_host(self, service_id, added_to_cluster=None):
"""Handle initialization if this is a standalone service. """Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made A hook point for services to execute tasks before the services are made
@ -222,7 +222,9 @@ class SchedulerDependentManager(ThreadPoolManager):
class CleanableManager(object): class CleanableManager(object):
def do_cleanup(self, context, cleanup_request) -> None: def do_cleanup(self,
context: context.RequestContext,
cleanup_request: objects.CleanupRequest) -> None:
LOG.info('Initiating service %s cleanup', LOG.info('Initiating service %s cleanup',
cleanup_request.service_id) cleanup_request.service_id)
@ -305,10 +307,10 @@ class CleanableManager(object):
LOG.info('Service %s cleanup completed.', cleanup_request.service_id) LOG.info('Service %s cleanup completed.', cleanup_request.service_id)
def _do_cleanup(self, ctxt, vo_resource) -> bool: def _do_cleanup(self, ctxt: context.RequestContext, vo_resource) -> bool:
return False return False
def init_host(self, service_id, **kwargs) -> None: def init_host(self, service_id, added_to_cluster=None, **kwargs):
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
self.service_id = service_id self.service_id = service_id
# TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove # TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove

View File

@ -26,6 +26,7 @@ __all__ = [
] ]
import functools import functools
from typing import Tuple, Union # noqa: H301
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
@ -53,7 +54,7 @@ ALLOWED_EXMODS = [
EXTRA_EXMODS = [] EXTRA_EXMODS = []
def init(conf): def init(conf) -> None:
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
exmods = get_allowed_exmods() exmods = get_allowed_exmods()
TRANSPORT = messaging.get_rpc_transport(conf, TRANSPORT = messaging.get_rpc_transport(conf,
@ -73,7 +74,7 @@ def init(conf):
NOTIFIER = utils.DO_NOTHING NOTIFIER = utils.DO_NOTHING
def initialized(): def initialized() -> bool:
return None not in [TRANSPORT, NOTIFIER] return None not in [TRANSPORT, NOTIFIER]
@ -139,7 +140,9 @@ class RequestContextSerializer(messaging.Serializer):
return cinder.context.RequestContext.from_dict(context) return cinder.context.RequestContext.from_dict(context)
def get_client(target, version_cap=None, serializer=None): def get_client(target,
version_cap=None,
serializer=None) -> messaging.RPCClient:
if TRANSPORT is None: if TRANSPORT is None:
raise AssertionError('RPC transport is not initialized.') raise AssertionError('RPC transport is not initialized.')
serializer = RequestContextSerializer(serializer) serializer = RequestContextSerializer(serializer)
@ -149,7 +152,9 @@ def get_client(target, version_cap=None, serializer=None):
serializer=serializer) serializer=serializer)
def get_server(target, endpoints, serializer=None): def get_server(target,
endpoints,
serializer=None) -> messaging.rpc.server.RPCServer:
if TRANSPORT is None: if TRANSPORT is None:
raise AssertionError('RPC transport is not initialized.') raise AssertionError('RPC transport is not initialized.')
serializer = RequestContextSerializer(serializer) serializer = RequestContextSerializer(serializer)
@ -163,7 +168,9 @@ def get_server(target, endpoints, serializer=None):
@utils.if_notifications_enabled @utils.if_notifications_enabled
def get_notifier(service=None, host=None, publisher_id=None): def get_notifier(service: str = None,
host: str = None,
publisher_id: str = None) -> messaging.Notifier:
if NOTIFIER is None: if NOTIFIER is None:
raise AssertionError('RPC Notifier is not initialized.') raise AssertionError('RPC Notifier is not initialized.')
if not publisher_id: if not publisher_id:
@ -222,7 +229,9 @@ class RPCAPI(object):
return version return version
return versions[-1] return versions[-1]
def _get_cctxt(self, version=None, **kwargs): def _get_cctxt(self,
version: Union[str, Tuple[str, ...]] = None,
**kwargs):
"""Prepare client context """Prepare client context
Version parameter accepts single version string or tuple of strings. Version parameter accepts single version string or tuple of strings.

View File

@ -24,6 +24,7 @@ from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from cinder import context
from cinder import exception from cinder import exception
from cinder.i18n import _ from cinder.i18n import _
from cinder.scheduler import driver from cinder.scheduler import driver
@ -46,7 +47,9 @@ class FilterScheduler(driver.Scheduler):
"""Fetch options dictionary. Broken out for testing.""" """Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration() return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties): def populate_filter_properties(self,
request_spec: dict,
filter_properties: dict) -> None:
"""Stuff things into filter_properties. """Stuff things into filter_properties.
Can be overridden in a subclass to add more data. Can be overridden in a subclass to add more data.
@ -58,11 +61,13 @@ class FilterScheduler(driver.Scheduler):
filter_properties['metadata'] = vol.get('metadata') filter_properties['metadata'] = vol.get('metadata')
filter_properties['qos_specs'] = vol.get('qos_specs') filter_properties['qos_specs'] = vol.get('qos_specs')
def schedule_create_group(self, context, group, def schedule_create_group(self,
context: context.RequestContext,
group,
group_spec, group_spec,
request_spec_list, request_spec_list,
group_filter_properties, group_filter_properties,
filter_properties_list): filter_properties_list) -> None:
weighed_backend = self._schedule_generic_group( weighed_backend = self._schedule_generic_group(
context, context,
group_spec, group_spec,
@ -82,7 +87,10 @@ class FilterScheduler(driver.Scheduler):
self.volume_rpcapi.create_group(context, updated_group) self.volume_rpcapi.create_group(context, updated_group)
def schedule_create_volume(self, context, request_spec, filter_properties): def schedule_create_volume(self,
context: context.RequestContext,
request_spec: dict,
filter_properties: dict) -> None:
backend = self._schedule(context, request_spec, filter_properties) backend = self._schedule(context, request_spec, filter_properties)
if not backend: if not backend:
@ -107,8 +115,11 @@ class FilterScheduler(driver.Scheduler):
filter_properties, filter_properties,
allow_reschedule=True) allow_reschedule=True)
def backend_passes_filters(self, context, backend, request_spec, def backend_passes_filters(self,
filter_properties): context: context.RequestContext,
backend,
request_spec: dict,
filter_properties: dict):
"""Check if the specified backend passes the filters.""" """Check if the specified backend passes the filters."""
weighed_backends = self._get_weighted_candidates(context, request_spec, weighed_backends = self._get_weighted_candidates(context, request_spec,
filter_properties) filter_properties)
@ -132,8 +143,11 @@ class FilterScheduler(driver.Scheduler):
raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s ' raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s '
'on %(backend)s.') % reason_param) 'on %(backend)s.') % reason_param)
def find_retype_backend(self, context, request_spec, def find_retype_backend(self,
filter_properties=None, migration_policy='never'): context: context.RequestContext,
request_spec: dict,
filter_properties: dict = None,
migration_policy: str = 'never'):
"""Find a backend that can accept the volume with its new type.""" """Find a backend that can accept the volume with its new type."""
filter_properties = filter_properties or {} filter_properties = filter_properties or {}
backend = (request_spec['volume_properties'].get('cluster_name') backend = (request_spec['volume_properties'].get('cluster_name')
@ -186,8 +200,8 @@ class FilterScheduler(driver.Scheduler):
def get_pools(self, context, filters): def get_pools(self, context, filters):
return self.host_manager.get_pools(context, filters) return self.host_manager.get_pools(context, filters)
def _post_select_populate_filter_properties(self, filter_properties, def _post_select_populate_filter_properties(self, filter_properties: dict,
backend_state): backend_state) -> None:
"""Populate filter properties with additional information. """Populate filter properties with additional information.
Add additional information to the filter properties after a backend has Add additional information to the filter properties after a backend has
@ -196,7 +210,7 @@ class FilterScheduler(driver.Scheduler):
# Add a retry entry for the selected volume backend: # Add a retry entry for the selected volume backend:
self._add_retry_backend(filter_properties, backend_state.backend_id) self._add_retry_backend(filter_properties, backend_state.backend_id)
def _add_retry_backend(self, filter_properties, backend): def _add_retry_backend(self, filter_properties: dict, backend) -> None:
"""Add a retry entry for the selected volume backend. """Add a retry entry for the selected volume backend.
In the event that the request gets re-scheduled, this entry will signal In the event that the request gets re-scheduled, this entry will signal
@ -211,7 +225,7 @@ class FilterScheduler(driver.Scheduler):
if backends is not None: if backends is not None:
backends.append(backend) backends.append(backend)
def _max_attempts(self): def _max_attempts(self) -> int:
max_attempts = CONF.scheduler_max_attempts max_attempts = CONF.scheduler_max_attempts
if max_attempts < 1: if max_attempts < 1:
raise exception.InvalidParameterValue( raise exception.InvalidParameterValue(
@ -271,8 +285,10 @@ class FilterScheduler(driver.Scheduler):
{'max_attempts': max_attempts, {'max_attempts': max_attempts,
'resource_id': resource_id}) 'resource_id': resource_id})
def _get_weighted_candidates(self, context, request_spec, def _get_weighted_candidates(self,
filter_properties=None): context: context.RequestContext,
request_spec: dict,
filter_properties: dict = None) -> list:
"""Return a list of backends that meet required specs. """Return a list of backends that meet required specs.
Returned list is ordered by their fitness. Returned list is ordered by their fitness.
@ -351,7 +367,7 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_generic_group( def _get_weighted_candidates_generic_group(
self, context, group_spec, request_spec_list, self, context, group_spec, request_spec_list,
group_filter_properties=None, group_filter_properties=None,
filter_properties_list=None): filter_properties_list=None) -> list:
"""Finds backends that supports the group. """Finds backends that supports the group.
Returns a list of backends that meet the required specs, Returns a list of backends that meet the required specs,
@ -443,7 +459,8 @@ class FilterScheduler(driver.Scheduler):
return weighed_backends return weighed_backends
def _find_valid_backends(self, backend_list1, backend_list2): def _find_valid_backends(self,
backend_list1: list, backend_list2: list) -> list:
new_backends = [] new_backends = []
for backend1 in backend_list1: for backend1 in backend_list1:
for backend2 in backend_list2: for backend2 in backend_list2:
@ -458,7 +475,7 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_by_group_type( def _get_weighted_candidates_by_group_type(
self, context, group_spec, self, context, group_spec,
group_filter_properties=None): group_filter_properties=None) -> list:
"""Finds backends that supports the group type. """Finds backends that supports the group type.
Returns a list of backends that meet the required specs, Returns a list of backends that meet the required specs,
@ -559,7 +576,7 @@ class FilterScheduler(driver.Scheduler):
return None return None
return self._choose_top_backend_generic_group(weighed_backends) return self._choose_top_backend_generic_group(weighed_backends)
def _choose_top_backend(self, weighed_backends, request_spec): def _choose_top_backend(self, weighed_backends: list, request_spec: dict):
top_backend = weighed_backends[0] top_backend = weighed_backends[0]
backend_state = top_backend.obj backend_state = top_backend.obj
LOG.debug("Choosing %s", backend_state.backend_id) LOG.debug("Choosing %s", backend_state.backend_id)

View File

@ -253,7 +253,9 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume, volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume,
snapshot) snapshot)
def _do_cleanup(self, ctxt, vo_resource): def _do_cleanup(self,
ctxt: context.RequestContext,
vo_resource: 'objects.base.CinderObject'):
# We can only receive cleanup requests for volumes, but we check anyway # We can only receive cleanup requests for volumes, but we check anyway
# We need to cleanup the volume status for cases where the scheduler # We need to cleanup the volume status for cases where the scheduler
# died while scheduling the volume creation. # died while scheduling the volume creation.
@ -262,7 +264,8 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
vo_resource.status = 'error' vo_resource.status = 'error'
vo_resource.save() vo_resource.save()
def request_service_capabilities(self, context): def request_service_capabilities(self,
context: context.RequestContext) -> None:
volume_rpcapi.VolumeAPI().publish_service_capabilities(context) volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
try: try:
self.backup_api.publish_service_capabilities(context) self.backup_api.publish_service_capabilities(context)
@ -275,8 +278,11 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
LOG.warning(msg, {'host': self.host, 'e': e}) LOG.warning(msg, {'host': self.host, 'e': e})
@append_operation_type() @append_operation_type()
def migrate_volume(self, context, volume, backend, force_copy, def migrate_volume(self,
request_spec, filter_properties): context: context.RequestContext,
volume: objects.Volume,
backend: str, force_copy: bool,
request_spec, filter_properties) -> None:
"""Ensure that the backend exists and can accept the volume.""" """Ensure that the backend exists and can accept the volume."""
self._wait_for_scheduler() self._wait_for_scheduler()
@ -597,7 +603,7 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
not_requested = [] not_requested = []
# To reduce DB queries we'll cache the clusters data # To reduce DB queries we'll cache the clusters data
clusters = collections.defaultdict(dict) clusters: collections.defaultdict = collections.defaultdict(dict)
for service in services: for service in services:
cleanup_request.cluster_name = service.cluster_name cleanup_request.cluster_name = service.cluster_name

View File

@ -37,8 +37,8 @@ intact.
import functools import functools
import time import time
import typing as ty import typing
from typing import Optional from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: H301
from castellan import key_manager from castellan import key_manager
from oslo_config import cfg from oslo_config import cfg
@ -238,14 +238,14 @@ class VolumeManager(manager.CleanableManager,
'consistencygroup', 'volume_attachment', 'group', 'snapshots'} 'consistencygroup', 'volume_attachment', 'group', 'snapshots'}
def _get_service(self, def _get_service(self,
host: str = None, host: Optional[str] = None,
binary: str = constants.VOLUME_BINARY) -> objects.Service: binary: str = constants.VOLUME_BINARY) -> objects.Service:
host = host or self.host host = host or self.host
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
svc_host = volume_utils.extract_host(host, 'backend') svc_host = volume_utils.extract_host(host, 'backend')
return objects.Service.get_by_args(ctxt, svc_host, binary) return objects.Service.get_by_args(ctxt, svc_host, binary)
def __init__(self, volume_driver=None, service_name: str = None, def __init__(self, volume_driver=None, service_name: Optional[str] = None,
*args, **kwargs): *args, **kwargs):
"""Load the driver from the one specified in args, or from flags.""" """Load the driver from the one specified in args, or from flags."""
# update_service_capabilities needs service_name to be volume # update_service_capabilities needs service_name to be volume
@ -262,6 +262,7 @@ class VolumeManager(manager.CleanableManager,
self.service_uuid = None self.service_uuid = None
self.cluster: str self.cluster: str
self.host: str
self.image_volume_cache: Optional[image_cache.ImageVolumeCache] self.image_volume_cache: Optional[image_cache.ImageVolumeCache]
if not volume_driver: if not volume_driver:
@ -424,7 +425,7 @@ class VolumeManager(manager.CleanableManager,
updates, snapshot_updates = self.driver.update_provider_info( updates, snapshot_updates = self.driver.update_provider_info(
volumes, snapshots) volumes, snapshots)
update: ty.Any update: Any
if updates: if updates:
for volume in volumes: for volume in volumes:
# NOTE(JDG): Make sure returned item is in this hosts volumes # NOTE(JDG): Make sure returned item is in this hosts volumes
@ -533,7 +534,7 @@ class VolumeManager(manager.CleanableManager,
num_vols: int = 0 num_vols: int = 0
num_snaps: int = 0 num_snaps: int = 0
max_objs_num: int = 0 max_objs_num: int = 0
req_range: ty.Union[ty.List[int], range] = [0] req_range: Union[List[int], range] = [0]
req_limit = CONF.init_host_max_objects_retrieval or 0 req_limit = CONF.init_host_max_objects_retrieval or 0
use_batch_objects_retrieval: bool = req_limit > 0 use_batch_objects_retrieval: bool = req_limit > 0
@ -544,7 +545,7 @@ class VolumeManager(manager.CleanableManager,
num_snaps, __ = self._get_my_snapshots_summary(ctxt) num_snaps, __ = self._get_my_snapshots_summary(ctxt)
# Calculate highest number of the objects (volumes or snapshots) # Calculate highest number of the objects (volumes or snapshots)
max_objs_num = max(num_vols, num_snaps) max_objs_num = max(num_vols, num_snaps)
max_objs_num = ty.cast(int, max_objs_num) max_objs_num = typing.cast(int, max_objs_num)
# Make batch request loop counter # Make batch request loop counter
req_range = range(0, max_objs_num, req_limit) req_range = range(0, max_objs_num, req_limit)
@ -679,7 +680,9 @@ class VolumeManager(manager.CleanableManager,
resource={'type': 'driver', resource={'type': 'driver',
'id': self.driver.__class__.__name__}) 'id': self.driver.__class__.__name__})
def _do_cleanup(self, ctxt, vo_resource) -> bool: def _do_cleanup(self,
ctxt: context.RequestContext,
vo_resource: 'objects.base.CinderObject') -> bool:
if isinstance(vo_resource, objects.Volume): if isinstance(vo_resource, objects.Volume):
if vo_resource.status == 'downloading': if vo_resource.status == 'downloading':
self.driver.clear_download(ctxt, vo_resource) self.driver.clear_download(ctxt, vo_resource)
@ -721,7 +724,8 @@ class VolumeManager(manager.CleanableManager,
""" """
return self.driver.initialized return self.driver.initialized
def _set_resource_host(self, resource) -> None: def _set_resource_host(self, resource: Union[objects.Volume,
objects.Group]) -> None:
"""Set the host field on the DB to our own when we are clustered.""" """Set the host field on the DB to our own when we are clustered."""
if (resource.is_clustered and if (resource.is_clustered and
not volume_utils.hosts_are_equivalent(resource.host, not volume_utils.hosts_are_equivalent(resource.host,
@ -779,7 +783,7 @@ class VolumeManager(manager.CleanableManager,
snapshot_id = request_spec.get('snapshot_id') snapshot_id = request_spec.get('snapshot_id')
source_volid = request_spec.get('source_volid') source_volid = request_spec.get('source_volid')
locked_action: ty.Optional[str] locked_action: Optional[str]
if snapshot_id is not None: if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it. # Make sure the snapshot is not deleted until we are done with it.
locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot') locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
@ -877,7 +881,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext, context: context.RequestContext,
volume: objects.volume.Volume, volume: objects.volume.Volume,
unmanage_only=False, unmanage_only=False,
cascade=False): cascade=False) -> Optional[bool]:
"""Deletes and unexports volume. """Deletes and unexports volume.
1. Delete a volume(normal case) 1. Delete a volume(normal case)
@ -900,7 +904,7 @@ class VolumeManager(manager.CleanableManager,
# NOTE(thingee): It could be possible for a volume to # NOTE(thingee): It could be possible for a volume to
# be deleted when resuming deletes from init_host(). # be deleted when resuming deletes from init_host().
LOG.debug("Attempted delete of non-existent volume: %s", volume.id) LOG.debug("Attempted delete of non-existent volume: %s", volume.id)
return return None
if context.project_id != volume.project_id: if context.project_id != volume.project_id:
project_id = volume.project_id project_id = volume.project_id
@ -1031,6 +1035,7 @@ class VolumeManager(manager.CleanableManager,
if unmanage_only: if unmanage_only:
msg = "Unmanaged volume successfully." msg = "Unmanaged volume successfully."
LOG.info(msg, resource=volume) LOG.info(msg, resource=volume)
return None
def _clear_db(self, is_migrating_dest, volume_ref, status) -> None: def _clear_db(self, is_migrating_dest, volume_ref, status) -> None:
# This method is called when driver.unmanage() or # This method is called when driver.unmanage() or
@ -1279,7 +1284,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext, context: context.RequestContext,
snapshot: objects.Snapshot, snapshot: objects.Snapshot,
unmanage_only: bool = False, unmanage_only: bool = False,
handle_quota: bool = True): handle_quota: bool = True) -> Optional[bool]:
"""Deletes and unexports snapshot.""" """Deletes and unexports snapshot."""
context = context.elevated() context = context.elevated()
snapshot._context = context snapshot._context = context
@ -1358,6 +1363,7 @@ class VolumeManager(manager.CleanableManager,
if unmanage_only: if unmanage_only:
msg = "Unmanage snapshot completed successfully." msg = "Unmanage snapshot completed successfully."
LOG.info(msg, resource=snapshot) LOG.info(msg, resource=snapshot)
return None
@coordination.synchronized('{volume_id}') @coordination.synchronized('{volume_id}')
def attach_volume(self, context, volume_id, instance_uuid, host_name, def attach_volume(self, context, volume_id, instance_uuid, host_name,
@ -1594,8 +1600,7 @@ class VolumeManager(manager.CleanableManager,
def _clone_image_volume(self, def _clone_image_volume(self,
ctx: context.RequestContext, ctx: context.RequestContext,
volume, volume,
image_meta: dict) -> ty.Union[None, image_meta: dict) -> Optional[objects.Volume]:
objects.Volume]:
# TODO: should this return None? # TODO: should this return None?
volume_type_id: str = volume.get('volume_type_id') volume_type_id: str = volume.get('volume_type_id')
reserve_opts: dict = {'volumes': 1, 'gigabytes': volume.size} reserve_opts: dict = {'volumes': 1, 'gigabytes': volume.size}
@ -1603,7 +1608,7 @@ class VolumeManager(manager.CleanableManager,
reservations = QUOTAS.reserve(ctx, **reserve_opts) reservations = QUOTAS.reserve(ctx, **reserve_opts)
# NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid # NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid
# creating tmp img vol from wrong snapshot or wrong source vol. # creating tmp img vol from wrong snapshot or wrong source vol.
skip: ty.Set[str] = {'snapshot_id', 'source_volid'} skip: Set[str] = {'snapshot_id', 'source_volid'}
skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES) skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES)
try: try:
new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip} new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
@ -2188,7 +2193,7 @@ class VolumeManager(manager.CleanableManager,
self._detach_volume(ctxt, attach_info, volume, properties, self._detach_volume(ctxt, attach_info, volume, properties,
force=True, remote=remote) force=True, remote=remote)
attach_info = ty.cast(dict, attach_info) attach_info = typing.cast(dict, attach_info)
return attach_info return attach_info
def _detach_volume(self, ctxt, attach_info, volume, properties, def _detach_volume(self, ctxt, attach_info, volume, properties,
@ -2829,26 +2834,28 @@ class VolumeManager(manager.CleanableManager,
def _notify_about_volume_usage(self, def _notify_about_volume_usage(self,
context: context.RequestContext, context: context.RequestContext,
volume, volume: objects.Volume,
event_suffix, event_suffix: str,
extra_usage_info=None) -> None: extra_usage_info: Optional[dict] = None) \
-> None:
volume_utils.notify_about_volume_usage( volume_utils.notify_about_volume_usage(
context, volume, event_suffix, context, volume, event_suffix,
extra_usage_info=extra_usage_info, host=self.host) extra_usage_info=extra_usage_info, host=self.host)
def _notify_about_snapshot_usage(self, def _notify_about_snapshot_usage(self,
context, context: context.RequestContext,
snapshot, snapshot: objects.Snapshot,
event_suffix, event_suffix: str,
extra_usage_info=None) -> None: extra_usage_info: Optional[dict] = None) \
-> None:
volume_utils.notify_about_snapshot_usage( volume_utils.notify_about_snapshot_usage(
context, snapshot, event_suffix, context, snapshot, event_suffix,
extra_usage_info=extra_usage_info, host=self.host) extra_usage_info=extra_usage_info, host=self.host)
def _notify_about_group_usage(self, def _notify_about_group_usage(self,
context, context: context.RequestContext,
group, group: objects.Group,
event_suffix, event_suffix: str,
volumes=None, volumes=None,
extra_usage_info=None) -> None: extra_usage_info=None) -> None:
volume_utils.notify_about_group_usage( volume_utils.notify_about_group_usage(
@ -2864,12 +2871,13 @@ class VolumeManager(manager.CleanableManager,
context, volume, event_suffix, context, volume, event_suffix,
extra_usage_info=extra_usage_info, host=self.host) extra_usage_info=extra_usage_info, host=self.host)
def _notify_about_group_snapshot_usage(self, def _notify_about_group_snapshot_usage(
context, self,
group_snapshot, context: context.RequestContext,
event_suffix, group_snapshot: objects.GroupSnapshot,
snapshots=None, event_suffix: str,
extra_usage_info=None) -> None: snapshots: Optional[list] = None,
extra_usage_info=None) -> None:
volume_utils.notify_about_group_snapshot_usage( volume_utils.notify_about_group_snapshot_usage(
context, group_snapshot, event_suffix, context, group_snapshot, event_suffix,
extra_usage_info=extra_usage_info, host=self.host) extra_usage_info=extra_usage_info, host=self.host)
@ -2884,7 +2892,7 @@ class VolumeManager(manager.CleanableManager,
extra_usage_info=extra_usage_info, host=self.host) extra_usage_info=extra_usage_info, host=self.host)
def extend_volume(self, def extend_volume(self,
context, context: context.RequestContext,
volume: objects.Volume, volume: objects.Volume,
new_size: int, new_size: int,
reservations) -> None: reservations) -> None:
@ -3137,7 +3145,10 @@ class VolumeManager(manager.CleanableManager,
replication_status = fields.ReplicationStatus.DISABLED replication_status = fields.ReplicationStatus.DISABLED
model_update['replication_status'] = replication_status model_update['replication_status'] = replication_status
def manage_existing(self, ctxt, volume, ref=None) -> ovo_fields.UUIDField: def manage_existing(self,
ctxt: context.RequestContext,
volume: objects.Volume,
ref=None) -> ovo_fields.UUIDField:
vol_ref = self._run_manage_existing_flow_engine( vol_ref = self._run_manage_existing_flow_engine(
ctxt, volume, ref) ctxt, volume, ref)
@ -3165,7 +3176,7 @@ class VolumeManager(manager.CleanableManager,
allocated_capacity_gb=volume_reference.size) allocated_capacity_gb=volume_reference.size)
def _run_manage_existing_flow_engine(self, def _run_manage_existing_flow_engine(self,
ctxt, ctxt: context.RequestContext,
volume: objects.Volume, volume: objects.Volume,
ref): ref):
try: try:
@ -3190,7 +3201,7 @@ class VolumeManager(manager.CleanableManager,
return vol_ref return vol_ref
def _get_cluster_or_host_filters(self) -> ty.Dict[str, ty.Any]: def _get_cluster_or_host_filters(self) -> Dict[str, Any]:
if self.cluster: if self.cluster:
filters = {'cluster_name': self.cluster} filters = {'cluster_name': self.cluster}
else: else:
@ -3199,31 +3210,48 @@ class VolumeManager(manager.CleanableManager,
def _get_my_volumes_summary( def _get_my_volumes_summary(
self, self,
ctxt: context.RequestContext): ctxt: context.RequestContext) -> objects.VolumeList:
filters = self._get_cluster_or_host_filters() filters = self._get_cluster_or_host_filters()
return objects.VolumeList.get_volume_summary(ctxt, False, filters) return objects.VolumeList.get_volume_summary(ctxt, False, filters)
def _get_my_snapshots_summary(self, ctxt): def _get_my_snapshots_summary(
self,
ctxt: context.RequestContext) -> objects.SnapshotList:
filters = self._get_cluster_or_host_filters() filters = self._get_cluster_or_host_filters()
return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters) return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters)
def _get_my_resources(self, ctxt, ovo_class_list, limit=None, offset=None): def _get_my_resources(self,
ctxt: context.RequestContext,
ovo_class_list,
limit: Optional[int] = None,
offset: Optional[int] = None) -> list:
filters = self._get_cluster_or_host_filters() filters = self._get_cluster_or_host_filters()
return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters, return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters,
limit=limit, limit=limit,
offset=offset) offset=offset)
def _get_my_volumes(self, def _get_my_volumes(self,
ctxt, limit=None, offset=None) -> objects.VolumeList: ctxt: context.RequestContext,
limit: Optional[int] = None,
offset: Optional[int] = None) -> objects.VolumeList:
return self._get_my_resources(ctxt, objects.VolumeList, return self._get_my_resources(ctxt, objects.VolumeList,
limit, offset) limit, offset)
def _get_my_snapshots(self, ctxt, limit=None, offset=None): def _get_my_snapshots(
self,
ctxt: context.RequestContext,
limit: Optional[int] = None,
offset: Optional[int] = None) -> objects.SnapshotList:
return self._get_my_resources(ctxt, objects.SnapshotList, return self._get_my_resources(ctxt, objects.SnapshotList,
limit, offset) limit, offset)
def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys, def get_manageable_volumes(self,
sort_dirs, want_objects=False): ctxt: context.RequestContext,
marker,
limit: Optional[int],
offset: Optional[int],
sort_keys,
sort_dirs, want_objects=False) -> list:
try: try:
volume_utils.require_driver_initialized(self.driver) volume_utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized: except exception.DriverNotInitialized:
@ -3307,9 +3335,12 @@ class VolumeManager(manager.CleanableManager,
'id': group.id}) 'id': group.id})
return group return group
def create_group_from_src(self, context, group, def create_group_from_src(
group_snapshot=None, self,
source_group=None) -> objects.Group: context: context.RequestContext,
group: objects.Group,
group_snapshot: Optional[objects.GroupSnapshot] = None,
source_group=None) -> objects.Group:
"""Creates the group from source. """Creates the group from source.
The source can be a group snapshot or a source group. The source can be a group snapshot or a source group.
@ -3468,11 +3499,15 @@ class VolumeManager(manager.CleanableManager,
return group return group
def _create_group_from_src_generic( def _create_group_from_src_generic(
self, context, group, volumes, self,
group_snapshot=None, snapshots=None, context: context.RequestContext,
source_group=None, group: objects.Group,
source_vols=None) -> ty.Tuple[ty.Dict[str, str], volumes: List[objects.Volume],
ty.List[ty.Dict[str, str]]]: group_snapshot: Optional[objects.GroupSnapshot] = None,
snapshots: Optional[List[objects.Snapshot]] = None,
source_group: Optional[objects.Group] = None,
source_vols: Optional[List[objects.Volume]] = None) \
-> Tuple[Dict[str, str], List[Dict[str, str]]]:
"""Creates a group from source. """Creates a group from source.
:param context: the context of the caller. :param context: the context of the caller.
@ -3485,7 +3520,7 @@ class VolumeManager(manager.CleanableManager,
:returns: model_update, volumes_model_update :returns: model_update, volumes_model_update
""" """
model_update = {'status': 'available'} model_update = {'status': 'available'}
volumes_model_update: list = [] volumes_model_update: List[dict] = []
for vol in volumes: for vol in volumes:
if snapshots: if snapshots:
for snapshot in snapshots: for snapshot in snapshots:
@ -3548,7 +3583,9 @@ class VolumeManager(manager.CleanableManager,
return sorted_snapshots return sorted_snapshots
def _sort_source_vols(self, volumes, source_vols) -> list: def _sort_source_vols(self,
volumes,
source_vols: objects.VolumeList) -> list:
# Sort source volumes so that they are in the same order as their # Sort source volumes so that they are in the same order as their
# corresponding target volumes. Each source volume in the source_vols # corresponding target volumes. Each source volume in the source_vols
# list should have a corresponding target volume in the volumes list. # list should have a corresponding target volume in the volumes list.
@ -3572,7 +3609,8 @@ class VolumeManager(manager.CleanableManager,
return sorted_source_vols return sorted_source_vols
def _update_volume_from_src(self, def _update_volume_from_src(self,
context, vol, update, group=None) -> None: context: context.RequestContext,
vol, update, group=None) -> None:
try: try:
snapshot_id = vol.get('snapshot_id') snapshot_id = vol.get('snapshot_id')
source_volid = vol.get('source_volid') source_volid = vol.get('source_volid')
@ -3628,9 +3666,9 @@ class VolumeManager(manager.CleanableManager,
self.db.volume_update(context, vol['id'], update) self.db.volume_update(context, vol['id'], update)
def _update_allocated_capacity(self, def _update_allocated_capacity(self,
vol, vol: objects.Volume,
decrement=False, decrement: bool = False,
host: str = None) -> None: host: Optional[str] = None) -> None:
# Update allocated capacity in volume stats # Update allocated capacity in volume stats
host = host or vol['host'] host = host or vol['host']
pool = volume_utils.extract_host(host, 'pool') pool = volume_utils.extract_host(host, 'pool')
@ -3648,7 +3686,9 @@ class VolumeManager(manager.CleanableManager,
self.stats['pools'][pool] = dict( self.stats['pools'][pool] = dict(
allocated_capacity_gb=max(vol_size, 0)) allocated_capacity_gb=max(vol_size, 0))
def delete_group(self, context, group: objects.Group) -> None: def delete_group(self,
context: context.RequestContext,
group: objects.Group) -> None:
"""Deletes group and the volumes in the group.""" """Deletes group and the volumes in the group."""
context = context.elevated() context = context.elevated()
project_id = group.project_id project_id = group.project_id
@ -3725,6 +3765,7 @@ class VolumeManager(manager.CleanableManager,
vol_obj.save() vol_obj.save()
# Get reservations for group # Get reservations for group
grpreservations: Optional[list]
try: try:
reserve_opts = {'groups': -1} reserve_opts = {'groups': -1}
grpreservations = GROUP_QUOTAS.reserve(context, grpreservations = GROUP_QUOTAS.reserve(context,
@ -3739,6 +3780,7 @@ class VolumeManager(manager.CleanableManager,
for vol in volumes: for vol in volumes:
# Get reservations for volume # Get reservations for volume
reservations: Optional[list]
try: try:
reserve_opts = {'volumes': -1, reserve_opts = {'volumes': -1,
'gigabytes': -vol.size} 'gigabytes': -vol.size}
@ -3779,8 +3821,8 @@ class VolumeManager(manager.CleanableManager,
def _convert_group_to_cg( def _convert_group_to_cg(
self, self,
group: objects.Group, group: objects.Group,
volumes: objects.VolumeList) -> ty.Tuple[objects.Group, volumes: objects.VolumeList) -> Tuple[objects.Group,
objects.VolumeList]: objects.VolumeList]:
if not group: if not group:
return None, None return None, None
cg = consistencygroup.ConsistencyGroup() cg = consistencygroup.ConsistencyGroup()
@ -3791,7 +3833,8 @@ class VolumeManager(manager.CleanableManager,
return cg, volumes return cg, volumes
def _remove_consistencygroup_id_from_volumes(self, volumes) -> None: def _remove_consistencygroup_id_from_volumes(
self, volumes: Optional[List[objects.Volume]]) -> None:
if not volumes: if not volumes:
return return
for vol in volumes: for vol in volumes:
@ -3802,8 +3845,8 @@ class VolumeManager(manager.CleanableManager,
self, self,
group_snapshot: objects.GroupSnapshot, group_snapshot: objects.GroupSnapshot,
snapshots: objects.SnapshotList, snapshots: objects.SnapshotList,
ctxt) -> ty.Tuple[objects.CGSnapshot, ctxt) -> Tuple[objects.CGSnapshot,
objects.SnapshotList]: objects.SnapshotList]:
if not group_snapshot: if not group_snapshot:
return None, None return None, None
cgsnap = cgsnapshot.CGSnapshot() cgsnap = cgsnapshot.CGSnapshot()
@ -3820,21 +3863,27 @@ class VolumeManager(manager.CleanableManager,
return cgsnap, snapshots return cgsnap, snapshots
def _remove_cgsnapshot_id_from_snapshots(self, snapshots) -> None: def _remove_cgsnapshot_id_from_snapshots(
self, snapshots: Optional[list]) -> None:
if not snapshots: if not snapshots:
return return
for snap in snapshots: for snap in snapshots:
snap.cgsnapshot_id = None snap.cgsnapshot_id = None
snap.cgsnapshot = None snap.cgsnapshot = None
def _create_group_generic(self, context, group) -> dict: def _create_group_generic(self,
context: context.RequestContext,
group) -> dict:
"""Creates a group.""" """Creates a group."""
# A group entry is already created in db. Just returns a status here. # A group entry is already created in db. Just returns a status here.
model_update = {'status': fields.GroupStatus.AVAILABLE, model_update = {'status': fields.GroupStatus.AVAILABLE,
'created_at': timeutils.utcnow()} 'created_at': timeutils.utcnow()}
return model_update return model_update
def _delete_group_generic(self, context, group, volumes) -> ty.Tuple: def _delete_group_generic(self,
context: context.RequestContext,
group: objects.Group,
volumes) -> Tuple:
"""Deletes a group and volumes in the group.""" """Deletes a group and volumes in the group."""
model_update = {'status': group.status} model_update = {'status': group.status}
volume_model_updates = [] volume_model_updates = []
@ -3854,9 +3903,9 @@ class VolumeManager(manager.CleanableManager,
return model_update, volume_model_updates return model_update, volume_model_updates
def _update_group_generic( def _update_group_generic(
self, context, group, self, context: context.RequestContext, group,
add_volumes=None, add_volumes=None,
remove_volumes=None) -> ty.Tuple[None, None, None]: remove_volumes=None) -> Tuple[None, None, None]:
"""Updates a group.""" """Updates a group."""
# NOTE(xyang): The volume manager adds/removes the volume to/from the # NOTE(xyang): The volume manager adds/removes the volume to/from the
# group in the database. This default implementation does not do # group in the database. This default implementation does not do
@ -3864,8 +3913,12 @@ class VolumeManager(manager.CleanableManager,
return None, None, None return None, None, None
def _collect_volumes_for_group( def _collect_volumes_for_group(
self, context, group, volumes, add=True) -> list: self,
valid_status: ty.Tuple[str, ...] context: context.RequestContext,
group,
volumes: Optional[str],
add: bool = True) -> list:
valid_status: Tuple[str, ...]
if add: if add:
valid_status = VALID_ADD_VOL_TO_GROUP_STATUS valid_status = VALID_ADD_VOL_TO_GROUP_STATUS
else: else:
@ -3900,8 +3953,11 @@ class VolumeManager(manager.CleanableManager,
volumes_ref.append(add_vol_ref) volumes_ref.append(add_vol_ref)
return volumes_ref return volumes_ref
def update_group(self, context, group, def update_group(self,
add_volumes=None, remove_volumes=None) -> None: context: context.RequestContext,
group,
add_volumes: Optional[str] = None,
remove_volumes: Optional[str] = None) -> None:
"""Updates group. """Updates group.
Update group by adding volumes to the group, Update group by adding volumes to the group,
@ -4002,7 +4058,7 @@ class VolumeManager(manager.CleanableManager,
def create_group_snapshot( def create_group_snapshot(
self, self,
context, context: context.RequestContext,
group_snapshot: objects.GroupSnapshot) -> objects.GroupSnapshot: group_snapshot: objects.GroupSnapshot) -> objects.GroupSnapshot:
"""Creates the group_snapshot.""" """Creates the group_snapshot."""
caller_context = context caller_context = context
@ -4125,8 +4181,10 @@ class VolumeManager(manager.CleanableManager,
return group_snapshot return group_snapshot
def _create_group_snapshot_generic( def _create_group_snapshot_generic(
self, context, group_snapshot, self,
snapshots) -> ty.Tuple[dict, ty.List[dict]]: context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
snapshots: list) -> Tuple[dict, List[dict]]:
"""Creates a group_snapshot.""" """Creates a group_snapshot."""
model_update = {'status': 'available'} model_update = {'status': 'available'}
snapshot_model_updates = [] snapshot_model_updates = []
@ -4148,9 +4206,11 @@ class VolumeManager(manager.CleanableManager,
return model_update, snapshot_model_updates return model_update, snapshot_model_updates
def _delete_group_snapshot_generic(self, context, group_snapshot, def _delete_group_snapshot_generic(
snapshots) -> ty.Tuple[dict, self,
ty.List[dict]]: context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
snapshots: list) -> Tuple[dict, List[dict]]:
"""Deletes a group_snapshot.""" """Deletes a group_snapshot."""
model_update = {'status': group_snapshot.status} model_update = {'status': group_snapshot.status}
snapshot_model_updates = [] snapshot_model_updates = []
@ -4171,7 +4231,9 @@ class VolumeManager(manager.CleanableManager,
return model_update, snapshot_model_updates return model_update, snapshot_model_updates
def delete_group_snapshot(self, context, group_snapshot) -> None: def delete_group_snapshot(self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot) -> None:
"""Deletes group_snapshot.""" """Deletes group_snapshot."""
caller_context = context caller_context = context
context = context.elevated() context = context.elevated()
@ -4260,6 +4322,7 @@ class VolumeManager(manager.CleanableManager,
for snapshot in snapshots: for snapshot in snapshots:
# Get reservations # Get reservations
reservations: Optional[list]
try: try:
reserve_opts = {'snapshots': -1} reserve_opts = {'snapshots': -1}
if not CONF.no_snapshot_gb_quota: if not CONF.no_snapshot_gb_quota:
@ -4292,7 +4355,11 @@ class VolumeManager(manager.CleanableManager,
"delete.end", "delete.end",
snapshots) snapshots)
def update_migrated_volume(self, ctxt, volume, new_volume, volume_status): def update_migrated_volume(self,
ctxt: context.RequestContext,
volume: objects.Volume,
new_volume: objects.Volume,
volume_status) -> None:
"""Finalize migration process on backend device.""" """Finalize migration process on backend device."""
model_update = None model_update = None
model_update_default = {'_name_id': new_volume.name_id, model_update_default = {'_name_id': new_volume.name_id,
@ -4499,7 +4566,9 @@ class VolumeManager(manager.CleanableManager,
# TODO(geguileo): In P - remove this # TODO(geguileo): In P - remove this
failover_host = failover failover_host = failover
def finish_failover(self, context, service, updates) -> None: def finish_failover(self,
context: context.RequestContext,
service, updates) -> None:
"""Completion of the failover locally or via RPC.""" """Completion of the failover locally or via RPC."""
# If the service is clustered, broadcast the service changes to all # If the service is clustered, broadcast the service changes to all
# volume services, including this one. # volume services, including this one.
@ -4516,7 +4585,9 @@ class VolumeManager(manager.CleanableManager,
service.update(updates) service.update(updates)
service.save() service.save()
def failover_completed(self, context, updates) -> None: def failover_completed(self,
context: context.RequestContext,
updates) -> None:
"""Finalize failover of this backend. """Finalize failover of this backend.
When a service is clustered and replicated the failover has 2 stages, When a service is clustered and replicated the failover has 2 stages,
@ -4541,7 +4612,7 @@ class VolumeManager(manager.CleanableManager,
fields.ReplicationStatus.ERROR) fields.ReplicationStatus.ERROR)
service.save() service.save()
def freeze_host(self, context) -> bool: def freeze_host(self, context: context.RequestContext) -> bool:
"""Freeze management plane on this backend. """Freeze management plane on this backend.
Basically puts the control/management plane into a Basically puts the control/management plane into a
@ -4571,7 +4642,7 @@ class VolumeManager(manager.CleanableManager,
LOG.info("Set backend status to frozen successfully.") LOG.info("Set backend status to frozen successfully.")
return True return True
def thaw_host(self, context) -> bool: def thaw_host(self, context: context.RequestContext) -> bool:
"""UnFreeze management plane on this backend. """UnFreeze management plane on this backend.
Basically puts the control/management plane back into Basically puts the control/management plane back into
@ -4601,8 +4672,8 @@ class VolumeManager(manager.CleanableManager,
return True return True
def manage_existing_snapshot(self, def manage_existing_snapshot(self,
ctxt, ctxt: context.RequestContext,
snapshot, snapshot: objects.Snapshot,
ref=None) -> ovo_fields.UUIDField: ref=None) -> ovo_fields.UUIDField:
LOG.debug('manage_existing_snapshot: managing %s.', ref) LOG.debug('manage_existing_snapshot: managing %s.', ref)
try: try:
@ -4625,7 +4696,11 @@ class VolumeManager(manager.CleanableManager,
flow_engine.run() flow_engine.run()
return snapshot.id return snapshot.id
def get_manageable_snapshots(self, ctxt, marker, limit, offset, def get_manageable_snapshots(self,
ctxt: context.RequestContext,
marker,
limit: Optional[int],
offset: Optional[int],
sort_keys, sort_dirs, want_objects=False): sort_keys, sort_dirs, want_objects=False):
try: try:
volume_utils.require_driver_initialized(self.driver) volume_utils.require_driver_initialized(self.driver)
@ -4650,7 +4725,9 @@ class VolumeManager(manager.CleanableManager,
"to driver error.") "to driver error.")
return driver_entries return driver_entries
def get_capabilities(self, context, discover): def get_capabilities(self,
context: context.RequestContext,
discover: bool):
"""Get capabilities of backend storage.""" """Get capabilities of backend storage."""
if discover: if discover:
self.driver.init_capabilities() self.driver.init_capabilities()
@ -4658,7 +4735,10 @@ class VolumeManager(manager.CleanableManager,
LOG.debug("Obtained capabilities list: %s.", capabilities) LOG.debug("Obtained capabilities list: %s.", capabilities)
return capabilities return capabilities
def get_backup_device(self, ctxt, backup, want_objects=False): def get_backup_device(self,
ctxt: context.RequestContext,
backup: objects.Backup,
want_objects: bool = False):
(backup_device, is_snapshot) = ( (backup_device, is_snapshot) = (
self.driver.get_backup_device(ctxt, backup)) self.driver.get_backup_device(ctxt, backup))
secure_enabled = self.driver.secure_file_operations_enabled() secure_enabled = self.driver.secure_file_operations_enabled()
@ -4671,17 +4751,18 @@ class VolumeManager(manager.CleanableManager,
ctxt) ctxt)
if want_objects else backup_device_dict) if want_objects else backup_device_dict)
def secure_file_operations_enabled(self, def secure_file_operations_enabled(
ctxt: context.RequestContext, self,
volume): ctxt: context.RequestContext,
volume: Optional[objects.Volume]) -> bool:
secure_enabled = self.driver.secure_file_operations_enabled() secure_enabled = self.driver.secure_file_operations_enabled()
return secure_enabled return secure_enabled
def _connection_create(self, def _connection_create(self,
ctxt: context.RequestContext, ctxt: context.RequestContext,
volume, volume: objects.Volume,
attachment, attachment: objects.VolumeAttachment,
connector) -> dict: connector: dict) -> Dict[str, Any]:
try: try:
self.driver.validate_connector(connector) self.driver.validate_connector(connector)
except exception.InvalidConnectorException as err: except exception.InvalidConnectorException as err:
@ -4734,9 +4815,9 @@ class VolumeManager(manager.CleanableManager,
def attachment_update(self, def attachment_update(self,
context: context.RequestContext, context: context.RequestContext,
vref, vref: objects.Volume,
connector: dict, connector: dict,
attachment_id: str) -> dict: attachment_id: str) -> Dict[str, Any]:
"""Update/Finalize an attachment. """Update/Finalize an attachment.
This call updates a valid attachment record to associate with a volume This call updates a valid attachment record to associate with a volume
@ -4803,7 +4884,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext, context: context.RequestContext,
volume, volume,
attachment, attachment,
force: bool = False) -> ty.Union[None, bool]: force: bool = False) -> Optional[bool]:
"""Remove a volume connection, but leave attachment. """Remove a volume connection, but leave attachment.
Exits early if the attachment does not have a connector and returns Exits early if the attachment does not have a connector and returns
@ -4845,8 +4926,8 @@ class VolumeManager(manager.CleanableManager,
def attachment_delete(self, def attachment_delete(self,
context: context.RequestContext, context: context.RequestContext,
attachment_id, attachment_id: str,
vref) -> None: vref: objects.Volume) -> None:
"""Delete/Detach the specified attachment. """Delete/Detach the specified attachment.
Notifies the backend device that we're detaching the specified Notifies the backend device that we're detaching the specified
@ -4972,7 +5053,9 @@ class VolumeManager(manager.CleanableManager,
'id': group.id}) 'id': group.id})
# Replication group API (Tiramisu) # Replication group API (Tiramisu)
def disable_replication(self, ctxt: context.RequestContext, group) -> None: def disable_replication(self,
ctxt: context.RequestContext,
group: objects.Group) -> None:
"""Disable replication.""" """Disable replication."""
group.refresh() group.refresh()
if group.replication_status != fields.ReplicationStatus.DISABLING: if group.replication_status != fields.ReplicationStatus.DISABLING:
@ -5057,7 +5140,8 @@ class VolumeManager(manager.CleanableManager,
# Replication group API (Tiramisu) # Replication group API (Tiramisu)
def failover_replication(self, ctxt: context.RequestContext, def failover_replication(self, ctxt: context.RequestContext,
group, allow_attached_volume=False, group: objects.Group,
allow_attached_volume: bool = False,
secondary_backend_id=None) -> None: secondary_backend_id=None) -> None:
"""Failover replication.""" """Failover replication."""
group.refresh() group.refresh()
@ -5154,7 +5238,9 @@ class VolumeManager(manager.CleanableManager,
resource={'type': 'group', resource={'type': 'group',
'id': group.id}) 'id': group.id})
def list_replication_targets(self, ctxt, group) -> ty.Dict[str, list]: def list_replication_targets(self,
ctxt: context.RequestContext,
group: objects.Group) -> Dict[str, list]:
"""Provide a means to obtain replication targets for a group. """Provide a means to obtain replication targets for a group.
This method is used to find the replication_device config This method is used to find the replication_device config

View File

@ -12,8 +12,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from typing import Optional, Tuple, Union # noqa: H301
from cinder.common import constants from cinder.common import constants
from cinder import context
from cinder import objects from cinder import objects
from cinder import quota from cinder import quota
from cinder import rpc from cinder import rpc
@ -141,7 +143,10 @@ class VolumeAPI(rpc.RPCAPI):
TOPIC = constants.VOLUME_TOPIC TOPIC = constants.VOLUME_TOPIC
BINARY = constants.VOLUME_BINARY BINARY = constants.VOLUME_BINARY
def _get_cctxt(self, host=None, version=None, **kwargs): def _get_cctxt(self,
host: str = None,
version: Union[str, Tuple[str, ...]] = None,
**kwargs) -> rpc.RPCAPI:
if host: if host:
server = volume_utils.extract_host(host) server = volume_utils.extract_host(host)
@ -158,8 +163,12 @@ class VolumeAPI(rpc.RPCAPI):
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs) return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
def create_volume(self, ctxt, volume, request_spec, filter_properties, def create_volume(self,
allow_reschedule=True): ctxt: context.RequestContext,
volume: 'objects.Volume',
request_spec: Optional[dict],
filter_properties: Optional[dict],
allow_reschedule: bool = True) -> None:
cctxt = self._get_cctxt(volume.service_topic_queue) cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_volume', cctxt.cast(ctxt, 'create_volume',
request_spec=request_spec, request_spec=request_spec,
@ -174,7 +183,11 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'revert_to_snapshot', volume=volume, cctxt.cast(ctxt, 'revert_to_snapshot', volume=volume,
snapshot=snapshot) snapshot=snapshot)
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False): def delete_volume(self,
ctxt: context.RequestContext,
volume: 'objects.Volume',
unmanage_only: bool = False,
cascade: bool = False) -> None:
volume.create_worker() volume.create_worker()
cctxt = self._get_cctxt(volume.service_topic_queue) cctxt = self._get_cctxt(volume.service_topic_queue)
msg_args = { msg_args = {
@ -184,7 +197,10 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'delete_volume', **msg_args) cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot): def create_snapshot(self,
ctxt: context.RequestContext,
volume: 'objects.Volume',
snapshot: 'objects.Snapshot') -> None:
snapshot.create_worker() snapshot.create_worker()
cctxt = self._get_cctxt(volume.service_topic_queue) cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot) cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
@ -393,7 +409,9 @@ class VolumeAPI(rpc.RPCAPI):
return cctxt.call(ctxt, 'get_manageable_snapshots', **msg_args) return cctxt.call(ctxt, 'get_manageable_snapshots', **msg_args)
def create_group(self, ctxt, group): def create_group(self,
ctxt: context.RequestContext,
group: 'objects.Group') -> None:
cctxt = self._get_cctxt(group.service_topic_queue) cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_group', group=group) cctxt.cast(ctxt, 'create_group', group=group)

View File

@ -1,3 +1,5 @@
cinder/backup/manager.py
cinder/common/constants.py
cinder/context.py cinder/context.py
cinder/i18n.py cinder/i18n.py
cinder/image/cache.py cinder/image/cache.py
@ -5,10 +7,12 @@ cinder/image/glance.py
cinder/image/image_utils.py cinder/image/image_utils.py
cinder/exception.py cinder/exception.py
cinder/manager.py cinder/manager.py
cinder/scheduler/manager.py
cinder/utils.py cinder/utils.py
cinder/volume/__init__.py cinder/volume/__init__.py
cinder/volume/flows/api/create_volume.py cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py cinder/volume/manager.py
cinder/volume/rpcapi.py
cinder/volume/volume_types.py cinder/volume/volume_types.py
cinder/volume/volume_utils.py cinder/volume/volume_utils.py