Files
manila/manila_tempest_tests/services/share/v2/json/shares_client.py
Valeriy Ponomaryov 05c42ecf70 Add quotas per share type
With this feature it will be possible to set quotas per share type
for all existing quota resources. It is useful for deployments with
multiple backends that are accessible via different share types.

Also, fix one of existing DB migrations that hangs on PostgreSQL.

APIImpact
DocImpact
Implements blueprint support-quotas-per-share-type
Change-Id: I8472418c2eb363cf5a76c672c7fdea72f21e4f63
2017-07-19 17:29:04 +03:00

1728 lines
74 KiB
Python

# Copyright 2015 Andrew Kerr
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import six
import time
from six.moves.urllib import parse as urlparse
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
from manila_tempest_tests.common import constants
from manila_tempest_tests.services.share.json import shares_client
from manila_tempest_tests import share_exceptions
from manila_tempest_tests import utils
CONF = config.CONF
LATEST_MICROVERSION = CONF.share.max_api_microversion
EXPERIMENTAL = {'X-OpenStack-Manila-API-Experimental': 'True'}
class SharesV2Client(shares_client.SharesClient):
"""Tempest REST client for Manila.
It handles shares and access to it in OpenStack.
"""
api_version = 'v2'
def __init__(self, auth_provider):
super(SharesV2Client, self).__init__(auth_provider)
self.API_MICROVERSIONS_HEADER = 'x-openstack-manila-api-version'
def inject_microversion_header(self, headers, version,
extra_headers=False):
"""Inject the required manila microversion header."""
new_headers = self.get_headers()
new_headers[self.API_MICROVERSIONS_HEADER] = version
if extra_headers and headers:
new_headers.update(headers)
elif headers:
new_headers = headers
return new_headers
# Overwrite all http verb calls to inject the micro version header
def post(self, url, body, headers=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).post(url, body, headers=headers)
def get(self, url, headers=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).get(url, headers=headers)
def delete(self, url, headers=None, body=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).delete(url, headers=headers,
body=body)
def patch(self, url, body, headers=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).patch(url, body, headers=headers)
def put(self, url, body, headers=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).put(url, body, headers=headers)
def head(self, url, headers=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).head(url, headers=headers)
def copy(self, url, headers=None, extra_headers=False,
version=LATEST_MICROVERSION):
headers = self.inject_microversion_header(headers, version,
extra_headers=extra_headers)
return super(SharesV2Client, self).copy(url, headers=headers)
def reset_state(self, s_id, status="error", s_type="shares",
headers=None, version=LATEST_MICROVERSION,
action_name=None):
"""Resets the state of a share, snapshot, cg, or a cgsnapshot.
status: available, error, creating, deleting, error_deleting
s_type: shares, share_instances, snapshots, consistency-groups,
cgsnapshots.
"""
if action_name is None:
if utils.is_microversion_gt(version, "2.6"):
action_name = 'reset_status'
else:
action_name = 'os-reset_status'
body = {action_name: {"status": status}}
body = json.dumps(body)
resp, body = self.post("%s/%s/action" % (s_type, s_id), body,
headers=headers, extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return body
def force_delete(self, s_id, s_type="shares", headers=None,
version=LATEST_MICROVERSION, action_name=None):
"""Force delete share or snapshot.
s_type: shares, snapshots
"""
if action_name is None:
if utils.is_microversion_gt(version, "2.6"):
action_name = 'force_delete'
else:
action_name = 'os-force_delete'
body = {action_name: None}
body = json.dumps(body)
resp, body = self.post("%s/%s/action" % (s_type, s_id), body,
headers=headers, extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return body
def send_microversion_request(self, version=None, script_name=None):
"""Prepare and send the HTTP GET Request to the base URL.
Extracts the base URL from the shares_client endpoint and makes a GET
request with the microversions request header.
:param version: The string to send for the value of the microversion
header, or None to omit the header.
:param script_name: The first part of the URL (v1 or v2), or None to
omit it.
"""
headers = self.get_headers()
url, headers, body = self.auth_provider.auth_request(
'GET', 'shares', headers, None, self.filters)
url = '/'.join(url.split('/')[:3]) + '/'
if script_name:
url += script_name + '/'
if version:
headers[self.API_MICROVERSIONS_HEADER] = version
resp, resp_body = self.raw_request(url, 'GET', headers=headers)
self.response_checker('GET', resp, resp_body)
resp_body = json.loads(resp_body)
return resp, resp_body
def is_resource_deleted(self, *args, **kwargs):
"""Verifies whether provided resource deleted or not.
:param kwargs: dict with expected keys 'share_id', 'snapshot_id',
:param kwargs: 'sn_id', 'ss_id', 'vt_id' and 'server_id'
:raises share_exceptions.InvalidResource
"""
if "share_instance_id" in kwargs:
return self._is_resource_deleted(
self.get_share_instance, kwargs.get("share_instance_id"))
elif "share_group_id" in kwargs:
return self._is_resource_deleted(
self.get_share_group, kwargs.get("share_group_id"))
elif "share_group_snapshot_id" in kwargs:
return self._is_resource_deleted(
self.get_share_group_snapshot,
kwargs.get("share_group_snapshot_id"))
elif "share_group_type_id" in kwargs:
return self._is_resource_deleted(
self.get_share_group_type, kwargs.get("share_group_type_id"))
elif "replica_id" in kwargs:
return self._is_resource_deleted(
self.get_share_replica, kwargs.get("replica_id"))
elif "message_id" in kwargs:
return self._is_resource_deleted(
self.get_message, kwargs.get("message_id"))
else:
return super(SharesV2Client, self).is_resource_deleted(
*args, **kwargs)
###############
def create_share(self, share_protocol=None, size=None,
name=None, snapshot_id=None, description=None,
metadata=None, share_network_id=None,
share_type_id=None, is_public=False,
share_group_id=None, availability_zone=None,
version=LATEST_MICROVERSION, experimental=False):
headers = EXPERIMENTAL if experimental else None
metadata = metadata or {}
if name is None:
name = data_utils.rand_name("tempest-created-share")
if description is None:
description = data_utils.rand_name("tempest-created-share-desc")
if size is None:
size = self.share_size
if share_protocol is None:
share_protocol = self.share_protocol
if share_protocol is None:
raise share_exceptions.ShareProtocolNotSpecified()
post_body = {
"share": {
"share_proto": share_protocol,
"description": description,
"snapshot_id": snapshot_id,
"name": name,
"size": size,
"metadata": metadata,
"is_public": is_public,
}
}
if availability_zone:
post_body["share"]["availability_zone"] = availability_zone
if share_network_id:
post_body["share"]["share_network_id"] = share_network_id
if share_type_id:
post_body["share"]["share_type"] = share_type_id
if share_group_id:
post_body["share"]["share_group_id"] = share_group_id
body = json.dumps(post_body)
resp, body = self.post("shares", body, headers=headers,
extra_headers=experimental, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_shares(self, detailed=False, params=None,
version=LATEST_MICROVERSION, experimental=False):
"""Get list of shares w/o filters."""
headers = EXPERIMENTAL if experimental else None
uri = 'shares/detail' if detailed else 'shares'
uri += '?%s' % urlparse.urlencode(params) if params else ''
resp, body = self.get(uri, headers=headers, extra_headers=experimental,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_shares_with_detail(self, params=None,
version=LATEST_MICROVERSION,
experimental=False):
"""Get detailed list of shares w/o filters."""
return self.list_shares(detailed=True, params=params,
version=version, experimental=experimental)
def get_share(self, share_id, version=LATEST_MICROVERSION,
experimental=False):
headers = EXPERIMENTAL if experimental else None
resp, body = self.get("shares/%s" % share_id, headers=headers,
extra_headers=experimental, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_export_location(
self, share_id, export_location_uuid, version=LATEST_MICROVERSION):
resp, body = self.get(
"shares/%(share_id)s/export_locations/%(el_uuid)s" % {
"share_id": share_id, "el_uuid": export_location_uuid},
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_export_locations(
self, share_id, version=LATEST_MICROVERSION):
resp, body = self.get(
"shares/%(share_id)s/export_locations" % {"share_id": share_id},
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_share(self, share_id, params=None,
version=LATEST_MICROVERSION):
uri = "shares/%s" % share_id
uri += '?%s' % (urlparse.urlencode(params) if params else '')
resp, body = self.delete(uri, version=version)
self.expected_success(202, resp.status)
return body
###############
def get_instances_of_share(self, share_id, version=LATEST_MICROVERSION):
resp, body = self.get("shares/%s/instances" % share_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_instances(self, version=LATEST_MICROVERSION,
params=None):
uri = 'share_instances'
uri += '?%s' % urlparse.urlencode(params) if params else ''
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_instance(self, instance_id, version=LATEST_MICROVERSION):
resp, body = self.get("share_instances/%s" % instance_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_instance_export_location(
self, instance_id, export_location_uuid,
version=LATEST_MICROVERSION):
resp, body = self.get(
"share_instances/%(instance_id)s/export_locations/%(el_uuid)s" % {
"instance_id": instance_id, "el_uuid": export_location_uuid},
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_instance_export_locations(
self, instance_id, version=LATEST_MICROVERSION):
resp, body = self.get(
"share_instances/%s/export_locations" % instance_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def wait_for_share_instance_status(self, instance_id, status,
version=LATEST_MICROVERSION):
"""Waits for a share to reach a given status."""
body = self.get_share_instance(instance_id, version=version)
instance_status = body['status']
start = int(time.time())
while instance_status != status:
time.sleep(self.build_interval)
body = self.get_share(instance_id)
instance_status = body['status']
if instance_status == status:
return
elif 'error' in instance_status.lower():
raise share_exceptions.ShareInstanceBuildErrorException(
id=instance_id)
if int(time.time()) - start >= self.build_timeout:
message = ('Share instance %s failed to reach %s status within'
' the required time (%s s).' %
(instance_id, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def wait_for_share_status(self, share_id, status, status_attr='status',
version=LATEST_MICROVERSION):
"""Waits for a share to reach a given status."""
body = self.get_share(share_id, version=version)
share_status = body[status_attr]
start = int(time.time())
while share_status != status:
time.sleep(self.build_interval)
body = self.get_share(share_id, version=version)
share_status = body[status_attr]
if share_status == status:
return
elif 'error' in share_status.lower():
raise share_exceptions.ShareBuildErrorException(
share_id=share_id)
if int(time.time()) - start >= self.build_timeout:
message = ("Share's %(status_attr)s failed to transition to "
"%(status)s within the required time %(seconds)s." %
{"status_attr": status_attr, "status": status,
"seconds": self.build_timeout})
raise exceptions.TimeoutException(message)
###############
def extend_share(self, share_id, new_size, version=LATEST_MICROVERSION,
action_name=None):
if action_name is None:
if utils.is_microversion_gt(version, "2.6"):
action_name = 'extend'
else:
action_name = 'os-extend'
post_body = {
action_name: {
"new_size": new_size,
}
}
body = json.dumps(post_body)
resp, body = self.post(
"shares/%s/action" % share_id, body, version=version)
self.expected_success(202, resp.status)
return body
def shrink_share(self, share_id, new_size, version=LATEST_MICROVERSION,
action_name=None):
if action_name is None:
if utils.is_microversion_gt(version, "2.6"):
action_name = 'shrink'
else:
action_name = 'os-shrink'
post_body = {
action_name: {
"new_size": new_size,
}
}
body = json.dumps(post_body)
resp, body = self.post(
"shares/%s/action" % share_id, body, version=version)
self.expected_success(202, resp.status)
return body
###############
def manage_share(self, service_host, protocol, export_path,
share_type_id, name=None, description=None,
is_public=False, version=LATEST_MICROVERSION,
url=None):
post_body = {
"share": {
"export_path": export_path,
"service_host": service_host,
"protocol": protocol,
"share_type": share_type_id,
"name": name,
"description": description,
"is_public": is_public,
}
}
if url is None:
if utils.is_microversion_gt(version, "2.6"):
url = 'shares/manage'
else:
url = 'os-share-manage'
body = json.dumps(post_body)
resp, body = self.post(url, body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def unmanage_share(self, share_id, version=LATEST_MICROVERSION, url=None,
action_name=None, body=None):
if url is None:
if utils.is_microversion_gt(version, "2.6"):
url = 'shares'
else:
url = 'os-share-unmanage'
if action_name is None:
if utils.is_microversion_gt(version, "2.6"):
action_name = 'action'
else:
action_name = 'unmanage'
if body is None and utils.is_microversion_gt(version, "2.6"):
body = json.dumps({'unmanage': {}})
resp, body = self.post(
"%(url)s/%(share_id)s/%(action_name)s" % {
'url': url, 'share_id': share_id, 'action_name': action_name},
body,
version=version)
self.expected_success(202, resp.status)
return body
###############
def create_snapshot(self, share_id, name=None, description=None,
force=False, version=LATEST_MICROVERSION):
if name is None:
name = data_utils.rand_name("tempest-created-share-snap")
if description is None:
description = data_utils.rand_name(
"tempest-created-share-snap-desc")
post_body = {
"snapshot": {
"name": name,
"force": force,
"description": description,
"share_id": share_id,
}
}
body = json.dumps(post_body)
resp, body = self.post("snapshots", body, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def get_snapshot(self, snapshot_id, version=LATEST_MICROVERSION):
resp, body = self.get("snapshots/%s" % snapshot_id, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshots(self, detailed=False, params=None,
version=LATEST_MICROVERSION):
"""Get list of share snapshots w/o filters."""
uri = 'snapshots/detail' if detailed else 'snapshots'
uri += '?%s' % urlparse.urlencode(params) if params else ''
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshots_for_share(self, share_id, detailed=False,
version=LATEST_MICROVERSION):
"""Get list of snapshots for given share."""
uri = ('snapshots/detail?share_id=%s' % share_id
if detailed else 'snapshots?share_id=%s' % share_id)
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshots_with_detail(self, params=None,
version=LATEST_MICROVERSION):
"""Get detailed list of share snapshots w/o filters."""
return self.list_snapshots(detailed=True, params=params,
version=version)
def delete_snapshot(self, snap_id, version=LATEST_MICROVERSION):
resp, body = self.delete("snapshots/%s" % snap_id, version=version)
self.expected_success(202, resp.status)
return body
def wait_for_snapshot_status(self, snapshot_id, status,
version=LATEST_MICROVERSION):
"""Waits for a snapshot to reach a given status."""
body = self.get_snapshot(snapshot_id, version=version)
snapshot_name = body['name']
snapshot_status = body['status']
start = int(time.time())
while snapshot_status != status:
time.sleep(self.build_interval)
body = self.get_snapshot(snapshot_id, version=version)
snapshot_status = body['status']
if 'error' in snapshot_status:
raise (share_exceptions.
SnapshotBuildErrorException(snapshot_id=snapshot_id))
if int(time.time()) - start >= self.build_timeout:
message = ('Share Snapshot %s failed to reach %s status '
'within the required time (%s s).' %
(snapshot_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def manage_snapshot(self, share_id, provider_location,
name=None, description=None,
version=LATEST_MICROVERSION,
driver_options=None):
if name is None:
name = data_utils.rand_name("tempest-manage-snapshot")
if description is None:
description = data_utils.rand_name("tempest-manage-snapshot-desc")
post_body = {
"snapshot": {
"share_id": share_id,
"provider_location": provider_location,
"name": name,
"description": description,
"driver_options": driver_options if driver_options else {},
}
}
url = 'snapshots/manage'
body = json.dumps(post_body)
resp, body = self.post(url, body, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def unmanage_snapshot(self, snapshot_id, version=LATEST_MICROVERSION,
body=None):
url = 'snapshots'
action_name = 'action'
if body is None:
body = json.dumps({'unmanage': {}})
resp, body = self.post(
"%(url)s/%(snapshot_id)s/%(action_name)s" % {
'url': url, 'snapshot_id': snapshot_id,
'action_name': action_name},
body,
version=version)
self.expected_success(202, resp.status)
return body
###############
def revert_to_snapshot(self, share_id, snapshot_id,
version=LATEST_MICROVERSION):
url = 'shares/%s/action' % share_id
body = json.dumps({'revert': {'snapshot_id': snapshot_id}})
resp, body = self.post(url, body, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
###############
def create_share_type_extra_specs(self, share_type_id, extra_specs,
version=LATEST_MICROVERSION):
url = "types/%s/extra_specs" % share_type_id
post_body = json.dumps({'extra_specs': extra_specs})
resp, body = self.post(url, post_body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_type_extra_spec(self, share_type_id, extra_spec_name,
version=LATEST_MICROVERSION):
uri = "types/%s/extra_specs/%s" % (share_type_id, extra_spec_name)
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_type_extra_specs(self, share_type_id, params=None,
version=LATEST_MICROVERSION):
uri = "types/%s/extra_specs" % share_type_id
if params is not None:
uri += '?%s' % urlparse.urlencode(params)
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_share_type_extra_spec(self, share_type_id, spec_name,
spec_value, version=LATEST_MICROVERSION):
uri = "types/%s/extra_specs/%s" % (share_type_id, spec_name)
extra_spec = {spec_name: spec_value}
post_body = json.dumps(extra_spec)
resp, body = self.put(uri, post_body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_share_type_extra_specs(self, share_type_id, extra_specs,
version=LATEST_MICROVERSION):
uri = "types/%s/extra_specs" % share_type_id
extra_specs = {"extra_specs": extra_specs}
post_body = json.dumps(extra_specs)
resp, body = self.post(uri, post_body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_share_type_extra_spec(self, share_type_id, extra_spec_name,
version=LATEST_MICROVERSION):
uri = "types/%s/extra_specs/%s" % (share_type_id, extra_spec_name)
resp, body = self.delete(uri, version=version)
self.expected_success(202, resp.status)
return body
###############
def get_snapshot_instance(self, instance_id, version=LATEST_MICROVERSION):
resp, body = self.get("snapshot-instances/%s" % instance_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshot_instances(self, detail=False, snapshot_id=None,
version=LATEST_MICROVERSION):
"""Get list of share snapshot instances."""
uri = "snapshot-instances%s" % ('/detail' if detail else '')
if snapshot_id is not None:
uri += '?snapshot_id=%s' % snapshot_id
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def reset_snapshot_instance_status(self, instance_id,
status=constants.STATUS_AVAILABLE,
version=LATEST_MICROVERSION):
"""Reset the status."""
uri = 'snapshot-instances/%s/action' % instance_id
post_body = {
'reset_status': {
'status': status
}
}
body = json.dumps(post_body)
resp, body = self.post(uri, body, extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def wait_for_snapshot_instance_status(self, instance_id, expected_status):
"""Waits for a snapshot instance status to reach a given status."""
body = self.get_snapshot_instance(instance_id)
instance_status = body['status']
start = int(time.time())
while instance_status != expected_status:
time.sleep(self.build_interval)
body = self.get_snapshot_instance(instance_id)
instance_status = body['status']
if instance_status == expected_status:
return
if 'error' in instance_status:
raise share_exceptions.SnapshotInstanceBuildErrorException(
id=instance_id)
if int(time.time()) - start >= self.build_timeout:
message = ('The status of snapshot instance %(id)s failed to '
'reach %(expected_status)s status within the '
'required time (%(time)ss). Current '
'status: %(current_status)s.' %
{
'expected_status': expected_status,
'time': self.build_timeout,
'id': instance_id,
'current_status': instance_status,
})
raise exceptions.TimeoutException(message)
def get_snapshot_instance_export_location(
self, instance_id, export_location_uuid,
version=LATEST_MICROVERSION):
resp, body = self.get(
"snapshot-instances/%(instance_id)s/export-locations/%("
"el_uuid)s" % {
"instance_id": instance_id,
"el_uuid": export_location_uuid},
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshot_instance_export_locations(
self, instance_id, version=LATEST_MICROVERSION):
resp, body = self.get(
"snapshot-instances/%s/export-locations" % instance_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def _get_access_action_name(self, version, action):
if utils.is_microversion_gt(version, "2.6"):
return action.split('os-')[-1]
return action
def create_access_rule(self, share_id, access_type="ip",
access_to="0.0.0.0", access_level=None,
version=LATEST_MICROVERSION, action_name=None):
post_body = {
self._get_access_action_name(version, 'os-allow_access'): {
"access_type": access_type,
"access_to": access_to,
"access_level": access_level,
}
}
body = json.dumps(post_body)
resp, body = self.post(
"shares/%s/action" % share_id, body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_access_rules(self, share_id, version=LATEST_MICROVERSION,
action_name=None):
body = {self._get_access_action_name(version, 'os-access_list'): None}
resp, body = self.post(
"shares/%s/action" % share_id, json.dumps(body), version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_access_rule(self, share_id, rule_id,
version=LATEST_MICROVERSION, action_name=None):
post_body = {
self._get_access_action_name(version, 'os-deny_access'): {
"access_id": rule_id,
}
}
body = json.dumps(post_body)
resp, body = self.post(
"shares/%s/action" % share_id, body, version=version)
self.expected_success(202, resp.status)
return body
###############
def list_availability_zones(self, url='availability-zones',
version=LATEST_MICROVERSION):
"""Get list of availability zones."""
if url is None:
if utils.is_microversion_gt(version, "2.6"):
url = 'availability-zones'
else:
url = 'os-availability-zone'
resp, body = self.get(url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def list_services(self, params=None, url=None,
version=LATEST_MICROVERSION):
"""List services."""
if url is None:
if utils.is_microversion_gt(version, "2.6"):
url = 'services'
else:
url = 'os-services'
if params:
url += '?%s' % urlparse.urlencode(params)
resp, body = self.get(url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def list_share_types(self, params=None, default=False,
version=LATEST_MICROVERSION):
uri = 'types'
if default:
uri += '/default'
if params is not None:
uri += '?%s' % urlparse.urlencode(params)
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def create_share_type(self, name, is_public=True,
version=LATEST_MICROVERSION, **kwargs):
if utils.is_microversion_gt(version, "2.6"):
is_public_keyname = 'share_type_access:is_public'
else:
is_public_keyname = 'os-share-type-access:is_public'
post_body = {
'name': name,
'extra_specs': kwargs.get('extra_specs'),
is_public_keyname: is_public,
}
post_body = json.dumps({'share_type': post_body})
resp, body = self.post('types', post_body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_share_type(self, share_type_id, version=LATEST_MICROVERSION):
resp, body = self.delete("types/%s" % share_type_id, version=version)
self.expected_success(202, resp.status)
return body
def get_share_type(self, share_type_id, version=LATEST_MICROVERSION):
resp, body = self.get("types/%s" % share_type_id, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_access_to_share_type(self, share_type_id,
version=LATEST_MICROVERSION,
action_name=None):
if action_name is None:
if utils.is_microversion_gt(version, "2.6"):
action_name = 'share_type_access'
else:
action_name = 'os-share-type-access'
url = 'types/%(st_id)s/%(action_name)s' % {
'st_id': share_type_id, 'action_name': action_name}
resp, body = self.get(url, version=version)
# [{"share_type_id": "%st_id%", "project_id": "%project_id%"}, ]
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
@staticmethod
def _get_quotas_url(version):
if utils.is_microversion_gt(version, "2.6"):
return 'quota-sets'
return 'os-quota-sets'
@staticmethod
def _get_quotas_url_arguments_as_str(user_id=None, share_type=None):
args_str = ''
if not (user_id is None or share_type is None):
args_str = "?user_id=%s&share_type=%s" % (user_id, share_type)
elif user_id is not None:
args_str = "?user_id=%s" % user_id
elif share_type is not None:
args_str = "?share_type=%s" % share_type
return args_str
def default_quotas(self, tenant_id, url=None, version=LATEST_MICROVERSION):
if url is None:
url = self._get_quotas_url(version)
url += '/%s' % tenant_id
resp, body = self.get("%s/defaults" % url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def show_quotas(self, tenant_id, user_id=None, share_type=None, url=None,
version=LATEST_MICROVERSION):
if url is None:
url = self._get_quotas_url(version)
url += '/%s' % tenant_id
url += self._get_quotas_url_arguments_as_str(user_id, share_type)
resp, body = self.get(url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def reset_quotas(self, tenant_id, user_id=None, share_type=None, url=None,
version=LATEST_MICROVERSION):
if url is None:
url = self._get_quotas_url(version)
url += '/%s' % tenant_id
url += self._get_quotas_url_arguments_as_str(user_id, share_type)
resp, body = self.delete(url, version=version)
self.expected_success(202, resp.status)
return body
def detail_quotas(self, tenant_id, user_id=None, share_type=None, url=None,
version=LATEST_MICROVERSION):
if url is None:
url = self._get_quotas_url(version)
url += '/%s/detail' % tenant_id
url += self._get_quotas_url_arguments_as_str(user_id, share_type)
resp, body = self.get(url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_quotas(self, tenant_id, user_id=None, shares=None,
snapshots=None, gigabytes=None, snapshot_gigabytes=None,
share_networks=None, force=True, share_type=None,
url=None, version=LATEST_MICROVERSION):
if url is None:
url = self._get_quotas_url(version)
url += '/%s' % tenant_id
url += self._get_quotas_url_arguments_as_str(user_id, share_type)
put_body = {"tenant_id": tenant_id}
if force:
put_body["force"] = "true"
if shares is not None:
put_body["shares"] = shares
if snapshots is not None:
put_body["snapshots"] = snapshots
if gigabytes is not None:
put_body["gigabytes"] = gigabytes
if snapshot_gigabytes is not None:
put_body["snapshot_gigabytes"] = snapshot_gigabytes
if share_networks is not None:
put_body["share_networks"] = share_networks
put_body = json.dumps({"quota_set": put_body})
resp, body = self.put(url, put_body, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def create_share_group(self, name=None, description=None,
share_group_type_id=None, share_type_ids=(),
share_network_id=None,
source_share_group_snapshot_id=None,
availability_zone=None,
version=LATEST_MICROVERSION):
"""Create a new share group."""
uri = 'share-groups'
post_body = {}
if name:
post_body['name'] = name
if description:
post_body['description'] = description
if share_group_type_id:
post_body['share_group_type_id'] = share_group_type_id
if share_type_ids:
post_body['share_types'] = share_type_ids
if source_share_group_snapshot_id:
post_body['source_share_group_snapshot_id'] = (
source_share_group_snapshot_id)
if share_network_id:
post_body['share_network_id'] = share_network_id
if availability_zone:
post_body['availability_zone'] = availability_zone
body = json.dumps({'share_group': post_body})
resp, body = self.post(uri, body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def delete_share_group(self, share_group_id, version=LATEST_MICROVERSION):
"""Delete a share group."""
uri = 'share-groups/%s' % share_group_id
resp, body = self.delete(uri, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def list_share_groups(self, detailed=False, params=None,
version=LATEST_MICROVERSION):
"""Get list of share groups w/o filters."""
uri = 'share-groups%s' % ('/detail' if detailed else '')
uri += '?%s' % (urlparse.urlencode(params) if params else '')
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_group(self, share_group_id, version=LATEST_MICROVERSION):
"""Get share group info."""
uri = 'share-groups/%s' % share_group_id
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_share_group(self, share_group_id, name=None, description=None,
version=LATEST_MICROVERSION, **kwargs):
"""Update an existing share group."""
uri = 'share-groups/%s' % share_group_id
post_body = {}
if name:
post_body['name'] = name
if description:
post_body['description'] = description
if kwargs:
post_body.update(kwargs)
body = json.dumps({'share_group': post_body})
resp, body = self.put(uri, body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def share_group_reset_state(self, share_group_id, status='error',
version=LATEST_MICROVERSION):
self.reset_state(share_group_id, status=status, s_type='groups',
headers=EXPERIMENTAL, version=version)
def share_group_force_delete(self, share_group_id,
version=LATEST_MICROVERSION):
self.force_delete(share_group_id, s_type='share-groups',
headers=EXPERIMENTAL, version=version)
def wait_for_share_group_status(self, share_group_id, status):
"""Waits for a share group to reach a given status."""
body = self.get_share_group(share_group_id)
sg_name = body['name']
sg_status = body['status']
start = int(time.time())
while sg_status != status:
time.sleep(self.build_interval)
body = self.get_share_group(share_group_id)
sg_status = body['status']
if 'error' in sg_status and status != 'error':
raise share_exceptions.ShareGroupBuildErrorException(
share_group_id=share_group_id)
if int(time.time()) - start >= self.build_timeout:
sg_name = sg_name or share_group_id
message = ('Share Group %s failed to reach %s status '
'within the required time (%s s). '
'Current status: %s' %
(sg_name, status, self.build_timeout, sg_status))
raise exceptions.TimeoutException(message)
###############
def create_share_group_type(self, name=None, share_types=(),
is_public=None, group_specs=None,
version=LATEST_MICROVERSION):
"""Create a new share group type."""
uri = 'share-group-types'
post_body = {}
if isinstance(share_types, (tuple, list)):
share_types = list(share_types)
else:
share_types = [share_types]
if name is not None:
post_body['name'] = name
if share_types:
post_body['share_types'] = share_types
if is_public is not None:
post_body['is_public'] = is_public
if group_specs:
post_body['group_specs'] = group_specs
body = json.dumps({'share_group_type': post_body})
resp, body = self.post(uri, body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_group_types(self, detailed=False, params=None,
version=LATEST_MICROVERSION):
"""Get list of share group types."""
uri = 'share-group-types%s' % ('/detail' if detailed else '')
uri += '?%s' % (urlparse.urlencode(params) if params else '')
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_group_type(self, share_group_type_id,
version=LATEST_MICROVERSION):
"""Get share group type info."""
uri = 'share-group-types/%s' % share_group_type_id
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_default_share_group_type(self, version=LATEST_MICROVERSION):
"""Get default share group type info."""
uri = 'share-group-types/default'
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_share_group_type(self, share_group_type_id,
version=LATEST_MICROVERSION):
"""Delete an existing share group type."""
uri = 'share-group-types/%s' % share_group_type_id
resp, body = self.delete(uri, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(204, resp.status)
return self._parse_resp(body)
def add_access_to_share_group_type(self, share_group_type_id, project_id,
version=LATEST_MICROVERSION):
uri = 'share-group-types/%s/action' % share_group_type_id
post_body = {'project': project_id}
post_body = json.dumps({'addProjectAccess': post_body})
resp, body = self.post(uri, post_body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def remove_access_from_share_group_type(self, share_group_type_id,
project_id,
version=LATEST_MICROVERSION):
uri = 'share-group-types/%s/action' % share_group_type_id
post_body = {'project': project_id}
post_body = json.dumps({'removeProjectAccess': post_body})
resp, body = self.post(uri, post_body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def list_access_to_share_group_type(self, share_group_type_id,
version=LATEST_MICROVERSION):
uri = 'share-group-types/%s/access' % share_group_type_id
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def create_share_group_type_specs(self, share_group_type_id,
group_specs_dict,
version=LATEST_MICROVERSION):
url = "share-group-types/%s/group-specs" % share_group_type_id
post_body = json.dumps({'group_specs': group_specs_dict})
resp, body = self.post(url, post_body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_group_type_spec(self, share_group_type_id, group_spec_key,
version=LATEST_MICROVERSION):
uri = "group-types/%s/group_specs/%s" % (
share_group_type_id, group_spec_key)
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_group_type_specs(self, share_group_type_id, params=None,
version=LATEST_MICROVERSION):
uri = "share-group-types/%s/group_specs" % share_group_type_id
if params is not None:
uri += '?%s' % urlparse.urlencode(params)
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_share_group_type_spec(self, share_group_type_id, group_spec_key,
group_spec_value,
version=LATEST_MICROVERSION):
uri = "share-group-types/%s/group-specs/%s" % (
share_group_type_id, group_spec_key)
group_spec = {group_spec_key: group_spec_value}
post_body = json.dumps(group_spec)
resp, body = self.put(uri, post_body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_share_group_type_specs(self, share_group_type_id,
group_specs_dict,
version=LATEST_MICROVERSION):
return self.create_share_group_type_specs(
share_group_type_id, group_specs_dict, version=version)
def delete_share_group_type_spec(self, share_type_id, group_spec_key,
version=LATEST_MICROVERSION):
uri = "share-group-types/%s/group-specs/%s" % (
share_type_id, group_spec_key)
resp, body = self.delete(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(204, resp.status)
return body
###############
def create_share_group_snapshot(self, share_group_id, name=None,
description=None,
version=LATEST_MICROVERSION):
"""Create a new share group snapshot of an existing share group."""
uri = 'share-group-snapshots'
post_body = {'share_group_id': share_group_id}
if name:
post_body['name'] = name
if description:
post_body['description'] = description
body = json.dumps({'share_group_snapshot': post_body})
resp, body = self.post(uri, body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def delete_share_group_snapshot(self, share_group_snapshot_id,
version=LATEST_MICROVERSION):
"""Delete an existing share group snapshot."""
uri = 'share-group-snapshots/%s' % share_group_snapshot_id
resp, body = self.delete(uri, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return body
def list_share_group_snapshots(self, detailed=False, params=None,
version=LATEST_MICROVERSION):
"""Get list of share group snapshots w/o filters."""
uri = 'share-group-snapshots%s' % ('/detail' if detailed else '')
uri += '?%s' % (urlparse.urlencode(params) if params else '')
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_share_group_snapshot(self, share_group_snapshot_id,
version=LATEST_MICROVERSION):
"""Get share group snapshot info."""
uri = 'share-group-snapshots/%s' % share_group_snapshot_id
resp, body = self.get(uri, headers=EXPERIMENTAL, extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def update_share_group_snapshot(self, share_group_snapshot_id, name=None,
description=None,
version=LATEST_MICROVERSION):
"""Update an existing share group snapshot."""
uri = 'share-group-snapshots/%s' % share_group_snapshot_id
post_body = {}
if name:
post_body['name'] = name
if description:
post_body['description'] = description
body = json.dumps({'share_group_snapshot': post_body})
resp, body = self.put(uri, body, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def share_group_snapshot_reset_state(self, share_group_snapshot_id,
status='error',
version=LATEST_MICROVERSION):
self.reset_state(
share_group_snapshot_id, status=status,
s_type='group-snapshots', headers=EXPERIMENTAL, version=version)
def share_group_snapshot_force_delete(self, share_group_snapshot_id,
version=LATEST_MICROVERSION):
self.force_delete(
share_group_snapshot_id, s_type='share-group-snapshots',
headers=EXPERIMENTAL, version=version)
def wait_for_share_group_snapshot_status(self, share_group_snapshot_id,
status):
"""Waits for a share group snapshot to reach a given status."""
body = self.get_share_group_snapshot(share_group_snapshot_id)
sg_snapshot_name = body['name']
sg_snapshot_status = body['status']
start = int(time.time())
while sg_snapshot_status != status:
time.sleep(self.build_interval)
body = self.get_share_group_snapshot(share_group_snapshot_id)
sg_snapshot_status = body['status']
if 'error' in sg_snapshot_status and status != 'error':
raise share_exceptions.ShareGroupSnapshotBuildErrorException(
share_group_snapshot_id=share_group_snapshot_id)
if int(time.time()) - start >= self.build_timeout:
message = ('Share Group Snapshot %s failed to reach %s status '
'within the required time (%s s).' %
(sg_snapshot_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
###############
def migrate_share(self, share_id, host,
force_host_assisted_migration=False,
new_share_network_id=None, writable=False,
preserve_metadata=False, preserve_snapshots=False,
nondisruptive=False, new_share_type_id=None,
version=LATEST_MICROVERSION):
body = {
'migration_start': {
'host': host,
'force_host_assisted_migration': force_host_assisted_migration,
'new_share_network_id': new_share_network_id,
'new_share_type_id': new_share_type_id,
'writable': writable,
'preserve_metadata': preserve_metadata,
'preserve_snapshots': preserve_snapshots,
'nondisruptive': nondisruptive,
}
}
body = json.dumps(body)
return self.post('shares/%s/action' % share_id, body,
headers=EXPERIMENTAL, extra_headers=True,
version=version)
def migration_complete(self, share_id, version=LATEST_MICROVERSION,
action_name='migration_complete'):
post_body = {
action_name: None,
}
body = json.dumps(post_body)
return self.post('shares/%s/action' % share_id, body,
headers=EXPERIMENTAL, extra_headers=True,
version=version)
def migration_cancel(self, share_id, version=LATEST_MICROVERSION,
action_name='migration_cancel'):
post_body = {
action_name: None,
}
body = json.dumps(post_body)
return self.post('shares/%s/action' % share_id, body,
headers=EXPERIMENTAL, extra_headers=True,
version=version)
def migration_get_progress(self, share_id, version=LATEST_MICROVERSION,
action_name='migration_get_progress'):
post_body = {
action_name: None,
}
body = json.dumps(post_body)
result = self.post('shares/%s/action' % share_id, body,
headers=EXPERIMENTAL, extra_headers=True,
version=version)
return json.loads(result[1])
def reset_task_state(
self, share_id, task_state, version=LATEST_MICROVERSION,
action_name='reset_task_state'):
post_body = {
action_name: {
'task_state': task_state,
}
}
body = json.dumps(post_body)
return self.post('shares/%s/action' % share_id, body,
headers=EXPERIMENTAL, extra_headers=True,
version=version)
def wait_for_migration_status(self, share_id, dest_host, status_to_wait,
version=LATEST_MICROVERSION):
"""Waits for a share to migrate to a certain host."""
statuses = ((status_to_wait,)
if not isinstance(status_to_wait, (tuple, list, set))
else status_to_wait)
share = self.get_share(share_id, version=version)
migration_timeout = CONF.share.migration_timeout
start = int(time.time())
while share['task_state'] not in statuses:
time.sleep(self.build_interval)
share = self.get_share(share_id, version=version)
if share['task_state'] in statuses:
break
elif share['task_state'] == 'migration_error':
raise share_exceptions.ShareMigrationException(
share_id=share['id'], src=share['host'], dest=dest_host)
elif int(time.time()) - start >= migration_timeout:
message = ('Share %(share_id)s failed to reach a status in'
'%(status)s when migrating from host %(src)s to '
'host %(dest)s within the required time '
'%(timeout)s.' % {
'src': share['host'],
'dest': dest_host,
'share_id': share['id'],
'timeout': self.build_timeout,
'status': six.text_type(statuses),
})
raise exceptions.TimeoutException(message)
return share
################
def create_share_replica(self, share_id, availability_zone=None,
version=LATEST_MICROVERSION):
"""Add a share replica of an existing share."""
uri = "share-replicas"
post_body = {
'share_id': share_id,
'availability_zone': availability_zone,
}
body = json.dumps({'share_replica': post_body})
resp, body = self.post(uri, body,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def get_share_replica(self, replica_id, version=LATEST_MICROVERSION):
"""Get the details of share_replica."""
resp, body = self.get("share-replicas/%s" % replica_id,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_replicas(self, share_id=None, version=LATEST_MICROVERSION):
"""Get list of replicas."""
uri = "share-replicas/detail"
uri += ("?share_id=%s" % share_id) if share_id is not None else ''
resp, body = self.get(uri, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_replicas_summary(self, share_id=None,
version=LATEST_MICROVERSION):
"""Get summary list of replicas."""
uri = "share-replicas"
uri += ("?share_id=%s" % share_id) if share_id is not None else ''
resp, body = self.get(uri, headers=EXPERIMENTAL,
extra_headers=True, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_share_replica(self, replica_id, version=LATEST_MICROVERSION):
"""Delete share_replica."""
uri = "share-replicas/%s" % replica_id
resp, body = self.delete(uri,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return body
def promote_share_replica(self, replica_id, expected_status=202,
version=LATEST_MICROVERSION):
"""Promote a share replica to active state."""
uri = "share-replicas/%s/action" % replica_id
post_body = {
'promote': None,
}
body = json.dumps(post_body)
resp, body = self.post(uri, body,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(expected_status, resp.status)
return self._parse_resp(body)
def wait_for_share_replica_status(self, replica_id, expected_status,
status_attr='status'):
"""Waits for a replica's status_attr to reach a given status."""
body = self.get_share_replica(replica_id)
replica_status = body[status_attr]
start = int(time.time())
while replica_status != expected_status:
time.sleep(self.build_interval)
body = self.get_share_replica(replica_id)
replica_status = body[status_attr]
if replica_status == expected_status:
return
if ('error' in replica_status
and expected_status != constants.STATUS_ERROR):
raise share_exceptions.ShareInstanceBuildErrorException(
id=replica_id)
if int(time.time()) - start >= self.build_timeout:
message = ('The %(status_attr)s of Replica %(id)s failed to '
'reach %(expected_status)s status within the '
'required time (%(time)ss). Current '
'%(status_attr)s: %(current_status)s.' %
{
'status_attr': status_attr,
'expected_status': expected_status,
'time': self.build_timeout,
'id': replica_id,
'current_status': replica_status,
})
raise exceptions.TimeoutException(message)
def reset_share_replica_status(self, replica_id,
status=constants.STATUS_AVAILABLE,
version=LATEST_MICROVERSION):
"""Reset the status."""
uri = 'share-replicas/%s/action' % replica_id
post_body = {
'reset_status': {
'status': status
}
}
body = json.dumps(post_body)
resp, body = self.post(uri, body,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def reset_share_replica_state(self, replica_id,
state=constants.REPLICATION_STATE_ACTIVE,
version=LATEST_MICROVERSION):
"""Reset the replication state of a replica."""
uri = 'share-replicas/%s/action' % replica_id
post_body = {
'reset_replica_state': {
'replica_state': state
}
}
body = json.dumps(post_body)
resp, body = self.post(uri, body,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def resync_share_replica(self, replica_id, expected_result=202,
version=LATEST_MICROVERSION):
"""Force an immediate resync of the replica."""
uri = 'share-replicas/%s/action' % replica_id
post_body = {
'resync': None
}
body = json.dumps(post_body)
resp, body = self.post(uri, body,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(expected_result, resp.status)
return self._parse_resp(body)
def force_delete_share_replica(self, replica_id,
version=LATEST_MICROVERSION):
"""Force delete a replica."""
uri = 'share-replicas/%s/action' % replica_id
post_body = {
'force_delete': None
}
body = json.dumps(post_body)
resp, body = self.post(uri, body,
headers=EXPERIMENTAL,
extra_headers=True,
version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def list_share_networks(self, detailed=False, params=None,
version=LATEST_MICROVERSION):
"""Get list of share networks w/o filters."""
uri = 'share-networks/detail' if detailed else 'share-networks'
uri += '?%s' % urlparse.urlencode(params) if params else ''
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_share_networks_with_detail(self, params=None,
version=LATEST_MICROVERSION):
"""Get detailed list of share networks w/o filters."""
return self.list_share_networks(
detailed=True, params=params, version=version)
def get_share_network(self, share_network_id, version=LATEST_MICROVERSION):
resp, body = self.get("share-networks/%s" % share_network_id,
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
################
def create_snapshot_access_rule(self, snapshot_id, access_type="ip",
access_to="0.0.0.0/0"):
body = {
"allow_access": {
"access_type": access_type,
"access_to": access_to
}
}
resp, body = self.post("snapshots/%s/action" % snapshot_id,
json.dumps(body), version=LATEST_MICROVERSION)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def get_snapshot_access_rule(self, snapshot_id, rule_id):
resp, body = self.get("snapshots/%s/access-list" % snapshot_id,
version=LATEST_MICROVERSION)
body = self._parse_resp(body)
found_rules = filter(lambda x: x['id'] == rule_id, body)
return found_rules[0] if len(found_rules) > 0 else None
def wait_for_snapshot_access_rule_status(self, snapshot_id, rule_id,
expected_state='active'):
rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
state = rule['state']
start = int(time.time())
while state != expected_state:
time.sleep(self.build_interval)
rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
state = rule['state']
if state == expected_state:
return
if 'error' in state:
raise share_exceptions.AccessRuleBuildErrorException(
snapshot_id)
if int(time.time()) - start >= self.build_timeout:
message = ('The status of snapshot access rule %(id)s failed '
'to reach %(expected_state)s state within the '
'required time (%(time)ss). Current '
'state: %(current_state)s.' %
{
'expected_state': expected_state,
'time': self.build_timeout,
'id': rule_id,
'current_state': state,
})
raise exceptions.TimeoutException(message)
def delete_snapshot_access_rule(self, snapshot_id, rule_id):
body = {
"deny_access": {
"access_id": rule_id,
}
}
resp, body = self.post("snapshots/%s/action" % snapshot_id,
json.dumps(body), version=LATEST_MICROVERSION)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def wait_for_snapshot_access_rule_deletion(self, snapshot_id, rule_id):
rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
start = int(time.time())
while rule is not None:
time.sleep(self.build_interval)
rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
if rule is None:
return
if int(time.time()) - start >= self.build_timeout:
message = ('The snapshot access rule %(id)s failed to delete '
'within the required time (%(time)ss).' %
{
'time': self.build_timeout,
'id': rule_id,
})
raise exceptions.TimeoutException(message)
def get_snapshot_export_location(self, snapshot_id, export_location_uuid,
version=LATEST_MICROVERSION):
resp, body = self.get(
"snapshots/%(snapshot_id)s/export-locations/%(el_uuid)s" % {
"snapshot_id": snapshot_id, "el_uuid": export_location_uuid},
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_snapshot_export_locations(
self, snapshot_id, version=LATEST_MICROVERSION):
resp, body = self.get(
"snapshots/%s/export-locations" % snapshot_id, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def get_message(self, message_id, version=LATEST_MICROVERSION):
"""Show details for a single message."""
url = 'messages/%s' % message_id
resp, body = self.get(url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def list_messages(self, params=None, version=LATEST_MICROVERSION):
"""List all messages."""
url = 'messages'
url += '?%s' % urlparse.urlencode(params) if params else ''
resp, body = self.get(url, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_message(self, message_id, version=LATEST_MICROVERSION):
"""Delete a single message."""
url = 'messages/%s' % message_id
resp, body = self.delete(url, version=version)
self.expected_success(204, resp.status)
return self._parse_resp(body)
def wait_for_message(self, resource_id):
"""Waits until a message for a resource with given id exists"""
start = int(time.time())
message = None
while not message:
time.sleep(self.build_interval)
for msg in self.list_messages():
if msg['resource_id'] == resource_id:
return msg
if int(time.time()) - start >= self.build_timeout:
message = ('No message for resource with id %s was created in'
' the required time (%s s).' %
(resource_id, self.build_timeout))
raise exceptions.TimeoutException(message)