diff --git a/setup.cfg b/setup.cfg
index c74403504..107329b7d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -49,6 +49,7 @@ watcher_strategies =
     dummy = watcher.decision_engine.strategy.strategies.dummy_strategy:DummyStrategy
     basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
     outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
+    vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
 
 watcher_actions =
     migrate = watcher.applier.actions.migration:Migrate
diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py
index 5b7521b26..84a9e5fcd 100644
--- a/watcher/decision_engine/strategy/strategies/__init__.py
+++ b/watcher/decision_engine/strategy/strategies/__init__.py
@@ -18,10 +18,14 @@
 from watcher.decision_engine.strategy.strategies import basic_consolidation
 from watcher.decision_engine.strategy.strategies import dummy_strategy
 from watcher.decision_engine.strategy.strategies import outlet_temp_control
+from watcher.decision_engine.strategy.strategies \
+    import vm_workload_consolidation
 
 BasicConsolidation = basic_consolidation.BasicConsolidation
 OutletTempControl = outlet_temp_control.OutletTempControl
 DummyStrategy = dummy_strategy.DummyStrategy
+VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
 
-__all__ = (BasicConsolidation, OutletTempControl, DummyStrategy)
+__all__ = (BasicConsolidation, OutletTempControl, DummyStrategy,
+           VMWorkloadConsolidation)
diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py
new file mode 100644
index 000000000..260370fbb
--- /dev/null
+++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py
@@ -0,0 +1,522 @@
+# -*- encoding: utf-8 -*-
+#
+# Authors: Vojtech CIMA
+#          Bruno GRAZIOLI
+#          Sean MURPHY
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from copy import deepcopy
+
+from oslo_log import log
+
+from watcher._i18n import _LE, _LI
+from watcher.common import exception
+from watcher.decision_engine.model import hypervisor_state as hyper_state
+from watcher.decision_engine.model import resource
+from watcher.decision_engine.model import vm_state
+from watcher.decision_engine.strategy.strategies import base
+from watcher.metrics_engine.cluster_history import ceilometer \
+    as ceilometer_cluster_history
+
+LOG = log.getLogger(__name__)
+
+
+class VMWorkloadConsolidation(base.BaseStrategy):
+    """VM Workload Consolidation Strategy.
+
+    A load consolidation strategy based on a heuristic first-fit
+    algorithm which focuses on measured CPU utilization and tries to
+    minimize the number of hosts which have too much or too little
+    load, respecting resource capacity constraints.
+
+    This strategy produces a solution resulting in more efficient
+    utilization of cluster resources using the following four phases:
+    * Offload phase - handling over-utilized resources
+    * Consolidation phase - handling under-utilized resources
+    * Solution optimization - reducing number of migrations
+    * Deactivation of unused hypervisors
+
+    Capacity coefficients (cc) may be used to adjust the optimization
+    thresholds. Different resources may require different coefficient
+    values; setting up different coefficient values in the two phases
+    may also lead to a more efficient consolidation in the end.
+    If a cc equals 1, the full resource capacity may be used; cc
+    values lower than 1 will lead to resource under-utilization and
+    values higher than 1 will lead to resource overbooking.
+    e.g. If the targeted utilization is 80% of hypervisor capacity,
+    the coefficient in the consolidation phase will be 0.8, but it
+    may be any lower value in the offloading phase. The lower it gets,
+    the more relaxed (distributed) the cluster will appear to the
+    following consolidation phase.
+
+    As this strategy leverages VM live migration to move the load
+    from one hypervisor to another, this feature needs to be set up
+    correctly on all hypervisors within the cluster.
+    This strategy assumes it is possible to live migrate any VM from
+    an active hypervisor to any other active hypervisor.
+    """
+
+    DEFAULT_NAME = 'vm_workload_consolidation'
+    DEFAULT_DESCRIPTION = 'VM Workload Consolidation Strategy'
+
+    def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION,
+                 osc=None):
+        super(VMWorkloadConsolidation, self).__init__(name, description, osc)
+        self._ceilometer = None
+        self.number_of_migrations = 0
+        self.number_of_released_hypervisors = 0
+        self.ceilometer_vm_data_cache = dict()
+
+    @property
+    def ceilometer(self):
+        if self._ceilometer is None:
+            self._ceilometer = (ceilometer_cluster_history.
+                                CeilometerClusterHistory(osc=self.osc))
+        return self._ceilometer
+
+    @ceilometer.setter
+    def ceilometer(self, ceilometer):
+        self._ceilometer = ceilometer
+
+    def get_state_str(self, state):
+        """Get resource state in string format.
+
+        :param state: resource state of unknown type
+        """
+        if type(state) == str:
+            return state
+        elif (type(state) == hyper_state.HypervisorState or
+              type(state) == vm_state.VMState):
+            return state.value
+        else:
+            LOG.error(_LE('Unexpected resource state type, '
+                          'state=%(state)s, state_type=%(st)s.'),
+                      state=state,
+                      st=type(state))
+            raise exception.WatcherException
+
+    def add_action_activate_hypervisor(self, hypervisor):
+        """Add an action for hypervisor activation into the solution.
+
+        :param hypervisor: hypervisor object
+        :return: None
+        """
+        params = {'state': hyper_state.HypervisorState.ONLINE.value}
+        self.solution.add_action(
+            action_type='change_nova_service_state',
+            resource_id=hypervisor.uuid,
+            input_parameters=params)
+        self.number_of_released_hypervisors -= 1
+
+    def add_action_deactivate_hypervisor(self, hypervisor):
+        """Add an action for hypervisor deactivation into the solution.
+
+        :param hypervisor: hypervisor object
+        :return: None
+        """
+        params = {'state': hyper_state.HypervisorState.OFFLINE.value}
+        self.solution.add_action(
+            action_type='change_nova_service_state',
+            resource_id=hypervisor.uuid,
+            input_parameters=params)
+        self.number_of_released_hypervisors += 1
+
+    def add_migration(self, vm_uuid, src_hypervisor,
+                      dst_hypervisor, model):
+        """Add an action for VM migration into the solution.
+
+        :param vm_uuid: vm uuid
+        :param src_hypervisor: hypervisor object
+        :param dst_hypervisor: hypervisor object
+        :param model: model_root object
+        :return: None
+        """
+        vm = model.get_vm_from_id(vm_uuid)
+
+        vm_state_str = self.get_state_str(vm.state)
+        if vm_state_str != vm_state.VMState.ACTIVE.value:
+            '''
+            Watcher currently only supports live VM migration and block
+            live VM migration, which both require the migrated VM to be
+            active. When supported, cold migration may be used as a
+            fallback migration mechanism to move non-active VMs.
+            '''
+            LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, '
+                          'state=%(vm_state)s.'),
+                      vm_uuid=vm_uuid,
+                      vm_state=vm_state_str)
+            raise exception.WatcherException
+
+        migration_type = 'live'
+
+        dst_hyper_state_str = self.get_state_str(dst_hypervisor.state)
+        if dst_hyper_state_str == hyper_state.HypervisorState.OFFLINE.value:
+            self.add_action_activate_hypervisor(dst_hypervisor)
+        model.get_mapping().unmap(src_hypervisor, vm)
+        model.get_mapping().map(dst_hypervisor, vm)
+
+        params = {'migration_type': migration_type,
+                  'src_hypervisor': src_hypervisor.uuid,
+                  'dst_hypervisor': dst_hypervisor.uuid}
+        self.solution.add_action(action_type='migrate',
+                                 resource_id=vm.uuid,
+                                 input_parameters=params)
+        self.number_of_migrations += 1
+
+    def deactivate_unused_hypervisors(self, model):
+        """Generate actions for deactivation of unused hypervisors.
+
+        :param model: model_root object
+        :return: None
+        """
+        for hypervisor in model.get_all_hypervisors().values():
+            if len(model.get_mapping().get_node_vms(hypervisor)) == 0:
+                self.add_action_deactivate_hypervisor(hypervisor)
+
+    def get_prediction_model(self, model):
+        """Return a deepcopy of a model representing current cluster state.
+
+        :param model: model_root object
+        :return: model_root object
+        """
+        return deepcopy(model)
+
+    def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'):
+        """Collect cpu, ram and disk utilization statistics of a VM.
+
+        :param vm_uuid: vm uuid
+        :param model: model_root object
+        :param period: seconds
+        :param aggr: string
+        :return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
+        """
+        if vm_uuid in self.ceilometer_vm_data_cache.keys():
+            return self.ceilometer_vm_data_cache.get(vm_uuid)
+
+        cpu_util_metric = 'cpu_util'
+        ram_util_metric = 'memory.usage'
+
+        ram_alloc_metric = 'memory'
+        disk_alloc_metric = 'disk.root.size'
+        vm_cpu_util = self.ceilometer.statistic_aggregation(
+            resource_id=vm_uuid, meter_name=cpu_util_metric,
+            period=period, aggregate=aggr)
+        vm_cpu_cores = model.get_resource_from_id(
+            resource.ResourceType.cpu_cores).get_capacity(
+                model.get_vm_from_id(vm_uuid))
+
+        if vm_cpu_util:
+            total_cpu_utilization = vm_cpu_cores * (vm_cpu_util / 100.0)
+        else:
+            total_cpu_utilization = vm_cpu_cores
+
+        vm_ram_util = self.ceilometer.statistic_aggregation(
+            resource_id=vm_uuid, meter_name=ram_util_metric,
+            period=period, aggregate=aggr)
+
+        if not vm_ram_util:
+            vm_ram_util = self.ceilometer.statistic_aggregation(
+                resource_id=vm_uuid, meter_name=ram_alloc_metric,
+                period=period, aggregate=aggr)
+
+        vm_disk_util = self.ceilometer.statistic_aggregation(
+            resource_id=vm_uuid, meter_name=disk_alloc_metric,
+            period=period, aggregate=aggr)
+
+        if not vm_ram_util or not vm_disk_util:
+            LOG.error(
+                _LE('No values returned by %(resource_id)s '
+                    'for memory.usage or disk.root.size'),
+                resource_id=vm_uuid
+            )
+            raise exception.NoDataFound
+
+        self.ceilometer_vm_data_cache[vm_uuid] = dict(
+            cpu=total_cpu_utilization, ram=vm_ram_util, disk=vm_disk_util)
+        return self.ceilometer_vm_data_cache.get(vm_uuid)
+
+    def get_hypervisor_utilization(self, hypervisor, model, period=3600,
+                                   aggr='avg'):
+        """Collect cpu, ram and disk utilization statistics of a hypervisor.
+
+        :param hypervisor: hypervisor object
+        :param model: model_root object
+        :param period: seconds
+        :param aggr: string
+        :return: dict(cpu(number of cores used), ram(MB used), disk(B used))
+        """
+        hypervisor_vms = model.get_mapping().get_node_vms_from_id(
+            hypervisor.uuid)
+        hypervisor_ram_util = 0
+        hypervisor_disk_util = 0
+        hypervisor_cpu_util = 0
+        for vm_uuid in hypervisor_vms:
+            vm_util = self.get_vm_utilization(vm_uuid, model, period, aggr)
+            hypervisor_cpu_util += vm_util['cpu']
+            hypervisor_ram_util += vm_util['ram']
+            hypervisor_disk_util += vm_util['disk']
+
+        return dict(cpu=hypervisor_cpu_util, ram=hypervisor_ram_util,
+                    disk=hypervisor_disk_util)
+
+    def get_hypervisor_capacity(self, hypervisor, model):
+        """Collect cpu, ram and disk capacity of a hypervisor.
+
+        :param hypervisor: hypervisor object
+        :param model: model_root object
+        :return: dict(cpu(cores), ram(MB), disk(B))
+        """
+        hypervisor_cpu_capacity = model.get_resource_from_id(
+            resource.ResourceType.cpu_cores).get_capacity(hypervisor)
+
+        hypervisor_disk_capacity = model.get_resource_from_id(
+            resource.ResourceType.disk_capacity).get_capacity(hypervisor)
+
+        hypervisor_ram_capacity = model.get_resource_from_id(
+            resource.ResourceType.memory).get_capacity(hypervisor)
+        return dict(cpu=hypervisor_cpu_capacity, ram=hypervisor_ram_capacity,
+                    disk=hypervisor_disk_capacity)
+
+    def get_relative_hypervisor_utilization(self, hypervisor, model):
+        """Return relative hypervisor utilization (rhu).
+
+        :param hypervisor: hypervisor object
+        :param model: model_root object
+        :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
+        """
+        rhu = {}
+        util = self.get_hypervisor_utilization(hypervisor, model)
+        cap = self.get_hypervisor_capacity(hypervisor, model)
+        for k in util.keys():
+            rhu[k] = float(util[k]) / float(cap[k])
+        return rhu
+
+    def get_relative_cluster_utilization(self, model):
+        """Calculate relative cluster utilization (rcu).
+
+        RCU is an average of relative utilizations (rhu) of active
+        hypervisors.
+        :param model: model_root object
+        :return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
+        """
+        hypervisors = model.get_all_hypervisors().values()
+        rcu = {}
+        counters = {}
+        for hypervisor in hypervisors:
+            hyper_state_str = self.get_state_str(hypervisor.state)
+            if hyper_state_str == hyper_state.HypervisorState.ONLINE.value:
+                rhu = self.get_relative_hypervisor_utilization(
+                    hypervisor, model)
+                for k in rhu.keys():
+                    if k not in rcu:
+                        rcu[k] = 0
+                    if k not in counters:
+                        counters[k] = 0
+                    rcu[k] += rhu[k]
+                    counters[k] += 1
+        for k in rcu.keys():
+            rcu[k] /= counters[k]
+        return rcu
+
+    def is_overloaded(self, hypervisor, model, cc):
+        """Indicate whether a hypervisor is overloaded.
+
+        This considers provided resource capacity coefficients (cc).
+        :param hypervisor: hypervisor object
+        :param model: model_root object
+        :param cc: dictionary containing resource capacity coefficients
+        :return: [True, False]
+        """
+        hypervisor_capacity = self.get_hypervisor_capacity(hypervisor, model)
+        hypervisor_utilization = self.get_hypervisor_utilization(
+            hypervisor, model)
+        metrics = ['cpu']
+        for m in metrics:
+            if hypervisor_utilization[m] > hypervisor_capacity[m] * cc[m]:
+                return True
+        return False
+
+    def vm_fits(self, vm_uuid, hypervisor, model, cc):
+        """Indicate whether a hypervisor is able to accommodate a VM.
+
+        This considers provided resource capacity coefficients (cc).
+        :param vm_uuid: string
+        :param hypervisor: hypervisor object
+        :param model: model_root object
+        :param cc: dictionary containing resource capacity coefficients
+        :return: [True, False]
+        """
+        hypervisor_capacity = self.get_hypervisor_capacity(hypervisor, model)
+        hypervisor_utilization = self.get_hypervisor_utilization(
+            hypervisor, model)
+        vm_utilization = self.get_vm_utilization(vm_uuid, model)
+        metrics = ['cpu', 'ram', 'disk']
+        for m in metrics:
+            if (vm_utilization[m] + hypervisor_utilization[m] >
+                    hypervisor_capacity[m] * cc[m]):
+                return False
+        return True
+
+    def optimize_solution(self, model):
+        """Optimize solution.
+
+        This is done by eliminating unnecessary or circular sets of
+        migrations which can be replaced by a more efficient solution.
+        e.g.:
+        * A->B, B->C => replace migrations A->B, B->C with
+          a single migration A->C, as both solutions result in the
+          VM running on hypervisor C, which can be achieved with
+          one migration instead of two.
+        * A->B, B->A => remove A->B and B->A, as they do not result
+          in a new VM placement.
+
+        :param model: model_root object
+        """
+        migrate_actions = (
+            a for a in self.solution.actions if a[
+                'action_type'] == 'migrate')
+        vm_to_be_migrated = (a['input_parameters']['resource_id']
+                             for a in migrate_actions)
+        vm_uuids = list(set(vm_to_be_migrated))
+        for vm_uuid in vm_uuids:
+            actions = list(
+                a for a in self.solution.actions if a[
+                    'input_parameters'][
+                        'resource_id'] == vm_uuid)
+            if len(actions) > 1:
+                src = actions[0]['input_parameters']['src_hypervisor']
+                dst = actions[-1]['input_parameters']['dst_hypervisor']
+                for a in actions:
+                    self.solution.actions.remove(a)
+                    self.number_of_migrations -= 1
+                if src != dst:
+                    self.add_migration(vm_uuid, src, dst, model)
+
+    def offload_phase(self, model, cc):
+        """Perform the offloading phase.
+
+        This considers provided resource capacity coefficients.
+        The offload phase performs first-fit based bin packing to offload
+        overloaded hypervisors. It moves the least CPU-utilized VMs
+        first, as live migrating them generally causes less trouble.
+        This phase results in a cluster with no overloaded hypervisors.
+        * This phase is able to activate turned-off hypervisors (if needed
+        and if any are available) in case the resource capacity provided by
+        the active hypervisors cannot accommodate all the load.
+        As the offload phase is later followed by the consolidation phase,
+        hypervisor activation in this phase doesn't necessarily result
+        in more activated hypervisors in the final solution.
+
+        :param model: model_root object
+        :param cc: dictionary containing resource capacity coefficients
+        """
+        sorted_hypervisors = sorted(
+            model.get_all_hypervisors().values(),
+            key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
+        for hypervisor in reversed(sorted_hypervisors):
+            if self.is_overloaded(hypervisor, model, cc):
+                for vm in sorted(model.get_mapping().get_node_vms(hypervisor),
+                                 key=lambda x: self.get_vm_utilization(
+                                     x, model)['cpu']):
+                    for dst_hypervisor in reversed(sorted_hypervisors):
+                        if self.vm_fits(vm, dst_hypervisor, model, cc):
+                            self.add_migration(vm, hypervisor,
+                                               dst_hypervisor, model)
+                            break
+                    if not self.is_overloaded(hypervisor, model, cc):
+                        break
+
+    def consolidation_phase(self, model, cc):
+        """Perform the consolidation phase.
+
+        This considers provided resource capacity coefficients.
+        The consolidation phase performs first-fit based bin packing.
+        First, hypervisors with the lowest CPU utilization are consolidated
+        by moving their load to hypervisors with the highest CPU utilization
+        which can accommodate the load. In this phase the most CPU-utilized
+        VMs are prioritized, as their load is more difficult to accommodate
+        in the system than that of less CPU-utilized VMs, which can later
+        be used to fill smaller CPU capacity gaps.
+
+        :param model: model_root object
+        :param cc: dictionary containing resource capacity coefficients
+        """
+        sorted_hypervisors = sorted(
+            model.get_all_hypervisors().values(),
+            key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
+        asc = 0
+        for hypervisor in sorted_hypervisors:
+            vms = sorted(model.get_mapping().get_node_vms(hypervisor),
+                         key=lambda x: self.get_vm_utilization(x,
+                                                               model)['cpu'])
+            for vm in reversed(vms):
+                dsc = len(sorted_hypervisors) - 1
+                for dst_hypervisor in reversed(sorted_hypervisors):
+                    if asc >= dsc:
+                        break
+                    if self.vm_fits(vm, dst_hypervisor, model, cc):
+                        self.add_migration(vm, hypervisor,
+                                           dst_hypervisor, model)
+                        break
+                    dsc -= 1
+            asc += 1
+
+    def execute(self, original_model):
+        """Execute the strategy.
+
+        This strategy produces a solution resulting in more
+        efficient utilization of cluster resources using the following
+        four phases:
+        * Offload phase - handling over-utilized resources
+        * Consolidation phase - handling under-utilized resources
+        * Solution optimization - reducing number of migrations
+        * Deactivation of unused hypervisors
+
+        :param original_model: model_root object
+        """
+        LOG.info(_LI('Executing Smart Strategy'))
+        model = self.get_prediction_model(original_model)
+        rcu = self.get_relative_cluster_utilization(model)
+        self.ceilometer_vm_data_cache = dict()
+
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+
+        # Offloading phase
+        self.offload_phase(model, cc)
+
+        # Consolidation phase
+        self.consolidation_phase(model, cc)
+
+        # Optimize solution
+        self.optimize_solution(model)
+
+        # Deactivate unused hypervisors
+        self.deactivate_unused_hypervisors(model)
+
+        rcu_after = self.get_relative_cluster_utilization(model)
+        info = {
+            'number_of_migrations': self.number_of_migrations,
+            'number_of_released_hypervisors':
+                self.number_of_released_hypervisors,
+            'relative_cluster_utilization_before': str(rcu),
+            'relative_cluster_utilization_after': str(rcu_after)
+        }
+
+        LOG.debug(info)
+
+        self.solution.model = model
+        self.solution.efficacy = rcu_after['cpu']
+
+        return self.solution
diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py
new file mode 100644
index 000000000..5801c5ae9
--- /dev/null
+++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py
@@ -0,0 +1,264 @@
+# -*- encoding: utf-8 -*-
+#
+# Authors: Vojtech CIMA
+#          Bruno GRAZIOLI
+#          Sean MURPHY
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from watcher.decision_engine.model import hypervisor
+from watcher.decision_engine.model import model_root as modelroot
+from watcher.decision_engine.model import resource
+from watcher.decision_engine.model import vm as modelvm
+from watcher.decision_engine.model import vm_state
+from watcher.metrics_engine.cluster_model_collector import base
+
+
+class FakerModelCollector(base.BaseClusterModelCollector):
+
+    def __init__(self):
+        pass
+
+    def get_latest_cluster_data_model(self):
+        return self.generate_scenario_1()
+
+    def generate_scenario_1(self):
+        """Simulates a cluster with 2 hypervisors and 2 VMs, 1:1 mapped."""
+
+        current_state_cluster = modelroot.ModelRoot()
+        count_node = 2
+        count_vm = 2
+
+        mem = resource.Resource(resource.ResourceType.memory)
+        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
+        disk = resource.Resource(resource.ResourceType.disk)
+        disk_capacity =\
+            resource.Resource(resource.ResourceType.disk_capacity)
+
+        current_state_cluster.create_resource(mem)
+        current_state_cluster.create_resource(num_cores)
+        current_state_cluster.create_resource(disk)
+        current_state_cluster.create_resource(disk_capacity)
+
+        for i in range(0, count_node):
+            node_uuid = "Node_{0}".format(i)
+            node = hypervisor.Hypervisor()
+            node.uuid = node_uuid
+            node.hostname = "hostname_{0}".format(i)
+            node.state = 'up'
+
+            mem.set_capacity(node, 64)
+            disk_capacity.set_capacity(node, 250)
+            num_cores.set_capacity(node, 40)
+            current_state_cluster.add_hypervisor(node)
+
+        for i in range(0, count_vm):
+            vm_uuid = "VM_{0}".format(i)
+            vm = modelvm.VM()
+            vm.uuid = vm_uuid
+            vm.state = vm_state.VMState.ACTIVE
+            mem.set_capacity(vm, 2)
+            disk.set_capacity(vm, 20)
+            num_cores.set_capacity(vm, 10)
+            current_state_cluster.add_vm(vm)
+
+        current_state_cluster.get_mapping().map(
+            current_state_cluster.get_hypervisor_from_id("Node_0"),
+            current_state_cluster.get_vm_from_id("VM_0"))
+
+        current_state_cluster.get_mapping().map(
+            current_state_cluster.get_hypervisor_from_id("Node_1"),
+            current_state_cluster.get_vm_from_id("VM_1"))
+
+        return current_state_cluster
+
+    def generate_scenario_2(self):
+        """Simulates a cluster
+
+        With 4 hypervisors and 6 VMs all mapped to one hypervisor
+        """
+
+        current_state_cluster = modelroot.ModelRoot()
+        count_node = 4
+        count_vm = 6
+
+        mem = resource.Resource(resource.ResourceType.memory)
+        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
+        disk = resource.Resource(resource.ResourceType.disk)
+        disk_capacity =\
+            resource.Resource(resource.ResourceType.disk_capacity)
+
+        current_state_cluster.create_resource(mem)
+        current_state_cluster.create_resource(num_cores)
+        current_state_cluster.create_resource(disk)
+        current_state_cluster.create_resource(disk_capacity)
+
+        for i in range(0, count_node):
+            node_uuid = "Node_{0}".format(i)
+            node = hypervisor.Hypervisor()
+            node.uuid = node_uuid
+            node.hostname = "hostname_{0}".format(i)
+            node.state = 'up'
+
+            mem.set_capacity(node, 64)
+            disk_capacity.set_capacity(node, 250)
+            num_cores.set_capacity(node, 16)
+            current_state_cluster.add_hypervisor(node)
+
+        for i in range(0, count_vm):
+            vm_uuid = "VM_{0}".format(i)
+            vm = modelvm.VM()
+            vm.uuid = vm_uuid
+            vm.state = vm_state.VMState.ACTIVE
+            mem.set_capacity(vm, 2)
+            disk.set_capacity(vm, 20)
+            num_cores.set_capacity(vm, 10)
+            current_state_cluster.add_vm(vm)
+
+            current_state_cluster.get_mapping().map(
+                current_state_cluster.get_hypervisor_from_id("Node_0"),
+                current_state_cluster.get_vm_from_id("VM_%s" % str(i)))
+
+        return current_state_cluster
+
+    def generate_scenario_3(self):
+        """Simulates a cluster
+
+        With 2 hypervisors and 4 VMs all mapped to one hypervisor
+        """
+
+        current_state_cluster = modelroot.ModelRoot()
+        count_node = 2
+        count_vm = 4
+
+        mem = resource.Resource(resource.ResourceType.memory)
+        num_cores = resource.Resource(resource.ResourceType.cpu_cores)
+        disk = resource.Resource(resource.ResourceType.disk)
+        disk_capacity =\
+            resource.Resource(resource.ResourceType.disk_capacity)
+
+        current_state_cluster.create_resource(mem)
+        current_state_cluster.create_resource(num_cores)
+        current_state_cluster.create_resource(disk)
+        current_state_cluster.create_resource(disk_capacity)
+
+        for i in range(0, count_node):
+            node_uuid = "Node_{0}".format(i)
+            node = hypervisor.Hypervisor()
+            node.uuid = node_uuid
+            node.hostname = "hostname_{0}".format(i)
+            node.state = 'up'
+
+            mem.set_capacity(node, 64)
+            disk_capacity.set_capacity(node, 250)
+            num_cores.set_capacity(node, 10)
+            current_state_cluster.add_hypervisor(node)
+
+        for i in range(6, 6 + count_vm):
+            vm_uuid = "VM_{0}".format(i)
+            vm = modelvm.VM()
+            vm.uuid = vm_uuid
+            vm.state = vm_state.VMState.ACTIVE
+            mem.set_capacity(vm, 2)
+            disk.set_capacity(vm, 20)
+            num_cores.set_capacity(vm, 2 ** (i - 6))
+            current_state_cluster.add_vm(vm)
+
+            current_state_cluster.get_mapping().map(
+                current_state_cluster.get_hypervisor_from_id("Node_0"),
+                current_state_cluster.get_vm_from_id("VM_%s" % str(i)))
+
+        return current_state_cluster
+
+
+class FakeCeilometerMetrics(object):
+    def __init__(self, model):
+        self.model = model
+
+    def mock_get_statistics(self, resource_id, meter_name, period=3600,
+                            aggregate='avg'):
+        if meter_name == "compute.node.cpu.percent":
+            return self.get_hypervisor_cpu_util(resource_id)
+        elif meter_name == "cpu_util":
+            return self.get_vm_cpu_util(resource_id)
+        elif meter_name == "memory.usage":
+            return self.get_vm_ram_util(resource_id)
+        elif meter_name == "disk.root.size":
+            return self.get_vm_disk_root_size(resource_id)
+
+    def get_hypervisor_cpu_util(self, r_id):
+        """Calculates hypervisor utilization dynamically.
+
+        Hypervisor CPU utilization should consider and correlate with
+        the actual VM-hypervisor mappings provided within a cluster
+        model. Returns relative hypervisor CPU utilization <0, 100>.
+        :param r_id: resource id
+        """
+        id = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1])
+        vms = self.model.get_mapping().get_node_vms_from_id(id)
+        util_sum = 0.0
+        hypervisor_cpu_cores = self.model.get_resource_from_id(
+            resource.ResourceType.cpu_cores).get_capacity_from_id(id)
+        for vm_uuid in vms:
+            vm_cpu_cores = self.model.get_resource_from_id(
+                resource.ResourceType.cpu_cores).\
+                get_capacity(self.model.get_vm_from_id(vm_uuid))
+            total_cpu_util = vm_cpu_cores * self.get_vm_cpu_util(vm_uuid)
+            util_sum += total_cpu_util / 100.0
+        util_sum /= hypervisor_cpu_cores
+        return util_sum * 100.0
+
+    def get_vm_cpu_util(self, r_id):
+        vm_cpu_util = dict()
+        vm_cpu_util['VM_0'] = 10
+        vm_cpu_util['VM_1'] = 30
+        vm_cpu_util['VM_2'] = 60
+        vm_cpu_util['VM_3'] = 20
+        vm_cpu_util['VM_4'] = 40
+        vm_cpu_util['VM_5'] = 50
+        vm_cpu_util['VM_6'] = 100
+        vm_cpu_util['VM_7'] = 100
+        vm_cpu_util['VM_8'] = 100
+        vm_cpu_util['VM_9'] = 100
+        return vm_cpu_util[str(r_id)]
+
+    def get_vm_ram_util(self, r_id):
+        vm_ram_util = dict()
+        vm_ram_util['VM_0'] = 1
+        vm_ram_util['VM_1'] = 2
+        vm_ram_util['VM_2'] = 4
+        vm_ram_util['VM_3'] = 8
+        vm_ram_util['VM_4'] = 3
+        vm_ram_util['VM_5'] = 2
+        vm_ram_util['VM_6'] = 1
+        vm_ram_util['VM_7'] = 2
+        vm_ram_util['VM_8'] = 4
+        vm_ram_util['VM_9'] = 8
+        return vm_ram_util[str(r_id)]
+
+    def get_vm_disk_root_size(self, r_id):
+        vm_disk_util = dict()
+        vm_disk_util['VM_0'] = 10
+        vm_disk_util['VM_1'] = 15
+        vm_disk_util['VM_2'] = 30
+        vm_disk_util['VM_3'] = 35
+        vm_disk_util['VM_4'] = 20
+        vm_disk_util['VM_5'] = 25
+        vm_disk_util['VM_6'] = 25
+        vm_disk_util['VM_7'] = 25
+        vm_disk_util['VM_8'] = 25
+        vm_disk_util['VM_9'] = 25
+        return vm_disk_util[str(r_id)]
diff --git a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py
new file mode 100644
index 000000000..3472d14d3
--- /dev/null
+++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py
@@ -0,0 +1,277 @@
+# -*- encoding: utf-8 -*-
+#
+# Authors: Vojtech CIMA
+#          Bruno GRAZIOLI
+#          Sean MURPHY
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import mock
+
+from watcher.decision_engine.strategy import strategies
+from watcher.tests import base
+from watcher.tests.decision_engine.strategy.strategies \
+    import faker_cluster_and_metrics
+
+
+class TestSmartConsolidation(base.BaseTestCase):
+    fake_cluster = faker_cluster_and_metrics.FakerModelCollector()
+
+    def test_get_vm_utilization(self):
+        cluster = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
+            cluster)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        vm_0 = cluster.get_vm_from_id("VM_0")
+        vm_util = dict(cpu=1.0, ram=1, disk=10)
+        self.assertEqual(vm_util,
+                         strategy.get_vm_utilization(vm_0.uuid, cluster))
+
+    def test_get_hypervisor_utilization(self):
+        cluster = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
+            cluster)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        node_0 = cluster.get_hypervisor_from_id("Node_0")
+        node_util = dict(cpu=1.0, ram=1, disk=10)
+        self.assertEqual(node_util,
+                         strategy.get_hypervisor_utilization(node_0,
+                                                             cluster))
+
+    def test_get_hypervisor_capacity(self):
+        cluster = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(
+            cluster)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        node_0 = cluster.get_hypervisor_from_id("Node_0")
+        node_util = dict(cpu=40, ram=64, disk=250)
+        self.assertEqual(node_util,
+                         strategy.get_hypervisor_capacity(node_0, cluster))
+
+    def test_get_relative_hypervisor_utilization(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        hypervisor = model.get_hypervisor_from_id('Node_0')
+        rhu = strategy.get_relative_hypervisor_utilization(hypervisor, model)
+        expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025}
+        self.assertEqual(expected_rhu, rhu)
+
+    def test_get_relative_cluster_utilization(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        cru = strategy.get_relative_cluster_utilization(model)
+        expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375}
+        self.assertEqual(expected_cru, cru)
+
+    def test_add_migration(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h1 = model.get_hypervisor_from_id('Node_0')
+        h2 = model.get_hypervisor_from_id('Node_1')
+        vm_uuid = 'VM_0'
+        strategy.add_migration(vm_uuid, h1, h2, model)
+        self.assertEqual(1, len(strategy.solution.actions))
+        expected = {'action_type': 'migrate',
+                    'input_parameters': {'dst_hypervisor': h2.uuid,
+                                         'src_hypervisor': h1.uuid,
+                                         'migration_type': 'live',
+                                         'resource_id': vm_uuid}}
+        self.assertEqual(expected,
+                         strategy.solution.actions[0])
+
+    def test_is_overloaded(self):
+        strategy = strategies.VMWorkloadConsolidation()
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h1 = model.get_hypervisor_from_id('Node_0')
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+        res = strategy.is_overloaded(h1, model, cc)
+        self.assertEqual(False, res)
+
+        cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
+        res = strategy.is_overloaded(h1, model, cc)
+        self.assertEqual(False, res)
+
+        cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0}
+        res = strategy.is_overloaded(h1, model, cc)
+        self.assertEqual(True, res)
+
+    def test_vm_fits(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h = model.get_hypervisor_from_id('Node_1')
+        vm_uuid = 'VM_0'
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+        res = strategy.vm_fits(vm_uuid, h, model, cc)
+        self.assertEqual(True, res)
+
+        cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
+        res = strategy.vm_fits(vm_uuid, h, model, cc)
+        self.assertEqual(False, res)
+
+    def test_add_action_activate_hypervisor(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h = model.get_hypervisor_from_id('Node_0')
+        strategy.add_action_activate_hypervisor(h)
+        expected = [{'action_type': 'change_nova_service_state',
+                     'input_parameters': {'state': 'up',
+                                          'resource_id': 'Node_0'}}]
+        self.assertEqual(expected, strategy.solution.actions)
+
+    def test_add_action_deactivate_hypervisor(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h = model.get_hypervisor_from_id('Node_0')
+        strategy.add_action_deactivate_hypervisor(h)
+        expected = [{'action_type': 'change_nova_service_state',
+                     'input_parameters': {'state': 'down',
+                                          'resource_id': 'Node_0'}}]
+        self.assertEqual(expected, strategy.solution.actions)
+
+    def test_deactivate_unused_hypervisors(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h1 = model.get_hypervisor_from_id('Node_0')
+        h2 = model.get_hypervisor_from_id('Node_1')
+        vm_uuid = 'VM_0'
+        strategy.deactivate_unused_hypervisors(model)
+        self.assertEqual(0, len(strategy.solution.actions))
+
+        # Migrate VM to free the hypervisor
+        strategy.add_migration(vm_uuid, h1, h2, model)
+
+        strategy.deactivate_unused_hypervisors(model)
+        expected = {'action_type': 'change_nova_service_state',
+                    'input_parameters': {'state': 'down',
+                                         'resource_id': 'Node_0'}}
+        self.assertEqual(2, len(strategy.solution.actions))
+        self.assertEqual(expected, strategy.solution.actions[1])
+
+    def test_offload_phase(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+        strategy.offload_phase(model, cc)
+        expected = []
+        self.assertEqual(expected, strategy.solution.actions)
+
+    def test_consolidation_phase(self):
+        model = self.fake_cluster.generate_scenario_1()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h1 = model.get_hypervisor_from_id('Node_0')
+        h2 = model.get_hypervisor_from_id('Node_1')
+        vm_uuid = 'VM_0'
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+        strategy.consolidation_phase(model, cc)
+        expected = [{'action_type': 'migrate',
+                     'input_parameters': {'dst_hypervisor': h2.uuid,
+                                          'src_hypervisor': h1.uuid,
+                                          'migration_type': 'live',
+                                          'resource_id': vm_uuid}}]
+        self.assertEqual(expected, strategy.solution.actions)
+
+    def test_strategy(self):
+        model = self.fake_cluster.generate_scenario_2()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h1 = model.get_hypervisor_from_id('Node_0')
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+        strategy.offload_phase(model, cc)
+        strategy.consolidation_phase(model, cc)
+        strategy.optimize_solution(model)
+        h2 = strategy.solution.actions[0][
+            'input_parameters']['dst_hypervisor']
+        expected = [{'action_type': 'migrate',
+                     'input_parameters': {'dst_hypervisor': h2,
+                                          'src_hypervisor': h1.uuid,
+                                          'migration_type': 'live',
+                                          'resource_id': 'VM_3'}},
+                    {'action_type': 'migrate',
+                     'input_parameters': {'dst_hypervisor': h2,
+                                          'src_hypervisor': h1.uuid,
+                                          'migration_type': 'live',
+                                          'resource_id': 'VM_1'}}]
+
+        self.assertEqual(expected, strategy.solution.actions)
+
+    def test_strategy2(self):
+        model = self.fake_cluster.generate_scenario_3()
+        fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
+        strategy = strategies.VMWorkloadConsolidation()
+        strategy.ceilometer = mock.MagicMock(
+            statistic_aggregation=fake_metrics.mock_get_statistics)
+        h1 = model.get_hypervisor_from_id('Node_0')
+        h2 = model.get_hypervisor_from_id('Node_1')
+        cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
+        strategy.offload_phase(model, cc)
+        expected = [{'action_type': 'migrate',
+                     'input_parameters': {'dst_hypervisor': h2.uuid,
+                                          'migration_type': 'live',
+                                          'resource_id': 'VM_6',
+                                          'src_hypervisor': h1.uuid}},
+                    {'action_type': 'migrate',
+                     'input_parameters': {'dst_hypervisor': h2.uuid,
+                                          'migration_type': 'live',
+                                          'resource_id': 'VM_7',
+                                          'src_hypervisor': h1.uuid}},
+                    {'action_type': 'migrate',
+                     'input_parameters': {'dst_hypervisor': h2.uuid,
+                                          'migration_type': 'live',
+                                          'resource_id': 'VM_8',
+                                          'src_hypervisor': h1.uuid}}]
+        self.assertEqual(expected, strategy.solution.actions)
+        strategy.consolidation_phase(model, cc)
+        expected.append({'action_type': 'migrate',
+                         'input_parameters': {'dst_hypervisor': h1.uuid,
+                                              'migration_type': 'live',
+                                              'resource_id': 'VM_7',
+                                              'src_hypervisor': h2.uuid}})
+        self.assertEqual(expected, strategy.solution.actions)
+        strategy.optimize_solution(model)
+        del expected[3]
+        del expected[1]
+        self.assertEqual(expected, strategy.solution.actions)
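
The capacity-coefficient (cc) thresholds described in the VMWorkloadConsolidation class docstring can be made concrete with a small standalone sketch. It is illustration only, not part of the patch: fits() is a hypothetical helper that mirrors the comparison inside vm_fits(), and the capacity and utilization numbers are invented.

    # Sketch of the cc threshold check used by vm_fits()/is_overloaded().
    # 'fits' is a hypothetical helper, not part of the Watcher API.
    def fits(vm_util, host_util, host_cap, cc):
        """Return True if the VM fits on the host under coefficients cc."""
        return all(vm_util[m] + host_util[m] <= host_cap[m] * cc[m]
                   for m in ('cpu', 'ram', 'disk'))

    host_cap = {'cpu': 40, 'ram': 64, 'disk': 250}   # total capacity
    host_util = {'cpu': 30, 'ram': 10, 'disk': 50}   # current load
    vm_util = {'cpu': 4, 'ram': 2, 'disk': 20}       # candidate VM

    # cc = 1.0 allows the full capacity: 30 + 4 <= 40 * 1.0, so the VM fits.
    print(fits(vm_util, host_util, host_cap,
               {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}))   # True
    # cc = 0.8 targets 80% utilization: 30 + 4 > 40 * 0.8 = 32, so it does not.
    print(fits(vm_util, host_util, host_cap,
               {'cpu': 0.8, 'ram': 1.0, 'disk': 1.0}))   # False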
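
Similarly, the chain-collapsing rule in optimize_solution() (A->B, B->C becomes a single A->C; A->B, B->A cancels out) reduces to the following standalone sketch. collapse_migrations() is a hypothetical helper; plain (vm, src, dst) tuples stand in for Watcher's migrate action dictionaries, and, as in the patch, per-VM actions are assumed to appear in chronological order.

    # Sketch of the migration-chain elimination in optimize_solution().
    # Tuples (vm, src, dst) stand in for Watcher 'migrate' action dicts.
    def collapse_migrations(actions):
        collapsed = []
        for vm in sorted({a[0] for a in actions}):
            chain = [a for a in actions if a[0] == vm]
            # Keep only the net move: first source to last destination.
            src, dst = chain[0][1], chain[-1][2]
            if src != dst:   # A->B, B->A round trips vanish entirely
                collapsed.append((vm, src, dst))
        return collapsed

    print(collapse_migrations([('VM_1', 'A', 'B'), ('VM_1', 'B', 'C')]))
    # [('VM_1', 'A', 'C')]
    print(collapse_migrations([('VM_2', 'A', 'B'), ('VM_2', 'B', 'A')]))
    # []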