Merge "mypy: continued manager, scheduler, rpcapi"

This commit is contained in:
Zuul 2021-08-20 17:17:51 +00:00 committed by Gerrit Code Review
commit 0e87d12d37
8 changed files with 317 additions and 159 deletions

View File

@ -18,6 +18,7 @@
"""RequestContext: context for requests that persist through all of cinder."""
import copy
from typing import Any, Dict, Optional # noqa: H301
from keystoneauth1.access import service_catalog as ksa_service_catalog
from keystoneauth1 import plugin
@ -79,10 +80,18 @@ class RequestContext(context.RequestContext):
Represents the user taking a given action within the system.
"""
def __init__(self, user_id=None, project_id=None, is_admin=None,
read_deleted="no", project_name=None, remote_address=None,
timestamp=None, quota_class=None, service_catalog=None,
user_auth_plugin=None, **kwargs):
def __init__(self,
user_id: Optional[str] = None,
project_id: Optional[str] = None,
is_admin: Optional[bool] = None,
read_deleted: Optional[str] = "no",
project_name: Optional[str] = None,
remote_address: Optional[str] = None,
timestamp=None,
quota_class=None,
service_catalog: Optional[dict] = None,
user_auth_plugin=None,
**kwargs):
"""Initialize RequestContext.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
@ -122,7 +131,8 @@ class RequestContext(context.RequestContext):
# We need to have RequestContext attributes defined
# when policy.check_is_admin invokes request logging
# to make it loggable.
if self.is_admin is None: # type: ignore
self.is_admin: Optional[bool]
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
elif self.is_admin and 'admin' not in self.roles:
self.roles.append('admin')
@ -134,22 +144,22 @@ class RequestContext(context.RequestContext):
else:
return _ContextAuthPlugin(self.auth_token, self.service_catalog)
def _get_read_deleted(self):
def _get_read_deleted(self) -> str:
return self._read_deleted
def _set_read_deleted(self, read_deleted):
def _set_read_deleted(self, read_deleted: str) -> None:
if read_deleted not in ('no', 'yes', 'only'):
raise ValueError(_("read_deleted can only be one of 'no', "
"'yes' or 'only', not %r") % read_deleted)
self._read_deleted = read_deleted
def _del_read_deleted(self):
def _del_read_deleted(self) -> None:
del self._read_deleted
read_deleted = property(_get_read_deleted, _set_read_deleted,
_del_read_deleted)
def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
result = super(RequestContext, self).to_dict()
result['user_id'] = self.user_id
result['project_id'] = self.project_id
@ -164,7 +174,7 @@ class RequestContext(context.RequestContext):
return result
@classmethod
def from_dict(cls, values):
def from_dict(cls, values: dict) -> 'RequestContext':
return cls(user_id=values.get('user_id'),
project_id=values.get('project_id'),
project_name=values.get('project_name'),
@ -183,7 +193,11 @@ class RequestContext(context.RequestContext):
project_domain_id=values.get('project_domain_id'),
)
def authorize(self, action, target=None, target_obj=None, fatal=True):
def authorize(self,
action: str,
target: Optional[dict] = None,
target_obj: Optional[dict] = None,
fatal: bool = True):
"""Verify that the given action is valid on the target in this context.
:param action: string representing the action to be checked.
@ -216,14 +230,16 @@ class RequestContext(context.RequestContext):
return policy.authorize(self, action, target, do_raise=fatal,
exc=exception.PolicyNotAuthorized)
def to_policy_values(self):
def to_policy_values(self) -> dict:
policy = super(RequestContext, self).to_policy_values()
policy['is_admin'] = self.is_admin
return policy
def elevated(self, read_deleted=None, overwrite=False):
def elevated(self,
read_deleted: Optional[str] = None,
overwrite: bool = False) -> 'RequestContext':
"""Return a version of this context with admin flag set."""
context = self.deepcopy()
context.is_admin = True
@ -236,11 +252,11 @@ class RequestContext(context.RequestContext):
return context
def deepcopy(self):
def deepcopy(self) -> 'RequestContext':
return copy.deepcopy(self)
def get_admin_context(read_deleted="no"):
def get_admin_context(read_deleted: Optional[str] = "no") -> RequestContext:
return RequestContext(user_id=None,
project_id=None,
is_admin=True,
@ -248,7 +264,7 @@ def get_admin_context(read_deleted="no"):
overwrite=False)
def get_internal_tenant_context():
def get_internal_tenant_context() -> Optional[RequestContext]:
"""Build and return the Cinder internal tenant context object
This request context will only work for internal Cinder operations. It will

View File

@ -106,7 +106,7 @@ class Manager(base.Base, PeriodicTasks):
def service_topic_queue(self):
return self.cluster or self.host
def init_host(self, service_id=None, added_to_cluster=None):
def init_host(self, service_id, added_to_cluster=None):
"""Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made
@ -222,7 +222,9 @@ class SchedulerDependentManager(ThreadPoolManager):
class CleanableManager(object):
def do_cleanup(self, context, cleanup_request) -> None:
def do_cleanup(self,
context: context.RequestContext,
cleanup_request: objects.CleanupRequest) -> None:
LOG.info('Initiating service %s cleanup',
cleanup_request.service_id)
@ -305,10 +307,10 @@ class CleanableManager(object):
LOG.info('Service %s cleanup completed.', cleanup_request.service_id)
def _do_cleanup(self, ctxt, vo_resource) -> bool:
def _do_cleanup(self, ctxt: context.RequestContext, vo_resource) -> bool:
return False
def init_host(self, service_id, **kwargs) -> None:
def init_host(self, service_id, added_to_cluster=None, **kwargs):
ctxt = context.get_admin_context()
self.service_id = service_id
# TODO(geguileo): Once we don't support MySQL 5.5 anymore we can remove

View File

@ -26,6 +26,7 @@ __all__ = [
]
import functools
from typing import Tuple, Union # noqa: H301
from oslo_config import cfg
from oslo_log import log as logging
@ -53,7 +54,7 @@ ALLOWED_EXMODS = [
EXTRA_EXMODS = []
def init(conf):
def init(conf) -> None:
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_rpc_transport(conf,
@ -73,7 +74,7 @@ def init(conf):
NOTIFIER = utils.DO_NOTHING
def initialized():
def initialized() -> bool:
return None not in [TRANSPORT, NOTIFIER]
@ -139,7 +140,9 @@ class RequestContextSerializer(messaging.Serializer):
return cinder.context.RequestContext.from_dict(context)
def get_client(target, version_cap=None, serializer=None):
def get_client(target,
version_cap=None,
serializer=None) -> messaging.RPCClient:
if TRANSPORT is None:
raise AssertionError('RPC transport is not initialized.')
serializer = RequestContextSerializer(serializer)
@ -149,7 +152,9 @@ def get_client(target, version_cap=None, serializer=None):
serializer=serializer)
def get_server(target, endpoints, serializer=None):
def get_server(target,
endpoints,
serializer=None) -> messaging.rpc.server.RPCServer:
if TRANSPORT is None:
raise AssertionError('RPC transport is not initialized.')
serializer = RequestContextSerializer(serializer)
@ -163,7 +168,9 @@ def get_server(target, endpoints, serializer=None):
@utils.if_notifications_enabled
def get_notifier(service=None, host=None, publisher_id=None):
def get_notifier(service: str = None,
host: str = None,
publisher_id: str = None) -> messaging.Notifier:
if NOTIFIER is None:
raise AssertionError('RPC Notifier is not initialized.')
if not publisher_id:
@ -222,7 +229,9 @@ class RPCAPI(object):
return version
return versions[-1]
def _get_cctxt(self, version=None, **kwargs):
def _get_cctxt(self,
version: Union[str, Tuple[str, ...]] = None,
**kwargs):
"""Prepare client context
Version parameter accepts single version string or tuple of strings.

View File

@ -24,6 +24,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.scheduler import driver
@ -46,7 +47,9 @@ class FilterScheduler(driver.Scheduler):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
def populate_filter_properties(self,
request_spec: dict,
filter_properties: dict) -> None:
"""Stuff things into filter_properties.
Can be overridden in a subclass to add more data.
@ -58,11 +61,13 @@ class FilterScheduler(driver.Scheduler):
filter_properties['metadata'] = vol.get('metadata')
filter_properties['qos_specs'] = vol.get('qos_specs')
def schedule_create_group(self, context, group,
def schedule_create_group(self,
context: context.RequestContext,
group,
group_spec,
request_spec_list,
group_filter_properties,
filter_properties_list):
filter_properties_list) -> None:
weighed_backend = self._schedule_generic_group(
context,
group_spec,
@ -82,7 +87,10 @@ class FilterScheduler(driver.Scheduler):
self.volume_rpcapi.create_group(context, updated_group)
def schedule_create_volume(self, context, request_spec, filter_properties):
def schedule_create_volume(self,
context: context.RequestContext,
request_spec: dict,
filter_properties: dict) -> None:
backend = self._schedule(context, request_spec, filter_properties)
if not backend:
@ -107,8 +115,11 @@ class FilterScheduler(driver.Scheduler):
filter_properties,
allow_reschedule=True)
def backend_passes_filters(self, context, backend, request_spec,
filter_properties):
def backend_passes_filters(self,
context: context.RequestContext,
backend,
request_spec: dict,
filter_properties: dict):
"""Check if the specified backend passes the filters."""
weighed_backends = self._get_weighted_candidates(context, request_spec,
filter_properties)
@ -132,8 +143,11 @@ class FilterScheduler(driver.Scheduler):
raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s '
'on %(backend)s.') % reason_param)
def find_retype_backend(self, context, request_spec,
filter_properties=None, migration_policy='never'):
def find_retype_backend(self,
context: context.RequestContext,
request_spec: dict,
filter_properties: dict = None,
migration_policy: str = 'never'):
"""Find a backend that can accept the volume with its new type."""
filter_properties = filter_properties or {}
backend = (request_spec['volume_properties'].get('cluster_name')
@ -186,8 +200,8 @@ class FilterScheduler(driver.Scheduler):
def get_pools(self, context, filters):
return self.host_manager.get_pools(context, filters)
def _post_select_populate_filter_properties(self, filter_properties,
backend_state):
def _post_select_populate_filter_properties(self, filter_properties: dict,
backend_state) -> None:
"""Populate filter properties with additional information.
Add additional information to the filter properties after a backend has
@ -196,7 +210,7 @@ class FilterScheduler(driver.Scheduler):
# Add a retry entry for the selected volume backend:
self._add_retry_backend(filter_properties, backend_state.backend_id)
def _add_retry_backend(self, filter_properties, backend):
def _add_retry_backend(self, filter_properties: dict, backend) -> None:
"""Add a retry entry for the selected volume backend.
In the event that the request gets re-scheduled, this entry will signal
@ -211,7 +225,7 @@ class FilterScheduler(driver.Scheduler):
if backends is not None:
backends.append(backend)
def _max_attempts(self):
def _max_attempts(self) -> int:
max_attempts = CONF.scheduler_max_attempts
if max_attempts < 1:
raise exception.InvalidParameterValue(
@ -271,8 +285,10 @@ class FilterScheduler(driver.Scheduler):
{'max_attempts': max_attempts,
'resource_id': resource_id})
def _get_weighted_candidates(self, context, request_spec,
filter_properties=None):
def _get_weighted_candidates(self,
context: context.RequestContext,
request_spec: dict,
filter_properties: dict = None) -> list:
"""Return a list of backends that meet required specs.
Returned list is ordered by their fitness.
@ -351,7 +367,7 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_generic_group(
self, context, group_spec, request_spec_list,
group_filter_properties=None,
filter_properties_list=None):
filter_properties_list=None) -> list:
"""Finds backends that supports the group.
Returns a list of backends that meet the required specs,
@ -443,7 +459,8 @@ class FilterScheduler(driver.Scheduler):
return weighed_backends
def _find_valid_backends(self, backend_list1, backend_list2):
def _find_valid_backends(self,
backend_list1: list, backend_list2: list) -> list:
new_backends = []
for backend1 in backend_list1:
for backend2 in backend_list2:
@ -458,7 +475,7 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_by_group_type(
self, context, group_spec,
group_filter_properties=None):
group_filter_properties=None) -> list:
"""Finds backends that supports the group type.
Returns a list of backends that meet the required specs,
@ -559,7 +576,7 @@ class FilterScheduler(driver.Scheduler):
return None
return self._choose_top_backend_generic_group(weighed_backends)
def _choose_top_backend(self, weighed_backends, request_spec):
def _choose_top_backend(self, weighed_backends: list, request_spec: dict):
top_backend = weighed_backends[0]
backend_state = top_backend.obj
LOG.debug("Choosing %s", backend_state.backend_id)

View File

@ -253,7 +253,9 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
volume_rpcapi.VolumeAPI().create_snapshot(ctxt, volume,
snapshot)
def _do_cleanup(self, ctxt, vo_resource):
def _do_cleanup(self,
ctxt: context.RequestContext,
vo_resource: 'objects.base.CinderObject'):
# We can only receive cleanup requests for volumes, but we check anyway
# We need to cleanup the volume status for cases where the scheduler
# died while scheduling the volume creation.
@ -262,7 +264,8 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
vo_resource.status = 'error'
vo_resource.save()
def request_service_capabilities(self, context):
def request_service_capabilities(self,
context: context.RequestContext) -> None:
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
try:
self.backup_api.publish_service_capabilities(context)
@ -275,8 +278,11 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
LOG.warning(msg, {'host': self.host, 'e': e})
@append_operation_type()
def migrate_volume(self, context, volume, backend, force_copy,
request_spec, filter_properties):
def migrate_volume(self,
context: context.RequestContext,
volume: objects.Volume,
backend: str, force_copy: bool,
request_spec, filter_properties) -> None:
"""Ensure that the backend exists and can accept the volume."""
self._wait_for_scheduler()
@ -597,7 +603,7 @@ class SchedulerManager(manager.CleanableManager, manager.Manager):
not_requested = []
# To reduce DB queries we'll cache the clusters data
clusters = collections.defaultdict(dict)
clusters: collections.defaultdict = collections.defaultdict(dict)
for service in services:
cleanup_request.cluster_name = service.cluster_name

View File

@ -37,8 +37,8 @@ intact.
import functools
import time
import typing as ty
from typing import Optional
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: H301
from castellan import key_manager
from oslo_config import cfg
@ -238,14 +238,14 @@ class VolumeManager(manager.CleanableManager,
'consistencygroup', 'volume_attachment', 'group', 'snapshots'}
def _get_service(self,
host: str = None,
host: Optional[str] = None,
binary: str = constants.VOLUME_BINARY) -> objects.Service:
host = host or self.host
ctxt = context.get_admin_context()
svc_host = volume_utils.extract_host(host, 'backend')
return objects.Service.get_by_args(ctxt, svc_host, binary)
def __init__(self, volume_driver=None, service_name: str = None,
def __init__(self, volume_driver=None, service_name: Optional[str] = None,
*args, **kwargs):
"""Load the driver from the one specified in args, or from flags."""
# update_service_capabilities needs service_name to be volume
@ -262,6 +262,7 @@ class VolumeManager(manager.CleanableManager,
self.service_uuid = None
self.cluster: str
self.host: str
self.image_volume_cache: Optional[image_cache.ImageVolumeCache]
if not volume_driver:
@ -424,7 +425,7 @@ class VolumeManager(manager.CleanableManager,
updates, snapshot_updates = self.driver.update_provider_info(
volumes, snapshots)
update: ty.Any
update: Any
if updates:
for volume in volumes:
# NOTE(JDG): Make sure returned item is in this hosts volumes
@ -533,7 +534,7 @@ class VolumeManager(manager.CleanableManager,
num_vols: int = 0
num_snaps: int = 0
max_objs_num: int = 0
req_range: ty.Union[ty.List[int], range] = [0]
req_range: Union[List[int], range] = [0]
req_limit = CONF.init_host_max_objects_retrieval or 0
use_batch_objects_retrieval: bool = req_limit > 0
@ -544,7 +545,7 @@ class VolumeManager(manager.CleanableManager,
num_snaps, __ = self._get_my_snapshots_summary(ctxt)
# Calculate highest number of the objects (volumes or snapshots)
max_objs_num = max(num_vols, num_snaps)
max_objs_num = ty.cast(int, max_objs_num)
max_objs_num = typing.cast(int, max_objs_num)
# Make batch request loop counter
req_range = range(0, max_objs_num, req_limit)
@ -679,7 +680,9 @@ class VolumeManager(manager.CleanableManager,
resource={'type': 'driver',
'id': self.driver.__class__.__name__})
def _do_cleanup(self, ctxt, vo_resource) -> bool:
def _do_cleanup(self,
ctxt: context.RequestContext,
vo_resource: 'objects.base.CinderObject') -> bool:
if isinstance(vo_resource, objects.Volume):
if vo_resource.status == 'downloading':
self.driver.clear_download(ctxt, vo_resource)
@ -721,7 +724,8 @@ class VolumeManager(manager.CleanableManager,
"""
return self.driver.initialized
def _set_resource_host(self, resource) -> None:
def _set_resource_host(self, resource: Union[objects.Volume,
objects.Group]) -> None:
"""Set the host field on the DB to our own when we are clustered."""
if (resource.is_clustered and
not volume_utils.hosts_are_equivalent(resource.host,
@ -779,7 +783,7 @@ class VolumeManager(manager.CleanableManager,
snapshot_id = request_spec.get('snapshot_id')
source_volid = request_spec.get('source_volid')
locked_action: ty.Optional[str]
locked_action: Optional[str]
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
@ -877,7 +881,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext,
volume: objects.volume.Volume,
unmanage_only=False,
cascade=False):
cascade=False) -> Optional[bool]:
"""Deletes and unexports volume.
1. Delete a volume(normal case)
@ -900,7 +904,7 @@ class VolumeManager(manager.CleanableManager,
# NOTE(thingee): It could be possible for a volume to
# be deleted when resuming deletes from init_host().
LOG.debug("Attempted delete of non-existent volume: %s", volume.id)
return
return None
if context.project_id != volume.project_id:
project_id = volume.project_id
@ -1031,6 +1035,7 @@ class VolumeManager(manager.CleanableManager,
if unmanage_only:
msg = "Unmanaged volume successfully."
LOG.info(msg, resource=volume)
return None
def _clear_db(self, is_migrating_dest, volume_ref, status) -> None:
# This method is called when driver.unmanage() or
@ -1279,7 +1284,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext,
snapshot: objects.Snapshot,
unmanage_only: bool = False,
handle_quota: bool = True):
handle_quota: bool = True) -> Optional[bool]:
"""Deletes and unexports snapshot."""
context = context.elevated()
snapshot._context = context
@ -1358,6 +1363,7 @@ class VolumeManager(manager.CleanableManager,
if unmanage_only:
msg = "Unmanage snapshot completed successfully."
LOG.info(msg, resource=snapshot)
return None
@coordination.synchronized('{volume_id}')
def attach_volume(self, context, volume_id, instance_uuid, host_name,
@ -1594,8 +1600,7 @@ class VolumeManager(manager.CleanableManager,
def _clone_image_volume(self,
ctx: context.RequestContext,
volume,
image_meta: dict) -> ty.Union[None,
objects.Volume]:
image_meta: dict) -> Optional[objects.Volume]:
# TODO: should this return None?
volume_type_id: str = volume.get('volume_type_id')
reserve_opts: dict = {'volumes': 1, 'gigabytes': volume.size}
@ -1603,7 +1608,7 @@ class VolumeManager(manager.CleanableManager,
reservations = QUOTAS.reserve(ctx, **reserve_opts)
# NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid
# creating tmp img vol from wrong snapshot or wrong source vol.
skip: ty.Set[str] = {'snapshot_id', 'source_volid'}
skip: Set[str] = {'snapshot_id', 'source_volid'}
skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES)
try:
new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
@ -2188,7 +2193,7 @@ class VolumeManager(manager.CleanableManager,
self._detach_volume(ctxt, attach_info, volume, properties,
force=True, remote=remote)
attach_info = ty.cast(dict, attach_info)
attach_info = typing.cast(dict, attach_info)
return attach_info
def _detach_volume(self, ctxt, attach_info, volume, properties,
@ -2829,26 +2834,28 @@ class VolumeManager(manager.CleanableManager,
def _notify_about_volume_usage(self,
context: context.RequestContext,
volume,
event_suffix,
extra_usage_info=None) -> None:
volume: objects.Volume,
event_suffix: str,
extra_usage_info: Optional[dict] = None) \
-> None:
volume_utils.notify_about_volume_usage(
context, volume, event_suffix,
extra_usage_info=extra_usage_info, host=self.host)
def _notify_about_snapshot_usage(self,
context,
snapshot,
event_suffix,
extra_usage_info=None) -> None:
context: context.RequestContext,
snapshot: objects.Snapshot,
event_suffix: str,
extra_usage_info: Optional[dict] = None) \
-> None:
volume_utils.notify_about_snapshot_usage(
context, snapshot, event_suffix,
extra_usage_info=extra_usage_info, host=self.host)
def _notify_about_group_usage(self,
context,
group,
event_suffix,
context: context.RequestContext,
group: objects.Group,
event_suffix: str,
volumes=None,
extra_usage_info=None) -> None:
volume_utils.notify_about_group_usage(
@ -2864,12 +2871,13 @@ class VolumeManager(manager.CleanableManager,
context, volume, event_suffix,
extra_usage_info=extra_usage_info, host=self.host)
def _notify_about_group_snapshot_usage(self,
context,
group_snapshot,
event_suffix,
snapshots=None,
extra_usage_info=None) -> None:
def _notify_about_group_snapshot_usage(
self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
event_suffix: str,
snapshots: Optional[list] = None,
extra_usage_info=None) -> None:
volume_utils.notify_about_group_snapshot_usage(
context, group_snapshot, event_suffix,
extra_usage_info=extra_usage_info, host=self.host)
@ -2884,7 +2892,7 @@ class VolumeManager(manager.CleanableManager,
extra_usage_info=extra_usage_info, host=self.host)
def extend_volume(self,
context,
context: context.RequestContext,
volume: objects.Volume,
new_size: int,
reservations) -> None:
@ -3137,7 +3145,10 @@ class VolumeManager(manager.CleanableManager,
replication_status = fields.ReplicationStatus.DISABLED
model_update['replication_status'] = replication_status
def manage_existing(self, ctxt, volume, ref=None) -> ovo_fields.UUIDField:
def manage_existing(self,
ctxt: context.RequestContext,
volume: objects.Volume,
ref=None) -> ovo_fields.UUIDField:
vol_ref = self._run_manage_existing_flow_engine(
ctxt, volume, ref)
@ -3165,7 +3176,7 @@ class VolumeManager(manager.CleanableManager,
allocated_capacity_gb=volume_reference.size)
def _run_manage_existing_flow_engine(self,
ctxt,
ctxt: context.RequestContext,
volume: objects.Volume,
ref):
try:
@ -3190,7 +3201,7 @@ class VolumeManager(manager.CleanableManager,
return vol_ref
def _get_cluster_or_host_filters(self) -> ty.Dict[str, ty.Any]:
def _get_cluster_or_host_filters(self) -> Dict[str, Any]:
if self.cluster:
filters = {'cluster_name': self.cluster}
else:
@ -3199,31 +3210,48 @@ class VolumeManager(manager.CleanableManager,
def _get_my_volumes_summary(
self,
ctxt: context.RequestContext):
ctxt: context.RequestContext) -> objects.VolumeList:
filters = self._get_cluster_or_host_filters()
return objects.VolumeList.get_volume_summary(ctxt, False, filters)
def _get_my_snapshots_summary(self, ctxt):
def _get_my_snapshots_summary(
self,
ctxt: context.RequestContext) -> objects.SnapshotList:
filters = self._get_cluster_or_host_filters()
return objects.SnapshotList.get_snapshot_summary(ctxt, False, filters)
def _get_my_resources(self, ctxt, ovo_class_list, limit=None, offset=None):
def _get_my_resources(self,
ctxt: context.RequestContext,
ovo_class_list,
limit: Optional[int] = None,
offset: Optional[int] = None) -> list:
filters = self._get_cluster_or_host_filters()
return getattr(ovo_class_list, 'get_all')(ctxt, filters=filters,
limit=limit,
offset=offset)
def _get_my_volumes(self,
ctxt, limit=None, offset=None) -> objects.VolumeList:
ctxt: context.RequestContext,
limit: Optional[int] = None,
offset: Optional[int] = None) -> objects.VolumeList:
return self._get_my_resources(ctxt, objects.VolumeList,
limit, offset)
def _get_my_snapshots(self, ctxt, limit=None, offset=None):
def _get_my_snapshots(
self,
ctxt: context.RequestContext,
limit: Optional[int] = None,
offset: Optional[int] = None) -> objects.SnapshotList:
return self._get_my_resources(ctxt, objects.SnapshotList,
limit, offset)
def get_manageable_volumes(self, ctxt, marker, limit, offset, sort_keys,
sort_dirs, want_objects=False):
def get_manageable_volumes(self,
ctxt: context.RequestContext,
marker,
limit: Optional[int],
offset: Optional[int],
sort_keys,
sort_dirs, want_objects=False) -> list:
try:
volume_utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
@ -3307,9 +3335,12 @@ class VolumeManager(manager.CleanableManager,
'id': group.id})
return group
def create_group_from_src(self, context, group,
group_snapshot=None,
source_group=None) -> objects.Group:
def create_group_from_src(
self,
context: context.RequestContext,
group: objects.Group,
group_snapshot: Optional[objects.GroupSnapshot] = None,
source_group=None) -> objects.Group:
"""Creates the group from source.
The source can be a group snapshot or a source group.
@ -3468,11 +3499,15 @@ class VolumeManager(manager.CleanableManager,
return group
def _create_group_from_src_generic(
self, context, group, volumes,
group_snapshot=None, snapshots=None,
source_group=None,
source_vols=None) -> ty.Tuple[ty.Dict[str, str],
ty.List[ty.Dict[str, str]]]:
self,
context: context.RequestContext,
group: objects.Group,
volumes: List[objects.Volume],
group_snapshot: Optional[objects.GroupSnapshot] = None,
snapshots: Optional[List[objects.Snapshot]] = None,
source_group: Optional[objects.Group] = None,
source_vols: Optional[List[objects.Volume]] = None) \
-> Tuple[Dict[str, str], List[Dict[str, str]]]:
"""Creates a group from source.
:param context: the context of the caller.
@ -3485,7 +3520,7 @@ class VolumeManager(manager.CleanableManager,
:returns: model_update, volumes_model_update
"""
model_update = {'status': 'available'}
volumes_model_update: list = []
volumes_model_update: List[dict] = []
for vol in volumes:
if snapshots:
for snapshot in snapshots:
@ -3548,7 +3583,9 @@ class VolumeManager(manager.CleanableManager,
return sorted_snapshots
def _sort_source_vols(self, volumes, source_vols) -> list:
def _sort_source_vols(self,
volumes,
source_vols: objects.VolumeList) -> list:
# Sort source volumes so that they are in the same order as their
# corresponding target volumes. Each source volume in the source_vols
# list should have a corresponding target volume in the volumes list.
@ -3572,7 +3609,8 @@ class VolumeManager(manager.CleanableManager,
return sorted_source_vols
def _update_volume_from_src(self,
context, vol, update, group=None) -> None:
context: context.RequestContext,
vol, update, group=None) -> None:
try:
snapshot_id = vol.get('snapshot_id')
source_volid = vol.get('source_volid')
@ -3628,9 +3666,9 @@ class VolumeManager(manager.CleanableManager,
self.db.volume_update(context, vol['id'], update)
def _update_allocated_capacity(self,
vol,
decrement=False,
host: str = None) -> None:
vol: objects.Volume,
decrement: bool = False,
host: Optional[str] = None) -> None:
# Update allocated capacity in volume stats
host = host or vol['host']
pool = volume_utils.extract_host(host, 'pool')
@ -3648,7 +3686,9 @@ class VolumeManager(manager.CleanableManager,
self.stats['pools'][pool] = dict(
allocated_capacity_gb=max(vol_size, 0))
def delete_group(self, context, group: objects.Group) -> None:
def delete_group(self,
context: context.RequestContext,
group: objects.Group) -> None:
"""Deletes group and the volumes in the group."""
context = context.elevated()
project_id = group.project_id
@ -3725,6 +3765,7 @@ class VolumeManager(manager.CleanableManager,
vol_obj.save()
# Get reservations for group
grpreservations: Optional[list]
try:
reserve_opts = {'groups': -1}
grpreservations = GROUP_QUOTAS.reserve(context,
@ -3739,6 +3780,7 @@ class VolumeManager(manager.CleanableManager,
for vol in volumes:
# Get reservations for volume
reservations: Optional[list]
try:
reserve_opts = {'volumes': -1,
'gigabytes': -vol.size}
@ -3779,8 +3821,8 @@ class VolumeManager(manager.CleanableManager,
def _convert_group_to_cg(
self,
group: objects.Group,
volumes: objects.VolumeList) -> ty.Tuple[objects.Group,
objects.VolumeList]:
volumes: objects.VolumeList) -> Tuple[objects.Group,
objects.VolumeList]:
if not group:
return None, None
cg = consistencygroup.ConsistencyGroup()
@ -3791,7 +3833,8 @@ class VolumeManager(manager.CleanableManager,
return cg, volumes
def _remove_consistencygroup_id_from_volumes(self, volumes) -> None:
def _remove_consistencygroup_id_from_volumes(
self, volumes: Optional[List[objects.Volume]]) -> None:
if not volumes:
return
for vol in volumes:
@ -3802,8 +3845,8 @@ class VolumeManager(manager.CleanableManager,
self,
group_snapshot: objects.GroupSnapshot,
snapshots: objects.SnapshotList,
ctxt) -> ty.Tuple[objects.CGSnapshot,
objects.SnapshotList]:
ctxt) -> Tuple[objects.CGSnapshot,
objects.SnapshotList]:
if not group_snapshot:
return None, None
cgsnap = cgsnapshot.CGSnapshot()
@ -3820,21 +3863,27 @@ class VolumeManager(manager.CleanableManager,
return cgsnap, snapshots
def _remove_cgsnapshot_id_from_snapshots(self, snapshots) -> None:
def _remove_cgsnapshot_id_from_snapshots(
self, snapshots: Optional[list]) -> None:
if not snapshots:
return
for snap in snapshots:
snap.cgsnapshot_id = None
snap.cgsnapshot = None
def _create_group_generic(self, context, group) -> dict:
def _create_group_generic(self,
context: context.RequestContext,
group) -> dict:
"""Creates a group."""
# A group entry is already created in db. Just returns a status here.
model_update = {'status': fields.GroupStatus.AVAILABLE,
'created_at': timeutils.utcnow()}
return model_update
def _delete_group_generic(self, context, group, volumes) -> ty.Tuple:
def _delete_group_generic(self,
context: context.RequestContext,
group: objects.Group,
volumes) -> Tuple:
"""Deletes a group and volumes in the group."""
model_update = {'status': group.status}
volume_model_updates = []
@ -3854,9 +3903,9 @@ class VolumeManager(manager.CleanableManager,
return model_update, volume_model_updates
def _update_group_generic(
self, context, group,
self, context: context.RequestContext, group,
add_volumes=None,
remove_volumes=None) -> ty.Tuple[None, None, None]:
remove_volumes=None) -> Tuple[None, None, None]:
"""Updates a group."""
# NOTE(xyang): The volume manager adds/removes the volume to/from the
# group in the database. This default implementation does not do
@ -3864,8 +3913,12 @@ class VolumeManager(manager.CleanableManager,
return None, None, None
def _collect_volumes_for_group(
self, context, group, volumes, add=True) -> list:
valid_status: ty.Tuple[str, ...]
self,
context: context.RequestContext,
group,
volumes: Optional[str],
add: bool = True) -> list:
valid_status: Tuple[str, ...]
if add:
valid_status = VALID_ADD_VOL_TO_GROUP_STATUS
else:
@ -3900,8 +3953,11 @@ class VolumeManager(manager.CleanableManager,
volumes_ref.append(add_vol_ref)
return volumes_ref
def update_group(self, context, group,
add_volumes=None, remove_volumes=None) -> None:
def update_group(self,
context: context.RequestContext,
group,
add_volumes: Optional[str] = None,
remove_volumes: Optional[str] = None) -> None:
"""Updates group.
Update group by adding volumes to the group,
@ -4002,7 +4058,7 @@ class VolumeManager(manager.CleanableManager,
def create_group_snapshot(
self,
context,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot) -> objects.GroupSnapshot:
"""Creates the group_snapshot."""
caller_context = context
@ -4125,8 +4181,10 @@ class VolumeManager(manager.CleanableManager,
return group_snapshot
def _create_group_snapshot_generic(
self, context, group_snapshot,
snapshots) -> ty.Tuple[dict, ty.List[dict]]:
self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
snapshots: list) -> Tuple[dict, List[dict]]:
"""Creates a group_snapshot."""
model_update = {'status': 'available'}
snapshot_model_updates = []
@ -4148,9 +4206,11 @@ class VolumeManager(manager.CleanableManager,
return model_update, snapshot_model_updates
def _delete_group_snapshot_generic(self, context, group_snapshot,
snapshots) -> ty.Tuple[dict,
ty.List[dict]]:
def _delete_group_snapshot_generic(
self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
snapshots: list) -> Tuple[dict, List[dict]]:
"""Deletes a group_snapshot."""
model_update = {'status': group_snapshot.status}
snapshot_model_updates = []
@ -4171,7 +4231,9 @@ class VolumeManager(manager.CleanableManager,
return model_update, snapshot_model_updates
def delete_group_snapshot(self, context, group_snapshot) -> None:
def delete_group_snapshot(self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot) -> None:
"""Deletes group_snapshot."""
caller_context = context
context = context.elevated()
@ -4260,6 +4322,7 @@ class VolumeManager(manager.CleanableManager,
for snapshot in snapshots:
# Get reservations
reservations: Optional[list]
try:
reserve_opts = {'snapshots': -1}
if not CONF.no_snapshot_gb_quota:
@ -4292,7 +4355,11 @@ class VolumeManager(manager.CleanableManager,
"delete.end",
snapshots)
def update_migrated_volume(self, ctxt, volume, new_volume, volume_status):
def update_migrated_volume(self,
ctxt: context.RequestContext,
volume: objects.Volume,
new_volume: objects.Volume,
volume_status) -> None:
"""Finalize migration process on backend device."""
model_update = None
model_update_default = {'_name_id': new_volume.name_id,
@ -4499,7 +4566,9 @@ class VolumeManager(manager.CleanableManager,
# TODO(geguileo): In P - remove this
failover_host = failover
def finish_failover(self, context, service, updates) -> None:
def finish_failover(self,
context: context.RequestContext,
service, updates) -> None:
"""Completion of the failover locally or via RPC."""
# If the service is clustered, broadcast the service changes to all
# volume services, including this one.
@ -4516,7 +4585,9 @@ class VolumeManager(manager.CleanableManager,
service.update(updates)
service.save()
def failover_completed(self, context, updates) -> None:
def failover_completed(self,
context: context.RequestContext,
updates) -> None:
"""Finalize failover of this backend.
When a service is clustered and replicated the failover has 2 stages,
@ -4541,7 +4612,7 @@ class VolumeManager(manager.CleanableManager,
fields.ReplicationStatus.ERROR)
service.save()
def freeze_host(self, context) -> bool:
def freeze_host(self, context: context.RequestContext) -> bool:
"""Freeze management plane on this backend.
Basically puts the control/management plane into a
@ -4571,7 +4642,7 @@ class VolumeManager(manager.CleanableManager,
LOG.info("Set backend status to frozen successfully.")
return True
def thaw_host(self, context) -> bool:
def thaw_host(self, context: context.RequestContext) -> bool:
"""UnFreeze management plane on this backend.
Basically puts the control/management plane back into
@ -4601,8 +4672,8 @@ class VolumeManager(manager.CleanableManager,
return True
def manage_existing_snapshot(self,
ctxt,
snapshot,
ctxt: context.RequestContext,
snapshot: objects.Snapshot,
ref=None) -> ovo_fields.UUIDField:
LOG.debug('manage_existing_snapshot: managing %s.', ref)
try:
@ -4625,7 +4696,11 @@ class VolumeManager(manager.CleanableManager,
flow_engine.run()
return snapshot.id
def get_manageable_snapshots(self, ctxt, marker, limit, offset,
def get_manageable_snapshots(self,
ctxt: context.RequestContext,
marker,
limit: Optional[int],
offset: Optional[int],
sort_keys, sort_dirs, want_objects=False):
try:
volume_utils.require_driver_initialized(self.driver)
@ -4650,7 +4725,9 @@ class VolumeManager(manager.CleanableManager,
"to driver error.")
return driver_entries
def get_capabilities(self, context, discover):
def get_capabilities(self,
context: context.RequestContext,
discover: bool):
"""Get capabilities of backend storage."""
if discover:
self.driver.init_capabilities()
@ -4658,7 +4735,10 @@ class VolumeManager(manager.CleanableManager,
LOG.debug("Obtained capabilities list: %s.", capabilities)
return capabilities
def get_backup_device(self, ctxt, backup, want_objects=False):
def get_backup_device(self,
ctxt: context.RequestContext,
backup: objects.Backup,
want_objects: bool = False):
(backup_device, is_snapshot) = (
self.driver.get_backup_device(ctxt, backup))
secure_enabled = self.driver.secure_file_operations_enabled()
@ -4671,17 +4751,18 @@ class VolumeManager(manager.CleanableManager,
ctxt)
if want_objects else backup_device_dict)
def secure_file_operations_enabled(self,
ctxt: context.RequestContext,
volume):
def secure_file_operations_enabled(
self,
ctxt: context.RequestContext,
volume: Optional[objects.Volume]) -> bool:
secure_enabled = self.driver.secure_file_operations_enabled()
return secure_enabled
def _connection_create(self,
ctxt: context.RequestContext,
volume,
attachment,
connector) -> dict:
volume: objects.Volume,
attachment: objects.VolumeAttachment,
connector: dict) -> Dict[str, Any]:
try:
self.driver.validate_connector(connector)
except exception.InvalidConnectorException as err:
@ -4734,9 +4815,9 @@ class VolumeManager(manager.CleanableManager,
def attachment_update(self,
context: context.RequestContext,
vref,
vref: objects.Volume,
connector: dict,
attachment_id: str) -> dict:
attachment_id: str) -> Dict[str, Any]:
"""Update/Finalize an attachment.
This call updates a valid attachment record to associate with a volume
@ -4803,7 +4884,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext,
volume,
attachment,
force: bool = False) -> ty.Union[None, bool]:
force: bool = False) -> Optional[bool]:
"""Remove a volume connection, but leave attachment.
Exits early if the attachment does not have a connector and returns
@ -4845,8 +4926,8 @@ class VolumeManager(manager.CleanableManager,
def attachment_delete(self,
context: context.RequestContext,
attachment_id,
vref) -> None:
attachment_id: str,
vref: objects.Volume) -> None:
"""Delete/Detach the specified attachment.
Notifies the backend device that we're detaching the specified
@ -4972,7 +5053,9 @@ class VolumeManager(manager.CleanableManager,
'id': group.id})
# Replication group API (Tiramisu)
def disable_replication(self, ctxt: context.RequestContext, group) -> None:
def disable_replication(self,
ctxt: context.RequestContext,
group: objects.Group) -> None:
"""Disable replication."""
group.refresh()
if group.replication_status != fields.ReplicationStatus.DISABLING:
@ -5057,7 +5140,8 @@ class VolumeManager(manager.CleanableManager,
# Replication group API (Tiramisu)
def failover_replication(self, ctxt: context.RequestContext,
group, allow_attached_volume=False,
group: objects.Group,
allow_attached_volume: bool = False,
secondary_backend_id=None) -> None:
"""Failover replication."""
group.refresh()
@ -5154,7 +5238,9 @@ class VolumeManager(manager.CleanableManager,
resource={'type': 'group',
'id': group.id})
def list_replication_targets(self, ctxt, group) -> ty.Dict[str, list]:
def list_replication_targets(self,
ctxt: context.RequestContext,
group: objects.Group) -> Dict[str, list]:
"""Provide a means to obtain replication targets for a group.
This method is used to find the replication_device config

View File

@ -12,8 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Optional, Tuple, Union # noqa: H301
from cinder.common import constants
from cinder import context
from cinder import objects
from cinder import quota
from cinder import rpc
@ -141,7 +143,10 @@ class VolumeAPI(rpc.RPCAPI):
TOPIC = constants.VOLUME_TOPIC
BINARY = constants.VOLUME_BINARY
def _get_cctxt(self, host=None, version=None, **kwargs):
def _get_cctxt(self,
host: str = None,
version: Union[str, Tuple[str, ...]] = None,
**kwargs) -> rpc.RPCAPI:
if host:
server = volume_utils.extract_host(host)
@ -158,8 +163,12 @@ class VolumeAPI(rpc.RPCAPI):
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
def create_volume(self, ctxt, volume, request_spec, filter_properties,
allow_reschedule=True):
def create_volume(self,
ctxt: context.RequestContext,
volume: 'objects.Volume',
request_spec: Optional[dict],
filter_properties: Optional[dict],
allow_reschedule: bool = True) -> None:
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_volume',
request_spec=request_spec,
@ -174,7 +183,11 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'revert_to_snapshot', volume=volume,
snapshot=snapshot)
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
def delete_volume(self,
ctxt: context.RequestContext,
volume: 'objects.Volume',
unmanage_only: bool = False,
cascade: bool = False) -> None:
volume.create_worker()
cctxt = self._get_cctxt(volume.service_topic_queue)
msg_args = {
@ -184,7 +197,10 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot):
def create_snapshot(self,
ctxt: context.RequestContext,
volume: 'objects.Volume',
snapshot: 'objects.Snapshot') -> None:
snapshot.create_worker()
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
@ -393,7 +409,9 @@ class VolumeAPI(rpc.RPCAPI):
return cctxt.call(ctxt, 'get_manageable_snapshots', **msg_args)
def create_group(self, ctxt, group):
def create_group(self,
ctxt: context.RequestContext,
group: 'objects.Group') -> None:
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_group', group=group)

View File

@ -1,3 +1,5 @@
cinder/backup/manager.py
cinder/common/constants.py
cinder/context.py
cinder/i18n.py
cinder/image/cache.py
@ -5,10 +7,12 @@ cinder/image/glance.py
cinder/image/image_utils.py
cinder/exception.py
cinder/manager.py
cinder/scheduler/manager.py
cinder/utils.py
cinder/volume/__init__.py
cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py
cinder/volume/rpcapi.py
cinder/volume/volume_types.py
cinder/volume/volume_utils.py