Merge "Simplified sink service and improved coverage"
This commit is contained in:
commit
05ebd15122
@ -31,5 +31,4 @@ class FakeHandler(base.NotificationHandler):
|
||||
return CONF[self.name].allowed_event_types
|
||||
|
||||
def process_notification(self, context, event_type, payload):
|
||||
LOG.info('%s: received notification - %s',
|
||||
self.name, event_type)
|
||||
LOG.info('%s: received notification - %s', self.name, event_type)
|
||||
|
@ -14,6 +14,8 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
|
||||
@ -34,37 +36,38 @@ class Service(service.Service):
|
||||
)
|
||||
|
||||
# Initialize extensions
|
||||
self._server = None
|
||||
self.handlers = self._init_extensions()
|
||||
self.subscribers = self._get_subscribers()
|
||||
self._notification_listener = None
|
||||
self.handlers = self.init_extensions()
|
||||
self.allowed_event_types = self.get_allowed_event_types(self.handlers)
|
||||
|
||||
@property
|
||||
def service_name(self):
|
||||
return 'sink'
|
||||
|
||||
@staticmethod
|
||||
def _init_extensions():
|
||||
def init_extensions():
|
||||
"""Loads and prepares all enabled extensions"""
|
||||
|
||||
enabled_notification_handlers = (
|
||||
notification_handlers = notification_handler.get_notification_handlers(
|
||||
CONF['service:sink'].enabled_notification_handlers
|
||||
)
|
||||
|
||||
notification_handlers = notification_handler.get_notification_handlers(
|
||||
enabled_notification_handlers)
|
||||
|
||||
if len(notification_handlers) == 0:
|
||||
if not notification_handlers:
|
||||
LOG.warning('No designate-sink handlers enabled or loaded')
|
||||
|
||||
return notification_handlers
|
||||
|
||||
def _get_subscribers(self):
|
||||
subscriptions = {}
|
||||
for handler in self.handlers:
|
||||
for et in handler.get_event_types():
|
||||
subscriptions.setdefault(et, [])
|
||||
subscriptions[et].append(handler)
|
||||
return subscriptions
|
||||
@staticmethod
|
||||
def get_allowed_event_types(handlers):
|
||||
"""Build a list of all allowed event types."""
|
||||
allowed_event_types = []
|
||||
|
||||
for handler in handlers:
|
||||
for event_type in handler.get_event_types():
|
||||
if event_type in allowed_event_types:
|
||||
continue
|
||||
allowed_event_types.append(event_type)
|
||||
|
||||
return allowed_event_types
|
||||
|
||||
def start(self):
|
||||
super().start()
|
||||
@ -75,23 +78,15 @@ class Service(service.Service):
|
||||
# TODO(ekarlso): Change this is to endpoint objects rather then
|
||||
# ourselves?
|
||||
if targets:
|
||||
self._server = rpc.get_notification_listener(
|
||||
self._notification_listener = rpc.get_notification_listener(
|
||||
targets, [self],
|
||||
pool=CONF['service:sink'].listener_pool_name
|
||||
)
|
||||
self._server.start()
|
||||
self._notification_listener.start()
|
||||
|
||||
def stop(self, graceful=True):
|
||||
# Try to shut the connection down, but if we get any sort of
|
||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||
try:
|
||||
if self._server:
|
||||
self._server.stop()
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
'Unable to gracefully stop the notification listener: %s', e
|
||||
)
|
||||
|
||||
if self._notification_listener:
|
||||
self._notification_listener.stop()
|
||||
super().stop(graceful)
|
||||
|
||||
def _get_targets(self):
|
||||
@ -108,19 +103,15 @@ class Service(service.Service):
|
||||
targets.append(target)
|
||||
return targets
|
||||
|
||||
def _get_handler_event_types(self):
|
||||
"""return a dict - keys are the event types we can handle"""
|
||||
return self.subscribers
|
||||
|
||||
def info(self, context, publisher_id, event_type, payload, metadata):
|
||||
"""
|
||||
Processes an incoming notification, offering each extension the
|
||||
opportunity to handle it.
|
||||
"""
|
||||
# NOTE(zykes): Only bother to actually do processing if there's any
|
||||
# matching events, skips logging of things like compute.exists etc.
|
||||
if event_type in self._get_handler_event_types():
|
||||
for handler in self.handlers:
|
||||
if event_type in handler.get_event_types():
|
||||
LOG.debug('Found handler for: %s', event_type)
|
||||
handler.process_notification(context, event_type, payload)
|
||||
if event_type not in self.allowed_event_types:
|
||||
return
|
||||
|
||||
for handler in self.handlers:
|
||||
if event_type in handler.get_event_types():
|
||||
LOG.debug('Found handler for: %s', event_type)
|
||||
handler.process_notification(context, event_type, payload)
|
||||
|
@ -16,8 +16,10 @@ from unittest import mock
|
||||
from oslo_config import fixture as cfg_fixture
|
||||
import oslotest.base
|
||||
|
||||
from designate.common import profiler
|
||||
import designate.conf
|
||||
import designate.rpc
|
||||
from designate import policy
|
||||
from designate import rpc
|
||||
from designate.sink import service
|
||||
from designate.tests import base_fixtures
|
||||
|
||||
@ -26,8 +28,10 @@ CONF = designate.conf.CONF
|
||||
|
||||
|
||||
class TestSinkService(oslotest.base.BaseTestCase):
|
||||
@mock.patch('designate.policy.init', mock.Mock())
|
||||
def setUp(self):
|
||||
@mock.patch.object(policy, 'init')
|
||||
@mock.patch.object(rpc, 'get_client')
|
||||
@mock.patch.object(profiler, 'setup_profiler')
|
||||
def setUp(self, mock_setup_profiler, mock_get_client, mock_policy_init):
|
||||
super().setUp()
|
||||
self.stdlog = base_fixtures.StandardLogging()
|
||||
self.useFixture(self.stdlog)
|
||||
@ -36,19 +40,101 @@ class TestSinkService(oslotest.base.BaseTestCase):
|
||||
CONF.set_override(
|
||||
'enabled_notification_handlers', ['fake'], 'service:sink'
|
||||
)
|
||||
CONF.set_override(
|
||||
'allowed_event_types', ['compute.instance.create.end'],
|
||||
'handler:fake'
|
||||
)
|
||||
|
||||
self.service = service.Service()
|
||||
|
||||
@mock.patch.object(designate.rpc, 'get_notification_listener')
|
||||
self.context = mock.Mock()
|
||||
|
||||
mock_setup_profiler.assert_called()
|
||||
mock_get_client.assert_called()
|
||||
mock_policy_init.assert_called()
|
||||
|
||||
@mock.patch.object(rpc, 'get_notification_listener')
|
||||
def test_service_start(self, mock_notification_listener):
|
||||
self.service.start()
|
||||
|
||||
self.assertTrue(mock_notification_listener.called)
|
||||
mock_notification_listener.assert_called()
|
||||
|
||||
@mock.patch.object(policy, 'init', mock.Mock())
|
||||
@mock.patch.object(rpc, 'get_client', mock.Mock())
|
||||
@mock.patch.object(profiler, 'setup_profiler', mock.Mock())
|
||||
@mock.patch.object(designate.rpc, 'get_notification_listener')
|
||||
def test_service_start_no_targets(self, mock_notification_listener):
|
||||
CONF.set_override(
|
||||
'enabled_notification_handlers', [], 'service:sink'
|
||||
)
|
||||
|
||||
sink_service = service.Service()
|
||||
|
||||
sink_service.start()
|
||||
|
||||
mock_notification_listener.assert_not_called()
|
||||
|
||||
def test_service_stop(self):
|
||||
self.service._notification_listener = None
|
||||
|
||||
self.service.stop()
|
||||
|
||||
self.assertIn('Stopping sink service', self.stdlog.logger.output)
|
||||
self.assertIsNone(self.service._notification_listener)
|
||||
|
||||
def test_service_stop_and_notification_listener_stopped(self):
|
||||
self.service._notification_listener = mock.Mock()
|
||||
|
||||
self.service.stop()
|
||||
|
||||
self.assertIn('Stopping sink service', self.stdlog.logger.output)
|
||||
self.service._notification_listener.stop.assert_called_with()
|
||||
|
||||
def test_service_name(self):
|
||||
self.assertEqual('sink', self.service.service_name)
|
||||
|
||||
def test_get_allowed_event_types(self):
|
||||
self.assertEqual(
|
||||
['compute.instance.create.end'],
|
||||
self.service.get_allowed_event_types(self.service.handlers)
|
||||
)
|
||||
|
||||
def test_get_allowed_event_types_with_duplicates(self):
|
||||
mock_handler1 = mock.Mock()
|
||||
mock_handler2 = mock.Mock()
|
||||
|
||||
mock_handler1.get_event_types.return_value = [
|
||||
'compute.instance.create.start'
|
||||
]
|
||||
mock_handler2.get_event_types.return_value = [
|
||||
'compute.instance.create.end',
|
||||
'compute.instance.create.start'
|
||||
]
|
||||
|
||||
handlers = [mock_handler1, mock_handler2]
|
||||
|
||||
self.assertEqual(
|
||||
['compute.instance.create.start', 'compute.instance.create.end'],
|
||||
self.service.get_allowed_event_types(handlers)
|
||||
)
|
||||
|
||||
def test_service_info(self):
|
||||
events = [
|
||||
'compute.instance.create.end',
|
||||
'compute.instance.create.start'
|
||||
]
|
||||
self.service.allowed_event_types = events
|
||||
|
||||
for event in events:
|
||||
self.service.info(
|
||||
self.context, 'publisher_id', event, mock.Mock(), mock.Mock()
|
||||
)
|
||||
|
||||
self.assertIn(
|
||||
'received notification - compute.instance.create.end',
|
||||
self.stdlog.logger.output
|
||||
)
|
||||
self.assertNotIn(
|
||||
'received notification - compute.instance.create.start',
|
||||
self.stdlog.logger.output
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user