From 5a2a94fbec0389f0a7ec1fd3b31ddbd0a3028fdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20Fran=C3=A7oise?= Date: Fri, 24 Jun 2016 17:24:01 +0200 Subject: [PATCH] Loadable Cluster Data Model Collectors In this changeset, I made BaseClusterDataModelCollector instances pluggable. This corresponds to "part 1" of the work items detailed in the specifications. Change-Id: Iab1c7e264add9e2cbbbb767e3fd6e99a0c22c691 Partially-Implements: blueprint cluster-model-objects-wrapper --- .coveragerc | 4 +- setup.cfg | 3 + watcher/decision_engine/model/model_root.py | 8 +- .../strategy/strategies/base.py | 16 ++-- .../strategies/basic_consolidation.py | 58 +++++++------- .../strategy/strategies/dummy_strategy.py | 4 +- .../strategies/outlet_temp_control.py | 24 +++--- .../strategy/strategies/uniform_airflow.py | 31 ++++---- .../strategies/vm_workload_consolidation.py | 14 +--- .../strategy/strategies/workload_balance.py | 41 +++++----- .../strategies/workload_stabilization.py | 45 ++++++----- .../cluster_model_collector/base.py | 31 +++++++- .../cluster_model_collector/manager.py | 34 ++++++-- .../cluster_model_collector/nova.py | 45 +++++++---- watcher/metrics_engine/loading/__init__.py | 0 watcher/metrics_engine/loading/default.py | 23 ++++++ .../tests/collector/test_nova_collector.py | 21 ++--- .../decision_engine/model/test_mapping.py | 25 +++--- .../planner/test_default_planner.py | 4 +- .../strategy/context/test_strategy_context.py | 2 +- .../strategies/faker_cluster_and_metrics.py | 14 ++-- .../strategies/faker_cluster_state.py | 15 ++-- .../strategies/test_basic_consolidation.py | 2 +- .../strategies/test_dummy_strategy.py | 2 +- .../strategies/test_outlet_temp_control.py | 2 +- .../strategies/test_uniform_airflow.py | 2 +- .../test_vm_workload_consolidation.py | 2 +- .../strategies/test_workload_balance.py | 2 +- .../strategies/test_workload_stabilization.py | 2 +- watcher/tests/metrics_engine/__init__.py | 0 watcher/tests/metrics_engine/test_loading.py | 78 +++++++++++++++++++ 31 files changed, 364 insertions(+), 190 deletions(-) create mode 100644 watcher/metrics_engine/loading/__init__.py create mode 100644 watcher/metrics_engine/loading/default.py create mode 100644 watcher/tests/metrics_engine/__init__.py create mode 100644 watcher/tests/metrics_engine/test_loading.py diff --git a/.coveragerc b/.coveragerc index 243e0228c..b474acb1d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,7 +1,9 @@ [run] branch = True source = watcher -omit = watcher/tests/* +omit = + watcher/tests/* + watcher/hacking/* [report] ignore_errors = True diff --git a/setup.cfg b/setup.cfg index 5073dc6b2..bda61f29a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,9 @@ watcher_workflow_engines = watcher_planners = default = watcher.decision_engine.planner.default:DefaultPlanner +watcher_cluster_data_model_collectors = + compute = watcher.metrics_engine.cluster_model_collector.nova:NovaClusterDataModelCollector + [pbr] warnerrors = true autodoc_index_modules = true diff --git a/watcher/decision_engine/model/model_root.py b/watcher/decision_engine/model/model_root.py index 5ec65f9b1..eea6ff7f8 100644 --- a/watcher/decision_engine/model/model_root.py +++ b/watcher/decision_engine/model/model_root.py @@ -13,8 +13,10 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. + from watcher._i18n import _ from watcher.common import exception +from watcher.common import utils from watcher.decision_engine.model import hypervisor from watcher.decision_engine.model import mapping from watcher.decision_engine.model import vm @@ -22,10 +24,10 @@ from watcher.decision_engine.model import vm class ModelRoot(object): def __init__(self): - self._hypervisors = {} - self._vms = {} + self._hypervisors = utils.Struct() + self._vms = utils.Struct() self.mapping = mapping.Mapping(self) - self.resource = {} + self.resource = utils.Struct() def assert_hypervisor(self, obj): if not isinstance(obj, hypervisor.Hypervisor): diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index 0c3124f70..b8ae1f09f 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -75,7 +75,7 @@ class BaseStrategy(loadable.Loadable): self._solution = default.DefaultSolution(goal=self.goal, strategy=self) self._osc = osc self._collector_manager = None - self._model = None + self._compute_model = None self._goal = None self._input_parameters = utils.Struct() @@ -159,24 +159,24 @@ class BaseStrategy(loadable.Loadable): return self.solution @property - def collector(self): + def collector_manager(self): if self._collector_manager is None: self._collector_manager = manager.CollectorManager() return self._collector_manager @property - def model(self): + def compute_model(self): """Cluster data model :returns: Cluster data model the strategy is executed on :rtype model: :py:class:`~.ModelRoot` instance """ - if self._model is None: - collector = self.collector.get_cluster_model_collector( - osc=self.osc) - self._model = collector.get_latest_cluster_data_model() + if self._compute_model is None: + collector = self.collector_manager.get_cluster_model_collector( + 'compute', osc=self.osc) + self._compute_model = collector.get_latest_cluster_data_model() - return self._model + return self._compute_model @classmethod def get_schema(cls): diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index e706c6a84..06e5429b2 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -155,16 +155,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): total_cores = 0 total_disk = 0 total_mem = 0 - cpu_capacity = self.model.get_resource_from_id( + cpu_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores) - disk_capacity = self.model.get_resource_from_id( + disk_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.disk) - memory_capacity = self.model.get_resource_from_id( + memory_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.memory) - for vm_id in self.model. \ + for vm_id in self.compute_model. \ get_mapping().get_node_vms(dest_hypervisor): - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) total_cores += cpu_capacity.get_capacity(vm) total_disk += disk_capacity.get_capacity(vm) total_mem += memory_capacity.get_capacity(vm) @@ -191,11 +191,11 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :param total_mem: total memory used by the virtual machine :return: True if the threshold is not exceed """ - cpu_capacity = self.model.get_resource_from_id( + cpu_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(dest_hypervisor) - disk_capacity = self.model.get_resource_from_id( + disk_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.disk).get_capacity(dest_hypervisor) - memory_capacity = self.model.get_resource_from_id( + memory_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.memory).get_capacity(dest_hypervisor) return (cpu_capacity >= total_cores * self.threshold_cores and @@ -222,13 +222,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): :param total_memory_used: :return: """ - cpu_capacity = self.model.get_resource_from_id( + cpu_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(element) - disk_capacity = self.model.get_resource_from_id( + disk_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.disk).get_capacity(element) - memory_capacity = self.model.get_resource_from_id( + memory_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.memory).get_capacity(element) score_cores = (1 - (float(cpu_capacity) - float(total_cores_used)) / @@ -269,7 +269,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): ) host_avg_cpu_util = 100 - cpu_capacity = self.model.get_resource_from_id( + cpu_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(hypervisor) total_cores_used = cpu_capacity * (host_avg_cpu_util / 100) @@ -292,7 +292,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): """Calculate Score of virtual machine :param vm: the virtual machine - :param self.model: the cluster model :return: score """ vm_cpu_utilization = self.ceilometer. \ @@ -311,7 +310,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): ) vm_cpu_utilization = 100 - cpu_capacity = self.model.get_resource_from_id( + cpu_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity(vm) total_cores_used = cpu_capacity * (vm_cpu_utilization / 100.0) @@ -338,10 +337,10 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def score_of_nodes(self, score): """Calculate score of nodes based on load by VMs""" - for hypervisor_id in self.model.get_all_hypervisors(): - hypervisor = self.model. \ + for hypervisor_id in self.compute_model.get_all_hypervisors(): + hypervisor = self.compute_model. \ get_hypervisor_from_id(hypervisor_id) - count = self.model.get_mapping(). \ + count = self.compute_model.get_mapping(). \ get_node_vms_from_id(hypervisor_id) if len(count) > 0: result = self.calculate_score_node(hypervisor) @@ -355,12 +354,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def node_and_vm_score(self, sorted_score, score): """Get List of VMs from node""" node_to_release = sorted_score[len(score) - 1][0] - vms_to_mig = self.model.get_mapping().get_node_vms_from_id( + vms_to_mig = self.compute_model.get_mapping().get_node_vms_from_id( node_to_release) vm_score = [] for vm_id in vms_to_mig: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) if vm.state == vm_state.VMState.ACTIVE.value: vm_score.append( (vm_id, self.calculate_score_vm(vm))) @@ -370,13 +369,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def create_migration_vm(self, mig_vm, mig_src_hypervisor, mig_dst_hypervisor): """Create migration VM""" - if self.model.get_mapping().migrate_vm( + if self.compute_model.get_mapping().migrate_vm( mig_vm, mig_src_hypervisor, mig_dst_hypervisor): self.add_migration(mig_vm.uuid, 'live', mig_src_hypervisor.uuid, mig_dst_hypervisor.uuid) - if len(self.model.get_mapping().get_node_vms( + if len(self.compute_model.get_mapping().get_node_vms( mig_src_hypervisor)) == 0: self.add_change_service_state(mig_src_hypervisor. uuid, @@ -389,10 +388,10 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): number_migrations = 0 for vm in sorted_vms: for j in range(0, len(sorted_score)): - mig_vm = self.model.get_vm_from_id(vm[0]) - mig_src_hypervisor = self.model.get_hypervisor_from_id( + mig_vm = self.compute_model.get_vm_from_id(vm[0]) + mig_src_hypervisor = self.compute_model.get_hypervisor_from_id( node_to_release) - mig_dst_hypervisor = self.model.get_hypervisor_from_id( + mig_dst_hypervisor = self.compute_model.get_hypervisor_from_id( sorted_score[j][0]) result = self.check_migration( @@ -414,7 +413,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def pre_execute(self): LOG.info(_LI("Initializing Sercon Consolidation")) - if self.model is None: + if self.compute_model is None: raise exception.ClusterStateNotDefined() def do_execute(self): @@ -423,15 +422,16 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): unsuccessful_migration = 0 first_migration = True - size_cluster = len(self.model.get_all_hypervisors()) + size_cluster = len(self.compute_model.get_all_hypervisors()) if size_cluster == 0: raise exception.ClusterEmpty() self.compute_attempts(size_cluster) - for hypervisor_id in self.model.get_all_hypervisors(): - hypervisor = self.model.get_hypervisor_from_id(hypervisor_id) - count = self.model.get_mapping(). \ + for hypervisor_id in self.compute_model.get_all_hypervisors(): + hypervisor = self.compute_model.get_hypervisor_from_id( + hypervisor_id) + count = self.compute_model.get_mapping(). \ get_node_vms_from_id(hypervisor_id) if len(count) == 0: if hypervisor.state == hyper_state.HypervisorState.ENABLED: diff --git a/watcher/decision_engine/strategy/strategies/dummy_strategy.py b/watcher/decision_engine/strategy/strategies/dummy_strategy.py index e191875de..170e9b307 100644 --- a/watcher/decision_engine/strategy/strategies/dummy_strategy.py +++ b/watcher/decision_engine/strategy/strategies/dummy_strategy.py @@ -19,7 +19,6 @@ from oslo_log import log from watcher._i18n import _ -from watcher.common import exception from watcher.decision_engine.strategy.strategies import base LOG = log.getLogger(__name__) @@ -50,8 +49,7 @@ class DummyStrategy(base.DummyBaseStrategy): SLEEP = "sleep" def pre_execute(self): - if self.model is None: - raise exception.ClusterStateNotDefined() + pass def do_execute(self): para1 = self.input_parameters.para1 diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index 134644d30..a2d04ef34 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -125,13 +125,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def calc_used_res(self, hypervisor, cpu_capacity, memory_capacity, disk_capacity): """Calculate the used vcpus, memory and disk based on VM flavors""" - vms = self.model.get_mapping().get_node_vms(hypervisor) + vms = self.compute_model.get_mapping().get_node_vms(hypervisor) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 if len(vms) > 0: for vm_id in vms: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) vcpus_used += cpu_capacity.get_capacity(vm) memory_mb_used += memory_capacity.get_capacity(vm) disk_gb_used += disk_capacity.get_capacity(vm) @@ -140,7 +140,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def group_hosts_by_outlet_temp(self): """Group hosts based on outlet temp meters""" - hypervisors = self.model.get_all_hypervisors() + hypervisors = self.compute_model.get_all_hypervisors() size_cluster = len(hypervisors) if size_cluster == 0: raise wexc.ClusterEmpty() @@ -148,7 +148,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): hosts_need_release = [] hosts_target = [] for hypervisor_id in hypervisors: - hypervisor = self.model.get_hypervisor_from_id( + hypervisor = self.compute_model.get_hypervisor_from_id( hypervisor_id) resource_id = hypervisor.uuid @@ -175,13 +175,13 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): """Pick up an active vm instance to migrate from provided hosts""" for hvmap in hosts: mig_src_hypervisor = hvmap['hv'] - vms_of_src = self.model.get_mapping().get_node_vms( + vms_of_src = self.compute_model.get_mapping().get_node_vms( mig_src_hypervisor) if len(vms_of_src) > 0: for vm_id in vms_of_src: try: # select the first active VM to migrate - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) if vm.state != vm_state.VMState.ACTIVE.value: LOG.info(_LI("VM not active, skipped: %s"), vm.uuid) @@ -195,11 +195,11 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def filter_dest_servers(self, hosts, vm_to_migrate): """Only return hosts with sufficient available resources""" - cpu_capacity = self.model.get_resource_from_id( + cpu_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores) - disk_capacity = self.model.get_resource_from_id( + disk_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.disk) - memory_capacity = self.model.get_resource_from_id( + memory_capacity = self.compute_model.get_resource_from_id( resource.ResourceType.memory) required_cores = cpu_capacity.get_capacity(vm_to_migrate) @@ -226,7 +226,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def pre_execute(self): LOG.debug("Initializing Outlet temperature strategy") - if self.model is None: + if self.compute_model is None: raise wexc.ClusterStateNotDefined() def do_execute(self): @@ -270,7 +270,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): # always use the host with lowerest outlet temperature mig_dst_hypervisor = dest_servers[0]['hv'] # generate solution to migrate the vm to the dest server, - if self.model.get_mapping().migrate_vm( + if self.compute_model.get_mapping().migrate_vm( vm_src, mig_src_hypervisor, mig_dst_hypervisor): parameters = {'migration_type': 'live', 'src_hypervisor': mig_src_hypervisor.uuid, @@ -280,5 +280,5 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): input_parameters=parameters) def post_execute(self): - self.solution.model = self.model + self.solution.model = self.compute_model # TODO(v-francoise): Add the indicators to the solution diff --git a/watcher/decision_engine/strategy/strategies/uniform_airflow.py b/watcher/decision_engine/strategy/strategies/uniform_airflow.py index 7bda184fd..45f8d3b3f 100644 --- a/watcher/decision_engine/strategy/strategies/uniform_airflow.py +++ b/watcher/decision_engine/strategy/strategies/uniform_airflow.py @@ -130,12 +130,12 @@ class UniformAirflow(base.BaseStrategy): def calculate_used_resource(self, hypervisor, cap_cores, cap_mem, cap_disk): """Calculate the used vcpus, memory and disk based on VM flavors""" - vms = self.model.get_mapping().get_node_vms(hypervisor) + vms = self.compute_model.get_mapping().get_node_vms(hypervisor) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 for vm_id in vms: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) vcpus_used += cap_cores.get_capacity(vm) memory_mb_used += cap_mem.get_capacity(vm) disk_gb_used += cap_disk.get_capacity(vm) @@ -150,7 +150,7 @@ class UniformAirflow(base.BaseStrategy): vms_tobe_migrate = [] for hvmap in hosts: source_hypervisor = hvmap['hv'] - source_vms = self.model.get_mapping().get_node_vms( + source_vms = self.compute_model.get_mapping().get_node_vms( source_hypervisor) if source_vms: inlet_t = self.ceilometer.statistic_aggregation( @@ -168,7 +168,7 @@ class UniformAirflow(base.BaseStrategy): # hardware issue, migrate all vms from this hypervisor for vm_id in source_vms: try: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) vms_tobe_migrate.append(vm) except wexc.InstanceNotFound: LOG.error(_LE("VM not found; error: %s"), vm_id) @@ -177,7 +177,7 @@ class UniformAirflow(base.BaseStrategy): # migrate the first active vm for vm_id in source_vms: try: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) if vm.state != vm_state.VMState.ACTIVE.value: LOG.info(_LI("VM not active; skipped: %s"), vm.uuid) @@ -193,10 +193,11 @@ class UniformAirflow(base.BaseStrategy): def filter_destination_hosts(self, hosts, vms_to_migrate): """Return vm and host with sufficient available resources""" - cap_cores = self.model.get_resource_from_id( + cap_cores = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores) - cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk) - cap_mem = self.model.get_resource_from_id( + cap_disk = self.compute_model.get_resource_from_id( + resource.ResourceType.disk) + cap_mem = self.compute_model.get_resource_from_id( resource.ResourceType.memory) # large vm go first vms_to_migrate = sorted(vms_to_migrate, reverse=True, @@ -240,13 +241,14 @@ class UniformAirflow(base.BaseStrategy): def group_hosts_by_airflow(self): """Group hosts based on airflow meters""" - hypervisors = self.model.get_all_hypervisors() + hypervisors = self.compute_model.get_all_hypervisors() if not hypervisors: raise wexc.ClusterEmpty() overload_hosts = [] nonoverload_hosts = [] for hypervisor_id in hypervisors: - hypervisor = self.model.get_hypervisor_from_id(hypervisor_id) + hypervisor = self.compute_model.get_hypervisor_from_id( + hypervisor_id) resource_id = hypervisor.uuid airflow = self.ceilometer.statistic_aggregation( resource_id=resource_id, @@ -270,7 +272,7 @@ class UniformAirflow(base.BaseStrategy): def pre_execute(self): LOG.debug("Initializing Uniform Airflow Strategy") - if self.model is None: + if self.compute_model is None: raise wexc.ClusterStateNotDefined() def do_execute(self): @@ -310,9 +312,8 @@ class UniformAirflow(base.BaseStrategy): for info in destination_hosts: vm_src = info['vm'] mig_dst_hypervisor = info['hv'] - if self.model.get_mapping().migrate_vm(vm_src, - source_hypervisor, - mig_dst_hypervisor): + if self.compute_model.get_mapping().migrate_vm( + vm_src, source_hypervisor, mig_dst_hypervisor): parameters = {'migration_type': 'live', 'src_hypervisor': source_hypervisor.uuid, 'dst_hypervisor': mig_dst_hypervisor.uuid} @@ -321,5 +322,5 @@ class UniformAirflow(base.BaseStrategy): input_parameters=parameters) def post_execute(self): - self.solution.model = self.model + self.solution.model = self.compute_model # TODO(v-francoise): Add the indicators to the solution diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py index 177f4015e..03301660b 100644 --- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py @@ -16,8 +16,6 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -# -from copy import deepcopy from oslo_log import log import six @@ -209,14 +207,6 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): hyper_state.HypervisorState.DISABLED.value): self.add_action_disable_hypervisor(hypervisor) - def get_prediction_model(self): - """Return a deepcopy of a model representing current cluster state. - - :param model: model_root object - :return: model_root object - """ - return deepcopy(self.model) - def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'): """Collect cpu, ram and disk utilization statistics of a VM. @@ -501,7 +491,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): asc += 1 def pre_execute(self): - if self.model is None: + if self.compute_model is None: raise exception.ClusterStateNotDefined() def do_execute(self): @@ -519,7 +509,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): :param original_model: root_model object """ LOG.info(_LI('Executing Smart Strategy')) - model = self.get_prediction_model() + model = self.compute_model.get_latest_cluster_data_model() rcu = self.get_relative_cluster_utilization(model) self.ceilometer_vm_data_cache = dict() diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index 43ac76583..75243c24c 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -108,12 +108,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): def calculate_used_resource(self, hypervisor, cap_cores, cap_mem, cap_disk): """Calculate the used vcpus, memory and disk based on VM flavors""" - vms = self.model.get_mapping().get_node_vms(hypervisor) + vms = self.compute_model.get_mapping().get_node_vms(hypervisor) vcpus_used = 0 memory_mb_used = 0 disk_gb_used = 0 for vm_id in vms: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) vcpus_used += cap_cores.get_capacity(vm) memory_mb_used += cap_mem.get_capacity(vm) disk_gb_used += cap_disk.get_capacity(vm) @@ -129,7 +129,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): """ for hvmap in hosts: source_hypervisor = hvmap['hv'] - source_vms = self.model.get_mapping().get_node_vms( + source_vms = self.compute_model.get_mapping().get_node_vms( source_hypervisor) if source_vms: delta_workload = hvmap['workload'] - avg_workload @@ -138,7 +138,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): for vm_id in source_vms: try: # select the first active VM to migrate - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) if vm.state != vm_state.VMState.ACTIVE.value: LOG.debug("VM not active; skipped: %s", vm.uuid) @@ -150,8 +150,8 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): except wexc.InstanceNotFound: LOG.error(_LE("VM not found; error: %s"), vm_id) if instance_id: - return source_hypervisor, self.model.get_vm_from_id( - instance_id) + return (source_hypervisor, + self.compute_model.get_vm_from_id(instance_id)) else: LOG.info(_LI("VM not found on hypervisor: %s"), source_hypervisor.uuid) @@ -160,10 +160,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): avg_workload, workload_cache): '''Only return hosts with sufficient available resources''' - cap_cores = self.model.get_resource_from_id( + cap_cores = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores) - cap_disk = self.model.get_resource_from_id(resource.ResourceType.disk) - cap_mem = self.model.get_resource_from_id(resource.ResourceType.memory) + cap_disk = self.compute_model.get_resource_from_id( + resource.ResourceType.disk) + cap_mem = self.compute_model.get_resource_from_id( + resource.ResourceType.memory) required_cores = cap_cores.get_capacity(vm_to_migrate) required_disk = cap_disk.get_capacity(vm_to_migrate) @@ -201,12 +203,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): and also generate the VM workload map. """ - hypervisors = self.model.get_all_hypervisors() + hypervisors = self.compute_model.get_all_hypervisors() cluster_size = len(hypervisors) if not hypervisors: raise wexc.ClusterEmpty() # get cpu cores capacity of hypervisors and vms - cap_cores = self.model.get_resource_from_id( + cap_cores = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores) overload_hosts = [] nonoverload_hosts = [] @@ -216,11 +218,12 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): # use workload_cache to store the workload of VMs for reuse purpose workload_cache = {} for hypervisor_id in hypervisors: - hypervisor = self.model.get_hypervisor_from_id(hypervisor_id) - vms = self.model.get_mapping().get_node_vms(hypervisor) + hypervisor = self.compute_model.get_hypervisor_from_id( + hypervisor_id) + vms = self.compute_model.get_mapping().get_node_vms(hypervisor) hypervisor_workload = 0.0 for vm_id in vms: - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) try: cpu_util = self.ceilometer.statistic_aggregation( resource_id=vm_id, @@ -262,7 +265,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): """ LOG.info(_LI("Initializing Workload Balance Strategy")) - if self.model is None: + if self.compute_model is None: raise wexc.ClusterStateNotDefined() def do_execute(self): @@ -308,9 +311,9 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): key=lambda x: (x["cpu_util"])) # always use the host with lowerest CPU utilization mig_dst_hypervisor = destination_hosts[0]['hv'] - # generate solution to migrate the vm to the dest server - if self.model.get_mapping().migrate_vm(vm_src, source_hypervisor, - mig_dst_hypervisor): + # generate solution to migrate the vm to the dest server, + if self.compute_model.get_mapping().migrate_vm( + vm_src, source_hypervisor, mig_dst_hypervisor): parameters = {'migration_type': 'live', 'src_hypervisor': source_hypervisor.uuid, 'dst_hypervisor': mig_dst_hypervisor.uuid} @@ -323,4 +326,4 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): This can be used to compute the global efficacy """ - self.solution.model = self.model + self.solution.model = self.compute_model diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index 84073e699..acd9dba93 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -171,9 +171,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): :return: dict """ LOG.debug('get_vm_load started') - vm_vcpus = self.model.get_resource_from_id( + vm_vcpus = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity( - self.model.get_vm_from_id(vm_uuid)) + self.compute_model.get_vm_from_id(vm_uuid)) vm_load = {'uuid': vm_uuid, 'vcpus': vm_vcpus} for meter in self.metrics: avg_meter = self.ceilometer.statistic_aggregation( @@ -192,9 +192,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): normalized_hosts = deepcopy(hosts) for host in normalized_hosts: if 'memory.resident' in normalized_hosts[host]: - h_memory = self.model.get_resource_from_id( + h_memory = self.compute_model.get_resource_from_id( resource.ResourceType.memory).get_capacity( - self.model.get_hypervisor_from_id(host)) + self.compute_model.get_hypervisor_from_id(host)) normalized_hosts[host]['memory.resident'] /= float(h_memory) return normalized_hosts @@ -202,11 +202,11 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def get_hosts_load(self): """Get load of every host by gathering vms load""" hosts_load = {} - for hypervisor_id in self.model.get_all_hypervisors(): + for hypervisor_id in self.compute_model.get_all_hypervisors(): hosts_load[hypervisor_id] = {} - host_vcpus = self.model.get_resource_from_id( + host_vcpus = self.compute_model.get_resource_from_id( resource.ResourceType.cpu_cores).get_capacity( - self.model.get_hypervisor_from_id(hypervisor_id)) + self.compute_model.get_hypervisor_from_id(hypervisor_id)) hosts_load[hypervisor_id]['vcpus'] = host_vcpus for metric in self.metrics: @@ -297,15 +297,15 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): yield hypervisors vm_host_map = [] - for source_hp_id in self.model.get_all_hypervisors(): - hypervisors = list(self.model.get_all_hypervisors()) + for source_hp_id in self.compute_model.get_all_hypervisors(): + hypervisors = list(self.compute_model.get_all_hypervisors()) hypervisors.remove(source_hp_id) hypervisor_list = yield_hypervisors(hypervisors) - vms_id = self.model.get_mapping(). \ + vms_id = self.compute_model.get_mapping(). \ get_node_vms_from_id(source_hp_id) for vm_id in vms_id: min_sd_case = {'value': len(self.metrics)} - vm = self.model.get_vm_from_id(vm_id) + vm = self.compute_model.get_vm_from_id(vm_id) if vm.state not in [vm_state.VMState.ACTIVE.value, vm_state.VMState.PAUSED.value]: continue @@ -347,27 +347,29 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def create_migration_vm(self, mig_vm, mig_src_hypervisor, mig_dst_hypervisor): """Create migration VM """ - if self.model.get_mapping().migrate_vm( + if self.compute_model.get_mapping().migrate_vm( mig_vm, mig_src_hypervisor, mig_dst_hypervisor): self.add_migration(mig_vm.uuid, 'live', mig_src_hypervisor.uuid, mig_dst_hypervisor.uuid) def migrate(self, vm_uuid, src_host, dst_host): - mig_vm = self.model.get_vm_from_id(vm_uuid) - mig_src_hypervisor = self.model.get_hypervisor_from_id(src_host) - mig_dst_hypervisor = self.model.get_hypervisor_from_id(dst_host) + mig_vm = self.compute_model.get_vm_from_id(vm_uuid) + mig_src_hypervisor = self.compute_model.get_hypervisor_from_id( + src_host) + mig_dst_hypervisor = self.compute_model.get_hypervisor_from_id( + dst_host) self.create_migration_vm(mig_vm, mig_src_hypervisor, mig_dst_hypervisor) def fill_solution(self): - self.solution.model = self.model + self.solution.model = self.compute_model return self.solution def pre_execute(self): LOG.info(_LI("Initializing Workload Stabilization")) - if self.model is None: + if self.compute_model is None: raise exception.ClusterStateNotDefined() def do_execute(self): @@ -377,12 +379,13 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): min_sd = 1 balanced = False for vm_host in migration: - dst_hp_disk = self.model.get_resource_from_id( + dst_hp_disk = self.compute_model.get_resource_from_id( resource.ResourceType.disk).get_capacity( - self.model.get_hypervisor_from_id(vm_host['host'])) - vm_disk = self.model.get_resource_from_id( + self.compute_model.get_hypervisor_from_id( + vm_host['host'])) + vm_disk = self.compute_model.get_resource_from_id( resource.ResourceType.disk).get_capacity( - self.model.get_vm_from_id(vm_host['vm'])) + self.compute_model.get_vm_from_id(vm_host['vm'])) if vm_disk > dst_hp_disk: continue vm_load = self.calculate_migration_case(hosts_load, diff --git a/watcher/metrics_engine/cluster_model_collector/base.py b/watcher/metrics_engine/cluster_model_collector/base.py index b08ee9fff..46ed09af0 100644 --- a/watcher/metrics_engine/cluster_model_collector/base.py +++ b/watcher/metrics_engine/cluster_model_collector/base.py @@ -109,11 +109,38 @@ XML File, In Memory Database, ...). """ import abc +import copy import six +from watcher.common.loader import loadable + @six.add_metaclass(abc.ABCMeta) -class BaseClusterModelCollector(object): +class BaseClusterDataModelCollector(loadable.Loadable): + + def __init__(self, config, osc=None): + super(BaseClusterDataModelCollector, self).__init__(config) + self.osc = osc + self._cluster_data_model = None + + @property + def cluster_data_model(self): + if self._cluster_data_model is None: + self._cluster_data_model = self.execute() + return self._cluster_data_model + + @cluster_data_model.setter + def cluster_data_model(self, model): + self._cluster_data_model = model + @abc.abstractmethod - def get_latest_cluster_data_model(self): + def execute(self): + """Build a cluster data model""" raise NotImplementedError() + + @classmethod + def get_config_opts(cls): + return [] + + def get_latest_cluster_data_model(self): + return copy.deepcopy(self.cluster_data_model) diff --git a/watcher/metrics_engine/cluster_model_collector/manager.py b/watcher/metrics_engine/cluster_model_collector/manager.py index fdad56838..9420c4f4e 100644 --- a/watcher/metrics_engine/cluster_model_collector/manager.py +++ b/watcher/metrics_engine/cluster_model_collector/manager.py @@ -19,8 +19,8 @@ from oslo_config import cfg -from watcher.common import nova_helper -from watcher.metrics_engine.cluster_model_collector import nova as cnova +from watcher.common import utils +from watcher.metrics_engine.loading import default CONF = cfg.CONF @@ -28,7 +28,29 @@ CONF = cfg.CONF class CollectorManager(object): - def get_cluster_model_collector(self, osc=None): - """:param osc: an OpenStackClients instance""" - nova = nova_helper.NovaHelper(osc=osc) - return cnova.NovaClusterModelCollector(nova) + def __init__(self): + self.collector_loader = default.ClusterDataModelCollectorLoader() + self._collectors = None + + def get_collectors(self): + if self._collectors is None: + collectors = utils.Struct() + available_collectors = self.collector_loader.list_available() + for collector_name in available_collectors: + collector = self.collector_loader.load(collector_name) + collectors[collector_name] = collector + self._collectors = collectors + + return self._collectors + + def get_cluster_model_collector(self, name, osc=None): + """Retrieve cluster data model collector + + :param name: name of the cluster data model collector plugin + :type name: str + :param osc: an OpenStackClients instance + :type osc: :py:class:`~.OpenStackClients` instance + :returns: cluster data model collector plugin + :rtype: :py:class:`~.BaseClusterDataModelCollector` + """ + return self.collector_loader.load(name, osc=osc) diff --git a/watcher/metrics_engine/cluster_model_collector/nova.py b/watcher/metrics_engine/cluster_model_collector/nova.py index b3ac3e827..df58d2ee8 100644 --- a/watcher/metrics_engine/cluster_model_collector/nova.py +++ b/watcher/metrics_engine/cluster_model_collector/nova.py @@ -15,10 +15,10 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -# from oslo_log import log +from watcher.common import nova_helper from watcher.decision_engine.model import hypervisor as obj_hypervisor from watcher.decision_engine.model import model_root from watcher.decision_engine.model import resource @@ -28,23 +28,36 @@ from watcher.metrics_engine.cluster_model_collector import base LOG = log.getLogger(__name__) -class NovaClusterModelCollector(base.BaseClusterModelCollector): - def __init__(self, wrapper): - super(NovaClusterModelCollector, self).__init__() - self.wrapper = wrapper +class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector): + """nova - def get_latest_cluster_data_model(self): - LOG.debug("Getting latest cluster data model") + *Description* - cluster = model_root.ModelRoot() + This Nova cluster data model collector creates an in-memory representation + of the resources exposed by the compute service. + + *Spec URL* + + + """ + + def __init__(self, config, osc=None): + super(NovaClusterDataModelCollector, self).__init__(config, osc) + self.wrapper = nova_helper.NovaHelper(osc=self.osc) + + def execute(self): + """Build the compute cluster data model""" + LOG.debug("Building latest Nova cluster data model") + + model = model_root.ModelRoot() mem = resource.Resource(resource.ResourceType.memory) num_cores = resource.Resource(resource.ResourceType.cpu_cores) disk = resource.Resource(resource.ResourceType.disk) disk_capacity = resource.Resource(resource.ResourceType.disk_capacity) - cluster.create_resource(mem) - cluster.create_resource(num_cores) - cluster.create_resource(disk) - cluster.create_resource(disk_capacity) + model.create_resource(mem) + model.create_resource(num_cores) + model.create_resource(disk) + model.create_resource(disk_capacity) flavor_cache = {} hypervisors = self.wrapper.get_hypervisors_list() @@ -61,7 +74,7 @@ class NovaClusterModelCollector(base.BaseClusterModelCollector): num_cores.set_capacity(hypervisor, h.vcpus) hypervisor.state = h.state hypervisor.status = h.status - cluster.add_hypervisor(hypervisor) + model.add_hypervisor(hypervisor) vms = self.wrapper.get_vms_by_hypervisor(str(service.host)) for v in vms: # create VM in cluster_model_collector @@ -76,6 +89,6 @@ class NovaClusterModelCollector(base.BaseClusterModelCollector): disk.set_capacity(vm, v.flavor['disk']) num_cores.set_capacity(vm, v.flavor['vcpus']) - cluster.get_mapping().map(hypervisor, vm) - cluster.add_vm(vm) - return cluster + model.get_mapping().map(hypervisor, vm) + model.add_vm(vm) + return model diff --git a/watcher/metrics_engine/loading/__init__.py b/watcher/metrics_engine/loading/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/metrics_engine/loading/default.py b/watcher/metrics_engine/loading/default.py new file mode 100644 index 000000000..d2ea8a04d --- /dev/null +++ b/watcher/metrics_engine/loading/default.py @@ -0,0 +1,23 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import unicode_literals + + +from watcher.common.loader import default + + +class ClusterDataModelCollectorLoader(default.DefaultLoader): + def __init__(self): + super(ClusterDataModelCollectorLoader, self).__init__( + namespace='watcher_cluster_data_model_collectors') diff --git a/watcher/tests/collector/test_nova_collector.py b/watcher/tests/collector/test_nova_collector.py index 568f0f2f8..83e19c469 100644 --- a/watcher/tests/collector/test_nova_collector.py +++ b/watcher/tests/collector/test_nova_collector.py @@ -20,25 +20,28 @@ import mock -from watcher.metrics_engine.cluster_model_collector.nova import \ - NovaClusterModelCollector +from watcher.common import nova_helper +from watcher.metrics_engine.cluster_model_collector import nova from watcher.tests import base class TestNovaCollector(base.TestCase): - @mock.patch('keystoneclient.v3.client.Client') - def setUp(self, mock_ksclient): + + @mock.patch('keystoneclient.v3.client.Client', mock.Mock()) + @mock.patch.object(nova_helper, 'NovaHelper') + def setUp(self, m_nova_helper): super(TestNovaCollector, self).setUp() - self.wrapper = mock.MagicMock() - self.nova_collector = NovaClusterModelCollector(self.wrapper) + self.m_nova_helper = m_nova_helper + self.nova_collector = nova.NovaClusterDataModelCollector( + config=mock.Mock()) def test_nova_collector(self): hypervisor = mock.Mock() - hypervisor.hypervisor_hostname = "rdev-lannion.eu" + hypervisor.hypervisor_hostname = "compute-1" hypervisor.service = mock.MagicMock() service = mock.Mock() service.host = "" - self.wrapper.get_hypervisors_list.return_value = {hypervisor} - self.wrapper.nova.services.find.get.return_value = service + self.m_nova_helper.get_hypervisors_list.return_value = {hypervisor} + self.m_nova_helper.nova.services.find.get.return_value = service model = self.nova_collector.get_latest_cluster_data_model() self.assertIsNotNone(model) diff --git a/watcher/tests/decision_engine/model/test_mapping.py b/watcher/tests/decision_engine/model/test_mapping.py index e92b04403..5852584e6 100644 --- a/watcher/tests/decision_engine/model/test_mapping.py +++ b/watcher/tests/decision_engine/model/test_mapping.py @@ -30,9 +30,12 @@ class TestMapping(base.BaseTestCase): VM1_UUID = "73b09e16-35b7-4922-804e-e8f5d9b740fc" VM2_UUID = "a4cab39b-9828-413a-bf88-f76921bf1517" + def setUp(self): + super(TestMapping, self).setUp() + self.fake_cluster = faker_cluster_state.FakerModelCollector() + def test_get_node_from_vm(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() vms = model.get_all_vms() keys = list(vms.keys()) @@ -43,15 +46,13 @@ class TestMapping(base.BaseTestCase): self.assertEqual('Node_0', node.uuid) def test_get_node_from_vm_id(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() hyps = model.mapping.get_node_vms_from_id("BLABLABLA") self.assertEqual(0, hyps.__len__()) def test_get_all_vms(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() vms = model.get_all_vms() self.assertEqual(2, vms.__len__()) @@ -63,16 +64,14 @@ class TestMapping(base.BaseTestCase): self.assertEqual(self.VM2_UUID, vms[self.VM2_UUID].uuid) def test_get_mapping(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() mapping_vm = model.mapping.get_mapping_vm() self.assertEqual(2, mapping_vm.__len__()) self.assertEqual('Node_0', mapping_vm[self.VM1_UUID]) self.assertEqual('Node_1', mapping_vm[self.VM2_UUID]) def test_migrate_vm(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() vms = model.get_all_vms() keys = list(vms.keys()) vm0 = vms[keys[0]] @@ -86,8 +85,7 @@ class TestMapping(base.BaseTestCase): self.assertEqual(True, model.mapping.migrate_vm(vm1, hyp0, hyp1)) def test_unmap_from_id_log_warning(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() vms = model.get_all_vms() keys = list(vms.keys()) vm0 = vms[keys[0]] @@ -100,8 +98,7 @@ class TestMapping(base.BaseTestCase): # hypervisor.uuid)), 1) def test_unmap_from_id(self): - fake_cluster = faker_cluster_state.FakerModelCollector() - model = fake_cluster.generate_scenario_3_with_2_hypervisors() + model = self.fake_cluster.generate_scenario_3_with_2_hypervisors() vms = model.get_all_vms() keys = list(vms.keys()) vm0 = vms[keys[0]] diff --git a/watcher/tests/decision_engine/planner/test_default_planner.py b/watcher/tests/decision_engine/planner/test_default_planner.py index e5f6e38fc..5c84bdd32 100644 --- a/watcher/tests/decision_engine/planner/test_default_planner.py +++ b/watcher/tests/decision_engine/planner/test_default_planner.py @@ -37,7 +37,7 @@ class SolutionFaker(object): metrics = fake.FakerMetricsCollector() current_state_cluster = faker_cluster_state.FakerModelCollector() sercon = strategies.BasicConsolidation(config=mock.Mock()) - sercon._model = current_state_cluster.generate_scenario_1() + sercon._compute_model = current_state_cluster.generate_scenario_1() sercon.ceilometer = mock.MagicMock( get_statistics=metrics.mock_get_statistics) return sercon.execute() @@ -49,7 +49,7 @@ class SolutionFakerSingleHyp(object): metrics = fake.FakerMetricsCollector() current_state_cluster = faker_cluster_state.FakerModelCollector() sercon = strategies.BasicConsolidation(config=mock.Mock()) - sercon._model = ( + sercon._compute_model = ( current_state_cluster.generate_scenario_3_with_2_hypervisors()) sercon.ceilometer = mock.MagicMock( get_statistics=metrics.mock_get_statistics) diff --git a/watcher/tests/decision_engine/strategy/context/test_strategy_context.py b/watcher/tests/decision_engine/strategy/context/test_strategy_context.py index 3bb03b106..e3578faeb 100644 --- a/watcher/tests/decision_engine/strategy/context/test_strategy_context.py +++ b/watcher/tests/decision_engine/strategy/context/test_strategy_context.py @@ -36,7 +36,7 @@ class TestStrategyContext(base.DbTestCase): strategy_context = d_strategy_ctx.DefaultStrategyContext() - @mock.patch.object(strategies.DummyStrategy, 'model', + @mock.patch.object(strategies.DummyStrategy, 'compute_model', new_callable=mock.PropertyMock) @mock.patch.object(d_selector.DefaultStrategySelector, 'select') def test_execute_strategy(self, mock_call, m_model): diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py index c96189646..36e8bfa18 100644 --- a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py +++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_and_metrics.py @@ -16,7 +16,9 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -# + +import mock + from watcher.decision_engine.model import hypervisor from watcher.decision_engine.model import model_root as modelroot from watcher.decision_engine.model import resource @@ -25,12 +27,14 @@ from watcher.decision_engine.model import vm_state from watcher.metrics_engine.cluster_model_collector import base -class FakerModelCollector(base.BaseClusterModelCollector): +class FakerModelCollector(base.BaseClusterDataModelCollector): - def __init__(self): - pass + def __init__(self, config=None, osc=None): + if config is None: + config = mock.Mock() + super(FakerModelCollector, self).__init__(config) - def get_latest_cluster_data_model(self): + def execute(self): return self.generate_scenario_1() def generate_scenario_1(self): diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py index 4f931e417..2b285ebdc 100644 --- a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py +++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py @@ -15,7 +15,9 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -# + +import mock + from watcher.decision_engine.model import hypervisor from watcher.decision_engine.model import model_root as modelroot from watcher.decision_engine.model import resource @@ -23,11 +25,14 @@ from watcher.decision_engine.model import vm as modelvm from watcher.metrics_engine.cluster_model_collector import base -class FakerModelCollector(base.BaseClusterModelCollector): - def __init__(self): - pass +class FakerModelCollector(base.BaseClusterDataModelCollector): - def get_latest_cluster_data_model(self): + def __init__(self, config=None, osc=None): + if config is None: + config = mock.Mock() + super(FakerModelCollector, self).__init__(config) + + def execute(self): return self.generate_scenario_1() def generate_scenario_1(self): diff --git a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py index bb732ec53..c446e342b 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py @@ -40,7 +40,7 @@ class TestBasicConsolidation(base.BaseTestCase): self.fake_cluster = faker_cluster_state.FakerModelCollector() p_model = mock.patch.object( - strategies.BasicConsolidation, "model", + strategies.BasicConsolidation, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py b/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py index 155716c17..6100d1580 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_dummy_strategy.py @@ -33,7 +33,7 @@ class TestDummyStrategy(base.TestCase): self.fake_cluster = faker_cluster_state.FakerModelCollector() p_model = mock.patch.object( - strategies.DummyStrategy, "model", + strategies.DummyStrategy, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py index fa62c7e1d..cd32b504c 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py @@ -42,7 +42,7 @@ class TestOutletTempControl(base.BaseTestCase): self.fake_cluster = faker_cluster_state.FakerModelCollector() p_model = mock.patch.object( - strategies.OutletTempControl, "model", + strategies.OutletTempControl, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py index bc2d1ab44..dd9c0b663 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py @@ -41,7 +41,7 @@ class TestUniformAirflow(base.BaseTestCase): self.fake_cluster = faker_cluster_state.FakerModelCollector() p_model = mock.patch.object( - strategies.UniformAirflow, "model", + strategies.UniformAirflow, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py index 312f6d8b5..49ec67b67 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py @@ -36,7 +36,7 @@ class TestVMWorkloadConsolidation(base.BaseTestCase): self.fake_cluster = faker_cluster_and_metrics.FakerModelCollector() p_model = mock.patch.object( - strategies.VMWorkloadConsolidation, "model", + strategies.VMWorkloadConsolidation, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index 96a3fb535..c4d661c8c 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -41,7 +41,7 @@ class TestWorkloadBalance(base.BaseTestCase): self.fake_cluster = faker_cluster_state.FakerModelCollector() p_model = mock.patch.object( - strategies.WorkloadBalance, "model", + strategies.WorkloadBalance, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py index 310ed9086..c43ae0053 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_stabilization.py @@ -47,7 +47,7 @@ class TestWorkloadStabilization(base.BaseTestCase): 'Node_4': {'cpu_util': 0.02, 'memory.resident': 4, 'vcpus': 40}} p_model = mock.patch.object( - strategies.WorkloadStabilization, "model", + strategies.WorkloadStabilization, "compute_model", new_callable=mock.PropertyMock) self.m_model = p_model.start() self.addCleanup(p_model.stop) diff --git a/watcher/tests/metrics_engine/__init__.py b/watcher/tests/metrics_engine/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/tests/metrics_engine/test_loading.py b/watcher/tests/metrics_engine/test_loading.py new file mode 100644 index 000000000..f4cd561bf --- /dev/null +++ b/watcher/tests/metrics_engine/test_loading.py @@ -0,0 +1,78 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from stevedore import driver as drivermanager +from stevedore import extension as stevedore_extension + +from watcher.common import clients +from watcher.common import exception +from watcher.metrics_engine.loading import default as default_loading +from watcher.tests.decision_engine.strategy.strategies import \ + faker_cluster_state + +from watcher.tests import base + + +class TestClusterDataModelCollectorLoader(base.TestCase): + + def setUp(self): + super(TestClusterDataModelCollectorLoader, self).setUp() + self.collector_loader = ( + default_loading.ClusterDataModelCollectorLoader()) + + def test_load_collector_with_empty_model(self): + self.assertRaises( + exception.LoadingError, self.collector_loader.load, None) + + def test_collector_loader(self): + fake_driver = "fake" + # Set up the fake Stevedore extensions + fake_driver_call = drivermanager.DriverManager.make_test_instance( + extension=stevedore_extension.Extension( + name=fake_driver, + entry_point="%s:%s" % ( + faker_cluster_state.FakerModelCollector.__module__, + faker_cluster_state.FakerModelCollector.__name__), + plugin=faker_cluster_state.FakerModelCollector, + obj=None, + ), + namespace="watcher_cluster_data_model_collectors", + ) + + with mock.patch.object(drivermanager, + "DriverManager") as m_driver_manager: + m_driver_manager.return_value = fake_driver_call + loaded_collector = self.collector_loader.load("fake") + + self.assertIsInstance( + loaded_collector, faker_cluster_state.FakerModelCollector) + + +class TestLoadClusterDataModelCollectors(base.TestCase): + + collector_loader = default_loading.ClusterDataModelCollectorLoader() + + scenarios = [ + (collector_name, + {"collector_name": collector_name, "collector_cls": collector_cls}) + for collector_name, collector_cls + in collector_loader.list_available().items()] + + @mock.patch.object(clients, 'OpenStackClients', mock.Mock()) + def test_load_cluster_data_model_collectors(self): + collector = self.collector_loader.load(self.collector_name) + self.assertIsNotNone(collector)