From 4c3073efb411d0bba96efb5d1ccc6d6f944cb3b6 Mon Sep 17 00:00:00 2001 From: Jean-Emile DARTOIS Date: Thu, 17 Dec 2015 14:21:18 +0100 Subject: [PATCH] Code refactoring - StrategyContext and Auditendpoint This patchset aim to remove useless code in StrategyContext and AuditEndPoint. This patchset also add a parameter for strategy context to define the numbers of thread of execute the strategies. DocImpact Change-Id: I83e87165b03b42fe6b863921502a300bd94d2982 --- etc/watcher/watcher.conf.sample | 4 ++ watcher/decision_engine/audit/default.py | 41 ++++++++++++------- watcher/decision_engine/manager.py | 28 +++++++------ .../messaging/audit_endpoint.py | 31 ++++++++++---- .../decision_engine/strategy/context/base.py | 2 +- .../strategy/context/default.py | 37 ++++++----------- .../strategy/selection/default.py | 3 +- .../audit/test_default_audit_handler.py | 3 -- .../messaging/test_audit_endpoint.py | 10 ++--- .../strategy/context/test_strategy_context.py | 29 ++++++++----- .../selector/test_strategy_selector.py | 23 ++++------- 11 files changed, 116 insertions(+), 95 deletions(-) diff --git a/etc/watcher/watcher.conf.sample b/etc/watcher/watcher.conf.sample index 4a1f92b5b..4c00a4499 100644 --- a/etc/watcher/watcher.conf.sample +++ b/etc/watcher/watcher.conf.sample @@ -788,6 +788,10 @@ # value) #publisher_id = watcher.decision.api +# The maximum number of threads that can be used to execute strategies +# (integer value) +#max_workers = 2 + [watcher_goals] diff --git a/watcher/decision_engine/audit/default.py b/watcher/decision_engine/audit/default.py index 34924dfc7..91d36dcbf 100644 --- a/watcher/decision_engine/audit/default.py +++ b/watcher/decision_engine/audit/default.py @@ -20,7 +20,8 @@ from watcher.decision_engine.audit.base import \ BaseAuditHandler from watcher.decision_engine.messaging.events import Events from watcher.decision_engine.planner.default import DefaultPlanner -from watcher.decision_engine.strategy.context.default import StrategyContext +from watcher.decision_engine.strategy.context.default import \ + DefaultStrategyContext from watcher.objects.audit import Audit from watcher.objects.audit import AuditStatus from watcher.objects.audit_template import AuditTemplate @@ -29,11 +30,23 @@ LOG = log.getLogger(__name__) class DefaultAuditHandler(BaseAuditHandler): - def __init__(self, messaging, model_collector): + def __init__(self, messaging, cluster_collector): super(DefaultAuditHandler, self).__init__() - self.messaging = messaging - self.model_collector = model_collector - self.strategy_context = StrategyContext() + self._messaging = messaging + self._cluster_collector = cluster_collector + self._strategy_context = DefaultStrategyContext() + + @property + def messaging(self): + return self._messaging + + @property + def cluster_collector(self): + return self._cluster_collector + + @property + def strategy_context(self): + return self._strategy_context def notify(self, audit_uuid, event_type, status): event = Event() @@ -56,27 +69,25 @@ class DefaultAuditHandler(BaseAuditHandler): try: LOG.debug("Trigger audit %s" % audit_uuid) - # change state to ONGOING + # change state of the audit to ONGOING audit = self.update_audit_state(request_context, audit_uuid, AuditStatus.ONGOING) # Retrieve cluster-data-model - cluster = self.model_collector.get_latest_cluster_data_model() + cluster = self.cluster_collector.get_latest_cluster_data_model() - # Select appropriate strategy + # Retrieve the Audit Template audit_template = AuditTemplate.get_by_id(request_context, audit.audit_template_id) + # execute the strategy + solution = self.strategy_context.execute_strategy(audit_template. + goal, cluster) - self.strategy_context.set_goal(audit_template.goal) - - # compute change requests - solution = self.strategy_context.execute_strategy(cluster) - - # create an action plan + # schedule the actions and create in the watcher db the ActionPlan planner = DefaultPlanner() planner.schedule(request_context, audit.id, solution) - # change state to SUCCEEDED and notify + # change state of the audit to SUCCEEDED self.update_audit_state(request_context, audit_uuid, AuditStatus.SUCCEEDED) except Exception as e: diff --git a/watcher/decision_engine/manager.py b/watcher/decision_engine/manager.py index a16782c04..ac352b4b5 100644 --- a/watcher/decision_engine/manager.py +++ b/watcher/decision_engine/manager.py @@ -16,7 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from concurrent.futures import ThreadPoolExecutor from oslo_config import cfg from oslo_log import log @@ -26,7 +25,6 @@ from watcher.common.messaging.notification_handler import NotificationHandler from watcher.decision_engine.event.consumer_factory import EventConsumerFactory from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint from watcher.decision_engine.messaging.events import Events -from watcher.decision_engine.strategy.context.default import StrategyContext LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -47,31 +45,35 @@ WATCHER_DECISION_ENGINE_OPTS = [ cfg.StrOpt('publisher_id', default='watcher.decision.api', help='The identifier used by watcher ' - 'module on the message broker') + 'module on the message broker'), + cfg.IntOpt('max_workers', + default=2, + required=True, + help='The maximum number of threads that can be used to ' + 'execute strategies', + ), ] -decision_engine_opt_group = cfg.OptGroup( - name='watcher_decision_engine', - title='Defines the parameters of the module decision engine') +decision_engine_opt_group = cfg.OptGroup(name='watcher_decision_engine', + title='Defines the parameters of ' + 'the module decision engine') CONF.register_group(decision_engine_opt_group) CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) class DecisionEngineManager(MessagingCore): - def __init__(self): super(DecisionEngineManager, self).__init__( CONF.watcher_decision_engine.publisher_id, CONF.watcher_decision_engine.topic_control, CONF.watcher_decision_engine.topic_status, - api_version=self.API_VERSION, - ) + api_version=self.API_VERSION) self.handler = NotificationHandler(self.publisher_id) self.handler.register_observer(self) self.add_event_listener(Events.ALL, self.event_receive) - # todo(jed) oslo_conf - self.executor = ThreadPoolExecutor(max_workers=2) - self.topic_control.add_endpoint(AuditEndpoint(self)) - self.context = StrategyContext(self) + endpoint = AuditEndpoint(self, + max_workers=CONF.watcher_decision_engine. + max_workers) + self.topic_control.add_endpoint(endpoint) def join(self): self.topic_control.join() diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index 6de6cbbc9..322263cb7 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -16,6 +16,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from concurrent.futures import ThreadPoolExecutor + from oslo_log import log from watcher.decision_engine.audit.default import DefaultAuditHandler @@ -26,19 +28,32 @@ LOG = log.getLogger(__name__) class AuditEndpoint(object): - def __init__(self, de): - self.de = de - self.manager = CollectorManager() + def __init__(self, messaging, max_workers): + self._messaging = messaging + self._collector_manager = CollectorManager() + self._executor = ThreadPoolExecutor(max_workers=max_workers) + + @property + def collector_manager(self): + return self._collector_manager + + @property + def executor(self): + return self._executor + + @property + def messaging(self): + return self._messaging def do_trigger_audit(self, context, audit_uuid): - model_collector = self.manager.get_cluster_model_collector() + model_collector = self.collector_manager.get_cluster_model_collector() - audit = DefaultAuditHandler(self.de, model_collector) + audit = DefaultAuditHandler(self.messaging, model_collector) audit.execute(audit_uuid, context) def trigger_audit(self, context, audit_uuid): LOG.debug("Trigger audit %s" % audit_uuid) - self.de.executor.submit(self.do_trigger_audit, - context, - audit_uuid) + self.executor.submit(self.do_trigger_audit, + context, + audit_uuid) return audit_uuid diff --git a/watcher/decision_engine/strategy/context/base.py b/watcher/decision_engine/strategy/context/base.py index 532b4e028..35c6afe13 100644 --- a/watcher/decision_engine/strategy/context/base.py +++ b/watcher/decision_engine/strategy/context/base.py @@ -23,5 +23,5 @@ import six @six.add_metaclass(abc.ABCMeta) class BaseStrategyContext(object): @abc.abstractmethod - def execute_strategy(self, model): + def execute_strategy(self, goal, cluster_data_model): raise NotImplementedError() diff --git a/watcher/decision_engine/strategy/context/default.py b/watcher/decision_engine/strategy/context/default.py index 3455500ad..4e1103561 100644 --- a/watcher/decision_engine/strategy/context/default.py +++ b/watcher/decision_engine/strategy/context/default.py @@ -15,34 +15,23 @@ # limitations under the License. from oslo_log import log -from watcher.decision_engine.planner.default import DefaultPlanner from watcher.decision_engine.strategy.context.base import BaseStrategyContext -from watcher.decision_engine.strategy.selection.default import StrategySelector +from watcher.decision_engine.strategy.selection.default import \ + DefaultStrategySelector LOG = log.getLogger(__name__) -class StrategyContext(BaseStrategyContext): - def __init__(self, broker=None): - LOG.debug("Initializing decision_engine Engine API ") - self.strategies = {} - self.selected_strategies = [] - self.broker = broker - self.planner = DefaultPlanner() - self.strategy_selector = StrategySelector() - self.goal = None +class DefaultStrategyContext(BaseStrategyContext): + def __init__(self): + super(DefaultStrategyContext, self).__init__() + LOG.debug("Initializing Strategy Context") + self._strategy_selector = DefaultStrategySelector() - def add_strategy(self, strategy): - self.strategies[strategy.name] = strategy - self.selected_strategy = strategy.name + @property + def strategy_selector(self): + return self._strategy_selector - def remove_strategy(self, strategy): - pass - - def set_goal(self, goal): - self.goal = goal - - def execute_strategy(self, model): - # todo(jed) create thread + refactoring - selected_strategy = self.strategy_selector.define_from_goal(self.goal) - return selected_strategy.execute(model) + def execute_strategy(self, goal, cluster_data_model): + selected_strategy = self.strategy_selector.define_from_goal(goal) + return selected_strategy.execute(cluster_data_model) diff --git a/watcher/decision_engine/strategy/selection/default.py b/watcher/decision_engine/strategy/selection/default.py index e22d79224..9a0a5abad 100644 --- a/watcher/decision_engine/strategy/selection/default.py +++ b/watcher/decision_engine/strategy/selection/default.py @@ -41,9 +41,10 @@ CONF.register_group(goals_opt_group) CONF.register_opts(WATCHER_GOALS_OPTS, goals_opt_group) -class StrategySelector(BaseSelector): +class DefaultStrategySelector(BaseSelector): def __init__(self): + super(DefaultStrategySelector, self).__init__() self.strategy_loader = DefaultStrategyLoader() def define_from_goal(self, goal_name): diff --git a/watcher/tests/decision_engine/audit/test_default_audit_handler.py b/watcher/tests/decision_engine/audit/test_default_audit_handler.py index 3932333e7..963182a99 100644 --- a/watcher/tests/decision_engine/audit/test_default_audit_handler.py +++ b/watcher/tests/decision_engine/audit/test_default_audit_handler.py @@ -42,7 +42,6 @@ class TestDefaultAuditHandler(DbTestCase): def test_trigger_audit_state_success(self): model_collector = FakerModelCollector() audit_handler = DefaultAuditHandler(MagicMock(), model_collector) - audit_handler.strategy_context.execute_strategy = MagicMock() audit_handler.execute(self.audit.uuid, self.context) audit = Audit.get_by_uuid(self.context, self.audit.uuid) self.assertEqual(AuditStatus.SUCCEEDED, audit.state) @@ -51,8 +50,6 @@ class TestDefaultAuditHandler(DbTestCase): messaging = MagicMock() model_collector = FakerModelCollector() audit_handler = DefaultAuditHandler(messaging, model_collector) - audit_handler.strategy_context.execute_strategy = MagicMock() - audit_handler.execute(self.audit.uuid, self.context) call_on_going = call(Events.TRIGGER_AUDIT.name, { diff --git a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py index 028a4365a..5e242834a 100644 --- a/watcher/tests/decision_engine/messaging/test_audit_endpoint.py +++ b/watcher/tests/decision_engine/messaging/test_audit_endpoint.py @@ -15,14 +15,15 @@ # limitations under the License. import mock from mock import MagicMock + from watcher.common import utils from watcher.decision_engine.audit.default import DefaultAuditHandler from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint from watcher.metrics_engine.cluster_model_collector.manager import \ CollectorManager from watcher.tests import base -from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state import \ - FakerModelCollector +from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \ + import FakerModelCollector class DefaultAuditHandlerMock(DefaultAuditHandler): @@ -36,13 +37,12 @@ class DefaultAuditHandlerMock(DefaultAuditHandler): class TestAuditEndpoint(base.TestCase): def setUp(self): super(TestAuditEndpoint, self).setUp() - self.endpoint = AuditEndpoint(MagicMock()) def test_do_trigger_audit(self): audit_uuid = utils.generate_uuid() model_collector = FakerModelCollector() audit_handler = DefaultAuditHandler(MagicMock(), model_collector) - endpoint = AuditEndpoint(audit_handler) + endpoint = AuditEndpoint(audit_handler, max_workers=2) with mock.patch.object(CollectorManager, 'get_cluster_model_collector') \ as mock_call2: @@ -60,7 +60,7 @@ class TestAuditEndpoint(base.TestCase): model_collector = FakerModelCollector() audit_handler = DefaultAuditHandlerMock(MagicMock(), model_collector) - endpoint = AuditEndpoint(audit_handler) + endpoint = AuditEndpoint(audit_handler, max_workers=2) with mock.patch.object(DefaultAuditHandlerMock, 'executor') \ as mock_call: diff --git a/watcher/tests/decision_engine/strategy/context/test_strategy_context.py b/watcher/tests/decision_engine/strategy/context/test_strategy_context.py index 9d7905d10..b7b23bada 100644 --- a/watcher/tests/decision_engine/strategy/context/test_strategy_context.py +++ b/watcher/tests/decision_engine/strategy/context/test_strategy_context.py @@ -13,19 +13,26 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +from mock import MagicMock +from mock import patch -from watcher.decision_engine.strategy.context.default import StrategyContext +from watcher.decision_engine.solution.default import DefaultSolution +from watcher.decision_engine.strategy.context.default import \ + DefaultStrategyContext +from watcher.decision_engine.strategy.selection.default import \ + DefaultStrategySelector +from watcher.decision_engine.strategy.strategies.dummy_strategy import \ + DummyStrategy from watcher.tests import base -class FakeStrategy(object): - def __init__(self): - self.name = "BALANCE_LOAD" - - class TestStrategyContext(base.BaseTestCase): - def test_add_remove_strategy(self): - strategy = FakeStrategy() - strategy_context = StrategyContext() - strategy_context.add_strategy(strategy) - strategy_context.remove_strategy(strategy) + strategy_context = DefaultStrategyContext() + + @patch.object(DefaultStrategySelector, 'define_from_goal') + def test_execute_strategy(self, mock_call): + mock_call.return_value = DummyStrategy() + cluster_data_model = MagicMock() + solution = self.strategy_context.execute_strategy("dummy", + cluster_data_model) + self.assertIsInstance(solution, DefaultSolution) diff --git a/watcher/tests/decision_engine/strategy/selector/test_strategy_selector.py b/watcher/tests/decision_engine/strategy/selector/test_strategy_selector.py index fdb769896..2d6f5e280 100644 --- a/watcher/tests/decision_engine/strategy/selector/test_strategy_selector.py +++ b/watcher/tests/decision_engine/strategy/selector/test_strategy_selector.py @@ -19,21 +19,20 @@ from oslo_config import cfg from watcher.common.exception import WatcherException from watcher.decision_engine.strategy.loading.default import \ DefaultStrategyLoader -from watcher.decision_engine.strategy.selection.default import StrategySelector +from watcher.decision_engine.strategy.selection.default import \ + DefaultStrategySelector from watcher.tests.base import TestCase CONF = cfg.CONF class TestStrategySelector(TestCase): - - strategy_selector = StrategySelector() + strategy_selector = DefaultStrategySelector() @patch.object(DefaultStrategyLoader, 'load') def test_define_from_goal(self, mock_call): - cfg.CONF.set_override( - 'goals', {"DUMMY": "fake"}, group='watcher_goals' - ) + cfg.CONF.set_override('goals', + {"DUMMY": "fake"}, group='watcher_goals') expected_goal = 'DUMMY' expected_strategy = CONF.watcher_goals.goals[expected_goal] self.strategy_selector.define_from_goal(expected_goal) @@ -41,12 +40,8 @@ class TestStrategySelector(TestCase): @patch.object(DefaultStrategyLoader, 'load') def test_define_from_goal_with_incorrect_mapping(self, mock_call): - cfg.CONF.set_override( - 'goals', {}, group='watcher_goals' - ) - self.assertRaises( - WatcherException, - self.strategy_selector.define_from_goal, - "DUMMY" - ) + cfg.CONF.set_override('goals', {}, group='watcher_goals') + self.assertRaises(WatcherException, + self.strategy_selector.define_from_goal, + "DUMMY") self.assertEqual(mock_call.call_count, 0)