From e96ec2726caabef7d6e1b064f8dcc2cb0c00d938 Mon Sep 17 00:00:00 2001 From: genevieve-nantel Date: Fri, 10 May 2019 14:58:12 -0400 Subject: [PATCH] Implements karbor events notifications Getting the messages from Karbor like: - Plan create/update/delete - Checkpoint create/update/available - Restore create - Scheduled Operations create/delete - Triggers create/update/delete This should add and send the desired messages to RabbitMQ with the start and end identifiers for each message (e.g. karbor.plan_create.start, karbor.plan_create.end). The notification code was done in a similar fashion than the notifications from the Trove project. Implements: blueprint karbor-event-notifications Closes-Bug: 1797462 Change-Id: I0d7ffaa0873d192aeb24c17191683d705044644c --- karbor/api/v1/plans.py | 22 +- karbor/api/v1/providers.py | 27 +- karbor/api/v1/restores.py | 7 +- karbor/api/v1/scheduled_operations.py | 14 +- karbor/api/v1/triggers.py | 25 +- karbor/common/notification.py | 349 ++++++++++++++++++ karbor/tests/unit/common/test_notification.py | 187 ++++++++++ 7 files changed, 608 insertions(+), 23 deletions(-) create mode 100644 karbor/common/notification.py create mode 100644 karbor/tests/unit/common/test_notification.py diff --git a/karbor/api/v1/plans.py b/karbor/api/v1/plans.py index 1ebe47b4..f4004614 100644 --- a/karbor/api/v1/plans.py +++ b/karbor/api/v1/plans.py @@ -25,6 +25,8 @@ from karbor.api.openstack import wsgi from karbor.api.schemas import plans as plan_schema from karbor.api import validation from karbor.common import constants +from karbor.common import notification +from karbor.common.notification import StartNotification from karbor import exception from karbor.i18n import _ @@ -147,7 +149,8 @@ class PlansController(wsgi.Controller): context = req.environ['karbor.context'] LOG.info("Delete plan with id: %s", id, context=context) - + context.notification = notification.KarborPlanDelete( + context, request=req) try: plan = self._plan_get(context, id) except exception.PlanNotFound as error: @@ -157,7 +160,8 @@ class PlansController(wsgi.Controller): project_id = plan.project_id try: - plan.destroy() + with StartNotification(context, id=id): + plan.destroy() except Exception: msg = _("Failed to destroy a plan.") raise exc.HTTPServerError(reason=msg) @@ -245,6 +249,8 @@ class PlansController(wsgi.Controller): context.can(plan_policy.CREATE_POLICY) plan = body['plan'] LOG.debug('Create plan request plan: %s', plan) + context.notification = notification.KarborPlanCreate( + context, request=req) parameters = plan.get("parameters", None) @@ -277,7 +283,9 @@ class PlansController(wsgi.Controller): resource='plans') try: plan = objects.Plan(context=context, **plan_properties) - plan.create() + with StartNotification( + context, name=plan.get('name', None)): + plan.create() QUOTAS.commit(context, reservations) except Exception: with excutils.save_and_reraise_exception(): @@ -295,6 +303,8 @@ class PlansController(wsgi.Controller): def update(self, req, id, body): """Update a plan.""" context = req.environ['karbor.context'] + context.notification = notification.KarborPlanUpdate( + context, request=req) plan = body['plan'] update_dict = {} @@ -327,9 +337,9 @@ class PlansController(wsgi.Controller): except exception.PlanNotFound as error: raise exc.HTTPNotFound(explanation=error.msg) - self._plan_update(context, plan, update_dict) - - plan.update(update_dict) + with StartNotification(context, id=id): + self._plan_update(context, plan, update_dict) + plan.update(update_dict) retval = self._view_builder.detail(req, plan) return retval diff --git a/karbor/api/v1/providers.py b/karbor/api/v1/providers.py index c151602b..e5d9e639 100644 --- a/karbor/api/v1/providers.py +++ b/karbor/api/v1/providers.py @@ -24,6 +24,8 @@ from karbor.api.openstack import wsgi from karbor.api.schemas import checkpoints as checkpoint_schema from karbor.api import validation from karbor.common import constants +from karbor.common import notification +from karbor.common.notification import StartNotification from karbor import exception from karbor.i18n import _ @@ -337,6 +339,8 @@ class ProvidersController(wsgi.Controller): """Creates a new checkpoint.""" context = req.environ['karbor.context'] + context.notification = notification.KarborCheckpointCreate( + context, request=req) LOG.debug('Create checkpoint request ' 'body: %s provider_id:%s', body, provider_id) @@ -404,9 +408,11 @@ class ProvidersController(wsgi.Controller): else: checkpoint_id = None try: - checkpoint_id = self.protection_api.protect( - context, plan, checkpoint_properties) - QUOTAS.commit(context, reservations) + with StartNotification( + context, checkpoint_properties=checkpoint_properties): + checkpoint_id = self.protection_api.protect( + context, plan, checkpoint_properties) + QUOTAS.commit(context, reservations) except Exception as error: if not checkpoint_id: QUOTAS.rollback(context, reservations) @@ -471,6 +477,8 @@ class ProvidersController(wsgi.Controller): """Delete a checkpoint.""" context = req.environ['karbor.context'] context.can(provider_policy.CHECKPOINT_DELETE_POLICY) + context.notification = notification.KarborCheckpointDelete( + context, request=req) LOG.info("Delete checkpoint with id: %s.", checkpoint_id) LOG.info("provider_id: %s.", provider_id) @@ -484,7 +492,8 @@ class ProvidersController(wsgi.Controller): project_id = checkpoint.get('project_id') try: - self.protection_api.delete(context, provider_id, checkpoint_id) + with StartNotification(context, checkpoint_id=checkpoint_id): + self.protection_api.delete(context, provider_id, checkpoint_id) except exception.DeleteCheckpointNotAllowed as error: raise exc.HTTPForbidden(explantion=error.msg) @@ -518,6 +527,8 @@ class ProvidersController(wsgi.Controller): def checkpoints_update(self, req, provider_id, checkpoint_id, body): """Reset a checkpoint's state""" context = req.environ['karbor.context'] + context.notification = notification.KarborCheckpointUpdate( + context, request=req) LOG.info("Reset checkpoint state with id: %s", checkpoint_id) LOG.info("provider_id: %s.", provider_id) @@ -531,10 +542,12 @@ class ProvidersController(wsgi.Controller): raise exc.HTTPBadRequest(explanation=msg) context.can(provider_policy.CHECKPOINT_UPDATE_POLICY) + if body.get("os-resetState"): - state = body["os-resetState"]["state"] - return self._checkpoint_reset_state( - context, provider_id, checkpoint_id, state) + with StartNotification(context, checkpoint_id=checkpoint_id): + state = body["os-resetState"]["state"] + return self._checkpoint_reset_state( + context, provider_id, checkpoint_id, state) else: msg = _("Invalid input.") raise exc.HTTPBadRequest(explanation=msg) diff --git a/karbor/api/v1/restores.py b/karbor/api/v1/restores.py index 08f91ce7..1a377fbf 100644 --- a/karbor/api/v1/restores.py +++ b/karbor/api/v1/restores.py @@ -23,6 +23,8 @@ from karbor.api.openstack import wsgi from karbor.api.schemas import restores as restore_schema from karbor.api import validation from karbor.common import constants +from karbor.common import notification +from karbor.common.notification import StartNotification from karbor import exception from karbor.i18n import _ @@ -200,6 +202,8 @@ class RestoresController(wsgi.Controller): LOG.debug('Create restore request body: %s', body) context = req.environ['karbor.context'] context.can(restore_policy.CREATE_POLICY) + context.notification = notification.KarborRestoreCreate( + context, request=req) restore = body['restore'] LOG.debug('Create restore request : %s', restore) @@ -221,7 +225,8 @@ class RestoresController(wsgi.Controller): # call restore rpc API of protection service try: - self.protection_api.restore(context, restoreobj, restore_auth) + with StartNotification(context, parameters=parameters): + self.protection_api.restore(context, restoreobj, restore_auth) except exception.AccessCheckpointNotAllowed as error: raise exc.HTTPForbidden(explanation=error.msg) except Exception: diff --git a/karbor/api/v1/scheduled_operations.py b/karbor/api/v1/scheduled_operations.py index 4aabedaf..9cc71955 100644 --- a/karbor/api/v1/scheduled_operations.py +++ b/karbor/api/v1/scheduled_operations.py @@ -21,6 +21,8 @@ from karbor.api.openstack import wsgi from karbor.api.schemas import scheduled_operations as \ scheduled_operation_schema from karbor.api import validation +from karbor.common import notification +from karbor.common.notification import StartNotification from karbor import exception from karbor.i18n import _ from karbor import objects @@ -90,6 +92,8 @@ class ScheduledOperationController(wsgi.Controller): context = req.environ['karbor.context'] context.can(scheduled_operation_policy.CREATE_POLICY) + context.notification = notification.KarborScheduledOpsCreate( + context, request=req) operation_info = body['scheduled_operation'] name = operation_info.get("name", None) @@ -123,7 +127,8 @@ class ScheduledOperationController(wsgi.Controller): self._raise_unknown_exception(ex) try: - self._create_scheduled_operation(context, operation) + with StartNotification(context, operation_obj=operation_obj): + self._create_scheduled_operation(context, operation) except Exception: try: operation.destroy() @@ -140,14 +145,17 @@ class ScheduledOperationController(wsgi.Controller): LOG.debug('Delete scheduled operation(%s) start', id) context = req.environ['karbor.context'] + context.notification = notification.KarborScheduledOpsDelete( + context, request=req) operation = self._get_operation_by_id(context, id, ['trigger']) trigger = operation.trigger context.can(scheduled_operation_policy.DELETE_POLICY, operation) try: - self.operationengine_api.delete_scheduled_operation( - context, id, trigger.id) + with StartNotification(context, id=id): + self.operationengine_api.delete_scheduled_operation( + context, id, trigger.id) except (exception.ScheduledOperationStateNotFound, exception.TriggerNotFound, diff --git a/karbor/api/v1/triggers.py b/karbor/api/v1/triggers.py index 2e541859..d27b7c72 100644 --- a/karbor/api/v1/triggers.py +++ b/karbor/api/v1/triggers.py @@ -21,6 +21,8 @@ from karbor.api import common from karbor.api.openstack import wsgi from karbor.api.schemas import triggers as trigger_schema from karbor.api import validation +from karbor.common import notification +from karbor.common.notification import StartNotification from karbor import exception from karbor.i18n import _ from karbor import objects @@ -88,6 +90,8 @@ class TriggersController(wsgi.Controller): context = req.environ['karbor.context'] context.can(trigger_policy.CREATE_POLICY) trigger_info = body['trigger_info'] + context.notification = notification.KarborTriggerCreate( + context, request=req) trigger_name = trigger_info.get("name", None) trigger_type = trigger_info.get("type", None) @@ -103,10 +107,13 @@ class TriggersController(wsgi.Controller): 'properties': trigger_property, } try: - trigger = objects.Trigger(context=context, **trigger_definition) - self.operationengine_api.verify_trigger(context, trigger) - self.operationengine_api.create_trigger(context, trigger) - trigger.create() + with StartNotification( + context, name=trigger_name): + trigger = objects.Trigger( + context=context, **trigger_definition) + self.operationengine_api.verify_trigger(context, trigger) + self.operationengine_api.create_trigger(context, trigger) + trigger.create() except exception.Invalid as ex: raise exc.HTTPBadRequest(explanation=ex.msg) except Exception as ex: @@ -121,6 +128,8 @@ class TriggersController(wsgi.Controller): context = req.environ['karbor.context'] trigger = self._get_trigger_by_id(context, id) + context.notification = notification.KarborTriggerDelete( + context, request=req) context.can(trigger_policy.DELETE_POLICY, trigger) @@ -135,7 +144,8 @@ class TriggersController(wsgi.Controller): raise exc.HTTPFailedDependency(explanation=msg) try: - self.operationengine_api.delete_trigger(context, id) + with StartNotification(context, id=id): + self.operationengine_api.delete_trigger(context, id) except exception.TriggerNotFound as ex: pass except (exception.DeleteTriggerNotAllowed, @@ -152,6 +162,8 @@ class TriggersController(wsgi.Controller): context = req.environ['karbor.context'] trigger = self._get_trigger_by_id(context, id) + context.notification = notification.KarborTriggerUpdate( + context, request=req) context.can(trigger_policy.UPDATE_POLICY, trigger) @@ -177,7 +189,8 @@ class TriggersController(wsgi.Controller): except (exception.TriggerNotFound, Exception) as ex: self._raise_unknown_exception(ex) try: - trigger.save() + with StartNotification(context, id=id): + trigger.save() except Exception as ex: self._raise_unknown_exception(ex) diff --git a/karbor/common/notification.py b/karbor/common/notification.py new file mode 100644 index 00000000..d9f44b41 --- /dev/null +++ b/karbor/common/notification.py @@ -0,0 +1,349 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""The notification module.""" + +import abc +import copy +import traceback + +from karbor import exception +from karbor.i18n import _ +from karbor import rpc +from oslo_config import cfg +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class EndNotification(object): + + @property + def _notifier(self): + """Returns the notification for Karbor API.""" + + return (self.context.notification) + + def __init__(self, context, **kwargs): + self.context = context + self.context.notification.payload.update(kwargs) + + def __enter__(self): + return self.context.notification + + def __exit__(self, etype, value, tb): + if etype: + message = str(value) + exception = traceback.format_exception(etype, value, tb) + self._notifier.notify_exc_info(message, exception) + else: + self._notifier.notify_end() + + +class StartNotification(EndNotification): + + def __enter__(self): + self.context.notification.notify_start() + return super(StartNotification, self).__enter__() + + +class KaborAPINotification(object): + + """The traits of karbor.* notifications.""" + + event_type_format = 'karbor.%s.%s' + notify_callback = None + + @classmethod + def register_notify_callback(cls, callback): + """Callback when a notification is sent out.""" + cls.notify_callback = callback + + @abc.abstractmethod + def event_type(self): + 'Returns the event type (like "create" for karbor.create.start)' + pass + + @abc.abstractmethod + def required_start_traits(self): + 'Returns list of required traits for start notification' + pass + + def optional_start_traits(self): + 'Returns list of optional traits for start notification' + return [] + + def required_end_traits(self): + 'Returns list of required traits for end notification' + return [] + + def optional_end_traits(self): + 'Returns list of optional traits for end notification' + return [] + + def required_error_traits(self): + 'Returns list of required traits for error notification' + return ['message', 'exception'] + + def optional_error_traits(self): + 'Returns list of optional traits for error notification' + return ['id'] + + def required_base_traits(self): + return ['tenant_id', 'client_ip', 'request_id'] + + @property + def request_id(self): + return self.payload['request_id'] + + def __init__(self, context, **kwargs): + self.context = context + self.needs_end_notification = True + + self.payload = {} + + if 'request' in kwargs: + request = kwargs.pop('request') + self.payload.update({ + 'request_id': context.request_id, + 'client_ip': request.remote_addr, + 'tenant_id': context.tenant, + }) + elif 'request_id' not in kwargs: + raise exception.InvalidInput( + reason="Notification must include 'request' property") + + self.payload.update(kwargs) + + def serialize(self, context): + return self.payload + + def validate(self, required_traits): + required_keys = set(required_traits) + provided_keys = set(self.payload.keys()) + if not required_keys.issubset(provided_keys): + msg = (_("The following required keys not defined for" + " notification %(name)s: %(keys)s") + % {'name': self.__class__.__name__, + 'keys': list(required_keys - provided_keys)}) + raise exception.InvalidInput(reason=msg) + + def _notify(self, event_qualifier, required_traits, optional_traits, + **kwargs): + self.payload.update(kwargs) + self.validate(self.required_base_traits() + required_traits) + available_values = self.serialize(self.context) + payload = {k: available_values[k] + for k in self.required_base_traits() + required_traits} + for k in optional_traits: + if k in available_values: + payload[k] = available_values[k] + + qualified_event_type = (KaborAPINotification.event_type_format + % (self.event_type(), event_qualifier)) + LOG.debug('Sending event: %(event_type)s, %(payload)s', + {'event_type': qualified_event_type, 'payload': payload}) + + context = copy.copy(self.context) + del context.notification + notifier = rpc.get_notifier() + notifier.info(context, qualified_event_type, self.payload) + if self.notify_callback: + self.notify_callback(event_qualifier) + + def notify_start(self, **kwargs): + self._notify('start', self.required_start_traits(), + self.optional_start_traits(), **kwargs) + + def notify_end(self, **kwargs): + if self.needs_end_notification: + self._notify('end', self.required_end_traits(), + self.optional_end_traits(), **kwargs) + + def notify_exc_info(self, message, exception): + self.payload.update({ + 'message': message, + 'exception': exception + }) + self._notify('error', self.required_error_traits(), + self.optional_error_traits()) + + +class KarborPlanCreate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'plan_create' + + @abc.abstractmethod + def required_start_traits(self): + return ['name'] + + def optional_start_traits(self): + return ['parameters'] + + def required_end_traits(self): + return ['name'] + + +class KarborPlanDelete(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'plan_delete' + + @abc.abstractmethod + def required_start_traits(self): + return ['id'] + + +class KarborPlanUpdate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'plan_update' + + @abc.abstractmethod + def required_start_traits(self): + return ['id'] + + +class KarborTriggerDelete(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'trigger_delete' + + @abc.abstractmethod + def required_start_traits(self): + return ['id'] + + +class KarborTriggerCreate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'trigger_create' + + @abc.abstractmethod + def required_start_traits(self): + return ['name'] + + def optional_start_traits(self): + return ['parameters'] + + def required_end_traits(self): + return ['name'] + + +class KarborTriggerUpdate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'trigger_update' + + @abc.abstractmethod + def required_start_traits(self): + return ['id'] + + +class KarborRestoreDelete(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'restore_delete' + + @abc.abstractmethod + def required_start_traits(self): + return ['id'] + + +class KarborRestoreCreate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'restore_create' + + @abc.abstractmethod + def required_start_traits(self): + return ['parameters'] + + def required_end_traits(self): + return ['parameters'] + + +class KarborCheckpointCreate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'checkpoint_create' + + @abc.abstractmethod + def required_start_traits(self): + return ['checkpoint_properties'] + + def required_end_traits(self): + return ['checkpoint_properties'] + + +class KarborCheckpointDelete(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'checkpoint_delete' + + @abc.abstractmethod + def required_start_traits(self): + return ['checkpoint_id'] + + def required_end_traits(self): + return ['checkpoint_id'] + + +class KarborCheckpointUpdate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'checkpoint_update' + + @abc.abstractmethod + def required_start_traits(self): + return ['checkpoint_id'] + + +class KarborScheduledOpsCreate(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'scheduled_operation_create' + + @abc.abstractmethod + def required_start_traits(self): + return ['operation_obj'] + + def required_end_traits(self): + return ['operation_obj'] + + +class KarborScheduledOpsDelete(KaborAPINotification): + + @abc.abstractmethod + def event_type(self): + return 'scheduled_operation_delete' + + @abc.abstractmethod + def required_start_traits(self): + return ['id'] + + def required_end_traits(self): + return ['id'] diff --git a/karbor/tests/unit/common/test_notification.py b/karbor/tests/unit/common/test_notification.py new file mode 100644 index 00000000..2227c0cd --- /dev/null +++ b/karbor/tests/unit/common/test_notification.py @@ -0,0 +1,187 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""The notification module.""" +from mock import Mock +from mock import patch + +from karbor.common import notification +from karbor.common.notification import EndNotification +from karbor.common.notification import StartNotification +from karbor import context +from karbor import exception +from karbor import rpc +from karbor.tests import base + + +class TestEndNotification(base.TestCase): + + def setUp(self): + super(TestEndNotification, self).setUp() + self.context = KarborTestContext(self) + + def test_call(self): + with patch.object(self.context, "notification") as notification: + with EndNotification(self.context): + pass + self.assertTrue(notification.notify_end.called) + + def server_exception(self, server_type): + with patch.object(self.context, "notification") as notification: + try: + with EndNotification(self.context): + raise exception.InvalidInput + except Exception: + self.assertTrue(notification.notify_exc_info.called) + + +class KarborTestContext(context.RequestContext): + + def __init__(self, test_case, **kwargs): + super(KarborTestContext, self).__init__(user_id='demo', + project_id='abcd', + auth_token='efgh') + self.notification = KarborTestNotification( + self, request_id='req_id') + + +class TestStartNotification(base.TestCase): + + def setUp(self): + super(TestStartNotification, self).setUp() + self.context = KarborTestContext(self) + + def test_call(self): + with patch.object(self.context, "notification") as notification: + with StartNotification(self.context): + pass + self.assertTrue(notification.notify_start.called) + + +class KarborTestNotification(notification.KaborAPINotification): + + def event_type(self): + return 'plan_test' + + def required_start_traits(self): + return ['name'] + + def optional_start_traits(self): + return ['parameters'] + + def required_end_traits(self): + return ['name'] + + +class TestKarborNotification(base.TestCase): + + def setUp(self): + super(TestKarborNotification, self).setUp() + self.test_n = KarborTestNotification(Mock(), request=Mock()) + + def test_missing_required_start_traits(self): + self.assertRaisesRegex(exception.InvalidInput, + self.test_n.required_start_traits()[0], + self.test_n.notify_start) + + def test_invalid_start_traits(self): + self.assertRaisesRegex(exception.InvalidInput, + "The following required keys", + self.test_n.notify_start, foo='bar') + + def test_missing_required_end_traits(self): + self.assertRaisesRegex(exception.InvalidInput, + self.test_n.required_end_traits()[0], + self.test_n.notify_end) + + def test_invalid_end_traits(self): + self.assertRaisesRegex(exception.InvalidInput, + "The following required keys", + self.test_n.notify_end, foo='bar') + + def test_missing_required_error_traits(self): + self.assertRaisesRegex(exception.InvalidInput, + self.test_n.required_error_traits()[0], + self.test_n._notify, 'error', + self.test_n.required_error_traits(), []) + + @patch.object(rpc, 'get_notifier') + def test_start_event(self, notifier): + self.test_n.notify_start(name='foo') + self.assertTrue(notifier().info.called) + a, _ = notifier().info.call_args + self.assertEqual('karbor.plan_test.start', a[1]) + + @patch.object(rpc, 'get_notifier') + def test_end_event(self, notifier): + self.test_n.notify_end(name='foo') + self.assertTrue(notifier().info.called) + a, _ = notifier().info.call_args + self.assertEqual('karbor.plan_test.end', a[1]) + + @patch.object(rpc, 'get_notifier') + def test_verify_base_values(self, notifier): + self.test_n.notify_start(name='foo') + self.assertTrue(notifier().info.called) + a, _ = notifier().info.call_args + payload = a[2] + self.assertIn('client_ip', payload) + self.assertIn('request_id', payload) + self.assertIn('tenant_id', payload) + + @patch.object(rpc, 'get_notifier') + def test_verify_required_start_args(self, notifier): + self.test_n.notify_start(name='foo') + self.assertTrue(notifier().info.called) + a, _ = notifier().info.call_args + payload = a[2] + self.assertIn('name', payload) + + @patch.object(rpc, 'get_notifier') + def test_verify_optional_start_args(self, notifier): + self.test_n.notify_start(name='foo', parameters="test") + self.assertTrue(notifier().info.called) + a, _ = notifier().info.call_args + payload = a[2] + self.assertIn('parameters', payload) + + @patch.object(rpc, 'get_notifier') + def test_verify_required_end_args(self, notifier): + self.test_n.notify_end(name='foo') + self.assertTrue(notifier().info.called) + a, _ = notifier().info.call_args + payload = a[2] + self.assertIn('name', payload) + + def _test_notify_callback(self, fn, *args, **kwargs): + with patch.object(rpc, 'get_notifier') as notifier: + mock_callback = Mock() + self.test_n.register_notify_callback(mock_callback) + mock_context = Mock() + mock_context.notification = Mock() + self.test_n.context = mock_context + fn(*args, **kwargs) + self.assertTrue(notifier().info.called) + self.assertTrue(mock_callback.called) + self.test_n.register_notify_callback(None) + + def test_notify_callback(self): + required_keys = { + 'name': 'name', + 'parameters': 'parameters', + } + self._test_notify_callback(self.test_n.notify_start, + **required_keys) + self._test_notify_callback(self.test_n.notify_end, + **required_keys) + self._test_notify_callback(self.test_n.notify_exc_info, + 'error', 'exc')