mypy: filter scheduler
Change-Id: I426bb8cc4a01b64b2551f36fd0004bb03aa53e47
parent 61335da5bc
commit 5df7daabee
@@ -16,6 +16,8 @@
 """
 Filter support
 """
+from typing import Iterable
+
 from oslo_log import log as logging
 
 from cinder.scheduler import base_handler
@@ -25,7 +27,7 @@ LOG = logging.getLogger(__name__)
 
 class BaseFilter(object):
     """Base class for all filter classes."""
-    def _filter_one(self, obj, filter_properties):
+    def _filter_one(self, obj, filter_properties) -> bool:
         """Return True if it passes the filter, False otherwise.
 
         Override this in a subclass.
@@ -79,8 +81,8 @@ class BaseFilterHandler(base_handler.BaseHandler):
                      "volume ID '%(vol_id)s'. Filter results: %(str_results)s",
                      msg_dict)
 
-    def get_filtered_objects(self, filter_classes, objs,
-                             filter_properties, index=0):
+    def get_filtered_objects(self, filter_classes, objs: Iterable,
+                             filter_properties: dict, index: int = 0) -> list:
         """Get objects after filter
 
         :param filter_classes: filters that will be used to filter the
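For context, a scheduler filter only needs to implement _filter_one with the newly annotated bool return. A minimal sketch, assuming the BaseFilter class shown above (cinder/scheduler/base_filter.py in current Cinder); the size threshold and the request_spec keys used here are illustrative, not part of this change:

from cinder.scheduler import base_filter


class SmallEnoughFilter(base_filter.BaseFilter):
    """Toy filter: only pass requests below a fixed size threshold."""

    MAX_SIZE_GB = 100

    def _filter_one(self, obj, filter_properties) -> bool:
        # 'obj' is a backend state; the requested size is assumed to live
        # under request_spec['volume_properties']['size'].
        spec = filter_properties.get('request_spec') or {}
        size = spec.get('volume_properties', {}).get('size', 0)
        return size <= self.MAX_SIZE_GB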
@@ -31,7 +31,7 @@ class BaseHandler(object):
         self.modifier_class_type = modifier_class_type
         self.extension_manager = extension.ExtensionManager(modifier_namespace)
 
-    def _is_correct_class(self, cls):
+    def _is_correct_class(self, cls) -> bool:
         """Return whether an object is a class of the correct type.
 
         (or is not prefixed with an underscore)
@@ -40,7 +40,7 @@ class BaseHandler(object):
                 not cls.__name__.startswith('_') and
                 issubclass(cls, self.modifier_class_type))
 
-    def get_all_classes(self):
+    def get_all_classes(self) -> list:
         # We use a set, as some classes may have an entrypoint of their own,
        # and also be returned by a function such as 'all_filters' for example
        return [ext.plugin for ext in self.extension_manager if
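The _is_correct_class check above is what lets get_all_classes return only public plugin classes of the expected type. A standalone sketch of the same idea (class names are illustrative, and the first condition of the real check is not visible in this hunk, so inspect.isclass is used here as a stand-in):

import inspect


class ModifierBase(object):
    pass


class GoodPlugin(ModifierBase):
    pass


class _HiddenPlugin(ModifierBase):
    pass


def is_correct_class(cls, modifier_class_type=ModifierBase) -> bool:
    # Mirrors BaseHandler._is_correct_class: a public subclass of the
    # configured modifier type.
    return (inspect.isclass(cls) and
            not cls.__name__.startswith('_') and
            issubclass(cls, modifier_class_type))


assert is_correct_class(GoodPlugin)
assert not is_correct_class(_HiddenPlugin)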
@@ -18,6 +18,7 @@ Pluggable Weighing support
 """
 
 import abc
+from typing import Iterable, List, Optional  # noqa: H301
 
 from oslo_log import log as logging
 
@@ -27,7 +28,9 @@ from cinder.scheduler import base_handler
 LOG = logging.getLogger(__name__)
 
 
-def normalize(weight_list, minval=None, maxval=None):
+def normalize(weight_list: List[float],
+              minval: float = None,
+              maxval: float = None) -> Iterable[float]:
     """Normalize the values in a list between 0 and 1.0.
 
     The normalization is made regarding the lower and upper values present in
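For reference, normalize rescales each weight into [0, 1] relative to the list's minimum and maximum, or against explicit minval/maxval bounds. A quick illustration of the expected behaviour (assuming the cinder/scheduler/base_weight.py module listed later in mypy-files.txt):

from cinder.scheduler.base_weight import normalize

# With the defaults, the smallest value maps to 0.0 and the largest to 1.0.
assert list(normalize([1.0, 3.0, 5.0])) == [0.0, 0.5, 1.0]

# Explicit bounds rescale against that range instead.
assert list(normalize([5.0], minval=0.0, maxval=10.0)) == [0.5]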
@@ -58,11 +61,11 @@ def normalize(weight_list, minval=None, maxval=None):
 
 class WeighedObject(object):
     """Object with weight information."""
-    def __init__(self, obj, weight):
+    def __init__(self, obj, weight: float):
         self.obj = obj
         self.weight = weight
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<WeighedObject '%s': %s>" % (self.obj, self.weight)
 
 
@@ -75,10 +78,10 @@ class BaseWeigher(object, metaclass=abc.ABCMeta):
     from the calculated weights.
     """
 
-    minval = None
-    maxval = None
+    minval: Optional[float] = None
+    maxval: Optional[float] = None
 
-    def weight_multiplier(self):
+    def weight_multiplier(self) -> float:
         """How weighted this weigher should be.
 
         Override this method in a subclass, so that the returned value is
@@ -88,10 +91,12 @@ class BaseWeigher(object, metaclass=abc.ABCMeta):
         return 1.0
 
     @abc.abstractmethod
-    def _weigh_object(self, obj, weight_properties):
+    def _weigh_object(self, obj, weight_properties: dict) -> float:
         """Override in a subclass to specify a weight for a specific object."""
 
-    def weigh_objects(self, weighed_obj_list, weight_properties):
+    def weigh_objects(self,
+                      weighed_obj_list: List[WeighedObject],
+                      weight_properties: dict) -> List[float]:
         """Weigh multiple objects.
 
         Override in a subclass if you need access to all objects in order
@@ -123,8 +128,10 @@ class BaseWeigher(object, metaclass=abc.ABCMeta):
 class BaseWeightHandler(base_handler.BaseHandler):
     object_class = WeighedObject
 
-    def get_weighed_objects(self, weigher_classes, obj_list,
-                            weighing_properties):
+    def get_weighed_objects(self,
+                            weigher_classes: list,
+                            obj_list: List[WeighedObject],
+                            weighing_properties: dict) -> List[WeighedObject]:
         """Return a sorted (descending), normalized list of WeighedObjects."""
 
         if not obj_list:
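A concrete weigher only has to provide _weigh_object (and optionally weight_multiplier) with the annotated signatures above; weigh_objects and the weight handler take care of normalization and sorting. A minimal sketch, assuming a free_capacity_gb attribute on the weighed backend object:

from cinder.scheduler import base_weight


class FreeSpaceWeigher(base_weight.BaseWeigher):
    """Toy weigher: more free space means a higher weight."""

    def weight_multiplier(self) -> float:
        return 1.0

    def _weigh_object(self, obj, weight_properties: dict) -> float:
        # 'obj' is a backend state; free_capacity_gb is an assumption here.
        return float(obj.free_capacity_gb)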
@@ -15,6 +15,7 @@
 
 import operator
 import re
+from typing import Callable, Dict  # noqa: H301
 
 import pyparsing
 
@@ -167,7 +168,7 @@ class EvalTernaryOp(object):
 
 
 class EvalFunction(object):
-    functions = {
+    functions: Dict[str, Callable] = {
         "abs": abs,
         "max": max,
         "min": min,
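The annotation only makes explicit that functions maps an evaluator function name to a Python callable; dispatch is unchanged. A self-contained sketch of that lookup pattern:

from typing import Callable, Dict

functions: Dict[str, Callable] = {
    "abs": abs,
    "max": max,
    "min": min,
}

# The evaluator looks the callable up by name and applies it to the
# already-evaluated arguments.
assert functions["max"](1, 7, 3) == 7
assert functions["abs"](-2.5) == 2.5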
@@ -20,6 +20,8 @@ You can customize this scheduler by specifying your own volume Filters and
 Weighing Functions.
 """
 
+from typing import List, Optional  # noqa: H301
+
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
@@ -27,8 +29,11 @@ from oslo_serialization import jsonutils
 from cinder import context
 from cinder import exception
 from cinder.i18n import _
+from cinder import objects
 from cinder.scheduler import driver
+from cinder.scheduler.host_manager import BackendState
 from cinder.scheduler import scheduler_options
+from cinder.scheduler.weights import WeighedHost
 from cinder.volume import volume_utils
 
 CONF = cfg.CONF
@@ -43,7 +48,7 @@ class FilterScheduler(driver.Scheduler):
         self.options = scheduler_options.SchedulerOptions()
         self.max_attempts = self._max_attempts()
 
-    def _get_configuration_options(self):
+    def _get_configuration_options(self) -> dict:
         """Fetch options dictionary. Broken out for testing."""
         return self.options.get_configuration()
 
@@ -117,7 +122,7 @@ class FilterScheduler(driver.Scheduler):
 
     def backend_passes_filters(self,
                                context: context.RequestContext,
-                               backend,
+                               backend: str,
                                request_spec: dict,
                                filter_properties: dict):
         """Check if the specified backend passes the filters."""
@@ -146,8 +151,8 @@ class FilterScheduler(driver.Scheduler):
     def find_retype_backend(self,
                             context: context.RequestContext,
                             request_spec: dict,
-                            filter_properties: dict = None,
-                            migration_policy: str = 'never'):
+                            filter_properties: Optional[dict] = None,
+                            migration_policy: str = 'never') -> BackendState:
         """Find a backend that can accept the volume with its new type."""
         filter_properties = filter_properties or {}
         backend = (request_spec['volume_properties'].get('cluster_name')
@@ -197,11 +202,13 @@ class FilterScheduler(driver.Scheduler):
         top_backend = self._choose_top_backend(weighed_backends, request_spec)
         return top_backend.obj
 
-    def get_pools(self, context, filters):
+    def get_pools(self, context: context.RequestContext, filters: dict):
         return self.host_manager.get_pools(context, filters)
 
-    def _post_select_populate_filter_properties(self, filter_properties: dict,
-                                                backend_state) -> None:
+    def _post_select_populate_filter_properties(
+            self,
+            filter_properties: dict,
+            backend_state: BackendState) -> None:
         """Populate filter properties with additional information.
 
         Add additional information to the filter properties after a backend has
@@ -233,7 +240,7 @@ class FilterScheduler(driver.Scheduler):
                       "must be >=1"))
         return max_attempts
 
-    def _log_volume_error(self, volume_id, retry):
+    def _log_volume_error(self, volume_id: str, retry: dict) -> None:
         """Log requests with exceptions from previous volume operations."""
         exc = retry.pop('exc', None)  # string-ified exception from volume
         if not exc:
@@ -251,7 +258,9 @@ class FilterScheduler(driver.Scheduler):
                      'last_backend': last_backend,
                      'exc': exc})
 
-    def _populate_retry(self, filter_properties, request_spec):
+    def _populate_retry(self,
+                        filter_properties: dict,
+                        request_spec: dict) -> None:
         """Populate filter properties with history of retries for request.
 
         If maximum retries is exceeded, raise NoValidBackend.
@@ -274,8 +283,8 @@ class FilterScheduler(driver.Scheduler):
         }
         filter_properties['retry'] = retry
 
-        resource_id = request_spec.get(
-            'volume_id') or request_spec.get("group_id")
+        resource_id = str(request_spec.get(
+            'volume_id')) or str(request_spec.get("group_id"))
         self._log_volume_error(resource_id, retry)
 
         if retry['num_attempts'] > max_attempts:
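_populate_retry keeps its bookkeeping in filter_properties['retry']. Not every key is visible in these hunks, so the following is a hedged sketch of the shape, based only on the num_attempts and exc fields referenced above; the 'backends' list and the stand-in exception are assumptions:

# Hedged sketch of the retry structure consumed by _log_volume_error and
# _populate_retry; field names other than num_attempts/exc are assumptions.
filter_properties = {
    'retry': {
        'num_attempts': 2,                    # incremented on every pass
        'backends': ['lvm@backend-1#pool0'],  # backends already tried
        'exc': ['Traceback ...'],             # string-ified volume errors
    },
}

max_attempts = 3
retry = filter_properties['retry']
if retry['num_attempts'] > max_attempts:
    # Cinder raises NoValidBackend here; RuntimeError is a stand-in.
    raise RuntimeError('No valid backend after %d attempts' % max_attempts)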
@@ -285,10 +294,11 @@ class FilterScheduler(driver.Scheduler):
                     {'max_attempts': max_attempts,
                      'resource_id': resource_id})
 
-    def _get_weighted_candidates(self,
-                                 context: context.RequestContext,
-                                 request_spec: dict,
-                                 filter_properties: dict = None) -> list:
+    def _get_weighted_candidates(
+            self,
+            context: context.RequestContext,
+            request_spec: dict,
+            filter_properties: Optional[dict] = None) -> list:
         """Return a list of backends that meet required specs.
 
         Returned list is ordered by their fitness.
@@ -365,9 +375,10 @@ class FilterScheduler(driver.Scheduler):
         return weighed_backends
 
     def _get_weighted_candidates_generic_group(
-            self, context, group_spec, request_spec_list,
-            group_filter_properties=None,
-            filter_properties_list=None) -> list:
+            self, context: context.RequestContext,
+            group_spec: dict, request_spec_list: List[dict],
+            group_filter_properties: Optional[dict] = None,
+            filter_properties_list: Optional[List[dict]] = None) -> list:
         """Finds backends that supports the group.
 
         Returns a list of backends that meet the required specs,
@@ -474,8 +485,8 @@ class FilterScheduler(driver.Scheduler):
         return new_backends
 
     def _get_weighted_candidates_by_group_type(
-            self, context, group_spec,
-            group_filter_properties=None) -> list:
+            self, context: context.RequestContext, group_spec: dict,
+            group_filter_properties: dict = None) -> List[WeighedHost]:
         """Finds backends that supports the group type.
 
         Returns a list of backends that meet the required specs,
@@ -537,7 +548,10 @@ class FilterScheduler(driver.Scheduler):
 
         return weighed_backends
 
-    def _schedule(self, context, request_spec, filter_properties=None):
+    def _schedule(self,
+                  context: context.RequestContext,
+                  request_spec: dict,
+                  filter_properties: Optional[dict] = None):
         weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                          filter_properties)
         # When we get the weighed_backends, we clear those backends that don't
@@ -557,15 +571,22 @@ class FilterScheduler(driver.Scheduler):
                 if backend_id != resource_backend:
                     weighed_backends.remove(backend)
         if not weighed_backends:
+            assert filter_properties is not None
             LOG.warning('No weighed backend found for volume '
                         'with properties: %s',
                         filter_properties['request_spec'].get('volume_type'))
             return None
         return self._choose_top_backend(weighed_backends, request_spec)
 
-    def _schedule_generic_group(self, context, group_spec, request_spec_list,
-                                group_filter_properties=None,
-                                filter_properties_list=None):
+    def _schedule_generic_group(
+            self,
+            context: context.RequestContext,
+            group_spec: dict,
+            request_spec_list: list,
+            group_filter_properties: Optional[dict] = None,
+            filter_properties_list: Optional[list] = None) \
+            -> Optional[WeighedHost]:
+
         weighed_backends = self._get_weighted_candidates_generic_group(
             context,
             group_spec,
@@ -576,7 +597,9 @@ class FilterScheduler(driver.Scheduler):
             return None
         return self._choose_top_backend_generic_group(weighed_backends)
 
-    def _choose_top_backend(self, weighed_backends: list, request_spec: dict):
+    def _choose_top_backend(self,
+                            weighed_backends: List[WeighedHost],
+                            request_spec: dict):
         top_backend = weighed_backends[0]
         backend_state = top_backend.obj
         LOG.debug("Choosing %s", backend_state.backend_id)
@@ -584,11 +607,13 @@ class FilterScheduler(driver.Scheduler):
         backend_state.consume_from_volume(volume_properties)
         return top_backend
 
-    def _choose_top_backend_generic_group(self, weighed_backends):
+    def _choose_top_backend_generic_group(
+            self,
+            weighed_backends: List[WeighedHost]) -> WeighedHost:
         top_backend = weighed_backends[0]
         backend_state = top_backend.obj
         LOG.debug("Choosing %s", backend_state.backend_id)
         return top_backend
 
-    def get_backup_host(self, volume, driver=None):
+    def get_backup_host(self, volume: objects.Volume, driver=None):
         return self.host_manager.get_backup_host(volume, driver)
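With _schedule_generic_group now annotated as returning Optional[WeighedHost], callers have to handle the None case explicitly before touching .obj. A hedged sketch of that caller-side pattern, not the literal Cinder code:

def pick_group_backend(scheduler, context, group_spec, request_spec_list):
    # _schedule_generic_group returns None when no weighed backend is left.
    weighed_backend = scheduler._schedule_generic_group(
        context, group_spec, request_spec_list)
    if weighed_backend is None:
        # Cinder raises NoValidBackend; RuntimeError is a stand-in here.
        raise RuntimeError('No valid backend for the group')
    # WeighedHost wraps the actual backend state in .obj (see WeighedObject).
    return weighed_backend.obj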
@@ -10,15 +10,20 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from typing import Any, Dict, Optional  # noqa: H301
+
 from oslo_log import log as logging
 from oslo_utils import excutils
 import taskflow.engines
+import taskflow.engines.base
 from taskflow.patterns import linear_flow
 
+from cinder import context
 from cinder import exception
 from cinder import flow_utils
 from cinder.message import api as message_api
 from cinder.message import message_field
+from cinder import objects
 from cinder import rpc
 from cinder import utils
 from cinder.volume.flows import common
@@ -40,7 +45,11 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
         super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
                                                        **kwargs)
 
-    def _populate_request_spec(self, volume, snapshot_id, image_id, backup_id):
+    def _populate_request_spec(self,
+                               volume: objects.Volume,
+                               snapshot_id: Optional[str],
+                               image_id: Optional[str],
+                               backup_id: Optional[str]) -> Dict[str, Any]:
         # Create the full request spec using the volume object.
         #
         # NOTE(dulek): At this point, a volume can be deleted before it gets
@@ -62,8 +71,13 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
             'volume_type': list(dict(vol_type).items()),
         }
 
-    def execute(self, context, request_spec, volume, snapshot_id,
-                image_id, backup_id):
+    def execute(self,
+                context: context.RequestContext,
+                request_spec: Optional[dict],
+                volume: objects.Volume,
+                snapshot_id: Optional[str],
+                image_id: Optional[str],
+                backup_id: Optional[str]) -> Dict[str, Any]:
         # For RPC version < 1.2 backward compatibility
         if request_spec is None:
             request_spec = self._populate_request_spec(volume,
@@ -92,7 +106,10 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
         self.driver_api = driver_api
         self.message_api = message_api.API()
 
-    def _handle_failure(self, context, request_spec, cause):
+    def _handle_failure(self,
+                        context: context.RequestContext,
+                        request_spec: dict,
+                        cause: Exception) -> None:
         try:
             self._notify_failure(context, request_spec, cause)
         finally:
@@ -100,7 +117,10 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
                       {'cause': cause, 'name': self.name})
 
     @utils.if_notifications_enabled
-    def _notify_failure(self, context, request_spec, cause):
+    def _notify_failure(self,
+                        context: context.RequestContext,
+                        request_spec: dict,
+                        cause: Exception) -> None:
         """When scheduling fails send out an event that it failed."""
         payload = {
             'request_spec': request_spec,
@@ -118,7 +138,11 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
                   "payload %(payload)s",
                   {'topic': self.FAILURE_TOPIC, 'payload': payload})
 
-    def execute(self, context, request_spec, filter_properties, volume):
+    def execute(self,
+                context: context.RequestContext,
+                request_spec: dict,
+                filter_properties: dict,
+                volume: objects.Volume) -> None:
         try:
             self.driver_api.schedule_create_volume(context, request_spec,
                                                    filter_properties)
@@ -140,9 +164,14 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
             common.error_out(volume, reason=e)
 
 
-def get_flow(context, driver_api, request_spec=None,
-             filter_properties=None,
-             volume=None, snapshot_id=None, image_id=None, backup_id=None):
+def get_flow(context: context.RequestContext,
+             driver_api,
+             request_spec: Optional[dict] = None,
+             filter_properties: Optional[dict] = None,
+             volume: Optional[objects.Volume] = None,
+             snapshot_id: Optional[str] = None,
+             image_id: Optional[str] = None,
+             backup_id: Optional[str] = None) -> taskflow.engines.base.Engine:
 
     """Constructs and returns the scheduler entrypoint flow.
 
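Since get_flow is now annotated as returning a taskflow engine, the caller's job is just to build it and run it. A hedged usage sketch (argument values are placeholders; the surrounding logging/listener setup Cinder uses is omitted):

from cinder.scheduler.flows import create_volume


def schedule_volume(context, driver_api, request_spec, filter_properties,
                    volume):
    # get_flow() already returns a loaded taskflow engine
    # (taskflow.engines.base.Engine), so it can be run directly.
    flow_engine = create_volume.get_flow(context,
                                         driver_api,
                                         request_spec=request_spec,
                                         filter_properties=filter_properties,
                                         volume=volume)
    flow_engine.run()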
@@ -17,6 +17,9 @@
 
 from collections import abc
 import random
+import typing
+from typing import (Any, Dict, Iterable, List,  # noqa: H301
+                    Optional, Type, Union)
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -69,7 +72,8 @@ LOG = logging.getLogger(__name__)
 
 class ReadOnlyDict(abc.Mapping):
     """A read-only dict."""
-    def __init__(self, source=None):
+    def __init__(self, source: Union[dict, 'ReadOnlyDict'] = None):
+        self.data: dict
         if source is not None:
             self.data = dict(source)
         else:
@@ -81,27 +85,32 @@ class ReadOnlyDict(abc.Mapping):
     def __iter__(self):
         return iter(self.data)
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self.data)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return '%s(%r)' % (self.__class__.__name__, self.data)
 
 
 class BackendState(object):
     """Mutable and immutable information tracked for a volume backend."""
 
-    def __init__(self, host, cluster_name, capabilities=None, service=None):
+    def __init__(self,
                 host: str,
+                 cluster_name: Optional[str],
+                 capabilities: Union[Optional[ReadOnlyDict],
+                                     Optional[dict]] = None,
+                 service=None):
         # NOTE(geguileo): We have a circular dependency between BackendState
         # and PoolState and we resolve it with an instance attribute instead
         # of a class attribute that we would assign after the PoolState
         # declaration because this way we avoid splitting the code.
-        self.pool_state_cls = PoolState
+        self.pool_state_cls: Type[PoolState] = PoolState
 
-        self.capabilities = None
-        self.service = None
-        self.host = host
-        self.cluster_name = cluster_name
+        self.capabilities: Optional[ReadOnlyDict] = None
+        self.service: Optional[ReadOnlyDict] = None
+        self.host: str = host
+        self.cluster_name: Optional[str] = cluster_name
         self.update_capabilities(capabilities, service)
 
         self.volume_backend_name = None
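The ReadOnlyDict changes above only add annotations; behaviour is unchanged: it is an abc.Mapping, so reads, iteration and len() work, and mutation is simply not provided. An illustration (assuming the usual __getitem__, which is not shown in this hunk):

from cinder.scheduler.host_manager import ReadOnlyDict

capabilities = ReadOnlyDict({'total_capacity_gb': 100, 'thin': True})

assert len(capabilities) == 2
assert capabilities['thin'] is True
assert sorted(capabilities) == ['thin', 'total_capacity_gb']

try:
    capabilities['thin'] = False  # Mapping defines no __setitem__
except TypeError:
    pass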
@@ -127,20 +136,23 @@ class BackendState(object):
         self.thick_provisioning_support = False
         # Does this backend support attaching a volume to more than
         # one host/instance?
-        self.multiattach = False
+        self.multiattach: bool = False
         self.filter_function = None
         self.goodness_function = 0
 
         # PoolState for all pools
-        self.pools = {}
+        self.pools: dict = {}
 
         self.updated = None
 
     @property
-    def backend_id(self):
+    def backend_id(self) -> str:
         return self.cluster_name or self.host
 
-    def update_capabilities(self, capabilities=None, service=None):
+    def update_capabilities(
+            self,
+            capabilities: Optional[Union[dict, ReadOnlyDict]] = None,
+            service: Optional[dict] = None) -> None:
         # Read-only capability dicts
 
         if capabilities is None:
@@ -150,7 +162,9 @@ class BackendState(object):
             service = {}
         self.service = ReadOnlyDict(service)
 
-    def update_from_volume_capability(self, capability, service=None):
+    def update_from_volume_capability(self,
+                                      capability: Dict[str, Any],
+                                      service=None) -> None:
         """Update information about a host from its volume_node info.
 
         'capability' is the status info reported by volume backend, a typical
@@ -212,7 +226,7 @@ class BackendState(object):
         # Update pool level info
         self.update_pools(capability, service)
 
-    def update_pools(self, capability, service):
+    def update_pools(self, capability: Optional[dict], service) -> None:
         """Update storage pools information from backend reported info."""
         if not capability:
             return
@@ -275,7 +289,7 @@ class BackendState(object):
                             'host': self.host})
                 del self.pools[pool]
 
-    def _append_backend_info(self, pool_cap):
+    def _append_backend_info(self, pool_cap: Dict[str, Any]) -> None:
         # Fill backend level info to pool if needed.
         if not pool_cap.get('volume_backend_name', None):
             pool_cap['volume_backend_name'] = self.volume_backend_name
@@ -292,6 +306,7 @@ class BackendState(object):
         if not pool_cap.get('timestamp', None):
             pool_cap['timestamp'] = self.updated
 
+        self.capabilities = typing.cast(ReadOnlyDict, self.capabilities)
         if('filter_function' not in pool_cap and
                 'filter_function' in self.capabilities):
             pool_cap['filter_function'] = self.capabilities['filter_function']
@@ -301,14 +316,16 @@ class BackendState(object):
             pool_cap['goodness_function'] = (
                 self.capabilities['goodness_function'])
 
-    def update_backend(self, capability):
+    def update_backend(self, capability: dict) -> None:
         self.volume_backend_name = capability.get('volume_backend_name', None)
         self.vendor_name = capability.get('vendor_name', None)
         self.driver_version = capability.get('driver_version', None)
         self.storage_protocol = capability.get('storage_protocol', None)
         self.updated = capability['timestamp']
 
-    def consume_from_volume(self, volume, update_time=True):
+    def consume_from_volume(self,
+                            volume: objects.Volume,
+                            update_time: bool = True) -> None:
         """Incrementally update host state from a volume."""
         volume_gb = volume['size']
         self.allocated_capacity_gb += volume_gb
@@ -325,7 +342,7 @@ class BackendState(object):
             self.updated = timeutils.utcnow()
         LOG.debug("Consumed %s GB from backend: %s", volume['size'], self)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         # FIXME(zhiteng) backend level free_capacity_gb isn't as
         # meaningful as it used to be before pool is introduced, we'd
         # come up with better representation of HostState.
@@ -355,15 +372,22 @@ class BackendState(object):
 
 
 class PoolState(BackendState):
-    def __init__(self, host, cluster_name, capabilities, pool_name):
+    def __init__(self,
+                 host: str,
+                 cluster_name: Optional[str],
+                 capabilities: Union[Optional[ReadOnlyDict], Optional[dict]],
+                 pool_name: str):
         new_host = volume_utils.append_host(host, pool_name)
+        assert new_host is not None
         new_cluster = volume_utils.append_host(cluster_name, pool_name)
         super(PoolState, self).__init__(new_host, new_cluster, capabilities)
         self.pool_name = pool_name
         # No pools in pool
-        self.pools = None
+        self.pools: dict = {}
 
-    def update_from_volume_capability(self, capability, service=None):
+    def update_from_volume_capability(self,
+                                      capability: Dict[str, Any],
+                                      service=None) -> None:
         """Update information about a pool from its volume_node info."""
         LOG.debug("Updating capabilities for %s: %s", self.host, capability)
         self.update_capabilities(capability, service)
@@ -425,7 +449,7 @@ class HostManager(object):
 
     def __init__(self):
         self.service_states = {}  # { <host|cluster>: {<service>: {cap k : v}}}
-        self.backend_state_map = {}
+        self.backend_state_map: Dict[str, BackendState] = {}
         self.backup_service_states = {}
         self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.'
                                                            'filters')
@@ -441,7 +465,7 @@ class HostManager(object):
         self._update_backend_state_map(cinder_context.get_admin_context())
         self.service_states_last_update = {}
 
-    def _choose_backend_filters(self, filter_cls_names):
+    def _choose_backend_filters(self, filter_cls_names) -> list:
         """Return a list of available filter names.
 
         This function checks input filter names against a predefined set
@@ -466,7 +490,9 @@ class HostManager(object):
                 filter_name=", ".join(bad_filters))
         return good_filters
 
-    def _choose_backend_weighers(self, weight_cls_names):
+    def _choose_backend_weighers(
+            self,
+            weight_cls_names: Optional[List[str]]) -> list:
         """Return a list of available weigher names.
 
         This function checks input weigher names against a predefined set
@@ -506,7 +532,7 @@ class HostManager(object):
                                                         filter_properties)
 
     def get_weighed_backends(self, backends, weight_properties,
-                             weigher_class_names=None):
+                             weigher_class_names=None) -> list:
         """Weigh the backends."""
         weigher_classes = self._choose_backend_weighers(weigher_class_names)
 
@@ -516,8 +542,12 @@ class HostManager(object):
         LOG.debug("Weighed %s", weighed_backends)
         return weighed_backends
 
-    def update_service_capabilities(self, service_name, host, capabilities,
-                                    cluster_name, timestamp):
+    def update_service_capabilities(self,
+                                    service_name: str,
+                                    host: str,
+                                    capabilities: dict,
+                                    cluster_name: Optional[str],
+                                    timestamp) -> None:
         """Update the per-service capabilities based on this notification."""
         if service_name not in HostManager.ALLOWED_SERVICE_NAMES:
             LOG.debug('Ignoring %(service_name)s service update '
@@ -612,18 +642,20 @@ class HostManager(object):
             self.get_usage_and_notify(capabilities, updated, backend,
                                       timestamp)
 
-    def has_all_capabilities(self):
+    def has_all_capabilities(self) -> bool:
         return len(self._no_capabilities_backends) == 0
 
-    def _is_just_initialized(self):
+    def _is_just_initialized(self) -> bool:
         return not self.service_states_last_update
 
-    def first_receive_capabilities(self):
+    def first_receive_capabilities(self) -> bool:
         return (not self._is_just_initialized() and
                 len(set(self.backend_state_map)) > 0 and
                 len(self._no_capabilities_backends) == 0)
 
-    def _update_backend_state_map(self, context):
+    def _update_backend_state_map(
+            self,
+            context: cinder_context.RequestContext) -> None:
 
         # Get resource usage across the available volume nodes:
         topic = constants.VOLUME_TOPIC
@@ -683,7 +715,9 @@ class HostManager(object):
                      "scheduler cache.", {'backend': backend_key})
             del self.backend_state_map[backend_key]
 
-    def revert_volume_consumed_capacity(self, pool_name, size):
+    def revert_volume_consumed_capacity(self,
+                                        pool_name: str,
+                                        size: int) -> None:
         for backend_key, state in self.backend_state_map.items():
             for key in state.pools:
                 pool_state = state.pools[key]
@@ -691,7 +725,9 @@ class HostManager(object):
                     pool_state.consume_from_volume({'size': -size},
                                                    update_time=False)
 
-    def get_all_backend_states(self, context):
+    def get_all_backend_states(
+            self,
+            context: cinder_context.RequestContext) -> Iterable:
         """Returns a dict of all the backends the HostManager knows about.
 
         Each of the consumable resources in BackendState are
@@ -715,7 +751,11 @@ class HostManager(object):
 
         return all_pools.values()
 
-    def _filter_pools_by_volume_type(self, context, volume_type, pools):
+    def _filter_pools_by_volume_type(
+            self,
+            context: cinder_context.RequestContext,
+            volume_type: objects.VolumeType,
+            pools: dict) -> dict:
         """Return the pools filtered by volume type specs"""
 
         # wrap filter properties only with volume_type
@@ -732,7 +772,9 @@ class HostManager(object):
         # filter the pools by value
         return {k: v for k, v in pools.items() if v in filtered}
 
-    def get_pools(self, context, filters=None):
+    def get_pools(self,
+                  context: cinder_context.RequestContext,
+                  filters: Optional[dict] = None) -> List[dict]:
         """Returns a dict of all pools on all hosts HostManager knows about."""
 
         self._update_backend_state_map(context)
@@ -759,7 +801,8 @@ class HostManager(object):
             if filters:
                 # filter all other items in capabilities
                 for (attr, value) in filters.items():
-                    cap = new_pool.get('capabilities').get(attr)
+                    cap = new_pool.get('capabilities').\
+                        get(attr)  # type: ignore
                     if not self._equal_after_convert(cap, value):
                         filtered = True
                         break
@@ -780,13 +823,21 @@ class HostManager(object):
         return [dict(name=key, capabilities=value.capabilities)
                 for key, value in all_pools.items()]
 
-    def get_usage_and_notify(self, capa_new, updated_pools, host, timestamp):
+    def get_usage_and_notify(self,
+                             capa_new: dict,
+                             updated_pools: Iterable[dict],
+                             host: str,
+                             timestamp) -> None:
         context = cinder_context.get_admin_context()
         usage = self._get_usage(capa_new, updated_pools, host, timestamp)
 
         self._notify_capacity_usage(context, usage)
 
-    def _get_usage(self, capa_new, updated_pools, host, timestamp):
+    def _get_usage(self,
+                   capa_new: dict,
+                   updated_pools: Iterable[dict],
+                   host: str,
+                   timestamp) -> List[dict]:
         pools = capa_new.get('pools')
         usage = []
         if pools and isinstance(pools, list):
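get_pools filtering works by comparing each requested capability against the pool's reported capabilities (via _equal_after_convert) and then returning name/capabilities pairs, as built above. A hedged usage sketch; the filter key and the capability name printed here are typical examples, not guaranteed by this diff:

from cinder import context as cinder_context
from cinder.scheduler.host_manager import HostManager

ctxt = cinder_context.get_admin_context()
host_manager = HostManager()

# Only pools whose reported capabilities match every filter entry survive.
pools = host_manager.get_pools(ctxt,
                               filters={'volume_backend_name': 'lvm'})
for pool in pools:
    # Each entry is a dict(name=..., capabilities=...).
    print(pool['name'], pool['capabilities'].get('free_capacity_gb'))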
@@ -814,7 +865,9 @@ class HostManager(object):
                 usage.append(backend_usage)
         return usage
 
-    def _get_pool_usage(self, pool, host, timestamp):
+    def _get_pool_usage(self,
+                        pool: dict,
+                        host: str, timestamp) -> Dict[str, Any]:
         total = pool["total_capacity_gb"]
         free = pool["free_capacity_gb"]
 
@@ -850,7 +903,7 @@ class HostManager(object):
 
         return pool_usage
 
-    def _get_updated_pools(self, old_capa, new_capa):
+    def _get_updated_pools(self, old_capa: dict, new_capa: dict) -> list:
         # Judge if the capabilities should be reported.
 
         new_pools = new_capa.get('pools', [])
@@ -891,14 +944,16 @@ class HostManager(object):
 
         return updated_pools
 
-    def _notify_capacity_usage(self, context, usage):
+    def _notify_capacity_usage(self,
+                               context: cinder_context.RequestContext,
+                               usage: List[dict]) -> None:
         if usage:
             for u in usage:
                 volume_utils.notify_about_capacity_usage(
                     context, u, u['type'], None, None)
         LOG.debug("Publish storage capacity: %s.", usage)
 
-    def _equal_after_convert(self, capability, value):
+    def _equal_after_convert(self, capability, value) -> bool:
 
         if isinstance(value, type(capability)) or capability is None:
             return value == capability
@@ -912,7 +967,7 @@ class HostManager(object):
         # we just convert them into string to compare them.
         return str(value) == str(capability)
 
-    def get_backup_host(self, volume, driver=None):
+    def get_backup_host(self, volume: objects.Volume, driver=None) -> str:
         if volume:
             volume_host = volume_utils.extract_host(volume.host, 'host')
         else:
@@ -932,7 +987,7 @@ class HostManager(object):
         random.shuffle(services)
         return services[0] if services else None
 
-    def _get_available_backup_service_host(self, host, az, driver=None):
+    def _get_available_backup_service_host(self, host, az, driver=None) -> str:
         """Return an appropriate backup service host."""
         backup_host = None
         if not host or not CONF.backup_use_same_host:
@@ -950,7 +1005,7 @@ class HostManager(object):
         """
         services = []
 
-        def _is_good_service(cap, driver, az):
+        def _is_good_service(cap, driver, az) -> bool:
             if driver is None and az is None:
                 return True
             match_driver = cap['driver_name'] == driver if driver else True
@@ -967,11 +1022,15 @@ class HostManager(object):
 
         return services
 
-    def _az_matched(self, service, availability_zone):
+    def _az_matched(self,
+                    service: objects.Service,
+                    availability_zone: Optional[str]) -> bool:
         return ((not availability_zone) or
                 service.availability_zone == availability_zone)
 
-    def _is_backup_service_enabled(self, availability_zone, host):
+    def _is_backup_service_enabled(self,
+                                   availability_zone: str,
+                                   host: str) -> bool:
         """Check if there is a backup service available."""
         topic = constants.BACKUP_TOPIC
         ctxt = cinder_context.get_admin_context()
@@ -81,7 +81,7 @@ class SchedulerOptions(object):
         """Get current UTC. Broken out for testing."""
         return timeutils.utcnow()
 
-    def get_configuration(self, filename=None):
+    def get_configuration(self, filename=None) -> dict:
         """Check the json file for changes and load it if needed."""
         if not filename:
             filename = CONF.scheduler_json_config_location
@@ -55,7 +55,7 @@ class CapacityWeigher(weights.BaseHostWeigher):
     negative number and the weighing has the opposite effect of the default.
 
     """
-    def weight_multiplier(self):
+    def weight_multiplier(self) -> float:
         """Override the weight multiplier."""
         return CONF.capacity_weight_multiplier
 
@@ -75,12 +75,14 @@ class CapacityWeigher(weights.BaseHostWeigher):
         tmp_weights = super(CapacityWeigher, self).weigh_objects(
             weighed_obj_list, weight_properties)
 
+        assert self.maxval is not None
         if math.isinf(self.maxval):
            # NOTE(jecarey): if all weights were infinite then parent
            # method returns 0 for all of the weights. Thus self.minval
            # cannot be infinite at this point
            copy_weights = [w for w in tmp_weights if not math.isinf(w)]
            self.maxval = max(copy_weights)
+            assert self.minval is not None
            offset = (self.maxval - self.minval) * OFFSET_MULT
            self.maxval += OFFSET_MIN if offset == 0.0 else offset
            tmp_weights = [self.maxval if math.isinf(w) else w
@@ -88,7 +90,7 @@ class CapacityWeigher(weights.BaseHostWeigher):
 
         return tmp_weights
 
-    def _weigh_object(self, host_state, weight_properties):
+    def _weigh_object(self, host_state, weight_properties) -> float:
         """Higher weights win. We want spreading to be the default."""
         free_space = host_state.free_capacity_gb
         total_space = host_state.total_capacity_gb
@@ -135,7 +137,7 @@ class AllocatedCapacityWeigher(weights.BaseHostWeigher):
     positive number and the weighing has the opposite effect of the default.
     """
 
-    def weight_multiplier(self):
+    def weight_multiplier(self) -> float:
         """Override the weight multiplier."""
         return CONF.allocated_capacity_weight_multiplier
 
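The multiplier is applied to each weigher's normalized scores, so flipping its sign (for example a negative capacity_weight_multiplier) reverses the ranking and turns spreading into stacking. A small arithmetic sketch of that combination step, simplified from the weight handler rather than the literal code:

def combined_score(normalized_weights, multiplier: float) -> list:
    # Each weigher's normalized weights (0.0..1.0) are scaled by its
    # multiplier before being summed per backend.
    return [multiplier * w for w in normalized_weights]


spread = combined_score([0.0, 0.5, 1.0], multiplier=1.0)
stack = combined_score([0.0, 0.5, 1.0], multiplier=-1.0)

assert max(spread) == spread[2]  # most free space wins by default
assert max(stack) == stack[0]    # a negative multiplier flips the order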
@@ -55,7 +55,7 @@ class StochasticHostWeightHandler(base_weight.BaseWeightHandler):
 
         # First compute the total weight of all the objects and the upper
         # bound for each object to "win" the lottery.
-        total_weight = 0
+        total_weight = 0.0
         table = []
         for weighed_obj in weighed_objs:
             total_weight += weighed_obj.weight
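The stochastic handler builds a cumulative table from the (now float) total weight and draws a random point in [0, total_weight) to pick a winner, so better-weighted backends win proportionally more often. A standalone sketch of that lottery, assuming the same WeighedObject-style .weight attribute; not the literal handler code:

import random
from typing import List, Tuple


def stochastic_pick(weighed_objs: List) -> object:
    # Cumulative upper bounds: an object owning a larger slice of the
    # [0, total_weight) interval is proportionally more likely to win.
    total_weight = 0.0
    table: List[Tuple[float, object]] = []
    for weighed_obj in weighed_objs:
        total_weight += weighed_obj.weight
        table.append((total_weight, weighed_obj))

    roll = random.uniform(0, total_weight)
    for upper_bound, weighed_obj in table:
        if roll < upper_bound:
            return weighed_obj
    return table[-1][1]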
@@ -38,7 +38,7 @@ class VolumeNumberWeigher(weights.BaseHostWeigher):
     number and the weighing has the opposite effect of the default.
     """
 
-    def weight_multiplier(self):
+    def weight_multiplier(self) -> float:
         """Override the weight multiplier."""
         return CONF.volume_number_multiplier
 
@@ -8,7 +8,21 @@ cinder/image/glance.py
 cinder/image/image_utils.py
 cinder/exception.py
 cinder/manager.py
 cinder/scheduler/base_handler.py
 cinder/scheduler/base_weight.py
 cinder/scheduler/evaluator/evaluator.py
 cinder/scheduler/filter_scheduler.py
 cinder/scheduler/flows/create_volume.py
 cinder/scheduler/host_manager.py
 cinder/scheduler/manager.py
 cinder/scheduler/rpcapi.py
 cinder/scheduler/scheduler_options.py
 cinder/scheduler/weights/__init__.py
 cinder/scheduler/weights/capacity.py
 cinder/scheduler/weights/chance.py
 cinder/scheduler/weights/goodness.py
 cinder/scheduler/weights/stochastic.py
 cinder/scheduler/weights/volume_number.py
 cinder/utils.py
 cinder/volume/__init__.py
 cinder/volume/flows/api/create_volume.py
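mypy-files.txt is the list of modules Cinder runs mypy against, so adding the scheduler files above makes their type errors visible to that check. A hedged sketch of driving the same check from Python with mypy's public API (the project's actual tox/CI wiring is not shown in this diff):

from mypy import api as mypy_api


def check_listed_files(list_path: str = 'mypy-files.txt') -> int:
    with open(list_path) as f:
        files = [line.strip() for line in f
                 if line.strip() and not line.startswith('#')]
    # mypy.api.run returns (stdout, stderr, exit_status).
    report, errors, status = mypy_api.run(files)
    print(report, end='')
    return status


if __name__ == '__main__':
    raise SystemExit(check_listed_files())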