enable volume list tests for cinder v2 - part1

This changeset only copies the v1 files into the appropriate v2
directories unchanged. There's the other changeset based on this
patch which makes the modifications required for v2.

Implements: blueprint cinder-v2-api-tests
Change-Id: Ia161409050ae1926376b41f31fff1ac389da79fc
This commit is contained in:
Zhi Kun Liu
2014-02-08 11:52:05 +08:00
parent be8bdbc203
commit 4be2f60d1e
7 changed files with 942 additions and 0 deletions

View File

View File

@@ -0,0 +1,228 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
from tempest.api.volume import base
from tempest.common.utils import data_utils
from tempest.openstack.common import log as logging
from tempest.test import attr
from testtools.matchers import ContainsAll
LOG = logging.getLogger(__name__)
VOLUME_FIELDS = ('id', 'display_name')
class VolumesListTest(base.BaseVolumeV1Test):
"""
This test creates a number of 1G volumes. To run successfully,
ensure that the backing file for the volume group that Nova uses
has space for at least 3 1G volumes!
If you are running a Devstack environment, ensure that the
VOLUME_BACKING_FILE_SIZE is at least 4G in your localrc
"""
_interface = 'json'
def assertVolumesIn(self, fetched_list, expected_list, fields=None):
if fields:
expected_list = map(operator.itemgetter(*fields), expected_list)
fetched_list = map(operator.itemgetter(*fields), fetched_list)
missing_vols = [v for v in expected_list if v not in fetched_list]
if len(missing_vols) == 0:
return
def str_vol(vol):
return "%s:%s" % (vol['id'], vol['display_name'])
raw_msg = "Could not find volumes %s in expected list %s; fetched %s"
self.fail(raw_msg % ([str_vol(v) for v in missing_vols],
[str_vol(v) for v in expected_list],
[str_vol(v) for v in fetched_list]))
@classmethod
def setUpClass(cls):
super(VolumesListTest, cls).setUpClass()
cls.client = cls.volumes_client
# Create 3 test volumes
cls.volume_list = []
cls.volume_id_list = []
cls.metadata = {'Type': 'work'}
for i in range(3):
try:
volume = cls.create_volume(metadata=cls.metadata)
resp, volume = cls.client.get_volume(volume['id'])
cls.volume_list.append(volume)
cls.volume_id_list.append(volume['id'])
except Exception:
LOG.exception('Failed to create volume. %d volumes were '
'created' % len(cls.volume_id_list))
if cls.volume_list:
# We could not create all the volumes, though we were able
# to create *some* of the volumes. This is typically
# because the backing file size of the volume group is
# too small.
for volid in cls.volume_id_list:
cls.client.delete_volume(volid)
cls.client.wait_for_resource_deletion(volid)
raise
@classmethod
def tearDownClass(cls):
# Delete the created volumes
for volid in cls.volume_id_list:
resp, _ = cls.client.delete_volume(volid)
cls.client.wait_for_resource_deletion(volid)
super(VolumesListTest, cls).tearDownClass()
def _list_by_param_value_and_assert(self, params, with_detail=False):
"""
Perform list or list_details action with given params
and validates result.
"""
if with_detail:
resp, fetched_vol_list = \
self.client.list_volumes_with_detail(params=params)
else:
resp, fetched_vol_list = self.client.list_volumes(params=params)
self.assertEqual(200, resp.status)
# Validating params of fetched volumes
for volume in fetched_vol_list:
for key in params:
msg = "Failed to list volumes %s by %s" % \
('details' if with_detail else '', key)
if key == 'metadata':
self.assertThat(volume[key].items(),
ContainsAll(params[key].items()),
msg)
else:
self.assertEqual(params[key], volume[key], msg)
@attr(type='smoke')
def test_volume_list(self):
# Get a list of Volumes
# Fetch all volumes
resp, fetched_list = self.client.list_volumes()
self.assertEqual(200, resp.status)
self.assertVolumesIn(fetched_list, self.volume_list,
fields=VOLUME_FIELDS)
@attr(type='gate')
def test_volume_list_with_details(self):
# Get a list of Volumes with details
# Fetch all Volumes
resp, fetched_list = self.client.list_volumes_with_detail()
self.assertEqual(200, resp.status)
self.assertVolumesIn(fetched_list, self.volume_list)
@attr(type='gate')
def test_volume_list_by_name(self):
volume = self.volume_list[data_utils.rand_int_id(0, 2)]
params = {'display_name': volume['display_name']}
resp, fetched_vol = self.client.list_volumes(params)
self.assertEqual(200, resp.status)
self.assertEqual(1, len(fetched_vol), str(fetched_vol))
self.assertEqual(fetched_vol[0]['display_name'],
volume['display_name'])
@attr(type='gate')
def test_volume_list_details_by_name(self):
volume = self.volume_list[data_utils.rand_int_id(0, 2)]
params = {'display_name': volume['display_name']}
resp, fetched_vol = self.client.list_volumes_with_detail(params)
self.assertEqual(200, resp.status)
self.assertEqual(1, len(fetched_vol), str(fetched_vol))
self.assertEqual(fetched_vol[0]['display_name'],
volume['display_name'])
@attr(type='gate')
def test_volumes_list_by_status(self):
params = {'status': 'available'}
resp, fetched_list = self.client.list_volumes(params)
self.assertEqual(200, resp.status)
for volume in fetched_list:
self.assertEqual('available', volume['status'])
self.assertVolumesIn(fetched_list, self.volume_list,
fields=VOLUME_FIELDS)
@attr(type='gate')
def test_volumes_list_details_by_status(self):
params = {'status': 'available'}
resp, fetched_list = self.client.list_volumes_with_detail(params)
self.assertEqual(200, resp.status)
for volume in fetched_list:
self.assertEqual('available', volume['status'])
self.assertVolumesIn(fetched_list, self.volume_list)
@attr(type='gate')
def test_volumes_list_by_availability_zone(self):
volume = self.volume_list[data_utils.rand_int_id(0, 2)]
zone = volume['availability_zone']
params = {'availability_zone': zone}
resp, fetched_list = self.client.list_volumes(params)
self.assertEqual(200, resp.status)
for volume in fetched_list:
self.assertEqual(zone, volume['availability_zone'])
self.assertVolumesIn(fetched_list, self.volume_list,
fields=VOLUME_FIELDS)
@attr(type='gate')
def test_volumes_list_details_by_availability_zone(self):
volume = self.volume_list[data_utils.rand_int_id(0, 2)]
zone = volume['availability_zone']
params = {'availability_zone': zone}
resp, fetched_list = self.client.list_volumes_with_detail(params)
self.assertEqual(200, resp.status)
for volume in fetched_list:
self.assertEqual(zone, volume['availability_zone'])
self.assertVolumesIn(fetched_list, self.volume_list)
@attr(type='gate')
def test_volume_list_with_param_metadata(self):
# Test to list volumes when metadata param is given
params = {'metadata': self.metadata}
self._list_by_param_value_and_assert(params)
@attr(type='gate')
def test_volume_list_with_detail_param_metadata(self):
# Test to list volumes details when metadata param is given
params = {'metadata': self.metadata}
self._list_by_param_value_and_assert(params, with_detail=True)
@attr(type='gate')
def test_volume_list_param_display_name_and_status(self):
# Test to list volume when display name and status param is given
volume = self.volume_list[data_utils.rand_int_id(0, 2)]
params = {'display_name': volume['display_name'],
'status': 'available'}
self._list_by_param_value_and_assert(params)
@attr(type='gate')
def test_volume_list_with_detail_param_display_name_and_status(self):
# Test to list volume when name and status param is given
volume = self.volume_list[data_utils.rand_int_id(0, 2)]
params = {'display_name': volume['display_name'],
'status': 'available'}
self._list_by_param_value_and_assert(params, with_detail=True)
class VolumeListTestXML(VolumesListTest):
_interface = 'xml'

View File

View File

@@ -0,0 +1,302 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
import urllib
from tempest.common.rest_client import RestClient
from tempest import config
from tempest import exceptions
CONF = config.CONF
class VolumesClientJSON(RestClient):
"""
Client class to send CRUD Volume API requests to a Cinder endpoint
"""
def __init__(self, auth_provider):
super(VolumesClientJSON, self).__init__(auth_provider)
self.service = CONF.volume.catalog_type
self.build_interval = CONF.volume.build_interval
self.build_timeout = CONF.volume.build_timeout
def get_attachment_from_volume(self, volume):
"""Return the element 'attachment' from input volumes."""
return volume['attachments'][0]
def list_volumes(self, params=None):
"""List all the volumes created."""
url = 'volumes'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url)
body = json.loads(body)
return resp, body['volumes']
def list_volumes_with_detail(self, params=None):
"""List the details of all volumes."""
url = 'volumes/detail'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url)
body = json.loads(body)
return resp, body['volumes']
def get_volume(self, volume_id):
"""Returns the details of a single volume."""
url = "volumes/%s" % str(volume_id)
resp, body = self.get(url)
body = json.loads(body)
return resp, body['volume']
def create_volume(self, size, **kwargs):
"""
Creates a new Volume.
size(Required): Size of volume in GB.
Following optional keyword arguments are accepted:
display_name: Optional Volume Name.
metadata: A dictionary of values to be used as metadata.
volume_type: Optional Name of volume_type for the volume
snapshot_id: When specified the volume is created from this snapshot
imageRef: When specified the volume is created from this image
"""
post_body = {'size': size}
post_body.update(kwargs)
post_body = json.dumps({'volume': post_body})
resp, body = self.post('volumes', post_body, self.headers)
body = json.loads(body)
return resp, body['volume']
def update_volume(self, volume_id, **kwargs):
"""Updates the Specified Volume."""
put_body = json.dumps({'volume': kwargs})
resp, body = self.put('volumes/%s' % volume_id, put_body,
self.headers)
body = json.loads(body)
return resp, body['volume']
def delete_volume(self, volume_id):
"""Deletes the Specified Volume."""
return self.delete("volumes/%s" % str(volume_id))
def upload_volume(self, volume_id, image_name, disk_format):
"""Uploads a volume in Glance."""
post_body = {
'image_name': image_name,
'disk_format': disk_format
}
post_body = json.dumps({'os-volume_upload_image': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
body = json.loads(body)
return resp, body['os-volume_upload_image']
def attach_volume(self, volume_id, instance_uuid, mountpoint):
"""Attaches a volume to a given instance on a given mountpoint."""
post_body = {
'instance_uuid': instance_uuid,
'mountpoint': mountpoint,
}
post_body = json.dumps({'os-attach': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
return resp, body
def detach_volume(self, volume_id):
"""Detaches a volume from an instance."""
post_body = {}
post_body = json.dumps({'os-detach': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
return resp, body
def reserve_volume(self, volume_id):
"""Reserves a volume."""
post_body = {}
post_body = json.dumps({'os-reserve': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
return resp, body
def unreserve_volume(self, volume_id):
"""Restore a reserved volume ."""
post_body = {}
post_body = json.dumps({'os-unreserve': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
return resp, body
def wait_for_volume_status(self, volume_id, status):
"""Waits for a Volume to reach a given status."""
resp, body = self.get_volume(volume_id)
volume_name = body['display_name']
volume_status = body['status']
start = int(time.time())
while volume_status != status:
time.sleep(self.build_interval)
resp, body = self.get_volume(volume_id)
volume_status = body['status']
if volume_status == 'error':
raise exceptions.VolumeBuildErrorException(volume_id=volume_id)
if int(time.time()) - start >= self.build_timeout:
message = ('Volume %s failed to reach %s status within '
'the required time (%s s).' %
(volume_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def is_resource_deleted(self, id):
try:
self.get_volume(id)
except exceptions.NotFound:
return True
return False
def extend_volume(self, volume_id, extend_size):
"""Extend a volume."""
post_body = {
'new_size': extend_size
}
post_body = json.dumps({'os-extend': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
return resp, body
def reset_volume_status(self, volume_id, status):
"""Reset the Specified Volume's Status."""
post_body = json.dumps({'os-reset_status': {"status": status}})
resp, body = self.post('volumes/%s/action' % volume_id, post_body,
self.headers)
return resp, body
def volume_begin_detaching(self, volume_id):
"""Volume Begin Detaching."""
post_body = json.dumps({'os-begin_detaching': {}})
resp, body = self.post('volumes/%s/action' % volume_id, post_body,
self.headers)
return resp, body
def volume_roll_detaching(self, volume_id):
"""Volume Roll Detaching."""
post_body = json.dumps({'os-roll_detaching': {}})
resp, body = self.post('volumes/%s/action' % volume_id, post_body,
self.headers)
return resp, body
def create_volume_transfer(self, vol_id, display_name=None):
"""Create a volume transfer."""
post_body = {
'volume_id': vol_id
}
if display_name:
post_body['name'] = display_name
post_body = json.dumps({'transfer': post_body})
resp, body = self.post('os-volume-transfer',
post_body,
self.headers)
body = json.loads(body)
return resp, body['transfer']
def get_volume_transfer(self, transfer_id):
"""Returns the details of a volume transfer."""
url = "os-volume-transfer/%s" % str(transfer_id)
resp, body = self.get(url, self.headers)
body = json.loads(body)
return resp, body['transfer']
def list_volume_transfers(self, params=None):
"""List all the volume transfers created."""
url = 'os-volume-transfer'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url)
body = json.loads(body)
return resp, body['transfers']
def delete_volume_transfer(self, transfer_id):
"""Delete a volume transfer."""
return self.delete("os-volume-transfer/%s" % str(transfer_id))
def accept_volume_transfer(self, transfer_id, transfer_auth_key):
"""Accept a volume transfer."""
post_body = {
'auth_key': transfer_auth_key,
}
url = 'os-volume-transfer/%s/accept' % transfer_id
post_body = json.dumps({'accept': post_body})
resp, body = self.post(url, post_body, self.headers)
body = json.loads(body)
return resp, body['transfer']
def update_volume_readonly(self, volume_id, readonly):
"""Update the Specified Volume readonly."""
post_body = {
'readonly': readonly
}
post_body = json.dumps({'os-update_readonly_flag': post_body})
url = 'volumes/%s/action' % (volume_id)
resp, body = self.post(url, post_body, self.headers)
return resp, body
def force_delete_volume(self, volume_id):
"""Force Delete Volume."""
post_body = json.dumps({'os-force_delete': {}})
resp, body = self.post('volumes/%s/action' % volume_id, post_body,
self.headers)
return resp, body
def create_volume_metadata(self, volume_id, metadata):
"""Create metadata for the volume."""
put_body = json.dumps({'metadata': metadata})
url = "volumes/%s/metadata" % str(volume_id)
resp, body = self.post(url, put_body, self.headers)
body = json.loads(body)
return resp, body['metadata']
def get_volume_metadata(self, volume_id):
"""Get metadata of the volume."""
url = "volumes/%s/metadata" % str(volume_id)
resp, body = self.get(url, self.headers)
body = json.loads(body)
return resp, body['metadata']
def update_volume_metadata(self, volume_id, metadata):
"""Update metadata for the volume."""
put_body = json.dumps({'metadata': metadata})
url = "volumes/%s/metadata" % str(volume_id)
resp, body = self.put(url, put_body, self.headers)
body = json.loads(body)
return resp, body['metadata']
def update_volume_metadata_item(self, volume_id, id, meta_item):
"""Update metadata item for the volume."""
put_body = json.dumps({'meta': meta_item})
url = "volumes/%s/metadata/%s" % (str(volume_id), str(id))
resp, body = self.put(url, put_body, self.headers)
body = json.loads(body)
return resp, body['meta']
def delete_volume_metadata_item(self, volume_id, id):
"""Delete metadata item for the volume."""
url = "volumes/%s/metadata/%s" % (str(volume_id), str(id))
resp, body = self.delete(url, self.headers)
return resp, body

View File

@@ -0,0 +1,412 @@
# Copyright 2012 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import urllib
from lxml import etree
from tempest.common.rest_client import RestClientXML
from tempest import config
from tempest import exceptions
from tempest.services.compute.xml.common import Document
from tempest.services.compute.xml.common import Element
from tempest.services.compute.xml.common import Text
from tempest.services.compute.xml.common import xml_to_json
from tempest.services.compute.xml.common import XMLNS_11
CONF = config.CONF
class VolumesClientXML(RestClientXML):
"""
Client class to send CRUD Volume API requests to a Cinder endpoint
"""
def __init__(self, auth_provider):
super(VolumesClientXML, self).__init__(auth_provider)
self.service = CONF.volume.catalog_type
self.build_interval = CONF.compute.build_interval
self.build_timeout = CONF.compute.build_timeout
def _parse_volume(self, body):
vol = dict((attr, body.get(attr)) for attr in body.keys())
for child in body.getchildren():
tag = child.tag
if tag.startswith("{"):
ns, tag = tag.split("}", 1)
if tag == 'metadata':
vol['metadata'] = dict((meta.get('key'),
meta.text) for meta in
child.getchildren())
else:
vol[tag] = xml_to_json(child)
return vol
def get_attachment_from_volume(self, volume):
"""Return the element 'attachment' from input volumes."""
return volume['attachments']['attachment']
def _check_if_bootable(self, volume):
"""
Check if the volume is bootable, also change the value
of 'bootable' from string to boolean.
"""
# NOTE(jdg): Version 1 of Cinder API uses lc strings
# We should consider being explicit in this check to
# avoid introducing bugs like: LP #1227837
if volume['bootable'].lower() == 'true':
volume['bootable'] = True
elif volume['bootable'].lower() == 'false':
volume['bootable'] = False
else:
raise ValueError(
'bootable flag is supposed to be either True or False,'
'it is %s' % volume['bootable'])
return volume
def list_volumes(self, params=None):
"""List all the volumes created."""
url = 'volumes'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
body = etree.fromstring(body)
volumes = []
if body is not None:
volumes += [self._parse_volume(vol) for vol in list(body)]
for v in volumes:
v = self._check_if_bootable(v)
return resp, volumes
def list_volumes_with_detail(self, params=None):
"""List all the details of volumes."""
url = 'volumes/detail'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
body = etree.fromstring(body)
volumes = []
if body is not None:
volumes += [self._parse_volume(vol) for vol in list(body)]
for v in volumes:
v = self._check_if_bootable(v)
return resp, volumes
def get_volume(self, volume_id):
"""Returns the details of a single volume."""
url = "volumes/%s" % str(volume_id)
resp, body = self.get(url, self.headers)
body = self._parse_volume(etree.fromstring(body))
body = self._check_if_bootable(body)
return resp, body
def create_volume(self, size, **kwargs):
"""Creates a new Volume.
:param size: Size of volume in GB. (Required)
:param display_name: Optional Volume Name.
:param metadata: An optional dictionary of values for metadata.
:param volume_type: Optional Name of volume_type for the volume
:param snapshot_id: When specified the volume is created from
this snapshot
:param imageRef: When specified the volume is created from this
image
"""
# NOTE(afazekas): it should use a volume namespace
volume = Element("volume", xmlns=XMLNS_11, size=size)
if 'metadata' in kwargs:
_metadata = Element('metadata')
volume.append(_metadata)
for key, value in kwargs['metadata'].items():
meta = Element('meta')
meta.add_attr('key', key)
meta.append(Text(value))
_metadata.append(meta)
attr_to_add = kwargs.copy()
del attr_to_add['metadata']
else:
attr_to_add = kwargs
for key, value in attr_to_add.items():
volume.add_attr(key, value)
resp, body = self.post('volumes', str(Document(volume)),
self.headers)
body = xml_to_json(etree.fromstring(body))
return resp, body
def update_volume(self, volume_id, **kwargs):
"""Updates the Specified Volume."""
put_body = Element("volume", xmlns=XMLNS_11, **kwargs)
resp, body = self.put('volumes/%s' % volume_id,
str(Document(put_body)),
self.headers)
body = xml_to_json(etree.fromstring(body))
return resp, body
def delete_volume(self, volume_id):
"""Deletes the Specified Volume."""
return self.delete("volumes/%s" % str(volume_id))
def wait_for_volume_status(self, volume_id, status):
"""Waits for a Volume to reach a given status."""
resp, body = self.get_volume(volume_id)
volume_status = body['status']
start = int(time.time())
while volume_status != status:
time.sleep(self.build_interval)
resp, body = self.get_volume(volume_id)
volume_status = body['status']
if volume_status == 'error':
raise exceptions.VolumeBuildErrorException(volume_id=volume_id)
if int(time.time()) - start >= self.build_timeout:
message = 'Volume %s failed to reach %s status within '\
'the required time (%s s).' % (volume_id,
status,
self.build_timeout)
raise exceptions.TimeoutException(message)
def is_resource_deleted(self, id):
try:
self.get_volume(id)
except exceptions.NotFound:
return True
return False
def attach_volume(self, volume_id, instance_uuid, mountpoint):
"""Attaches a volume to a given instance on a given mountpoint."""
post_body = Element("os-attach",
instance_uuid=instance_uuid,
mountpoint=mountpoint
)
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def detach_volume(self, volume_id):
"""Detaches a volume from an instance."""
post_body = Element("os-detach")
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def upload_volume(self, volume_id, image_name, disk_format):
"""Uploads a volume in Glance."""
post_body = Element("os-volume_upload_image",
image_name=image_name,
disk_format=disk_format)
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
volume = xml_to_json(etree.fromstring(body))
return resp, volume
def extend_volume(self, volume_id, extend_size):
"""Extend a volume."""
post_body = Element("os-extend",
new_size=extend_size)
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def reset_volume_status(self, volume_id, status):
"""Reset the Specified Volume's Status."""
post_body = Element("os-reset_status",
status=status
)
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def volume_begin_detaching(self, volume_id):
"""Volume Begin Detaching."""
post_body = Element("os-begin_detaching")
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def volume_roll_detaching(self, volume_id):
"""Volume Roll Detaching."""
post_body = Element("os-roll_detaching")
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def reserve_volume(self, volume_id):
"""Reserves a volume."""
post_body = Element("os-reserve")
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def unreserve_volume(self, volume_id):
"""Restore a reserved volume ."""
post_body = Element("os-unreserve")
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def create_volume_transfer(self, vol_id, display_name=None):
"""Create a volume transfer."""
post_body = Element("transfer",
volume_id=vol_id)
if display_name:
post_body.add_attr('name', display_name)
resp, body = self.post('os-volume-transfer',
str(Document(post_body)),
self.headers)
volume = xml_to_json(etree.fromstring(body))
return resp, volume
def get_volume_transfer(self, transfer_id):
"""Returns the details of a volume transfer."""
url = "os-volume-transfer/%s" % str(transfer_id)
resp, body = self.get(url, self.headers)
volume = xml_to_json(etree.fromstring(body))
return resp, volume
def list_volume_transfers(self, params=None):
"""List all the volume transfers created."""
url = 'os-volume-transfer'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
body = etree.fromstring(body)
volumes = []
if body is not None:
volumes += [self._parse_volume_transfer(vol) for vol in list(body)]
return resp, volumes
def _parse_volume_transfer(self, body):
vol = dict((attr, body.get(attr)) for attr in body.keys())
for child in body.getchildren():
tag = child.tag
if tag.startswith("{"):
tag = tag.split("}", 1)
vol[tag] = xml_to_json(child)
return vol
def delete_volume_transfer(self, transfer_id):
"""Delete a volume transfer."""
return self.delete("os-volume-transfer/%s" % str(transfer_id))
def accept_volume_transfer(self, transfer_id, transfer_auth_key):
"""Accept a volume transfer."""
post_body = Element("accept", auth_key=transfer_auth_key)
url = 'os-volume-transfer/%s/accept' % transfer_id
resp, body = self.post(url, str(Document(post_body)), self.headers)
volume = xml_to_json(etree.fromstring(body))
return resp, volume
def update_volume_readonly(self, volume_id, readonly):
"""Update the Specified Volume readonly."""
post_body = Element("os-update_readonly_flag",
readonly=readonly)
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def force_delete_volume(self, volume_id):
"""Force Delete Volume."""
post_body = Element("os-force_delete")
url = 'volumes/%s/action' % str(volume_id)
resp, body = self.post(url, str(Document(post_body)), self.headers)
if body:
body = xml_to_json(etree.fromstring(body))
return resp, body
def _metadata_body(self, meta):
post_body = Element('metadata')
for k, v in meta.items():
data = Element('meta', key=k)
data.append(Text(v))
post_body.append(data)
return post_body
def _parse_key_value(self, node):
"""Parse <foo key='key'>value</foo> data into {'key': 'value'}."""
data = {}
for node in node.getchildren():
data[node.get('key')] = node.text
return data
def create_volume_metadata(self, volume_id, metadata):
"""Create metadata for the volume."""
post_body = self._metadata_body(metadata)
resp, body = self.post('volumes/%s/metadata' % volume_id,
str(Document(post_body)),
self.headers)
body = self._parse_key_value(etree.fromstring(body))
return resp, body
def get_volume_metadata(self, volume_id):
"""Get metadata of the volume."""
url = "volumes/%s/metadata" % str(volume_id)
resp, body = self.get(url, self.headers)
body = self._parse_key_value(etree.fromstring(body))
return resp, body
def update_volume_metadata(self, volume_id, metadata):
"""Update metadata for the volume."""
put_body = self._metadata_body(metadata)
url = "volumes/%s/metadata" % str(volume_id)
resp, body = self.put(url, str(Document(put_body)), self.headers)
body = self._parse_key_value(etree.fromstring(body))
return resp, body
def update_volume_metadata_item(self, volume_id, id, meta_item):
"""Update metadata item for the volume."""
for k, v in meta_item.items():
put_body = Element('meta', key=k)
put_body.append(Text(v))
url = "volumes/%s/metadata/%s" % (str(volume_id), str(id))
resp, body = self.put(url, str(Document(put_body)), self.headers)
body = xml_to_json(etree.fromstring(body))
return resp, body
def delete_volume_metadata_item(self, volume_id, id):
"""Delete metadata item for the volume."""
url = "volumes/%s/metadata/%s" % (str(volume_id), str(id))
return self.delete(url)