Merge "Added sink unit tests"

This commit is contained in:
Zuul 2019-06-11 16:47:38 +00:00 committed by Gerrit Code Review
commit fc5700bd61
8 changed files with 242 additions and 0 deletions

View File

@ -20,6 +20,11 @@ SINK_GROUP = cfg.OptGroup(
title="Configuration for Sink Service"
)
SINK_FAKE_GROUP = cfg.OptGroup(
name='handler:fake',
title="Configuration for the Fake Notification Handler"
)
SINK_NEUTRON_GROUP = cfg.OptGroup(
name='handler:neutron_floatingip',
title="Configuration for Neutron Notification Handler"
@ -44,6 +49,15 @@ SINK_OPTS = [
'by all oslo.messaging drivers.'),
]
SINK_FAKE_OPTS = [
cfg.ListOpt('notification-topics', default=['notifications'],
help='notification events for the fake notification handler'),
cfg.StrOpt('control-exchange', default='fake',
help='control-exchange for fake notifications'),
cfg.ListOpt('allowed-event-types', default=[],
help='the event types we want the fake handler to accept'),
]
SINK_NEUTRON_OPTS = [
cfg.ListOpt('notification-topics', default=['notifications'],
help='notification any events from neutron'),
@ -74,6 +88,8 @@ SINK_NOVA_OPTS = [
def register_opts(conf):
conf.register_group(SINK_GROUP)
conf.register_opts(SINK_OPTS, group=SINK_GROUP)
conf.register_group(SINK_FAKE_GROUP)
conf.register_opts(SINK_FAKE_OPTS, group=SINK_FAKE_GROUP)
conf.register_group(SINK_NEUTRON_GROUP)
conf.register_opts(SINK_NEUTRON_OPTS, group=SINK_NEUTRON_GROUP)
conf.register_group(SINK_NOVA_GROUP)

View File

@ -0,0 +1,33 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from designate.notification_handler import base
LOG = logging.getLogger(__name__)
class FakeHandler(base.NotificationHandler):
__plugin_name__ = 'fake'
def get_exchange_topics(self):
exchange = cfg.CONF[self.name].control_exchange
topics = cfg.CONF[self.name].notification_topics
return exchange, topics
def get_event_types(self):
return cfg.CONF[self.name].allowed_event_types
def process_notification(self, context, event_type, payload):
LOG.info('%s: received notification - %s',
self.name, event_type)

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
import mock
import oslotest.base
from oslo_config import cfg
from designate.notification_handler import fake
from designate.tests import test_notification_handler
CONF = cfg.CONF
class TestFakeHandler(oslotest.base.BaseTestCase,
test_notification_handler.NotificationHandlerMixin):
@mock.patch('designate.rpc.get_client')
def setUp(self, mock_get_instance):
super(TestFakeHandler, self).setUp()
CONF.set_override(
'enabled_notification_handlers',
[fake.FakeHandler.__plugin_name__],
'service:sink'
)
CONF.set_override(
'allowed_event_types', ['compute.instance.create.end'],
'handler:fake'
)
self.handler = fake.FakeHandler()
def test_get_name(self):
self.assertEqual(
self.handler.name,
'handler:fake'
)
def test_get_canonical_name(self):
self.assertEqual(
self.handler.get_canonical_name(),
'handler:fake'
)
def test_get_exchange_topics(self):
self.assertEqual(
self.handler.get_exchange_topics(),
('fake', ['notifications'])
)
def test_get_event_types(self):
self.assertEqual(
self.handler.get_event_types(),
['compute.instance.create.end']
)

View File

View File

@ -0,0 +1,84 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
import mock
import oslotest.base
from oslo_config import cfg
from designate.notification_handler import fake
from designate.sink import service
from designate.tests import fixtures
from designate.tests import test_notification_handler
CONF = cfg.CONF
class TestSinkNotification(oslotest.base.BaseTestCase,
test_notification_handler.NotificationHandlerMixin):
def setUp(self):
super(TestSinkNotification, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
CONF.set_override(
'enabled_notification_handlers',
[fake.FakeHandler.__plugin_name__],
'service:sink'
)
CONF.set_override(
'allowed_event_types', ['compute.instance.create.end'],
'handler:fake'
)
self.context = mock.Mock()
self.service = service.Service()
def test_notification(self):
event_type = 'compute.instance.create.end'
fixture = self.get_notification_fixture('nova', event_type)
self.service.info(self.context, None, event_type,
fixture['payload'], None)
self.assertIn(
'handler:fake: received notification - %s' % event_type,
self.stdlog.logger.output
)
def test_notification_with_unknown_event(self):
event_type = 'compute.instance.create.start'
fixture = self.get_notification_fixture('nova', event_type)
self.service.info(self.context, None, event_type,
fixture['payload'], None)
self.assertNotIn(
'handler:fake: received notification - %s' % event_type,
self.stdlog.logger.output
)
def test_notification_without_handler(self):
CONF.set_override('enabled_notification_handlers', [], 'service:sink')
self.service = service.Service()
event_type = 'compute.instance.create.end'
fixture = self.get_notification_fixture('nova', event_type)
self.service.info(self.context, None, event_type,
fixture['payload'], None)
self.assertIn(
'No designate-sink handlers enabled or loaded',
self.stdlog.logger.output
)

View File

@ -0,0 +1,45 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
import mock
import designate.rpc
from designate import tests
from designate.sink import service
from designate.tests import fixtures
class TestSinkService(tests.TestCase):
def setUp(self):
super(TestSinkService, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.CONF.set_override('enabled_notification_handlers', ['fake'],
'service:sink')
self.service = service.Service()
@mock.patch.object(designate.rpc, 'get_notification_listener')
def test_service_start(self, mock_notification_listener):
self.service.start()
self.assertTrue(mock_notification_listener.called)
@mock.patch.object(designate.rpc, 'get_notification_listener')
def test_service_stop(self, mock_notification_listener):
self.service.stop()
self.assertIn('Stopping sink service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('sink', self.service.service_name)

View File

@ -75,6 +75,7 @@ designate.pool_manager.cache =
sqlalchemy = designate.pool_manager.cache.impl_sqlalchemy:SQLAlchemyPoolManagerCache
designate.notification.handler =
fake = designate.notification_handler.fake:FakeHandler
nova_fixed = designate.notification_handler.nova:NovaFixedHandler
neutron_floatingip = designate.notification_handler.neutron:NeutronFloatingHandler