Code refactoring - StrategyContext and Auditendpoint

This patchset aim to remove useless code in StrategyContext
and AuditEndPoint.
This patchset also add a parameter for strategy context to define the
numbers of thread of execute the strategies.

DocImpact
Change-Id: I83e87165b03b42fe6b863921502a300bd94d2982
This commit is contained in:
Jean-Emile DARTOIS
2015-12-17 14:21:18 +01:00
parent 1698eb31f3
commit 4c3073efb4
11 changed files with 116 additions and 95 deletions

View File

@@ -788,6 +788,10 @@
# value) # value)
#publisher_id = watcher.decision.api #publisher_id = watcher.decision.api
# The maximum number of threads that can be used to execute strategies
# (integer value)
#max_workers = 2
[watcher_goals] [watcher_goals]

View File

@@ -20,7 +20,8 @@ from watcher.decision_engine.audit.base import \
BaseAuditHandler BaseAuditHandler
from watcher.decision_engine.messaging.events import Events from watcher.decision_engine.messaging.events import Events
from watcher.decision_engine.planner.default import DefaultPlanner from watcher.decision_engine.planner.default import DefaultPlanner
from watcher.decision_engine.strategy.context.default import StrategyContext from watcher.decision_engine.strategy.context.default import \
DefaultStrategyContext
from watcher.objects.audit import Audit from watcher.objects.audit import Audit
from watcher.objects.audit import AuditStatus from watcher.objects.audit import AuditStatus
from watcher.objects.audit_template import AuditTemplate from watcher.objects.audit_template import AuditTemplate
@@ -29,11 +30,23 @@ LOG = log.getLogger(__name__)
class DefaultAuditHandler(BaseAuditHandler): class DefaultAuditHandler(BaseAuditHandler):
def __init__(self, messaging, model_collector): def __init__(self, messaging, cluster_collector):
super(DefaultAuditHandler, self).__init__() super(DefaultAuditHandler, self).__init__()
self.messaging = messaging self._messaging = messaging
self.model_collector = model_collector self._cluster_collector = cluster_collector
self.strategy_context = StrategyContext() self._strategy_context = DefaultStrategyContext()
@property
def messaging(self):
return self._messaging
@property
def cluster_collector(self):
return self._cluster_collector
@property
def strategy_context(self):
return self._strategy_context
def notify(self, audit_uuid, event_type, status): def notify(self, audit_uuid, event_type, status):
event = Event() event = Event()
@@ -56,27 +69,25 @@ class DefaultAuditHandler(BaseAuditHandler):
try: try:
LOG.debug("Trigger audit %s" % audit_uuid) LOG.debug("Trigger audit %s" % audit_uuid)
# change state to ONGOING # change state of the audit to ONGOING
audit = self.update_audit_state(request_context, audit_uuid, audit = self.update_audit_state(request_context, audit_uuid,
AuditStatus.ONGOING) AuditStatus.ONGOING)
# Retrieve cluster-data-model # Retrieve cluster-data-model
cluster = self.model_collector.get_latest_cluster_data_model() cluster = self.cluster_collector.get_latest_cluster_data_model()
# Select appropriate strategy # Retrieve the Audit Template
audit_template = AuditTemplate.get_by_id(request_context, audit_template = AuditTemplate.get_by_id(request_context,
audit.audit_template_id) audit.audit_template_id)
# execute the strategy
solution = self.strategy_context.execute_strategy(audit_template.
goal, cluster)
self.strategy_context.set_goal(audit_template.goal) # schedule the actions and create in the watcher db the ActionPlan
# compute change requests
solution = self.strategy_context.execute_strategy(cluster)
# create an action plan
planner = DefaultPlanner() planner = DefaultPlanner()
planner.schedule(request_context, audit.id, solution) planner.schedule(request_context, audit.id, solution)
# change state to SUCCEEDED and notify # change state of the audit to SUCCEEDED
self.update_audit_state(request_context, audit_uuid, self.update_audit_state(request_context, audit_uuid,
AuditStatus.SUCCEEDED) AuditStatus.SUCCEEDED)
except Exception as e: except Exception as e:

View File

@@ -16,7 +16,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from concurrent.futures import ThreadPoolExecutor
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@@ -26,7 +25,6 @@ from watcher.common.messaging.notification_handler import NotificationHandler
from watcher.decision_engine.event.consumer_factory import EventConsumerFactory from watcher.decision_engine.event.consumer_factory import EventConsumerFactory
from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint
from watcher.decision_engine.messaging.events import Events from watcher.decision_engine.messaging.events import Events
from watcher.decision_engine.strategy.context.default import StrategyContext
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@@ -47,31 +45,35 @@ WATCHER_DECISION_ENGINE_OPTS = [
cfg.StrOpt('publisher_id', cfg.StrOpt('publisher_id',
default='watcher.decision.api', default='watcher.decision.api',
help='The identifier used by watcher ' help='The identifier used by watcher '
'module on the message broker') 'module on the message broker'),
cfg.IntOpt('max_workers',
default=2,
required=True,
help='The maximum number of threads that can be used to '
'execute strategies',
),
] ]
decision_engine_opt_group = cfg.OptGroup( decision_engine_opt_group = cfg.OptGroup(name='watcher_decision_engine',
name='watcher_decision_engine', title='Defines the parameters of '
title='Defines the parameters of the module decision engine') 'the module decision engine')
CONF.register_group(decision_engine_opt_group) CONF.register_group(decision_engine_opt_group)
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
class DecisionEngineManager(MessagingCore): class DecisionEngineManager(MessagingCore):
def __init__(self): def __init__(self):
super(DecisionEngineManager, self).__init__( super(DecisionEngineManager, self).__init__(
CONF.watcher_decision_engine.publisher_id, CONF.watcher_decision_engine.publisher_id,
CONF.watcher_decision_engine.topic_control, CONF.watcher_decision_engine.topic_control,
CONF.watcher_decision_engine.topic_status, CONF.watcher_decision_engine.topic_status,
api_version=self.API_VERSION, api_version=self.API_VERSION)
)
self.handler = NotificationHandler(self.publisher_id) self.handler = NotificationHandler(self.publisher_id)
self.handler.register_observer(self) self.handler.register_observer(self)
self.add_event_listener(Events.ALL, self.event_receive) self.add_event_listener(Events.ALL, self.event_receive)
# todo(jed) oslo_conf endpoint = AuditEndpoint(self,
self.executor = ThreadPoolExecutor(max_workers=2) max_workers=CONF.watcher_decision_engine.
self.topic_control.add_endpoint(AuditEndpoint(self)) max_workers)
self.context = StrategyContext(self) self.topic_control.add_endpoint(endpoint)
def join(self): def join(self):
self.topic_control.join() self.topic_control.join()

View File

@@ -16,6 +16,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from concurrent.futures import ThreadPoolExecutor
from oslo_log import log from oslo_log import log
from watcher.decision_engine.audit.default import DefaultAuditHandler from watcher.decision_engine.audit.default import DefaultAuditHandler
@@ -26,19 +28,32 @@ LOG = log.getLogger(__name__)
class AuditEndpoint(object): class AuditEndpoint(object):
def __init__(self, de): def __init__(self, messaging, max_workers):
self.de = de self._messaging = messaging
self.manager = CollectorManager() self._collector_manager = CollectorManager()
self._executor = ThreadPoolExecutor(max_workers=max_workers)
@property
def collector_manager(self):
return self._collector_manager
@property
def executor(self):
return self._executor
@property
def messaging(self):
return self._messaging
def do_trigger_audit(self, context, audit_uuid): def do_trigger_audit(self, context, audit_uuid):
model_collector = self.manager.get_cluster_model_collector() model_collector = self.collector_manager.get_cluster_model_collector()
audit = DefaultAuditHandler(self.de, model_collector) audit = DefaultAuditHandler(self.messaging, model_collector)
audit.execute(audit_uuid, context) audit.execute(audit_uuid, context)
def trigger_audit(self, context, audit_uuid): def trigger_audit(self, context, audit_uuid):
LOG.debug("Trigger audit %s" % audit_uuid) LOG.debug("Trigger audit %s" % audit_uuid)
self.de.executor.submit(self.do_trigger_audit, self.executor.submit(self.do_trigger_audit,
context, context,
audit_uuid) audit_uuid)
return audit_uuid return audit_uuid

View File

@@ -23,5 +23,5 @@ import six
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class BaseStrategyContext(object): class BaseStrategyContext(object):
@abc.abstractmethod @abc.abstractmethod
def execute_strategy(self, model): def execute_strategy(self, goal, cluster_data_model):
raise NotImplementedError() raise NotImplementedError()

View File

@@ -15,34 +15,23 @@
# limitations under the License. # limitations under the License.
from oslo_log import log from oslo_log import log
from watcher.decision_engine.planner.default import DefaultPlanner
from watcher.decision_engine.strategy.context.base import BaseStrategyContext from watcher.decision_engine.strategy.context.base import BaseStrategyContext
from watcher.decision_engine.strategy.selection.default import StrategySelector from watcher.decision_engine.strategy.selection.default import \
DefaultStrategySelector
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class StrategyContext(BaseStrategyContext): class DefaultStrategyContext(BaseStrategyContext):
def __init__(self, broker=None): def __init__(self):
LOG.debug("Initializing decision_engine Engine API ") super(DefaultStrategyContext, self).__init__()
self.strategies = {} LOG.debug("Initializing Strategy Context")
self.selected_strategies = [] self._strategy_selector = DefaultStrategySelector()
self.broker = broker
self.planner = DefaultPlanner()
self.strategy_selector = StrategySelector()
self.goal = None
def add_strategy(self, strategy): @property
self.strategies[strategy.name] = strategy def strategy_selector(self):
self.selected_strategy = strategy.name return self._strategy_selector
def remove_strategy(self, strategy): def execute_strategy(self, goal, cluster_data_model):
pass selected_strategy = self.strategy_selector.define_from_goal(goal)
return selected_strategy.execute(cluster_data_model)
def set_goal(self, goal):
self.goal = goal
def execute_strategy(self, model):
# todo(jed) create thread + refactoring
selected_strategy = self.strategy_selector.define_from_goal(self.goal)
return selected_strategy.execute(model)

View File

@@ -41,9 +41,10 @@ CONF.register_group(goals_opt_group)
CONF.register_opts(WATCHER_GOALS_OPTS, goals_opt_group) CONF.register_opts(WATCHER_GOALS_OPTS, goals_opt_group)
class StrategySelector(BaseSelector): class DefaultStrategySelector(BaseSelector):
def __init__(self): def __init__(self):
super(DefaultStrategySelector, self).__init__()
self.strategy_loader = DefaultStrategyLoader() self.strategy_loader = DefaultStrategyLoader()
def define_from_goal(self, goal_name): def define_from_goal(self, goal_name):

View File

@@ -42,7 +42,6 @@ class TestDefaultAuditHandler(DbTestCase):
def test_trigger_audit_state_success(self): def test_trigger_audit_state_success(self):
model_collector = FakerModelCollector() model_collector = FakerModelCollector()
audit_handler = DefaultAuditHandler(MagicMock(), model_collector) audit_handler = DefaultAuditHandler(MagicMock(), model_collector)
audit_handler.strategy_context.execute_strategy = MagicMock()
audit_handler.execute(self.audit.uuid, self.context) audit_handler.execute(self.audit.uuid, self.context)
audit = Audit.get_by_uuid(self.context, self.audit.uuid) audit = Audit.get_by_uuid(self.context, self.audit.uuid)
self.assertEqual(AuditStatus.SUCCEEDED, audit.state) self.assertEqual(AuditStatus.SUCCEEDED, audit.state)
@@ -51,8 +50,6 @@ class TestDefaultAuditHandler(DbTestCase):
messaging = MagicMock() messaging = MagicMock()
model_collector = FakerModelCollector() model_collector = FakerModelCollector()
audit_handler = DefaultAuditHandler(messaging, model_collector) audit_handler = DefaultAuditHandler(messaging, model_collector)
audit_handler.strategy_context.execute_strategy = MagicMock()
audit_handler.execute(self.audit.uuid, self.context) audit_handler.execute(self.audit.uuid, self.context)
call_on_going = call(Events.TRIGGER_AUDIT.name, { call_on_going = call(Events.TRIGGER_AUDIT.name, {

View File

@@ -15,14 +15,15 @@
# limitations under the License. # limitations under the License.
import mock import mock
from mock import MagicMock from mock import MagicMock
from watcher.common import utils from watcher.common import utils
from watcher.decision_engine.audit.default import DefaultAuditHandler from watcher.decision_engine.audit.default import DefaultAuditHandler
from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint from watcher.decision_engine.messaging.audit_endpoint import AuditEndpoint
from watcher.metrics_engine.cluster_model_collector.manager import \ from watcher.metrics_engine.cluster_model_collector.manager import \
CollectorManager CollectorManager
from watcher.tests import base from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state import \ from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \
FakerModelCollector import FakerModelCollector
class DefaultAuditHandlerMock(DefaultAuditHandler): class DefaultAuditHandlerMock(DefaultAuditHandler):
@@ -36,13 +37,12 @@ class DefaultAuditHandlerMock(DefaultAuditHandler):
class TestAuditEndpoint(base.TestCase): class TestAuditEndpoint(base.TestCase):
def setUp(self): def setUp(self):
super(TestAuditEndpoint, self).setUp() super(TestAuditEndpoint, self).setUp()
self.endpoint = AuditEndpoint(MagicMock())
def test_do_trigger_audit(self): def test_do_trigger_audit(self):
audit_uuid = utils.generate_uuid() audit_uuid = utils.generate_uuid()
model_collector = FakerModelCollector() model_collector = FakerModelCollector()
audit_handler = DefaultAuditHandler(MagicMock(), model_collector) audit_handler = DefaultAuditHandler(MagicMock(), model_collector)
endpoint = AuditEndpoint(audit_handler) endpoint = AuditEndpoint(audit_handler, max_workers=2)
with mock.patch.object(CollectorManager, 'get_cluster_model_collector') \ with mock.patch.object(CollectorManager, 'get_cluster_model_collector') \
as mock_call2: as mock_call2:
@@ -60,7 +60,7 @@ class TestAuditEndpoint(base.TestCase):
model_collector = FakerModelCollector() model_collector = FakerModelCollector()
audit_handler = DefaultAuditHandlerMock(MagicMock(), audit_handler = DefaultAuditHandlerMock(MagicMock(),
model_collector) model_collector)
endpoint = AuditEndpoint(audit_handler) endpoint = AuditEndpoint(audit_handler, max_workers=2)
with mock.patch.object(DefaultAuditHandlerMock, 'executor') \ with mock.patch.object(DefaultAuditHandlerMock, 'executor') \
as mock_call: as mock_call:

View File

@@ -13,19 +13,26 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from mock import MagicMock
from mock import patch
from watcher.decision_engine.strategy.context.default import StrategyContext from watcher.decision_engine.solution.default import DefaultSolution
from watcher.decision_engine.strategy.context.default import \
DefaultStrategyContext
from watcher.decision_engine.strategy.selection.default import \
DefaultStrategySelector
from watcher.decision_engine.strategy.strategies.dummy_strategy import \
DummyStrategy
from watcher.tests import base from watcher.tests import base
class FakeStrategy(object):
def __init__(self):
self.name = "BALANCE_LOAD"
class TestStrategyContext(base.BaseTestCase): class TestStrategyContext(base.BaseTestCase):
def test_add_remove_strategy(self): strategy_context = DefaultStrategyContext()
strategy = FakeStrategy()
strategy_context = StrategyContext() @patch.object(DefaultStrategySelector, 'define_from_goal')
strategy_context.add_strategy(strategy) def test_execute_strategy(self, mock_call):
strategy_context.remove_strategy(strategy) mock_call.return_value = DummyStrategy()
cluster_data_model = MagicMock()
solution = self.strategy_context.execute_strategy("dummy",
cluster_data_model)
self.assertIsInstance(solution, DefaultSolution)

View File

@@ -19,21 +19,20 @@ from oslo_config import cfg
from watcher.common.exception import WatcherException from watcher.common.exception import WatcherException
from watcher.decision_engine.strategy.loading.default import \ from watcher.decision_engine.strategy.loading.default import \
DefaultStrategyLoader DefaultStrategyLoader
from watcher.decision_engine.strategy.selection.default import StrategySelector from watcher.decision_engine.strategy.selection.default import \
DefaultStrategySelector
from watcher.tests.base import TestCase from watcher.tests.base import TestCase
CONF = cfg.CONF CONF = cfg.CONF
class TestStrategySelector(TestCase): class TestStrategySelector(TestCase):
strategy_selector = DefaultStrategySelector()
strategy_selector = StrategySelector()
@patch.object(DefaultStrategyLoader, 'load') @patch.object(DefaultStrategyLoader, 'load')
def test_define_from_goal(self, mock_call): def test_define_from_goal(self, mock_call):
cfg.CONF.set_override( cfg.CONF.set_override('goals',
'goals', {"DUMMY": "fake"}, group='watcher_goals' {"DUMMY": "fake"}, group='watcher_goals')
)
expected_goal = 'DUMMY' expected_goal = 'DUMMY'
expected_strategy = CONF.watcher_goals.goals[expected_goal] expected_strategy = CONF.watcher_goals.goals[expected_goal]
self.strategy_selector.define_from_goal(expected_goal) self.strategy_selector.define_from_goal(expected_goal)
@@ -41,12 +40,8 @@ class TestStrategySelector(TestCase):
@patch.object(DefaultStrategyLoader, 'load') @patch.object(DefaultStrategyLoader, 'load')
def test_define_from_goal_with_incorrect_mapping(self, mock_call): def test_define_from_goal_with_incorrect_mapping(self, mock_call):
cfg.CONF.set_override( cfg.CONF.set_override('goals', {}, group='watcher_goals')
'goals', {}, group='watcher_goals' self.assertRaises(WatcherException,
) self.strategy_selector.define_from_goal,
self.assertRaises( "DUMMY")
WatcherException,
self.strategy_selector.define_from_goal,
"DUMMY"
)
self.assertEqual(mock_call.call_count, 0) self.assertEqual(mock_call.call_count, 0)