Modernized RPC impl and fixed service tests
- Fixed mdns service tests. - Properly clean up NOTIFICATION_TRANSPORT. - Modernized RPC implementation. Change-Id: Ic577388ce49cbe49ce16d81c0fa5a5b2506cc07f
This commit is contained in:
parent
46d167aec9
commit
3e3592666a
|
@ -148,7 +148,6 @@ class TsigInfoMiddleware(DNSMiddleware):
|
|||
|
||||
def __init__(self, application, storage):
|
||||
super(TsigInfoMiddleware, self).__init__(application)
|
||||
|
||||
self.storage = storage
|
||||
|
||||
def process_request(self, request):
|
||||
|
|
|
@ -21,6 +21,7 @@ class Base(Exception):
|
|||
error_type = None
|
||||
error_message = None
|
||||
errors = None
|
||||
expected = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.errors = kwargs.pop('errors', None)
|
||||
|
|
|
@ -58,6 +58,7 @@ class Service(service.DNSService, service.RPCService, service.Service):
|
|||
application = handler.RequestHandler(self.storage, self.tg)
|
||||
application = dnsutils.TsigInfoMiddleware(application, self.storage)
|
||||
application = dnsutils.SerializationMiddleware(
|
||||
application, dnsutils.TsigKeyring(self.storage))
|
||||
application, dnsutils.TsigKeyring(self.storage)
|
||||
)
|
||||
|
||||
return application
|
||||
|
|
|
@ -25,18 +25,16 @@ __all__ = [
|
|||
'get_notifier',
|
||||
]
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import server as rpc_server
|
||||
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
|
||||
from oslo_messaging.rpc import server as rpc_server
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
import designate.context
|
||||
import designate.exceptions
|
||||
from designate import objects
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
@ -74,11 +72,15 @@ def initialized():
|
|||
|
||||
def cleanup():
|
||||
global TRANSPORT, NOTIFIER, NOTIFICATION_TRANSPORT
|
||||
assert TRANSPORT is not None
|
||||
assert NOTIFICATION_TRANSPORT is not None
|
||||
assert NOTIFIER is not None
|
||||
if TRANSPORT is None:
|
||||
raise AssertionError("'TRANSPORT' must not be None")
|
||||
if NOTIFICATION_TRANSPORT is None:
|
||||
raise AssertionError("'NOTIFICATION_TRANSPORT' must not be None")
|
||||
if NOTIFIER is None:
|
||||
raise AssertionError("'NOTIFIER' must not be None")
|
||||
TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFIER = None
|
||||
NOTIFICATION_TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
|
||||
|
||||
|
||||
def set_defaults(control_exchange):
|
||||
|
@ -161,15 +163,13 @@ class RequestContextSerializer(messaging.Serializer):
|
|||
|
||||
|
||||
class RPCDispatcher(rpc_dispatcher.RPCDispatcher):
|
||||
|
||||
def dispatch(self, *args, **kwds):
|
||||
try:
|
||||
return super(RPCDispatcher, self).dispatch(*args, **kwds)
|
||||
except Exception as e:
|
||||
if getattr(e, 'expected', False):
|
||||
except designate.exceptions.Base as e:
|
||||
if e.expected:
|
||||
raise rpc_dispatcher.ExpectedException()
|
||||
else:
|
||||
raise
|
||||
raise
|
||||
|
||||
|
||||
def get_transport_url(url_str=None):
|
||||
|
@ -177,41 +177,53 @@ def get_transport_url(url_str=None):
|
|||
|
||||
|
||||
def get_client(target, version_cap=None, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
if TRANSPORT is None:
|
||||
raise AssertionError("'TRANSPORT' must not be None")
|
||||
if serializer is None:
|
||||
serializer = DesignateObjectSerializer()
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
return messaging.RPCClient(TRANSPORT,
|
||||
target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer)
|
||||
return messaging.RPCClient(
|
||||
TRANSPORT,
|
||||
target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer
|
||||
)
|
||||
|
||||
|
||||
def get_server(target, endpoints, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
if TRANSPORT is None:
|
||||
raise AssertionError("'TRANSPORT' must not be None")
|
||||
if serializer is None:
|
||||
serializer = DesignateObjectSerializer()
|
||||
serializer = RequestContextSerializer(serializer)
|
||||
access_policy = rpc_dispatcher.DefaultRPCAccessPolicy
|
||||
dispatcher = RPCDispatcher(endpoints, serializer, access_policy)
|
||||
return rpc_server.RPCServer(
|
||||
TRANSPORT, target, dispatcher, 'eventlet')
|
||||
TRANSPORT,
|
||||
target,
|
||||
dispatcher=dispatcher,
|
||||
executor='eventlet',
|
||||
)
|
||||
|
||||
|
||||
def get_listener(targets, endpoints, serializer=None, pool=None):
|
||||
assert TRANSPORT is not None
|
||||
def get_notification_listener(targets, endpoints, serializer=None, pool=None):
|
||||
if NOTIFICATION_TRANSPORT is None:
|
||||
raise AssertionError("'NOTIFICATION_TRANSPORT' must not be None")
|
||||
if serializer is None:
|
||||
serializer = JsonPayloadSerializer()
|
||||
return messaging.get_notification_listener(TRANSPORT,
|
||||
targets,
|
||||
endpoints,
|
||||
executor='eventlet',
|
||||
pool=pool,
|
||||
serializer=serializer)
|
||||
return messaging.get_notification_listener(
|
||||
NOTIFICATION_TRANSPORT,
|
||||
targets,
|
||||
endpoints,
|
||||
executor='eventlet',
|
||||
pool=pool,
|
||||
serializer=serializer
|
||||
)
|
||||
|
||||
|
||||
def get_notifier(service=None, host=None, publisher_id=None):
|
||||
assert NOTIFIER is not None
|
||||
if NOTIFIER is None:
|
||||
raise AssertionError("'NOTIFIER' must not be None")
|
||||
if not publisher_id:
|
||||
publisher_id = "%s.%s" % (service, host or CONF.host)
|
||||
return NOTIFIER.prepare(publisher_id=publisher_id)
|
||||
|
|
|
@ -68,11 +68,11 @@ class Service(service.Service):
|
|||
|
||||
# TODO(ekarlso): Change this is to endpoint objects rather then
|
||||
# ourselves?
|
||||
self._server = rpc.get_listener(
|
||||
self._server = rpc.get_notification_listener(
|
||||
targets, [self],
|
||||
pool=cfg.CONF['service:sink'].listener_pool_name)
|
||||
|
||||
if len(targets) > 0:
|
||||
if targets:
|
||||
self._server.start()
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -17,60 +17,61 @@
|
|||
|
||||
"""Unit-test MiniDNS service
|
||||
"""
|
||||
import unittest
|
||||
|
||||
from oslotest import base
|
||||
import mock
|
||||
|
||||
from designate.tests.unit import RoObject
|
||||
import designate.rpc
|
||||
import designate.mdns.service as mdns
|
||||
|
||||
# TODO(Federico): fix skipped tests
|
||||
import designate.storage.base as storage
|
||||
|
||||
|
||||
@mock.patch.object(mdns.utils, 'cache_result')
|
||||
@mock.patch.object(mdns.notify, 'NotifyEndpoint')
|
||||
@mock.patch.object(mdns.xfr, 'XfrEndpoint')
|
||||
class MdnsServiceTest(base.BaseTestCase):
|
||||
|
||||
@mock.patch.object(mdns.storage, 'get_storage', name='get_storage')
|
||||
@mock.patch.object(mdns.Service, '_rpc_endpoints')
|
||||
def setUp(self, *mocks):
|
||||
super(MdnsServiceTest, self).setUp()
|
||||
mdns.CONF = RoObject({
|
||||
'service:mdns': RoObject(storage_driver=None)
|
||||
})
|
||||
# _rpc_endpoints is a property
|
||||
mock_rpc_endpoints = mocks[0]
|
||||
mock_rpc_endpoints.__get__ = mock.Mock(
|
||||
return_value=[mock.MagicMock(), mock.MagicMock()]
|
||||
)
|
||||
|
||||
@mock.patch.object(mdns.service.DNSService, '_start')
|
||||
@mock.patch.object(designate.rpc, 'get_server')
|
||||
def test_service_start(self, mock_service_start, mock_rpc_server):
|
||||
self.mdns = mdns.Service()
|
||||
self.mdns.start()
|
||||
|
||||
self.assertTrue(mock_service_start.called)
|
||||
self.assertTrue(mock_rpc_server.called)
|
||||
|
||||
def test_service_name(self):
|
||||
self.mdns = mdns.Service()
|
||||
self.mdns.tg = mock.Mock(name='tg')
|
||||
|
||||
@unittest.skip("Fails with new oslo.messaging release")
|
||||
def test_service_name(self, mc, mn, mx):
|
||||
self.assertEqual('mdns', self.mdns.service_name)
|
||||
|
||||
@unittest.skip("Fails when run together with designate/tests/test_mdns/")
|
||||
def test_rpc_endpoints(self, _, mock_notify, mock_xfr):
|
||||
out = self.mdns._rpc_endpoints
|
||||
self.assertEqual(2, len(out))
|
||||
assert isinstance(out[0], mock.MagicMock), out
|
||||
assert isinstance(out[1], mock.MagicMock), out
|
||||
def test_rpc_endpoints(self):
|
||||
self.mdns = mdns.Service()
|
||||
|
||||
endpoints = self.mdns._rpc_endpoints
|
||||
|
||||
self.assertIsInstance(endpoints[0], mdns.notify.NotifyEndpoint)
|
||||
self.assertIsInstance(endpoints[1], mdns.xfr.XfrEndpoint)
|
||||
|
||||
@mock.patch.object(storage.Storage, 'get_driver')
|
||||
def test_storage_driver(self, mock_get_driver):
|
||||
mock_driver = mock.MagicMock()
|
||||
mock_driver.name = 'noop_driver'
|
||||
mock_get_driver.return_value = mock_driver
|
||||
|
||||
self.mdns = mdns.Service()
|
||||
|
||||
self.assertIsInstance(self.mdns.storage, mock.MagicMock)
|
||||
|
||||
self.assertTrue(mock_get_driver.called)
|
||||
|
||||
@unittest.skip("Fails when run together with designate/tests/test_mdns/")
|
||||
@mock.patch.object(mdns.handler, 'RequestHandler', name='reqh')
|
||||
@mock.patch.object(mdns.dnsutils, 'TsigInfoMiddleware', name='tsig')
|
||||
@mock.patch.object(mdns.dnsutils, 'SerializationMiddleware')
|
||||
def test_dns_application(self, *mocks):
|
||||
mock_serialization, mock_tsiginf, mock_req_handler = mocks[:3]
|
||||
mock_req_handler.return_value = mock.Mock(name='app')
|
||||
@mock.patch.object(mdns.service.DNSService, '_start')
|
||||
@mock.patch.object(mdns.utils, 'cache_result')
|
||||
@mock.patch.object(storage.Storage, 'get_driver')
|
||||
def test_dns_application(self, mock_req_handler, mock_cache_result,
|
||||
mock_service_start, mock_get_driver):
|
||||
mock_driver = mock.MagicMock()
|
||||
mock_driver.name = 'noop_driver'
|
||||
mock_get_driver.return_value = mock_driver
|
||||
|
||||
self.mdns = mdns.Service()
|
||||
|
||||
app = self.mdns._dns_application
|
||||
|
||||
assert isinstance(app, mock.MagicMock), repr(app)
|
||||
assert mock_req_handler.called
|
||||
assert mock_tsiginf.called
|
||||
assert mock_serialization.called
|
||||
self.assertIsInstance(app, mdns.dnsutils.DNSMiddleware)
|
||||
|
|
Loading…
Reference in New Issue