adding v2 support to cinderclient

This will allow the cinderclient to speak to the v2 rest api if it's
enabled in the cinder config. Includes additional v2 tests as well.

blueprint cinderclient-v2-support

Change-Id: I004134d9f528a6eadefdaa89eb48087bcae2691f
This commit is contained in:
Mike Perez 2013-02-02 13:26:18 -08:00
parent 20dcc85fad
commit 636ec5eccd
25 changed files with 2721 additions and 3 deletions

@ -362,6 +362,7 @@ class HTTPClient(object):
def get_client_class(version):
version_map = {
'1': 'cinderclient.v1.client.Client',
'2': 'cinderclient.v2.client.Client',
}
try:
client_path = version_map[str(version)]

@ -32,6 +32,7 @@ from cinderclient import exceptions as exc
import cinderclient.extension
from cinderclient import utils
from cinderclient.v1 import shell as shell_v1
from cinderclient.v2 import shell as shell_v2
DEFAULT_OS_VOLUME_API_VERSION = "1"
DEFAULT_CINDER_ENDPOINT_TYPE = 'publicURL'
@ -162,7 +163,7 @@ class OpenStackCinderShell(object):
metavar='<compute-api-ver>',
default=utils.env('OS_VOLUME_API_VERSION',
default=DEFAULT_OS_VOLUME_API_VERSION),
help='Accepts 1,defaults '
help='Accepts 1 or 2,defaults '
'to env[OS_VOLUME_API_VERSION].')
parser.add_argument('--os_volume_api_version',
help=argparse.SUPPRESS)
@ -223,7 +224,7 @@ class OpenStackCinderShell(object):
try:
actions_module = {
'1.1': shell_v1,
'2': shell_v1,
'2': shell_v2,
}[version]
except KeyError:
actions_module = shell_v1

@ -0,0 +1,17 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.v2.client import Client

78
cinderclient/v2/client.py Normal file

@ -0,0 +1,78 @@
from cinderclient import client
from cinderclient.v2 import limits
from cinderclient.v2 import quota_classes
from cinderclient.v2 import quotas
from cinderclient.v2 import volumes
from cinderclient.v2 import volume_snapshots
from cinderclient.v2 import volume_types
class Client(object):
"""Top-level object to access the OpenStack Volume API.
Create an instance with your creds::
>>> client = Client(USERNAME, PASSWORD, PROJECT_ID, AUTH_URL)
Then call methods on its managers::
>>> client.volumes.list()
...
"""
def __init__(self, username, api_key, project_id=None, auth_url='',
insecure=False, timeout=None, tenant_id=None,
proxy_tenant_id=None, proxy_token=None, region_name=None,
endpoint_type='publicURL', extensions=None,
service_type='volume', service_name=None,
volume_service_name=None, retries=None,
http_log_debug=False,
cacert=None):
# FIXME(comstud): Rename the api_key argument above when we
# know it's not being used as keyword argument
password = api_key
self.limits = limits.LimitsManager(self)
# extensions
self.volumes = volumes.VolumeManager(self)
self.volume_snapshots = volume_snapshots.SnapshotManager(self)
self.volume_types = volume_types.VolumeTypeManager(self)
self.quota_classes = quota_classes.QuotaClassSetManager(self)
self.quotas = quotas.QuotaSetManager(self)
# Add in any extensions...
if extensions:
for extension in extensions:
if extension.manager_class:
setattr(self, extension.name,
extension.manager_class(self))
self.client = client.HTTPClient(
username,
password,
project_id,
auth_url,
insecure=insecure,
timeout=timeout,
tenant_id=tenant_id,
proxy_token=proxy_token,
proxy_tenant_id=proxy_tenant_id,
region_name=region_name,
endpoint_type=endpoint_type,
service_type=service_type,
service_name=service_name,
volume_service_name=volume_service_name,
retries=retries,
http_log_debug=http_log_debug,
cacert=cacert)
def authenticate(self):
"""Authenticate against the server.
Normally this is called automatically when you first access the API,
but you can call this method to force authentication right now.
Returns on success; raises :exc:`exceptions.Unauthorized` if the
credentials are wrong.
"""
self.client.authenticate()

@ -0,0 +1,15 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

@ -0,0 +1,47 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import base
from cinderclient import utils
class ListExtResource(base.Resource):
@property
def summary(self):
descr = self.description.strip()
if not descr:
return '??'
lines = descr.split("\n")
if len(lines) == 1:
return lines[0]
else:
return lines[0] + "..."
class ListExtManager(base.Manager):
resource_class = ListExtResource
def show_all(self):
return self._list("/extensions", 'extensions')
@utils.service_type('volume')
def do_list_extensions(client, _args):
"""
List all the os-api extensions that are available.
"""
extensions = client.list_extensions.show_all()
fields = ["Name", "Summary", "Alias", "Updated"]
utils.print_list(extensions, fields)

78
cinderclient/v2/limits.py Normal file

@ -0,0 +1,78 @@
# Copyright 2013 OpenStack LLC.
from cinderclient import base
class Limits(base.Resource):
"""A collection of RateLimit and AbsoluteLimit objects"""
def __repr__(self):
return "<Limits>"
@property
def absolute(self):
for (name, value) in self._info['absolute'].items():
yield AbsoluteLimit(name, value)
@property
def rate(self):
for group in self._info['rate']:
uri = group['uri']
regex = group['regex']
for rate in group['limit']:
yield RateLimit(rate['verb'], uri, regex, rate['value'],
rate['remaining'], rate['unit'],
rate['next-available'])
class RateLimit(object):
"""Data model that represents a flattened view of a single rate limit"""
def __init__(self, verb, uri, regex, value, remain,
unit, next_available):
self.verb = verb
self.uri = uri
self.regex = regex
self.value = value
self.remain = remain
self.unit = unit
self.next_available = next_available
def __eq__(self, other):
return self.uri == other.uri \
and self.regex == other.regex \
and self.value == other.value \
and self.verb == other.verb \
and self.remain == other.remain \
and self.unit == other.unit \
and self.next_available == other.next_available
def __repr__(self):
return "<RateLimit: method=%s uri=%s>" % (self.method, self.uri)
class AbsoluteLimit(object):
"""Data model that represents a single absolute limit"""
def __init__(self, name, value):
self.name = name
self.value = value
def __eq__(self, other):
return self.value == other.value and self.name == other.name
def __repr__(self):
return "<AbsoluteLimit: name=%s>" % (self.name)
class LimitsManager(base.Manager):
"""Manager object used to interact with limits resource"""
resource_class = Limits
def get(self):
"""Get a specific extension.
:rtype: :class:`Limits`
"""
return self._get("/limits", "limits")

@ -0,0 +1,51 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import base
class QuotaClassSet(base.Resource):
@property
def id(self):
"""Needed by base.Resource to self-refresh and be indexed"""
return self.class_name
def update(self, *args, **kwargs):
self.manager.update(self.class_name, *args, **kwargs)
class QuotaClassSetManager(base.ManagerWithFind):
resource_class = QuotaClassSet
def get(self, class_name):
return self._get("/os-quota-class-sets/%s" % (class_name),
"quota_class_set")
def update(self,
class_name,
volumes=None,
gigabytes=None):
body = {'quota_class_set': {
'class_name': class_name,
'volumes': volumes,
'gigabytes': gigabytes}}
for key in body['quota_class_set'].keys():
if body['quota_class_set'][key] is None:
body['quota_class_set'].pop(key)
self._update('/os-quota-class-sets/%s' % (class_name), body)

53
cinderclient/v2/quotas.py Normal file

@ -0,0 +1,53 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import base
class QuotaSet(base.Resource):
@property
def id(self):
"""Needed by base.Resource to self-refresh and be indexed"""
return self.tenant_id
def update(self, *args, **kwargs):
self.manager.update(self.tenant_id, *args, **kwargs)
class QuotaSetManager(base.ManagerWithFind):
resource_class = QuotaSet
def get(self, tenant_id):
if hasattr(tenant_id, 'tenant_id'):
tenant_id = tenant_id.tenant_id
return self._get("/os-quota-sets/%s" % (tenant_id), "quota_set")
def update(self, tenant_id, volumes=None, gigabytes=None):
body = {'quota_set': {
'tenant_id': tenant_id,
'volumes': volumes,
'gigabytes': gigabytes}}
for key in body['quota_set'].keys():
if body['quota_set'][key] is None:
body['quota_set'].pop(key)
self._update('/os-quota-sets/%s' % (tenant_id), body)
def defaults(self, tenant_id):
return self._get('/os-quota-sets/%s/defaults' % tenant_id,
'quota_set')

697
cinderclient/v2/shell.py Normal file

@ -0,0 +1,697 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import os
import sys
import time
from cinderclient import exceptions
from cinderclient import utils
def _poll_for_status(poll_fn, obj_id, action, final_ok_states,
poll_period=5, show_progress=True):
"""Block while action is performed, periodically printing progress."""
def print_progress(progress):
if show_progress:
msg = ('\rInstance %(action)s... %(progress)s%% complete'
% dict(action=action, progress=progress))
else:
msg = '\rInstance %(action)s...' % dict(action=action)
sys.stdout.write(msg)
sys.stdout.flush()
print
while True:
obj = poll_fn(obj_id)
status = obj.status.lower()
progress = getattr(obj, 'progress', None) or 0
if status in final_ok_states:
print_progress(100)
print "\nFinished"
break
elif status == "error":
print "\nError %(action)s instance" % locals()
break
else:
print_progress(progress)
time.sleep(poll_period)
def _find_volume(cs, volume):
"""Get a volume by ID."""
return utils.find_resource(cs.volumes, volume)
def _find_volume_snapshot(cs, snapshot):
"""Get a volume snapshot by ID."""
return utils.find_resource(cs.volume_snapshots, snapshot)
def _print_volume_snapshot(snapshot):
utils.print_dict(snapshot._info)
def _translate_volume_keys(collection):
convert = [('volumeType', 'volume_type')]
for item in collection:
keys = item.__dict__.keys()
for from_key, to_key in convert:
if from_key in keys and to_key not in keys:
setattr(item, to_key, item._info[from_key])
def _translate_volume_snapshot_keys(collection):
convert = [('volumeId', 'volume_id')]
for item in collection:
keys = item.__dict__.keys()
for from_key, to_key in convert:
if from_key in keys and to_key not in keys:
setattr(item, to_key, item._info[from_key])
def _extract_metadata(args):
metadata = {}
for metadatum in args.metadata[0]:
# unset doesn't require a val, so we have the if/else
if '=' in metadatum:
(key, value) = metadatum.split('=', 1)
else:
key = metadatum
value = None
metadata[key] = value
return metadata
@utils.arg('--all-tenants',
dest='all_tenants',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Display information from all tenants (Admin only).')
@utils.arg('--all_tenants',
nargs='?',
type=int,
const=1,
help=argparse.SUPPRESS)
@utils.arg('--name',
metavar='<name>',
default=None,
help='Filter results by name')
@utils.arg('--display-name',
help=argparse.SUPPRESS)
@utils.arg('--status',
metavar='<status>',
default=None,
help='Filter results by status')
@utils.service_type('volume')
def do_list(cs, args):
"""List all the volumes."""
# NOTE(thingee): Backwards-compatibility with v1 args
if args.display_name is not None:
args.name = args.display_name
all_tenants = int(os.environ.get("ALL_TENANTS", args.all_tenants))
search_opts = {
'all_tenants': all_tenants,
'name': args.name,
'status': args.status,
}
volumes = cs.volumes.list(search_opts=search_opts)
_translate_volume_keys(volumes)
# Create a list of servers to which the volume is attached
for vol in volumes:
servers = [s.get('server_id') for s in vol.attachments]
setattr(vol, 'attached_to', ','.join(map(str, servers)))
utils.print_list(volumes, ['ID', 'Status', 'Name',
'Size', 'Volume Type', 'Bootable', 'Attached to'])
@utils.arg('volume',
metavar='<volume>',
help='ID of the volume.')
@utils.service_type('volume')
def do_show(cs, args):
"""Show details about a volume."""
info = dict()
volume = _find_volume(cs, args.volume)
info.update(volume._info)
if 'links' in info:
info.pop('links')
utils.print_dict(info)
@utils.arg('size',
metavar='<size>',
type=int,
help='Size of volume in GB')
@utils.arg('--snapshot-id',
metavar='<snapshot-id>',
default=None,
help='Create volume from snapshot id (Optional, Default=None)')
@utils.arg('--snapshot_id',
help=argparse.SUPPRESS)
@utils.arg('--source-volid',
metavar='<source-volid>',
default=None,
help='Create volume from volume id (Optional, Default=None)')
@utils.arg('--source_volid',
help=argparse.SUPPRESS)
@utils.arg('--image-id',
metavar='<image-id>',
default=None,
help='Create volume from image id (Optional, Default=None)')
@utils.arg('--image_id',
help=argparse.SUPPRESS)
@utils.arg('--name',
metavar='<name>',
default=None,
help='Volume name (Optional, Default=None)')
@utils.arg('--display-name',
help=argparse.SUPPRESS)
@utils.arg('--display_name',
help=argparse.SUPPRESS)
@utils.arg('--description',
metavar='<description>',
default=None,
help='Volume description (Optional, Default=None)')
@utils.arg('--display-description',
help=argparse.SUPPRESS)
@utils.arg('--display_description',
help=argparse.SUPPRESS)
@utils.arg('--volume-type',
metavar='<volume-type>',
default=None,
help='Volume type (Optional, Default=None)')
@utils.arg('--volume_type',
help=argparse.SUPPRESS)
@utils.arg('--availability-zone',
metavar='<availability-zone>',
default=None,
help='Availability zone for volume (Optional, Default=None)')
@utils.arg('--availability_zone',
help=argparse.SUPPRESS)
@utils.arg('--metadata',
type=str,
nargs='*',
metavar='<key=value>',
help='Metadata key=value pairs (Optional, Default=None)',
default=None)
@utils.service_type('volume')
def do_create(cs, args):
"""Add a new volume."""
# NOTE(thingee): Backwards-compatibility with v1 args
if args.display_name is not None:
args.name = args.display_name
if args.display_description is not None:
args.description = args.display_description
volume_metadata = None
if args.metadata is not None:
volume_metadata = _extract_metadata(args)
volume = cs.volumes.create(args.size,
args.snapshot_id,
args.source_volid,
args.name,
args.description,
args.volume_type,
availability_zone=args.availability_zone,
imageRef=args.image_id,
metadata=volume_metadata)
info = dict()
volume = cs.volumes.get(info['id'])
info.update(volume._info)
info.pop('links')
utils.print_dict(info)
@utils.arg('volume',
metavar='<volume>',
help='ID of the volume to delete.')
@utils.service_type('volume')
def do_delete(cs, args):
"""Remove a volume."""
volume = _find_volume(cs, args.volume)
volume.delete()
@utils.arg('volume',
metavar='<volume>',
help='ID of the volume to delete.')
@utils.service_type('volume')
def do_force_delete(cs, args):
"""Attempt forced removal of a volume, regardless of it's state."""
volume = _find_volume(cs, args.volume)
volume.force_delete()
@utils.arg('volume',
metavar='<volume>',
help='ID of the volume to rename.')
@utils.arg('name',
nargs='?',
metavar='<name>',
help='New name for the volume.')
@utils.arg('--description', metavar='<description>',
help='Optional volume description. (Default=None)',
default=None)
@utils.arg('--display-description',
help=argparse.SUPPRESS)
@utils.arg('--display_description',
help=argparse.SUPPRESS)
@utils.service_type('volume')
def do_rename(cs, args):
"""Rename a volume."""
kwargs = {}
if args.name is not None:
kwargs['name'] = args.name
if args.display_description is not None:
kwargs['description'] = args.display_description
elif args.description is not None:
kwargs['description'] = args.description
_find_volume(cs, args.volume).update(**kwargs)
@utils.arg('volume',
metavar='<volume>',
help='ID of the volume to update metadata on.')
@utils.arg('action',
metavar='<action>',
choices=['set', 'unset'],
help="Actions: 'set' or 'unset'")
@utils.arg('metadata',
metavar='<key=value>',
nargs='+',
action='append',
default=[],
help='Metadata to set/unset (only key is necessary on unset)')
@utils.service_type('volume')
def do_metadata(cs, args):
"""Set or Delete metadata on a volume."""
volume = _find_volume(cs, args.volume)
metadata = _extract_metadata(args)
if args.action == 'set':
cs.volumes.set_metadata(volume, metadata)
elif args.action == 'unset':
cs.volumes.delete_metadata(volume, metadata.keys())
@utils.arg('--all-tenants',
dest='all_tenants',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Display information from all tenants (Admin only).')
@utils.arg('--all_tenants',
nargs='?',
type=int,
const=1,
help=argparse.SUPPRESS)
@utils.arg('--name',
metavar='<name>',
default=None,
help='Filter results by name')
@utils.arg('--display-name',
help=argparse.SUPPRESS)
@utils.arg('--display_name',
help=argparse.SUPPRESS)
@utils.arg('--status',
metavar='<status>',
default=None,
help='Filter results by status')
@utils.arg('--volume-id',
metavar='<volume-id>',
default=None,
help='Filter results by volume-id')
@utils.arg('--volume_id',
help=argparse.SUPPRESS)
@utils.service_type('volume')
def do_snapshot_list(cs, args):
"""List all the snapshots."""
all_tenants = int(os.environ.get("ALL_TENANTS", args.all_tenants))
if args.display_name is not None:
args.name = args.display_name
search_opts = {
'all_tenants': all_tenants,
'display_name': args.name,
'status': args.status,
'volume_id': args.volume_id,
}
snapshots = cs.volume_snapshots.list(search_opts=search_opts)
_translate_volume_snapshot_keys(snapshots)
utils.print_list(snapshots,
['ID', 'Volume ID', 'Status', 'Name', 'Size'])
@utils.arg('snapshot',
metavar='<snapshot>',
help='ID of the snapshot.')
@utils.service_type('volume')
def do_snapshot_show(cs, args):
"""Show details about a snapshot."""
snapshot = _find_volume_snapshot(cs, args.snapshot)
_print_volume_snapshot(snapshot)
@utils.arg('volume-id',
metavar='<volume-id>',
help='ID of the volume to snapshot')
@utils.arg('--force',
metavar='<True|False>',
help='Optional flag to indicate whether '
'to snapshot a volume even if it\'s '
'attached to an instance. (Default=False)',
default=False)
@utils.arg('--name',
metavar='<name>',
default=None,
help='Optional snapshot name. (Default=None)')
@utils.arg('--display-name',
help=argparse.SUPPRESS)
@utils.arg('--display_name',
help=argparse.SUPPRESS)
@utils.arg('--description',
metavar='<description>',
default=None,
help='Optional snapshot description. (Default=None)')
@utils.arg('--display-description',
help=argparse.SUPPRESS)
@utils.arg('--display_description',
help=argparse.SUPPRESS)
@utils.service_type('volume')
def do_snapshot_create(cs, args):
"""Add a new snapshot."""
if args.display_name is not None:
args.name = args.display_name
if args.display_description is not None:
args.description = args.display_description
snapshot = cs.volume_snapshots.create(args.volume_id,
args.force,
args.name,
args.description)
_print_volume_snapshot(snapshot)
@utils.arg('snapshot-id',
metavar='<snapshot-id>',
help='ID of the snapshot to delete.')
@utils.service_type('volume')
def do_snapshot_delete(cs, args):
"""Remove a snapshot."""
snapshot = _find_volume_snapshot(cs, args.snapshot_id)
snapshot.delete()
@utils.arg('snapshot', metavar='<snapshot>', help='ID of the snapshot.')
@utils.arg('name', nargs='?', metavar='<name>',
help='New name for the snapshot.')
@utils.arg('--description', metavar='<description>',
help='Optional snapshot description. (Default=None)',
default=None)
@utils.arg('--display-description',
help=argparse.SUPPRESS)
@utils.arg('--display_description',
help=argparse.SUPPRESS)
@utils.service_type('volume')
def do_snapshot_rename(cs, args):
"""Rename a snapshot."""
kwargs = {}
if args.name is not None:
kwargs['name'] = args.name
if args.description is not None:
kwargs['description'] = args.description
elif args.display_description is not None:
kwargs['description'] = args.display_description
_find_volume_snapshot(cs, args.snapshot).update(**kwargs)
def _print_volume_type_list(vtypes):
utils.print_list(vtypes, ['ID', 'Name'])
def _print_type_and_extra_specs_list(vtypes):
formatters = {'extra_specs': _print_type_extra_specs}
utils.print_list(vtypes, ['ID', 'Name', 'extra_specs'], formatters)
@utils.service_type('volume')
def do_type_list(cs, args):
"""Print a list of available 'volume types'."""
vtypes = cs.volume_types.list()
_print_volume_type_list(vtypes)
@utils.service_type('volume')
def do_extra_specs_list(cs, args):
"""Print a list of current 'volume types and extra specs' (Admin Only)."""
vtypes = cs.volume_types.list()
_print_type_and_extra_specs_list(vtypes)
@utils.arg('name',
metavar='<name>',
help="Name of the new volume type")
@utils.service_type('volume')
def do_type_create(cs, args):
"""Create a new volume type."""
vtype = cs.volume_types.create(args.name)
_print_volume_type_list([vtype])
@utils.arg('id',
metavar='<id>',
help="Unique ID of the volume type to delete")
@utils.service_type('volume')
def do_type_delete(cs, args):
"""Delete a specific volume type"""
cs.volume_types.delete(args.id)
@utils.arg('vtype',
metavar='<vtype>',
help="Name or ID of the volume type")
@utils.arg('action',
metavar='<action>',
choices=['set', 'unset'],
help="Actions: 'set' or 'unset'")
@utils.arg('metadata',
metavar='<key=value>',
nargs='+',
action='append',
default=[],
help='Extra_specs to set/unset (only key is necessary on unset)')
@utils.service_type('volume')
def do_type_key(cs, args):
"Set or unset extra_spec for a volume type."""
vtype = _find_volume_type(cs, args.vtype)
keypair = _extract_metadata(args)
if args.action == 'set':
vtype.set_keys(keypair)
elif args.action == 'unset':
vtype.unset_keys(keypair.keys())
def do_endpoints(cs, args):
"""Discover endpoints that get returned from the authenticate services"""
catalog = cs.client.service_catalog.catalog
for e in catalog['access']['serviceCatalog']:
utils.print_dict(e['endpoints'][0], e['name'])
def do_credentials(cs, args):
"""Show user credentials returned from auth"""
catalog = cs.client.service_catalog.catalog
utils.print_dict(catalog['access']['user'], "User Credentials")
utils.print_dict(catalog['access']['token'], "Token")
_quota_resources = ['volumes', 'gigabytes']
def _quota_show(quotas):
quota_dict = {}
for resource in _quota_resources:
quota_dict[resource] = getattr(quotas, resource, None)
utils.print_dict(quota_dict)
def _quota_update(manager, identifier, args):
updates = {}
for resource in _quota_resources:
val = getattr(args, resource, None)
if val is not None:
updates[resource] = val
if updates:
manager.update(identifier, **updates)
@utils.arg('tenant',
metavar='<tenant_id>',
help='UUID of tenant to list the quotas for.')
@utils.service_type('volume')
def do_quota_show(cs, args):
"""List the quotas for a tenant."""
_quota_show(cs.quotas.get(args.tenant))
@utils.arg('tenant',
metavar='<tenant_id>',
help='UUID of tenant to list the default quotas for.')
@utils.service_type('volume')
def do_quota_defaults(cs, args):
"""List the default quotas for a tenant."""
_quota_show(cs.quotas.defaults(args.tenant))
@utils.arg('tenant',
metavar='<tenant_id>',
help='UUID of tenant to set the quotas for.')
@utils.arg('--volumes',
metavar='<volumes>',
type=int, default=None,
help='New value for the "volumes" quota.')
@utils.arg('--gigabytes',
metavar='<gigabytes>',
type=int, default=None,
help='New value for the "gigabytes" quota.')
@utils.service_type('volume')
def do_quota_update(cs, args):
"""Update the quotas for a tenant."""
_quota_update(cs.quotas, args.tenant, args)
@utils.arg('class_name',
metavar='<class>',
help='Name of quota class to list the quotas for.')
@utils.service_type('volume')
def do_quota_class_show(cs, args):
"""List the quotas for a quota class."""
_quota_show(cs.quota_classes.get(args.class_name))
@utils.arg('class-name',
metavar='<class-name>',
help='Name of quota class to set the quotas for.')
@utils.arg('--volumes',
metavar='<volumes>',
type=int, default=None,
help='New value for the "volumes" quota.')
@utils.arg('--gigabytes',
metavar='<gigabytes>',
type=int, default=None,
help='New value for the "gigabytes" quota.')
@utils.service_type('volume')
def do_quota_class_update(cs, args):
"""Update the quotas for a quota class."""
_quota_update(cs.quota_classes, args.class_name, args)
@utils.service_type('volume')
def do_absolute_limits(cs, args):
"""Print a list of absolute limits for a user"""
limits = cs.limits.get().absolute
columns = ['Name', 'Value']
utils.print_list(limits, columns)
@utils.service_type('volume')
def do_rate_limits(cs, args):
"""Print a list of rate limits for a user"""
limits = cs.limits.get().rate
columns = ['Verb', 'URI', 'Value', 'Remain', 'Unit', 'Next_Available']
utils.print_list(limits, columns)
def _print_type_extra_specs(vol_type):
try:
return vol_type.get_keys()
except exceptions.NotFound:
return "N/A"
def _find_volume_type(cs, vtype):
"""Get a volume type by name or ID."""
return utils.find_resource(cs.volume_types, vtype)
@utils.arg('volume-id',
metavar='<volume-id>',
help='ID of the volume to snapshot')
@utils.arg('--force',
metavar='<True|False>',
help='Optional flag to indicate whether '
'to upload a volume even if it\'s '
'attached to an instance. (Default=False)',
default=False)
@utils.arg('--container-format',
metavar='<container-format>',
help='Optional type for container format '
'(Default=bare)',
default='bare')
@utils.arg('--container_format',
help=argparse.SUPPRESS)
@utils.arg('--disk-format',
metavar='<disk-format>',
help='Optional type for disk format '
'(Default=raw)',
default='raw')
@utils.arg('--disk_format',
help=argparse.SUPPRESS)
@utils.arg('image-name',
metavar='<image-name>',
help='Name for created image')
@utils.arg('--image_name',
help=argparse.SUPPRESS)
@utils.service_type('volume')
def do_upload_to_image(cs, args):
"""Upload volume to image service as image."""
volume = _find_volume(cs, args.volume_id)
volume.upload_to_image(args.force,
args.image_name,
args.container_format,
args.disk_format)

@ -0,0 +1,116 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Volume snapshot interface (1.1 extension)."""
import urllib
from cinderclient import base
class Snapshot(base.Resource):
"""A Snapshot is a point-in-time snapshot of an openstack volume."""
def __repr__(self):
return "<Snapshot: %s>" % self.id
def delete(self):
"""Delete this snapshot."""
self.manager.delete(self)
def update(self, **kwargs):
"""Update the name or description for this snapshot."""
self.manager.update(self, **kwargs)
@property
def progress(self):
return self._info.get('os-extended-snapshot-attributes:progress')
@property
def project_id(self):
return self._info.get('os-extended-snapshot-attributes:project_id')
class SnapshotManager(base.ManagerWithFind):
"""Manage :class:`Snapshot` resources."""
resource_class = Snapshot
def create(self, volume_id, force=False,
name=None, description=None):
"""Create a snapshot of the given volume.
:param volume_id: The ID of the volume to snapshot.
:param force: If force is True, create a snapshot even if the volume is
attached to an instance. Default is False.
:param name: Name of the snapshot
:param description: Description of the snapshot
:rtype: :class:`Snapshot`
"""
body = {'snapshot': {'volume_id': volume_id,
'force': force,
'name': name,
'description': description}}
return self._create('/snapshots', body, 'snapshot')
def get(self, snapshot_id):
"""Get a snapshot.
:param snapshot_id: The ID of the snapshot to get.
:rtype: :class:`Snapshot`
"""
return self._get("/snapshots/%s" % snapshot_id, "snapshot")
def list(self, detailed=True, search_opts=None):
"""Get a list of all snapshots.
:rtype: list of :class:`Snapshot`
"""
if search_opts is None:
search_opts = {}
qparams = {}
for opt, val in search_opts.iteritems():
if val:
qparams[opt] = val
query_string = "?%s" % urllib.urlencode(qparams) if qparams else ""
detail = ""
if detailed:
detail = "/detail"
return self._list("/snapshots%s%s" % (detail, query_string),
"snapshots")
def delete(self, snapshot):
"""Delete a snapshot.
:param snapshot: The :class:`Snapshot` to delete.
"""
self._delete("/snapshots/%s" % base.getid(snapshot))
def update(self, snapshot, **kwargs):
"""Update the name or description for a snapshot.
:param snapshot: The :class:`Snapshot` to delete.
"""
if not kwargs:
return
body = {"snapshot": kwargs}
self._update("/snapshots/%s" % base.getid(snapshot), body)

@ -0,0 +1,108 @@
# Copyright 2013 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Volume Type interface."""
from cinderclient import base
class VolumeType(base.Resource):
"""A Volume Type is the type of volume to be created."""
def __repr__(self):
return "<VolumeType: %s>" % self.name
def get_keys(self):
"""Get extra specs from a volume type.
:param vol_type: The :class:`VolumeType` to get extra specs from
"""
_resp, body = self.manager.api.client.get(
"/types/%s/extra_specs" %
base.getid(self))
return body["extra_specs"]
def set_keys(self, metadata):
"""Set extra specs on a volume type.
:param type : The :class:`VolumeType` to set extra spec on
:param metadata: A dict of key/value pairs to be set
"""
body = {'extra_specs': metadata}
return self.manager._create(
"/types/%s/extra_specs" % base.getid(self),
body,
"extra_specs",
return_raw=True)
def unset_keys(self, keys):
"""Unset extra specs on a volue type.
:param type_id: The :class:`VolumeType` to unset extra spec on
:param keys: A list of keys to be unset
"""
# NOTE(jdg): This wasn't actually doing all of the keys before
# the return in the loop resulted in ony ONE key being unset.
# since on success the return was NONE, we'll only interrupt the loop
# and return if there's an error
for k in keys:
resp = self.manager._delete(
"/types/%s/extra_specs/%s" % (
base.getid(self), k))
if resp is not None:
return resp
class VolumeTypeManager(base.ManagerWithFind):
"""Manage :class:`VolumeType` resources."""
resource_class = VolumeType
def list(self):
"""Get a list of all volume types.
:rtype: list of :class:`VolumeType`.
"""
return self._list("/types", "volume_types")
def get(self, volume_type):
"""Get a specific volume type.
:param volume_type: The ID of the :class:`VolumeType` to get.
:rtype: :class:`VolumeType`
"""
return self._get("/types/%s" % base.getid(volume_type), "volume_type")
def delete(self, volume_type):
"""Delete a specific volume_type.
:param volume_type: The ID of the :class:`VolumeType` to get.
"""
self._delete("/types/%s" % base.getid(volume_type))
def create(self, name):
"""Create a volume type.
:param name: Descriptive name of the volume type
:rtype: :class:`VolumeType`
"""
body = {
"volume_type": {
"name": name,
}
}
return self._create("/types", body, "volume_type")

308
cinderclient/v2/volumes.py Normal file

@ -0,0 +1,308 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Volume interface (v2 extension)."""
import urllib
from cinderclient import base
class Volume(base.Resource):
"""A volume is an extra block level storage to the OpenStack instances."""
def __repr__(self):
return "<Volume: %s>" % self.id
def delete(self):
"""Delete this volume."""
self.manager.delete(self)
def update(self, **kwargs):
"""Update the name or description for this volume."""
self.manager.update(self, **kwargs)
def attach(self, instance_uuid, mountpoint):
"""Set attachment metadata.
:param instance_uuid: uuid of the attaching instance.
:param mountpoint: mountpoint on the attaching instance.
"""
return self.manager.attach(self, instance_uuid, mountpoint)
def detach(self):
"""Clear attachment metadata."""
return self.manager.detach(self)
def reserve(self, volume):
"""Reserve this volume."""
return self.manager.reserve(self)
def unreserve(self, volume):
"""Unreserve this volume."""
return self.manager.unreserve(self)
def begin_detaching(self, volume):
"""Begin detaching volume."""
return self.manager.begin_detaching(self)
def roll_detaching(self, volume):
"""Roll detaching volume."""
return self.manager.roll_detaching(self)
def initialize_connection(self, volume, connector):
"""Initialize a volume connection.
:param connector: connector dict from nova.
"""
return self.manager.initialize_connection(self, connector)
def terminate_connection(self, volume, connector):
"""Terminate a volume connection.
:param connector: connector dict from nova.
"""
return self.manager.terminate_connection(self, connector)
def set_metadata(self, volume, metadata):
"""Set or Append metadata to a volume.
:param type : The :class: `Volume` to set metadata on
:param metadata: A dict of key/value pairs to set
"""
return self.manager.set_metadata(self, metadata)
def upload_to_image(self, force, image_name, container_format,
disk_format):
"""Upload a volume to image service as an image."""
self.manager.upload_to_image(self, force, image_name, container_format,
disk_format)
def force_delete(self):
"""Delete the specififed volume ignoring it's current state.
:param volume: The UUID of the volume to force-delete.
"""
self.manager.force_delete(self)
class VolumeManager(base.ManagerWithFind):
"""Manage :class:`Volume` resources."""
resource_class = Volume
def create(self, size, snapshot_id=None, source_volid=None,
name=None, description=None,
volume_type=None, user_id=None,
project_id=None, availability_zone=None,
metadata=None, imageRef=None):
"""Create a volume.
:param size: Size of volume in GB
:param snapshot_id: ID of the snapshot
:param name: Name of the volume
:param description: Description of the volume
:param volume_type: Type of volume
:rtype: :class:`Volume`
:param user_id: User id derived from context
:param project_id: Project id derived from context
:param availability_zone: Availability Zone to use
:param metadata: Optional metadata to set on volume creation
:param imageRef: reference to an image stored in glance
:param source_volid: ID of source volume to clone from
"""
if metadata is None:
volume_metadata = {}
else:
volume_metadata = metadata
body = {'volume': {'size': size,
'snapshot_id': snapshot_id,
'name': name,
'description': description,
'volume_type': volume_type,
'user_id': user_id,
'project_id': project_id,
'availability_zone': availability_zone,
'status': "creating",
'attach_status': "detached",
'metadata': volume_metadata,
'imageRef': imageRef,
'source_volid': source_volid,
}}
return self._create('/volumes', body, 'volume')
def get(self, volume_id):
"""Get a volume.
:param volume_id: The ID of the volume to delete.
:rtype: :class:`Volume`
"""
return self._get("/volumes/%s" % volume_id, "volume")
def list(self, detailed=True, search_opts=None):
"""Get a list of all volumes.
:rtype: list of :class:`Volume`
"""
if search_opts is None:
search_opts = {}
qparams = {}
for opt, val in search_opts.iteritems():
if val:
qparams[opt] = val
query_string = "?%s" % urllib.urlencode(qparams) if qparams else ""
detail = ""
if detailed:
detail = "/detail"
return self._list("/volumes%s%s" % (detail, query_string),
"volumes")
def delete(self, volume):
"""Delete a volume.
:param volume: The :class:`Volume` to delete.
"""
self._delete("/volumes/%s" % base.getid(volume))
def update(self, volume, **kwargs):
"""Update the name or description for a volume.
:param volume: The :class:`Volume` to delete.
"""
if not kwargs:
return
body = {"volume": kwargs}
self._update("/volumes/%s" % base.getid(volume), body)
def _action(self, action, volume, info=None, **kwargs):
"""Perform a volume "action."
"""
body = {action: info}
self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/volumes/%s/action' % base.getid(volume)
return self.api.client.post(url, body=body)
def attach(self, volume, instance_uuid, mountpoint):
"""Set attachment metadata.
:param volume: The :class:`Volume` (or its ID)
you would like to attach.
:param instance_uuid: uuid of the attaching instance.
:param mountpoint: mountpoint on the attaching instance.
"""
return self._action('os-attach',
volume,
{'instance_uuid': instance_uuid,
'mountpoint': mountpoint})
def detach(self, volume):
"""Clear attachment metadata.
:param volume: The :class:`Volume` (or its ID)
you would like to detach.
"""
return self._action('os-detach', volume)
def reserve(self, volume):
"""Reserve this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to reserve.
"""
return self._action('os-reserve', volume)
def unreserve(self, volume):
"""Unreserve this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to unreserve.
"""
return self._action('os-unreserve', volume)
def begin_detaching(self, volume):
"""Begin detaching this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to detach.
"""
return self._action('os-begin_detaching', volume)
def roll_detaching(self, volume):
"""Roll detaching this volume.
:param volume: The :class:`Volume` (or its ID)
you would like to roll detaching.
"""
return self._action('os-roll_detaching', volume)
def initialize_connection(self, volume, connector):
"""Initialize a volume connection.
:param volume: The :class:`Volume` (or its ID).
:param connector: connector dict from nova.
"""
return self._action('os-initialize_connection', volume,
{'connector': connector})[1]['connection_info']
def terminate_connection(self, volume, connector):
"""Terminate a volume connection.
:param volume: The :class:`Volume` (or its ID).
:param connector: connector dict from nova.
"""
self._action('os-terminate_connection', volume,
{'connector': connector})
def set_metadata(self, volume, metadata):
"""Update/Set a volumes metadata.
:param volume: The :class:`Volume`.
:param metadata: A list of keys to be set.
"""
body = {'metadata': metadata}
return self._create("/volumes/%s/metadata" % base.getid(volume),
body, "metadata")
def delete_metadata(self, volume, keys):
"""Delete specified keys from volumes metadata.
:param volume: The :class:`Volume`.
:param metadata: A list of keys to be removed.
"""
for k in keys:
self._delete("/volumes/%s/metadata/%s" % (base.getid(volume), k))
def upload_to_image(self, volume, force, image_name, container_format,
disk_format):
"""Upload volume to image service as image.
:param volume: The :class:`Volume` to upload.
"""
return self._action('os-volume_upload_image',
volume,
{'force': force,
'image_name': image_name,
'container_format': container_format,
'disk_format': disk_format})
def force_delete(self, volume):
return self._action('os-force_delete', base.getid(volume))

@ -1,6 +1,7 @@
import cinderclient.client
import cinderclient.v1.client
import cinderclient.v2.client
from tests import utils
@ -10,6 +11,10 @@ class ClientTest(utils.TestCase):
output = cinderclient.client.get_client_class('1')
self.assertEqual(output, cinderclient.v1.client.Client)
def test_get_client_class_v2(self):
output = cinderclient.client.get_client_class('2')
self.assertEqual(output, cinderclient.v2.client.Client)
def test_get_client_class_unknown(self):
self.assertRaises(cinderclient.exceptions.UnsupportedVersion,
cinderclient.client.get_client_class, '0')

@ -32,7 +32,7 @@ class ShellTest(utils.TestCase):
'CINDER_USERNAME': 'username',
'CINDER_PASSWORD': 'password',
'CINDER_PROJECT_ID': 'project_id',
'OS_COMPUTE_API_VERSION': '1.1',
'OS_VOLUME_API_VERSION': '1.1',
'CINDER_URL': 'http://no.where',
}

15
tests/v2/__init__.py Normal file

@ -0,0 +1,15 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

@ -0,0 +1,36 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient import extension
from cinderclient.v2.contrib import list_extensions
from tests import utils
from tests.v1 import fakes
extensions = [
extension.Extension(list_extensions.__name__.split(".")[-1],
list_extensions),
]
cs = fakes.FakeClient(extensions=extensions)
class ListExtensionsTests(utils.TestCase):
def test_list_extensions(self):
all_exts = cs.list_extensions.show_all()
cs.assert_called('GET', '/extensions')
self.assertTrue(len(all_exts) > 0)
for r in all_exts:
self.assertTrue(len(r.summary) > 0)

317
tests/v2/fakes.py Normal file

@ -0,0 +1,317 @@
# Copyright 2013 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urlparse
from cinderclient import client as base_client
from cinderclient.v2 import client
from tests import fakes
import tests.utils as utils
def _stub_volume(**kwargs):
volume = {
'id': '1234',
'name': None,
'description': None,
"attachments": [],
"bootable": "false",
"availability_zone": "cinder",
"created_at": "2012-08-27T00:00:00.000000",
"id": '00000000-0000-0000-0000-000000000000',
"metadata": {},
"size": 1,
"snapshot_id": None,
"status": "available",
"volume_type": "None",
"links": [
{
"href": "http://localhost/v2/fake/volumes/1234",
"rel": "self"
},
{
"href": "http://localhost/fake/volumes/1234",
"rel": "bookmark"
}
],
}
volume.update(kwargs)
return volume
def _stub_snapshot(**kwargs):
snapshot = {
"created_at": "2012-08-28T16:30:31.000000",
"display_description": None,
"display_name": None,
"id": '11111111-1111-1111-1111-111111111111',
"size": 1,
"status": "available",
"volume_id": '00000000-0000-0000-0000-000000000000',
}
snapshot.update(kwargs)
return snapshot
class FakeClient(fakes.FakeClient, client.Client):
def __init__(self, *args, **kwargs):
client.Client.__init__(self, 'username', 'password',
'project_id', 'auth_url',
extensions=kwargs.get('extensions'))
self.client = FakeHTTPClient(**kwargs)
class FakeHTTPClient(base_client.HTTPClient):
def __init__(self, **kwargs):
self.username = 'username'
self.password = 'password'
self.auth_url = 'auth_url'
self.callstack = []
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
if method in ['GET', 'DELETE']:
assert 'body' not in kwargs
elif method == 'PUT':
assert 'body' in kwargs
# Call the method
args = urlparse.parse_qsl(urlparse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
# Note the call
self.callstack.append((method, url, kwargs.get('body', None)))
status, headers, body = getattr(self, callback)(**kwargs)
r = utils.TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})
return r, body
if hasattr(status, 'items'):
return utils.TestResponse(status), body
else:
return utils.TestResponse({"status": status}), body
#
# Snapshots
#
def get_snapshots_detail(self, **kw):
return (200, {}, {'snapshots': [
_stub_snapshot(),
]})
def get_snapshots_1234(self, **kw):
return (200, {}, {'snapshot': _stub_snapshot(id='1234')})
def put_snapshots_1234(self, **kw):
snapshot = _stub_snapshot(id='1234')
snapshot.update(kw['body']['snapshot'])
return (200, {}, {'snapshot': snapshot})
#
# Volumes
#
def put_volumes_1234(self, **kw):
volume = _stub_volume(id='1234')
volume.update(kw['body']['volume'])
return (200, {}, {'volume': volume})
def get_volumes(self, **kw):
return (200, {}, {"volumes": [
{'id': 1234, 'name': 'sample-volume'},
{'id': 5678, 'name': 'sample-volume2'}
]})
# TODO(jdg): This will need to change
# at the very least it's not complete
def get_volumes_detail(self, **kw):
return (200, {}, {"volumes": [
{'id': 1234,
'name': 'sample-volume',
'attachments': [{'server_id': 1234}]},
]})
def get_volumes_1234(self, **kw):
r = {'volume': self.get_volumes_detail()[2]['volumes'][0]}
return (200, {}, r)
def post_volumes_1234_action(self, body, **kw):
_body = None
resp = 202
assert len(body.keys()) == 1
action = body.keys()[0]
if action == 'os-attach':
assert body[action].keys() == ['instance_uuid', 'mountpoint']
elif action == 'os-detach':
assert body[action] is None
elif action == 'os-reserve':
assert body[action] is None
elif action == 'os-unreserve':
assert body[action] is None
elif action == 'os-initialize_connection':
assert body[action].keys() == ['connector']
return (202, {}, {'connection_info': 'foos'})
elif action == 'os-terminate_connection':
assert body[action].keys() == ['connector']
elif action == 'os-begin_detaching':
assert body[action] is None
elif action == 'os-roll_detaching':
assert body[action] is None
else:
raise AssertionError("Unexpected server action: %s" % action)
return (resp, {}, _body)
def post_volumes(self, **kw):
return (202, {}, {'volume': {}})
def delete_volumes_1234(self, **kw):
return (202, {}, None)
#
# Quotas
#
def get_os_quota_sets_test(self, **kw):
return (200, {}, {'quota_set': {
'tenant_id': 'test',
'metadata_items': [],
'volumes': 1,
'gigabytes': 1}})
def get_os_quota_sets_test_defaults(self):
return (200, {}, {'quota_set': {
'tenant_id': 'test',
'metadata_items': [],
'volumes': 1,
'gigabytes': 1}})
def put_os_quota_sets_test(self, body, **kw):
assert body.keys() == ['quota_set']
fakes.assert_has_keys(body['quota_set'],
required=['tenant_id'])
return (200, {}, {'quota_set': {
'tenant_id': 'test',
'metadata_items': [],
'volumes': 2,
'gigabytes': 1}})
#
# Quota Classes
#
def get_os_quota_class_sets_test(self, **kw):
return (200, {}, {'quota_class_set': {
'class_name': 'test',
'metadata_items': [],
'volumes': 1,
'gigabytes': 1}})
def put_os_quota_class_sets_test(self, body, **kw):
assert body.keys() == ['quota_class_set']
fakes.assert_has_keys(body['quota_class_set'],
required=['class_name'])
return (200, {}, {'quota_class_set': {
'class_name': 'test',
'metadata_items': [],
'volumes': 2,
'gigabytes': 1}})
#
# VolumeTypes
#
def get_types(self, **kw):
return (200, {}, {
'volume_types': [{'id': 1,
'name': 'test-type-1',
'extra_specs':{}},
{'id': 2,
'name': 'test-type-2',
'extra_specs':{}}]})
def get_types_1(self, **kw):
return (200, {}, {'volume_type': {'id': 1,
'name': 'test-type-1',
'extra_specs': {}}})
def post_types(self, body, **kw):
return (202, {}, {'volume_type': {'id': 3,
'name': 'test-type-3',
'extra_specs': {}}})
def post_types_1_extra_specs(self, body, **kw):
assert body.keys() == ['extra_specs']
return (200, {}, {'extra_specs': {'k': 'v'}})
def delete_types_1_extra_specs_k(self, **kw):
return(204, {}, None)
def delete_types_1(self, **kw):
return (202, {}, None)
#
# Set/Unset metadata
#
def delete_volumes_1234_metadata_test_key(self, **kw):
return (204, {}, None)
def delete_volumes_1234_metadata_key1(self, **kw):
return (204, {}, None)
def delete_volumes_1234_metadata_key2(self, **kw):
return (204, {}, None)
def post_volumes_1234_metadata(self, **kw):
return (204, {}, {'metadata': {'test_key': 'test_value'}})
#
# List all extensions
#
def get_extensions(self, **kw):
exts = [
{
"alias": "FAKE-1",
"description": "Fake extension number 1",
"links": [],
"name": "Fake1",
"namespace": ("http://docs.openstack.org/"
"/ext/fake1/api/v1.1"),
"updated": "2011-06-09T00:00:00+00:00"
},
{
"alias": "FAKE-2",
"description": "Fake extension number 2",
"links": [],
"name": "Fake2",
"namespace": ("http://docs.openstack.org/"
"/ext/fake1/api/v1.1"),
"updated": "2011-06-09T00:00:00+00:00"
},
]
return (200, {}, {"extensions": exts, })

390
tests/v2/test_auth.py Normal file

@ -0,0 +1,390 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import requests
from cinderclient import exceptions
from cinderclient.v2 import client
from tests import utils
class AuthenticateAgainstKeystoneTests(utils.TestCase):
def test_authenticate_success(self):
cs = client.Client("username", "password", "project_id",
"auth_url/v2.0", service_type='compute')
resp = {
"access": {
"token": {
"expires": "12345",
"id": "FAKE_ID",
},
"serviceCatalog": [
{
"type": "compute",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8774/v2",
"internalURL": "http://localhost:8774/v2",
"publicURL": "http://localhost:8774/v2/",
},
],
},
],
},
}
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
test_auth_call()
def test_authenticate_tenant_id(self):
cs = client.Client("username", "password", auth_url="auth_url/v2.0",
tenant_id='tenant_id', service_type='compute')
resp = {
"access": {
"token": {
"expires": "12345",
"id": "FAKE_ID",
"tenant": {
"description": None,
"enabled": True,
"id": "tenant_id",
"name": "demo"
} # tenant associated with token
},
"serviceCatalog": [
{
"type": "compute",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8774/v2",
"internalURL": "http://localhost:8774/v2",
"publicURL": "http://localhost:8774/v2/",
},
],
},
],
},
}
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantId': cs.client.tenant_id,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
tenant_id = resp["access"]["token"]["tenant"]["id"]
self.assertEqual(cs.client.tenant_id, tenant_id)
test_auth_call()
def test_authenticate_failure(self):
cs = client.Client("username", "password", "project_id",
"auth_url/v2.0")
resp = {"unauthorized": {"message": "Unauthorized", "code": "401"}}
auth_response = utils.TestResponse({
"status_code": 401,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_redirect(self):
cs = client.Client("username", "password", "project_id",
"auth_url/v2", service_type='compute')
dict_correct_response = {
"access": {
"token": {
"expires": "12345",
"id": "FAKE_ID",
},
"serviceCatalog": [
{
"type": "compute",
"endpoints": [
{
"adminURL": "http://localhost:8774/v2",
"region": "RegionOne",
"internalURL": "http://localhost:8774/v2",
"publicURL": "http://localhost:8774/v2/",
},
],
},
],
},
}
correct_response = json.dumps(dict_correct_response)
dict_responses = [
{"headers": {'location':'http://127.0.0.1:5001'},
"status_code": 305,
"text": "Use proxy"},
# Configured on admin port, cinder redirects to v2.0 port.
# When trying to connect on it, keystone auth succeed by v1.0
# protocol (through headers) but tokens are being returned in
# body (looks like keystone bug). Leaved for compatibility.
{"headers": {},
"status_code": 200,
"text": correct_response},
{"headers": {},
"status_code": 200,
"text": correct_response}
]
responses = [(utils.TestResponse(resp)) for resp in dict_responses]
def side_effect(*args, **kwargs):
return responses.pop(0)
mock_request = mock.Mock(side_effect=side_effect)
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
resp = dict_correct_response
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
test_auth_call()
def test_ambiguous_endpoints(self):
cs = client.Client("username", "password", "project_id",
"auth_url/v2.0", service_type='compute')
resp = {
"access": {
"token": {
"expires": "12345",
"id": "FAKE_ID",
},
"serviceCatalog": [
{
"adminURL": "http://localhost:8774/v2",
"type": "compute",
"name": "Compute CLoud",
"endpoints": [
{
"region": "RegionOne",
"internalURL": "http://localhost:8774/v2",
"publicURL": "http://localhost:8774/v2/",
},
],
},
{
"adminURL": "http://localhost:8774/v2",
"type": "compute",
"name": "Hyper-compute Cloud",
"endpoints": [
{
"internalURL": "http://localhost:8774/v2",
"publicURL": "http://localhost:8774/v2/",
},
],
},
],
},
}
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.AmbiguousEndpoints,
cs.client.authenticate)
test_auth_call()
class AuthenticationTests(utils.TestCase):
def test_authenticate_success(self):
cs = client.Client("username", "password", "project_id", "auth_url")
management_url = 'https://localhost/v2.1/443470'
auth_response = utils.TestResponse({
'status_code': 204,
'headers': {
'x-server-management-url': management_url,
'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
},
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'Accept': 'application/json',
'X-Auth-User': 'username',
'X-Auth-Key': 'password',
'X-Auth-Project-Id': 'project_id',
'User-Agent': cs.client.USER_AGENT
}
mock_request.assert_called_with(
"GET",
cs.client.auth_url,
headers=headers,
**self.TEST_REQUEST_BASE)
self.assertEqual(cs.client.management_url,
auth_response.headers['x-server-management-url'])
self.assertEqual(cs.client.auth_token,
auth_response.headers['x-auth-token'])
test_auth_call()
def test_authenticate_failure(self):
cs = client.Client("username", "password", "project_id", "auth_url")
auth_response = utils.TestResponse({"status_code": 401})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_automatic(self):
cs = client.Client("username", "password", "project_id", "auth_url")
http_client = cs.client
http_client.management_url = ''
mock_request = mock.Mock(return_value=(None, None))
@mock.patch.object(http_client, 'request', mock_request)
@mock.patch.object(http_client, 'authenticate')
def test_auth_call(m):
http_client.get('/')
m.assert_called()
mock_request.assert_called()
test_auth_call()
def test_auth_manual(self):
cs = client.Client("username", "password", "project_id", "auth_url")
@mock.patch.object(cs.client, 'authenticate')
def test_auth_call(m):
cs.authenticate()
m.assert_called()
test_auth_call()

@ -0,0 +1,42 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tests import utils
from tests.v2 import fakes
cs = fakes.FakeClient()
class QuotaClassSetsTest(utils.TestCase):
def test_class_quotas_get(self):
class_name = 'test'
cs.quota_classes.get(class_name)
cs.assert_called('GET', '/os-quota-class-sets/%s' % class_name)
def test_update_quota(self):
q = cs.quota_classes.get('test')
q.update(volumes=2)
cs.assert_called('PUT', '/os-quota-class-sets/test')
def test_refresh_quota(self):
q = cs.quota_classes.get('test')
q2 = cs.quota_classes.get('test')
self.assertEqual(q.volumes, q2.volumes)
q2.volumes = 0
self.assertNotEqual(q.volumes, q2.volumes)
q2.get()
self.assertEqual(q.volumes, q2.volumes)

47
tests/v2/test_quotas.py Normal file

@ -0,0 +1,47 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tests import utils
from tests.v2 import fakes
cs = fakes.FakeClient()
class QuotaSetsTest(utils.TestCase):
def test_tenant_quotas_get(self):
tenant_id = 'test'
cs.quotas.get(tenant_id)
cs.assert_called('GET', '/os-quota-sets/%s' % tenant_id)
def test_tenant_quotas_defaults(self):
tenant_id = 'test'
cs.quotas.defaults(tenant_id)
cs.assert_called('GET', '/os-quota-sets/%s/defaults' % tenant_id)
def test_update_quota(self):
q = cs.quotas.get('test')
q.update(volumes=2)
cs.assert_called('PUT', '/os-quota-sets/test')
def test_refresh_quota(self):
q = cs.quotas.get('test')
q2 = cs.quotas.get('test')
self.assertEqual(q.volumes, q2.volumes)
q2.volumes = 0
self.assertNotEqual(q.volumes, q2.volumes)
q2.get()
self.assertEqual(q.volumes, q2.volumes)

159
tests/v2/test_shell.py Normal file

@ -0,0 +1,159 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from cinderclient import client
from cinderclient import shell
from tests import utils
from tests.v2 import fakes
class ShellTest(utils.TestCase):
FAKE_ENV = {
'CINDER_USERNAME': 'username',
'CINDER_PASSWORD': 'password',
'CINDER_PROJECT_ID': 'project_id',
'OS_VOLUME_API_VERSION': '2',
'CINDER_URL': 'http://no.where',
}
# Patch os.environ to avoid required auth info.
def setUp(self):
"""Run before each test."""
super(ShellTest, self).setUp()
for var in self.FAKE_ENV:
self.useFixture(fixtures.EnvironmentVariable(var,
self.FAKE_ENV[var]))
self.shell = shell.OpenStackCinderShell()
#HACK(bcwaldon): replace this when we start using stubs
self.old_get_client_class = client.get_client_class
client.get_client_class = lambda *_: fakes.FakeClient
def tearDown(self):
# For some method like test_image_meta_bad_action we are
# testing a SystemExit to be thrown and object self.shell has
# no time to get instantatiated which is OK in this case, so
# we make sure the method is there before launching it.
if hasattr(self.shell, 'cs'):
self.shell.cs.clear_callstack()
#HACK(bcwaldon): replace this when we start using stubs
client.get_client_class = self.old_get_client_class
super(ShellTest, self).tearDown()
def run_command(self, cmd):
self.shell.main(cmd.split())
def assert_called(self, method, url, body=None, **kwargs):
return self.shell.cs.assert_called(method, url, body, **kwargs)
def assert_called_anytime(self, method, url, body=None):
return self.shell.cs.assert_called_anytime(method, url, body)
def test_list(self):
self.run_command('list')
# NOTE(jdg): we default to detail currently
self.assert_called('GET', '/volumes/detail')
def test_list_filter_status(self):
self.run_command('list --status=available')
self.assert_called('GET', '/volumes/detail?status=available')
def test_list_filter_name(self):
self.run_command('list --name=1234')
self.assert_called('GET', '/volumes/detail?name=1234')
def test_list_all_tenants(self):
self.run_command('list --all-tenants=1')
self.assert_called('GET', '/volumes/detail?all_tenants=1')
def test_show(self):
self.run_command('show 1234')
self.assert_called('GET', '/volumes/1234')
def test_delete(self):
self.run_command('delete 1234')
self.assert_called('DELETE', '/volumes/1234')
def test_snapshot_list_filter_volume_id(self):
self.run_command('snapshot-list --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?volume_id=1234')
def test_snapshot_list_filter_status_and_volume_id(self):
self.run_command('snapshot-list --status=available --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?'
'status=available&volume_id=1234')
def test_rename(self):
# basic rename with positional agruments
self.run_command('rename 1234 new-name')
expected = {'volume': {'name': 'new-name'}}
self.assert_called('PUT', '/volumes/1234', body=expected)
# change description only
self.run_command('rename 1234 --description=new-description')
expected = {'volume': {'description': 'new-description'}}
self.assert_called('PUT', '/volumes/1234', body=expected)
# rename and change description
self.run_command('rename 1234 new-name '
'--description=new-description')
expected = {'volume': {
'name': 'new-name',
'description': 'new-description',
}}
self.assert_called('PUT', '/volumes/1234', body=expected)
# noop, the only all will be the lookup
self.run_command('rename 1234')
self.assert_called('GET', '/volumes/1234')
def test_rename_snapshot(self):
# basic rename with positional agruments
self.run_command('snapshot-rename 1234 new-name')
expected = {'snapshot': {'name': 'new-name'}}
self.assert_called('PUT', '/snapshots/1234', body=expected)
# change description only
self.run_command('snapshot-rename 1234 '
'--description=new-description')
expected = {'snapshot': {'description': 'new-description'}}
self.assert_called('PUT', '/snapshots/1234', body=expected)
# snapshot-rename and change description
self.run_command('snapshot-rename 1234 new-name '
'--description=new-description')
expected = {'snapshot': {
'name': 'new-name',
'description': 'new-description',
}}
self.assert_called('PUT', '/snapshots/1234', body=expected)
# noop, the only all will be the lookup
self.run_command('snapshot-rename 1234')
self.assert_called('GET', '/snapshots/1234')
def test_set_metadata_set(self):
self.run_command('metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
def test_set_metadata_delete_dict(self):
self.run_command('metadata 1234 unset key1=val1 key2=val2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
def test_set_metadata_delete_keys(self):
self.run_command('metadata 1234 unset key1 key2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)

50
tests/v2/test_types.py Normal file

@ -0,0 +1,50 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinderclient.v2 import volume_types
from tests import utils
from tests.v2 import fakes
cs = fakes.FakeClient()
class TypesTest(utils.TestCase):
def test_list_types(self):
tl = cs.volume_types.list()
cs.assert_called('GET', '/types')
for t in tl:
self.assertTrue(isinstance(t, volume_types.VolumeType))
def test_create(self):
t = cs.volume_types.create('test-type-3')
cs.assert_called('POST', '/types')
self.assertTrue(isinstance(t, volume_types.VolumeType))
def test_set_key(self):
t = cs.volume_types.get(1)
t.set_keys({'k': 'v'})
cs.assert_called('POST',
'/types/1/extra_specs',
{'extra_specs': {'k': 'v'}})
def test_unsset_keys(self):
t = cs.volume_types.get(1)
t.unset_keys(['k'])
cs.assert_called('DELETE', '/types/1/extra_specs/k')
def test_delete(self):
cs.volume_types.delete(1)
cs.assert_called('DELETE', '/types/1')

87
tests/v2/test_volumes.py Normal file

@ -0,0 +1,87 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tests import utils
from tests.v2 import fakes
cs = fakes.FakeClient()
class VolumesTest(utils.TestCase):
def test_delete_volume(self):
v = cs.volumes.list()[0]
v.delete()
cs.assert_called('DELETE', '/volumes/1234')
cs.volumes.delete('1234')
cs.assert_called('DELETE', '/volumes/1234')
cs.volumes.delete(v)
cs.assert_called('DELETE', '/volumes/1234')
def test_create_keypair(self):
cs.volumes.create(1)
cs.assert_called('POST', '/volumes')
def test_attach(self):
v = cs.volumes.get('1234')
cs.volumes.attach(v, 1, '/dev/vdc')
cs.assert_called('POST', '/volumes/1234/action')
def test_detach(self):
v = cs.volumes.get('1234')
cs.volumes.detach(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_reserve(self):
v = cs.volumes.get('1234')
cs.volumes.reserve(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_unreserve(self):
v = cs.volumes.get('1234')
cs.volumes.unreserve(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_begin_detaching(self):
v = cs.volumes.get('1234')
cs.volumes.begin_detaching(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_roll_detaching(self):
v = cs.volumes.get('1234')
cs.volumes.roll_detaching(v)
cs.assert_called('POST', '/volumes/1234/action')
def test_initialize_connection(self):
v = cs.volumes.get('1234')
cs.volumes.initialize_connection(v, {})
cs.assert_called('POST', '/volumes/1234/action')
def test_terminate_connection(self):
v = cs.volumes.get('1234')
cs.volumes.terminate_connection(v, {})
cs.assert_called('POST', '/volumes/1234/action')
def test_set_metadata(self):
cs.volumes.set_metadata(1234, {'k1': 'v2'})
cs.assert_called('POST', '/volumes/1234/metadata',
{'metadata': {'k1': 'v2'}})
def test_delete_metadata(self):
keys = ['key1']
cs.volumes.delete_metadata(1234, keys)
cs.assert_called('DELETE', '/volumes/1234/metadata/key1')