nova/nova/tests/unit/test_rpc.py
Dan Smith 9dcab12b7c Make RPCFixture support multiple connections
For testing cells, we will need to track the driver instances that
we give out by url. This normally just works with a conventional
oslo.messaging driver, but the fake driver keeps internal data
structures for simulating its bus. If we end up with clients creating
a new instance of the driver in the rpc switching code, we'll never
be able to send messages to services because we'll always have
private/separate data structures.

So, this makes the fixture wrap the transport creation stuff
and unify references by url. In order to make this work, some
retooling of rpc.init() is done, which makes it more in line with
the recent additions we had for wrapping transport initialization
per connection anyway.

For now, a lot of our tests can't handle the possibility of
multiple RPC connections due to them looking at the global
transport_url configuration. So for the moment, even though this
makes the fixture support multiple independent connections, we
collapse any such attempts down to a single connection to the
default broker.

Note: this requires a fix in oslo.messaging 5.14.0

Depends-On: I01b6f5a20ea9752da46a546a908bd38cf11da681
Change-Id: Icb63d7dabd17f3c5633387793f68a8ba20863a7e
2016-12-07 06:53:57 -08:00

544 lines
21 KiB
Python

# Copyright 2016 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import fixtures
import mock
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_utils import fixture as utils_fixture
import testtools
from nova import context
from nova import exception
from nova import objects
from nova import rpc
from nova import test
from nova.tests import uuidsentinel as uuids
# Make a class that resets all of the global variables in nova.rpc
class RPCResetFixture(fixtures.Fixture):
def _setUp(self):
self.trans = copy.copy(rpc.TRANSPORT)
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
self.noti = copy.copy(rpc.NOTIFIER)
self.all_mods = copy.copy(rpc.ALLOWED_EXMODS)
self.ext_mods = copy.copy(rpc.EXTRA_EXMODS)
self.addCleanup(self._reset_everything)
def _reset_everything(self):
rpc.TRANSPORT = self.trans
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
rpc.NOTIFIER = self.noti
rpc.ALLOWED_EXMODS = self.all_mods
rpc.EXTRA_EXMODS = self.ext_mods
# We can't import nova.test.TestCase because that sets up an RPCFixture
# that pretty much nullifies all of this testing
class TestRPC(testtools.TestCase):
def setUp(self):
super(TestRPC, self).setUp()
self.useFixture(RPCResetFixture())
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_unversioned(self, mock_notif, mock_noti_trans,
mock_ser, mock_exmods):
# The expected call to get the legacy notifier will require no new
# kwargs, and we expect the new notifier will need the noop driver
expected = [{}, {'driver': 'noop'}]
self._test_init(mock_notif, mock_noti_trans, mock_ser,
mock_exmods, 'unversioned', expected)
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_both(self, mock_notif, mock_noti_trans,
mock_ser, mock_exmods):
expected = [{}, {'topics': ['versioned_notifications']}]
self._test_init(mock_notif, mock_noti_trans, mock_ser,
mock_exmods, 'both', expected)
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_versioned(self, mock_notif, mock_noti_trans,
mock_ser, mock_exmods):
expected = [{'driver': 'noop'},
{'topics': ['versioned_notifications']}]
self._test_init(mock_notif, mock_noti_trans, mock_ser,
mock_exmods, 'versioned', expected)
def test_cleanup_transport_null(self):
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notification_transport_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_legacy_notifier_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
def test_cleanup_notifier_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup(self):
rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.TRANSPORT = mock.Mock()
trans_cleanup = mock.Mock()
not_trans_cleanup = mock.Mock()
rpc.TRANSPORT.cleanup = trans_cleanup
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
rpc.cleanup()
trans_cleanup.assert_called_once_with()
not_trans_cleanup.assert_called_once_with()
self.assertIsNone(rpc.TRANSPORT)
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNone(rpc.LEGACY_NOTIFIER)
self.assertIsNone(rpc.NOTIFIER)
@mock.patch.object(messaging, 'set_transport_defaults')
def test_set_defaults(self, mock_set):
control_exchange = mock.Mock()
rpc.set_defaults(control_exchange)
mock_set.assert_called_once_with(control_exchange)
def test_add_extra_exmods(self):
rpc.EXTRA_EXMODS = []
rpc.add_extra_exmods('foo', 'bar')
self.assertEqual(['foo', 'bar'], rpc.EXTRA_EXMODS)
def test_clear_extra_exmods(self):
rpc.EXTRA_EXMODS = ['foo', 'bar']
rpc.clear_extra_exmods()
self.assertEqual(0, len(rpc.EXTRA_EXMODS))
def test_get_allowed_exmods(self):
rpc.ALLOWED_EXMODS = ['foo']
rpc.EXTRA_EXMODS = ['bar']
exmods = rpc.get_allowed_exmods()
self.assertEqual(['foo', 'bar'], exmods)
@mock.patch.object(messaging, 'TransportURL')
def test_get_transport_url(self, mock_url):
conf = mock.Mock()
rpc.CONF = conf
mock_url.parse.return_value = 'foo'
url = rpc.get_transport_url(url_str='bar')
self.assertEqual('foo', url)
mock_url.parse.assert_called_once_with(conf, 'bar',
rpc.TRANSPORT_ALIASES)
@mock.patch.object(messaging, 'TransportURL')
def test_get_transport_url_null(self, mock_url):
conf = mock.Mock()
rpc.CONF = conf
mock_url.parse.return_value = 'foo'
url = rpc.get_transport_url()
self.assertEqual('foo', url)
mock_url.parse.assert_called_once_with(conf, None,
rpc.TRANSPORT_ALIASES)
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'RPCClient')
def test_get_client(self, mock_client, mock_ser):
rpc.TRANSPORT = mock.Mock()
tgt = mock.Mock()
ser = mock.Mock()
mock_client.return_value = 'client'
mock_ser.return_value = ser
client = rpc.get_client(tgt, version_cap='1.0', serializer='foo')
mock_ser.assert_called_once_with('foo')
mock_client.assert_called_once_with(rpc.TRANSPORT,
tgt, version_cap='1.0',
serializer=ser)
self.assertEqual('client', client)
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_rpc_server')
def test_get_server(self, mock_get, mock_ser):
rpc.TRANSPORT = mock.Mock()
ser = mock.Mock()
tgt = mock.Mock()
ends = mock.Mock()
mock_ser.return_value = ser
mock_get.return_value = 'server'
server = rpc.get_server(tgt, ends, serializer='foo')
mock_ser.assert_called_once_with('foo')
mock_get.assert_called_once_with(rpc.TRANSPORT, tgt, ends,
executor='eventlet', serializer=ser)
self.assertEqual('server', server)
def test_get_notifier(self):
rpc.LEGACY_NOTIFIER = mock.Mock()
mock_prep = mock.Mock()
mock_prep.return_value = 'notifier'
rpc.LEGACY_NOTIFIER.prepare = mock_prep
notifier = rpc.get_notifier('service', publisher_id='foo')
mock_prep.assert_called_once_with(publisher_id='foo')
self.assertIsInstance(notifier, rpc.LegacyValidatingNotifier)
self.assertEqual('notifier', notifier.notifier)
def test_get_notifier_null_publisher(self):
rpc.LEGACY_NOTIFIER = mock.Mock()
mock_prep = mock.Mock()
mock_prep.return_value = 'notifier'
rpc.LEGACY_NOTIFIER.prepare = mock_prep
notifier = rpc.get_notifier('service', host='bar')
mock_prep.assert_called_once_with(publisher_id='service.bar')
self.assertIsInstance(notifier, rpc.LegacyValidatingNotifier)
self.assertEqual('notifier', notifier.notifier)
def test_get_versioned_notifier(self):
rpc.NOTIFIER = mock.Mock()
mock_prep = mock.Mock()
mock_prep.return_value = 'notifier'
rpc.NOTIFIER.prepare = mock_prep
notifier = rpc.get_versioned_notifier('service.foo')
mock_prep.assert_called_once_with(publisher_id='service.foo')
self.assertEqual('notifier', notifier)
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(messaging, 'get_transport')
def test_create_transport(self, mock_transport, mock_exmods):
exmods = mock_exmods.return_value
transport = rpc.create_transport(mock.sentinel.url)
self.assertEqual(mock_transport.return_value, transport)
mock_exmods.assert_called_once_with()
mock_transport.assert_called_once_with(rpc.CONF,
url=mock.sentinel.url,
allowed_remote_exmods=exmods,
aliases=rpc.TRANSPORT_ALIASES)
def _test_init(self, mock_notif, mock_noti_trans, mock_ser,
mock_exmods, notif_format, expected_driver_topic_kwargs):
legacy_notifier = mock.Mock()
notifier = mock.Mock()
notif_transport = mock.Mock()
transport = mock.Mock()
serializer = mock.Mock()
conf = mock.Mock()
conf.transport_url = None
conf.notification_format = notif_format
mock_exmods.return_value = ['foo']
mock_noti_trans.return_value = notif_transport
mock_ser.return_value = serializer
mock_notif.side_effect = [legacy_notifier, notifier]
@mock.patch.object(rpc, 'CONF', new=conf)
@mock.patch.object(rpc, 'create_transport')
@mock.patch.object(rpc, 'get_transport_url')
def _test(get_url, create_transport):
create_transport.return_value = transport
rpc.init(conf)
create_transport.assert_called_once_with(get_url.return_value)
_test()
self.assertTrue(mock_exmods.called)
self.assertIsNotNone(rpc.TRANSPORT)
self.assertIsNotNone(rpc.LEGACY_NOTIFIER)
self.assertIsNotNone(rpc.NOTIFIER)
self.assertEqual(legacy_notifier, rpc.LEGACY_NOTIFIER)
self.assertEqual(notifier, rpc.NOTIFIER)
expected_calls = []
for kwargs in expected_driver_topic_kwargs:
expected_kwargs = {'serializer': serializer}
expected_kwargs.update(kwargs)
expected_calls.append(((notif_transport,), expected_kwargs))
self.assertEqual(expected_calls, mock_notif.call_args_list,
"The calls to messaging.Notifier() did not create "
"the legacy and versioned notifiers properly.")
class TestJsonPayloadSerializer(test.NoDBTestCase):
def test_serialize_entity(self):
with mock.patch.object(jsonutils, 'to_primitive') as mock_prim:
rpc.JsonPayloadSerializer.serialize_entity('context', 'entity')
mock_prim.assert_called_once_with('entity', convert_instances=True)
class TestRequestContextSerializer(test.NoDBTestCase):
def setUp(self):
super(TestRequestContextSerializer, self).setUp()
self.mock_base = mock.Mock()
self.ser = rpc.RequestContextSerializer(self.mock_base)
self.ser_null = rpc.RequestContextSerializer(None)
def test_serialize_entity(self):
self.mock_base.serialize_entity.return_value = 'foo'
ser_ent = self.ser.serialize_entity('context', 'entity')
self.mock_base.serialize_entity.assert_called_once_with('context',
'entity')
self.assertEqual('foo', ser_ent)
def test_serialize_entity_null_base(self):
ser_ent = self.ser_null.serialize_entity('context', 'entity')
self.assertEqual('entity', ser_ent)
def test_deserialize_entity(self):
self.mock_base.deserialize_entity.return_value = 'foo'
deser_ent = self.ser.deserialize_entity('context', 'entity')
self.mock_base.deserialize_entity.assert_called_once_with('context',
'entity')
self.assertEqual('foo', deser_ent)
def test_deserialize_entity_null_base(self):
deser_ent = self.ser_null.deserialize_entity('context', 'entity')
self.assertEqual('entity', deser_ent)
def test_serialize_context(self):
context = mock.Mock()
self.ser.serialize_context(context)
context.to_dict.assert_called_once_with()
@mock.patch.object(context, 'RequestContext')
def test_deserialize_context(self, mock_req):
self.ser.deserialize_context('context')
mock_req.from_dict.assert_called_once_with('context')
class TestClientRouter(test.NoDBTestCase):
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_instance(self, mock_rpcclient, mock_create, mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
cm = objects.CellMapping(uuid=uuids.cell_mapping,
transport_url='fake:///')
mock_get.return_value = objects.InstanceMapping(cell_mapping=cm)
instance = objects.Instance(uuid=uuids.instance)
router = rpc.ClientRouter(default_client)
client = router.by_instance(ctxt, instance)
mock_get.assert_called_once_with(ctxt, instance.uuid)
# verify a client was created by ClientRouter
mock_rpcclient.assert_called_once_with(
mock_create.return_value, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify cell client was returned
self.assertEqual(cell_client, client)
# reset and check that cached client is returned the second time
mock_rpcclient.reset_mock()
mock_create.reset_mock()
mock_get.reset_mock()
client = router.by_instance(ctxt, instance)
mock_get.assert_called_once_with(ctxt, instance.uuid)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
self.assertEqual(cell_client, client)
@mock.patch('nova.objects.HostMapping.get_by_host')
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_host(self, mock_rpcclient, mock_create, mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
cm = objects.CellMapping(uuid=uuids.cell_mapping,
transport_url='fake:///')
mock_get.return_value = objects.HostMapping(cell_mapping=cm)
host = 'fake-host'
router = rpc.ClientRouter(default_client)
client = router.by_host(ctxt, host)
mock_get.assert_called_once_with(ctxt, host)
# verify a client was created by ClientRouter
mock_rpcclient.assert_called_once_with(
mock_create.return_value, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify cell client was returned
self.assertEqual(cell_client, client)
# reset and check that cached client is returned the second time
mock_rpcclient.reset_mock()
mock_create.reset_mock()
mock_get.reset_mock()
client = router.by_host(ctxt, host)
mock_get.assert_called_once_with(ctxt, host)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
self.assertEqual(cell_client, client)
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid',
side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance))
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_instance_not_found(self, mock_rpcclient, mock_create,
mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
instance = objects.Instance(uuid=uuids.instance)
router = rpc.ClientRouter(default_client)
client = router.by_instance(ctxt, instance)
mock_get.assert_called_once_with(ctxt, instance.uuid)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
# verify default client was returned
self.assertEqual(default_client, client)
@mock.patch('nova.objects.HostMapping.get_by_host',
side_effect=exception.HostMappingNotFound(name='fake-host'))
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
host = 'fake-host'
router = rpc.ClientRouter(default_client)
client = router.by_host(ctxt, host)
mock_get.assert_called_once_with(ctxt, host)
mock_rpcclient.assert_not_called()
mock_create.assert_not_called()
# verify default client was returned
self.assertEqual(default_client, client)
@mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
@mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get):
t0 = datetime.datetime(2016, 8, 9, 0, 0, 0)
time_fixture = self.useFixture(utils_fixture.TimeFixture(t0))
default_client = mock.Mock()
ctxt = mock.Mock()
cm1 = objects.CellMapping(uuid=uuids.cell_mapping1,
transport_url='fake:///')
cm2 = objects.CellMapping(uuid=uuids.cell_mapping2,
transport_url='fake:///')
cm3 = objects.CellMapping(uuid=uuids.cell_mapping3,
transport_url='fake:///')
mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1),
objects.InstanceMapping(cell_mapping=cm2),
objects.InstanceMapping(cell_mapping=cm3),
objects.InstanceMapping(cell_mapping=cm3)]
instance1 = objects.Instance(uuid=uuids.instance1)
instance2 = objects.Instance(uuid=uuids.instance2)
instance3 = objects.Instance(uuid=uuids.instance3)
router = rpc.ClientRouter(default_client)
cell1_client = router.by_instance(ctxt, instance1)
cell2_client = router.by_instance(ctxt, instance2)
# default client, cell1 client, cell2 client
self.assertEqual(3, len(router.clients))
expected = {'default': default_client,
uuids.cell_mapping1: cell1_client,
uuids.cell_mapping2: cell2_client}
for client_id, client in expected.items():
self.assertEqual(client, router.clients[client_id].client)
# expire cell1 client and cell2 client
time_fixture.advance_time_seconds(80)
# add cell3 client
cell3_client = router.by_instance(ctxt, instance3)
router._remove_stale_clients(ctxt)
# default client, cell3 client
expected = {'default': default_client,
uuids.cell_mapping3: cell3_client}
self.assertEqual(2, len(router.clients))
for client_id, client in expected.items():
self.assertEqual(client, router.clients[client_id].client)
# expire cell3 client
time_fixture.advance_time_seconds(80)
# access cell3 client to refresh it
cell3_client = router.by_instance(ctxt, instance3)
router._remove_stale_clients(ctxt)
# default client and cell3 client should be there
self.assertEqual(2, len(router.clients))
for client_id, client in expected.items():
self.assertEqual(client, router.clients[client_id].client)