Fix wrong mock assertions in unit tests
'mock' objects don't have method "assert_called_once", but this is called in following unit test suites: manila/tests/network/linux/test_interface.py manila/tests/network/neutron/test_neutron_api.py manila/tests/test_share_generic.py manila/tests/test_service_instance.py manila/tests/test_share_glusterfs.py manila/tests/test_utils.py Make unit tests call proper assertion methods. Change-Id: I96923bd345fc5129963c77dfce3555e7284eb836 Closes-Bug: #1330391
This commit is contained in:
parent
85a406aead
commit
ab37c79a7e
@ -231,7 +231,6 @@ class TestBridgeInterfaceDriver(TestBase):
|
||||
with mock.patch('manila.network.linux.interface.LOG.debug') as log:
|
||||
br = interface.BridgeInterfaceDriver()
|
||||
br.unplug('tap0')
|
||||
log.assert_called_once()
|
||||
|
||||
self.assertTrue(log.called)
|
||||
self.ip_dev.assert_has_calls([mock.call('tap0', None),
|
||||
mock.call().link.delete()])
|
||||
|
@ -95,15 +95,16 @@ class NeutronApiTest(test.TestCase):
|
||||
def _create_neutron_api(self):
|
||||
self.neutron_api = neutron_api.API()
|
||||
|
||||
@mock.patch.object(base, 'Base', fakes.FakeModel)
|
||||
@mock.patch.object(base.Base, '__init__', mock.Mock())
|
||||
@mock.patch.object(context, 'get_admin_context',
|
||||
mock.Mock(return_value='context'))
|
||||
@mock.patch.object(neutron, 'get_client', mock.Mock())
|
||||
def test_create_api_object(self):
|
||||
with mock.patch.object(base.Base, '__init__', mock.Mock()):
|
||||
neutron_api.API()
|
||||
base.Base.__init__.assert_called_once()
|
||||
neutron.get_client.assert_called_once_with('context')
|
||||
neutron_api.API()
|
||||
|
||||
context.get_admin_context.assert_called_once_with()
|
||||
neutron.get_client.assert_called_once_with('context')
|
||||
base.Base.__init__.assert_called_once_with()
|
||||
|
||||
def test_create_port_with_all_args(self):
|
||||
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net',
|
||||
@ -160,7 +161,7 @@ class NeutronApiTest(test.TestCase):
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.create_port,
|
||||
**port_args)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
self.assertTrue(neutron_api.LOG.exception.called)
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_create_port_exception_status_409(self):
|
||||
@ -177,7 +178,7 @@ class NeutronApiTest(test.TestCase):
|
||||
self.assertRaises(exception.PortLimitExceeded,
|
||||
self.neutron_api.create_port,
|
||||
**port_args)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
self.assertTrue(neutron_api.LOG.exception.called)
|
||||
|
||||
def test_delete_port(self):
|
||||
port_id = 'test port id'
|
||||
@ -299,48 +300,36 @@ class NeutronApiTest(test.TestCase):
|
||||
client_list_routers_mock.assert_any_call()
|
||||
self.assertEqual(networks, fake_routers)
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_create_network_exception(self):
|
||||
net_args = {'tenant_id': 'test tenant', 'name': 'test name'}
|
||||
client_create_network_mock = mock.Mock(
|
||||
side_effect=neutron_client_exc.NeutronClientException)
|
||||
self.stubs.Set(
|
||||
self.neutron_api.client, 'create_network',
|
||||
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
|
||||
self.assertRaises(
|
||||
exception.NetworkException,
|
||||
self.neutron_api.network_create,
|
||||
**net_args)
|
||||
|
||||
with mock.patch.object(self.neutron_api.client, 'create_network',
|
||||
client_create_network_mock):
|
||||
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.network_create,
|
||||
**net_args)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_create_subnet_exception(self):
|
||||
subnet_args = {'tenant_id': 'test tenant', 'name': 'test name',
|
||||
'net_id': 'test net id', 'cidr': '10.0.0.0/24'}
|
||||
client_create_subnet_mock = mock.Mock(
|
||||
side_effect=neutron_client_exc.NeutronClientException)
|
||||
self.stubs.Set(
|
||||
self.neutron_api.client, 'create_subnet',
|
||||
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
|
||||
self.assertRaises(
|
||||
exception.NetworkException,
|
||||
self.neutron_api.subnet_create,
|
||||
**subnet_args)
|
||||
|
||||
with mock.patch.object(self.neutron_api.client, 'create_subnet',
|
||||
client_create_subnet_mock):
|
||||
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.subnet_create,
|
||||
**subnet_args)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_create_router_exception(self):
|
||||
router_args = {'tenant_id': 'test tenant', 'name': 'test name'}
|
||||
client_create_router_mock = mock.Mock(
|
||||
side_effect=neutron_client_exc.NeutronClientException)
|
||||
|
||||
with mock.patch.object(self.neutron_api.client, 'create_router',
|
||||
client_create_router_mock):
|
||||
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.router_create,
|
||||
**router_args)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
self.stubs.Set(
|
||||
self.neutron_api.client, 'create_router',
|
||||
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
|
||||
self.assertRaises(
|
||||
exception.NetworkException,
|
||||
self.neutron_api.router_create,
|
||||
**router_args)
|
||||
|
||||
def test_update_port_fixed_ips(self):
|
||||
port_id = 'test_port'
|
||||
@ -348,20 +337,16 @@ class NeutronApiTest(test.TestCase):
|
||||
port = self.neutron_api.update_port_fixed_ips(port_id, fixed_ips)
|
||||
self.assertEqual(port, fixed_ips)
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_update_port_fixed_ips_exception(self):
|
||||
port_id = 'test_port'
|
||||
fixed_ips = {'fixed_ips': [{'subnet_id': 'test subnet'}]}
|
||||
client_update_port_mock = mock.Mock(
|
||||
side_effect=neutron_client_exc.NeutronClientException)
|
||||
|
||||
with mock.patch.object(self.neutron_api.client, 'update_port',
|
||||
client_update_port_mock):
|
||||
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.update_port_fixed_ips,
|
||||
port_id, fixed_ips)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
self.stubs.Set(
|
||||
self.neutron_api.client, 'update_port',
|
||||
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
|
||||
self.assertRaises(
|
||||
exception.NetworkException,
|
||||
self.neutron_api.update_port_fixed_ips,
|
||||
port_id, fixed_ips)
|
||||
|
||||
def test_router_update_routes(self):
|
||||
router_id = 'test_router'
|
||||
@ -370,21 +355,17 @@ class NeutronApiTest(test.TestCase):
|
||||
router = self.neutron_api.router_update_routes(router_id, routes)
|
||||
self.assertEqual(router, routes)
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_router_update_routes_exception(self):
|
||||
router_id = 'test_router'
|
||||
routes = {'routes': [{'destination': '0.0.0.0/0',
|
||||
'nexthop': '8.8.8.8'}]}
|
||||
client_update_router_mock = mock.Mock(
|
||||
side_effect=neutron_client_exc.NeutronClientException)
|
||||
|
||||
with mock.patch.object(self.neutron_api.client, 'update_router',
|
||||
client_update_router_mock):
|
||||
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.router_update_routes,
|
||||
router_id, routes)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
self.stubs.Set(
|
||||
self.neutron_api.client, 'update_router',
|
||||
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
|
||||
self.assertRaises(
|
||||
exception.NetworkException,
|
||||
self.neutron_api.router_update_routes,
|
||||
router_id, routes)
|
||||
|
||||
def test_show_router(self):
|
||||
router_id = 'test router id'
|
||||
@ -413,21 +394,17 @@ class NeutronApiTest(test.TestCase):
|
||||
client_add_interface_router_mock.assert_called_once_with(
|
||||
port_id, {'subnet_id': subnet_id, 'port_id': port_id})
|
||||
|
||||
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
|
||||
def test_router_add_interface_exception(self):
|
||||
router_id = 'test port id'
|
||||
subnet_id = 'test subnet id'
|
||||
port_id = 'test port id'
|
||||
client_add_interface_router_mock = mock.Mock(
|
||||
side_effect=neutron_client_exc.NeutronClientException)
|
||||
|
||||
with mock.patch.object(self.neutron_api.client, 'add_interface_router',
|
||||
client_add_interface_router_mock):
|
||||
|
||||
self.assertRaises(exception.NetworkException,
|
||||
self.neutron_api.router_add_interface,
|
||||
router_id, subnet_id, port_id)
|
||||
neutron_api.LOG.exception.assert_called_once()
|
||||
self.stubs.Set(
|
||||
self.neutron_api.client, 'add_interface_router',
|
||||
mock.Mock(side_effect=neutron_client_exc.NeutronClientException))
|
||||
self.assertRaises(
|
||||
exception.NetworkException,
|
||||
self.neutron_api.router_add_interface,
|
||||
router_id, subnet_id, port_id)
|
||||
|
||||
|
||||
class TestNeutronClient(test.TestCase):
|
||||
|
@ -166,16 +166,30 @@ class GenericShareDriverTestCase(test.TestCase):
|
||||
self.assertEqual(len(self._driver._helpers), 1)
|
||||
|
||||
def test_create_share(self):
|
||||
volume = 'fake_volume'
|
||||
volume2 = 'fake_volume2'
|
||||
self._helper_nfs.create_export.return_value = 'fakelocation'
|
||||
methods = ('get_service_instance', '_allocate_container',
|
||||
'_attach_volume', '_format_device', '_mount_device')
|
||||
for method in methods:
|
||||
self.stubs.Set(self._driver, method, mock.Mock())
|
||||
result = self._driver.create_share(self._context, self.share,
|
||||
share_server=self.server)
|
||||
for method in methods:
|
||||
getattr(self._driver, method).assert_called_once()
|
||||
self.stubs.Set(self._driver, '_allocate_container',
|
||||
mock.Mock(return_value=volume))
|
||||
self.stubs.Set(self._driver, '_attach_volume',
|
||||
mock.Mock(return_value=volume2))
|
||||
self.stubs.Set(self._driver, '_format_device', mock.Mock())
|
||||
self.stubs.Set(self._driver, '_mount_device', mock.Mock())
|
||||
|
||||
result = self._driver.create_share(
|
||||
self._context, self.share, share_server=self.server)
|
||||
|
||||
self.assertEqual(result, 'fakelocation')
|
||||
self._driver._allocate_container.assert_called_once_with(
|
||||
self._driver.admin_context, self.share)
|
||||
self._driver._attach_volume.assert_called_once_with(
|
||||
self._driver.admin_context, self.share,
|
||||
self.server['backend_details']['instance_id'],
|
||||
volume)
|
||||
self._driver._format_device.assert_called_once_with(
|
||||
self.server['backend_details'], volume2)
|
||||
self._driver._mount_device.assert_called_once_with(
|
||||
self.share, self.server['backend_details'], volume2)
|
||||
|
||||
def test_create_share_exception(self):
|
||||
share = fake_share(share_network_id=None)
|
||||
@ -612,64 +626,83 @@ class GenericShareDriverTestCase(test.TestCase):
|
||||
|
||||
self._driver._deallocate_container(self._context, self.share)
|
||||
|
||||
self._driver._get_volume.assert_called_once()
|
||||
self._driver.volume_api.delete.assert_called_once()
|
||||
self._driver.volume_api.get.assert_called_once()
|
||||
self._driver._get_volume.assert_called_once_with(
|
||||
self._context, self.share['id'])
|
||||
self._driver.volume_api.delete.assert_called_once_with(
|
||||
self._context, fake_vol['id'])
|
||||
self._driver.volume_api.get.assert_called_once_with(
|
||||
self._context, fake_vol['id'])
|
||||
|
||||
def test_create_share_from_snapshot(self):
|
||||
vol1 = 'fake_vol1'
|
||||
vol2 = 'fake_vol2'
|
||||
self._helper_nfs.create_export.return_value = 'fakelocation'
|
||||
methods = ('get_service_instance', '_allocate_container',
|
||||
'_attach_volume', '_mount_device')
|
||||
for method in methods:
|
||||
self.stubs.Set(self._driver, method, mock.Mock())
|
||||
self.stubs.Set(self._driver, '_allocate_container',
|
||||
mock.Mock(return_value=vol1))
|
||||
self.stubs.Set(self._driver, '_attach_volume',
|
||||
mock.Mock(return_value=vol2))
|
||||
self.stubs.Set(self._driver, '_mount_device', mock.Mock())
|
||||
|
||||
result = self._driver.create_share_from_snapshot(
|
||||
self._context,
|
||||
self.share,
|
||||
self.snapshot,
|
||||
share_server=self.server)
|
||||
for method in methods:
|
||||
getattr(self._driver, method).assert_called_once()
|
||||
|
||||
self.assertEqual(result, 'fakelocation')
|
||||
self._driver._allocate_container.assert_called_once_with(
|
||||
self._driver.admin_context, self.share, self.snapshot)
|
||||
self._driver._attach_volume.assert_called_once_with(
|
||||
self._driver.admin_context, self.share,
|
||||
self.server['backend_details']['instance_id'], vol1)
|
||||
self._driver._mount_device.assert_called_once_with(
|
||||
self.share, self.server['backend_details'], vol2)
|
||||
self._helper_nfs.create_export.assert_called_once_with(
|
||||
self.server['backend_details'], self.share['name'])
|
||||
|
||||
def test_delete_share(self):
|
||||
fake_server = fake_compute.FakeServer()
|
||||
self.stubs.Set(self._driver, 'get_service_instance',
|
||||
mock.Mock(return_value=fake_server))
|
||||
self.stubs.Set(self._driver, '_unmount_device', mock.Mock())
|
||||
self.stubs.Set(self._driver, '_detach_volume', mock.Mock())
|
||||
self.stubs.Set(self._driver, '_deallocate_container', mock.Mock())
|
||||
|
||||
self._driver.delete_share(self._context, self.share,
|
||||
share_server=self.server)
|
||||
self._driver.delete_share(
|
||||
self._context, self.share, share_server=self.server)
|
||||
|
||||
self._driver.get_service_instance.assert_called_once()
|
||||
self._driver._unmount_device.assert_called_once()
|
||||
self._driver._detach_volume.assert_called_once()
|
||||
self._driver._deallocate_container.assert_called_once()
|
||||
self._helper_nfs.remove_export.assert_called_once_with(
|
||||
self.server['backend_details'], self.share['name'])
|
||||
self._driver._unmount_device.assert_called_once_with(
|
||||
self.share, self.server['backend_details'])
|
||||
self._driver._detach_volume.assert_called_once_with(
|
||||
self._driver.admin_context, self.share,
|
||||
self.server['backend_details'])
|
||||
self._driver._deallocate_container.assert_called_once_with(
|
||||
self._driver.admin_context, self.share)
|
||||
|
||||
def test_create_snapshot(self):
|
||||
fake_vol = fake_volume.FakeVolume()
|
||||
fake_vol_snap = fake_volume.FakeVolumeSnapshot()
|
||||
fake_vol_snap = fake_volume.FakeVolumeSnapshot(share_id=fake_vol['id'])
|
||||
self.stubs.Set(self._driver, '_get_volume',
|
||||
mock.Mock(return_value=fake_vol))
|
||||
self.stubs.Set(self._driver.volume_api, 'create_snapshot_force',
|
||||
mock.Mock(return_value=fake_vol_snap))
|
||||
|
||||
self._driver.create_snapshot(self._context, self.snapshot,
|
||||
self._driver.create_snapshot(self._context, fake_vol_snap,
|
||||
share_server=self.server)
|
||||
|
||||
self._driver._get_volume.assert_called_once()
|
||||
self._driver._get_volume.assert_called_once_with(
|
||||
self._driver.admin_context, fake_vol_snap['share_id'])
|
||||
self._driver.volume_api.create_snapshot_force.assert_called_once_with(
|
||||
self._context,
|
||||
fake_vol['id'],
|
||||
CONF.volume_snapshot_name_template % self.snapshot['id'],
|
||||
CONF.volume_snapshot_name_template % fake_vol_snap['id'],
|
||||
''
|
||||
)
|
||||
|
||||
def test_delete_snapshot(self):
|
||||
fake_vol_snap = fake_volume.FakeVolumeSnapshot()
|
||||
fake_vol_snap2 = {'id': 'fake_vol_snap2'}
|
||||
self.stubs.Set(self._driver, '_get_volume_snapshot',
|
||||
mock.Mock(return_value=fake_vol_snap))
|
||||
mock.Mock(return_value=fake_vol_snap2))
|
||||
self.stubs.Set(self._driver.volume_api, 'delete_snapshot', mock.Mock())
|
||||
self.stubs.Set(self._driver.volume_api, 'get_snapshot',
|
||||
mock.Mock(side_effect=exception.VolumeSnapshotNotFound(
|
||||
@ -678,29 +711,40 @@ class GenericShareDriverTestCase(test.TestCase):
|
||||
self._driver.delete_snapshot(self._context, fake_vol_snap,
|
||||
share_server=self.server)
|
||||
|
||||
self._driver._get_volume_snapshot.assert_called_once()
|
||||
self._driver.volume_api.delete_snapshot.assert_called_once()
|
||||
self._driver.volume_api.get_snapshot.assert_called_once()
|
||||
self._driver._get_volume_snapshot.assert_called_once_with(
|
||||
self._driver.admin_context, fake_vol_snap['id'])
|
||||
self._driver.volume_api.delete_snapshot.assert_called_once_with(
|
||||
self._driver.admin_context, fake_vol_snap2['id'])
|
||||
self._driver.volume_api.get_snapshot.assert_called_once_with(
|
||||
self._driver.admin_context, fake_vol_snap2['id'])
|
||||
|
||||
def test_ensure_share(self):
|
||||
vol1 = 'fake_vol1'
|
||||
vol2 = 'fake_vol2'
|
||||
self._helper_nfs.create_export.return_value = 'fakelocation'
|
||||
methods = ('get_service_instance', '_get_volume',
|
||||
'_attach_volume', '_mount_device')
|
||||
for method in methods:
|
||||
self.stubs.Set(self._driver, method, mock.Mock())
|
||||
self._driver.ensure_share(self._context, self.share,
|
||||
share_server=self.server)
|
||||
for method in methods:
|
||||
getattr(self._driver, method).assert_called_once()
|
||||
self.stubs.Set(self._driver, '_get_volume',
|
||||
mock.Mock(return_value=vol1))
|
||||
self.stubs.Set(self._driver, '_attach_volume',
|
||||
mock.Mock(return_value=vol2))
|
||||
self.stubs.Set(self._driver, '_mount_device', mock.Mock())
|
||||
|
||||
self._driver.ensure_share(
|
||||
self._context, self.share, share_server=self.server)
|
||||
|
||||
self._driver._get_volume.assert_called_once_with(
|
||||
self._context, self.share['id'])
|
||||
self._driver._attach_volume.assert_called_once_with(
|
||||
self._context, self.share,
|
||||
self.server['backend_details']['instance_id'], vol1)
|
||||
self._driver._mount_device.assert_called_once_with(
|
||||
self.share, self.server['backend_details'], vol2)
|
||||
self._helper_nfs.create_export.assert_called_once_with(
|
||||
self.server['backend_details'], self.share['name'], recreate=True)
|
||||
|
||||
def test_allow_access(self):
|
||||
fake_server = fake_compute.FakeServer()
|
||||
access = {'access_type': 'ip', 'access_to': 'fake_dest'}
|
||||
self.stubs.Set(self._driver, 'get_service_instance',
|
||||
mock.Mock(return_value=fake_server))
|
||||
self._driver.allow_access(self._context, self.share, access,
|
||||
share_server=self.server)
|
||||
self._driver.get_service_instance.assert_called_once()
|
||||
self._driver.allow_access(
|
||||
self._context, self.share, access, share_server=self.server)
|
||||
self._driver._helpers[self.share['share_proto']].\
|
||||
allow_access.assert_called_once_with(
|
||||
self.server['backend_details'],
|
||||
@ -746,10 +790,16 @@ class GenericShareDriverTestCase(test.TestCase):
|
||||
net_info)
|
||||
|
||||
def test_teardown_network(self):
|
||||
sim = self._driver.instance_manager
|
||||
self._driver.service_instance_manager = sim
|
||||
self._driver.teardown_server(self.fake_net_info)
|
||||
sim.delete_service_instance.assert_called_once()
|
||||
server_details = {
|
||||
'instance_id': 'fake_instance_id',
|
||||
'subnet_id': 'fake_subnet_id',
|
||||
'router_id': 'fake_router_id',
|
||||
}
|
||||
self._driver.teardown_server(server_details)
|
||||
self._driver.service_instance_manager.delete_service_instance.\
|
||||
assert_called_once_with(
|
||||
self._driver.admin_context, server_details['instance_id'],
|
||||
server_details['subnet_id'], server_details['router_id'])
|
||||
|
||||
def test_ssh_exec_connection_not_exist(self):
|
||||
ssh_output = 'fake_ssh_output'
|
||||
|
@ -557,7 +557,8 @@ class GlusterfsShareDriverTestCase(test.TestCase):
|
||||
ret = self._driver.allow_access(self._context, self.share, access,
|
||||
share_server)
|
||||
self.assertEqual(ret, None)
|
||||
self._driver._manage_access.assert_called_once()
|
||||
self._driver._manage_access.assert_called_once_with(
|
||||
self._context, self.share, access, mock.ANY)
|
||||
|
||||
def test_deny_access_with_share_having_noaccess(self):
|
||||
access = {'access_type': 'ip', 'access_to': '10.0.0.1'}
|
||||
@ -587,4 +588,5 @@ class GlusterfsShareDriverTestCase(test.TestCase):
|
||||
ret = self._driver.deny_access(self._context, self.share, access,
|
||||
share_server)
|
||||
self.assertEqual(ret, None)
|
||||
self._driver._manage_access.assert_called_once()
|
||||
self._driver._manage_access.assert_called_once_with(
|
||||
self._context, self.share, access, mock.ANY)
|
||||
|
@ -239,34 +239,28 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
self._context)
|
||||
|
||||
def test_set_up_service_instance(self):
|
||||
fake_server = {'id': 'fake',
|
||||
'ip': '1.2.3.4',
|
||||
'public_address': '1.2.3.4',
|
||||
'subnet_id': 'fake-subnet-id',
|
||||
'router_id': 'fake-router-id',
|
||||
'pk_path': 'path'}
|
||||
fake_server = {
|
||||
'id': 'fake',
|
||||
'ip': '1.2.3.4',
|
||||
'public_address': '1.2.3.4',
|
||||
'subnet_id': 'fake-subnet-id',
|
||||
'router_id': 'fake-router-id',
|
||||
'pk_path': None,
|
||||
'username': self._manager.get_config_option(
|
||||
'service_instance_user'),
|
||||
}
|
||||
expected_details = fake_server.copy()
|
||||
expected_details.pop('pk_path')
|
||||
expected_details['instance_id'] = expected_details.pop('id')
|
||||
if CONF.service_instance_password:
|
||||
expected_details['password'] = CONF.service_instance_password
|
||||
expected_details['username'] = CONF.service_instance_user
|
||||
self.stubs.Set(self._manager, '_get_server_ip',
|
||||
mock.Mock(return_value='fake_ip'))
|
||||
self.stubs.Set(self._manager.compute_api, 'server_list',
|
||||
mock.Mock(return_value=[]))
|
||||
self.stubs.Set(self._manager, '_create_service_instance',
|
||||
mock.Mock(return_value=fake_server))
|
||||
|
||||
result = self._manager.set_up_service_instance(
|
||||
self._context,
|
||||
'fake-inst-name',
|
||||
'fake-net-id',
|
||||
'fake-subnet-id')
|
||||
self._context, 'fake-inst-name', 'fake-net-id', 'fake-subnet-id')
|
||||
|
||||
self._manager.compute_api.server_list.assert_called_once()
|
||||
self._manager._get_server_ip.assert_called_once()
|
||||
self._manager._create_service_instance.assert_called_once()
|
||||
self.assertEqual(result, expected_details)
|
||||
self._manager._create_service_instance.assert_called_once_with(
|
||||
self._context, 'fake-inst-name', 'fake-net-id', 'fake-subnet-id')
|
||||
self.assertEqual(expected_details, result)
|
||||
|
||||
def test_ensure_server(self):
|
||||
server_details = {'instance_id': 'fake_inst_id',
|
||||
@ -340,8 +334,10 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
self.assertEqual(result,
|
||||
(fake_keypair.name,
|
||||
os.path.expanduser(CONF.path_to_private_key)))
|
||||
self._manager.compute_api.keypair_list.assert_called_once()
|
||||
self._manager.compute_api.keypair_import.assert_called_once()
|
||||
self._manager.compute_api.keypair_list.assert_called_once_with(
|
||||
self._context)
|
||||
self._manager.compute_api.keypair_import.assert_called_once_with(
|
||||
self._context, 'manila-service', '')
|
||||
|
||||
def test_get_key_exists(self):
|
||||
fake_keypair = fake_compute.FakeKeypair(
|
||||
@ -356,7 +352,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
|
||||
result = self._manager._get_key(self._context)
|
||||
|
||||
self._manager.compute_api.keypair_list.assert_called_once()
|
||||
self._manager.compute_api.keypair_list.assert_called_once_with(
|
||||
self._context)
|
||||
self.assertFalse(self._manager.compute_api.keypair_import.called)
|
||||
self.assertEqual(result,
|
||||
(fake_keypair.name,
|
||||
@ -377,8 +374,10 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
|
||||
result = self._manager._get_key(self._context)
|
||||
|
||||
self._manager.compute_api.keypair_list.assert_called_once()
|
||||
self._manager.compute_api.keypair_delete.assert_called_once()
|
||||
self._manager.compute_api.keypair_list.assert_called_once_with(
|
||||
self._context)
|
||||
self._manager.compute_api.keypair_delete.assert_called_once_with(
|
||||
self._context, fake_keypair.id)
|
||||
self._manager.compute_api.keypair_import.assert_called_once_with(
|
||||
self._context, fake_keypair.name, 'fake_public_key2')
|
||||
self.assertEqual(result,
|
||||
@ -543,7 +542,18 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
'fake-neutron-net',
|
||||
'fake-neutron-subnet')
|
||||
|
||||
self._manager.compute_api.server_create.assert_called_once()
|
||||
self._manager._get_service_image.assert_called_once_with(self._context)
|
||||
self._manager._get_key.assert_called_once_with(self._context)
|
||||
self._manager._get_or_create_security_group.assert_called_once_with(
|
||||
self._context)
|
||||
self._manager._setup_network_for_instance.assert_called_once_with(
|
||||
'fake-neutron-net', 'fake-neutron-subnet')
|
||||
self._manager._setup_connectivity_with_service_instances.\
|
||||
assert_called_once_with()
|
||||
self._manager.compute_api.server_create.assert_called_once_with(
|
||||
self._context, name='fake-inst-name', image='fake_image_id',
|
||||
flavor=100, key_name='fake_key_name',
|
||||
nics=[{'port-id': fake_port['id']}])
|
||||
self.assertFalse(self._manager.compute_api.server_get.called)
|
||||
self.assertFalse(service_instance.socket.socket.called)
|
||||
|
||||
@ -641,7 +651,7 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
self._manager.service_network_id,
|
||||
subnet_id='fake_subnet_id',
|
||||
device_owner='manila')
|
||||
self._manager._get_cidr_for_subnet.assert_called_once()
|
||||
self._manager._get_cidr_for_subnet.assert_called_once_with()
|
||||
self.assertIs(network_data.get('service_subnet'), fake_service_subnet)
|
||||
self.assertIs(network_data.get('router'), fake_router)
|
||||
self.assertIs(network_data.get('service_port'), fake_port)
|
||||
@ -791,15 +801,12 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
|
||||
def test_get_service_port(self):
|
||||
fake_service_port = fake_network.FakePort(device_id='manila-share')
|
||||
fake_service_net = fake_network.FakeNetwork(subnets=[])
|
||||
self.stubs.Set(self._manager.neutron_api, 'list_ports',
|
||||
mock.Mock(return_value=[]))
|
||||
self.stubs.Set(self._manager, '_execute',
|
||||
mock.Mock(return_value=('fake_host', '')))
|
||||
self.stubs.Set(self._manager.neutron_api, 'create_port',
|
||||
mock.Mock(return_value=fake_service_port))
|
||||
self.stubs.Set(self._manager.neutron_api, 'get_network',
|
||||
mock.Mock(return_value=fake_service_net))
|
||||
self.stubs.Set(self._manager.neutron_api, 'update_port_fixed_ips',
|
||||
mock.Mock(return_value=fake_service_port))
|
||||
|
||||
@ -807,7 +814,6 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
|
||||
self._manager.neutron_api.list_ports.assert_called_once_with(
|
||||
device_id='manila-share')
|
||||
self._manager.db.service_get_all_by_topic.assert_called_once()
|
||||
self._manager.neutron_api.create_port.assert_called_once_with(
|
||||
self._manager.service_tenant_id,
|
||||
self._manager.service_network_id,
|
||||
@ -815,7 +821,7 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
device_owner='manila:share',
|
||||
host_id='fake_host'
|
||||
)
|
||||
self._manager.neutron_api.get_network.assert_called_once()
|
||||
self._manager._execute.assert_called_once_with('hostname')
|
||||
self.assertFalse(self._manager.neutron_api.
|
||||
update_port_fixed_ips.called)
|
||||
self.assertEqual(result, fake_service_port)
|
||||
@ -830,15 +836,10 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
|
||||
def test_get_service_port_exists(self):
|
||||
fake_service_port = fake_network.FakePort(device_id='manila-share')
|
||||
fake_service_net = fake_network.FakeNetwork(subnets=[])
|
||||
self.stubs.Set(self._manager.neutron_api, 'list_ports',
|
||||
mock.Mock(return_value=[fake_service_port]))
|
||||
self.stubs.Set(self._manager.db, 'service_get_all_by_topic',
|
||||
mock.Mock(return_value=[{'host': 'fake_host'}]))
|
||||
self.stubs.Set(self._manager.neutron_api, 'create_port',
|
||||
mock.Mock(return_value=fake_service_port))
|
||||
self.stubs.Set(self._manager.neutron_api, 'get_network',
|
||||
mock.Mock(return_value=fake_service_net))
|
||||
self.stubs.Set(self._manager.neutron_api, 'update_port_fixed_ips',
|
||||
mock.Mock(return_value=fake_service_port))
|
||||
|
||||
@ -848,7 +849,6 @@ class ServiceInstanceManagerTestCase(test.TestCase):
|
||||
device_id='manila-share')
|
||||
self.assertFalse(self._manager.db.service_get_all_by_topic.called)
|
||||
self.assertFalse(self._manager.neutron_api.create_port.called)
|
||||
self._manager.neutron_api.get_network.assert_called_once()
|
||||
self.assertFalse(self._manager.neutron_api.
|
||||
update_port_fixed_ips.called)
|
||||
self.assertEqual(result, fake_service_port)
|
||||
|
@ -371,7 +371,7 @@ class GenericUtilsTestCase(test.TestCase):
|
||||
self.assertTrue(utils.is_ipv6_configured())
|
||||
|
||||
open.assert_called_once_with('/proc/net/if_inet6')
|
||||
fake_fd.read.assert_called_once()
|
||||
fake_fd.read.assert_called_once_with(32)
|
||||
|
||||
def test_is_ipv6_configured1(self):
|
||||
fake_fd = mock.Mock()
|
||||
@ -396,7 +396,7 @@ class GenericUtilsTestCase(test.TestCase):
|
||||
with mock.patch.dict('sys.modules', {
|
||||
'eventlet.support.greendns': fake_dns}):
|
||||
self.assertTrue(utils.is_eventlet_bug105())
|
||||
fake_dns.getaddrinfo.assert_called_once()
|
||||
self.assertTrue(fake_dns.getaddrinfo.called)
|
||||
|
||||
def test_is_eventlet_bug105_neg(self):
|
||||
fake_dns = mock.Mock()
|
||||
@ -406,7 +406,7 @@ class GenericUtilsTestCase(test.TestCase):
|
||||
with mock.patch.dict('sys.modules', {
|
||||
'eventlet.support.greendns': fake_dns}):
|
||||
self.assertFalse(utils.is_eventlet_bug105())
|
||||
fake_dns.getaddrinfo.assert_called_once()
|
||||
fake_dns.getaddrinfo.assert_called_once_with('::1', 80)
|
||||
|
||||
|
||||
class MonkeyPatchTestCase(test.TestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user