Simplified sink service and improved coverage
- Removed unecessary error handling. - Simplified event type handling code. Change-Id: I030c071eaacd5ef5d5a2084ffe1b177a9e4d7762
This commit is contained in:
parent
097ffc6df1
commit
1e5a700567
@ -31,5 +31,4 @@ class FakeHandler(base.NotificationHandler):
|
|||||||
return CONF[self.name].allowed_event_types
|
return CONF[self.name].allowed_event_types
|
||||||
|
|
||||||
def process_notification(self, context, event_type, payload):
|
def process_notification(self, context, event_type, payload):
|
||||||
LOG.info('%s: received notification - %s',
|
LOG.info('%s: received notification - %s', self.name, event_type)
|
||||||
self.name, event_type)
|
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import oslo_messaging as messaging
|
import oslo_messaging as messaging
|
||||||
|
|
||||||
@ -34,37 +36,38 @@ class Service(service.Service):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Initialize extensions
|
# Initialize extensions
|
||||||
self._server = None
|
self._notification_listener = None
|
||||||
self.handlers = self._init_extensions()
|
self.handlers = self.init_extensions()
|
||||||
self.subscribers = self._get_subscribers()
|
self.allowed_event_types = self.get_allowed_event_types(self.handlers)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def service_name(self):
|
def service_name(self):
|
||||||
return 'sink'
|
return 'sink'
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _init_extensions():
|
def init_extensions():
|
||||||
"""Loads and prepares all enabled extensions"""
|
"""Loads and prepares all enabled extensions"""
|
||||||
|
notification_handlers = notification_handler.get_notification_handlers(
|
||||||
enabled_notification_handlers = (
|
|
||||||
CONF['service:sink'].enabled_notification_handlers
|
CONF['service:sink'].enabled_notification_handlers
|
||||||
)
|
)
|
||||||
|
|
||||||
notification_handlers = notification_handler.get_notification_handlers(
|
if not notification_handlers:
|
||||||
enabled_notification_handlers)
|
|
||||||
|
|
||||||
if len(notification_handlers) == 0:
|
|
||||||
LOG.warning('No designate-sink handlers enabled or loaded')
|
LOG.warning('No designate-sink handlers enabled or loaded')
|
||||||
|
|
||||||
return notification_handlers
|
return notification_handlers
|
||||||
|
|
||||||
def _get_subscribers(self):
|
@staticmethod
|
||||||
subscriptions = {}
|
def get_allowed_event_types(handlers):
|
||||||
for handler in self.handlers:
|
"""Build a list of all allowed event types."""
|
||||||
for et in handler.get_event_types():
|
allowed_event_types = []
|
||||||
subscriptions.setdefault(et, [])
|
|
||||||
subscriptions[et].append(handler)
|
for handler in handlers:
|
||||||
return subscriptions
|
for event_type in handler.get_event_types():
|
||||||
|
if event_type in allowed_event_types:
|
||||||
|
continue
|
||||||
|
allowed_event_types.append(event_type)
|
||||||
|
|
||||||
|
return allowed_event_types
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
super().start()
|
super().start()
|
||||||
@ -75,23 +78,15 @@ class Service(service.Service):
|
|||||||
# TODO(ekarlso): Change this is to endpoint objects rather then
|
# TODO(ekarlso): Change this is to endpoint objects rather then
|
||||||
# ourselves?
|
# ourselves?
|
||||||
if targets:
|
if targets:
|
||||||
self._server = rpc.get_notification_listener(
|
self._notification_listener = rpc.get_notification_listener(
|
||||||
targets, [self],
|
targets, [self],
|
||||||
pool=CONF['service:sink'].listener_pool_name
|
pool=CONF['service:sink'].listener_pool_name
|
||||||
)
|
)
|
||||||
self._server.start()
|
self._notification_listener.start()
|
||||||
|
|
||||||
def stop(self, graceful=True):
|
def stop(self, graceful=True):
|
||||||
# Try to shut the connection down, but if we get any sort of
|
if self._notification_listener:
|
||||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
self._notification_listener.stop()
|
||||||
try:
|
|
||||||
if self._server:
|
|
||||||
self._server.stop()
|
|
||||||
except Exception as e:
|
|
||||||
LOG.warning(
|
|
||||||
'Unable to gracefully stop the notification listener: %s', e
|
|
||||||
)
|
|
||||||
|
|
||||||
super().stop(graceful)
|
super().stop(graceful)
|
||||||
|
|
||||||
def _get_targets(self):
|
def _get_targets(self):
|
||||||
@ -108,19 +103,15 @@ class Service(service.Service):
|
|||||||
targets.append(target)
|
targets.append(target)
|
||||||
return targets
|
return targets
|
||||||
|
|
||||||
def _get_handler_event_types(self):
|
|
||||||
"""return a dict - keys are the event types we can handle"""
|
|
||||||
return self.subscribers
|
|
||||||
|
|
||||||
def info(self, context, publisher_id, event_type, payload, metadata):
|
def info(self, context, publisher_id, event_type, payload, metadata):
|
||||||
"""
|
"""
|
||||||
Processes an incoming notification, offering each extension the
|
Processes an incoming notification, offering each extension the
|
||||||
opportunity to handle it.
|
opportunity to handle it.
|
||||||
"""
|
"""
|
||||||
# NOTE(zykes): Only bother to actually do processing if there's any
|
if event_type not in self.allowed_event_types:
|
||||||
# matching events, skips logging of things like compute.exists etc.
|
return
|
||||||
if event_type in self._get_handler_event_types():
|
|
||||||
for handler in self.handlers:
|
for handler in self.handlers:
|
||||||
if event_type in handler.get_event_types():
|
if event_type in handler.get_event_types():
|
||||||
LOG.debug('Found handler for: %s', event_type)
|
LOG.debug('Found handler for: %s', event_type)
|
||||||
handler.process_notification(context, event_type, payload)
|
handler.process_notification(context, event_type, payload)
|
||||||
|
@ -16,8 +16,10 @@ from unittest import mock
|
|||||||
from oslo_config import fixture as cfg_fixture
|
from oslo_config import fixture as cfg_fixture
|
||||||
import oslotest.base
|
import oslotest.base
|
||||||
|
|
||||||
|
from designate.common import profiler
|
||||||
import designate.conf
|
import designate.conf
|
||||||
import designate.rpc
|
from designate import policy
|
||||||
|
from designate import rpc
|
||||||
from designate.sink import service
|
from designate.sink import service
|
||||||
from designate.tests import base_fixtures
|
from designate.tests import base_fixtures
|
||||||
|
|
||||||
@ -26,8 +28,10 @@ CONF = designate.conf.CONF
|
|||||||
|
|
||||||
|
|
||||||
class TestSinkService(oslotest.base.BaseTestCase):
|
class TestSinkService(oslotest.base.BaseTestCase):
|
||||||
@mock.patch('designate.policy.init', mock.Mock())
|
@mock.patch.object(policy, 'init')
|
||||||
def setUp(self):
|
@mock.patch.object(rpc, 'get_client')
|
||||||
|
@mock.patch.object(profiler, 'setup_profiler')
|
||||||
|
def setUp(self, mock_setup_profiler, mock_get_client, mock_policy_init):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
self.stdlog = base_fixtures.StandardLogging()
|
self.stdlog = base_fixtures.StandardLogging()
|
||||||
self.useFixture(self.stdlog)
|
self.useFixture(self.stdlog)
|
||||||
@ -36,19 +40,101 @@ class TestSinkService(oslotest.base.BaseTestCase):
|
|||||||
CONF.set_override(
|
CONF.set_override(
|
||||||
'enabled_notification_handlers', ['fake'], 'service:sink'
|
'enabled_notification_handlers', ['fake'], 'service:sink'
|
||||||
)
|
)
|
||||||
|
CONF.set_override(
|
||||||
|
'allowed_event_types', ['compute.instance.create.end'],
|
||||||
|
'handler:fake'
|
||||||
|
)
|
||||||
|
|
||||||
self.service = service.Service()
|
self.service = service.Service()
|
||||||
|
|
||||||
@mock.patch.object(designate.rpc, 'get_notification_listener')
|
self.context = mock.Mock()
|
||||||
|
|
||||||
|
mock_setup_profiler.assert_called()
|
||||||
|
mock_get_client.assert_called()
|
||||||
|
mock_policy_init.assert_called()
|
||||||
|
|
||||||
|
@mock.patch.object(rpc, 'get_notification_listener')
|
||||||
def test_service_start(self, mock_notification_listener):
|
def test_service_start(self, mock_notification_listener):
|
||||||
self.service.start()
|
self.service.start()
|
||||||
|
|
||||||
self.assertTrue(mock_notification_listener.called)
|
mock_notification_listener.assert_called()
|
||||||
|
|
||||||
|
@mock.patch.object(policy, 'init', mock.Mock())
|
||||||
|
@mock.patch.object(rpc, 'get_client', mock.Mock())
|
||||||
|
@mock.patch.object(profiler, 'setup_profiler', mock.Mock())
|
||||||
|
@mock.patch.object(designate.rpc, 'get_notification_listener')
|
||||||
|
def test_service_start_no_targets(self, mock_notification_listener):
|
||||||
|
CONF.set_override(
|
||||||
|
'enabled_notification_handlers', [], 'service:sink'
|
||||||
|
)
|
||||||
|
|
||||||
|
sink_service = service.Service()
|
||||||
|
|
||||||
|
sink_service.start()
|
||||||
|
|
||||||
|
mock_notification_listener.assert_not_called()
|
||||||
|
|
||||||
def test_service_stop(self):
|
def test_service_stop(self):
|
||||||
|
self.service._notification_listener = None
|
||||||
|
|
||||||
self.service.stop()
|
self.service.stop()
|
||||||
|
|
||||||
self.assertIn('Stopping sink service', self.stdlog.logger.output)
|
self.assertIn('Stopping sink service', self.stdlog.logger.output)
|
||||||
|
self.assertIsNone(self.service._notification_listener)
|
||||||
|
|
||||||
|
def test_service_stop_and_notification_listener_stopped(self):
|
||||||
|
self.service._notification_listener = mock.Mock()
|
||||||
|
|
||||||
|
self.service.stop()
|
||||||
|
|
||||||
|
self.assertIn('Stopping sink service', self.stdlog.logger.output)
|
||||||
|
self.service._notification_listener.stop.assert_called_with()
|
||||||
|
|
||||||
def test_service_name(self):
|
def test_service_name(self):
|
||||||
self.assertEqual('sink', self.service.service_name)
|
self.assertEqual('sink', self.service.service_name)
|
||||||
|
|
||||||
|
def test_get_allowed_event_types(self):
|
||||||
|
self.assertEqual(
|
||||||
|
['compute.instance.create.end'],
|
||||||
|
self.service.get_allowed_event_types(self.service.handlers)
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_get_allowed_event_types_with_duplicates(self):
|
||||||
|
mock_handler1 = mock.Mock()
|
||||||
|
mock_handler2 = mock.Mock()
|
||||||
|
|
||||||
|
mock_handler1.get_event_types.return_value = [
|
||||||
|
'compute.instance.create.start'
|
||||||
|
]
|
||||||
|
mock_handler2.get_event_types.return_value = [
|
||||||
|
'compute.instance.create.end',
|
||||||
|
'compute.instance.create.start'
|
||||||
|
]
|
||||||
|
|
||||||
|
handlers = [mock_handler1, mock_handler2]
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
['compute.instance.create.start', 'compute.instance.create.end'],
|
||||||
|
self.service.get_allowed_event_types(handlers)
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_service_info(self):
|
||||||
|
events = [
|
||||||
|
'compute.instance.create.end',
|
||||||
|
'compute.instance.create.start'
|
||||||
|
]
|
||||||
|
self.service.allowed_event_types = events
|
||||||
|
|
||||||
|
for event in events:
|
||||||
|
self.service.info(
|
||||||
|
self.context, 'publisher_id', event, mock.Mock(), mock.Mock()
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIn(
|
||||||
|
'received notification - compute.instance.create.end',
|
||||||
|
self.stdlog.logger.output
|
||||||
|
)
|
||||||
|
self.assertNotIn(
|
||||||
|
'received notification - compute.instance.create.start',
|
||||||
|
self.stdlog.logger.output
|
||||||
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user