Add block_storage.volume actions

Before we can switch BS cloud layer need to add missing volume actions.
Do this for both v2 and v3 (v2 differs slightly).

Change-Id: Iee4c137ea703d7adec4f8f1e7a3585a1d01e6d19
This commit is contained in:
Artem Goncharov
2021-05-14 16:37:24 +02:00
parent d1c9a96ef6
commit f9183cf481
8 changed files with 1439 additions and 104 deletions

View File

@@ -189,26 +189,32 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
return self._create(_volume.Volume, **attrs)
def delete_volume(self, volume, ignore_missing=True):
def delete_volume(self, volume, ignore_missing=True, force=False):
"""Delete a volume
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:class:`~openstack.volume.v2.volume.Volume` instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the volume does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent volume.
:class:`~openstack.exceptions.ResourceNotFound` will be raised
when the volume does not exist. When set to ``True``, no
exception will be set when attempting to delete a nonexistent
volume.
:param bool force: Whether to try forcing volume deletion.
:returns: ``None``
"""
self._delete(_volume.Volume, volume, ignore_missing=ignore_missing)
if not force:
self._delete(_volume.Volume, volume, ignore_missing=ignore_missing)
else:
volume = self._get_resource(_volume.Volume, volume)
volume.force_delete(self)
# ====== VOLUME ACTIONS ======
def extend_volume(self, volume, size):
"""Extend a volume
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:class:`~openstack.volume.v2.volume.Volume` instance.
:param size: New volume size
:returns: None
@@ -216,6 +222,136 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.extend(self, size)
def retype_volume(self, volume, new_type, migration_policy="never"):
"""Retype the volume.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param str new_type: The new volume type that volume is changed with.
:param str migration_policy: Specify if the volume should be migrated
when it is re-typed. Possible values are on-demand or never.
Default: never.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.retype(self, new_type, migration_policy)
def set_volume_bootable_status(self, volume, bootable):
"""Set bootable status of the volume.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param bool bootable: Specifies whether the volume should be bootable
or not.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.set_bootable_status(self, bootable)
def reset_volume_status(
self, volume, status, attach_status, migration_status
):
"""Reset volume statuses.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param str status: The new volume status.
:param str attach_status: The new volume attach status.
:param str migration_status: The new volume migration status (admin
only).
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.reset_status(self, status, attach_status, migration_status)
def attach_volume(
self, volume, mountpoint, instance=None, host_name=None
):
"""Attaches a volume to a server.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param str mountpoint: The attaching mount point.
:param str instance: The UUID of the attaching instance.
:param str host_name: The name of the attaching host.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.attach(self, mountpoint, instance, host_name)
def detach_volume(
self, volume, attachment, force=False, connector=None
):
"""Detaches a volume from a server.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param str attachment: The ID of the attachment.
:param bool force: Whether to force volume detach (Rolls back an
unsuccessful detach operation after you disconnect the volume.)
:param dict connector: The connector object.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.detach(self, attachment, force, connector)
def unmanage_volume(self, volume):
"""Removes a volume from Block Storage management without removing the
back-end storage object that is associated with it.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.unmanage(self)
def migrate_volume(
self, volume, host=None, force_host_copy=False,
lock_volume=False
):
"""Migrates a volume to the specified host.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param str host: The target host for the volume migration. Host
format is host@backend.
:param bool force_host_copy: If false (the default), rely on the volume
backend driver to perform the migration, which might be optimized.
If true, or the volume driver fails to migrate the volume itself,
a generic host-based migration is performed.
:param bool lock_volume: If true, migrating an available volume will
change its status to maintenance preventing other operations from
being performed on the volume such as attach, detach, retype, etc.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.migrate(self, host, force_host_copy, lock_volume)
def complete_volume_migration(
self, volume, new_volume, error=False
):
"""Complete the migration of a volume.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v2.volume.Volume` instance.
:param str new_volume: The UUID of the new volume.
:param bool error: Used to indicate if an error has occured elsewhere
that requires clean up.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.complete_migration(self, new_volume, error)
# ====== BACKEND POOLS ======
def backend_pools(self, **query):
"""Returns a generator of cinder Back-end storage pools

View File

@@ -99,13 +99,93 @@ class Volume(resource.Resource):
# as both Volume and VolumeDetail instances can be acted on, but
# the URL used is sans any additional /detail/ part.
url = utils.urljoin(Volume.base_path, self.id, 'action')
headers = {'Accept': ''}
return session.post(url, json=body, headers=headers)
return session.post(url, json=body, microversion=None)
def extend(self, session, size):
"""Extend a volume size."""
body = {'os-extend': {'new_size': size}}
self._action(session, body)
def set_bootable_status(self, session, bootable=True):
"""Set volume bootable status flag"""
body = {'os-set_bootable': {'bootable': bootable}}
self._action(session, body)
def reset_status(
self, session, status, attach_status, migration_status
):
"""Reset volume statuses (admin operation)"""
body = {'os-reset_status': {
'status': status,
'attach_status': attach_status,
'migration_status': migration_status
}}
self._action(session, body)
def attach(
self, session, mountpoint, instance
):
"""Attach volume to server"""
body = {'os-attach': {
'mountpoint': mountpoint,
'instance_uuid': instance}}
self._action(session, body)
def detach(self, session, attachment, force=False):
"""Detach volume from server"""
if not force:
body = {'os-detach': {'attachment_id': attachment}}
if force:
body = {'os-force_detach': {
'attachment_id': attachment}}
self._action(session, body)
def unmanage(self, session):
"""Unmanage volume"""
body = {'os-unmanage': {}}
self._action(session, body)
def retype(self, session, new_type, migration_policy=None):
"""Change volume type"""
body = {'os-retype': {
'new_type': new_type}}
if migration_policy:
body['os-retype']['migration_policy'] = migration_policy
self._action(session, body)
def migrate(
self, session, host=None, force_host_copy=False,
lock_volume=False
):
"""Migrate volume"""
req = dict()
if host is not None:
req['host'] = host
if force_host_copy:
req['force_host_copy'] = force_host_copy
if lock_volume:
req['lock_volume'] = lock_volume
body = {'os-migrate_volume': req}
self._action(session, body)
def complete_migration(self, session, new_volume_id, error=False):
"""Complete volume migration"""
body = {'os-migrate_volume_completion': {
'new_volume': new_volume_id,
'error': error}}
self._action(session, body)
def force_delete(self, session):
"""Force volume deletion"""
body = {'os-force_delete': {}}
self._action(session, body)
VolumeDetail = Volume

View File

@@ -304,6 +304,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
return self._update(_type.TypeEncryption, encryption, **attrs)
# ====== VOLUMES ======
def get_volume(self, volume):
"""Get a single volume
@@ -362,7 +363,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
return self._create(_volume.Volume, **attrs)
def delete_volume(self, volume, ignore_missing=True):
def delete_volume(self, volume, ignore_missing=True, force=False):
"""Delete a volume
:param volume: The value can be either the ID of a volume or a
@@ -372,16 +373,22 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
raised when the volume does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent volume.
:param bool force: Whether to try forcing volume deletion.
:returns: ``None``
"""
self._delete(_volume.Volume, volume, ignore_missing=ignore_missing)
if not force:
self._delete(_volume.Volume, volume, ignore_missing=ignore_missing)
else:
volume = self._get_resource(_volume.Volume, volume)
volume.force_delete(self)
# ====== VOLUME ACTIONS ======
def extend_volume(self, volume, size):
"""Extend a volume
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:class:`~openstack.volume.v3.volume.Volume` instance.
:param size: New volume size
:returns: None
@@ -392,12 +399,12 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
def set_volume_readonly(self, volume, readonly=True):
"""Set a volume's read-only flag.
:param name_or_id: Name, unique ID of the volume or a volume dict.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param bool readonly: Whether the volume should be a read-only volume
or not
or not.
:raises: OpenStackCloudTimeout if wait time exceeded.
:raises: OpenStackCloudException on operation error.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.set_readonly(self, readonly)
@@ -405,18 +412,243 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
def retype_volume(self, volume, new_type, migration_policy="never"):
"""Retype the volume.
:param name_or_id: Name, unique ID of the volume or a volume dict.
:param new_type: The new volume type that volume is changed with.
:param migration_policy: Specify if the volume should be migrated when
it is re-typed. Possible values are on-demand
or never. Default: never.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str new_type: The new volume type that volume is changed with.
:param str migration_policy: Specify if the volume should be migrated
when it is re-typed. Possible values are on-demand or never.
Default: never.
:raises: OpenStackCloudTimeout if wait time exceeded.
:raises: OpenStackCloudException on operation error.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.retype(self, new_type, migration_policy)
def set_volume_bootable_status(self, volume, bootable):
"""Set bootable status of the volume.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param bool bootable: Specifies whether the volume should be bootable
or not.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.set_bootable_status(self, bootable)
def reset_volume_status(
self, volume, status, attach_status, migration_status
):
"""Reset volume statuses.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str status: The new volume status.
:param str attach_status: The new volume attach status.
:param str migration_status: The new volume migration status (admin
only).
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.reset_status(self, status, attach_status, migration_status)
def revert_volume_to_snapshot(
self, volume, snapshot
):
"""Revert a volume to its latest snapshot.
This method only support reverting a detached volume, and the
volume status must be available.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param snapshot: The value can be either the ID of a snapshot or a
:class:`~openstack.volume.v3.snapshot.Snapshot` instance.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
snapshot = self._get_resource(_snapshot.Snapshot, snapshot)
volume.revert_to_snapshot(self, snapshot.id)
def attach_volume(
self, volume, mountpoint, instance=None, host_name=None
):
"""Attaches a volume to a server.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str mountpoint: The attaching mount point.
:param str instance: The UUID of the attaching instance.
:param str host_name: The name of the attaching host.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.attach(self, mountpoint, instance, host_name)
def detach_volume(
self, volume, attachment, force=False, connector=None
):
"""Detaches a volume from a server.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str attachment: The ID of the attachment.
:param bool force: Whether to force volume detach (Rolls back an
unsuccessful detach operation after you disconnect the volume.)
:param dict connector: The connector object.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.detach(self, attachment, force, connector)
def unmanage_volume(self, volume):
"""Removes a volume from Block Storage management without removing the
back-end storage object that is associated with it.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.unmanage(self)
def migrate_volume(
self, volume, host=None, force_host_copy=False,
lock_volume=False, cluster=None
):
"""Migrates a volume to the specified host.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str host: The target host for the volume migration. Host
format is host@backend.
:param bool force_host_copy: If false (the default), rely on the volume
backend driver to perform the migration, which might be optimized.
If true, or the volume driver fails to migrate the volume itself,
a generic host-based migration is performed.
:param bool lock_volume: If true, migrating an available volume will
change its status to maintenance preventing other operations from
being performed on the volume such as attach, detach, retype, etc.
:param str cluster: The target cluster for the volume migration.
Cluster format is cluster@backend. Starting with microversion
3.16, either cluster or host must be specified. If host is
specified and is part of a cluster, the cluster is used as the
target for the migration.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.migrate(self, host, force_host_copy, lock_volume, cluster)
def complete_volume_migration(
self, volume, new_volume, error=False
):
"""Complete the migration of a volume.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str new_volume: The UUID of the new volume.
:param bool error: Used to indicate if an error has occured elsewhere
that requires clean up.
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.complete_migration(self, new_volume, error)
def upload_volume_to_image(
self, volume, image_name, force=False, disk_format=None,
container_format=None, visibility=None, protected=None
):
"""Uploads the specified volume to image service.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param str image name: The name for the new image.
:param bool force: Enables or disables upload of a volume that is
attached to an instance.
:param str disk_format: Disk format for the new image.
:param str container_format: Container format for the new image.
:param str visibility: The visibility property of the new image.
:param str protected: Whether the new image is protected.
:returns: dictionary describing the image.
"""
volume = self._get_resource(_volume.Volume, volume)
volume.upload_to_image(
self, image_name, force=force, disk_format=disk_format,
container_format=container_format, visibility=visibility,
protected=protected
)
def reserve_volume(self, volume):
"""Mark volume as reserved.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.reserve(self)
def unreserve_volume(self, volume):
"""Unmark volume as reserved.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.unreserve(self)
def begin_volume_detaching(self, volume):
"""Update volume status to 'detaching'.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.begin_detaching(self)
def abort_volume_detaching(self, volume):
"""Update volume status to 'in-use'.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.abort_detaching(self)
def init_volume_attachment(self, volume, connector):
"""Initialize volume attachment.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param dict connector: The connector object.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.init_attachment(self, connector)
def terminate_volume_attachment(self, volume, connector):
"""Update volume status to 'in-use'.
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param dict connector: The connector object.
:returns: None """
volume = self._get_resource(_volume.Volume, volume)
volume.terminate_attachment(self, connector)
# ====== BACKEND POOLS ======
def backend_pools(self, **query):
"""Returns a generator of cinder Back-end storage pools

View File

@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack import exceptions
from openstack import format
from openstack import resource
from openstack import utils
@@ -21,7 +22,8 @@ class Volume(resource.Resource):
base_path = "/volumes"
_query_mapping = resource.QueryParameters(
'name', 'status', 'project_id', all_projects='all_tenants')
'name', 'status', 'project_id', 'created_at', 'updated_at',
all_projects='all_tenants')
# capabilities
allow_fetch = True
@@ -93,33 +95,185 @@ class Volume(resource.Resource):
#: The name of the associated volume type.
volume_type = resource.Body("volume_type")
def _action(self, session, body):
_max_microversion = "3.60"
def _action(self, session, body, microversion=None):
"""Preform volume actions given the message body."""
# NOTE: This is using Volume.base_path instead of self.base_path
# as both Volume and VolumeDetail instances can be acted on, but
# the URL used is sans any additional /detail/ part.
url = utils.urljoin(Volume.base_path, self.id, 'action')
headers = {'Accept': ''}
return session.post(url, json=body, headers=headers)
resp = session.post(url, json=body,
microversion=self._max_microversion)
exceptions.raise_from_response(resp)
return resp
def extend(self, session, size):
"""Extend a volume size."""
body = {'os-extend': {'new_size': size}}
self._action(session, body)
def set_bootable_status(self, session, bootable=True):
"""Set volume bootable status flag"""
body = {'os-set_bootable': {'bootable': bootable}}
self._action(session, body)
def set_readonly(self, session, readonly):
"""Set volume readonly flag"""
body = {'os-update_readonly_flag': {'readonly': readonly}}
self._action(session, body)
def retype(self, session, new_type, migration_policy):
"""Retype volume considering the migration policy"""
body = {
'os-retype': {
'new_type': new_type,
'migration_policy': migration_policy
}
}
def reset_status(
self, session, status, attach_status, migration_status
):
"""Reset volume statuses (admin operation)"""
body = {'os-reset_status': {
'status': status,
'attach_status': attach_status,
'migration_status': migration_status
}}
self._action(session, body)
def revert_to_snapshot(self, session, snapshot_id):
"""Revert volume to its snapshot"""
utils.require_microversion(session, "3.40")
body = {'revert': {'snapshot_id': snapshot_id}}
self._action(session, body)
def attach(
self, session, mountpoint, instance=None, host_name=None
):
"""Attach volume to server"""
body = {'os-attach': {
'mountpoint': mountpoint}}
if instance is not None:
body['os-attach']['instance_uuid'] = instance
elif host_name is not None:
body['os-attach']['host_name'] = host_name
else:
raise ValueError(
'Either instance_uuid or host_name must be specified')
self._action(session, body)
def detach(self, session, attachment, force=False, connector=None):
"""Detach volume from server"""
if not force:
body = {'os-detach': {'attachment_id': attachment}}
if force:
body = {'os-force_detach': {
'attachment_id': attachment}}
if connector:
body['os-force_detach']['connector'] = connector
self._action(session, body)
def unmanage(self, session):
"""Unmanage volume"""
body = {'os-unmanage': {}}
self._action(session, body)
def retype(self, session, new_type, migration_policy=None):
"""Change volume type"""
body = {'os-retype': {
'new_type': new_type}}
if migration_policy:
body['os-retype']['migration_policy'] = migration_policy
self._action(session, body)
def migrate(
self, session, host=None, force_host_copy=False,
lock_volume=False, cluster=None
):
"""Migrate volume"""
req = dict()
if host is not None:
req['host'] = host
if force_host_copy:
req['force_host_copy'] = force_host_copy
if lock_volume:
req['lock_volume'] = lock_volume
if cluster is not None:
req['cluster'] = cluster
utils.require_microversion(session, "3.16")
body = {'os-migrate_volume': req}
self._action(session, body)
def complete_migration(self, session, new_volume_id, error=False):
"""Complete volume migration"""
body = {'os-migrate_volume_completion': {
'new_volume': new_volume_id,
'error': error}}
self._action(session, body)
def force_delete(self, session):
"""Force volume deletion"""
body = {'os-force_delete': {}}
self._action(session, body)
def upload_to_image(
self, session, image_name, force=False, disk_format=None,
container_format=None, visibility=None, protected=None
):
"""Upload the volume to image service"""
req = dict(image_name=image_name, force=force)
if disk_format is not None:
req['disk_format'] = disk_format
if container_format is not None:
req['container_format'] = container_format
if visibility is not None:
req['visibility'] = visibility
if protected is not None:
req['protected'] = protected
if visibility is not None or protected is not None:
utils.require_microversion(session, "3.1")
body = {'os-volume_upload_image': req}
resp = self._action(session, body).json()
return resp['os-volume_upload_image']
def reserve(self, session):
"""Reserve volume"""
body = {'os-reserve': {}}
self._action(session, body)
def unreserve(self, session):
"""Unreserve volume"""
body = {'os-unreserve': {}}
self._action(session, body)
def begin_detaching(self, session):
"""Update volume status to 'detaching'"""
body = {'os-begin_detaching': {}}
self._action(session, body)
def abort_detaching(self, session):
"""Roll back volume status to 'in-use'"""
body = {'os-roll_detaching': {}}
self._action(session, body)
def init_attachment(self, session, connector):
"""Initialize volume attachment"""
body = {'os-initialize_connection': {'connector': connector}}
self._action(session, body)
def terminate_attachment(self, session, connector):
"""Terminate volume attachment"""
body = {'os-terminate_connection': {'connector': connector}}
self._action(session, body)

View File

@@ -25,6 +25,9 @@ class TestVolumeProxy(test_proxy_base.TestProxyBase):
super(TestVolumeProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
class TestVolume(TestVolumeProxy):
def test_snapshot_get(self):
self.verify_get(self.proxy.get_snapshot, snapshot.Snapshot)
@@ -90,12 +93,14 @@ class TestVolumeProxy(test_proxy_base.TestProxyBase):
def test_volume_delete_ignore(self):
self.verify_delete(self.proxy.delete_volume, volume.Volume, True)
def test_volume_extend(self):
def test_volume_delete_force(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.extend",
self.proxy.extend_volume,
method_args=["value", "new-size"],
expected_args=[self.proxy, "new-size"])
"openstack.block_storage.v2.volume.Volume.force_delete",
self.proxy.delete_volume,
method_args=["value"],
method_kwargs={"force": True},
expected_args=[self.proxy]
)
def test_backend_pools(self):
self.verify_list(self.proxy.backend_pools, stats.Pools)
@@ -160,3 +165,92 @@ class TestVolumeProxy(test_proxy_base.TestProxyBase):
self.proxy.wait_for_status,
method_args=[value],
expected_args=[self.proxy, value, 'available', ['error'], 2, 120])
class TestVolumeActions(TestVolumeProxy):
def test_volume_extend(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.extend",
self.proxy.extend_volume,
method_args=["value", "new-size"],
expected_args=[self.proxy, "new-size"])
def test_volume_set_bootable(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.set_bootable_status",
self.proxy.set_volume_bootable_status,
method_args=["value", True],
expected_args=[self.proxy, True])
def test_volume_reset_volume_status(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.reset_status",
self.proxy.reset_volume_status,
method_args=["value", '1', '2', '3'],
expected_args=[self.proxy, '1', '2', '3'])
def test_attach_instance(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.attach",
self.proxy.attach_volume,
method_args=["value", '1'],
method_kwargs={'instance': '2'},
expected_args=[self.proxy, '1', '2', None])
def test_attach_host(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.attach",
self.proxy.attach_volume,
method_args=["value", '1'],
method_kwargs={'host_name': '3'},
expected_args=[self.proxy, '1', None, '3'])
def test_detach_defaults(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.detach",
self.proxy.detach_volume,
method_args=["value", '1'],
expected_args=[self.proxy, '1', False, None])
def test_detach_force(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.detach",
self.proxy.detach_volume,
method_args=["value", '1', True, {'a': 'b'}],
expected_args=[self.proxy, '1', True, {'a': 'b'}])
def test_unmanage(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.unmanage",
self.proxy.unmanage_volume,
method_args=["value"],
expected_args=[self.proxy])
def test_migrate_default(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value", '1'],
expected_args=[self.proxy, '1', False, False])
def test_migrate_nondefault(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value", '1', True, True],
expected_args=[self.proxy, '1', True, True])
def test_complete_migration(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.complete_migration",
self.proxy.complete_volume_migration,
method_args=["value", '1'],
expected_args=[self.proxy, "1", False])
def test_complete_migration_error(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.complete_migration",
self.proxy.complete_volume_migration,
method_args=["value", "1", True],
expected_args=[self.proxy, "1", True])

View File

@@ -12,6 +12,8 @@
from unittest import mock
from keystoneauth1 import adapter
from openstack.block_storage.v2 import volume
from openstack.tests.unit import base
@@ -62,14 +64,6 @@ VOLUME = {
class TestVolume(base.TestCase):
def setUp(self):
super(TestVolume, self).setUp()
self.resp = mock.Mock()
self.resp.body = None
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess = mock.Mock()
self.sess.post = mock.Mock(return_value=self.resp)
def test_basic(self):
sot = volume.Volume(VOLUME)
self.assertEqual("volume", sot.resource_key)
@@ -126,6 +120,20 @@ class TestVolume(base.TestCase):
sot.scheduler_hints)
self.assertFalse(sot.is_encrypted)
class TestVolumeActions(TestVolume):
def setUp(self):
super(TestVolumeActions, self).setUp()
self.resp = mock.Mock()
self.resp.body = None
self.resp.status_code = 200
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess = mock.Mock(spec=adapter.Adapter)
self.sess.default_microversion = '3.0'
self.sess.post = mock.Mock(return_value=self.resp)
self.sess._get_connection = mock.Mock(return_value=self.cloud)
def test_extend(self):
sot = volume.Volume(**VOLUME)
@@ -133,5 +141,152 @@ class TestVolume(base.TestCase):
url = 'volumes/%s/action' % FAKE_ID
body = {"os-extend": {"new_size": "20"}}
headers = {'Accept': ''}
self.sess.post.assert_called_with(url, json=body, headers=headers)
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_set_volume_bootable(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.set_bootable_status(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_set_volume_bootable_false(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.set_bootable_status(self.sess, False))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_reset_status(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.reset_status(self.sess, '1', '2', '3'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': '1', 'attach_status': '2',
'migration_status': '3'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_attach_instance(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.attach(self.sess, '1', '2'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-attach': {'mountpoint': '1', 'instance_uuid': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_detach(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.detach(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-detach': {'attachment_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_detach_force(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(
sot.detach(self.sess, '1', force=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_detach': {'attachment_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_unmanage(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.unmanage(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-unmanage': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_retype(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.retype(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_retype_mp(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.retype(self.sess, '1', migration_policy='2'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1', 'migration_policy': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_migrate(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, host='1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_migrate_flags(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, host='1',
force_host_copy=True, lock_volume=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1', 'force_host_copy': True,
'lock_volume': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_complete_migration(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.complete_migration(self.sess, new_volume_id='1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_complete_migration_error(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.complete_migration(
self.sess, new_volume_id='1', error=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_force_delete(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.force_delete(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)

View File

@@ -30,6 +30,8 @@ class TestVolumeProxy(test_proxy_base.TestProxyBase):
super(TestVolumeProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
class TestVolume(TestVolumeProxy):
def test_snapshot_get(self):
self.verify_get(self.proxy.get_snapshot, snapshot.Snapshot)
@@ -155,38 +157,14 @@ class TestVolumeProxy(test_proxy_base.TestProxyBase):
def test_volume_delete_ignore(self):
self.verify_delete(self.proxy.delete_volume, volume.Volume, True)
def test_volume_extend(self):
def test_volume_delete_force(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.extend",
self.proxy.extend_volume,
method_args=["value", "new-size"],
expected_args=[self.proxy, "new-size"])
def test_volume_set_readonly_no_argument(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.set_readonly",
self.proxy.set_volume_readonly,
"openstack.block_storage.v3.volume.Volume.force_delete",
self.proxy.delete_volume,
method_args=["value"],
expected_args=[self.proxy, True])
def test_volume_set_readonly_false(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.set_readonly",
self.proxy.set_volume_readonly,
method_args=["value", False],
expected_args=[self.proxy, False])
def test_volume_retype_without_migration_policy(self):
self._verify("openstack.block_storage.v3.volume.Volume.retype",
self.proxy.retype_volume,
method_args=["value", "rbd"],
expected_args=[self.proxy, "rbd", "never"])
def test_volume_retype_with_migration_policy(self):
self._verify("openstack.block_storage.v3.volume.Volume.retype",
self.proxy.retype_volume,
method_args=["value", "rbd", "on-demand"],
expected_args=[self.proxy, "rbd", "on-demand"])
method_kwargs={"force": True},
expected_args=[self.proxy]
)
def test_backend_pools(self):
self.verify_list(self.proxy.backend_pools, stats.Pools)
@@ -296,3 +274,197 @@ class TestVolumeProxy(test_proxy_base.TestProxyBase):
self.proxy.wait_for_status,
method_args=[value],
expected_args=[self.proxy, value, 'available', ['error'], 2, 120])
class TestVolumeActions(TestVolumeProxy):
def test_volume_extend(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.extend",
self.proxy.extend_volume,
method_args=["value", "new-size"],
expected_args=[self.proxy, "new-size"])
def test_volume_set_readonly_no_argument(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.set_readonly",
self.proxy.set_volume_readonly,
method_args=["value"],
expected_args=[self.proxy, True])
def test_volume_set_readonly_false(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.set_readonly",
self.proxy.set_volume_readonly,
method_args=["value", False],
expected_args=[self.proxy, False])
def test_volume_set_bootable(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.set_bootable_status",
self.proxy.set_volume_bootable_status,
method_args=["value", True],
expected_args=[self.proxy, True])
def test_volume_reset_volume_status(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.reset_status",
self.proxy.reset_volume_status,
method_args=["value", '1', '2', '3'],
expected_args=[self.proxy, '1', '2', '3'])
def test_volume_revert_to_snapshot(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.revert_to_snapshot",
self.proxy.revert_volume_to_snapshot,
method_args=["value", '1'],
expected_args=[self.proxy, '1'])
def test_attach_instance(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.attach",
self.proxy.attach_volume,
method_args=["value", '1'],
method_kwargs={'instance': '2'},
expected_args=[self.proxy, '1', '2', None])
def test_attach_host(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.attach",
self.proxy.attach_volume,
method_args=["value", '1'],
method_kwargs={'host_name': '3'},
expected_args=[self.proxy, '1', None, '3'])
def test_detach_defaults(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.detach",
self.proxy.detach_volume,
method_args=["value", '1'],
expected_args=[self.proxy, '1', False, None])
def test_detach_force(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.detach",
self.proxy.detach_volume,
method_args=["value", '1', True, {'a': 'b'}],
expected_args=[self.proxy, '1', True, {'a': 'b'}])
def test_unmanage(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.unmanage",
self.proxy.unmanage_volume,
method_args=["value"],
expected_args=[self.proxy])
def test_migrate_default(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value", '1'],
expected_args=[self.proxy, '1', False, False, None])
def test_migrate_nondefault(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value", '1', True, True],
expected_args=[self.proxy, '1', True, True, None])
def test_migrate_cluster(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value"],
method_kwargs={'cluster': '3'},
expected_args=[self.proxy, None, False, False, '3'])
def test_complete_migration(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.complete_migration",
self.proxy.complete_volume_migration,
method_args=["value", '1'],
expected_args=[self.proxy, "1", False])
def test_complete_migration_error(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.complete_migration",
self.proxy.complete_volume_migration,
method_args=["value", "1", True],
expected_args=[self.proxy, "1", True])
def test_upload_to_image(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.upload_to_image",
self.proxy.upload_volume_to_image,
method_args=["value", "1"],
expected_args=[self.proxy, "1"],
expected_kwargs={
"force": False,
"disk_format": None,
"container_format": None,
"visibility": None,
"protected": None
})
def test_upload_to_image_extended(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.upload_to_image",
self.proxy.upload_volume_to_image,
method_args=["value", "1"],
method_kwargs={
"disk_format": "2",
"container_format": "3",
"visibility": "4",
"protected": "5"
},
expected_args=[self.proxy, "1"],
expected_kwargs={
"force": False,
"disk_format": "2",
"container_format": "3",
"visibility": "4",
"protected": "5"
})
def test_reserve(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.reserve",
self.proxy.reserve_volume,
method_args=["value"],
expected_args=[self.proxy])
def test_unreserve(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.unreserve",
self.proxy.unreserve_volume,
method_args=["value"],
expected_args=[self.proxy])
def test_begin_detaching(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.begin_detaching",
self.proxy.begin_volume_detaching,
method_args=["value"],
expected_args=[self.proxy])
def test_abort_detaching(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.abort_detaching",
self.proxy.abort_volume_detaching,
method_args=["value"],
expected_args=[self.proxy])
def test_init_attachment(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.init_attachment",
self.proxy.init_volume_attachment,
method_args=["value", "1"],
expected_args=[self.proxy, "1"])
def test_terminate_attachment(self):
self._verify(
"openstack.block_storage.v3.volume.Volume.terminate_attachment",
self.proxy.terminate_volume_attachment,
method_args=["value", "1"],
expected_args=[self.proxy, "1"])

View File

@@ -12,6 +12,9 @@
from unittest import mock
from keystoneauth1 import adapter
from openstack import exceptions
from openstack.block_storage.v3 import volume
from openstack.tests.unit import base
@@ -62,14 +65,6 @@ VOLUME = {
class TestVolume(base.TestCase):
def setUp(self):
super(TestVolume, self).setUp()
self.resp = mock.Mock()
self.resp.body = None
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess = mock.Mock()
self.sess.post = mock.Mock(return_value=self.resp)
def test_basic(self):
sot = volume.Volume(VOLUME)
self.assertEqual("volume", sot.resource_key)
@@ -85,6 +80,8 @@ class TestVolume(base.TestCase):
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"created_at": "created_at",
"updated_at": "updated_at",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
@@ -126,6 +123,20 @@ class TestVolume(base.TestCase):
self.assertDictEqual(VOLUME["OS-SCH-HNT:scheduler_hints"],
sot.scheduler_hints)
class TestVolumeActions(TestVolume):
def setUp(self):
super(TestVolumeActions, self).setUp()
self.resp = mock.Mock()
self.resp.body = None
self.resp.status_code = 200
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess = mock.Mock(spec=adapter.Adapter)
self.sess.default_microversion = '3.0'
self.sess.post = mock.Mock(return_value=self.resp)
self.sess._get_connection = mock.Mock(return_value=self.cloud)
def test_extend(self):
sot = volume.Volume(**VOLUME)
@@ -133,8 +144,8 @@ class TestVolume(base.TestCase):
url = 'volumes/%s/action' % FAKE_ID
body = {"os-extend": {"new_size": "20"}}
headers = {'Accept': ''}
self.sess.post.assert_called_with(url, json=body, headers=headers)
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_set_volume_readonly(self):
sot = volume.Volume(**VOLUME)
@@ -143,8 +154,8 @@ class TestVolume(base.TestCase):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-update_readonly_flag': {'readonly': True}}
headers = {'Accept': ''}
self.sess.post.assert_called_with(url, json=body, headers=headers)
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_set_volume_readonly_false(self):
sot = volume.Volume(**VOLUME)
@@ -153,20 +164,321 @@ class TestVolume(base.TestCase):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-update_readonly_flag': {'readonly': False}}
headers = {'Accept': ''}
self.sess.post.assert_called_with(url, json=body, headers=headers)
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_set_volume_bootable(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.set_bootable_status(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_set_volume_bootable_false(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.set_bootable_status(self.sess, False))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_reset_status(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.reset_status(self.sess, '1', '2', '3'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': '1', 'attach_status': '2',
'migration_status': '3'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[exceptions.SDKException()])
def test_revert_to_snapshot_before_340(self, mv_mock):
sot = volume.Volume(**VOLUME)
self.assertRaises(
exceptions.SDKException,
sot.revert_to_snapshot,
self.sess,
'1'
)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[None])
def test_revert_to_snapshot_after_340(self, mv_mock):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.revert_to_snapshot(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'revert': {'snapshot_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
mv_mock.assert_called_with(self.sess, '3.40')
def test_attach_instance(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.attach(self.sess, '1', instance='2'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-attach': {'mountpoint': '1', 'instance_uuid': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_attach_host(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.attach(self.sess, '1', host_name='2'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-attach': {'mountpoint': '1', 'host_name': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_attach_error(self):
sot = volume.Volume(**VOLUME)
self.assertRaises(
ValueError,
sot.attach,
self.sess,
'1')
def test_detach(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.detach(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-detach': {'attachment_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_detach_force(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(
sot.detach(self.sess, '1', force=True, connector={'a': 'b'}))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_detach': {'attachment_id': '1',
'connector': {'a': 'b'}}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_unmanage(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.unmanage(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-unmanage': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_retype(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.retype(self.sess, 'rbd', 'on-demand'))
self.assertIsNone(sot.retype(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {
'os-retype': {
'new_type': 'rbd',
'migration_policy': 'on-demand'
}
}
headers = {'Accept': ''}
self.sess.post.assert_called_with(url, json=body, headers=headers)
body = {'os-retype': {'new_type': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_retype_mp(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.retype(self.sess, '1', migration_policy='2'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1', 'migration_policy': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_migrate(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, host='1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_migrate_flags(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, host='1',
force_host_copy=True, lock_volume=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1', 'force_host_copy': True,
'lock_volume': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[None])
def test_migrate_cluster(self, mv_mock):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, cluster='1',
force_host_copy=True, lock_volume=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'cluster': '1', 'force_host_copy': True,
'lock_volume': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
mv_mock.assert_called_with(self.sess, '3.16')
def test_complete_migration(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.complete_migration(self.sess, new_volume_id='1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_complete_migration_error(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.complete_migration(
self.sess, new_volume_id='1', error=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_force_delete(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.force_delete(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_upload_image(self):
sot = volume.Volume(**VOLUME)
self.resp = mock.Mock()
self.resp.body = {'os-volume_upload_image': {'a': 'b'}}
self.resp.status_code = 200
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess.post = mock.Mock(return_value=self.resp)
self.assertDictEqual({'a': 'b'}, sot.upload_to_image(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-volume_upload_image': {
'image_name': '1',
'force': False
}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[None])
def test_upload_image_args(self, mv_mock):
sot = volume.Volume(**VOLUME)
self.resp = mock.Mock()
self.resp.body = {'os-volume_upload_image': {'a': 'b'}}
self.resp.status_code = 200
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess.post = mock.Mock(return_value=self.resp)
self.assertDictEqual(
{'a': 'b'},
sot.upload_to_image(self.sess, '1', disk_format='2',
container_format='3', visibility='4',
protected='5'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-volume_upload_image': {
'image_name': '1',
'force': False,
'disk_format': '2',
'container_format': '3',
'visibility': '4',
'protected': '5'
}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
mv_mock.assert_called_with(self.sess, '3.1')
def test_reserve(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.reserve(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-reserve': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_unreserve(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.unreserve(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-unreserve': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_begin_detaching(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.begin_detaching(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-begin_detaching': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_abort_detaching(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.abort_detaching(self.sess))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-roll_detaching': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_init_attachment(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.init_attachment(self.sess, {'a': 'b'}))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-initialize_connection': {'connector': {'a': 'b'}}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
def test_terminate_attachment(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.terminate_attachment(self.sess, {'a': 'b'}))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-terminate_connection': {'connector': {'a': 'b'}}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)