Blackify openstack.shared_file_system

Black used with the '-l 79 -S' flags.

A future change will ignore this commit in git-blame history by adding a
'git-blame-ignore-revs' file.

Change-Id: I54209e6cbfeec18a15771882d38730aca273238c
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2023-05-05 10:59:36 +01:00
parent 82c2a53402
commit 3d2511f980
36 changed files with 768 additions and 600 deletions

View File

@ -16,6 +16,7 @@ from openstack.shared_file_system.v2 import _proxy
class SharedFilesystemService(service_description.ServiceDescription): class SharedFilesystemService(service_description.ServiceDescription):
"""The shared file systems service.""" """The shared file systems service."""
supported_versions = { supported_versions = {
'2': _proxy.Proxy, '2': _proxy.Proxy,
} }

View File

@ -13,34 +13,27 @@
from openstack import proxy from openstack import proxy
from openstack import resource from openstack import resource
from openstack.shared_file_system.v2 import ( from openstack.shared_file_system.v2 import (
availability_zone as _availability_zone) availability_zone as _availability_zone,
from openstack.shared_file_system.v2 import (
share_access_rule as _share_access_rule
)
from openstack.shared_file_system.v2 import (
share_export_locations as _share_export_locations
)
from openstack.shared_file_system.v2 import (
share_network as _share_network
)
from openstack.shared_file_system.v2 import (
share_network_subnet as _share_network_subnet
)
from openstack.shared_file_system.v2 import (
share_snapshot as _share_snapshot
)
from openstack.shared_file_system.v2 import (
share_snapshot_instance as _share_snapshot_instance
)
from openstack.shared_file_system.v2 import (
storage_pool as _storage_pool
)
from openstack.shared_file_system.v2 import (
user_message as _user_message
) )
from openstack.shared_file_system.v2 import limit as _limit from openstack.shared_file_system.v2 import limit as _limit
from openstack.shared_file_system.v2 import share as _share from openstack.shared_file_system.v2 import share as _share
from openstack.shared_file_system.v2 import (
share_access_rule as _share_access_rule,
)
from openstack.shared_file_system.v2 import (
share_export_locations as _share_export_locations,
)
from openstack.shared_file_system.v2 import share_instance as _share_instance from openstack.shared_file_system.v2 import share_instance as _share_instance
from openstack.shared_file_system.v2 import share_network as _share_network
from openstack.shared_file_system.v2 import (
share_network_subnet as _share_network_subnet,
)
from openstack.shared_file_system.v2 import share_snapshot as _share_snapshot
from openstack.shared_file_system.v2 import (
share_snapshot_instance as _share_snapshot_instance,
)
from openstack.shared_file_system.v2 import storage_pool as _storage_pool
from openstack.shared_file_system.v2 import user_message as _user_message
class Proxy(proxy.Proxy): class Proxy(proxy.Proxy):
@ -53,11 +46,9 @@ class Proxy(proxy.Proxy):
"share": _share.Share, "share": _share.Share,
"share_network": _share_network.ShareNetwork, "share_network": _share_network.ShareNetwork,
"share_network_subnet": _share_network_subnet.ShareNetworkSubnet, "share_network_subnet": _share_network_subnet.ShareNetworkSubnet,
"share_snapshot_instance": "share_snapshot_instance": _share_snapshot_instance.ShareSnapshotInstance, # noqa: E501
_share_snapshot_instance.ShareSnapshotInstance,
"share_instance": _share_instance.ShareInstance, "share_instance": _share_instance.ShareInstance,
"share_export_locations": "share_export_locations": _share_export_locations.ShareExportLocation,
_share_export_locations.ShareExportLocation,
"share_access_rule": _share_access_rule.ShareAccessRule, "share_access_rule": _share_access_rule.ShareAccessRule,
} }
@ -134,8 +125,7 @@ class Proxy(proxy.Proxy):
:returns: Result of the ``delete`` :returns: Result of the ``delete``
:rtype: ``None`` :rtype: ``None``
""" """
self._delete(_share.Share, share, self._delete(_share.Share, share, ignore_missing=ignore_missing)
ignore_missing=ignore_missing)
def update_share(self, share_id, **attrs): def update_share(self, share_id, **attrs):
"""Updates details of a single share. """Updates details of a single share.
@ -172,12 +162,7 @@ class Proxy(proxy.Proxy):
res.revert_to_snapshot(self, snapshot_id) res.revert_to_snapshot(self, snapshot_id)
def resize_share( def resize_share(
self, self, share_id, new_size, no_shrink=False, no_extend=False, force=False
share_id,
new_size,
no_shrink=False,
no_extend=False,
force=False
): ):
"""Resizes a share, extending/shrinking the share as needed. """Resizes a share, extending/shrinking the share as needed.
@ -206,8 +191,9 @@ class Proxy(proxy.Proxy):
elif new_size < res.size and no_shrink is not True: elif new_size < res.size and no_shrink is not True:
res.shrink_share(self, new_size) res.shrink_share(self, new_size)
def wait_for_status(self, res, status='active', failures=None, def wait_for_status(
interval=2, wait=120): self, res, status='active', failures=None, interval=2, wait=120
):
"""Wait for a resource to be in a particular status. """Wait for a resource to be in a particular status.
:param res: The resource to wait on to reach the specified status. :param res: The resource to wait on to reach the specified status.
The resource must have a ``status`` attribute. The resource must have a ``status`` attribute.
@ -229,7 +215,8 @@ class Proxy(proxy.Proxy):
""" """
failures = [] if failures is None else failures failures = [] if failures is None else failures
return resource.wait_for_status( return resource.wait_for_status(
self, res, status, failures, interval, wait) self, res, status, failures, interval, wait
)
def storage_pools(self, details=True, **query): def storage_pools(self, details=True, **query):
"""Lists all back-end storage pools with details """Lists all back-end storage pools with details
@ -248,7 +235,8 @@ class Proxy(proxy.Proxy):
""" """
base_path = '/scheduler-stats/pools/detail' if details else None base_path = '/scheduler-stats/pools/detail' if details else None
return self._list( return self._list(
_storage_pool.StoragePool, base_path=base_path, **query) _storage_pool.StoragePool, base_path=base_path, **query
)
def user_messages(self, **query): def user_messages(self, **query):
"""List shared file system user messages """List shared file system user messages
@ -278,8 +266,7 @@ class Proxy(proxy.Proxy):
:rtype: :rtype:
:class:`~openstack.shared_file_system.v2.user_message.UserMessage` :class:`~openstack.shared_file_system.v2.user_message.UserMessage`
""" """
return self._list( return self._list(_user_message.UserMessage, **query)
_user_message.UserMessage, **query)
def get_user_message(self, message_id): def get_user_message(self, message_id):
"""List details of a single user message """List details of a single user message
@ -300,8 +287,10 @@ class Proxy(proxy.Proxy):
:class:`~openstack.shared_file_system.v2.user_message.UserMessage` :class:`~openstack.shared_file_system.v2.user_message.UserMessage`
""" """
return self._delete( return self._delete(
_user_message.UserMessage, message_id, _user_message.UserMessage,
ignore_missing=ignore_missing) message_id,
ignore_missing=ignore_missing,
)
def limits(self, **query): def limits(self, **query):
"""Lists all share limits. """Lists all share limits.
@ -312,8 +301,7 @@ class Proxy(proxy.Proxy):
:returns: A generator of manila share limits resources :returns: A generator of manila share limits resources
:rtype: :class:`~openstack.shared_file_system.v2.limit.Limit` :rtype: :class:`~openstack.shared_file_system.v2.limit.Limit`
""" """
return self._list( return self._list(_limit.Limit, **query)
_limit.Limit, **query)
def share_snapshots(self, details=True, **query): def share_snapshots(self, details=True, **query):
"""Lists all share snapshots with details. """Lists all share snapshots with details.
@ -329,7 +317,8 @@ class Proxy(proxy.Proxy):
""" """
base_path = '/snapshots/detail' if details else None base_path = '/snapshots/detail' if details else None
return self._list( return self._list(
_share_snapshot.ShareSnapshot, base_path=base_path, **query) _share_snapshot.ShareSnapshot, base_path=base_path, **query
)
def get_share_snapshot(self, snapshot_id): def get_share_snapshot(self, snapshot_id):
"""Lists details of a single share snapshot """Lists details of a single share snapshot
@ -359,8 +348,9 @@ class Proxy(proxy.Proxy):
:rtype: :rtype:
:class:`~openstack.shared_file_system.v2.share_snapshot.ShareSnapshot` :class:`~openstack.shared_file_system.v2.share_snapshot.ShareSnapshot`
""" """
return self._update(_share_snapshot.ShareSnapshot, snapshot_id, return self._update(
**attrs) _share_snapshot.ShareSnapshot, snapshot_id, **attrs
)
def delete_share_snapshot(self, snapshot_id, ignore_missing=True): def delete_share_snapshot(self, snapshot_id, ignore_missing=True):
"""Deletes a single share snapshot """Deletes a single share snapshot
@ -369,8 +359,11 @@ class Proxy(proxy.Proxy):
:returns: Result of the ``delete`` :returns: Result of the ``delete``
:rtype: ``None`` :rtype: ``None``
""" """
self._delete(_share_snapshot.ShareSnapshot, snapshot_id, self._delete(
ignore_missing=ignore_missing) _share_snapshot.ShareSnapshot,
snapshot_id,
ignore_missing=ignore_missing,
)
# ========= Network Subnets ========== # ========= Network Subnets ==========
def share_network_subnets(self, share_network_id): def share_network_subnets(self, share_network_id):
@ -384,10 +377,13 @@ class Proxy(proxy.Proxy):
""" """
return self._list( return self._list(
_share_network_subnet.ShareNetworkSubnet, _share_network_subnet.ShareNetworkSubnet,
share_network_id=share_network_id) share_network_id=share_network_id,
)
def get_share_network_subnet( def get_share_network_subnet(
self, share_network_id, share_network_subnet_id, self,
share_network_id,
share_network_subnet_id,
): ):
"""Lists details of a single share network subnet. """Lists details of a single share network subnet.
@ -403,7 +399,8 @@ class Proxy(proxy.Proxy):
return self._get( return self._get(
_share_network_subnet.ShareNetworkSubnet, _share_network_subnet.ShareNetworkSubnet,
share_network_subnet_id, share_network_subnet_id,
share_network_id=share_network_id) share_network_id=share_network_id,
)
def create_share_network_subnet(self, share_network_id, **attrs): def create_share_network_subnet(self, share_network_id, **attrs):
"""Creates a share network subnet from attributes """Creates a share network subnet from attributes
@ -419,7 +416,8 @@ class Proxy(proxy.Proxy):
return self._create( return self._create(
_share_network_subnet.ShareNetworkSubnet, _share_network_subnet.ShareNetworkSubnet,
**attrs, **attrs,
share_network_id=share_network_id) share_network_id=share_network_id
)
def delete_share_network_subnet( def delete_share_network_subnet(
self, share_network_id, share_network_subnet, ignore_missing=True self, share_network_id, share_network_subnet, ignore_missing=True
@ -438,7 +436,8 @@ class Proxy(proxy.Proxy):
_share_network_subnet.ShareNetworkSubnet, _share_network_subnet.ShareNetworkSubnet,
share_network_subnet, share_network_subnet,
share_network_id=share_network_id, share_network_id=share_network_id,
ignore_missing=ignore_missing) ignore_missing=ignore_missing,
)
def wait_for_delete(self, res, interval=2, wait=120): def wait_for_delete(self, res, interval=2, wait=120):
"""Wait for a resource to be deleted. """Wait for a resource to be deleted.
@ -474,8 +473,11 @@ class Proxy(proxy.Proxy):
share_snapshot_instance.ShareSnapshotInstance` share_snapshot_instance.ShareSnapshotInstance`
""" """
base_path = '/snapshot-instances/detail' if details else None base_path = '/snapshot-instances/detail' if details else None
return self._list(_share_snapshot_instance.ShareSnapshotInstance, return self._list(
base_path=base_path, **query) _share_snapshot_instance.ShareSnapshotInstance,
base_path=base_path,
**query
)
def get_share_snapshot_instance(self, snapshot_instance_id): def get_share_snapshot_instance(self, snapshot_instance_id):
"""Lists details of a single share snapshot instance """Lists details of a single share snapshot instance
@ -485,8 +487,10 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.shared_file_system.v2. :rtype: :class:`~openstack.shared_file_system.v2.
share_snapshot_instance.ShareSnapshotInstance` share_snapshot_instance.ShareSnapshotInstance`
""" """
return self._get(_share_snapshot_instance.ShareSnapshotInstance, return self._get(
snapshot_instance_id) _share_snapshot_instance.ShareSnapshotInstance,
snapshot_instance_id,
)
def share_networks(self, details=True, **query): def share_networks(self, details=True, **query):
"""Lists all share networks with details. """Lists all share networks with details.
@ -508,7 +512,8 @@ class Proxy(proxy.Proxy):
""" """
base_path = '/share-networks/detail' if details else None base_path = '/share-networks/detail' if details else None
return self._list( return self._list(
_share_network.ShareNetwork, base_path=base_path, **query) _share_network.ShareNetwork, base_path=base_path, **query
)
def get_share_network(self, share_network_id): def get_share_network(self, share_network_id):
"""Lists details of a single share network """Lists details of a single share network
@ -527,8 +532,10 @@ class Proxy(proxy.Proxy):
:rtype: ``None`` :rtype: ``None``
""" """
self._delete( self._delete(
_share_network.ShareNetwork, share_network_id, _share_network.ShareNetwork,
ignore_missing=ignore_missing) share_network_id,
ignore_missing=ignore_missing,
)
def update_share_network(self, share_network_id, **attrs): def update_share_network(self, share_network_id, **attrs):
"""Updates details of a single share network. """Updates details of a single share network.
@ -540,7 +547,8 @@ class Proxy(proxy.Proxy):
share_network.ShareNetwork` share_network.ShareNetwork`
""" """
return self._update( return self._update(
_share_network.ShareNetwork, share_network_id, **attrs) _share_network.ShareNetwork, share_network_id, **attrs
)
def create_share_network(self, **attrs): def create_share_network(self, **attrs):
"""Creates a share network from attributes """Creates a share network from attributes
@ -570,8 +578,7 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.shared_file_system.v2. :rtype: :class:`~openstack.shared_file_system.v2.
share_instance.ShareInstance` share_instance.ShareInstance`
""" """
return self._list( return self._list(_share_instance.ShareInstance, **query)
_share_instance.ShareInstance, **query)
def get_share_instance(self, share_instance_id): def get_share_instance(self, share_instance_id):
"""Shows details for a single share instance """Shows details for a single share instance
@ -592,8 +599,9 @@ class Proxy(proxy.Proxy):
:returns: ``None`` :returns: ``None``
""" """
res = self._get_resource(_share_instance.ShareInstance, res = self._get_resource(
share_instance_id) _share_instance.ShareInstance, share_instance_id
)
res.reset_status(self, status) res.reset_status(self, status)
def delete_share_instance(self, share_instance_id): def delete_share_instance(self, share_instance_id):
@ -603,8 +611,9 @@ class Proxy(proxy.Proxy):
:returns: ``None`` :returns: ``None``
""" """
res = self._get_resource(_share_instance.ShareInstance, res = self._get_resource(
share_instance_id) _share_instance.ShareInstance, share_instance_id
)
res.force_delete(self) res.force_delete(self)
def export_locations(self, share_id): def export_locations(self, share_id):
@ -615,8 +624,9 @@ class Proxy(proxy.Proxy):
:rtype: List of :class:`~openstack.shared_filesystem_storage.v2. :rtype: List of :class:`~openstack.shared_filesystem_storage.v2.
share_export_locations.ShareExportLocations` share_export_locations.ShareExportLocations`
""" """
return self._list(_share_export_locations.ShareExportLocation, return self._list(
share_id=share_id) _share_export_locations.ShareExportLocation, share_id=share_id
)
def get_export_location(self, export_location, share_id): def get_export_location(self, export_location, share_id):
"""List details of export location """List details of export location
@ -631,7 +641,9 @@ class Proxy(proxy.Proxy):
export_location_id = resource.Resource._get_id(export_location) export_location_id = resource.Resource._get_id(export_location)
return self._get( return self._get(
_share_export_locations.ShareExportLocation, _share_export_locations.ShareExportLocation,
export_location_id, share_id=share_id) export_location_id,
share_id=share_id,
)
def access_rules(self, share, **query): def access_rules(self, share, **query):
"""Lists the access rules on a share. """Lists the access rules on a share.
@ -642,8 +654,8 @@ class Proxy(proxy.Proxy):
""" """
share = self._get_resource(_share.Share, share) share = self._get_resource(_share.Share, share)
return self._list( return self._list(
_share_access_rule.ShareAccessRule, _share_access_rule.ShareAccessRule, share_id=share.id, **query
share_id=share.id, **query) )
def get_access_rule(self, access_id): def get_access_rule(self, access_id):
"""List details of an access rule. """List details of an access rule.
@ -653,8 +665,7 @@ class Proxy(proxy.Proxy):
:rtype: :class:`~openstack.shared_file_system.v2. :rtype: :class:`~openstack.shared_file_system.v2.
share_access_rules.ShareAccessRules` share_access_rules.ShareAccessRules`
""" """
return self._get( return self._get(_share_access_rule.ShareAccessRule, access_id)
_share_access_rule.ShareAccessRule, access_id)
def create_access_rule(self, share_id, **attrs): def create_access_rule(self, share_id, **attrs):
"""Creates an access rule from attributes """Creates an access rule from attributes
@ -670,7 +681,8 @@ class Proxy(proxy.Proxy):
""" """
base_path = "/shares/%s/action" % (share_id,) base_path = "/shares/%s/action" % (share_id,)
return self._create( return self._create(
_share_access_rule.ShareAccessRule, base_path=base_path, **attrs) _share_access_rule.ShareAccessRule, base_path=base_path, **attrs
)
def delete_access_rule(self, access_id, share_id, ignore_missing=True): def delete_access_rule(self, access_id, share_id, ignore_missing=True):
"""Deletes an access rule """Deletes an access rule
@ -680,6 +692,5 @@ class Proxy(proxy.Proxy):
:rtype: ``None`` :rtype: ``None``
""" """
res = self._get_resource( res = self._get_resource(_share_access_rule.ShareAccessRule, access_id)
_share_access_rule.ShareAccessRule, access_id)
res.delete(self, share_id, ignore_missing=ignore_missing) res.delete(self, share_id, ignore_missing=ignore_missing)

View File

@ -29,47 +29,46 @@ class Limit(resource.Resource):
#: The maximum number of replica gigabytes that are allowed #: The maximum number of replica gigabytes that are allowed
#: in a project. #: in a project.
maxTotalReplicaGigabytes = resource.Body( maxTotalReplicaGigabytes = resource.Body(
"maxTotalReplicaGigabytes", type=int) "maxTotalReplicaGigabytes", type=int
)
#: The total maximum number of shares that are allowed in a project. #: The total maximum number of shares that are allowed in a project.
maxTotalShares = resource.Body("maxTotalShares", type=int) maxTotalShares = resource.Body("maxTotalShares", type=int)
#: The total maximum number of share gigabytes that are allowed in a #: The total maximum number of share gigabytes that are allowed in a
#: project. #: project.
maxTotalShareGigabytes = resource.Body( maxTotalShareGigabytes = resource.Body("maxTotalShareGigabytes", type=int)
"maxTotalShareGigabytes", type=int)
#: The total maximum number of share-networks that are allowed in a #: The total maximum number of share-networks that are allowed in a
#: project. #: project.
maxTotalShareNetworks = resource.Body( maxTotalShareNetworks = resource.Body("maxTotalShareNetworks", type=int)
"maxTotalShareNetworks", type=int)
#: The total maximum number of share snapshots that are allowed in a #: The total maximum number of share snapshots that are allowed in a
#: project. #: project.
maxTotalShareSnapshots = resource.Body( maxTotalShareSnapshots = resource.Body("maxTotalShareSnapshots", type=int)
"maxTotalShareSnapshots", type=int)
#: The maximum number of share replicas that is allowed. #: The maximum number of share replicas that is allowed.
maxTotalShareReplicas = resource.Body( maxTotalShareReplicas = resource.Body("maxTotalShareReplicas", type=int)
"maxTotalShareReplicas", type=int)
#: The total maximum number of snapshot gigabytes that are allowed #: The total maximum number of snapshot gigabytes that are allowed
#: in a project. #: in a project.
maxTotalSnapshotGigabytes = resource.Body( maxTotalSnapshotGigabytes = resource.Body(
"maxTotalSnapshotGigabytes", type=int) "maxTotalSnapshotGigabytes", type=int
)
#: The total number of replica gigabytes used in a project by #: The total number of replica gigabytes used in a project by
#: share replicas. #: share replicas.
totalReplicaGigabytesUsed = resource.Body( totalReplicaGigabytesUsed = resource.Body(
"totalReplicaGigabytesUsed", type=int) "totalReplicaGigabytesUsed", type=int
)
#: The total number of gigabytes used in a project by shares. #: The total number of gigabytes used in a project by shares.
totalShareGigabytesUsed = resource.Body( totalShareGigabytesUsed = resource.Body(
"totalShareGigabytesUsed", type=int) "totalShareGigabytesUsed", type=int
)
#: The total number of created shares in a project. #: The total number of created shares in a project.
totalSharesUsed = resource.Body( totalSharesUsed = resource.Body("totalSharesUsed", type=int)
"totalSharesUsed", type=int)
#: The total number of created share-networks in a project. #: The total number of created share-networks in a project.
totalShareNetworksUsed = resource.Body( totalShareNetworksUsed = resource.Body("totalShareNetworksUsed", type=int)
"totalShareNetworksUsed", type=int)
#: The total number of created share snapshots in a project. #: The total number of created share snapshots in a project.
totalShareSnapshotsUsed = resource.Body( totalShareSnapshotsUsed = resource.Body(
"totalShareSnapshotsUsed", type=int) "totalShareSnapshotsUsed", type=int
)
#: The total number of gigabytes used in a project by snapshots. #: The total number of gigabytes used in a project by snapshots.
totalSnapshotGigabytesUsed = resource.Body( totalSnapshotGigabytesUsed = resource.Body(
"totalSnapshotGigabytesUsed", type=int) "totalSnapshotGigabytesUsed", type=int
)
#: The total number of created share replicas in a project. #: The total number of created share replicas in a project.
totalShareReplicasUsed = resource.Body( totalShareReplicasUsed = resource.Body("totalShareReplicasUsed", type=int)
"totalShareReplicasUsed", type=int)

View File

@ -46,18 +46,20 @@ class Share(resource.Resource):
#: Whether or not this share supports snapshots that can be #: Whether or not this share supports snapshots that can be
#: cloned into new shares. #: cloned into new shares.
is_creating_new_share_from_snapshot_supported = resource.Body( is_creating_new_share_from_snapshot_supported = resource.Body(
"create_share_from_snapshot_support", type=bool) "create_share_from_snapshot_support", type=bool
)
#: Whether the share's snapshots can be mounted directly and access #: Whether the share's snapshots can be mounted directly and access
#: controlled independently or not. #: controlled independently or not.
is_mounting_snapshot_supported = resource.Body( is_mounting_snapshot_supported = resource.Body(
"mount_snapshot_support", type=bool) "mount_snapshot_support", type=bool
)
#: Whether the share can be reverted to its latest snapshot or not. #: Whether the share can be reverted to its latest snapshot or not.
is_reverting_to_snapshot_supported = resource.Body( is_reverting_to_snapshot_supported = resource.Body(
"revert_to_snapshot_support", type=bool) "revert_to_snapshot_support", type=bool
)
#: An extra specification that filters back ends by whether the share #: An extra specification that filters back ends by whether the share
#: supports snapshots or not. #: supports snapshots or not.
is_snapshot_supported = resource.Body( is_snapshot_supported = resource.Body("snapshot_support", type=bool)
"snapshot_support", type=bool)
#: Indicates whether the share has replicas or not. #: Indicates whether the share has replicas or not.
is_replicated = resource.Body("has_replicas", type=bool) is_replicated = resource.Body("has_replicas", type=bool)
#: One or more metadata key and value pairs as a dictionary of strings. #: One or more metadata key and value pairs as a dictionary of strings.
@ -91,7 +93,8 @@ class Share(resource.Resource):
#: The ID of the group snapshot instance that was used to create #: The ID of the group snapshot instance that was used to create
#: this share. #: this share.
source_share_group_snapshot_member_id = resource.Body( source_share_group_snapshot_member_id = resource.Body(
"source_share_group_snapshot_member_id", type=str) "source_share_group_snapshot_member_id", type=str
)
#: The share status #: The share status
status = resource.Body("status", type=str) status = resource.Body("status", type=str)
#: For the share migration, the migration task state. #: For the share migration, the migration task state.
@ -109,14 +112,11 @@ class Share(resource.Resource):
headers = {'Accept': ''} headers = {'Accept': ''}
if microversion is None: if microversion is None:
microversion = \ microversion = self._get_microversion(session, action=action)
self._get_microversion(session, action=action)
response = session.post( response = session.post(
url, url, json=body, headers=headers, microversion=microversion
json=body, )
headers=headers,
microversion=microversion)
exceptions.raise_from_response(response) exceptions.raise_from_response(response)
return response return response

View File

@ -55,28 +55,26 @@ class ShareAccessRule(resource.Resource):
#: the services database. #: the services database.
updated_at = resource.Body("updated_at", type=str) updated_at = resource.Body("updated_at", type=str)
def _action(self, session, body, url, def _action(self, session, body, url, action='patch', microversion=None):
action='patch', microversion=None):
headers = {'Accept': ''} headers = {'Accept': ''}
if microversion is None: if microversion is None:
microversion = \ microversion = self._get_microversion(session, action=action)
self._get_microversion(session, action=action)
session.post( session.post(
url, url, json=body, headers=headers, microversion=microversion
json=body, )
headers=headers,
microversion=microversion)
def create(self, session, **kwargs): def create(self, session, **kwargs):
return super().create(session, return super().create(
resource_request_key='allow_access', session,
resource_response_key='access', resource_request_key='allow_access',
**kwargs) resource_response_key='access',
**kwargs
)
def delete(self, session, share_id, ignore_missing=True): def delete(self, session, share_id, ignore_missing=True):
body = {'deny_access': {'access_id' : self.id}} body = {'deny_access': {'access_id': self.id}}
url = utils.urljoin('/shares', share_id, 'action') url = utils.urljoin('/shares', share_id, 'action')
try: try:
response = self._action(session, body, url) response = self._action(session, body, url)

View File

@ -66,8 +66,9 @@ class ShareInstance(resource.Resource):
# Set microversion override # Set microversion override
extra_attrs['microversion'] = microversion extra_attrs['microversion'] = microversion
else: else:
extra_attrs['microversion'] = \ extra_attrs['microversion'] = self._get_microversion(
self._get_microversion(session, action=action) session, action=action
)
response = session.post(url, json=body, headers=headers, **extra_attrs) response = session.post(url, json=body, headers=headers, **extra_attrs)
exceptions.raise_from_response(response) exceptions.raise_from_response(response)
return response return response

View File

@ -27,9 +27,15 @@ class ShareNetwork(resource.Resource):
allow_head = False allow_head = False
_query_mapping = resource.QueryParameters( _query_mapping = resource.QueryParameters(
"project_id", "name", "description", "project_id",
"created_since", "created_before", "security_service_id", "name",
"limit", "offset", all_projects="all_tenants", "description",
"created_since",
"created_before",
"security_service_id",
"limit",
"offset",
all_projects="all_tenants",
) )
#: Properties #: Properties

View File

@ -56,7 +56,6 @@ class ShareNetworkSubnet(resource.Resource):
updated_at = resource.Body("updated_at", type=str) updated_at = resource.Body("updated_at", type=str)
def create(self, session, **kwargs): def create(self, session, **kwargs):
return super().\ return super().create(
create(session, session, resource_request_key='share-network-subnet', **kwargs
resource_request_key='share-network-subnet', )
**kwargs)

View File

@ -26,9 +26,7 @@ class ShareSnapshot(resource.Resource):
allow_list = True allow_list = True
allow_head = False allow_head = False
_query_mapping = resource.QueryParameters( _query_mapping = resource.QueryParameters("snapshot_id")
"snapshot_id"
)
#: Properties #: Properties
#: The date and time stamp when the resource was #: The date and time stamp when the resource was
@ -39,8 +37,7 @@ class ShareSnapshot(resource.Resource):
#: The user defined name of the resource. #: The user defined name of the resource.
display_name = resource.Body("display_name", type=str) display_name = resource.Body("display_name", type=str)
#: The user defined description of the resource #: The user defined description of the resource
display_description = resource.Body( display_description = resource.Body("display_description", type=str)
"display_description", type=str)
#: ID of the project that the snapshot belongs to. #: ID of the project that the snapshot belongs to.
project_id = resource.Body("project_id", type=str) project_id = resource.Body("project_id", type=str)
#: The UUID of the source share that was used to #: The UUID of the source share that was used to

View File

@ -27,7 +27,11 @@ class StoragePool(resource.Resource):
allow_head = False allow_head = False
_query_mapping = resource.QueryParameters( _query_mapping = resource.QueryParameters(
'pool', 'backend', 'host', 'capabilities', 'share_type', 'pool',
'backend',
'host',
'capabilities',
'share_type',
) )
#: Properties #: Properties

View File

@ -25,9 +25,7 @@ class UserMessage(resource.Resource):
allow_list = True allow_list = True
allow_head = False allow_head = False
_query_mapping = resource.QueryParameters( _query_mapping = resource.QueryParameters("message_id")
"message_id"
)
_max_microversion = '2.37' _max_microversion = '2.37'

View File

@ -20,40 +20,49 @@ class BaseSharedFileSystemTest(base.BaseFunctionalTest):
def setUp(self): def setUp(self):
super(BaseSharedFileSystemTest, self).setUp() super(BaseSharedFileSystemTest, self).setUp()
self.require_service('shared-file-system', self.require_service(
min_microversion=self.min_microversion) 'shared-file-system', min_microversion=self.min_microversion
)
self._set_operator_cloud(shared_file_system_api_version='2.63') self._set_operator_cloud(shared_file_system_api_version='2.63')
self._set_user_cloud(shared_file_system_api_version='2.63') self._set_user_cloud(shared_file_system_api_version='2.63')
def create_share(self, **kwargs): def create_share(self, **kwargs):
share = self.user_cloud.share.create_share(**kwargs) share = self.user_cloud.share.create_share(**kwargs)
self.addCleanup(self.user_cloud.share.delete_share, self.addCleanup(
share.id, self.user_cloud.share.delete_share, share.id, ignore_missing=True
ignore_missing=True) )
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
share, share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertIsNotNone(share.id) self.assertIsNotNone(share.id)
return share return share
def create_share_snapshot(self, share_id, **kwargs): def create_share_snapshot(self, share_id, **kwargs):
share_snapshot = self.user_cloud.share.create_share_snapshot( share_snapshot = self.user_cloud.share.create_share_snapshot(
share_id=share_id, force=True) share_id=share_id, force=True
self.addCleanup(resource.wait_for_delete, )
self.user_cloud.share, share_snapshot, self.addCleanup(
wait=self._wait_for_timeout, resource.wait_for_delete,
interval=2) self.user_cloud.share,
self.addCleanup(self.user_cloud.share.delete_share_snapshot, share_snapshot,
share_snapshot.id, wait=self._wait_for_timeout,
ignore_missing=False) interval=2,
)
self.addCleanup(
self.user_cloud.share.delete_share_snapshot,
share_snapshot.id,
ignore_missing=False,
)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
share_snapshot, share_snapshot,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertIsNotNone(share_snapshot.id) self.assertIsNotNone(share_snapshot.id)
return share_snapshot return share_snapshot

View File

@ -22,8 +22,12 @@ class TestExportLocation(base.BaseSharedFileSystemTest):
self.SHARE_NAME = self.getUniqueString() self.SHARE_NAME = self.getUniqueString()
my_share = self.create_share( my_share = self.create_share(
name=self.SHARE_NAME, size=2, share_type="dhss_false", name=self.SHARE_NAME,
share_protocol='NFS', description=None) size=2,
share_type="dhss_false",
share_protocol='NFS',
description=None,
)
self.SHARE_ID = my_share.id self.SHARE_ID = my_share.id
def test_export_locations(self): def test_export_locations(self):
@ -33,8 +37,12 @@ class TestExportLocation(base.BaseSharedFileSystemTest):
self.assertGreater(len(list(exs)), 0) self.assertGreater(len(list(exs)), 0)
for ex in exs: for ex in exs:
for attribute in ( for attribute in (
'id', 'path', 'share_instance_id', 'id',
'updated_at', 'created_at'): 'path',
'share_instance_id',
'updated_at',
'created_at',
):
self.assertTrue(hasattr(ex, attribute)) self.assertTrue(hasattr(ex, attribute))
self.assertIsInstance(getattr(ex, attribute), 'str') self.assertIsInstance(getattr(ex, attribute), 'str')
for attribute in ('is_preferred', 'is_admin'): for attribute in ('is_preferred', 'is_admin'):

View File

@ -14,23 +14,24 @@ from openstack.tests.functional.shared_file_system import base
class LimitTest(base.BaseSharedFileSystemTest): class LimitTest(base.BaseSharedFileSystemTest):
def test_limits(self): def test_limits(self):
limits = self.user_cloud.shared_file_system.limits() limits = self.user_cloud.shared_file_system.limits()
self.assertGreater(len(list(limits)), 0) self.assertGreater(len(list(limits)), 0)
for limit in limits: for limit in limits:
for attribute in ("maxTotalReplicaGigabytes", for attribute in (
"maxTotalShares", "maxTotalReplicaGigabytes",
"maxTotalShareGigabytes", "maxTotalShares",
"maxTotalShareNetworks", "maxTotalShareGigabytes",
"maxTotalShareSnapshots", "maxTotalShareNetworks",
"maxTotalShareReplicas", "maxTotalShareSnapshots",
"maxTotalSnapshotGigabytes", "maxTotalShareReplicas",
"totalReplicaGigabytesUsed", "maxTotalSnapshotGigabytes",
"totalShareGigabytesUsed", "totalReplicaGigabytesUsed",
"totalSharesUsed", "totalShareGigabytesUsed",
"totalShareNetworksUsed", "totalSharesUsed",
"totalShareSnapshotsUsed", "totalShareNetworksUsed",
"totalSnapshotGigabytesUsed", "totalShareSnapshotsUsed",
"totalShareReplicasUsed"): "totalSnapshotGigabytesUsed",
"totalShareReplicasUsed",
):
self.assertTrue(hasattr(limit, attribute)) self.assertTrue(hasattr(limit, attribute))

View File

@ -15,19 +15,20 @@ from openstack.tests.functional.shared_file_system import base
class ShareTest(base.BaseSharedFileSystemTest): class ShareTest(base.BaseSharedFileSystemTest):
def setUp(self): def setUp(self):
super(ShareTest, self).setUp() super(ShareTest, self).setUp()
self.SHARE_NAME = self.getUniqueString() self.SHARE_NAME = self.getUniqueString()
my_share = self.create_share( my_share = self.create_share(
name=self.SHARE_NAME, size=2, share_type="dhss_false", name=self.SHARE_NAME,
share_protocol='NFS', description=None) size=2,
share_type="dhss_false",
share_protocol='NFS',
description=None,
)
self.SHARE_ID = my_share.id self.SHARE_ID = my_share.id
self.SHARE_SIZE = my_share.size self.SHARE_SIZE = my_share.size
my_share_snapshot = self.create_share_snapshot( my_share_snapshot = self.create_share_snapshot(share_id=self.SHARE_ID)
share_id=self.SHARE_ID
)
self.SHARE_SNAPSHOT_ID = my_share_snapshot.id self.SHARE_SNAPSHOT_ID = my_share_snapshot.id
def test_get(self): def test_get(self):
@ -44,73 +45,73 @@ class ShareTest(base.BaseSharedFileSystemTest):
def test_update(self): def test_update(self):
updated_share = self.user_cloud.share.update_share( updated_share = self.user_cloud.share.update_share(
self.SHARE_ID, display_description='updated share') self.SHARE_ID, display_description='updated share'
get_updated_share = self.user_cloud.share.get_share( )
updated_share.id) get_updated_share = self.user_cloud.share.get_share(updated_share.id)
self.assertEqual('updated share', get_updated_share.description) self.assertEqual('updated share', get_updated_share.description)
def test_revert_share_to_snapshot(self): def test_revert_share_to_snapshot(self):
self.user_cloud.share.revert_share_to_snapshot( self.user_cloud.share.revert_share_to_snapshot(
self.SHARE_ID, self.SHARE_SNAPSHOT_ID) self.SHARE_ID, self.SHARE_SNAPSHOT_ID
get_reverted_share = self.user_cloud.share.get_share( )
self.SHARE_ID) get_reverted_share = self.user_cloud.share.get_share(self.SHARE_ID)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
get_reverted_share, get_reverted_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertIsNotNone(get_reverted_share.id) self.assertIsNotNone(get_reverted_share.id)
def test_resize_share_larger(self): def test_resize_share_larger(self):
larger_size = 3 larger_size = 3
self.user_cloud.share.resize_share( self.user_cloud.share.resize_share(self.SHARE_ID, larger_size)
self.SHARE_ID, larger_size)
get_resized_share = self.user_cloud.share.get_share( get_resized_share = self.user_cloud.share.get_share(self.SHARE_ID)
self.SHARE_ID)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
get_resized_share, get_resized_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertEqual(larger_size, get_resized_share.size) self.assertEqual(larger_size, get_resized_share.size)
def test_resize_share_smaller(self): def test_resize_share_smaller(self):
# Resize to 3 GiB # Resize to 3 GiB
smaller_size = 1 smaller_size = 1
self.user_cloud.share.resize_share( self.user_cloud.share.resize_share(self.SHARE_ID, smaller_size)
self.SHARE_ID, smaller_size)
get_resized_share = self.user_cloud.share.get_share( get_resized_share = self.user_cloud.share.get_share(self.SHARE_ID)
self.SHARE_ID)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
get_resized_share, get_resized_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertEqual(smaller_size, get_resized_share.size) self.assertEqual(smaller_size, get_resized_share.size)
def test_resize_share_larger_no_extend(self): def test_resize_share_larger_no_extend(self):
larger_size = 3 larger_size = 3
self.user_cloud.share.resize_share( self.user_cloud.share.resize_share(
self.SHARE_ID, larger_size, no_extend=True) self.SHARE_ID, larger_size, no_extend=True
)
get_resized_share = self.user_cloud.share.get_share( get_resized_share = self.user_cloud.share.get_share(self.SHARE_ID)
self.SHARE_ID)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
get_resized_share, get_resized_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
# Assert that no change was made. # Assert that no change was made.
self.assertEqual(self.SHARE_SIZE, get_resized_share.size) self.assertEqual(self.SHARE_SIZE, get_resized_share.size)
@ -119,17 +120,18 @@ class ShareTest(base.BaseSharedFileSystemTest):
smaller_size = 1 smaller_size = 1
self.user_cloud.share.resize_share( self.user_cloud.share.resize_share(
self.SHARE_ID, smaller_size, no_shrink=True) self.SHARE_ID, smaller_size, no_shrink=True
)
get_resized_share = self.user_cloud.share.get_share( get_resized_share = self.user_cloud.share.get_share(self.SHARE_ID)
self.SHARE_ID)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
get_resized_share, get_resized_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
# Assert that no change was made. # Assert that no change was made.
self.assertEqual(self.SHARE_SIZE, get_resized_share.size) self.assertEqual(self.SHARE_SIZE, get_resized_share.size)
@ -138,15 +140,16 @@ class ShareTest(base.BaseSharedFileSystemTest):
# Resize to 3 GiB # Resize to 3 GiB
larger_size = 3 larger_size = 3
self.user_cloud.share.resize_share( self.user_cloud.share.resize_share(
self.SHARE_ID, larger_size, force=True) self.SHARE_ID, larger_size, force=True
)
get_resized_share = self.user_cloud.share.get_share( get_resized_share = self.user_cloud.share.get_share(self.SHARE_ID)
self.SHARE_ID)
self.user_cloud.share.wait_for_status( self.user_cloud.share.wait_for_status(
get_resized_share, get_resized_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertEqual(larger_size, get_resized_share.size) self.assertEqual(larger_size, get_resized_share.size)

View File

@ -14,20 +14,24 @@ from openstack.tests.functional.shared_file_system import base
class ShareAccessRuleTest(base.BaseSharedFileSystemTest): class ShareAccessRuleTest(base.BaseSharedFileSystemTest):
def setUp(self): def setUp(self):
super(ShareAccessRuleTest, self).setUp() super(ShareAccessRuleTest, self).setUp()
self.SHARE_NAME = self.getUniqueString() self.SHARE_NAME = self.getUniqueString()
mys = self.create_share( mys = self.create_share(
name=self.SHARE_NAME, size=2, share_type="dhss_false", name=self.SHARE_NAME,
share_protocol='NFS', description=None) size=2,
share_type="dhss_false",
share_protocol='NFS',
description=None,
)
self.user_cloud.shared_file_system.wait_for_status( self.user_cloud.shared_file_system.wait_for_status(
mys, mys,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertIsNotNone(mys) self.assertIsNotNone(mys)
self.assertIsNotNone(mys.id) self.assertIsNotNone(mys.id)
self.SHARE_ID = mys.id self.SHARE_ID = mys.id
@ -36,16 +40,14 @@ class ShareAccessRuleTest(base.BaseSharedFileSystemTest):
self.SHARE_ID, self.SHARE_ID,
access_level="rw", access_level="rw",
access_type="ip", access_type="ip",
access_to="0.0.0.0/0" access_to="0.0.0.0/0",
) )
self.ACCESS_ID = access_rule.id self.ACCESS_ID = access_rule.id
self.RESOURCE_KEY = access_rule.resource_key self.RESOURCE_KEY = access_rule.resource_key
def tearDown(self): def tearDown(self):
acr = self.user_cloud.share.delete_access_rule( acr = self.user_cloud.share.delete_access_rule(
self.ACCESS_ID, self.ACCESS_ID, self.SHARE_ID, ignore_missing=True
self.SHARE_ID,
ignore_missing=True
) )
self.assertIsNone(acr) self.assertIsNone(acr)
@ -59,12 +61,19 @@ class ShareAccessRuleTest(base.BaseSharedFileSystemTest):
def test_list_access_rules(self): def test_list_access_rules(self):
rules = self.user_cloud.shared_file_system.access_rules( rules = self.user_cloud.shared_file_system.access_rules(
self.SHARE, self.SHARE, details=True
details=True
) )
self.assertGreater(len(list(rules)), 0) self.assertGreater(len(list(rules)), 0)
for rule in rules: for rule in rules:
for attribute in ('id', 'created_at', 'updated_at', for attribute in (
'access_level', 'access_type', 'access_to', 'id',
'share_id', 'access_key', 'metadata'): 'created_at',
'updated_at',
'access_level',
'access_type',
'access_to',
'share_id',
'access_key',
'metadata',
):
self.assertTrue(hasattr(rule, attribute)) self.assertTrue(hasattr(rule, attribute))

View File

@ -25,8 +25,12 @@ class ShareInstanceTest(base.BaseSharedFileSystemTest):
self.SHARE_NAME = self.getUniqueString() self.SHARE_NAME = self.getUniqueString()
my_share = self.create_share( my_share = self.create_share(
name=self.SHARE_NAME, size=2, share_type="dhss_false", name=self.SHARE_NAME,
share_protocol='NFS', description=None) size=2,
share_type="dhss_false",
share_protocol='NFS',
description=None,
)
self.SHARE_ID = my_share.id self.SHARE_ID = my_share.id
instances_list = self.operator_cloud.share.share_instances() instances_list = self.operator_cloud.share.share_instances()
self.SHARE_INSTANCE_ID = None self.SHARE_INSTANCE_ID = None
@ -36,7 +40,8 @@ class ShareInstanceTest(base.BaseSharedFileSystemTest):
def test_get(self): def test_get(self):
sot = self.operator_cloud.share.get_share_instance( sot = self.operator_cloud.share.get_share_instance(
self.SHARE_INSTANCE_ID) self.SHARE_INSTANCE_ID
)
assert isinstance(sot, _share_instance.ShareInstance) assert isinstance(sot, _share_instance.ShareInstance)
self.assertEqual(self.SHARE_INSTANCE_ID, sot.id) self.assertEqual(self.SHARE_INSTANCE_ID, sot.id)
@ -44,25 +49,36 @@ class ShareInstanceTest(base.BaseSharedFileSystemTest):
share_instances = self.operator_cloud.share.share_instances() share_instances = self.operator_cloud.share.share_instances()
self.assertGreater(len(list(share_instances)), 0) self.assertGreater(len(list(share_instances)), 0)
for share_instance in share_instances: for share_instance in share_instances:
for attribute in ('id', 'name', 'created_at', for attribute in (
'access_rules_status', 'id',
'availability_zone'): 'name',
'created_at',
'access_rules_status',
'availability_zone',
):
self.assertTrue(hasattr(share_instance, attribute)) self.assertTrue(hasattr(share_instance, attribute))
def test_reset(self): def test_reset(self):
res = self.operator_cloud.share.reset_share_instance_status( res = self.operator_cloud.share.reset_share_instance_status(
self.SHARE_INSTANCE_ID, 'error') self.SHARE_INSTANCE_ID, 'error'
)
self.assertIsNone(res) self.assertIsNone(res)
sot = self.operator_cloud.share.get_share_instance( sot = self.operator_cloud.share.get_share_instance(
self.SHARE_INSTANCE_ID) self.SHARE_INSTANCE_ID
)
self.assertEqual('error', sot.status) self.assertEqual('error', sot.status)
def test_delete(self): def test_delete(self):
sot = self.operator_cloud.share.get_share_instance( sot = self.operator_cloud.share.get_share_instance(
self.SHARE_INSTANCE_ID) self.SHARE_INSTANCE_ID
)
fdel = self.operator_cloud.share.delete_share_instance( fdel = self.operator_cloud.share.delete_share_instance(
self.SHARE_INSTANCE_ID) self.SHARE_INSTANCE_ID
resource.wait_for_delete(self.operator_cloud.share, sot, )
wait=self._wait_for_timeout, resource.wait_for_delete(
interval=2) self.operator_cloud.share,
sot,
wait=self._wait_for_timeout,
interval=2,
)
self.assertIsNone(fdel) self.assertIsNone(fdel)

View File

@ -15,27 +15,28 @@ from openstack.tests.functional.shared_file_system import base
class ShareNetworkTest(base.BaseSharedFileSystemTest): class ShareNetworkTest(base.BaseSharedFileSystemTest):
def setUp(self): def setUp(self):
super(ShareNetworkTest, self).setUp() super(ShareNetworkTest, self).setUp()
self.SHARE_NETWORK_NAME = self.getUniqueString() self.SHARE_NETWORK_NAME = self.getUniqueString()
snt = self.user_cloud.shared_file_system.create_share_network( snt = self.user_cloud.shared_file_system.create_share_network(
name=self.SHARE_NETWORK_NAME) name=self.SHARE_NETWORK_NAME
)
self.assertIsNotNone(snt) self.assertIsNotNone(snt)
self.assertIsNotNone(snt.id) self.assertIsNotNone(snt.id)
self.SHARE_NETWORK_ID = snt.id self.SHARE_NETWORK_ID = snt.id
def tearDown(self): def tearDown(self):
sot = self.user_cloud.shared_file_system.delete_share_network( sot = self.user_cloud.shared_file_system.delete_share_network(
self.SHARE_NETWORK_ID, self.SHARE_NETWORK_ID, ignore_missing=True
ignore_missing=True) )
self.assertIsNone(sot) self.assertIsNone(sot)
super(ShareNetworkTest, self).tearDown() super(ShareNetworkTest, self).tearDown()
def test_get(self): def test_get(self):
sot = self.user_cloud.shared_file_system.get_share_network( sot = self.user_cloud.shared_file_system.get_share_network(
self.SHARE_NETWORK_ID) self.SHARE_NETWORK_ID
)
assert isinstance(sot, _share_network.ShareNetwork) assert isinstance(sot, _share_network.ShareNetwork)
self.assertEqual(self.SHARE_NETWORK_ID, sot.id) self.assertEqual(self.SHARE_NETWORK_ID, sot.id)
@ -50,12 +51,13 @@ class ShareNetworkTest(base.BaseSharedFileSystemTest):
def test_delete_share_network(self): def test_delete_share_network(self):
sot = self.user_cloud.shared_file_system.delete_share_network( sot = self.user_cloud.shared_file_system.delete_share_network(
self.SHARE_NETWORK_ID) self.SHARE_NETWORK_ID
)
self.assertIsNone(sot) self.assertIsNone(sot)
def test_update(self): def test_update(self):
unt = self.user_cloud.shared_file_system.update_share_network( unt = self.user_cloud.shared_file_system.update_share_network(
self.SHARE_NETWORK_ID, description='updated share network') self.SHARE_NETWORK_ID, description='updated share network'
get_unt = self.user_cloud.shared_file_system.get_share_network( )
unt.id) get_unt = self.user_cloud.shared_file_system.get_share_network(unt.id)
self.assertEqual('updated share network', get_unt.description) self.assertEqual('updated share network', get_unt.description)

View File

@ -11,65 +11,76 @@
# under the License. # under the License.
from openstack.shared_file_system.v2 import ( from openstack.shared_file_system.v2 import (
share_network_subnet as _share_network_subnet share_network_subnet as _share_network_subnet,
) )
from openstack.tests.functional.shared_file_system import base from openstack.tests.functional.shared_file_system import base
class ShareNetworkSubnetTest(base.BaseSharedFileSystemTest): class ShareNetworkSubnetTest(base.BaseSharedFileSystemTest):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
zones = self.user_cloud.shared_file_system.\ zones = self.user_cloud.shared_file_system.availability_zones()
availability_zones()
first_zone = next(zones) first_zone = next(zones)
self.SHARE_NETWORK_NAME = self.getUniqueString() self.SHARE_NETWORK_NAME = self.getUniqueString()
snt = self.user_cloud.shared_file_system.create_share_network( snt = self.user_cloud.shared_file_system.create_share_network(
name=self.SHARE_NETWORK_NAME) name=self.SHARE_NETWORK_NAME
)
self.assertIsNotNone(snt) self.assertIsNotNone(snt)
self.assertIsNotNone(snt.id) self.assertIsNotNone(snt.id)
self.SHARE_NETWORK_ID = snt.id self.SHARE_NETWORK_ID = snt.id
snsb = self.user_cloud.shared_file_system.\ snsb = self.user_cloud.shared_file_system.create_share_network_subnet(
create_share_network_subnet(self.SHARE_NETWORK_ID, self.SHARE_NETWORK_ID, availability_zone=first_zone.name
availability_zone=first_zone.name) )
self.assertIsNotNone(snsb) self.assertIsNotNone(snsb)
self.assertIsNotNone(snsb.id) self.assertIsNotNone(snsb.id)
self.SHARE_NETWORK_SUBNET_ID = snsb.id self.SHARE_NETWORK_SUBNET_ID = snsb.id
def tearDown(self): def tearDown(self):
subnet = self.user_cloud.shared_file_system.\ subnet = self.user_cloud.shared_file_system.get_share_network_subnet(
get_share_network_subnet(self.SHARE_NETWORK_ID, self.SHARE_NETWORK_ID, self.SHARE_NETWORK_SUBNET_ID
self.SHARE_NETWORK_SUBNET_ID) )
fdel = self.user_cloud.shared_file_system.\ fdel = self.user_cloud.shared_file_system.delete_share_network_subnet(
delete_share_network_subnet(self.SHARE_NETWORK_ID, self.SHARE_NETWORK_ID,
self.SHARE_NETWORK_SUBNET_ID, self.SHARE_NETWORK_SUBNET_ID,
ignore_missing=True) ignore_missing=True,
)
self.assertIsNone(fdel) self.assertIsNone(fdel)
self.user_cloud.shared_file_system.\ self.user_cloud.shared_file_system.wait_for_delete(subnet)
wait_for_delete(subnet) sot = self.user_cloud.shared_file_system.delete_share_network(
sot = self.user_cloud.shared_file_system.\ self.SHARE_NETWORK_ID, ignore_missing=True
delete_share_network(self.SHARE_NETWORK_ID, )
ignore_missing=True)
self.assertIsNone(sot) self.assertIsNone(sot)
super().tearDown() super().tearDown()
def test_get(self): def test_get(self):
sub = self.user_cloud.shared_file_system.\ sub = self.user_cloud.shared_file_system.get_share_network_subnet(
get_share_network_subnet(self.SHARE_NETWORK_ID, self.SHARE_NETWORK_ID, self.SHARE_NETWORK_SUBNET_ID
self.SHARE_NETWORK_SUBNET_ID) )
assert isinstance(sub, _share_network_subnet.ShareNetworkSubnet) assert isinstance(sub, _share_network_subnet.ShareNetworkSubnet)
def test_list(self): def test_list(self):
subs = self.user_cloud.shared_file_system.share_network_subnets( subs = self.user_cloud.shared_file_system.share_network_subnets(
self.SHARE_NETWORK_ID) self.SHARE_NETWORK_ID
)
self.assertGreater(len(list(subs)), 0) self.assertGreater(len(list(subs)), 0)
for sub in subs: for sub in subs:
for attribute in ('id', 'name', 'created_at', 'updated_at', for attribute in (
'share_network_id', 'availability_zone', 'id',
'cidr', 'gateway', 'ip_version', 'mtu', 'name',
'network_type', 'neutron_net_id', 'created_at',
'neutron_subnet_id', 'segmentation_id', 'updated_at',
'share_network_name'): 'share_network_id',
'availability_zone',
'cidr',
'gateway',
'ip_version',
'mtu',
'network_type',
'neutron_net_id',
'neutron_subnet_id',
'segmentation_id',
'share_network_name',
):
self.assertTrue(hasattr(sub, attribute)) self.assertTrue(hasattr(sub, attribute))

View File

@ -14,48 +14,56 @@ from openstack.tests.functional.shared_file_system import base
class ShareSnapshotTest(base.BaseSharedFileSystemTest): class ShareSnapshotTest(base.BaseSharedFileSystemTest):
def setUp(self): def setUp(self):
super(ShareSnapshotTest, self).setUp() super(ShareSnapshotTest, self).setUp()
self.SHARE_NAME = self.getUniqueString() self.SHARE_NAME = self.getUniqueString()
self.SNAPSHOT_NAME = self.getUniqueString() self.SNAPSHOT_NAME = self.getUniqueString()
my_share = self.operator_cloud.shared_file_system.create_share( my_share = self.operator_cloud.shared_file_system.create_share(
name=self.SHARE_NAME, size=2, share_type="dhss_false", name=self.SHARE_NAME,
share_protocol='NFS', description=None) size=2,
share_type="dhss_false",
share_protocol='NFS',
description=None,
)
self.operator_cloud.shared_file_system.wait_for_status( self.operator_cloud.shared_file_system.wait_for_status(
my_share, my_share,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertIsNotNone(my_share) self.assertIsNotNone(my_share)
self.assertIsNotNone(my_share.id) self.assertIsNotNone(my_share.id)
self.SHARE_ID = my_share.id self.SHARE_ID = my_share.id
msp = self.operator_cloud.shared_file_system.create_share_snapshot( msp = self.operator_cloud.shared_file_system.create_share_snapshot(
share_id=self.SHARE_ID, name=self.SNAPSHOT_NAME, force=True) share_id=self.SHARE_ID, name=self.SNAPSHOT_NAME, force=True
)
self.operator_cloud.shared_file_system.wait_for_status( self.operator_cloud.shared_file_system.wait_for_status(
msp, msp,
status='available', status='available',
failures=['error'], failures=['error'],
interval=5, interval=5,
wait=self._wait_for_timeout) wait=self._wait_for_timeout,
)
self.assertIsNotNone(msp.id) self.assertIsNotNone(msp.id)
self.SNAPSHOT_ID = msp.id self.SNAPSHOT_ID = msp.id
def tearDown(self): def tearDown(self):
snpt = self.operator_cloud.shared_file_system.get_share_snapshot( snpt = self.operator_cloud.shared_file_system.get_share_snapshot(
self.SNAPSHOT_ID) self.SNAPSHOT_ID
)
sot = self.operator_cloud.shared_file_system.delete_share_snapshot( sot = self.operator_cloud.shared_file_system.delete_share_snapshot(
snpt, ignore_missing=False snpt, ignore_missing=False
) )
self.operator_cloud.shared_file_system.wait_for_delete( self.operator_cloud.shared_file_system.wait_for_delete(
snpt, interval=2, wait=self._wait_for_timeout) snpt, interval=2, wait=self._wait_for_timeout
)
self.assertIsNone(sot) self.assertIsNone(sot)
sot = self.operator_cloud.shared_file_system.delete_share( sot = self.operator_cloud.shared_file_system.delete_share(
self.SHARE_ID, self.SHARE_ID, ignore_missing=False
ignore_missing=False) )
self.assertIsNone(sot) self.assertIsNone(sot)
super(ShareSnapshotTest, self).tearDown() super(ShareSnapshotTest, self).tearDown()
@ -71,14 +79,26 @@ class ShareSnapshotTest(base.BaseSharedFileSystemTest):
) )
self.assertGreater(len(list(snaps)), 0) self.assertGreater(len(list(snaps)), 0)
for snap in snaps: for snap in snaps:
for attribute in ('id', 'name', 'created_at', 'updated_at', for attribute in (
'description', 'share_id', 'share_proto', 'id',
'share_size', 'size', 'status', 'user_id'): 'name',
'created_at',
'updated_at',
'description',
'share_id',
'share_proto',
'share_size',
'size',
'status',
'user_id',
):
self.assertTrue(hasattr(snap, attribute)) self.assertTrue(hasattr(snap, attribute))
def test_update(self): def test_update(self):
u_snap = self.operator_cloud.shared_file_system.update_share_snapshot( u_snap = self.operator_cloud.shared_file_system.update_share_snapshot(
self.SNAPSHOT_ID, display_description='updated share snapshot') self.SNAPSHOT_ID, display_description='updated share snapshot'
)
get_u_snap = self.operator_cloud.shared_file_system.get_share_snapshot( get_u_snap = self.operator_cloud.shared_file_system.get_share_snapshot(
u_snap.id) u_snap.id
)
self.assertEqual('updated share snapshot', get_u_snap.description) self.assertEqual('updated share snapshot', get_u_snap.description)

View File

@ -14,22 +14,24 @@ from openstack.tests.functional.shared_file_system import base
class ShareSnapshotInstanceTest(base.BaseSharedFileSystemTest): class ShareSnapshotInstanceTest(base.BaseSharedFileSystemTest):
def setUp(self): def setUp(self):
super(ShareSnapshotInstanceTest, self).setUp() super(ShareSnapshotInstanceTest, self).setUp()
self.SHARE_NAME = self.getUniqueString() self.SHARE_NAME = self.getUniqueString()
my_share = self.create_share( my_share = self.create_share(
name=self.SHARE_NAME, size=2, share_type="dhss_false", name=self.SHARE_NAME,
share_protocol='NFS', description=None) size=2,
self.SHARE_ID = my_share.id share_type="dhss_false",
self.create_share_snapshot( share_protocol='NFS',
share_id=self.SHARE_ID description=None,
) )
self.SHARE_ID = my_share.id
self.create_share_snapshot(share_id=self.SHARE_ID)
def test_share_snapshot_instances(self): def test_share_snapshot_instances(self):
sots = \ sots = (
self.operator_cloud.shared_file_system.share_snapshot_instances() self.operator_cloud.shared_file_system.share_snapshot_instances()
)
self.assertGreater(len(list(sots)), 0) self.assertGreater(len(list(sots)), 0)
for sot in sots: for sot in sots:
for attribute in ('id', 'name', 'created_at', 'updated_at'): for attribute in ('id', 'name', 'created_at', 'updated_at'):

View File

@ -14,11 +14,15 @@ from openstack.tests.functional.shared_file_system import base
class StoragePoolTest(base.BaseSharedFileSystemTest): class StoragePoolTest(base.BaseSharedFileSystemTest):
def test_storage_pools(self): def test_storage_pools(self):
pools = self.operator_cloud.shared_file_system.storage_pools() pools = self.operator_cloud.shared_file_system.storage_pools()
self.assertGreater(len(list(pools)), 0) self.assertGreater(len(list(pools)), 0)
for pool in pools: for pool in pools:
for attribute in ('pool', 'name', 'host', 'backend', for attribute in (
'capabilities'): 'pool',
'name',
'host',
'backend',
'capabilities',
):
self.assertTrue(hasattr(pool, attribute)) self.assertTrue(hasattr(pool, attribute))

View File

@ -14,7 +14,6 @@ from openstack.tests.functional.shared_file_system import base
class UserMessageTest(base.BaseSharedFileSystemTest): class UserMessageTest(base.BaseSharedFileSystemTest):
def test_user_messages(self): def test_user_messages(self):
# TODO(kafilat): We must intentionally cause an asynchronous failure to # TODO(kafilat): We must intentionally cause an asynchronous failure to
# ensure that at least one user message exists; # ensure that at least one user message exists;
@ -22,11 +21,19 @@ class UserMessageTest(base.BaseSharedFileSystemTest):
# self.assertGreater(len(list(u_messages)), 0) # self.assertGreater(len(list(u_messages)), 0)
for u_message in u_messages: for u_message in u_messages:
for attribute in ( for attribute in (
'id', 'created_at', 'action_id', 'detail_id', 'id',
'expires_at', 'message_level', 'project_id', 'request_id', 'created_at',
'resource_id', 'resource_type', 'user_message'): 'action_id',
'detail_id',
'expires_at',
'message_level',
'project_id',
'request_id',
'resource_id',
'resource_type',
'user_message',
):
self.assertTrue(hasattr(u_message, attribute)) self.assertTrue(hasattr(u_message, attribute))
self.assertIsInstance(getattr(u_message, attribute), str) self.assertIsInstance(getattr(u_message, attribute), str)
self.conn.shared_file_system.delete_user_message( self.conn.shared_file_system.delete_user_message(u_message)
u_message)

View File

@ -23,7 +23,6 @@ EXAMPLE = {
class TestAvailabilityZone(base.TestCase): class TestAvailabilityZone(base.TestCase):
def test_basic(self): def test_basic(self):
az_resource = az.AvailabilityZone() az_resource = az.AvailabilityZone()
self.assertEqual('availability_zones', az_resource.resources_key) self.assertEqual('availability_zones', az_resource.resources_key)

View File

@ -27,12 +27,11 @@ EXAMPLE = {
"maxTotalShareReplicas": 100, "maxTotalShareReplicas": 100,
"maxTotalReplicaGigabytes": 1000, "maxTotalReplicaGigabytes": 1000,
"totalShareReplicasUsed": 0, "totalShareReplicasUsed": 0,
"totalReplicaGigabytesUsed": 0 "totalReplicaGigabytesUsed": 0,
} }
class TestLimit(base.TestCase): class TestLimit(base.TestCase):
def test_basic(self): def test_basic(self):
limits = limit.Limit() limits = limit.Limit()
self.assertEqual('limits', limits.resources_key) self.assertEqual('limits', limits.resources_key)
@ -46,31 +45,45 @@ class TestLimit(base.TestCase):
def test_make_limits(self): def test_make_limits(self):
limits = limit.Limit(**EXAMPLE) limits = limit.Limit(**EXAMPLE)
self.assertEqual(EXAMPLE['totalShareNetworksUsed'], self.assertEqual(
limits.totalShareNetworksUsed) EXAMPLE['totalShareNetworksUsed'], limits.totalShareNetworksUsed
self.assertEqual(EXAMPLE['maxTotalShareGigabytes'], )
limits.maxTotalShareGigabytes) self.assertEqual(
self.assertEqual(EXAMPLE['maxTotalShareNetworks'], EXAMPLE['maxTotalShareGigabytes'], limits.maxTotalShareGigabytes
limits.maxTotalShareNetworks) )
self.assertEqual(EXAMPLE['totalSharesUsed'], self.assertEqual(
limits.totalSharesUsed) EXAMPLE['maxTotalShareNetworks'], limits.maxTotalShareNetworks
self.assertEqual(EXAMPLE['totalShareGigabytesUsed'], )
limits.totalShareGigabytesUsed) self.assertEqual(EXAMPLE['totalSharesUsed'], limits.totalSharesUsed)
self.assertEqual(EXAMPLE['totalShareSnapshotsUsed'], self.assertEqual(
limits.totalShareSnapshotsUsed) EXAMPLE['totalShareGigabytesUsed'], limits.totalShareGigabytesUsed
self.assertEqual(EXAMPLE['maxTotalShares'], )
limits.maxTotalShares) self.assertEqual(
self.assertEqual(EXAMPLE['totalSnapshotGigabytesUsed'], EXAMPLE['totalShareSnapshotsUsed'], limits.totalShareSnapshotsUsed
limits.totalSnapshotGigabytesUsed) )
self.assertEqual(EXAMPLE['maxTotalSnapshotGigabytes'], self.assertEqual(EXAMPLE['maxTotalShares'], limits.maxTotalShares)
limits.maxTotalSnapshotGigabytes) self.assertEqual(
self.assertEqual(EXAMPLE['maxTotalShareSnapshots'], EXAMPLE['totalSnapshotGigabytesUsed'],
limits.maxTotalShareSnapshots) limits.totalSnapshotGigabytesUsed,
self.assertEqual(EXAMPLE['maxTotalShareReplicas'], )
limits.maxTotalShareReplicas) self.assertEqual(
self.assertEqual(EXAMPLE['maxTotalReplicaGigabytes'], EXAMPLE['maxTotalSnapshotGigabytes'],
limits.maxTotalReplicaGigabytes) limits.maxTotalSnapshotGigabytes,
self.assertEqual(EXAMPLE['totalShareReplicasUsed'], )
limits.totalShareReplicasUsed) self.assertEqual(
self.assertEqual(EXAMPLE['totalReplicaGigabytesUsed'], EXAMPLE['maxTotalShareSnapshots'], limits.maxTotalShareSnapshots
limits.totalReplicaGigabytesUsed) )
self.assertEqual(
EXAMPLE['maxTotalShareReplicas'], limits.maxTotalShareReplicas
)
self.assertEqual(
EXAMPLE['maxTotalReplicaGigabytes'],
limits.maxTotalReplicaGigabytes,
)
self.assertEqual(
EXAMPLE['totalShareReplicasUsed'], limits.totalShareReplicasUsed
)
self.assertEqual(
EXAMPLE['totalReplicaGigabytesUsed'],
limits.totalReplicaGigabytesUsed,
)

View File

@ -12,12 +12,10 @@
from unittest import mock from unittest import mock
from openstack.shared_file_system.v2 import (
share_access_rule
)
from openstack.shared_file_system.v2 import _proxy from openstack.shared_file_system.v2 import _proxy
from openstack.shared_file_system.v2 import limit from openstack.shared_file_system.v2 import limit
from openstack.shared_file_system.v2 import share from openstack.shared_file_system.v2 import share
from openstack.shared_file_system.v2 import share_access_rule
from openstack.shared_file_system.v2 import share_instance from openstack.shared_file_system.v2 import share_instance
from openstack.shared_file_system.v2 import share_network from openstack.shared_file_system.v2 import share_network
from openstack.shared_file_system.v2 import share_network_subnet from openstack.shared_file_system.v2 import share_network_subnet
@ -29,7 +27,6 @@ from openstack.tests.unit import test_proxy_base
class TestSharedFileSystemProxy(test_proxy_base.TestProxyBase): class TestSharedFileSystemProxy(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestSharedFileSystemProxy, self).setUp() super(TestSharedFileSystemProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
@ -40,25 +37,29 @@ class TestSharedFileSystemShare(TestSharedFileSystemProxy):
self.verify_list(self.proxy.shares, share.Share) self.verify_list(self.proxy.shares, share.Share)
def test_shares_detailed(self): def test_shares_detailed(self):
self.verify_list(self.proxy.shares, share.Share, self.verify_list(
method_kwargs={"details": True, "query": 1}, self.proxy.shares,
expected_kwargs={"query": 1}) share.Share,
method_kwargs={"details": True, "query": 1},
expected_kwargs={"query": 1},
)
def test_shares_not_detailed(self): def test_shares_not_detailed(self):
self.verify_list(self.proxy.shares, share.Share, self.verify_list(
method_kwargs={"details": False, "query": 1}, self.proxy.shares,
expected_kwargs={"query": 1}) share.Share,
method_kwargs={"details": False, "query": 1},
expected_kwargs={"query": 1},
)
def test_share_get(self): def test_share_get(self):
self.verify_get(self.proxy.get_share, share.Share) self.verify_get(self.proxy.get_share, share.Share)
def test_share_delete(self): def test_share_delete(self):
self.verify_delete( self.verify_delete(self.proxy.delete_share, share.Share, False)
self.proxy.delete_share, share.Share, False)
def test_share_delete_ignore(self): def test_share_delete_ignore(self):
self.verify_delete( self.verify_delete(self.proxy.delete_share, share.Share, True)
self.proxy.delete_share, share.Share, True)
def test_share_create(self): def test_share_create(self):
self.verify_create(self.proxy.create_share, share.Share) self.verify_create(self.proxy.create_share, share.Share)
@ -71,8 +72,7 @@ class TestSharedFileSystemShare(TestSharedFileSystemProxy):
self.proxy._get = mock.Mock(return_value=mock_share) self.proxy._get = mock.Mock(return_value=mock_share)
self._verify( self._verify(
"openstack.shared_file_system.v2.share." "openstack.shared_file_system.v2.share." + "Share.extend_share",
+ "Share.extend_share",
self.proxy.resize_share, self.proxy.resize_share,
method_args=['fakeId', 20], method_args=['fakeId', 20],
expected_args=[self.proxy, 20, False], expected_args=[self.proxy, 20, False],
@ -83,20 +83,21 @@ class TestSharedFileSystemShare(TestSharedFileSystemProxy):
self.proxy._get = mock.Mock(return_value=mock_share) self.proxy._get = mock.Mock(return_value=mock_share)
self._verify( self._verify(
"openstack.shared_file_system.v2.share." "openstack.shared_file_system.v2.share." + "Share.shrink_share",
+ "Share.shrink_share",
self.proxy.resize_share, self.proxy.resize_share,
method_args=['fakeId', 20], method_args=['fakeId', 20],
expected_args=[self.proxy, 20], expected_args=[self.proxy, 20],
) )
def test_share_instances(self): def test_share_instances(self):
self.verify_list(self.proxy.share_instances, self.verify_list(
share_instance.ShareInstance) self.proxy.share_instances, share_instance.ShareInstance
)
def test_share_instance_get(self): def test_share_instance_get(self):
self.verify_get(self.proxy.get_share_instance, self.verify_get(
share_instance.ShareInstance) self.proxy.get_share_instance, share_instance.ShareInstance
)
def test_share_instance_reset(self): def test_share_instance_reset(self):
self._verify( self._verify(
@ -113,7 +114,8 @@ class TestSharedFileSystemShare(TestSharedFileSystemProxy):
+ "ShareInstance.force_delete", + "ShareInstance.force_delete",
self.proxy.delete_share_instance, self.proxy.delete_share_instance,
method_args=['id'], method_args=['id'],
expected_args=[self.proxy]) expected_args=[self.proxy],
)
@mock.patch("openstack.resource.wait_for_status") @mock.patch("openstack.resource.wait_for_status")
def test_wait_for(self, mock_wait): def test_wait_for(self, mock_wait):
@ -122,30 +124,33 @@ class TestSharedFileSystemShare(TestSharedFileSystemProxy):
self.proxy.wait_for_status(mock_resource, 'ACTIVE') self.proxy.wait_for_status(mock_resource, 'ACTIVE')
mock_wait.assert_called_once_with(self.proxy, mock_resource, mock_wait.assert_called_once_with(
'ACTIVE', [], 2, 120) self.proxy, mock_resource, 'ACTIVE', [], 2, 120
)
class TestSharedFileSystemStoragePool(TestSharedFileSystemProxy): class TestSharedFileSystemStoragePool(TestSharedFileSystemProxy):
def test_storage_pools(self): def test_storage_pools(self):
self.verify_list( self.verify_list(self.proxy.storage_pools, storage_pool.StoragePool)
self.proxy.storage_pools, storage_pool.StoragePool)
def test_storage_pool_detailed(self): def test_storage_pool_detailed(self):
self.verify_list( self.verify_list(
self.proxy.storage_pools, storage_pool.StoragePool, self.proxy.storage_pools,
storage_pool.StoragePool,
method_kwargs={"details": True, "backend": "alpha"}, method_kwargs={"details": True, "backend": "alpha"},
expected_kwargs={"backend": "alpha"}) expected_kwargs={"backend": "alpha"},
)
def test_storage_pool_not_detailed(self): def test_storage_pool_not_detailed(self):
self.verify_list( self.verify_list(
self.proxy.storage_pools, storage_pool.StoragePool, self.proxy.storage_pools,
storage_pool.StoragePool,
method_kwargs={"details": False, "backend": "alpha"}, method_kwargs={"details": False, "backend": "alpha"},
expected_kwargs={"backend": "alpha"}) expected_kwargs={"backend": "alpha"},
)
class TestUserMessageProxy(test_proxy_base.TestProxyBase): class TestUserMessageProxy(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestUserMessageProxy, self).setUp() super(TestUserMessageProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
@ -155,70 +160,83 @@ class TestUserMessageProxy(test_proxy_base.TestProxyBase):
def test_user_messages_queried(self): def test_user_messages_queried(self):
self.verify_list( self.verify_list(
self.proxy.user_messages, user_message.UserMessage, self.proxy.user_messages,
user_message.UserMessage,
method_kwargs={"action_id": "1"}, method_kwargs={"action_id": "1"},
expected_kwargs={"action_id": "1"}) expected_kwargs={"action_id": "1"},
)
def test_user_message_get(self): def test_user_message_get(self):
self.verify_get(self.proxy.get_user_message, user_message.UserMessage) self.verify_get(self.proxy.get_user_message, user_message.UserMessage)
def test_delete_user_message(self): def test_delete_user_message(self):
self.verify_delete( self.verify_delete(
self.proxy.delete_user_message, user_message.UserMessage, False) self.proxy.delete_user_message, user_message.UserMessage, False
)
def test_delete_user_message_true(self): def test_delete_user_message_true(self):
self.verify_delete( self.verify_delete(
self.proxy.delete_user_message, user_message.UserMessage, True) self.proxy.delete_user_message, user_message.UserMessage, True
)
def test_limit(self): def test_limit(self):
self.verify_list(self.proxy.limits, limit.Limit) self.verify_list(self.proxy.limits, limit.Limit)
class TestShareSnapshotResource(test_proxy_base.TestProxyBase): class TestShareSnapshotResource(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestShareSnapshotResource, self).setUp() super(TestShareSnapshotResource, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
def test_share_snapshots(self): def test_share_snapshots(self):
self.verify_list( self.verify_list(
self.proxy.share_snapshots, share_snapshot.ShareSnapshot) self.proxy.share_snapshots, share_snapshot.ShareSnapshot
)
def test_share_snapshots_detailed(self): def test_share_snapshots_detailed(self):
self.verify_list( self.verify_list(
self.proxy.share_snapshots, self.proxy.share_snapshots,
share_snapshot.ShareSnapshot, share_snapshot.ShareSnapshot,
method_kwargs={"details": True, "name": "my_snapshot"}, method_kwargs={"details": True, "name": "my_snapshot"},
expected_kwargs={"name": "my_snapshot"}) expected_kwargs={"name": "my_snapshot"},
)
def test_share_snapshots_not_detailed(self): def test_share_snapshots_not_detailed(self):
self.verify_list( self.verify_list(
self.proxy.share_snapshots, self.proxy.share_snapshots,
share_snapshot.ShareSnapshot, share_snapshot.ShareSnapshot,
method_kwargs={"details": False, "name": "my_snapshot"}, method_kwargs={"details": False, "name": "my_snapshot"},
expected_kwargs={"name": "my_snapshot"}) expected_kwargs={"name": "my_snapshot"},
)
def test_share_snapshot_get(self): def test_share_snapshot_get(self):
self.verify_get( self.verify_get(
self.proxy.get_share_snapshot, share_snapshot.ShareSnapshot) self.proxy.get_share_snapshot, share_snapshot.ShareSnapshot
)
def test_share_snapshot_delete(self): def test_share_snapshot_delete(self):
self.verify_delete( self.verify_delete(
self.proxy.delete_share_snapshot, self.proxy.delete_share_snapshot,
share_snapshot.ShareSnapshot, False) share_snapshot.ShareSnapshot,
False,
)
def test_share_snapshot_delete_ignore(self): def test_share_snapshot_delete_ignore(self):
self.verify_delete( self.verify_delete(
self.proxy.delete_share_snapshot, self.proxy.delete_share_snapshot,
share_snapshot.ShareSnapshot, True) share_snapshot.ShareSnapshot,
True,
)
def test_share_snapshot_create(self): def test_share_snapshot_create(self):
self.verify_create( self.verify_create(
self.proxy.create_share_snapshot, share_snapshot.ShareSnapshot) self.proxy.create_share_snapshot, share_snapshot.ShareSnapshot
)
def test_share_snapshot_update(self): def test_share_snapshot_update(self):
self.verify_update( self.verify_update(
self.proxy.update_share_snapshot, share_snapshot.ShareSnapshot) self.proxy.update_share_snapshot, share_snapshot.ShareSnapshot
)
@mock.patch("openstack.resource.wait_for_delete") @mock.patch("openstack.resource.wait_for_delete")
def test_wait_for_delete(self, mock_wait): def test_wait_for_delete(self, mock_wait):
@ -231,7 +249,6 @@ class TestShareSnapshotResource(test_proxy_base.TestProxyBase):
class TestShareSnapshotInstanceResource(test_proxy_base.TestProxyBase): class TestShareSnapshotInstanceResource(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestShareSnapshotInstanceResource, self).setUp() super(TestShareSnapshotInstanceResource, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
@ -239,34 +256,33 @@ class TestShareSnapshotInstanceResource(test_proxy_base.TestProxyBase):
def test_share_snapshot_instances(self): def test_share_snapshot_instances(self):
self.verify_list( self.verify_list(
self.proxy.share_snapshot_instances, self.proxy.share_snapshot_instances,
share_snapshot_instance.ShareSnapshotInstance) share_snapshot_instance.ShareSnapshotInstance,
)
def test_share_snapshot_instance_detailed(self): def test_share_snapshot_instance_detailed(self):
self.verify_list(self.proxy.share_snapshot_instances, self.verify_list(
share_snapshot_instance.ShareSnapshotInstance, self.proxy.share_snapshot_instances,
method_kwargs={ share_snapshot_instance.ShareSnapshotInstance,
"details": True, method_kwargs={"details": True, "query": {'snapshot_id': 'fake'}},
"query": {'snapshot_id': 'fake'} expected_kwargs={"query": {'snapshot_id': 'fake'}},
}, )
expected_kwargs={"query": {'snapshot_id': 'fake'}})
def test_share_snapshot_instance_not_detailed(self): def test_share_snapshot_instance_not_detailed(self):
self.verify_list(self.proxy.share_snapshot_instances, self.verify_list(
share_snapshot_instance.ShareSnapshotInstance, self.proxy.share_snapshot_instances,
method_kwargs={ share_snapshot_instance.ShareSnapshotInstance,
"details": False, method_kwargs={"details": False, "query": {'snapshot_id': 'fake'}},
"query": {'snapshot_id': 'fake'} expected_kwargs={"query": {'snapshot_id': 'fake'}},
}, )
expected_kwargs={"query": {'snapshot_id': 'fake'}})
def test_share_snapshot_instance_get(self): def test_share_snapshot_instance_get(self):
self.verify_get( self.verify_get(
self.proxy.get_share_snapshot_instance, self.proxy.get_share_snapshot_instance,
share_snapshot_instance.ShareSnapshotInstance) share_snapshot_instance.ShareSnapshotInstance,
)
class TestShareNetworkResource(test_proxy_base.TestProxyBase): class TestShareNetworkResource(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestShareNetworkResource, self).setUp() super(TestShareNetworkResource, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
@ -275,38 +291,48 @@ class TestShareNetworkResource(test_proxy_base.TestProxyBase):
self.verify_list(self.proxy.share_networks, share_network.ShareNetwork) self.verify_list(self.proxy.share_networks, share_network.ShareNetwork)
def test_share_networks_detailed(self): def test_share_networks_detailed(self):
self.verify_list(self.proxy.share_networks, share_network.ShareNetwork, self.verify_list(
method_kwargs={"details": True, "name": "my_net"}, self.proxy.share_networks,
expected_kwargs={"name": "my_net"}) share_network.ShareNetwork,
method_kwargs={"details": True, "name": "my_net"},
expected_kwargs={"name": "my_net"},
)
def test_share_networks_not_detailed(self): def test_share_networks_not_detailed(self):
self.verify_list(self.proxy.share_networks, share_network.ShareNetwork, self.verify_list(
method_kwargs={"details": False, "name": "my_net"}, self.proxy.share_networks,
expected_kwargs={"name": "my_net"}) share_network.ShareNetwork,
method_kwargs={"details": False, "name": "my_net"},
expected_kwargs={"name": "my_net"},
)
def test_share_network_get(self): def test_share_network_get(self):
self.verify_get( self.verify_get(
self.proxy.get_share_network, share_network.ShareNetwork) self.proxy.get_share_network, share_network.ShareNetwork
)
def test_share_network_delete(self): def test_share_network_delete(self):
self.verify_delete( self.verify_delete(
self.proxy.delete_share_network, share_network.ShareNetwork, False) self.proxy.delete_share_network, share_network.ShareNetwork, False
)
def test_share_network_delete_ignore(self): def test_share_network_delete_ignore(self):
self.verify_delete( self.verify_delete(
self.proxy.delete_share_network, share_network.ShareNetwork, True) self.proxy.delete_share_network, share_network.ShareNetwork, True
)
def test_share_network_create(self): def test_share_network_create(self):
self.verify_create( self.verify_create(
self.proxy.create_share_network, share_network.ShareNetwork) self.proxy.create_share_network, share_network.ShareNetwork
)
def test_share_network_update(self): def test_share_network_update(self):
self.verify_update( self.verify_update(
self.proxy.update_share_network, share_network.ShareNetwork) self.proxy.update_share_network, share_network.ShareNetwork
)
class TestShareNetworkSubnetResource(test_proxy_base.TestProxyBase): class TestShareNetworkSubnetResource(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestShareNetworkSubnetResource, self).setUp() super(TestShareNetworkSubnetResource, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
@ -317,7 +343,8 @@ class TestShareNetworkSubnetResource(test_proxy_base.TestProxyBase):
share_network_subnet.ShareNetworkSubnet, share_network_subnet.ShareNetworkSubnet,
method_args=["test_share"], method_args=["test_share"],
expected_args=[], expected_args=[],
expected_kwargs={"share_network_id": "test_share"}) expected_kwargs={"share_network_id": "test_share"},
)
def test_share_network_subnet_get(self): def test_share_network_subnet_get(self):
self.verify_get( self.verify_get(
@ -325,7 +352,8 @@ class TestShareNetworkSubnetResource(test_proxy_base.TestProxyBase):
share_network_subnet.ShareNetworkSubnet, share_network_subnet.ShareNetworkSubnet,
method_args=["fake_network_id", "fake_sub_network_id"], method_args=["fake_network_id", "fake_sub_network_id"],
expected_args=['fake_sub_network_id'], expected_args=['fake_sub_network_id'],
expected_kwargs={'share_network_id': 'fake_network_id'}) expected_kwargs={'share_network_id': 'fake_network_id'},
)
def test_share_network_subnet_create(self): def test_share_network_subnet_create(self):
self.verify_create( self.verify_create(
@ -336,7 +364,9 @@ class TestShareNetworkSubnetResource(test_proxy_base.TestProxyBase):
expected_args=[], expected_args=[],
expected_kwargs={ expected_kwargs={
"share_network_id": "fake_network_id", "share_network_id": "fake_network_id",
"p1": "v1"}) "p1": "v1",
},
)
def test_share_network_subnet_delete(self): def test_share_network_subnet_delete(self):
self.verify_delete( self.verify_delete(
@ -345,11 +375,11 @@ class TestShareNetworkSubnetResource(test_proxy_base.TestProxyBase):
False, False,
method_args=["fake_network_id", "fake_sub_network_id"], method_args=["fake_network_id", "fake_sub_network_id"],
expected_args=["fake_sub_network_id"], expected_args=["fake_sub_network_id"],
expected_kwargs={'share_network_id': 'fake_network_id'}) expected_kwargs={'share_network_id': 'fake_network_id'},
)
class TestAccessRuleProxy(test_proxy_base.TestProxyBase): class TestAccessRuleProxy(test_proxy_base.TestProxyBase):
def setUp(self): def setUp(self):
super(TestAccessRuleProxy, self).setUp() super(TestAccessRuleProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session) self.proxy = _proxy.Proxy(self.session)
@ -360,18 +390,21 @@ class TestAccessRuleProxy(test_proxy_base.TestProxyBase):
share_access_rule.ShareAccessRule, share_access_rule.ShareAccessRule,
method_args=["test_share"], method_args=["test_share"],
expected_args=[], expected_args=[],
expected_kwargs={"share_id": "test_share"}) expected_kwargs={"share_id": "test_share"},
)
def test_access_rules_get(self): def test_access_rules_get(self):
self.verify_get( self.verify_get(
self.proxy.get_access_rule, share_access_rule.ShareAccessRule) self.proxy.get_access_rule, share_access_rule.ShareAccessRule
)
def test_access_rules_create(self): def test_access_rules_create(self):
self.verify_create( self.verify_create(
self.proxy.create_access_rule, self.proxy.create_access_rule,
share_access_rule.ShareAccessRule, share_access_rule.ShareAccessRule,
method_args=["share_id"], method_args=["share_id"],
expected_args=[]) expected_args=[],
)
def test_access_rules_delete(self): def test_access_rules_delete(self):
self._verify( self._verify(
@ -379,4 +412,5 @@ class TestAccessRuleProxy(test_proxy_base.TestProxyBase):
+ "ShareAccessRule.delete", + "ShareAccessRule.delete",
self.proxy.delete_access_rule, self.proxy.delete_access_rule,
method_args=['access_id', 'share_id', 'ignore_missing'], method_args=['access_id', 'share_id', 'ignore_missing'],
expected_args=[self.proxy , 'share_id']) expected_args=[self.proxy, 'share_id'],
)

View File

@ -49,12 +49,11 @@ EXAMPLE = {
"is_mounting_snapshot_supported": True, "is_mounting_snapshot_supported": True,
"progress": "100%", "progress": "100%",
"share_server_id": None, "share_server_id": None,
"host": "new@denver#lvm-single-pool" "host": "new@denver#lvm-single-pool",
} }
class TestShares(base.TestCase): class TestShares(base.TestCase):
def test_basic(self): def test_basic(self):
shares_resource = share.Share() shares_resource = share.Share()
self.assertEqual('shares', shares_resource.resources_key) self.assertEqual('shares', shares_resource.resources_key)
@ -69,50 +68,65 @@ class TestShares(base.TestCase):
shares_resource = share.Share(**EXAMPLE) shares_resource = share.Share(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], shares_resource.id) self.assertEqual(EXAMPLE['id'], shares_resource.id)
self.assertEqual(EXAMPLE['size'], shares_resource.size) self.assertEqual(EXAMPLE['size'], shares_resource.size)
self.assertEqual(EXAMPLE['availability_zone'], self.assertEqual(
shares_resource.availability_zone) EXAMPLE['availability_zone'], shares_resource.availability_zone
)
self.assertEqual(EXAMPLE['created_at'], shares_resource.created_at) self.assertEqual(EXAMPLE['created_at'], shares_resource.created_at)
self.assertEqual(EXAMPLE['status'], shares_resource.status) self.assertEqual(EXAMPLE['status'], shares_resource.status)
self.assertEqual(EXAMPLE['name'], shares_resource.name) self.assertEqual(EXAMPLE['name'], shares_resource.name)
self.assertEqual(EXAMPLE['description'], self.assertEqual(EXAMPLE['description'], shares_resource.description)
shares_resource.description)
self.assertEqual(EXAMPLE['project_id'], shares_resource.project_id) self.assertEqual(EXAMPLE['project_id'], shares_resource.project_id)
self.assertEqual(EXAMPLE['snapshot_id'], shares_resource.snapshot_id) self.assertEqual(EXAMPLE['snapshot_id'], shares_resource.snapshot_id)
self.assertEqual(EXAMPLE['share_network_id'], self.assertEqual(
shares_resource.share_network_id) EXAMPLE['share_network_id'], shares_resource.share_network_id
self.assertEqual(EXAMPLE['share_protocol'], )
shares_resource.share_protocol) self.assertEqual(
EXAMPLE['share_protocol'], shares_resource.share_protocol
)
self.assertEqual(EXAMPLE['metadata'], shares_resource.metadata) self.assertEqual(EXAMPLE['metadata'], shares_resource.metadata)
self.assertEqual(EXAMPLE['share_type'], shares_resource.share_type) self.assertEqual(EXAMPLE['share_type'], shares_resource.share_type)
self.assertEqual(EXAMPLE['is_public'], shares_resource.is_public) self.assertEqual(EXAMPLE['is_public'], shares_resource.is_public)
self.assertEqual(EXAMPLE['is_snapshot_supported'], self.assertEqual(
shares_resource.is_snapshot_supported) EXAMPLE['is_snapshot_supported'],
shares_resource.is_snapshot_supported,
)
self.assertEqual(EXAMPLE['task_state'], shares_resource.task_state) self.assertEqual(EXAMPLE['task_state'], shares_resource.task_state)
self.assertEqual(EXAMPLE['share_type_name'], self.assertEqual(
shares_resource.share_type_name) EXAMPLE['share_type_name'], shares_resource.share_type_name
self.assertEqual(EXAMPLE['access_rules_status'], )
shares_resource.access_rules_status) self.assertEqual(
self.assertEqual(EXAMPLE['replication_type'], EXAMPLE['access_rules_status'], shares_resource.access_rules_status
shares_resource.replication_type) )
self.assertEqual(EXAMPLE['is_replicated'], self.assertEqual(
shares_resource.is_replicated) EXAMPLE['replication_type'], shares_resource.replication_type
)
self.assertEqual(
EXAMPLE['is_replicated'], shares_resource.is_replicated
)
self.assertEqual(EXAMPLE['user_id'], shares_resource.user_id) self.assertEqual(EXAMPLE['user_id'], shares_resource.user_id)
self.assertEqual(EXAMPLE[ self.assertEqual(
'is_creating_new_share_from_snapshot_supported'], EXAMPLE['is_creating_new_share_from_snapshot_supported'],
(shares_resource.is_creating_new_share_from_snapshot_supported)) (shares_resource.is_creating_new_share_from_snapshot_supported),
self.assertEqual(EXAMPLE['is_reverting_to_snapshot_supported'], )
shares_resource.is_reverting_to_snapshot_supported) self.assertEqual(
self.assertEqual(EXAMPLE['share_group_id'], EXAMPLE['is_reverting_to_snapshot_supported'],
shares_resource.share_group_id) shares_resource.is_reverting_to_snapshot_supported,
self.assertEqual(EXAMPLE[ )
'source_share_group_snapshot_member_id'], self.assertEqual(
shares_resource.source_share_group_snapshot_member_id) EXAMPLE['share_group_id'], shares_resource.share_group_id
self.assertEqual(EXAMPLE['is_mounting_snapshot_supported'], )
shares_resource.is_mounting_snapshot_supported) self.assertEqual(
self.assertEqual(EXAMPLE['progress'], EXAMPLE['source_share_group_snapshot_member_id'],
shares_resource.progress) shares_resource.source_share_group_snapshot_member_id,
self.assertEqual(EXAMPLE['share_server_id'], )
shares_resource.share_server_id) self.assertEqual(
EXAMPLE['is_mounting_snapshot_supported'],
shares_resource.is_mounting_snapshot_supported,
)
self.assertEqual(EXAMPLE['progress'], shares_resource.progress)
self.assertEqual(
EXAMPLE['share_server_id'], shares_resource.share_server_id
)
self.assertEqual(EXAMPLE['host'], shares_resource.host) self.assertEqual(EXAMPLE['host'], shares_resource.host)
@ -139,8 +153,8 @@ class TestShareActions(TestShares):
headers = {'Accept': ''} headers = {'Accept': ''}
self.sess.post.assert_called_with( self.sess.post.assert_called_with(
url, json=body, headers=headers, url, json=body, headers=headers, microversion=microversion
microversion=microversion) )
def test_extend_share(self): def test_extend_share(self):
sot = share.Share(**EXAMPLE) sot = share.Share(**EXAMPLE)
@ -153,8 +167,8 @@ class TestShareActions(TestShares):
headers = {'Accept': ''} headers = {'Accept': ''}
self.sess.post.assert_called_with( self.sess.post.assert_called_with(
url, json=body, headers=headers, url, json=body, headers=headers, microversion=microversion
microversion=microversion) )
def test_revert_to_snapshot(self): def test_revert_to_snapshot(self):
sot = share.Share(**EXAMPLE) sot = share.Share(**EXAMPLE)
@ -167,5 +181,5 @@ class TestShareActions(TestShares):
headers = {'Accept': ''} headers = {'Accept': ''}
self.sess.post.assert_called_with( self.sess.post.assert_called_with(
url, json=body, headers=headers, url, json=body, headers=headers, microversion=microversion
microversion=microversion) )

View File

@ -23,26 +23,21 @@ EXAMPLE = {
"access_key": None, "access_key": None,
"created_at": "2021-09-12T02:01:04.000000", "created_at": "2021-09-12T02:01:04.000000",
"updated_at": "2021-09-12T02:01:04.000000", "updated_at": "2021-09-12T02:01:04.000000",
"metadata": { "metadata": {"key1": "value1", "key2": "value2"},
"key1": "value1",
"key2": "value2"}
} }
class TestShareAccessRule(base.TestCase): class TestShareAccessRule(base.TestCase):
def test_basic(self): def test_basic(self):
rules_resource = share_access_rule.ShareAccessRule() rules_resource = share_access_rule.ShareAccessRule()
self.assertEqual('access_list', rules_resource.resources_key) self.assertEqual('access_list', rules_resource.resources_key)
self.assertEqual('/share-access-rules', rules_resource.base_path) self.assertEqual('/share-access-rules', rules_resource.base_path)
self.assertTrue(rules_resource.allow_list) self.assertTrue(rules_resource.allow_list)
self.assertDictEqual({ self.assertDictEqual(
"limit": "limit", {"limit": "limit", "marker": "marker", "share_id": "share_id"},
"marker": "marker", rules_resource._query_mapping._mapping,
"share_id": "share_id" )
},
rules_resource._query_mapping._mapping)
def test_make_share_access_rules(self): def test_make_share_access_rules(self):
rules_resource = share_access_rule.ShareAccessRule(**EXAMPLE) rules_resource = share_access_rule.ShareAccessRule(**EXAMPLE)

View File

@ -17,21 +17,23 @@ from openstack.tests.unit import base
IDENTIFIER = '08a87d37-5ca2-4308-86c5-cba06d8d796c' IDENTIFIER = '08a87d37-5ca2-4308-86c5-cba06d8d796c'
EXAMPLE = { EXAMPLE = {
"id": "f87589cb-f4bc-4a9b-b481-ab701206eb85", "id": "f87589cb-f4bc-4a9b-b481-ab701206eb85",
"path": ("199.19.213.225:/opt/stack/data/manila/mnt/" "path": (
"share-6ba490c5-5225-4c3b-9982-14b8f475c6d9"), "199.19.213.225:/opt/stack/data/manila/mnt/"
"share-6ba490c5-5225-4c3b-9982-14b8f475c6d9"
),
"preferred": False, "preferred": False,
"share_instance_id": "6ba490c5-5225-4c3b-9982-14b8f475c6d9", "share_instance_id": "6ba490c5-5225-4c3b-9982-14b8f475c6d9",
"is_admin_only": False "is_admin_only": False,
} }
class TestShareExportLocations(base.TestCase): class TestShareExportLocations(base.TestCase):
def test_basic(self): def test_basic(self):
export = el.ShareExportLocation() export = el.ShareExportLocation()
self.assertEqual('export_locations', export.resources_key) self.assertEqual('export_locations', export.resources_key)
self.assertEqual( self.assertEqual(
'/shares/%(share_id)s/export_locations', export.base_path) '/shares/%(share_id)s/export_locations', export.base_path
)
self.assertTrue(export.allow_list) self.assertTrue(export.allow_list)
def test_share_export_locations(self): def test_share_export_locations(self):
@ -40,5 +42,6 @@ class TestShareExportLocations(base.TestCase):
self.assertEqual(EXAMPLE['path'], export.path) self.assertEqual(EXAMPLE['path'], export.path)
self.assertEqual(EXAMPLE['preferred'], export.is_preferred) self.assertEqual(EXAMPLE['preferred'], export.is_preferred)
self.assertEqual( self.assertEqual(
EXAMPLE['share_instance_id'], export.share_instance_id) EXAMPLE['share_instance_id'], export.share_instance_id
)
self.assertEqual(EXAMPLE['is_admin_only'], export.is_admin) self.assertEqual(EXAMPLE['is_admin_only'], export.is_admin)

View File

@ -31,16 +31,16 @@ EXAMPLE = {
"share_server_id": "ba11930a-bf1a-4aa7-bae4-a8dfbaa3cc73", "share_server_id": "ba11930a-bf1a-4aa7-bae4-a8dfbaa3cc73",
"host": "manila2@generic1#GENERIC1", "host": "manila2@generic1#GENERIC1",
"access_rules_status": "active", "access_rules_status": "active",
"id": IDENTIFIER "id": IDENTIFIER,
} }
class TestShareInstances(base.TestCase): class TestShareInstances(base.TestCase):
def test_basic(self): def test_basic(self):
share_instance_resource = share_instance.ShareInstance() share_instance_resource = share_instance.ShareInstance()
self.assertEqual('share_instances', self.assertEqual(
share_instance_resource.resources_key) 'share_instances', share_instance_resource.resources_key
)
self.assertEqual('/share_instances', share_instance_resource.base_path) self.assertEqual('/share_instances', share_instance_resource.base_path)
self.assertTrue(share_instance_resource.allow_list) self.assertTrue(share_instance_resource.allow_list)
self.assertFalse(share_instance_resource.allow_create) self.assertFalse(share_instance_resource.allow_create)
@ -53,26 +53,36 @@ class TestShareInstances(base.TestCase):
self.assertEqual(EXAMPLE['status'], share_instance_resource.status) self.assertEqual(EXAMPLE['status'], share_instance_resource.status)
self.assertEqual(EXAMPLE['progress'], share_instance_resource.progress) self.assertEqual(EXAMPLE['progress'], share_instance_resource.progress)
self.assertEqual(EXAMPLE['share_id'], share_instance_resource.share_id) self.assertEqual(EXAMPLE['share_id'], share_instance_resource.share_id)
self.assertEqual(EXAMPLE['availability_zone'], self.assertEqual(
share_instance_resource.availability_zone) EXAMPLE['availability_zone'],
self.assertEqual(EXAMPLE['replica_state'], share_instance_resource.availability_zone,
share_instance_resource.replica_state) )
self.assertEqual(EXAMPLE['created_at'], self.assertEqual(
share_instance_resource.created_at) EXAMPLE['replica_state'], share_instance_resource.replica_state
self.assertEqual(EXAMPLE['cast_rules_to_readonly'], )
share_instance_resource.cast_rules_to_readonly) self.assertEqual(
self.assertEqual(EXAMPLE['share_network_id'], EXAMPLE['created_at'], share_instance_resource.created_at
share_instance_resource.share_network_id) )
self.assertEqual(EXAMPLE['share_server_id'], self.assertEqual(
share_instance_resource.share_server_id) EXAMPLE['cast_rules_to_readonly'],
share_instance_resource.cast_rules_to_readonly,
)
self.assertEqual(
EXAMPLE['share_network_id'],
share_instance_resource.share_network_id,
)
self.assertEqual(
EXAMPLE['share_server_id'], share_instance_resource.share_server_id
)
self.assertEqual(EXAMPLE['host'], share_instance_resource.host) self.assertEqual(EXAMPLE['host'], share_instance_resource.host)
self.assertEqual(EXAMPLE['access_rules_status'], self.assertEqual(
share_instance_resource.access_rules_status) EXAMPLE['access_rules_status'],
share_instance_resource.access_rules_status,
)
self.assertEqual(EXAMPLE['id'], share_instance_resource.id) self.assertEqual(EXAMPLE['id'], share_instance_resource.id)
class TestShareInstanceActions(TestShareInstances): class TestShareInstanceActions(TestShareInstances):
def setUp(self): def setUp(self):
super(TestShareInstanceActions, self).setUp() super(TestShareInstanceActions, self).setUp()
self.resp = mock.Mock() self.resp = mock.Mock()
@ -94,8 +104,8 @@ class TestShareInstanceActions(TestShareInstances):
body = {"reset_status": {"status": 'active'}} body = {"reset_status": {"status": 'active'}}
headers = {'Accept': ''} headers = {'Accept': ''}
self.sess.post.assert_called_with( self.sess.post.assert_called_with(
url, json=body, headers=headers, url, json=body, headers=headers, microversion=microversion
microversion=microversion) )
def test_force_delete(self): def test_force_delete(self):
sot = share_instance.ShareInstance(**EXAMPLE) sot = share_instance.ShareInstance(**EXAMPLE)
@ -107,5 +117,5 @@ class TestShareInstanceActions(TestShareInstances):
body = {'force_delete': None} body = {'force_delete': None}
headers = {'Accept': ''} headers = {'Accept': ''}
self.sess.post.assert_called_with( self.sess.post.assert_called_with(
url, json=body, headers=headers, url, json=body, headers=headers, microversion=microversion
microversion=microversion) )

View File

@ -21,12 +21,11 @@ EXAMPLE = {
"description": "My share network", "description": "My share network",
"created_at": "2021-06-10T10:11:17.291981", "created_at": "2021-06-10T10:11:17.291981",
"updated_at": None, "updated_at": None,
"share_network_subnets": [] "share_network_subnets": [],
} }
class TestShareNetwork(base.TestCase): class TestShareNetwork(base.TestCase):
def test_basic(self): def test_basic(self):
networks = share_network.ShareNetwork() networks = share_network.ShareNetwork()
self.assertEqual('share_networks', networks.resources_key) self.assertEqual('share_networks', networks.resources_key)
@ -38,28 +37,28 @@ class TestShareNetwork(base.TestCase):
self.assertTrue(networks.allow_delete) self.assertTrue(networks.allow_delete)
self.assertFalse(networks.allow_head) self.assertFalse(networks.allow_head)
self.assertDictEqual({ self.assertDictEqual(
"limit": "limit", {
"marker": "marker", "limit": "limit",
"project_id": "project_id", "marker": "marker",
"created_since": "created_since", "project_id": "project_id",
"created_before": "created_before", "created_since": "created_since",
"offset": "offset", "created_before": "created_before",
"security_service_id": "security_service_id", "offset": "offset",
"project_id": "project_id", "security_service_id": "security_service_id",
"all_projects": "all_tenants", "project_id": "project_id",
"name": "name", "all_projects": "all_tenants",
"description": "description" "name": "name",
}, "description": "description",
networks._query_mapping._mapping) },
networks._query_mapping._mapping,
)
def test_share_network(self): def test_share_network(self):
networks = share_network.ShareNetwork(**EXAMPLE) networks = share_network.ShareNetwork(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], networks.id) self.assertEqual(EXAMPLE['id'], networks.id)
self.assertEqual(EXAMPLE['name'], networks.name) self.assertEqual(EXAMPLE['name'], networks.name)
self.assertEqual(EXAMPLE['project_id'], networks.project_id) self.assertEqual(EXAMPLE['project_id'], networks.project_id)
self.assertEqual( self.assertEqual(EXAMPLE['description'], networks.description)
EXAMPLE['description'], networks.description) self.assertEqual(EXAMPLE['created_at'], networks.created_at)
self.assertEqual(
EXAMPLE['created_at'], networks.created_at)
self.assertEqual(EXAMPLE['updated_at'], networks.updated_at) self.assertEqual(EXAMPLE['updated_at'], networks.updated_at)

View File

@ -28,36 +28,43 @@ EXAMPLE = {
"cidr": None, "cidr": None,
"network_type": None, "network_type": None,
"mtu": None, "mtu": None,
"gateway": None "gateway": None,
} }
class TestShareNetworkSubnet(base.TestCase): class TestShareNetworkSubnet(base.TestCase):
def test_basic(self): def test_basic(self):
SNS_resource = SNS.ShareNetworkSubnet() SNS_resource = SNS.ShareNetworkSubnet()
self.assertEqual('share_network_subnets', SNS_resource.resources_key) self.assertEqual('share_network_subnets', SNS_resource.resources_key)
self.assertEqual('/share-networks/%(share_network_id)s/subnets', self.assertEqual(
SNS_resource.base_path) '/share-networks/%(share_network_id)s/subnets',
SNS_resource.base_path,
)
self.assertTrue(SNS_resource.allow_list) self.assertTrue(SNS_resource.allow_list)
def test_make_share_network_subnet(self): def test_make_share_network_subnet(self):
SNS_resource = SNS.ShareNetworkSubnet(**EXAMPLE) SNS_resource = SNS.ShareNetworkSubnet(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], SNS_resource.id) self.assertEqual(EXAMPLE['id'], SNS_resource.id)
self.assertEqual(EXAMPLE['availability_zone'], self.assertEqual(
SNS_resource.availability_zone) EXAMPLE['availability_zone'], SNS_resource.availability_zone
self.assertEqual(EXAMPLE['share_network_id'], )
SNS_resource.share_network_id) self.assertEqual(
self.assertEqual(EXAMPLE['share_network_name'], EXAMPLE['share_network_id'], SNS_resource.share_network_id
SNS_resource.share_network_name) )
self.assertEqual(
EXAMPLE['share_network_name'], SNS_resource.share_network_name
)
self.assertEqual(EXAMPLE['created_at'], SNS_resource.created_at) self.assertEqual(EXAMPLE['created_at'], SNS_resource.created_at)
self.assertEqual(EXAMPLE['segmentation_id'], self.assertEqual(
SNS_resource.segmentation_id) EXAMPLE['segmentation_id'], SNS_resource.segmentation_id
self.assertEqual(EXAMPLE['neutron_subnet_id'], )
SNS_resource.neutron_subnet_id) self.assertEqual(
EXAMPLE['neutron_subnet_id'], SNS_resource.neutron_subnet_id
)
self.assertEqual(EXAMPLE['updated_at'], SNS_resource.updated_at) self.assertEqual(EXAMPLE['updated_at'], SNS_resource.updated_at)
self.assertEqual(EXAMPLE['neutron_net_id'], self.assertEqual(
SNS_resource.neutron_net_id) EXAMPLE['neutron_net_id'], SNS_resource.neutron_net_id
)
self.assertEqual(EXAMPLE['ip_version'], SNS_resource.ip_version) self.assertEqual(EXAMPLE['ip_version'], SNS_resource.ip_version)
self.assertEqual(EXAMPLE['cidr'], SNS_resource.cidr) self.assertEqual(EXAMPLE['cidr'], SNS_resource.cidr)
self.assertEqual(EXAMPLE['network_type'], SNS_resource.network_type) self.assertEqual(EXAMPLE['network_type'], SNS_resource.network_type)

View File

@ -25,41 +25,36 @@ EXAMPLE = {
"share_size": 1, "share_size": 1,
"id": "6d221c1d-0200-461e-8d20-24b4776b9ddb", "id": "6d221c1d-0200-461e-8d20-24b4776b9ddb",
"project_id": "cadd7139bc3148b8973df097c0911016", "project_id": "cadd7139bc3148b8973df097c0911016",
"size": 1 "size": 1,
} }
class TestShareSnapshot(base.TestCase): class TestShareSnapshot(base.TestCase):
def test_basic(self): def test_basic(self):
snapshot_resource = share_snapshot.ShareSnapshot() snapshot_resource = share_snapshot.ShareSnapshot()
self.assertEqual( self.assertEqual('snapshots', snapshot_resource.resources_key)
'snapshots', snapshot_resource.resources_key)
self.assertEqual('/snapshots', snapshot_resource.base_path) self.assertEqual('/snapshots', snapshot_resource.base_path)
self.assertTrue(snapshot_resource.allow_list) self.assertTrue(snapshot_resource.allow_list)
self.assertDictEqual({ self.assertDictEqual(
"limit": "limit", {
"marker": "marker", "limit": "limit",
"snapshot_id": "snapshot_id" "marker": "marker",
}, "snapshot_id": "snapshot_id",
snapshot_resource._query_mapping._mapping) },
snapshot_resource._query_mapping._mapping,
)
def test_make_share_snapshot(self): def test_make_share_snapshot(self):
snapshot_resource = share_snapshot.ShareSnapshot(**EXAMPLE) snapshot_resource = share_snapshot.ShareSnapshot(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], snapshot_resource.id) self.assertEqual(EXAMPLE['id'], snapshot_resource.id)
self.assertEqual(EXAMPLE['share_id'], snapshot_resource.share_id) self.assertEqual(EXAMPLE['share_id'], snapshot_resource.share_id)
self.assertEqual(EXAMPLE['user_id'], self.assertEqual(EXAMPLE['user_id'], snapshot_resource.user_id)
snapshot_resource.user_id)
self.assertEqual(EXAMPLE['created_at'], snapshot_resource.created_at) self.assertEqual(EXAMPLE['created_at'], snapshot_resource.created_at)
self.assertEqual(EXAMPLE['status'], snapshot_resource.status) self.assertEqual(EXAMPLE['status'], snapshot_resource.status)
self.assertEqual(EXAMPLE['name'], snapshot_resource.name) self.assertEqual(EXAMPLE['name'], snapshot_resource.name)
self.assertEqual( self.assertEqual(EXAMPLE['description'], snapshot_resource.description)
EXAMPLE['description'], snapshot_resource.description) self.assertEqual(EXAMPLE['share_proto'], snapshot_resource.share_proto)
self.assertEqual( self.assertEqual(EXAMPLE['share_size'], snapshot_resource.share_size)
EXAMPLE['share_proto'], snapshot_resource.share_proto) self.assertEqual(EXAMPLE['project_id'], snapshot_resource.project_id)
self.assertEqual(
EXAMPLE['share_size'], snapshot_resource.share_size)
self.assertEqual(
EXAMPLE['project_id'], snapshot_resource.project_id)
self.assertEqual(EXAMPLE['size'], snapshot_resource.size) self.assertEqual(EXAMPLE['size'], snapshot_resource.size)

View File

@ -22,12 +22,11 @@ EXAMPLE = {
"created_at": "2021-06-04T00:44:52.000000", "created_at": "2021-06-04T00:44:52.000000",
"id": "275516e8-c998-4e78-a41e-7dd3a03e71cd", "id": "275516e8-c998-4e78-a41e-7dd3a03e71cd",
"provider_location": "/path/to/fake...", "provider_location": "/path/to/fake...",
"updated_at": "2017-06-04T00:44:54.000000" "updated_at": "2017-06-04T00:44:54.000000",
} }
class TestShareSnapshotInstances(base.TestCase): class TestShareSnapshotInstances(base.TestCase):
def test_basic(self): def test_basic(self):
instances = share_snapshot_instance.ShareSnapshotInstance() instances = share_snapshot_instance.ShareSnapshotInstance()
self.assertEqual('snapshot_instance', instances.resource_key) self.assertEqual('snapshot_instance', instances.resource_key)
@ -40,11 +39,13 @@ class TestShareSnapshotInstances(base.TestCase):
self.assertEqual(EXAMPLE['id'], instance.id) self.assertEqual(EXAMPLE['id'], instance.id)
self.assertEqual(EXAMPLE['share_id'], instance.share_id) self.assertEqual(EXAMPLE['share_id'], instance.share_id)
self.assertEqual( self.assertEqual(
EXAMPLE['share_instance_id'], instance.share_instance_id) EXAMPLE['share_instance_id'], instance.share_instance_id
)
self.assertEqual(EXAMPLE['snapshot_id'], instance.snapshot_id) self.assertEqual(EXAMPLE['snapshot_id'], instance.snapshot_id)
self.assertEqual(EXAMPLE['status'], instance.status) self.assertEqual(EXAMPLE['status'], instance.status)
self.assertEqual(EXAMPLE['progress'], instance.progress) self.assertEqual(EXAMPLE['progress'], instance.progress)
self.assertEqual(EXAMPLE['created_at'], instance.created_at) self.assertEqual(EXAMPLE['created_at'], instance.created_at)
self.assertEqual(EXAMPLE['updated_at'], instance.updated_at) self.assertEqual(EXAMPLE['updated_at'], instance.updated_at)
self.assertEqual( self.assertEqual(
EXAMPLE['provider_location'], instance.provider_location) EXAMPLE['provider_location'], instance.provider_location
)

View File

@ -40,29 +40,30 @@ EXAMPLE = {
"replication_domain": None, "replication_domain": None,
"sg_consistent_snapshot_support": "pool", "sg_consistent_snapshot_support": "pool",
"ipv4_support": True, "ipv4_support": True,
"ipv6_support": False} "ipv6_support": False,
},
} }
class TestStoragePool(base.TestCase): class TestStoragePool(base.TestCase):
def test_basic(self): def test_basic(self):
pool_resource = storage_pool.StoragePool() pool_resource = storage_pool.StoragePool()
self.assertEqual('pools', pool_resource.resources_key) self.assertEqual('pools', pool_resource.resources_key)
self.assertEqual( self.assertEqual('/scheduler-stats/pools', pool_resource.base_path)
'/scheduler-stats/pools', pool_resource.base_path)
self.assertTrue(pool_resource.allow_list) self.assertTrue(pool_resource.allow_list)
self.assertDictEqual({ self.assertDictEqual(
'pool': 'pool', {
'backend': 'backend', 'pool': 'pool',
'host': 'host', 'backend': 'backend',
'limit': 'limit', 'host': 'host',
'marker': 'marker', 'limit': 'limit',
'capabilities': 'capabilities', 'marker': 'marker',
'share_type': 'share_type', 'capabilities': 'capabilities',
}, 'share_type': 'share_type',
pool_resource._query_mapping._mapping) },
pool_resource._query_mapping._mapping,
)
def test_make_storage_pool(self): def test_make_storage_pool(self):
pool_resource = storage_pool.StoragePool(**EXAMPLE) pool_resource = storage_pool.StoragePool(**EXAMPLE)
@ -70,5 +71,4 @@ class TestStoragePool(base.TestCase):
self.assertEqual(EXAMPLE['host'], pool_resource.host) self.assertEqual(EXAMPLE['host'], pool_resource.host)
self.assertEqual(EXAMPLE['name'], pool_resource.name) self.assertEqual(EXAMPLE['name'], pool_resource.name)
self.assertEqual(EXAMPLE['backend'], pool_resource.backend) self.assertEqual(EXAMPLE['backend'], pool_resource.backend)
self.assertEqual( self.assertEqual(EXAMPLE['capabilities'], pool_resource.capabilities)
EXAMPLE['capabilities'], pool_resource.capabilities)

View File

@ -29,12 +29,12 @@ EXAMPLE = {
"user_message": ( "user_message": (
"allocate host: No storage could be allocated" "allocate host: No storage could be allocated"
"for this share request, Capabilities filter" "for this share request, Capabilities filter"
"didn't succeed.") "didn't succeed."
),
} }
class TestUserMessage(base.TestCase): class TestUserMessage(base.TestCase):
def test_basic(self): def test_basic(self):
message = user_message.UserMessage() message = user_message.UserMessage()
self.assertEqual('messages', message.resources_key) self.assertEqual('messages', message.resources_key)
@ -46,29 +46,21 @@ class TestUserMessage(base.TestCase):
self.assertTrue(message.allow_fetch) self.assertTrue(message.allow_fetch)
self.assertFalse(message.allow_head) self.assertFalse(message.allow_head)
self.assertDictEqual({ self.assertDictEqual(
"limit": "limit", {"limit": "limit", "marker": "marker", "message_id": "message_id"},
"marker": "marker", message._query_mapping._mapping,
"message_id": "message_id" )
},
message._query_mapping._mapping)
def test_user_message(self): def test_user_message(self):
messages = user_message.UserMessage(**EXAMPLE) messages = user_message.UserMessage(**EXAMPLE)
self.assertEqual(EXAMPLE['id'], messages.id) self.assertEqual(EXAMPLE['id'], messages.id)
self.assertEqual(EXAMPLE['resource_id'], messages.resource_id) self.assertEqual(EXAMPLE['resource_id'], messages.resource_id)
self.assertEqual( self.assertEqual(EXAMPLE['message_level'], messages.message_level)
EXAMPLE['message_level'], messages.message_level) self.assertEqual(EXAMPLE['user_message'], messages.user_message)
self.assertEqual(
EXAMPLE['user_message'], messages.user_message)
self.assertEqual(EXAMPLE['expires_at'], messages.expires_at) self.assertEqual(EXAMPLE['expires_at'], messages.expires_at)
self.assertEqual(EXAMPLE['detail_id'], messages.detail_id) self.assertEqual(EXAMPLE['detail_id'], messages.detail_id)
self.assertEqual(EXAMPLE['created_at'], messages.created_at) self.assertEqual(EXAMPLE['created_at'], messages.created_at)
self.assertEqual( self.assertEqual(EXAMPLE['request_id'], messages.request_id)
EXAMPLE['request_id'], messages.request_id) self.assertEqual(EXAMPLE['project_id'], messages.project_id)
self.assertEqual( self.assertEqual(EXAMPLE['resource_type'], messages.resource_type)
EXAMPLE['project_id'], messages.project_id) self.assertEqual(EXAMPLE['action_id'], messages.action_id)
self.assertEqual(
EXAMPLE['resource_type'], messages.resource_type)
self.assertEqual(
EXAMPLE['action_id'], messages.action_id)