Refactor CinderFixture

Replaced stub with MockPatch.

Change-Id: Iaf4a9182b79ec4d1c2d3436b3dc9a6c760cd48f9
This commit is contained in:
Amit Uniyal 2023-06-09 12:39:00 +00:00
parent 6f56c5c9fd
commit f7ce4df51c
6 changed files with 300 additions and 288 deletions

View File

@ -105,297 +105,309 @@ class CinderFixture(fixtures.Fixture):
def setUp(self):
super().setUp()
self._create_fakes()
def _is_multiattach(volume_id):
return volume_id in [
self.MULTIATTACH_VOL,
self.MULTIATTACH_RO_SWAP_OLD_VOL,
self.MULTIATTACH_RO_SWAP_NEW_VOL,
self.MULTIATTACH_RO_MIGRATE_OLD_VOL,
self.MULTIATTACH_RO_MIGRATE_NEW_VOL]
def _create_fakes(self):
def fake_get(self_api, context, volume_id, microversion=None):
volume = {
'display_name': volume_id,
'id': volume_id,
'size': 1,
'multiattach': _is_multiattach(volume_id),
'availability_zone': self.az
}
# Add any attachment details the fixture has
fixture_attachments = self.volume_to_attachment[volume_id]
if fixture_attachments:
attachments = {}
for attachment in list(fixture_attachments.values()):
instance_uuid = attachment['instance_uuid']
# legacy cruft left over from notification tests
if (
volume_id == self.SWAP_OLD_VOL and
self.swap_volume_instance_uuid
):
instance_uuid = self.swap_volume_instance_uuid
if (
volume_id == self.SWAP_ERR_OLD_VOL and
self.swap_volume_instance_error_uuid
):
instance_uuid = self.swap_volume_instance_error_uuid
attachments[instance_uuid] = {
'attachment_id': attachment['id'],
'mountpoint': '/dev/vdb',
}
volume.update({
'status': 'in-use',
'attach_status': 'attached',
'attachments': attachments,
})
# Otherwise mark the volume as avilable and detached
else:
volume.update({
'status': 'available',
'attach_status': 'detached',
})
if volume_id == self.IMAGE_BACKED_VOL:
volume['bootable'] = True
volume['volume_image_metadata'] = {
'image_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6'
}
if volume_id == self.IMAGE_BACKED_VOL_QUIESCE:
volume['bootable'] = True
volume['volume_image_metadata'] = {
"os_require_quiesce": "True",
"hw_qemu_guest_agent": "True"
}
if volume_id == self.IMAGE_WITH_TRAITS_BACKED_VOL:
volume['bootable'] = True
volume['volume_image_metadata'] = {
'image_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
"trait:HW_CPU_X86_SGX": "required",
}
# If we haven't called migrate_volume_completion then return
# a migration_status of migrating
if (
volume_id == self.MULTIATTACH_RO_MIGRATE_OLD_VOL and
not self.multiattach_ro_migrated
):
volume['migration_status'] = 'migrating'
# If we have migrated and are still GET'ing the new volume
# return raise VolumeNotFound
if (
volume_id == self.MULTIATTACH_RO_MIGRATE_NEW_VOL and
self.multiattach_ro_migrated
):
raise exception.VolumeNotFound(
volume_id=self.MULTIATTACH_RO_MIGRATE_NEW_VOL)
return volume
def fake_migrate_volume_completion(
_self, context, old_volume_id, new_volume_id, error,
):
if new_volume_id == self.MULTIATTACH_RO_MIGRATE_NEW_VOL:
# Mimic the behaviour of Cinder here that renames the new
# volume to the old UUID once the migration is complete.
# This boolean is used above to signal that the old volume
# has been deleted if callers try to GET it.
self.multiattach_ro_migrated = True
return {'save_volume_id': old_volume_id}
return {'save_volume_id': new_volume_id}
def _find_attachment(attachment_id):
"""Find attachment corresponding to ``attachment_id``.
:returns: A tuple of the volume ID, an attachment dict for the
given attachment ID, and a dict (keyed by attachment id) of
attachment dicts for the volume.
"""
for volume_id, attachments in self.volume_to_attachment.items():
for attachment in attachments.values():
if attachment_id == attachment['id']:
return volume_id, attachment, attachments
raise exception.VolumeAttachmentNotFound(
attachment_id=attachment_id)
def _find_connection_info(volume_id, attachment_id):
"""Find the connection_info associated with an attachment
:returns: A connection_info dict based on a deepcopy associated
with the volume_id but containing both the attachment_id and
volume_id making it unique for the attachment.
"""
connection_info = copy.deepcopy(
self.VOLUME_CONNECTION_INFO.get(
volume_id, self.VOLUME_CONNECTION_INFO.get('fake')
)
)
connection_info['data']['volume_id'] = volume_id
connection_info['data']['attachment_id'] = attachment_id
return connection_info
def fake_attachment_create(
_self, context, volume_id, instance_uuid, connector=None,
mountpoint=None,
):
attachment_id = uuidutils.generate_uuid()
if self.attachment_error_id is not None:
attachment_id = self.attachment_error_id
attachment = {'id': attachment_id}
if connector:
attachment['connection_info'] = _find_connection_info(
volume_id, attachment_id)
self.volume_to_attachment[volume_id][attachment_id] = {
'id': attachment_id,
'instance_uuid': instance_uuid,
'connector': connector,
}
if volume_id in [self.MULTIATTACH_RO_SWAP_OLD_VOL,
self.MULTIATTACH_RO_SWAP_NEW_VOL,
self.MULTIATTACH_RO_MIGRATE_OLD_VOL,
self.MULTIATTACH_RO_MIGRATE_NEW_VOL]:
attachment['attach_mode'] = 'ro'
LOG.info(
'Created attachment %s for volume %s. Total attachments '
'for volume: %d',
attachment_id, volume_id,
len(self.volume_to_attachment[volume_id]))
return attachment
def fake_attachment_delete(_self, context, attachment_id):
# 'attachment' is a tuple defining a attachment-instance mapping
volume_id, attachment, attachments = (
_find_attachment(attachment_id))
del attachments[attachment_id]
LOG.info(
'Deleted attachment %s for volume %s. Total attachments '
'for volume: %d',
attachment_id, volume_id, len(attachments))
def fake_attachment_update(
_self, context, attachment_id, connector, mountpoint=None,
):
# Ensure the attachment exists
volume_id, attachment, attachments = _find_attachment(
attachment_id)
# Cinder will only allow one "connected" attachment per
# non-multiattach volume at a time.
if volume_id != self.MULTIATTACH_VOL:
for _attachment in attachments.values():
if _attachment['connector'] is not None:
raise exception.InvalidInput(
'Volume %s is already connected with attachment '
'%s on host %s' % (
volume_id, _attachment['id'],
_attachment['connector'].get('host')))
# If the mountpoint was provided stash it in the connector as we do
# within nova.volume.cinder.API.attachment_update before calling
# c-api and then stash the connector in the attachment record.
if mountpoint:
connector['device'] = mountpoint
attachment['connector'] = connector
LOG.info('Updating volume attachment: %s', attachment_id)
attachment_ref = {
'id': attachment_id,
'connection_info': _find_connection_info(
volume_id, attachment_id)
}
if attachment_id == self.SWAP_ERR_ATTACH_ID:
# This intentionally triggers a TypeError for the
# instance.volume_swap.error versioned notification tests.
attachment_ref = {'connection_info': ()}
return attachment_ref
def fake_attachment_get(_self, context, attachment_id):
# Ensure the attachment exists and grab the volume_id
volume_id, _, _ = _find_attachment(attachment_id)
attachment_ref = {
'id': attachment_id,
'connection_info': _find_connection_info(
volume_id, attachment_id)
}
return attachment_ref
def fake_get_all_volume_types(*args, **kwargs):
return [{
# This is used in the 2.67 API sample test.
'id': '5f9204ec-3e94-4f27-9beb-fe7bb73b6eb9',
'name': 'lvm-1'
}]
def fake_attachment_complete(_self, _context, attachment_id):
# Ensure the attachment exists
_find_attachment(attachment_id)
LOG.info('Completing volume attachment: %s', attachment_id)
def fake_reimage_volume(*args, **kwargs):
if self.IMAGE_BACKED_VOL not in args:
raise exception.VolumeNotFound()
if 'reimage_reserved' not in kwargs:
raise exception.InvalidInput('reimage_reserved not specified')
def fake_get_absolute_limits(_self, context):
limits = {'totalSnapshotsUsed': 0, 'maxTotalSnapshots': -1}
return limits
self.test.stub_out(
'nova.volume.cinder.API.attachment_create', fake_attachment_create)
self.test.stub_out(
'nova.volume.cinder.API.attachment_delete', fake_attachment_delete)
self.test.stub_out(
'nova.volume.cinder.API.attachment_update', fake_attachment_update)
self.test.stub_out(
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_create',
side_effect=self.fake_attachment_create, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_update',
side_effect=self.fake_attachment_update, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_delete',
side_effect=self.fake_attachment_delete, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_complete',
fake_attachment_complete)
self.test.stub_out(
'nova.volume.cinder.API.attachment_get', fake_attachment_get)
self.test.stub_out(
side_effect=self.fake_attachment_complete, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.attachment_get',
side_effect=self.fake_attachment_get, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.begin_detaching',
lambda *args, **kwargs: None)
self.test.stub_out('nova.volume.cinder.API.get', fake_get)
self.test.stub_out(
lambda *args, **kwargs: None))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.get',
side_effect=self.fake_get, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.migrate_volume_completion',
fake_migrate_volume_completion)
self.test.stub_out(
side_effect=self.fake_migrate_volume_completion, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.roll_detaching',
lambda *args, **kwargs: None)
self.test.stub_out(
side_effect=(lambda *args, **kwargs: None), autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.is_microversion_supported',
lambda ctxt, microversion: None)
self.test.stub_out(
side_effect=(lambda ctxt, microversion: None), autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.check_attached',
lambda *args, **kwargs: None)
self.test.stub_out(
side_effect=(lambda *args, **kwargs: None), autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.get_all_volume_types',
fake_get_all_volume_types)
side_effect=self.fake_get_all_volume_types, autospec=False))
# TODO(lyarwood): These legacy cinderv2 APIs aren't currently wired
# into the fixture but should be in the future before we migrate any
# remaining legacy exports to cinderv3 attachments.
self.test.stub_out(
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.initialize_connection',
lambda *args, **kwargs: None)
self.test.stub_out(
side_effect=(lambda *args, **kwargs: None), autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.terminate_connection',
lambda *args, **kwargs: None)
self.test.stub_out(
side_effect=lambda *args, **kwargs: None, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.reimage_volume',
fake_reimage_volume)
self.test.stub_out(
side_effect=self.fake_reimage_volume, autospec=False))
self.useFixture(fixtures.MockPatch(
'nova.volume.cinder.API.get_absolute_limits',
fake_get_absolute_limits)
side_effect=self.fake_get_absolute_limits, autospec=False))
def _is_multiattach(self, volume_id):
return volume_id in [
self.MULTIATTACH_VOL,
self.MULTIATTACH_RO_SWAP_OLD_VOL,
self.MULTIATTACH_RO_SWAP_NEW_VOL,
self.MULTIATTACH_RO_MIGRATE_OLD_VOL,
self.MULTIATTACH_RO_MIGRATE_NEW_VOL]
def _find_attachment(self, attachment_id):
"""Find attachment corresponding to ``attachment_id``.
:returns: A tuple of the volume ID, an attachment dict for the
given attachment ID, and a dict (keyed by attachment id) of
attachment dicts for the volume.
"""
for volume_id, attachments in self.volume_to_attachment.items():
for attachment in attachments.values():
if attachment_id == attachment['id']:
return volume_id, attachment, attachments
raise exception.VolumeAttachmentNotFound(
attachment_id=attachment_id)
def _find_connection_info(self, volume_id, attachment_id):
"""Find the connection_info associated with an attachment
:returns: A connection_info dict based on a deepcopy associated
with the volume_id but containing both the attachment_id and
volume_id making it unique for the attachment.
"""
connection_info = copy.deepcopy(
self.VOLUME_CONNECTION_INFO.get(
volume_id, self.VOLUME_CONNECTION_INFO.get('fake')
)
)
connection_info['data']['volume_id'] = volume_id
connection_info['data']['attachment_id'] = attachment_id
return connection_info
def fake_migrate_volume_completion(
self, context, old_volume_id, new_volume_id, error,
):
if new_volume_id == self.MULTIATTACH_RO_MIGRATE_NEW_VOL:
# Mimic the behaviour of Cinder here that renames the new
# volume to the old UUID once the migration is complete.
# This boolean is used above to signal that the old volume
# has been deleted if callers try to GET it.
self.multiattach_ro_migrated = True
return {'save_volume_id': old_volume_id}
return {'save_volume_id': new_volume_id}
def fake_get(self, context, volume_id, microversion=None):
volume = {
'display_name': volume_id,
'id': volume_id,
'size': 1,
'multiattach': self._is_multiattach(volume_id),
'availability_zone': self.az
}
# Add any attachment details the fixture has
fixture_attachments = self.volume_to_attachment[volume_id]
if fixture_attachments:
attachments = {}
for attachment in list(fixture_attachments.values()):
instance_uuid = attachment['instance_uuid']
# legacy cruft left over from notification tests
if (
volume_id == self.SWAP_OLD_VOL and
self.swap_volume_instance_uuid
):
instance_uuid = self.swap_volume_instance_uuid
if (
volume_id == self.SWAP_ERR_OLD_VOL and
self.swap_volume_instance_error_uuid
):
instance_uuid = self.swap_volume_instance_error_uuid
attachments[instance_uuid] = {
'attachment_id': attachment['id'],
'mountpoint': '/dev/vdb',
}
volume.update({
'status': 'in-use',
'attach_status': 'attached',
'attachments': attachments,
})
# Otherwise mark the volume as avilable and detached
else:
volume.update({
'status': 'available',
'attach_status': 'detached',
})
if volume_id == self.IMAGE_BACKED_VOL:
volume['bootable'] = True
volume['volume_image_metadata'] = {
'image_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6'
}
if volume_id == self.IMAGE_BACKED_VOL_QUIESCE:
volume['bootable'] = True
volume['volume_image_metadata'] = {
"os_require_quiesce": "True",
"hw_qemu_guest_agent": "True"
}
if volume_id == self.IMAGE_WITH_TRAITS_BACKED_VOL:
volume['bootable'] = True
volume['volume_image_metadata'] = {
'image_id': '155d900f-4e14-4e4c-a73d-069cbf4541e6',
"trait:HW_CPU_X86_SGX": "required",
}
# If we haven't called migrate_volume_completion then return
# a migration_status of migrating
if (
volume_id == self.MULTIATTACH_RO_MIGRATE_OLD_VOL and
not self.multiattach_ro_migrated
):
volume['migration_status'] = 'migrating'
# If we have migrated and are still GET'ing the new volume
# return raise VolumeNotFound
if (
volume_id == self.MULTIATTACH_RO_MIGRATE_NEW_VOL and
self.multiattach_ro_migrated
):
raise exception.VolumeNotFound(
volume_id=self.MULTIATTACH_RO_MIGRATE_NEW_VOL)
return volume
def fake_get_all_volume_types(self, *args, **kwargs):
return [{
# This is used in the 2.67 API sample test.
'id': '5f9204ec-3e94-4f27-9beb-fe7bb73b6eb9',
'name': 'lvm-1'
}]
def fake_attachment_get(self, context, attachment_id):
# Ensure the attachment exists and grab the volume_id
volume_id, _, _ = self._find_attachment(attachment_id)
attachment_ref = {
'id': attachment_id,
'connection_info': self._find_connection_info(
volume_id, attachment_id)
}
return attachment_ref
def fake_attachment_create(
self, context, volume_id, instance_uuid, connector=None,
mountpoint=None,
):
attachment_id = uuidutils.generate_uuid()
if self.attachment_error_id is not None:
attachment_id = self.attachment_error_id
attachment = {'id': attachment_id}
if connector:
attachment['connection_info'] = self._find_connection_info(
volume_id, attachment_id)
self.volume_to_attachment[volume_id][attachment_id] = {
'id': attachment_id,
'instance_uuid': instance_uuid,
'connector': connector,
}
if volume_id in [self.MULTIATTACH_RO_SWAP_OLD_VOL,
self.MULTIATTACH_RO_SWAP_NEW_VOL,
self.MULTIATTACH_RO_MIGRATE_OLD_VOL,
self.MULTIATTACH_RO_MIGRATE_NEW_VOL]:
attachment['attach_mode'] = 'ro'
LOG.info(
'Created attachment %s for volume %s. Total attachments '
'for volume: %d',
attachment_id, volume_id,
len(self.volume_to_attachment[volume_id]))
return attachment
def fake_attachment_update(
self, context, attachment_id, connector, mountpoint=None,
):
# Ensure the attachment exists
volume_id, attachment, attachments = self._find_attachment(
attachment_id)
# Cinder will only allow one "connected" attachment per
# non-multiattach volume at a time.
if volume_id != self.MULTIATTACH_VOL:
for _attachment in attachments.values():
if _attachment['connector'] is not None:
raise exception.InvalidInput(
'Volume %s is already connected with attachment '
'%s on host %s' % (
volume_id, _attachment['id'],
_attachment['connector'].get('host')))
# If the mountpoint was provided stash it in the connector as we do
# within nova.volume.cinder.API.attachment_update before calling
# c-api and then stash the connector in the attachment record.
if mountpoint:
connector['device'] = mountpoint
attachment['connector'] = connector
LOG.info('Updating volume attachment: %s', attachment_id)
attachment_ref = {
'id': attachment_id,
'connection_info': self._find_connection_info(
volume_id, attachment_id)
}
if attachment_id == self.SWAP_ERR_ATTACH_ID:
# This intentionally triggers a TypeError for the
# instance.volume_swap.error versioned notification tests.
attachment_ref = {'connection_info': ()}
return attachment_ref
def fake_attachment_complete(self, _context, attachment_id):
# Ensure the attachment exists
self._find_attachment(attachment_id)
LOG.info('Completing volume attachment: %s', attachment_id)
def fake_attachment_delete(self, context, attachment_id):
# 'attachment' is a tuple defining a attachment-instance mapping
volume_id, attachment, attachments = (
self._find_attachment(attachment_id))
del attachments[attachment_id]
LOG.info(
'Deleted attachment %s for volume %s. Total attachments '
'for volume: %d',
attachment_id, volume_id, len(attachments))
def fake_reimage_volume(self, *args, **kwargs):
if self.IMAGE_BACKED_VOL not in args:
raise exception.VolumeNotFound()
if 'reimage_reserved' not in kwargs:
raise exception.InvalidInput('reimage_reserved not specified')
def fake_get_absolute_limits(self, context):
limits = {'totalSnapshotsUsed': 0, 'maxTotalSnapshots': -1}
return limits
def volume_ids_for_instance(self, instance_uuid):
for volume_id, attachments in self.volume_to_attachment.items():

View File

@ -19,6 +19,7 @@ from nova.tests import fixtures
from nova.tests.functional.api import client
from nova.tests.functional.notification_sample_tests \
import notification_sample_base
from nova.volume import cinder
class TestInstanceNotificationSampleWithMultipleCompute(
@ -1774,11 +1775,14 @@ class TestInstanceNotificationSample(
# Leave instance in normal, active state
self.api.post_server_action(server['id'], {'restore': {}})
def _do_test_attach_volume_error(self, server, mock_attach):
def _test_attach_volume_error(self, server):
def attach_volume(*args, **kwargs):
raise exception.CinderConnectionFailed(
reason="Connection timed out")
mock_attach.side_effect = attach_volume
# Override the fixture's default implementation of this with our
# error-generating version.
cinder.API.attachment_update.side_effect = attach_volume
post = {"volumeAttachment": {"volumeId": self.cinder.SWAP_NEW_VOL}}
self.api.post_server_volume(server['id'], post)
@ -1834,10 +1838,6 @@ class TestInstanceNotificationSample(
'fault.traceback': self.ANY},
actual=self.notifier.versioned_notifications[1])
@mock.patch('nova.volume.cinder.API.attachment_update')
def _test_attach_volume_error(self, server, mock_attach):
self._do_test_attach_volume_error(server, mock_attach)
def _test_interface_attach_and_detach(self, server):
post = {
'interfaceAttachment': {

View File

@ -84,7 +84,7 @@ class TestVolumeDisconnectDuringPreLiveMigrationRollback(base.ServersTestBase):
self.computes['dest'].driver, '_disconnect_volume'),
mock.patch(
'nova.volume.cinder.API.attachment_create',
side_effect=test.TestingException)
side_effect=test.TestingException, autospec=False)
) as (
mock_dest_connector, mock_dest_connect, mock_dest_disconnect,
mock_attachment_create

View File

@ -57,7 +57,7 @@ class TestDetachAttachmentNotFound(integrated_helpers._IntegratedTestBase):
with mock.patch(
'nova.volume.cinder.API.attachment_delete',
side_effect=exception.VolumeAttachmentNotFound(
attachment_id=bdm.attachment_id)
attachment_id=bdm.attachment_id), autospec=False
) as (
mock_attachment_delete
):

View File

@ -417,7 +417,7 @@ class TestNovaManageVolumeAttachmentRefresh(
with (
mock.patch(
'nova.volume.cinder.API.attachment_update',
side_effect=test.TestingException)
side_effect=test.TestingException, autospec=False)
) as (
mock_attachment_update
):

View File

@ -1137,9 +1137,9 @@ class ServerTestV220(integrated_helpers._IntegratedTestBase):
mock.patch.object(volume.cinder.API,
'check_availability_zone'),
mock.patch.object(volume.cinder.API,
'attachment_create'),
'attachment_create', autospec=False),
mock.patch.object(volume.cinder.API,
'attachment_complete')
'attachment_complete', autospec=False)
) as (mock_check_vol_attached,
mock_check_av_zone, mock_attach_create,
mock_attachment_complete):
@ -1667,7 +1667,7 @@ class ServerRebuildTestCaseV293(integrated_helpers._IntegratedTestBase):
server = self._bfv_server()
error = cinder_exception.ClientException(code=500)
with mock.patch.object(volume.cinder.API, 'attachment_create',
side_effect=error):
side_effect=error, autospec=False):
# We expect this to fail because we are doing cast-as-call
self.assertRaises(client.OpenStackApiException,
self._test_rebuild, server)
@ -1693,7 +1693,7 @@ class ServerRebuildTestCaseV293(integrated_helpers._IntegratedTestBase):
server = self._bfv_server()
error = cinder_exception.ClientException(code=500)
with mock.patch.object(volume.cinder.API, 'attachment_delete',
side_effect=error):
side_effect=error, autospec=False):
# We expect this to fail because we are doing cast-as-call
self.assertRaises(client.OpenStackApiException,
self._test_rebuild, server)