Blackify openstack.block_storage

Black was used with the '-l 79 -S' flags.

A future change will add a 'git-blame-ignore-revs' file so that this
commit is ignored in git-blame history.

Change-Id: I502d2788eb75e674e8b399034513996c81407216
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Stephen Finucane 2023-05-03 12:12:59 +01:00
parent 542ddaa1ad
commit 34da09f312
46 changed files with 1009 additions and 763 deletions
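
For reference, the reformatting above can be reproduced with Black's Python API; this is a minimal sketch rather than the exact invocation used for this commit ('-l 79' maps to line_length=79, '-S' to string_normalization=False):

import black

# '-l 79 -S': wrap at 79 columns and keep the existing string quote style.
mode = black.Mode(line_length=79, string_normalization=False)
source = "def f(a,b):\n    return {'k':a,'v':b}\n"
print(black.format_str(source, mode=mode))

Once the planned ignore file exists, git can be told to skip this commit when assigning blame, e.g. 'git config blame.ignoreRevsFile <ignore-file>'.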

View File

@ -16,10 +16,16 @@ from openstack import proxy
class BaseBlockStorageProxy(proxy.Proxy, metaclass=abc.ABCMeta):
def create_image(
self, name, volume, allow_duplicates,
container_format, disk_format, wait, timeout):
self,
name,
volume,
allow_duplicates,
container_format,
disk_format,
wait,
timeout,
):
if not disk_format:
disk_format = self._connection.config.config['image_format']
if not container_format:
@ -33,7 +39,8 @@ class BaseBlockStorageProxy(proxy.Proxy, metaclass=abc.ABCMeta):
if not volume_obj:
raise exceptions.SDKException(
"Volume {volume} given to create_image could"
" not be found".format(volume=volume))
" not be found".format(volume=volume)
)
volume_id = volume_obj['id']
data = self.post(
'/volumes/{id}/action'.format(id=volume_id),
@ -42,7 +49,11 @@ class BaseBlockStorageProxy(proxy.Proxy, metaclass=abc.ABCMeta):
'force': allow_duplicates,
'image_name': name,
'container_format': container_format,
'disk_format': disk_format}})
'disk_format': disk_format,
}
},
)
response = self._connection._get_and_munchify(
'os-volume_upload_image', data)
'os-volume_upload_image', data
)
return self._connection.image._existing_image(id=response['image_id'])
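
A hypothetical call against the reformatted create_image() signature above, assuming an authenticated connection; the cloud name, volume ID, and formats are placeholders:

import openstack

conn = openstack.connect(cloud='example')  # assumes a configured cloud
image = conn.block_storage.create_image(
    name='image-from-volume',
    volume='VOLUME_ID',  # a Volume object or its ID
    allow_duplicates=False,
    container_format='bare',
    disk_format='qcow2',
    wait=True,  # wait up to 'timeout' seconds for the image
    timeout=300,
)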

View File

@ -132,8 +132,9 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: ``None``
"""
self._delete(_snapshot.Snapshot, snapshot,
ignore_missing=ignore_missing)
self._delete(
_snapshot.Snapshot, snapshot, ignore_missing=ignore_missing
)
# ====== SNAPSHOT ACTIONS ======
def reset_snapshot(self, snapshot, status):
@ -398,9 +399,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.reset_status(self, status, attach_status, migration_status)
def attach_volume(
self, volume, mountpoint, instance=None, host_name=None
):
def attach_volume(self, volume, mountpoint, instance=None, host_name=None):
"""Attaches a volume to a server.
:param volume: The value can be either the ID of a volume or a
@ -414,9 +413,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.attach(self, mountpoint, instance, host_name)
def detach_volume(
self, volume, attachment, force=False, connector=None
):
def detach_volume(self, volume, attachment, force=False, connector=None):
"""Detaches a volume from a server.
:param volume: The value can be either the ID of a volume or a
@ -444,8 +441,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume.unmanage(self)
def migrate_volume(
self, volume, host=None, force_host_copy=False,
lock_volume=False
self, volume, host=None, force_host_copy=False, lock_volume=False
):
"""Migrates a volume to the specified host.
@ -466,9 +462,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.migrate(self, host, force_host_copy, lock_volume)
def complete_volume_migration(
self, volume, new_volume, error=False
):
def complete_volume_migration(self, volume, new_volume, error=False):
"""Complete the migration of a volume.
:param volume: The value can be either the ID of a volume or a
@ -584,8 +578,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: ``None``
"""
if not force:
self._delete(
_backup.Backup, backup, ignore_missing=ignore_missing)
self._delete(_backup.Backup, backup, ignore_missing=ignore_missing)
else:
backup = self._get_resource(_backup.Backup, backup)
backup.force_delete(self)
@ -617,8 +610,9 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
backup = self._get_resource(_backup.Backup, backup)
backup.reset(self, status)
def wait_for_status(self, res, status='available', failures=None,
interval=2, wait=120):
def wait_for_status(
self, res, status='available', failures=None, interval=2, wait=120
):
"""Wait for a resource to be in a particular status.
:param res: The resource to wait on to reach the specified status.
@ -641,7 +635,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
failures = ['error'] if failures is None else failures
return resource.wait_for_status(
self, res, status, failures, interval, wait)
self, res, status, failures, interval, wait
)
def wait_for_delete(self, res, interval=2, wait=120):
"""Wait for a resource to be deleted.
@ -674,9 +669,9 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
project = self._get_resource(_project.Project, project)
res = self._get_resource(
_quota_set.QuotaSet, None, project_id=project.id)
return res.fetch(
self, usage=usage, **query)
_quota_set.QuotaSet, None, project_id=project.id
)
return res.fetch(self, usage=usage, **query)
def get_quota_set_defaults(self, project):
"""Show QuotaSet defaults for the project
@ -691,9 +686,9 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
project = self._get_resource(_project.Project, project)
res = self._get_resource(
_quota_set.QuotaSet, None, project_id=project.id)
return res.fetch(
self, base_path='/os-quota-sets/defaults')
_quota_set.QuotaSet, None, project_id=project.id
)
return res.fetch(self, base_path='/os-quota-sets/defaults')
def revert_quota_set(self, project, **query):
"""Reset Quota for the project/user.
@ -707,7 +702,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
project = self._get_resource(_project.Project, project)
res = self._get_resource(
_quota_set.QuotaSet, None, project_id=project.id)
_quota_set.QuotaSet, None, project_id=project.id
)
if not query:
query = {}

View File

@ -16,14 +16,22 @@ from openstack import utils
class Backup(resource.Resource):
"""Volume Backup"""
resource_key = "backup"
resources_key = "backups"
base_path = "/backups"
_query_mapping = resource.QueryParameters(
'all_tenants', 'limit', 'marker', 'project_id',
'name', 'status', 'volume_id',
'sort_key', 'sort_dir')
'all_tenants',
'limit',
'marker',
'project_id',
'name',
'status',
'volume_id',
'sort_key',
'sort_dir',
)
# capabilities
allow_fetch = True
@ -97,35 +105,48 @@ class Backup(resource.Resource):
session = self._get_session(session)
microversion = self._get_microversion(session, action='create')
requires_id = (self.create_requires_id
if self.create_requires_id is not None
else self.create_method == 'PUT')
requires_id = (
self.create_requires_id
if self.create_requires_id is not None
else self.create_method == 'PUT'
)
if self.create_exclude_id_from_body:
self._body._dirty.discard("id")
if self.create_method == 'POST':
request = self._prepare_request(requires_id=requires_id,
prepend_key=prepend_key,
base_path=base_path)
request = self._prepare_request(
requires_id=requires_id,
prepend_key=prepend_key,
base_path=base_path,
)
# NOTE(gtema) this is a funny example of when an attribute
# is called "incremental" on create, "is_incremental" on get,
# and the use of "alias" or "aka" does not work for such a conflict,
# since our preferred attr name is exactly "is_incremental"
body = request.body
if 'is_incremental' in body['backup']:
body['backup']['incremental'] = \
body['backup'].pop('is_incremental')
response = session.post(request.url,
json=request.body, headers=request.headers,
microversion=microversion, params=params)
body['backup']['incremental'] = body['backup'].pop(
'is_incremental'
)
response = session.post(
request.url,
json=request.body,
headers=request.headers,
microversion=microversion,
params=params,
)
else:
# Just for safety of the implementation (since PUT was removed)
raise exceptions.ResourceFailure(
"Invalid create method: %s" % self.create_method)
"Invalid create method: %s" % self.create_method
)
has_body = (self.has_body if self.create_returns_body is None
else self.create_returns_body)
has_body = (
self.has_body
if self.create_returns_body is None
else self.create_returns_body
)
self.microversion = microversion
self._translate_response(response, has_body=has_body)
# direct comparison to False since we need to rule out None
@ -137,8 +158,9 @@ class Backup(resource.Resource):
def _action(self, session, body, microversion=None):
"""Preform backup actions given the message body."""
url = utils.urljoin(self.base_path, self.id, 'action')
resp = session.post(url, json=body,
microversion=self._max_microversion)
resp = session.post(
url, json=body, microversion=self._max_microversion
)
exceptions.raise_from_response(resp)
return resp
@ -157,22 +179,20 @@ class Backup(resource.Resource):
if name:
body['restore']['name'] = name
if not (volume_id or name):
raise exceptions.SDKException('Either of `name` or `volume_id`'
' must be specified.')
response = session.post(url,
json=body)
raise exceptions.SDKException(
'Either of `name` or `volume_id`' ' must be specified.'
)
response = session.post(url, json=body)
self._translate_response(response, has_body=False)
return self
def force_delete(self, session):
"""Force backup deletion
"""
"""Force backup deletion"""
body = {'os-force_delete': {}}
self._action(session, body)
def reset(self, session, status):
"""Reset the status of the backup
"""
"""Reset the status of the backup"""
body = {'os-reset_status': {'status': status}}
self._action(session, body)
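
The NOTE(gtema) comment above boils down to a small request-body rewrite just before the POST; a standalone sketch of the quirk:

# The SDK's preferred attribute name is 'is_incremental', but the create
# API expects 'incremental', so the key is renamed before sending.
body = {'backup': {'name': 'nightly', 'is_incremental': True}}
if 'is_incremental' in body['backup']:
    body['backup']['incremental'] = body['backup'].pop('is_incremental')
assert body == {'backup': {'name': 'nightly', 'incremental': True}}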

View File

@ -23,7 +23,8 @@ class Snapshot(resource.Resource, metadata.MetadataMixin):
base_path = "/snapshots"
_query_mapping = resource.QueryParameters(
'name', 'status', 'volume_id', all_projects='all_tenants')
'name', 'status', 'volume_id', all_projects='all_tenants'
)
# capabilities
allow_fetch = True
@ -53,14 +54,14 @@ class Snapshot(resource.Resource, metadata.MetadataMixin):
def _action(self, session, body, microversion=None):
"""Preform backup actions given the message body."""
url = utils.urljoin(self.base_path, self.id, 'action')
resp = session.post(url, json=body,
microversion=self._max_microversion)
resp = session.post(
url, json=body, microversion=self._max_microversion
)
exceptions.raise_from_response(resp)
return resp
def reset(self, session, status):
"""Reset the status of the snapshot.
"""
"""Reset the status of the snapshot."""
body = {'os-reset_status': {'status': status}}
self._action(session, body)

View File

@ -21,7 +21,8 @@ class Volume(resource.Resource, metadata.MetadataMixin):
base_path = "/volumes"
_query_mapping = resource.QueryParameters(
'name', 'status', 'project_id', all_projects='all_tenants')
'name', 'status', 'project_id', all_projects='all_tenants'
)
# capabilities
allow_fetch = True
@ -45,7 +46,8 @@ class Volume(resource.Resource, metadata.MetadataMixin):
description = resource.Body("description")
#: Extended replication status on this volume.
extended_replication_status = resource.Body(
"os-volume-replication:extended_status")
"os-volume-replication:extended_status"
)
#: The volume's current back-end.
host = resource.Body("os-vol-host-attr:host")
#: The ID of the image from which you want to create the volume.
@ -66,7 +68,8 @@ class Volume(resource.Resource, metadata.MetadataMixin):
project_id = resource.Body("os-vol-tenant-attr:tenant_id")
#: Data set by the replication driver
replication_driver_data = resource.Body(
"os-volume-replication:driver_data")
"os-volume-replication:driver_data"
)
#: Status of replication on this volume.
replication_status = resource.Body("replication_status")
#: Scheduler hints for the volume
@ -111,24 +114,22 @@ class Volume(resource.Resource, metadata.MetadataMixin):
body = {'os-set_bootable': {'bootable': bootable}}
self._action(session, body)
def reset_status(
self, session, status, attach_status, migration_status
):
def reset_status(self, session, status, attach_status, migration_status):
"""Reset volume statuses (admin operation)"""
body = {'os-reset_status': {
'status': status,
'attach_status': attach_status,
'migration_status': migration_status
}}
body = {
'os-reset_status': {
'status': status,
'attach_status': attach_status,
'migration_status': migration_status,
}
}
self._action(session, body)
def attach(
self, session, mountpoint, instance
):
def attach(self, session, mountpoint, instance):
"""Attach volume to server"""
body = {'os-attach': {
'mountpoint': mountpoint,
'instance_uuid': instance}}
body = {
'os-attach': {'mountpoint': mountpoint, 'instance_uuid': instance}
}
self._action(session, body)
@ -137,8 +138,7 @@ class Volume(resource.Resource, metadata.MetadataMixin):
if not force:
body = {'os-detach': {'attachment_id': attachment}}
if force:
body = {'os-force_detach': {
'attachment_id': attachment}}
body = {'os-force_detach': {'attachment_id': attachment}}
self._action(session, body)
@ -150,16 +150,14 @@ class Volume(resource.Resource, metadata.MetadataMixin):
def retype(self, session, new_type, migration_policy=None):
"""Change volume type"""
body = {'os-retype': {
'new_type': new_type}}
body = {'os-retype': {'new_type': new_type}}
if migration_policy:
body['os-retype']['migration_policy'] = migration_policy
self._action(session, body)
def migrate(
self, session, host=None, force_host_copy=False,
lock_volume=False
self, session, host=None, force_host_copy=False, lock_volume=False
):
"""Migrate volume"""
req = dict()
@ -175,9 +173,12 @@ class Volume(resource.Resource, metadata.MetadataMixin):
def complete_migration(self, session, new_volume_id, error=False):
"""Complete volume migration"""
body = {'os-migrate_volume_completion': {
'new_volume': new_volume_id,
'error': error}}
body = {
'os-migrate_volume_completion': {
'new_volume': new_volume_id,
'error': error,
}
}
self._action(session, body)

View File

@ -45,7 +45,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"snapshot": _snapshot.Snapshot,
"stats_pools": _stats.Pools,
"type": _type.Type,
"volume": _volume.Volume
"volume": _volume.Volume,
}
# ====== SNAPSHOTS ======
@ -168,7 +168,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
if not force:
self._delete(
_snapshot.Snapshot, snapshot, ignore_missing=ignore_missing)
_snapshot.Snapshot, snapshot, ignore_missing=ignore_missing
)
else:
snapshot = self._get_resource(_snapshot.Snapshot, snapshot)
snapshot.force_delete(self)
@ -405,9 +406,11 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
volume_type = self._get_resource(_type.Type, volume_type_id)
return self._get(_type.TypeEncryption,
volume_type_id=volume_type.id,
requires_id=False)
return self._get(
_type.TypeEncryption,
volume_type_id=volume_type.id,
requires_id=False,
)
def create_type_encryption(self, volume_type, **attrs):
"""Create new type encryption from attributes
@ -425,11 +428,13 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
volume_type = self._get_resource(_type.Type, volume_type)
return self._create(_type.TypeEncryption,
volume_type_id=volume_type.id, **attrs)
return self._create(
_type.TypeEncryption, volume_type_id=volume_type.id, **attrs
)
def delete_type_encryption(self, encryption=None,
volume_type=None, ignore_missing=True):
def delete_type_encryption(
self, encryption=None, volume_type=None, ignore_missing=True
):
"""Delete type encryption attributes
:param encryption: The value can be None or a
@ -452,12 +457,15 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
if volume_type:
volume_type = self._get_resource(_type.Type, volume_type)
encryption = self._get(_type.TypeEncryption,
volume_type=volume_type.id,
requires_id=False)
encryption = self._get(
_type.TypeEncryption,
volume_type=volume_type.id,
requires_id=False,
)
self._delete(_type.TypeEncryption, encryption,
ignore_missing=ignore_missing)
self._delete(
_type.TypeEncryption, encryption, ignore_missing=ignore_missing
)
def update_type_encryption(
self,
@ -725,9 +733,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.reset_status(self, status, attach_status, migration_status)
def revert_volume_to_snapshot(
self, volume, snapshot
):
def revert_volume_to_snapshot(self, volume, snapshot):
"""Revert a volume to its latest snapshot.
This method only supports reverting a detached volume, and the
@ -744,9 +750,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
snapshot = self._get_resource(_snapshot.Snapshot, snapshot)
volume.revert_to_snapshot(self, snapshot.id)
def attach_volume(
self, volume, mountpoint, instance=None, host_name=None
):
def attach_volume(self, volume, mountpoint, instance=None, host_name=None):
"""Attaches a volume to a server.
:param volume: The value can be either the ID of a volume or a
@ -760,9 +764,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.attach(self, mountpoint, instance, host_name)
def detach_volume(
self, volume, attachment, force=False, connector=None
):
def detach_volume(self, volume, attachment, force=False, connector=None):
"""Detaches a volume from a server.
:param volume: The value can be either the ID of a volume or a
@ -784,13 +786,17 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.block_storage.v3.volume.Volume` instance.
:returns: None """
:returns: None"""
volume = self._get_resource(_volume.Volume, volume)
volume.unmanage(self)
def migrate_volume(
self, volume, host=None, force_host_copy=False,
lock_volume=False, cluster=None
self,
volume,
host=None,
force_host_copy=False,
lock_volume=False,
cluster=None,
):
"""Migrates a volume to the specified host.
@ -816,9 +822,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume = self._get_resource(_volume.Volume, volume)
volume.migrate(self, host, force_host_copy, lock_volume, cluster)
def complete_volume_migration(
self, volume, new_volume, error=False
):
def complete_volume_migration(self, volume, new_volume, error=False):
"""Complete the migration of a volume.
:param volume: The value can be either the ID of a volume or a
@ -833,8 +837,14 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
volume.complete_migration(self, new_volume, error)
def upload_volume_to_image(
self, volume, image_name, force=False, disk_format=None,
container_format=None, visibility=None, protected=None
self,
volume,
image_name,
force=False,
disk_format=None,
container_format=None,
visibility=None,
protected=None,
):
"""Uploads the specified volume to image service.
@ -852,9 +862,13 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
volume = self._get_resource(_volume.Volume, volume)
volume.upload_to_image(
self, image_name, force=force, disk_format=disk_format,
container_format=container_format, visibility=visibility,
protected=protected
self,
image_name,
force=force,
disk_format=disk_format,
container_format=container_format,
visibility=visibility,
protected=protected,
)
def reserve_volume(self, volume):
@ -863,7 +877,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.block_storage.v3.volume.Volume` instance.
:returns: None """
:returns: None"""
volume = self._get_resource(_volume.Volume, volume)
volume.reserve(self)
@ -873,7 +887,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.block_storage.v3.volume.Volume` instance.
:returns: None """
:returns: None"""
volume = self._get_resource(_volume.Volume, volume)
volume.unreserve(self)
@ -883,7 +897,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.block_storage.v3.volume.Volume` instance.
:returns: None """
:returns: None"""
volume = self._get_resource(_volume.Volume, volume)
volume.begin_detaching(self)
@ -893,7 +907,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.block_storage.v3.volume.Volume` instance.
:returns: None """
:returns: None"""
volume = self._get_resource(_volume.Volume, volume)
volume.abort_detaching(self)
@ -904,7 +918,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:class:`~openstack.block_storage.v3.volume.Volume` instance.
:param dict connector: The connector object.
:returns: None """
:returns: None"""
volume = self._get_resource(_volume.Volume, volume)
volume.init_attachment(self, connector)
@ -1022,8 +1036,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: ``None``
"""
if not force:
self._delete(
_backup.Backup, backup, ignore_missing=ignore_missing)
self._delete(_backup.Backup, backup, ignore_missing=ignore_missing)
else:
backup = self._get_resource(_backup.Backup, backup)
backup.force_delete(self)
@ -1295,7 +1308,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: None
"""
resource = self._get_resource(
_group_snapshot.GroupSnapshot, group_snapshot)
_group_snapshot.GroupSnapshot, group_snapshot
)
resource.reset_state(self, state)
def delete_group_snapshot(self, group_snapshot, ignore_missing=True):
@ -1307,8 +1321,10 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: None
"""
self._delete(
_group_snapshot.GroupSnapshot, group_snapshot,
ignore_missing=ignore_missing)
_group_snapshot.GroupSnapshot,
group_snapshot,
ignore_missing=ignore_missing,
)
# ====== GROUP TYPE ======
def get_group_type(self, group_type):
@ -1395,7 +1411,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: None
"""
self._delete(
_group_type.GroupType, group_type, ignore_missing=ignore_missing)
_group_type.GroupType, group_type, ignore_missing=ignore_missing
)
def update_group_type(self, group_type, **attrs):
"""Update a group_type
@ -1408,8 +1425,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
:returns: The updated group type.
:rtype: :class:`~openstack.block_storage.v3.group_type.GroupType`
"""
return self._update(
_group_type.GroupType, group_type, **attrs)
return self._update(_group_type.GroupType, group_type, **attrs)
def fetch_group_type_group_specs(self, group_type):
"""Lists group specs of a group type.
@ -1488,9 +1504,9 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
project = self._get_resource(_project.Project, project)
res = self._get_resource(
_quota_set.QuotaSet, None, project_id=project.id)
return res.fetch(
self, usage=usage, **query)
_quota_set.QuotaSet, None, project_id=project.id
)
return res.fetch(self, usage=usage, **query)
def get_quota_set_defaults(self, project):
"""Show QuotaSet defaults for the project
@ -1505,9 +1521,9 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
project = self._get_resource(_project.Project, project)
res = self._get_resource(
_quota_set.QuotaSet, None, project_id=project.id)
return res.fetch(
self, base_path='/os-quota-sets/defaults')
_quota_set.QuotaSet, None, project_id=project.id
)
return res.fetch(self, base_path='/os-quota-sets/defaults')
def revert_quota_set(self, project, **query):
"""Reset Quota for the project/user.
@ -1521,7 +1537,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
project = self._get_resource(_project.Project, project)
res = self._get_resource(
_quota_set.QuotaSet, None, project_id=project.id)
_quota_set.QuotaSet, None, project_id=project.id
)
return res.delete(self, **query)
@ -1561,7 +1578,12 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
# ====== UTILS ======
def wait_for_status(
self, res, status='available', failures=None, interval=2, wait=120,
self,
res,
status='available',
failures=None,
interval=2,
wait=120,
):
"""Wait for a resource to be in a particular status.
@ -1584,7 +1606,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
"""
failures = ['error'] if failures is None else failures
return resource.wait_for_status(
self, res, status, failures, interval, wait)
self, res, status, failures, interval, wait
)
def wait_for_delete(self, res, interval=2, wait=120):
"""Wait for a resource to be deleted.
@ -1602,11 +1625,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
return resource.wait_for_delete(self, res, interval, wait)
def _get_cleanup_dependencies(self):
return {
'block_storage': {
'before': []
}
}
return {'block_storage': {'before': []}}
def _service_cleanup(
self,
@ -1614,7 +1633,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
client_status_queue=None,
identified_resources=None,
filters=None,
resource_evaluation_fn=None
resource_evaluation_fn=None,
):
# It is not possible to delete backup if there are dependent backups.
# In order to be able to clean those up, it is required to have multiple
@ -1634,7 +1653,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
resource_evaluation_fn=resource_evaluation_fn,
)
else:
# Set initial iterations conditions
need_backup_iteration = True
@ -1647,7 +1667,7 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
# To increase the chance of success, sort backups by age; dependent
# backups are logically younger.
for obj in self.backups(
details=True, sort_key='created_at', sort_dir='desc'
details=True, sort_key='created_at', sort_dir='desc'
):
if not obj.has_dependent_backups:
# If no dependent backups - go with it
@ -1658,7 +1678,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
resource_evaluation_fn=resource_evaluation_fn,
)
if not dry_run and need_delete:
backups.append(obj)
else:
@ -1682,7 +1703,8 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
resource_evaluation_fn=resource_evaluation_fn,
)
if not dry_run and need_delete:
snapshots.append(obj)
@ -1702,4 +1724,5 @@ class Proxy(_base_proxy.BaseBlockStorageProxy):
client_status_queue=client_status_queue,
identified_resources=identified_resources,
filters=filters,
resource_evaluation_fn=resource_evaluation_fn)
resource_evaluation_fn=resource_evaluation_fn,
)
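
A condensed, hypothetical sketch of the dependency-aware cleanup that the loop above implements: repeatedly delete 'leaf' backups (those with no dependents), since a dependent backup blocks deletion of its parent.

def delete_backups_in_dependency_order(conn):
    # conn is assumed to be an authenticated openstack Connection.
    while True:
        leaves = [
            b
            for b in conn.block_storage.backups(
                details=True, sort_key='created_at', sort_dir='desc'
            )
            if not b.has_dependent_backups
        ]
        if not leaves:
            break  # nothing left (or nothing deletable) this round
        for b in leaves:
            conn.block_storage.delete_backup(b)
            conn.block_storage.wait_for_delete(b)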

View File

@ -16,6 +16,7 @@ from openstack import utils
class Backup(resource.Resource):
"""Volume Backup"""
resource_key = "backup"
resources_key = "backups"
base_path = "/backups"
@ -24,9 +25,16 @@ class Backup(resource.Resource):
# search (name~, status~, volume_id~). But this is not documented
# officially and seems to require a microversion to be set
_query_mapping = resource.QueryParameters(
'all_tenants', 'limit', 'marker', 'project_id',
'name', 'status', 'volume_id',
'sort_key', 'sort_dir')
'all_tenants',
'limit',
'marker',
'project_id',
'name',
'status',
'volume_id',
'sort_key',
'sort_dir',
)
# capabilities
allow_fetch = True
@ -111,35 +119,48 @@ class Backup(resource.Resource):
session = self._get_session(session)
microversion = self._get_microversion(session, action='create')
requires_id = (self.create_requires_id
if self.create_requires_id is not None
else self.create_method == 'PUT')
requires_id = (
self.create_requires_id
if self.create_requires_id is not None
else self.create_method == 'PUT'
)
if self.create_exclude_id_from_body:
self._body._dirty.discard("id")
if self.create_method == 'POST':
request = self._prepare_request(requires_id=requires_id,
prepend_key=prepend_key,
base_path=base_path)
request = self._prepare_request(
requires_id=requires_id,
prepend_key=prepend_key,
base_path=base_path,
)
# NOTE(gtema) this is a funny example of when an attribute
# is called "incremental" on create, "is_incremental" on get,
# and the use of "alias" or "aka" does not work for such a conflict,
# since our preferred attr name is exactly "is_incremental"
body = request.body
if 'is_incremental' in body['backup']:
body['backup']['incremental'] = \
body['backup'].pop('is_incremental')
response = session.post(request.url,
json=request.body, headers=request.headers,
microversion=microversion, params=params)
body['backup']['incremental'] = body['backup'].pop(
'is_incremental'
)
response = session.post(
request.url,
json=request.body,
headers=request.headers,
microversion=microversion,
params=params,
)
else:
# Just for safety of the implementation (since PUT was removed)
raise exceptions.ResourceFailure(
"Invalid create method: %s" % self.create_method)
"Invalid create method: %s" % self.create_method
)
has_body = (self.has_body if self.create_returns_body is None
else self.create_returns_body)
has_body = (
self.has_body
if self.create_returns_body is None
else self.create_returns_body
)
self.microversion = microversion
self._translate_response(response, has_body=has_body)
# direct comparison to False since we need to rule out None
@ -151,8 +172,9 @@ class Backup(resource.Resource):
def _action(self, session, body, microversion=None):
"""Preform backup actions given the message body."""
url = utils.urljoin(self.base_path, self.id, 'action')
resp = session.post(url, json=body,
microversion=self._max_microversion)
resp = session.post(
url, json=body, microversion=self._max_microversion
)
exceptions.raise_from_response(resp)
return resp
@ -171,21 +193,20 @@ class Backup(resource.Resource):
if name:
body['restore']['name'] = name
if not (volume_id or name):
raise exceptions.SDKException('Either of `name` or `volume_id`'
' must be specified.')
raise exceptions.SDKException(
'Either of `name` or `volume_id`' ' must be specified.'
)
response = session.post(url, json=body)
self._translate_response(response, has_body=False)
return self
def force_delete(self, session):
"""Force backup deletion
"""
"""Force backup deletion"""
body = {'os-force_delete': {}}
self._action(session, body)
def reset(self, session, status):
"""Reset the status of the backup
"""
"""Reset the status of the backup"""
body = {'os-reset_status': {'status': status}}
self._action(session, body)

View File

@ -15,6 +15,7 @@ from openstack import resource
class Extension(resource.Resource):
"""Extension"""
resources_key = "extensions"
base_path = "/extensions"

View File

@ -58,10 +58,14 @@ class GroupSnapshot(resource.Resource):
microversion = session.default_microversion
else:
microversion = utils.maximum_supported_microversion(
session, self._max_microversion,
session,
self._max_microversion,
)
response = session.post(
url, json=body, headers=headers, microversion=microversion,
url,
json=body,
headers=headers,
microversion=microversion,
)
exceptions.raise_from_response(response)
return response

View File

@ -71,7 +71,9 @@ class GroupType(resource.Resource):
url = utils.urljoin(GroupType.base_path, self.id, 'group_specs')
microversion = self._get_microversion(session, action='create')
response = session.post(
url, json={'group_specs': specs}, microversion=microversion,
url,
json={'group_specs': specs},
microversion=microversion,
)
exceptions.raise_from_response(response)
specs = response.json().get('group_specs', {})

View File

@ -17,19 +17,22 @@ class AbsoluteLimit(resource.Resource):
#: Properties
#: The maximum total amount of backups, in gibibytes (GiB).
max_total_backup_gigabytes = resource.Body(
"maxTotalBackupGigabytes", type=int)
"maxTotalBackupGigabytes", type=int
)
#: The maximum number of backups.
max_total_backups = resource.Body("maxTotalBackups", type=int)
#: The maximum number of snapshots.
max_total_snapshots = resource.Body("maxTotalSnapshots", type=int)
#: The maximum total amount of volumes, in gibibytes (GiB).
max_total_volume_gigabytes = resource.Body(
"maxTotalVolumeGigabytes", type=int)
"maxTotalVolumeGigabytes", type=int
)
#: The maximum number of volumes.
max_total_volumes = resource.Body("maxTotalVolumes", type=int)
#: The total number of backups gibibytes (GiB) used.
total_backup_gigabytes_used = resource.Body(
"totalBackupGigabytesUsed", type=int)
"totalBackupGigabytesUsed", type=int
)
#: The total number of backups used.
total_backups_used = resource.Body("totalBackupsUsed", type=int)
#: The total number of gibibytes (GiB) used.

View File

@ -15,6 +15,7 @@ from openstack import resource
class ResourceFilter(resource.Resource):
"""Resource Filter"""
resources_key = "resource_filters"
base_path = "/resource_filters"

View File

@ -23,8 +23,8 @@ class Snapshot(resource.Resource, metadata.MetadataMixin):
base_path = "/snapshots"
_query_mapping = resource.QueryParameters(
'name', 'status', 'volume_id',
'project_id', all_projects='all_tenants')
'name', 'status', 'volume_id', 'project_id', all_projects='all_tenants'
)
# capabilities
allow_fetch = True
@ -58,28 +58,25 @@ class Snapshot(resource.Resource, metadata.MetadataMixin):
def _action(self, session, body, microversion=None):
"""Preform backup actions given the message body."""
url = utils.urljoin(self.base_path, self.id, 'action')
resp = session.post(url, json=body,
microversion=self._max_microversion)
resp = session.post(
url, json=body, microversion=self._max_microversion
)
exceptions.raise_from_response(resp)
return resp
def force_delete(self, session):
"""Force snapshot deletion.
"""
"""Force snapshot deletion."""
body = {'os-force_delete': {}}
self._action(session, body)
def reset(self, session, status):
"""Reset the status of the snapshot.
"""
"""Reset the status of the snapshot."""
body = {'os-reset_status': {'status': status}}
self._action(session, body)
def set_status(self, session, status, progress=None):
"""Update fields related to the status of a snapshot.
"""
body = {'os-update_snapshot_status': {
'status': status}}
"""Update fields related to the status of a snapshot."""
body = {'os-update_snapshot_status': {'status': status}}
if progress is not None:
body['os-update_snapshot_status']['progress'] = progress
self._action(session, body)

View File

@ -37,13 +37,13 @@ class Type(resource.Resource):
#: a private volume-type. *Type: bool*
is_public = resource.Body('os-volume-type-access:is_public', type=bool)
def _extra_specs(self, method, key=None, delete=False,
extra_specs=None):
def _extra_specs(self, method, key=None, delete=False, extra_specs=None):
extra_specs = extra_specs or {}
for k, v in extra_specs.items():
if not isinstance(v, str):
raise ValueError("The value for %s (%s) must be "
"a text string" % (k, v))
raise ValueError(
"The value for %s (%s) must be " "a text string" % (k, v)
)
if key is not None:
url = utils.urljoin(self.base_path, self.id, "extra_specs", key)

View File

@ -23,8 +23,13 @@ class Volume(resource.Resource, metadata.MetadataMixin):
base_path = "/volumes"
_query_mapping = resource.QueryParameters(
'name', 'status', 'project_id', 'created_at', 'updated_at',
all_projects='all_tenants')
'name',
'status',
'project_id',
'created_at',
'updated_at',
all_projects='all_tenants',
)
# capabilities
allow_fetch = True
@ -48,7 +53,8 @@ class Volume(resource.Resource, metadata.MetadataMixin):
description = resource.Body("description")
#: Extended replication status on this volume.
extended_replication_status = resource.Body(
"os-volume-replication:extended_status")
"os-volume-replication:extended_status"
)
#: The ID of the group that the volume belongs to.
group_id = resource.Body("group_id")
#: The volume's current back-end.
@ -73,7 +79,8 @@ class Volume(resource.Resource, metadata.MetadataMixin):
project_id = resource.Body("os-vol-tenant-attr:tenant_id")
#: Data set by the replication driver
replication_driver_data = resource.Body(
"os-volume-replication:driver_data")
"os-volume-replication:driver_data"
)
#: Status of replication on this volume.
replication_status = resource.Body("replication_status")
#: Scheduler hints for the volume
@ -108,8 +115,9 @@ class Volume(resource.Resource, metadata.MetadataMixin):
# as both Volume and VolumeDetail instances can be acted on, but
# the URL used is sans any additional /detail/ part.
url = utils.urljoin(Volume.base_path, self.id, 'action')
resp = session.post(url, json=body,
microversion=self._max_microversion)
resp = session.post(
url, json=body, microversion=self._max_microversion
)
exceptions.raise_from_response(resp)
return resp
@ -128,15 +136,15 @@ class Volume(resource.Resource, metadata.MetadataMixin):
body = {'os-update_readonly_flag': {'readonly': readonly}}
self._action(session, body)
def reset_status(
self, session, status, attach_status, migration_status
):
def reset_status(self, session, status, attach_status, migration_status):
"""Reset volume statuses (admin operation)"""
body = {'os-reset_status': {
'status': status,
'attach_status': attach_status,
'migration_status': migration_status
}}
body = {
'os-reset_status': {
'status': status,
'attach_status': attach_status,
'migration_status': migration_status,
}
}
self._action(session, body)
def revert_to_snapshot(self, session, snapshot_id):
@ -145,12 +153,9 @@ class Volume(resource.Resource, metadata.MetadataMixin):
body = {'revert': {'snapshot_id': snapshot_id}}
self._action(session, body)
def attach(
self, session, mountpoint, instance=None, host_name=None
):
def attach(self, session, mountpoint, instance=None, host_name=None):
"""Attach volume to server"""
body = {'os-attach': {
'mountpoint': mountpoint}}
body = {'os-attach': {'mountpoint': mountpoint}}
if instance is not None:
body['os-attach']['instance_uuid'] = instance
@ -158,7 +163,8 @@ class Volume(resource.Resource, metadata.MetadataMixin):
body['os-attach']['host_name'] = host_name
else:
raise ValueError(
'Either instance_uuid or host_name must be specified')
'Either instance_uuid or host_name must be specified'
)
self._action(session, body)
@ -167,8 +173,7 @@ class Volume(resource.Resource, metadata.MetadataMixin):
if not force:
body = {'os-detach': {'attachment_id': attachment}}
if force:
body = {'os-force_detach': {
'attachment_id': attachment}}
body = {'os-force_detach': {'attachment_id': attachment}}
if connector:
body['os-force_detach']['connector'] = connector
@ -182,16 +187,19 @@ class Volume(resource.Resource, metadata.MetadataMixin):
def retype(self, session, new_type, migration_policy=None):
"""Change volume type"""
body = {'os-retype': {
'new_type': new_type}}
body = {'os-retype': {'new_type': new_type}}
if migration_policy:
body['os-retype']['migration_policy'] = migration_policy
self._action(session, body)
def migrate(
self, session, host=None, force_host_copy=False,
lock_volume=False, cluster=None
self,
session,
host=None,
force_host_copy=False,
lock_volume=False,
cluster=None,
):
"""Migrate volume"""
req = dict()
@ -210,9 +218,12 @@ class Volume(resource.Resource, metadata.MetadataMixin):
def complete_migration(self, session, new_volume_id, error=False):
"""Complete volume migration"""
body = {'os-migrate_volume_completion': {
'new_volume': new_volume_id,
'error': error}}
body = {
'os-migrate_volume_completion': {
'new_volume': new_volume_id,
'error': error,
}
}
self._action(session, body)
@ -223,8 +234,14 @@ class Volume(resource.Resource, metadata.MetadataMixin):
self._action(session, body)
def upload_to_image(
self, session, image_name, force=False, disk_format=None,
container_format=None, visibility=None, protected=None
self,
session,
image_name,
force=False,
disk_format=None,
container_format=None,
visibility=None,
protected=None,
):
"""Upload the volume to image service"""
req = dict(image_name=image_name, force=force)

View File

@ -16,7 +16,6 @@ from openstack.tests.functional.block_storage.v2 import base
class TestBackup(base.BaseBlockStorageTest):
def setUp(self):
super(TestBackup, self).setUp()
@ -29,37 +28,39 @@ class TestBackup(base.BaseBlockStorageTest):
self.BACKUP_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
name=self.VOLUME_NAME, size=1
)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(volume, _volume.Volume)
self.VOLUME_ID = volume.id
backup = self.user_cloud.block_storage.create_backup(
name=self.BACKUP_NAME,
volume_id=volume.id)
name=self.BACKUP_NAME, volume_id=volume.id
)
self.user_cloud.block_storage.wait_for_status(
backup,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(backup, _backup.Backup)
self.assertEqual(self.BACKUP_NAME, backup.name)
self.BACKUP_ID = backup.id
def tearDown(self):
sot = self.user_cloud.block_storage.delete_backup(
self.BACKUP_ID,
ignore_missing=False)
self.BACKUP_ID, ignore_missing=False
)
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.VOLUME_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestBackup, self).tearDown()

View File

@ -17,7 +17,6 @@ from openstack.tests.functional.block_storage.v2 import base
class TestSnapshot(base.BaseBlockStorageTest):
def setUp(self):
super(TestSnapshot, self).setUp()
@ -27,26 +26,28 @@ class TestSnapshot(base.BaseBlockStorageTest):
self.VOLUME_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
name=self.VOLUME_NAME, size=1
)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(volume, _volume.Volume)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
snapshot = self.user_cloud.block_storage.create_snapshot(
name=self.SNAPSHOT_NAME,
volume_id=self.VOLUME_ID)
name=self.SNAPSHOT_NAME, volume_id=self.VOLUME_ID
)
self.user_cloud.block_storage.wait_for_status(
snapshot,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(snapshot, _snapshot.Snapshot)
self.assertEqual(self.SNAPSHOT_NAME, snapshot.name)
self.SNAPSHOT_ID = snapshot.id
@ -54,12 +55,15 @@ class TestSnapshot(base.BaseBlockStorageTest):
def tearDown(self):
snapshot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
sot = self.user_cloud.block_storage.delete_snapshot(
snapshot, ignore_missing=False)
snapshot, ignore_missing=False
)
self.user_cloud.block_storage.wait_for_delete(
snapshot, interval=2, wait=self._wait_for_timeout)
snapshot, interval=2, wait=self._wait_for_timeout
)
self.assertIsNone(sot)
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID, ignore_missing=False)
self.VOLUME_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestSnapshot, self).tearDown()

View File

@ -16,7 +16,6 @@ from openstack.tests.functional.block_storage.v2 import base
class TestStats(base.BaseBlockStorageTest):
def setUp(self):
super(TestStats, self).setUp()
@ -25,16 +24,28 @@ class TestStats(base.BaseBlockStorageTest):
self.assertIsInstance(pool, _stats.Pools)
def test_list(self):
capList = ['volume_backend_name', 'storage_protocol',
'free_capacity_gb', 'driver_version',
'goodness_function', 'QoS_support',
'vendor_name', 'pool_name', 'thin_provisioning_support',
'thick_provisioning_support', 'timestamp',
'max_over_subscription_ratio', 'total_volumes',
'total_capacity_gb', 'filter_function',
'multiattach', 'provisioned_capacity_gb',
'allocated_capacity_gb', 'reserved_percentage',
'location_info']
capList = [
'volume_backend_name',
'storage_protocol',
'free_capacity_gb',
'driver_version',
'goodness_function',
'QoS_support',
'vendor_name',
'pool_name',
'thin_provisioning_support',
'thick_provisioning_support',
'timestamp',
'max_over_subscription_ratio',
'total_volumes',
'total_capacity_gb',
'filter_function',
'multiattach',
'provisioned_capacity_gb',
'allocated_capacity_gb',
'reserved_percentage',
'location_info',
]
capList.sort()
pools = self.operator_cloud.block_storage.backend_pools()
for pool in pools:

View File

@ -16,7 +16,6 @@ from openstack.tests.functional.block_storage.v2 import base
class TestType(base.BaseBlockStorageTest):
def setUp(self):
super(TestType, self).setUp()
@ -24,14 +23,16 @@ class TestType(base.BaseBlockStorageTest):
self.TYPE_ID = None
sot = self.operator_cloud.block_storage.create_type(
name=self.TYPE_NAME)
name=self.TYPE_NAME
)
assert isinstance(sot, _type.Type)
self.assertEqual(self.TYPE_NAME, sot.name)
self.TYPE_ID = sot.id
def tearDown(self):
sot = self.operator_cloud.block_storage.delete_type(
self.TYPE_ID, ignore_missing=False)
self.TYPE_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestType, self).tearDown()

View File

@ -15,7 +15,6 @@ from openstack.tests.functional.block_storage.v2 import base
class TestVolume(base.BaseBlockStorageTest):
def setUp(self):
super(TestVolume, self).setUp()
@ -26,22 +25,23 @@ class TestVolume(base.BaseBlockStorageTest):
self.VOLUME_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
name=self.VOLUME_NAME, size=1
)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(volume, _volume.Volume)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
def tearDown(self):
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.VOLUME_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestVolume, self).tearDown()

View File

@ -15,7 +15,6 @@ from openstack.tests.functional import base
class TestAvailabilityZone(base.BaseFunctionalTest):
def test_list(self):
availability_zones = list(self.conn.block_storage.availability_zones())
self.assertGreater(len(availability_zones), 0)

View File

@ -16,7 +16,6 @@ from openstack.tests.functional.block_storage.v3 import base
class TestBackup(base.BaseBlockStorageTest):
def setUp(self):
super(TestBackup, self).setUp()
@ -29,38 +28,39 @@ class TestBackup(base.BaseBlockStorageTest):
self.BACKUP_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
name=self.VOLUME_NAME, size=1
)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(volume, _volume.Volume)
self.VOLUME_ID = volume.id
backup = self.user_cloud.block_storage.create_backup(
name=self.BACKUP_NAME,
volume_id=volume.id,
is_incremental=False)
name=self.BACKUP_NAME, volume_id=volume.id, is_incremental=False
)
self.user_cloud.block_storage.wait_for_status(
backup,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(backup, _backup.Backup)
self.assertEqual(self.BACKUP_NAME, backup.name)
self.BACKUP_ID = backup.id
def tearDown(self):
sot = self.user_cloud.block_storage.delete_backup(
self.BACKUP_ID,
ignore_missing=False)
self.BACKUP_ID, ignore_missing=False
)
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.VOLUME_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestBackup, self).tearDown()
@ -73,31 +73,34 @@ class TestBackup(base.BaseBlockStorageTest):
metadata_backup = self.user_cloud.block_storage.create_backup(
name=self.getUniqueString(),
volume_id=self.VOLUME_ID,
metadata=dict(foo="bar"))
metadata=dict(foo="bar"),
)
self.user_cloud.block_storage.wait_for_status(
metadata_backup,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
self.user_cloud.block_storage.delete_backup(
metadata_backup.id,
ignore_missing=False)
metadata_backup.id, ignore_missing=False
)
def test_create_incremental(self):
incremental_backup = self.user_cloud.block_storage.create_backup(
name=self.getUniqueString(),
volume_id=self.VOLUME_ID,
is_incremental=True)
is_incremental=True,
)
self.user_cloud.block_storage.wait_for_status(
incremental_backup,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
self.assertEqual(True, incremental_backup.is_incremental)
self.user_cloud.block_storage.delete_backup(
incremental_backup.id,
ignore_missing=False)
self.user_cloud.block_storage.wait_for_delete(
incremental_backup)
incremental_backup.id, ignore_missing=False
)
self.user_cloud.block_storage.wait_for_delete(incremental_backup)

View File

@ -15,10 +15,10 @@ from openstack.tests.functional.block_storage.v3 import base
class TestCapabilities(base.BaseBlockStorageTest):
def test_get(self):
response = (
proxy._json_response(self.conn.block_storage.get('/os-hosts')))
response = proxy._json_response(
self.conn.block_storage.get('/os-hosts')
)
host = response['hosts'][0]['host_name']
sot = self.conn.block_storage.get_capabilities(host)

View File

@ -14,7 +14,6 @@ from openstack.tests.functional.block_storage.v3 import base
class Extensions(base.BaseBlockStorageTest):
def test_get(self):
extensions = list(self.conn.block_storage.extensions())

View File

@ -15,7 +15,6 @@ from openstack.tests.functional.block_storage.v3 import base
class TestLimits(base.BaseBlockStorageTest):
def test_get(self):
sot = self.conn.block_storage.get_limits()
self.assertIsNotNone(sot.absolute.max_total_backup_gigabytes)

View File

@ -15,7 +15,6 @@ from openstack.tests.functional.block_storage.v3 import base
class ResourceFilters(base.BaseBlockStorageTest):
def test_get(self):
resource_filters = list(self.conn.block_storage.resource_filters())

View File

@ -17,7 +17,6 @@ from openstack.tests.functional.block_storage.v3 import base
class TestSnapshot(base.BaseBlockStorageTest):
def setUp(self):
super(TestSnapshot, self).setUp()
@ -27,26 +26,28 @@ class TestSnapshot(base.BaseBlockStorageTest):
self.VOLUME_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
name=self.VOLUME_NAME, size=1
)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(volume, _volume.Volume)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
snapshot = self.user_cloud.block_storage.create_snapshot(
name=self.SNAPSHOT_NAME,
volume_id=self.VOLUME_ID)
name=self.SNAPSHOT_NAME, volume_id=self.VOLUME_ID
)
self.user_cloud.block_storage.wait_for_status(
snapshot,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
wait=self._wait_for_timeout,
)
assert isinstance(snapshot, _snapshot.Snapshot)
self.assertEqual(self.SNAPSHOT_NAME, snapshot.name)
self.SNAPSHOT_ID = snapshot.id
@ -54,12 +55,15 @@ class TestSnapshot(base.BaseBlockStorageTest):
def tearDown(self):
snapshot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
sot = self.user_cloud.block_storage.delete_snapshot(
snapshot, ignore_missing=False)
snapshot, ignore_missing=False
)
self.user_cloud.block_storage.wait_for_delete(
snapshot, interval=2, wait=self._wait_for_timeout)
snapshot, interval=2, wait=self._wait_for_timeout
)
self.assertIsNone(sot)
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID, ignore_missing=False)
self.VOLUME_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestSnapshot, self).tearDown()

View File

@ -16,7 +16,6 @@ from openstack.tests.functional.block_storage.v3 import base
class TestType(base.BaseBlockStorageTest):
def setUp(self):
super(TestType, self).setUp()
@ -26,14 +25,16 @@ class TestType(base.BaseBlockStorageTest):
self.skip("Operator cloud must be set for this test")
self._set_operator_cloud(block_storage_api_version='3')
sot = self.operator_cloud.block_storage.create_type(
name=self.TYPE_NAME)
name=self.TYPE_NAME
)
assert isinstance(sot, _type.Type)
self.assertEqual(self.TYPE_NAME, sot.name)
self.TYPE_ID = sot.id
def tearDown(self):
sot = self.operator_cloud.block_storage.delete_type(
self.TYPE_ID, ignore_missing=False)
self.TYPE_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestType, self).tearDown()

View File

@ -15,7 +15,6 @@ from openstack.tests.functional.block_storage.v3 import base
class TestVolume(base.BaseBlockStorageTest):
def setUp(self):
super().setUp()

View File

@ -35,12 +35,11 @@ BACKUP = {
"status": "available",
"volume_id": "e5185058-943a-4cb4-96d9-72c184c337d6",
"is_incremental": True,
"has_dependent_backups": False
"has_dependent_backups": False,
}
class TestBackup(base.TestCase):
def setUp(self):
super(TestBackup, self).setUp()
self.resp = mock.Mock()
@ -75,9 +74,9 @@ class TestBackup(base.TestCase):
"sort_dir": "sort_dir",
"sort_key": "sort_key",
"status": "status",
"volume_id": "volume_id"
"volume_id": "volume_id",
},
sot._query_mapping._mapping
sot._query_mapping._mapping,
)
def test_create(self):
@ -95,8 +94,9 @@ class TestBackup(base.TestCase):
self.assertEqual(BACKUP["object_count"], sot.object_count)
self.assertEqual(BACKUP["is_incremental"], sot.is_incremental)
self.assertEqual(BACKUP["size"], sot.size)
self.assertEqual(BACKUP["has_dependent_backups"],
sot.has_dependent_backups)
self.assertEqual(
BACKUP["has_dependent_backups"], sot.has_dependent_backups
)
def test_create_incremental(self):
sot = backup.Backup(is_incremental=True)
@ -118,7 +118,7 @@ class TestBackup(base.TestCase):
}
},
microversion=None,
params={}
params={},
)
sot2.create(self.sess)
@ -131,7 +131,7 @@ class TestBackup(base.TestCase):
}
},
microversion=None,
params={}
params={},
)
def test_restore(self):
@ -164,11 +164,7 @@ class TestBackup(base.TestCase):
def test_restore_no_params(self):
sot = backup.Backup(**BACKUP)
self.assertRaises(
exceptions.SDKException,
sot.restore,
self.sess
)
self.assertRaises(exceptions.SDKException, sot.restore, self.sess)
def test_force_delete(self):
sot = backup.Backup(**BACKUP)
@ -178,7 +174,8 @@ class TestBackup(base.TestCase):
url = 'backups/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_reset(self):
sot = backup.Backup(**BACKUP)
@ -188,4 +185,5 @@ class TestBackup(base.TestCase):
url = 'backups/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': 'new_status'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)

View File

@ -23,14 +23,12 @@ from openstack.tests.unit import test_proxy_base
class TestVolumeProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super().setUp()
self.proxy = _proxy.Proxy(self.session)
class TestVolume(TestVolumeProxy):
def test_volume_get(self):
self.verify_get(self.proxy.get_volume, volume.Volume)
@ -53,7 +51,7 @@ class TestVolume(TestVolumeProxy):
expected_kwargs={
"base_path": "/volumes/detail",
"all_projects": True,
}
},
)
def test_volumes_not_detailed(self):
@ -79,7 +77,7 @@ class TestVolume(TestVolumeProxy):
self.proxy.delete_volume,
method_args=["value"],
method_kwargs={"force": True},
expected_args=[self.proxy]
expected_args=[self.proxy],
)
def test_get_volume_metadata(self):
@ -88,7 +86,8 @@ class TestVolume(TestVolumeProxy):
self.proxy.get_volume_metadata,
method_args=["value"],
expected_args=[self.proxy],
expected_result=volume.Volume(id="value", metadata={}))
expected_result=volume.Volume(id="value", metadata={}),
)
def test_set_volume_metadata(self):
kwargs = {"a": "1", "b": "2"}
@ -98,12 +97,11 @@ class TestVolume(TestVolumeProxy):
self.proxy.set_volume_metadata,
method_args=[id],
method_kwargs=kwargs,
method_result=volume.Volume.existing(
id=id, metadata=kwargs),
method_result=volume.Volume.existing(id=id, metadata=kwargs),
expected_args=[self.proxy],
expected_kwargs={'metadata': kwargs},
expected_result=volume.Volume.existing(
id=id, metadata=kwargs))
expected_result=volume.Volume.existing(id=id, metadata=kwargs),
)
def test_delete_volume_metadata(self):
self._verify(
@ -111,7 +109,8 @@ class TestVolume(TestVolumeProxy):
self.proxy.delete_volume_metadata,
expected_result=None,
method_args=["value", ["key"]],
expected_args=[self.proxy, "key"])
expected_args=[self.proxy, "key"],
)
def test_backend_pools(self):
self.verify_list(self.proxy.backend_pools, stats.Pools)
@ -121,31 +120,34 @@ class TestVolume(TestVolumeProxy):
self.verify_wait_for_status(
self.proxy.wait_for_status,
method_args=[value],
expected_args=[self.proxy, value, 'available', ['error'], 2, 120])
expected_args=[self.proxy, value, 'available', ['error'], 2, 120],
)
class TestVolumeActions(TestVolumeProxy):
def test_volume_extend(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.extend",
self.proxy.extend_volume,
method_args=["value", "new-size"],
expected_args=[self.proxy, "new-size"])
expected_args=[self.proxy, "new-size"],
)
def test_volume_set_bootable(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.set_bootable_status",
self.proxy.set_volume_bootable_status,
method_args=["value", True],
expected_args=[self.proxy, True])
expected_args=[self.proxy, True],
)
def test_volume_reset_volume_status(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.reset_status",
self.proxy.reset_volume_status,
method_args=["value", '1', '2', '3'],
expected_args=[self.proxy, '1', '2', '3'])
expected_args=[self.proxy, '1', '2', '3'],
)
def test_attach_instance(self):
self._verify(
@ -153,7 +155,8 @@ class TestVolumeActions(TestVolumeProxy):
self.proxy.attach_volume,
method_args=["value", '1'],
method_kwargs={'instance': '2'},
expected_args=[self.proxy, '1', '2', None])
expected_args=[self.proxy, '1', '2', None],
)
def test_attach_host(self):
self._verify(
@ -161,60 +164,67 @@ class TestVolumeActions(TestVolumeProxy):
self.proxy.attach_volume,
method_args=["value", '1'],
method_kwargs={'host_name': '3'},
expected_args=[self.proxy, '1', None, '3'])
expected_args=[self.proxy, '1', None, '3'],
)
def test_detach_defaults(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.detach",
self.proxy.detach_volume,
method_args=["value", '1'],
expected_args=[self.proxy, '1', False, None])
expected_args=[self.proxy, '1', False, None],
)
def test_detach_force(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.detach",
self.proxy.detach_volume,
method_args=["value", '1', True, {'a': 'b'}],
expected_args=[self.proxy, '1', True, {'a': 'b'}])
expected_args=[self.proxy, '1', True, {'a': 'b'}],
)
def test_unmanage(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.unmanage",
self.proxy.unmanage_volume,
method_args=["value"],
expected_args=[self.proxy])
expected_args=[self.proxy],
)
def test_migrate_default(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value", '1'],
expected_args=[self.proxy, '1', False, False])
expected_args=[self.proxy, '1', False, False],
)
def test_migrate_nondefault(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.migrate",
self.proxy.migrate_volume,
method_args=["value", '1', True, True],
expected_args=[self.proxy, '1', True, True])
expected_args=[self.proxy, '1', True, True],
)
def test_complete_migration(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.complete_migration",
self.proxy.complete_volume_migration,
method_args=["value", '1'],
expected_args=[self.proxy, "1", False])
expected_args=[self.proxy, "1", False],
)
def test_complete_migration_error(self):
self._verify(
"openstack.block_storage.v2.volume.Volume.complete_migration",
self.proxy.complete_volume_migration,
method_args=["value", "1", True],
expected_args=[self.proxy, "1", True])
expected_args=[self.proxy, "1", True],
)
class TestBackup(TestVolumeProxy):
def test_backups_detailed(self):
self.verify_list(
self.proxy.backups,
@ -253,7 +263,7 @@ class TestBackup(TestVolumeProxy):
self.proxy.delete_backup,
method_args=["value"],
method_kwargs={"force": True},
expected_args=[self.proxy]
expected_args=[self.proxy],
)
def test_backup_create_attrs(self):
@ -266,7 +276,7 @@ class TestBackup(TestVolumeProxy):
method_args=['volume_id'],
method_kwargs={'volume_id': 'vol_id', 'name': 'name'},
expected_args=[self.proxy],
expected_kwargs={'volume_id': 'vol_id', 'name': 'name'}
expected_kwargs={'volume_id': 'vol_id', 'name': 'name'},
)
def test_backup_reset(self):
@ -279,7 +289,6 @@ class TestBackup(TestVolumeProxy):
class TestSnapshot(TestVolumeProxy):
def test_snapshot_get(self):
self.verify_get(self.proxy.get_snapshot, snapshot.Snapshot)
@ -314,19 +323,20 @@ class TestSnapshot(TestVolumeProxy):
self.verify_create(self.proxy.create_snapshot, snapshot.Snapshot)
def test_snapshot_delete(self):
self.verify_delete(self.proxy.delete_snapshot,
snapshot.Snapshot, False)
self.verify_delete(
self.proxy.delete_snapshot, snapshot.Snapshot, False
)
def test_snapshot_delete_ignore(self):
self.verify_delete(self.proxy.delete_snapshot,
snapshot.Snapshot, True)
self.verify_delete(self.proxy.delete_snapshot, snapshot.Snapshot, True)
def test_reset(self):
self._verify(
"openstack.block_storage.v2.snapshot.Snapshot.reset",
self.proxy.reset_snapshot,
method_args=["value", "new_status"],
expected_args=[self.proxy, "new_status"])
expected_args=[self.proxy, "new_status"],
)
def test_get_snapshot_metadata(self):
self._verify(
@ -334,7 +344,8 @@ class TestSnapshot(TestVolumeProxy):
self.proxy.get_snapshot_metadata,
method_args=["value"],
expected_args=[self.proxy],
expected_result=snapshot.Snapshot(id="value", metadata={}))
expected_result=snapshot.Snapshot(id="value", metadata={}),
)
def test_set_snapshot_metadata(self):
kwargs = {"a": "1", "b": "2"}
@ -344,12 +355,11 @@ class TestSnapshot(TestVolumeProxy):
self.proxy.set_snapshot_metadata,
method_args=[id],
method_kwargs=kwargs,
method_result=snapshot.Snapshot.existing(
id=id, metadata=kwargs),
method_result=snapshot.Snapshot.existing(id=id, metadata=kwargs),
expected_args=[self.proxy],
expected_kwargs={'metadata': kwargs},
expected_result=snapshot.Snapshot.existing(
id=id, metadata=kwargs))
expected_result=snapshot.Snapshot.existing(id=id, metadata=kwargs),
)
def test_delete_snapshot_metadata(self):
self._verify(
@ -358,11 +368,11 @@ class TestSnapshot(TestVolumeProxy):
self.proxy.delete_snapshot_metadata,
expected_result=None,
method_args=["value", ["key"]],
expected_args=[self.proxy, "key"])
expected_args=[self.proxy, "key"],
)
class TestType(TestVolumeProxy):
def test_type_get(self):
self.verify_get(self.proxy.get_type, type.Type)
@ -383,25 +393,27 @@ class TestType(TestVolumeProxy):
"openstack.block_storage.v2.type.Type.get_private_access",
self.proxy.get_type_access,
method_args=["value"],
expected_args=[self.proxy])
expected_args=[self.proxy],
)
def test_type_add_private_access(self):
self._verify(
"openstack.block_storage.v2.type.Type.add_private_access",
self.proxy.add_type_access,
method_args=["value", "a"],
expected_args=[self.proxy, "a"])
expected_args=[self.proxy, "a"],
)
def test_type_remove_private_access(self):
self._verify(
"openstack.block_storage.v2.type.Type.remove_private_access",
self.proxy.remove_type_access,
method_args=["value", "a"],
expected_args=[self.proxy, "a"])
expected_args=[self.proxy, "a"],
)
class TestQuota(TestVolumeProxy):
def test_get(self):
self._verify(
'openstack.resource.Resource.fetch',
@ -414,7 +426,7 @@ class TestQuota(TestVolumeProxy):
'usage': False,
},
method_result=quota_set.QuotaSet(),
expected_result=quota_set.QuotaSet()
expected_result=quota_set.QuotaSet(),
)
def test_get_query(self):
@ -422,17 +434,14 @@ class TestQuota(TestVolumeProxy):
'openstack.resource.Resource.fetch',
self.proxy.get_quota_set,
method_args=['prj'],
method_kwargs={
'usage': True,
'user_id': 'uid'
},
method_kwargs={'usage': True, 'user_id': 'uid'},
expected_args=[self.proxy],
expected_kwargs={
'error_message': None,
'requires_id': False,
'usage': True,
'user_id': 'uid'
}
'user_id': 'uid',
},
)
def test_get_defaults(self):
@ -444,8 +453,8 @@ class TestQuota(TestVolumeProxy):
expected_kwargs={
'error_message': None,
'requires_id': False,
'base_path': '/os-quota-sets/defaults'
}
'base_path': '/os-quota-sets/defaults',
},
)
def test_reset(self):
@ -455,9 +464,7 @@ class TestQuota(TestVolumeProxy):
method_args=['prj'],
method_kwargs={'user_id': 'uid'},
expected_args=[self.proxy],
expected_kwargs={
'user_id': 'uid'
}
expected_kwargs={'user_id': 'uid'},
)
@mock.patch('openstack.proxy.Proxy._get_resource', autospec=True)
@ -473,12 +480,6 @@ class TestQuota(TestVolumeProxy):
'a': 'b',
},
expected_args=[self.proxy],
expected_kwargs={
'user_id': 'uid'
}
)
gr_mock.assert_called_with(
self.proxy,
quota_set.QuotaSet,
'qs', a='b'
expected_kwargs={'user_id': 'uid'},
)
gr_mock.assert_called_with(self.proxy, quota_set.QuotaSet, 'qs', a='b')
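The hunks in this file show the formatter's two basic moves: a call that fits within the line-length limit is collapsed onto a single line (as in the `gr_mock.assert_called_with` rewrite just above), while one that does not fit is exploded to one element per line with a trailing comma after the last element. A minimal runnable sketch of both forms, using a hypothetical stand-in rather than the real test helpers:

    # Hypothetical stand-in, defined only so the sketch runs on its own.
    def assert_called(url, json=None, microversion=None):
        return (url, json, microversion)

    # Fits on one line, so it stays collapsed.
    assert_called('volumes/1/action', json={}, microversion='3.0')

    # Too long for one line: one argument per line, trailing comma last.
    assert_called(
        'volumes/6685584b-1eac-4da6-b5c3-555430cf68ff/action',
        json={'os-reset_status': {'status': 'new_status'}},
        microversion='3.64',
    )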


@ -34,8 +34,7 @@ SNAPSHOT = {
DETAILS = {
"os-extended-snapshot-attributes:progress": "100%",
"os-extended-snapshot-attributes:project_id":
"0c2eba2c5af04d3f9e9d0d410b371fde"
"os-extended-snapshot-attributes:project_id": "0c2eba2c5af04d3f9e9d0d410b371fde", # noqa: E501
}
DETAILED_SNAPSHOT = SNAPSHOT.copy()
@ -43,7 +42,6 @@ DETAILED_SNAPSHOT.update(**DETAILS)
class TestSnapshot(base.TestCase):
def test_basic(self):
sot = snapshot.Snapshot(SNAPSHOT)
self.assertEqual("snapshot", sot.resource_key)
@ -55,13 +53,17 @@ class TestSnapshot(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"status": "status",
"all_projects": "all_tenants",
"volume_id": "volume_id",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"name": "name",
"status": "status",
"all_projects": "all_tenants",
"volume_id": "volume_id",
"limit": "limit",
"marker": "marker",
},
sot._query_mapping._mapping,
)
def test_create_basic(self):
sot = snapshot.Snapshot(**SNAPSHOT)
@ -77,7 +79,6 @@ class TestSnapshot(base.TestCase):
class TestSnapshotActions(base.TestCase):
def setUp(self):
super(TestSnapshotActions, self).setUp()
self.resp = mock.Mock()
@ -99,4 +100,5 @@ class TestSnapshotActions(base.TestCase):
url = 'snapshots/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': 'new_status'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
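One detail worth noting in the snapshot fixtures above: the formatter never splits string literals, so a mapping entry whose value cannot fit on one line is joined onto a single line and the line-length check is suppressed inline. A self-contained sketch, with the key and value copied from the fixture above:

    DETAILS = {
        "os-extended-snapshot-attributes:project_id": "0c2eba2c5af04d3f9e9d0d410b371fde",  # noqa: E501
    }
    assert len(DETAILS) == 1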


@ -14,22 +14,22 @@ from openstack.block_storage.v2 import stats
from openstack.tests.unit import base
POOLS = {"name": "pool1",
"capabilities": {
"updated": "2014-10-28T00=00=00-00=00",
"total_capacity": 1024,
"free_capacity": 100,
"volume_backend_name": "pool1",
"reserved_percentage": "0",
"driver_version": "1.0.0",
"storage_protocol": "iSCSI",
"QoS_support": "false"
}
}
POOLS = {
"name": "pool1",
"capabilities": {
"updated": "2014-10-28T00=00=00-00=00",
"total_capacity": 1024,
"free_capacity": 100,
"volume_backend_name": "pool1",
"reserved_percentage": "0",
"driver_version": "1.0.0",
"storage_protocol": "iSCSI",
"QoS_support": "false",
},
}
class TestBackendPools(base.TestCase):
def setUp(self):
super(TestBackendPools, self).setUp()
@ -37,8 +37,9 @@ class TestBackendPools(base.TestCase):
sot = stats.Pools(POOLS)
self.assertEqual("", sot.resource_key)
self.assertEqual("pools", sot.resources_key)
self.assertEqual("/scheduler-stats/get_pools?detail=True",
sot.base_path)
self.assertEqual(
"/scheduler-stats/get_pools?detail=True", sot.base_path
)
self.assertFalse(sot.allow_create)
self.assertFalse(sot.allow_fetch)
self.assertFalse(sot.allow_delete)


@ -19,17 +19,10 @@ from openstack.tests.unit import base
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
TYPE = {
"extra_specs": {
"capabilities": "gpu"
},
"id": FAKE_ID,
"name": "SSD"
}
TYPE = {"extra_specs": {"capabilities": "gpu"}, "id": FAKE_ID, "name": "SSD"}
class TestType(base.TestCase):
def setUp(self):
super(TestType, self).setUp()
self.extra_specs_result = {"extra_specs": {"go": "cubs", "boo": "sox"}}
@ -68,17 +61,20 @@ class TestType(base.TestCase):
response = mock.Mock()
response.status_code = 200
response.body = {"volume_type_access": [
{"project_id": "a", "volume_type_id": "b"}
]}
response.body = {
"volume_type_access": [{"project_id": "a", "volume_type_id": "b"}]
}
response.json = mock.Mock(return_value=response.body)
self.sess.get = mock.Mock(return_value=response)
self.assertEqual(response.body["volume_type_access"],
sot.get_private_access(self.sess))
self.assertEqual(
response.body["volume_type_access"],
sot.get_private_access(self.sess),
)
self.sess.get.assert_called_with(
"types/%s/os-volume-type-access" % sot.id)
"types/%s/os-volume-type-access" % sot.id
)
def test_add_private_access(self):
sot = type.Type(**TYPE)
@ -87,8 +83,7 @@ class TestType(base.TestCase):
url = "types/%s/action" % sot.id
body = {"addProjectAccess": {"project": "a"}}
self.sess.post.assert_called_with(
url, json=body)
self.sess.post.assert_called_with(url, json=body)
def test_remove_private_access(self):
sot = type.Type(**TYPE)
@ -97,5 +92,4 @@ class TestType(base.TestCase):
url = "types/%s/action" % sot.id
body = {"removeProjectAccess": {"project": "a"}}
self.sess.post.assert_called_with(
url, json=body)
self.sess.post.assert_called_with(url, json=body)


@ -20,11 +20,13 @@ from openstack.tests.unit import base
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
IMAGE_METADATA = {
'container_format': 'bare',
'min_ram': '64', 'disk_format': u'qcow2',
'min_ram': '64',
'disk_format': u'qcow2',
'image_name': 'TestVM',
'image_id': '625d4f2c-cf67-4af3-afb6-c7220f766947',
'checksum': '64d7c1cd2b6f60c92c14662941cb7913',
'min_disk': '0', u'size': '13167616'
'min_disk': '0',
u'size': '13167616',
}
VOLUME = {
@ -57,14 +59,13 @@ VOLUME = {
"OS-SCH-HNT:scheduler_hints": {
"same_host": [
"a0cf03a5-d921-4877-bb5c-86d26cf818e1",
"8c19174f-4220-44f0-824a-cd1eeef10287"
"8c19174f-4220-44f0-824a-cd1eeef10287",
]
}
},
}
class TestVolume(base.TestCase):
def test_basic(self):
sot = volume.Volume(VOLUME)
self.assertEqual("volume", sot.resource_key)
@ -76,13 +77,17 @@ class TestVolume(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"limit": "limit",
"marker": "marker",
},
sot._query_mapping._mapping,
)
def test_create(self):
sot = volume.Volume(**VOLUME)
@ -98,33 +103,40 @@ class TestVolume(base.TestCase):
self.assertEqual(VOLUME["snapshot_id"], sot.snapshot_id)
self.assertEqual(VOLUME["source_volid"], sot.source_volume_id)
self.assertEqual(VOLUME["metadata"], sot.metadata)
self.assertEqual(VOLUME["volume_image_metadata"],
sot.volume_image_metadata)
self.assertEqual(
VOLUME["volume_image_metadata"], sot.volume_image_metadata
)
self.assertEqual(VOLUME["size"], sot.size)
self.assertEqual(VOLUME["imageRef"], sot.image_id)
self.assertEqual(VOLUME["os-vol-host-attr:host"], sot.host)
self.assertEqual(VOLUME["os-vol-tenant-attr:tenant_id"],
sot.project_id)
self.assertEqual(VOLUME["os-vol-mig-status-attr:migstat"],
sot.migration_status)
self.assertEqual(VOLUME["os-vol-mig-status-attr:name_id"],
sot.migration_id)
self.assertEqual(VOLUME["replication_status"],
sot.replication_status)
self.assertEqual(
VOLUME["os-vol-tenant-attr:tenant_id"], sot.project_id
)
self.assertEqual(
VOLUME["os-vol-mig-status-attr:migstat"], sot.migration_status
)
self.assertEqual(
VOLUME["os-vol-mig-status-attr:name_id"], sot.migration_id
)
self.assertEqual(VOLUME["replication_status"], sot.replication_status)
self.assertEqual(
VOLUME["os-volume-replication:extended_status"],
sot.extended_replication_status)
self.assertEqual(VOLUME["consistencygroup_id"],
sot.consistency_group_id)
self.assertEqual(VOLUME["os-volume-replication:driver_data"],
sot.replication_driver_data)
self.assertDictEqual(VOLUME["OS-SCH-HNT:scheduler_hints"],
sot.scheduler_hints)
sot.extended_replication_status,
)
self.assertEqual(
VOLUME["consistencygroup_id"], sot.consistency_group_id
)
self.assertEqual(
VOLUME["os-volume-replication:driver_data"],
sot.replication_driver_data,
)
self.assertDictEqual(
VOLUME["OS-SCH-HNT:scheduler_hints"], sot.scheduler_hints
)
self.assertFalse(sot.is_encrypted)
class TestVolumeActions(TestVolume):
def setUp(self):
super(TestVolumeActions, self).setUp()
self.resp = mock.Mock()
@ -144,7 +156,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {"os-extend": {"new_size": "20"}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_volume_bootable(self):
sot = volume.Volume(**VOLUME)
@ -154,7 +167,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_volume_bootable_false(self):
sot = volume.Volume(**VOLUME)
@ -164,7 +178,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_reset_status(self):
sot = volume.Volume(**VOLUME)
@ -172,10 +187,16 @@ class TestVolumeActions(TestVolume):
self.assertIsNone(sot.reset_status(self.sess, '1', '2', '3'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': '1', 'attach_status': '2',
'migration_status': '3'}}
body = {
'os-reset_status': {
'status': '1',
'attach_status': '2',
'migration_status': '3',
}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_attach_instance(self):
sot = volume.Volume(**VOLUME)
@ -185,7 +206,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-attach': {'mountpoint': '1', 'instance_uuid': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_detach(self):
sot = volume.Volume(**VOLUME)
@ -195,18 +217,19 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-detach': {'attachment_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_detach_force(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(
sot.detach(self.sess, '1', force=True))
self.assertIsNone(sot.detach(self.sess, '1', force=True))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_detach': {'attachment_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_unmanage(self):
sot = volume.Volume(**VOLUME)
@ -216,7 +239,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-unmanage': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_retype(self):
sot = volume.Volume(**VOLUME)
@ -226,7 +250,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_retype_mp(self):
sot = volume.Volume(**VOLUME)
@ -236,7 +261,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1', 'migration_policy': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_migrate(self):
sot = volume.Volume(**VOLUME)
@ -246,19 +272,29 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_migrate_flags(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, host='1',
force_host_copy=True, lock_volume=True))
self.assertIsNone(
sot.migrate(
self.sess, host='1', force_host_copy=True, lock_volume=True
)
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1', 'force_host_copy': True,
'lock_volume': True}}
body = {
'os-migrate_volume': {
'host': '1',
'force_host_copy': True,
'lock_volume': True,
}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_complete_migration(self):
sot = volume.Volume(**VOLUME)
@ -266,22 +302,27 @@ class TestVolumeActions(TestVolume):
self.assertIsNone(sot.complete_migration(self.sess, new_volume_id='1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
False}}
body = {
'os-migrate_volume_completion': {'new_volume': '1', 'error': False}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_complete_migration_error(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.complete_migration(
self.sess, new_volume_id='1', error=True))
self.assertIsNone(
sot.complete_migration(self.sess, new_volume_id='1', error=True)
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
True}}
body = {
'os-migrate_volume_completion': {'new_volume': '1', 'error': True}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_force_delete(self):
sot = volume.Volume(**VOLUME)
@ -291,4 +332,5 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)


@ -17,15 +17,12 @@ from openstack.tests.unit import base
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
"id": IDENTIFIER,
"zoneState": {
"available": True
},
"zoneName": "zone1"
"zoneState": {"available": True},
"zoneName": "zone1",
}
class TestAvailabilityZone(base.TestCase):
def test_basic(self):
sot = az.AvailabilityZone()
self.assertEqual('availabilityZoneInfo', sot.resources_key)


@ -39,12 +39,11 @@ BACKUP = {
"has_dependent_backups": False,
"os-backup-project-attr:project_id": "2c67a14be9314c5dae2ee6c4ec90cf0b",
"user_id": "515ba0dd59f84f25a6a084a45d8d93b2",
"metadata": {"key": "value"}
"metadata": {"key": "value"},
}
class TestBackup(base.TestCase):
def setUp(self):
super(TestBackup, self).setUp()
self.resp = mock.Mock()
@ -80,9 +79,9 @@ class TestBackup(base.TestCase):
"sort_dir": "sort_dir",
"sort_key": "sort_key",
"status": "status",
"volume_id": "volume_id"
"volume_id": "volume_id",
},
sot._query_mapping._mapping
sot._query_mapping._mapping,
)
def test_create(self):
@ -100,10 +99,12 @@ class TestBackup(base.TestCase):
self.assertEqual(BACKUP["object_count"], sot.object_count)
self.assertEqual(BACKUP["is_incremental"], sot.is_incremental)
self.assertEqual(BACKUP["size"], sot.size)
self.assertEqual(BACKUP["has_dependent_backups"],
sot.has_dependent_backups)
self.assertEqual(BACKUP['os-backup-project-attr:project_id'],
sot.project_id)
self.assertEqual(
BACKUP["has_dependent_backups"], sot.has_dependent_backups
)
self.assertEqual(
BACKUP['os-backup-project-attr:project_id'], sot.project_id
)
self.assertEqual(BACKUP['metadata'], sot.metadata)
self.assertEqual(BACKUP['user_id'], sot.user_id)
self.assertEqual(BACKUP['encryption_key_id'], sot.encryption_key_id)
@ -128,7 +129,7 @@ class TestBackup(base.TestCase):
}
},
microversion="3.64",
params={}
params={},
)
sot2.create(self.sess)
@ -141,7 +142,7 @@ class TestBackup(base.TestCase):
}
},
microversion="3.64",
params={}
params={},
)
def test_restore(self):
@ -174,11 +175,7 @@ class TestBackup(base.TestCase):
def test_restore_no_params(self):
sot = backup.Backup(**BACKUP)
self.assertRaises(
exceptions.SDKException,
sot.restore,
self.sess
)
self.assertRaises(exceptions.SDKException, sot.restore, self.sess)
def test_force_delete(self):
sot = backup.Backup(**BACKUP)
@ -188,7 +185,8 @@ class TestBackup(base.TestCase):
url = 'backups/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_reset(self):
sot = backup.Backup(**BACKUP)
@ -198,4 +196,5 @@ class TestBackup(base.TestCase):
url = 'backups/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': 'new_status'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)


@ -28,29 +28,28 @@ CAPABILITIES = {
"compression": {
"title": "Compression",
"description": "Enables compression.",
"type": "boolean"
"type": "boolean",
},
"qos": {
"title": "QoS",
"description": "Enables QoS.",
"type": "boolean"
"type": "boolean",
},
"replication": {
"title": "Replication",
"description": "Enables replication.",
"type": "boolean"
"type": "boolean",
},
"thin_provisioning": {
"title": "Thin Provisioning",
"description": "Sets thin provisioning.",
"type": "boolean"
}
}
"type": "boolean",
},
},
}
class TestCapabilites(base.TestCase):
def test_basic(self):
capabilities_resource = capabilities.Capabilities()
self.assertEqual(None, capabilities_resource.resource_key)
@ -65,28 +64,39 @@ class TestCapabilites(base.TestCase):
def test_make_capabilities(self):
capabilities_resource = capabilities.Capabilities(**CAPABILITIES)
self.assertEqual(
CAPABILITIES["description"], capabilities_resource.description)
CAPABILITIES["description"], capabilities_resource.description
)
self.assertEqual(
CAPABILITIES["display_name"], capabilities_resource.display_name)
CAPABILITIES["display_name"], capabilities_resource.display_name
)
self.assertEqual(
CAPABILITIES["driver_version"],
capabilities_resource.driver_version)
capabilities_resource.driver_version,
)
self.assertEqual(
CAPABILITIES["namespace"], capabilities_resource.namespace)
CAPABILITIES["namespace"], capabilities_resource.namespace
)
self.assertEqual(
CAPABILITIES["pool_name"], capabilities_resource.pool_name)
CAPABILITIES["pool_name"], capabilities_resource.pool_name
)
self.assertEqual(
CAPABILITIES["properties"], capabilities_resource.properties)
CAPABILITIES["properties"], capabilities_resource.properties
)
self.assertEqual(
CAPABILITIES["replication_targets"],
capabilities_resource.replication_targets)
capabilities_resource.replication_targets,
)
self.assertEqual(
CAPABILITIES["storage_protocol"],
capabilities_resource.storage_protocol)
capabilities_resource.storage_protocol,
)
self.assertEqual(
CAPABILITIES["vendor_name"], capabilities_resource.vendor_name)
CAPABILITIES["vendor_name"], capabilities_resource.vendor_name
)
self.assertEqual(
CAPABILITIES["visibility"], capabilities_resource.visibility)
CAPABILITIES["visibility"], capabilities_resource.visibility
)
self.assertEqual(
CAPABILITIES["volume_backend_name"],
capabilities_resource.volume_backend_name)
capabilities_resource.volume_backend_name,
)


@ -22,7 +22,6 @@ EXTENSION = {
class TestExtension(base.TestCase):
def test_basic(self):
extension_resource = extension.Extension()
self.assertEqual('extensions', extension_resource.resources_key)
@ -36,6 +35,7 @@ class TestExtension(base.TestCase):
def test_make_extension(self):
extension_resource = extension.Extension(**EXTENSION)
self.assertEqual(EXTENSION['alias'], extension_resource.alias)
self.assertEqual(EXTENSION['description'],
extension_resource.description)
self.assertEqual(
EXTENSION['description'], extension_resource.description
)
self.assertEqual(EXTENSION['updated'], extension_resource.updated)


@ -32,12 +32,11 @@ GROUP = {
"volumes": ["a2cdf1ad-5497-4e57-bd7d-f573768f3d03"],
"group_snapshot_id": None,
"source_group_id": None,
"project_id": "7ccf4863071f44aeb8f141f65780c51b"
"project_id": "7ccf4863071f44aeb8f141f65780c51b",
}
class TestGroup(base.TestCase):
def test_basic(self):
resource = group.Group()
self.assertEqual("group", resource.resource_key)
@ -54,7 +53,8 @@ class TestGroup(base.TestCase):
self.assertEqual(GROUP["id"], resource.id)
self.assertEqual(GROUP["status"], resource.status)
self.assertEqual(
GROUP["availability_zone"], resource.availability_zone)
GROUP["availability_zone"], resource.availability_zone
)
self.assertEqual(GROUP["created_at"], resource.created_at)
self.assertEqual(GROUP["name"], resource.name)
self.assertEqual(GROUP["description"], resource.description)
@ -62,13 +62,13 @@ class TestGroup(base.TestCase):
self.assertEqual(GROUP["volume_types"], resource.volume_types)
self.assertEqual(GROUP["volumes"], resource.volumes)
self.assertEqual(
GROUP["group_snapshot_id"], resource.group_snapshot_id)
GROUP["group_snapshot_id"], resource.group_snapshot_id
)
self.assertEqual(GROUP["source_group_id"], resource.source_group_id)
self.assertEqual(GROUP["project_id"], resource.project_id)
class TestGroupAction(base.TestCase):
def setUp(self):
super().setUp()
self.resp = mock.Mock()
@ -90,7 +90,8 @@ class TestGroupAction(base.TestCase):
url = 'groups/%s/action' % GROUP_ID
body = {'delete': {'delete-volumes': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_reset(self):
sot = group.Group(**GROUP)
@ -100,7 +101,9 @@ class TestGroupAction(base.TestCase):
url = 'groups/%s/action' % GROUP_ID
body = {'reset_status': {'status': 'new_status'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion,
url,
json=body,
microversion=sot._max_microversion,
)
def test_create_from_source(self):
@ -131,5 +134,7 @@ class TestGroupAction(base.TestCase):
},
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion,
url,
json=body,
microversion=sot._max_microversion,
)


@ -23,7 +23,7 @@ ABSOLUTE_LIMIT = {
"maxTotalVolumes": 10,
"totalVolumesUsed": 2,
"totalBackupsUsed": 3,
"totalGigabytesUsed": 2
"totalGigabytesUsed": 2,
}
RATE_LIMIT = {
@ -31,23 +31,15 @@ RATE_LIMIT = {
"value": 80,
"remaining": 80,
"unit": "MINUTE",
"next-available": "2021-02-23T22:08:00Z"
"next-available": "2021-02-23T22:08:00Z",
}
RATE_LIMITS = {
"regex": ".*",
"uri": "*",
"limit": [RATE_LIMIT]
}
RATE_LIMITS = {"regex": ".*", "uri": "*", "limit": [RATE_LIMIT]}
LIMIT = {
"rate": [RATE_LIMITS],
"absolute": ABSOLUTE_LIMIT
}
LIMIT = {"rate": [RATE_LIMITS], "absolute": ABSOLUTE_LIMIT}
class TestAbsoluteLimit(base.TestCase):
def test_basic(self):
limit_resource = limits.AbsoluteLimit()
self.assertIsNone(limit_resource.resource_key)
@ -63,38 +55,45 @@ class TestAbsoluteLimit(base.TestCase):
limit_resource = limits.AbsoluteLimit(**ABSOLUTE_LIMIT)
self.assertEqual(
ABSOLUTE_LIMIT['totalSnapshotsUsed'],
limit_resource.total_snapshots_used)
limit_resource.total_snapshots_used,
)
self.assertEqual(
ABSOLUTE_LIMIT['maxTotalBackups'],
limit_resource.max_total_backups)
ABSOLUTE_LIMIT['maxTotalBackups'], limit_resource.max_total_backups
)
self.assertEqual(
ABSOLUTE_LIMIT['maxTotalVolumeGigabytes'],
limit_resource.max_total_volume_gigabytes)
limit_resource.max_total_volume_gigabytes,
)
self.assertEqual(
ABSOLUTE_LIMIT['maxTotalSnapshots'],
limit_resource.max_total_snapshots)
limit_resource.max_total_snapshots,
)
self.assertEqual(
ABSOLUTE_LIMIT['maxTotalBackupGigabytes'],
limit_resource.max_total_backup_gigabytes)
limit_resource.max_total_backup_gigabytes,
)
self.assertEqual(
ABSOLUTE_LIMIT['totalBackupGigabytesUsed'],
limit_resource.total_backup_gigabytes_used)
limit_resource.total_backup_gigabytes_used,
)
self.assertEqual(
ABSOLUTE_LIMIT['maxTotalVolumes'],
limit_resource.max_total_volumes)
ABSOLUTE_LIMIT['maxTotalVolumes'], limit_resource.max_total_volumes
)
self.assertEqual(
ABSOLUTE_LIMIT['totalVolumesUsed'],
limit_resource.total_volumes_used)
limit_resource.total_volumes_used,
)
self.assertEqual(
ABSOLUTE_LIMIT['totalBackupsUsed'],
limit_resource.total_backups_used)
limit_resource.total_backups_used,
)
self.assertEqual(
ABSOLUTE_LIMIT['totalGigabytesUsed'],
limit_resource.total_gigabytes_used)
limit_resource.total_gigabytes_used,
)
class TestRateLimit(base.TestCase):
def test_basic(self):
limit_resource = limits.RateLimit()
self.assertIsNone(limit_resource.resource_key)
@ -113,11 +112,11 @@ class TestRateLimit(base.TestCase):
self.assertEqual(RATE_LIMIT['remaining'], limit_resource.remaining)
self.assertEqual(RATE_LIMIT['unit'], limit_resource.unit)
self.assertEqual(
RATE_LIMIT['next-available'], limit_resource.next_available)
RATE_LIMIT['next-available'], limit_resource.next_available
)
class TestRateLimits(base.TestCase):
def test_basic(self):
limit_resource = limits.RateLimits()
self.assertIsNone(limit_resource.resource_key)
@ -135,7 +134,8 @@ class TestRateLimits(base.TestCase):
self.assertEqual(expected[0]['remaining'], actual[0].remaining)
self.assertEqual(expected[0]['unit'], actual[0].unit)
self.assertEqual(
expected[0]['next-available'], actual[0].next_available)
expected[0]['next-available'], actual[0].next_available
)
def test_make_rate_limits(self):
limit_resource = limits.RateLimits(**RATE_LIMITS)
@ -145,7 +145,6 @@ class TestRateLimits(base.TestCase):
class TestLimit(base.TestCase):
def test_basic(self):
limit_resource = limits.Limit()
self.assertEqual('limits', limit_resource.resource_key)
@ -158,28 +157,34 @@ class TestLimit(base.TestCase):
def _test_absolute_limit(self, expected, actual):
self.assertEqual(
expected['totalSnapshotsUsed'], actual.total_snapshots_used)
self.assertEqual(
expected['maxTotalBackups'], actual.max_total_backups)
expected['totalSnapshotsUsed'], actual.total_snapshots_used
)
self.assertEqual(expected['maxTotalBackups'], actual.max_total_backups)
self.assertEqual(
expected['maxTotalVolumeGigabytes'],
actual.max_total_volume_gigabytes)
actual.max_total_volume_gigabytes,
)
self.assertEqual(
expected['maxTotalSnapshots'], actual.max_total_snapshots)
expected['maxTotalSnapshots'], actual.max_total_snapshots
)
self.assertEqual(
expected['maxTotalBackupGigabytes'],
actual.max_total_backup_gigabytes)
actual.max_total_backup_gigabytes,
)
self.assertEqual(
expected['totalBackupGigabytesUsed'],
actual.total_backup_gigabytes_used)
actual.total_backup_gigabytes_used,
)
self.assertEqual(expected['maxTotalVolumes'], actual.max_total_volumes)
self.assertEqual(
expected['maxTotalVolumes'], actual.max_total_volumes)
expected['totalVolumesUsed'], actual.total_volumes_used
)
self.assertEqual(
expected['totalVolumesUsed'], actual.total_volumes_used)
expected['totalBackupsUsed'], actual.total_backups_used
)
self.assertEqual(
expected['totalBackupsUsed'], actual.total_backups_used)
self.assertEqual(
expected['totalGigabytesUsed'], actual.total_gigabytes_used)
expected['totalGigabytesUsed'], actual.total_gigabytes_used
)
def _test_rate_limit(self, expected, actual):
self.assertEqual(expected[0]['verb'], actual[0].verb)
@ -187,7 +192,8 @@ class TestLimit(base.TestCase):
self.assertEqual(expected[0]['remaining'], actual[0].remaining)
self.assertEqual(expected[0]['unit'], actual[0].unit)
self.assertEqual(
expected[0]['next-available'], actual[0].next_available)
expected[0]['next-available'], actual[0].next_available
)
def _test_rate_limits(self, expected, actual):
self.assertEqual(expected[0]['regex'], actual[0].regex)


@ -18,35 +18,29 @@ RESOURCE_FILTER = {
'status',
'image_metadata',
'bootable',
'migration_status'
'migration_status',
],
'resource': 'volume'
'resource': 'volume',
}
class TestResourceFilter(base.TestCase):
def test_basic(self):
resource = resource_filter.ResourceFilter()
self.assertEqual('resource_filters',
resource.resources_key)
self.assertEqual('/resource_filters',
resource.base_path)
self.assertEqual('resource_filters', resource.resources_key)
self.assertEqual('/resource_filters', resource.base_path)
self.assertFalse(resource.allow_create)
self.assertFalse(resource.allow_fetch)
self.assertFalse(resource.allow_commit)
self.assertFalse(resource.allow_delete)
self.assertTrue(resource.allow_list)
self.assertDictEqual({"resource": "resource",
"limit": "limit",
"marker": "marker"},
resource._query_mapping._mapping)
self.assertDictEqual(
{"resource": "resource", "limit": "limit", "marker": "marker"},
resource._query_mapping._mapping,
)
def test_make_resource_filter(self):
resource = resource_filter.ResourceFilter(
**RESOURCE_FILTER)
self.assertEqual(
RESOURCE_FILTER['filters'], resource.filters)
self.assertEqual(
RESOURCE_FILTER['resource'], resource.resource)
resource = resource_filter.ResourceFilter(**RESOURCE_FILTER)
self.assertEqual(RESOURCE_FILTER['filters'], resource.filters)
self.assertEqual(RESOURCE_FILTER['resource'], resource.resource)


@ -31,13 +31,11 @@ SNAPSHOT = {
"name": "snap-001",
"force": "true",
"os-extended-snapshot-attributes:progress": "100%",
"os-extended-snapshot-attributes:project_id":
"0c2eba2c5af04d3f9e9d0d410b371fde"
"os-extended-snapshot-attributes:project_id": "0c2eba2c5af04d3f9e9d0d410b371fde", # noqa: E501
}
class TestSnapshot(base.TestCase):
def test_basic(self):
sot = snapshot.Snapshot(SNAPSHOT)
self.assertEqual("snapshot", sot.resource_key)
@ -49,14 +47,18 @@ class TestSnapshot(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"volume_id": "volume_id",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"volume_id": "volume_id",
"limit": "limit",
"marker": "marker",
},
sot._query_mapping._mapping,
)
def test_create_basic(self):
sot = snapshot.Snapshot(**SNAPSHOT)
@ -69,16 +71,16 @@ class TestSnapshot(base.TestCase):
self.assertEqual(SNAPSHOT["size"], sot.size)
self.assertEqual(SNAPSHOT["name"], sot.name)
self.assertEqual(
SNAPSHOT["os-extended-snapshot-attributes:progress"],
sot.progress)
SNAPSHOT["os-extended-snapshot-attributes:progress"], sot.progress
)
self.assertEqual(
SNAPSHOT["os-extended-snapshot-attributes:project_id"],
sot.project_id)
sot.project_id,
)
self.assertTrue(sot.is_forced)
class TestSnapshotActions(base.TestCase):
def setUp(self):
super(TestSnapshotActions, self).setUp()
self.resp = mock.Mock()
@ -100,7 +102,8 @@ class TestSnapshotActions(base.TestCase):
url = 'snapshots/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_reset(self):
sot = snapshot.Snapshot(**SNAPSHOT)
@ -110,7 +113,8 @@ class TestSnapshotActions(base.TestCase):
url = 'snapshots/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': 'new_status'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_status(self):
sot = snapshot.Snapshot(**SNAPSHOT)
@ -120,4 +124,5 @@ class TestSnapshotActions(base.TestCase):
url = 'snapshots/%s/action' % FAKE_ID
body = {'os-update_snapshot_status': {'status': 'new_status'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)


@ -21,9 +21,7 @@ from openstack.tests.unit import base
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
TYPE = {
"extra_specs": {
"capabilities": "gpu"
},
"extra_specs": {"capabilities": "gpu"},
"id": FAKE_ID,
"name": "SSD",
"description": "Test type",
@ -31,7 +29,6 @@ TYPE = {
class TestType(base.TestCase):
def setUp(self):
super(TestType, self).setUp()
self.extra_specs_result = {"extra_specs": {"go": "cubs", "boo": "sox"}}
@ -80,9 +77,11 @@ class TestType(base.TestCase):
result = sot.set_extra_specs(sess, **set_specs)
self.assertEqual(result, self.extra_specs_result["extra_specs"])
sess.post.assert_called_once_with("types/" + FAKE_ID + "/extra_specs",
headers={},
json={"extra_specs": set_specs})
sess.post.assert_called_once_with(
"types/" + FAKE_ID + "/extra_specs",
headers={},
json={"extra_specs": set_specs},
)
def test_set_extra_specs_error(self):
sess = mock.Mock()
@ -99,7 +98,8 @@ class TestType(base.TestCase):
exceptions.BadRequestException,
sot.set_extra_specs,
sess,
**set_specs)
**set_specs
)
def test_delete_extra_specs(self):
sess = mock.Mock()
@ -130,27 +130,28 @@ class TestType(base.TestCase):
key = "hey"
self.assertRaises(
exceptions.BadRequestException,
sot.delete_extra_specs,
sess,
[key])
exceptions.BadRequestException, sot.delete_extra_specs, sess, [key]
)
def test_get_private_access(self):
sot = type.Type(**TYPE)
response = mock.Mock()
response.status_code = 200
response.body = {"volume_type_access": [
{"project_id": "a", "volume_type_id": "b"}
]}
response.body = {
"volume_type_access": [{"project_id": "a", "volume_type_id": "b"}]
}
response.json = mock.Mock(return_value=response.body)
self.sess.get = mock.Mock(return_value=response)
self.assertEqual(response.body["volume_type_access"],
sot.get_private_access(self.sess))
self.assertEqual(
response.body["volume_type_access"],
sot.get_private_access(self.sess),
)
self.sess.get.assert_called_with(
"types/%s/os-volume-type-access" % sot.id)
"types/%s/os-volume-type-access" % sot.id
)
def test_add_private_access(self):
sot = type.Type(**TYPE)
@ -159,8 +160,7 @@ class TestType(base.TestCase):
url = "types/%s/action" % sot.id
body = {"addProjectAccess": {"project": "a"}}
self.sess.post.assert_called_with(
url, json=body)
self.sess.post.assert_called_with(url, json=body)
def test_remove_private_access(self):
sot = type.Type(**TYPE)
@ -169,5 +169,4 @@ class TestType(base.TestCase):
url = "types/%s/action" % sot.id
body = {"removeProjectAccess": {"project": "a"}}
self.sess.post.assert_called_with(
url, json=body)
self.sess.post.assert_called_with(url, json=body)


@ -31,7 +31,6 @@ TYPE_ENC = {
class TestTypeEncryption(base.TestCase):
def test_basic(self):
sot = type.TypeEncryption(**TYPE_ENC)
self.assertEqual("encryption", sot.resource_key)


@ -21,11 +21,13 @@ from openstack.tests.unit import base
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
IMAGE_METADATA = {
'container_format': 'bare',
'min_ram': '64', 'disk_format': u'qcow2',
'min_ram': '64',
'disk_format': u'qcow2',
'image_name': 'TestVM',
'image_id': '625d4f2c-cf67-4af3-afb6-c7220f766947',
'checksum': '64d7c1cd2b6f60c92c14662941cb7913',
'min_disk': '0', u'size': '13167616'
'min_disk': '0',
u'size': '13167616',
}
VOLUME = {
@ -59,14 +61,13 @@ VOLUME = {
"OS-SCH-HNT:scheduler_hints": {
"same_host": [
"a0cf03a5-d921-4877-bb5c-86d26cf818e1",
"8c19174f-4220-44f0-824a-cd1eeef10287"
"8c19174f-4220-44f0-824a-cd1eeef10287",
]
}
},
}
class TestVolume(base.TestCase):
def test_basic(self):
sot = volume.Volume(VOLUME)
self.assertEqual("volume", sot.resource_key)
@ -78,15 +79,19 @@ class TestVolume(base.TestCase):
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"created_at": "created_at",
"updated_at": "updated_at",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
self.assertDictEqual(
{
"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"created_at": "created_at",
"updated_at": "updated_at",
"limit": "limit",
"marker": "marker",
},
sot._query_mapping._mapping,
)
def test_create(self):
sot = volume.Volume(**VOLUME)
@ -103,33 +108,40 @@ class TestVolume(base.TestCase):
self.assertEqual(VOLUME["source_volid"], sot.source_volume_id)
self.assertEqual(VOLUME["metadata"], sot.metadata)
self.assertEqual(VOLUME["multiattach"], sot.is_multiattach)
self.assertEqual(VOLUME["volume_image_metadata"],
sot.volume_image_metadata)
self.assertEqual(
VOLUME["volume_image_metadata"], sot.volume_image_metadata
)
self.assertEqual(VOLUME["size"], sot.size)
self.assertEqual(VOLUME["imageRef"], sot.image_id)
self.assertEqual(VOLUME["os-vol-host-attr:host"], sot.host)
self.assertEqual(VOLUME["os-vol-tenant-attr:tenant_id"],
sot.project_id)
self.assertEqual(VOLUME["os-vol-mig-status-attr:migstat"],
sot.migration_status)
self.assertEqual(VOLUME["os-vol-mig-status-attr:name_id"],
sot.migration_id)
self.assertEqual(VOLUME["replication_status"],
sot.replication_status)
self.assertEqual(
VOLUME["os-vol-tenant-attr:tenant_id"], sot.project_id
)
self.assertEqual(
VOLUME["os-vol-mig-status-attr:migstat"], sot.migration_status
)
self.assertEqual(
VOLUME["os-vol-mig-status-attr:name_id"], sot.migration_id
)
self.assertEqual(VOLUME["replication_status"], sot.replication_status)
self.assertEqual(
VOLUME["os-volume-replication:extended_status"],
sot.extended_replication_status)
self.assertEqual(VOLUME["consistencygroup_id"],
sot.consistency_group_id)
self.assertEqual(VOLUME["os-volume-replication:driver_data"],
sot.replication_driver_data)
sot.extended_replication_status,
)
self.assertEqual(
VOLUME["consistencygroup_id"], sot.consistency_group_id
)
self.assertEqual(
VOLUME["os-volume-replication:driver_data"],
sot.replication_driver_data,
)
self.assertFalse(sot.is_encrypted)
self.assertDictEqual(VOLUME["OS-SCH-HNT:scheduler_hints"],
sot.scheduler_hints)
self.assertDictEqual(
VOLUME["OS-SCH-HNT:scheduler_hints"], sot.scheduler_hints
)
class TestVolumeActions(TestVolume):
def setUp(self):
super(TestVolumeActions, self).setUp()
self.resp = mock.Mock()
@ -149,7 +161,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {"os-extend": {"new_size": "20"}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_volume_readonly(self):
sot = volume.Volume(**VOLUME)
@ -159,7 +172,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-update_readonly_flag': {'readonly': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_volume_readonly_false(self):
sot = volume.Volume(**VOLUME)
@ -169,7 +183,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-update_readonly_flag': {'readonly': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_volume_bootable(self):
sot = volume.Volume(**VOLUME)
@ -179,7 +194,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': True}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_set_volume_bootable_false(self):
sot = volume.Volume(**VOLUME)
@ -189,7 +205,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-set_bootable': {'bootable': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_reset_status(self):
sot = volume.Volume(**VOLUME)
@ -197,25 +214,34 @@ class TestVolumeActions(TestVolume):
self.assertIsNone(sot.reset_status(self.sess, '1', '2', '3'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-reset_status': {'status': '1', 'attach_status': '2',
'migration_status': '3'}}
body = {
'os-reset_status': {
'status': '1',
'attach_status': '2',
'migration_status': '3',
}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[exceptions.SDKException()])
@mock.patch(
'openstack.utils.require_microversion',
autospec=True,
side_effect=[exceptions.SDKException()],
)
def test_revert_to_snapshot_before_340(self, mv_mock):
sot = volume.Volume(**VOLUME)
self.assertRaises(
exceptions.SDKException,
sot.revert_to_snapshot,
self.sess,
'1'
exceptions.SDKException, sot.revert_to_snapshot, self.sess, '1'
)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[None])
@mock.patch(
'openstack.utils.require_microversion',
autospec=True,
side_effect=[None],
)
def test_revert_to_snapshot_after_340(self, mv_mock):
sot = volume.Volume(**VOLUME)
@ -224,7 +250,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'revert': {'snapshot_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
mv_mock.assert_called_with(self.sess, '3.40')
def test_attach_instance(self):
@ -235,7 +262,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-attach': {'mountpoint': '1', 'instance_uuid': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_attach_host(self):
sot = volume.Volume(**VOLUME)
@ -245,16 +273,13 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-attach': {'mountpoint': '1', 'host_name': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_attach_error(self):
sot = volume.Volume(**VOLUME)
self.assertRaises(
ValueError,
sot.attach,
self.sess,
'1')
self.assertRaises(ValueError, sot.attach, self.sess, '1')
def test_detach(self):
sot = volume.Volume(**VOLUME)
@ -264,19 +289,23 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-detach': {'attachment_id': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_detach_force(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(
sot.detach(self.sess, '1', force=True, connector={'a': 'b'}))
sot.detach(self.sess, '1', force=True, connector={'a': 'b'})
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_detach': {'attachment_id': '1',
'connector': {'a': 'b'}}}
body = {
'os-force_detach': {'attachment_id': '1', 'connector': {'a': 'b'}}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_unmanage(self):
sot = volume.Volume(**VOLUME)
@ -286,7 +315,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-unmanage': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_retype(self):
sot = volume.Volume(**VOLUME)
@ -296,7 +326,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_retype_mp(self):
sot = volume.Volume(**VOLUME)
@ -306,7 +337,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-retype': {'new_type': '1', 'migration_policy': '2'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_migrate(self):
sot = volume.Volume(**VOLUME)
@ -316,33 +348,55 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1'}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_migrate_flags(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, host='1',
force_host_copy=True, lock_volume=True))
self.assertIsNone(
sot.migrate(
self.sess, host='1', force_host_copy=True, lock_volume=True
)
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'host': '1', 'force_host_copy': True,
'lock_volume': True}}
body = {
'os-migrate_volume': {
'host': '1',
'force_host_copy': True,
'lock_volume': True,
}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[None])
@mock.patch(
'openstack.utils.require_microversion',
autospec=True,
side_effect=[None],
)
def test_migrate_cluster(self, mv_mock):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.migrate(self.sess, cluster='1',
force_host_copy=True, lock_volume=True))
self.assertIsNone(
sot.migrate(
self.sess, cluster='1', force_host_copy=True, lock_volume=True
)
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume': {'cluster': '1', 'force_host_copy': True,
'lock_volume': True}}
body = {
'os-migrate_volume': {
'cluster': '1',
'force_host_copy': True,
'lock_volume': True,
}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
mv_mock.assert_called_with(self.sess, '3.16')
def test_complete_migration(self):
@ -351,22 +405,27 @@ class TestVolumeActions(TestVolume):
self.assertIsNone(sot.complete_migration(self.sess, new_volume_id='1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
False}}
body = {
'os-migrate_volume_completion': {'new_volume': '1', 'error': False}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_complete_migration_error(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.complete_migration(
self.sess, new_volume_id='1', error=True))
self.assertIsNone(
sot.complete_migration(self.sess, new_volume_id='1', error=True)
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-migrate_volume_completion': {'new_volume': '1', 'error':
True}}
body = {
'os-migrate_volume_completion': {'new_volume': '1', 'error': True}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_force_delete(self):
sot = volume.Volume(**VOLUME)
@ -376,7 +435,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-force_delete': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_upload_image(self):
sot = volume.Volume(**VOLUME)
@ -390,15 +450,16 @@ class TestVolumeActions(TestVolume):
self.assertDictEqual({'a': 'b'}, sot.upload_to_image(self.sess, '1'))
url = 'volumes/%s/action' % FAKE_ID
body = {'os-volume_upload_image': {
'image_name': '1',
'force': False
}}
body = {'os-volume_upload_image': {'image_name': '1', 'force': False}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
@mock.patch('openstack.utils.require_microversion', autospec=True,
side_effect=[None])
@mock.patch(
'openstack.utils.require_microversion',
autospec=True,
side_effect=[None],
)
def test_upload_image_args(self, mv_mock):
sot = volume.Volume(**VOLUME)
@ -410,21 +471,30 @@ class TestVolumeActions(TestVolume):
self.assertDictEqual(
{'a': 'b'},
sot.upload_to_image(self.sess, '1', disk_format='2',
container_format='3', visibility='4',
protected='5'))
sot.upload_to_image(
self.sess,
'1',
disk_format='2',
container_format='3',
visibility='4',
protected='5',
),
)
url = 'volumes/%s/action' % FAKE_ID
body = {'os-volume_upload_image': {
'image_name': '1',
'force': False,
'disk_format': '2',
'container_format': '3',
'visibility': '4',
'protected': '5'
}}
body = {
'os-volume_upload_image': {
'image_name': '1',
'force': False,
'disk_format': '2',
'container_format': '3',
'visibility': '4',
'protected': '5',
}
}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
mv_mock.assert_called_with(self.sess, '3.1')
def test_reserve(self):
@ -435,7 +505,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-reserve': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_unreserve(self):
sot = volume.Volume(**VOLUME)
@ -445,7 +516,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-unreserve': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_begin_detaching(self):
sot = volume.Volume(**VOLUME)
@ -455,7 +527,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-begin_detaching': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_abort_detaching(self):
sot = volume.Volume(**VOLUME)
@ -465,7 +538,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-roll_detaching': {}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_init_attachment(self):
sot = volume.Volume(**VOLUME)
@ -475,7 +549,8 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-initialize_connection': {'connector': {'a': 'b'}}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
def test_terminate_attachment(self):
sot = volume.Volume(**VOLUME)
@ -485,4 +560,5 @@ class TestVolumeActions(TestVolume):
url = 'volumes/%s/action' % FAKE_ID
body = {'os-terminate_connection': {'connector': {'a': 'b'}}}
self.sess.post.assert_called_with(
url, json=body, microversion=sot._max_microversion)
url, json=body, microversion=sot._max_microversion
)
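The `mock.patch` rewrites in this file follow the same rule as ordinary calls: once a decorator's arguments no longer fit on one line, each keyword moves to its own line with a trailing comma. A runnable sketch against a hypothetical patch target (`os.path.exists` here, not the SDK's `require_microversion`):

    from unittest import mock
    import os.path

    @mock.patch(
        'os.path.exists',
        autospec=True,
        side_effect=[True],
    )
    def exists_once(mock_exists):
        # The patched function returns the queued side_effect value.
        return os.path.exists('/some/path')

    assert exists_once() is True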