From a1cb1420096af439e0e7ee60868ba96e9ae97453 Mon Sep 17 00:00:00 2001 From: Tomasz Kaczynski Date: Wed, 3 Aug 2016 08:11:00 +0000 Subject: [PATCH] Add Scoring Module implementation This change is adding the main logic for the scoring module, defines entry points for the scoring engine plugins and provides a watcher-sync tool to enable Watcher database synchronization without needing to restart any Watcher service. Partially-Implements: blueprint scoring-module Change-Id: If10daae969ec27a7008af5173359992e957dcd5e --- setup.cfg | 8 + watcher/cmd/sync.py | 39 ++++ watcher/decision_engine/loading/default.py | 13 ++ watcher/decision_engine/scoring/__init__.py | 0 .../decision_engine/scoring/dummy_scorer.py | 169 ++++++++++++++++++ .../scoring/dummy_scoring_container.py | 99 ++++++++++ .../scoring/scoring_container.py | 51 ++++++ .../decision_engine/scoring/scoring_engine.py | 97 ++++++++++ .../scoring/scoring_factory.py | 106 +++++++++++ .../strategy/strategies/__init__.py | 4 +- .../strategy/strategies/dummy_with_scorer.py | 166 +++++++++++++++++ watcher/decision_engine/sync.py | 102 ++++++++++- watcher/opts.py | 2 + .../tests/decision_engine/scoring/__init__.py | 0 .../scoring/test_dummy_scorer.py | 54 ++++++ .../scoring/test_dummy_scoring_container.py | 51 ++++++ .../scoring/test_scoring_factory.py | 53 ++++++ .../strategies/test_dummy_with_scorer.py | 61 +++++++ 18 files changed, 1073 insertions(+), 2 deletions(-) create mode 100644 watcher/cmd/sync.py create mode 100644 watcher/decision_engine/scoring/__init__.py create mode 100644 watcher/decision_engine/scoring/dummy_scorer.py create mode 100644 watcher/decision_engine/scoring/dummy_scoring_container.py create mode 100644 watcher/decision_engine/scoring/scoring_container.py create mode 100644 watcher/decision_engine/scoring/scoring_engine.py create mode 100644 watcher/decision_engine/scoring/scoring_factory.py create mode 100644 watcher/decision_engine/strategy/strategies/dummy_with_scorer.py create mode 100644 watcher/tests/decision_engine/scoring/__init__.py create mode 100644 watcher/tests/decision_engine/scoring/test_dummy_scorer.py create mode 100644 watcher/tests/decision_engine/scoring/test_dummy_scoring_container.py create mode 100644 watcher/tests/decision_engine/scoring/test_scoring_factory.py create mode 100644 watcher/tests/decision_engine/strategy/strategies/test_dummy_with_scorer.py diff --git a/setup.cfg b/setup.cfg index 6f3cb026d..fe1fcae1c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,6 +39,7 @@ console_scripts = watcher-db-manage = watcher.cmd.dbmanage:main watcher-decision-engine = watcher.cmd.decisionengine:main watcher-applier = watcher.cmd.applier:main + watcher-sync = watcher.cmd.sync:main tempest.test_plugins = watcher_tests = watcher_tempest_plugin.plugin:WatcherTempestPlugin @@ -54,8 +55,15 @@ watcher_goals = workload_balancing = watcher.decision_engine.goal.goals:WorkloadBalancing airflow_optimization = watcher.decision_engine.goal.goals:AirflowOptimization +watcher_scoring_engines = + dummy_scorer = watcher.decision_engine.scoring.dummy_scorer:DummyScorer + +watcher_scoring_engine_containers = + dummy_scoring_container = watcher.decision_engine.scoring.dummy_scoring_container:DummyScoringContainer + watcher_strategies = dummy = watcher.decision_engine.strategy.strategies.dummy_strategy:DummyStrategy + dummy_with_scorer = watcher.decision_engine.strategy.strategies.dummy_with_scorer:DummyWithScorer basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation outlet_temperature = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation diff --git a/watcher/cmd/sync.py b/watcher/cmd/sync.py new file mode 100644 index 000000000..b94a876af --- /dev/null +++ b/watcher/cmd/sync.py @@ -0,0 +1,39 @@ +# -*- encoding: utf-8 -*- +# +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Script for the sync tool.""" + +import sys + +from oslo_log import log as logging + +from watcher._i18n import _LI +from watcher.common import service as service +from watcher.decision_engine import sync + +LOG = logging.getLogger(__name__) + + +def main(): + LOG.info(_LI('Watcher sync started.')) + + service.prepare_service(sys.argv) + syncer = sync.Syncer() + syncer.sync() + + LOG.info(_LI('Watcher sync finished.')) diff --git a/watcher/decision_engine/loading/default.py b/watcher/decision_engine/loading/default.py index de85cf3ca..8fbd5b893 100644 --- a/watcher/decision_engine/loading/default.py +++ b/watcher/decision_engine/loading/default.py @@ -3,6 +3,7 @@ # # Authors: Jean-Emile DARTOIS # Vincent FRANCOISE +# Tomasz Kaczynski # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,3 +47,15 @@ class ClusterDataModelCollectorLoader(default.DefaultLoader): def __init__(self): super(ClusterDataModelCollectorLoader, self).__init__( namespace='watcher_cluster_data_model_collectors') + + +class DefaultScoringLoader(default.DefaultLoader): + def __init__(self): + super(DefaultScoringLoader, self).__init__( + namespace='watcher_scoring_engines') + + +class DefaultScoringContainerLoader(default.DefaultLoader): + def __init__(self): + super(DefaultScoringContainerLoader, self).__init__( + namespace='watcher_scoring_engine_containers') diff --git a/watcher/decision_engine/scoring/__init__.py b/watcher/decision_engine/scoring/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/decision_engine/scoring/dummy_scorer.py b/watcher/decision_engine/scoring/dummy_scorer.py new file mode 100644 index 000000000..51cc69a3a --- /dev/null +++ b/watcher/decision_engine/scoring/dummy_scorer.py @@ -0,0 +1,169 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from oslo_log import log +from oslo_serialization import jsonutils +from oslo_utils import units + +from watcher._i18n import _ +from watcher.decision_engine.scoring import scoring_engine + +LOG = log.getLogger(__name__) + + +class DummyScorer(scoring_engine.ScoringEngine): + """Sample Scoring Engine implementing simplified workload classification. + + Typically a scoring engine would be implemented using machine learning + techniques. For example, for workload classification problem the solution + could consist of the following steps: + 1. Define a problem to solve: we want to detect the workload on the + machine based on the collected metrics like power consumption, + temperature, CPU load, memory usage, disk usage, network usage, etc. + 2. The workloads could be predefined, e.g. IDLE, CPU-INTENSIVE, + MEMORY-INTENSIVE, IO-BOUND, ... + Or we could let the ML algorithm to find the workloads based on the + learning data provided. The decision here leads to learning algorithm + used (supervised vs. non-supervised learning). + 3. Collect metrics from sample servers (learning data). + 4. Define the analytical model, pick ML framework and algorithm. + 5. Apply learning data to the data model. Once taught, the data model + becomes a scoring engine and can start doing predictions or + classifications. + 6. Wrap up the scoring engine with the class like this one, so it has a + standard interface and can be used inside Watcher. + + This class is a greatly very simplified version of the above model. The + goal is to provide an example how such class could be implemented and used + in Watcher, without adding additional dependencies like machine learning + frameworks (which can be quite heavy) or over-complicating it's internal + implementation, which can distract from looking at the overall picture. + + That said, this class implements a workload classification "manually" + (in plain python code) and is not intended to be used in production. + """ + + # Constants defining column indices for the input data + PROCESSOR_TIME_PERC = 0 + MEM_TOTAL_BYTES = 1 + MEM_AVAIL_BYTES = 2 + MEM_PAGE_READS_PER_SEC = 3 + MEM_PAGE_WRITES_PER_SEC = 4 + DISK_READ_BYTES_PER_SEC = 5 + DISK_WRITE_BYTES_PER_SEC = 6 + NET_BYTES_RECEIVED_PER_SEC = 7 + NET_BYTES_SENT_PER_SEC = 8 + + # Types of workload + WORKLOAD_IDLE = 0 + WORKLOAD_CPU = 1 + WORKLOAD_MEM = 2 + WORKLOAD_DISK = 3 + + def get_name(self): + return 'dummy_scorer' + + def get_description(self): + return 'Dummy workload classifier' + + def get_metainfo(self): + """Metadata about input/output format of this scoring engine. + + This information is used in strategy using this scoring engine to + prepare the input information and to understand the results. + """ + + return """{ + "feature_columns": [ + "proc-processor-time-%", + "mem-total-bytes", + "mem-avail-bytes", + "mem-page-reads/sec", + "mem-page-writes/sec", + "disk-read-bytes/sec", + "disk-write-bytes/sec", + "net-bytes-received/sec", + "net-bytes-sent/sec"], + "result_columns": [ + "workload", + "idle-probability", + "cpu-probability", + "memory-probability", + "disk-probability"], + "workloads": [ + "idle", + "cpu-intensive", + "memory-intensive", + "disk-intensive"] + }""" + + def calculate_score(self, features): + """Arbitrary algorithm calculating the score. + + It demonstrates how to parse the input data (features) and serialize + the results. It detects the workload type based on the metrics and + also returns the probabilities of each workload detection (again, + the arbitrary values are returned, just for demonstration how the + "real" machine learning algorithm could work. For example, the + Gradient Boosting Machine from H2O framework is using exactly the + same format: + http://www.h2o.ai/verticals/algos/gbm/ + """ + + LOG.debug('Calculating score, features: %s', features) + + # By default IDLE workload will be returned + workload = self.WORKLOAD_IDLE + idle_prob = 0.0 + cpu_prob = 0.0 + mem_prob = 0.0 + disk_prob = 0.0 + + # Basic input validation + try: + flist = jsonutils.loads(features) + except Exception as e: + raise ValueError(_('Unable to parse features: ') % e) + if type(flist) is not list: + raise ValueError(_('JSON list expected in feature argument')) + if len(flist) != 9: + raise ValueError(_('Invalid number of features, expected 9')) + + # Simple logic for workload classification + if flist[self.PROCESSOR_TIME_PERC] >= 80: + workload = self.WORKLOAD_CPU + cpu_prob = 100.0 + elif flist[self.MEM_PAGE_READS_PER_SEC] >= 1000 \ + and flist[self.MEM_PAGE_WRITES_PER_SEC] >= 1000: + workload = self.WORKLOAD_MEM + mem_prob = 100.0 + elif flist[self.DISK_READ_BYTES_PER_SEC] >= 50*units.Mi \ + and flist[self.DISK_WRITE_BYTES_PER_SEC] >= 50*units.Mi: + workload = self.WORKLOAD_DISK + disk_prob = 100.0 + else: + idle_prob = 100.0 + if flist[self.PROCESSOR_TIME_PERC] >= 40: + cpu_prob = 50.0 + if flist[self.MEM_PAGE_READS_PER_SEC] >= 500 \ + or flist[self.MEM_PAGE_WRITES_PER_SEC] >= 500: + mem_prob = 50.0 + + return jsonutils.dumps( + [workload, idle_prob, cpu_prob, mem_prob, disk_prob]) diff --git a/watcher/decision_engine/scoring/dummy_scoring_container.py b/watcher/decision_engine/scoring/dummy_scoring_container.py new file mode 100644 index 000000000..be5a533a7 --- /dev/null +++ b/watcher/decision_engine/scoring/dummy_scoring_container.py @@ -0,0 +1,99 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from oslo_log import log +from oslo_serialization import jsonutils + +from watcher._i18n import _ +from watcher.decision_engine.scoring import scoring_container +from watcher.decision_engine.scoring import scoring_engine + +LOG = log.getLogger(__name__) + + +class DummyScoringContainer(scoring_container.ScoringEngineContainer): + """Sample Scoring Engine container returning a list of scoring engines. + + Please note that it can be used in dynamic scenarios and the returned list + might return instances based on some external configuration (e.g. in + database). In order for these scoring engines to become discoverable in + Watcher API and Watcher CLI, a database re-sync is required. It can be + executed using watcher-sync tool for example. + """ + + @classmethod + def get_scoring_engine_list(self): + return [ + SimpleFunctionScorer( + 'dummy_min_scorer', + 'Dummy Scorer calculating the minimum value', + min), + SimpleFunctionScorer( + 'dummy_max_scorer', + 'Dummy Scorer calculating the maximum value', + max), + SimpleFunctionScorer( + 'dummy_avg_scorer', + 'Dummy Scorer calculating the average value', + lambda x: float(sum(x)) / len(x)), + ] + + +class SimpleFunctionScorer(scoring_engine.ScoringEngine): + """A simple generic scoring engine for demonstration purposes only. + + A generic scoring engine implementation, which is expecting a JSON + formatted array of numbers to be passed as an input for score calculation. + It then executes the aggregate function on this array and returns an + array with a single aggregated number (also JSON formatted). + """ + + def __init__(self, name, description, aggregate_function): + super(SimpleFunctionScorer, self).__init__(config=None) + self._name = name + self._description = description + self._aggregate_function = aggregate_function + + def get_name(self): + return self._name + + def get_description(self): + return self._description + + def get_metainfo(self): + return '' + + def calculate_score(self, features): + LOG.debug('Calculating score, features: %s', features) + + # Basic input validation + try: + flist = jsonutils.loads(features) + except Exception as e: + raise ValueError(_('Unable to parse features: %s') % e) + if type(flist) is not list: + raise ValueError(_('JSON list expected in feature argument')) + if len(flist) < 1: + raise ValueError(_('At least one feature is required')) + + # Calculate the result + result = self._aggregate_function(flist) + + # Return the aggregated result + return jsonutils.dumps([result]) diff --git a/watcher/decision_engine/scoring/scoring_container.py b/watcher/decision_engine/scoring/scoring_container.py new file mode 100644 index 000000000..b9ac29955 --- /dev/null +++ b/watcher/decision_engine/scoring/scoring_container.py @@ -0,0 +1,51 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + +from watcher.common.loader import loadable + + +@six.add_metaclass(abc.ABCMeta) +class ScoringEngineContainer(loadable.Loadable): + """A base class for all the Scoring Engines Containers. + + A Scoring Engine Container is an abstraction which allows to plugin + multiple Scoring Engines as a single Stevedore plugin. This enables some + more advanced scenarios like dynamic reloading of Scoring Engine + implementations without having to restart any Watcher services. + """ + + @classmethod + @abc.abstractmethod + def get_scoring_engine_list(self): + """Returns a list of Scoring Engine instances. + + :return: A list of Scoring Engine instances + :rtype: :class: `~.scoring_engine.ScoringEngine` + """ + + @classmethod + def get_config_opts(cls): + """Defines the configuration options to be associated to this loadable + + :return: A list of configuration options relative to this Loadable + :rtype: list of :class:`oslo_config.cfg.Opt` instances + """ + return [] diff --git a/watcher/decision_engine/scoring/scoring_engine.py b/watcher/decision_engine/scoring/scoring_engine.py new file mode 100644 index 000000000..30352071f --- /dev/null +++ b/watcher/decision_engine/scoring/scoring_engine.py @@ -0,0 +1,97 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + +from watcher.common.loader import loadable + + +@six.add_metaclass(abc.ABCMeta) +class ScoringEngine(loadable.Loadable): + """A base class for all the Scoring Engines. + + A Scoring Engine is an instance of a data model, to which the learning + data was applied. + + Please note that this class contains non-static and non-class methods by + design, so that it's easy to create multiple Scoring Engine instances + using a single class (possibly configured differently). + """ + + @abc.abstractmethod + def get_name(self): + """Returns the name of the Scoring Engine. + + The name should be unique across all Scoring Engines. + + :return: A Scoring Engine name + :rtype: str + """ + + @abc.abstractmethod + def get_description(self): + """Returns the description of the Scoring Engine. + + The description might contain any human readable information, which + might be useful for Strategy developers planning to use this Scoring + Engine. It will be also visible in the Watcher API and CLI. + + :return: A Scoring Engine description + :rtype: str + """ + + @abc.abstractmethod + def get_metainfo(self): + """Returns the metadata information about Scoring Engine. + + The metadata might contain a machine-friendly (e.g. in JSON format) + information needed to use this Scoring Engine. For example, some + Scoring Engines require to pass the array of features in particular + order to be able to calculate the score value. This order can be + defined in metadata and used in Strategy. + + :return: A Scoring Engine metadata + :rtype: str + """ + + @abc.abstractmethod + def calculate_score(self, features): + """Calculates a score value based on arguments passed. + + Scoring Engines might be very different to each other. They might + solve different problems or use different algorithms or frameworks + internally. To enable this kind of flexibility, the method takes only + one argument (string) and produces the results in the same format + (string). The consumer of the Scoring Engine is ultimately responsible + for providing the right arguments and parsing the result. + + :param features: Input data for Scoring Engine + :type features: str + :return: A score result + :rtype: str + """ + + @classmethod + def get_config_opts(cls): + """Defines the configuration options to be associated to this loadable + + :return: A list of configuration options relative to this Loadable + :rtype: list of :class:`oslo_config.cfg.Opt` instances + """ + return [] diff --git a/watcher/decision_engine/scoring/scoring_factory.py b/watcher/decision_engine/scoring/scoring_factory.py new file mode 100644 index 000000000..c716cff8f --- /dev/null +++ b/watcher/decision_engine/scoring/scoring_factory.py @@ -0,0 +1,106 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +A module providing helper methods to work with Scoring Engines. +""" + +from oslo_log import log + +from watcher._i18n import _ +from watcher.decision_engine.loading import default + + +LOG = log.getLogger(__name__) + +_scoring_engine_map = None + + +def get_scoring_engine(scoring_engine_name): + """Returns a Scoring Engine by its name. + + Method retrieves a Scoring Engine instance by its name. Scoring Engine + instances are being cached in memory to avoid enumerating the Stevedore + plugins on each call. + + When called for the first time, it reloads the cache. + + :return: A Scoring Engine instance with a given name + :rtype: :class: + `watcher.decision_engine.scoring.scoring_engine.ScoringEngine` + """ + global _scoring_engine_map + + _reload_scoring_engines() + scoring_engine = _scoring_engine_map.get(scoring_engine_name) + if scoring_engine is None: + raise KeyError(_('Scoring Engine with name=%s not found') + % scoring_engine_name) + + return scoring_engine + + +def get_scoring_engine_list(): + """Returns a list of Scoring Engine instances. + + The main use case for this method is discoverability, so the Scoring + Engine list is always reloaded before returning any results. + + Frequent calling of this method might have a negative performance impact. + + :return: A list of all available Scoring Engine instances + :rtype: List of :class: + `watcher.decision_engine.scoring.scoring_engine.ScoringEngine` + """ + global _scoring_engine_map + + _reload_scoring_engines(True) + return _scoring_engine_map.values() + + +def _reload_scoring_engines(refresh=False): + """Reloads Scoring Engines from Stevedore plugins to memory. + + Please note that two Stevedore entry points are used: + - watcher_scoring_engines: for simple plugin implementations + - watcher_scoring_engine_containers: for container plugins, which enable + the dynamic scenarios (its get_scoring_engine_list method might return + different values on each call) + """ + global _scoring_engine_map + + if _scoring_engine_map is None or refresh: + LOG.debug("Reloading Scoring Engine plugins") + engines = default.DefaultScoringLoader().list_available() + _scoring_engine_map = dict() + + for name in engines.keys(): + se_impl = default.DefaultScoringLoader().load(name) + LOG.debug("Found Scoring Engine plugin: %s" % se_impl.get_name()) + _scoring_engine_map[se_impl.get_name()] = se_impl + + engine_containers = \ + default.DefaultScoringContainerLoader().list_available() + + for container_id, container_cls in engine_containers.items(): + LOG.debug("Found Scoring Engine container plugin: %s" % + container_id) + for se in container_cls.get_scoring_engine_list(): + LOG.debug("Found Scoring Engine plugin: %s" % + se.get_name()) + _scoring_engine_map[se.get_name()] = se diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py index 5d16113ae..403d04bb1 100644 --- a/watcher/decision_engine/strategy/strategies/__init__.py +++ b/watcher/decision_engine/strategy/strategies/__init__.py @@ -16,6 +16,7 @@ from watcher.decision_engine.strategy.strategies import basic_consolidation from watcher.decision_engine.strategy.strategies import dummy_strategy +from watcher.decision_engine.strategy.strategies import dummy_with_scorer from watcher.decision_engine.strategy.strategies import outlet_temp_control from watcher.decision_engine.strategy.strategies import uniform_airflow from watcher.decision_engine.strategy.strategies import \ @@ -26,11 +27,12 @@ from watcher.decision_engine.strategy.strategies import workload_stabilization BasicConsolidation = basic_consolidation.BasicConsolidation OutletTempControl = outlet_temp_control.OutletTempControl DummyStrategy = dummy_strategy.DummyStrategy +DummyWithScorer = dummy_with_scorer.DummyWithScorer VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation WorkloadBalance = workload_balance.WorkloadBalance WorkloadStabilization = workload_stabilization.WorkloadStabilization UniformAirflow = uniform_airflow.UniformAirflow __all__ = ("BasicConsolidation", "OutletTempControl", "DummyStrategy", - "VMWorkloadConsolidation", "WorkloadBalance", + "DummyWithScorer", "VMWorkloadConsolidation", "WorkloadBalance", "WorkloadStabilization", "UniformAirflow") diff --git a/watcher/decision_engine/strategy/strategies/dummy_with_scorer.py b/watcher/decision_engine/strategy/strategies/dummy_with_scorer.py new file mode 100644 index 000000000..d99db22bb --- /dev/null +++ b/watcher/decision_engine/strategy/strategies/dummy_with_scorer.py @@ -0,0 +1,166 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import random + +from oslo_log import log +from oslo_serialization import jsonutils +from oslo_utils import units + +from watcher._i18n import _ +from watcher.decision_engine.scoring import scoring_factory +from watcher.decision_engine.strategy.strategies import base + +LOG = log.getLogger(__name__) + + +class DummyWithScorer(base.DummyBaseStrategy): + """A dummy strategy using dummy scoring engines. + + This is a dummy strategy demonstrating how to work with scoring + engines. One scoring engine is predicting the workload type of a machine + based on the telemetry data, the other one is simply calculating the + average value for given elements in a list. Results are then passed to the + NOP action. + + The strategy is presenting the whole workflow: + - Get a reference to a scoring engine + - Prepare input data (features) for score calculation + - Perform score calculation + - Use scorer's metadata for results interpretation + """ + + DEFAULT_NAME = "dummy_with_scorer" + DEFAULT_DESCRIPTION = "Dummy Strategy with Scorer" + + NOP = "nop" + SLEEP = "sleep" + + def __init__(self, config, osc=None): + """Constructor: the signature should be identical within the subclasses + + :param config: Configuration related to this plugin + :type config: :py:class:`~.Struct` + :param osc: An OpenStackClients instance + :type osc: :py:class:`~.OpenStackClients` instance + """ + + super(DummyWithScorer, self).__init__(config, osc) + + # Setup Scoring Engines + self._workload_scorer = (scoring_factory + .get_scoring_engine('dummy_scorer')) + self._avg_scorer = (scoring_factory + .get_scoring_engine('dummy_avg_scorer')) + + # Get metainfo from Workload Scorer for result intepretation + metainfo = jsonutils.loads(self._workload_scorer.get_metainfo()) + self._workloads = {index: workload + for index, workload in enumerate( + metainfo['workloads'])} + + def pre_execute(self): + pass + + def do_execute(self): + # Simple "hello world" from strategy + param1 = self.input_parameters.param1 + param2 = self.input_parameters.param2 + LOG.debug('DummyWithScorer params: param1=%(p1)f, param2=%(p2)s', + {'p1': param1, 'p2': param2}) + parameters = {'message': 'Hello from Dummy Strategy with Scorer!'} + self.solution.add_action(action_type=self.NOP, + input_parameters=parameters) + + # Demonstrate workload scorer + features = self._generate_random_telemetry() + result_str = self._workload_scorer.calculate_score(features) + LOG.debug('Workload Scorer result: %s', result_str) + + # Parse the result using workloads from scorer's metainfo + result = self._workloads[jsonutils.loads(result_str)[0]] + LOG.debug('Detected Workload: %s', result) + parameters = {'message': 'Detected Workload: %s' % result} + self.solution.add_action(action_type=self.NOP, + input_parameters=parameters) + + # Demonstrate AVG scorer + features = jsonutils.dumps(random.sample(range(1000), 20)) + result_str = self._avg_scorer.calculate_score(features) + LOG.debug('AVG Scorer result: %s', result_str) + result = jsonutils.loads(result_str)[0] + LOG.debug('AVG Scorer result (parsed): %d', result) + parameters = {'message': 'AVG Scorer result: %s' % result} + self.solution.add_action(action_type=self.NOP, + input_parameters=parameters) + + # Sleep action + self.solution.add_action(action_type=self.SLEEP, + input_parameters={'duration': 5.0}) + + def post_execute(self): + pass + + @classmethod + def get_name(cls): + return 'dummy_with_scorer' + + @classmethod + def get_display_name(cls): + return _('Dummy Strategy using sample Scoring Engines') + + @classmethod + def get_translatable_display_name(cls): + return 'Dummy Strategy using sample Scoring Engines' + + @classmethod + def get_schema(cls): + # Mandatory default setting for each element + return { + 'properties': { + 'param1': { + 'description': 'number parameter example', + 'type': 'number', + 'default': 3.2, + 'minimum': 1.0, + 'maximum': 10.2, + }, + 'param2': { + 'description': 'string parameter example', + 'type': "string", + 'default': "hello" + }, + }, + } + + def _generate_random_telemetry(self): + processor_time = random.randint(0, 100) + mem_total_bytes = 4*units.Gi + mem_avail_bytes = random.randint(1*units.Gi, 4*units.Gi) + mem_page_reads = random.randint(0, 2000) + mem_page_writes = random.randint(0, 2000) + disk_read_bytes = random.randint(0*units.Mi, 200*units.Mi) + disk_write_bytes = random.randint(0*units.Mi, 200*units.Mi) + net_bytes_received = random.randint(0*units.Mi, 20*units.Mi) + net_bytes_sent = random.randint(0*units.Mi, 10*units.Mi) + + return jsonutils.dumps([ + processor_time, mem_total_bytes, mem_avail_bytes, + mem_page_reads, mem_page_writes, disk_read_bytes, + disk_write_bytes, net_bytes_received, net_bytes_sent]) diff --git a/watcher/decision_engine/sync.py b/watcher/decision_engine/sync.py index 1bb6951c7..ce7be78b9 100644 --- a/watcher/decision_engine/sync.py +++ b/watcher/decision_engine/sync.py @@ -21,6 +21,7 @@ from oslo_log import log from watcher._i18n import _LI, _LW from watcher.common import context from watcher.decision_engine.loading import default +from watcher.decision_engine.scoring import scoring_factory from watcher import objects from watcher.objects import action_plan as apobjects from watcher.objects import audit as auditobjects @@ -32,6 +33,9 @@ GoalMapping = collections.namedtuple( StrategyMapping = collections.namedtuple( 'StrategyMapping', ['name', 'goal_name', 'display_name', 'parameters_spec']) +ScoringEngineMapping = collections.namedtuple( + 'ScoringEngineMapping', + ['name', 'description', 'metainfo']) IndicatorSpec = collections.namedtuple( 'IndicatorSpec', ['name', 'description', 'unit', 'schema']) @@ -50,10 +54,15 @@ class Syncer(object): self._available_strategies = None self._available_strategies_map = None + self._available_scoringengines = None + self._available_scoringengines_map = None + # This goal mapping maps stale goal IDs to the synced goal self.goal_mapping = dict() # This strategy mapping maps stale strategy IDs to the synced goal self.strategy_mapping = dict() + # Maps stale scoring engine IDs to the synced scoring engines + self.se_mapping = dict() self.stale_audit_templates_map = {} self.stale_audits_map = {} @@ -73,6 +82,14 @@ class Syncer(object): self._available_strategies = objects.Strategy.list(self.ctx) return self._available_strategies + @property + def available_scoringengines(self): + """Scoring Engines loaded from DB""" + if self._available_scoringengines is None: + self._available_scoringengines = (objects.ScoringEngine + .list(self.ctx)) + return self._available_scoringengines + @property def available_goals_map(self): """Mapping of goals loaded from DB""" @@ -101,10 +118,22 @@ class Syncer(object): } return self._available_strategies_map + @property + def available_scoringengines_map(self): + if self._available_scoringengines_map is None: + self._available_scoringengines_map = { + ScoringEngineMapping( + name=s.id, description=s.description, + metainfo=s.metainfo): s + for s in self.available_scoringengines + } + return self._available_scoringengines_map + def sync(self): self.discovered_map = self._discover() goals_map = self.discovered_map["goals"] strategies_map = self.discovered_map["strategies"] + scoringengines_map = self.discovered_map["scoringengines"] for goal_name, goal_map in goals_map.items(): if goal_map in self.available_goals_map: @@ -122,7 +151,16 @@ class Syncer(object): self.strategy_mapping.update(self._sync_strategy(strategy_map)) + for se_name, se_map in scoringengines_map.items(): + if se_map in self.available_scoringengines_map: + LOG.info(_LI("Scoring Engine %s already exists"), + se_name) + continue + + self.se_mapping.update(self._sync_scoringengine(se_map)) + self._sync_objects() + self._soft_delete_removed_scoringengines() def _sync_goal(self, goal_map): goal_name = goal_map.name @@ -181,6 +219,32 @@ class Syncer(object): return strategy_mapping + def _sync_scoringengine(self, scoringengine_map): + scoringengine_name = scoringengine_map.name + se_mapping = dict() + # Scoring Engines matching by id with discovered Scoring engine + matching_scoringengines = [se for se in self.available_scoringengines + if se.name == scoringengine_name] + stale_scoringengines = self._soft_delete_stale_scoringengines( + scoringengine_map, matching_scoringengines) + + if stale_scoringengines or not matching_scoringengines: + scoringengine = objects.ScoringEngine(self.ctx) + scoringengine.name = scoringengine_name + scoringengine.description = scoringengine_map.description + scoringengine.metainfo = scoringengine_map.metainfo + scoringengine.create() + LOG.info(_LI("Scoring Engine %s created"), scoringengine_name) + + # Updating the internal states + self.available_scoringengines_map[scoringengine] = \ + scoringengine_map + # Map the old scoring engine names to the new (equivalent) SE + for matching_scoringengine in matching_scoringengines: + se_mapping[matching_scoringengine.name] = scoringengine + + return se_mapping + def _sync_objects(self): # First we find audit templates, audits and action plans that are stale # because their associated goal or strategy has been modified and we @@ -393,10 +457,22 @@ class Syncer(object): self.stale_action_plans_map[ action_plan.id].state = apobjects.State.CANCELLED + def _soft_delete_removed_scoringengines(self): + removed_se = [ + se for se in self.available_scoringengines + if se.name not in self.discovered_map['scoringengines']] + for se in removed_se: + LOG.info(_LI("Scoring Engine %s removed"), se.name) + se.soft_delete() + def _discover(self): strategies_map = {} goals_map = {} - discovered_map = {"goals": goals_map, "strategies": strategies_map} + scoringengines_map = {} + discovered_map = { + "goals": goals_map, + "strategies": strategies_map, + "scoringengines": scoringengines_map} goal_loader = default.DefaultGoalLoader() implemented_goals = goal_loader.list_available() @@ -419,6 +495,12 @@ class Syncer(object): display_name=strategy_cls.get_translatable_display_name(), parameters_spec=str(strategy_cls.get_schema())) + for se in scoring_factory.get_scoring_engine_list(): + scoringengines_map[se.get_name()] = ScoringEngineMapping( + name=se.get_name(), + description=se.get_description(), + metainfo=se.get_metainfo()) + return discovered_map def _soft_delete_stale_goals(self, goal_map, matching_goals): @@ -462,3 +544,21 @@ class Syncer(object): stale_strategies.append(matching_strategy) return stale_strategies + + def _soft_delete_stale_scoringengines( + self, scoringengine_map, matching_scoringengines): + se_name = scoringengine_map.name + se_description = scoringengine_map.description + se_metainfo = scoringengine_map.metainfo + + stale_scoringengines = [] + for matching_scoringengine in matching_scoringengines: + if (matching_scoringengine.description == se_description and + matching_scoringengine.metainfo == se_metainfo): + LOG.info(_LI("Scoring Engine %s unchanged"), se_name) + else: + LOG.info(_LI("Scoring Engine %s modified"), se_name) + matching_scoringengine.soft_delete() + stale_scoringengines.append(matching_scoringengine) + + return stale_scoringengines diff --git a/watcher/opts.py b/watcher/opts.py index d3dd48f80..dfe518688 100644 --- a/watcher/opts.py +++ b/watcher/opts.py @@ -31,6 +31,8 @@ from watcher.decision_engine.planner import manager as planner_manager PLUGIN_LOADERS = ( applier_loader.DefaultActionLoader, decision_engine_loader.DefaultPlannerLoader, + decision_engine_loader.DefaultScoringLoader, + decision_engine_loader.DefaultScoringContainerLoader, decision_engine_loader.DefaultStrategyLoader, decision_engine_loader.ClusterDataModelCollectorLoader, applier_loader.DefaultWorkFlowEngineLoader, diff --git a/watcher/tests/decision_engine/scoring/__init__.py b/watcher/tests/decision_engine/scoring/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/watcher/tests/decision_engine/scoring/test_dummy_scorer.py b/watcher/tests/decision_engine/scoring/test_dummy_scorer.py new file mode 100644 index 000000000..69ac82488 --- /dev/null +++ b/watcher/tests/decision_engine/scoring/test_dummy_scorer.py @@ -0,0 +1,54 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_serialization import jsonutils + +from watcher.decision_engine.scoring import dummy_scorer +from watcher.tests import base + + +class TestDummyScorer(base.TestCase): + + def setUp(self): + super(TestDummyScorer, self).setUp() + + def test_metadata(self): + scorer = dummy_scorer.DummyScorer(config=None) + self.assertEqual('dummy_scorer', scorer.get_name()) + self.assertTrue('Dummy' in scorer.get_description()) + + metainfo = scorer.get_metainfo() + self.assertTrue('feature_columns' in metainfo) + self.assertTrue('result_columns' in metainfo) + self.assertTrue('workloads' in metainfo) + + def test_calculate_score(self): + scorer = dummy_scorer.DummyScorer(config=None) + + self._assert_result(scorer, 0, '[0, 0, 0, 0, 0, 0, 0, 0, 0]') + self._assert_result(scorer, 0, '[50, 0, 0, 600, 0, 0, 0, 0, 0]') + self._assert_result(scorer, 0, '[0, 0, 0, 0, 600, 0, 0, 0, 0]') + self._assert_result(scorer, 1, '[85, 0, 0, 0, 0, 0, 0, 0, 0]') + self._assert_result(scorer, 2, '[0, 0, 0, 1100, 1100, 0, 0, 0, 0]') + self._assert_result(scorer, 3, + '[0, 0, 0, 0, 0, 70000000, 70000000, 0, 0]') + + def _assert_result(self, scorer, expected, features): + result_str = scorer.calculate_score(features) + actual_result = jsonutils.loads(result_str)[0] + self.assertEqual(expected, actual_result) diff --git a/watcher/tests/decision_engine/scoring/test_dummy_scoring_container.py b/watcher/tests/decision_engine/scoring/test_dummy_scoring_container.py new file mode 100644 index 000000000..25786b314 --- /dev/null +++ b/watcher/tests/decision_engine/scoring/test_dummy_scoring_container.py @@ -0,0 +1,51 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_serialization import jsonutils + +from watcher.decision_engine.scoring import dummy_scoring_container +from watcher.tests import base + + +class TestDummyScoringContainer(base.TestCase): + + def setUp(self): + super(TestDummyScoringContainer, self).setUp() + + def test_get_scoring_engine_list(self): + scorers = (dummy_scoring_container.DummyScoringContainer + .get_scoring_engine_list()) + + self.assertEqual(3, len(scorers)) + self.assertEqual('dummy_min_scorer', scorers[0].get_name()) + self.assertEqual('dummy_max_scorer', scorers[1].get_name()) + self.assertEqual('dummy_avg_scorer', scorers[2].get_name()) + + def test_scorers(self): + scorers = (dummy_scoring_container.DummyScoringContainer + .get_scoring_engine_list()) + + self._assert_result(scorers[0], 1.1, '[1.1, 2.2, 4, 8]') + self._assert_result(scorers[1], 8, '[1.1, 2.2, 4, 8]') + # float(1 + 2 + 4 + 8) / 4 = 15.0 / 4 = 3.75 + self._assert_result(scorers[2], 3.75, '[1, 2, 4, 8]') + + def _assert_result(self, scorer, expected, features): + result_str = scorer.calculate_score(features) + actual_result = jsonutils.loads(result_str)[0] + self.assertEqual(expected, actual_result) diff --git a/watcher/tests/decision_engine/scoring/test_scoring_factory.py b/watcher/tests/decision_engine/scoring/test_scoring_factory.py new file mode 100644 index 000000000..dfb79c68b --- /dev/null +++ b/watcher/tests/decision_engine/scoring/test_scoring_factory.py @@ -0,0 +1,53 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Intel +# +# Authors: Tomasz Kaczynski +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from watcher.decision_engine.scoring import scoring_factory +from watcher.tests import base + + +class TestScoringFactory(base.TestCase): + + def setUp(self): + super(TestScoringFactory, self).setUp() + + def test_get_scoring_engine(self): + scorer = scoring_factory.get_scoring_engine('dummy_scorer') + self.assertEqual('dummy_scorer', scorer.get_name()) + + scorer = scoring_factory.get_scoring_engine('dummy_min_scorer') + self.assertEqual('dummy_min_scorer', scorer.get_name()) + + scorer = scoring_factory.get_scoring_engine('dummy_max_scorer') + self.assertEqual('dummy_max_scorer', scorer.get_name()) + + scorer = scoring_factory.get_scoring_engine('dummy_avg_scorer') + self.assertEqual('dummy_avg_scorer', scorer.get_name()) + + self.assertRaises( + KeyError, + scoring_factory.get_scoring_engine, + 'non_existing_scorer') + + def test_get_scoring_engine_list(self): + scoring_engines = scoring_factory.get_scoring_engine_list() + + engine_names = {'dummy_scorer', 'dummy_min_scorer', + 'dummy_max_scorer', 'dummy_avg_scorer'} + + for scorer in scoring_engines: + self.assertIn(scorer.get_name(), engine_names) diff --git a/watcher/tests/decision_engine/strategy/strategies/test_dummy_with_scorer.py b/watcher/tests/decision_engine/strategy/strategies/test_dummy_with_scorer.py new file mode 100644 index 000000000..11cf33162 --- /dev/null +++ b/watcher/tests/decision_engine/strategy/strategies/test_dummy_with_scorer.py @@ -0,0 +1,61 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2015 b<>com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from watcher.applier.loading import default +from watcher.common import utils +from watcher.decision_engine.model import model_root +from watcher.decision_engine.strategy import strategies +from watcher.tests import base +from watcher.tests.decision_engine.strategy.strategies import \ + faker_cluster_state + + +class TestDummyWithScorer(base.TestCase): + + def setUp(self): + super(TestDummyWithScorer, self).setUp() + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + p_model = mock.patch.object( + strategies.DummyWithScorer, "compute_model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + self.m_model.return_value = model_root.ModelRoot() + self.strategy = strategies.DummyWithScorer(config=mock.Mock()) + + def test_dummy_with_scorer(self): + dummy = strategies.DummyWithScorer(config=mock.Mock()) + dummy.input_parameters = utils.Struct() + dummy.input_parameters.update({'param1': 4.0, 'param2': 'Hi'}) + solution = dummy.execute() + self.assertEqual(4, len(solution.actions)) + + def test_check_parameters(self): + model = self.fake_cluster.generate_scenario_3_with_2_nodes() + self.m_model.return_value = model + self.strategy.input_parameters = utils.Struct() + self.strategy.input_parameters.update({'param1': 4.0, 'param2': 'Hi'}) + solution = self.strategy.execute() + loader = default.DefaultActionLoader() + for action in solution.actions: + loaded_action = loader.load(action['action_type']) + loaded_action.input_parameters = action['input_parameters'] + loaded_action.validate_parameters()