diff --git a/watcher/cmd/api.py b/watcher/cmd/api.py index 089869345..13e5781bd 100644 --- a/watcher/cmd/api.py +++ b/watcher/cmd/api.py @@ -46,6 +46,5 @@ def main(): LOG.info(_LI('serving on %(protocol)s://%(host)s:%(port)s') % dict(protocol=protocol, host=host, port=port)) - launcher = service.process_launcher() - launcher.launch_service(server, workers=server.workers) + launcher = service.launch(CONF, server, workers=server.workers) launcher.wait() diff --git a/watcher/cmd/applier.py b/watcher/cmd/applier.py index ec45e2d2f..1ec6e35b1 100644 --- a/watcher/cmd/applier.py +++ b/watcher/cmd/applier.py @@ -22,7 +22,6 @@ import sys from oslo_config import cfg from oslo_log import log as logging -from oslo_service import service from watcher._i18n import _LI from watcher.applier import manager @@ -38,5 +37,7 @@ def main(): LOG.info(_LI('Starting Watcher Applier service in PID %s'), os.getpid()) applier_service = watcher_service.Service(manager.ApplierManager) - launcher = service.launch(CONF, applier_service) + + # Only 1 process + launcher = watcher_service.launch(CONF, applier_service) launcher.wait() diff --git a/watcher/cmd/decisionengine.py b/watcher/cmd/decisionengine.py index 00c21f8b9..ff6b0d106 100644 --- a/watcher/cmd/decisionengine.py +++ b/watcher/cmd/decisionengine.py @@ -22,11 +22,11 @@ import sys from oslo_config import cfg from oslo_log import log as logging -from oslo_service import service from watcher._i18n import _LI from watcher.common import service as watcher_service from watcher.decision_engine import manager +from watcher.decision_engine import scheduling from watcher.decision_engine import sync LOG = logging.getLogger(__name__) @@ -43,5 +43,10 @@ def main(): syncer.sync() de_service = watcher_service.Service(manager.DecisionEngineManager) - launcher = service.launch(CONF, de_service) + bg_schedulder_service = scheduling.DecisionEngineSchedulingService() + + # Only 1 process + launcher = watcher_service.launch(CONF, de_service) + launcher.launch_service(bg_schedulder_service) + launcher.wait() diff --git a/watcher/common/exception.py b/watcher/common/exception.py index 6c4e5e943..4305cf257 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -324,6 +324,10 @@ class MetricCollectorNotDefined(WatcherException): msg_fmt = _("The metrics resource collector is not defined") +class ClusterDataModelCollectionError(WatcherException): + msg_fmt = _("The cluster data model '%(cdm)s' could not be built") + + class ClusterStateNotDefined(WatcherException): msg_fmt = _("The cluster state is not defined") diff --git a/watcher/common/loader/loadable.py b/watcher/common/loader/loadable.py index a3260b2e2..c234274eb 100644 --- a/watcher/common/loader/loadable.py +++ b/watcher/common/loader/loadable.py @@ -18,6 +18,8 @@ import abc import six +from watcher.common import service + @six.add_metaclass(abc.ABCMeta) class Loadable(object): @@ -28,6 +30,35 @@ class Loadable(object): """ def __init__(self, config): + super(Loadable, self).__init__() + self.config = config + + @classmethod + @abc.abstractmethod + def get_config_opts(cls): + """Defines the configuration options to be associated to this loadable + + :return: A list of configuration options relative to this Loadable + :rtype: list of :class:`oslo_config.cfg.Opt` instances + """ + raise NotImplementedError + + +LoadableSingletonMeta = type( + "LoadableSingletonMeta", (abc.ABCMeta, service.Singleton), {}) + + +@six.add_metaclass(LoadableSingletonMeta) +class LoadableSingleton(object): + """Generic interface for dynamically loading a driver as a singleton. + + This defines the contract in order to let the loader manager inject + the configuration parameters during the loading. Classes inheriting from + this class will be singletons. + """ + + def __init__(self, config): + super(LoadableSingleton, self).__init__() self.config = config @classmethod diff --git a/watcher/common/scheduling.py b/watcher/common/scheduling.py new file mode 100644 index 000000000..90884d111 --- /dev/null +++ b/watcher/common/scheduling.py @@ -0,0 +1,44 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from apscheduler import events +from apscheduler.schedulers import background +from oslo_service import service + +job_events = events + + +class BackgroundSchedulerService(service.ServiceBase, + background.BackgroundScheduler): + + def start(self): + """Start service.""" + background.BackgroundScheduler.start(self) + + def stop(self): + """Stop service.""" + self.shutdown() + + def wait(self): + """Wait for service to complete.""" + + def reset(self): + """Reset service. + + Called in case service running in daemon mode receives SIGHUP. + """ diff --git a/watcher/common/service.py b/watcher/common/service.py index 54159e266..6725cdab4 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -62,6 +62,8 @@ _DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'qpid.messaging=INFO', 'paramiko=WARN', 'requests=WARN', 'neutronclient=WARN', 'glanceclient=WARN', 'watcher.openstack.common=WARN'] +Singleton = service.Singleton + class WSGIService(service.ServiceBase): """Provides ability to launch Watcher API from wsgi app.""" @@ -209,8 +211,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.publish_status(evt, payload) -def process_launcher(conf=cfg.CONF): - return service.ProcessLauncher(conf) +def launch(conf, service_, workers=1, restart_method='reload'): + return service.launch(conf, service_, workers, restart_method) def prepare_service(argv=(), conf=cfg.CONF): diff --git a/watcher/decision_engine/model/model_root.py b/watcher/decision_engine/model/model_root.py index eea6ff7f8..78c87d4b8 100644 --- a/watcher/decision_engine/model/model_root.py +++ b/watcher/decision_engine/model/model_root.py @@ -23,11 +23,18 @@ from watcher.decision_engine.model import vm class ModelRoot(object): - def __init__(self): + + def __init__(self, stale=False): self._hypervisors = utils.Struct() self._vms = utils.Struct() self.mapping = mapping.Mapping(self) self.resource = utils.Struct() + self.stale = stale + + def __nonzero__(self): + return not self.stale + + __bool__ = __nonzero__ def assert_hypervisor(self, obj): if not isinstance(obj, hypervisor.Hypervisor): diff --git a/watcher/decision_engine/scheduling.py b/watcher/decision_engine/scheduling.py new file mode 100644 index 000000000..c8f2307bd --- /dev/null +++ b/watcher/decision_engine/scheduling.py @@ -0,0 +1,91 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +import eventlet +from oslo_log import log + +from watcher.common import exception +from watcher.common import scheduling +from watcher.metrics_engine.cluster_model_collector import manager + +LOG = log.getLogger(__name__) + + +class DecisionEngineSchedulingService(scheduling.BackgroundSchedulerService): + + def __init__(self, gconfig=None, **options): + gconfig = None or {} + super(DecisionEngineSchedulingService, self).__init__( + gconfig, **options) + self.collector_manager = manager.CollectorManager() + + @property + def collectors(self): + return self.collector_manager.get_collectors() + + def add_sync_jobs(self): + for name, collector in self.collectors.items(): + timed_task = self._wrap_collector_sync_with_timeout( + collector, name) + self.add_job(timed_task, + trigger='interval', + seconds=collector.config.period, + next_run_time=datetime.datetime.now()) + + def _as_timed_sync_func(self, sync_func, name, timeout): + def _timed_sync(): + with eventlet.Timeout( + timeout, + exception=exception.ClusterDataModelCollectionError(cdm=name) + ): + sync_func() + + return _timed_sync + + def _wrap_collector_sync_with_timeout(self, collector, name): + """Add an execution timeout constraint on a function""" + timeout = collector.config.period + + def _sync(): + try: + timed_sync = self._as_timed_sync_func( + collector.synchronize, name, timeout) + timed_sync() + except Exception as exc: + LOG.exception(exc) + collector.set_cluster_data_model_as_stale() + + return _sync + + def start(self): + """Start service.""" + self.add_sync_jobs() + super(DecisionEngineSchedulingService, self).start() + + def stop(self): + """Stop service.""" + self.shutdown() + + def wait(self): + """Wait for service to complete.""" + + def reset(self): + """Reset service. + + Called in case service running in daemon mode receives SIGHUP. + """ diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index b8ae1f09f..c30181252 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -40,6 +40,7 @@ import abc import six from watcher.common import clients +from watcher.common import exception from watcher.common.loader import loadable from watcher.common import utils from watcher.decision_engine.loading import default as loading @@ -176,6 +177,9 @@ class BaseStrategy(loadable.Loadable): 'compute', osc=self.osc) self._compute_model = collector.get_latest_cluster_data_model() + if not self._compute_model: + raise exception.ClusterStateNotDefined() + return self._compute_model @classmethod diff --git a/watcher/decision_engine/strategy/strategies/basic_consolidation.py b/watcher/decision_engine/strategy/strategies/basic_consolidation.py index 06e5429b2..cf70251c0 100644 --- a/watcher/decision_engine/strategy/strategies/basic_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/basic_consolidation.py @@ -413,7 +413,7 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy): def pre_execute(self): LOG.info(_LI("Initializing Sercon Consolidation")) - if self.compute_model is None: + if not self.compute_model: raise exception.ClusterStateNotDefined() def do_execute(self): diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py index a2d04ef34..673b23946 100644 --- a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py +++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py @@ -226,7 +226,7 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy): def pre_execute(self): LOG.debug("Initializing Outlet temperature strategy") - if self.compute_model is None: + if not self.compute_model: raise wexc.ClusterStateNotDefined() def do_execute(self): diff --git a/watcher/decision_engine/strategy/strategies/uniform_airflow.py b/watcher/decision_engine/strategy/strategies/uniform_airflow.py index 45f8d3b3f..8a5c2d3ec 100644 --- a/watcher/decision_engine/strategy/strategies/uniform_airflow.py +++ b/watcher/decision_engine/strategy/strategies/uniform_airflow.py @@ -272,7 +272,7 @@ class UniformAirflow(base.BaseStrategy): def pre_execute(self): LOG.debug("Initializing Uniform Airflow Strategy") - if self.compute_model is None: + if not self.compute_model: raise wexc.ClusterStateNotDefined() def do_execute(self): diff --git a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py index 03301660b..d07bbff58 100644 --- a/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py +++ b/watcher/decision_engine/strategy/strategies/vm_workload_consolidation.py @@ -491,7 +491,7 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy): asc += 1 def pre_execute(self): - if self.compute_model is None: + if not self.compute_model: raise exception.ClusterStateNotDefined() def do_execute(self): diff --git a/watcher/decision_engine/strategy/strategies/workload_balance.py b/watcher/decision_engine/strategy/strategies/workload_balance.py index 75243c24c..ad4cf37dc 100644 --- a/watcher/decision_engine/strategy/strategies/workload_balance.py +++ b/watcher/decision_engine/strategy/strategies/workload_balance.py @@ -265,7 +265,7 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy): """ LOG.info(_LI("Initializing Workload Balance Strategy")) - if self.compute_model is None: + if not self.compute_model: raise wexc.ClusterStateNotDefined() def do_execute(self): diff --git a/watcher/decision_engine/strategy/strategies/workload_stabilization.py b/watcher/decision_engine/strategy/strategies/workload_stabilization.py index acd9dba93..6c0cf7101 100644 --- a/watcher/decision_engine/strategy/strategies/workload_stabilization.py +++ b/watcher/decision_engine/strategy/strategies/workload_stabilization.py @@ -369,7 +369,7 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy): def pre_execute(self): LOG.info(_LI("Initializing Workload Stabilization")) - if self.compute_model is None: + if not self.compute_model: raise exception.ClusterStateNotDefined() def do_execute(self): diff --git a/watcher/metrics_engine/cluster_model_collector/base.py b/watcher/metrics_engine/cluster_model_collector/base.py index 1f84dcfbc..7575a5f8f 100644 --- a/watcher/metrics_engine/cluster_model_collector/base.py +++ b/watcher/metrics_engine/cluster_model_collector/base.py @@ -103,28 +103,44 @@ strategies. import abc import copy +import threading + +from oslo_config import cfg import six +from watcher.common import clients from watcher.common.loader import loadable +from watcher.decision_engine.model import model_root @six.add_metaclass(abc.ABCMeta) -class BaseClusterDataModelCollector(loadable.Loadable): +class BaseClusterDataModelCollector(loadable.LoadableSingleton): + + STALE_MODEL = model_root.ModelRoot(stale=True) def __init__(self, config, osc=None): super(BaseClusterDataModelCollector, self).__init__(config) - self.osc = osc + self.osc = osc if osc else clients.OpenStackClients() self._cluster_data_model = None + self.lock = threading.RLock() @property def cluster_data_model(self): if self._cluster_data_model is None: + self.lock.acquire() self._cluster_data_model = self.execute() + self.lock.release() + return self._cluster_data_model @cluster_data_model.setter def cluster_data_model(self, model): + self.lock.acquire() self._cluster_data_model = model + self.lock.release() + + def set_cluster_data_model_as_stale(self): + self.cluster_data_model = self.STALE_MODEL @abc.abstractmethod def execute(self): @@ -133,7 +149,21 @@ class BaseClusterDataModelCollector(loadable.Loadable): @classmethod def get_config_opts(cls): - return [] + return [ + cfg.IntOpt( + 'period', + default=3600, + help='The time interval (in seconds) between each ' + 'synchronization of the model'), + ] def get_latest_cluster_data_model(self): return copy.deepcopy(self.cluster_data_model) + + def synchronize(self): + """Synchronize the cluster data model + + Whenever called this synchronization will perform a drop-in replacement + with the existing cluster data model + """ + self.cluster_data_model = self.execute() diff --git a/watcher/tests/base.py b/watcher/tests/base.py index f1644b281..80fcadc62 100644 --- a/watcher/tests/base.py +++ b/watcher/tests/base.py @@ -27,6 +27,7 @@ from pecan import testing import testscenarios from watcher.common import context as watcher_context +from watcher.common import service from watcher.objects import base as objects_base from watcher.tests import conf_fixture from watcher.tests import policy_fixture @@ -90,10 +91,15 @@ class TestCase(BaseTestCase): self.addCleanup(p.stop) self.useFixture(conf_fixture.ConfFixture(cfg.CONF)) + self._reset_singletons() self._base_test_obj_backup = copy.copy( objects_base.WatcherObject._obj_classes) self.addCleanup(self._restore_obj_registry) + self.addCleanup(self._reset_singletons) + + def _reset_singletons(self): + service.Singleton._instances.clear() def _restore_obj_registry(self): objects_base.WatcherObject._obj_classes = self._base_test_obj_backup diff --git a/watcher/tests/cmd/test_api.py b/watcher/tests/cmd/test_api.py index 71e90bc01..bdeba3b6e 100644 --- a/watcher/tests/cmd/test_api.py +++ b/watcher/tests/cmd/test_api.py @@ -50,7 +50,7 @@ class TestApi(base.BaseTestCase): @mock.patch.object(wsgi, "Server", mock.Mock()) @mock.patch("watcher.api.app.pecan.make_app") - @mock.patch.object(service, "process_launcher") + @mock.patch.object(service, "launch") def test_run_api_app(self, m_launcher, m_make_app): m_make_app.return_value = load_test_app(config=api_config.PECAN_CONFIG) api.main() @@ -58,7 +58,7 @@ class TestApi(base.BaseTestCase): @mock.patch.object(wsgi, "Server", mock.Mock()) @mock.patch("watcher.api.app.pecan.make_app") - @mock.patch.object(service, "process_launcher") + @mock.patch.object(service, "launch") def test_run_api_app_serve_specific_address(self, m_launcher, m_make_app): cfg.CONF.set_default("host", "localhost", group="api") m_make_app.return_value = load_test_app(config=api_config.PECAN_CONFIG) diff --git a/watcher/tests/collector/__init__.py b/watcher/tests/collector/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/watcher/tests/collector/test_nova_collector.py b/watcher/tests/collector/test_nova_collector.py deleted file mode 100644 index 83e19c469..000000000 --- a/watcher/tests/collector/test_nova_collector.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- encoding: utf-8 -*- -# Copyright (c) 2015 b<>com -# -# Authors: Jean-Emile DARTOIS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -import mock - -from watcher.common import nova_helper -from watcher.metrics_engine.cluster_model_collector import nova -from watcher.tests import base - - -class TestNovaCollector(base.TestCase): - - @mock.patch('keystoneclient.v3.client.Client', mock.Mock()) - @mock.patch.object(nova_helper, 'NovaHelper') - def setUp(self, m_nova_helper): - super(TestNovaCollector, self).setUp() - self.m_nova_helper = m_nova_helper - self.nova_collector = nova.NovaClusterDataModelCollector( - config=mock.Mock()) - - def test_nova_collector(self): - hypervisor = mock.Mock() - hypervisor.hypervisor_hostname = "compute-1" - hypervisor.service = mock.MagicMock() - service = mock.Mock() - service.host = "" - self.m_nova_helper.get_hypervisors_list.return_value = {hypervisor} - self.m_nova_helper.nova.services.find.get.return_value = service - model = self.nova_collector.get_latest_cluster_data_model() - self.assertIsNotNone(model) diff --git a/watcher/tests/conf_fixture.py b/watcher/tests/conf_fixture.py index 45f0107f3..3540da186 100644 --- a/watcher/tests/conf_fixture.py +++ b/watcher/tests/conf_fixture.py @@ -26,9 +26,9 @@ CONF.import_opt('sqlite_synchronous', 'oslo_db.options', group='database') class ConfFixture(fixtures.Fixture): - """Fixture to manage global conf settings.""" + """Fixture to manage conf settings.""" - def __init__(self, conf): + def __init__(self, conf=cfg.CONF): self.conf = conf def setUp(self): diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py index 2b285ebdc..fa79758b1 100644 --- a/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py +++ b/watcher/tests/decision_engine/strategy/strategies/faker_cluster_state.py @@ -29,7 +29,7 @@ class FakerModelCollector(base.BaseClusterDataModelCollector): def __init__(self, config=None, osc=None): if config is None: - config = mock.Mock() + config = mock.Mock(period=777) super(FakerModelCollector, self).__init__(config) def execute(self): diff --git a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py index c446e342b..b63fb0f06 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_basic_consolidation.py @@ -168,6 +168,14 @@ class TestBasicConsolidation(base.BaseTestCase): self.assertEqual(expected_num_migrations, num_migrations) self.assertEqual(expected_power_state, num_hypervisor_state_change) + def test_exception_stale_cdm(self): + self.fake_cluster.set_cluster_data_model_as_stale() + self.m_model.return_value = self.fake_cluster.cluster_data_model + + self.assertRaises( + exception.ClusterStateNotDefined, + self.strategy.execute) + # calculate_weight def test_execute_no_workload(self): model = ( diff --git a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py index cd32b504c..bb8cd5aa6 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py @@ -109,6 +109,14 @@ class TestOutletTempControl(base.BaseTestCase): self.m_model.return_value = model self.assertRaises(exception.ClusterEmpty, self.strategy.execute) + def test_exception_stale_cdm(self): + self.fake_cluster.set_cluster_data_model_as_stale() + self.m_model.return_value = self.fake_cluster.cluster_data_model + + self.assertRaises( + exception.ClusterStateNotDefined, + self.strategy.execute) + def test_execute_cluster_empty(self): model = model_root.ModelRoot() self.m_model.return_value = model diff --git a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py index dd9c0b663..fbf122af1 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_uniform_airflow.py @@ -136,6 +136,14 @@ class TestUniformAirflow(base.BaseTestCase): self.m_model.return_value = model self.assertRaises(exception.ClusterEmpty, self.strategy.execute) + def test_exception_stale_cdm(self): + self.fake_cluster.set_cluster_data_model_as_stale() + self.m_model.return_value = self.fake_cluster.cluster_data_model + + self.assertRaises( + exception.ClusterStateNotDefined, + self.strategy.execute) + def test_execute_cluster_empty(self): model = model_root.ModelRoot() self.m_model.return_value = model diff --git a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py index 49ec67b67..f59206283 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_vm_workload_consolidation.py @@ -20,6 +20,7 @@ import mock +from watcher.common import exception from watcher.decision_engine.model import model_root from watcher.decision_engine.strategy import strategies from watcher.tests import base @@ -56,6 +57,14 @@ class TestVMWorkloadConsolidation(base.BaseTestCase): statistic_aggregation=self.fake_metrics.mock_get_statistics) self.strategy = strategies.VMWorkloadConsolidation(config=mock.Mock()) + def test_exception_stale_cdm(self): + self.fake_cluster.set_cluster_data_model_as_stale() + self.m_model.return_value = self.fake_cluster.cluster_data_model + + self.assertRaises( + exception.ClusterStateNotDefined, + self.strategy.execute) + def test_get_vm_utilization(self): model = self.fake_cluster.generate_scenario_1() self.m_model.return_value = model diff --git a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py index c4d661c8c..be6e3e248 100644 --- a/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py +++ b/watcher/tests/decision_engine/strategy/strategies/test_workload_balance.py @@ -126,6 +126,14 @@ class TestWorkloadBalance(base.BaseTestCase): self.m_model.return_value = model self.assertRaises(exception.ClusterEmpty, self.strategy.execute) + def test_exception_stale_cdm(self): + self.fake_cluster.set_cluster_data_model_as_stale() + self.m_model.return_value = self.fake_cluster.cluster_data_model + + self.assertRaises( + exception.ClusterStateNotDefined, + self.strategy.execute) + def test_execute_cluster_empty(self): model = model_root.ModelRoot() self.m_model.return_value = model diff --git a/watcher/tests/decision_engine/test_scheduling.py b/watcher/tests/decision_engine/test_scheduling.py new file mode 100644 index 000000000..dd1376dc3 --- /dev/null +++ b/watcher/tests/decision_engine/test_scheduling.py @@ -0,0 +1,87 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from apscheduler.schedulers import background +from apscheduler.triggers import interval as interval_trigger +import eventlet +import mock + +from watcher.decision_engine import scheduling +from watcher.metrics_engine.loading import default as default_loading +from watcher.tests import base +from watcher.tests.decision_engine.strategy.strategies import \ + faker_cluster_state + + +class TestDecisionEngineSchedulingService(base.TestCase): + + @mock.patch.object( + default_loading.ClusterDataModelCollectorLoader, 'load') + @mock.patch.object( + default_loading.ClusterDataModelCollectorLoader, 'list_available') + @mock.patch.object(background.BackgroundScheduler, 'start') + def test_start_de_scheduling_service(self, m_start, m_list_available, + m_load): + m_list_available.return_value = { + 'fake': faker_cluster_state.FakerModelCollector} + fake_collector = faker_cluster_state.FakerModelCollector( + config=mock.Mock(period=777)) + m_load.return_value = fake_collector + + scheduler = scheduling.DecisionEngineSchedulingService() + + scheduler.start() + + m_start.assert_called_once_with(scheduler) + jobs = scheduler.get_jobs() + self.assertEqual(1, len(jobs)) + + job = jobs[0] + self.assertTrue(bool(fake_collector.cluster_data_model)) + + self.assertIsInstance(job.trigger, interval_trigger.IntervalTrigger) + + @mock.patch.object( + default_loading.ClusterDataModelCollectorLoader, 'load') + @mock.patch.object( + default_loading.ClusterDataModelCollectorLoader, 'list_available') + @mock.patch.object(background.BackgroundScheduler, 'start') + def test_execute_sync_job_fails(self, m_start, m_list_available, + m_load): + fake_config = mock.Mock(period=.01) + fake_collector = faker_cluster_state.FakerModelCollector( + config=fake_config) + fake_collector.synchronize = mock.Mock( + side_effect=lambda: eventlet.sleep(.5)) + m_list_available.return_value = { + 'fake': faker_cluster_state.FakerModelCollector} + m_load.return_value = fake_collector + + scheduler = scheduling.DecisionEngineSchedulingService() + + scheduler.start() + + m_start.assert_called_once_with(scheduler) + jobs = scheduler.get_jobs() + self.assertEqual(1, len(jobs)) + + job = jobs[0] + job.func() + self.assertFalse(bool(fake_collector.cluster_data_model)) + + self.assertIsInstance(job.trigger, interval_trigger.IntervalTrigger) diff --git a/watcher/tests/metrics_engine/test_cluster_data_model_collector.py b/watcher/tests/metrics_engine/test_cluster_data_model_collector.py new file mode 100644 index 000000000..f193cf630 --- /dev/null +++ b/watcher/tests/metrics_engine/test_cluster_data_model_collector.py @@ -0,0 +1,50 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from watcher.decision_engine.model import model_root +from watcher.metrics_engine.cluster_model_collector import base +from watcher.tests import base as test_base + + +class DummyClusterDataModelCollector(base.BaseClusterDataModelCollector): + + def execute(self): + model = model_root.ModelRoot() + # Do something here... + return model + + +class TestClusterDataModelCollector(test_base.TestCase): + + def test_is_singleton(self): + m_config = mock.Mock() + inst1 = DummyClusterDataModelCollector(config=m_config) + inst2 = DummyClusterDataModelCollector(config=m_config) + + self.assertIs(inst1, inst2) + + def test_in_memory_model_is_copied(self): + m_config = mock.Mock() + collector = DummyClusterDataModelCollector(config=m_config) + collector.synchronize() + + self.assertIs( + collector._cluster_data_model, collector.cluster_data_model) + self.assertIsNot( + collector.cluster_data_model, + collector.get_latest_cluster_data_model()) diff --git a/watcher/tests/metrics_engine/test_loading.py b/watcher/tests/metrics_engine/test_loading.py index f4cd561bf..73740dc05 100644 --- a/watcher/tests/metrics_engine/test_loading.py +++ b/watcher/tests/metrics_engine/test_loading.py @@ -21,16 +21,17 @@ from stevedore import extension as stevedore_extension from watcher.common import clients from watcher.common import exception from watcher.metrics_engine.loading import default as default_loading +from watcher.tests import base +from watcher.tests import conf_fixture from watcher.tests.decision_engine.strategy.strategies import \ faker_cluster_state -from watcher.tests import base - class TestClusterDataModelCollectorLoader(base.TestCase): def setUp(self): super(TestClusterDataModelCollectorLoader, self).setUp() + self.useFixture(conf_fixture.ConfReloadFixture()) self.collector_loader = ( default_loading.ClusterDataModelCollectorLoader()) @@ -72,6 +73,10 @@ class TestLoadClusterDataModelCollectors(base.TestCase): for collector_name, collector_cls in collector_loader.list_available().items()] + def setUp(self): + super(TestLoadClusterDataModelCollectors, self).setUp() + self.useFixture(conf_fixture.ConfReloadFixture()) + @mock.patch.object(clients, 'OpenStackClients', mock.Mock()) def test_load_cluster_data_model_collectors(self): collector = self.collector_loader.load(self.collector_name) diff --git a/watcher/tests/metrics_engine/test_nova_cdmc.py b/watcher/tests/metrics_engine/test_nova_cdmc.py new file mode 100644 index 000000000..c57e98704 --- /dev/null +++ b/watcher/tests/metrics_engine/test_nova_cdmc.py @@ -0,0 +1,82 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 b<>com +# +# Authors: Vincent FRANCOISE +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from watcher.common import nova_helper +from watcher.metrics_engine.cluster_model_collector import nova +from watcher.tests import base +from watcher.tests import conf_fixture + + +class TestNovaClusterDataModelCollector(base.TestCase): + + def setUp(self): + super(TestNovaClusterDataModelCollector, self).setUp() + self.useFixture(conf_fixture.ConfReloadFixture()) + + @mock.patch('keystoneclient.v3.client.Client', mock.Mock()) + @mock.patch.object(nova_helper, 'NovaHelper') + def test_nova_cdmc_execute(self, m_nova_helper_cls): + m_nova_helper = mock.Mock() + m_nova_helper_cls.return_value = m_nova_helper + fake_hypervisor = mock.Mock( + service={'id': 123}, + hypervisor_hostname='test_hostname', + memory_mb=333, + free_disk_gb=222, + local_gb=111, + vcpus=4, + state='TEST_STATE', + status='TEST_STATUS', + ) + fake_vm = mock.Mock( + id='ef500f7e-dac8-470f-960c-169486fce71b', + state=mock.Mock(**{'OS-EXT-STS:vm_state': 'VM_STATE'}), + flavor={'ram': 333, 'disk': 222, 'vcpus': 4}, + ) + m_nova_helper.get_hypervisors_list.return_value = [fake_hypervisor] + m_nova_helper.get_vms_by_hypervisor.return_value = [fake_vm] + m_nova_helper.nova.services.find.return_value = mock.Mock( + host='test_hostname') + + def m_get_flavor_instance(vm, cache): + vm.flavor = {'ram': 333, 'disk': 222, 'vcpus': 4} + return vm + + m_nova_helper.get_flavor_instance.side_effect = m_get_flavor_instance + + m_config = mock.Mock() + m_osc = mock.Mock() + + nova_cdmc = nova.NovaClusterDataModelCollector( + config=m_config, osc=m_osc) + + model = nova_cdmc.execute() + + hypervisors = model.get_all_hypervisors() + vms = model.get_all_vms() + + self.assertEqual(1, len(hypervisors)) + self.assertEqual(1, len(vms)) + + hypervisor = list(hypervisors.values())[0] + vm = list(vms.values())[0] + + self.assertEqual(hypervisor.uuid, 'test_hostname') + self.assertEqual(vm.uuid, 'ef500f7e-dac8-470f-960c-169486fce71b') diff --git a/watcher/tests/test_list_opts.py b/watcher/tests/test_list_opts.py index 185405c93..0d24f8090 100644 --- a/watcher/tests/test_list_opts.py +++ b/watcher/tests/test_list_opts.py @@ -29,10 +29,11 @@ class TestListOpts(base.TestCase): 'api', 'watcher_decision_engine', 'watcher_applier', 'watcher_planner', 'nova_client', 'glance_client', 'cinder_client', 'ceilometer_client', 'neutron_client', - 'watcher_clients_auth', 'watcher_planners.default'] + 'watcher_clients_auth'] + self.opt_sections = list(dict(opts.list_opts()).keys()) def test_run_list_opts(self): - expected_sections = self.base_sections + expected_sections = self.opt_sections result = opts.list_opts()