Added support for keystone v3client

Change-Id: I7bbc74c9e73f36f942f5800a7af0da717da0bc64
This commit is contained in:
haneef ali 2014-05-23 16:27:20 -07:00 committed by Haneef Ali
parent a87ee75285
commit e8e06ee289
9 changed files with 962 additions and 139 deletions

View File

@ -14,7 +14,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
OpenStack Client interface. Handles the REST calls and responses.
"""
@ -23,6 +22,14 @@ from __future__ import print_function
import logging
from cinderclient import exceptions
from cinderclient import utils
from keystoneclient import access
from keystoneclient.auth.identity import v3 as v3_auth
import requests
try:
import urlparse
except ImportError:
@ -43,15 +50,117 @@ if not hasattr(urlparse, 'parse_qsl'):
import cgi
urlparse.parse_qsl = cgi.parse_qsl
import requests
from keystoneclient import access
class CinderClientMixin(object):
from cinderclient import exceptions
from cinderclient import utils
def get_volume_api_version_from_endpoint(self):
magic_tuple = urlparse.urlsplit(self.management_url)
scheme, netloc, path, query, frag = magic_tuple
components = path.split("/")
valid_versions = ['v1', 'v2']
for version in valid_versions:
if version in components:
return version[1:]
msg = "Invalid client version '%s'. must be one of: %s" % (
(version, ', '.join(valid_versions)))
raise exceptions.UnsupportedVersion(msg)
class HTTPClient(object):
class SessionClient(CinderClientMixin):
def __init__(self, session, auth, interface=None,
service_type=None, service_name=None,
region_name=None, http_log_debug=False):
self.session = session
self.auth = auth
self.interface = interface
self.service_type = service_type
self.service_name = service_name
self.region_name = region_name
self.auth_token = None
self.endpoint_url = None
self.management_url = self.endpoint_url
self.http_log_debug = http_log_debug
self._logger = logging.getLogger(__name__)
if self.http_log_debug:
# Use keystoneclient's logs instead of writing our own
ks_logger = logging.getLogger("keystoneclient")
ks_logger.setLevel(logging.DEBUG)
def request(self, url, method, **kwargs):
kwargs.setdefault('user_agent', 'python-cinderclient')
kwargs.setdefault('auth', self.auth)
kwargs.setdefault('authenticated', False)
try:
kwargs['json'] = kwargs.pop('body')
except KeyError:
pass
endpoint_filter = kwargs.setdefault('endpoint_filter', {})
endpoint_filter.setdefault('interface', self.interface)
endpoint_filter.setdefault('service_type', self.service_type)
endpoint_filter.setdefault('service_name', self.service_name)
endpoint_filter.setdefault('region_name', self.region_name)
resp = self.session.request(url, method, **kwargs)
body = None
if resp.text:
try:
body = resp.json()
except ValueError:
pass
return resp, body
def _cs_request(self, url, method, **kwargs):
# this function is mostly redundant but makes compatibility easier
kwargs.setdefault('authenticated', True)
return self.request(url, method, **kwargs)
def do_request(self, url, method, **kwargs):
# this function is mostly redundant but makes compatibility easier
kwargs.setdefault('headers', {})
if self.auth_token is None:
self.authenticate()
kwargs['headers']['X-Auth-Token'] = self.auth_token
if self.access_info is not None:
kwargs['headers'][
'X-Auth-Project-Id'] = self.access_info.project_id
resp, body = self._cs_request(
self.endpoint_url + url, method, **kwargs)
return resp, body
def authenticate(self):
self.auth_token = self.session.get_token(self.auth)
self.access_info = self.session.auth.get_access(self.session)
self.endpoint_url = self.session.get_endpoint(
self.auth,
service_type=self.service_type,
region_name=self.region_name,
interface=self.interface)
self.management_url = self.endpoint_url
self.service_catalog = self.access_info.service_catalog
def get(self, url, **kwargs):
return self.do_request(url, 'GET', **kwargs)
def post(self, url, **kwargs):
return self.do_request(url, 'POST', **kwargs)
def put(self, url, **kwargs):
return self.do_request(url, 'PUT', **kwargs)
def delete(self, url, **kwargs):
return self.do_request(url, 'DELETE', **kwargs)
class HTTPClient(CinderClientMixin):
USER_AGENT = 'python-cinderclient'
@ -385,17 +494,61 @@ class HTTPClient(object):
return self._extract_service_catalog(url, resp, body)
def get_volume_api_version_from_endpoint(self):
magic_tuple = urlparse.urlsplit(self.management_url)
scheme, netloc, path, query, frag = magic_tuple
components = path.split("/")
valid_versions = ['v1', 'v2']
for version in valid_versions:
if version in components:
return version[1:]
msg = "Invalid client version '%s'. must be one of: %s" % (
(version, ', '.join(valid_versions)))
raise exceptions.UnsupportedVersion(msg)
def _construct_http_client(username=None, password=None, project_id=None,
auth_url=None, insecure=False, timeout=None,
proxy_tenant_id=None, proxy_token=None,
region_name=None, endpoint_type='publicURL',
service_type='volume',
service_name=None, volume_service_name=None,
retries=None,
http_log_debug=False,
auth_system='keystone', auth_plugin=None,
cacert=None, tenant_id=None,
session=None,
auth=None):
if session:
# If auth pluggin is specified use that pluggin
session.auth = auth or session.auth
if isinstance(session.auth, v3_auth.Password):
# In v3 and v2 interace names are different
interface_map = {"publicURL": "public",
"adminURL": "admin"}
endpoint_type = interface_map[endpoint_type]
return SessionClient(session=session,
auth=auth,
interface=endpoint_type,
service_type=service_type,
service_name=service_name,
region_name=region_name,
http_log_debug=http_log_debug)
else:
# FIXME(jamielennox): username and password are now optional. Need
# to test that they were provided in this mode.
return HTTPClient(username,
password,
projectid=project_id,
auth_url=auth_url,
insecure=insecure,
timeout=timeout,
tenant_id=tenant_id,
proxy_token=proxy_token,
proxy_tenant_id=proxy_tenant_id,
region_name=region_name,
endpoint_type=endpoint_type,
service_type=service_type,
service_name=service_name,
volume_service_name=volume_service_name,
retries=retries,
http_log_debug=http_log_debug,
cacert=cacert,
auth_system=auth_system,
auth_plugin=auth_plugin,
)
def get_client_class(version):

View File

@ -24,20 +24,27 @@ import argparse
import glob
import imp
import itertools
import logging
import os
import pkgutil
import sys
import logging
import cinderclient.auth_plugin
from cinderclient import client
from cinderclient import exceptions as exc
from cinderclient import utils
import cinderclient.auth_plugin
import cinderclient.extension
from cinderclient.openstack.common import strutils
from cinderclient import utils
from cinderclient.openstack.common.gettextutils import _
from cinderclient.v1 import shell as shell_v1
from cinderclient.v2 import shell as shell_v2
from keystoneclient import discover
from keystoneclient import session
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
DEFAULT_OS_VOLUME_API_VERSION = "1"
DEFAULT_CINDER_ENDPOINT_TYPE = 'publicURL'
DEFAULT_CINDER_SERVICE_TYPE = 'volume'
@ -58,7 +65,7 @@ class CinderClientArgumentParser(argparse.ArgumentParser):
exits.
"""
self.print_usage(sys.stderr)
#FIXME(lzyeval): if changes occur in argparse.ArgParser._check_value
# FIXME(lzyeval): if changes occur in argparse.ArgParser._check_value
choose_from = ' (choose from'
progparts = self.prog.partition(' ')
self.exit(2, "error: %(errmsg)s\nTry '%(mainp)s help %(subp)s'"
@ -117,6 +124,116 @@ class OpenStackCinderShell(object):
default=False),
help="Shows debugging output.")
parser.add_argument('--os-auth-system',
metavar='<auth-system>',
default=utils.env('OS_AUTH_SYSTEM'),
help='Defaults to env[OS_AUTH_SYSTEM].')
parser.add_argument('--os_auth_system',
help=argparse.SUPPRESS)
parser.add_argument('--service-type',
metavar='<service-type>',
help='Service type. '
'For most actions, default is volume.')
parser.add_argument('--service_type',
help=argparse.SUPPRESS)
parser.add_argument('--service-name',
metavar='<service-name>',
default=utils.env('CINDER_SERVICE_NAME'),
help='Service name. '
'Default=env[CINDER_SERVICE_NAME].')
parser.add_argument('--service_name',
help=argparse.SUPPRESS)
parser.add_argument('--volume-service-name',
metavar='<volume-service-name>',
default=utils.env('CINDER_VOLUME_SERVICE_NAME'),
help='Volume service name. '
'Default=env[CINDER_VOLUME_SERVICE_NAME].')
parser.add_argument('--volume_service_name',
help=argparse.SUPPRESS)
parser.add_argument('--endpoint-type',
metavar='<endpoint-type>',
default=utils.env('CINDER_ENDPOINT_TYPE',
default=
DEFAULT_CINDER_ENDPOINT_TYPE),
help='Endpoint type, which is publicURL or '
'internalURL. '
'Default=nova env[CINDER_ENDPOINT_TYPE] or '
+ DEFAULT_CINDER_ENDPOINT_TYPE + '.')
parser.add_argument('--endpoint_type',
help=argparse.SUPPRESS)
parser.add_argument('--os-volume-api-version',
metavar='<volume-api-ver>',
default=utils.env('OS_VOLUME_API_VERSION',
default=None),
help='Block Storage API version. '
'Valid values are 1 or 2. '
'Default=env[OS_VOLUME_API_VERSION].')
parser.add_argument('--os_volume_api_version',
help=argparse.SUPPRESS)
parser.add_argument('--retries',
metavar='<retries>',
type=int,
default=0,
help='Number of retries.')
self._append_global_identity_args(parser)
# FIXME(dtroyer): The args below are here for diablo compatibility,
# remove them in folsum cycle
# alias for --os-username, left in for backwards compatibility
parser.add_argument('--username',
help=argparse.SUPPRESS)
# alias for --os-region_name, left in for backwards compatibility
parser.add_argument('--region_name',
help=argparse.SUPPRESS)
# alias for --os-password, left in for backwards compatibility
parser.add_argument('--apikey', '--password', dest='apikey',
default=utils.env('CINDER_API_KEY'),
help=argparse.SUPPRESS)
# alias for --os-tenant-name, left in for backward compatibility
parser.add_argument('--projectid', '--tenant_name', dest='projectid',
default=utils.env('CINDER_PROJECT_ID'),
help=argparse.SUPPRESS)
# alias for --os-auth-url, left in for backward compatibility
parser.add_argument('--url', '--auth_url', dest='url',
default=utils.env('CINDER_URL'),
help=argparse.SUPPRESS)
# The auth-system-plugins might require some extra options
cinderclient.auth_plugin.discover_auth_systems()
cinderclient.auth_plugin.load_auth_system_opts(parser)
return parser
def _append_global_identity_args(self, parser):
# FIXME(bklei): these are global identity (Keystone) arguments which
# should be consistent and shared by all service clients. Therefore,
# they should be provided by python-keystoneclient. We will need to
# refactor this code once this functionality is available in
# python-keystoneclient.
parser.add_argument(
'--os-auth-strategy', metavar='<auth-strategy>',
default=utils.env('OS_AUTH_STRATEGY', default='keystone'),
help=_('Authentication strategy (Env: OS_AUTH_STRATEGY'
', default keystone). For now, any other value will'
' disable the authentication'))
parser.add_argument(
'--os_auth_strategy',
help=argparse.SUPPRESS)
parser.add_argument('--os-username',
metavar='<auth-user-name>',
default=utils.env('OS_USERNAME',
@ -162,6 +279,87 @@ class OpenStackCinderShell(object):
parser.add_argument('--os_auth_url',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-user-id', metavar='<auth-user-id>',
default=utils.env('OS_USER_ID'),
help=_('Authentication user ID (Env: OS_USER_ID)'))
parser.add_argument(
'--os_user_id',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-user-domain-id',
metavar='<auth-user-domain-id>',
default=utils.env('OS_USER_DOMAIN_ID'),
help='OpenStack user domain ID. '
'Defaults to env[OS_USER_DOMAIN_ID].')
parser.add_argument(
'--os_user_domain_id',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-user-domain-name',
metavar='<auth-user-domain-name>',
default=utils.env('OS_USER_DOMAIN_NAME'),
help='OpenStack user domain name. '
'Defaults to env[OS_USER_DOMAIN_NAME].')
parser.add_argument(
'--os_user_domain_name',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-project-id',
metavar='<auth-project-id>',
default=utils.env('OS_PROJECT_ID'),
help='Another way to specify tenant ID. '
'This option is mutually exclusive with '
' --os-tenant-id. '
'Defaults to env[OS_PROJECT_ID].')
parser.add_argument(
'--os_project_id',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-project-name',
metavar='<auth-project-name>',
default=utils.env('OS_PROJECT_NAME'),
help='Another way to specify tenant name. '
'This option is mutually exclusive with '
' --os-tenant-name. '
'Defaults to env[OS_PROJECT_NAME].')
parser.add_argument(
'--os_project_name',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-project-domain-id',
metavar='<auth-project-domain-id>',
default=utils.env('OS_PROJECT_DOMAIN_ID'),
help='Defaults to env[OS_PROJECT_DOMAIN_ID].')
parser.add_argument(
'--os-project-domain-name',
metavar='<auth-project-domain-name>',
default=utils.env('OS_PROJECT_DOMAIN_NAME'),
help='Defaults to env[OS_PROJECT_DOMAIN_NAME].')
parser.add_argument(
'--os-cert',
metavar='<certificate>',
default=utils.env('OS_CERT'),
help='Defaults to env[OS_CERT].')
parser.add_argument(
'--os-key',
metavar='<key>',
default=utils.env('OS_KEY'),
help='Defaults to env[OS_KEY].')
parser.add_argument('--os-region-name',
metavar='<region-name>',
default=utils.env('OS_REGION_NAME',
@ -171,63 +369,29 @@ class OpenStackCinderShell(object):
parser.add_argument('--os_region_name',
help=argparse.SUPPRESS)
parser.add_argument('--os-auth-system',
metavar='<auth-system>',
default=utils.env('OS_AUTH_SYSTEM'),
help='Defaults to env[OS_AUTH_SYSTEM].')
parser.add_argument('--os_auth_system',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-token', metavar='<token>',
default=utils.env('OS_TOKEN'),
help=_('Defaults to env[OS_TOKEN]'))
parser.add_argument(
'--os_token',
help=argparse.SUPPRESS)
parser.add_argument('--service-type',
metavar='<service-type>',
help='Service type. '
'For most actions, default is volume.')
parser.add_argument('--service_type',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-url', metavar='<url>',
default=utils.env('OS_URL'),
help=_('Defaults to env[OS_URL]'))
parser.add_argument(
'--os_url',
help=argparse.SUPPRESS)
parser.add_argument('--service-name',
metavar='<service-name>',
default=utils.env('CINDER_SERVICE_NAME'),
help='Service name. '
'Default=env[CINDER_SERVICE_NAME].')
parser.add_argument('--service_name',
help=argparse.SUPPRESS)
parser.add_argument('--volume-service-name',
metavar='<volume-service-name>',
default=utils.env('CINDER_VOLUME_SERVICE_NAME'),
help='Volume service name. '
'Default=env[CINDER_VOLUME_SERVICE_NAME].')
parser.add_argument('--volume_service_name',
help=argparse.SUPPRESS)
parser.add_argument('--endpoint-type',
metavar='<endpoint-type>',
default=utils.env('CINDER_ENDPOINT_TYPE',
default=DEFAULT_CINDER_ENDPOINT_TYPE),
help='Endpoint type, which is publicURL or '
'internalURL. '
'Default=nova env[CINDER_ENDPOINT_TYPE] or '
+ DEFAULT_CINDER_ENDPOINT_TYPE + '.')
parser.add_argument('--endpoint_type',
help=argparse.SUPPRESS)
parser.add_argument('--os-volume-api-version',
metavar='<volume-api-ver>',
default=utils.env('OS_VOLUME_API_VERSION',
default=None),
help='Block Storage API version. '
'Valid values are 1 or 2. '
'Default=env[OS_VOLUME_API_VERSION].')
parser.add_argument('--os_volume_api_version',
help=argparse.SUPPRESS)
parser.add_argument('--os-cacert',
metavar='<ca-certificate>',
default=utils.env('OS_CACERT', default=None),
help='A CA bundle file that is used to '
'verify a TLS (https) server certificate. '
'Default=env[OS_CACERT].')
parser.add_argument(
'--os-cacert',
metavar='<ca-certificate>',
default=utils.env('OS_CACERT', default=None),
help=_("Specify a CA bundle file to use in "
"verifying a TLS (https) server certificate. "
"Defaults to env[OS_CACERT]"))
parser.add_argument('--insecure',
default=utils.env('CINDERCLIENT_INSECURE',
@ -235,44 +399,6 @@ class OpenStackCinderShell(object):
action='store_true',
help=argparse.SUPPRESS)
parser.add_argument('--retries',
metavar='<retries>',
type=int,
default=0,
help='Number of retries.')
# FIXME(dtroyer): The args below are here for diablo compatibility,
# remove them in folsum cycle
# alias for --os-username, left in for backwards compatibility
parser.add_argument('--username',
help=argparse.SUPPRESS)
# alias for --os-region_name, left in for backwards compatibility
parser.add_argument('--region_name',
help=argparse.SUPPRESS)
# alias for --os-password, left in for backwards compatibility
parser.add_argument('--apikey', '--password', dest='apikey',
default=utils.env('CINDER_API_KEY'),
help=argparse.SUPPRESS)
# alias for --os-tenant-name, left in for backward compatibility
parser.add_argument('--projectid', '--tenant_name', dest='projectid',
default=utils.env('CINDER_PROJECT_ID'),
help=argparse.SUPPRESS)
# alias for --os-auth-url, left in for backward compatibility
parser.add_argument('--url', '--auth_url', dest='url',
default=utils.env('CINDER_URL'),
help=argparse.SUPPRESS)
# The auth-system-plugins might require some extra options
cinderclient.auth_plugin.discover_auth_systems()
cinderclient.auth_plugin.load_auth_system_opts(parser)
return parser
def get_subcommand_parser(self, version):
parser = self.get_base_parser()
@ -378,11 +504,13 @@ class OpenStackCinderShell(object):
logger.addHandler(streamhandler)
def main(self, argv):
# Parse args once to find version and debug settings
parser = self.get_base_parser()
(options, args) = parser.parse_known_args(argv)
self.setup_debugging(options.debug)
api_version_input = True
self.options = options
if not options.os_volume_api_version:
# Environment variable OS_VOLUME_API_VERSION was
@ -442,7 +570,7 @@ class OpenStackCinderShell(object):
service_type = DEFAULT_CINDER_SERVICE_TYPE
service_type = utils.get_service_type(args.func) or service_type
#FIXME(usrleon): Here should be restrict for project id same as
# FIXME(usrleon): Here should be restrict for project id same as
# for os_username or os_password but for compatibility it is not.
if not utils.isunauthenticated(args.func):
@ -474,6 +602,28 @@ class OpenStackCinderShell(object):
else:
os_tenant_name = projectid
# V3 stuff
project_info_provided = self.options.os_tenant_name or \
self.options.os_tenant_id or \
(self.options.os_project_name and
(self.options.project_domain_name or
self.options.project_domain_id)) or \
self.options.os_project_id
if (not project_info_provided):
raise exc.CommandError(
_("You must provide a tenant_name, tenant_id, "
"project_id or project_name (with "
"project_domain_name or project_domain_id) via "
" --os-tenant-name (env[OS_TENANT_NAME]),"
" --os-tenant-id (env[OS_TENANT_ID]),"
" --os-project-id (env[OS_PROJECT_ID])"
" --os-project-name (env[OS_PROJECT_NAME]),"
" --os-project-domain-id "
"(env[OS_PROJECT_DOMAIN_ID])"
" --os-project-domain-name "
"(env[OS_PROJECT_DOMAIN_NAME])"))
if not os_auth_url:
if os_auth_system and os_auth_system != 'keystone':
os_auth_url = auth_plugin.get_auth_url()
@ -499,6 +649,8 @@ class OpenStackCinderShell(object):
"You must provide an authentication URL "
"through --os-auth-url or env[OS_AUTH_URL].")
auth_session = self._get_keystone_session()
self.cs = client.Client(options.os_volume_api_version, os_username,
os_password, os_tenant_name, os_auth_url,
insecure, region_name=os_region_name,
@ -511,7 +663,8 @@ class OpenStackCinderShell(object):
retries=options.retries,
http_log_debug=args.debug,
cacert=cacert, auth_system=os_auth_system,
auth_plugin=auth_plugin)
auth_plugin=auth_plugin,
session=auth_session)
try:
if not utils.isunauthenticated(args.func):
@ -588,9 +741,101 @@ class OpenStackCinderShell(object):
else:
self.parser.print_help()
def get_v2_auth(self, v2_auth_url):
username = self.options.os_username
password = self.options.os_password
tenant_id = self.options.os_tenant_id
tenant_name = self.options.os_tenant_name
return v2_auth.Password(
v2_auth_url,
username=username,
password=password,
tenant_id=tenant_id,
tenant_name=tenant_name)
def get_v3_auth(self, v3_auth_url):
username = self.options.os_username
user_id = self.options.os_user_id
user_domain_name = self.options.os_user_domain_name
user_domain_id = self.options.os_user_domain_id
password = self.options.os_password
project_id = self.options.os_project_id or self.options.os_tenant_id
project_name = (self.options.os_project_name
or self.options.os_tenant_name)
project_domain_name = self.options.os_project_domain_name
project_domain_id = self.options.os_project_domain_id
return v3_auth.Password(
v3_auth_url,
username=username,
password=password,
user_id=user_id,
user_domain_name=user_domain_name,
user_domain_id=user_domain_id,
project_id=project_id,
project_name=project_name,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id,
)
def _get_keystone_session(self, **kwargs):
# first create a Keystone session
cacert = self.options.os_cacert or None
cert = self.options.os_cert or None
insecure = self.options.insecure or False
if insecure:
verify = False
else:
verify = cacert or True
ks_session = session.Session(verify=verify, cert=cert)
# discover the supported keystone versions using the given url
ks_discover = discover.Discover(session=ks_session,
auth_url=self.options.os_auth_url)
# Determine which authentication plugin to use. First inspect the
# auth_url to see the supported version. If both v3 and v2 are
# supported, then use the highest version if possible.
v2_auth_url = ks_discover.url_for('v2.0')
v3_auth_url = ks_discover.url_for('v3.0')
username = self.options.os_username or None
user_domain_name = self.options.os_user_domain_name or None
user_domain_id = self.options.os_user_domain_id or None
auth = None
if v3_auth_url and v2_auth_url:
# support both v2 and v3 auth. Use v3 if possible.
if username:
if user_domain_name or user_domain_id:
# use v3 auth
auth = self.get_v3_auth(v3_auth_url)
else:
# use v2 auth
auth = self.get_v2_auth(v2_auth_url)
elif v3_auth_url:
# support only v3
auth = self.get_v3_auth(v3_auth_url)
elif v2_auth_url:
# support only v2
auth = self.get_v2_auth(v2_auth_url)
else:
raise exc.CommandError('Unable to determine the Keystone version '
'to authenticate with using the given '
'auth_url.')
ks_session.auth = auth
return ks_session
# I'm picky about my shell help.
class OpenStackHelpFormatter(argparse.HelpFormatter):
def start_section(self, heading):
# Title-case the headings
heading = '%s%s' % (heading[0].upper(), heading[1:])
@ -603,7 +848,7 @@ def main():
OpenStackCinderShell().main(sys.argv[1:])
else:
OpenStackCinderShell().main(map(strutils.safe_decode,
sys.argv[1:]))
sys.argv[1:]))
except KeyboardInterrupt:
print("... terminating cinder client", file=sys.stderr)
sys.exit(130)
@ -614,4 +859,5 @@ def main():
if __name__ == "__main__":
main()

View File

@ -0,0 +1,218 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import uuid
# these are copied from python-keystoneclient tests
BASE_HOST = 'http://keystone.example.com'
BASE_URL = "%s:5000/" % BASE_HOST
UPDATED = '2013-03-06T00:00:00Z'
V2_URL = "%sv2.0" % BASE_URL
V2_DESCRIBED_BY_HTML = {'href': 'http://docs.openstack.org/api/'
'openstack-identity-service/2.0/content/',
'rel': 'describedby',
'type': 'text/html'}
V2_DESCRIBED_BY_PDF = {'href': 'http://docs.openstack.org/api/openstack-ident'
'ity-service/2.0/identity-dev-guide-2.0.pdf',
'rel': 'describedby',
'type': 'application/pdf'}
V2_VERSION = {'id': 'v2.0',
'links': [{'href': V2_URL, 'rel': 'self'},
V2_DESCRIBED_BY_HTML, V2_DESCRIBED_BY_PDF],
'status': 'stable',
'updated': UPDATED}
V3_URL = "%sv3" % BASE_URL
V3_MEDIA_TYPES = [{'base': 'application/json',
'type': 'application/vnd.openstack.identity-v3+json'},
{'base': 'application/xml',
'type': 'application/vnd.openstack.identity-v3+xml'}]
V3_VERSION = {'id': 'v3.0',
'links': [{'href': V3_URL, 'rel': 'self'}],
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED}
def _create_version_list(versions):
return json.dumps({'versions': {'values': versions}})
def _create_single_version(version):
return json.dumps({'version': version})
V3_VERSION_LIST = _create_version_list([V3_VERSION, V2_VERSION])
V2_VERSION_LIST = _create_version_list([V2_VERSION])
V3_VERSION_ENTRY = _create_single_version(V3_VERSION)
V2_VERSION_ENTRY = _create_single_version(V2_VERSION)
CINDER_ENDPOINT = 'http://www.cinder.com/v1'
def _get_normalized_token_data(**kwargs):
ref = copy.deepcopy(kwargs)
# normalized token data
ref['user_id'] = ref.get('user_id', uuid.uuid4().hex)
ref['username'] = ref.get('username', uuid.uuid4().hex)
ref['project_id'] = ref.get('project_id',
ref.get('tenant_id', uuid.uuid4().hex))
ref['project_name'] = ref.get('tenant_name',
ref.get('tenant_name', uuid.uuid4().hex))
ref['user_domain_id'] = ref.get('user_domain_id', uuid.uuid4().hex)
ref['user_domain_name'] = ref.get('user_domain_name', uuid.uuid4().hex)
ref['project_domain_id'] = ref.get('project_domain_id', uuid.uuid4().hex)
ref['project_domain_name'] = ref.get('project_domain_name',
uuid.uuid4().hex)
ref['roles'] = ref.get('roles', [{'name': uuid.uuid4().hex,
'id': uuid.uuid4().hex}])
ref['roles_link'] = ref.get('roles_link', [])
ref['cinder_url'] = ref.get('cinder_url', CINDER_ENDPOINT)
return ref
def generate_v2_project_scoped_token(**kwargs):
"""Generate a Keystone V2 token based on auth request."""
ref = _get_normalized_token_data(**kwargs)
token = uuid.uuid4().hex
o = {'access': {'token': {'id': token,
'expires': '2099-05-22T00:02:43.941430Z',
'issued_at': '2013-05-21T00:02:43.941473Z',
'tenant': {'enabled': True,
'id': ref.get('project_id'),
'name': ref.get('project_id')
}
},
'user': {'id': ref.get('user_id'),
'name': uuid.uuid4().hex,
'username': ref.get('username'),
'roles': ref.get('roles'),
'roles_links': ref.get('roles_links')
}
}}
# we only care about Neutron and Keystone endpoints
o['access']['serviceCatalog'] = [
{'endpoints': [
{'publicURL': 'public_' + ref.get('cinder_url'),
'internalURL': 'internal_' + ref.get('cinder_url'),
'adminURL': 'admin_' + (ref.get('auth_url') or ""),
'id': uuid.uuid4().hex,
'region': 'RegionOne'
}],
'endpoints_links': [],
'name': 'Neutron',
'type': 'network'},
{'endpoints': [
{'publicURL': ref.get('auth_url'),
'adminURL': ref.get('auth_url'),
'internalURL': ref.get('auth_url'),
'id': uuid.uuid4().hex,
'region': 'RegionOne'
}],
'endpoint_links': [],
'name': 'keystone',
'type': 'identity'}]
return token, o
def generate_v3_project_scoped_token(**kwargs):
"""Generate a Keystone V3 token based on auth request."""
ref = _get_normalized_token_data(**kwargs)
o = {'token': {'expires_at': '2099-05-22T00:02:43.941430Z',
'issued_at': '2013-05-21T00:02:43.941473Z',
'methods': ['password'],
'project': {'id': ref.get('project_id'),
'name': ref.get('project_name'),
'domain': {'id': ref.get('project_domain_id'),
'name': ref.get(
'project_domain_name')
}
},
'user': {'id': ref.get('user_id'),
'name': ref.get('username'),
'domain': {'id': ref.get('user_domain_id'),
'name': ref.get('user_domain_name')
}
},
'roles': ref.get('roles')
}}
# we only care about Neutron and Keystone endpoints
o['token']['catalog'] = [
{'endpoints': [
{
'id': uuid.uuid4().hex,
'interface': 'public',
'region': 'RegionOne',
'url': 'public_' + ref.get('cinder_url')
},
{
'id': uuid.uuid4().hex,
'interface': 'internal',
'region': 'RegionOne',
'url': 'internal_' + ref.get('cinder_url')
},
{
'id': uuid.uuid4().hex,
'interface': 'admin',
'region': 'RegionOne',
'url': 'admin_' + ref.get('cinder_url')
}],
'id': uuid.uuid4().hex,
'type': 'network'},
{'endpoints': [
{
'id': uuid.uuid4().hex,
'interface': 'public',
'region': 'RegionOne',
'url': ref.get('auth_url')
},
{
'id': uuid.uuid4().hex,
'interface': 'admin',
'region': 'RegionOne',
'url': ref.get('auth_url')
}],
'id': uuid.uuid4().hex,
'type': 'identity'}]
# token ID is conveyed via the X-Subject-Token header so we are generating
# one to stash there
token_id = uuid.uuid4().hex
return token_id, o
def keystone_request_callback(request, uri, headers):
response_headers = {"content-type": "application/json"}
if uri == BASE_URL:
return (200, headers, V3_VERSION_LIST)
elif uri == BASE_URL + "/v2.0":
token_id, token_data = generate_v2_project_scoped_token()
return (200, response_headers, token_data)
elif uri == BASE_URL + "/v3":
token_id, token_data = generate_v3_project_scoped_token()
response_headers["X-Subject-Token"] = token_id
return (201, response_headers, token_data)

View File

@ -22,6 +22,8 @@ from cinderclient import shell
from cinderclient.v1 import shell as shell_v1
from cinderclient.tests.v1 import fakes
from cinderclient.tests import utils
from cinderclient.tests.fixture_data import keystone_client
import httpretty
class ShellTest(utils.TestCase):
@ -31,7 +33,7 @@ class ShellTest(utils.TestCase):
'CINDER_PASSWORD': 'password',
'CINDER_PROJECT_ID': 'project_id',
'OS_VOLUME_API_VERSION': '1',
'CINDER_URL': 'http://no.where',
'CINDER_URL': keystone_client.BASE_URL,
}
# Patch os.environ to avoid required auth info.
@ -44,7 +46,7 @@ class ShellTest(utils.TestCase):
self.shell = shell.OpenStackCinderShell()
#HACK(bcwaldon): replace this when we start using stubs
# HACK(bcwaldon): replace this when we start using stubs
self.old_get_client_class = client.get_client_class
client.get_client_class = lambda *_: fakes.FakeClient
@ -56,7 +58,7 @@ class ShellTest(utils.TestCase):
if hasattr(self.shell, 'cs'):
self.shell.cs.clear_callstack()
#HACK(bcwaldon): replace this when we start using stubs
# HACK(bcwaldon): replace this when we start using stubs
client.get_client_class = self.old_get_client_class
super(ShellTest, self).tearDown()
@ -69,9 +71,14 @@ class ShellTest(utils.TestCase):
def assert_called_anytime(self, method, url, body=None):
return self.shell.cs.assert_called_anytime(method, url, body)
def register_keystone_auth_fixture(self):
httpretty.register_uri(httpretty.GET, keystone_client.BASE_URL,
body=keystone_client.keystone_request_callback)
def test_extract_metadata(self):
# mimic the result of argparse's parse_args() method
class Arguments:
def __init__(self, metadata=[]):
self.metadata = metadata
@ -88,63 +95,91 @@ class ShellTest(utils.TestCase):
args = Arguments(metadata=input[0])
self.assertEqual(shell_v1._extract_metadata(args), input[1])
@httpretty.activate
def test_list(self):
self.register_keystone_auth_fixture()
self.run_command('list')
# NOTE(jdg): we default to detail currently
self.assert_called('GET', '/volumes/detail')
@httpretty.activate
def test_list_filter_status(self):
self.register_keystone_auth_fixture()
self.run_command('list --status=available')
self.assert_called('GET', '/volumes/detail?status=available')
@httpretty.activate
def test_list_filter_display_name(self):
self.register_keystone_auth_fixture()
self.run_command('list --display-name=1234')
self.assert_called('GET', '/volumes/detail?display_name=1234')
@httpretty.activate
def test_list_all_tenants(self):
self.register_keystone_auth_fixture()
self.run_command('list --all-tenants=1')
self.assert_called('GET', '/volumes/detail?all_tenants=1')
@httpretty.activate
def test_list_availability_zone(self):
self.register_keystone_auth_fixture()
self.run_command('availability-zone-list')
self.assert_called('GET', '/os-availability-zone')
@httpretty.activate
def test_show(self):
self.register_keystone_auth_fixture()
self.run_command('show 1234')
self.assert_called('GET', '/volumes/1234')
@httpretty.activate
def test_delete(self):
self.register_keystone_auth_fixture()
self.run_command('delete 1234')
self.assert_called('DELETE', '/volumes/1234')
@httpretty.activate
def test_delete_by_name(self):
self.register_keystone_auth_fixture()
self.run_command('delete sample-volume')
self.assert_called_anytime('GET', '/volumes/detail?all_tenants=1')
self.assert_called('DELETE', '/volumes/1234')
@httpretty.activate
def test_delete_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('delete 1234 5678')
self.assert_called_anytime('DELETE', '/volumes/1234')
self.assert_called('DELETE', '/volumes/5678')
@httpretty.activate
def test_backup(self):
self.register_keystone_auth_fixture()
self.run_command('backup-create 1234')
self.assert_called('POST', '/backups')
@httpretty.activate
def test_restore(self):
self.register_keystone_auth_fixture()
self.run_command('backup-restore 1234')
self.assert_called('POST', '/backups/1234/restore')
@httpretty.activate
def test_snapshot_list_filter_volume_id(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-list --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?volume_id=1234')
@httpretty.activate
def test_snapshot_list_filter_status_and_volume_id(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-list --status=available --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?'
'status=available&volume_id=1234')
@httpretty.activate
def test_rename(self):
self.register_keystone_auth_fixture()
# basic rename with positional arguments
self.run_command('rename 1234 new-name')
expected = {'volume': {'display_name': 'new-name'}}
@ -165,7 +200,9 @@ class ShellTest(utils.TestCase):
# Call rename with no arguments
self.assertRaises(SystemExit, self.run_command, 'rename')
@httpretty.activate
def test_rename_snapshot(self):
self.register_keystone_auth_fixture()
# basic rename with positional arguments
self.run_command('snapshot-rename 1234 new-name')
expected = {'snapshot': {'display_name': 'new-name'}}
@ -187,32 +224,44 @@ class ShellTest(utils.TestCase):
# Call snapshot-rename with no arguments
self.assertRaises(SystemExit, self.run_command, 'snapshot-rename')
@httpretty.activate
def test_set_metadata_set(self):
self.register_keystone_auth_fixture()
self.run_command('metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_set_metadata_delete_dict(self):
self.register_keystone_auth_fixture()
self.run_command('metadata 1234 unset key1=val1 key2=val2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
@httpretty.activate
def test_set_metadata_delete_keys(self):
self.register_keystone_auth_fixture()
self.run_command('metadata 1234 unset key1 key2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
@httpretty.activate
def test_reset_state(self):
self.register_keystone_auth_fixture()
self.run_command('reset-state 1234')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_reset_state_with_flag(self):
self.register_keystone_auth_fixture()
self.run_command('reset-state --state error 1234')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_reset_state_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('reset-state 1234 5678 --state error')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called_anytime('POST', '/volumes/1234/action',
@ -220,17 +269,24 @@ class ShellTest(utils.TestCase):
self.assert_called_anytime('POST', '/volumes/5678/action',
body=expected)
@httpretty.activate
def test_snapshot_reset_state(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-reset-state 1234')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called('POST', '/snapshots/1234/action', body=expected)
@httpretty.activate
def test_snapshot_reset_state_with_flag(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-reset-state --state error 1234')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called('POST', '/snapshots/1234/action', body=expected)
@httpretty.activate
def test_snapshot_reset_state_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-reset-state 1234 5678')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called_anytime('POST', '/snapshots/1234/action',
@ -238,6 +294,7 @@ class ShellTest(utils.TestCase):
self.assert_called_anytime('POST', '/snapshots/5678/action',
body=expected)
@httpretty.activate
def test_encryption_type_list(self):
"""
Test encryption-type-list shell command.
@ -246,11 +303,13 @@ class ShellTest(utils.TestCase):
- one to get the volume type list information
- one per volume type to retrieve the encryption type information
"""
self.register_keystone_auth_fixture()
self.run_command('encryption-type-list')
self.assert_called_anytime('GET', '/types')
self.assert_called_anytime('GET', '/types/1/encryption')
self.assert_called_anytime('GET', '/types/2/encryption')
@httpretty.activate
def test_encryption_type_show(self):
"""
Test encryption-type-show shell command.
@ -259,10 +318,12 @@ class ShellTest(utils.TestCase):
- one to get the volume type information
- one to get the encryption type information
"""
self.register_keystone_auth_fixture()
self.run_command('encryption-type-show 1')
self.assert_called('GET', '/types/1/encryption')
self.assert_called_anytime('GET', '/types/1')
@httpretty.activate
def test_encryption_type_create(self):
"""
Test encryption-type-create shell command.
@ -271,6 +332,7 @@ class ShellTest(utils.TestCase):
- one GET request to retrieve the relevant volume type information
- one POST request to create the new encryption type
"""
self.register_keystone_auth_fixture()
expected = {'encryption': {'cipher': None, 'key_size': None,
'provider': 'TestProvider',
'control_location': 'front-end'}}
@ -289,6 +351,7 @@ class ShellTest(utils.TestCase):
"""
self.skipTest("Not implemented")
@httpretty.activate
def test_encryption_type_delete(self):
"""
Test encryption-type-delete shell command.
@ -297,43 +360,58 @@ class ShellTest(utils.TestCase):
- one GET request to retrieve the relevant volume type information
- one DELETE request to delete the encryption type information
"""
self.register_keystone_auth_fixture()
self.run_command('encryption-type-delete 1')
self.assert_called('DELETE', '/types/1/encryption/provider')
self.assert_called_anytime('GET', '/types/1')
@httpretty.activate
def test_migrate_volume(self):
self.register_keystone_auth_fixture()
self.run_command('migrate 1234 fakehost --force-host-copy=True')
expected = {'os-migrate_volume': {'force_host_copy': 'True',
'host': 'fakehost'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_snapshot_metadata_set(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/snapshots/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_snapshot_metadata_unset_dict(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata 1234 unset key1=val1 key2=val2')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key1')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key2')
@httpretty.activate
def test_snapshot_metadata_unset_keys(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata 1234 unset key1 key2')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key1')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key2')
@httpretty.activate
def test_volume_metadata_update_all(self):
self.register_keystone_auth_fixture()
self.run_command('metadata-update-all 1234 key1=val1 key2=val2')
self.assert_called('PUT', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_snapshot_metadata_update_all(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata-update-all\
1234 key1=val1 key2=val2')
self.assert_called('PUT', '/snapshots/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_readonly_mode_update(self):
self.register_keystone_auth_fixture()
self.run_command('readonly-mode-update 1234 True')
expected = {'os-update_readonly_flag': {'readonly': True}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@ -342,31 +420,43 @@ class ShellTest(utils.TestCase):
expected = {'os-update_readonly_flag': {'readonly': False}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_service_disable(self):
self.register_keystone_auth_fixture()
self.run_command('service-disable host cinder-volume')
self.assert_called('PUT', '/os-services/disable',
{"binary": "cinder-volume", "host": "host"})
@httpretty.activate
def test_services_disable_with_reason(self):
self.register_keystone_auth_fixture()
cmd = 'service-disable host cinder-volume --reason no_reason'
self.run_command(cmd)
body = {'host': 'host', 'binary': 'cinder-volume',
'disabled_reason': 'no_reason'}
self.assert_called('PUT', '/os-services/disable-log-reason', body)
@httpretty.activate
def test_service_enable(self):
self.register_keystone_auth_fixture()
self.run_command('service-enable host cinder-volume')
self.assert_called('PUT', '/os-services/enable',
{"binary": "cinder-volume", "host": "host"})
@httpretty.activate
def test_snapshot_delete(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-delete 1234')
self.assert_called('DELETE', '/snapshots/1234')
@httpretty.activate
def test_quota_delete(self):
self.register_keystone_auth_fixture()
self.run_command('quota-delete 1234')
self.assert_called('DELETE', '/os-quota-sets/1234')
@httpretty.activate
def test_snapshot_delete_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-delete 1234 5678')
self.assert_called('DELETE', '/snapshots/5678')

View File

@ -19,6 +19,8 @@ from cinderclient import client
from cinderclient import shell
from cinderclient.tests import utils
from cinderclient.tests.v2 import fakes
from cinderclient.tests.fixture_data import keystone_client
import httpretty
class ShellTest(utils.TestCase):
@ -28,7 +30,7 @@ class ShellTest(utils.TestCase):
'CINDER_PASSWORD': 'password',
'CINDER_PROJECT_ID': 'project_id',
'OS_VOLUME_API_VERSION': '2',
'CINDER_URL': 'http://no.where',
'CINDER_URL': keystone_client.BASE_URL,
}
# Patch os.environ to avoid required auth info.
@ -41,7 +43,7 @@ class ShellTest(utils.TestCase):
self.shell = shell.OpenStackCinderShell()
#HACK(bcwaldon): replace this when we start using stubs
# HACK(bcwaldon): replace this when we start using stubs
self.old_get_client_class = client.get_client_class
client.get_client_class = lambda *_: fakes.FakeClient
@ -53,10 +55,14 @@ class ShellTest(utils.TestCase):
if hasattr(self.shell, 'cs'):
self.shell.cs.clear_callstack()
#HACK(bcwaldon): replace this when we start using stubs
# HACK(bcwaldon): replace this when we start using stubs
client.get_client_class = self.old_get_client_class
super(ShellTest, self).tearDown()
def register_keystone_auth_fixture(self):
httpretty.register_uri(httpretty.GET, keystone_client.BASE_URL,
body=keystone_client.keystone_request_callback)
def run_command(self, cmd):
self.shell.main(cmd.split())
@ -66,73 +72,105 @@ class ShellTest(utils.TestCase):
def assert_called_anytime(self, method, url, body=None):
return self.shell.cs.assert_called_anytime(method, url, body)
@httpretty.activate
def test_list(self):
self.register_keystone_auth_fixture()
self.run_command('list')
# NOTE(jdg): we default to detail currently
self.assert_called('GET', '/volumes/detail')
@httpretty.activate
def test_list_filter_status(self):
self.register_keystone_auth_fixture()
self.run_command('list --status=available')
self.assert_called('GET', '/volumes/detail?status=available')
@httpretty.activate
def test_list_filter_name(self):
self.register_keystone_auth_fixture()
self.run_command('list --name=1234')
self.assert_called('GET', '/volumes/detail?name=1234')
@httpretty.activate
def test_list_all_tenants(self):
self.register_keystone_auth_fixture()
self.run_command('list --all-tenants=1')
self.assert_called('GET', '/volumes/detail?all_tenants=1')
@httpretty.activate
def test_list_availability_zone(self):
self.register_keystone_auth_fixture()
self.run_command('availability-zone-list')
self.assert_called('GET', '/os-availability-zone')
@httpretty.activate
def test_show(self):
self.register_keystone_auth_fixture()
self.run_command('show 1234')
self.assert_called('GET', '/volumes/1234')
@httpretty.activate
def test_delete(self):
self.register_keystone_auth_fixture()
self.run_command('delete 1234')
self.assert_called('DELETE', '/volumes/1234')
@httpretty.activate
def test_delete_by_name(self):
self.register_keystone_auth_fixture()
self.run_command('delete sample-volume')
self.assert_called_anytime('GET', '/volumes/detail?all_tenants=1')
self.assert_called('DELETE', '/volumes/1234')
@httpretty.activate
def test_delete_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('delete 1234 5678')
self.assert_called_anytime('DELETE', '/volumes/1234')
self.assert_called('DELETE', '/volumes/5678')
@httpretty.activate
def test_backup(self):
self.register_keystone_auth_fixture()
self.run_command('backup-create 1234')
self.assert_called('POST', '/backups')
@httpretty.activate
def test_restore(self):
self.register_keystone_auth_fixture()
self.run_command('backup-restore 1234')
self.assert_called('POST', '/backups/1234/restore')
@httpretty.activate
def test_record_export(self):
self.register_keystone_auth_fixture()
self.run_command('backup-export 1234')
self.assert_called('GET', '/backups/1234/export_record')
@httpretty.activate
def test_record_import(self):
self.register_keystone_auth_fixture()
self.run_command('backup-import fake.driver URL_STRING')
expected = {'backup-record': {'backup_service': 'fake.driver',
'backup_url': 'URL_STRING'}}
self.assert_called('POST', '/backups/import_record', expected)
@httpretty.activate
def test_snapshot_list_filter_volume_id(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-list --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?volume_id=1234')
@httpretty.activate
def test_snapshot_list_filter_status_and_volume_id(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-list --status=available --volume-id=1234')
self.assert_called('GET', '/snapshots/detail?'
'status=available&volume_id=1234')
@httpretty.activate
def test_rename(self):
self.register_keystone_auth_fixture()
# basic rename with positional arguments
self.run_command('rename 1234 new-name')
expected = {'volume': {'name': 'new-name'}}
@ -153,7 +191,9 @@ class ShellTest(utils.TestCase):
# Call rename with no arguments
self.assertRaises(SystemExit, self.run_command, 'rename')
@httpretty.activate
def test_rename_snapshot(self):
self.register_keystone_auth_fixture()
# basic rename with positional arguments
self.run_command('snapshot-rename 1234 new-name')
expected = {'snapshot': {'name': 'new-name'}}
@ -175,32 +215,44 @@ class ShellTest(utils.TestCase):
# Call snapshot-rename with no arguments
self.assertRaises(SystemExit, self.run_command, 'snapshot-rename')
@httpretty.activate
def test_set_metadata_set(self):
self.register_keystone_auth_fixture()
self.run_command('metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_set_metadata_delete_dict(self):
self.register_keystone_auth_fixture()
self.run_command('metadata 1234 unset key1=val1 key2=val2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
@httpretty.activate
def test_set_metadata_delete_keys(self):
self.register_keystone_auth_fixture()
self.run_command('metadata 1234 unset key1 key2')
self.assert_called('DELETE', '/volumes/1234/metadata/key1')
self.assert_called('DELETE', '/volumes/1234/metadata/key2', pos=-2)
@httpretty.activate
def test_reset_state(self):
self.register_keystone_auth_fixture()
self.run_command('reset-state 1234')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_reset_state_with_flag(self):
self.register_keystone_auth_fixture()
self.run_command('reset-state --state error 1234')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_reset_state_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('reset-state 1234 5678 --state error')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called_anytime('POST', '/volumes/1234/action',
@ -208,17 +260,23 @@ class ShellTest(utils.TestCase):
self.assert_called_anytime('POST', '/volumes/5678/action',
body=expected)
@httpretty.activate
def test_snapshot_reset_state(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-reset-state 1234')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called('POST', '/snapshots/1234/action', body=expected)
@httpretty.activate
def test_snapshot_reset_state_with_flag(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-reset-state --state error 1234')
expected = {'os-reset_status': {'status': 'error'}}
self.assert_called('POST', '/snapshots/1234/action', body=expected)
@httpretty.activate
def test_snapshot_reset_state_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-reset-state 1234 5678')
expected = {'os-reset_status': {'status': 'available'}}
self.assert_called_anytime('POST', '/snapshots/1234/action',
@ -226,6 +284,7 @@ class ShellTest(utils.TestCase):
self.assert_called_anytime('POST', '/snapshots/5678/action',
body=expected)
@httpretty.activate
def test_encryption_type_list(self):
"""
Test encryption-type-list shell command.
@ -234,11 +293,13 @@ class ShellTest(utils.TestCase):
- one to get the volume type list information
- one per volume type to retrieve the encryption type information
"""
self.register_keystone_auth_fixture()
self.run_command('encryption-type-list')
self.assert_called_anytime('GET', '/types')
self.assert_called_anytime('GET', '/types/1/encryption')
self.assert_called_anytime('GET', '/types/2/encryption')
@httpretty.activate
def test_encryption_type_show(self):
"""
Test encryption-type-show shell command.
@ -247,10 +308,12 @@ class ShellTest(utils.TestCase):
- one to get the volume type information
- one to get the encryption type information
"""
self.register_keystone_auth_fixture()
self.run_command('encryption-type-show 1')
self.assert_called('GET', '/types/1/encryption')
self.assert_called_anytime('GET', '/types/1')
@httpretty.activate
def test_encryption_type_create(self):
"""
Test encryption-type-create shell command.
@ -259,6 +322,8 @@ class ShellTest(utils.TestCase):
- one GET request to retrieve the relevant volume type information
- one POST request to create the new encryption type
"""
self.register_keystone_auth_fixture()
expected = {'encryption': {'cipher': None, 'key_size': None,
'provider': 'TestProvider',
'control_location': 'front-end'}}
@ -277,6 +342,7 @@ class ShellTest(utils.TestCase):
"""
self.skipTest("Not implemented")
@httpretty.activate
def test_encryption_type_delete(self):
"""
Test encryption-type-delete shell command.
@ -285,43 +351,65 @@ class ShellTest(utils.TestCase):
- one GET request to retrieve the relevant volume type information
- one DELETE request to delete the encryption type information
"""
self.register_keystone_auth_fixture()
self.run_command('encryption-type-delete 1')
self.assert_called('DELETE', '/types/1/encryption/provider')
self.assert_called_anytime('GET', '/types/1')
@httpretty.activate
def test_migrate_volume(self):
self.register_keystone_auth_fixture()
self.run_command('migrate 1234 fakehost --force-host-copy=True')
expected = {'os-migrate_volume': {'force_host_copy': 'True',
'host': 'fakehost'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_snapshot_metadata_set(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata 1234 set key1=val1 key2=val2')
self.assert_called('POST', '/snapshots/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_snapshot_metadata_unset_dict(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata 1234 unset key1=val1 key2=val2')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key1')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key2')
@httpretty.activate
def test_snapshot_metadata_unset_keys(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata 1234 unset key1 key2')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key1')
self.assert_called_anytime('DELETE', '/snapshots/1234/metadata/key2')
@httpretty.activate
def test_volume_metadata_update_all(self):
self.register_keystone_auth_fixture()
self.run_command('metadata-update-all 1234 key1=val1 key2=val2')
self.assert_called('PUT', '/volumes/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_snapshot_metadata_update_all(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-metadata-update-all\
1234 key1=val1 key2=val2')
self.assert_called('PUT', '/snapshots/1234/metadata',
{'metadata': {'key1': 'val1', 'key2': 'val2'}})
@httpretty.activate
def test_readonly_mode_update(self):
self.register_keystone_auth_fixture()
self.run_command('readonly-mode-update 1234 True')
expected = {'os-update_readonly_flag': {'readonly': True}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@ -330,43 +418,67 @@ class ShellTest(utils.TestCase):
expected = {'os-update_readonly_flag': {'readonly': False}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_service_disable(self):
self.register_keystone_auth_fixture()
self.run_command('service-disable host cinder-volume')
self.assert_called('PUT', '/os-services/disable',
{"binary": "cinder-volume", "host": "host"})
@httpretty.activate
def test_services_disable_with_reason(self):
self.register_keystone_auth_fixture()
cmd = 'service-disable host cinder-volume --reason no_reason'
self.run_command(cmd)
body = {'host': 'host', 'binary': 'cinder-volume',
'disabled_reason': 'no_reason'}
self.assert_called('PUT', '/os-services/disable-log-reason', body)
@httpretty.activate
def test_service_enable(self):
self.register_keystone_auth_fixture()
self.run_command('service-enable host cinder-volume')
self.assert_called('PUT', '/os-services/enable',
{"binary": "cinder-volume", "host": "host"})
@httpretty.activate
def test_retype_with_policy(self):
self.register_keystone_auth_fixture()
self.run_command('retype 1234 foo --migration-policy=on-demand')
expected = {'os-retype': {'new_type': 'foo',
'migration_policy': 'on-demand'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_retype_default_policy(self):
self.register_keystone_auth_fixture()
self.run_command('retype 1234 foo')
expected = {'os-retype': {'new_type': 'foo',
'migration_policy': 'never'}}
self.assert_called('POST', '/volumes/1234/action', body=expected)
@httpretty.activate
def test_snapshot_delete(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-delete 1234')
self.assert_called('DELETE', '/snapshots/1234')
@httpretty.activate
def test_quota_delete(self):
self.register_keystone_auth_fixture()
self.run_command('quota-delete 1234')
self.assert_called('DELETE', '/os-quota-sets/1234')
@httpretty.activate
def test_snapshot_delete_multiple(self):
self.register_keystone_auth_fixture()
self.run_command('snapshot-delete 5678')
self.assert_called('DELETE', '/snapshots/5678')

View File

@ -51,7 +51,7 @@ class Client(object):
service_type='volume', service_name=None,
volume_service_name=None, retries=None,
http_log_debug=False, cacert=None,
auth_system='keystone', auth_plugin=None):
auth_system='keystone', auth_plugin=None, session=None):
# FIXME(comstud): Rename the api_key argument above when we
# know it's not being used as keyword argument
password = api_key
@ -80,16 +80,16 @@ class Client(object):
setattr(self, extension.name,
extension.manager_class(self))
self.client = client.HTTPClient(
username,
password,
project_id,
auth_url,
self.client = client._construct_http_client(
username=username,
password=password,
project_id=project_id,
auth_url=auth_url,
insecure=insecure,
timeout=timeout,
tenant_id=tenant_id,
proxy_tenant_id=tenant_id,
proxy_token=proxy_token,
proxy_tenant_id=proxy_tenant_id,
region_name=region_name,
endpoint_type=endpoint_type,
service_type=service_type,
@ -99,7 +99,8 @@ class Client(object):
http_log_debug=http_log_debug,
cacert=cacert,
auth_system=auth_system,
auth_plugin=auth_plugin)
auth_plugin=auth_plugin,
session=session)
def authenticate(self):
"""

View File

@ -49,7 +49,8 @@ class Client(object):
service_type='volumev2', service_name=None,
volume_service_name=None, retries=None,
http_log_debug=False, cacert=None,
auth_system='keystone', auth_plugin=None):
auth_system='keystone', auth_plugin=None,
session=None):
# FIXME(comstud): Rename the api_key argument above when we
# know it's not being used as keyword argument
password = api_key
@ -78,16 +79,16 @@ class Client(object):
setattr(self, extension.name,
extension.manager_class(self))
self.client = client.HTTPClient(
username,
password,
project_id,
auth_url,
self.client = client._construct_http_client(
username=username,
password=password,
project_id=project_id,
auth_url=auth_url,
insecure=insecure,
timeout=timeout,
tenant_id=tenant_id,
proxy_tenant_id=tenant_id,
proxy_token=proxy_token,
proxy_tenant_id=proxy_tenant_id,
region_name=region_name,
endpoint_type=endpoint_type,
service_type=service_type,
@ -97,7 +98,8 @@ class Client(object):
http_log_debug=http_log_debug,
cacert=cacert,
auth_system=auth_system,
auth_plugin=auth_plugin)
auth_plugin=auth_plugin,
session=session)
def authenticate(self):
"""Authenticate against the server.

View File

@ -4,6 +4,7 @@ coverage>=3.6
discover
fixtures>=0.3.14
mock>=1.0
httpretty>=0.8.0
python-subunit>=0.0.18
sphinx>=1.2.1,<1.3
testtools>=0.9.34