Stop using mox in unit/*.py (2/2)

As mox is not compatible with python 3, all test cases that
use mox should be updated to use mock instead.

Replaced mox with mock and stubs.Set() with mock_object() in
some classes in cinder/tests/unit/.

Change-Id: I2f77d3dc9d09cdaf88be6f8e1b0894e7548ea2e5
This commit is contained in:
Dao Cong Tien 2016-08-16 16:40:05 +07:00
parent c240eafa0a
commit 14063a1eb1
3 changed files with 195 additions and 196 deletions

View File

@ -99,8 +99,8 @@ class QoSSpecsTestCase(test.TestCase):
del input['consumer']
self.stubs.Set(db, 'qos_specs_create',
fake_db_qos_specs_create)
self.mock_object(db, 'qos_specs_create',
fake_db_qos_specs_create)
# able to catch DBError
self.assertRaises(exception.QoSSpecsCreateFailed,
qos_specs.create, self.ctxt, 'FailQoSName', input)
@ -132,7 +132,7 @@ class QoSSpecsTestCase(test.TestCase):
qos_specs.update, self.ctxt, specs_id,
{'consumer': 'not-real'})
self.stubs.Set(db, 'qos_specs_update', fake_db_update)
self.mock_object(db, 'qos_specs_update', fake_db_update)
self.assertRaises(exception.QoSSpecsUpdateFailed,
qos_specs.update, self.ctxt, specs_id, {'key':
'new_key'})
@ -153,11 +153,11 @@ class QoSSpecsTestCase(test.TestCase):
def fake_disassociate_all(context, id):
pass
self.stubs.Set(db, 'qos_specs_associations_get',
fake_db_associations_get)
self.stubs.Set(qos_specs, 'disassociate_all',
fake_disassociate_all)
self.stubs.Set(db, 'qos_specs_delete', fake_db_delete)
self.mock_object(db, 'qos_specs_associations_get',
fake_db_associations_get)
self.mock_object(qos_specs, 'disassociate_all',
fake_disassociate_all)
self.mock_object(db, 'qos_specs_delete', fake_db_delete)
self.assertRaises(exception.InvalidQoSSpecs,
qos_specs.delete, self.ctxt, None)
self.assertRaises(exception.QoSSpecsNotFound,
@ -179,12 +179,6 @@ class QoSSpecsTestCase(test.TestCase):
else:
pass
def fake_qos_specs_get(context, id):
if id == 'NotFound':
raise exception.QoSSpecsNotFound(specs_id=id)
else:
pass
value = {'foo': 'Foo', 'bar': 'Bar', 'zoo': 'tiger'}
name = 'QoSName'
consumer = 'front-end'
@ -204,7 +198,7 @@ class QoSSpecsTestCase(test.TestCase):
'specs': specs['specs']}
self.assertDictMatch(expected, specs_dic)
self.stubs.Set(db, 'qos_specs_item_delete', fake_db_qos_delete_key)
self.mock_object(db, 'qos_specs_item_delete', fake_db_qos_delete_key)
self.assertRaises(exception.InvalidQoSSpecs,
qos_specs.delete_keys, self.ctxt, None, [])
self.assertRaises(exception.QoSSpecsNotFound,
@ -267,11 +261,11 @@ class QoSSpecsTestCase(test.TestCase):
self.assertEqual('TypeName', res[0]['name'])
self.assertEqual(type_ref['id'], res[0]['id'])
self.stubs.Set(db, 'qos_specs_associate',
fake_db_associate)
self.stubs.Set(qos_specs, 'get_qos_specs', fake_qos_specs_get)
self.stubs.Set(volume_types, 'get_volume_type_qos_specs',
fake_vol_type_qos_get)
self.mock_object(db, 'qos_specs_associate',
fake_db_associate)
self.mock_object(qos_specs, 'get_qos_specs', fake_qos_specs_get)
self.mock_object(volume_types, 'get_volume_type_qos_specs',
fake_vol_type_qos_get)
self.assertRaises(exception.VolumeTypeNotFound,
qos_specs.associate_qos_with_type,
self.ctxt, 'specs-id', 'NotFound')
@ -309,8 +303,8 @@ class QoSSpecsTestCase(test.TestCase):
# not associated with no error
qos_specs.disassociate_qos_specs(self.ctxt, specs_id, type_ref['id'])
qos_specs.associate_qos_with_type(self.ctxt, specs_id, type_ref['id'])
self.stubs.Set(db, 'qos_specs_disassociate',
fake_db_disassociate)
self.mock_object(db, 'qos_specs_disassociate',
fake_db_disassociate)
self.assertRaises(exception.QoSSpecsDisassociateFailed,
qos_specs.disassociate_qos_specs,
self.ctxt, specs_id, type_ref['id'])
@ -342,10 +336,10 @@ class QoSSpecsTestCase(test.TestCase):
res = qos_specs.get_associations(self.ctxt, specs_id)
self.assertEqual(0, len(res))
self.stubs.Set(db, 'qos_specs_disassociate_all',
fake_db_disassociate_all)
self.stubs.Set(qos_specs, 'get_qos_specs',
fake_qos_specs_get)
self.mock_object(db, 'qos_specs_disassociate_all',
fake_db_disassociate_all)
self.mock_object(qos_specs, 'get_qos_specs',
fake_qos_specs_get)
self.assertRaises(exception.QoSSpecsDisassociateFailed,
qos_specs.disassociate_all,
self.ctxt, 'Trouble')

View File

@ -838,7 +838,7 @@ class VolumeTypeQuotaEngineTestCase(test.TestCase):
def test_default_resources(self):
def fake_vtga(context, inactive=False, filters=None):
return {}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
self.mock_object(db, 'volume_type_get_all', fake_vtga)
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(['backup_gigabytes', 'backups',
@ -864,7 +864,7 @@ class VolumeTypeQuotaEngineTestCase(test.TestCase):
'extra_specs': {},
},
}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
self.mock_object(db, 'volume_type_get_all', fake_vtga)
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(['backup_gigabytes', 'backups',
@ -920,8 +920,8 @@ class DbQuotaDriverBaseTestCase(test.TestCase):
self.mock_utcnow = patcher.start()
self.mock_utcnow.return_value = datetime.datetime.utcnow()
def _stub_quota_class_get_default(self):
# Stub out quota_class_get_default
def _mock_quota_class_get_default(self):
# Mock quota_class_get_default
def fake_qcgd(context):
self.calls.append('quota_class_get_default')
return dict(volumes=10,
@ -930,30 +930,30 @@ class DbQuotaDriverBaseTestCase(test.TestCase):
backups=10,
backup_gigabytes=1000
)
self.stubs.Set(db, 'quota_class_get_default', fake_qcgd)
self.mock_object(db, 'quota_class_get_default', fake_qcgd)
def _stub_volume_type_get_all(self):
def _mock_volume_type_get_all(self):
def fake_vtga(context, inactive=False, filters=None):
return {}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
self.mock_object(db, 'volume_type_get_all', fake_vtga)
def _stub_quota_class_get_all_by_name(self):
# Stub out quota_class_get_all_by_name
def _mock_quota_class_get_all_by_name(self):
# Mock quota_class_get_all_by_name
def fake_qcgabn(context, quota_class):
self.calls.append('quota_class_get_all_by_name')
self.assertEqual('test_class', quota_class)
return dict(gigabytes=500, volumes=10, snapshots=10, backups=10,
backup_gigabytes=500)
self.stubs.Set(db, 'quota_class_get_all_by_name', fake_qcgabn)
self.mock_object(db, 'quota_class_get_all_by_name', fake_qcgabn)
def _stub_allocated_get_all_by_project(self, allocated_quota=False):
def _mock_allocated_get_all_by_project(self, allocated_quota=False):
def fake_qagabp(context, project_id):
self.calls.append('quota_allocated_get_all_by_project')
if allocated_quota:
return dict(project_id=project_id, volumes=3)
return dict(project_id=project_id)
self.stubs.Set(db, 'quota_allocated_get_all_by_project', fake_qagabp)
self.mock_object(db, 'quota_allocated_get_all_by_project', fake_qagabp)
class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
@ -964,8 +964,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
def test_get_defaults(self):
# Use our pre-defined resources
self._stub_quota_class_get_default()
self._stub_volume_type_get_all()
self._mock_quota_class_get_default()
self._mock_volume_type_get_all()
result = self.driver.get_defaults(None, quota.QUOTAS.resources)
self.assertEqual(
@ -978,8 +978,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
per_volume_gigabytes=-1), result)
def test_get_class_quotas(self):
self._stub_quota_class_get_all_by_name()
self._stub_volume_type_get_all()
self._mock_quota_class_get_all_by_name()
self._mock_volume_type_get_all()
result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
'test_class')
@ -992,7 +992,7 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
per_volume_gigabytes=-1), result)
def test_get_class_quotas_no_defaults(self):
self._stub_quota_class_get_all_by_name()
self._mock_quota_class_get_all_by_name()
result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
'test_class', False)
@ -1003,7 +1003,7 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
backups=10,
backup_gigabytes=500), result)
def _stub_get_by_project(self):
def _mock_get_by_project(self):
def fake_qgabp(context, project_id):
self.calls.append('quota_get_all_by_project')
self.assertEqual('test_project', project_id)
@ -1021,16 +1021,16 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
backup_gigabytes=dict(in_use=10, reserved=0)
)
self.stubs.Set(db, 'quota_get_all_by_project', fake_qgabp)
self.stubs.Set(db, 'quota_usage_get_all_by_project', fake_qugabp)
self.mock_object(db, 'quota_get_all_by_project', fake_qgabp)
self.mock_object(db, 'quota_usage_get_all_by_project', fake_qugabp)
self._stub_quota_class_get_all_by_name()
self._stub_quota_class_get_default()
self._mock_quota_class_get_all_by_name()
self._mock_quota_class_get_default()
def test_get_project_quotas(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
self._stub_allocated_get_all_by_project()
self._mock_get_by_project()
self._mock_volume_type_get_all()
self._mock_allocated_get_all_by_project()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project')
@ -1079,9 +1079,9 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertTrue(mock_defaults.called)
def test_get_root_project_with_subprojects_quotas(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
self._stub_allocated_get_all_by_project(allocated_quota=True)
self._mock_get_by_project()
self._mock_volume_type_get_all()
self._mock_allocated_get_all_by_project(allocated_quota=True)
result = self.driver.get_project_quotas(
FakeContext('test_project', None),
quota.QUOTAS.resources, 'test_project')
@ -1117,8 +1117,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
), result)
def test_get_project_quotas_alt_context_no_class(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
self._mock_get_by_project()
self._mock_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('other_project', 'other_class'),
quota.QUOTAS.resources, 'test_project')
@ -1147,8 +1147,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
), result)
def test_get_project_quotas_alt_context_with_class(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
self._mock_get_by_project()
self._mock_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('other_project', 'other_class'),
quota.QUOTAS.resources, 'test_project', quota_class='test_class')
@ -1178,8 +1178,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
result)
def test_get_project_quotas_no_defaults(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
self._mock_get_by_project()
self._mock_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project', defaults=False)
@ -1206,8 +1206,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
), result)
def test_get_project_quotas_no_usages(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
self._mock_get_by_project()
self._mock_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project', usages=False)
@ -1222,18 +1222,18 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
backup_gigabytes=dict(limit=50, ),
per_volume_gigabytes=dict(limit=-1, )), result)
def _stub_get_project_quotas(self):
def _mock_get_project_quotas(self):
def fake_get_project_quotas(context, resources, project_id,
quota_class=None, defaults=True,
usages=True, parent_project_id=None):
self.calls.append('get_project_quotas')
return {k: dict(limit=v.default) for k, v in resources.items()}
self.stubs.Set(self.driver, 'get_project_quotas',
fake_get_project_quotas)
self.mock_object(self.driver, 'get_project_quotas',
fake_get_project_quotas)
def test_get_quotas_has_sync_unknown(self):
self._stub_get_project_quotas()
self._mock_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
@ -1241,7 +1241,7 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual([], self.calls)
def test_get_quotas_no_sync_unknown(self):
self._stub_get_project_quotas()
self._mock_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
@ -1249,7 +1249,7 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual([], self.calls)
def test_get_quotas_has_sync_no_sync_resource(self):
self._stub_get_project_quotas()
self._mock_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
@ -1257,7 +1257,7 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual([], self.calls)
def test_get_quotas_no_sync_has_sync_resource(self):
self._stub_get_project_quotas()
self._mock_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
@ -1265,7 +1265,7 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual([], self.calls)
def test_get_quotas_has_sync(self):
self._stub_get_project_quotas()
self._mock_get_project_quotas()
result = self.driver._get_quotas(FakeContext('test_project',
'test_class'),
quota.QUOTAS.resources,
@ -1275,17 +1275,17 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual(['get_project_quotas'], self.calls)
self.assertEqual(dict(volumes=10, gigabytes=1000, ), result)
def _stub_quota_reserve(self):
def _mock_quota_reserve(self):
def fake_quota_reserve(context, resources, quotas, deltas, expire,
until_refresh, max_age, project_id=None):
self.calls.append(('quota_reserve', expire, until_refresh,
max_age))
return ['resv-1', 'resv-2', 'resv-3']
self.stubs.Set(db, 'quota_reserve', fake_quota_reserve)
self.mock_object(db, 'quota_reserve', fake_quota_reserve)
def test_reserve_bad_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
self.assertRaises(exception.InvalidReservationExpiration,
self.driver.reserve,
FakeContext('test_project', 'test_class'),
@ -1294,8 +1294,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual([], self.calls)
def test_reserve_default_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2))
@ -1306,8 +1306,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual(['resv-1', 'resv-2', 'resv-3'], result)
def test_reserve_int_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire=3600)
@ -1318,8 +1318,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual(['resv-1', 'resv-2', 'resv-3'], result)
def test_reserve_timedelta_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
expire_delta = datetime.timedelta(seconds=60)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
@ -1331,8 +1331,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual(['resv-1', 'resv-2', 'resv-3'], result)
def test_reserve_datetime_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
@ -1343,8 +1343,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual(['resv-1', 'resv-2', 'resv-3'], result)
def test_reserve_until_refresh(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
self.flags(until_refresh=500)
expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
@ -1356,8 +1356,8 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
self.assertEqual(['resv-1', 'resv-2', 'resv-3'], result)
def test_reserve_max_age(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self._mock_get_project_quotas()
self._mock_quota_reserve()
self.flags(max_age=86400)
expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
@ -1368,15 +1368,15 @@ class DbQuotaDriverTestCase(DbQuotaDriverBaseTestCase):
('quota_reserve', expire, 0, 86400), ], self.calls)
self.assertEqual(['resv-1', 'resv-2', 'resv-3'], result)
def _stub_quota_destroy_by_project(self):
def _mock_quota_destroy_by_project(self):
def fake_quota_destroy_by_project(context, project_id):
self.calls.append(('quota_destroy_by_project', project_id))
return None
self.stubs.Set(sqa_api, 'quota_destroy_by_project',
fake_quota_destroy_by_project)
self.mock_object(sqa_api, 'quota_destroy_by_project',
fake_quota_destroy_by_project)
def test_destroy_quota_by_project(self):
self._stub_quota_destroy_by_project()
self._mock_quota_destroy_by_project()
self.driver.destroy_by_project(FakeContext('test_project',
'test_class'),
'test_project')
@ -1426,7 +1426,7 @@ class NestedDbQuotaDriverBaseTestCase(DbQuotaDriverBaseTestCase):
self.fixture.config(auth_uri=self.auth_url, group='keystone_authtoken')
self.driver = quota.NestedDbQuotaDriver()
def _stub_get_by_subproject(self):
def _mock_get_by_subproject(self):
def fake_qgabp(context, project_id):
self.calls.append('quota_get_all_by_project')
return dict(volumes=10, gigabytes=50, reserved=0)
@ -1436,15 +1436,15 @@ class NestedDbQuotaDriverBaseTestCase(DbQuotaDriverBaseTestCase):
return dict(volumes=dict(in_use=2, reserved=0),
gigabytes=dict(in_use=10, reserved=0))
self.stubs.Set(db, 'quota_get_all_by_project', fake_qgabp)
self.stubs.Set(db, 'quota_usage_get_all_by_project', fake_qugabp)
self.mock_object(db, 'quota_get_all_by_project', fake_qgabp)
self.mock_object(db, 'quota_usage_get_all_by_project', fake_qugabp)
self._stub_quota_class_get_all_by_name()
self._mock_quota_class_get_all_by_name()
class NestedDbQuotaDriverTestCase(NestedDbQuotaDriverBaseTestCase):
def test_get_defaults(self):
self._stub_volume_type_get_all()
self._mock_volume_type_get_all()
# Test for child project defaults
result = self.driver.get_defaults(self.context,
@ -1473,9 +1473,9 @@ class NestedDbQuotaDriverTestCase(NestedDbQuotaDriverBaseTestCase):
project_id=self._child_proj_id)
def test_get_subproject_quotas(self):
self._stub_get_by_subproject()
self._stub_volume_type_get_all()
self._stub_allocated_get_all_by_project(allocated_quota=True)
self._mock_get_by_subproject()
self._mock_volume_type_get_all()
self._mock_allocated_get_all_by_project(allocated_quota=True)
result = self.driver.get_project_quotas(
self.context,
quota.QUOTAS.resources, self._child_proj_id)
@ -1695,7 +1695,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
QUOTA_SYNC_FUNCTIONS['_sync_%s' % res_name] = make_sync(res_name)
self.resources[res_name] = res
self.stubs.Set(sqa_api, 'QUOTA_SYNC_FUNCTIONS', QUOTA_SYNC_FUNCTIONS)
self.mock_object(sqa_api, 'QUOTA_SYNC_FUNCTIONS', QUOTA_SYNC_FUNCTIONS)
self.expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
self.usages = {}
@ -1734,12 +1734,16 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
self.assertEqual('test_project', project_id)
return {'project_id': project_id}
self.stubs.Set(sqa_api, 'quota_allocated_get_all_by_project',
fake_qagabp)
self.stubs.Set(sqa_api, 'get_session', fake_get_session)
self.stubs.Set(sqa_api, '_get_quota_usages', fake_get_quota_usages)
self.stubs.Set(sqa_api, '_quota_usage_create', fake_quota_usage_create)
self.stubs.Set(sqa_api, '_reservation_create', fake_reservation_create)
self.mock_object(sqa_api, 'quota_allocated_get_all_by_project',
fake_qagabp)
self.mock_object(sqa_api, 'get_session',
fake_get_session)
self.mock_object(sqa_api, '_get_quota_usages',
fake_get_quota_usages)
self.mock_object(sqa_api, '_quota_usage_create',
fake_quota_usage_create)
self.mock_object(sqa_api, '_reservation_create',
fake_reservation_create)
patcher = mock.patch.object(timeutils, 'utcnow')
self.addCleanup(patcher.stop)
@ -1819,7 +1823,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
self.assertEqual(0, len(reservations))
def _stub_allocated_get_all_by_project(self, allocated_quota=False):
def _mock_allocated_get_all_by_project(self, allocated_quota=False):
def fake_qagabp(context, project_id):
self.assertEqual('test_project', project_id)
if allocated_quota:
@ -1827,14 +1831,13 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
gigabytes = 2 * 1024)
return dict(project_id=project_id)
self.stubs.Set(sqa_api,
'quota_allocated_get_all_by_project',
fake_qagabp)
self.mock_object(sqa_api, 'quota_allocated_get_all_by_project',
fake_qagabp)
def test_quota_reserve_with_allocated(self):
context = FakeContext('test_project', 'test_class')
# Allocated quota for volume will be updated for 3
self._stub_allocated_get_all_by_project(allocated_quota=True)
self._mock_allocated_get_all_by_project(allocated_quota=True)
# Quota limited for volume updated for 10
quotas = dict(volumes=10,
gigabytes=10 * 1024, )
@ -1868,7 +1871,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
gigabytes=10 * 1024, )
deltas = dict(volumes=2,
gigabytes=2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
@ -1902,7 +1905,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
gigabytes=10 * 1024, )
deltas = dict(volumes=2,
gigabytes=2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 5, 0)
@ -1933,7 +1936,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 5, 0)
@ -1969,7 +1972,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, max_age)
@ -2000,7 +2003,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
@ -2031,7 +2034,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
@ -2062,7 +2065,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
self.assertRaises(exception.OverQuota,
sqa_api.quota_reserve,
context, self.resources, quotas,
@ -2088,7 +2091,7 @@ class QuotaReserveSqlAlchemyTestCase(test.TestCase):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
self._stub_allocated_get_all_by_project()
self._mock_allocated_get_all_by_project()
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)

View File

@ -27,7 +27,6 @@ import uuid
import enum
import eventlet
import mock
from mox3 import mox
import os_brick
from oslo_concurrency import processutils
from oslo_config import cfg
@ -150,12 +149,12 @@ class BaseVolumeTestCase(test.TestCase):
'status': 'creating',
'host': CONF.host,
'size': 1}
self.stubs.Set(brick_lvm.LVM,
'get_all_volume_groups',
self.fake_get_all_volume_groups)
self.mock_object(brick_lvm.LVM,
'get_all_volume_groups',
self.fake_get_all_volume_groups)
fake_image.mock_image_service(self)
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.mock_object(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0,
'pools': {}}
@ -208,13 +207,14 @@ class BaseVolumeTestCase(test.TestCase):
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
self.mock_object(self.volume.driver, 'local_path', fake_local_path)
if fakeout_clone_image:
self.stubs.Set(self.volume.driver, 'clone_image', fake_clone_image)
self.stubs.Set(image_utils, 'fetch_to_raw', fake_fetch_to_raw)
self.mock_object(self.volume.driver, 'clone_image',
fake_clone_image)
self.mock_object(image_utils, 'fetch_to_raw', fake_fetch_to_raw)
if fakeout_copy_image_to_volume:
self.stubs.Set(self.volume.driver, 'copy_image_to_volume',
fake_copy_image_to_volume)
self.mock_object(self.volume.driver, 'copy_image_to_volume',
fake_copy_image_to_volume)
mock_clone_image_volume.return_value = ({}, clone_image_volume)
mock_fetch_img.return_value = mock.MagicMock(
spec=tests_utils.get_file_spec())
@ -1207,15 +1207,15 @@ class VolumeTestCase(BaseVolumeTestCase):
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id, volume=volume)
self.mox.StubOutWithMock(self.volume.driver, 'delete_volume')
self.volume.driver.delete_volume(
mox.IgnoreArg()).AndRaise(exception.VolumeIsBusy(
volume_name='fake'))
self.mox.ReplayAll()
self.volume.delete_volume(self.context, volume_id, volume=volume)
volume_ref = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(volume_id, volume_ref.id)
self.assertEqual("available", volume_ref.status)
with mock.patch.object(self.volume.driver, 'delete_volume',
side_effect=exception.VolumeIsBusy(
volume_name='fake')
) as mock_del_vol:
self.volume.delete_volume(self.context, volume_id, volume=volume)
volume_ref = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(volume_id, volume_ref.id)
self.assertEqual("available", volume_ref.status)
mock_del_vol.assert_called_once_with(volume)
def test_get_volume_different_tenant(self):
"""Test can't get volume of another tenant when viewable_admin_meta."""
@ -1636,7 +1636,7 @@ class VolumeTestCase(BaseVolumeTestCase):
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
self.mock_object(engine.ActionEngine, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
@ -1664,7 +1664,7 @@ class VolumeTestCase(BaseVolumeTestCase):
@mock.patch.object(coordination.Coordinator, 'get_lock')
def test_create_volume_from_volume_check_locks(self, mock_lock):
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'execute', self._fake_execute)
self.mock_object(utils, 'execute', self._fake_execute)
orig_flow = engine.ActionEngine.run
@ -1690,7 +1690,7 @@ class VolumeTestCase(BaseVolumeTestCase):
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
self.mock_object(engine.ActionEngine, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
@ -1727,7 +1727,7 @@ class VolumeTestCase(BaseVolumeTestCase):
def mock_elevated(*args, **kwargs):
# unset mock so it is only called once
self.stubs.Set(self.context, 'elevated', orig_elevated)
self.mock_object(self.context, 'elevated', orig_elevated)
# we expect this to block and then fail
t = eventlet.spawn(self.volume.create_volume,
@ -1741,7 +1741,7 @@ class VolumeTestCase(BaseVolumeTestCase):
# mock something from early on in the delete operation and within the
# lock so that when we do the create we expect it to block.
self.stubs.Set(self.context, 'elevated', mock_elevated)
self.mock_object(self.context, 'elevated', mock_elevated)
# locked
self.volume.delete_volume(self.context, src_vol_id, volume=src_vol)
@ -1985,7 +1985,7 @@ class VolumeTestCase(BaseVolumeTestCase):
def mock_elevated(*args, **kwargs):
# unset mock so it is only called once
self.stubs.Set(self.context, 'elevated', orig_elevated)
self.mock_object(self.context, 'elevated', orig_elevated)
# We expect this to block and then fail
t = eventlet.spawn(self.volume.create_volume, self.context,
@ -1998,7 +1998,7 @@ class VolumeTestCase(BaseVolumeTestCase):
# mock something from early on in the delete operation and within the
# lock so that when we do the create we expect it to block.
self.stubs.Set(self.context, 'elevated', mock_elevated)
self.mock_object(self.context, 'elevated', mock_elevated)
# locked
self.volume.delete_snapshot(self.context, snapshot_obj)
@ -2074,7 +2074,7 @@ class VolumeTestCase(BaseVolumeTestCase):
def test_create_volume_from_encrypted_volume(self):
"""Test volume can be created from an encrypted volume."""
self.stubs.Set(key_manager, 'API', fake_keymgr.fake_api)
self.mock_object(key_manager, 'API', fake_keymgr.fake_api)
cipher = 'aes-xts-plain64'
key_size = 256
@ -2160,9 +2160,9 @@ class VolumeTestCase(BaseVolumeTestCase):
return ({'name': 'nova', 'available': True},
{'name': 'az2', 'available': True})
self.stubs.Set(volume_api,
'list_availability_zones',
fake_list_availability_zones)
self.mock_object(volume_api,
'list_availability_zones',
fake_list_availability_zones)
volume_src = tests_utils.create_volume(self.context,
availability_zone='az2',
@ -3509,22 +3509,23 @@ class VolumeTestCase(BaseVolumeTestCase):
snapshot = create_snapshot(volume_id, size=volume['size'])
self.volume.create_snapshot(self.context, volume_id, snapshot)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
self.volume.driver.delete_snapshot(
mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
snapshot_id = snapshot.id
self.volume.delete_snapshot(self.context, snapshot)
snapshot_ref = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot_ref.status)
with mock.patch.object(self.volume.driver, 'delete_snapshot',
side_effect=exception.SnapshotIsBusy(
snapshot_name='fake')
) as mock_del_snap:
snapshot_id = snapshot.id
self.volume.delete_snapshot(self.context, snapshot)
snapshot_ref = objects.Snapshot.get_by_id(self.context,
snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual(fields.SnapshotStatus.AVAILABLE,
snapshot_ref.status)
mock_del_snap.assert_called_once_with(snapshot)
@test.testtools.skipIf(sys.platform == "darwin", "SKIP on OSX")
def test_delete_no_dev_fails(self):
"""Test delete snapshot with no dev file fails."""
self.stubs.Set(os.path, 'exists', lambda x: False)
self.mock_object(os.path, 'exists', lambda x: False)
self.volume.driver.vg = fake_lvm.FakeBrickLVM('cinder-volumes',
False,
None,
@ -3537,18 +3538,16 @@ class VolumeTestCase(BaseVolumeTestCase):
snapshot_id = snapshot.id
self.volume.create_snapshot(self.context, volume_id, snapshot)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
self.volume.driver.delete_snapshot(
mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy(snapshot_name='fake'))
self.mox.ReplayAll()
self.volume.delete_snapshot(self.context, snapshot)
snapshot_ref = objects.Snapshot.get_by_id(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot_ref.status)
self.mox.UnsetStubs()
with mock.patch.object(self.volume.driver, 'delete_snapshot',
side_effect=exception.SnapshotIsBusy(
snapshot_name='fake')) as mock_del_snap:
self.volume.delete_snapshot(self.context, snapshot)
snapshot_ref = objects.Snapshot.get_by_id(self.context,
snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual(fields.SnapshotStatus.AVAILABLE,
snapshot_ref.status)
mock_del_snap.assert_called_once_with(snapshot)
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
'create_cloned_volume')
@ -3627,7 +3626,7 @@ class VolumeTestCase(BaseVolumeTestCase):
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
self.stubs.Set(self.volume.driver, 'local_path', lambda x: dst_path)
self.mock_object(self.volume.driver, 'local_path', lambda x: dst_path)
# creating volume testdata
kwargs = {'display_description': 'Test Desc',
@ -3668,8 +3667,8 @@ class VolumeTestCase(BaseVolumeTestCase):
image_id):
raise exception.ImageCopyFailure()
self.stubs.Set(self.volume.driver, 'copy_image_to_volume',
fake_copy_image_to_volume)
self.mock_object(self.volume.driver, 'copy_image_to_volume',
fake_copy_image_to_volume)
self.assertRaises(exception.ImageCopyFailure,
self._create_volume_from_image)
# NOTE(dulek): Rescheduling should not occur, so lets assert that
@ -4124,8 +4123,8 @@ class VolumeTestCase(BaseVolumeTestCase):
def fake_create_cloned_volume(volume, src_vref):
pass
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_create_cloned_volume)
self.mock_object(self.volume.driver, 'create_cloned_volume',
fake_create_cloned_volume)
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.volume.create_volume(self.context, volume_src.id,
@ -4180,8 +4179,8 @@ class VolumeTestCase(BaseVolumeTestCase):
def fake_create_cloned_volume(volume, src_vref):
pass
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_create_cloned_volume)
self.mock_object(self.volume.driver, 'create_cloned_volume',
fake_create_cloned_volume)
image_info = imageutils.QemuImgInfo()
image_info.virtual_size = '1073741824'
mock_qemu_info.return_value = image_info
@ -4216,8 +4215,8 @@ class VolumeTestCase(BaseVolumeTestCase):
db.volume_update(self.context, src_vref['id'], {'status': 'error'})
raise exception.CinderException('fake exception')
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_error_create_cloned_volume)
self.mock_object(self.volume.driver, 'create_cloned_volume',
fake_error_create_cloned_volume)
volume_src = tests_utils.create_volume(self.context,
**self.volume_params)
self.assertEqual('creating', volume_src.status)
@ -4793,10 +4792,10 @@ class VolumeMigrationTestCase(BaseVolumeTestCase):
def test_migrate_volume_driver(self):
"""Test volume migration done by driver."""
# stub out driver and rpc functions
self.stubs.Set(self.volume.driver, 'migrate_volume',
lambda x, y, z, new_type_id=None: (
True, {'user_id': fake.USER_ID}))
# Mock driver and rpc functions
self.mock_object(self.volume.driver, 'migrate_volume',
lambda x, y, z, new_type_id=None: (
True, {'user_id': fake.USER_ID}))
volume = tests_utils.create_volume(self.context, size=0,
host=CONF.host,
@ -5648,7 +5647,8 @@ class CopyVolumeToImageTestCase(BaseVolumeTestCase):
self.addCleanup(os.unlink, self.dst_path)
os.close(self.dst_fd)
self.stubs.Set(self.volume.driver, 'local_path', self.fake_local_path)
self.mock_object(self.volume.driver, 'local_path',
self.fake_local_path)
self.image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
self.image_meta = {
'id': self.image_id,
@ -6083,7 +6083,7 @@ class DriverTestCase(test.TestCase):
self.context = context.get_admin_context()
self.output = ""
self.configuration = conf.Configuration(None)
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)
def _fake_execute(_command, *_args, **_kwargs):
"""Fake _execute."""
@ -6574,25 +6574,27 @@ class VolumePolicyTestCase(test.TestCase):
self.context = context.get_admin_context()
def test_check_policy(self):
self.mox.StubOutWithMock(cinder.policy, 'enforce')
target = {
'project_id': self.context.project_id,
'user_id': self.context.user_id,
}
cinder.policy.enforce(self.context, 'volume:attach', target)
self.mox.ReplayAll()
cinder.volume.api.check_policy(self.context, 'attach')
with mock.patch.object(cinder.policy, 'enforce') as mock_enforce:
cinder.volume.api.check_policy(self.context, 'attach')
mock_enforce.assert_called_once_with(self.context,
'volume:attach',
target)
def test_check_policy_with_target(self):
self.mox.StubOutWithMock(cinder.policy, 'enforce')
target = {
'project_id': self.context.project_id,
'user_id': self.context.user_id,
'id': 2,
}
cinder.policy.enforce(self.context, 'volume:attach', target)
self.mox.ReplayAll()
cinder.volume.api.check_policy(self.context, 'attach', {'id': 2})
with mock.patch.object(cinder.policy, 'enforce') as mock_enforce:
cinder.volume.api.check_policy(self.context, 'attach', {'id': 2})
mock_enforce.assert_called_once_with(self.context,
'volume:attach',
target)
class ImageVolumeCacheTestCase(BaseVolumeTestCase):