Remove double mocking

In py310 unittest.mock does not allow to mock the same function twice as
the second mocking will fail to autospec the Mock object created by the
first mocking.

This patch manually fixes the double mocking.

Fixed cases:
1) one of the mock was totally unnecessary so it was removed
2) the second mock specialized the behavior of the first generic mock.
   In this case the second mock is replaced with the configuration of
   the first mock
3) a test case with two test steps mocked the same function for each
   step with overlapping mocks. Here the overlap was removed to have
   the two mock exists independently

The get_connection injection in the libvirt functional test needed a
further tweak (yeah I know it has many already) to act like a single
mock (basically case #2) instead of a temporary re-mocking. Still the
globalness of the get_connection mocking warrant the special set / reset
logic there.

Change-Id: I3998d0d49583806ac1c3ae64f1b1fe343cefd20d
This commit is contained in:
Balazs Gibizer 2022-07-28 19:50:29 +02:00
parent 89ef050b8c
commit f8cf050a13
21 changed files with 537 additions and 667 deletions

View File

@ -355,7 +355,7 @@ class TestCase(base.BaseTestCase):
self.useFixture(fixtures.MonkeyPatch(old, new))
@staticmethod
def patch_exists(patched_path, result):
def patch_exists(patched_path, result, other=None):
"""Provide a static method version of patch_exists(), which if you
haven't already imported nova.test can be slightly easier to
use as a context manager within a test method via:
@ -364,7 +364,7 @@ class TestCase(base.BaseTestCase):
with self.patch_exists(path, True):
...
"""
return patch_exists(patched_path, result)
return patch_exists(patched_path, result, other)
@staticmethod
def patch_open(patched_path, read_data):
@ -848,10 +848,12 @@ class ContainKeyValue(object):
@contextlib.contextmanager
def patch_exists(patched_path, result):
def patch_exists(patched_path, result, other=None):
"""Selectively patch os.path.exists() so that if it's called with
patched_path, return result. Calls with any other path are passed
through to the real os.path.exists() function.
through to the real os.path.exists() function if other is not provided.
If other is provided then that will be the result of the call on paths
other than patched_path.
Either import and use as a decorator / context manager, or use the
nova.TestCase.patch_exists() static method as a context manager.
@ -885,7 +887,10 @@ def patch_exists(patched_path, result):
def fake_exists(path):
if path == patched_path:
return result
return real_exists(path)
elif other is not None:
return other
else:
return real_exists(path)
with mock.patch.object(os.path, "exists") as mock_exists:
mock_exists.side_effect = fake_exists

View File

@ -142,15 +142,15 @@ class ServersTestBase(integrated_helpers._IntegratedTestBase):
pci_info.get_pci_address_mac_mapping())
# This is fun. Firstly we need to do a global'ish mock so we can
# actually start the service.
with mock.patch('nova.virt.libvirt.host.Host.get_connection',
return_value=fake_connection):
compute = self.start_service('compute', host=hostname)
# Once that's done, we need to tweak the compute "service" to
# make sure it returns unique objects. We do this inside the
# mock context to avoid a small window between the end of the
# context and the tweaking where get_connection would revert to
# being an autospec mock.
compute.driver._host.get_connection = lambda: fake_connection
orig_con = self.mock_conn.return_value
self.mock_conn.return_value = fake_connection
compute = self.start_service('compute', host=hostname)
# Once that's done, we need to tweak the compute "service" to
# make sure it returns unique objects.
compute.driver._host.get_connection = lambda: fake_connection
# Then we revert the local mock tweaking so the next compute can
# get its own
self.mock_conn.return_value = orig_con
return compute
# ensure we haven't already registered services with these hostnames

View File

@ -128,7 +128,7 @@ class VTPMServersTest(base.ServersTestBase):
# the presence of users on the host, none of which makes sense here
_p = mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver._check_vtpm_support')
self.mock_conn = _p.start()
_p.start()
self.addCleanup(_p.stop)
self.key_mgr = crypto._get_key_manager()

View File

@ -67,11 +67,11 @@ class RescheduleBuildAvailabilityZoneUpCall(
def wrap_bari(*args, **kwargs):
# Poison the AZ query to blow up as if the cell conductor does not
# have access to the API DB.
patcher = mock.patch('nova.objects.AggregateList.get_by_host',
side_effect=oslo_db_exc.CantStartEngineError)
patcher.start()
self.addCleanup(patcher.stop)
return original_bari(*args, **kwargs)
with mock.patch(
'nova.objects.AggregateList.get_by_host',
side_effect=oslo_db_exc.CantStartEngineError
):
return original_bari(*args, **kwargs)
self.stub_out('nova.compute.manager.ComputeManager.'
'build_and_run_instance', wrap_bari)
@ -81,10 +81,6 @@ class RescheduleBuildAvailabilityZoneUpCall(
# compute service we have to wait for the notification that the build
# is complete and then stop the mock so we can use the API again.
self.notifier.wait_for_versioned_notifications('instance.create.end')
# Note that we use stopall here because we actually called
# build_and_run_instance twice so we have more than one instance of
# the mock that needs to be stopped.
mock.patch.stopall()
server = self._wait_for_state_change(server, 'ACTIVE')
# We should have rescheduled and the instance AZ should be set from the
# Selection object. Since neither compute host is in an AZ, the server
@ -128,20 +124,20 @@ class RescheduleMigrateAvailabilityZoneUpCall(
self.rescheduled = None
def wrap_prep_resize(_self, *args, **kwargs):
# Poison the AZ query to blow up as if the cell conductor does not
# have access to the API DB.
patcher = mock.patch('nova.objects.AggregateList.get_by_host',
side_effect=oslo_db_exc.CantStartEngineError)
self.agg_mock = patcher.start()
self.addCleanup(patcher.stop)
if self.rescheduled is None:
# Track the first host that we rescheduled from.
self.rescheduled = _self.host
# Trigger a reschedule.
raise exception.ComputeResourcesUnavailable(
reason='test_migrate_reschedule_blocked_az_up_call')
return original_prep_resize(_self, *args, **kwargs)
# Poison the AZ query to blow up as if the cell conductor does not
# have access to the API DB.
with mock.patch(
'nova.objects.AggregateList.get_by_host',
side_effect=oslo_db_exc.CantStartEngineError,
) as agg_mock:
self.agg_mock = agg_mock
return original_prep_resize(_self, *args, **kwargs)
self.stub_out('nova.compute.manager.ComputeManager._prep_resize',
wrap_prep_resize)

View File

@ -368,25 +368,23 @@ class HypervisorsTestV21(test.NoDBTestCase):
return TEST_SERVICES[0]
raise exception.ComputeHostNotFound(host=host)
@mock.patch.object(self.controller.host_api, 'compute_node_get_all',
return_value=compute_nodes)
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host',
fake_service_get_by_compute_host)
def _test(self, compute_node_get_all):
req = self._get_request(True)
result = self.controller.index(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].uuid if self.expect_uuid_for_id
else compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
self.assertDictEqual(expected, result['hypervisors'][0])
m_get = self.controller.host_api.compute_node_get_all
m_get.side_effect = None
m_get.return_value = compute_nodes
self.controller.host_api.service_get_by_compute_host.side_effect = (
fake_service_get_by_compute_host)
_test(self)
req = self._get_request(True)
result = self.controller.index(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].uuid if self.expect_uuid_for_id
else compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
self.assertDictEqual(expected, result['hypervisors'][0])
def test_index_compute_host_not_mapped(self):
"""Tests that we don't fail index if a host is not mapped."""
@ -402,25 +400,22 @@ class HypervisorsTestV21(test.NoDBTestCase):
return TEST_SERVICES[0]
raise exception.HostMappingNotFound(name=host)
@mock.patch.object(self.controller.host_api, 'compute_node_get_all',
return_value=compute_nodes)
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host',
fake_service_get_by_compute_host)
def _test(self, compute_node_get_all):
req = self._get_request(True)
result = self.controller.index(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].uuid if self.expect_uuid_for_id
else compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
self.assertDictEqual(expected, result['hypervisors'][0])
self.controller.host_api.compute_node_get_all.return_value = (
compute_nodes)
self.controller.host_api.service_get_by_compute_host = (
fake_service_get_by_compute_host)
_test(self)
req = self._get_request(True)
result = self.controller.index(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].uuid if self.expect_uuid_for_id
else compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
self.assertDictEqual(expected, result['hypervisors'][0])
def test_detail(self):
req = self._get_request(True)
@ -444,32 +439,30 @@ class HypervisorsTestV21(test.NoDBTestCase):
return TEST_SERVICES[0]
raise exception.ComputeHostNotFound(host=host)
@mock.patch.object(self.controller.host_api, 'compute_node_get_all',
return_value=compute_nodes)
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host',
fake_service_get_by_compute_host)
def _test(self, compute_node_get_all):
req = self._get_request(True)
result = self.controller.detail(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
# we don't care about all of the details, just make sure we get
# the subset we care about and there are more keys than what index
# would return
hypervisor = result['hypervisors'][0]
self.assertTrue(
set(expected.keys()).issubset(set(hypervisor.keys())))
self.assertGreater(len(hypervisor.keys()), len(expected.keys()))
self.assertEqual(compute_nodes[0].hypervisor_hostname,
hypervisor['hypervisor_hostname'])
m_get = self.controller.host_api.compute_node_get_all
m_get.side_effect = None
m_get.return_value = compute_nodes
self.controller.host_api.service_get_by_compute_host.side_effect = (
fake_service_get_by_compute_host)
_test(self)
req = self._get_request(True)
result = self.controller.detail(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
# we don't care about all of the details, just make sure we get
# the subset we care about and there are more keys than what index
# would return
hypervisor = result['hypervisors'][0]
self.assertTrue(
set(expected.keys()).issubset(set(hypervisor.keys())))
self.assertGreater(len(hypervisor.keys()), len(expected.keys()))
self.assertEqual(compute_nodes[0].hypervisor_hostname,
hypervisor['hypervisor_hostname'])
def test_detail_compute_host_not_mapped(self):
"""Tests that if a service is deleted but the compute node is not we
@ -487,32 +480,28 @@ class HypervisorsTestV21(test.NoDBTestCase):
return TEST_SERVICES[0]
raise exception.HostMappingNotFound(name=host)
@mock.patch.object(self.controller.host_api, 'compute_node_get_all',
return_value=compute_nodes)
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host',
fake_service_get_by_compute_host)
def _test(self, compute_node_get_all):
req = self._get_request(True)
result = self.controller.detail(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
# we don't care about all of the details, just make sure we get
# the subset we care about and there are more keys than what index
# would return
hypervisor = result['hypervisors'][0]
self.assertTrue(
set(expected.keys()).issubset(set(hypervisor.keys())))
self.assertGreater(len(hypervisor.keys()), len(expected.keys()))
self.assertEqual(compute_nodes[0].hypervisor_hostname,
hypervisor['hypervisor_hostname'])
_test(self)
self.controller.host_api.service_get_by_compute_host.side_effect = (
fake_service_get_by_compute_host)
self.controller.host_api.compute_node_get_all.return_value = (
compute_nodes)
req = self._get_request(True)
result = self.controller.detail(req)
self.assertEqual(1, len(result['hypervisors']))
expected = {
'id': compute_nodes[0].id,
'hypervisor_hostname': compute_nodes[0].hypervisor_hostname,
'state': 'up',
'status': 'enabled',
}
# we don't care about all of the details, just make sure we get
# the subset we care about and there are more keys than what index
# would return
hypervisor = result['hypervisors'][0]
self.assertTrue(
set(expected.keys()).issubset(set(hypervisor.keys())))
self.assertGreater(len(hypervisor.keys()), len(expected.keys()))
self.assertEqual(compute_nodes[0].hypervisor_hostname,
hypervisor['hypervisor_hostname'])
def test_show(self):
req = self._get_request(True)
@ -525,21 +514,16 @@ class HypervisorsTestV21(test.NoDBTestCase):
"""Tests that if a service is deleted but the compute node is not we
don't fail when listing hypervisors.
"""
@mock.patch.object(self.controller.host_api, 'compute_node_get',
return_value=self.TEST_HYPERS_OBJ[0])
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host')
def _test(self, mock_service, mock_compute_node_get):
req = self._get_request(True)
mock_service.side_effect = exception.HostMappingNotFound(
name='foo')
hyper_id = self._get_hyper_id()
self.assertRaises(exc.HTTPNotFound, self.controller.show,
req, hyper_id)
self.assertTrue(mock_service.called)
mock_compute_node_get.assert_called_once_with(mock.ANY, hyper_id)
_test(self)
self.controller.host_api.service_get_by_compute_host.side_effect = (
exception.HostMappingNotFound(name='foo'))
req = self._get_request(True)
hyper_id = self._get_hyper_id()
self.assertRaises(
exc.HTTPNotFound, self.controller.show, req, hyper_id)
self.assertTrue(
self.controller.host_api.service_get_by_compute_host.called)
self.controller.host_api.compute_node_get.assert_called_once_with(
mock.ANY, hyper_id)
def test_show_noid(self):
req = self._get_request(True)
@ -611,20 +595,15 @@ class HypervisorsTestV21(test.NoDBTestCase):
mock.ANY, self.TEST_HYPERS_OBJ[0].host)
def test_uptime_hypervisor_not_mapped_service_get(self):
@mock.patch.object(self.controller.host_api, 'compute_node_get')
@mock.patch.object(self.controller.host_api, 'get_host_uptime')
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host',
side_effect=exception.HostMappingNotFound(
name='dummy'))
def _test(mock_get, _, __):
req = self._get_request(True)
hyper_id = self._get_hyper_id()
self.assertRaises(exc.HTTPNotFound,
self.controller.uptime, req, hyper_id)
self.assertTrue(mock_get.called)
self.controller.host_api.service_get_by_compute_host.side_effect = (
exception.HostMappingNotFound(name='dummy'))
_test()
req = self._get_request(True)
hyper_id = self._get_hyper_id()
self.assertRaises(exc.HTTPNotFound,
self.controller.uptime, req, hyper_id)
self.assertTrue(
self.controller.host_api.service_get_by_compute_host.called)
def test_uptime_hypervisor_not_mapped(self):
with mock.patch.object(self.controller.host_api, 'get_host_uptime',
@ -644,30 +623,26 @@ class HypervisorsTestV21(test.NoDBTestCase):
self.assertEqual(dict(hypervisors=self.INDEX_HYPER_DICTS), result)
def test_search_non_exist(self):
with mock.patch.object(self.controller.host_api,
'compute_node_search_by_hypervisor',
return_value=[]) as mock_node_search:
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound, self.controller.search,
req, 'a')
self.assertEqual(1, mock_node_search.call_count)
m_search = self.controller.host_api.compute_node_search_by_hypervisor
m_search.side_effect = None
m_search.return_value = []
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound, self.controller.search, req, 'a')
self.assertEqual(1, m_search.call_count)
def test_search_unmapped(self):
m_search = self.controller.host_api.compute_node_search_by_hypervisor
m_search.side_effect = None
m_search.return_value = [mock.MagicMock()]
@mock.patch.object(self.controller.host_api,
'compute_node_search_by_hypervisor')
@mock.patch.object(self.controller.host_api,
'service_get_by_compute_host')
def _test(mock_service, mock_search):
mock_search.return_value = [mock.MagicMock()]
mock_service.side_effect = exception.HostMappingNotFound(
name='foo')
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound, self.controller.search,
req, 'a')
self.assertTrue(mock_service.called)
self.controller.host_api.service_get_by_compute_host.side_effect = (
exception.HostMappingNotFound(name='foo'))
_test()
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound, self.controller.search, req, 'a')
self.assertTrue(
self.controller.host_api.service_get_by_compute_host.called)
@mock.patch.object(objects.InstanceList, 'get_by_host',
side_effect=fake_instance_get_all_by_host)
@ -702,15 +677,12 @@ class HypervisorsTestV21(test.NoDBTestCase):
def test_servers_compute_host_not_found(self):
req = self._get_request(True)
with test.nested(
mock.patch.object(
self.controller.host_api, 'instance_get_all_by_host',
side_effect=fake_instance_get_all_by_host,
),
mock.patch.object(
self.controller.host_api, 'service_get_by_compute_host',
side_effect=exception.ComputeHostNotFound(host='foo'),
),
self.controller.host_api.service_get_by_compute_host.side_effect = (
exception.ComputeHostNotFound(host='foo'))
with mock.patch.object(
self.controller.host_api,
'instance_get_all_by_host',
side_effect=fake_instance_get_all_by_host,
):
# The result should be empty since every attempt to fetch the
# service for a hypervisor "failed"
@ -718,24 +690,25 @@ class HypervisorsTestV21(test.NoDBTestCase):
self.assertEqual({'hypervisors': []}, result)
def test_servers_non_id(self):
with mock.patch.object(self.controller.host_api,
'compute_node_search_by_hypervisor',
return_value=[]) as mock_node_search:
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound,
self.controller.servers,
req, '115')
self.assertEqual(1, mock_node_search.call_count)
m_search = self.controller.host_api.compute_node_search_by_hypervisor
m_search.side_effect = None
m_search.return_value = []
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound,
self.controller.servers,
req, '115')
self.assertEqual(1, m_search.call_count)
def test_servers_with_non_integer_hypervisor_id(self):
with mock.patch.object(self.controller.host_api,
'compute_node_search_by_hypervisor',
return_value=[]) as mock_node_search:
m_search = self.controller.host_api.compute_node_search_by_hypervisor
m_search.side_effect = None
m_search.return_value = []
req = self._get_request(True)
self.assertRaises(exc.HTTPNotFound,
self.controller.servers, req, 'abc')
self.assertEqual(1, mock_node_search.call_count)
req = self._get_request(True)
self.assertRaises(
exc.HTTPNotFound, self.controller.servers, req, 'abc')
self.assertEqual(1, m_search.call_count)
def test_servers_with_no_servers(self):
with mock.patch.object(self.controller.host_api,
@ -1089,15 +1062,13 @@ class HypervisorsTestV253(HypervisorsTestV252):
use_admin_context=True,
url='/os-hypervisors?with_servers=1')
with test.nested(
mock.patch.object(
self.controller.host_api, 'instance_get_all_by_host',
side_effect=fake_instance_get_all_by_host,
),
mock.patch.object(
self.controller.host_api, 'service_get_by_compute_host',
side_effect=exception.ComputeHostNotFound(host='foo'),
),
self.controller.host_api.service_get_by_compute_host.side_effect = (
exception.ComputeHostNotFound(host='foo'))
with mock.patch.object(
self.controller.host_api,
"instance_get_all_by_host",
side_effect=fake_instance_get_all_by_host,
):
# The result should be empty since every attempt to fetch the
# service for a hypervisor "failed"
@ -1157,11 +1128,13 @@ class HypervisorsTestV253(HypervisorsTestV252):
use_admin_context=True,
url='/os-hypervisors?with_servers=yes&'
'hypervisor_hostname_pattern=shenzhen')
with mock.patch.object(self.controller.host_api,
'compute_node_search_by_hypervisor',
return_value=objects.ComputeNodeList()) as s:
self.assertRaises(exc.HTTPNotFound, self.controller.index, req)
s.assert_called_once_with(req.environ['nova.context'], 'shenzhen')
m_search = self.controller.host_api.compute_node_search_by_hypervisor
m_search.side_effect = None
m_search.return_value = objects.ComputeNodeList()
self.assertRaises(exc.HTTPNotFound, self.controller.index, req)
m_search.assert_called_once_with(
req.environ['nova.context'], 'shenzhen')
def test_detail_with_hostname_pattern(self):
"""Test listing hypervisors with details and using the
@ -1170,13 +1143,14 @@ class HypervisorsTestV253(HypervisorsTestV252):
req = self._get_request(
use_admin_context=True,
url='/os-hypervisors?hypervisor_hostname_pattern=shenzhen')
with mock.patch.object(
self.controller.host_api,
'compute_node_search_by_hypervisor',
return_value=objects.ComputeNodeList(objects=[TEST_HYPERS_OBJ[0]])
) as s:
result = self.controller.detail(req)
s.assert_called_once_with(req.environ['nova.context'], 'shenzhen')
m_search = self.controller.host_api.compute_node_search_by_hypervisor
m_search.side_effect = None
m_search.return_value = objects.ComputeNodeList(
objects=[TEST_HYPERS_OBJ[0]])
result = self.controller.detail(req)
m_search.assert_called_once_with(
req.environ['nova.context'], 'shenzhen')
expected = {'hypervisors': [self.DETAIL_HYPERS_DICTS[0]]}
@ -1483,15 +1457,11 @@ class HypervisorsTestV288(HypervisorsTestV275):
self.controller.uptime, req)
def test_uptime_old_version(self):
with mock.patch.object(
self.controller.host_api, 'get_host_uptime',
return_value='fake uptime',
):
req = self._get_request(use_admin_context=True, version='2.87')
hyper_id = self._get_hyper_id()
req = self._get_request(use_admin_context=True, version='2.87')
hyper_id = self._get_hyper_id()
# no exception == pass
self.controller.uptime(req, hyper_id)
# no exception == pass
self.controller.uptime(req, hyper_id)
def test_uptime_noid(self):
# the separate 'uptime' API has been removed, so skip this test
@ -1526,34 +1496,36 @@ class HypervisorsTestV288(HypervisorsTestV275):
pass
def test_show_with_uptime_notimplemented(self):
with mock.patch.object(
self.controller.host_api, 'get_host_uptime',
side_effect=NotImplementedError,
) as mock_get_uptime:
req = self._get_request(use_admin_context=True)
hyper_id = self._get_hyper_id()
self.controller.host_api.get_host_uptime.side_effect = (
NotImplementedError())
result = self.controller.show(req, hyper_id)
req = self._get_request(use_admin_context=True)
hyper_id = self._get_hyper_id()
expected_dict = copy.deepcopy(self.DETAIL_HYPERS_DICTS[0])
expected_dict.update({'uptime': None})
self.assertEqual({'hypervisor': expected_dict}, result)
self.assertEqual(1, mock_get_uptime.call_count)
result = self.controller.show(req, hyper_id)
expected_dict = copy.deepcopy(self.DETAIL_HYPERS_DICTS[0])
expected_dict.update({'uptime': None})
self.assertEqual({'hypervisor': expected_dict}, result)
self.assertEqual(
1, self.controller.host_api.get_host_uptime.call_count)
def test_show_with_uptime_hypervisor_down(self):
with mock.patch.object(
self.controller.host_api, 'get_host_uptime',
side_effect=exception.ComputeServiceUnavailable(host='dummy')
) as mock_get_uptime:
req = self._get_request(use_admin_context=True)
hyper_id = self._get_hyper_id()
self.controller.host_api.get_host_uptime.side_effect = (
exception.ComputeServiceUnavailable(host='dummy'))
result = self.controller.show(req, hyper_id)
req = self._get_request(use_admin_context=True)
hyper_id = self._get_hyper_id()
expected_dict = copy.deepcopy(self.DETAIL_HYPERS_DICTS[0])
expected_dict.update({'uptime': None})
self.assertEqual({'hypervisor': expected_dict}, result)
self.assertEqual(1, mock_get_uptime.call_count)
result = self.controller.show(req, hyper_id)
expected_dict = copy.deepcopy(self.DETAIL_HYPERS_DICTS[0])
expected_dict.update({'uptime': None})
self.assertEqual({'hypervisor': expected_dict}, result)
self.assertEqual(
1,
self.controller.host_api.get_host_uptime.call_count
)
def test_show_old_version(self):
# ensure things still work as expected here

View File

@ -34,7 +34,6 @@ from nova.limit import local as local_limit
from nova.limit import placement as placement_limit
from nova import objects
from nova.policies import limits as l_policies
from nova import quota
from nova import test
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import matchers
@ -52,12 +51,12 @@ class BaseLimitTestSuite(test.NoDBTestCase):
return {k: dict(limit=v, in_use=v // 2)
for k, v in self.absolute_limits.items()}
mock_get_project_quotas = mock.patch.object(
patcher_get_project_quotas = mock.patch.object(
nova.quota.QUOTAS,
"get_project_quotas",
side_effect = stub_get_project_quotas)
mock_get_project_quotas.start()
self.addCleanup(mock_get_project_quotas.stop)
side_effect=stub_get_project_quotas)
self.mock_get_project_quotas = patcher_get_project_quotas.start()
self.addCleanup(patcher_get_project_quotas.stop)
patcher = self.mock_can = mock.patch('nova.context.RequestContext.can')
self.mock_can = patcher.start()
self.addCleanup(patcher.stop)
@ -154,16 +153,14 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
return {k: dict(limit=v, in_use=v // 2)
for k, v in self.absolute_limits.items()}
with mock.patch('nova.quota.QUOTAS.get_project_quotas') as \
get_project_quotas:
get_project_quotas.side_effect = _get_project_quotas
self.mock_get_project_quotas.side_effect = _get_project_quotas
response = request.get_response(self.controller)
response = request.get_response(self.controller)
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
get_project_quotas.assert_called_once_with(context, tenant_id,
usages=True)
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
self.mock_get_project_quotas.assert_called_once_with(
context, tenant_id, usages=True)
def _do_test_used_limits(self, reserved):
request = self._get_index_request(tenant_id=None)
@ -186,8 +183,7 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
def stub_get_project_quotas(context, project_id, usages=True):
return limits
self.stub_out('nova.quota.QUOTAS.get_project_quotas',
stub_get_project_quotas)
self.mock_get_project_quotas.side_effect = stub_get_project_quotas
res = request.get_response(self.controller)
body = jsonutils.loads(res.body)
@ -211,14 +207,15 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
user_id=user_id,
project_id=project_id)
context = fake_req.environ["nova.context"]
with mock.patch.object(quota.QUOTAS, 'get_project_quotas',
return_value={}) as mock_get_quotas:
fake_req.get_response(self.controller)
self.assertEqual(2, self.mock_can.call_count)
self.mock_can.assert_called_with(
l_policies.OTHER_PROJECT_LIMIT_POLICY_NAME)
mock_get_quotas.assert_called_once_with(context,
tenant_id, usages=True)
self.mock_get_project_quotas.side_effect = None
self.mock_get_project_quotas.return_value = {}
fake_req.get_response(self.controller)
self.assertEqual(2, self.mock_can.call_count)
self.mock_can.assert_called_with(
l_policies.OTHER_PROJECT_LIMIT_POLICY_NAME)
self.mock_get_project_quotas.assert_called_once_with(context,
tenant_id, usages=True)
def _test_admin_can_fetch_used_limits_for_own_project(self, req_get):
project_id = "123456"
@ -230,11 +227,12 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
project_id=project_id)
context = fake_req.environ["nova.context"]
with mock.patch.object(quota.QUOTAS, 'get_project_quotas',
return_value={}) as mock_get_quotas:
fake_req.get_response(self.controller)
mock_get_quotas.assert_called_once_with(context,
project_id, usages=True)
self.mock_get_project_quotas.side_effect = None
self.mock_get_project_quotas.return_value = {}
fake_req.get_response(self.controller)
self.mock_get_project_quotas.assert_called_once_with(
context, project_id, usages=True)
def test_admin_can_fetch_used_limits_for_own_project(self):
req_get = {}
@ -262,12 +260,13 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
project_id = "123456"
fake_req = self._get_index_request(project_id=project_id)
context = fake_req.environ["nova.context"]
with mock.patch.object(quota.QUOTAS, 'get_project_quotas',
return_value={}) as mock_get_quotas:
fake_req.get_response(self.controller)
self.mock_get_project_quotas.side_effect = None
self.mock_get_project_quotas.return_value = {}
mock_get_quotas.assert_called_once_with(context,
project_id, usages=True)
fake_req.get_response(self.controller)
self.mock_get_project_quotas.assert_called_once_with(
context, project_id, usages=True)
def test_used_ram_added(self):
fake_req = self._get_index_request()
@ -275,28 +274,26 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
def stub_get_project_quotas(context, project_id, usages=True):
return {'ram': {'limit': 512, 'in_use': 256}}
with mock.patch.object(quota.QUOTAS, 'get_project_quotas',
side_effect=stub_get_project_quotas
) as mock_get_quotas:
self.mock_get_project_quotas.side_effect = stub_get_project_quotas
res = fake_req.get_response(self.controller)
body = jsonutils.loads(res.body)
abs_limits = body['limits']['absolute']
self.assertIn('totalRAMUsed', abs_limits)
self.assertEqual(256, abs_limits['totalRAMUsed'])
self.assertEqual(1, mock_get_quotas.call_count)
res = fake_req.get_response(self.controller)
body = jsonutils.loads(res.body)
abs_limits = body['limits']['absolute']
self.assertIn('totalRAMUsed', abs_limits)
self.assertEqual(256, abs_limits['totalRAMUsed'])
self.assertEqual(1, self.mock_get_project_quotas.call_count)
def test_no_ram_quota(self):
fake_req = self._get_index_request()
with mock.patch.object(quota.QUOTAS, 'get_project_quotas',
return_value={}) as mock_get_quotas:
self.mock_get_project_quotas.side_effect = None
self.mock_get_project_quotas.return_value = {}
res = fake_req.get_response(self.controller)
body = jsonutils.loads(res.body)
abs_limits = body['limits']['absolute']
self.assertNotIn('totalRAMUsed', abs_limits)
self.assertEqual(1, mock_get_quotas.call_count)
res = fake_req.get_response(self.controller)
body = jsonutils.loads(res.body)
abs_limits = body['limits']['absolute']
self.assertNotIn('totalRAMUsed', abs_limits)
self.assertEqual(1, self.mock_get_project_quotas.call_count)
class FakeHttplibSocket(object):
@ -398,25 +395,24 @@ class LimitsControllerTestV236(BaseLimitTestSuite):
return {k: dict(limit=v, in_use=v // 2)
for k, v in absolute_limits.items()}
with mock.patch('nova.quota.QUOTAS.get_project_quotas') as \
get_project_quotas:
get_project_quotas.side_effect = _get_project_quotas
response = self.controller.index(self.req)
expected_response = {
"limits": {
"rate": [],
"absolute": {
"maxTotalRAMSize": 512,
"maxTotalInstances": 5,
"maxTotalCores": 21,
"maxTotalKeypairs": 10,
"totalRAMUsed": 256,
"totalCoresUsed": 10,
"totalInstancesUsed": 2,
},
self.mock_get_project_quotas.side_effect = _get_project_quotas
response = self.controller.index(self.req)
expected_response = {
"limits": {
"rate": [],
"absolute": {
"maxTotalRAMSize": 512,
"maxTotalInstances": 5,
"maxTotalCores": 21,
"maxTotalKeypairs": 10,
"totalRAMUsed": 256,
"totalCoresUsed": 10,
"totalInstancesUsed": 2,
},
}
self.assertEqual(expected_response, response)
},
}
self.assertEqual(expected_response, response)
class LimitsControllerTestV239(BaseLimitTestSuite):
@ -436,21 +432,20 @@ class LimitsControllerTestV239(BaseLimitTestSuite):
return {k: dict(limit=v, in_use=v // 2)
for k, v in absolute_limits.items()}
with mock.patch('nova.quota.QUOTAS.get_project_quotas') as \
get_project_quotas:
get_project_quotas.side_effect = _get_project_quotas
response = self.controller.index(self.req)
# staring from version 2.39 there is no 'maxImageMeta' field
# in response after removing 'image-metadata' proxy API
expected_response = {
"limits": {
"rate": [],
"absolute": {
"maxServerMeta": 1,
},
self.mock_get_project_quotas.side_effect = _get_project_quotas
response = self.controller.index(self.req)
# starting from version 2.39 there is no 'maxImageMeta' field
# in response after removing 'image-metadata' proxy API
expected_response = {
"limits": {
"rate": [],
"absolute": {
"maxServerMeta": 1,
},
}
self.assertEqual(expected_response, response)
},
}
self.assertEqual(expected_response, response)
class LimitsControllerTestV275(BaseLimitTestSuite):
@ -469,10 +464,9 @@ class LimitsControllerTestV275(BaseLimitTestSuite):
return {k: dict(limit=v, in_use=v // 2)
for k, v in absolute_limits.items()}
with mock.patch('nova.quota.QUOTAS.get_project_quotas') as \
get_project_quotas:
get_project_quotas.side_effect = _get_project_quotas
self.controller.index(req)
self.mock_get_project_quotas.side_effect = _get_project_quotas
self.controller.index(req)
self.controller.index(req)
def test_index_additional_query_param(self):
req = fakes.HTTPRequest.blank("/?unknown=fake",

View File

@ -67,11 +67,11 @@ class ServerActionsControllerTestV21(test.TestCase):
self.controller = self._get_controller()
self.compute_api = self.controller.compute_api
# We don't care about anything getting as far as hitting the compute
# RPC API so we just mock it out here.
mock_rpcapi = mock.patch.object(self.compute_api, 'compute_rpcapi')
mock_rpcapi.start()
self.addCleanup(mock_rpcapi.stop)
# In most of the cases we don't care about anything getting as far as
# hitting the compute RPC API so we just mock it out here.
patcher_rpcapi = mock.patch.object(self.compute_api, 'compute_rpcapi')
self.mock_rpcapi = patcher_rpcapi.start()
self.addCleanup(patcher_rpcapi.stop)
# The project_id here matches what is used by default in
# fake_compute_get which need to match for policy checks.
self.req = fakes.HTTPRequest.blank('',
@ -1080,21 +1080,23 @@ class ServerActionsControllerTestV21(test.TestCase):
snapshot = dict(id=_fake_id('d'))
self.mock_rpcapi.quiesce_instance.side_effect = (
exception.InstanceQuiesceNotSupported(
instance_id="fake", reason="test"
)
)
with test.nested(
mock.patch.object(
self.controller.compute_api.volume_api, 'get_absolute_limits',
return_value={'totalSnapshotsUsed': 0,
'maxTotalSnapshots': 10}),
mock.patch.object(self.controller.compute_api.compute_rpcapi,
'quiesce_instance',
side_effect=exception.InstanceQuiesceNotSupported(
instance_id='fake', reason='test')),
mock.patch.object(self.controller.compute_api.volume_api, 'get',
return_value=volume),
mock.patch.object(self.controller.compute_api.volume_api,
'create_snapshot_force',
return_value=snapshot),
) as (mock_get_limits, mock_quiesce, mock_vol_get, mock_vol_create):
) as (mock_get_limits, mock_vol_get, mock_vol_create):
if mock_vol_create_side_effect:
mock_vol_create.side_effect = mock_vol_create_side_effect
@ -1126,7 +1128,7 @@ class ServerActionsControllerTestV21(test.TestCase):
for k in extra_properties.keys():
self.assertEqual(properties[k], extra_properties[k])
mock_quiesce.assert_called_once_with(mock.ANY, mock.ANY)
self.mock_rpcapi.quiesce_instance.assert_called_once()
mock_vol_get.assert_called_once_with(mock.ANY, volume['id'])
mock_vol_create.assert_called_once_with(mock.ANY, volume['id'],
mock.ANY, mock.ANY)
@ -1190,21 +1192,23 @@ class ServerActionsControllerTestV21(test.TestCase):
snapshot = dict(id=_fake_id('d'))
self.mock_rpcapi.quiesce_instance.side_effect = (
exception.InstanceQuiesceNotSupported(
instance_id="fake", reason="test"
)
)
with test.nested(
mock.patch.object(
self.controller.compute_api.volume_api, 'get_absolute_limits',
return_value={'totalSnapshotsUsed': 0,
'maxTotalSnapshots': 10}),
mock.patch.object(self.controller.compute_api.compute_rpcapi,
'quiesce_instance',
side_effect=exception.InstanceQuiesceNotSupported(
instance_id='fake', reason='test')),
mock.patch.object(self.controller.compute_api.volume_api, 'get',
return_value=volume),
mock.patch.object(self.controller.compute_api.volume_api,
'create_snapshot_force',
return_value=snapshot),
) as (mock_get_limits, mock_quiesce, mock_vol_get, mock_vol_create):
) as (mock_get_limits, mock_vol_get, mock_vol_create):
response = self.controller._action_create_image(self.req,
FAKE_UUID, body=body)
@ -1219,7 +1223,7 @@ class ServerActionsControllerTestV21(test.TestCase):
for key, val in extra_metadata.items():
self.assertEqual(properties[key], val)
mock_quiesce.assert_called_once_with(mock.ANY, mock.ANY)
self.mock_rpcapi.quiesce_instance.assert_called_once()
mock_vol_get.assert_called_once_with(mock.ANY, volume['id'])
mock_vol_create.assert_called_once_with(mock.ANY, volume['id'],
mock.ANY, mock.ANY)

View File

@ -378,10 +378,9 @@ class UnshelveServerControllerTestV291(test.NoDBTestCase):
self.req.body = jsonutils.dump_as_bytes(body)
self.req.api_version_request = (
api_version_request.APIVersionRequest('2.91'))
with mock.patch.object(
self.controller.compute_api, 'unshelve') as mock_unshelve:
self.controller._unshelve(self.req, fakes.FAKE_UUID, body=body)
mock_unshelve.assert_called_once_with(
self.controller._unshelve(self.req, fakes.FAKE_UUID, body=body)
self.controller.compute_api.unshelve.assert_called_once_with(
self.req.environ['nova.context'],
instance,
new_az='us-east',

View File

@ -8614,16 +8614,13 @@ class ComputeAPITestCase(BaseTestCase):
def test_create_instance_sets_system_metadata(self):
# Make sure image properties are copied into system metadata.
with mock.patch.object(
self.compute_api.compute_task_api, 'schedule_and_build_instances',
) as mock_sbi:
ref, resv_id = self.compute_api.create(
self.context,
flavor=self.default_flavor,
image_href='f5000000-0000-0000-0000-000000000000')
ref, resv_id = self.compute_api.create(
self.context,
flavor=self.default_flavor,
image_href='f5000000-0000-0000-0000-000000000000')
build_call = mock_sbi.call_args_list[0]
instance = build_call[1]['build_requests'][0].instance
build_call = self.schedule_and_build_instances_mock.call_args_list[0]
instance = build_call[1]['build_requests'][0].instance
image_props = {'image_kernel_id': uuids.kernel_id,
'image_ramdisk_id': uuids.ramdisk_id,
@ -8633,16 +8630,14 @@ class ComputeAPITestCase(BaseTestCase):
self.assertEqual(value, instance.system_metadata[key])
def test_create_saves_flavor(self):
with mock.patch.object(
self.compute_api.compute_task_api, 'schedule_and_build_instances',
) as mock_sbi:
ref, resv_id = self.compute_api.create(
self.context,
flavor=self.default_flavor,
image_href=uuids.image_href_id)
ref, resv_id = self.compute_api.create(
self.context,
flavor=self.default_flavor,
image_href=uuids.image_href_id)
build_call = self.schedule_and_build_instances_mock.call_args_list[0]
instance = build_call[1]['build_requests'][0].instance
build_call = mock_sbi.call_args_list[0]
instance = build_call[1]['build_requests'][0].instance
self.assertIn('flavor', instance)
self.assertEqual(self.default_flavor.flavorid,
instance.flavor.flavorid)
@ -8650,19 +8645,18 @@ class ComputeAPITestCase(BaseTestCase):
def test_create_instance_associates_security_groups(self):
# Make sure create associates security groups.
with test.nested(
mock.patch.object(self.compute_api.compute_task_api,
'schedule_and_build_instances'),
mock.patch('nova.network.security_group_api.validate_name',
return_value=uuids.secgroup_id),
) as (mock_sbi, mock_secgroups):
with mock.patch(
"nova.network.security_group_api.validate_name",
return_value=uuids.secgroup_id,
) as mock_secgroups:
self.compute_api.create(
self.context,
flavor=self.default_flavor,
image_href=uuids.image_href_id,
security_groups=['testgroup'])
build_call = mock_sbi.call_args_list[0]
build_call = (
self.schedule_and_build_instances_mock.call_args_list[0])
reqspec = build_call[1]['request_spec'][0]
self.assertEqual(1, len(reqspec.security_groups))
@ -8697,22 +8691,19 @@ class ComputeAPITestCase(BaseTestCase):
requested_networks = objects.NetworkRequestList(
objects=[objects.NetworkRequest(port_id=uuids.port_instance)])
with test.nested(
mock.patch.object(
self.compute_api.compute_task_api,
'schedule_and_build_instances'),
mock.patch.object(
self.compute_api.network_api,
'create_resource_requests',
return_value=(None, [], objects.RequestLevelParams())),
) as (mock_sbi, _mock_create_resreqs):
with mock.patch.object(
self.compute_api.network_api,
"create_resource_requests",
return_value=(None, [], objects.RequestLevelParams()),
):
self.compute_api.create(
self.context,
flavor=self.default_flavor,
image_href=uuids.image_href_id,
requested_networks=requested_networks)
build_call = mock_sbi.call_args_list[0]
build_call = (
self.schedule_and_build_instances_mock.call_args_list[0])
reqspec = build_call[1]['request_spec'][0]
self.assertEqual(1, len(reqspec.requested_networks))
@ -10212,8 +10203,7 @@ class ComputeAPITestCase(BaseTestCase):
self.compute_api.get_console_output,
self.context, instance)
@mock.patch.object(compute_utils, 'notify_about_instance_action')
def test_attach_interface(self, mock_notify):
def test_attach_interface(self):
instance = self._create_fake_instance_obj()
nwinfo = [fake_network_cache_model.new_vif()]
network_id = nwinfo[0]['network']['id']
@ -10233,8 +10223,12 @@ class ComputeAPITestCase(BaseTestCase):
mock.patch.object(
self.compute,
"_claim_pci_device_for_interface_attach",
return_value=None)
) as (cap, mock_lock, mock_create_resource_req, mock_claim_pci):
return_value=None),
mock.patch.object(compute_utils, 'notify_about_instance_action'),
) as (
cap, mock_lock, mock_create_resource_req, mock_claim_pci,
mock_notify
):
mock_create_resource_req.return_value = (
None, [], mock.sentinel.req_lvl_params)
vif = self.compute.attach_interface(self.context,
@ -11052,8 +11046,7 @@ class ComputeAPITestCase(BaseTestCase):
mock_remove_res.assert_called_once_with(
self.context, instance.uuid, mock.sentinel.resources)
@mock.patch.object(compute_utils, 'notify_about_instance_action')
def test_detach_interface(self, mock_notify):
def test_detach_interface(self):
nwinfo, port_id = self.test_attach_interface()
instance = self._create_fake_instance_obj()
instance.info_cache = objects.InstanceInfoCache.new(
@ -11086,10 +11079,13 @@ class ComputeAPITestCase(BaseTestCase):
mock.patch('nova.pci.request.get_instance_pci_request_from_vif',
return_value=pci_req),
mock.patch.object(self.compute.rt, 'unclaim_pci_devices'),
mock.patch.object(instance, 'save')
mock.patch.object(instance, 'save'),
mock.patch.object(compute_utils, 'notify_about_instance_action'),
) as (
mock_remove_alloc, mock_deallocate, mock_lock,
mock_get_pci_req, mock_unclaim_pci, mock_instance_save):
mock_remove_alloc, mock_deallocate, mock_lock,
mock_get_pci_req, mock_unclaim_pci, mock_instance_save,
mock_notify
):
self.compute.detach_interface(self.context, instance, port_id)
mock_deallocate.assert_called_once_with(
@ -11896,17 +11892,16 @@ class ComputeAPITestCase(BaseTestCase):
instance.save()
@mock.patch.object(objects.Service, 'get_by_compute_host')
@mock.patch.object(self.compute_api.compute_task_api,
'rebuild_instance')
@mock.patch.object(objects.ComputeNodeList, 'get_all_by_host')
@mock.patch.object(objects.RequestSpec,
'get_by_instance_uuid')
@mock.patch.object(self.compute_api.servicegroup_api, 'service_is_up')
def do_test(service_is_up, get_by_instance_uuid, get_all_by_host,
rebuild_instance, get_service):
def do_test(
service_is_up, get_by_instance_uuid, get_all_by_host, get_service
):
service_is_up.return_value = False
get_by_instance_uuid.return_value = fake_spec
rebuild_instance.side_effect = fake_rebuild_instance
self.rebuild_instance_mock.side_effect = fake_rebuild_instance
get_all_by_host.return_value = objects.ComputeNodeList(
objects=[objects.ComputeNode(
host='fake_dest_host',
@ -11924,7 +11919,7 @@ class ComputeAPITestCase(BaseTestCase):
host = None
else:
host = 'fake_dest_host'
rebuild_instance.assert_called_once_with(
self.rebuild_instance_mock.assert_called_once_with(
ctxt,
instance=instance,
new_pass=None,

View File

@ -8589,11 +8589,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
@mock.patch.object(self.compute.network_api, 'setup_networks_on_host')
@mock.patch.object(self.compute.network_api, 'migrate_instance_start')
@mock.patch.object(compute_utils, 'notify_usage_exists')
@mock.patch.object(self.migration, 'save')
@mock.patch.object(objects.BlockDeviceMappingList,
'get_by_instance_uuid')
def do_test(get_by_instance_uuid,
migration_save,
notify_usage_exists,
migrate_instance_start,
setup_networks_on_host,
@ -8665,7 +8663,6 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
@mock.patch.object(self.compute.network_api, 'migrate_instance_finish',
side_effect=_migrate_instance_finish)
@mock.patch.object(self.compute.network_api, 'setup_networks_on_host')
@mock.patch.object(self.migration, 'save')
@mock.patch.object(self.instance, 'save')
@mock.patch.object(self.compute, '_set_instance_info')
@mock.patch.object(db, 'instance_fault_create')
@ -8679,7 +8676,6 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
fault_create,
set_instance_info,
instance_save,
migration_save,
setup_networks_on_host,
migrate_instance_finish,
get_instance_nw_info,
@ -8723,11 +8719,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
@mock.patch.object(self.compute.network_api, 'migrate_instance_start')
@mock.patch.object(compute_utils, 'notify_usage_exists')
@mock.patch.object(db, 'instance_extra_update_by_uuid')
@mock.patch.object(self.migration, 'save')
@mock.patch.object(objects.BlockDeviceMappingList,
'get_by_instance_uuid')
def do_revert_resize(mock_get_by_instance_uuid,
mock_migration_save,
mock_extra_update,
mock_notify_usage_exists,
mock_migrate_instance_start,
@ -8774,7 +8768,6 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
@mock.patch.object(compute_utils, 'notify_about_instance_action')
@mock.patch.object(self.compute, "_set_instance_info")
@mock.patch.object(self.instance, 'save')
@mock.patch.object(self.migration, 'save')
@mock.patch.object(compute_utils, 'add_instance_fault_from_exc')
@mock.patch.object(db, 'instance_fault_create')
@mock.patch.object(db, 'instance_extra_update_by_uuid')
@ -8798,7 +8791,6 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
mock_extra_update,
mock_fault_create,
mock_fault_from_exc,
mock_mig_save,
mock_inst_save,
mock_set,
mock_notify_about_instance_action,
@ -8892,7 +8884,6 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
@mock.patch.object(self.compute, '_delete_scheduler_instance_info')
@mock.patch('nova.objects.Instance.get_by_uuid')
@mock.patch('nova.objects.Migration.get_by_id')
@mock.patch.object(self.migration, 'save')
@mock.patch.object(self.compute, '_notify_about_instance_usage')
@mock.patch.object(self.compute, 'network_api')
@mock.patch.object(self.compute.driver, 'confirm_migration')
@ -8901,7 +8892,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
@mock.patch.object(self.instance, 'save')
def do_confirm_resize(mock_save, mock_drop, mock_delete,
mock_confirm, mock_nwapi, mock_notify,
mock_mig_save, mock_mig_get, mock_inst_get,
mock_mig_get, mock_inst_get,
mock_delete_scheduler_info):
self._mock_rt()
@ -8984,16 +8975,16 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
instance_get_by_uuid.assert_called_once()
def test_confirm_resize_calls_virt_driver_with_old_pci(self):
@mock.patch.object(self.migration, 'save')
@mock.patch.object(self.compute, '_notify_about_instance_usage')
@mock.patch.object(self.compute, 'network_api')
@mock.patch.object(self.compute.driver, 'confirm_migration')
@mock.patch.object(self.compute, '_delete_allocation_after_move')
@mock.patch.object(self.instance, 'drop_migration_context')
@mock.patch.object(self.instance, 'save')
def do_confirm_resize(mock_save, mock_drop, mock_delete,
mock_confirm, mock_nwapi, mock_notify,
mock_mig_save):
def do_confirm_resize(
mock_save, mock_drop, mock_delete, mock_confirm, mock_nwapi,
mock_notify
):
# Mock virt driver confirm_resize() to save the provided
# network_info, we will check it later.
updated_nw_info = []

View File

@ -4205,9 +4205,9 @@ class TestCleanComputeNodeCache(BaseTestCase):
invalid_nodename = "invalid-node"
self.rt.compute_nodes[_NODENAME] = self.compute
self.rt.compute_nodes[invalid_nodename] = mock.sentinel.compute
with mock.patch.object(
self.rt.reportclient, "invalidate_resource_provider",
) as mock_invalidate:
self.rt.clean_compute_node_cache([self.compute])
mock_remove.assert_called_once_with(invalid_nodename)
mock_invalidate.assert_called_once_with(invalid_nodename)
mock_invalidate = self.rt.reportclient.invalidate_resource_provider
self.rt.clean_compute_node_cache([self.compute])
mock_remove.assert_called_once_with(invalid_nodename)
mock_invalidate.assert_called_once_with(invalid_nodename)

View File

@ -277,33 +277,21 @@ class DecoratorTestCase(test.TestCase):
'No DB access allowed in ',
mock_log.error.call_args[0][0])
@mock.patch.object(db, 'LOG')
@mock.patch.object(db, 'DISABLE_DB_ACCESS', return_value=True)
def test_pick_context_manager_writer_disable_db_access(
self, mock_DISABLE_DB_ACCESS, mock_log,
):
def test_pick_context_manager_writer_disable_db_access(self):
@db.pick_context_manager_writer
def func(context, value):
pass
self._test_pick_context_manager_disable_db_access(func)
@mock.patch.object(db, 'LOG')
@mock.patch.object(db, 'DISABLE_DB_ACCESS', return_value=True)
def test_pick_context_manager_reader_disable_db_access(
self, mock_DISABLE_DB_ACCESS, mock_log,
):
def test_pick_context_manager_reader_disable_db_access(self):
@db.pick_context_manager_reader
def func(context, value):
pass
self._test_pick_context_manager_disable_db_access(func)
@mock.patch.object(db, 'LOG')
@mock.patch.object(db, 'DISABLE_DB_ACCESS', return_value=True)
def test_pick_context_manager_reader_allow_async_disable_db_access(
self, mock_DISABLE_DB_ACCESS, mock_log,
):
def test_pick_context_manager_reader_allow_async_disable_db_access(self):
@db.pick_context_manager_reader_allow_async
def func(context, value):
pass

View File

@ -99,16 +99,7 @@ class PciDeviceStatsTestCase(test.NoDBTestCase):
def setUp(self):
super(PciDeviceStatsTestCase, self).setUp()
self._setup_pci_stats()
def _setup_pci_stats(self, numa_topology=None):
"""Exists for tests that need to setup pci_stats with a specific NUMA
topology, while still allowing tests that don't care to get the default
"empty" one.
"""
if not numa_topology:
numa_topology = objects.NUMATopology()
self.pci_stats = stats.PciDeviceStats(numa_topology)
self.pci_stats = stats.PciDeviceStats(objects.NUMATopology())
# The following two calls need to be made before adding the devices.
patcher = fakes.fake_pci_whitelist()
self.addCleanup(patcher.stop)
@ -241,18 +232,18 @@ class PciDeviceStatsTestCase(test.NoDBTestCase):
self.assertFalse(self.pci_stats.support_requests(pci_requests, cells))
def test_filter_pools_for_socket_affinity_no_socket(self):
self._setup_pci_stats(
objects.NUMATopology(
cells=[objects.NUMACell(socket=None)]))
self.pci_stats.numa_topology = objects.NUMATopology(
cells=[objects.NUMACell(socket=None)])
self.assertEqual(
[],
self.pci_stats._filter_pools_for_socket_affinity(
self.pci_stats.pools, [objects.InstanceNUMACell()]))
def test_filter_pools_for_socket_affinity(self):
self._setup_pci_stats(
objects.NUMATopology(
cells=[objects.NUMACell(id=1, socket=1)]))
self.pci_stats.numa_topology = objects.NUMATopology(
cells=[objects.NUMACell(id=1, socket=1)])
pools = self.pci_stats._filter_pools_for_socket_affinity(
self.pci_stats.pools, [objects.InstanceNUMACell(id=1)])
self.assertEqual(1, len(pools))

View File

@ -1458,20 +1458,17 @@ class MetadataHandlerTestCase(test.TestCase):
for c in range(ord('a'), ord('z'))]
mock_client.list_subnets.return_value = {
'subnets': subnet_list}
mock_client.list_ports.side_effect = fake_list_ports
with mock.patch.object(
mock_client, 'list_ports',
side_effect=fake_list_ports) as mock_list_ports:
response = fake_request(
self, self.mdinst,
relpath="/2009-04-04/user-data",
address="192.192.192.2",
fake_get_metadata_by_instance_id=self._fake_x_get_metadata,
headers={'X-Forwarded-For': '192.192.192.2',
'X-Metadata-Provider': proxy_lb_id})
response = fake_request(
self, self.mdinst,
relpath="/2009-04-04/user-data",
address="192.192.192.2",
fake_get_metadata_by_instance_id=self._fake_x_get_metadata,
headers={'X-Forwarded-For': '192.192.192.2',
'X-Metadata-Provider': proxy_lb_id})
self.assertEqual(3, mock_list_ports.call_count)
self.assertEqual(3, mock_client.list_ports.call_count)
self.assertEqual(200, response.status_int)

View File

@ -361,21 +361,6 @@ class PatchExistsTestCase(test.NoDBTestCase):
self.assertTrue(os.path.exists(os.path.dirname(__file__)))
self.assertFalse(os.path.exists('non-existent/file'))
@test.patch_exists('fake_file1', True)
@test.patch_exists('fake_file2', True)
@test.patch_exists(__file__, False)
def test_patch_exists_multiple_decorators(self):
"""Test that @patch_exists can be used multiple times on the
same method.
"""
self.assertTrue(os.path.exists('fake_file1'))
self.assertTrue(os.path.exists('fake_file2'))
self.assertFalse(os.path.exists(__file__))
# Check non-patched parameters
self.assertTrue(os.path.exists(os.path.dirname(__file__)))
self.assertFalse(os.path.exists('non-existent/file'))
class PatchOpenTestCase(test.NoDBTestCase):
fake_contents = "These file contents don't really exist"

View File

@ -1374,12 +1374,10 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
def test_get_vm_state(self):
summary_info = {'EnabledState': os_win_const.HYPERV_VM_STATE_DISABLED}
with mock.patch.object(self._vmops._vmutils,
'get_vm_summary_info') as mock_get_summary_info:
mock_get_summary_info.return_value = summary_info
self._vmops._vmutils.get_vm_summary_info.return_value = summary_info
response = self._vmops._get_vm_state(mock.sentinel.FAKE_VM_NAME)
self.assertEqual(response, os_win_const.HYPERV_VM_STATE_DISABLED)
response = self._vmops._get_vm_state(mock.sentinel.FAKE_VM_NAME)
self.assertEqual(response, os_win_const.HYPERV_VM_STATE_DISABLED)
@mock.patch.object(vmops.VMOps, '_get_vm_state')
def test_wait_for_power_off_true(self, mock_get_state):
@ -1418,12 +1416,11 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
def test_list_instance_uuids(self):
fake_uuid = '4f54fb69-d3a2-45b7-bb9b-b6e6b3d893b3'
with mock.patch.object(self._vmops._vmutils,
'list_instance_notes') as mock_list_notes:
mock_list_notes.return_value = [('fake_name', [fake_uuid])]
self._vmops._vmutils.list_instance_notes.return_value = (
[('fake_name', [fake_uuid])])
response = self._vmops.list_instance_uuids()
mock_list_notes.assert_called_once_with()
response = self._vmops.list_instance_uuids()
self._vmops._vmutils.list_instance_notes.assert_called_once_with()
self.assertEqual(response, [fake_uuid])

View File

@ -7004,14 +7004,12 @@ class LibvirtConnTestCase(test.NoDBTestCase,
self.assertEqual(cfg.devices[5].rate_bytes, 1024)
self.assertEqual(cfg.devices[5].rate_period, 2)
@mock.patch('nova.virt.libvirt.driver.os.path.exists')
@test.patch_exists(SEV_KERNEL_PARAM_FILE, False)
def test_get_guest_config_with_rng_backend(self, mock_path):
@test.patch_exists(SEV_KERNEL_PARAM_FILE, result=False, other=True)
def test_get_guest_config_with_rng_backend(self):
self.flags(virt_type='kvm',
rng_dev_path='/dev/hw_rng',
group='libvirt')
self.flags(pointer_model='ps2mouse')
mock_path.return_value = True
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
instance_ref = objects.Instance(**self.test_instance)
@ -7601,11 +7599,8 @@ class LibvirtConnTestCase(test.NoDBTestCase,
@mock.patch.object(libvirt_driver.LibvirtDriver,
"_get_guest_storage_config")
@mock.patch.object(libvirt_driver.LibvirtDriver, "_has_numa_support")
@mock.patch('os.path.exists', return_value=True)
@test.patch_exists(SEV_KERNEL_PARAM_FILE, False)
def test_get_guest_config_aarch64(
self, mock_path_exists, mock_numa, mock_storage,
):
@test.patch_exists(SEV_KERNEL_PARAM_FILE, result=False, other=True)
def test_get_guest_config_aarch64(self, mock_numa, mock_storage):
TEST_AMOUNT_OF_PCIE_SLOTS = 8
CONF.set_override("num_pcie_ports", TEST_AMOUNT_OF_PCIE_SLOTS,
group='libvirt')
@ -7625,7 +7620,6 @@ class LibvirtConnTestCase(test.NoDBTestCase,
cfg = drvr._get_guest_config(instance_ref,
_fake_network_info(self),
image_meta, disk_info)
self.assertTrue(mock_path_exists.called)
self.assertEqual(cfg.os_mach_type, "virt")
num_ports = 0
@ -7642,10 +7636,9 @@ class LibvirtConnTestCase(test.NoDBTestCase,
@mock.patch.object(libvirt_driver.LibvirtDriver,
"_get_guest_storage_config")
@mock.patch.object(libvirt_driver.LibvirtDriver, "_has_numa_support")
@mock.patch('os.path.exists', return_value=True)
@test.patch_exists(SEV_KERNEL_PARAM_FILE, False)
@test.patch_exists(SEV_KERNEL_PARAM_FILE, result=False, other=True)
def test_get_guest_config_aarch64_with_graphics(
self, mock_path_exists, mock_numa, mock_storage,
self, mock_numa, mock_storage,
):
self.mock_uname.return_value = fakelibvirt.os_uname(
'Linux', '', '5.4.0-0-generic', '', fields.Architecture.AARCH64)
@ -7655,7 +7648,6 @@ class LibvirtConnTestCase(test.NoDBTestCase,
cfg = self._get_guest_config_with_graphics()
self.assertTrue(mock_path_exists.called)
self.assertEqual(cfg.os_mach_type, "virt")
usbhost_exists = False
@ -11483,8 +11475,6 @@ class LibvirtConnTestCase(test.NoDBTestCase,
False,
False)
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.'
'_assert_dest_node_has_enough_disk')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.'
'_assert_dest_node_has_enough_disk')
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.'
@ -11492,7 +11482,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver.'
'_check_shared_storage_test_file')
def test_check_can_live_migration_source_disk_over_commit_none(self,
mock_check, mock_shared_block, mock_enough, mock_disk_check):
mock_check, mock_shared_block, mock_disk_check):
mock_check.return_value = False
mock_shared_block.return_value = False
@ -15626,8 +15616,7 @@ class LibvirtConnTestCase(test.NoDBTestCase,
filename=filename, size=100 * units.Gi, ephemeral_size=mock.ANY,
specified_fs=None)
@mock.patch.object(nova.virt.libvirt.imagebackend.Image, 'cache')
def test_create_image_resize_snap_backend(self, mock_cache):
def test_create_image_resize_snap_backend(self):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
instance = objects.Instance(**self.test_instance)
instance.task_state = task_states.RESIZE_FINISH
@ -22141,11 +22130,8 @@ class LibvirtDriverTestCase(test.NoDBTestCase, TraitsComparisonMixin):
self.drvr.migrate_disk_and_power_off,
'ctx', instance, '10.0.0.1', flavor_obj, None)
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver'
'._get_instance_disk_info')
@mock.patch('nova.virt.driver.block_device_info_get_ephemerals')
def test_migrate_disk_and_power_off_resize_error_eph(self, mock_get,
mock_get_disk_info):
def test_migrate_disk_and_power_off_resize_error_eph(self, mock_get):
mappings = [
{
'device_name': '/dev/sdb4',
@ -22192,7 +22178,6 @@ class LibvirtDriverTestCase(test.NoDBTestCase, TraitsComparisonMixin):
# Old flavor, eph is 20, real disk is 3, target is 2, fail
flavor = {'root_gb': 10, 'ephemeral_gb': 2}
flavor_obj = objects.Flavor(**flavor)
mock_get_disk_info.return_value = fake_disk_info_json(instance)
self.assertRaises(
exception.InstanceFaultRollback,
@ -25642,9 +25627,7 @@ class LibvirtDriverTestCase(test.NoDBTestCase, TraitsComparisonMixin):
}
self._test_get_gpu_inventories(drvr, expected, ['nvidia-11'])
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver'
'._get_mdev_capable_devices')
def test_get_gpu_inventories_with_two_types(self, get_mdev_capable_devs):
def test_get_gpu_inventories_with_two_types(self):
self.flags(enabled_mdev_types=['nvidia-11', 'nvidia-12'],
group='devices')
# we need to call the below again to ensure the updated
@ -28591,13 +28574,11 @@ class LVMSnapshotTests(_BaseSnapshotTests):
new=mock.Mock(return_value=None))
@mock.patch('nova.virt.libvirt.utils.get_disk_type_from_path',
new=mock.Mock(return_value='lvm'))
@mock.patch('nova.virt.libvirt.utils.file_open',
side_effect=[io.BytesIO(b''), io.BytesIO(b'')])
@mock.patch.object(libvirt_driver.imagebackend.images,
'convert_image')
@mock.patch.object(libvirt_driver.imagebackend.lvm, 'volume_info')
def _test_lvm_snapshot(self, disk_format, mock_volume_info,
mock_convert_image, mock_file_open):
mock_convert_image):
self.flags(images_type='lvm',
images_volume_group='nova-vg', group='libvirt')

View File

@ -30,7 +30,7 @@ class LibvirtLightVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
device_scan_attempts=5)
@mock.patch('os_brick.initiator.connector.InitiatorConnector.factory',
new=mock.Mock(return_value=mock.Mock()))
new=mock.Mock())
def test_libvirt_lightos_driver_connect(self):
lightos_driver = lightos.LibvirtLightOSVolumeDriver(
self.fake_host)
@ -40,15 +40,16 @@ class LibvirtLightVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
'name': 'aLightVolume',
'conf': config}
connection_info = {'data': disk_info}
with mock.patch.object(lightos_driver.connector,
'connect_volume',
return_value={'path': '/dev/dms1234567'}):
lightos_driver.connect_volume(connection_info, None)
(lightos_driver.connector.connect_volume.
assert_called_once_with(
connection_info['data']))
self.assertEqual('/dev/dms1234567',
connection_info['data']['device_path'])
lightos_driver.connector.connect_volume.return_value = (
{'path': '/dev/dms1234567'})
lightos_driver.connect_volume(connection_info, None)
lightos_driver.connector.connect_volume.assert_called_once_with(
connection_info['data'])
self.assertEqual(
'/dev/dms1234567',
connection_info['data']['device_path'])
@mock.patch('os_brick.initiator.connector.InitiatorConnector.factory',
new=mock.Mock(return_value=mock.Mock()))

View File

@ -56,14 +56,15 @@ class LibvirtNVMEVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
'name': 'aNVMEVolume',
'conf': config}
connection_info = {'data': disk_info}
with mock.patch.object(nvme_driver.connector,
'connect_volume',
return_value={'path': '/dev/dms1234567'}):
nvme_driver.connect_volume(connection_info, None)
nvme_driver.connector.connect_volume.assert_called_once_with(
connection_info['data'])
self.assertEqual('/dev/dms1234567',
connection_info['data']['device_path'])
nvme_driver.connector.connect_volume.return_value = (
{'path': '/dev/dms1234567'})
nvme_driver.connect_volume(connection_info, None)
nvme_driver.connector.connect_volume.assert_called_once_with(
connection_info['data'])
self.assertEqual(
'/dev/dms1234567', connection_info['data']['device_path'])
@mock.patch('os_brick.initiator.connector.InitiatorConnector.factory',
new=mock.Mock(return_value=mock.Mock()))

View File

@ -437,24 +437,23 @@ class TestDriverBlockDevice(test.NoDBTestCase):
def _test_call_wait_func(self, delete_on_termination, delete_fail=False):
test_bdm = self.driver_classes['volume'](self.volume_bdm)
test_bdm['delete_on_termination'] = delete_on_termination
with mock.patch.object(self.volume_api, 'delete') as vol_delete:
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id='fake-id',
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
if delete_on_termination and delete_fail:
self.volume_api.delete.side_effect = Exception()
if delete_on_termination and delete_fail:
vol_delete.side_effect = Exception()
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id='fake-id',
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm._call_wait_func,
context=self.context,
wait_func=wait_func,
volume_api=self.volume_api,
volume_id='fake-id')
self.assertEqual(delete_on_termination, vol_delete.called)
self.assertRaises(exception.VolumeNotCreated,
test_bdm._call_wait_func,
context=self.context,
wait_func=wait_func,
volume_api=self.volume_api,
volume_id='fake-id')
self.assertEqual(delete_on_termination, self.volume_api.delete.called)
def test_call_wait_delete_volume(self):
self._test_call_wait_func(True)
@ -487,25 +486,24 @@ class TestDriverBlockDevice(test.NoDBTestCase):
volume['shared_targets'] = True
volume['service_uuid'] = uuids.service_uuid
if delete_attachment_raises:
self.volume_api.attachment_delete.side_effect = (
delete_attachment_raises)
self.virt_driver.get_volume_connector.return_value = connector
with test.nested(
mock.patch.object(driver_bdm, '_get_volume', return_value=volume),
mock.patch.object(self.virt_driver, 'get_volume_connector',
return_value=connector),
mock.patch('os_brick.initiator.utils.guard_connection'),
mock.patch.object(self.volume_api, 'attachment_delete'),
) as (mock_get_volume, mock_get_connector, mock_guard,
vapi_attach_del):
if delete_attachment_raises:
vapi_attach_del.side_effect = delete_attachment_raises
) as (mock_get_volume, mock_guard):
driver_bdm.detach(elevated_context, instance,
self.volume_api, self.virt_driver,
attachment_id=attachment_id)
mock_guard.assert_called_once_with(volume)
vapi_attach_del.assert_called_once_with(elevated_context,
attachment_id)
self.volume_api.attachment_delete.assert_called_once_with(
elevated_context, attachment_id)
def test_volume_delete_attachment_with_shared_targets(self):
self.test_volume_delete_attachment(include_shared_targets=True)
@ -956,31 +954,28 @@ class TestDriverBlockDevice(test.NoDBTestCase):
instance = fake_instance.fake_instance_obj(mock.sentinel.ctx,
**{'uuid': uuids.uuid})
with test.nested(
mock.patch.object(self.volume_api, 'get_snapshot',
return_value=snapshot),
mock.patch.object(self.volume_api, 'create', return_value=volume),
mock.patch.object(self.volume_api, 'delete'),
) as (vol_get_snap, vol_create, vol_delete):
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id=volume['id'],
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm.attach, context=self.context,
instance=instance,
volume_api=self.volume_api,
virt_driver=self.virt_driver,
wait_func=wait_func)
self.volume_api.get_snapshot.return_value = snapshot
self.volume_api.create.return_value = volume
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id=volume['id'],
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm.attach, context=self.context,
instance=instance,
volume_api=self.volume_api,
virt_driver=self.virt_driver,
wait_func=wait_func)
vol_get_snap.assert_called_once_with(
self.context, 'fake-snapshot-id-1')
vol_create.assert_called_once_with(
self.context, 3, '', '', availability_zone=None,
snapshot=snapshot, volume_type=None)
vol_delete.assert_called_once_with(self.context, volume['id'])
self.volume_api.get_snapshot.assert_called_once_with(
self.context, 'fake-snapshot-id-1')
self.volume_api.create.assert_called_once_with(
self.context, 3, '', '', availability_zone=None,
snapshot=snapshot, volume_type=None)
self.volume_api.delete.assert_called_once_with(
self.context, volume['id'])
def test_snapshot_attach_volume(self):
test_bdm = self.driver_classes['volsnapshot'](
@ -988,19 +983,17 @@ class TestDriverBlockDevice(test.NoDBTestCase):
instance = {'id': 'fake_id', 'uuid': uuids.uuid}
with test.nested(
mock.patch.object(self.driver_classes['volume'], 'attach'),
mock.patch.object(self.volume_api, 'get_snapshot'),
mock.patch.object(self.volume_api, 'create'),
) as (mock_attach, mock_get_snapshot, mock_create):
with mock.patch.object(
self.driver_classes['volume'], 'attach'
) as mock_attach:
test_bdm.attach(self.context, instance, self.volume_api,
self.virt_driver)
self.assertEqual('fake-volume-id-2', test_bdm.volume_id)
mock_attach.assert_called_once_with(
self.context, instance, self.volume_api, self.virt_driver)
# Make sure theses are not called
mock_get_snapshot.assert_not_called()
mock_create.assert_not_called()
self.volume_api.get_snapshot.assert_not_called()
self.volume_api.create.assert_not_called()
def test_snapshot_attach_no_volume_and_no_volume_type(self):
bdm = self.driver_classes['volsnapshot'](self.volsnapshot_bdm)
@ -1010,15 +1003,10 @@ class TestDriverBlockDevice(test.NoDBTestCase):
original_volume = {'id': uuids.original_volume_id,
'volume_type_id': 'original_volume_type'}
new_volume = {'id': uuids.new_volume_id}
with test.nested(
mock.patch.object(self.driver_classes['volume'], 'attach'),
mock.patch.object(self.volume_api, 'get_snapshot',
return_value=snapshot),
mock.patch.object(self.volume_api, 'get',
return_value=original_volume),
mock.patch.object(self.volume_api, 'create',
return_value=new_volume),
) as (mock_attach, mock_get_snapshot, mock_get, mock_create):
self.volume_api.get_snapshot.return_value = snapshot
self.volume_api.get.return_value = original_volume
self.volume_api.create.return_value = new_volume
with mock.patch.object(self.driver_classes["volume"], "attach"):
bdm.volume_id = None
bdm.volume_type = None
bdm.attach(self.context, instance, self.volume_api,
@ -1026,10 +1014,11 @@ class TestDriverBlockDevice(test.NoDBTestCase):
# Assert that the original volume type is fetched, stored within
# the bdm and then used to create the new snapshot based volume.
mock_get.assert_called_once_with(self.context,
uuids.original_volume_id)
self.volume_api.get.assert_called_once_with(
self.context, uuids.original_volume_id)
self.assertEqual('original_volume_type', bdm.volume_type)
mock_create.assert_called_once_with(self.context, bdm.volume_size,
self.volume_api.create.assert_called_once_with(
self.context, bdm.volume_size,
'', '', volume_type='original_volume_type', snapshot=snapshot,
availability_zone=None)
@ -1101,27 +1090,25 @@ class TestDriverBlockDevice(test.NoDBTestCase):
instance = fake_instance.fake_instance_obj(mock.sentinel.ctx,
**{'uuid': uuids.uuid})
with test.nested(
mock.patch.object(self.volume_api, 'create', return_value=volume),
mock.patch.object(self.volume_api, 'delete'),
) as (vol_create, vol_delete):
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id=volume['id'],
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm.attach, context=self.context,
instance=instance,
volume_api=self.volume_api,
virt_driver=self.virt_driver,
wait_func=wait_func)
self.volume_api.create.return_value = volume
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id=volume['id'],
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm.attach, context=self.context,
instance=instance,
volume_api=self.volume_api,
virt_driver=self.virt_driver,
wait_func=wait_func)
vol_create.assert_called_once_with(
self.context, 1, '', '', image_id=image['id'],
availability_zone=None, volume_type=None)
vol_delete.assert_called_once_with(self.context, volume['id'])
self.volume_api.create.assert_called_once_with(
self.context, 1, '', '', image_id=image['id'],
availability_zone=None, volume_type=None)
self.volume_api.delete.assert_called_once_with(
self.context, volume['id'])
def test_image_attach_volume(self):
test_bdm = self.driver_classes['volimage'](
@ -1129,19 +1116,17 @@ class TestDriverBlockDevice(test.NoDBTestCase):
instance = {'id': 'fake_id', 'uuid': uuids.uuid}
with test.nested(
mock.patch.object(self.driver_classes['volume'], 'attach'),
mock.patch.object(self.volume_api, 'get_snapshot'),
mock.patch.object(self.volume_api, 'create'),
) as (mock_attch, mock_get_snapshot, mock_create):
with mock.patch.object(
self.driver_classes['volume'], 'attach'
) as mock_attach:
test_bdm.attach(self.context, instance, self.volume_api,
self.virt_driver)
self.assertEqual('fake-volume-id-2', test_bdm.volume_id)
mock_attch.assert_called_once_with(
mock_attach.assert_called_once_with(
self.context, instance, self.volume_api, self.virt_driver)
# Make sure theses are not called
mock_get_snapshot.assert_not_called()
mock_create.assert_not_called()
self.volume_api.get_snapshot.assert_not_called()
self.volume_api.create.assert_not_called()
def test_blank_attach_fail_volume(self):
no_blank_volume = self.volblank_bdm_dict.copy()
@ -1153,30 +1138,26 @@ class TestDriverBlockDevice(test.NoDBTestCase):
**{'uuid': uuids.uuid})
volume = {'id': 'fake-volume-id-2',
'display_name': '%s-blank-vol' % uuids.uuid}
self.volume_api.create.return_value = volume
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id=volume['id'],
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm.attach, context=self.context,
instance=instance,
volume_api=self.volume_api,
virt_driver=self.virt_driver,
wait_func=wait_func)
with test.nested(
mock.patch.object(self.volume_api, 'create', return_value=volume),
mock.patch.object(self.volume_api, 'delete'),
) as (vol_create, vol_delete):
wait_func = mock.MagicMock()
mock_exception = exception.VolumeNotCreated(volume_id=volume['id'],
seconds=1,
attempts=1,
volume_status='error')
wait_func.side_effect = mock_exception
self.assertRaises(exception.VolumeNotCreated,
test_bdm.attach, context=self.context,
instance=instance,
volume_api=self.volume_api,
virt_driver=self.virt_driver,
wait_func=wait_func)
vol_create.assert_called_once_with(
self.context, test_bdm.volume_size,
'%s-blank-vol' % uuids.uuid,
'', volume_type=None, availability_zone=None)
vol_delete.assert_called_once_with(
self.context, volume['id'])
self.volume_api.create.assert_called_once_with(
self.context, test_bdm.volume_size,
'%s-blank-vol' % uuids.uuid,
'', volume_type=None, availability_zone=None)
self.volume_api.delete.assert_called_once_with(
self.context, volume['id'])
def test_blank_attach_volume(self):
no_blank_volume = self.volblank_bdm_dict.copy()
@ -1485,13 +1466,9 @@ class TestDriverBlockDevice(test.NoDBTestCase):
'display_name': 'fake-snapshot-vol'}
self.stub_volume_create(volume)
with test.nested(
mock.patch.object(self.volume_api, 'get_snapshot',
return_value=snapshot),
mock.patch.object(volume_class, 'attach')
) as (
vol_get_snap, vol_attach
):
self.volume_api.get_snapshot.return_value = snapshot
with mock.patch.object(volume_class, 'attach') as vol_attach:
test_bdm.attach(self.context, instance, self.volume_api,
self.virt_driver)

View File

@ -117,13 +117,11 @@ class VMwareImagesTestCase(test.NoDBTestCase):
mock.patch.object(images.IMAGE_API, 'download'),
mock.patch.object(images, 'image_transfer'),
mock.patch.object(images, '_build_shadow_vm_config_spec'),
mock.patch.object(session, '_call_method'),
mock.patch.object(vm_util, 'get_vmdk_info')
) as (mock_image_api_get,
mock_image_api_download,
mock_image_transfer,
mock_build_shadow_vm_config_spec,
mock_call_method,
mock_get_vmdk_info):
image_data = {'id': 'fake-id',
'disk_format': 'vmdk',
@ -172,7 +170,7 @@ class VMwareImagesTestCase(test.NoDBTestCase):
mock_write_handle)
mock_get_vmdk_info.assert_called_once_with(
session, mock.sentinel.vm_ref, 'fake-vm')
mock_call_method.assert_called_once_with(
session._call_method.assert_called_once_with(
session.vim, "UnregisterVM", mock.sentinel.vm_ref)
@mock.patch('oslo_vmware.rw_handles.ImageReadHandle')
@ -188,13 +186,11 @@ class VMwareImagesTestCase(test.NoDBTestCase):
mock.patch.object(images.IMAGE_API, 'download'),
mock.patch.object(images, 'image_transfer'),
mock.patch.object(images, '_build_shadow_vm_config_spec'),
mock.patch.object(session, '_call_method'),
mock.patch.object(vm_util, 'get_vmdk_info')
) as (mock_image_api_get,
mock_image_api_download,
mock_image_transfer,
mock_build_shadow_vm_config_spec,
mock_call_method,
mock_get_vmdk_info):
image_data = {'id': 'fake-id',
'disk_format': 'vmdk',
@ -220,7 +216,7 @@ class VMwareImagesTestCase(test.NoDBTestCase):
mock_image_transfer.assert_called_once_with(mock_read_handle,
mock_write_handle)
mock_call_method.assert_called_once_with(
session._call_method.assert_called_once_with(
session.vim, "UnregisterVM", mock.sentinel.vm_ref)
mock_get_vmdk_info.assert_called_once_with(
session, mock.sentinel.vm_ref, 'fake-vm')