diff --git a/designate/notification_handler/fake.py b/designate/notification_handler/fake.py index b7c26a42d..0c9cf411f 100644 --- a/designate/notification_handler/fake.py +++ b/designate/notification_handler/fake.py @@ -31,5 +31,4 @@ class FakeHandler(base.NotificationHandler): return CONF[self.name].allowed_event_types def process_notification(self, context, event_type, payload): - LOG.info('%s: received notification - %s', - self.name, event_type) + LOG.info('%s: received notification - %s', self.name, event_type) diff --git a/designate/sink/service.py b/designate/sink/service.py index fee17d775..913638d7e 100644 --- a/designate/sink/service.py +++ b/designate/sink/service.py @@ -14,6 +14,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + + from oslo_log import log as logging import oslo_messaging as messaging @@ -34,37 +36,38 @@ class Service(service.Service): ) # Initialize extensions - self._server = None - self.handlers = self._init_extensions() - self.subscribers = self._get_subscribers() + self._notification_listener = None + self.handlers = self.init_extensions() + self.allowed_event_types = self.get_allowed_event_types(self.handlers) @property def service_name(self): return 'sink' @staticmethod - def _init_extensions(): + def init_extensions(): """Loads and prepares all enabled extensions""" - - enabled_notification_handlers = ( + notification_handlers = notification_handler.get_notification_handlers( CONF['service:sink'].enabled_notification_handlers ) - notification_handlers = notification_handler.get_notification_handlers( - enabled_notification_handlers) - - if len(notification_handlers) == 0: + if not notification_handlers: LOG.warning('No designate-sink handlers enabled or loaded') return notification_handlers - def _get_subscribers(self): - subscriptions = {} - for handler in self.handlers: - for et in handler.get_event_types(): - subscriptions.setdefault(et, []) - subscriptions[et].append(handler) - return subscriptions + @staticmethod + def get_allowed_event_types(handlers): + """Build a list of all allowed event types.""" + allowed_event_types = [] + + for handler in handlers: + for event_type in handler.get_event_types(): + if event_type in allowed_event_types: + continue + allowed_event_types.append(event_type) + + return allowed_event_types def start(self): super().start() @@ -75,23 +78,15 @@ class Service(service.Service): # TODO(ekarlso): Change this is to endpoint objects rather then # ourselves? if targets: - self._server = rpc.get_notification_listener( + self._notification_listener = rpc.get_notification_listener( targets, [self], pool=CONF['service:sink'].listener_pool_name ) - self._server.start() + self._notification_listener.start() def stop(self, graceful=True): - # Try to shut the connection down, but if we get any sort of - # errors, go ahead and ignore them.. as we're shutting down anyway - try: - if self._server: - self._server.stop() - except Exception as e: - LOG.warning( - 'Unable to gracefully stop the notification listener: %s', e - ) - + if self._notification_listener: + self._notification_listener.stop() super().stop(graceful) def _get_targets(self): @@ -108,19 +103,15 @@ class Service(service.Service): targets.append(target) return targets - def _get_handler_event_types(self): - """return a dict - keys are the event types we can handle""" - return self.subscribers - def info(self, context, publisher_id, event_type, payload, metadata): """ Processes an incoming notification, offering each extension the opportunity to handle it. """ - # NOTE(zykes): Only bother to actually do processing if there's any - # matching events, skips logging of things like compute.exists etc. - if event_type in self._get_handler_event_types(): - for handler in self.handlers: - if event_type in handler.get_event_types(): - LOG.debug('Found handler for: %s', event_type) - handler.process_notification(context, event_type, payload) + if event_type not in self.allowed_event_types: + return + + for handler in self.handlers: + if event_type in handler.get_event_types(): + LOG.debug('Found handler for: %s', event_type) + handler.process_notification(context, event_type, payload) diff --git a/designate/tests/unit/sink/test_service.py b/designate/tests/unit/sink/test_service.py index a081a417c..5a6324308 100644 --- a/designate/tests/unit/sink/test_service.py +++ b/designate/tests/unit/sink/test_service.py @@ -16,8 +16,10 @@ from unittest import mock from oslo_config import fixture as cfg_fixture import oslotest.base +from designate.common import profiler import designate.conf -import designate.rpc +from designate import policy +from designate import rpc from designate.sink import service from designate.tests import base_fixtures @@ -26,8 +28,10 @@ CONF = designate.conf.CONF class TestSinkService(oslotest.base.BaseTestCase): - @mock.patch('designate.policy.init', mock.Mock()) - def setUp(self): + @mock.patch.object(policy, 'init') + @mock.patch.object(rpc, 'get_client') + @mock.patch.object(profiler, 'setup_profiler') + def setUp(self, mock_setup_profiler, mock_get_client, mock_policy_init): super().setUp() self.stdlog = base_fixtures.StandardLogging() self.useFixture(self.stdlog) @@ -36,19 +40,101 @@ class TestSinkService(oslotest.base.BaseTestCase): CONF.set_override( 'enabled_notification_handlers', ['fake'], 'service:sink' ) + CONF.set_override( + 'allowed_event_types', ['compute.instance.create.end'], + 'handler:fake' + ) self.service = service.Service() - @mock.patch.object(designate.rpc, 'get_notification_listener') + self.context = mock.Mock() + + mock_setup_profiler.assert_called() + mock_get_client.assert_called() + mock_policy_init.assert_called() + + @mock.patch.object(rpc, 'get_notification_listener') def test_service_start(self, mock_notification_listener): self.service.start() - self.assertTrue(mock_notification_listener.called) + mock_notification_listener.assert_called() + + @mock.patch.object(policy, 'init', mock.Mock()) + @mock.patch.object(rpc, 'get_client', mock.Mock()) + @mock.patch.object(profiler, 'setup_profiler', mock.Mock()) + @mock.patch.object(designate.rpc, 'get_notification_listener') + def test_service_start_no_targets(self, mock_notification_listener): + CONF.set_override( + 'enabled_notification_handlers', [], 'service:sink' + ) + + sink_service = service.Service() + + sink_service.start() + + mock_notification_listener.assert_not_called() def test_service_stop(self): + self.service._notification_listener = None + self.service.stop() self.assertIn('Stopping sink service', self.stdlog.logger.output) + self.assertIsNone(self.service._notification_listener) + + def test_service_stop_and_notification_listener_stopped(self): + self.service._notification_listener = mock.Mock() + + self.service.stop() + + self.assertIn('Stopping sink service', self.stdlog.logger.output) + self.service._notification_listener.stop.assert_called_with() def test_service_name(self): self.assertEqual('sink', self.service.service_name) + + def test_get_allowed_event_types(self): + self.assertEqual( + ['compute.instance.create.end'], + self.service.get_allowed_event_types(self.service.handlers) + ) + + def test_get_allowed_event_types_with_duplicates(self): + mock_handler1 = mock.Mock() + mock_handler2 = mock.Mock() + + mock_handler1.get_event_types.return_value = [ + 'compute.instance.create.start' + ] + mock_handler2.get_event_types.return_value = [ + 'compute.instance.create.end', + 'compute.instance.create.start' + ] + + handlers = [mock_handler1, mock_handler2] + + self.assertEqual( + ['compute.instance.create.start', 'compute.instance.create.end'], + self.service.get_allowed_event_types(handlers) + ) + + def test_service_info(self): + events = [ + 'compute.instance.create.end', + 'compute.instance.create.start' + ] + self.service.allowed_event_types = events + + for event in events: + self.service.info( + self.context, 'publisher_id', event, mock.Mock(), mock.Mock() + ) + + self.assertIn( + 'received notification - compute.instance.create.end', + self.stdlog.logger.output + ) + self.assertNotIn( + 'received notification - compute.instance.create.start', + self.stdlog.logger.output + )