Add zone migration strategy

This patch adds hardware maintenance goal, efficacy and zone
migration strategy.

Change-Id: I5bfee421780233ffeea8c1539aba720ae554983d
Implements: blueprint zone-migration-strategy
This commit is contained in:
Hidekazu Nakamura 2017-12-06 15:09:59 +09:00
parent 5ec8932182
commit 1dae83da57
12 changed files with 1953 additions and 2 deletions

View File

@ -0,0 +1,6 @@
---
features:
- |
Added strategy "Zone migration" and it's goal "Hardware maintenance".
The strategy migrates many instances and volumes efficiently with
minimum downtime automatically.

View File

@ -57,6 +57,7 @@ watcher_goals =
airflow_optimization = watcher.decision_engine.goal.goals:AirflowOptimization
noisy_neighbor = watcher.decision_engine.goal.goals:NoisyNeighborOptimization
saving_energy = watcher.decision_engine.goal.goals:SavingEnergy
hardware_maintenance = watcher.decision_engine.goal.goals:HardwareMaintenance
watcher_scoring_engines =
dummy_scorer = watcher.decision_engine.scoring.dummy_scorer:DummyScorer
@ -78,6 +79,7 @@ watcher_strategies =
uniform_airflow = watcher.decision_engine.strategy.strategies.uniform_airflow:UniformAirflow
noisy_neighbor = watcher.decision_engine.strategy.strategies.noisy_neighbor:NoisyNeighbor
storage_capacity_balance = watcher.decision_engine.strategy.strategies.storage_capacity_balance:StorageCapacityBalance
zone_migration = watcher.decision_engine.strategy.strategies.zone_migration:ZoneMigration
watcher_actions =
migrate = watcher.applier.actions.migration:Migrate

View File

@ -366,6 +366,14 @@ class ClusterEmpty(WatcherException):
msg_fmt = _("The list of compute node(s) in the cluster is empty")
class ComputeClusterEmpty(WatcherException):
msg_fmt = _("The list of compute node(s) in the cluster is empty")
class StorageClusterEmpty(WatcherException):
msg_fmt = _("The list of storage node(s) in the cluster is empty")
class MetricCollectorNotDefined(WatcherException):
msg_fmt = _("The metrics resource collector is not defined")

View File

@ -67,6 +67,9 @@ class NovaHelper(object):
def get_instance_list(self):
return self.nova.servers.list(search_opts={'all_tenants': True})
def get_flavor_list(self):
return self.nova.flavors.list(**{'is_public': None})
def get_service(self, service_id):
return self.nova.services.find(id=service_id)

View File

@ -23,7 +23,10 @@ Unclassified = goals.Unclassified
WorkloadBalancing = goals.WorkloadBalancing
NoisyNeighbor = goals.NoisyNeighborOptimization
SavingEnergy = goals.SavingEnergy
HardwareMaintenance = goals.HardwareMaintenance
__all__ = ("Dummy", "ServerConsolidation", "ThermalOptimization",
"Unclassified", "WorkloadBalancing",
"NoisyNeighborOptimization", "SavingEnergy")
"NoisyNeighborOptimization", "SavingEnergy",
"HardwareMaintenance")

View File

@ -112,3 +112,118 @@ class InstanceMigrationsCount(IndicatorSpecification):
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class LiveInstanceMigrateCount(IndicatorSpecification):
def __init__(self):
super(LiveInstanceMigrateCount, self).__init__(
name="live_migrate_instance_count",
description=_("The number of instances actually live migrated."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class PlannedLiveInstanceMigrateCount(IndicatorSpecification):
def __init__(self):
super(PlannedLiveInstanceMigrateCount, self).__init__(
name="planned_live_migrate_instance_count",
description=_("The number of instances planned to live migrate."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class ColdInstanceMigrateCount(IndicatorSpecification):
def __init__(self):
super(ColdInstanceMigrateCount, self).__init__(
name="cold_migrate_instance_count",
description=_("The number of instances actually cold migrated."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class PlannedColdInstanceMigrateCount(IndicatorSpecification):
def __init__(self):
super(PlannedColdInstanceMigrateCount, self).__init__(
name="planned_cold_migrate_instance_count",
description=_("The number of instances planned to cold migrate."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class VolumeMigrateCount(IndicatorSpecification):
def __init__(self):
super(VolumeMigrateCount, self).__init__(
name="volume_migrate_count",
description=_("The number of detached volumes actually migrated."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class PlannedVolumeMigrateCount(IndicatorSpecification):
def __init__(self):
super(PlannedVolumeMigrateCount, self).__init__(
name="planned_volume_migrate_count",
description=_("The number of detached volumes planned"
" to migrate."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class VolumeUpdateCount(IndicatorSpecification):
def __init__(self):
super(VolumeUpdateCount, self).__init__(
name="volume_update_count",
description=_("The number of attached volumes actually"
" migrated."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)
class PlannedVolumeUpdateCount(IndicatorSpecification):
def __init__(self):
super(PlannedVolumeUpdateCount, self).__init__(
name="planned_volume_update_count",
description=_("The number of attached volumes planned to"
" migrate."),
unit=None,
)
@property
def schema(self):
return voluptuous.Schema(
voluptuous.Range(min=0), required=True)

View File

@ -53,3 +53,86 @@ class ServerConsolidation(base.EfficacySpecification):
))
return global_efficacy
class HardwareMaintenance(base.EfficacySpecification):
def get_indicators_specifications(self):
return [
indicators.LiveInstanceMigrateCount(),
indicators.PlannedLiveInstanceMigrateCount(),
indicators.ColdInstanceMigrateCount(),
indicators.PlannedColdInstanceMigrateCount(),
indicators.VolumeMigrateCount(),
indicators.PlannedVolumeMigrateCount(),
indicators.VolumeUpdateCount(),
indicators.PlannedVolumeUpdateCount()
]
def get_global_efficacy_indicator(self, indicators_map=None):
li_value = 0
if (indicators_map and
indicators_map.planned_live_migrate_instance_count > 0):
li_value = (
float(indicators_map.planned_live_migrate_instance_count)
/ float(indicators_map.live_migrate_instance_count)
* 100
)
li_indicator = efficacy.Indicator(
name="live_instance_migrate_ratio",
description=_("Ratio of actual live migrated instances "
"to planned live migrate instances."),
unit='%',
value=li_value)
ci_value = 0
if (indicators_map and
indicators_map.planned_cold_migrate_instance_count > 0):
ci_value = (
float(indicators_map.planned_cold_migrate_instance_count)
/ float(indicators_map.cold_migrate_instance_count)
* 100
)
ci_indicator = efficacy.Indicator(
name="cold_instance_migrate_ratio",
description=_("Ratio of actual cold migrated instances "
"to planned cold migrate instances."),
unit='%',
value=ci_value)
dv_value = 0
if (indicators_map and
indicators_map.planned_volume_migrate_count > 0):
dv_value = (float(indicators_map.planned_volume_migrate_count) /
float(indicators_map.
volume_migrate_count)
* 100)
dv_indicator = efficacy.Indicator(
name="volume_migrate_ratio",
description=_("Ratio of actual detached volumes migrated to"
" planned detached volumes migrate."),
unit='%',
value=dv_value)
av_value = 0
if (indicators_map and
indicators_map.planned_volume_update_count > 0):
av_value = (float(indicators_map.planned_volume_update_count) /
float(indicators_map.
volume_update_count)
* 100)
av_indicator = efficacy.Indicator(
name="volume_update_ratio",
description=_("Ratio of actual attached volumes migrated to"
" planned attached volumes migrate."),
unit='%',
value=av_value)
return [li_indicator,
ci_indicator,
dv_indicator,
av_indicator]

View File

@ -216,3 +216,28 @@ class SavingEnergy(base.Goal):
def get_efficacy_specification(cls):
"""The efficacy spec for the current goal"""
return specs.Unclassified()
class HardwareMaintenance(base.Goal):
"""HardwareMaintenance
This goal is to migrate instances and volumes on a set of compute nodes
and storage from nodes under maintenance
"""
@classmethod
def get_name(cls):
return "hardware_maintenance"
@classmethod
def get_display_name(cls):
return _("Hardware Maintenance")
@classmethod
def get_translatable_display_name(cls):
return "Hardware Maintenance"
@classmethod
def get_efficacy_specification(cls):
"""The efficacy spec for the current goal"""
return specs.HardwareMaintenance()

View File

@ -28,6 +28,8 @@ from watcher.decision_engine.strategy.strategies import \
vm_workload_consolidation
from watcher.decision_engine.strategy.strategies import workload_balance
from watcher.decision_engine.strategy.strategies import workload_stabilization
from watcher.decision_engine.strategy.strategies import zone_migration
Actuator = actuation.Actuator
BasicConsolidation = basic_consolidation.BasicConsolidation
@ -41,8 +43,10 @@ WorkloadBalance = workload_balance.WorkloadBalance
WorkloadStabilization = workload_stabilization.WorkloadStabilization
UniformAirflow = uniform_airflow.UniformAirflow
NoisyNeighbor = noisy_neighbor.NoisyNeighbor
ZoneMigration = zone_migration.ZoneMigration
__all__ = ("Actuator", "BasicConsolidation", "OutletTempControl",
"DummyStrategy", "DummyWithScorer", "VMWorkloadConsolidation",
"WorkloadBalance", "WorkloadStabilization", "UniformAirflow",
"NoisyNeighbor", "SavingEnergy", "StorageCapacityBalance")
"NoisyNeighbor", "SavingEnergy", "StorageCapacityBalance",
"ZoneMigration")

View File

@ -399,3 +399,11 @@ class SavingEnergyBaseStrategy(BaseStrategy):
@classmethod
def get_goal_name(cls):
return "saving_energy"
@six.add_metaclass(abc.ABCMeta)
class ZoneMigrationBaseStrategy(BaseStrategy):
@classmethod
def get_goal_name(cls):
return "hardware_maintenance"

View File

@ -0,0 +1,975 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dateutil.parser import parse
import six
from oslo_log import log
from cinderclient.v2.volumes import Volume
from novaclient.v2.servers import Server
from watcher._i18n import _
from watcher.common import cinder_helper
from watcher.common import exception as wexc
from watcher.common import nova_helper
from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base
LOG = log.getLogger(__name__)
INSTANCE = "instance"
VOLUME = "volume"
ACTIVE = "active"
PAUSED = 'paused'
STOPPED = "stopped"
status_ACTIVE = 'ACTIVE'
status_PAUSED = 'PAUSED'
status_SHUTOFF = 'SHUTOFF'
AVAILABLE = "available"
IN_USE = "in-use"
class ZoneMigration(base.ZoneMigrationBaseStrategy):
def __init__(self, config, osc=None):
super(ZoneMigration, self).__init__(config, osc)
self._nova = None
self._cinder = None
self.live_count = 0
self.planned_live_count = 0
self.cold_count = 0
self.planned_cold_count = 0
self.volume_count = 0
self.planned_volume_count = 0
self.volume_update_count = 0
self.planned_volume_update_count = 0
@classmethod
def get_name(cls):
return "zone_migration"
@classmethod
def get_display_name(cls):
return _("Zone migration")
@classmethod
def get_translatable_display_name(cls):
return "Zone migration"
@classmethod
def get_schema(cls):
return {
"properties": {
"compute_nodes": {
"type": "array",
"items": {
"type": "object",
"properties": {
"src_node": {
"description": "Compute node from which"
" instances migrate",
"type": "string"
},
"dst_node": {
"description": "Compute node to which"
"instances migrate",
"type": "string"
}
},
"required": ["src_node"],
"additionalProperties": False
}
},
"storage_pools": {
"type": "array",
"items": {
"type": "object",
"properties": {
"src_pool": {
"description": "Storage pool from which"
" volumes migrate",
"type": "string"
},
"dst_pool": {
"description": "Storage pool to which"
" volumes migrate",
"type": "string"
},
"src_type": {
"description": "Volume type from which"
" volumes migrate",
"type": "string"
},
"dst_type": {
"description": "Volume type to which"
" volumes migrate",
"type": "string"
}
},
"required": ["src_pool", "src_type", "dst_type"],
"additionalProperties": False
}
},
"parallel_total": {
"description": "The number of actions to be run in"
" parallel in total",
"type": "integer", "minimum": 0, "default": 6
},
"parallel_per_node": {
"description": "The number of actions to be run in"
" parallel per compute node",
"type": "integer", "minimum": 0, "default": 2
},
"parallel_per_pool": {
"description": "The number of actions to be run in"
" parallel per storage host",
"type": "integer", "minimum": 0, "default": 2
},
"priority": {
"description": "List prioritizes instances and volumes",
"type": "object",
"properties": {
"project": {
"type": "array", "items": {"type": "string"}
},
"compute_node": {
"type": "array", "items": {"type": "string"}
},
"storage_pool": {
"type": "array", "items": {"type": "string"}
},
"compute": {
"enum": ["vcpu_num", "mem_size", "disk_size",
"created_at"]
},
"storage": {
"enum": ["size", "created_at"]
}
},
"additionalProperties": False
},
"with_attached_volume": {
"description": "instance migrates just after attached"
" volumes or not",
"type": "boolean", "default": False
},
},
"additionalProperties": False
}
@property
def migrate_compute_nodes(self):
"""Get compute nodes from input_parameters
:returns: compute nodes
e.g. [{"src_node": "w012", "dst_node": "w022"},
{"src_node": "w013", "dst_node": "w023"}]
"""
return self.input_parameters.get('compute_nodes')
@property
def migrate_storage_pools(self):
"""Get storage pools from input_parameters
:returns: storage pools
e.g. [
{"src_pool": "src1@back1#pool1",
"dst_pool": "dst1@back1#pool1",
"src_type": "src1_type",
"dst_type": "dst1_type"},
{"src_pool": "src1@back2#pool1",
"dst_pool": "dst1@back2#pool1",
"src_type": "src1_type",
"dst_type": "dst1_type"}
]
"""
return self.input_parameters.get('storage_pools')
@property
def parallel_total(self):
return self.input_parameters.get('parallel_total')
@property
def parallel_per_node(self):
return self.input_parameters.get('parallel_per_node')
@property
def parallel_per_pool(self):
return self.input_parameters.get('parallel_per_pool')
@property
def priority(self):
"""Get priority from input_parameters
:returns: priority map
e.g.
{
"project": ["pj1"],
"compute_node": ["compute1", "compute2"],
"compute": ["vcpu_num"],
"storage_pool": ["pool1", "pool2"],
"storage": ["size", "created_at"]
}
"""
return self.input_parameters.get('priority')
@property
def with_attached_volume(self):
return self.input_parameters.get('with_attached_volume')
@property
def nova(self):
if self._nova is None:
self._nova = nova_helper.NovaHelper(osc=self.osc)
return self._nova
@property
def cinder(self):
if self._cinder is None:
self._cinder = cinder_helper.CinderHelper(osc=self.osc)
return self._cinder
def get_available_compute_nodes(self):
default_node_scope = [element.ServiceState.ENABLED.value,
element.ServiceState.DISABLED.value]
return {uuid: cn for uuid, cn in
self.compute_model.get_all_compute_nodes().items()
if cn.state == element.ServiceState.ONLINE.value and
cn.status in default_node_scope}
def get_available_storage_nodes(self):
default_node_scope = [element.ServiceState.ENABLED.value,
element.ServiceState.DISABLED.value]
return {uuid: cn for uuid, cn in
self.storage_model.get_all_storage_nodes().items()
if cn.state == element.ServiceState.ONLINE.value and
cn.status in default_node_scope}
def pre_execute(self):
"""Pre-execution phase
This can be used to fetch some pre-requisites or data.
"""
LOG.info("Initializing zone migration Strategy")
if len(self.get_available_compute_nodes()) == 0:
raise wexc.ComputeClusterEmpty()
if len(self.get_available_storage_nodes()) == 0:
raise wexc.StorageClusterEmpty()
LOG.debug(self.compute_model.to_string())
LOG.debug(self.storage_model.to_string())
def do_execute(self):
"""Strategy execution phase
"""
filtered_targets = self.filtered_targets()
self.set_migration_count(filtered_targets)
total_limit = self.parallel_total
per_node_limit = self.parallel_per_node
per_pool_limit = self.parallel_per_pool
action_counter = ActionCounter(total_limit,
per_pool_limit, per_node_limit)
for k, targets in six.iteritems(filtered_targets):
if k == VOLUME:
self.volumes_migration(targets, action_counter)
elif k == INSTANCE:
if self.volume_count == 0 and self.volume_update_count == 0:
# if with_attached_volume is true,
# instance having attached volumes already migrated,
# migrate instances which does not have attached volumes
if self.with_attached_volume:
targets = self.instances_no_attached(targets)
self.instances_migration(targets, action_counter)
else:
self.instances_migration(targets, action_counter)
LOG.debug("action total: %s, pools: %s, nodes %s " % (
action_counter.total_count,
action_counter.per_pool_count,
action_counter.per_node_count))
def post_execute(self):
"""Post-execution phase
This can be used to compute the global efficacy
"""
self.solution.set_efficacy_indicators(
live_migrate_instance_count=self.live_count,
planned_live_migrate_instance_count=self.planned_live_count,
cold_migrate_instance_count=self.cold_count,
planned_cold_migrate_instance_count=self.planned_cold_count,
volume_migrate_count=self.volume_count,
planned_volume_migrate_count=self.planned_volume_count,
volume_update_count=self.volume_update_count,
planned_volume_update_count=self.planned_volume_update_count
)
def set_migration_count(self, targets):
"""Set migration count
:param targets: dict of instance object and volume object list
keys of dict are instance and volume
"""
for instance in targets.get('instance', []):
if self.is_live(instance):
self.live_count += 1
elif self.is_cold(instance):
self.cold_count += 1
for volume in targets.get('volume', []):
if self.is_available(volume):
self.volume_count += 1
elif self.is_in_use(volume):
self.volume_update_count += 1
def is_live(self, instance):
status = getattr(instance, 'status')
state = getattr(instance, 'OS-EXT-STS:vm_state')
return (status == status_ACTIVE and state == ACTIVE
) or (status == status_PAUSED and state == PAUSED)
def is_cold(self, instance):
status = getattr(instance, 'status')
state = getattr(instance, 'OS-EXT-STS:vm_state')
return status == status_SHUTOFF and state == STOPPED
def is_available(self, volume):
return getattr(volume, 'status') == AVAILABLE
def is_in_use(self, volume):
return getattr(volume, 'status') == IN_USE
def instances_no_attached(instances):
return [i for i in instances
if not getattr(i, "os-extended-volumes:volumes_attached")]
def get_host_by_pool(self, pool):
"""Get host name from pool name
Utility method to get host name from pool name
which is formatted as host@backend#pool.
:param pool: pool name
:returns: host name
"""
# TODO(hidekazu) use this
# mapping = zonemgr.get_host_pool_mapping()
# for host, pools in six.iteritems(mapping):
# for _pool in pools:
# if pool == _pool:
# return host
# LOG.warning(self.msg_not_exist_corresponding_host % pool)
# return pool
return pool.split('@')[0]
def get_dst_node(self, src_node):
"""Get destination node from self.migration_compute_nodes
:param src_node: compute node name
:returns: destination node name
"""
for node in self.migrate_compute_nodes:
if node.get("src_node") == src_node:
return node.get("dst_node")
def get_dst_pool_and_type(self, src_pool, src_type):
"""Get destination pool and type from self.migration_storage_pools
:param src_pool: storage pool name
:param src_type: storage volume type
:returns: set of storage pool name and volume type name
"""
for pool in self.migrate_storage_pools:
if pool.get("src_pool") == src_pool:
return (pool.get("dst_pool", None),
pool.get("dst_type"))
def volumes_migration(self, volumes, action_counter):
for volume in volumes:
if action_counter.is_total_max():
LOG.debug('total reached limit')
break
pool = getattr(volume, 'os-vol-host-attr:host')
if action_counter.is_pool_max(pool):
LOG.debug("%s has objects to be migrated, but it has"
" reached the limit of parallelization." % pool)
continue
src_type = volume.volume_type
dst_pool, dst_type = self.get_dst_pool_and_type(pool, src_type)
LOG.debug(src_type)
LOG.debug("%s %s" % (dst_pool, dst_type))
if self.is_available(volume):
if src_type == dst_type:
self._volume_migrate(volume.id, dst_pool)
else:
self._volume_retype(volume.id, dst_type)
elif self.is_in_use(volume):
self._volume_update(volume.id, dst_type)
# if with_attached_volume is True, migrate attaching instances
if self.with_attached_volume:
instances = [self.nova.find_instance(dic.get('server_id'))
for dic in volume.attachments]
self.instances_migration(instances, action_counter)
action_counter.add_pool(pool)
def instances_migration(self, instances, action_counter):
for instance in instances:
src_node = getattr(instance, 'OS-EXT-SRV-ATTR:host')
if action_counter.is_total_max():
LOG.debug('total reached limit')
break
if action_counter.is_node_max(src_node):
LOG.debug("%s has objects to be migrated, but it has"
" reached the limit of parallelization." % src_node)
continue
dst_node = self.get_dst_node(src_node)
if self.is_live(instance):
self._live_migration(instance.id, src_node, dst_node)
elif self.is_cold(instance):
self._cold_migration(instance.id, src_node, dst_node)
action_counter.add_node(src_node)
def _live_migration(self, resource_id, src_node, dst_node):
parameters = {"migration_type": "live",
"destination_node": dst_node,
"source_node": src_node}
self.solution.add_action(
action_type="migrate",
resource_id=resource_id,
input_parameters=parameters)
self.planned_live_count += 1
def _cold_migration(self, resource_id, src_node, dst_node):
parameters = {"migration_type": "cold",
"destination_node": dst_node,
"source_node": src_node}
self.solution.add_action(
action_type="migrate",
resource_id=resource_id,
input_parameters=parameters)
self.planned_cold_count += 1
def _volume_update(self, resource_id, dst_type):
parameters = {"migration_type": "swap",
"destination_type": dst_type}
self.solution.add_action(
action_type="volume_migrate",
resource_id=resource_id,
input_parameters=parameters)
self.planned_volume_update_count += 1
def _volume_migrate(self, resource_id, dst_pool):
parameters = {"migration_type": "migrate",
"destination_node": dst_pool}
self.solution.add_action(
action_type="volume_migrate",
resource_id=resource_id,
input_parameters=parameters)
self.planned_volume_count += 1
def _volume_retype(self, resource_id, dst_type):
parameters = {"migration_type": "retype",
"destination_type": dst_type}
self.solution.add_action(
action_type="volume_migrate",
resource_id=resource_id,
input_parameters=parameters)
self.planned_volume_count += 1
def get_src_node_list(self):
"""Get src nodes from migrate_compute_nodes
:returns: src node name list
"""
if not self.migrate_compute_nodes:
return None
return [v for dic in self.migrate_compute_nodes
for k, v in dic.items() if k == "src_node"]
def get_src_pool_list(self):
"""Get src pools from migrate_storage_pools
:returns: src pool name list
"""
return [v for dic in self.migrate_storage_pools
for k, v in dic.items() if k == "src_pool"]
def get_instances(self):
"""Get migrate target instances
:returns: instance list on src nodes and compute scope
"""
src_node_list = self.get_src_node_list()
if not src_node_list:
return None
return [i for i in self.nova.get_instance_list()
if getattr(i, 'OS-EXT-SRV-ATTR:host') in src_node_list
and self.compute_model.get_instance_by_uuid(i.id)]
def get_volumes(self):
"""Get migrate target volumes
:returns: volume list on src pools and storage scope
"""
src_pool_list = self.get_src_pool_list()
return [i for i in self.cinder.get_volume_list()
if getattr(i, 'os-vol-host-attr:host') in src_pool_list
and self.storage_model.get_volume_by_uuid(i.id)]
def filtered_targets(self):
"""Filter targets
prioritize instances and volumes based on priorities
from input parameters.
:returns: prioritized targets
"""
result = {}
if self.migrate_compute_nodes:
result["instance"] = self.get_instances()
if self.migrate_storage_pools:
result["volume"] = self.get_volumes()
if not self.priority:
return result
filter_actions = self.get_priority_filter_list()
LOG.debug(filter_actions)
# apply all filters set in input prameter
for action in list(reversed(filter_actions)):
LOG.debug(action)
result = action.apply_filter(result)
return result
def get_priority_filter_list(self):
"""Get priority filters
:returns: list of filter object with arguments in self.priority
"""
filter_list = []
priority_filter_map = self.get_priority_filter_map()
for k, v in six.iteritems(self.priority):
if k in priority_filter_map:
filter_list.append(priority_filter_map[k](v))
return filter_list
def get_priority_filter_map(self):
"""Get priority filter map
:returns: filter map
key is the key in priority input parameters.
value is filter class for prioritizing.
"""
return {
"project": ProjectSortFilter,
"compute_node": ComputeHostSortFilter,
"storage_pool": StorageHostSortFilter,
"compute": ComputeSpecSortFilter,
"storage": StorageSpecSortFilter,
}
class ActionCounter(object):
"""Manage the number of actions in parallel"""
def __init__(self, total_limit=6, per_pool_limit=2, per_node_limit=2):
"""Initialize dict of host and the number of action
:param total_limit: total number of actions
:param per_pool_limit: the number of migrate actions per storage pool
:param per_node_limit: the number of migrate actions per compute node
"""
self.total_limit = total_limit
self.per_pool_limit = per_pool_limit
self.per_node_limit = per_node_limit
self.per_pool_count = {}
self.per_node_count = {}
self.total_count = 0
def add_pool(self, pool):
"""Increment the number of actions on host and total count
:param pool: storage pool
:returns: True if incremented, False otherwise
"""
if pool not in self.per_pool_count:
self.per_pool_count[pool] = 0
if not self.is_total_max() and not self.is_pool_max(pool):
self.per_pool_count[pool] += 1
self.total_count += 1
LOG.debug("total: %s, per_pool: %s" % (
self.total_count, self.per_pool_count))
return True
return False
def add_node(self, node):
"""Add the number of actions on node
:param host: compute node
:returns: True if action can be added, False otherwise
"""
if node not in self.per_node_count:
self.per_node_count[node] = 0
if not self.is_total_max() and not self.is_node_max(node):
self.per_node_count[node] += 1
self.total_count += 1
LOG.debug("total: %s, per_node: %s" % (
self.total_count, self.per_node_count))
return True
return False
def is_total_max(self):
"""Check if total count reached limit
:returns: True if total count reached limit, False otherwise
"""
return self.total_count >= self.total_limit
def is_pool_max(self, pool):
"""Check if per pool count reached limit
:returns: True if count reached limit, False otherwise
"""
if pool not in self.per_pool_count:
self.per_pool_count[pool] = 0
LOG.debug("the number of parallel per pool %s is %s " %
(pool, self.per_pool_count[pool]))
LOG.debug("per pool limit is %s" % self.per_pool_limit)
return self.per_pool_count[pool] >= self.per_pool_limit
def is_node_max(self, node):
"""Check if per node count reached limit
:returns: True if count reached limit, False otherwise
"""
if node not in self.per_node_count:
self.per_node_count[node] = 0
return self.per_node_count[node] >= self.per_node_limit
class BaseFilter(object):
"""Base class for Filter"""
apply_targets = ('ALL',)
def __init__(self, values=[], **kwargs):
"""initialization
:param values: priority value
"""
if not isinstance(values, list):
values = [values]
self.condition = values
def apply_filter(self, targets):
"""apply filter to targets
:param targets: dict of instance object and volume object list
keys of dict are instance and volume
"""
if not targets:
return {}
for cond in list(reversed(self.condition)):
for k, v in six.iteritems(targets):
if not self.is_allowed(k):
continue
LOG.debug("filter:%s with the key: %s" % (cond, k))
targets[k] = self.exec_filter(v, cond)
LOG.debug(targets)
return targets
def is_allowed(self, key):
return (key in self.apply_targets) or ('ALL' in self.apply_targets)
def exec_filter(self, items, sort_key):
"""This is implemented by sub class"""
return items
class SortMovingToFrontFilter(BaseFilter):
"""This is to move to front if a condition is True"""
def exec_filter(self, items, sort_key):
return self.sort_moving_to_front(items,
sort_key,
self.compare_func)
def sort_moving_to_front(self, items, sort_key=None, compare_func=None):
if not compare_func or not sort_key:
return items
for item in list(reversed(items)):
if compare_func(item, sort_key):
items.remove(item)
items.insert(0, item)
return items
def compare_func(self, item, sort_key):
return True
class ProjectSortFilter(SortMovingToFrontFilter):
"""ComputeHostSortFilter"""
apply_targets = ('instance', 'volume')
def __init__(self, values=[], **kwargs):
super(ProjectSortFilter, self).__init__(values, **kwargs)
def compare_func(self, item, sort_key):
"""Compare project id of item with sort_key
:param item: instance object or volume object
:param sort_key: project id
:returns: true: project id of item equals sort_key
false: otherwise
"""
project_id = self.get_project_id(item)
LOG.debug("project_id: %s, sort_key: %s" % (project_id, sort_key))
return project_id == sort_key
def get_project_id(self, item):
"""get project id of item
:param item: instance object or volume object
:returns: project id
"""
if isinstance(item, Volume):
return getattr(item, 'os-vol-tenant-attr:tenant_id')
elif isinstance(item, Server):
return item.tenant_id
class ComputeHostSortFilter(SortMovingToFrontFilter):
"""ComputeHostSortFilter"""
apply_targets = ('instance',)
def __init__(self, values=[], **kwargs):
super(ComputeHostSortFilter, self).__init__(values, **kwargs)
def compare_func(self, item, sort_key):
"""Compare compute name of item with sort_key
:param item: instance object
:param sort_key: compute host name
:returns: true: compute name on which intance is equals sort_key
false: otherwise
"""
host = self.get_host(item)
LOG.debug("host: %s, sort_key: %s" % (host, sort_key))
return host == sort_key
def get_host(self, item):
"""get hostname on which item is
:param item: instance object
:returns: hostname on which item is
"""
return getattr(item, 'OS-EXT-SRV-ATTR:host')
class StorageHostSortFilter(SortMovingToFrontFilter):
"""StoragehostSortFilter"""
apply_targets = ('volume',)
def compare_func(self, item, sort_key):
"""Compare pool name of item with sort_key
:param item: volume object
:param sort_key: storage pool name
:returns: true: pool name on which intance is equals sort_key
false: otherwise
"""
host = self.get_host(item)
LOG.debug("host: %s, sort_key: %s" % (host, sort_key))
return host == sort_key
def get_host(self, item):
return getattr(item, 'os-vol-host-attr:host')
class ComputeSpecSortFilter(BaseFilter):
"""ComputeSpecSortFilter"""
apply_targets = ('instance',)
accept_keys = ['vcpu_num', 'mem_size', 'disk_size', 'created_at']
def __init__(self, values=[], **kwargs):
super(ComputeSpecSortFilter, self).__init__(values, **kwargs)
self._nova = None
@property
def nova(self):
if self._nova is None:
self._nova = nova_helper.NovaHelper()
return self._nova
def exec_filter(self, items, sort_key):
result = items
if sort_key not in self.accept_keys:
LOG.warning("Invalid key is specified: %s" % sort_key)
else:
result = self.get_sorted_items(items, sort_key)
return result
def get_sorted_items(self, items, sort_key):
"""Sort items by sort_key
:param items: instances
:param sort_key: sort_key
:returns: items sorted by sort_key
"""
result = items
flavors = self.nova.get_flavor_list()
if sort_key == 'mem_size':
result = sorted(items,
key=lambda x: float(self.get_mem_size(x, flavors)),
reverse=True)
elif sort_key == 'vcpu_num':
result = sorted(items,
key=lambda x: float(self.get_vcpu_num(x, flavors)),
reverse=True)
elif sort_key == 'disk_size':
result = sorted(items,
key=lambda x: float(
self.get_disk_size(x, flavors)),
reverse=True)
elif sort_key == 'created_at':
result = sorted(items,
key=lambda x: parse(getattr(x, sort_key)),
reverse=False)
return result
def get_mem_size(self, item, flavors):
"""Get memory size of item
:param item: instance
:param flavors: flavors
:returns: memory size of item
"""
LOG.debug("item: %s, flavors: %s" % (item, flavors))
for flavor in flavors:
LOG.debug("item.flavor: %s, flavor: %s" % (item.flavor, flavor))
if item.flavor.get('id') == flavor.id:
LOG.debug("flavor.ram: %s" % flavor.ram)
return flavor.ram
def get_vcpu_num(self, item, flavors):
"""Get vcpu number of item
:param item: instance
:param flavors: flavors
:returns: vcpu number of item
"""
LOG.debug("item: %s, flavors: %s" % (item, flavors))
for flavor in flavors:
LOG.debug("item.flavor: %s, flavor: %s" % (item.flavor, flavor))
if item.flavor.get('id') == flavor.id:
LOG.debug("flavor.vcpus: %s" % flavor.vcpus)
return flavor.vcpus
def get_disk_size(self, item, flavors):
"""Get disk size of item
:param item: instance
:param flavors: flavors
:returns: disk size of item
"""
LOG.debug("item: %s, flavors: %s" % (item, flavors))
for flavor in flavors:
LOG.debug("item.flavor: %s, flavor: %s" % (item.flavor, flavor))
if item.flavor.get('id') == flavor.id:
LOG.debug("flavor.disk: %s" % flavor.disk)
return flavor.disk
class StorageSpecSortFilter(BaseFilter):
"""StorageSpecSortFilter"""
apply_targets = ('volume',)
accept_keys = ['size', 'created_at']
def exec_filter(self, items, sort_key):
result = items
if sort_key not in self.accept_keys:
LOG.warning("Invalid key is specified: %s" % sort_key)
return result
if sort_key == 'created_at':
result = sorted(items,
key=lambda x: parse(getattr(x, sort_key)),
reverse=False)
else:
result = sorted(items,
key=lambda x: float(getattr(x, sort_key)),
reverse=True)
LOG.debug(result)
return result

View File

@ -0,0 +1,719 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import mock
import cinderclient
import novaclient
from watcher.common import cinder_helper
from watcher.common import clients
from watcher.common import exception
from watcher.common import nova_helper
from watcher.common import utils
from watcher.decision_engine.model import model_root
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.model import faker_cluster_state
class TestZoneMigration(base.TestCase):
def setUp(self):
super(TestZoneMigration, self).setUp()
# fake compute cluster
self.fake_c_cluster = faker_cluster_state.FakerModelCollector()
# fake storage cluster
self.fake_s_cluster = faker_cluster_state.FakerStorageModelCollector()
p_c_model = mock.patch.object(
strategies.ZoneMigration, "compute_model",
new_callable=mock.PropertyMock)
self.m_c_model = p_c_model.start()
self.addCleanup(p_c_model.stop)
p_s_model = mock.patch.object(
strategies.ZoneMigration, "storage_model",
new_callable=mock.PropertyMock)
self.m_s_model = p_s_model.start()
self.addCleanup(p_s_model.stop)
p_migrate_compute_nodes = mock.patch.object(
strategies.ZoneMigration, "migrate_compute_nodes",
new_callable=mock.PropertyMock)
self.m_migrate_compute_nodes = p_migrate_compute_nodes.start()
self.addCleanup(p_migrate_compute_nodes.stop)
p_migrate_storage_pools = mock.patch.object(
strategies.ZoneMigration, "migrate_storage_pools",
new_callable=mock.PropertyMock)
self.m_migrate_storage_pools = p_migrate_storage_pools.start()
self.addCleanup(p_migrate_storage_pools.stop)
p_parallel_total = mock.patch.object(
strategies.ZoneMigration, "parallel_total",
new_callable=mock.PropertyMock)
self.m_parallel_total = p_parallel_total.start()
self.addCleanup(p_parallel_total.stop)
p_parallel_per_node = mock.patch.object(
strategies.ZoneMigration, "parallel_per_node",
new_callable=mock.PropertyMock)
self.m_parallel_per_node = p_parallel_per_node.start()
self.addCleanup(p_parallel_per_node.stop)
p_parallel_per_pool = mock.patch.object(
strategies.ZoneMigration, "parallel_per_pool",
new_callable=mock.PropertyMock)
self.m_parallel_per_pool = p_parallel_per_pool.start()
self.addCleanup(p_parallel_per_pool.stop)
p_audit_scope = mock.patch.object(
strategies.ZoneMigration, "audit_scope",
new_callable=mock.PropertyMock
)
self.m_audit_scope = p_audit_scope.start()
self.addCleanup(p_audit_scope.stop)
p_priority = mock.patch.object(
strategies.ZoneMigration, "priority",
new_callable=mock.PropertyMock
)
self.m_priority = p_priority.start()
self.addCleanup(p_priority.stop)
model = self.fake_c_cluster.generate_scenario_1()
self.m_c_model.return_value = model
model = self.fake_s_cluster.generate_scenario_1()
self.m_s_model.return_value = model
self.m_parallel_total.return_value = 6
self.m_parallel_per_node.return_value = 2
self.m_parallel_per_pool.return_value = 2
self.m_audit_scope.return_value = mock.Mock()
self.m_migrate_compute_nodes.return_value = [
{"src_node": "src1", "dst_node": "dst1"},
{"src_node": "src2", "dst_node": "dst2"}
]
self.m_migrate_storage_pools.return_value = [
{"src_pool": "src1@back1#pool1", "dst_pool": "dst1@back1#pool1",
"src_type": "type1", "dst_type": "type1"},
{"src_pool": "src2@back1#pool1", "dst_pool": "dst2@back2#pool1",
"src_type": "type2", "dst_type": "type3"}
]
self.m_audit_scope.return_value = mock.Mock()
self.strategy = strategies.ZoneMigration(
config=mock.Mock())
self.m_osc_cls = mock.Mock()
self.m_osc = mock.Mock(spec=clients.OpenStackClients)
self.m_osc_cls.return_value = self.m_osc
m_openstack_clients = mock.patch.object(
clients, "OpenStackClients", self.m_osc_cls)
m_openstack_clients.start()
self.addCleanup(m_openstack_clients.stop)
self.m_n_helper_cls = mock.Mock()
self.m_n_helper = mock.Mock(spec=nova_helper.NovaHelper)
self.m_n_helper_cls.return_value = self.m_n_helper
m_nova_helper = mock.patch.object(
nova_helper, "NovaHelper", self.m_n_helper_cls)
m_nova_helper.start()
self.addCleanup(m_nova_helper.stop)
self.m_c_helper_cls = mock.Mock()
self.m_c_helper = mock.Mock(spec=cinder_helper.CinderHelper)
self.m_c_helper_cls.return_value = self.m_c_helper
m_cinder_helper = mock.patch.object(
cinder_helper, "CinderHelper", self.m_c_helper_cls)
m_cinder_helper.start()
self.addCleanup(m_cinder_helper.stop)
def test_exception_empty_compute_model(self):
model = model_root.ModelRoot()
self.m_c_model.return_value = model
self.assertRaises(exception.ComputeClusterEmpty, self.strategy.execute)
def test_exception_empty_storage_model(self):
c_model = self.fake_c_cluster.generate_scenario_1()
self.m_c_model.return_value = c_model
s_model = model_root.StorageModelRoot()
self.m_s_model.return_value = s_model
self.assertRaises(exception.StorageClusterEmpty, self.strategy.execute)
@staticmethod
def fake_instance(**kwargs):
instance = mock.MagicMock(spec=novaclient.v2.servers.Server)
instance.id = kwargs.get('id', utils.generate_uuid())
instance.status = kwargs.get('status', 'ACTIVE')
instance.tenant_id = kwargs.get('project_id', None)
instance.flavor = {'id': kwargs.get('flavor_id', None)}
setattr(instance, 'OS-EXT-SRV-ATTR:host', kwargs.get('host'))
setattr(instance, 'created_at',
kwargs.get('created_at', '1977-01-01T00:00:00'))
setattr(instance, 'OS-EXT-STS:vm_state', kwargs.get('state', 'active'))
return instance
@staticmethod
def fake_volume(**kwargs):
volume = mock.MagicMock(spec=cinderclient.v2.volumes.Volume)
volume.id = kwargs.get('id', utils.generate_uuid())
volume.status = kwargs.get('status', 'available')
tenant_id = kwargs.get('project_id', None)
setattr(volume, 'os-vol-tenant-attr:tenant_id', tenant_id)
setattr(volume, 'os-vol-host-attr:host', kwargs.get('host'))
setattr(volume, 'size', kwargs.get('size', '1'))
setattr(volume, 'created_at',
kwargs.get('created_at', '1977-01-01T00:00:00'))
volume.volume_type = kwargs.get('volume_type', 'type1')
return volume
@staticmethod
def fake_flavor(**kwargs):
flavor = mock.MagicMock()
flavor.id = kwargs.get('id', None)
flavor.ram = kwargs.get('mem_size', '1')
flavor.vcpus = kwargs.get('vcpu_num', '1')
flavor.disk = kwargs.get('disk_size', '1')
return flavor
def test_get_src_node_list(self):
instances = self.strategy.get_src_node_list()
self.assertEqual(sorted(instances), sorted(["src1", "src2"]))
def test_get_instances(self):
instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1")
instance_on_src2 = self.fake_instance(host="src2", id="INSTANCE_2")
instance_on_src3 = self.fake_instance(host="src3", id="INSTANCE_3")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
instances = self.strategy.get_instances()
# src1,src2 is in instances
# src3 is not in instances
self.assertIn(instance_on_src1, instances)
self.assertIn(instance_on_src2, instances)
self.assertNotIn(instance_on_src3, instances)
def test_get_volumes(self):
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
volume_on_src2 = self.fake_volume(host="src2@back1#pool1",
id="VOLUME_2")
volume_on_src3 = self.fake_volume(host="src3@back2#pool1",
id="VOLUME_3")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
volume_on_src2,
volume_on_src3,
]
volumes = self.strategy.get_volumes()
# src1,src2 is in instances
# src3 is not in instances
self.assertIn(volume_on_src1, volumes)
self.assertIn(volume_on_src2, volumes)
self.assertNotIn(volume_on_src3, volumes)
# execute #
def test_execute_live_migrate_instance(self):
instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
]
self.m_c_helper.get_volume_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("live", 0))
global_efficacy_value = solution.global_efficacy[0].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_cold_migrate_instance(self):
instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1")
setattr(instance_on_src1, "status", "SHUTOFF")
setattr(instance_on_src1, "OS-EXT-STS:vm_state", "stopped")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
]
self.m_c_helper.get_volume_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("cold", 0))
global_efficacy_value = solution.global_efficacy[1].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_migrate_volume(self):
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
]
self.m_n_helper.get_instance_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("migrate", 0))
global_efficacy_value = solution.global_efficacy[2].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_retype_volume(self):
volume_on_src2 = self.fake_volume(host="src2@back1#pool1",
id="VOLUME_2")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src2,
]
self.m_n_helper.get_instance_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("retype", 0))
global_efficacy_value = solution.global_efficacy[2].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_swap_volume(self):
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
volume_on_src1.status = "in-use"
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
]
self.m_n_helper.get_instance_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("swap", 0))
global_efficacy_value = solution.global_efficacy[3].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_live_migrate_instance_parallel(self):
instance_on_src1_1 = self.fake_instance(host="src1", id="INSTANCE_1")
instance_on_src1_2 = self.fake_instance(host="src1", id="INSTANCE_2")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1_1,
instance_on_src1_2,
]
self.m_c_helper.get_volume_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(2, migration_types.get("live", 0))
global_efficacy_value = solution.global_efficacy[0].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_parallel_per_node(self):
self.m_parallel_per_node.return_value = 1
instance_on_src1_1 = self.fake_instance(host="src1", id="INSTANCE_1")
instance_on_src1_2 = self.fake_instance(host="src1", id="INSTANCE_2")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1_1,
instance_on_src1_2,
]
self.m_c_helper.get_volume_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("live", 0))
global_efficacy_value = solution.global_efficacy[0].get('value', 0)
self.assertEqual(50.0, global_efficacy_value)
def test_execute_migrate_volume_parallel(self):
volume_on_src1_1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
volume_on_src1_2 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_2")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1_1,
volume_on_src1_2,
]
self.m_n_helper.get_instance_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(2, migration_types.get("migrate", 0))
global_efficacy_value = solution.global_efficacy[2].get('value', 0)
self.assertEqual(100, global_efficacy_value)
def test_execute_parallel_per_pool(self):
self.m_parallel_per_pool.return_value = 1
volume_on_src1_1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
volume_on_src1_2 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_2")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1_1,
volume_on_src1_2,
]
self.m_n_helper.get_instance_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("migrate", 0))
global_efficacy_value = solution.global_efficacy[2].get('value', 0)
self.assertEqual(50.0, global_efficacy_value)
def test_execute_parallel_total(self):
self.m_parallel_total.return_value = 1
self.m_parallel_per_pool.return_value = 1
volume_on_src1_1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
volume_on_src1_2 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_2")
volume_on_src2_1 = self.fake_volume(host="src2@back1#pool1",
id="VOLUME_3")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1_1,
volume_on_src1_2,
volume_on_src2_1,
]
self.m_n_helper.get_instance_list.return_value = []
solution = self.strategy.execute()
migration_types = collections.Counter(
[action.get('input_parameters')['migration_type']
for action in solution.actions])
self.assertEqual(1, migration_types.get("migrate", 0))
# priority filter #
def test_get_priority_filter_list(self):
self.m_priority.return_value = {
"project": ["pj1"],
"compute_node": ["compute1", "compute2"],
"compute": ["cpu_num"],
"storage_pool": ["pool1", "pool2"],
"storage": ["size"]
}
filters = self.strategy.get_priority_filter_list()
self.assertIn(strategies.zone_migration.ComputeHostSortFilter,
map(lambda l: l.__class__, filters))
self.assertIn(strategies.zone_migration.StorageHostSortFilter,
map(lambda l: l.__class__, filters))
self.assertIn(strategies.zone_migration.ProjectSortFilter,
map(lambda l: l.__class__, filters))
# ComputeHostSortFilter #
def test_filtered_targets_compute_nodes(self):
instance_on_src1 = self.fake_instance(host="src1", id="INSTANCE_1")
instance_on_src2 = self.fake_instance(host="src2", id="INSTANCE_2")
instance_on_src3 = self.fake_instance(host="src3", id="INSTANCE_3")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
self.m_c_helper.get_volume_list.return_value = []
self.m_priority.return_value = {
"compute_node": ["src1", "src2"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get('instance'),
[instance_on_src1, instance_on_src2])
# StorageHostSortFilter #
def test_filtered_targets_storage_pools(self):
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1")
volume_on_src2 = self.fake_volume(host="src2@back1#pool1",
id="VOLUME_2")
volume_on_src3 = self.fake_volume(host="src3@back2#pool1",
id="VOLUME_3")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
volume_on_src2,
volume_on_src3,
]
self.m_n_helper.get_instance_list.return_value = []
self.m_priority.return_value = {
"storage_pool": ["src1@back1#pool1", "src2@back1#pool1"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get("volume"),
[volume_on_src1, volume_on_src2])
# ProjectSortFilter #
def test_filtered_targets_project(self):
instance_on_src1 = self.fake_instance(
host="src1", id="INSTANCE_1", project_id="pj2")
instance_on_src2 = self.fake_instance(
host="src2", id="INSTANCE_2", project_id="pj1")
instance_on_src3 = self.fake_instance(
host="src3", id="INSTANCE_3", project_id="pj3")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
self.m_c_helper.get_volume_list.return_value = []
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1", project_id="pj2")
volume_on_src2 = self.fake_volume(host="src2@back1#pool1",
id="VOLUME_2", project_id="pj1")
volume_on_src3 = self.fake_volume(host="src3@back2#pool1",
id="VOLUME_3", project_id="pj3")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
volume_on_src2,
volume_on_src3,
]
self.m_priority.return_value = {
"project": ["pj1"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get('instance'),
[instance_on_src2, instance_on_src1])
self.assertEqual(targets.get('volume'),
[volume_on_src2, volume_on_src1])
self.assertEqual(targets,
{"instance": [instance_on_src2, instance_on_src1],
"volume": [volume_on_src2, volume_on_src1]})
# ComputeSpecSortFilter #
def test_filtered_targets_instance_mem_size(self):
flavor_64 = self.fake_flavor(id="1", mem_size="64")
flavor_128 = self.fake_flavor(id="2", mem_size="128")
flavor_512 = self.fake_flavor(id="3", mem_size="512")
self.m_n_helper.get_flavor_list.return_value = [
flavor_64,
flavor_128,
flavor_512,
]
instance_on_src1 = self.fake_instance(host="src1",
id="INSTANCE_1", flavor_id="1")
instance_on_src2 = self.fake_instance(host="src2",
id="INSTANCE_2", flavor_id="2")
instance_on_src3 = self.fake_instance(host="src3",
id="INSTANCE_3", flavor_id="3")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
self.m_c_helper.get_volume_list.return_value = []
self.m_priority.return_value = {
"compute": ["mem_size"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get('instance'),
[instance_on_src2, instance_on_src1])
def test_filtered_targets_instance_vcpu_num(self):
flavor_1 = self.fake_flavor(id="1", vcpu_num="1")
flavor_2 = self.fake_flavor(id="2", vcpu_num="2")
flavor_3 = self.fake_flavor(id="3", vcpu_num="3")
self.m_n_helper.get_flavor_list.return_value = [
flavor_1,
flavor_2,
flavor_3,
]
instance_on_src1 = self.fake_instance(host="src1",
id="INSTANCE_1", flavor_id="1")
instance_on_src2 = self.fake_instance(host="src2",
id="INSTANCE_2", flavor_id="2")
instance_on_src3 = self.fake_instance(host="src3",
id="INSTANCE_3", flavor_id="3")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
self.m_c_helper.get_volume_list.return_value = []
self.m_priority.return_value = {
"compute": ["vcpu_num"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get('instance'),
[instance_on_src2, instance_on_src1])
def test_filtered_targets_instance_disk_size(self):
flavor_1 = self.fake_flavor(id="1", disk_size="1")
flavor_2 = self.fake_flavor(id="2", disk_size="2")
flavor_3 = self.fake_flavor(id="3", disk_size="3")
self.m_n_helper.get_flavor_list.return_value = [
flavor_1,
flavor_2,
flavor_3,
]
instance_on_src1 = self.fake_instance(host="src1",
id="INSTANCE_1", flavor_id="1")
instance_on_src2 = self.fake_instance(host="src2",
id="INSTANCE_2", flavor_id="2")
instance_on_src3 = self.fake_instance(host="src3",
id="INSTANCE_3", flavor_id="3")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
self.m_c_helper.get_volume_list.return_value = []
self.m_priority.return_value = {
"compute": ["disk_size"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get('instance'),
[instance_on_src2, instance_on_src1])
def test_filtered_targets_instance_created_at(self):
instance_on_src1 = self.fake_instance(
host="src1", id="INSTANCE_1", created_at="2017-10-30T00:00:00")
instance_on_src2 = self.fake_instance(
host="src2", id="INSTANCE_2", created_at="1977-03-29T03:03:03")
instance_on_src3 = self.fake_instance(
host="src3", id="INSTANCE_3", created_at="1977-03-29T03:03:03")
self.m_n_helper.get_instance_list.return_value = [
instance_on_src1,
instance_on_src2,
instance_on_src3,
]
self.m_c_helper.get_volume_list.return_value = []
self.m_priority.return_value = {
"compute": ["created_at"],
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get('instance'),
[instance_on_src2, instance_on_src1])
# StorageSpecSortFilter #
def test_filtered_targets_storage_size(self):
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
size="1", id="VOLUME_1")
volume_on_src2 = self.fake_volume(host="src2@back1#pool1",
size="2", id="VOLUME_2")
volume_on_src3 = self.fake_volume(host="src3@back2#pool1",
size="3", id="VOLUME_3")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
volume_on_src2,
volume_on_src3,
]
self.m_n_helper.get_instance_list.return_value = []
self.m_priority.return_value = {
"storage": ["size"]
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get("volume"),
[volume_on_src2, volume_on_src1])
def test_filtered_targets_storage_created_at(self):
volume_on_src1 = self.fake_volume(host="src1@back1#pool1",
id="VOLUME_1",
created_at="2017-10-30T00:00:00")
volume_on_src2 = self.fake_volume(host="src2@back1#pool1",
id="VOLUME_2",
created_at="1977-03-29T03:03:03")
volume_on_src3 = self.fake_volume(host="src3@back2#pool1",
id="VOLUME_3",
created_at="1977-03-29T03:03:03")
self.m_c_helper.get_volume_list.return_value = [
volume_on_src1,
volume_on_src2,
volume_on_src3,
]
self.m_n_helper.get_instance_list.return_value = []
self.m_priority.return_value = {
"storage": ["created_at"]
}
targets = self.strategy.filtered_targets()
self.assertEqual(targets.get("volume"),
[volume_on_src2, volume_on_src1])