Simplified sink service and improved coverage

- Removed unecessary error handling.
- Simplified event type handling code.

Change-Id: I030c071eaacd5ef5d5a2084ffe1b177a9e4d7762
This commit is contained in:
Erik Olof Gunnar Andersson 2024-01-09 17:09:15 -08:00
parent 808f0c9198
commit f314f471b0
3 changed files with 123 additions and 47 deletions

View File

@ -31,5 +31,4 @@ class FakeHandler(base.NotificationHandler):
return CONF[self.name].allowed_event_types return CONF[self.name].allowed_event_types
def process_notification(self, context, event_type, payload): def process_notification(self, context, event_type, payload):
LOG.info('%s: received notification - %s', LOG.info('%s: received notification - %s', self.name, event_type)
self.name, event_type)

View File

@ -14,6 +14,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
@ -34,37 +36,38 @@ class Service(service.Service):
) )
# Initialize extensions # Initialize extensions
self._server = None self._notification_listener = None
self.handlers = self._init_extensions() self.handlers = self.init_extensions()
self.subscribers = self._get_subscribers() self.allowed_event_types = self.get_allowed_event_types(self.handlers)
@property @property
def service_name(self): def service_name(self):
return 'sink' return 'sink'
@staticmethod @staticmethod
def _init_extensions(): def init_extensions():
"""Loads and prepares all enabled extensions""" """Loads and prepares all enabled extensions"""
notification_handlers = notification_handler.get_notification_handlers(
enabled_notification_handlers = (
CONF['service:sink'].enabled_notification_handlers CONF['service:sink'].enabled_notification_handlers
) )
notification_handlers = notification_handler.get_notification_handlers( if not notification_handlers:
enabled_notification_handlers)
if len(notification_handlers) == 0:
LOG.warning('No designate-sink handlers enabled or loaded') LOG.warning('No designate-sink handlers enabled or loaded')
return notification_handlers return notification_handlers
def _get_subscribers(self): @staticmethod
subscriptions = {} def get_allowed_event_types(handlers):
for handler in self.handlers: """Build a list of all allowed event types."""
for et in handler.get_event_types(): allowed_event_types = []
subscriptions.setdefault(et, [])
subscriptions[et].append(handler) for handler in handlers:
return subscriptions for event_type in handler.get_event_types():
if event_type in allowed_event_types:
continue
allowed_event_types.append(event_type)
return allowed_event_types
def start(self): def start(self):
super().start() super().start()
@ -75,23 +78,15 @@ class Service(service.Service):
# TODO(ekarlso): Change this is to endpoint objects rather then # TODO(ekarlso): Change this is to endpoint objects rather then
# ourselves? # ourselves?
if targets: if targets:
self._server = rpc.get_notification_listener( self._notification_listener = rpc.get_notification_listener(
targets, [self], targets, [self],
pool=CONF['service:sink'].listener_pool_name pool=CONF['service:sink'].listener_pool_name
) )
self._server.start() self._notification_listener.start()
def stop(self, graceful=True): def stop(self, graceful=True):
# Try to shut the connection down, but if we get any sort of if self._notification_listener:
# errors, go ahead and ignore them.. as we're shutting down anyway self._notification_listener.stop()
try:
if self._server:
self._server.stop()
except Exception as e:
LOG.warning(
'Unable to gracefully stop the notification listener: %s', e
)
super().stop(graceful) super().stop(graceful)
def _get_targets(self): def _get_targets(self):
@ -108,19 +103,15 @@ class Service(service.Service):
targets.append(target) targets.append(target)
return targets return targets
def _get_handler_event_types(self):
"""return a dict - keys are the event types we can handle"""
return self.subscribers
def info(self, context, publisher_id, event_type, payload, metadata): def info(self, context, publisher_id, event_type, payload, metadata):
""" """
Processes an incoming notification, offering each extension the Processes an incoming notification, offering each extension the
opportunity to handle it. opportunity to handle it.
""" """
# NOTE(zykes): Only bother to actually do processing if there's any if event_type not in self.allowed_event_types:
# matching events, skips logging of things like compute.exists etc. return
if event_type in self._get_handler_event_types():
for handler in self.handlers: for handler in self.handlers:
if event_type in handler.get_event_types(): if event_type in handler.get_event_types():
LOG.debug('Found handler for: %s', event_type) LOG.debug('Found handler for: %s', event_type)
handler.process_notification(context, event_type, payload) handler.process_notification(context, event_type, payload)

View File

@ -16,8 +16,10 @@ from unittest import mock
from oslo_config import fixture as cfg_fixture from oslo_config import fixture as cfg_fixture
import oslotest.base import oslotest.base
from designate.common import profiler
import designate.conf import designate.conf
import designate.rpc from designate import policy
from designate import rpc
from designate.sink import service from designate.sink import service
from designate.tests import base_fixtures from designate.tests import base_fixtures
@ -26,8 +28,10 @@ CONF = designate.conf.CONF
class TestSinkService(oslotest.base.BaseTestCase): class TestSinkService(oslotest.base.BaseTestCase):
@mock.patch('designate.policy.init', mock.Mock()) @mock.patch.object(policy, 'init')
def setUp(self): @mock.patch.object(rpc, 'get_client')
@mock.patch.object(profiler, 'setup_profiler')
def setUp(self, mock_setup_profiler, mock_get_client, mock_policy_init):
super().setUp() super().setUp()
self.stdlog = base_fixtures.StandardLogging() self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog) self.useFixture(self.stdlog)
@ -36,19 +40,101 @@ class TestSinkService(oslotest.base.BaseTestCase):
CONF.set_override( CONF.set_override(
'enabled_notification_handlers', ['fake'], 'service:sink' 'enabled_notification_handlers', ['fake'], 'service:sink'
) )
CONF.set_override(
'allowed_event_types', ['compute.instance.create.end'],
'handler:fake'
)
self.service = service.Service() self.service = service.Service()
@mock.patch.object(designate.rpc, 'get_notification_listener') self.context = mock.Mock()
mock_setup_profiler.assert_called()
mock_get_client.assert_called()
mock_policy_init.assert_called()
@mock.patch.object(rpc, 'get_notification_listener')
def test_service_start(self, mock_notification_listener): def test_service_start(self, mock_notification_listener):
self.service.start() self.service.start()
self.assertTrue(mock_notification_listener.called) mock_notification_listener.assert_called()
@mock.patch.object(policy, 'init', mock.Mock())
@mock.patch.object(rpc, 'get_client', mock.Mock())
@mock.patch.object(profiler, 'setup_profiler', mock.Mock())
@mock.patch.object(designate.rpc, 'get_notification_listener')
def test_service_start_no_targets(self, mock_notification_listener):
CONF.set_override(
'enabled_notification_handlers', [], 'service:sink'
)
sink_service = service.Service()
sink_service.start()
mock_notification_listener.assert_not_called()
def test_service_stop(self): def test_service_stop(self):
self.service._notification_listener = None
self.service.stop() self.service.stop()
self.assertIn('Stopping sink service', self.stdlog.logger.output) self.assertIn('Stopping sink service', self.stdlog.logger.output)
self.assertIsNone(self.service._notification_listener)
def test_service_stop_and_notification_listener_stopped(self):
self.service._notification_listener = mock.Mock()
self.service.stop()
self.assertIn('Stopping sink service', self.stdlog.logger.output)
self.service._notification_listener.stop.assert_called_with()
def test_service_name(self): def test_service_name(self):
self.assertEqual('sink', self.service.service_name) self.assertEqual('sink', self.service.service_name)
def test_get_allowed_event_types(self):
self.assertEqual(
['compute.instance.create.end'],
self.service.get_allowed_event_types(self.service.handlers)
)
def test_get_allowed_event_types_with_duplicates(self):
mock_handler1 = mock.Mock()
mock_handler2 = mock.Mock()
mock_handler1.get_event_types.return_value = [
'compute.instance.create.start'
]
mock_handler2.get_event_types.return_value = [
'compute.instance.create.end',
'compute.instance.create.start'
]
handlers = [mock_handler1, mock_handler2]
self.assertEqual(
['compute.instance.create.start', 'compute.instance.create.end'],
self.service.get_allowed_event_types(handlers)
)
def test_service_info(self):
events = [
'compute.instance.create.end',
'compute.instance.create.start'
]
self.service.allowed_event_types = events
for event in events:
self.service.info(
self.context, 'publisher_id', event, mock.Mock(), mock.Mock()
)
self.assertIn(
'received notification - compute.instance.create.end',
self.stdlog.logger.output
)
self.assertNotIn(
'received notification - compute.instance.create.start',
self.stdlog.logger.output
)