Browse Source

Implements karbor events notifications

Getting the messages from Karbor like:
  - Plan create/update/delete
  - Checkpoint create/update/available
  - Restore create
  - Scheduled Operations create/delete
  - Triggers create/update/delete

This should add and send the desired messages to RabbitMQ with the
start and end identifiers for each message
(e.g. karbor.plan_create.start, karbor.plan_create.end). The
notification code was done in a similar fashion than the
notifications from the Trove project.

Implements:  blueprint karbor-event-notifications
Closes-Bug: 1797462
Change-Id: I0d7ffaa0873d192aeb24c17191683d705044644c
changes/53/658453/6
genevieve-nantel 2 years ago
parent
commit
e96ec2726c
  1. 22
      karbor/api/v1/plans.py
  2. 27
      karbor/api/v1/providers.py
  3. 7
      karbor/api/v1/restores.py
  4. 14
      karbor/api/v1/scheduled_operations.py
  5. 25
      karbor/api/v1/triggers.py
  6. 349
      karbor/common/notification.py
  7. 187
      karbor/tests/unit/common/test_notification.py

22
karbor/api/v1/plans.py

@ -25,6 +25,8 @@ from karbor.api.openstack import wsgi
from karbor.api.schemas import plans as plan_schema
from karbor.api import validation
from karbor.common import constants
from karbor.common import notification
from karbor.common.notification import StartNotification
from karbor import exception
from karbor.i18n import _
@ -147,7 +149,8 @@ class PlansController(wsgi.Controller):
context = req.environ['karbor.context']
LOG.info("Delete plan with id: %s", id, context=context)
context.notification = notification.KarborPlanDelete(
context, request=req)
try:
plan = self._plan_get(context, id)
except exception.PlanNotFound as error:
@ -157,7 +160,8 @@ class PlansController(wsgi.Controller):
project_id = plan.project_id
try:
plan.destroy()
with StartNotification(context, id=id):
plan.destroy()
except Exception:
msg = _("Failed to destroy a plan.")
raise exc.HTTPServerError(reason=msg)
@ -245,6 +249,8 @@ class PlansController(wsgi.Controller):
context.can(plan_policy.CREATE_POLICY)
plan = body['plan']
LOG.debug('Create plan request plan: %s', plan)
context.notification = notification.KarborPlanCreate(
context, request=req)
parameters = plan.get("parameters", None)
@ -277,7 +283,9 @@ class PlansController(wsgi.Controller):
resource='plans')
try:
plan = objects.Plan(context=context, **plan_properties)
plan.create()
with StartNotification(
context, name=plan.get('name', None)):
plan.create()
QUOTAS.commit(context, reservations)
except Exception:
with excutils.save_and_reraise_exception():
@ -295,6 +303,8 @@ class PlansController(wsgi.Controller):
def update(self, req, id, body):
"""Update a plan."""
context = req.environ['karbor.context']
context.notification = notification.KarborPlanUpdate(
context, request=req)
plan = body['plan']
update_dict = {}
@ -327,9 +337,9 @@ class PlansController(wsgi.Controller):
except exception.PlanNotFound as error:
raise exc.HTTPNotFound(explanation=error.msg)
self._plan_update(context, plan, update_dict)
plan.update(update_dict)
with StartNotification(context, id=id):
self._plan_update(context, plan, update_dict)
plan.update(update_dict)
retval = self._view_builder.detail(req, plan)
return retval

27
karbor/api/v1/providers.py

@ -24,6 +24,8 @@ from karbor.api.openstack import wsgi
from karbor.api.schemas import checkpoints as checkpoint_schema
from karbor.api import validation
from karbor.common import constants
from karbor.common import notification
from karbor.common.notification import StartNotification
from karbor import exception
from karbor.i18n import _
@ -337,6 +339,8 @@ class ProvidersController(wsgi.Controller):
"""Creates a new checkpoint."""
context = req.environ['karbor.context']
context.notification = notification.KarborCheckpointCreate(
context, request=req)
LOG.debug('Create checkpoint request '
'body: %s provider_id:%s', body, provider_id)
@ -404,9 +408,11 @@ class ProvidersController(wsgi.Controller):
else:
checkpoint_id = None
try:
checkpoint_id = self.protection_api.protect(
context, plan, checkpoint_properties)
QUOTAS.commit(context, reservations)
with StartNotification(
context, checkpoint_properties=checkpoint_properties):
checkpoint_id = self.protection_api.protect(
context, plan, checkpoint_properties)
QUOTAS.commit(context, reservations)
except Exception as error:
if not checkpoint_id:
QUOTAS.rollback(context, reservations)
@ -471,6 +477,8 @@ class ProvidersController(wsgi.Controller):
"""Delete a checkpoint."""
context = req.environ['karbor.context']
context.can(provider_policy.CHECKPOINT_DELETE_POLICY)
context.notification = notification.KarborCheckpointDelete(
context, request=req)
LOG.info("Delete checkpoint with id: %s.", checkpoint_id)
LOG.info("provider_id: %s.", provider_id)
@ -484,7 +492,8 @@ class ProvidersController(wsgi.Controller):
project_id = checkpoint.get('project_id')
try:
self.protection_api.delete(context, provider_id, checkpoint_id)
with StartNotification(context, checkpoint_id=checkpoint_id):
self.protection_api.delete(context, provider_id, checkpoint_id)
except exception.DeleteCheckpointNotAllowed as error:
raise exc.HTTPForbidden(explantion=error.msg)
@ -518,6 +527,8 @@ class ProvidersController(wsgi.Controller):
def checkpoints_update(self, req, provider_id, checkpoint_id, body):
"""Reset a checkpoint's state"""
context = req.environ['karbor.context']
context.notification = notification.KarborCheckpointUpdate(
context, request=req)
LOG.info("Reset checkpoint state with id: %s", checkpoint_id)
LOG.info("provider_id: %s.", provider_id)
@ -531,10 +542,12 @@ class ProvidersController(wsgi.Controller):
raise exc.HTTPBadRequest(explanation=msg)
context.can(provider_policy.CHECKPOINT_UPDATE_POLICY)
if body.get("os-resetState"):
state = body["os-resetState"]["state"]
return self._checkpoint_reset_state(
context, provider_id, checkpoint_id, state)
with StartNotification(context, checkpoint_id=checkpoint_id):
state = body["os-resetState"]["state"]
return self._checkpoint_reset_state(
context, provider_id, checkpoint_id, state)
else:
msg = _("Invalid input.")
raise exc.HTTPBadRequest(explanation=msg)

7
karbor/api/v1/restores.py

@ -23,6 +23,8 @@ from karbor.api.openstack import wsgi
from karbor.api.schemas import restores as restore_schema
from karbor.api import validation
from karbor.common import constants
from karbor.common import notification
from karbor.common.notification import StartNotification
from karbor import exception
from karbor.i18n import _
@ -200,6 +202,8 @@ class RestoresController(wsgi.Controller):
LOG.debug('Create restore request body: %s', body)
context = req.environ['karbor.context']
context.can(restore_policy.CREATE_POLICY)
context.notification = notification.KarborRestoreCreate(
context, request=req)
restore = body['restore']
LOG.debug('Create restore request : %s', restore)
@ -221,7 +225,8 @@ class RestoresController(wsgi.Controller):
# call restore rpc API of protection service
try:
self.protection_api.restore(context, restoreobj, restore_auth)
with StartNotification(context, parameters=parameters):
self.protection_api.restore(context, restoreobj, restore_auth)
except exception.AccessCheckpointNotAllowed as error:
raise exc.HTTPForbidden(explanation=error.msg)
except Exception:

14
karbor/api/v1/scheduled_operations.py

@ -21,6 +21,8 @@ from karbor.api.openstack import wsgi
from karbor.api.schemas import scheduled_operations as \
scheduled_operation_schema
from karbor.api import validation
from karbor.common import notification
from karbor.common.notification import StartNotification
from karbor import exception
from karbor.i18n import _
from karbor import objects
@ -90,6 +92,8 @@ class ScheduledOperationController(wsgi.Controller):
context = req.environ['karbor.context']
context.can(scheduled_operation_policy.CREATE_POLICY)
context.notification = notification.KarborScheduledOpsCreate(
context, request=req)
operation_info = body['scheduled_operation']
name = operation_info.get("name", None)
@ -123,7 +127,8 @@ class ScheduledOperationController(wsgi.Controller):
self._raise_unknown_exception(ex)
try:
self._create_scheduled_operation(context, operation)
with StartNotification(context, operation_obj=operation_obj):
self._create_scheduled_operation(context, operation)
except Exception:
try:
operation.destroy()
@ -140,14 +145,17 @@ class ScheduledOperationController(wsgi.Controller):
LOG.debug('Delete scheduled operation(%s) start', id)
context = req.environ['karbor.context']
context.notification = notification.KarborScheduledOpsDelete(
context, request=req)
operation = self._get_operation_by_id(context, id, ['trigger'])
trigger = operation.trigger
context.can(scheduled_operation_policy.DELETE_POLICY, operation)
try:
self.operationengine_api.delete_scheduled_operation(
context, id, trigger.id)
with StartNotification(context, id=id):
self.operationengine_api.delete_scheduled_operation(
context, id, trigger.id)
except (exception.ScheduledOperationStateNotFound,
exception.TriggerNotFound,

25
karbor/api/v1/triggers.py

@ -21,6 +21,8 @@ from karbor.api import common
from karbor.api.openstack import wsgi
from karbor.api.schemas import triggers as trigger_schema
from karbor.api import validation
from karbor.common import notification
from karbor.common.notification import StartNotification
from karbor import exception
from karbor.i18n import _
from karbor import objects
@ -88,6 +90,8 @@ class TriggersController(wsgi.Controller):
context = req.environ['karbor.context']
context.can(trigger_policy.CREATE_POLICY)
trigger_info = body['trigger_info']
context.notification = notification.KarborTriggerCreate(
context, request=req)
trigger_name = trigger_info.get("name", None)
trigger_type = trigger_info.get("type", None)
@ -103,10 +107,13 @@ class TriggersController(wsgi.Controller):
'properties': trigger_property,
}
try:
trigger = objects.Trigger(context=context, **trigger_definition)
self.operationengine_api.verify_trigger(context, trigger)
self.operationengine_api.create_trigger(context, trigger)
trigger.create()
with StartNotification(
context, name=trigger_name):
trigger = objects.Trigger(
context=context, **trigger_definition)
self.operationengine_api.verify_trigger(context, trigger)
self.operationengine_api.create_trigger(context, trigger)
trigger.create()
except exception.Invalid as ex:
raise exc.HTTPBadRequest(explanation=ex.msg)
except Exception as ex:
@ -121,6 +128,8 @@ class TriggersController(wsgi.Controller):
context = req.environ['karbor.context']
trigger = self._get_trigger_by_id(context, id)
context.notification = notification.KarborTriggerDelete(
context, request=req)
context.can(trigger_policy.DELETE_POLICY, trigger)
@ -135,7 +144,8 @@ class TriggersController(wsgi.Controller):
raise exc.HTTPFailedDependency(explanation=msg)
try:
self.operationengine_api.delete_trigger(context, id)
with StartNotification(context, id=id):
self.operationengine_api.delete_trigger(context, id)
except exception.TriggerNotFound as ex:
pass
except (exception.DeleteTriggerNotAllowed,
@ -152,6 +162,8 @@ class TriggersController(wsgi.Controller):
context = req.environ['karbor.context']
trigger = self._get_trigger_by_id(context, id)
context.notification = notification.KarborTriggerUpdate(
context, request=req)
context.can(trigger_policy.UPDATE_POLICY, trigger)
@ -177,7 +189,8 @@ class TriggersController(wsgi.Controller):
except (exception.TriggerNotFound, Exception) as ex:
self._raise_unknown_exception(ex)
try:
trigger.save()
with StartNotification(context, id=id):
trigger.save()
except Exception as ex:
self._raise_unknown_exception(ex)

349
karbor/common/notification.py

@ -0,0 +1,349 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The notification module."""
import abc
import copy
import traceback
from karbor import exception
from karbor.i18n import _
from karbor import rpc
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class EndNotification(object):
@property
def _notifier(self):
"""Returns the notification for Karbor API."""
return (self.context.notification)
def __init__(self, context, **kwargs):
self.context = context
self.context.notification.payload.update(kwargs)
def __enter__(self):
return self.context.notification
def __exit__(self, etype, value, tb):
if etype:
message = str(value)
exception = traceback.format_exception(etype, value, tb)
self._notifier.notify_exc_info(message, exception)
else:
self._notifier.notify_end()
class StartNotification(EndNotification):
def __enter__(self):
self.context.notification.notify_start()
return super(StartNotification, self).__enter__()
class KaborAPINotification(object):
"""The traits of karbor.* notifications."""
event_type_format = 'karbor.%s.%s'
notify_callback = None
@classmethod
def register_notify_callback(cls, callback):
"""Callback when a notification is sent out."""
cls.notify_callback = callback
@abc.abstractmethod
def event_type(self):
'Returns the event type (like "create" for karbor.create.start)'
pass
@abc.abstractmethod
def required_start_traits(self):
'Returns list of required traits for start notification'
pass
def optional_start_traits(self):
'Returns list of optional traits for start notification'
return []
def required_end_traits(self):
'Returns list of required traits for end notification'
return []
def optional_end_traits(self):
'Returns list of optional traits for end notification'
return []
def required_error_traits(self):
'Returns list of required traits for error notification'
return ['message', 'exception']
def optional_error_traits(self):
'Returns list of optional traits for error notification'
return ['id']
def required_base_traits(self):
return ['tenant_id', 'client_ip', 'request_id']
@property
def request_id(self):
return self.payload['request_id']
def __init__(self, context, **kwargs):
self.context = context
self.needs_end_notification = True
self.payload = {}
if 'request' in kwargs:
request = kwargs.pop('request')
self.payload.update({
'request_id': context.request_id,
'client_ip': request.remote_addr,
'tenant_id': context.tenant,
})
elif 'request_id' not in kwargs:
raise exception.InvalidInput(
reason="Notification must include 'request' property")
self.payload.update(kwargs)
def serialize(self, context):
return self.payload
def validate(self, required_traits):
required_keys = set(required_traits)
provided_keys = set(self.payload.keys())
if not required_keys.issubset(provided_keys):
msg = (_("The following required keys not defined for"
" notification %(name)s: %(keys)s")
% {'name': self.__class__.__name__,
'keys': list(required_keys - provided_keys)})
raise exception.InvalidInput(reason=msg)
def _notify(self, event_qualifier, required_traits, optional_traits,
**kwargs):
self.payload.update(kwargs)
self.validate(self.required_base_traits() + required_traits)
available_values = self.serialize(self.context)
payload = {k: available_values[k]
for k in self.required_base_traits() + required_traits}
for k in optional_traits:
if k in available_values:
payload[k] = available_values[k]
qualified_event_type = (KaborAPINotification.event_type_format
% (self.event_type(), event_qualifier))
LOG.debug('Sending event: %(event_type)s, %(payload)s',
{'event_type': qualified_event_type, 'payload': payload})
context = copy.copy(self.context)
del context.notification
notifier = rpc.get_notifier()
notifier.info(context, qualified_event_type, self.payload)
if self.notify_callback:
self.notify_callback(event_qualifier)
def notify_start(self, **kwargs):
self._notify('start', self.required_start_traits(),
self.optional_start_traits(), **kwargs)
def notify_end(self, **kwargs):
if self.needs_end_notification:
self._notify('end', self.required_end_traits(),
self.optional_end_traits(), **kwargs)
def notify_exc_info(self, message, exception):
self.payload.update({
'message': message,
'exception': exception
})
self._notify('error', self.required_error_traits(),
self.optional_error_traits())
class KarborPlanCreate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'plan_create'
@abc.abstractmethod
def required_start_traits(self):
return ['name']
def optional_start_traits(self):
return ['parameters']
def required_end_traits(self):
return ['name']
class KarborPlanDelete(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'plan_delete'
@abc.abstractmethod
def required_start_traits(self):
return ['id']
class KarborPlanUpdate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'plan_update'
@abc.abstractmethod
def required_start_traits(self):
return ['id']
class KarborTriggerDelete(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'trigger_delete'
@abc.abstractmethod
def required_start_traits(self):
return ['id']
class KarborTriggerCreate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'trigger_create'
@abc.abstractmethod
def required_start_traits(self):
return ['name']
def optional_start_traits(self):
return ['parameters']
def required_end_traits(self):
return ['name']
class KarborTriggerUpdate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'trigger_update'
@abc.abstractmethod
def required_start_traits(self):
return ['id']
class KarborRestoreDelete(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'restore_delete'
@abc.abstractmethod
def required_start_traits(self):
return ['id']
class KarborRestoreCreate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'restore_create'
@abc.abstractmethod
def required_start_traits(self):
return ['parameters']
def required_end_traits(self):
return ['parameters']
class KarborCheckpointCreate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'checkpoint_create'
@abc.abstractmethod
def required_start_traits(self):
return ['checkpoint_properties']
def required_end_traits(self):
return ['checkpoint_properties']
class KarborCheckpointDelete(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'checkpoint_delete'
@abc.abstractmethod
def required_start_traits(self):
return ['checkpoint_id']
def required_end_traits(self):
return ['checkpoint_id']
class KarborCheckpointUpdate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'checkpoint_update'
@abc.abstractmethod
def required_start_traits(self):
return ['checkpoint_id']
class KarborScheduledOpsCreate(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'scheduled_operation_create'
@abc.abstractmethod
def required_start_traits(self):
return ['operation_obj']
def required_end_traits(self):
return ['operation_obj']
class KarborScheduledOpsDelete(KaborAPINotification):
@abc.abstractmethod
def event_type(self):
return 'scheduled_operation_delete'
@abc.abstractmethod
def required_start_traits(self):
return ['id']
def required_end_traits(self):
return ['id']

187
karbor/tests/unit/common/test_notification.py

@ -0,0 +1,187 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The notification module."""
from mock import Mock
from mock import patch
from karbor.common import notification
from karbor.common.notification import EndNotification
from karbor.common.notification import StartNotification
from karbor import context
from karbor import exception
from karbor import rpc
from karbor.tests import base
class TestEndNotification(base.TestCase):
def setUp(self):
super(TestEndNotification, self).setUp()
self.context = KarborTestContext(self)
def test_call(self):
with patch.object(self.context, "notification") as notification:
with EndNotification(self.context):
pass
self.assertTrue(notification.notify_end.called)
def server_exception(self, server_type):
with patch.object(self.context, "notification") as notification:
try:
with EndNotification(self.context):
raise exception.InvalidInput
except Exception:
self.assertTrue(notification.notify_exc_info.called)
class KarborTestContext(context.RequestContext):
def __init__(self, test_case, **kwargs):
super(KarborTestContext, self).__init__(user_id='demo',
project_id='abcd',
auth_token='efgh')
self.notification = KarborTestNotification(
self, request_id='req_id')
class TestStartNotification(base.TestCase):
def setUp(self):
super(TestStartNotification, self).setUp()
self.context = KarborTestContext(self)
def test_call(self):
with patch.object(self.context, "notification") as notification:
with StartNotification(self.context):
pass
self.assertTrue(notification.notify_start.called)
class KarborTestNotification(notification.KaborAPINotification):
def event_type(self):
return 'plan_test'
def required_start_traits(self):
return ['name']
def optional_start_traits(self):
return ['parameters']
def required_end_traits(self):
return ['name']
class TestKarborNotification(base.TestCase):
def setUp(self):
super(TestKarborNotification, self).setUp()
self.test_n = KarborTestNotification(Mock(), request=Mock())
def test_missing_required_start_traits(self):
self.assertRaisesRegex(exception.InvalidInput,
self.test_n.required_start_traits()[0],
self.test_n.notify_start)
def test_invalid_start_traits(self):
self.assertRaisesRegex(exception.InvalidInput,
"The following required keys",
self.test_n.notify_start, foo='bar')
def test_missing_required_end_traits(self):
self.assertRaisesRegex(exception.InvalidInput,
self.test_n.required_end_traits()[0],
self.test_n.notify_end)
def test_invalid_end_traits(self):
self.assertRaisesRegex(exception.InvalidInput,
"The following required keys",
self.test_n.notify_end, foo='bar')
def test_missing_required_error_traits(self):
self.assertRaisesRegex(exception.InvalidInput,
self.test_n.required_error_traits()[0],
self.test_n._notify, 'error',
self.test_n.required_error_traits(), [])
@patch.object(rpc, 'get_notifier')
def test_start_event(self, notifier):
self.test_n.notify_start(name='foo')
self.assertTrue(notifier().info.called)
a, _ = notifier().info.call_args
self.assertEqual('karbor.plan_test.start', a[1])
@patch.object(rpc, 'get_notifier')
def test_end_event(self, notifier):
self.test_n.notify_end(name='foo')
self.assertTrue(notifier().info.called)
a, _ = notifier().info.call_args
self.assertEqual('karbor.plan_test.end', a[1])
@patch.object(rpc, 'get_notifier')
def test_verify_base_values(self, notifier):
self.test_n.notify_start(name='foo')
self.assertTrue(notifier().info.called)
a, _ = notifier().info.call_args
payload = a[2]
self.assertIn('client_ip', payload)
self.assertIn('request_id', payload)
self.assertIn('tenant_id', payload)
@patch.object(rpc, 'get_notifier')
def test_verify_required_start_args(self, notifier):
self.test_n.notify_start(name='foo')
self.assertTrue(notifier().info.called)
a, _ = notifier().info.call_args
payload = a[2]
self.assertIn('name', payload)
@patch.object(rpc, 'get_notifier')
def test_verify_optional_start_args(self, notifier):
self.test_n.notify_start(name='foo', parameters="test")
self.assertTrue(notifier().info.called)
a, _ = notifier().info.call_args
payload = a[2]
self.assertIn('parameters', payload)
@patch.object(rpc, 'get_notifier')
def test_verify_required_end_args(self, notifier):
self.test_n.notify_end(name='foo')
self.assertTrue(notifier().info.called)
a, _ = notifier().info.call_args
payload = a[2]
self.assertIn('name', payload)
def _test_notify_callback(self, fn, *args, **kwargs):
with patch.object(rpc, 'get_notifier') as notifier:
mock_callback = Mock()
self.test_n.register_notify_callback(mock_callback)
mock_context = Mock()
mock_context.notification = Mock()
self.test_n.context = mock_context
fn(*args, **kwargs)
self.assertTrue(notifier().info.called)
self.assertTrue(mock_callback.called)
self.test_n.register_notify_callback(None)
def test_notify_callback(self):
required_keys = {
'name': 'name',
'parameters': 'parameters',
}
self._test_notify_callback(self.test_n.notify_start,
**required_keys)
self._test_notify_callback(self.test_n.notify_end,
**required_keys)
self._test_notify_callback(self.test_n.notify_exc_info,
'error', 'exc')
Loading…
Cancel
Save