diff --git a/doc/notification_samples/action-create.json b/doc/notification_samples/action-create.json new file mode 100644 index 000000000..c8dd7a44c --- /dev/null +++ b/doc/notification_samples/action-create.json @@ -0,0 +1,40 @@ +{ + "priority": "INFO", + "payload": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionCreatePayload", + "watcher_object.data": { + "uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "input_parameters": { + "param2": 2, + "param1": 1 + }, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "PENDING", + "action_plan": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "TerseActionPlanPayload", + "watcher_object.data": { + "uuid": "76be87bd-3422-43f9-93a0-e85a577e3061", + "global_efficacy": {}, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "audit_uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "strategy_uuid": "cb3d0b58-4415-4d90-b75b-1e96878730e3", + "deleted_at": null + } + }, + "parents": [], + "action_type": "nop", + "deleted_at": null + } + }, + "publisher_id": "infra-optim:node0", + "timestamp": "2017-01-01 00:00:00.000000", + "event_type": "action.create", + "message_id": "530b409c-9b6b-459b-8f08-f93dbfeb4d41" +} diff --git a/doc/notification_samples/action-delete.json b/doc/notification_samples/action-delete.json new file mode 100644 index 000000000..dbc5ef9e3 --- /dev/null +++ b/doc/notification_samples/action-delete.json @@ -0,0 +1,40 @@ +{ + "priority": "INFO", + "payload": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionDeletePayload", + "watcher_object.data": { + "uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "input_parameters": { + "param2": 2, + "param1": 1 + }, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "DELETED", + "action_plan": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "TerseActionPlanPayload", + "watcher_object.data": { + "uuid": "76be87bd-3422-43f9-93a0-e85a577e3061", + "global_efficacy": {}, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "audit_uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "strategy_uuid": "cb3d0b58-4415-4d90-b75b-1e96878730e3", + "deleted_at": null + } + }, + "parents": [], + "action_type": "nop", + "deleted_at": null + } + }, + "publisher_id": "infra-optim:node0", + "timestamp": "2017-01-01 00:00:00.000000", + "event_type": "action.delete", + "message_id": "530b409c-9b6b-459b-8f08-f93dbfeb4d41" +} diff --git a/doc/notification_samples/action-execution-end.json b/doc/notification_samples/action-execution-end.json new file mode 100644 index 000000000..479a64934 --- /dev/null +++ b/doc/notification_samples/action-execution-end.json @@ -0,0 +1,41 @@ +{ + "priority": "INFO", + "payload": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionExecutionPayload", + "watcher_object.data": { + "uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "input_parameters": { + "param2": 2, + "param1": 1 + }, + "fault": null, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "SUCCEEDED", + "action_plan": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "TerseActionPlanPayload", + "watcher_object.data": { + "uuid": "76be87bd-3422-43f9-93a0-e85a577e3061", + "global_efficacy": {}, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "audit_uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "strategy_uuid": "cb3d0b58-4415-4d90-b75b-1e96878730e3", + "deleted_at": null + } + }, + "parents": [], + "action_type": "nop", + "deleted_at": null + } + }, + "event_type": "action.execution.end", + "publisher_id": "infra-optim:node0", + "timestamp": "2017-01-01 00:00:00.000000", + "message_id": "530b409c-9b6b-459b-8f08-f93dbfeb4d41" +} diff --git a/doc/notification_samples/action-execution-error.json b/doc/notification_samples/action-execution-error.json new file mode 100644 index 000000000..66e237181 --- /dev/null +++ b/doc/notification_samples/action-execution-error.json @@ -0,0 +1,51 @@ +{ + "priority": "ERROR", + "payload": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionExecutionPayload", + "watcher_object.data": { + "uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "input_parameters": { + "param2": 2, + "param1": 1 + }, + "fault": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ExceptionPayload", + "watcher_object.data": { + "module_name": "watcher.tests.notifications.test_action_notification", + "exception": "WatcherException", + "exception_message": "TEST", + "function_name": "test_send_action_execution_with_error" + } + }, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "FAILED", + "action_plan": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "TerseActionPlanPayload", + "watcher_object.data": { + "uuid": "76be87bd-3422-43f9-93a0-e85a577e3061", + "global_efficacy": {}, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "audit_uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "strategy_uuid": "cb3d0b58-4415-4d90-b75b-1e96878730e3", + "deleted_at": null + } + }, + "parents": [], + "action_type": "nop", + "deleted_at": null + } + }, + "event_type": "action.execution.error", + "publisher_id": "infra-optim:node0", + "timestamp": "2017-01-01 00:00:00.000000", + "message_id": "530b409c-9b6b-459b-8f08-f93dbfeb4d41" +} diff --git a/doc/notification_samples/action-execution-start.json b/doc/notification_samples/action-execution-start.json new file mode 100644 index 000000000..ace78f909 --- /dev/null +++ b/doc/notification_samples/action-execution-start.json @@ -0,0 +1,41 @@ +{ + "priority": "INFO", + "payload": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionExecutionPayload", + "watcher_object.data": { + "uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "input_parameters": { + "param2": 2, + "param1": 1 + }, + "fault": null, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "action_plan": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "TerseActionPlanPayload", + "watcher_object.data": { + "uuid": "76be87bd-3422-43f9-93a0-e85a577e3061", + "global_efficacy": {}, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "audit_uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "strategy_uuid": "cb3d0b58-4415-4d90-b75b-1e96878730e3", + "deleted_at": null + } + }, + "parents": [], + "action_type": "nop", + "deleted_at": null + } + }, + "event_type": "action.execution.start", + "publisher_id": "infra-optim:node0", + "timestamp": "2017-01-01 00:00:00.000000", + "message_id": "530b409c-9b6b-459b-8f08-f93dbfeb4d41" +} diff --git a/doc/notification_samples/action-update.json b/doc/notification_samples/action-update.json new file mode 100644 index 000000000..1e88429dc --- /dev/null +++ b/doc/notification_samples/action-update.json @@ -0,0 +1,49 @@ +{ + "priority": "INFO", + "payload": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionUpdatePayload", + "watcher_object.data": { + "uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "input_parameters": { + "param2": 2, + "param1": 1 + }, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state_update": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "ActionStateUpdatePayload", + "watcher_object.data": { + "old_state": "PENDING", + "state": "ONGOING" + } + }, + "state": "ONGOING", + "action_plan": { + "watcher_object.namespace": "watcher", + "watcher_object.version": "1.0", + "watcher_object.name": "TerseActionPlanPayload", + "watcher_object.data": { + "uuid": "76be87bd-3422-43f9-93a0-e85a577e3061", + "global_efficacy": {}, + "created_at": "2016-10-18T09:52:05Z", + "updated_at": null, + "state": "ONGOING", + "audit_uuid": "10a47dd1-4874-4298-91cf-eff046dbdb8d", + "strategy_uuid": "cb3d0b58-4415-4d90-b75b-1e96878730e3", + "deleted_at": null + } + }, + "parents": [], + "action_type": "nop", + "deleted_at": null + } + }, + "event_type": "action.update" + "publisher_id": "infra-optim:node0", + "timestamp": "2017-01-01 00:00:00.000000", + "message_id": "530b409c-9b6b-459b-8f08-f93dbfeb4d41" +} diff --git a/watcher/applier/workflow_engine/base.py b/watcher/applier/workflow_engine/base.py index 2cc4beadf..efe306282 100644 --- a/watcher/applier/workflow_engine/base.py +++ b/watcher/applier/workflow_engine/base.py @@ -18,12 +18,20 @@ import abc +from oslo_log import log import six +from taskflow import task as flow_task +from watcher._i18n import _LE from watcher.applier.actions import factory from watcher.common import clients from watcher.common.loader import loadable +from watcher import notifications from watcher import objects +from watcher.objects import fields + + +LOG = log.getLogger(__name__) @six.add_metaclass(abc.ABCMeta) @@ -72,11 +80,95 @@ class BaseWorkFlowEngine(loadable.Loadable): return self._action_factory def notify(self, action, state): - db_action = objects.Action.get_by_uuid(self.context, action.uuid) + db_action = objects.Action.get_by_uuid(self.context, action.uuid, + eager=True) db_action.state = state db_action.save() - # NOTE(v-francoise): Implement notifications for action @abc.abstractmethod def execute(self, actions): raise NotImplementedError() + + +class BaseTaskFlowActionContainer(flow_task.Task): + + def __init__(self, name, db_action, engine, **kwargs): + super(BaseTaskFlowActionContainer, self).__init__(name=name) + self._db_action = db_action + self._engine = engine + self.loaded_action = None + + @property + def engine(self): + return self._engine + + @property + def action(self): + if self.loaded_action is None: + action = self.engine.action_factory.make_action( + self._db_action, + osc=self._engine.osc) + self.loaded_action = action + return self.loaded_action + + @abc.abstractmethod + def do_pre_execute(self): + raise NotImplementedError() + + @abc.abstractmethod + def do_execute(self, *args, **kwargs): + raise NotImplementedError() + + @abc.abstractmethod + def do_post_execute(self): + raise NotImplementedError() + + # NOTE(alexchadin): taskflow does 3 method calls (pre_execute, execute, + # post_execute) independently. We want to support notifications in base + # class, so child's methods should be named with `do_` prefix and wrapped. + def pre_execute(self): + try: + self.do_pre_execute() + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.START) + except Exception as e: + LOG.exception(e) + self.engine.notify(self._db_action, objects.action.State.FAILED) + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.ERROR, + priority=fields.NotificationPriority.ERROR) + + def execute(self, *args, **kwargs): + try: + self.do_execute(*args, **kwargs) + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.END) + except Exception as e: + LOG.exception(e) + LOG.error(_LE('The workflow engine has failed ' + 'to execute the action: %s'), self.name) + self.engine.notify(self._db_action, objects.action.State.FAILED) + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.ERROR, + priority=fields.NotificationPriority.ERROR) + raise + + def post_execute(self): + try: + self.do_post_execute() + except Exception as e: + LOG.exception(e) + self.engine.notify(self._db_action, objects.action.State.FAILED) + notifications.action.send_execution_notification( + self.engine.context, self._db_action, + fields.NotificationAction.EXECUTION, + fields.NotificationPhase.ERROR, + priority=fields.NotificationPriority.ERROR) diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index 21c932010..47f70e658 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -22,7 +22,7 @@ from taskflow import engines from taskflow.patterns import graph_flow as gf from taskflow import task as flow_task -from watcher._i18n import _LE, _LW, _LC +from watcher._i18n import _LW, _LC from watcher.applier.workflow_engine import base from watcher.common import exception from watcher import objects @@ -95,60 +95,26 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): raise exception.WorkflowExecutionException(error=e) -class TaskFlowActionContainer(flow_task.Task): +class TaskFlowActionContainer(base.BaseTaskFlowActionContainer): def __init__(self, db_action, engine): name = "action_type:{0} uuid:{1}".format(db_action.action_type, db_action.uuid) - super(TaskFlowActionContainer, self).__init__(name=name) - self._db_action = db_action - self._engine = engine - self.loaded_action = None + super(TaskFlowActionContainer, self).__init__(name, db_action, engine) - @property - def action(self): - if self.loaded_action is None: - action = self.engine.action_factory.make_action( - self._db_action, - osc=self._engine.osc) - self.loaded_action = action - return self.loaded_action + def do_pre_execute(self): + self.engine.notify(self._db_action, objects.action.State.ONGOING) + LOG.debug("Pre-condition action: %s", self.name) + self.action.pre_condition() - @property - def engine(self): - return self._engine + def do_execute(self, *args, **kwargs): + LOG.debug("Running action: %s", self.name) - def pre_execute(self): - try: - self.engine.notify(self._db_action, objects.action.State.ONGOING) - LOG.debug("Pre-condition action: %s", self.name) - self.action.pre_condition() - except Exception as e: - LOG.exception(e) - self.engine.notify(self._db_action, objects.action.State.FAILED) - raise + self.action.execute() + self.engine.notify(self._db_action, objects.action.State.SUCCEEDED) - def execute(self, *args, **kwargs): - try: - LOG.debug("Running action: %s", self.name) - - self.action.execute() - self.engine.notify(self._db_action, objects.action.State.SUCCEEDED) - except Exception as e: - LOG.exception(e) - LOG.error(_LE('The workflow engine has failed ' - 'to execute the action: %s'), self.name) - - self.engine.notify(self._db_action, objects.action.State.FAILED) - raise - - def post_execute(self): - try: - LOG.debug("Post-condition action: %s", self.name) - self.action.post_condition() - except Exception as e: - LOG.exception(e) - self.engine.notify(self._db_action, objects.action.State.FAILED) - raise + def do_post_execute(self): + LOG.debug("Post-condition action: %s", self.name) + self.action.post_condition() def revert(self, *args, **kwargs): LOG.warning(_LW("Revert action: %s"), self.name) diff --git a/watcher/common/exception.py b/watcher/common/exception.py index 06e434144..3f7704f13 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -182,6 +182,10 @@ class EagerlyLoadedActionPlanRequired(InvalidActionPlan): msg_fmt = _("Action plan %(action_plan)s was not eagerly loaded") +class EagerlyLoadedActionRequired(InvalidActionPlan): + msg_fmt = _("Action %(action)s was not eagerly loaded") + + class InvalidUUID(Invalid): msg_fmt = _("Expected a uuid but received %(uuid)s") diff --git a/watcher/decision_engine/planner/weight.py b/watcher/decision_engine/planner/weight.py index 9759a6f72..a504a738b 100644 --- a/watcher/decision_engine/planner/weight.py +++ b/watcher/decision_engine/planner/weight.py @@ -84,18 +84,6 @@ class WeightPlanner(base.BasePlanner): default=cls.parallelization), ] - @staticmethod - def format_action(action_plan_id, action_type, - input_parameters=None, parents=()): - return { - 'uuid': utils.generate_uuid(), - 'action_plan_id': int(action_plan_id), - 'action_type': action_type, - 'input_parameters': input_parameters, - 'state': objects.action.State.PENDING, - 'parents': parents or None, - } - @staticmethod def chunkify(lst, n): """Yield successive n-sized chunks from lst.""" @@ -168,7 +156,7 @@ class WeightPlanner(base.BasePlanner): action_plan.state = objects.action_plan.State.SUCCEEDED action_plan.save() - self.create_scheduled_actions(action_plan, action_graph) + self.create_scheduled_actions(action_graph) return action_plan def get_sorted_actions_by_weight(self, context, action_plan, solution): @@ -187,7 +175,7 @@ class WeightPlanner(base.BasePlanner): return reversed(sorted(weighted_actions.items(), key=lambda x: x[0])) - def create_scheduled_actions(self, action_plan, graph): + def create_scheduled_actions(self, graph): for action in graph.nodes(): LOG.debug("Creating the %s in the Watcher database", action.action_type) diff --git a/watcher/notifications/__init__.py b/watcher/notifications/__init__.py index d33d07d17..c2add648e 100644 --- a/watcher/notifications/__init__.py +++ b/watcher/notifications/__init__.py @@ -20,6 +20,7 @@ # need to be changed after we moved these function inside the package # Todo(gibi): remove these imports after legacy notifications using these are # transformed to versioned notifications +from watcher.notifications import action # noqa from watcher.notifications import action_plan # noqa from watcher.notifications import audit # noqa from watcher.notifications import exception # noqa diff --git a/watcher/notifications/action.py b/watcher/notifications/action.py new file mode 100644 index 000000000..449a0125e --- /dev/null +++ b/watcher/notifications/action.py @@ -0,0 +1,302 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 Servionica +# +# Authors: Alexander Chadin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from watcher.common import context as wcontext +from watcher.common import exception +from watcher.notifications import action_plan as ap_notifications +from watcher.notifications import base as notificationbase +from watcher.notifications import exception as exception_notifications +from watcher import objects +from watcher.objects import base +from watcher.objects import fields as wfields + +CONF = cfg.CONF + + +@base.WatcherObjectRegistry.register_notification +class ActionPayload(notificationbase.NotificationPayloadBase): + SCHEMA = { + 'uuid': ('action', 'uuid'), + + 'action_type': ('action', 'action_type'), + 'input_parameters': ('action', 'input_parameters'), + 'state': ('action', 'state'), + 'parents': ('action', 'parents'), + + 'created_at': ('action', 'created_at'), + 'updated_at': ('action', 'updated_at'), + 'deleted_at': ('action', 'deleted_at'), + } + + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'uuid': wfields.UUIDField(), + 'action_type': wfields.StringField(nullable=False), + 'input_parameters': wfields.DictField(nullable=False, default={}), + 'state': wfields.StringField(nullable=False), + 'parents': wfields.ListOfUUIDsField(nullable=False, default=[]), + 'action_plan_uuid': wfields.UUIDField(), + 'action_plan': wfields.ObjectField('TerseActionPlanPayload'), + + 'created_at': wfields.DateTimeField(nullable=True), + 'updated_at': wfields.DateTimeField(nullable=True), + 'deleted_at': wfields.DateTimeField(nullable=True), + } + + def __init__(self, action, **kwargs): + super(ActionPayload, self).__init__(**kwargs) + self.populate_schema(action=action) + + +@base.WatcherObjectRegistry.register_notification +class ActionStateUpdatePayload(notificationbase.NotificationPayloadBase): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'old_state': wfields.StringField(nullable=True), + 'state': wfields.StringField(nullable=True), + } + + +@base.WatcherObjectRegistry.register_notification +class ActionCreatePayload(ActionPayload): + # Version 1.0: Initial version + VERSION = '1.0' + fields = {} + + def __init__(self, action, action_plan): + super(ActionCreatePayload, self).__init__( + action=action, + action_plan=action_plan) + + +@base.WatcherObjectRegistry.register_notification +class ActionUpdatePayload(ActionPayload): + # Version 1.0: Initial version + VERSION = '1.0' + fields = { + 'state_update': wfields.ObjectField('ActionStateUpdatePayload'), + } + + def __init__(self, action, state_update, action_plan): + super(ActionUpdatePayload, self).__init__( + action=action, + state_update=state_update, + action_plan=action_plan) + + +@base.WatcherObjectRegistry.register_notification +class ActionExecutionPayload(ActionPayload): + # Version 1.0: Initial version + VERSION = '1.0' + fields = { + 'fault': wfields.ObjectField('ExceptionPayload', nullable=True), + } + + def __init__(self, action, action_plan, **kwargs): + super(ActionExecutionPayload, self).__init__( + action=action, + action_plan=action_plan, + **kwargs) + + +@base.WatcherObjectRegistry.register_notification +class ActionDeletePayload(ActionPayload): + # Version 1.0: Initial version + VERSION = '1.0' + fields = {} + + def __init__(self, action, action_plan): + super(ActionDeletePayload, self).__init__( + action=action, + action_plan=action_plan) + + +@notificationbase.notification_sample('action-execution-error.json') +@notificationbase.notification_sample('action-execution-end.json') +@notificationbase.notification_sample('action-execution-start.json') +@base.WatcherObjectRegistry.register_notification +class ActionExecutionNotification(notificationbase.NotificationBase): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'payload': wfields.ObjectField('ActionExecutionPayload') + } + + +@notificationbase.notification_sample('action-create.json') +@base.WatcherObjectRegistry.register_notification +class ActionCreateNotification(notificationbase.NotificationBase): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'payload': wfields.ObjectField('ActionCreatePayload') + } + + +@notificationbase.notification_sample('action-update.json') +@base.WatcherObjectRegistry.register_notification +class ActionUpdateNotification(notificationbase.NotificationBase): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'payload': wfields.ObjectField('ActionUpdatePayload') + } + + +@notificationbase.notification_sample('action-delete.json') +@base.WatcherObjectRegistry.register_notification +class ActionDeleteNotification(notificationbase.NotificationBase): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'payload': wfields.ObjectField('ActionDeletePayload') + } + + +def _get_action_plan_payload(action): + action_plan = None + strategy_uuid = None + audit = None + try: + action_plan = action.action_plan + audit = objects.Audit.get(wcontext.make_context(show_deleted=True), + action_plan.audit_id) + if audit.strategy_id: + strategy_uuid = objects.Strategy.get( + wcontext.make_context(show_deleted=True), + audit.strategy_id).uuid + except NotImplementedError: + raise exception.EagerlyLoadedActionRequired(action=action.uuid) + + action_plan_payload = ap_notifications.TerseActionPlanPayload( + action_plan=action_plan, + audit_uuid=audit.uuid, strategy_uuid=strategy_uuid) + + return action_plan_payload + + +def send_create(context, action, service='infra-optim', host=None): + """Emit an action.create notification.""" + action_plan_payload = _get_action_plan_payload(action) + + versioned_payload = ActionCreatePayload( + action=action, + action_plan=action_plan_payload, + ) + + notification = ActionCreateNotification( + priority=wfields.NotificationPriority.INFO, + event_type=notificationbase.EventType( + object='action', + action=wfields.NotificationAction.CREATE), + publisher=notificationbase.NotificationPublisher( + host=host or CONF.host, + binary=service), + payload=versioned_payload) + + notification.emit(context) + + +def send_update(context, action, service='infra-optim', + host=None, old_state=None): + """Emit an action.update notification.""" + action_plan_payload = _get_action_plan_payload(action) + + state_update = ActionStateUpdatePayload( + old_state=old_state, + state=action.state if old_state else None) + + versioned_payload = ActionUpdatePayload( + action=action, + state_update=state_update, + action_plan=action_plan_payload, + ) + + notification = ActionUpdateNotification( + priority=wfields.NotificationPriority.INFO, + event_type=notificationbase.EventType( + object='action', + action=wfields.NotificationAction.UPDATE), + publisher=notificationbase.NotificationPublisher( + host=host or CONF.host, + binary=service), + payload=versioned_payload) + + notification.emit(context) + + +def send_delete(context, action, service='infra-optim', host=None): + """Emit an action.delete notification.""" + action_plan_payload = _get_action_plan_payload(action) + + versioned_payload = ActionDeletePayload( + action=action, + action_plan=action_plan_payload, + ) + + notification = ActionDeleteNotification( + priority=wfields.NotificationPriority.INFO, + event_type=notificationbase.EventType( + object='action', + action=wfields.NotificationAction.DELETE), + publisher=notificationbase.NotificationPublisher( + host=host or CONF.host, + binary=service), + payload=versioned_payload) + + notification.emit(context) + + +def send_execution_notification(context, action, notification_action, phase, + priority=wfields.NotificationPriority.INFO, + service='infra-optim', host=None): + """Emit an action execution notification.""" + action_plan_payload = _get_action_plan_payload(action) + + fault = None + if phase == wfields.NotificationPhase.ERROR: + fault = exception_notifications.ExceptionPayload.from_exception() + + versioned_payload = ActionExecutionPayload( + action=action, + action_plan=action_plan_payload, + fault=fault, + ) + + notification = ActionExecutionNotification( + priority=priority, + event_type=notificationbase.EventType( + object='action', + action=notification_action, + phase=phase), + publisher=notificationbase.NotificationPublisher( + host=host or CONF.host, + binary=service), + payload=versioned_payload) + + notification.emit(context) diff --git a/watcher/notifications/action_plan.py b/watcher/notifications/action_plan.py index 1acc2057c..97b714b81 100644 --- a/watcher/notifications/action_plan.py +++ b/watcher/notifications/action_plan.py @@ -32,14 +32,12 @@ CONF = cfg.CONF @base.WatcherObjectRegistry.register_notification -class ActionPlanPayload(notificationbase.NotificationPayloadBase): +class TerseActionPlanPayload(notificationbase.NotificationPayloadBase): SCHEMA = { 'uuid': ('action_plan', 'uuid'), 'state': ('action_plan', 'state'), 'global_efficacy': ('action_plan', 'global_efficacy'), - 'audit_uuid': ('audit', 'uuid'), - 'strategy_uuid': ('strategy', 'uuid'), 'created_at': ('action_plan', 'created_at'), 'updated_at': ('action_plan', 'updated_at'), @@ -54,20 +52,50 @@ class ActionPlanPayload(notificationbase.NotificationPayloadBase): 'state': wfields.StringField(), 'global_efficacy': wfields.FlexibleDictField(nullable=True), 'audit_uuid': wfields.UUIDField(), - 'strategy_uuid': wfields.UUIDField(), - 'audit': wfields.ObjectField('TerseAuditPayload'), - 'strategy': wfields.ObjectField('StrategyPayload'), + 'strategy_uuid': wfields.UUIDField(nullable=True), 'created_at': wfields.DateTimeField(nullable=True), 'updated_at': wfields.DateTimeField(nullable=True), 'deleted_at': wfields.DateTimeField(nullable=True), } + def __init__(self, action_plan, audit=None, strategy=None, **kwargs): + super(TerseActionPlanPayload, self).__init__(audit=audit, + strategy=strategy, + **kwargs) + self.populate_schema(action_plan=action_plan) + + +@base.WatcherObjectRegistry.register_notification +class ActionPlanPayload(TerseActionPlanPayload): + SCHEMA = { + 'uuid': ('action_plan', 'uuid'), + + 'state': ('action_plan', 'state'), + 'global_efficacy': ('action_plan', 'global_efficacy'), + + 'created_at': ('action_plan', 'created_at'), + 'updated_at': ('action_plan', 'updated_at'), + 'deleted_at': ('action_plan', 'deleted_at'), + } + + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'audit': wfields.ObjectField('TerseAuditPayload'), + 'strategy': wfields.ObjectField('StrategyPayload'), + } + def __init__(self, action_plan, audit, strategy, **kwargs): + if not kwargs.get('audit_uuid'): + kwargs['audit_uuid'] = audit.uuid + + if strategy and not kwargs.get('strategy_uuid'): + kwargs['strategy_uuid'] = strategy.uuid + super(ActionPlanPayload, self).__init__( - audit=audit, strategy=strategy, **kwargs) - self.populate_schema( - action_plan=action_plan, audit=audit, strategy=strategy) + action_plan, audit=audit, strategy=strategy, **kwargs) @base.WatcherObjectRegistry.register_notification diff --git a/watcher/notifications/base.py b/watcher/notifications/base.py index cb035c92f..d1c2d0ec0 100644 --- a/watcher/notifications/base.py +++ b/watcher/notifications/base.py @@ -198,7 +198,7 @@ class NotificationBase(NotificationObject): def notification_sample(sample): - """Provide a notification sample of the decatorated notification. + """Provide a notification sample of the decorated notification. Class decorator to attach the notification sample information to the notification object for documentation generation purposes. diff --git a/watcher/objects/action.py b/watcher/objects/action.py index 08ddd4972..539d6619f 100644 --- a/watcher/objects/action.py +++ b/watcher/objects/action.py @@ -17,6 +17,7 @@ from watcher.common import exception from watcher.common import utils from watcher.db import api as db_api +from watcher import notifications from watcher import objects from watcher.objects import base from watcher.objects import fields as wfields @@ -134,6 +135,8 @@ class Action(base.WatcherPersistentObject, base.WatcherObject, # notifications containing information about the related relationships self._from_db_object(self, db_action, eager=True) + notifications.action.send_create(self.obj_context, self) + def destroy(self): """Delete the Action from the DB""" self.dbapi.destroy_action(self.uuid) @@ -150,6 +153,7 @@ class Action(base.WatcherPersistentObject, base.WatcherObject, db_obj = self.dbapi.update_action(self.uuid, updates) obj = self._from_db_object(self, db_obj, eager=False) self.obj_refresh(obj) + notifications.action.send_update(self.obj_context, self) self.obj_reset_changes() @base.remotable @@ -173,3 +177,5 @@ class Action(base.WatcherPersistentObject, base.WatcherObject, obj = self._from_db_object( self.__class__(self._context), db_obj, eager=False) self.obj_refresh(obj) + + notifications.action.send_delete(self.obj_context, self) diff --git a/watcher/objects/action_plan.py b/watcher/objects/action_plan.py index bfb61a16f..4340ee7be 100644 --- a/watcher/objects/action_plan.py +++ b/watcher/objects/action_plan.py @@ -289,7 +289,8 @@ class ActionPlan(base.WatcherPersistentObject, base.WatcherObject, """Soft Delete the Action plan from the DB""" related_actions = objects.Action.list( context=self._context, - filters={"action_plan_uuid": self.uuid}) + filters={"action_plan_uuid": self.uuid}, + eager=True) # Cascade soft_delete of related actions for related_action in related_actions: diff --git a/watcher/objects/fields.py b/watcher/objects/fields.py index 1d26209dc..e05c7b870 100644 --- a/watcher/objects/fields.py +++ b/watcher/objects/fields.py @@ -52,6 +52,10 @@ class DictField(fields.AutoTypedField): AUTO_TYPE = fields.Dict(fields.FieldType()) +class ListOfUUIDsField(fields.AutoTypedField): + AUTO_TYPE = fields.List(fields.UUID()) + + class FlexibleDict(fields.FieldType): @staticmethod def coerce(obj, attr, value): diff --git a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py index 1847f3b44..fd72c3349 100644 --- a/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py +++ b/watcher/tests/applier/workflow_engine/test_default_workflow_engine.py @@ -26,6 +26,7 @@ from watcher.applier.actions import factory from watcher.applier.workflow_engine import default as tflow from watcher.common import exception from watcher.common import utils +from watcher import notifications from watcher import objects from watcher.tests.db import base @@ -71,19 +72,19 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def create_action(self, action_type, parameters, parents, uuid=None): + def create_action(self, action_type, parameters, parents=None, uuid=None): action = { 'uuid': uuid or utils.generate_uuid(), 'action_plan_id': 0, 'action_type': action_type, 'input_parameters': parameters, 'state': objects.action.State.PENDING, - 'parents': parents, + 'parents': parents or [], } new_action = objects.Action(self.context, **action) - new_action.create() - new_action.save() + with mock.patch.object(notifications.action, 'send_create'): + new_action.create() return new_action @@ -106,8 +107,11 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def test_execute_with_one_action(self): - actions = [self.create_action("nop", {'message': 'test'}, None)] + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') + def test_execute_with_one_action(self, mock_send_update, + mock_execution_notification): + actions = [self.create_action("nop", {'message': 'test'})] try: self.engine.execute(actions) self.check_actions_state(actions, objects.action.State.SUCCEEDED) @@ -115,12 +119,15 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def test_execute_nop_sleep(self): + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') + def test_execute_nop_sleep(self, mock_send_update, + mock_execution_notification): actions = [] - first_nop = self.create_action("nop", {'message': 'test'}, []) - second_nop = self.create_action("nop", {'message': 'second test'}, []) + first_nop = self.create_action("nop", {'message': 'test'}) + second_nop = self.create_action("nop", {'message': 'second test'}) sleep = self.create_action("sleep", {'duration': 0.0}, - [first_nop.uuid, second_nop.uuid]) + parents=[first_nop.uuid, second_nop.uuid]) actions.extend([first_nop, second_nop, sleep]) try: @@ -130,19 +137,23 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def test_execute_with_parents(self): + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') + def test_execute_with_parents(self, mock_send_update, + mock_execution_notification): actions = [] first_nop = self.create_action( - "nop", {'message': 'test'}, [], + "nop", {'message': 'test'}, uuid='bc7eee5c-4fbe-4def-9744-b539be55aa19') second_nop = self.create_action( - "nop", {'message': 'second test'}, [], + "nop", {'message': 'second test'}, uuid='0565bd5c-aa00-46e5-8d81-2cb5cc1ffa23') first_sleep = self.create_action( - "sleep", {'duration': 0.0}, [first_nop.uuid, second_nop.uuid], + "sleep", {'duration': 0.0}, parents=[first_nop.uuid, + second_nop.uuid], uuid='be436531-0da3-4dad-a9c0-ea1d2aff6496') second_sleep = self.create_action( - "sleep", {'duration': 0.0}, [first_sleep.uuid], + "sleep", {'duration': 0.0}, parents=[first_sleep.uuid], uuid='9eb51e14-936d-4d12-a500-6ba0f5e0bb1c') actions.extend([first_nop, second_nop, first_sleep, second_sleep]) @@ -194,10 +205,12 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def test_execute_with_two_actions(self): + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') + def test_execute_with_two_actions(self, m_send_update, m_execution): actions = [] - second = self.create_action("sleep", {'duration': 0.0}, None) - first = self.create_action("nop", {'message': 'test'}, None) + second = self.create_action("sleep", {'duration': 0.0}) + first = self.create_action("nop", {'message': 'test'}) actions.append(first) actions.append(second) @@ -209,12 +222,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def test_execute_with_three_actions(self): + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') + def test_execute_with_three_actions(self, m_send_update, m_execution): actions = [] - third = self.create_action("nop", {'message': 'next'}, None) - second = self.create_action("sleep", {'duration': 0.0}, None) - first = self.create_action("nop", {'message': 'hello'}, None) + third = self.create_action("nop", {'message': 'next'}) + second = self.create_action("sleep", {'duration': 0.0}) + first = self.create_action("nop", {'message': 'hello'}) self.check_action_state(first, objects.action.State.PENDING) self.check_action_state(second, objects.action.State.PENDING) @@ -231,12 +246,14 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): except Exception as exc: self.fail(exc) - def test_execute_with_exception(self): + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') + def test_execute_with_exception(self, m_send_update, m_execution): actions = [] - third = self.create_action("no_exist", {'message': 'next'}, None) - second = self.create_action("sleep", {'duration': 0.0}, None) - first = self.create_action("nop", {'message': 'hello'}, None) + third = self.create_action("no_exist", {'message': 'next'}) + second = self.create_action("sleep", {'duration': 0.0}) + first = self.create_action("nop", {'message': 'hello'}) self.check_action_state(first, objects.action.State.PENDING) self.check_action_state(second, objects.action.State.PENDING) @@ -253,9 +270,12 @@ class TestDefaultWorkFlowEngine(base.DbTestCase): self.check_action_state(second, objects.action.State.SUCCEEDED) self.check_action_state(third, objects.action.State.FAILED) + @mock.patch.object(notifications.action, 'send_execution_notification') + @mock.patch.object(notifications.action, 'send_update') @mock.patch.object(factory.ActionFactory, "make_action") - def test_execute_with_action_exception(self, m_make_action): - actions = [self.create_action("fake_action", {}, None)] + def test_execute_with_action_exception(self, m_make_action, m_send_update, + m_send_execution): + actions = [self.create_action("fake_action", {})] m_make_action.return_value = FakeAction(mock.Mock()) exc = self.assertRaises(exception.WorkflowExecutionException, diff --git a/watcher/tests/decision_engine/planner/test_weight_planner.py b/watcher/tests/decision_engine/planner/test_weight_planner.py index 526f83203..ee41dc446 100644 --- a/watcher/tests/decision_engine/planner/test_weight_planner.py +++ b/watcher/tests/decision_engine/planner/test_weight_planner.py @@ -120,7 +120,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = [] @@ -175,7 +175,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'nop', @@ -231,7 +231,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'new_action_type', @@ -294,7 +294,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'migrate', @@ -372,7 +372,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'migrate', @@ -468,7 +468,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'migrate', @@ -567,7 +567,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'resize', @@ -672,7 +672,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'migrate', @@ -803,7 +803,7 @@ class TestActionScheduling(base.DbTestCase): self.context, self.audit.id, solution) self.assertIsNotNone(action_plan.uuid) self.assertEqual(1, m_create_scheduled_actions.call_count) - action_graph = m_create_scheduled_actions.call_args[0][1] + action_graph = m_create_scheduled_actions.call_args[0][0] expected_edges = \ [({'action_type': 'migrate', diff --git a/watcher/tests/notifications/test_action_notification.py b/watcher/tests/notifications/test_action_notification.py new file mode 100644 index 000000000..2a4a5b2e4 --- /dev/null +++ b/watcher/tests/notifications/test_action_notification.py @@ -0,0 +1,355 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import freezegun +import mock +import oslo_messaging as om + +from watcher.common import exception +from watcher.common import rpc +from watcher import notifications +from watcher import objects +from watcher.tests.db import base +from watcher.tests.objects import utils + + +@freezegun.freeze_time('2016-10-18T09:52:05.219414') +class TestActionNotification(base.DbTestCase): + + def setUp(self): + super(TestActionNotification, self).setUp() + p_get_notifier = mock.patch.object(rpc, 'get_notifier') + m_get_notifier = p_get_notifier.start() + self.addCleanup(p_get_notifier.stop) + self.m_notifier = mock.Mock(spec=om.Notifier) + + def fake_get_notifier(publisher_id): + self.m_notifier.publisher_id = publisher_id + return self.m_notifier + + m_get_notifier.side_effect = fake_get_notifier + self.goal = utils.create_test_goal(mock.Mock()) + self.strategy = utils.create_test_strategy(mock.Mock()) + self.audit = utils.create_test_audit(mock.Mock(), + strategy_id=self.strategy.id) + self.action_plan = utils.create_test_action_plan(mock.Mock()) + + def test_send_invalid_action_plan(self): + action_plan = utils.get_test_action_plan( + mock.Mock(), state='DOESNOTMATTER', audit_id=1) + + self.assertRaises( + exception.InvalidActionPlan, + notifications.action_plan.send_update, + mock.MagicMock(), action_plan, host='node0') + + def test_send_action_update(self): + action = utils.create_test_action( + mock.Mock(), state=objects.action.State.ONGOING, + action_type='nop', input_parameters={'param1': 1, 'param2': 2}, + parents=[], action_plan_id=self.action_plan.id) + notifications.action.send_update( + mock.MagicMock(), action, host='node0', + old_state=objects.action.State.PENDING) + + # The 1st notification is because we created the object. + # The 2nd notification is because we created the action plan object. + self.assertEqual(4, self.m_notifier.info.call_count) + notification = self.m_notifier.info.call_args[1] + payload = notification['payload'] + + self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id) + self.assertDictEqual( + { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'ActionUpdatePayload', + 'watcher_object.data': { + 'uuid': '10a47dd1-4874-4298-91cf-eff046dbdb8d', + 'input_parameters': { + 'param2': 2, + 'param1': 1 + }, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state_update': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'ActionStateUpdatePayload', + 'watcher_object.data': { + 'old_state': 'PENDING', + 'state': 'ONGOING' + } + }, + 'state': 'ONGOING', + 'action_plan': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'TerseActionPlanPayload', + 'watcher_object.data': { + 'uuid': '76be87bd-3422-43f9-93a0-e85a577e3061', + 'global_efficacy': {}, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'ONGOING', + 'audit_uuid': '10a47dd1-4874-4298' + '-91cf-eff046dbdb8d', + 'strategy_uuid': 'cb3d0b58-4415-4d90' + '-b75b-1e96878730e3', + 'deleted_at': None + } + }, + 'parents': [], + 'action_type': 'nop', + 'deleted_at': None + } + }, + payload + ) + + def test_send_action_plan_create(self): + action = utils.create_test_action( + mock.Mock(), state=objects.action.State.PENDING, + action_type='nop', input_parameters={'param1': 1, 'param2': 2}, + parents=[], action_plan_id=self.action_plan.id) + notifications.action.send_create(mock.MagicMock(), action, + host='node0') + + self.assertEqual(4, self.m_notifier.info.call_count) + notification = self.m_notifier.info.call_args[1] + payload = notification['payload'] + + self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id) + self.assertDictEqual( + { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'ActionCreatePayload', + 'watcher_object.data': { + 'uuid': '10a47dd1-4874-4298-91cf-eff046dbdb8d', + 'input_parameters': { + 'param2': 2, + 'param1': 1 + }, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'PENDING', + 'action_plan': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'TerseActionPlanPayload', + 'watcher_object.data': { + 'uuid': '76be87bd-3422-43f9-93a0-e85a577e3061', + 'global_efficacy': {}, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'ONGOING', + 'audit_uuid': '10a47dd1-4874-4298' + '-91cf-eff046dbdb8d', + 'strategy_uuid': 'cb3d0b58-4415-4d90' + '-b75b-1e96878730e3', + 'deleted_at': None + } + }, + 'parents': [], + 'action_type': 'nop', + 'deleted_at': None + } + }, + payload + ) + + def test_send_action_delete(self): + action = utils.create_test_action( + mock.Mock(), state=objects.action.State.DELETED, + action_type='nop', input_parameters={'param1': 1, 'param2': 2}, + parents=[], action_plan_id=self.action_plan.id) + notifications.action.send_delete(mock.MagicMock(), action, + host='node0') + + # The 1st notification is because we created the audit object. + # The 2nd notification is because we created the action plan object. + self.assertEqual(4, self.m_notifier.info.call_count) + notification = self.m_notifier.info.call_args[1] + payload = notification['payload'] + + self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id) + self.assertDictEqual( + { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'ActionDeletePayload', + 'watcher_object.data': { + 'uuid': '10a47dd1-4874-4298-91cf-eff046dbdb8d', + 'input_parameters': { + 'param2': 2, + 'param1': 1 + }, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'DELETED', + 'action_plan': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'TerseActionPlanPayload', + 'watcher_object.data': { + 'uuid': '76be87bd-3422-43f9-93a0-e85a577e3061', + 'global_efficacy': {}, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'ONGOING', + 'audit_uuid': '10a47dd1-4874-4298' + '-91cf-eff046dbdb8d', + 'strategy_uuid': 'cb3d0b58-4415-4d90' + '-b75b-1e96878730e3', + 'deleted_at': None + } + }, + 'parents': [], + 'action_type': 'nop', + 'deleted_at': None + } + }, + payload + ) + + def test_send_action_execution(self): + action = utils.create_test_action( + mock.Mock(), state=objects.action.State.PENDING, + action_type='nop', input_parameters={'param1': 1, 'param2': 2}, + parents=[], action_plan_id=self.action_plan.id) + notifications.action.send_execution_notification( + mock.MagicMock(), action, 'execution', phase='start', host='node0') + + # The 1st notification is because we created the audit object. + # The 2nd notification is because we created the action plan object. + self.assertEqual(4, self.m_notifier.info.call_count) + notification = self.m_notifier.info.call_args[1] + + self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id) + self.assertDictEqual( + { + 'event_type': 'action.execution.start', + 'payload': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'ActionExecutionPayload', + 'watcher_object.data': { + 'uuid': '10a47dd1-4874-4298-91cf-eff046dbdb8d', + 'input_parameters': { + 'param2': 2, + 'param1': 1 + }, + 'created_at': '2016-10-18T09:52:05Z', + 'fault': None, + 'updated_at': None, + 'state': 'PENDING', + 'action_plan': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'TerseActionPlanPayload', + 'watcher_object.data': { + 'uuid': '76be87bd-3422-43f9-93a0-e85a577e3061', + 'global_efficacy': {}, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'ONGOING', + 'audit_uuid': '10a47dd1-4874-4298' + '-91cf-eff046dbdb8d', + 'strategy_uuid': 'cb3d0b58-4415-4d90' + '-b75b-1e96878730e3', + 'deleted_at': None + } + }, + 'parents': [], + 'action_type': 'nop', + 'deleted_at': None + } + } + }, + notification + ) + + def test_send_action_execution_with_error(self): + action = utils.create_test_action( + mock.Mock(), state=objects.action.State.FAILED, + action_type='nop', input_parameters={'param1': 1, 'param2': 2}, + parents=[], action_plan_id=self.action_plan.id) + + try: + # This is to load the exception in sys.exc_info() + raise exception.WatcherException("TEST") + except exception.WatcherException: + notifications.action.send_execution_notification( + mock.MagicMock(), action, 'execution', phase='error', + host='node0', priority='error') + + self.assertEqual(1, self.m_notifier.error.call_count) + notification = self.m_notifier.error.call_args[1] + self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id) + self.assertDictEqual( + { + 'event_type': 'action.execution.error', + 'payload': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'ActionExecutionPayload', + 'watcher_object.data': { + 'uuid': '10a47dd1-4874-4298-91cf-eff046dbdb8d', + 'input_parameters': { + 'param2': 2, + 'param1': 1 + }, + 'created_at': '2016-10-18T09:52:05Z', + 'fault': { + 'watcher_object.data': { + 'exception': u'WatcherException', + 'exception_message': u'TEST', + 'function_name': ( + 'test_send_action_execution_with_error'), + 'module_name': ( + 'watcher.tests.notifications.' + 'test_action_notification') + }, + 'watcher_object.name': 'ExceptionPayload', + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0' + }, + 'updated_at': None, + 'state': 'FAILED', + 'action_plan': { + 'watcher_object.namespace': 'watcher', + 'watcher_object.version': '1.0', + 'watcher_object.name': 'TerseActionPlanPayload', + 'watcher_object.data': { + 'uuid': '76be87bd-3422-43f9-93a0-e85a577e3061', + 'global_efficacy': {}, + 'created_at': '2016-10-18T09:52:05Z', + 'updated_at': None, + 'state': 'ONGOING', + 'audit_uuid': '10a47dd1-4874-4298' + '-91cf-eff046dbdb8d', + 'strategy_uuid': 'cb3d0b58-4415-4d90' + '-b75b-1e96878730e3', + 'deleted_at': None + } + }, + 'parents': [], + 'action_type': 'nop', + 'deleted_at': None + } + } + }, + notification + ) diff --git a/watcher/tests/notifications/test_notification.py b/watcher/tests/notifications/test_notification.py index 8331964dd..0f5b91139 100644 --- a/watcher/tests/notifications/test_notification.py +++ b/watcher/tests/notifications/test_notification.py @@ -267,16 +267,27 @@ expected_notification_fingerprints = { 'AuditActionPayload': '1.0-09f5d005f94ba9e5f6b9200170332c52', 'GoalPayload': '1.0-fa1fecb8b01dd047eef808ded4d50d1a', 'StrategyPayload': '1.0-94f01c137b083ac236ae82573c1fcfc1', - 'ActionPlanActionPayload': '1.0-34871caf18e9b43a28899953c1c9733a', + 'ActionPlanActionPayload': '1.0-d9f134708e06cf2ff2d3b8d522ac2aa8', 'ActionPlanCreateNotification': '1.0-9b69de0724fda8310d05e18418178866', - 'ActionPlanCreatePayload': '1.0-ffc3087acd73351b14f3dcc30e105027', + 'ActionPlanCreatePayload': '1.0-23d0abbfa43acfd49b2b3097770efdce', 'ActionPlanDeleteNotification': '1.0-9b69de0724fda8310d05e18418178866', - 'ActionPlanDeletePayload': '1.0-ffc3087acd73351b14f3dcc30e105027', - 'ActionPlanPayload': '1.0-ffc3087acd73351b14f3dcc30e105027', + 'ActionPlanDeletePayload': '1.0-23d0abbfa43acfd49b2b3097770efdce', + 'ActionPlanPayload': '1.0-23d0abbfa43acfd49b2b3097770efdce', 'ActionPlanStateUpdatePayload': '1.0-1a1b606bf14a2c468800c2b010801ce5', 'ActionPlanUpdateNotification': '1.0-9b69de0724fda8310d05e18418178866', - 'ActionPlanUpdatePayload': '1.0-7912a45fe53775c721f42aa87f06a023', + 'ActionPlanUpdatePayload': '1.0-3e1a348a0579c6c43c1c3d7257e3f26b', 'ActionPlanActionNotification': '1.0-9b69de0724fda8310d05e18418178866', + 'ActionCreateNotification': '1.0-9b69de0724fda8310d05e18418178866', + 'ActionCreatePayload': '1.0-519b93b7450319d8928b4b6e6362df31', + 'ActionDeleteNotification': '1.0-9b69de0724fda8310d05e18418178866', + 'ActionDeletePayload': '1.0-519b93b7450319d8928b4b6e6362df31', + 'ActionExecutionNotification': '1.0-9b69de0724fda8310d05e18418178866', + 'ActionExecutionPayload': '1.0-bff9f820a2abf7bb6d7027b7450157df', + 'ActionPayload': '1.0-519b93b7450319d8928b4b6e6362df31', + 'ActionStateUpdatePayload': '1.0-1a1b606bf14a2c468800c2b010801ce5', + 'ActionUpdateNotification': '1.0-9b69de0724fda8310d05e18418178866', + 'ActionUpdatePayload': '1.0-03306c7e7f4d49ac328c261eff6b30b8', + 'TerseActionPlanPayload': '1.0-42bf7a5585cc111a9a4dbc008a04c67e', } diff --git a/watcher/tests/objects/test_action.py b/watcher/tests/objects/test_action.py index a580c8d34..82c6706e3 100644 --- a/watcher/tests/objects/test_action.py +++ b/watcher/tests/objects/test_action.py @@ -19,7 +19,9 @@ import iso8601 import mock from watcher.common import exception +from watcher.common import utils as c_utils from watcher.db.sqlalchemy import api as db_api +from watcher import notifications from watcher import objects from watcher.tests.db import base from watcher.tests.db import utils @@ -47,6 +49,13 @@ class TestActionObject(base.DbTestCase): def setUp(self): super(TestActionObject, self).setUp() + + p_action_notifications = mock.patch.object( + notifications, 'action_plan', autospec=True) + self.m_action_notifications = p_action_notifications.start() + self.addCleanup(p_action_notifications.stop) + self.m_send_update = self.m_action_notifications.send_update + self.fake_action_plan = utils.create_test_action_plan( id=self.action_plan_id) @@ -73,6 +82,7 @@ class TestActionObject(base.DbTestCase): self.context, action_id, eager=self.eager) self.assertEqual(self.context, action._context) self.eager_action_assert(action) + self.assertEqual(0, self.m_send_update.call_count) @mock.patch.object(db_api.Connection, 'get_action_by_uuid') def test_get_by_uuid(self, mock_get_action): @@ -82,6 +92,7 @@ class TestActionObject(base.DbTestCase): mock_get_action.assert_called_once_with( self.context, uuid, eager=self.eager) self.assertEqual(self.context, action._context) + self.assertEqual(0, self.m_send_update.call_count) def test_get_bad_id_and_uuid(self): self.assertRaises(exception.InvalidIdentity, @@ -98,19 +109,31 @@ class TestActionObject(base.DbTestCase): self.assertEqual(self.context, actions[0]._context) for action in actions: self.eager_action_assert(action) + self.assertEqual(0, self.m_send_update.call_count) + @mock.patch.object(objects.Strategy, 'get') + @mock.patch.object(objects.Audit, 'get') @mock.patch.object(db_api.Connection, 'update_action') @mock.patch.object(db_api.Connection, 'get_action_by_uuid') - def test_save(self, mock_get_action, mock_update_action): + def test_save(self, mock_get_action, mock_update_action, mock_get_audit, + mock_get_strategy): mock_get_action.return_value = self.fake_action fake_saved_action = self.fake_action.copy() + mock_get_audit.return_value = mock.PropertyMock( + uuid=c_utils.generate_uuid()) + mock_get_strategy.return_value = mock.PropertyMock( + uuid=c_utils.generate_uuid()) fake_saved_action['updated_at'] = datetime.datetime.utcnow() mock_update_action.return_value = fake_saved_action uuid = self.fake_action['uuid'] action = objects.Action.get_by_uuid( self.context, uuid, eager=self.eager) action.state = objects.action.State.SUCCEEDED - action.save() + if not self.eager: + self.assertRaises(exception.EagerlyLoadedActionRequired, + action.save) + else: + action.save() expected_update_at = fake_saved_action['updated_at'].replace( tzinfo=iso8601.iso8601.Utc()) @@ -121,6 +144,7 @@ class TestActionObject(base.DbTestCase): uuid, {'state': objects.action.State.SUCCEEDED}) self.assertEqual(self.context, action._context) self.assertEqual(expected_update_at, action.updated_at) + self.assertEqual(0, self.m_send_update.call_count) @mock.patch.object(db_api.Connection, 'get_action_by_uuid') def test_refresh(self, mock_get_action): @@ -137,6 +161,7 @@ class TestActionObject(base.DbTestCase): self.assertEqual(expected, mock_get_action.call_args_list) self.assertEqual(self.context, action._context) self.eager_action_assert(action) + self.assertEqual(0, self.m_send_update.call_count) class TestCreateDeleteActionObject(base.DbTestCase): @@ -160,11 +185,14 @@ class TestCreateDeleteActionObject(base.DbTestCase): mock_create_action.assert_called_once_with(expected_action) self.assertEqual(self.context, action._context) + @mock.patch.object(notifications.action, 'send_delete') + @mock.patch.object(notifications.action, 'send_update') @mock.patch.object(db_api.Connection, 'update_action') @mock.patch.object(db_api.Connection, 'soft_delete_action') @mock.patch.object(db_api.Connection, 'get_action_by_uuid') def test_soft_delete(self, mock_get_action, - mock_soft_delete_action, mock_update_action): + mock_soft_delete_action, mock_update_action, + mock_send_update, mock_send_delete): mock_get_action.return_value = self.fake_action fake_deleted_action = self.fake_action.copy() fake_deleted_action['deleted_at'] = datetime.datetime.utcnow()