From 1dae83da57c506ef0a79b5f3348312c6c74edce6 Mon Sep 17 00:00:00 2001 From: Hidekazu Nakamura Date: Wed, 6 Dec 2017 15:09:59 +0900 Subject: [PATCH] Add zone migration strategy This patch adds hardware maintenance goal, efficacy and zone migration strategy. Change-Id: I5bfee421780233ffeea8c1539aba720ae554983d Implements: blueprint zone-migration-strategy --- ...e-migration-strategy-10f7656a2a01e607.yaml | 6 + setup.cfg | 2 + watcher/common/exception.py | 8 + watcher/common/nova_helper.py | 3 + watcher/decision_engine/goal/__init__.py | 5 +- .../goal/efficacy/indicators.py | 115 +++ .../decision_engine/goal/efficacy/specs.py | 83 ++ watcher/decision_engine/goal/goals.py | 25 + .../strategy/strategies/__init__.py | 6 +- .../strategy/strategies/base.py | 8 + .../strategy/strategies/zone_migration.py | 975 ++++++++++++++++++ .../strategies/test_zone_migration.py | 719 +++++++++++++ 12 files changed, 1953 insertions(+), 2 deletions(-) create mode 100644 releasenotes/notes/zone-migration-strategy-10f7656a2a01e607.yaml create mode 100644 watcher/decision_engine/strategy/strategies/zone_migration.py create mode 100644 watcher/tests/decision_engine/strategy/strategies/test_zone_migration.py diff --git a/releasenotes/notes/zone-migration-strategy-10f7656a2a01e607.yaml b/releasenotes/notes/zone-migration-strategy-10f7656a2a01e607.yaml new file mode 100644 index 000000000..89c32ba63 --- /dev/null +++ b/releasenotes/notes/zone-migration-strategy-10f7656a2a01e607.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Added strategy "Zone migration" and it's goal "Hardware maintenance". + The strategy migrates many instances and volumes efficiently with + minimum downtime automatically. diff --git a/setup.cfg b/setup.cfg index ca4ef1cf3..51424727d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -57,6 +57,7 @@ watcher_goals = airflow_optimization = watcher.decision_engine.goal.goals:AirflowOptimization noisy_neighbor = watcher.decision_engine.goal.goals:NoisyNeighborOptimization saving_energy = watcher.decision_engine.goal.goals:SavingEnergy + hardware_maintenance = watcher.decision_engine.goal.goals:HardwareMaintenance watcher_scoring_engines = dummy_scorer = watcher.decision_engine.scoring.dummy_scorer:DummyScorer @@ -78,6 +79,7 @@ watcher_strategies = uniform_airflow = watcher.decision_engine.strategy.strategies.uniform_airflow:UniformAirflow noisy_neighbor = watcher.decision_engine.strategy.strategies.noisy_neighbor:NoisyNeighbor storage_capacity_balance = watcher.decision_engine.strategy.strategies.storage_capacity_balance:StorageCapacityBalance + zone_migration = watcher.decision_engine.strategy.strategies.zone_migration:ZoneMigration watcher_actions = migrate = watcher.applier.actions.migration:Migrate diff --git a/watcher/common/exception.py b/watcher/common/exception.py index ab7de4f87..e1999fb74 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -366,6 +366,14 @@ class ClusterEmpty(WatcherException): msg_fmt = _("The list of compute node(s) in the cluster is empty") +class ComputeClusterEmpty(WatcherException): + msg_fmt = _("The list of compute node(s) in the cluster is empty") + + +class StorageClusterEmpty(WatcherException): + msg_fmt = _("The list of storage node(s) in the cluster is empty") + + class MetricCollectorNotDefined(WatcherException): msg_fmt = _("The metrics resource collector is not defined") diff --git a/watcher/common/nova_helper.py b/watcher/common/nova_helper.py index c50cdc12c..b738a5403 100644 --- a/watcher/common/nova_helper.py +++ b/watcher/common/nova_helper.py @@ -67,6 +67,9 @@ class NovaHelper(object): def get_instance_list(self): return self.nova.servers.list(search_opts={'all_tenants': True}) + def get_flavor_list(self): + return self.nova.flavors.list(**{'is_public': None}) + def get_service(self, service_id): return self.nova.services.find(id=service_id) diff --git a/watcher/decision_engine/goal/__init__.py b/watcher/decision_engine/goal/__init__.py index 1cde9744c..15a21354b 100644 --- a/watcher/decision_engine/goal/__init__.py +++ b/watcher/decision_engine/goal/__init__.py @@ -23,7 +23,10 @@ Unclassified = goals.Unclassified WorkloadBalancing = goals.WorkloadBalancing NoisyNeighbor = goals.NoisyNeighborOptimization SavingEnergy = goals.SavingEnergy +HardwareMaintenance = goals.HardwareMaintenance + __all__ = ("Dummy", "ServerConsolidation", "ThermalOptimization", "Unclassified", "WorkloadBalancing", - "NoisyNeighborOptimization", "SavingEnergy") + "NoisyNeighborOptimization", "SavingEnergy", + "HardwareMaintenance") diff --git a/watcher/decision_engine/goal/efficacy/indicators.py b/watcher/decision_engine/goal/efficacy/indicators.py index ff5e4be49..7363f21de 100644 --- a/watcher/decision_engine/goal/efficacy/indicators.py +++ b/watcher/decision_engine/goal/efficacy/indicators.py @@ -112,3 +112,118 @@ class InstanceMigrationsCount(IndicatorSpecification): def schema(self): return voluptuous.Schema( voluptuous.Range(min=0), required=True) + + +class LiveInstanceMigrateCount(IndicatorSpecification): + def __init__(self): + super(LiveInstanceMigrateCount, self).__init__( + name="live_migrate_instance_count", + description=_("The number of instances actually live migrated."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class PlannedLiveInstanceMigrateCount(IndicatorSpecification): + def __init__(self): + super(PlannedLiveInstanceMigrateCount, self).__init__( + name="planned_live_migrate_instance_count", + description=_("The number of instances planned to live migrate."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class ColdInstanceMigrateCount(IndicatorSpecification): + def __init__(self): + super(ColdInstanceMigrateCount, self).__init__( + name="cold_migrate_instance_count", + description=_("The number of instances actually cold migrated."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class PlannedColdInstanceMigrateCount(IndicatorSpecification): + def __init__(self): + super(PlannedColdInstanceMigrateCount, self).__init__( + name="planned_cold_migrate_instance_count", + description=_("The number of instances planned to cold migrate."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class VolumeMigrateCount(IndicatorSpecification): + def __init__(self): + super(VolumeMigrateCount, self).__init__( + name="volume_migrate_count", + description=_("The number of detached volumes actually migrated."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class PlannedVolumeMigrateCount(IndicatorSpecification): + def __init__(self): + super(PlannedVolumeMigrateCount, self).__init__( + name="planned_volume_migrate_count", + description=_("The number of detached volumes planned" + " to migrate."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class VolumeUpdateCount(IndicatorSpecification): + def __init__(self): + super(VolumeUpdateCount, self).__init__( + name="volume_update_count", + description=_("The number of attached volumes actually" + " migrated."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) + + +class PlannedVolumeUpdateCount(IndicatorSpecification): + def __init__(self): + super(PlannedVolumeUpdateCount, self).__init__( + name="planned_volume_update_count", + description=_("The number of attached volumes planned to" + " migrate."), + unit=None, + ) + + @property + def schema(self): + return voluptuous.Schema( + voluptuous.Range(min=0), required=True) diff --git a/watcher/decision_engine/goal/efficacy/specs.py b/watcher/decision_engine/goal/efficacy/specs.py index 023a6e0a3..f6b7c437c 100644 --- a/watcher/decision_engine/goal/efficacy/specs.py +++ b/watcher/decision_engine/goal/efficacy/specs.py @@ -53,3 +53,86 @@ class ServerConsolidation(base.EfficacySpecification): )) return global_efficacy + + +class HardwareMaintenance(base.EfficacySpecification): + + def get_indicators_specifications(self): + return [ + indicators.LiveInstanceMigrateCount(), + indicators.PlannedLiveInstanceMigrateCount(), + indicators.ColdInstanceMigrateCount(), + indicators.PlannedColdInstanceMigrateCount(), + indicators.VolumeMigrateCount(), + indicators.PlannedVolumeMigrateCount(), + indicators.VolumeUpdateCount(), + indicators.PlannedVolumeUpdateCount() + ] + + def get_global_efficacy_indicator(self, indicators_map=None): + li_value = 0 + if (indicators_map and + indicators_map.planned_live_migrate_instance_count > 0): + li_value = ( + float(indicators_map.planned_live_migrate_instance_count) + / float(indicators_map.live_migrate_instance_count) + * 100 + ) + + li_indicator = efficacy.Indicator( + name="live_instance_migrate_ratio", + description=_("Ratio of actual live migrated instances " + "to planned live migrate instances."), + unit='%', + value=li_value) + + ci_value = 0 + if (indicators_map and + indicators_map.planned_cold_migrate_instance_count > 0): + ci_value = ( + float(indicators_map.planned_cold_migrate_instance_count) + / float(indicators_map.cold_migrate_instance_count) + * 100 + ) + + ci_indicator = efficacy.Indicator( + name="cold_instance_migrate_ratio", + description=_("Ratio of actual cold migrated instances " + "to planned cold migrate instances."), + unit='%', + value=ci_value) + + dv_value = 0 + if (indicators_map and + indicators_map.planned_volume_migrate_count > 0): + dv_value = (float(indicators_map.planned_volume_migrate_count) / + float(indicators_map. + volume_migrate_count) + * 100) + + dv_indicator = efficacy.Indicator( + name="volume_migrate_ratio", + description=_("Ratio of actual detached volumes migrated to" + " planned detached volumes migrate."), + unit='%', + value=dv_value) + + av_value = 0 + if (indicators_map and + indicators_map.planned_volume_update_count > 0): + av_value = (float(indicators_map.planned_volume_update_count) / + float(indicators_map. + volume_update_count) + * 100) + + av_indicator = efficacy.Indicator( + name="volume_update_ratio", + description=_("Ratio of actual attached volumes migrated to" + " planned attached volumes migrate."), + unit='%', + value=av_value) + + return [li_indicator, + ci_indicator, + dv_indicator, + av_indicator] diff --git a/watcher/decision_engine/goal/goals.py b/watcher/decision_engine/goal/goals.py index 32400d897..965d69c9e 100644 --- a/watcher/decision_engine/goal/goals.py +++ b/watcher/decision_engine/goal/goals.py @@ -216,3 +216,28 @@ class SavingEnergy(base.Goal): def get_efficacy_specification(cls): """The efficacy spec for the current goal""" return specs.Unclassified() + + +class HardwareMaintenance(base.Goal): + """HardwareMaintenance + + This goal is to migrate instances and volumes on a set of compute nodes + and storage from nodes under maintenance + """ + + @classmethod + def get_name(cls): + return "hardware_maintenance" + + @classmethod + def get_display_name(cls): + return _("Hardware Maintenance") + + @classmethod + def get_translatable_display_name(cls): + return "Hardware Maintenance" + + @classmethod + def get_efficacy_specification(cls): + """The efficacy spec for the current goal""" + return specs.HardwareMaintenance() diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py index 30b5d2466..fc3a20c8e 100644 --- a/watcher/decision_engine/strategy/strategies/__init__.py +++ b/watcher/decision_engine/strategy/strategies/__init__.py @@ -28,6 +28,8 @@ from watcher.decision_engine.strategy.strategies import \ vm_workload_consolidation from watcher.decision_engine.strategy.strategies import workload_balance from watcher.decision_engine.strategy.strategies import workload_stabilization +from watcher.decision_engine.strategy.strategies import zone_migration + Actuator = actuation.Actuator BasicConsolidation = basic_consolidation.BasicConsolidation @@ -41,8 +43,10 @@ WorkloadBalance = workload_balance.WorkloadBalance WorkloadStabilization = workload_stabilization.WorkloadStabilization UniformAirflow = uniform_airflow.UniformAirflow NoisyNeighbor = noisy_neighbor.NoisyNeighbor +ZoneMigration = zone_migration.ZoneMigration __all__ = ("Actuator", "BasicConsolidation", "OutletTempControl", "DummyStrategy", "DummyWithScorer", "VMWorkloadConsolidation", "WorkloadBalance", "WorkloadStabilization", "UniformAirflow", - "NoisyNeighbor", "SavingEnergy", "StorageCapacityBalance") + "NoisyNeighbor", "SavingEnergy", "StorageCapacityBalance", + "ZoneMigration") diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index 13750aa08..b4a7a63f4 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -399,3 +399,11 @@ class SavingEnergyBaseStrategy(BaseStrategy): @classmethod def get_goal_name(cls): return "saving_energy" + + +@six.add_metaclass(abc.ABCMeta) +class ZoneMigrationBaseStrategy(BaseStrategy): + + @classmethod + def get_goal_name(cls): + return "hardware_maintenance" diff --git a/watcher/decision_engine/strategy/strategies/zone_migration.py b/watcher/decision_engine/strategy/strategies/zone_migration.py new file mode 100644 index 000000000..98dc8f52c --- /dev/null +++ b/watcher/decision_engine/strategy/strategies/zone_migration.py @@ -0,0 +1,975 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dateutil.parser import parse +import six + +from oslo_log import log + +from cinderclient.v2.volumes import Volume +from novaclient.v2.servers import Server +from watcher._i18n import _ +from watcher.common import cinder_helper +from watcher.common import exception as wexc +from watcher.common import nova_helper +from watcher.decision_engine.model import element +from watcher.decision_engine.strategy.strategies import base + +LOG = log.getLogger(__name__) + +INSTANCE = "instance" +VOLUME = "volume" +ACTIVE = "active" +PAUSED = 'paused' +STOPPED = "stopped" +status_ACTIVE = 'ACTIVE' +status_PAUSED = 'PAUSED' +status_SHUTOFF = 'SHUTOFF' +AVAILABLE = "available" +IN_USE = "in-use" + + +class ZoneMigration(base.ZoneMigrationBaseStrategy): + + def __init__(self, config, osc=None): + + super(ZoneMigration, self).__init__(config, osc) + self._nova = None + self._cinder = None + + self.live_count = 0 + self.planned_live_count = 0 + self.cold_count = 0 + self.planned_cold_count = 0 + self.volume_count = 0 + self.planned_volume_count = 0 + self.volume_update_count = 0 + self.planned_volume_update_count = 0 + + @classmethod + def get_name(cls): + return "zone_migration" + + @classmethod + def get_display_name(cls): + return _("Zone migration") + + @classmethod + def get_translatable_display_name(cls): + return "Zone migration" + + @classmethod + def get_schema(cls): + return { + "properties": { + "compute_nodes": { + "type": "array", + "items": { + "type": "object", + "properties": { + "src_node": { + "description": "Compute node from which" + " instances migrate", + "type": "string" + }, + "dst_node": { + "description": "Compute node to which" + "instances migrate", + "type": "string" + } + }, + "required": ["src_node"], + "additionalProperties": False + } + }, + "storage_pools": { + "type": "array", + "items": { + "type": "object", + "properties": { + "src_pool": { + "description": "Storage pool from which" + " volumes migrate", + "type": "string" + }, + "dst_pool": { + "description": "Storage pool to which" + " volumes migrate", + "type": "string" + }, + "src_type": { + "description": "Volume type from which" + " volumes migrate", + "type": "string" + }, + "dst_type": { + "description": "Volume type to which" + " volumes migrate", + "type": "string" + } + }, + "required": ["src_pool", "src_type", "dst_type"], + "additionalProperties": False + } + }, + "parallel_total": { + "description": "The number of actions to be run in" + " parallel in total", + "type": "integer", "minimum": 0, "default": 6 + }, + "parallel_per_node": { + "description": "The number of actions to be run in" + " parallel per compute node", + "type": "integer", "minimum": 0, "default": 2 + }, + "parallel_per_pool": { + "description": "The number of actions to be run in" + " parallel per storage host", + "type": "integer", "minimum": 0, "default": 2 + }, + "priority": { + "description": "List prioritizes instances and volumes", + "type": "object", + "properties": { + "project": { + "type": "array", "items": {"type": "string"} + }, + "compute_node": { + "type": "array", "items": {"type": "string"} + }, + "storage_pool": { + "type": "array", "items": {"type": "string"} + }, + "compute": { + "enum": ["vcpu_num", "mem_size", "disk_size", + "created_at"] + }, + "storage": { + "enum": ["size", "created_at"] + } + }, + "additionalProperties": False + }, + "with_attached_volume": { + "description": "instance migrates just after attached" + " volumes or not", + "type": "boolean", "default": False + }, + }, + "additionalProperties": False + } + + @property + def migrate_compute_nodes(self): + """Get compute nodes from input_parameters + + :returns: compute nodes + e.g. [{"src_node": "w012", "dst_node": "w022"}, + {"src_node": "w013", "dst_node": "w023"}] + """ + + return self.input_parameters.get('compute_nodes') + + @property + def migrate_storage_pools(self): + """Get storage pools from input_parameters + + :returns: storage pools + e.g. [ + {"src_pool": "src1@back1#pool1", + "dst_pool": "dst1@back1#pool1", + "src_type": "src1_type", + "dst_type": "dst1_type"}, + {"src_pool": "src1@back2#pool1", + "dst_pool": "dst1@back2#pool1", + "src_type": "src1_type", + "dst_type": "dst1_type"} + ] + """ + + return self.input_parameters.get('storage_pools') + + @property + def parallel_total(self): + return self.input_parameters.get('parallel_total') + + @property + def parallel_per_node(self): + return self.input_parameters.get('parallel_per_node') + + @property + def parallel_per_pool(self): + return self.input_parameters.get('parallel_per_pool') + + @property + def priority(self): + """Get priority from input_parameters + + :returns: priority map + e.g. + { + "project": ["pj1"], + "compute_node": ["compute1", "compute2"], + "compute": ["vcpu_num"], + "storage_pool": ["pool1", "pool2"], + "storage": ["size", "created_at"] + } + """ + + return self.input_parameters.get('priority') + + @property + def with_attached_volume(self): + return self.input_parameters.get('with_attached_volume') + + @property + def nova(self): + if self._nova is None: + self._nova = nova_helper.NovaHelper(osc=self.osc) + return self._nova + + @property + def cinder(self): + if self._cinder is None: + self._cinder = cinder_helper.CinderHelper(osc=self.osc) + return self._cinder + + def get_available_compute_nodes(self): + default_node_scope = [element.ServiceState.ENABLED.value, + element.ServiceState.DISABLED.value] + return {uuid: cn for uuid, cn in + self.compute_model.get_all_compute_nodes().items() + if cn.state == element.ServiceState.ONLINE.value and + cn.status in default_node_scope} + + def get_available_storage_nodes(self): + default_node_scope = [element.ServiceState.ENABLED.value, + element.ServiceState.DISABLED.value] + return {uuid: cn for uuid, cn in + self.storage_model.get_all_storage_nodes().items() + if cn.state == element.ServiceState.ONLINE.value and + cn.status in default_node_scope} + + def pre_execute(self): + """Pre-execution phase + + This can be used to fetch some pre-requisites or data. + """ + LOG.info("Initializing zone migration Strategy") + + if len(self.get_available_compute_nodes()) == 0: + raise wexc.ComputeClusterEmpty() + + if len(self.get_available_storage_nodes()) == 0: + raise wexc.StorageClusterEmpty() + + LOG.debug(self.compute_model.to_string()) + LOG.debug(self.storage_model.to_string()) + + def do_execute(self): + """Strategy execution phase + + """ + filtered_targets = self.filtered_targets() + self.set_migration_count(filtered_targets) + + total_limit = self.parallel_total + per_node_limit = self.parallel_per_node + per_pool_limit = self.parallel_per_pool + action_counter = ActionCounter(total_limit, + per_pool_limit, per_node_limit) + + for k, targets in six.iteritems(filtered_targets): + if k == VOLUME: + self.volumes_migration(targets, action_counter) + elif k == INSTANCE: + if self.volume_count == 0 and self.volume_update_count == 0: + # if with_attached_volume is true, + # instance having attached volumes already migrated, + # migrate instances which does not have attached volumes + if self.with_attached_volume: + targets = self.instances_no_attached(targets) + self.instances_migration(targets, action_counter) + else: + self.instances_migration(targets, action_counter) + + LOG.debug("action total: %s, pools: %s, nodes %s " % ( + action_counter.total_count, + action_counter.per_pool_count, + action_counter.per_node_count)) + + def post_execute(self): + """Post-execution phase + + This can be used to compute the global efficacy + """ + self.solution.set_efficacy_indicators( + live_migrate_instance_count=self.live_count, + planned_live_migrate_instance_count=self.planned_live_count, + cold_migrate_instance_count=self.cold_count, + planned_cold_migrate_instance_count=self.planned_cold_count, + volume_migrate_count=self.volume_count, + planned_volume_migrate_count=self.planned_volume_count, + volume_update_count=self.volume_update_count, + planned_volume_update_count=self.planned_volume_update_count + ) + + def set_migration_count(self, targets): + """Set migration count + + :param targets: dict of instance object and volume object list + keys of dict are instance and volume + """ + for instance in targets.get('instance', []): + if self.is_live(instance): + self.live_count += 1 + elif self.is_cold(instance): + self.cold_count += 1 + for volume in targets.get('volume', []): + if self.is_available(volume): + self.volume_count += 1 + elif self.is_in_use(volume): + self.volume_update_count += 1 + + def is_live(self, instance): + status = getattr(instance, 'status') + state = getattr(instance, 'OS-EXT-STS:vm_state') + return (status == status_ACTIVE and state == ACTIVE + ) or (status == status_PAUSED and state == PAUSED) + + def is_cold(self, instance): + status = getattr(instance, 'status') + state = getattr(instance, 'OS-EXT-STS:vm_state') + return status == status_SHUTOFF and state == STOPPED + + def is_available(self, volume): + return getattr(volume, 'status') == AVAILABLE + + def is_in_use(self, volume): + return getattr(volume, 'status') == IN_USE + + def instances_no_attached(instances): + return [i for i in instances + if not getattr(i, "os-extended-volumes:volumes_attached")] + + def get_host_by_pool(self, pool): + """Get host name from pool name + + Utility method to get host name from pool name + which is formatted as host@backend#pool. + + :param pool: pool name + :returns: host name + """ + + # TODO(hidekazu) use this + # mapping = zonemgr.get_host_pool_mapping() + # for host, pools in six.iteritems(mapping): + # for _pool in pools: + # if pool == _pool: + # return host + # LOG.warning(self.msg_not_exist_corresponding_host % pool) + # return pool + return pool.split('@')[0] + + def get_dst_node(self, src_node): + """Get destination node from self.migration_compute_nodes + + :param src_node: compute node name + :returns: destination node name + """ + for node in self.migrate_compute_nodes: + if node.get("src_node") == src_node: + return node.get("dst_node") + + def get_dst_pool_and_type(self, src_pool, src_type): + """Get destination pool and type from self.migration_storage_pools + + :param src_pool: storage pool name + :param src_type: storage volume type + :returns: set of storage pool name and volume type name + """ + for pool in self.migrate_storage_pools: + if pool.get("src_pool") == src_pool: + return (pool.get("dst_pool", None), + pool.get("dst_type")) + + def volumes_migration(self, volumes, action_counter): + for volume in volumes: + if action_counter.is_total_max(): + LOG.debug('total reached limit') + break + + pool = getattr(volume, 'os-vol-host-attr:host') + if action_counter.is_pool_max(pool): + LOG.debug("%s has objects to be migrated, but it has" + " reached the limit of parallelization." % pool) + continue + + src_type = volume.volume_type + dst_pool, dst_type = self.get_dst_pool_and_type(pool, src_type) + LOG.debug(src_type) + LOG.debug("%s %s" % (dst_pool, dst_type)) + + if self.is_available(volume): + if src_type == dst_type: + self._volume_migrate(volume.id, dst_pool) + else: + self._volume_retype(volume.id, dst_type) + elif self.is_in_use(volume): + self._volume_update(volume.id, dst_type) + + # if with_attached_volume is True, migrate attaching instances + if self.with_attached_volume: + instances = [self.nova.find_instance(dic.get('server_id')) + for dic in volume.attachments] + self.instances_migration(instances, action_counter) + + action_counter.add_pool(pool) + + def instances_migration(self, instances, action_counter): + + for instance in instances: + src_node = getattr(instance, 'OS-EXT-SRV-ATTR:host') + + if action_counter.is_total_max(): + LOG.debug('total reached limit') + break + + if action_counter.is_node_max(src_node): + LOG.debug("%s has objects to be migrated, but it has" + " reached the limit of parallelization." % src_node) + continue + + dst_node = self.get_dst_node(src_node) + if self.is_live(instance): + self._live_migration(instance.id, src_node, dst_node) + elif self.is_cold(instance): + self._cold_migration(instance.id, src_node, dst_node) + + action_counter.add_node(src_node) + + def _live_migration(self, resource_id, src_node, dst_node): + parameters = {"migration_type": "live", + "destination_node": dst_node, + "source_node": src_node} + self.solution.add_action( + action_type="migrate", + resource_id=resource_id, + input_parameters=parameters) + self.planned_live_count += 1 + + def _cold_migration(self, resource_id, src_node, dst_node): + parameters = {"migration_type": "cold", + "destination_node": dst_node, + "source_node": src_node} + self.solution.add_action( + action_type="migrate", + resource_id=resource_id, + input_parameters=parameters) + self.planned_cold_count += 1 + + def _volume_update(self, resource_id, dst_type): + parameters = {"migration_type": "swap", + "destination_type": dst_type} + self.solution.add_action( + action_type="volume_migrate", + resource_id=resource_id, + input_parameters=parameters) + self.planned_volume_update_count += 1 + + def _volume_migrate(self, resource_id, dst_pool): + parameters = {"migration_type": "migrate", + "destination_node": dst_pool} + self.solution.add_action( + action_type="volume_migrate", + resource_id=resource_id, + input_parameters=parameters) + self.planned_volume_count += 1 + + def _volume_retype(self, resource_id, dst_type): + parameters = {"migration_type": "retype", + "destination_type": dst_type} + self.solution.add_action( + action_type="volume_migrate", + resource_id=resource_id, + input_parameters=parameters) + self.planned_volume_count += 1 + + def get_src_node_list(self): + """Get src nodes from migrate_compute_nodes + + :returns: src node name list + """ + if not self.migrate_compute_nodes: + return None + + return [v for dic in self.migrate_compute_nodes + for k, v in dic.items() if k == "src_node"] + + def get_src_pool_list(self): + """Get src pools from migrate_storage_pools + + :returns: src pool name list + """ + + return [v for dic in self.migrate_storage_pools + for k, v in dic.items() if k == "src_pool"] + + def get_instances(self): + """Get migrate target instances + + :returns: instance list on src nodes and compute scope + """ + + src_node_list = self.get_src_node_list() + + if not src_node_list: + return None + + return [i for i in self.nova.get_instance_list() + if getattr(i, 'OS-EXT-SRV-ATTR:host') in src_node_list + and self.compute_model.get_instance_by_uuid(i.id)] + + def get_volumes(self): + """Get migrate target volumes + + :returns: volume list on src pools and storage scope + """ + + src_pool_list = self.get_src_pool_list() + + return [i for i in self.cinder.get_volume_list() + if getattr(i, 'os-vol-host-attr:host') in src_pool_list + and self.storage_model.get_volume_by_uuid(i.id)] + + def filtered_targets(self): + """Filter targets + + prioritize instances and volumes based on priorities + from input parameters. + + :returns: prioritized targets + """ + result = {} + + if self.migrate_compute_nodes: + result["instance"] = self.get_instances() + + if self.migrate_storage_pools: + result["volume"] = self.get_volumes() + + if not self.priority: + return result + + filter_actions = self.get_priority_filter_list() + LOG.debug(filter_actions) + + # apply all filters set in input prameter + for action in list(reversed(filter_actions)): + LOG.debug(action) + result = action.apply_filter(result) + + return result + + def get_priority_filter_list(self): + """Get priority filters + + :returns: list of filter object with arguments in self.priority + """ + + filter_list = [] + priority_filter_map = self.get_priority_filter_map() + + for k, v in six.iteritems(self.priority): + if k in priority_filter_map: + filter_list.append(priority_filter_map[k](v)) + + return filter_list + + def get_priority_filter_map(self): + """Get priority filter map + + :returns: filter map + key is the key in priority input parameters. + value is filter class for prioritizing. + """ + + return { + "project": ProjectSortFilter, + "compute_node": ComputeHostSortFilter, + "storage_pool": StorageHostSortFilter, + "compute": ComputeSpecSortFilter, + "storage": StorageSpecSortFilter, + } + + +class ActionCounter(object): + """Manage the number of actions in parallel""" + + def __init__(self, total_limit=6, per_pool_limit=2, per_node_limit=2): + """Initialize dict of host and the number of action + + :param total_limit: total number of actions + :param per_pool_limit: the number of migrate actions per storage pool + :param per_node_limit: the number of migrate actions per compute node + """ + self.total_limit = total_limit + self.per_pool_limit = per_pool_limit + self.per_node_limit = per_node_limit + self.per_pool_count = {} + self.per_node_count = {} + self.total_count = 0 + + def add_pool(self, pool): + """Increment the number of actions on host and total count + + :param pool: storage pool + :returns: True if incremented, False otherwise + """ + if pool not in self.per_pool_count: + self.per_pool_count[pool] = 0 + + if not self.is_total_max() and not self.is_pool_max(pool): + self.per_pool_count[pool] += 1 + self.total_count += 1 + LOG.debug("total: %s, per_pool: %s" % ( + self.total_count, self.per_pool_count)) + return True + return False + + def add_node(self, node): + """Add the number of actions on node + + :param host: compute node + :returns: True if action can be added, False otherwise + """ + if node not in self.per_node_count: + self.per_node_count[node] = 0 + + if not self.is_total_max() and not self.is_node_max(node): + self.per_node_count[node] += 1 + self.total_count += 1 + LOG.debug("total: %s, per_node: %s" % ( + self.total_count, self.per_node_count)) + return True + return False + + def is_total_max(self): + """Check if total count reached limit + + :returns: True if total count reached limit, False otherwise + """ + return self.total_count >= self.total_limit + + def is_pool_max(self, pool): + """Check if per pool count reached limit + + :returns: True if count reached limit, False otherwise + """ + if pool not in self.per_pool_count: + self.per_pool_count[pool] = 0 + LOG.debug("the number of parallel per pool %s is %s " % + (pool, self.per_pool_count[pool])) + LOG.debug("per pool limit is %s" % self.per_pool_limit) + return self.per_pool_count[pool] >= self.per_pool_limit + + def is_node_max(self, node): + """Check if per node count reached limit + + :returns: True if count reached limit, False otherwise + """ + if node not in self.per_node_count: + self.per_node_count[node] = 0 + return self.per_node_count[node] >= self.per_node_limit + + +class BaseFilter(object): + """Base class for Filter""" + + apply_targets = ('ALL',) + + def __init__(self, values=[], **kwargs): + """initialization + + :param values: priority value + """ + + if not isinstance(values, list): + values = [values] + + self.condition = values + + def apply_filter(self, targets): + """apply filter to targets + + :param targets: dict of instance object and volume object list + keys of dict are instance and volume + """ + + if not targets: + return {} + + for cond in list(reversed(self.condition)): + for k, v in six.iteritems(targets): + if not self.is_allowed(k): + continue + LOG.debug("filter:%s with the key: %s" % (cond, k)) + targets[k] = self.exec_filter(v, cond) + + LOG.debug(targets) + return targets + + def is_allowed(self, key): + return (key in self.apply_targets) or ('ALL' in self.apply_targets) + + def exec_filter(self, items, sort_key): + """This is implemented by sub class""" + return items + + +class SortMovingToFrontFilter(BaseFilter): + """This is to move to front if a condition is True""" + + def exec_filter(self, items, sort_key): + return self.sort_moving_to_front(items, + sort_key, + self.compare_func) + + def sort_moving_to_front(self, items, sort_key=None, compare_func=None): + if not compare_func or not sort_key: + return items + + for item in list(reversed(items)): + if compare_func(item, sort_key): + items.remove(item) + items.insert(0, item) + return items + + def compare_func(self, item, sort_key): + return True + + +class ProjectSortFilter(SortMovingToFrontFilter): + """ComputeHostSortFilter""" + + apply_targets = ('instance', 'volume') + + def __init__(self, values=[], **kwargs): + super(ProjectSortFilter, self).__init__(values, **kwargs) + + def compare_func(self, item, sort_key): + """Compare project id of item with sort_key + + :param item: instance object or volume object + :param sort_key: project id + :returns: true: project id of item equals sort_key + false: otherwise + """ + + project_id = self.get_project_id(item) + LOG.debug("project_id: %s, sort_key: %s" % (project_id, sort_key)) + return project_id == sort_key + + def get_project_id(self, item): + """get project id of item + + :param item: instance object or volume object + :returns: project id + """ + + if isinstance(item, Volume): + return getattr(item, 'os-vol-tenant-attr:tenant_id') + elif isinstance(item, Server): + return item.tenant_id + + +class ComputeHostSortFilter(SortMovingToFrontFilter): + """ComputeHostSortFilter""" + + apply_targets = ('instance',) + + def __init__(self, values=[], **kwargs): + super(ComputeHostSortFilter, self).__init__(values, **kwargs) + + def compare_func(self, item, sort_key): + """Compare compute name of item with sort_key + + :param item: instance object + :param sort_key: compute host name + :returns: true: compute name on which intance is equals sort_key + false: otherwise + """ + + host = self.get_host(item) + LOG.debug("host: %s, sort_key: %s" % (host, sort_key)) + return host == sort_key + + def get_host(self, item): + """get hostname on which item is + + :param item: instance object + :returns: hostname on which item is + """ + + return getattr(item, 'OS-EXT-SRV-ATTR:host') + + +class StorageHostSortFilter(SortMovingToFrontFilter): + """StoragehostSortFilter""" + + apply_targets = ('volume',) + + def compare_func(self, item, sort_key): + """Compare pool name of item with sort_key + + :param item: volume object + :param sort_key: storage pool name + :returns: true: pool name on which intance is equals sort_key + false: otherwise + """ + + host = self.get_host(item) + LOG.debug("host: %s, sort_key: %s" % (host, sort_key)) + return host == sort_key + + def get_host(self, item): + return getattr(item, 'os-vol-host-attr:host') + + +class ComputeSpecSortFilter(BaseFilter): + """ComputeSpecSortFilter""" + + apply_targets = ('instance',) + accept_keys = ['vcpu_num', 'mem_size', 'disk_size', 'created_at'] + + def __init__(self, values=[], **kwargs): + super(ComputeSpecSortFilter, self).__init__(values, **kwargs) + self._nova = None + + @property + def nova(self): + if self._nova is None: + self._nova = nova_helper.NovaHelper() + return self._nova + + def exec_filter(self, items, sort_key): + result = items + + if sort_key not in self.accept_keys: + LOG.warning("Invalid key is specified: %s" % sort_key) + else: + result = self.get_sorted_items(items, sort_key) + + return result + + def get_sorted_items(self, items, sort_key): + """Sort items by sort_key + + :param items: instances + :param sort_key: sort_key + :returns: items sorted by sort_key + """ + + result = items + flavors = self.nova.get_flavor_list() + + if sort_key == 'mem_size': + result = sorted(items, + key=lambda x: float(self.get_mem_size(x, flavors)), + reverse=True) + elif sort_key == 'vcpu_num': + result = sorted(items, + key=lambda x: float(self.get_vcpu_num(x, flavors)), + reverse=True) + elif sort_key == 'disk_size': + result = sorted(items, + key=lambda x: float( + self.get_disk_size(x, flavors)), + reverse=True) + elif sort_key == 'created_at': + result = sorted(items, + key=lambda x: parse(getattr(x, sort_key)), + reverse=False) + + return result + + def get_mem_size(self, item, flavors): + """Get memory size of item + + :param item: instance + :param flavors: flavors + :returns: memory size of item + """ + + LOG.debug("item: %s, flavors: %s" % (item, flavors)) + for flavor in flavors: + LOG.debug("item.flavor: %s, flavor: %s" % (item.flavor, flavor)) + if item.flavor.get('id') == flavor.id: + LOG.debug("flavor.ram: %s" % flavor.ram) + return flavor.ram + + def get_vcpu_num(self, item, flavors): + """Get vcpu number of item + + :param item: instance + :param flavors: flavors + :returns: vcpu number of item + """ + + LOG.debug("item: %s, flavors: %s" % (item, flavors)) + for flavor in flavors: + LOG.debug("item.flavor: %s, flavor: %s" % (item.flavor, flavor)) + if item.flavor.get('id') == flavor.id: + LOG.debug("flavor.vcpus: %s" % flavor.vcpus) + return flavor.vcpus + + def get_disk_size(self, item, flavors): + """Get disk size of item + + :param item: instance + :param flavors: flavors + :returns: disk size of item + """ + + LOG.debug("item: %s, flavors: %s" % (item, flavors)) + for flavor in flavors: + LOG.debug("item.flavor: %s, flavor: %s" % (item.flavor, flavor)) + if item.flavor.get('id') == flavor.id: + LOG.debug("flavor.disk: %s" % flavor.disk) + return flavor.disk + + +class StorageSpecSortFilter(BaseFilter): + """StorageSpecSortFilter""" + + apply_targets = ('volume',) + accept_keys = ['size', 'created_at'] + + def exec_filter(self, items, sort_key): + result = items + + if sort_key not in self.accept_keys: + LOG.warning("Invalid key is specified: %s" % sort_key) + return result + + if sort_key == 'created_at': + result = sorted(items, + key=lambda x: parse(getattr(x, sort_key)), + reverse=False) + else: + result = sorted(items, + key=lambda x: float(getattr(x, sort_key)), + reverse=True) + LOG.debug(result) + return result diff --git a/watcher/tests/decision_engine/strategy/strategies/test_zone_migration.py b/watcher/tests/decision_engine/strategy/strategies/test_zone_migration.py new file mode 100644 index 000000000..ba204d32a --- /dev/null +++ b/watcher/tests/decision_engine/strategy/strategies/test_zone_migration.py @@ -0,0 +1,719 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import mock + +import cinderclient +import novaclient +from watcher.common import cinder_helper +from watcher.common import clients +from watcher.common import exception +from watcher.common import nova_helper +from watcher.common import utils +from watcher.decision_engine.model import model_root +from watcher.decision_engine.strategy import strategies +from watcher.tests import base +from watcher.tests.decision_engine.model import faker_cluster_state + + +class TestZoneMigration(base.TestCase): + + def setUp(self): + super(TestZoneMigration, self).setUp() + # fake compute cluster + self.fake_c_cluster = faker_cluster_state.FakerModelCollector() + + # fake storage cluster + self.fake_s_cluster = faker_cluster_state.FakerStorageModelCollector() + + p_c_model = mock.patch.object( + strategies.ZoneMigration, "compute_model", + new_callable=mock.PropertyMock) + self.m_c_model = p_c_model.start() + self.addCleanup(p_c_model.stop) + + p_s_model = mock.patch.object( + strategies.ZoneMigration, "storage_model", + new_callable=mock.PropertyMock) + self.m_s_model = p_s_model.start() + self.addCleanup(p_s_model.stop) + + p_migrate_compute_nodes = mock.patch.object( + strategies.ZoneMigration, "migrate_compute_nodes", + new_callable=mock.PropertyMock) + self.m_migrate_compute_nodes = p_migrate_compute_nodes.start() + self.addCleanup(p_migrate_compute_nodes.stop) + + p_migrate_storage_pools = mock.patch.object( + strategies.ZoneMigration, "migrate_storage_pools", + new_callable=mock.PropertyMock) + self.m_migrate_storage_pools = p_migrate_storage_pools.start() + self.addCleanup(p_migrate_storage_pools.stop) + + p_parallel_total = mock.patch.object( + strategies.ZoneMigration, "parallel_total", + new_callable=mock.PropertyMock) + self.m_parallel_total = p_parallel_total.start() + self.addCleanup(p_parallel_total.stop) + + p_parallel_per_node = mock.patch.object( + strategies.ZoneMigration, "parallel_per_node", + new_callable=mock.PropertyMock) + self.m_parallel_per_node = p_parallel_per_node.start() + self.addCleanup(p_parallel_per_node.stop) + + p_parallel_per_pool = mock.patch.object( + strategies.ZoneMigration, "parallel_per_pool", + new_callable=mock.PropertyMock) + self.m_parallel_per_pool = p_parallel_per_pool.start() + self.addCleanup(p_parallel_per_pool.stop) + + p_audit_scope = mock.patch.object( + strategies.ZoneMigration, "audit_scope", + new_callable=mock.PropertyMock + ) + self.m_audit_scope = p_audit_scope.start() + self.addCleanup(p_audit_scope.stop) + + p_priority = mock.patch.object( + strategies.ZoneMigration, "priority", + new_callable=mock.PropertyMock + ) + self.m_priority = p_priority.start() + self.addCleanup(p_priority.stop) + + model = self.fake_c_cluster.generate_scenario_1() + self.m_c_model.return_value = model + + model = self.fake_s_cluster.generate_scenario_1() + self.m_s_model.return_value = model + + self.m_parallel_total.return_value = 6 + self.m_parallel_per_node.return_value = 2 + self.m_parallel_per_pool.return_value = 2 + self.m_audit_scope.return_value = mock.Mock() + self.m_migrate_compute_nodes.return_value = [ + {"src_node": "src1", "dst_node": "dst1"}, + {"src_node": "src2", "dst_node": "dst2"} + ] + self.m_migrate_storage_pools.return_value = [ + {"src_pool": "src1@back1#pool1", "dst_pool": "dst1@back1#pool1", + "src_type": "type1", "dst_type": "type1"}, + {"src_pool": "src2@back1#pool1", "dst_pool": "dst2@back2#pool1", + "src_type": "type2", "dst_type": "type3"} + ] + self.m_audit_scope.return_value = mock.Mock() + + self.strategy = strategies.ZoneMigration( + config=mock.Mock()) + + self.m_osc_cls = mock.Mock() + self.m_osc = mock.Mock(spec=clients.OpenStackClients) + self.m_osc_cls.return_value = self.m_osc + m_openstack_clients = mock.patch.object( + clients, "OpenStackClients", self.m_osc_cls) + m_openstack_clients.start() + self.addCleanup(m_openstack_clients.stop) + + self.m_n_helper_cls = mock.Mock() + self.m_n_helper = mock.Mock(spec=nova_helper.NovaHelper) + self.m_n_helper_cls.return_value = self.m_n_helper + m_nova_helper = mock.patch.object( + nova_helper, "NovaHelper", self.m_n_helper_cls) + m_nova_helper.start() + self.addCleanup(m_nova_helper.stop) + + self.m_c_helper_cls = mock.Mock() + self.m_c_helper = mock.Mock(spec=cinder_helper.CinderHelper) + self.m_c_helper_cls.return_value = self.m_c_helper + m_cinder_helper = mock.patch.object( + cinder_helper, "CinderHelper", self.m_c_helper_cls) + m_cinder_helper.start() + self.addCleanup(m_cinder_helper.stop) + + def test_exception_empty_compute_model(self): + model = model_root.ModelRoot() + self.m_c_model.return_value = model + self.assertRaises(exception.ComputeClusterEmpty, self.strategy.execute) + + def test_exception_empty_storage_model(self): + c_model = self.fake_c_cluster.generate_scenario_1() + self.m_c_model.return_value = c_model + + s_model = model_root.StorageModelRoot() + self.m_s_model.return_value = s_model + self.assertRaises(exception.StorageClusterEmpty, self.strategy.execute) + + @staticmethod + def fake_instance(**kwargs): + instance = mock.MagicMock(spec=novaclient.v2.servers.Server) + instance.id = kwargs.get('id', utils.generate_uuid()) + instance.status = kwargs.get('status', 'ACTIVE') + instance.tenant_id = kwargs.get('project_id', None) + instance.flavor = {'id': kwargs.get('flavor_id', None)} + setattr(instance, 'OS-EXT-SRV-ATTR:host', kwargs.get('host')) + setattr(instance, 'created_at', + kwargs.get('created_at', '1977-01-01T00:00:00')) + setattr(instance, 'OS-EXT-STS:vm_state', kwargs.get('state', 'active')) + + return instance + + @staticmethod + def fake_volume(**kwargs): + volume = mock.MagicMock(spec=cinderclient.v2.volumes.Volume) + volume.id = kwargs.get('id', utils.generate_uuid()) + volume.status = kwargs.get('status', 'available') + tenant_id = kwargs.get('project_id', None) + setattr(volume, 'os-vol-tenant-attr:tenant_id', tenant_id) + setattr(volume, 'os-vol-host-attr:host', kwargs.get('host')) + setattr(volume, 'size', kwargs.get('size', '1')) + setattr(volume, 'created_at', + kwargs.get('created_at', '1977-01-01T00:00:00')) + volume.volume_type = kwargs.get('volume_type', 'type1') + + return volume + + @staticmethod + def fake_flavor(**kwargs): + flavor = mock.MagicMock() + flavor.id = kwargs.get('id', None) + flavor.ram = kwargs.get('mem_size', '1') + flavor.vcpus = kwargs.get('vcpu_num', '1') + flavor.disk = kwargs.get('disk_size', '1') + + return flavor + + def test_get_src_node_list(self): + instances = self.strategy.get_src_node_list() + self.assertEqual(sorted(instances), sorted(["src1", "src2"])) + + def test_get_instances(self): + instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1") + instance_on_src2 = self.fake_instance(host="src2", id="INSTANCE_2") + instance_on_src3 = self.fake_instance(host="src3", id="INSTANCE_3") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + instances = self.strategy.get_instances() + + # src1,src2 is in instances + # src3 is not in instances + self.assertIn(instance_on_src1, instances) + self.assertIn(instance_on_src2, instances) + self.assertNotIn(instance_on_src3, instances) + + def test_get_volumes(self): + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + volume_on_src2 = self.fake_volume(host="src2@back1#pool1", + id="VOLUME_2") + volume_on_src3 = self.fake_volume(host="src3@back2#pool1", + id="VOLUME_3") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + volume_on_src2, + volume_on_src3, + ] + + volumes = self.strategy.get_volumes() + + # src1,src2 is in instances + # src3 is not in instances + self.assertIn(volume_on_src1, volumes) + self.assertIn(volume_on_src2, volumes) + self.assertNotIn(volume_on_src3, volumes) + + # execute # + + def test_execute_live_migrate_instance(self): + instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("live", 0)) + global_efficacy_value = solution.global_efficacy[0].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_cold_migrate_instance(self): + instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1") + setattr(instance_on_src1, "status", "SHUTOFF") + setattr(instance_on_src1, "OS-EXT-STS:vm_state", "stopped") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + ] + + self.m_c_helper.get_volume_list.return_value = [] + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("cold", 0)) + global_efficacy_value = solution.global_efficacy[1].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_migrate_volume(self): + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("migrate", 0)) + global_efficacy_value = solution.global_efficacy[2].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_retype_volume(self): + volume_on_src2 = self.fake_volume(host="src2@back1#pool1", + id="VOLUME_2") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src2, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("retype", 0)) + global_efficacy_value = solution.global_efficacy[2].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_swap_volume(self): + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + volume_on_src1.status = "in-use" + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("swap", 0)) + global_efficacy_value = solution.global_efficacy[3].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_live_migrate_instance_parallel(self): + instance_on_src1_1 = self.fake_instance(host="src1", id="INSTANCE_1") + instance_on_src1_2 = self.fake_instance(host="src1", id="INSTANCE_2") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1_1, + instance_on_src1_2, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(2, migration_types.get("live", 0)) + global_efficacy_value = solution.global_efficacy[0].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_parallel_per_node(self): + self.m_parallel_per_node.return_value = 1 + + instance_on_src1_1 = self.fake_instance(host="src1", id="INSTANCE_1") + instance_on_src1_2 = self.fake_instance(host="src1", id="INSTANCE_2") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1_1, + instance_on_src1_2, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("live", 0)) + global_efficacy_value = solution.global_efficacy[0].get('value', 0) + self.assertEqual(50.0, global_efficacy_value) + + def test_execute_migrate_volume_parallel(self): + volume_on_src1_1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + volume_on_src1_2 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_2") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1_1, + volume_on_src1_2, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(2, migration_types.get("migrate", 0)) + global_efficacy_value = solution.global_efficacy[2].get('value', 0) + self.assertEqual(100, global_efficacy_value) + + def test_execute_parallel_per_pool(self): + self.m_parallel_per_pool.return_value = 1 + + volume_on_src1_1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + volume_on_src1_2 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_2") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1_1, + volume_on_src1_2, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("migrate", 0)) + global_efficacy_value = solution.global_efficacy[2].get('value', 0) + self.assertEqual(50.0, global_efficacy_value) + + def test_execute_parallel_total(self): + self.m_parallel_total.return_value = 1 + self.m_parallel_per_pool.return_value = 1 + + volume_on_src1_1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + volume_on_src1_2 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_2") + volume_on_src2_1 = self.fake_volume(host="src2@back1#pool1", + id="VOLUME_3") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1_1, + volume_on_src1_2, + volume_on_src2_1, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + solution = self.strategy.execute() + + migration_types = collections.Counter( + [action.get('input_parameters')['migration_type'] + for action in solution.actions]) + self.assertEqual(1, migration_types.get("migrate", 0)) + + # priority filter # + + def test_get_priority_filter_list(self): + self.m_priority.return_value = { + "project": ["pj1"], + "compute_node": ["compute1", "compute2"], + "compute": ["cpu_num"], + "storage_pool": ["pool1", "pool2"], + "storage": ["size"] + } + filters = self.strategy.get_priority_filter_list() + self.assertIn(strategies.zone_migration.ComputeHostSortFilter, + map(lambda l: l.__class__, filters)) + self.assertIn(strategies.zone_migration.StorageHostSortFilter, + map(lambda l: l.__class__, filters)) + self.assertIn(strategies.zone_migration.ProjectSortFilter, + map(lambda l: l.__class__, filters)) + + # ComputeHostSortFilter # + + def test_filtered_targets_compute_nodes(self): + instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1") + instance_on_src2 = self.fake_instance(host="src2", id="INSTANCE_2") + instance_on_src3 = self.fake_instance(host="src3", id="INSTANCE_3") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + self.m_priority.return_value = { + "compute_node": ["src1", "src2"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get('instance'), + [instance_on_src1, instance_on_src2]) + + # StorageHostSortFilter # + + def test_filtered_targets_storage_pools(self): + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1") + volume_on_src2 = self.fake_volume(host="src2@back1#pool1", + id="VOLUME_2") + volume_on_src3 = self.fake_volume(host="src3@back2#pool1", + id="VOLUME_3") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + volume_on_src2, + volume_on_src3, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + self.m_priority.return_value = { + "storage_pool": ["src1@back1#pool1", "src2@back1#pool1"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get("volume"), + [volume_on_src1, volume_on_src2]) + + # ProjectSortFilter # + + def test_filtered_targets_project(self): + instance_on_src1 = self.fake_instance( + host="src1", id="INSTANCE_1", project_id="pj2") + instance_on_src2 = self.fake_instance( + host="src2", id="INSTANCE_2", project_id="pj1") + instance_on_src3 = self.fake_instance( + host="src3", id="INSTANCE_3", project_id="pj3") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1", project_id="pj2") + volume_on_src2 = self.fake_volume(host="src2@back1#pool1", + id="VOLUME_2", project_id="pj1") + volume_on_src3 = self.fake_volume(host="src3@back2#pool1", + id="VOLUME_3", project_id="pj3") + + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + volume_on_src2, + volume_on_src3, + ] + + self.m_priority.return_value = { + "project": ["pj1"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get('instance'), + [instance_on_src2, instance_on_src1]) + self.assertEqual(targets.get('volume'), + [volume_on_src2, volume_on_src1]) + self.assertEqual(targets, + {"instance": [instance_on_src2, instance_on_src1], + "volume": [volume_on_src2, volume_on_src1]}) + + # ComputeSpecSortFilter # + + def test_filtered_targets_instance_mem_size(self): + flavor_64 = self.fake_flavor(id="1", mem_size="64") + flavor_128 = self.fake_flavor(id="2", mem_size="128") + flavor_512 = self.fake_flavor(id="3", mem_size="512") + self.m_n_helper.get_flavor_list.return_value = [ + flavor_64, + flavor_128, + flavor_512, + ] + + instance_on_src1 = self.fake_instance(host="src1", + id="INSTANCE_1", flavor_id="1") + instance_on_src2 = self.fake_instance(host="src2", + id="INSTANCE_2", flavor_id="2") + instance_on_src3 = self.fake_instance(host="src3", + id="INSTANCE_3", flavor_id="3") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + self.m_priority.return_value = { + "compute": ["mem_size"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get('instance'), + [instance_on_src2, instance_on_src1]) + + def test_filtered_targets_instance_vcpu_num(self): + flavor_1 = self.fake_flavor(id="1", vcpu_num="1") + flavor_2 = self.fake_flavor(id="2", vcpu_num="2") + flavor_3 = self.fake_flavor(id="3", vcpu_num="3") + self.m_n_helper.get_flavor_list.return_value = [ + flavor_1, + flavor_2, + flavor_3, + ] + + instance_on_src1 = self.fake_instance(host="src1", + id="INSTANCE_1", flavor_id="1") + instance_on_src2 = self.fake_instance(host="src2", + id="INSTANCE_2", flavor_id="2") + instance_on_src3 = self.fake_instance(host="src3", + id="INSTANCE_3", flavor_id="3") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + self.m_priority.return_value = { + "compute": ["vcpu_num"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get('instance'), + [instance_on_src2, instance_on_src1]) + + def test_filtered_targets_instance_disk_size(self): + flavor_1 = self.fake_flavor(id="1", disk_size="1") + flavor_2 = self.fake_flavor(id="2", disk_size="2") + flavor_3 = self.fake_flavor(id="3", disk_size="3") + self.m_n_helper.get_flavor_list.return_value = [ + flavor_1, + flavor_2, + flavor_3, + ] + + instance_on_src1 = self.fake_instance(host="src1", + id="INSTANCE_1", flavor_id="1") + instance_on_src2 = self.fake_instance(host="src2", + id="INSTANCE_2", flavor_id="2") + instance_on_src3 = self.fake_instance(host="src3", + id="INSTANCE_3", flavor_id="3") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + self.m_priority.return_value = { + "compute": ["disk_size"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get('instance'), + [instance_on_src2, instance_on_src1]) + + def test_filtered_targets_instance_created_at(self): + instance_on_src1 = self.fake_instance( + host="src1", id="INSTANCE_1", created_at="2017-10-30T00:00:00") + instance_on_src2 = self.fake_instance( + host="src2", id="INSTANCE_2", created_at="1977-03-29T03:03:03") + instance_on_src3 = self.fake_instance( + host="src3", id="INSTANCE_3", created_at="1977-03-29T03:03:03") + self.m_n_helper.get_instance_list.return_value = [ + instance_on_src1, + instance_on_src2, + instance_on_src3, + ] + + self.m_c_helper.get_volume_list.return_value = [] + + self.m_priority.return_value = { + "compute": ["created_at"], + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get('instance'), + [instance_on_src2, instance_on_src1]) + + # StorageSpecSortFilter # + + def test_filtered_targets_storage_size(self): + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + size="1", id="VOLUME_1") + volume_on_src2 = self.fake_volume(host="src2@back1#pool1", + size="2", id="VOLUME_2") + volume_on_src3 = self.fake_volume(host="src3@back2#pool1", + size="3", id="VOLUME_3") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + volume_on_src2, + volume_on_src3, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + self.m_priority.return_value = { + "storage": ["size"] + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get("volume"), + [volume_on_src2, volume_on_src1]) + + def test_filtered_targets_storage_created_at(self): + volume_on_src1 = self.fake_volume(host="src1@back1#pool1", + id="VOLUME_1", + created_at="2017-10-30T00:00:00") + volume_on_src2 = self.fake_volume(host="src2@back1#pool1", + id="VOLUME_2", + created_at="1977-03-29T03:03:03") + volume_on_src3 = self.fake_volume(host="src3@back2#pool1", + id="VOLUME_3", + created_at="1977-03-29T03:03:03") + self.m_c_helper.get_volume_list.return_value = [ + volume_on_src1, + volume_on_src2, + volume_on_src3, + ] + + self.m_n_helper.get_instance_list.return_value = [] + + self.m_priority.return_value = { + "storage": ["created_at"] + } + + targets = self.strategy.filtered_targets() + self.assertEqual(targets.get("volume"), + [volume_on_src2, volume_on_src1])