Merge "Drop support for Cinder v1 API"

This commit is contained in:
Zuul 2019-09-08 05:27:59 +00:00 committed by Gerrit Code Review
commit 5b811d96d0
45 changed files with 24 additions and 5819 deletions

@ -27,7 +27,7 @@ LOG = logging.getLogger(__name__)
# key is a deprecated version and value is an alternative version. # key is a deprecated version and value is an alternative version.
DEPRECATED_VERSIONS = {"1": "2"} DEPRECATED_VERSIONS = {"2": "3"}
DEPRECATED_VERSION = "2.0" DEPRECATED_VERSION = "2.0"
MAX_VERSION = "3.59" MAX_VERSION = "3.59"
MIN_VERSION = "3.0" MIN_VERSION = "3.0"

@ -57,12 +57,10 @@ try:
except ImportError: except ImportError:
import simplejson as json import simplejson as json
_VALID_VERSIONS = ['v1', 'v2', 'v3'] _VALID_VERSIONS = ['v2', 'v3']
V3_SERVICE_TYPE = 'volumev3' V3_SERVICE_TYPE = 'volumev3'
V2_SERVICE_TYPE = 'volumev2' V2_SERVICE_TYPE = 'volumev2'
V1_SERVICE_TYPE = 'volume' SERVICE_TYPES = {'2': V2_SERVICE_TYPE,
SERVICE_TYPES = {'1': V1_SERVICE_TYPE,
'2': V2_SERVICE_TYPE,
'3': V3_SERVICE_TYPE} '3': V3_SERVICE_TYPE}
REQ_ID_HEADER = 'X-OpenStack-Request-ID' REQ_ID_HEADER = 'X-OpenStack-Request-ID'
@ -93,11 +91,11 @@ def get_server_version(url, insecure=False, cacert=None):
# NOTE(andreykurilin): endpoint URL has at least 2 formats: # NOTE(andreykurilin): endpoint URL has at least 2 formats:
# 1. The classic (legacy) endpoint: # 1. The classic (legacy) endpoint:
# http://{host}:{optional_port}/v{1 or 2 or 3}/{project-id} # http://{host}:{optional_port}/v{2 or 3}/{project-id}
# http://{host}:{optional_port}/v{1 or 2 or 3} # http://{host}:{optional_port}/v{2 or 3}
# 3. Under wsgi: # 3. Under wsgi:
# http://{host}:{optional_port}/volume/v{1 or 2 or 3} # http://{host}:{optional_port}/volume/v{2 or 3}
for ver in ['v1', 'v2', 'v3']: for ver in ['v2', 'v3']:
if u.path.endswith(ver) or "/{0}/".format(ver) in u.path: if u.path.endswith(ver) or "/{0}/".format(ver) in u.path:
path = u.path[:u.path.rfind(ver)] path = u.path[:u.path.rfind(ver)]
version_url = '%s://%s%s' % (u.scheme, u.netloc, path) version_url = '%s://%s%s' % (u.scheme, u.netloc, path)
@ -125,6 +123,11 @@ def get_server_version(url, insecure=False, cacert=None):
min_version = version['min_version'] min_version = version['min_version']
current_version = version['version'] current_version = version['version']
break break
else:
# Set the values, but don't break out the loop here in case v3
# comes later
min_version = '2.0'
current_version = '2.0'
except exceptions.ClientException as e: except exceptions.ClientException as e:
logger.warning("Error in server version query:%s\n" logger.warning("Error in server version query:%s\n"
"Returning APIVersion 2.0", six.text_type(e.message)) "Returning APIVersion 2.0", six.text_type(e.message))
@ -751,7 +754,6 @@ def _get_client_class_and_version(version):
def get_client_class(version): def get_client_class(version):
version_map = { version_map = {
'1': 'cinderclient.v1.client.Client',
'2': 'cinderclient.v2.client.Client', '2': 'cinderclient.v2.client.Client',
'3': 'cinderclient.v3.client.Client', '3': 'cinderclient.v3.client.Client',
} }
@ -819,8 +821,7 @@ def Client(version, *args, **kwargs):
Here ``VERSION`` can be a string or Here ``VERSION`` can be a string or
``cinderclient.api_versions.APIVersion`` obj. If you prefer string value, ``cinderclient.api_versions.APIVersion`` obj. If you prefer string value,
you can use ``1`` (deprecated now), ``2``, or ``3.X`` you can use ``2`` (deprecated now) or ``3.X`` (where X is a microversion).
(where X is a microversion).
Alternatively, you can create a client instance using the keystoneclient Alternatively, you can create a client instance using the keystoneclient

@ -13,7 +13,6 @@
from keystoneauth1 import fixture from keystoneauth1 import fixture
from cinderclient.tests.unit.fixture_data import base from cinderclient.tests.unit.fixture_data import base
from cinderclient.v1 import client as v1client
from cinderclient.v2 import client as v2client from cinderclient.v2 import client as v2client
@ -34,21 +33,6 @@ class Base(base.Fixture):
headers=self.json_headers) headers=self.json_headers)
class V1(Base):
def __init__(self, *args, **kwargs):
super(V1, self).__init__(*args, **kwargs)
svc = self.token.add_service('volume')
svc.add_endpoint(self.volume_url)
def new_client(self):
return v1client.Client(username='xx',
api_key='xx',
project_id='xx',
auth_url=self.identity_url)
class V2(Base): class V2(Base):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):

@ -25,7 +25,7 @@ from cinderclient.v3 import volumes
from cinderclient.tests.unit import test_utils from cinderclient.tests.unit import test_utils
from cinderclient.tests.unit import utils from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes from cinderclient.tests.unit.v2 import fakes
cs = fakes.FakeClient() cs = fakes.FakeClient()

@ -25,7 +25,6 @@ import six
from cinderclient import api_versions from cinderclient import api_versions
import cinderclient.client import cinderclient.client
from cinderclient import exceptions from cinderclient import exceptions
import cinderclient.v1.client
import cinderclient.v2.client import cinderclient.v2.client
from cinderclient.tests.unit import utils from cinderclient.tests.unit import utils
@ -35,10 +34,6 @@ from cinderclient.tests.unit.v3 import fakes
@ddt.ddt @ddt.ddt
class ClientTest(utils.TestCase): class ClientTest(utils.TestCase):
def test_get_client_class_v1(self):
output = cinderclient.client.get_client_class('1')
self.assertEqual(cinderclient.v1.client.Client, output)
def test_get_client_class_v2(self): def test_get_client_class_v2(self):
output = cinderclient.client.get_client_class('2') output = cinderclient.client.get_client_class('2')
self.assertEqual(cinderclient.v2.client.Client, output) self.assertEqual(cinderclient.v2.client.Client, output)
@ -87,12 +82,9 @@ class ClientTest(utils.TestCase):
self.assertIn("fakeUser", output[1]) self.assertIn("fakeUser", output[1])
def test_versions(self): def test_versions(self):
v1_url = 'http://fakeurl/v1/tenants'
v2_url = 'http://fakeurl/v2/tenants' v2_url = 'http://fakeurl/v2/tenants'
unknown_url = 'http://fakeurl/v9/tenants' unknown_url = 'http://fakeurl/v9/tenants'
self.assertEqual('1',
cinderclient.client.get_volume_api_from_url(v1_url))
self.assertEqual('2', self.assertEqual('2',
cinderclient.client.get_volume_api_from_url(v2_url)) cinderclient.client.get_volume_api_from_url(v2_url))
self.assertRaises(cinderclient.exceptions.UnsupportedVersion, self.assertRaises(cinderclient.exceptions.UnsupportedVersion,

@ -1,34 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cinderclient import extension
from cinderclient.v1.contrib import list_extensions
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
extensions = [
extension.Extension(list_extensions.__name__.split(".")[-1],
list_extensions),
]
cs = fakes.FakeClient(extensions=extensions)
class ListExtensionsTests(utils.TestCase):
def test_list_extensions(self):
all_exts = cs.list_extensions.show_all()
cs.assert_called('GET', '/extensions')
self.assertGreater(len(all_exts), 0)
for r in all_exts:
self.assertGreater(len(r.summary), 0)

@ -1,800 +0,0 @@
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright (c) 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import six.moves.urllib.parse as urlparse
from cinderclient import client as base_client
from cinderclient.tests.unit import fakes
import cinderclient.tests.unit.utils as utils
from cinderclient.v1 import client
def _stub_volume(**kwargs):
volume = {
'id': '00000000-0000-0000-0000-000000000000',
'display_name': None,
'display_description': None,
"attachments": [],
"bootable": "false",
"availability_zone": "cinder",
"created_at": "2012-08-27T00:00:00.000000",
"metadata": {},
"size": 1,
"snapshot_id": None,
"status": "available",
"volume_type": "None",
}
volume.update(kwargs)
return volume
def _stub_snapshot(**kwargs):
snapshot = {
"created_at": "2012-08-28T16:30:31.000000",
"display_description": None,
"display_name": None,
"id": '11111111-1111-1111-1111-111111111111',
"size": 1,
"status": "available",
"volume_id": '00000000-0000-0000-0000-000000000000',
}
snapshot.update(kwargs)
return snapshot
def _self_href(base_uri, tenant_id, backup_id):
return '%s/v1/%s/backups/%s' % (base_uri, tenant_id, backup_id)
def _bookmark_href(base_uri, tenant_id, backup_id):
return '%s/%s/backups/%s' % (base_uri, tenant_id, backup_id)
def _stub_backup_full(id, base_uri, tenant_id):
return {
'id': id,
'name': 'backup',
'description': 'nightly backup',
'volume_id': '712f4980-5ac1-41e5-9383-390aa7c9f58b',
'container': 'volumebackups',
'object_count': 220,
'size': 10,
'availability_zone': 'az1',
'created_at': '2013-04-12T08:16:37.000000',
'status': 'available',
'links': [
{
'href': _self_href(base_uri, tenant_id, id),
'rel': 'self'
},
{
'href': _bookmark_href(base_uri, tenant_id, id),
'rel': 'bookmark'
}
]
}
def _stub_backup(id, base_uri, tenant_id):
return {
'id': id,
'name': 'backup',
'links': [
{
'href': _self_href(base_uri, tenant_id, id),
'rel': 'self'
},
{
'href': _bookmark_href(base_uri, tenant_id, id),
'rel': 'bookmark'
}
]
}
def _stub_restore():
return {'volume_id': '712f4980-5ac1-41e5-9383-390aa7c9f58b'}
def _stub_qos_full(id, base_uri, tenant_id, name=None, specs=None):
if not name:
name = 'fake-name'
if not specs:
specs = {}
return {
'qos_specs': {
'id': id,
'name': name,
'consumer': 'back-end',
'specs': specs,
},
'links': {
'href': _bookmark_href(base_uri, tenant_id, id),
'rel': 'bookmark'
}
}
def _stub_qos_associates(id, name):
return {
'assoications_type': 'volume_type',
'name': name,
'id': id,
}
def _stub_transfer_full(id, base_uri, tenant_id):
return {
'id': id,
'name': 'transfer',
'volume_id': '8c05f861-6052-4df6-b3e0-0aebfbe686cc',
'created_at': '2013-04-12T08:16:37.000000',
'auth_key': '123456',
'links': [
{
'href': _self_href(base_uri, tenant_id, id),
'rel': 'self'
},
{
'href': _bookmark_href(base_uri, tenant_id, id),
'rel': 'bookmark'
}
]
}
def _stub_transfer(id, base_uri, tenant_id):
return {
'id': id,
'name': 'transfer',
'volume_id': '8c05f861-6052-4df6-b3e0-0aebfbe686cc',
'links': [
{
'href': _self_href(base_uri, tenant_id, id),
'rel': 'self'
},
{
'href': _bookmark_href(base_uri, tenant_id, id),
'rel': 'bookmark'
}
]
}
def _stub_extend(id, new_size):
return {'volume_id': '712f4980-5ac1-41e5-9383-390aa7c9f58b'}
class FakeClient(fakes.FakeClient, client.Client):
def __init__(self, api_version=None, *args, **kwargs):
client.Client.__init__(self, 'username', 'password',
'project_id', 'auth_url',
extensions=kwargs.get('extensions'))
self.api_version = api_version
self.client = FakeHTTPClient(**kwargs)
def get_volume_api_version_from_endpoint(self):
return self.client.get_volume_api_version_from_endpoint()
class FakeHTTPClient(base_client.HTTPClient):
def __init__(self, **kwargs):
self.username = 'username'
self.password = 'password'
self.auth_url = 'auth_url'
self.callstack = []
self.management_url = 'http://10.0.2.15:8776/v1/fake'
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method == 'PUT':
assert 'body' in kwargs
# Call the method
args = urlparse.parse_qsl(urlparse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, headers, body = getattr(self, callback)(**kwargs)
r = utils.TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
return r, body
def get_volume_api_version_from_endpoint(self):
magic_tuple = urlparse.urlsplit(self.management_url)
scheme, netloc, path, query, frag = magic_tuple
return path.lstrip('/').split('/')[0][1:]
#
# Snapshots
#
def get_snapshots_detail(self, **kw):
return (200, {}, {'snapshots': [
_stub_snapshot(),
]})
def get_snapshots_1234(self, **kw):
return (200, {}, {'snapshot': _stub_snapshot(id='1234')})
def get_snapshots_5678(self, **kw):
return (200, {}, {'snapshot': _stub_snapshot(id='5678')})
def put_snapshots_1234(self, **kw):
snapshot = _stub_snapshot(id='1234')
snapshot.update(kw['body']['snapshot'])
return (200, {}, {'snapshot': snapshot})
def post_snapshots_1234_action(self, body, **kw):
_body = None
resp = 202
assert len(list(body)) == 1
action = list(body)[0]
if action == 'os-reset_status':
assert 'status' in body['os-reset_status']
elif action == 'os-update_snapshot_status':
assert 'status' in body['os-update_snapshot_status']
else:
raise AssertionError("Unexpected action: %s" % action)
return (resp, {}, _body)
def post_snapshots_5678_action(self, body, **kw):
return self.post_snapshots_1234_action(body, **kw)
def delete_snapshots_1234(self, **kw):
return (202, {}, {})
def delete_snapshots_5678(self, **kw):
return (202, {}, {})
#
# Volumes
#
def put_volumes_1234(self, **kw):
volume = _stub_volume(id='1234')
volume.update(kw['body']['volume'])
return (200, {}, {'volume': volume})
def get_volumes(self, **kw):
return (200, {}, {"volumes": [
{'id': 1234, 'display_name': 'sample-volume'},
{'id': 5678, 'display_name': 'sample-volume2'}
]})
# TODO(jdg): This will need to change
# at the very least it's not complete
def get_volumes_detail(self, **kw):
return (200, {}, {"volumes": [
{'id': kw.get('id', 1234),
'display_name': 'sample-volume',
'attachments': [{'server_id': 1234}]},
]})
def get_volumes_1234(self, **kw):
r = {'volume': self.get_volumes_detail(id=1234)[2]['volumes'][0]}
return (200, {}, r)
def get_volumes_5678(self, **kw):
r = {'volume': self.get_volumes_detail(id=5678)[2]['volumes'][0]}
return (200, {}, r)
def get_volumes_1234_encryption(self, **kw):
r = {'encryption_key_id': 'id'}
return (200, {}, r)
def post_volumes_1234_action(self, body, **kw):
_body = None
resp = 202
assert len(list(body)) == 1
action = list(body)[0]
if action == 'os-attach':
keys = sorted(list(body[action]))
assert (keys == ['instance_uuid', 'mode', 'mountpoint'] or
keys == ['host_name', 'mode', 'mountpoint'])
elif action == 'os-detach':
assert body[action] is None
elif action == 'os-reserve':
assert body[action] is None
elif action == 'os-unreserve':
assert body[action] is None
elif action == 'os-initialize_connection':
assert list(body[action]) == ['connector']
return (202, {}, {'connection_info': 'foos'})
elif action == 'os-terminate_connection':
assert list(body[action]) == ['connector']
elif action == 'os-begin_detaching':
assert body[action] is None
elif action == 'os-roll_detaching':
assert body[action] is None
elif action == 'os-reset_status':
assert 'status' in body[action]
elif action == 'os-extend':
assert list(body[action]) == ['new_size']
elif action == 'os-migrate_volume':
assert 'host' in body[action]
assert 'force_host_copy' in body[action]
elif action == 'os-update_readonly_flag':
assert list(body[action]) == ['readonly']
elif action == 'os-set_bootable':
assert list(body[action]) == ['bootable']
else:
raise AssertionError("Unexpected action: %s" % action)
return (resp, {}, _body)
def post_volumes_5678_action(self, body, **kw):
return self.post_volumes_1234_action(body, **kw)
def post_volumes(self, **kw):
return (202, {}, {'volume': {}})
def delete_volumes_1234(self, **kw):
return (202, {}, None)
def delete_volumes_5678(self, **kw):
return (202, {}, None)
#
# Quotas
#
def get_os_quota_sets_test(self, **kw):
return (200, {}, {'quota_set': {
'tenant_id': 'test',
'metadata_items': [],
'volumes': 1,
'snapshots': 1,
'gigabytes': 1,
'backups': 1,
'backup_gigabytes': 1}})
def get_os_quota_sets_test_defaults(self):
return (200, {}, {'quota_set': {
'tenant_id': 'test',
'metadata_items': [],
'volumes': 1,
'snapshots': 1,
'gigabytes': 1,
'backups': 1,
'backup_gigabytes': 1}})
def put_os_quota_sets_test(self, body, **kw):
assert list(body) == ['quota_set']
fakes.assert_has_keys(body['quota_set'],
required=['tenant_id'])
return (200, {}, {'quota_set': {
'tenant_id': 'test',
'metadata_items': [],
'volumes': 2,
'snapshots': 2,
'gigabytes': 1,
'backups': 1,
'backup_gigabytes': 1}})
def delete_os_quota_sets_1234(self, **kw):
return (200, {}, {})
def delete_os_quota_sets_test(self, **kw):
return (200, {}, {})
#
# Quota Classes
#
def get_os_quota_class_sets_test(self, **kw):
return (200, {}, {'quota_class_set': {
'class_name': 'test',
'metadata_items': [],
'volumes': 1,
'snapshots': 1,
'gigabytes': 1,
'backups': 1,
'backup_gigabytes': 1}})
def put_os_quota_class_sets_test(self, body, **kw):
assert list(body) == ['quota_class_set']
fakes.assert_has_keys(body['quota_class_set'],
required=['class_name'])
return (200, {}, {'quota_class_set': {
'class_name': 'test',
'metadata_items': [],
'volumes': 2,
'snapshots': 2,
'gigabytes': 1,
'backups': 1,
'backup_gigabytes': 1}})
#
# VolumeTypes
#
def get_types(self, **kw):
return (200, {}, {
'volume_types': [{'id': 1,
'name': 'test-type-1',
'extra_specs': {}},
{'id': 2,
'name': 'test-type-2',
'extra_specs': {}}]})
def get_types_1(self, **kw):
return (200, {}, {'volume_type': {'id': 1,
'name': 'test-type-1',
'extra_specs': {}}})
def get_types_2(self, **kw):
return (200, {}, {'volume_type': {'id': 2,
'name': 'test-type-2',
'extra_specs': {}}})
def post_types(self, body, **kw):
return (202, {}, {'volume_type': {'id': 3,
'name': 'test-type-3',
'extra_specs': {}}})
def post_types_1_extra_specs(self, body, **kw):
assert list(body) == ['extra_specs']
return (200, {}, {'extra_specs': {'k': 'v'}})
def delete_types_1_extra_specs_k(self, **kw):
return(204, {}, None)
def delete_types_1_extra_specs_m(self, **kw):
return(204, {}, None)
def delete_types_1(self, **kw):
return (202, {}, None)
#
# VolumeEncryptionTypes
#
def get_types_1_encryption(self, **kw):
return (200, {}, {'id': 1, 'volume_type_id': 1, 'provider': 'test',
'cipher': 'test', 'key_size': 1,
'control_location': 'front-end'})
def get_types_2_encryption(self, **kw):
return (200, {}, {})
def post_types_2_encryption(self, body, **kw):
return (200, {}, {'encryption': body})
def put_types_1_encryption_1(self, body, **kw):
return (200, {}, {})
def delete_types_1_encryption_provider(self, **kw):
return (202, {}, None)
#
# Set/Unset metadata
#
def delete_volumes_1234_metadata_test_key(self, **kw):
return (204, {}, None)
def delete_volumes_1234_metadata_key1(self, **kw):
return (204, {}, None)
def delete_volumes_1234_metadata_key2(self, **kw):
return (204, {}, None)
def post_volumes_1234_metadata(self, **kw):
return (204, {}, {'metadata': {'test_key': 'test_value'}})
#
# List all extensions
#
def get_extensions(self, **kw):
exts = [
{
"alias": "FAKE-1",
"description": "Fake extension number 1",
"links": [],
"name": "Fake1",
"namespace": ("http://docs.openstack.org/"
"/ext/fake1/api/v1.1"),
"updated": "2011-06-09T00:00:00+00:00"
},
{
"alias": "FAKE-2",
"description": "Fake extension number 2",
"links": [],
"name": "Fake2",
"namespace": ("http://docs.openstack.org/"
"/ext/fake1/api/v1.1"),
"updated": "2011-06-09T00:00:00+00:00"
},
]
return (200, {}, {"extensions": exts, })
#
# VolumeBackups
#
def get_backups_76a17945_3c6f_435c_975b_b5685db10b62(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
backup1 = '76a17945-3c6f-435c-975b-b5685db10b62'
return (200, {},
{'backup': _stub_backup_full(backup1, base_uri, tenant_id)})
def get_backups_detail(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
backup1 = '76a17945-3c6f-435c-975b-b5685db10b62'
backup2 = 'd09534c6-08b8-4441-9e87-8976f3a8f699'
return (200, {},
{'backups': [
_stub_backup_full(backup1, base_uri, tenant_id),
_stub_backup_full(backup2, base_uri, tenant_id)]})
def delete_backups_76a17945_3c6f_435c_975b_b5685db10b62(self, **kw):
return (202, {}, None)
def post_backups(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
backup1 = '76a17945-3c6f-435c-975b-b5685db10b62'
return (202, {},
{'backup': _stub_backup(backup1, base_uri, tenant_id)})
def post_backups_76a17945_3c6f_435c_975b_b5685db10b62_restore(self, **kw):
return (200, {},
{'restore': _stub_restore()})
def post_backups_1234_restore(self, **kw):
return (200, {},
{'restore': _stub_restore()})
#
# QoSSpecs
#
def get_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
qos_id1 = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
return (200, {},
_stub_qos_full(qos_id1, base_uri, tenant_id))
def get_qos_specs(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
qos_id1 = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
qos_id2 = '0FD8DD14-A396-4E55-9573-1FE59042E95B'
return (200, {},
{'qos_specs': [
_stub_qos_full(qos_id1, base_uri, tenant_id, 'name-1'),
_stub_qos_full(qos_id2, base_uri, tenant_id)]})
def post_qos_specs(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
qos_name = 'qos-name'
return (202, {},
_stub_qos_full(qos_id, base_uri, tenant_id, qos_name))
def put_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C(self, **kw):
return (202, {}, None)
def put_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C_delete_keys(
self, **kw):
return (202, {}, None)
def delete_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C(self, **kw):
return (202, {}, None)
def get_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C_associations(
self, **kw):
type_id1 = '4230B13A-7A37-4E84-B777-EFBA6FCEE4FF'
type_id2 = '4230B13A-AB37-4E84-B777-EFBA6FCEE4FF'
type_name1 = 'type1'
type_name2 = 'type2'
return (202, {},
{'qos_associations': [
_stub_qos_associates(type_id1, type_name1),
_stub_qos_associates(type_id2, type_name2)]})
def get_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C_associate(
self, **kw):
return (202, {}, None)
def get_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C_disassociate(
self, **kw):
return (202, {}, None)
def get_qos_specs_1B6B6A04_A927_4AEB_810B_B7BAAD49F57C_disassociate_all(
self, **kw):
return (202, {}, None)
#
# VolumeTransfers
#
def get_os_volume_transfer_5678(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
transfer1 = '5678'
return (200, {},
{'transfer':
_stub_transfer_full(transfer1, base_uri, tenant_id)})
def get_os_volume_transfer_detail(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
transfer1 = '5678'
transfer2 = 'f625ec3e-13dd-4498-a22a-50afd534cc41'
return (200, {},
{'transfers': [
_stub_transfer_full(transfer1, base_uri, tenant_id),
_stub_transfer_full(transfer2, base_uri, tenant_id)]})
def delete_os_volume_transfer_5678(self, **kw):
return (202, {}, None)
def post_os_volume_transfer(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
transfer1 = '5678'
return (202, {},
{'transfer': _stub_transfer(transfer1, base_uri, tenant_id)})
def post_os_volume_transfer_5678_accept(self, **kw):
base_uri = 'http://localhost:8776'
tenant_id = '0fa851f6668144cf9cd8c8419c1646c1'
transfer1 = '5678'
return (200, {},
{'transfer': _stub_transfer(transfer1, base_uri, tenant_id)})
#
# Services
#
def get_os_services(self, **kw):
host = kw.get('host', None)
binary = kw.get('binary', None)
services = [
{
'binary': 'cinder-volume',
'host': 'host1',
'zone': 'cinder',
'status': 'enabled',
'state': 'up',
'updated_at': datetime(2012, 10, 29, 13, 42, 2)
},
{
'binary': 'cinder-volume',
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'updated_at': datetime(2012, 9, 18, 8, 3, 38)
},
{
'binary': 'cinder-scheduler',
'host': 'host2',
'zone': 'cinder',
'status': 'disabled',
'state': 'down',
'updated_at': datetime(2012, 9, 18, 8, 3, 38)
},
]
if host:
services = [i for i in services if i['host'] == host]
if binary:
services = [i for i in services if i['binary'] == binary]
return (200, {}, {'services': services})
def put_os_services_enable(self, body, **kw):
return (200, {}, {'host': body['host'], 'binary': body['binary'],
'status': 'enabled'})
def put_os_services_disable(self, body, **kw):
return (200, {}, {'host': body['host'], 'binary': body['binary'],
'status': 'disabled'})
def put_os_services_disable_log_reason(self, body, **kw):
return (200, {}, {'host': body['host'], 'binary': body['binary'],
'status': 'disabled',
'disabled_reason': body['disabled_reason']})
def get_os_availability_zone(self, **kw):
return (200, {}, {
"availabilityZoneInfo": [
{
"zoneName": "zone-1",
"zoneState": {"available": True},
"hosts": None,
},
{
"zoneName": "zone-2",
"zoneState": {"available": False},
"hosts": None,
},
]
})
def get_os_availability_zone_detail(self, **kw):
return (200, {}, {
"availabilityZoneInfo": [
{
"zoneName": "zone-1",
"zoneState": {"available": True},
"hosts": {
"fake_host-1": {
"cinder-volume": {
"active": True,
"available": True,
"updated_at":
datetime(2012, 12, 26, 14, 45, 25, 0)
}
}
}
},
{
"zoneName": "internal",
"zoneState": {"available": True},
"hosts": {
"fake_host-1": {
"cinder-sched": {
"active": True,
"available": True,
"updated_at":
datetime(2012, 12, 26, 14, 45, 24, 0)
}
}
}
},
{
"zoneName": "zone-2",
"zoneState": {"available": False},
"hosts": None,
},
]
})
def post_snapshots_1234_metadata(self, **kw):
return (200, {}, {"metadata": {"key1": "val1", "key2": "val2"}})
def delete_snapshots_1234_metadata_key1(self, **kw):
return (200, {}, None)
def delete_snapshots_1234_metadata_key2(self, **kw):
return (200, {}, None)
def put_volumes_1234_metadata(self, **kw):
return (200, {}, {"metadata": {"key1": "val1", "key2": "val2"}})
def put_snapshots_1234_metadata(self, **kw):
return (200, {}, {"metadata": {"key1": "val1", "key2": "val2"}})

@ -1,339 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import requests
from cinderclient import exceptions
from cinderclient.v1 import client
from cinderclient.tests.unit import utils
class AuthenticateAgainstKeystoneTests(utils.TestCase):
def test_authenticate_success(self):
cs = client.Client("username", "password", "project_id",
"http://localhost:8776/v1", service_type='volume')
resp = {
"access": {
"token": {
"expires": "2014-11-01T03:32:15-05:00",
"id": "FAKE_ID",
},
"serviceCatalog": [
{
"type": "volume",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8776/v1",
"internalURL": "http://localhost:8776/v1",
"publicURL": "http://localhost:8776/v1",
},
],
},
],
},
}
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(public_url, cs.client.management_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(token_id, cs.client.auth_token)
test_auth_call()
def test_authenticate_tenant_id(self):
cs = client.Client("username", "password",
auth_url="http://localhost:8776/v1",
tenant_id='tenant_id', service_type='volume')
resp = {
"access": {
"token": {
"expires": "2014-11-01T03:32:15-05:00",
"id": "FAKE_ID",
"tenant": {
"description": None,
"enabled": True,
"id": "tenant_id",
"name": "demo"
} # tenant associated with token
},
"serviceCatalog": [
{
"type": "volume",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8776/v1",
"internalURL": "http://localhost:8776/v1",
"publicURL": "http://localhost:8776/v1",
},
],
},
],
},
}
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantId': cs.client.tenant_id,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(public_url, cs.client.management_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(token_id, cs.client.auth_token)
tenant_id = resp["access"]["token"]["tenant"]["id"]
self.assertEqual(tenant_id, cs.client.tenant_id)
test_auth_call()
def test_authenticate_failure(self):
cs = client.Client("username", "password", "project_id",
"http://localhost:8776/v1")
resp = {"unauthorized": {"message": "Unauthorized", "code": "401"}}
auth_response = utils.TestResponse({
"status_code": 401,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_redirect(self):
cs = client.Client("username", "password", "project_id",
"http://localhost:8776/v1", service_type='volume')
dict_correct_response = {
"access": {
"token": {
"expires": "2014-11-01T03:32:15-05:00",
"id": "FAKE_ID",
},
"serviceCatalog": [
{
"type": "volume",
"endpoints": [
{
"adminURL": "http://localhost:8776/v1",
"region": "RegionOne",
"internalURL": "http://localhost:8776/v1",
"publicURL": "http://localhost:8776/v1/",
},
],
},
],
},
}
correct_response = json.dumps(dict_correct_response)
dict_responses = [
{"headers": {'location': 'http://127.0.0.1:5001'},
"status_code": 305,
"text": "Use proxy"},
# Configured on admin port, cinder redirects to v2.0 port.
# When trying to connect on it, keystone auth succeed by v1.0
# protocol (through headers) but tokens are being returned in
# body (looks like keystone bug). Leaved for compatibility.
{"headers": {},
"status_code": 200,
"text": correct_response},
{"headers": {},
"status_code": 200,
"text": correct_response}
]
responses = [(utils.TestResponse(resp)) for resp in dict_responses]
def side_effect(*args, **kwargs):
return responses.pop(0)
mock_request = mock.Mock(side_effect=side_effect)
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
resp = dict_correct_response
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(public_url, cs.client.management_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(token_id, cs.client.auth_token)
test_auth_call()
class AuthenticationTests(utils.TestCase):
def test_authenticate_success(self):
cs = client.Client("username", "password", "project_id", "auth_url")
management_url = 'https://localhost/v1.1/443470'
auth_response = utils.TestResponse({
'status_code': 204,
'headers': {
'x-server-management-url': management_url,
'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
},
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'Accept': 'application/json',
'X-Auth-User': 'username',
'X-Auth-Key': 'password',
'X-Auth-Project-Id': 'project_id',
'User-Agent': cs.client.USER_AGENT
}
mock_request.assert_called_with(
"GET",
cs.client.auth_url,
headers=headers,
**self.TEST_REQUEST_BASE)
self.assertEqual(auth_response.headers['x-server-management-url'],
cs.client.management_url)
self.assertEqual(auth_response.headers['x-auth-token'],
cs.client.auth_token)
test_auth_call()
def test_authenticate_failure(self):
cs = client.Client("username", "password", "project_id", "auth_url")
auth_response = utils.TestResponse({"status_code": 401})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_automatic(self):
cs = client.Client("username", "password", "project_id", "auth_url")
http_client = cs.client
http_client.management_url = ''
mock_request = mock.Mock(return_value=(None, None))
@mock.patch.object(http_client, 'request', mock_request)
@mock.patch.object(http_client, 'authenticate')
def test_auth_call(m):
http_client.get('/')
self.assertTrue(m.called)
self.assertTrue(mock_request.called)
test_auth_call()
def test_auth_manual(self):
cs = client.Client("username", "password", "project_id", "auth_url")
@mock.patch.object(cs.client, 'authenticate')
def test_auth_call(m):
cs.authenticate()
self.assertTrue(m.called)
test_auth_call()

@ -1,89 +0,0 @@
# Copyright 2011-2013 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from cinderclient.v1 import availability_zones
from cinderclient.v1 import shell
from cinderclient.tests.unit.fixture_data import availability_zones as azfixture # noqa
from cinderclient.tests.unit.fixture_data import client
from cinderclient.tests.unit import utils
class AvailabilityZoneTest(utils.FixturedTestCase):
client_fixture_class = client.V1
data_fixture_class = azfixture.Fixture
def _assertZone(self, zone, name, status):
self.assertEqual(name, zone.zoneName)
self.assertEqual(status, zone.zoneState)
def test_list_availability_zone(self):
zones = self.cs.availability_zones.list(detailed=False)
self.assert_called('GET', '/os-availability-zone')
for zone in zones:
self.assertIsInstance(zone,
availability_zones.AvailabilityZone)
self.assertEqual(2, len(zones))
l0 = [six.u('zone-1'), six.u('available')]
l1 = [six.u('zone-2'), six.u('not available')]
z0 = shell._treeizeAvailabilityZone(zones[0])
z1 = shell._treeizeAvailabilityZone(zones[1])
self.assertEqual((1, 1), (len(z0), len(z1)))
self._assertZone(z0[0], l0[0], l0[1])
self._assertZone(z1[0], l1[0], l1[1])
def test_detail_availability_zone(self):
zones = self.cs.availability_zones.list(detailed=True)
self.assert_called('GET', '/os-availability-zone/detail')
for zone in zones:
self.assertIsInstance(zone,
availability_zones.AvailabilityZone)
self.assertEqual(3, len(zones))
l0 = [six.u('zone-1'), six.u('available')]
l1 = [six.u('|- fake_host-1'), six.u('')]
l2 = [six.u('| |- cinder-volume'),
six.u('enabled :-) 2012-12-26 14:45:25')]
l3 = [six.u('internal'), six.u('available')]
l4 = [six.u('|- fake_host-1'), six.u('')]
l5 = [six.u('| |- cinder-sched'),
six.u('enabled :-) 2012-12-26 14:45:24')]
l6 = [six.u('zone-2'), six.u('not available')]
z0 = shell._treeizeAvailabilityZone(zones[0])
z1 = shell._treeizeAvailabilityZone(zones[1])
z2 = shell._treeizeAvailabilityZone(zones[2])
self.assertEqual((3, 3, 1), (len(z0), len(z1), len(z2)))
self._assertZone(z0[0], l0[0], l0[1])
self._assertZone(z0[1], l1[0], l1[1])
self._assertZone(z0[2], l2[0], l2[1])
self._assertZone(z1[0], l3[0], l3[1])
self._assertZone(z1[1], l4[0], l4[1])
self._assertZone(z1[2], l5[0], l5[1])
self._assertZone(z2[0], l6[0], l6[1])

@ -1,164 +0,0 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from cinderclient.tests.unit import utils
from cinderclient.v1 import limits
def _get_default_RateLimit(verb="verb1", uri="uri1", regex="regex1",
value="value1",
remain="remain1", unit="unit1",
next_available="next1"):
return limits.RateLimit(verb, uri, regex, value, remain, unit,
next_available)
class TestLimits(utils.TestCase):
def test_repr(self):
l = limits.Limits(None, {"foo": "bar"})
self.assertEqual("<Limits>", repr(l))
def test_absolute(self):
l = limits.Limits(None,
{"absolute": {"name1": "value1", "name2": "value2"}})
l1 = limits.AbsoluteLimit("name1", "value1")
l2 = limits.AbsoluteLimit("name2", "value2")
for item in l.absolute:
self.assertIn(item, [l1, l2])
def test_rate(self):
l = limits.Limits(None,
{
"rate": [
{
"uri": "uri1",
"regex": "regex1",
"limit": [
{
"verb": "verb1",
"value": "value1",
"remaining": "remain1",
"unit": "unit1",
"next-available": "next1",
},
],
},
{
"uri": "uri2",
"regex": "regex2",
"limit": [
{
"verb": "verb2",
"value": "value2",
"remaining": "remain2",
"unit": "unit2",
"next-available": "next2",
},
],
},
],
})
l1 = limits.RateLimit("verb1", "uri1", "regex1", "value1", "remain1",
"unit1", "next1")
l2 = limits.RateLimit("verb2", "uri2", "regex2", "value2", "remain2",
"unit2", "next2")
for item in l.rate:
self.assertIn(item, [l1, l2])
class TestRateLimit(utils.TestCase):
def test_equal(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit()
self.assertEqual(l1, l2)
def test_not_equal_verbs(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(verb="verb2")
self.assertNotEqual(l1, l2)
def test_not_equal_uris(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(uri="uri2")
self.assertNotEqual(l1, l2)
def test_not_equal_regexps(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(regex="regex2")
self.assertNotEqual(l1, l2)
def test_not_equal_values(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(value="value2")
self.assertNotEqual(l1, l2)
def test_not_equal_remains(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(remain="remain2")
self.assertNotEqual(l1, l2)
def test_not_equal_units(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(unit="unit2")
self.assertNotEqual(l1, l2)
def test_not_equal_next_available(self):
l1 = _get_default_RateLimit()
l2 = _get_default_RateLimit(next_available="next2")
self.assertNotEqual(l1, l2)
def test_repr(self):
l1 = _get_default_RateLimit()
self.assertEqual("<RateLimit: method=verb1 uri=uri1>", repr(l1))
class TestAbsoluteLimit(utils.TestCase):
def test_equal(self):
l1 = limits.AbsoluteLimit("name1", "value1")
l2 = limits.AbsoluteLimit("name1", "value1")
self.assertEqual(l1, l2)
def test_not_equal_values(self):
l1 = limits.AbsoluteLimit("name1", "value1")
l2 = limits.AbsoluteLimit("name1", "value2")
self.assertNotEqual(l1, l2)
def test_not_equal_names(self):
l1 = limits.AbsoluteLimit("name1", "value1")
l2 = limits.AbsoluteLimit("name2", "value1")
self.assertNotEqual(l1, l2)
def test_repr(self):
l1 = limits.AbsoluteLimit("name1", "value1")
self.assertEqual("<AbsoluteLimit: name=name1>", repr(l1))
class TestLimitsManager(utils.TestCase):
def test_get(self):
api = mock.Mock()
api.client.get.return_value = (
None,
{"limits": {"absolute": {"name1": "value1", }},
"no-limits": {"absolute": {"name2": "value2", }}})
l1 = limits.AbsoluteLimit("name1", "value1")
limitsManager = limits.LimitsManager(api)
lim = limitsManager.get()
self.assertIsInstance(lim, limits.Limits)
for l in lim.absolute:
self.assertEqual(l1, l)

@ -1,79 +0,0 @@
# Copyright (C) 2013 eBay Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class QoSSpecsTest(utils.TestCase):
def test_create(self):
specs = dict(k1='v1', k2='v2')
cs.qos_specs.create('qos-name', specs)
cs.assert_called('POST', '/qos-specs')
def test_get(self):
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
cs.qos_specs.get(qos_id)
cs.assert_called('GET', '/qos-specs/%s' % qos_id)
def test_list(self):
cs.qos_specs.list()
cs.assert_called('GET', '/qos-specs')
def test_delete(self):
cs.qos_specs.delete('1B6B6A04-A927-4AEB-810B-B7BAAD49F57C')
cs.assert_called('DELETE',
'/qos-specs/1B6B6A04-A927-4AEB-810B-B7BAAD49F57C?'
'force=False')
def test_set_keys(self):
body = {'qos_specs': dict(k1='v1')}
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
cs.qos_specs.set_keys(qos_id, body)
cs.assert_called('PUT', '/qos-specs/%s' % qos_id)
def test_unset_keys(self):
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
body = {'keys': ['k1']}
cs.qos_specs.unset_keys(qos_id, body)
cs.assert_called('PUT', '/qos-specs/%s/delete_keys' % qos_id)
def test_get_associations(self):
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
cs.qos_specs.get_associations(qos_id)
cs.assert_called('GET', '/qos-specs/%s/associations' % qos_id)
def test_associate(self):
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
type_id = '4230B13A-7A37-4E84-B777-EFBA6FCEE4FF'
cs.qos_specs.associate(qos_id, type_id)
cs.assert_called('GET', '/qos-specs/%s/associate?vol_type_id=%s'
% (qos_id, type_id))
def test_disassociate(self):
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
type_id = '4230B13A-7A37-4E84-B777-EFBA6FCEE4FF'
cs.qos_specs.disassociate(qos_id, type_id)
cs.assert_called('GET', '/qos-specs/%s/disassociate?vol_type_id=%s'
% (qos_id, type_id))
def test_disassociate_all(self):
qos_id = '1B6B6A04-A927-4AEB-810B-B7BAAD49F57C'
cs.qos_specs.disassociate_all(qos_id)
cs.assert_called('GET', '/qos-specs/%s/disassociate_all' % qos_id)

@ -1,59 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class QuotaClassSetsTest(utils.TestCase):
def test_class_quotas_get(self):
class_name = 'test'
cs.quota_classes.get(class_name)
cs.assert_called('GET', '/os-quota-class-sets/%s' % class_name)
def test_update_quota(self):
q = cs.quota_classes.get('test')
q.update(volumes=2, snapshots=2, gigabytes=2000,
backups=2, backup_gigabytes=2000)
cs.assert_called('PUT', '/os-quota-class-sets/test')
def test_refresh_quota(self):
q = cs.quota_classes.get('test')
q2 = cs.quota_classes.get('test')
self.assertEqual(q.volumes, q2.volumes)
self.assertEqual(q.snapshots, q2.snapshots)
self.assertEqual(q.gigabytes, q2.gigabytes)
self.assertEqual(q.backups, q2.backups)
self.assertEqual(q.backup_gigabytes, q2.backup_gigabytes)
q2.volumes = 0
self.assertNotEqual(q.volumes, q2.volumes)
q2.snapshots = 0
self.assertNotEqual(q.snapshots, q2.snapshots)
q2.gigabytes = 0
self.assertNotEqual(q.gigabytes, q2.gigabytes)
q2.backups = 0
self.assertNotEqual(q.backups, q2.backups)
q2.backup_gigabytes = 0
self.assertNotEqual(q.backup_gigabytes, q2.backup_gigabytes)
q2.get()
self.assertEqual(q.volumes, q2.volumes)
self.assertEqual(q.snapshots, q2.snapshots)
self.assertEqual(q.gigabytes, q2.gigabytes)
self.assertEqual(q.backups, q2.backups)
self.assertEqual(q.backup_gigabytes, q2.backup_gigabytes)

@ -1,62 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class QuotaSetsTest(utils.TestCase):
def test_tenant_quotas_get(self):
tenant_id = 'test'
cs.quotas.get(tenant_id)
cs.assert_called('GET', '/os-quota-sets/%s?usage=False' % tenant_id)
def test_tenant_quotas_defaults(self):
tenant_id = 'test'
cs.quotas.defaults(tenant_id)
cs.assert_called('GET', '/os-quota-sets/%s/defaults' % tenant_id)
def test_update_quota(self):
q = cs.quotas.get('test')
q.update(volumes=2)
q.update(snapshots=2)
q.update(backups=2)
cs.assert_called('PUT', '/os-quota-sets/test')
def test_refresh_quota(self):
q = cs.quotas.get('test')
q2 = cs.quotas.get('test')
self.assertEqual(q.volumes, q2.volumes)
self.assertEqual(q.snapshots, q2.snapshots)
self.assertEqual(q.backups, q2.backups)
q2.volumes = 0
self.assertNotEqual(q.volumes, q2.volumes)
q2.snapshots = 0
self.assertNotEqual(q.snapshots, q2.snapshots)
q2.backups = 0
self.assertNotEqual(q.backups, q2.backups)
q2.get()
self.assertEqual(q.volumes, q2.volumes)
self.assertEqual(q.snapshots, q2.snapshots)
self.assertEqual(q.backups, q2.backups)
def test_delete_quota(self):
tenant_id = 'test'
cs.quotas.delete(tenant_id)
cs.assert_called('DELETE', '/os-quota-sets/test')

@ -1,91 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
from cinderclient.v1 import services
cs = fakes.FakeClient()
FAKE_SERVICE = {"host": "host1",
'binary': 'cinder-volume',
"status": "enable",
"availability_zone": "nova"}
class ServicesTest(utils.TestCase):
def test_list_services(self):
svs = cs.services.list()
cs.assert_called('GET', '/os-services')
self.assertEqual(3, len(svs))
[self.assertIsInstance(s, services.Service) for s in svs]
def test_list_services_with_hostname(self):
svs = cs.services.list(host='host2')
cs.assert_called('GET', '/os-services?host=host2')
self.assertEqual(2, len(svs))
[self.assertIsInstance(s, services.Service) for s in svs]
[self.assertEqual('host2', s.host) for s in svs]
def test_list_services_with_binary(self):
svs = cs.services.list(binary='cinder-volume')
cs.assert_called('GET', '/os-services?binary=cinder-volume')
self.assertEqual(2, len(svs))
[self.assertIsInstance(s, services.Service) for s in svs]
[self.assertEqual('cinder-volume', s.binary) for s in svs]
def test_list_services_with_host_binary(self):
svs = cs.services.list('host2', 'cinder-volume')
cs.assert_called('GET', '/os-services?host=host2&binary=cinder-volume')
self.assertEqual(1, len(svs))
[self.assertIsInstance(s, services.Service) for s in svs]
[self.assertEqual('host2', s.host) for s in svs]
[self.assertEqual('cinder-volume', s.binary) for s in svs]
def test_services_enable(self):
s = cs.services.enable('host1', 'cinder-volume')
values = {"host": "host1", 'binary': 'cinder-volume'}
cs.assert_called('PUT', '/os-services/enable', values)
self.assertIsInstance(s, services.Service)
self.assertEqual('enabled', s.status)
def test_services_disable(self):
s = cs.services.disable('host1', 'cinder-volume')
values = {"host": "host1", 'binary': 'cinder-volume'}
cs.assert_called('PUT', '/os-services/disable', values)
self.assertIsInstance(s, services.Service)
self.assertEqual('disabled', s.status)
def test_services_disable_log_reason(self):
s = cs.services.disable_log_reason(
'host1', 'cinder-volume', 'disable bad host')
values = {"host": "host1", 'binary': 'cinder-volume',
"disabled_reason": "disable bad host"}
cs.assert_called('PUT', '/os-services/disable-log-reason', values)
self.assertIsInstance(s, services.Service)
self.assertEqual('disabled', s.status)
def test___repr__(self):
"""
Unit test for Service.__repr__
Verify that one Service object can be printed.
"""
svs = services.Service(None, FAKE_SERVICE)
self.assertEqual(
"<Service: binary=%s host=%s>" % (FAKE_SERVICE['binary'],
FAKE_SERVICE['host']), repr(svs))

@ -1,495 +0,0 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
from requests_mock.contrib import fixture as requests_mock_fixture
from cinderclient import client
from cinderclient import exceptions
from cinderclient import shell
from cinderclient.v1 import shell as shell_v1
from cinderclient.tests.unit.fixture_data import keystone_client
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
@mock.patch.object(client, 'Client', fakes.FakeClient)
class ShellTest(utils.TestCase):
FAKE_ENV = {
'CINDER_USERNAME': 'username',
'CINDER_PASSWORD': 'password',
'CINDER_PROJECT_ID': 'project_id',
'OS_VOLUME_API_VERSION': '1',
'CINDER_URL': keystone_client.BASE_URL,
}
# Patch os.environ to avoid required auth info.
def setUp(self):
"""Run before each test."""
super(ShellTest, self).setUp()
for var in self.FAKE_ENV:
self.useFixture(fixtures.EnvironmentVariable(var,
self.FAKE_ENV[var]))
self.mock_completion()
self.shell = shell.OpenStackCinderShell()
# HACK(bcwaldon): replace this when we start using stubs
self.old_client_class = client.Client
client.Client = fakes.FakeClient
self.requests = self.useFixture(requests_mock_fixture.Fixture())
self.requests.register_uri(
'GET', keystone_client.BASE_URL,
text=keystone_client.keystone_request_callback)
def run_command(self, cmd):
self.shell.main(cmd.split())
def assert_called(self, method, url, body=None, **kwargs):
return self.shell.cs.assert_called(method, url, body, **kwargs)
def assert_called_anytime(self, method, url, body=None):
return self.shell.cs.assert_called_anytime(method, url, body)
def test_extract_metadata(self):
# mimic the result of argparse's parse_args() method
class Arguments(object):
def __init__(self, metadata=None):
self.metadata = metadata or []
inputs = [
([], {}),
(["key=value"], {"key": "value"}),
(["key"], {"key": None}),
(["k1=v1", "k2=v2"], {"k1": "v1", "k2": "v2"}),
(["k1=v1", "k2"], {"k1": "v1", "k2": None}),
(["k1", "k2=v2"], {"k1": None, "k2": "v2"})
]
for input in inputs:
args = Arguments(metadata=input[0])
self.assertEqual(input[1], shell_v1._extract_metadata(args))
def test_translate_volume_keys(self):
cs = fakes.FakeClient()
v = cs.volumes.list()[0]
setattr(v, 'os-vol-tenant-attr:tenant_id', 'fake_tenant')
setattr(v, '_info', {'attachments': [{'server_id': 1234}],
'id': 1234, 'display_name': 'sample-volume',
'os-vol-tenant-attr:tenant_id': 'fake_tenant'})
shell_v1._translate_volume_keys([v])
self.assertEqual('fake_tenant', v.tenant_id)
def test_list(self):
self.run_command('list')
# NOTE(jdg): we default to detail currently
self.assert_called('GET', '/volumes/detail?all_tenants=0')
def test_list_filter_tenant_with_all_tenants(self):
self.run_command('list --tenant=123 --all-tenants 1')
self.assert_called('GET',
'/volumes/detail?all_tenants=1&project_id=123')
def test_list_filter_tenant_without_all_tenants(self):
self.run_command('list --tenant=123')
self.assert_called('GET',
'/volumes/detail?all_tenants=1&project_id=123')
def test_metadata_args_with_limiter(self):
self.run_command('create --metadata key1="--test1" 1')
expected = {'volume': {'snapshot_id': None,
'display_description': None,
'source_volid': None,
'status': 'creating',
'size': 1,
'volume_type': None,
'imageRef': None,
'availability_zone': None,
'attach_status': 'detached',
'user_id': None,
'project_id': None,
'metadata': {'key1': '"--test1"'},
'display_name': None}}
self.assert_called_anytime('POST', '/volumes', expected)
def test_metadata_args_limiter_display_name(self):
self.run_command('create --metadata key1="--t1" --display-name="t" 1')
expected = {'volume': {'snapshot_id': None,
'display_description': None,
'source_volid': None,
'status': 'creating',
'size': 1,
'volume_type': None,
'imageRef': None,
'availability_zone': None,
'attach_status': 'detached',
'user_id': None,
'project_id': None,
'metadata': {'key1': '"--t1"'},
'display_name': '"t"'}}
self.assert_called_anytime('POST', '/volumes', expected)
def test_delimit_metadata_args(self):
self.run_command('create --metadata key1="test1" key2="test2" 1')
expected = {'volume': {'snapshot_id': None,
'display_description': None,
'source_volid': None,
'status': 'creating',
'size': 1,
'volume_type': None,
'imageRef': None,
'availability_zone': None,
'attach_status': 'detached',
'user_id': None,
'project_id': None,
'metadata': {'key1': '"test1"',
'key2': '"test2"'},
'display_name': None}}
self.assert_called_anytime('POST', '/volumes', expected)
def test_delimit_metadata_args_display_name(self):
self.run_command('create --metadata key1="t1" --display-name="t" 1')
expected = {'volume': {'snapshot_id': None,
'display_description': None,
'source_volid': None,
'status': 'creating',
'size': 1,
'volume_type': None,
'imageRef': None,
'availability_zone': None,
'attach_status': 'detached',
'user_id': None,
'project_id': None,
'metadata': {'key1': '"t1"'},
'display_name': '"t"'}}
self.assert_called_anytime('POST', '/volumes', expected)
def test_list_filter_status(self):
self.run_command('list --status=available')
self.assert_called('GET',
'/volumes/detail?all_tenants=0&status=available')
def test_list_filter_display_name(self):
self.run_command('list --display-name=1234')
self.assert_called('GET',
'/volumes/detail?all_tenants=0&display_name=1234')
def test_list_all_tenants(self):
self.run_command('list --all-tenants=1')
self.assert_called('GET', '/volumes/detail?all_tenants=1')
def test_list_availability_zone(self):
self.run_command('availability-zone-list')
self.assert_called('GET', '/os-availability-zone')
def test_list_limit(self):
self.run_command('list --limit=10')
self.assert_called('GET', '/volumes/detail?all_tenants=0&limit=10')
def test_show(self):
self.run_command('show 1234')
self.assert_called('GET', '/volumes/1234')
def test_delete(self):
self.run_command('delete 1234')
self.assert_called('DELETE', '/volumes/1234')
def test_delete_by_name(self):
self.run_command('delete sample-volume')
self.assert_called_anytime('GET', '/volumes/detail?all_tenants=1&'
'display_name=sample-volume')
self.assert_called('DELETE', '/volumes/1234')
def test_delete_multiple(self):
self.run_command('delete 1234 5678')
self.assert_called_anytime('DELETE', '/volumes/1234')
self.assert_called('DELETE', '/volumes/5678')
def test_backup(self):
self.run_command('backup-create 1234')
self.assert_called('POST', '/backups')
def test_restore(self):
self.run_command('backup-restore 1234')
self.assert_called('POST', '/backups/1234/restore')
def test_snapshot_list_filter_volume_id(self):
self.run_command('snapshot-list --volume-id=1234')
self.assert_called('GET',
'/snapshots/detail?all_tenants=0&volume_id=1234')
def test_snapshot_list_filter_status_and_volume_id(self):
self.run_command('snapshot-list --status=available --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?'
'all_tenants=0&status=available&volume_id=1234')
def test_rename(self):
# basic rename with positional arguments
self.run_command('rename 1234 new-name')
expected = {'volume': {'display_name': 'new-name'}}
self.assert_called('PUT', '/volumes/1234', body=expected)
# change description only
self.run_command('rename 1234 --display-description=new-description')
expected = {'volume': {'display_description': 'new-description'}}
self.assert_called('PUT', '/volumes/1234', body=expected)
# rename and change description
self.run_command('rename 1234 new-name '
'--display-description=new-description')
expected = {'volume': {
'display_name': 'new-name',
'display_description': 'new-description',
}}
self.assert_called('PUT', '/volumes/1234', body=expected)
# Call rename with no arguments
self.assertRaises(SystemExit, self.run_command, 'rename')
def test_rename_snapshot(self):
# basic rename with positional arguments
self.run_command('snapshot-rename 1234 new-name')
expected = {'snapshot': {'display_name': 'new-name'}}
self.assert_called('PUT', '/snapshots/1234', body=expected)
# change description only
self.run_command('snapshot-rename 1234 '
'--display-description=new-description')
expected = {'snapshot': {'display_description': 'new-description'}}
self.assert_called('PUT', '/snapshots/1234', body=expected)
# snapshot-rename and change description
self.run_command('snapshot-rename 1234 new-name '
'--display-description=new-description')
expected = {'snapshot': {
'display_name': 'new-name',
'display_description': 'new-description',
}}
self.assert_called('PUT', '/snapshots/1234', body=expected)
# Call snapshot-rename with no arguments
self.assertRaises(SystemExit, self.run_command, 'snapshot-rename')
def test_set_metadata_set(self):
self.run_command('metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
def test_set_metadata_delete_dict(self):
self.run_command('metadata 1234 unset key1=val1 key2=val2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
def test_set_metadata_delete_keys(self):
self.run_command('metadata 1234 unset key1 key2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
def test_reset_state(self):
self.run_command('reset-state 1234')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
def test_reset_state_attach(self):
self.run_command('reset-state --state in-use 1234')
expected = {'os-reset_status': {'status': 'in-use'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
def test_reset_state_with_flag(self):
self.run_command('reset-state --state error 1234')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
def test_reset_state_multiple(self):
self.run_command('reset-state 1234 5678 --state error')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called_anytime('POST', '/volumes/1234/action',
body=expected)
self.assert_called_anytime('POST', '/volumes/5678/action',
body=expected)
def test_reset_state_two_with_one_nonexistent(self):
cmd = 'reset-state 1234 123456789'
self.assertRaises(exceptions.CommandError, self.run_command, cmd)
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called_anytime('POST', '/volumes/1234/action',
body=expected)
def test_reset_state_one_with_one_nonexistent(self):
cmd = 'reset-state 123456789'
self.assertRaises(exceptions.CommandError, self.run_command, cmd)
def test_snapshot_reset_state(self):
self.run_command('snapshot-reset-state 1234')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called('POST', '/snapshots/1234/action', body=expected)
def test_snapshot_reset_state_with_flag(self):
self.run_command('snapshot-reset-state --state error 1234')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called('POST', '/snapshots/1234/action', body=expected)
def test_snapshot_reset_state_multiple(self):
self.run_command('snapshot-reset-state 1234 5678')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called_anytime('POST', '/snapshots/1234/action',
body=expected)
self.assert_called_anytime('POST', '/snapshots/5678/action',
body=expected)
def test_encryption_type_list(self):
"""
Test encryption-type-list shell command.
Verify a series of GET requests are made:
- one to get the volume type list information
- one per volume type to retrieve the encryption type information
"""
self.run_command('encryption-type-list')
self.assert_called_anytime('GET', '/types')
self.assert_called_anytime('GET', '/types/1/encryption')
self.assert_called_anytime('GET', '/types/2/encryption')
def test_encryption_type_show(self):
"""
Test encryption-type-show shell command.
Verify two GET requests are made per command invocation:
- one to get the volume type information
- one to get the encryption type information
"""
self.run_command('encryption-type-show 1')
self.assert_called('GET', '/types/1/encryption')
self.assert_called_anytime('GET', '/types/1')
def test_encryption_type_create(self):
"""
Test encryption-type-create shell command.
Verify GET and POST requests are made per command invocation:
- one GET request to retrieve the relevant volume type information
- one POST request to create the new encryption type
"""
expected = {'encryption': {'cipher': None, 'key_size': None,
'provider': 'TestProvider',
'control_location': 'front-end'}}
self.run_command('encryption-type-create 2 TestProvider')
self.assert_called('POST', '/types/2/encryption', body=expected)
self.assert_called_anytime('GET', '/types/2')
def test_encryption_type_update(self):
"""
Test encryption-type-update shell command.
Verify two GETs/one PUT requests are made per command invocation:
- one GET request to retrieve the relevant volume type information
- one GET request to retrieve the relevant encryption type information
- one PUT request to update the encryption type information
"""
self.skipTest("Not implemented")
def test_encryption_type_delete(self):
"""
Test encryption-type-delete shell command.
Verify one GET/one DELETE requests are made per command invocation:
- one GET request to retrieve the relevant volume type information
- one DELETE request to delete the encryption type information
"""
self.run_command('encryption-type-delete 1')
self.assert_called('DELETE', '/types/1/encryption/provider')
self.assert_called_anytime('GET', '/types/1')
def test_migrate_volume(self):
self.run_command('migrate 1234 fakehost --force-host-copy=True')
expected = {'os-migrate_volume': {'force_host_copy': 'True',
'host': 'fakehost'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
def test_snapshot_metadata_set(self):
self.run_command('snapshot-metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/snapshots/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
def test_snapshot_metadata_unset_dict(self):
self.run_command('snapshot-metadata 1234 unset key1=val1 key2=val2')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key1')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key2')
def test_snapshot_metadata_unset_keys(self):
self.run_command('snapshot-metadata 1234 unset key1 key2')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key1')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key2')
def test_volume_metadata_update_all(self):
self.run_command('metadata-update-all 1234 key1=val1 key2=val2')
self.assert_called('PUT', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
def test_snapshot_metadata_update_all(self):
self.run_command('snapshot-metadata-update-all\
1234 key1=val1 key2=val2')
self.assert_called('PUT', '/snapshots/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
def test_readonly_mode_update(self):
self.run_command('readonly-mode-update 1234 True')
expected = {'os-update_readonly_flag': {'readonly': True}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
self.run_command('readonly-mode-update 1234 False')
expected = {'os-update_readonly_flag': {'readonly': False}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
def test_service_disable(self):
self.run_command('service-disable host cinder-volume')
self.assert_called('PUT', '/os-services/disable',
{"binary": "cinder-volume", "host": "host"})
def test_services_disable_with_reason(self):
cmd = 'service-disable host cinder-volume --reason no_reason'
self.run_command(cmd)
body = {'host': 'host', 'binary': 'cinder-volume',
'disabled_reason': 'no_reason'}
self.assert_called('PUT', '/os-services/disable-log-reason', body)
def test_service_enable(self):
self.run_command('service-enable host cinder-volume')
self.assert_called('PUT', '/os-services/enable',
{"binary": "cinder-volume", "host": "host"})
def test_snapshot_delete(self):
self.run_command('snapshot-delete 1234')
self.assert_called('DELETE', '/snapshots/1234')
def test_quota_delete(self):
self.run_command('quota-delete 1234')
self.assert_called('DELETE', '/os-quota-sets/1234')
def test_snapshot_delete_multiple(self):
self.run_command('snapshot-delete 1234 5678')
self.assert_called('DELETE', '/snapshots/5678')
def test_list_transfer(self):
self.run_command('transfer-list')
self.assert_called('GET', '/os-volume-transfer/detail?all_tenants=0')
def test_list_transfer_all_tenants(self):
self.run_command('transfer-list --all-tenants=1')
self.assert_called('GET', '/os-volume-transfer/detail?all_tenants=1')

@ -1,36 +0,0 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit.fixture_data import client
from cinderclient.tests.unit.fixture_data import snapshots
from cinderclient.tests.unit import utils
class SnapshotActionsTest(utils.FixturedTestCase):
client_fixture_class = client.V1
data_fixture_class = snapshots.Fixture
def test_update_snapshot_status(self):
s = self.cs.volume_snapshots.get('1234')
stat = {'status': 'available'}
self.cs.volume_snapshots.update_snapshot_status(s, stat)
self.assert_called('POST', '/snapshots/1234/action')
def test_update_snapshot_status_with_progress(self):
s = self.cs.volume_snapshots.get('1234')
stat = {'status': 'available', 'progress': '73%'}
self.cs.volume_snapshots.update_snapshot_status(s, stat)
self.assert_called('POST', '/snapshots/1234/action')

@ -1,54 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cinderclient.v1 import volume_types
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class TypesTest(utils.TestCase):
def test_list_types(self):
tl = cs.volume_types.list()
cs.assert_called('GET', '/types')
for t in tl:
self.assertIsInstance(t, volume_types.VolumeType)
def test_create(self):
t = cs.volume_types.create('test-type-3')
cs.assert_called('POST', '/types')
self.assertIsInstance(t, volume_types.VolumeType)
def test_set_key(self):
t = cs.volume_types.get(1)
t.set_keys({'k': 'v'})
cs.assert_called('POST',
'/types/1/extra_specs',
{'extra_specs': {'k': 'v'}})
def test_unset_keys(self):
t = cs.volume_types.get(1)
t.unset_keys(['k'])
cs.assert_called('DELETE', '/types/1/extra_specs/k')
def test_unset_multiple_keys(self):
t = cs.volume_types.get(1)
t.unset_keys(['k', 'm'])
cs.assert_called_anytime('DELETE', '/types/1/extra_specs/k')
cs.assert_called_anytime('DELETE', '/types/1/extra_specs/m')
def test_delete(self):
cs.volume_types.delete(1)
cs.assert_called('DELETE', '/types/1')

@ -1,53 +0,0 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class VolumeBackupsTest(utils.TestCase):
def test_create(self):
cs.backups.create('2b695faf-b963-40c8-8464-274008fbcef4')
cs.assert_called('POST', '/backups')
def test_get(self):
backup_id = '76a17945-3c6f-435c-975b-b5685db10b62'
cs.backups.get(backup_id)
cs.assert_called('GET', '/backups/%s' % backup_id)
def test_list(self):
cs.backups.list()
cs.assert_called('GET', '/backups/detail')
def test_delete(self):
b = cs.backups.list()[0]
b.delete()
cs.assert_called('DELETE',
'/backups/76a17945-3c6f-435c-975b-b5685db10b62')
cs.backups.delete('76a17945-3c6f-435c-975b-b5685db10b62')
cs.assert_called('DELETE',
'/backups/76a17945-3c6f-435c-975b-b5685db10b62')
cs.backups.delete(b)
cs.assert_called('DELETE',
'/backups/76a17945-3c6f-435c-975b-b5685db10b62')
def test_restore(self):
backup_id = '76a17945-3c6f-435c-975b-b5685db10b62'
cs.restores.restore(backup_id)
cs.assert_called('POST', '/backups/%s/restore' % backup_id)

@ -1,119 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.v1.volume_encryption_types import VolumeEncryptionType
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
FAKE_ENCRY_TYPE = {'provider': 'Test',
'key_size': None,
'cipher': None,
'control_location': None,
'volume_type_id': '65922555-7bc0-47e9-8d88-c7fdbcac4781',
'encryption_id': '62daf814-cf9b-401c-8fc8-f84d7850fb7c'}
class VolumeEncryptionTypesTest(utils.TestCase):
"""
Test suite for the Volume Encryption Types Resource and Manager.
"""
def test_list(self):
"""
Unit test for VolumeEncryptionTypesManager.list
Verify that a series of GET requests are made:
- one GET request for the list of volume types
- one GET request per volume type for encryption type information
Verify that all returned information is :class: VolumeEncryptionType
"""
encryption_types = cs.volume_encryption_types.list()
cs.assert_called_anytime('GET', '/types')
cs.assert_called_anytime('GET', '/types/2/encryption')
cs.assert_called_anytime('GET', '/types/1/encryption')
for encryption_type in encryption_types:
self.assertIsInstance(encryption_type, VolumeEncryptionType)
def test_get(self):
"""
Unit test for VolumeEncryptionTypesManager.get
Verify that one GET request is made for the volume type encryption
type information. Verify that returned information is :class:
VolumeEncryptionType
"""
encryption_type = cs.volume_encryption_types.get(1)
cs.assert_called('GET', '/types/1/encryption')
self.assertIsInstance(encryption_type, VolumeEncryptionType)
def test_get_no_encryption(self):
"""
Unit test for VolumeEncryptionTypesManager.get
Verify that a request on a volume type with no associated encryption
type information returns a VolumeEncryptionType with no attributes.
"""
encryption_type = cs.volume_encryption_types.get(2)
self.assertIsInstance(encryption_type, VolumeEncryptionType)
self.assertFalse(hasattr(encryption_type, 'id'),
'encryption type has an id')
def test_create(self):
"""
Unit test for VolumeEncryptionTypesManager.create
Verify that one POST request is made for the encryption type creation.
Verify that encryption type creation returns a VolumeEncryptionType.
"""
result = cs.volume_encryption_types.create(2, {'provider': 'Test',
'key_size': None,
'cipher': None,
'control_location':
None})
cs.assert_called('POST', '/types/2/encryption')
self.assertIsInstance(result, VolumeEncryptionType)
def test_update(self):
"""
Unit test for VolumeEncryptionTypesManager.update
"""
self.skipTest("Not implemented")
def test_delete(self):
"""
Unit test for VolumeEncryptionTypesManager.delete
Verify that one DELETE request is made for encryption type deletion
Verify that encryption type deletion returns None
"""
result = cs.volume_encryption_types.delete(1)
cs.assert_called('DELETE', '/types/1/encryption/provider')
self.assertIsInstance(result, tuple)
self.assertEqual(202, result[0].status_code)
def test___repr__(self):
"""
Unit test for VolumeEncryptionTypes.__repr__
Verify that one encryption type can be printed
"""
encry_type = VolumeEncryptionType(None, FAKE_ENCRY_TYPE)
self.assertEqual(
"<VolumeEncryptionType: %s>" % FAKE_ENCRY_TYPE['encryption_id'],
repr(encry_type))

@ -1,51 +0,0 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class VolumeTransfersTest(utils.TestCase):
def test_create(self):
cs.transfers.create('1234')
cs.assert_called('POST', '/os-volume-transfer')
def test_get(self):
transfer_id = '5678'
cs.transfers.get(transfer_id)
cs.assert_called('GET', '/os-volume-transfer/%s' % transfer_id)
def test_list(self):
cs.transfers.list()
cs.assert_called('GET', '/os-volume-transfer/detail')
def test_delete(self):
b = cs.transfers.list()[0]
b.delete()
cs.assert_called('DELETE', '/os-volume-transfer/5678')
cs.transfers.delete('5678')
cs.assert_called('DELETE', '/os-volume-transfer/5678')
cs.transfers.delete(b)
cs.assert_called('DELETE', '/os-volume-transfer/5678')
def test_accept(self):
transfer_id = '5678'
auth_key = '12345'
cs.transfers.accept(transfer_id, auth_key)
cs.assert_called('POST', '/os-volume-transfer/%s/accept' % transfer_id)

@ -1,118 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes
cs = fakes.FakeClient()
class VolumesTest(utils.TestCase):
def test_delete_volume(self):
v = cs.volumes.list()[0]
v.delete()
cs.assert_called('DELETE', '/volumes/1234')
cs.volumes.delete('1234')
cs.assert_called('DELETE', '/volumes/1234')
cs.volumes.delete(v)
cs.assert_called('DELETE', '/volumes/1234')
def test_create_volume(self):
cs.volumes.create(1)
cs.assert_called('POST', '/volumes')
def test_attach(self):
v = cs.volumes.get('1234')
cs.volumes.attach(v, 1, '/dev/vdc', mode='rw')
cs.assert_called('POST', '/volumes/1234/action')
def test_attach_to_host(self):
v = cs.volumes.get('1234')
cs.volumes.attach(v, None, None, host_name='test', mode='rw')
cs.assert_called('POST', '/volumes/1234/action')
def test_detach(self):
v = cs.volumes.get('1234')
cs.volumes.detach(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_reserve(self):
v = cs.volumes.get('1234')
cs.volumes.reserve(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_unreserve(self):
v = cs.volumes.get('1234')
cs.volumes.unreserve(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_begin_detaching(self):
v = cs.volumes.get('1234')
cs.volumes.begin_detaching(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_roll_detaching(self):
v = cs.volumes.get('1234')
cs.volumes.roll_detaching(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_initialize_connection(self):
v = cs.volumes.get('1234')
cs.volumes.initialize_connection(v, {})
cs.assert_called('POST', '/volumes/1234/action')
def test_terminate_connection(self):
v = cs.volumes.get('1234')
cs.volumes.terminate_connection(v, {})
cs.assert_called('POST', '/volumes/1234/action')
def test_set_metadata(self):
cs.volumes.set_metadata(1234, {'k1': 'v1'})
cs.assert_called('POST', '/volumes/1234/metadata',
{'metadata': {'k1': 'v1'}})
def test_delete_metadata(self):
keys = ['key1']
cs.volumes.delete_metadata(1234, keys)
cs.assert_called('DELETE', '/volumes/1234/metadata/key1')
def test_extend(self):
v = cs.volumes.get('1234')
cs.volumes.extend(v, 2)
cs.assert_called('POST', '/volumes/1234/action')
def test_get_encryption_metadata(self):
cs.volumes.get_encryption_metadata('1234')
cs.assert_called('GET', '/volumes/1234/encryption')
def test_migrate(self):
v = cs.volumes.get('1234')
cs.volumes.migrate_volume(v, 'dest', False)
cs.assert_called('POST', '/volumes/1234/action')
def test_metadata_update_all(self):
cs.volumes.update_all_metadata(1234, {'k1': 'v1'})
cs.assert_called('PUT', '/volumes/1234/metadata',
{'metadata': {'k1': 'v1'}})
def test_readonly_mode_update(self):
v = cs.volumes.get('1234')
cs.volumes.update_readonly_flag(v, True)
cs.assert_called('POST', '/volumes/1234/action')
def test_set_bootable(self):
v = cs.volumes.get('1234')
cs.volumes.set_bootable(v, True)
cs.assert_called('POST', '/volumes/1234/action')

@ -18,7 +18,7 @@ from cinderclient import extension
from cinderclient.v2.contrib import list_extensions from cinderclient.v2.contrib import list_extensions
from cinderclient.tests.unit import utils from cinderclient.tests.unit import utils
from cinderclient.tests.unit.v1 import fakes from cinderclient.tests.unit.v2 import fakes
extensions = [ extensions = [

@ -675,19 +675,7 @@ class FakeHTTPClient(fake_v2.FakeHTTPClient):
def fake_request_get(): def fake_request_get():
versions = {'versions': [{'id': 'v1.0', versions = {'versions': [{'id': 'v2.0',
'links': [{'href': 'http://docs.openstack.org/',
'rel': 'describedby',
'type': 'text/html'},
{'href': 'http://192.168.122.197/v1/',
'rel': 'self'}],
'media-types': [{'base': 'application/json',
'type': 'application/'}],
'min_version': '',
'status': 'DEPRECATED',
'updated': '2016-05-02T20:25:19Z',
'version': ''},
{'id': 'v2.0',
'links': [{'href': 'http://docs.openstack.org/', 'links': [{'href': 'http://docs.openstack.org/',
'rel': 'describedby', 'rel': 'describedby',
'type': 'text/html'}, 'type': 'text/html'},
@ -696,7 +684,7 @@ def fake_request_get():
'media-types': [{'base': 'application/json', 'media-types': [{'base': 'application/json',
'type': 'application/'}], 'type': 'application/'}],
'min_version': '', 'min_version': '',
'status': 'SUPPORTED', 'status': 'DEPRECATED',
'updated': '2014-06-28T12:20:21Z', 'updated': '2014-06-28T12:20:21Z',
'version': ''}, 'version': ''},
{'id': 'v3.0', {'id': 'v3.0',
@ -715,19 +703,7 @@ def fake_request_get():
def fake_request_get_no_v3(): def fake_request_get_no_v3():
versions = {'versions': [{'id': 'v1.0', versions = {'versions': [{'id': 'v2.0',
'links': [{'href': 'http://docs.openstack.org/',
'rel': 'describedby',
'type': 'text/html'},
{'href': 'http://192.168.122.197/v1/',
'rel': 'self'}],
'media-types': [{'base': 'application/json',
'type': 'application/'}],
'min_version': '',
'status': 'DEPRECATED',
'updated': '2016-05-02T20:25:19Z',
'version': ''},
{'id': 'v2.0',
'links': [{'href': 'http://docs.openstack.org/', 'links': [{'href': 'http://docs.openstack.org/',
'rel': 'describedby', 'rel': 'describedby',
'type': 'text/html'}, 'type': 'text/html'},
@ -736,7 +712,7 @@ def fake_request_get_no_v3():
'media-types': [{'base': 'application/json', 'media-types': [{'base': 'application/json',
'type': 'application/'}], 'type': 'application/'}],
'min_version': '', 'min_version': '',
'status': 'SUPPORTED', 'status': 'DEPRECATED',
'updated': '2014-06-28T12:20:21Z', 'updated': '2014-06-28T12:20:21Z',
'version': ''}]} 'version': ''}]}
return versions return versions

@ -1,17 +0,0 @@
# Copyright (c) 2012 OpenStack Foundation
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.v1.client import Client # noqa

@ -1,42 +0,0 @@
# Copyright 2011-2013 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Availability Zone interface (v1 extension)"""
from cinderclient import base
class AvailabilityZone(base.Resource):
NAME_ATTR = 'display_name'
def __repr__(self):
return "<AvailabilityZone: %s>" % self.zoneName
class AvailabilityZoneManager(base.ManagerWithFind):
"""Manage :class:`AvailabilityZone` resources."""
resource_class = AvailabilityZone
def list(self, detailed=False):
"""Lists all availability zones.
:rtype: list of :class:`AvailabilityZone`
"""
if detailed is True:
return self._list("/os-availability-zone/detail",
"availabilityZoneInfo")
else:
return self._list("/os-availability-zone", "availabilityZoneInfo")

@ -1,123 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import client
from cinderclient.v1 import availability_zones
from cinderclient.v1 import limits
from cinderclient.v1 import qos_specs
from cinderclient.v1 import quota_classes
from cinderclient.v1 import quotas
from cinderclient.v1 import services
from cinderclient.v1 import volume_backups
from cinderclient.v1 import volume_backups_restore
from cinderclient.v1 import volume_encryption_types
from cinderclient.v1 import volume_snapshots
from cinderclient.v1 import volume_transfers
from cinderclient.v1 import volume_types
from cinderclient.v1 import volumes
class Client(object):
"""
Top-level object to access the OpenStack Volume API.
Create an instance with your creds::
>>> client = Client(USERNAME, PASSWORD, PROJECT_ID, AUTH_URL)
Then call methods on its managers::
>>> client.volumes.list()
...
"""
version = '1'
def __init__(self, username=None, api_key=None, project_id=None,
auth_url='', insecure=False, timeout=None, tenant_id=None,
proxy_tenant_id=None, proxy_token=None, region_name=None,
endpoint_type='publicURL', extensions=None,
service_type='volume', service_name=None,
volume_service_name=None, bypass_url=None,
retries=0, http_log_debug=False,
cacert=None, auth_system='keystone', auth_plugin=None,
session=None, **kwargs):
# FIXME(comstud): Rename the api_key argument above when we
# know it's not being used as keyword argument
password = api_key
self.limits = limits.LimitsManager(self)
# extensions
self.volumes = volumes.VolumeManager(self)
self.volume_snapshots = volume_snapshots.SnapshotManager(self)
self.volume_types = volume_types.VolumeTypeManager(self)
self.volume_encryption_types = \
volume_encryption_types.VolumeEncryptionTypeManager(self)
self.qos_specs = qos_specs.QoSSpecsManager(self)
self.quota_classes = quota_classes.QuotaClassSetManager(self)
self.quotas = quotas.QuotaSetManager(self)
self.backups = volume_backups.VolumeBackupManager(self)
self.restores = volume_backups_restore.VolumeBackupRestoreManager(self)
self.transfers = volume_transfers.VolumeTransferManager(self)
self.services = services.ServiceManager(self)
self.availability_zones = \
availability_zones.AvailabilityZoneManager(self)
# Add in any extensions...
if extensions:
for extension in extensions:
if extension.manager_class:
setattr(self, extension.name,
extension.manager_class(self))
self.client = client._construct_http_client(
username=username,
password=password,
project_id=project_id,
auth_url=auth_url,
insecure=insecure,
timeout=timeout,
tenant_id=tenant_id,
proxy_tenant_id=tenant_id,
proxy_token=proxy_token,
region_name=region_name,
endpoint_type=endpoint_type,
service_type=service_type,
service_name=service_name,
volume_service_name=volume_service_name,
bypass_url=bypass_url,
retries=retries,
http_log_debug=http_log_debug,
cacert=cacert,
auth_system=auth_system,
auth_plugin=auth_plugin,
session=session,
**kwargs)
def authenticate(self):
"""
Authenticate against the server.
Normally this is called automatically when you first access the API,
but you can call this method to force authentication right now.
Returns on success; raises :exc:`exceptions.Unauthorized` if the
credentials are wrong.
"""
self.client.authenticate()
def get_volume_api_version_from_endpoint(self):
return self.client.get_volume_api_version_from_endpoint()

@ -1,46 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import base
from cinderclient import utils
class ListExtResource(base.Resource):
@property
def summary(self):
descr = self.description.strip()
if not descr:
return '??'
lines = descr.split("\n")
if len(lines) == 1:
return lines[0]
else:
return lines[0] + "..."
class ListExtManager(base.Manager):
resource_class = ListExtResource
def show_all(self):
return self._list("/extensions", 'extensions')
def do_list_extensions(client, _args):
"""
Lists all available os-api extensions.
"""
extensions = client.list_extensions.show_all()
fields = ["Name", "Summary", "Alias", "Updated"]
utils.print_list(extensions, fields)

@ -1,92 +0,0 @@
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cinderclient import base
class Limits(base.Resource):
"""A collection of RateLimit and AbsoluteLimit objects."""
def __repr__(self):
return "<Limits>"
@property
def absolute(self):
for (name, value) in list(self._info['absolute'].items()):
yield AbsoluteLimit(name, value)
@property
def rate(self):
for group in self._info['rate']:
uri = group['uri']
regex = group['regex']
for rate in group['limit']:
yield RateLimit(rate['verb'], uri, regex, rate['value'],
rate['remaining'], rate['unit'],
rate['next-available'])
class RateLimit(object):
"""Data model that represents a flattened view of a single rate limit."""
def __init__(self, verb, uri, regex, value, remain,
unit, next_available):
self.verb = verb
self.uri = uri
self.regex = regex
self.value = value
self.remain = remain
self.unit = unit
self.next_available = next_available
def __eq__(self, other):
return self.uri == other.uri \
and self.regex == other.regex \
and self.value == other.value \
and self.verb == other.verb \
and self.remain == other.remain \
and self.unit == other.unit \
and self.next_available == other.next_available
def __repr__(self):
return "<RateLimit: method=%s uri=%s>" % (self.verb, self.uri)
class AbsoluteLimit(object):
"""Data model that represents a single absolute limit."""
def __init__(self, name, value):
self.name = name
self.value = value
def __eq__(self, other):
return self.value == other.value and self.name == other.name
def __repr__(self):
return "<AbsoluteLimit: name=%s>" % (self.name)
class LimitsManager(base.Manager):
"""Manager object used to interact with limits resource."""
resource_class = Limits
def get(self):
"""
Get a specific extension.
:rtype: :class:`Limits`
"""
return self._get("/limits", "limits")

@ -1,149 +0,0 @@
# Copyright (c) 2013 eBay Inc.
# Copyright (c) OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
QoS Specs interface.
"""
from cinderclient import base
class QoSSpecs(base.Resource):
"""QoS specs entity represents quality-of-service parameters/requirements.
A QoS specs is a set of parameters or requirements for quality-of-service
purpose, which can be associated with volume types (for now). In future,
QoS specs may be extended to be associated other entities, such as single
volume.
"""
def __repr__(self):
return "<QoSSpecs: %s>" % self.name
def delete(self):
return self.manager.delete(self)
class QoSSpecsManager(base.ManagerWithFind):
"""
Manage :class:`QoSSpecs` resources.
"""
resource_class = QoSSpecs
def list(self, search_opts=None):
"""Get a list of all qos specs.
:rtype: list of :class:`QoSSpecs`.
"""
return self._list("/qos-specs", "qos_specs")
def get(self, qos_specs):
"""Get a specific qos specs.
:param qos_specs: The ID of the :class:`QoSSpecs` to get.
:rtype: :class:`QoSSpecs`
"""
return self._get("/qos-specs/%s" % base.getid(qos_specs), "qos_specs")
def delete(self, qos_specs, force=False):
"""Delete a specific qos specs.
:param qos_specs: The ID of the :class:`QoSSpecs` to be removed.
:param force: Flag that indicates whether to delete target qos specs
if it was in-use.
"""
self._delete("/qos-specs/%s?force=%s" %
(base.getid(qos_specs), force))
def create(self, name, specs):
"""Create a qos specs.
:param name: Descriptive name of the qos specs, must be unique
:param specs: A dict of key/value pairs to be set
:rtype: :class:`QoSSpecs`
"""
body = {
"qos_specs": {
"name": name,
}
}
body["qos_specs"].update(specs)
return self._create("/qos-specs", body, "qos_specs")
def set_keys(self, qos_specs, specs):
"""Add/Update keys in a qos_specs.
:param qos_specs: The ID of qos specs
:param specs: A dict of key/value pairs to be set
:rtype: :class:`QoSSpecs`
"""
body = {
"qos_specs": {}
}
body["qos_specs"].update(specs)
return self._update("/qos-specs/%s" % qos_specs, body)
def unset_keys(self, qos_specs, specs):
"""Remove keys from a qos specs.
:param qos_specs: The ID of qos specs
:param specs: A list of key to be unset
:rtype: :class:`QoSSpecs`
"""
body = {'keys': specs}
return self._update("/qos-specs/%s/delete_keys" % qos_specs,
body)
def get_associations(self, qos_specs):
"""Get associated entities of a qos specs.
:param qos_specs: The id of the :class: `QoSSpecs`
:return: a list of entities that associated with specific qos specs.
"""
return self._list("/qos-specs/%s/associations" % base.getid(qos_specs),
"qos_associations")
def associate(self, qos_specs, vol_type_id):
"""Associate a volume type with specific qos specs.
:param qos_specs: The qos specs to be associated with
:param vol_type_id: The volume type id to be associated with
"""
self.api.client.get("/qos-specs/%s/associate?vol_type_id=%s" %
(base.getid(qos_specs), vol_type_id))
def disassociate(self, qos_specs, vol_type_id):
"""Disassociate qos specs from volume type.
:param qos_specs: The qos specs to be associated with
:param vol_type_id: The volume type id to be associated with
"""
self.api.client.get("/qos-specs/%s/disassociate?vol_type_id=%s" %
(base.getid(qos_specs), vol_type_id))
def disassociate_all(self, qos_specs):
"""Disassociate all entities from specific qos specs.
:param qos_specs: The qos specs to be associated with
"""
self.api.client.get("/qos-specs/%s/disassociate_all" %
base.getid(qos_specs))

@ -1,47 +0,0 @@
# Copyright (c) 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import base
class QuotaClassSet(base.Resource):
@property
def id(self):
"""QuotaClassSet does not have a 'id' attribute but base.Resource
needs it to self-refresh and QuotaSet is indexed by class_name.
"""
return self.class_name
def update(self, *args, **kwargs):
return self.manager.update(self.class_name, *args, **kwargs)
class QuotaClassSetManager(base.Manager):
resource_class = QuotaClassSet
def get(self, class_name):
return self._get("/os-quota-class-sets/%s" % (class_name),
"quota_class_set")
def update(self, class_name, **updates):
body = {'quota_class_set': {'class_name': class_name}}
for update in updates:
body['quota_class_set'][update] = updates[update]
result = self._update('/os-quota-class-sets/%s' % (class_name), body)
return self.resource_class(self,
result['quota_class_set'], loaded=True)

@ -1,57 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import base
class QuotaSet(base.Resource):
@property
def id(self):
"""QuotaSet does not have a 'id' attribute but base. Resource needs it
to self-refresh and QuotaSet is indexed by tenant_id.
"""
return self.tenant_id
def update(self, *args, **kwargs):
return self.manager.update(self.tenant_id, *args, **kwargs)
class QuotaSetManager(base.Manager):
resource_class = QuotaSet
def get(self, tenant_id, usage=False):
if hasattr(tenant_id, 'tenant_id'):
tenant_id = tenant_id.tenant_id
return self._get("/os-quota-sets/%s?usage=%s" % (tenant_id, usage),
"quota_set")
def update(self, tenant_id, **updates):
body = {'quota_set': {'tenant_id': tenant_id}}
for update in updates:
body['quota_set'][update] = updates[update]
result = self._update('/os-quota-sets/%s' % (tenant_id), body)
return self.resource_class(self, result['quota_set'], loaded=True)
def defaults(self, tenant_id):
return self._get('/os-quota-sets/%s/defaults' % tenant_id,
'quota_set')
def delete(self, tenant_id):
if hasattr(tenant_id, 'tenant_id'):
tenant_id = tenant_id.tenant_id
return self._delete("/os-quota-sets/%s" % tenant_id)

@ -1,64 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
service interface
"""
from cinderclient import base
class Service(base.Resource):
def __repr__(self):
return "<Service: binary=%s host=%s>" % (self.binary, self.host)
class ServiceManager(base.ManagerWithFind):
resource_class = Service
def list(self, host=None, binary=None):
"""
Describes service list for host.
:param host: destination host name.
:param binary: service binary.
"""
url = "/os-services"
filters = []
if host:
filters.append("host=%s" % host)
if binary:
filters.append("binary=%s" % binary)
if filters:
url = "%s?%s" % (url, "&".join(filters))
return self._list(url, "services")
def enable(self, host, binary):
"""Enable the service specified by hostname and binary."""
body = {"host": host, "binary": binary}
result = self._update("/os-services/enable", body)
return self.resource_class(self, result)
def disable(self, host, binary):
"""Disable the service specified by hostname and binary."""
body = {"host": host, "binary": binary}
result = self._update("/os-services/disable", body)
return self.resource_class(self, result)
def disable_log_reason(self, host, binary, reason):
"""Disable the service with reason."""
body = {"host": host, "binary": binary, "disabled_reason": reason}
result = self._update("/os-services/disable-log-reason", body)
return self.resource_class(self, result)

File diff suppressed because it is too large Load Diff

@ -1,78 +0,0 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Volume Backups interface (1.1 extension).
"""
from cinderclient import base
class VolumeBackup(base.Resource):
"""A volume backup is a block level backup of a volume."""
NAME_ATTR = "display_name"
def __repr__(self):
return "<VolumeBackup: %s>" % self.id
def delete(self):
"""Delete this volume backup."""
return self.manager.delete(self)
class VolumeBackupManager(base.ManagerWithFind):
"""Manage :class:`VolumeBackup` resources."""
resource_class = VolumeBackup
def create(self, volume_id, container=None,
name=None, description=None):
"""Creates a volume backup.
:param volume_id: The ID of the volume to backup.
:param container: The name of the backup service container.
:param name: The name of the backup.
:param description: The description of the backup.
:rtype: :class:`VolumeBackup`
"""
body = {'backup': {'volume_id': volume_id,
'container': container,
'name': name,
'description': description}}
return self._create('/backups', body, 'backup')
def get(self, backup_id):
"""Show details of a volume backup.
:param backup_id: The ID of the backup to display.
:rtype: :class:`VolumeBackup`
"""
return self._get("/backups/%s" % backup_id, "backup")
def list(self, detailed=True, search_opts=None):
"""Get a list of all volume backups.
:rtype: list of :class:`VolumeBackup`
"""
if detailed is True:
return self._list("/backups/detail", "backups")
else:
return self._list("/backups", "backups")
def delete(self, backup):
"""Delete a volume backup.
:param backup: The :class:`VolumeBackup` to delete.
"""
self._delete("/backups/%s" % base.getid(backup))

@ -1,43 +0,0 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Volume Backups Restore interface (1.1 extension).
This is part of the Volume Backups interface.
"""
from cinderclient import base
class VolumeBackupsRestore(base.Resource):
"""A Volume Backups Restore represents a restore operation."""
def __repr__(self):
return "<VolumeBackupsRestore: %s>" % self.volume_id
class VolumeBackupRestoreManager(base.Manager):
"""Manage :class:`VolumeBackupsRestore` resources."""
resource_class = VolumeBackupsRestore
def restore(self, backup_id, volume_id=None):
"""Restore a backup to a volume.
:param backup_id: The ID of the backup to restore.
:param volume_id: The ID of the volume to restore the backup to.
:rtype: :class:`Restore`
"""
body = {'restore': {'volume_id': volume_id}}
return self._create("/backups/%s/restore" % backup_id,
body, "restore")

@ -1,98 +0,0 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Volume Encryption Type interface
"""
from cinderclient import base
class VolumeEncryptionType(base.Resource):
"""
A Volume Encryption Type is a collection of settings used to conduct
encryption for a specific volume type.
"""
def __repr__(self):
return "<VolumeEncryptionType: %s>" % self.encryption_id
class VolumeEncryptionTypeManager(base.ManagerWithFind):
"""
Manage :class: `VolumeEncryptionType` resources.
"""
resource_class = VolumeEncryptionType
def list(self, search_opts=None):
"""
List all volume encryption types.
:param search_opts: Search options to filter out volume
encryption types
:return: a list of :class: VolumeEncryptionType instances
"""
# Since the encryption type is a volume type extension, we cannot get
# all encryption types without going through all volume types.
volume_types = self.api.volume_types.list()
encryption_types = []
for volume_type in volume_types:
encryption_type = self._get("/types/%s/encryption"
% base.getid(volume_type))
if hasattr(encryption_type, 'volume_type_id'):
encryption_types.append(encryption_type)
return encryption_types
def get(self, volume_type):
"""
Get the volume encryption type for the specified volume type.
:param volume_type: the volume type to query
:return: an instance of :class: VolumeEncryptionType
"""
return self._get("/types/%s/encryption" % base.getid(volume_type))
def create(self, volume_type, specs):
"""
Creates encryption type for a volume type. Default: admin only.
:param volume_type: the volume type on which to add an encryption type
:param specs: the encryption type specifications to add
:return: an instance of :class: VolumeEncryptionType
"""
body = {'encryption': specs}
return self._create("/types/%s/encryption" % base.getid(volume_type),
body, "encryption")
def update(self, volume_type, specs):
"""
Update the encryption type information for the specified volume type.
:param volume_type: the volume type whose encryption type information
must be updated
:param specs: the encryption type specifications to update
:return: an instance of :class: VolumeEncryptionType
"""
raise NotImplementedError()
def delete(self, volume_type):
"""
Delete the encryption type information for the specified volume type.
:param volume_type: the volume type whose encryption type information
must be deleted
"""
return self._delete("/types/%s/encryption/provider" %
base.getid(volume_type))

@ -1,183 +0,0 @@
# Copyright 2011 Denali Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Volume snapshot interface (1.1 extension).
"""
from cinderclient import base
from cinderclient import utils
class Snapshot(base.Resource):
"""
A Snapshot is a point-in-time snapshot of an openstack volume.
"""
NAME_ATTR = "display_name"
def __repr__(self):
return "<Snapshot: %s>" % self.id
def delete(self):
"""
Delete this snapshot.
"""
self.manager.delete(self)
def update(self, **kwargs):
"""
Update the display_name or display_description for this snapshot.
"""
self.manager.update(self, **kwargs)
@property
def progress(self):
return self._info.get('os-extended-snapshot-attributes:progress')
@property
def project_id(self):
return self._info.get('os-extended-snapshot-attributes:project_id')
def reset_state(self, state):
"""Update the snapshot with the provided state."""
self.manager.reset_state(self, state)
def set_metadata(self, metadata):
"""Set metadata of this snapshot."""
return self.manager.set_metadata(self, metadata)
def delete_metadata(self, keys):
"""Delete metadata of this snapshot."""
return self.manager.delete_metadata(self, keys)
def update_all_metadata(self, metadata):
"""Update_all metadata of this snapshot."""
return self.manager.update_all_metadata(self, metadata)
class SnapshotManager(base.ManagerWithFind):
"""
Manage :class:`Snapshot` resources.
"""
resource_class = Snapshot
def create(self, volume_id, force=False,
display_name=None, display_description=None):
"""
Create a snapshot of the given volume.
:param volume_id: The ID of the volume to snapshot.
:param force: If force is True, create a snapshot even if the volume is
attached to an instance. Default is False.
:param display_name: Name of the snapshot
:param display_description: Description of the snapshot
:rtype: :class:`Snapshot`
"""
body = {'snapshot': {'volume_id': volume_id,
'force': force,
'display_name': display_name,
'display_description': display_description}}
return self._create('/snapshots', body, 'snapshot')
def get(self, snapshot_id):
"""
Get a snapshot.
:param snapshot_id: The ID of the snapshot to get.
:rtype: :class:`Snapshot`
"""
return self._get("/snapshots/%s" % snapshot_id, "snapshot")
def list(self, detailed=True, search_opts=None):
"""
Get a list of all snapshots.
:rtype: list of :class:`Snapshot`
"""
query_string = utils.build_query_param(search_opts, sort=True)
detail = ""
if detailed:
detail = "/detail"
return self._list("/snapshots%s%s" % (detail, query_string),
"snapshots")
def delete(self, snapshot):
"""
Delete a snapshot.
:param snapshot: The :class:`Snapshot` to delete.
"""
self._delete("/snapshots/%s" % base.getid(snapshot))
def update(self, snapshot, **kwargs):
"""
Update the display_name or display_description for a snapshot.
:param snapshot: The :class:`Snapshot` to update.
"""
if not kwargs:
return
body = {"snapshot": kwargs}
self._update("/snapshots/%s" % base.getid(snapshot), body)
def reset_state(self, snapshot, state):
"""Update the specified volume with the provided state."""
return self._action('os-reset_status', snapshot, {'status': state})
def _action(self, action, snapshot, info=None, **kwargs):
"""Perform a snapshot action."""
body = {action: info}
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/snapshots/%s/action' % base.getid(snapshot)
return self.api.client.post(url, body=body)
def update_snapshot_status(self, snapshot, update_dict):
return self._action('os-update_snapshot_status',
base.getid(snapshot), update_dict)
def set_metadata(self, snapshot, metadata):
"""Update/Set a snapshots metadata.
:param snapshot: The :class:`Snapshot`.
:param metadata: A list of keys to be set.
"""
body = {'metadata': metadata}
return self._create("/snapshots/%s/metadata" % base.getid(snapshot),
body, "metadata")
def delete_metadata(self, snapshot, keys):
"""Delete specified keys from snapshot metadata.
:param snapshot: The :class:`Snapshot`.
:param keys: A list of keys to be removed.
"""
snapshot_id = base.getid(snapshot)
for k in keys:
self._delete("/snapshots/%s/metadata/%s" % (snapshot_id, k))
def update_all_metadata(self, snapshot, metadata):
"""Update_all snapshot metadata.
:param snapshot: The :class:`Snapshot`.
:param metadata: A list of keys to be updated.
"""
body = {'metadata': metadata}
return self._update("/snapshots/%s/metadata" % base.getid(snapshot),
body)

@ -1,88 +0,0 @@
# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Volume transfer interface (1.1 extension).
"""
from cinderclient import base
from cinderclient import utils
class VolumeTransfer(base.Resource):
"""Transfer a volume from one tenant to another"""
def __repr__(self):
return "<VolumeTransfer: %s>" % self.id
def delete(self):
"""Delete this volume transfer."""
return self.manager.delete(self)
class VolumeTransferManager(base.ManagerWithFind):
"""Manage :class:`VolumeTransfer` resources."""
resource_class = VolumeTransfer
def create(self, volume_id, name=None):
"""Creates a volume transfer.
:param volume_id: The ID of the volume to transfer.
:param name: The name of the transfer.
:rtype: :class:`VolumeTransfer`
"""
body = {'transfer': {'volume_id': volume_id,
'name': name}}
return self._create('/os-volume-transfer', body, 'transfer')
def accept(self, transfer_id, auth_key):
"""Accept a volume transfer.
:param transfer_id: The ID of the transfer to accept.
:param auth_key: The auth_key of the transfer.
:rtype: :class:`VolumeTransfer`
"""
body = {'accept': {'auth_key': auth_key}}
return self._create('/os-volume-transfer/%s/accept' % transfer_id,
body, 'transfer')
def get(self, transfer_id):
"""Show details of a volume transfer.
:param transfer_id: The ID of the volume transfer to display.
:rtype: :class:`VolumeTransfer`
"""
return self._get("/os-volume-transfer/%s" % transfer_id, "transfer")
def list(self, detailed=True, search_opts=None):
"""Get a list of all volume transfer.
:rtype: list of :class:`VolumeTransfer`
"""
query_string = utils.build_query_param(search_opts)
detail = ""
if detailed:
detail = "/detail"
return self._list("/os-volume-transfer%s%s" % (detail, query_string),
"transfers")
def delete(self, transfer_id):
"""Delete a volume transfer.
:param transfer_id: The :class:`VolumeTransfer` to delete.
"""
self._delete("/os-volume-transfer/%s" % base.getid(transfer_id))

@ -1,118 +0,0 @@
# Copyright (c) 2011 Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Volume Type interface.
"""
from cinderclient import base
class VolumeType(base.Resource):
"""
A Volume Type is the type of volume to be created
"""
def __repr__(self):
return "<VolumeType: %s>" % self.name
def get_keys(self):
"""
Get extra specs from a volume type.
:param vol_type: The :class:`VolumeType` to get extra specs from
"""
_resp, body = self.manager.api.client.get(
"/types/%s/extra_specs" %
base.getid(self))
return body["extra_specs"]
def set_keys(self, metadata):
"""
Set extra specs on a volume type.
:param type : The :class:`VolumeType` to set extra spec on
:param metadata: A dict of key/value pairs to be set
"""
body = {'extra_specs': metadata}
return self.manager._create(
"/types/%s/extra_specs" % base.getid(self),
body,
"extra_specs",
return_raw=True)
def unset_keys(self, keys):
"""
Unset extra specs on a volume type.
:param type_id: The :class:`VolumeType` to unset extra spec on
:param keys: A list of keys to be unset
"""
# NOTE(jdg): This wasn't actually doing all of the keys before
# the return in the loop resulted in only ONE key being unset,
# since on success the return was None, we'll only interrupt
# the loop and if an exception is raised.
for k in keys:
self.manager._delete("/types/%s/extra_specs/%s" %
(base.getid(self), k))
class VolumeTypeManager(base.ManagerWithFind):
"""
Manage :class:`VolumeType` resources.
"""
resource_class = VolumeType
def list(self, search_opts=None):
"""
Get a list of all volume types.
:rtype: list of :class:`VolumeType`.
"""
return self._list("/types", "volume_types")
def get(self, volume_type):
"""
Get a specific volume type.
:param volume_type: The ID of the :class:`VolumeType` to get.
:rtype: :class:`VolumeType`
"""
return self._get("/types/%s" % base.getid(volume_type), "volume_type")
def delete(self, volume_type):
"""
Delete a specific volume_type.
:param volume_type: The name or ID of the :class:`VolumeType` to get.
"""
self._delete("/types/%s" % base.getid(volume_type))
def create(self, name):
"""
Creates a volume type.
:param name: Descriptive name of the volume type
:rtype: :class:`VolumeType`
"""
body = {
"volume_type": {
"name": name,
}
}
return self._create("/types", body, "volume_type")

@ -1,428 +0,0 @@
# Copyright 2011 Denali Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Volume interface (1.1 extension).
"""
from cinderclient import base
from cinderclient import utils
class Volume(base.Resource):
"""A volume is an extra block level storage to the OpenStack instances."""
NAME_ATTR = "display_name"
def __repr__(self):
return "<Volume: %s>" % self.id
def delete(self):
"""Delete this volume."""
self.manager.delete(self)
def update(self, **kwargs):
"""Update the display_name or display_description for this volume."""
self.manager.update(self, **kwargs)
def attach(self, instance_uuid, mountpoint, mode='rw',
host_name=None):
"""Set attachment metadata.
:param instance_uuid: uuid of the attaching instance.
:param mountpoint: mountpoint on the attaching instance or host.
:param mode: the access mode
:param host_name: name of the attaching host.
"""
return self.manager.attach(self, instance_uuid, mountpoint, mode,
host_name)
def detach(self):
"""Clear attachment metadata."""
return self.manager.detach(self)
def reserve(self, volume):
"""Reserve this volume."""
return self.manager.reserve(self)
def unreserve(self, volume):
"""Unreserve this volume."""
return self.manager.unreserve(self)
def begin_detaching(self, volume):
"""Begin detaching volume."""
return self.manager.begin_detaching(self)
def roll_detaching(self, volume):
"""Roll detaching volume."""
return self.manager.roll_detaching(self)
def initialize_connection(self, volume, connector):
"""Initialize a volume connection.
:param connector: connector dict from nova.
"""
return self.manager.initialize_connection(self, connector)
def terminate_connection(self, volume, connector):
"""Terminate a volume connection.
:param connector: connector dict from nova.
"""
return self.manager.terminate_connection(self, connector)
def set_metadata(self, volume, metadata):
"""Set or Append metadata to a volume.
:param volume : The :class: `Volume` to set metadata on
:param metadata: A dict of key/value pairs to set
"""
return self.manager.set_metadata(self, metadata)
def upload_to_image(self, force, image_name, container_format,
disk_format):
"""Upload a volume to image service as an image."""
return self.manager.upload_to_image(self, force, image_name,
container_format, disk_format)
def force_delete(self):
"""Delete the specified volume ignoring its current state.
:param volume: The UUID of the volume to force-delete.
"""
self.manager.force_delete(self)
def reset_state(self, state):
"""Update the volume with the provided state."""
self.manager.reset_state(self, state)
def extend(self, volume, new_size):
"""Extend the size of the specified volume.
:param volume: The UUID of the volume to extend.
:param new_size: The desired size to extend volume to.
"""
self.manager.extend(self, new_size)
def migrate_volume(self, host, force_host_copy):
"""Migrate the volume to a new host."""
self.manager.migrate_volume(self, host, force_host_copy)
def update_all_metadata(self, metadata):
"""Update all metadata of this volume."""
return self.manager.update_all_metadata(self, metadata)
def update_readonly_flag(self, volume, read_only):
"""Update the read-only access mode flag of the specified volume.
:param volume: The UUID of the volume to update.
:param read_only: The value to indicate whether to update volume to
read-only access mode.
"""
self.manager.update_readonly_flag(self, read_only)
class VolumeManager(base.ManagerWithFind):
"""
Manage :class:`Volume` resources.
"""
resource_class = Volume
def create(self, size, snapshot_id=None, source_volid=None,
display_name=None, display_description=None,
volume_type=None, user_id=None,
project_id=None, availability_zone=None,
metadata=None, imageRef=None):
"""
Creates a volume.
:param size: Size of volume in GB
:param snapshot_id: ID of the snapshot
:param display_name: Name of the volume
:param display_description: Description of the volume
:param volume_type: Type of volume
:param user_id: User id derived from context
:param project_id: Project id derived from context
:param availability_zone: Availability Zone to use
:param metadata: Optional metadata to set on volume creation
:param imageRef: reference to an image stored in glance
:param source_volid: ID of source volume to clone from
:rtype: :class:`Volume`
"""
if metadata is None:
volume_metadata = {}
else:
volume_metadata = metadata
body = {'volume': {'size': size,
'snapshot_id': snapshot_id,
'display_name': display_name,
'display_description': display_description,
'volume_type': volume_type,
'user_id': user_id,
'project_id': project_id,
'availability_zone': availability_zone,
'status': "creating",
'attach_status': "detached",
'metadata': volume_metadata,
'imageRef': imageRef,
'source_volid': source_volid,
}}
return self._create('/volumes', body, 'volume')
def get(self, volume_id):
"""
Get a volume.
:param volume_id: The ID of the volume to get.
:rtype: :class:`Volume`
"""
return self._get("/volumes/%s" % volume_id, "volume")
def list(self, detailed=True, search_opts=None, limit=None):
"""
Get a list of all volumes.
:rtype: list of :class:`Volume`
"""
if search_opts is None:
search_opts = {}
if limit:
search_opts['limit'] = limit
query_string = utils.build_query_param(search_opts, sort=True)
detail = ""
if detailed:
detail = "/detail"
return self._list("/volumes%s%s" % (detail, query_string),
"volumes")
def delete(self, volume):
"""
Delete a volume.
:param volume: The :class:`Volume` to delete.
"""
self._delete("/volumes/%s" % base.getid(volume))
def update(self, volume, **kwargs):
"""
Update the display_name or display_description for a volume.
:param volume: The :class:`Volume` to update.
"""
if not kwargs:
return
body = {"volume": kwargs}
self._update("/volumes/%s" % base.getid(volume), body)
def _action(self, action, volume, info=None, **kwargs):
"""
Perform a volume "action."
"""
body = {action: info}
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/volumes/%s/action' % base.getid(volume)
return self.api.client.post(url, body=body)
def attach(self, volume, instance_uuid, mountpoint, mode='rw',
host_name=None):
"""
Set attachment metadata.
:param volume: The :class:`Volume` (or its ID)
you would like to attach.
:param instance_uuid: uuid of the attaching instance or host.
:param mountpoint: mountpoint on the attaching instance.
:param mode: the access mode.
:param host_name: name of the attaching host.
"""
body = {'mountpoint': mountpoint, 'mode': mode}
if instance_uuid is not None:
body.update({'instance_uuid': instance_uuid})
if host_name is not None:
body.update({'host_name': host_name})
return self._action('os-attach', volume, body)
def detach(self, volume):
"""
Clear attachment metadata.
:param volume: The :class:`Volume` (or its ID)
you would like to detach.
"""
return self._action('os-detach', volume)
def reserve(self, volume):
"""
Reserve this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to reserve.
"""
return self._action('os-reserve', volume)
def unreserve(self, volume):
"""
Unreserve this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to unreserve.
"""
return self._action('os-unreserve', volume)
def begin_detaching(self, volume):
"""
Begin detaching this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to detach.
"""
return self._action('os-begin_detaching', volume)
def roll_detaching(self, volume):
"""
Roll detaching this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to roll detaching.
"""
return self._action('os-roll_detaching', volume)
def initialize_connection(self, volume, connector):
"""
Initialize a volume connection.
:param volume: The :class:`Volume` (or its ID).
:param connector: connector dict from nova.
"""
return self._action('os-initialize_connection', volume,
{'connector': connector})[1]['connection_info']
def terminate_connection(self, volume, connector):
"""
Terminate a volume connection.
:param volume: The :class:`Volume` (or its ID).
:param connector: connector dict from nova.
"""
self._action('os-terminate_connection', volume,
{'connector': connector})
def set_metadata(self, volume, metadata):
"""
Update/Set a volumes metadata.
:param volume: The :class:`Volume`.
:param metadata: A list of keys to be set.
"""
body = {'metadata': metadata}
return self._create("/volumes/%s/metadata" % base.getid(volume),
body, "metadata")
def delete_metadata(self, volume, keys):
"""
Delete specified keys from volumes metadata.
:param volume: The :class:`Volume`.
:param keys: A list of keys to be removed.
"""
for k in keys:
self._delete("/volumes/%s/metadata/%s" % (base.getid(volume), k))
def upload_to_image(self, volume, force, image_name, container_format,
disk_format):
"""
Upload volume to image service as image.
:param volume: The :class:`Volume` to upload.
"""
return self._action('os-volume_upload_image',
volume,
{'force': force,
'image_name': image_name,
'container_format': container_format,
'disk_format': disk_format})
def force_delete(self, volume):
return self._action('os-force_delete', base.getid(volume))
def reset_state(self, volume, state):
"""Update the provided volume with the provided state."""
return self._action('os-reset_status', volume, {'status': state})
def extend(self, volume, new_size):
return self._action('os-extend',
base.getid(volume),
{'new_size': new_size})
def get_encryption_metadata(self, volume_id):
"""
Retrieve the encryption metadata from the desired volume.
:param volume_id: the id of the volume to query
:return: a dictionary of volume encryption metadata
"""
return self._get("/volumes/%s/encryption" % volume_id)._info
def migrate_volume(self, volume, host, force_host_copy):
"""Migrate volume to new host.
:param volume: The :class:`Volume` to migrate
:param host: The destination host
:param force_host_copy: Skip driver optimizations
"""
return self._action('os-migrate_volume',
volume,
{'host': host, 'force_host_copy': force_host_copy})
def migrate_volume_completion(self, old_volume, new_volume, error):
"""Complete the migration from the old volume to the temp new one.
:param old_volume: The original :class:`Volume` in the migration
:param new_volume: The new temporary :class:`Volume` in the migration
:param error: Inform of an error to cause migration cleanup
"""
new_volume_id = base.getid(new_volume)
return self._action('os-migrate_volume_completion',
old_volume,
{'new_volume': new_volume_id, 'error': error})[1]
def update_all_metadata(self, volume, metadata):
"""Update all metadata of a volume.
:param volume: The :class:`Volume`.
:param metadata: A list of keys to be updated.
"""
body = {'metadata': metadata}
return self._update("/volumes/%s/metadata" % base.getid(volume),
body)
def update_readonly_flag(self, volume, flag):
return self._action('os-update_readonly_flag',
base.getid(volume),
{'readonly': flag})
def set_bootable(self, volume, flag):
return self._action('os-set_bootable',
base.getid(volume),
{'bootable': flag})

@ -5,3 +5,8 @@ prelude: >
support for the Cinder v1 API has been removed. Prior to upgrading to this support for the Cinder v1 API has been removed. Prior to upgrading to this
release, ensure all Cinder services that need to be managed are 13.0.0 release, ensure all Cinder services that need to be managed are 13.0.0
(Rocky) or later. (Rocky) or later.
upgrade:
- |
This version of the python-cinderclient no longer supports the Cinder v1
API. Ensure all mananaged services have at least the v2 API available prior
to upgrading this client.