Use os-client-config and keystoneauth1 in shell
keystoneauth1 is the new auth-only library for getting keystone Sessions, which is lighter-weight and does not require the entire keystoneclient library. It also handles all of the keystone version discover and plugin selection so that code doesn't have to live in neutronclient. Additionally, use os-client-config to process options and get the Session from keystoneauth1. This adds support for reading clouds.yaml files and supporting the OS_CLOUD env var for selecting named clouds from a list of them. This is a step towards bug#1503428 but is not the whole picture. Remove the auth tests - since they are covered inside of ksa. Closes-Bug: #1507384 Change-Id: Ic4f9fd8f231c33513fd74da58ab1b4a3fb00d9f4
This commit is contained in:
parent
3d736107f9
commit
2eed8ea24a
@ -21,8 +21,8 @@ except ImportError:
|
||||
import logging
|
||||
import os
|
||||
|
||||
from keystoneclient import access
|
||||
from keystoneclient import adapter
|
||||
from keystoneauth1 import access
|
||||
from keystoneauth1 import adapter
|
||||
import requests
|
||||
|
||||
from neutronclient.common import exceptions
|
||||
@ -179,7 +179,7 @@ class HTTPClient(object):
|
||||
|
||||
def _extract_service_catalog(self, body):
|
||||
"""Set the client's service catalog from the response data."""
|
||||
self.auth_ref = access.AccessInfo.factory(body=body)
|
||||
self.auth_ref = access.create(body=body)
|
||||
self.service_catalog = self.auth_ref.service_catalog
|
||||
self.auth_token = self.auth_ref.auth_token
|
||||
self.auth_tenant_id = self.auth_ref.tenant_id
|
||||
@ -187,9 +187,9 @@ class HTTPClient(object):
|
||||
|
||||
if not self.endpoint_url:
|
||||
self.endpoint_url = self.service_catalog.url_for(
|
||||
attr='region', filter_value=self.region_name,
|
||||
region_name=self.region_name,
|
||||
service_type=self.service_type,
|
||||
endpoint_type=self.endpoint_type)
|
||||
interface=self.endpoint_type)
|
||||
|
||||
def _authenticate_keystone(self):
|
||||
if self.user_id:
|
||||
@ -355,7 +355,7 @@ def construct_http_client(username=None,
|
||||
timeout=None,
|
||||
endpoint_url=None,
|
||||
insecure=False,
|
||||
endpoint_type='publicURL',
|
||||
endpoint_type='public',
|
||||
log_credentials=None,
|
||||
auth_strategy='keystone',
|
||||
ca_cert=None,
|
||||
|
@ -62,6 +62,7 @@ class ClientManager(object):
|
||||
ca_cert=None,
|
||||
log_credentials=False,
|
||||
service_type=None,
|
||||
service_name=None,
|
||||
timeout=None,
|
||||
retries=0,
|
||||
raise_errors=True,
|
||||
@ -72,6 +73,7 @@ class ClientManager(object):
|
||||
self._url = url
|
||||
self._auth_url = auth_url
|
||||
self._service_type = service_type
|
||||
self._service_name = service_name
|
||||
self._endpoint_type = endpoint_type
|
||||
self._tenant_name = tenant_name
|
||||
self._tenant_id = tenant_id
|
||||
@ -103,6 +105,7 @@ class ClientManager(object):
|
||||
region_name=self._region_name,
|
||||
auth_url=self._auth_url,
|
||||
service_type=self._service_type,
|
||||
service_name=self._service_name,
|
||||
endpoint_type=self._endpoint_type,
|
||||
insecure=self._insecure,
|
||||
ca_cert=self._ca_cert,
|
||||
|
@ -21,20 +21,15 @@ Command-line interface to the Neutron APIs
|
||||
from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
import getpass
|
||||
import inspect
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from keystoneclient.auth.identity import v2 as v2_auth
|
||||
from keystoneclient.auth.identity import v3 as v3_auth
|
||||
from keystoneclient import discover
|
||||
from keystoneclient.openstack.common.apiclient import exceptions as ks_exc
|
||||
from keystoneclient import session
|
||||
from keystoneauth1 import session
|
||||
import os_client_config
|
||||
from oslo_utils import encodeutils
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from cliff import app
|
||||
from cliff import commandmanager
|
||||
@ -499,7 +494,7 @@ class NeutronShell(app.App):
|
||||
default=0,
|
||||
help=_("How many times the request to the Neutron server should "
|
||||
"be retried if it fails."))
|
||||
# FIXME(bklei): this method should come from python-keystoneclient
|
||||
# FIXME(bklei): this method should come from keystoneauth1
|
||||
self._append_global_identity_args(parser)
|
||||
|
||||
return parser
|
||||
@ -507,9 +502,9 @@ class NeutronShell(app.App):
|
||||
def _append_global_identity_args(self, parser):
|
||||
# FIXME(bklei): these are global identity (Keystone) arguments which
|
||||
# should be consistent and shared by all service clients. Therefore,
|
||||
# they should be provided by python-keystoneclient. We will need to
|
||||
# they should be provided by keystoneauth1. We will need to
|
||||
# refactor this code once this functionality is available in
|
||||
# python-keystoneclient.
|
||||
# keystoneauth1.
|
||||
#
|
||||
# Note: At that time we'll need to decide if we can just abandon
|
||||
# the deprecated args (--service-type and --endpoint-type).
|
||||
@ -521,8 +516,8 @@ class NeutronShell(app.App):
|
||||
|
||||
parser.add_argument(
|
||||
'--os-endpoint-type', metavar='<os-endpoint-type>',
|
||||
default=env('OS_ENDPOINT_TYPE', default='publicURL'),
|
||||
help=_('Defaults to env[OS_ENDPOINT_TYPE] or publicURL.'))
|
||||
default=env('OS_ENDPOINT_TYPE', default='public'),
|
||||
help=_('Defaults to env[OS_ENDPOINT_TYPE] or public.'))
|
||||
|
||||
# FIXME(bklei): --service-type is deprecated but kept in for
|
||||
# backward compatibility.
|
||||
@ -535,7 +530,7 @@ class NeutronShell(app.App):
|
||||
# backward compatibility.
|
||||
parser.add_argument(
|
||||
'--endpoint-type', metavar='<endpoint-type>',
|
||||
default=env('OS_ENDPOINT_TYPE', default='publicURL'),
|
||||
default=env('OS_ENDPOINT_TYPE', default='public'),
|
||||
help=_('DEPRECATED! Use --os-endpoint-type.'))
|
||||
|
||||
parser.add_argument(
|
||||
@ -547,6 +542,11 @@ class NeutronShell(app.App):
|
||||
'--os_auth_strategy',
|
||||
help=argparse.SUPPRESS)
|
||||
|
||||
parser.add_argument(
|
||||
'--os-cloud', metavar='<cloud>',
|
||||
default=env('OS_CLOUD', default=None),
|
||||
help=_('Defaults to env[OS_CLOUD].'))
|
||||
|
||||
parser.add_argument(
|
||||
'--os-auth-url', metavar='<auth-url>',
|
||||
default=env('OS_AUTH_URL'),
|
||||
@ -829,104 +829,28 @@ class NeutronShell(app.App):
|
||||
"""Make sure the user has provided all of the authentication
|
||||
info we need.
|
||||
"""
|
||||
if self.options.os_auth_strategy == 'keystone':
|
||||
if self.options.os_token or self.options.os_url:
|
||||
# Token flow auth takes priority
|
||||
if not self.options.os_token:
|
||||
raise exc.CommandError(
|
||||
_("You must provide a token via"
|
||||
" either --os-token or env[OS_TOKEN]"
|
||||
" when providing a service URL"))
|
||||
cloud_config = os_client_config.OpenStackConfig().get_one_cloud(
|
||||
cloud=self.options.os_cloud, argparse=self.options,
|
||||
network_api_version=self.api_version)
|
||||
verify, cert = cloud_config.get_requests_verify_args()
|
||||
auth = cloud_config.get_auth()
|
||||
|
||||
if not self.options.os_url:
|
||||
raise exc.CommandError(
|
||||
_("You must provide a service URL via"
|
||||
" either --os-url or env[OS_URL]"
|
||||
" when providing a token"))
|
||||
|
||||
else:
|
||||
# Validate password flow auth
|
||||
project_info = (self.options.os_tenant_name or
|
||||
self.options.os_tenant_id or
|
||||
(self.options.os_project_name and
|
||||
(self.options.os_project_domain_name or
|
||||
self.options.os_project_domain_id)) or
|
||||
self.options.os_project_id)
|
||||
|
||||
if (not self.options.os_username
|
||||
and not self.options.os_user_id):
|
||||
raise exc.CommandError(
|
||||
_("You must provide a username or user ID via"
|
||||
" --os-username, env[OS_USERNAME] or"
|
||||
" --os-user-id, env[OS_USER_ID]"))
|
||||
|
||||
if not self.options.os_password:
|
||||
# No password, If we've got a tty, try prompting for it
|
||||
if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
|
||||
# Check for Ctl-D
|
||||
try:
|
||||
self.options.os_password = getpass.getpass(
|
||||
'OS Password: ')
|
||||
except EOFError:
|
||||
pass
|
||||
# No password because we didn't have a tty or the
|
||||
# user Ctl-D when prompted.
|
||||
if not self.options.os_password:
|
||||
raise exc.CommandError(
|
||||
_("You must provide a password via"
|
||||
" either --os-password or env[OS_PASSWORD]"))
|
||||
|
||||
if (not project_info):
|
||||
# tenent is deprecated in Keystone v3. Use the latest
|
||||
# terminology instead.
|
||||
raise exc.CommandError(
|
||||
_("You must provide a project_id or project_name ("
|
||||
"with project_domain_name or project_domain_id) "
|
||||
"via "
|
||||
" --os-project-id (env[OS_PROJECT_ID])"
|
||||
" --os-project-name (env[OS_PROJECT_NAME]),"
|
||||
" --os-project-domain-id "
|
||||
"(env[OS_PROJECT_DOMAIN_ID])"
|
||||
" --os-project-domain-name "
|
||||
"(env[OS_PROJECT_DOMAIN_NAME])"))
|
||||
|
||||
if not self.options.os_auth_url:
|
||||
raise exc.CommandError(
|
||||
_("You must provide an auth url via"
|
||||
" either --os-auth-url or via env[OS_AUTH_URL]"))
|
||||
auth_session = self._get_keystone_session()
|
||||
auth = auth_session.auth
|
||||
else: # not keystone
|
||||
if not self.options.os_url:
|
||||
raise exc.CommandError(
|
||||
_("You must provide a service URL via"
|
||||
" either --os-url or env[OS_URL]"))
|
||||
auth_session = None
|
||||
auth = None
|
||||
auth_session = session.Session(
|
||||
auth=auth, verify=verify, cert=cert,
|
||||
timeout=self.options.http_timeout)
|
||||
|
||||
interface = self.options.os_endpoint_type or self.endpoint_type
|
||||
if interface.endswith('URL'):
|
||||
interface = interface[:-3]
|
||||
self.client_manager = clientmanager.ClientManager(
|
||||
token=self.options.os_token,
|
||||
url=self.options.os_url,
|
||||
auth_url=self.options.os_auth_url,
|
||||
tenant_name=self.options.os_tenant_name,
|
||||
tenant_id=self.options.os_tenant_id,
|
||||
username=self.options.os_username,
|
||||
user_id=self.options.os_user_id,
|
||||
password=self.options.os_password,
|
||||
region_name=self.options.os_region_name,
|
||||
api_version=self.api_version,
|
||||
auth_strategy=self.options.os_auth_strategy,
|
||||
# FIXME (bklei) honor deprecated service_type and
|
||||
# endpoint type until they are removed
|
||||
service_type=self.options.os_service_type or
|
||||
self.options.service_type,
|
||||
endpoint_type=self.options.os_endpoint_type or self.endpoint_type,
|
||||
insecure=self.options.insecure,
|
||||
ca_cert=self.options.os_cacert,
|
||||
timeout=self.options.http_timeout,
|
||||
retries=self.options.retries,
|
||||
raise_errors=False,
|
||||
session=auth_session,
|
||||
region_name=cloud_config.get_region_name(),
|
||||
api_version=cloud_config.get_api_version('network'),
|
||||
service_type=cloud_config.get_service_type('network'),
|
||||
service_name=cloud_config.get_service_name('network'),
|
||||
endpoint_type=interface,
|
||||
auth=auth,
|
||||
log_credentials=True)
|
||||
return
|
||||
@ -981,89 +905,6 @@ class NeutronShell(app.App):
|
||||
root_logger.addHandler(console)
|
||||
return
|
||||
|
||||
def get_v2_auth(self, v2_auth_url):
|
||||
return v2_auth.Password(
|
||||
v2_auth_url,
|
||||
username=self.options.os_username,
|
||||
password=self.options.os_password,
|
||||
tenant_id=self.options.os_tenant_id,
|
||||
tenant_name=self.options.os_tenant_name)
|
||||
|
||||
def get_v3_auth(self, v3_auth_url):
|
||||
project_id = self.options.os_project_id or self.options.os_tenant_id
|
||||
project_name = (self.options.os_project_name or
|
||||
self.options.os_tenant_name)
|
||||
|
||||
return v3_auth.Password(
|
||||
v3_auth_url,
|
||||
username=self.options.os_username,
|
||||
password=self.options.os_password,
|
||||
user_id=self.options.os_user_id,
|
||||
user_domain_name=self.options.os_user_domain_name,
|
||||
user_domain_id=self.options.os_user_domain_id,
|
||||
project_id=project_id,
|
||||
project_name=project_name,
|
||||
project_domain_name=self.options.os_project_domain_name,
|
||||
project_domain_id=self.options.os_project_domain_id
|
||||
)
|
||||
|
||||
def _discover_auth_versions(self, session, auth_url):
|
||||
# discover the API versions the server is supporting base on the
|
||||
# given URL
|
||||
try:
|
||||
ks_discover = discover.Discover(session=session, auth_url=auth_url)
|
||||
return (ks_discover.url_for('2.0'), ks_discover.url_for('3.0'))
|
||||
except ks_exc.ClientException:
|
||||
# Identity service may not support discover API version.
|
||||
# Lets try to figure out the API version from the original URL.
|
||||
url_parts = urlparse.urlparse(auth_url)
|
||||
(scheme, netloc, path, params, query, fragment) = url_parts
|
||||
path = path.lower()
|
||||
if path.startswith('/v3'):
|
||||
return (None, auth_url)
|
||||
elif path.startswith('/v2'):
|
||||
return (auth_url, None)
|
||||
else:
|
||||
# not enough information to determine the auth version
|
||||
msg = _('Unable to determine the Keystone version '
|
||||
'to authenticate with using the given '
|
||||
'auth_url. Identity service may not support API '
|
||||
'version discovery. Please provide a versioned '
|
||||
'auth_url instead.')
|
||||
raise exc.CommandError(msg)
|
||||
|
||||
def _get_keystone_session(self):
|
||||
# first create a Keystone session
|
||||
cacert = self.options.os_cacert or None
|
||||
cert = self.options.os_cert or None
|
||||
key = self.options.os_key or None
|
||||
insecure = self.options.insecure or False
|
||||
ks_session = session.Session.construct(dict(cacert=cacert,
|
||||
cert=cert,
|
||||
key=key,
|
||||
insecure=insecure))
|
||||
# discover the supported keystone versions using the given url
|
||||
(v2_auth_url, v3_auth_url) = self._discover_auth_versions(
|
||||
session=ks_session,
|
||||
auth_url=self.options.os_auth_url)
|
||||
|
||||
# Determine which authentication plugin to use. First inspect the
|
||||
# auth_url to see the supported version. If both v3 and v2 are
|
||||
# supported, then use the highest version if possible.
|
||||
user_domain_name = self.options.os_user_domain_name or None
|
||||
user_domain_id = self.options.os_user_domain_id or None
|
||||
project_domain_name = self.options.os_project_domain_name or None
|
||||
project_domain_id = self.options.os_project_domain_id or None
|
||||
domain_info = (user_domain_name or user_domain_id or
|
||||
project_domain_name or project_domain_id)
|
||||
|
||||
if (v2_auth_url and not domain_info) or not v3_auth_url:
|
||||
ks_session.auth = self.get_v2_auth(v2_auth_url)
|
||||
else:
|
||||
ks_session.auth = self.get_v3_auth(v3_auth_url)
|
||||
|
||||
return ks_session
|
||||
|
||||
|
||||
def main(argv=sys.argv[1:]):
|
||||
try:
|
||||
|
@ -12,9 +12,8 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from keystoneclient.auth.identity import v2 as v2_auth
|
||||
from keystoneclient import discover
|
||||
from keystoneclient import session
|
||||
from keystoneauth1 import plugin as ksa_plugin
|
||||
from keystoneauth1 import session
|
||||
from tempest_lib import base
|
||||
import testtools
|
||||
|
||||
@ -38,13 +37,22 @@ class LibraryTestBase(base.BaseTestCase):
|
||||
class Libv2HTTPClientTestBase(LibraryTestBase):
|
||||
|
||||
def _get_client(self):
|
||||
|
||||
creds = func_base.credentials()
|
||||
session_params = {}
|
||||
ks_session = session.Session.construct(session_params)
|
||||
ks_discover = discover.Discover(session=ks_session,
|
||||
auth_url=creds['auth_url'])
|
||||
# At the moment, we use keystone v2 API
|
||||
v2_auth_url = ks_discover.url_for('2.0')
|
||||
cloud_config = func_base.get_cloud_config()
|
||||
|
||||
# We're getting a session so we can find the v2 url via KSA
|
||||
keystone_auth = cloud_config.get_auth()
|
||||
(verify, cert) = cloud_config.get_requests_verify_args()
|
||||
|
||||
ks_session = session.Session(
|
||||
auth=keystone_auth, verify=verify, cert=cert)
|
||||
|
||||
# for the old HTTPClient, we use keystone v2 API, regardless of
|
||||
# whether v3 also exists or is configured
|
||||
v2_auth_url = keystone_auth.get_endpoint(
|
||||
ks_session, interface=ksa_plugin.AUTH_INTERFACE, version=(2, 0))
|
||||
|
||||
return v2_client.Client(username=creds['username'],
|
||||
password=creds['password'],
|
||||
tenant_name=creds['project_name'],
|
||||
@ -54,18 +62,14 @@ class Libv2HTTPClientTestBase(LibraryTestBase):
|
||||
class Libv2SessionClientTestBase(LibraryTestBase):
|
||||
|
||||
def _get_client(self):
|
||||
creds = func_base.credentials()
|
||||
session_params = {}
|
||||
ks_session = session.Session.construct(session_params)
|
||||
ks_discover = discover.Discover(session=ks_session,
|
||||
auth_url=creds['auth_url'])
|
||||
# At the moment, we use keystone v2 API
|
||||
v2_auth_url = ks_discover.url_for('2.0')
|
||||
ks_session.auth = v2_auth.Password(
|
||||
v2_auth_url,
|
||||
username=creds['username'],
|
||||
password=creds['password'],
|
||||
tenant_name=creds['project_name'])
|
||||
cloud_config = func_base.get_cloud_config()
|
||||
keystone_auth = cloud_config.get_auth()
|
||||
(verify, cert) = cloud_config.get_requests_verify_args()
|
||||
|
||||
ks_session = session.Session(
|
||||
auth=keystone_auth,
|
||||
verify=verify,
|
||||
cert=cert)
|
||||
return v2_client.Client(session=ks_session)
|
||||
|
||||
|
||||
|
@ -1,434 +0,0 @@
|
||||
# Copyright 2012 NEC Corporation
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
from oslo_serialization import jsonutils
|
||||
from requests_mock.contrib import fixture as mock_fixture
|
||||
import testtools
|
||||
|
||||
from keystoneclient.auth.identity import v2 as ks_v2_auth
|
||||
from keystoneclient.auth.identity import v3 as ks_v3_auth
|
||||
from keystoneclient import exceptions as ks_exceptions
|
||||
from keystoneclient import fixture as ks_fixture
|
||||
from keystoneclient import session
|
||||
|
||||
from neutronclient import client
|
||||
from neutronclient.common import exceptions
|
||||
|
||||
|
||||
USERNAME = 'testuser'
|
||||
USER_ID = 'testuser_id'
|
||||
TENANT_NAME = 'testtenant'
|
||||
TENANT_ID = 'testtenant_id'
|
||||
PASSWORD = 'password'
|
||||
ENDPOINT_URL = 'http://localurl'
|
||||
PUBLIC_ENDPOINT_URL = '%s/public' % ENDPOINT_URL
|
||||
ADMIN_ENDPOINT_URL = '%s/admin' % ENDPOINT_URL
|
||||
INTERNAL_ENDPOINT_URL = '%s/internal' % ENDPOINT_URL
|
||||
ENDPOINT_OVERRIDE = 'http://otherurl'
|
||||
TOKENID = uuid.uuid4().hex
|
||||
REGION = 'RegionOne'
|
||||
NOAUTH = 'noauth'
|
||||
|
||||
KS_TOKEN_RESULT = ks_fixture.V2Token()
|
||||
KS_TOKEN_RESULT.set_scope()
|
||||
_s = KS_TOKEN_RESULT.add_service('network', 'Neutron Service')
|
||||
_s.add_endpoint(ENDPOINT_URL, region=REGION)
|
||||
|
||||
ENDPOINTS_RESULT = {
|
||||
'endpoints': [{
|
||||
'type': 'network',
|
||||
'name': 'Neutron Service',
|
||||
'region': REGION,
|
||||
'adminURL': ENDPOINT_URL,
|
||||
'internalURL': ENDPOINT_URL,
|
||||
'publicURL': ENDPOINT_URL
|
||||
}]
|
||||
}
|
||||
|
||||
BASE_URL = "http://keystone.example.com:5000/"
|
||||
|
||||
V2_URL = "%sv2.0" % BASE_URL
|
||||
V3_URL = "%sv3" % BASE_URL
|
||||
|
||||
_v2 = ks_fixture.V2Discovery(V2_URL)
|
||||
_v3 = ks_fixture.V3Discovery(V3_URL)
|
||||
|
||||
V3_VERSION_LIST = jsonutils.dumps({'versions': {'values': [_v2, _v3]}})
|
||||
|
||||
V2_VERSION_ENTRY = {'version': _v2}
|
||||
V3_VERSION_ENTRY = {'version': _v3}
|
||||
|
||||
|
||||
def setup_keystone_v2(mrequests):
|
||||
v2_token = ks_fixture.V2Token(token_id=TOKENID)
|
||||
service = v2_token.add_service('network')
|
||||
service.add_endpoint(PUBLIC_ENDPOINT_URL, region=REGION)
|
||||
|
||||
mrequests.register_uri('POST',
|
||||
'%s/tokens' % (V2_URL),
|
||||
json=v2_token)
|
||||
|
||||
auth_session = session.Session()
|
||||
auth_plugin = ks_v2_auth.Password(V2_URL, 'xx', 'xx')
|
||||
return auth_session, auth_plugin
|
||||
|
||||
|
||||
def setup_keystone_v3(mrequests):
|
||||
mrequests.register_uri('GET',
|
||||
V3_URL,
|
||||
json=V3_VERSION_ENTRY)
|
||||
|
||||
v3_token = ks_fixture.V3Token()
|
||||
service = v3_token.add_service('network')
|
||||
service.add_standard_endpoints(public=PUBLIC_ENDPOINT_URL,
|
||||
admin=ADMIN_ENDPOINT_URL,
|
||||
internal=INTERNAL_ENDPOINT_URL,
|
||||
region=REGION)
|
||||
|
||||
mrequests.register_uri('POST',
|
||||
'%s/auth/tokens' % (V3_URL),
|
||||
text=json.dumps(v3_token),
|
||||
headers={'X-Subject-Token': TOKENID})
|
||||
|
||||
auth_session = session.Session()
|
||||
auth_plugin = ks_v3_auth.Password(V3_URL,
|
||||
username='xx',
|
||||
user_id='xx',
|
||||
user_domain_name='xx',
|
||||
user_domain_id='xx')
|
||||
return auth_session, auth_plugin
|
||||
|
||||
|
||||
AUTH_URL = V2_URL
|
||||
|
||||
|
||||
class CLITestAuthNoAuth(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Prepare the test environment."""
|
||||
super(CLITestAuthNoAuth, self).setUp()
|
||||
|
||||
self.requests = self.useFixture(mock_fixture.Fixture())
|
||||
|
||||
self.client = client.HTTPClient(username=USERNAME,
|
||||
tenant_name=TENANT_NAME,
|
||||
password=PASSWORD,
|
||||
endpoint_url=ENDPOINT_URL,
|
||||
auth_strategy=NOAUTH,
|
||||
region_name=REGION)
|
||||
|
||||
def test_get_noauth(self):
|
||||
url = ENDPOINT_URL + '/resource'
|
||||
self.requests.get(ENDPOINT_URL + '/resource')
|
||||
self.client.do_request('/resource', 'GET')
|
||||
self.assertEqual(url, self.requests.last_request.url)
|
||||
self.assertEqual(ENDPOINT_URL, self.client.endpoint_url)
|
||||
|
||||
|
||||
class CLITestAuthKeystone(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Prepare the test environment."""
|
||||
super(CLITestAuthKeystone, self).setUp()
|
||||
|
||||
for var in ('http_proxy', 'HTTP_PROXY'):
|
||||
self.useFixture(fixtures.EnvironmentVariableFixture(var))
|
||||
|
||||
self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
|
||||
self.requests = self.useFixture(mock_fixture.Fixture())
|
||||
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME,
|
||||
tenant_name=TENANT_NAME,
|
||||
password=PASSWORD,
|
||||
auth_url=AUTH_URL,
|
||||
region_name=REGION)
|
||||
|
||||
def test_reused_token_get_auth_info(self):
|
||||
"""Test that Client.get_auth_info() works even if client was
|
||||
instantiated with predefined token.
|
||||
"""
|
||||
token_id = uuid.uuid4().hex
|
||||
client_ = client.HTTPClient(username=USERNAME,
|
||||
tenant_name=TENANT_NAME,
|
||||
token=token_id,
|
||||
password=PASSWORD,
|
||||
auth_url=AUTH_URL,
|
||||
region_name=REGION)
|
||||
expected = {'auth_token': token_id,
|
||||
'auth_tenant_id': None,
|
||||
'auth_user_id': None,
|
||||
'endpoint_url': self.client.endpoint_url}
|
||||
self.assertEqual(expected, client_.get_auth_info())
|
||||
|
||||
def test_get_token(self):
|
||||
auth_session, auth_plugin = setup_keystone_v2(self.requests)
|
||||
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME,
|
||||
tenant_name=TENANT_NAME,
|
||||
password=PASSWORD,
|
||||
auth_url=AUTH_URL,
|
||||
region_name=REGION,
|
||||
session=auth_session,
|
||||
auth=auth_plugin)
|
||||
|
||||
m = self.requests.get(PUBLIC_ENDPOINT_URL + '/resource',
|
||||
request_headers={'X-Auth-Token': TOKENID})
|
||||
self.client.do_request('/resource', 'GET')
|
||||
self.assertTrue(m.called)
|
||||
|
||||
def test_refresh_token(self):
|
||||
token_id = uuid.uuid4().hex
|
||||
text = uuid.uuid4().hex
|
||||
self.client.auth_token = token_id
|
||||
self.client.endpoint_url = ENDPOINT_URL
|
||||
|
||||
res_url = ENDPOINT_URL + '/resource'
|
||||
v2_url = AUTH_URL + '/tokens'
|
||||
|
||||
# token_id gives 401, KS_TOKEN_RESULT gives 200
|
||||
self.requests.get(res_url,
|
||||
request_headers={'X-Auth-Token': token_id},
|
||||
status_code=401)
|
||||
|
||||
self.requests.get(
|
||||
res_url,
|
||||
text=text,
|
||||
status_code=200,
|
||||
request_headers={'X-Auth-Token': KS_TOKEN_RESULT.token_id})
|
||||
|
||||
self.requests.post(v2_url, json=KS_TOKEN_RESULT)
|
||||
|
||||
resp = self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertEqual(text, resp[1])
|
||||
self.assertEqual(3, len(self.requests.request_history))
|
||||
|
||||
self.assertEqual(res_url, self.requests.request_history[0].url)
|
||||
self.assertEqual(v2_url, self.requests.request_history[1].url)
|
||||
self.assertEqual(res_url, self.requests.request_history[2].url)
|
||||
|
||||
def test_refresh_token_no_auth_url(self):
|
||||
self.client.auth_url = None
|
||||
|
||||
token_id = uuid.uuid4().hex
|
||||
self.client.auth_token = token_id
|
||||
self.client.endpoint_url = ENDPOINT_URL
|
||||
|
||||
self.requests.get(ENDPOINT_URL + '/resource', status_code=401)
|
||||
self.assertRaises(exceptions.NoAuthURLProvided,
|
||||
self.client.do_request,
|
||||
'/resource',
|
||||
'GET')
|
||||
|
||||
def test_get_endpoint_url_with_invalid_auth_url(self):
|
||||
# Handle the case when auth_url is not provided
|
||||
self.client.auth_url = None
|
||||
self.assertRaises(exceptions.NoAuthURLProvided,
|
||||
self.client._get_endpoint_url)
|
||||
|
||||
def test_get_endpoint_url(self):
|
||||
token_id = uuid.uuid4().hex
|
||||
self.client.auth_token = token_id
|
||||
|
||||
self.requests.get(AUTH_URL + '/tokens/%s/endpoints' % token_id,
|
||||
json=ENDPOINTS_RESULT)
|
||||
self.requests.get(ENDPOINT_URL + '/resource')
|
||||
|
||||
self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertEqual(token_id,
|
||||
self.requests.last_request.headers['X-Auth-Token'])
|
||||
|
||||
def test_use_given_endpoint_url(self):
|
||||
self.client = client.HTTPClient(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION,
|
||||
endpoint_url=ENDPOINT_OVERRIDE)
|
||||
self.assertEqual(ENDPOINT_OVERRIDE, self.client.endpoint_url)
|
||||
|
||||
token_id = uuid.uuid4().hex
|
||||
self.client.auth_token = token_id
|
||||
|
||||
self.requests.get(ENDPOINT_OVERRIDE + '/resource')
|
||||
|
||||
self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertEqual(ENDPOINT_OVERRIDE, self.client.endpoint_url)
|
||||
self.assertEqual(token_id,
|
||||
self.requests.last_request.headers['X-Auth-Token'])
|
||||
|
||||
def test_get_endpoint_url_other(self):
|
||||
self.client = client.HTTPClient(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION, endpoint_type='otherURL')
|
||||
|
||||
token_id = uuid.uuid4().hex
|
||||
self.client.auth_token = token_id
|
||||
|
||||
self.requests.get(AUTH_URL + '/tokens/%s/endpoints' % token_id,
|
||||
json=ENDPOINTS_RESULT)
|
||||
|
||||
self.assertRaises(exceptions.EndpointTypeNotFound,
|
||||
self.client.do_request,
|
||||
'/resource',
|
||||
'GET')
|
||||
|
||||
def test_get_endpoint_url_failed(self):
|
||||
token_id = uuid.uuid4().hex
|
||||
self.client.auth_token = token_id
|
||||
|
||||
self.requests.get(AUTH_URL + '/tokens/%s/endpoints' % token_id,
|
||||
status_code=401)
|
||||
self.requests.post(AUTH_URL + '/tokens', json=KS_TOKEN_RESULT)
|
||||
m = self.requests.get(ENDPOINT_URL + '/resource')
|
||||
|
||||
self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertEqual(KS_TOKEN_RESULT.token_id,
|
||||
m.last_request.headers['X-Auth-Token'])
|
||||
|
||||
def test_endpoint_type(self):
|
||||
auth_session, auth_plugin = setup_keystone_v3(self.requests)
|
||||
|
||||
# Test default behavior is to choose public.
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION,
|
||||
session=auth_session, auth=auth_plugin)
|
||||
|
||||
self.assertEqual(PUBLIC_ENDPOINT_URL, self.client.endpoint_url)
|
||||
|
||||
# Test admin url
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION, endpoint_type='adminURL',
|
||||
session=auth_session, auth=auth_plugin)
|
||||
|
||||
self.assertEqual(ADMIN_ENDPOINT_URL, self.client.endpoint_url)
|
||||
|
||||
# Test public url
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION, endpoint_type='publicURL',
|
||||
session=auth_session, auth=auth_plugin)
|
||||
|
||||
self.assertEqual(PUBLIC_ENDPOINT_URL, self.client.endpoint_url)
|
||||
|
||||
# Test internal url
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION, endpoint_type='internalURL',
|
||||
session=auth_session, auth=auth_plugin)
|
||||
|
||||
self.assertEqual(INTERNAL_ENDPOINT_URL, self.client.endpoint_url)
|
||||
|
||||
# Test url that isn't found in the service catalog
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
|
||||
auth_url=AUTH_URL, region_name=REGION, endpoint_type='privateURL',
|
||||
session=auth_session, auth=auth_plugin)
|
||||
|
||||
self.assertRaises(
|
||||
ks_exceptions.EndpointNotFound,
|
||||
getattr, self.client, 'endpoint_url')
|
||||
|
||||
def test_strip_credentials_from_log(self):
|
||||
m = self.requests.post(AUTH_URL + '/tokens', json=KS_TOKEN_RESULT)
|
||||
self.requests.get(ENDPOINT_URL + '/resource')
|
||||
self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertIn('REDACTED', self.logger.output)
|
||||
self.assertNotIn(self.client.password, self.logger.output)
|
||||
|
||||
self.assertNotIn('REDACTED', m.last_request.body)
|
||||
self.assertIn(self.client.password, m.last_request.body)
|
||||
|
||||
|
||||
class CLITestAuthKeystoneWithId(CLITestAuthKeystone):
|
||||
|
||||
def setUp(self):
|
||||
"""Prepare the test environment."""
|
||||
super(CLITestAuthKeystoneWithId, self).setUp()
|
||||
self.client = client.HTTPClient(user_id=USER_ID,
|
||||
tenant_id=TENANT_ID,
|
||||
password=PASSWORD,
|
||||
auth_url=AUTH_URL,
|
||||
region_name=REGION)
|
||||
|
||||
|
||||
class CLITestAuthKeystoneWithIdandName(CLITestAuthKeystone):
|
||||
|
||||
def setUp(self):
|
||||
"""Prepare the test environment."""
|
||||
super(CLITestAuthKeystoneWithIdandName, self).setUp()
|
||||
self.client = client.HTTPClient(username=USERNAME,
|
||||
user_id=USER_ID,
|
||||
tenant_id=TENANT_ID,
|
||||
tenant_name=TENANT_NAME,
|
||||
password=PASSWORD,
|
||||
auth_url=AUTH_URL,
|
||||
region_name=REGION)
|
||||
|
||||
|
||||
class TestKeystoneClientVersions(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Prepare the test environment."""
|
||||
super(TestKeystoneClientVersions, self).setUp()
|
||||
self.requests = self.useFixture(mock_fixture.Fixture())
|
||||
|
||||
def test_v2_auth(self):
|
||||
auth_session, auth_plugin = setup_keystone_v2(self.requests)
|
||||
|
||||
self.client = client.construct_http_client(
|
||||
username=USERNAME,
|
||||
tenant_name=TENANT_NAME,
|
||||
password=PASSWORD,
|
||||
auth_url=AUTH_URL,
|
||||
region_name=REGION,
|
||||
session=auth_session,
|
||||
auth=auth_plugin)
|
||||
|
||||
m = self.requests.get(PUBLIC_ENDPOINT_URL + '/resource')
|
||||
|
||||
self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertTrue(m.called)
|
||||
|
||||
def test_v3_auth(self):
|
||||
auth_session, auth_plugin = setup_keystone_v3(self.requests)
|
||||
|
||||
self.client = client.construct_http_client(
|
||||
user_id=USER_ID,
|
||||
tenant_id=TENANT_ID,
|
||||
password=PASSWORD,
|
||||
auth_url=V3_URL,
|
||||
region_name=REGION,
|
||||
session=auth_session,
|
||||
auth=auth_plugin)
|
||||
|
||||
m = self.requests.get(PUBLIC_ENDPOINT_URL + '/resource')
|
||||
|
||||
self.client.do_request('/resource', 'GET')
|
||||
|
||||
self.assertTrue(m.called)
|
@ -21,7 +21,6 @@ import testtools
|
||||
|
||||
from neutronclient import client
|
||||
from neutronclient.common import exceptions
|
||||
from neutronclient.tests.unit import test_auth
|
||||
|
||||
|
||||
AUTH_TOKEN = 'test_token'
|
||||
@ -74,14 +73,6 @@ class TestHTTPClientMixin(object):
|
||||
self._test_headers(headers, body=BODY, headers=headers)
|
||||
|
||||
|
||||
class TestSessionClient(TestHTTPClientMixin, testtools.TestCase):
|
||||
|
||||
def initialize(self):
|
||||
session, auth = test_auth.setup_keystone_v2(self.requests)
|
||||
return [client.SessionClient,
|
||||
client.SessionClient(session=session, auth=auth)]
|
||||
|
||||
|
||||
class TestHTTPClient(TestHTTPClientMixin, testtools.TestCase):
|
||||
|
||||
def initialize(self):
|
||||
|
@ -21,18 +21,11 @@ import sys
|
||||
|
||||
import fixtures
|
||||
from mox3 import mox
|
||||
import requests_mock
|
||||
import six
|
||||
import testtools
|
||||
from testtools import matchers
|
||||
|
||||
from keystoneclient.auth.identity import v2 as v2_auth
|
||||
from keystoneclient.auth.identity import v3 as v3_auth
|
||||
from keystoneclient import session
|
||||
|
||||
from neutronclient.common import clientmanager
|
||||
from neutronclient import shell as openstack_shell
|
||||
from neutronclient.tests.unit import test_auth as auth
|
||||
|
||||
|
||||
DEFAULT_USERNAME = 'username'
|
||||
@ -150,297 +143,6 @@ class ShellTest(testtools.TestCase):
|
||||
self.assertThat(help_text,
|
||||
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
|
||||
|
||||
def test_unknown_auth_strategy(self):
|
||||
self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
|
||||
stdout, stderr = self.shell('--os-auth-strategy fake quota-list')
|
||||
self.assertFalse(stdout)
|
||||
self.assertEqual('You must provide a service URL via '
|
||||
'either --os-url or env[OS_URL]', stderr.strip())
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_auth(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V3_URL,
|
||||
json=auth.V3_VERSION_ENTRY)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.V3_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IsA(v3_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-tenant-name test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.V3_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_auth_cert_and_key(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V3_URL,
|
||||
json=auth.V3_VERSION_ENTRY)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.V3_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
raise_errors=False,
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None, retries=0,
|
||||
timeout=None,
|
||||
auth=mox.IsA(v3_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-tenant-name test '
|
||||
'--os-cert test '
|
||||
'--os-key test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.V3_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_v2_auth(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V2_URL,
|
||||
json=auth.V2_VERSION_ENTRY)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.V2_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IsA(v2_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-tenant-name test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.V2_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_failed_auth_version_discovery_v3_auth_url(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V3_URL,
|
||||
status_code=405)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.V3_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IsA(v3_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-user-domain-name test '
|
||||
'--os-tenant-name test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.V3_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_failed_auth_version_discovery_v2_auth_url(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V2_URL,
|
||||
status_code=405)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.V2_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IsA(v2_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-tenant-name test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.V2_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_auth_version_discovery_v3(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.BASE_URL,
|
||||
text=auth.V3_VERSION_LIST)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.BASE_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IsA(v3_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-user-domain-name test '
|
||||
'--os-tenant-name test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.BASE_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_auth_version_discovery_v2(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.BASE_URL,
|
||||
text=auth.V3_VERSION_LIST)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.BASE_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=False, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IsA(v2_auth.Password),
|
||||
session=mox.IsA(session.Session),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-tenant-name test '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.BASE_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_insecure_auth(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V2_URL,
|
||||
json=auth.V2_VERSION_ENTRY)
|
||||
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
|
||||
clientmanager.ClientManager.__init__(
|
||||
token='', url='', auth_url=auth.V2_URL,
|
||||
tenant_name='test', tenant_id='tenant_id',
|
||||
username='test', user_id='',
|
||||
password='test', region_name='', api_version={'network': '2.0'},
|
||||
auth_strategy='keystone', service_type='network',
|
||||
endpoint_type='publicURL', insecure=True, ca_cert=None,
|
||||
timeout=None,
|
||||
raise_errors=False,
|
||||
retries=0,
|
||||
auth=mox.IgnoreArg(),
|
||||
session=mox.IgnoreArg(),
|
||||
log_credentials=True)
|
||||
neutron_shell.run_subcommand(['quota-list'])
|
||||
self.mox.ReplayAll()
|
||||
cmdline = ('--os-username test '
|
||||
'--os-password test '
|
||||
'--os-tenant-name test '
|
||||
'--insecure '
|
||||
'--os-auth-url %s '
|
||||
'--os-auth-strategy keystone quota-list'
|
||||
% auth.V2_URL)
|
||||
neutron_shell.run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_build_option_parser(self):
|
||||
neutron_shell = openstack_shell.NeutronShell('2.0')
|
||||
result = neutron_shell.build_option_parser('descr', '2.0')
|
||||
@ -465,7 +167,7 @@ class ShellTest(testtools.TestCase):
|
||||
|
||||
# Neither $OS_ENDPOINT_TYPE nor --os-endpoint-type
|
||||
namespace = parser.parse_args([])
|
||||
self.assertEqual('publicURL', namespace.os_endpoint_type)
|
||||
self.assertEqual('public', namespace.os_endpoint_type)
|
||||
|
||||
# --endpoint-type but not $OS_ENDPOINT_TYPE
|
||||
namespace = parser.parse_args(['--os-endpoint-type=admin'])
|
||||
|
@ -1,179 +0,0 @@
|
||||
# Copyright (C) 2013 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fixtures
|
||||
import requests
|
||||
import testtools
|
||||
|
||||
from mox3 import mox
|
||||
import requests_mock
|
||||
|
||||
from neutronclient.client import HTTPClient
|
||||
from neutronclient.common.clientmanager import ClientManager
|
||||
from neutronclient.common import exceptions
|
||||
from neutronclient import shell as openstack_shell
|
||||
from neutronclient.tests.unit import test_auth as auth
|
||||
|
||||
AUTH_TOKEN = 'test_token'
|
||||
END_URL = 'test_url'
|
||||
METHOD = 'GET'
|
||||
URL = 'http://test.test:1234/v2.0/'
|
||||
CA_CERT = '/tmp/test/path'
|
||||
|
||||
|
||||
class TestSSL(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestSSL, self).setUp()
|
||||
|
||||
self.useFixture(fixtures.EnvironmentVariable('OS_TOKEN', AUTH_TOKEN))
|
||||
self.useFixture(fixtures.EnvironmentVariable('OS_URL', END_URL))
|
||||
|
||||
self.mox = mox.Mox()
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_ca_cert_passed(self, mrequests):
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V3_URL,
|
||||
json=auth.V3_VERSION_ENTRY)
|
||||
|
||||
self.mox.StubOutWithMock(ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(openstack_shell.NeutronShell, 'interact')
|
||||
|
||||
ClientManager.__init__(
|
||||
ca_cert=CA_CERT,
|
||||
# we are not really interested in other args
|
||||
api_version=mox.IgnoreArg(),
|
||||
auth_strategy=mox.IgnoreArg(),
|
||||
auth_url=mox.IgnoreArg(),
|
||||
service_type=mox.IgnoreArg(),
|
||||
endpoint_type=mox.IgnoreArg(),
|
||||
insecure=mox.IgnoreArg(),
|
||||
password=mox.IgnoreArg(),
|
||||
region_name=mox.IgnoreArg(),
|
||||
tenant_id=mox.IgnoreArg(),
|
||||
tenant_name=mox.IgnoreArg(),
|
||||
token=mox.IgnoreArg(),
|
||||
url=mox.IgnoreArg(),
|
||||
username=mox.IgnoreArg(),
|
||||
user_id=mox.IgnoreArg(),
|
||||
retries=mox.IgnoreArg(),
|
||||
raise_errors=mox.IgnoreArg(),
|
||||
log_credentials=mox.IgnoreArg(),
|
||||
timeout=mox.IgnoreArg(),
|
||||
auth=mox.IgnoreArg(),
|
||||
session=mox.IgnoreArg()
|
||||
)
|
||||
openstack_shell.NeutronShell.interact().AndReturn(0)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
cmdline = (
|
||||
'--os-cacert %s --os-auth-url %s' %
|
||||
(CA_CERT, auth.V3_URL))
|
||||
|
||||
openstack_shell.NeutronShell('2.0').run(cmdline.split())
|
||||
self.mox.VerifyAll()
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_ca_cert_passed_as_env_var(self, mrequests):
|
||||
|
||||
# emulate Keystone version discovery
|
||||
mrequests.register_uri('GET',
|
||||
auth.V3_URL,
|
||||
json=auth.V3_VERSION_ENTRY)
|
||||
|
||||
self.useFixture(fixtures.EnvironmentVariable('OS_CACERT', CA_CERT))
|
||||
|
||||
self.mox.StubOutWithMock(ClientManager, '__init__')
|
||||
self.mox.StubOutWithMock(openstack_shell.NeutronShell, 'interact')
|
||||
|
||||
ClientManager.__init__(
|
||||
ca_cert=CA_CERT,
|
||||
# we are not really interested in other args
|
||||
api_version=mox.IgnoreArg(),
|
||||
auth_strategy=mox.IgnoreArg(),
|
||||
auth_url=mox.IgnoreArg(),
|
||||
service_type=mox.IgnoreArg(),
|
||||
endpoint_type=mox.IgnoreArg(),
|
||||
insecure=mox.IgnoreArg(),
|
||||
password=mox.IgnoreArg(),
|
||||
region_name=mox.IgnoreArg(),
|
||||
tenant_id=mox.IgnoreArg(),
|
||||
tenant_name=mox.IgnoreArg(),
|
||||
token=mox.IgnoreArg(),
|
||||
url=mox.IgnoreArg(),
|
||||
username=mox.IgnoreArg(),
|
||||
user_id=mox.IgnoreArg(),
|
||||
retries=mox.IgnoreArg(),
|
||||
raise_errors=mox.IgnoreArg(),
|
||||
log_credentials=mox.IgnoreArg(),
|
||||
timeout=mox.IgnoreArg(),
|
||||
auth=mox.IgnoreArg(),
|
||||
session=mox.IgnoreArg()
|
||||
)
|
||||
openstack_shell.NeutronShell.interact().AndReturn(0)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
cmdline = ('--os-auth-url %s' % auth.V3_URL)
|
||||
openstack_shell.NeutronShell('2.0').run(cmdline.split())
|
||||
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_client_manager_properly_creates_httpclient_instance(self):
|
||||
self.mox.StubOutWithMock(HTTPClient, '__init__')
|
||||
HTTPClient.__init__(
|
||||
ca_cert=CA_CERT,
|
||||
# we are not really interested in other args
|
||||
auth_strategy=mox.IgnoreArg(),
|
||||
auth_url=mox.IgnoreArg(),
|
||||
endpoint_url=mox.IgnoreArg(),
|
||||
insecure=mox.IgnoreArg(),
|
||||
password=mox.IgnoreArg(),
|
||||
region_name=mox.IgnoreArg(),
|
||||
tenant_name=mox.IgnoreArg(),
|
||||
token=mox.IgnoreArg(),
|
||||
username=mox.IgnoreArg(),
|
||||
user_id=mox.IgnoreArg(),
|
||||
tenant_id=mox.IgnoreArg(),
|
||||
timeout=mox.IgnoreArg(),
|
||||
log_credentials=mox.IgnoreArg(),
|
||||
service_type=mox.IgnoreArg(),
|
||||
endpoint_type=mox.IgnoreArg()
|
||||
)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
version = {'network': '2.0'}
|
||||
ClientManager(ca_cert=CA_CERT,
|
||||
api_version=version,
|
||||
url=END_URL,
|
||||
token=AUTH_TOKEN).neutron
|
||||
self.mox.VerifyAll()
|
||||
|
||||
def test_proper_exception_is_raised_when_cert_validation_fails(self):
|
||||
http = HTTPClient(token=AUTH_TOKEN, endpoint_url=END_URL)
|
||||
|
||||
self.mox.StubOutWithMock(HTTPClient, 'request')
|
||||
HTTPClient.request(
|
||||
URL, METHOD, headers=mox.IgnoreArg()
|
||||
).AndRaise(requests.exceptions.SSLError)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.SslCertificateValidationError,
|
||||
http._cs_request,
|
||||
URL, METHOD
|
||||
)
|
||||
self.mox.VerifyAll()
|
@ -9,8 +9,9 @@ netaddr!=0.7.16,>=0.7.12
|
||||
oslo.i18n>=1.5.0 # Apache-2.0
|
||||
oslo.serialization>=1.10.0 # Apache-2.0
|
||||
oslo.utils!=2.6.0,>=2.4.0 # Apache-2.0
|
||||
os-client-config!=1.6.2,>=1.4.0
|
||||
keystoneauth1>=1.0.0
|
||||
requests!=2.8.0,>=2.5.2
|
||||
python-keystoneclient!=1.8.0,>=1.6.0
|
||||
simplejson>=2.2.0
|
||||
six>=1.9.0
|
||||
Babel>=1.3
|
||||
|
@ -9,7 +9,6 @@ discover
|
||||
fixtures>=1.3.1
|
||||
mox3>=0.7.0
|
||||
mock>=1.2
|
||||
os-client-config!=1.6.2,>=1.4.0
|
||||
oslosphinx>=2.5.0 # Apache-2.0
|
||||
oslotest>=1.10.0 # Apache-2.0
|
||||
python-subunit>=0.0.18
|
||||
|
Loading…
Reference in New Issue
Block a user