Emit notifications when bay operations get executed

Magnum needs to emit notifications for resource tracking, monitoring,
metering and auditing purposes so ceilometer can capture the events
and generate samples.

This commit leverage oslo.notify to send notifications when bay
resource operations are executed.

Change-Id: I4b07539822a75c069d9da8932dc0ff10a02e0058
Partially-Implements: blueprint magnum-notifications
This commit is contained in:
Wenzhi Yu 2016-05-12 18:26:04 +08:00
parent 3cbc3b9b44
commit 4c62436275
6 changed files with 287 additions and 0 deletions

View File

@ -19,6 +19,7 @@ from heatclient import exc
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_service import loopingcall from oslo_service import loopingcall
from pycadf import cadftaxonomy as taxonomy
import six import six
from magnum.common import clients from magnum.common import clients
@ -130,11 +131,15 @@ class Handler(object):
trust_manager.create_trustee_and_trust(osc, bay) trust_manager.create_trustee_and_trust(osc, bay)
# Generate certificate and set the cert reference to bay # Generate certificate and set the cert reference to bay
cert_manager.generate_certificates_to_bay(bay) cert_manager.generate_certificates_to_bay(bay)
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_PENDING)
created_stack = _create_stack(context, osc, bay, created_stack = _create_stack(context, osc, bay,
bay_create_timeout) bay_create_timeout)
except Exception as e: except Exception as e:
cert_manager.delete_certificates_from_bay(bay) cert_manager.delete_certificates_from_bay(bay)
trust_manager.delete_trustee_and_trust(osc, context, bay) trust_manager.delete_trustee_and_trust(osc, context, bay)
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_FAILURE)
if isinstance(e, exc.HTTPBadRequest): if isinstance(e, exc.HTTPBadRequest):
e = exception.InvalidParameterValue(message=six.text_type(e)) e = exception.InvalidParameterValue(message=six.text_type(e))
@ -165,6 +170,8 @@ class Handler(object):
bay_status.ADOPT_COMPLETE bay_status.ADOPT_COMPLETE
) )
if stack.stack_status not in allow_update_status: if stack.stack_status not in allow_update_status:
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE)
operation = _('Updating a bay when stack status is ' operation = _('Updating a bay when stack status is '
'"%s"') % stack.stack_status '"%s"') % stack.stack_status
raise exception.NotSupported(operation=operation) raise exception.NotSupported(operation=operation)
@ -175,6 +182,9 @@ class Handler(object):
manager = scale_manager.ScaleManager(context, osc, bay) manager = scale_manager.ScaleManager(context, osc, bay)
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING)
_update_stack(context, osc, bay, manager) _update_stack(context, osc, bay, manager)
self._poll_and_check(osc, bay) self._poll_and_check(osc, bay)
@ -194,6 +204,8 @@ class Handler(object):
# #
# If the exception is unhandled, the original exception will be raised. # If the exception is unhandled, the original exception will be raised.
try: try:
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_PENDING)
osc.heat().stacks.delete(stack_id) osc.heat().stacks.delete(stack_id)
except exc.HTTPNotFound: except exc.HTTPNotFound:
LOG.info(_LI('The stack %s was not be found during bay' LOG.info(_LI('The stack %s was not be found during bay'
@ -204,10 +216,16 @@ class Handler(object):
bay.destroy() bay.destroy()
except exception.BayNotFound: except exception.BayNotFound:
LOG.info(_LI('The bay %s has been deleted by others.'), uuid) LOG.info(_LI('The bay %s has been deleted by others.'), uuid)
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_SUCCESS)
return None return None
except exc.HTTPConflict: except exc.HTTPConflict:
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE)
raise exception.OperationInProgress(bay_name=bay.name) raise exception.OperationInProgress(bay_name=bay.name)
except Exception: except Exception:
conductor_utils.notify_about_bay_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE)
raise raise
self._poll_and_check(osc, bay) self._poll_and_check(osc, bay)
@ -237,15 +255,29 @@ class HeatPoller(object):
# node_addresses and bay status # node_addresses and bay status
stack = self.openstack_client.heat().stacks.get(self.bay.stack_id) stack = self.openstack_client.heat().stacks.get(self.bay.stack_id)
self.attempts += 1 self.attempts += 1
status_to_event = {
bay_status.DELETE_COMPLETE: taxonomy.ACTION_DELETE,
bay_status.CREATE_COMPLETE: taxonomy.ACTION_CREATE,
bay_status.UPDATE_COMPLETE: taxonomy.ACTION_UPDATE,
bay_status.CREATE_FAILED: taxonomy.ACTION_CREATE,
bay_status.DELETE_FAILED: taxonomy.ACTION_DELETE,
bay_status.UPDATE_FAILED: taxonomy.ACTION_UPDATE
}
# poll_and_check is detached and polling long time to check status, # poll_and_check is detached and polling long time to check status,
# so another user/client can call delete bay/stack. # so another user/client can call delete bay/stack.
if stack.stack_status == bay_status.DELETE_COMPLETE: if stack.stack_status == bay_status.DELETE_COMPLETE:
self._delete_complete() self._delete_complete()
conductor_utils.notify_about_bay_operation(
self.context, status_to_event[stack.stack_status],
taxonomy.OUTCOME_SUCCESS)
raise loopingcall.LoopingCallDone() raise loopingcall.LoopingCallDone()
if stack.stack_status in (bay_status.CREATE_COMPLETE, if stack.stack_status in (bay_status.CREATE_COMPLETE,
bay_status.UPDATE_COMPLETE): bay_status.UPDATE_COMPLETE):
self._sync_bay_and_template_status(stack) self._sync_bay_and_template_status(stack)
conductor_utils.notify_about_bay_operation(
self.context, status_to_event[stack.stack_status],
taxonomy.OUTCOME_SUCCESS)
raise loopingcall.LoopingCallDone() raise loopingcall.LoopingCallDone()
elif stack.stack_status != self.bay.status: elif stack.stack_status != self.bay.status:
self._sync_bay_status(stack) self._sync_bay_status(stack)
@ -255,6 +287,9 @@ class HeatPoller(object):
bay_status.UPDATE_FAILED): bay_status.UPDATE_FAILED):
self._sync_bay_and_template_status(stack) self._sync_bay_and_template_status(stack)
self._bay_failed(stack) self._bay_failed(stack)
conductor_utils.notify_about_bay_operation(
self.context, status_to_event[stack.stack_status],
taxonomy.OUTCOME_FAILURE)
raise loopingcall.LoopingCallDone() raise loopingcall.LoopingCallDone()
# only check max attempts when the stack is being created when # only check max attempts when the stack is being created when
# the timeout hasn't been set. If the timeout has been set then # the timeout hasn't been set. If the timeout has been set then

View File

@ -12,7 +12,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from pycadf import cadftaxonomy as taxonomy
from pycadf import cadftype
from pycadf import eventfactory
from pycadf import resource
from magnum.common import clients from magnum.common import clients
from magnum.common import rpc
from magnum.common import utils from magnum.common import utils
from magnum.objects import bay from magnum.objects import bay
from magnum.objects import baymodel from magnum.objects import baymodel
@ -47,3 +53,60 @@ def object_has_stack(context, bay_uuid):
return False return False
return True return True
def _get_request_audit_info(context):
"""Collect audit information about the request used for CADF.
:param context: Request context
:returns: Auditing data about the request
:rtype: :class:'pycadf.Resource'
"""
user_id = None
project_id = None
domain_id = None
if context:
user_id = context.user_id
project_id = context.project_id
domain_id = context.domain_id
initiator = resource.Resource(typeURI=taxonomy.ACCOUNT_USER)
if user_id:
initiator.user_id = user_id
if project_id:
initiator.project_id = project_id
if domain_id:
initiator.domain_id = domain_id
return initiator
def notify_about_bay_operation(context, action, outcome):
"""Send a notification about bay operation.
:param action: CADF action being audited
:param outcome: CADF outcome
"""
notifier = rpc.get_notifier()
event = eventfactory.EventFactory().new_event(
eventType=cadftype.EVENTTYPE_ACTIVITY,
outcome=outcome,
action=action,
initiator=_get_request_audit_info(context),
target=resource.Resource(typeURI='service/magnum/bay'),
observer=resource.Resource(typeURI='service/magnum/bay'))
service = 'magnum'
event_type = '%(service)s.bay.%(action)s' % {
'service': service, 'action': action}
payload = event.as_dict()
if outcome == taxonomy.OUTCOME_FAILURE:
method = notifier.error
else:
method = notifier.info
method(context, event_type, payload)

View File

@ -16,9 +16,11 @@
import copy import copy
import os import os
import fixtures
import mock import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
import oslo_messaging
from oslotest import base from oslotest import base
import pecan import pecan
import testscenarios import testscenarios
@ -26,6 +28,7 @@ import testscenarios
from magnum.common import context as magnum_context from magnum.common import context as magnum_context
from magnum.objects import base as objects_base from magnum.objects import base as objects_base
from magnum.tests import conf_fixture from magnum.tests import conf_fixture
from magnum.tests import fake_notifier
from magnum.tests import policy_fixture from magnum.tests import policy_fixture
@ -67,6 +70,11 @@ class TestCase(base.BaseTestCase):
self.policy = self.useFixture(policy_fixture.PolicyFixture()) self.policy = self.useFixture(policy_fixture.PolicyFixture())
self.useFixture(fixtures.MockPatchObject(
oslo_messaging, 'Notifier',
fake_notifier.FakeNotifier))
self.addCleanup(fake_notifier.reset)
def make_context(*args, **kwargs): def make_context(*args, **kwargs):
# If context hasn't been constructed with token_info # If context hasn't been constructed with token_info
if not kwargs.get('auth_token_info'): if not kwargs.get('auth_token_info'):

View File

@ -0,0 +1,49 @@
# Copyright 2016 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
NOTIFICATIONS = []
def reset():
del NOTIFICATIONS[:]
FakeMessage = collections.namedtuple('Message', [
'publisher_id', 'priority', 'event_type', 'payload', 'context'])
class FakeNotifier(object):
def __init__(self, transport, publisher_id=None, driver=None,
topic=None, serializer=None, retry=None):
self.transport = transport
self.publisher_id = publisher_id or 'fake.id'
for priority in ('debug', 'info', 'warn', 'error', 'critical'):
setattr(
self, priority,
functools.partial(self._notify, priority=priority.upper()))
def prepare(self, publisher_id=None):
if publisher_id is None:
publisher_id = self.publisher_id
return self.__class__(self.transport, publisher_id=publisher_id)
def _notify(self, ctxt, event_type, payload, priority):
msg = FakeMessage(self.publisher_id, priority, event_type,
payload, ctxt)
NOTIFICATIONS.append(msg)

View File

@ -22,12 +22,14 @@ import mock
from mock import patch from mock import patch
from oslo_config import cfg from oslo_config import cfg
from oslo_service import loopingcall from oslo_service import loopingcall
from pycadf import cadftaxonomy as taxonomy
from magnum.common import exception from magnum.common import exception
from magnum.conductor.handlers import bay_conductor from magnum.conductor.handlers import bay_conductor
from magnum import objects from magnum import objects
from magnum.objects.fields import BayStatus as bay_status from magnum.objects.fields import BayStatus as bay_status
from magnum.tests import base from magnum.tests import base
from magnum.tests import fake_notifier
from magnum.tests.unit.db import base as db_base from magnum.tests.unit.db import base as db_base
from magnum.tests.unit.db import utils from magnum.tests.unit.db import utils
@ -66,6 +68,13 @@ class TestHandler(db_base.DbTestCase):
self.bay.node_count = 2 self.bay.node_count = 2
self.handler.bay_update(self.context, self.bay) self.handler.bay_update(self.context, self.bay)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
self.assertEqual(
'magnum.bay.update', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
mock_update_stack.assert_called_once_with( mock_update_stack.assert_called_once_with(
self.context, mock_openstack_client, self.bay, self.context, mock_openstack_client, self.bay,
mock_scale_manager.return_value) mock_scale_manager.return_value)
@ -93,6 +102,13 @@ class TestHandler(db_base.DbTestCase):
self.assertRaises(exception.NotSupported, self.handler.bay_update, self.assertRaises(exception.NotSupported, self.handler.bay_update,
self.context, self.bay) self.context, self.bay)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
self.assertEqual(
'magnum.bay.update', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome'])
bay = objects.Bay.get(self.context, self.bay.uuid) bay = objects.Bay.get(self.context, self.bay.uuid)
self.assertEqual(1, bay.node_count) self.assertEqual(1, bay.node_count)
@ -118,6 +134,13 @@ class TestHandler(db_base.DbTestCase):
self.bay.node_count = 2 self.bay.node_count = 2
self.handler.bay_update(self.context, self.bay) self.handler.bay_update(self.context, self.bay)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
self.assertEqual(
'magnum.bay.update', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
mock_update_stack.assert_called_once_with( mock_update_stack.assert_called_once_with(
self.context, mock_openstack_client, self.bay, self.context, mock_openstack_client, self.bay,
mock_scale_manager.return_value) mock_scale_manager.return_value)
@ -182,6 +205,13 @@ class TestHandler(db_base.DbTestCase):
bay = self.handler.bay_create(self.context, bay = self.handler.bay_create(self.context,
self.bay, timeout) self.bay, timeout)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
self.assertEqual(
'magnum.bay.create', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
mock_create_stack.assert_called_once_with(self.context, mock_create_stack.assert_called_once_with(self.context,
mock.sentinel.osc, mock.sentinel.osc,
self.bay, timeout) self.bay, timeout)
@ -241,6 +271,17 @@ class TestHandler(db_base.DbTestCase):
exception.InvalidParameterValue exception.InvalidParameterValue
) )
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(2, len(notifications))
self.assertEqual(
'magnum.bay.create', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
self.assertEqual(
'magnum.bay.create', notifications[1].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome'])
@patch('magnum.conductor.handlers.bay_conductor.trust_manager') @patch('magnum.conductor.handlers.bay_conductor.trust_manager')
@patch('magnum.conductor.handlers.bay_conductor.cert_manager') @patch('magnum.conductor.handlers.bay_conductor.cert_manager')
@patch('magnum.common.clients.OpenStackClients') @patch('magnum.common.clients.OpenStackClients')
@ -257,6 +298,13 @@ class TestHandler(db_base.DbTestCase):
exception.CertificatesToBayFailed exception.CertificatesToBayFailed
) )
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
self.assertEqual(
'magnum.bay.create', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome'])
@patch('magnum.conductor.handlers.bay_conductor.trust_manager') @patch('magnum.conductor.handlers.bay_conductor.trust_manager')
@patch('magnum.conductor.handlers.bay_conductor.cert_manager') @patch('magnum.conductor.handlers.bay_conductor.cert_manager')
@patch('magnum.conductor.handlers.bay_conductor._create_stack') @patch('magnum.conductor.handlers.bay_conductor._create_stack')
@ -276,6 +324,13 @@ class TestHandler(db_base.DbTestCase):
False False
) )
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(1, len(notifications))
self.assertEqual(
'magnum.bay.create', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome'])
@patch('magnum.conductor.handlers.bay_conductor.trust_manager') @patch('magnum.conductor.handlers.bay_conductor.trust_manager')
@patch('magnum.conductor.handlers.bay_conductor.cert_manager') @patch('magnum.conductor.handlers.bay_conductor.cert_manager')
@patch('magnum.conductor.handlers.bay_conductor._create_stack') @patch('magnum.conductor.handlers.bay_conductor._create_stack')
@ -301,12 +356,34 @@ class TestHandler(db_base.DbTestCase):
exception.InvalidParameterValue exception.InvalidParameterValue
) )
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(2, len(notifications))
self.assertEqual(
'magnum.bay.create', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
self.assertEqual(
'magnum.bay.create', notifications[1].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome'])
@patch('magnum.common.clients.OpenStackClients') @patch('magnum.common.clients.OpenStackClients')
def test_bay_delete(self, mock_openstack_client_class): def test_bay_delete(self, mock_openstack_client_class):
osc = mock.MagicMock() osc = mock.MagicMock()
mock_openstack_client_class.return_value = osc mock_openstack_client_class.return_value = osc
osc.heat.side_effect = exc.HTTPNotFound osc.heat.side_effect = exc.HTTPNotFound
self.handler.bay_delete(self.context, self.bay.uuid) self.handler.bay_delete(self.context, self.bay.uuid)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(2, len(notifications))
self.assertEqual(
'magnum.bay.delete', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
self.assertEqual(
'magnum.bay.delete', notifications[1].event_type)
self.assertEqual(
taxonomy.OUTCOME_SUCCESS, notifications[1].payload['outcome'])
# The bay has been destroyed # The bay has been destroyed
self.assertRaises(exception.BayNotFound, self.assertRaises(exception.BayNotFound,
objects.Bay.get, self.context, self.bay.uuid) objects.Bay.get, self.context, self.bay.uuid)
@ -321,6 +398,17 @@ class TestHandler(db_base.DbTestCase):
self.context, self.context,
self.bay.uuid) self.bay.uuid)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(2, len(notifications))
self.assertEqual(
'magnum.bay.delete', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome'])
self.assertEqual(
'magnum.bay.delete', notifications[1].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome'])
class TestHeatPoller(base.TestCase): class TestHeatPoller(base.TestCase):
@ -341,6 +429,49 @@ class TestHeatPoller(base.TestCase):
poller = bay_conductor.HeatPoller(mock_openstack_client, bay) poller = bay_conductor.HeatPoller(mock_openstack_client, bay)
return (mock_heat_stack, bay, poller) return (mock_heat_stack, bay, poller)
def test_poll_and_check_send_notification(self):
mock_heat_stack, bay, poller = self.setup_poll_test()
mock_heat_stack.stack_status = bay_status.CREATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
mock_heat_stack.stack_status = bay_status.CREATE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
mock_heat_stack.stack_status = bay_status.DELETE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
mock_heat_stack.stack_status = bay_status.DELETE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
mock_heat_stack.stack_status = bay_status.UPDATE_COMPLETE
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
mock_heat_stack.stack_status = bay_status.UPDATE_FAILED
self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check)
self.assertEqual(6, poller.attempts)
notifications = fake_notifier.NOTIFICATIONS
self.assertEqual(6, len(notifications))
self.assertEqual(
'magnum.bay.create', notifications[0].event_type)
self.assertEqual(
taxonomy.OUTCOME_SUCCESS, notifications[0].payload['outcome'])
self.assertEqual(
'magnum.bay.create', notifications[1].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome'])
self.assertEqual(
'magnum.bay.delete', notifications[2].event_type)
self.assertEqual(
taxonomy.OUTCOME_SUCCESS, notifications[2].payload['outcome'])
self.assertEqual(
'magnum.bay.delete', notifications[3].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[3].payload['outcome'])
self.assertEqual(
'magnum.bay.update', notifications[4].event_type)
self.assertEqual(
taxonomy.OUTCOME_SUCCESS, notifications[4].payload['outcome'])
self.assertEqual(
'magnum.bay.update', notifications[5].event_type)
self.assertEqual(
taxonomy.OUTCOME_FAILURE, notifications[5].payload['outcome'])
def test_poll_no_save(self): def test_poll_no_save(self):
mock_heat_stack, bay, poller = self.setup_poll_test() mock_heat_stack, bay, poller = self.setup_poll_test()

View File

@ -40,6 +40,7 @@ oslo.reports>=0.6.0 # Apache-2.0
paramiko>=1.16.0 # LGPL paramiko>=1.16.0 # LGPL
pbr>=1.6 # Apache-2.0 pbr>=1.6 # Apache-2.0
pecan>=1.0.0 # BSD pecan>=1.0.0 # BSD
pycadf!=2.0.0,>=1.1.0 # Apache-2.0
python-barbicanclient>=4.0.0 # Apache-2.0 python-barbicanclient>=4.0.0 # Apache-2.0
python-glanceclient>=2.0.0 # Apache-2.0 python-glanceclient>=2.0.0 # Apache-2.0
python-heatclient>=0.6.0 # Apache-2.0 python-heatclient>=0.6.0 # Apache-2.0