diff --git a/magnum/conductor/handlers/bay_conductor.py b/magnum/conductor/handlers/bay_conductor.py index 38ae894525..4a969ea861 100644 --- a/magnum/conductor/handlers/bay_conductor.py +++ b/magnum/conductor/handlers/bay_conductor.py @@ -19,6 +19,7 @@ from heatclient import exc from oslo_config import cfg from oslo_log import log as logging from oslo_service import loopingcall +from pycadf import cadftaxonomy as taxonomy import six from magnum.common import clients @@ -130,11 +131,15 @@ class Handler(object): trust_manager.create_trustee_and_trust(osc, bay) # Generate certificate and set the cert reference to bay cert_manager.generate_certificates_to_bay(bay) + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_PENDING) created_stack = _create_stack(context, osc, bay, bay_create_timeout) except Exception as e: cert_manager.delete_certificates_from_bay(bay) trust_manager.delete_trustee_and_trust(osc, context, bay) + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_FAILURE) if isinstance(e, exc.HTTPBadRequest): e = exception.InvalidParameterValue(message=six.text_type(e)) @@ -165,6 +170,8 @@ class Handler(object): bay_status.ADOPT_COMPLETE ) if stack.stack_status not in allow_update_status: + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) operation = _('Updating a bay when stack status is ' '"%s"') % stack.stack_status raise exception.NotSupported(operation=operation) @@ -175,6 +182,9 @@ class Handler(object): manager = scale_manager.ScaleManager(context, osc, bay) + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING) + _update_stack(context, osc, bay, manager) self._poll_and_check(osc, bay) @@ -194,6 +204,8 @@ class Handler(object): # # If the exception is unhandled, the original exception will be raised. try: + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_PENDING) osc.heat().stacks.delete(stack_id) except exc.HTTPNotFound: LOG.info(_LI('The stack %s was not be found during bay' @@ -204,10 +216,16 @@ class Handler(object): bay.destroy() except exception.BayNotFound: LOG.info(_LI('The bay %s has been deleted by others.'), uuid) + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_SUCCESS) return None except exc.HTTPConflict: + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE) raise exception.OperationInProgress(bay_name=bay.name) except Exception: + conductor_utils.notify_about_bay_operation( + context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE) raise self._poll_and_check(osc, bay) @@ -237,15 +255,29 @@ class HeatPoller(object): # node_addresses and bay status stack = self.openstack_client.heat().stacks.get(self.bay.stack_id) self.attempts += 1 + status_to_event = { + bay_status.DELETE_COMPLETE: taxonomy.ACTION_DELETE, + bay_status.CREATE_COMPLETE: taxonomy.ACTION_CREATE, + bay_status.UPDATE_COMPLETE: taxonomy.ACTION_UPDATE, + bay_status.CREATE_FAILED: taxonomy.ACTION_CREATE, + bay_status.DELETE_FAILED: taxonomy.ACTION_DELETE, + bay_status.UPDATE_FAILED: taxonomy.ACTION_UPDATE + } # poll_and_check is detached and polling long time to check status, # so another user/client can call delete bay/stack. if stack.stack_status == bay_status.DELETE_COMPLETE: self._delete_complete() + conductor_utils.notify_about_bay_operation( + self.context, status_to_event[stack.stack_status], + taxonomy.OUTCOME_SUCCESS) raise loopingcall.LoopingCallDone() if stack.stack_status in (bay_status.CREATE_COMPLETE, bay_status.UPDATE_COMPLETE): self._sync_bay_and_template_status(stack) + conductor_utils.notify_about_bay_operation( + self.context, status_to_event[stack.stack_status], + taxonomy.OUTCOME_SUCCESS) raise loopingcall.LoopingCallDone() elif stack.stack_status != self.bay.status: self._sync_bay_status(stack) @@ -255,6 +287,9 @@ class HeatPoller(object): bay_status.UPDATE_FAILED): self._sync_bay_and_template_status(stack) self._bay_failed(stack) + conductor_utils.notify_about_bay_operation( + self.context, status_to_event[stack.stack_status], + taxonomy.OUTCOME_FAILURE) raise loopingcall.LoopingCallDone() # only check max attempts when the stack is being created when # the timeout hasn't been set. If the timeout has been set then diff --git a/magnum/conductor/utils.py b/magnum/conductor/utils.py index 2cd5532001..6ddb9e0896 100644 --- a/magnum/conductor/utils.py +++ b/magnum/conductor/utils.py @@ -12,7 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pycadf import cadftaxonomy as taxonomy +from pycadf import cadftype +from pycadf import eventfactory +from pycadf import resource + from magnum.common import clients +from magnum.common import rpc from magnum.common import utils from magnum.objects import bay from magnum.objects import baymodel @@ -47,3 +53,60 @@ def object_has_stack(context, bay_uuid): return False return True + + +def _get_request_audit_info(context): + """Collect audit information about the request used for CADF. + + :param context: Request context + :returns: Auditing data about the request + :rtype: :class:'pycadf.Resource' + """ + user_id = None + project_id = None + domain_id = None + + if context: + user_id = context.user_id + project_id = context.project_id + domain_id = context.domain_id + + initiator = resource.Resource(typeURI=taxonomy.ACCOUNT_USER) + + if user_id: + initiator.user_id = user_id + + if project_id: + initiator.project_id = project_id + + if domain_id: + initiator.domain_id = domain_id + + return initiator + + +def notify_about_bay_operation(context, action, outcome): + """Send a notification about bay operation. + + :param action: CADF action being audited + :param outcome: CADF outcome + """ + notifier = rpc.get_notifier() + event = eventfactory.EventFactory().new_event( + eventType=cadftype.EVENTTYPE_ACTIVITY, + outcome=outcome, + action=action, + initiator=_get_request_audit_info(context), + target=resource.Resource(typeURI='service/magnum/bay'), + observer=resource.Resource(typeURI='service/magnum/bay')) + service = 'magnum' + event_type = '%(service)s.bay.%(action)s' % { + 'service': service, 'action': action} + payload = event.as_dict() + + if outcome == taxonomy.OUTCOME_FAILURE: + method = notifier.error + else: + method = notifier.info + + method(context, event_type, payload) diff --git a/magnum/tests/base.py b/magnum/tests/base.py index c406a33740..73a2ff54e0 100644 --- a/magnum/tests/base.py +++ b/magnum/tests/base.py @@ -16,9 +16,11 @@ import copy import os +import fixtures import mock from oslo_config import cfg from oslo_log import log +import oslo_messaging from oslotest import base import pecan import testscenarios @@ -26,6 +28,7 @@ import testscenarios from magnum.common import context as magnum_context from magnum.objects import base as objects_base from magnum.tests import conf_fixture +from magnum.tests import fake_notifier from magnum.tests import policy_fixture @@ -67,6 +70,11 @@ class TestCase(base.BaseTestCase): self.policy = self.useFixture(policy_fixture.PolicyFixture()) + self.useFixture(fixtures.MockPatchObject( + oslo_messaging, 'Notifier', + fake_notifier.FakeNotifier)) + self.addCleanup(fake_notifier.reset) + def make_context(*args, **kwargs): # If context hasn't been constructed with token_info if not kwargs.get('auth_token_info'): diff --git a/magnum/tests/fake_notifier.py b/magnum/tests/fake_notifier.py new file mode 100644 index 0000000000..69bc264e25 --- /dev/null +++ b/magnum/tests/fake_notifier.py @@ -0,0 +1,49 @@ +# Copyright 2016 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools + + +NOTIFICATIONS = [] + + +def reset(): + del NOTIFICATIONS[:] + + +FakeMessage = collections.namedtuple('Message', [ + 'publisher_id', 'priority', 'event_type', 'payload', 'context']) + + +class FakeNotifier(object): + + def __init__(self, transport, publisher_id=None, driver=None, + topic=None, serializer=None, retry=None): + self.transport = transport + self.publisher_id = publisher_id or 'fake.id' + for priority in ('debug', 'info', 'warn', 'error', 'critical'): + setattr( + self, priority, + functools.partial(self._notify, priority=priority.upper())) + + def prepare(self, publisher_id=None): + if publisher_id is None: + publisher_id = self.publisher_id + return self.__class__(self.transport, publisher_id=publisher_id) + + def _notify(self, ctxt, event_type, payload, priority): + msg = FakeMessage(self.publisher_id, priority, event_type, + payload, ctxt) + NOTIFICATIONS.append(msg) diff --git a/magnum/tests/unit/conductor/handlers/test_bay_conductor.py b/magnum/tests/unit/conductor/handlers/test_bay_conductor.py index 83d884b3f2..e66644516e 100644 --- a/magnum/tests/unit/conductor/handlers/test_bay_conductor.py +++ b/magnum/tests/unit/conductor/handlers/test_bay_conductor.py @@ -22,12 +22,14 @@ import mock from mock import patch from oslo_config import cfg from oslo_service import loopingcall +from pycadf import cadftaxonomy as taxonomy from magnum.common import exception from magnum.conductor.handlers import bay_conductor from magnum import objects from magnum.objects.fields import BayStatus as bay_status from magnum.tests import base +from magnum.tests import fake_notifier from magnum.tests.unit.db import base as db_base from magnum.tests.unit.db import utils @@ -66,6 +68,13 @@ class TestHandler(db_base.DbTestCase): self.bay.node_count = 2 self.handler.bay_update(self.context, self.bay) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(1, len(notifications)) + self.assertEqual( + 'magnum.bay.update', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + mock_update_stack.assert_called_once_with( self.context, mock_openstack_client, self.bay, mock_scale_manager.return_value) @@ -93,6 +102,13 @@ class TestHandler(db_base.DbTestCase): self.assertRaises(exception.NotSupported, self.handler.bay_update, self.context, self.bay) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(1, len(notifications)) + self.assertEqual( + 'magnum.bay.update', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome']) + bay = objects.Bay.get(self.context, self.bay.uuid) self.assertEqual(1, bay.node_count) @@ -118,6 +134,13 @@ class TestHandler(db_base.DbTestCase): self.bay.node_count = 2 self.handler.bay_update(self.context, self.bay) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(1, len(notifications)) + self.assertEqual( + 'magnum.bay.update', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + mock_update_stack.assert_called_once_with( self.context, mock_openstack_client, self.bay, mock_scale_manager.return_value) @@ -182,6 +205,13 @@ class TestHandler(db_base.DbTestCase): bay = self.handler.bay_create(self.context, self.bay, timeout) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(1, len(notifications)) + self.assertEqual( + 'magnum.bay.create', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + mock_create_stack.assert_called_once_with(self.context, mock.sentinel.osc, self.bay, timeout) @@ -241,6 +271,17 @@ class TestHandler(db_base.DbTestCase): exception.InvalidParameterValue ) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(2, len(notifications)) + self.assertEqual( + 'magnum.bay.create', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + self.assertEqual( + 'magnum.bay.create', notifications[1].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome']) + @patch('magnum.conductor.handlers.bay_conductor.trust_manager') @patch('magnum.conductor.handlers.bay_conductor.cert_manager') @patch('magnum.common.clients.OpenStackClients') @@ -257,6 +298,13 @@ class TestHandler(db_base.DbTestCase): exception.CertificatesToBayFailed ) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(1, len(notifications)) + self.assertEqual( + 'magnum.bay.create', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome']) + @patch('magnum.conductor.handlers.bay_conductor.trust_manager') @patch('magnum.conductor.handlers.bay_conductor.cert_manager') @patch('magnum.conductor.handlers.bay_conductor._create_stack') @@ -276,6 +324,13 @@ class TestHandler(db_base.DbTestCase): False ) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(1, len(notifications)) + self.assertEqual( + 'magnum.bay.create', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[0].payload['outcome']) + @patch('magnum.conductor.handlers.bay_conductor.trust_manager') @patch('magnum.conductor.handlers.bay_conductor.cert_manager') @patch('magnum.conductor.handlers.bay_conductor._create_stack') @@ -301,12 +356,34 @@ class TestHandler(db_base.DbTestCase): exception.InvalidParameterValue ) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(2, len(notifications)) + self.assertEqual( + 'magnum.bay.create', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + self.assertEqual( + 'magnum.bay.create', notifications[1].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome']) + @patch('magnum.common.clients.OpenStackClients') def test_bay_delete(self, mock_openstack_client_class): osc = mock.MagicMock() mock_openstack_client_class.return_value = osc osc.heat.side_effect = exc.HTTPNotFound self.handler.bay_delete(self.context, self.bay.uuid) + + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(2, len(notifications)) + self.assertEqual( + 'magnum.bay.delete', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + self.assertEqual( + 'magnum.bay.delete', notifications[1].event_type) + self.assertEqual( + taxonomy.OUTCOME_SUCCESS, notifications[1].payload['outcome']) # The bay has been destroyed self.assertRaises(exception.BayNotFound, objects.Bay.get, self.context, self.bay.uuid) @@ -321,6 +398,17 @@ class TestHandler(db_base.DbTestCase): self.context, self.bay.uuid) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(2, len(notifications)) + self.assertEqual( + 'magnum.bay.delete', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_PENDING, notifications[0].payload['outcome']) + self.assertEqual( + 'magnum.bay.delete', notifications[1].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome']) + class TestHeatPoller(base.TestCase): @@ -341,6 +429,49 @@ class TestHeatPoller(base.TestCase): poller = bay_conductor.HeatPoller(mock_openstack_client, bay) return (mock_heat_stack, bay, poller) + def test_poll_and_check_send_notification(self): + mock_heat_stack, bay, poller = self.setup_poll_test() + mock_heat_stack.stack_status = bay_status.CREATE_COMPLETE + self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check) + mock_heat_stack.stack_status = bay_status.CREATE_FAILED + self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check) + mock_heat_stack.stack_status = bay_status.DELETE_COMPLETE + self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check) + mock_heat_stack.stack_status = bay_status.DELETE_FAILED + self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check) + mock_heat_stack.stack_status = bay_status.UPDATE_COMPLETE + self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check) + mock_heat_stack.stack_status = bay_status.UPDATE_FAILED + self.assertRaises(loopingcall.LoopingCallDone, poller.poll_and_check) + + self.assertEqual(6, poller.attempts) + notifications = fake_notifier.NOTIFICATIONS + self.assertEqual(6, len(notifications)) + self.assertEqual( + 'magnum.bay.create', notifications[0].event_type) + self.assertEqual( + taxonomy.OUTCOME_SUCCESS, notifications[0].payload['outcome']) + self.assertEqual( + 'magnum.bay.create', notifications[1].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[1].payload['outcome']) + self.assertEqual( + 'magnum.bay.delete', notifications[2].event_type) + self.assertEqual( + taxonomy.OUTCOME_SUCCESS, notifications[2].payload['outcome']) + self.assertEqual( + 'magnum.bay.delete', notifications[3].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[3].payload['outcome']) + self.assertEqual( + 'magnum.bay.update', notifications[4].event_type) + self.assertEqual( + taxonomy.OUTCOME_SUCCESS, notifications[4].payload['outcome']) + self.assertEqual( + 'magnum.bay.update', notifications[5].event_type) + self.assertEqual( + taxonomy.OUTCOME_FAILURE, notifications[5].payload['outcome']) + def test_poll_no_save(self): mock_heat_stack, bay, poller = self.setup_poll_test() diff --git a/requirements.txt b/requirements.txt index d56267bd7a..f92ed306d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -40,6 +40,7 @@ oslo.reports>=0.6.0 # Apache-2.0 paramiko>=1.16.0 # LGPL pbr>=1.6 # Apache-2.0 pecan>=1.0.0 # BSD +pycadf!=2.0.0,>=1.1.0 # Apache-2.0 python-barbicanclient>=4.0.0 # Apache-2.0 python-glanceclient>=2.0.0 # Apache-2.0 python-heatclient>=0.6.0 # Apache-2.0