Merge "Use os-client-config and keystoneauth1 in shell"

This commit is contained in:
Jenkins
2015-11-09 16:31:22 +00:00
committed by Gerrit Code Review
10 changed files with 66 additions and 1138 deletions

View File

@@ -21,8 +21,8 @@ except ImportError:
import logging
import os
from keystoneclient import access
from keystoneclient import adapter
from keystoneauth1 import access
from keystoneauth1 import adapter
import requests
from neutronclient.common import exceptions
@@ -179,7 +179,7 @@ class HTTPClient(object):
def _extract_service_catalog(self, body):
"""Set the client's service catalog from the response data."""
self.auth_ref = access.AccessInfo.factory(body=body)
self.auth_ref = access.create(body=body)
self.service_catalog = self.auth_ref.service_catalog
self.auth_token = self.auth_ref.auth_token
self.auth_tenant_id = self.auth_ref.tenant_id
@@ -187,9 +187,9 @@ class HTTPClient(object):
if not self.endpoint_url:
self.endpoint_url = self.service_catalog.url_for(
attr='region', filter_value=self.region_name,
region_name=self.region_name,
service_type=self.service_type,
endpoint_type=self.endpoint_type)
interface=self.endpoint_type)
def _authenticate_keystone(self):
if self.user_id:
@@ -355,7 +355,7 @@ def construct_http_client(username=None,
timeout=None,
endpoint_url=None,
insecure=False,
endpoint_type='publicURL',
endpoint_type='public',
log_credentials=None,
auth_strategy='keystone',
ca_cert=None,

View File

@@ -62,6 +62,7 @@ class ClientManager(object):
ca_cert=None,
log_credentials=False,
service_type=None,
service_name=None,
timeout=None,
retries=0,
raise_errors=True,
@@ -72,6 +73,7 @@ class ClientManager(object):
self._url = url
self._auth_url = auth_url
self._service_type = service_type
self._service_name = service_name
self._endpoint_type = endpoint_type
self._tenant_name = tenant_name
self._tenant_id = tenant_id
@@ -103,6 +105,7 @@ class ClientManager(object):
region_name=self._region_name,
auth_url=self._auth_url,
service_type=self._service_type,
service_name=self._service_name,
endpoint_type=self._endpoint_type,
insecure=self._insecure,
ca_cert=self._ca_cert,

View File

@@ -21,20 +21,15 @@ Command-line interface to the Neutron APIs
from __future__ import print_function
import argparse
import getpass
import inspect
import itertools
import logging
import os
import sys
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import discover
from keystoneclient.openstack.common.apiclient import exceptions as ks_exc
from keystoneclient import session
from keystoneauth1 import session
import os_client_config
from oslo_utils import encodeutils
import six.moves.urllib.parse as urlparse
from cliff import app
from cliff import commandmanager
@@ -499,7 +494,7 @@ class NeutronShell(app.App):
default=0,
help=_("How many times the request to the Neutron server should "
"be retried if it fails."))
# FIXME(bklei): this method should come from python-keystoneclient
# FIXME(bklei): this method should come from keystoneauth1
self._append_global_identity_args(parser)
return parser
@@ -507,9 +502,9 @@ class NeutronShell(app.App):
def _append_global_identity_args(self, parser):
# FIXME(bklei): these are global identity (Keystone) arguments which
# should be consistent and shared by all service clients. Therefore,
# they should be provided by python-keystoneclient. We will need to
# they should be provided by keystoneauth1. We will need to
# refactor this code once this functionality is available in
# python-keystoneclient.
# keystoneauth1.
#
# Note: At that time we'll need to decide if we can just abandon
# the deprecated args (--service-type and --endpoint-type).
@@ -521,8 +516,8 @@ class NeutronShell(app.App):
parser.add_argument(
'--os-endpoint-type', metavar='<os-endpoint-type>',
default=env('OS_ENDPOINT_TYPE', default='publicURL'),
help=_('Defaults to env[OS_ENDPOINT_TYPE] or publicURL.'))
default=env('OS_ENDPOINT_TYPE', default='public'),
help=_('Defaults to env[OS_ENDPOINT_TYPE] or public.'))
# FIXME(bklei): --service-type is deprecated but kept in for
# backward compatibility.
@@ -535,7 +530,7 @@ class NeutronShell(app.App):
# backward compatibility.
parser.add_argument(
'--endpoint-type', metavar='<endpoint-type>',
default=env('OS_ENDPOINT_TYPE', default='publicURL'),
default=env('OS_ENDPOINT_TYPE', default='public'),
help=_('DEPRECATED! Use --os-endpoint-type.'))
parser.add_argument(
@@ -547,6 +542,11 @@ class NeutronShell(app.App):
'--os_auth_strategy',
help=argparse.SUPPRESS)
parser.add_argument(
'--os-cloud', metavar='<cloud>',
default=env('OS_CLOUD', default=None),
help=_('Defaults to env[OS_CLOUD].'))
parser.add_argument(
'--os-auth-url', metavar='<auth-url>',
default=env('OS_AUTH_URL'),
@@ -829,104 +829,28 @@ class NeutronShell(app.App):
"""Make sure the user has provided all of the authentication
info we need.
"""
if self.options.os_auth_strategy == 'keystone':
if self.options.os_token or self.options.os_url:
# Token flow auth takes priority
if not self.options.os_token:
raise exc.CommandError(
_("You must provide a token via"
" either --os-token or env[OS_TOKEN]"
" when providing a service URL"))
cloud_config = os_client_config.OpenStackConfig().get_one_cloud(
cloud=self.options.os_cloud, argparse=self.options,
network_api_version=self.api_version)
verify, cert = cloud_config.get_requests_verify_args()
auth = cloud_config.get_auth()
if not self.options.os_url:
raise exc.CommandError(
_("You must provide a service URL via"
" either --os-url or env[OS_URL]"
" when providing a token"))
else:
# Validate password flow auth
project_info = (self.options.os_tenant_name or
self.options.os_tenant_id or
(self.options.os_project_name and
(self.options.os_project_domain_name or
self.options.os_project_domain_id)) or
self.options.os_project_id)
if (not self.options.os_username
and not self.options.os_user_id):
raise exc.CommandError(
_("You must provide a username or user ID via"
" --os-username, env[OS_USERNAME] or"
" --os-user-id, env[OS_USER_ID]"))
if not self.options.os_password:
# No password, If we've got a tty, try prompting for it
if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
# Check for Ctl-D
try:
self.options.os_password = getpass.getpass(
'OS Password: ')
except EOFError:
pass
# No password because we didn't have a tty or the
# user Ctl-D when prompted.
if not self.options.os_password:
raise exc.CommandError(
_("You must provide a password via"
" either --os-password or env[OS_PASSWORD]"))
if (not project_info):
# tenent is deprecated in Keystone v3. Use the latest
# terminology instead.
raise exc.CommandError(
_("You must provide a project_id or project_name ("
"with project_domain_name or project_domain_id) "
"via "
" --os-project-id (env[OS_PROJECT_ID])"
" --os-project-name (env[OS_PROJECT_NAME]),"
" --os-project-domain-id "
"(env[OS_PROJECT_DOMAIN_ID])"
" --os-project-domain-name "
"(env[OS_PROJECT_DOMAIN_NAME])"))
if not self.options.os_auth_url:
raise exc.CommandError(
_("You must provide an auth url via"
" either --os-auth-url or via env[OS_AUTH_URL]"))
auth_session = self._get_keystone_session()
auth = auth_session.auth
else: # not keystone
if not self.options.os_url:
raise exc.CommandError(
_("You must provide a service URL via"
" either --os-url or env[OS_URL]"))
auth_session = None
auth = None
auth_session = session.Session(
auth=auth, verify=verify, cert=cert,
timeout=self.options.http_timeout)
interface = self.options.os_endpoint_type or self.endpoint_type
if interface.endswith('URL'):
interface = interface[:-3]
self.client_manager = clientmanager.ClientManager(
token=self.options.os_token,
url=self.options.os_url,
auth_url=self.options.os_auth_url,
tenant_name=self.options.os_tenant_name,
tenant_id=self.options.os_tenant_id,
username=self.options.os_username,
user_id=self.options.os_user_id,
password=self.options.os_password,
region_name=self.options.os_region_name,
api_version=self.api_version,
auth_strategy=self.options.os_auth_strategy,
# FIXME (bklei) honor deprecated service_type and
# endpoint type until they are removed
service_type=self.options.os_service_type or
self.options.service_type,
endpoint_type=self.options.os_endpoint_type or self.endpoint_type,
insecure=self.options.insecure,
ca_cert=self.options.os_cacert,
timeout=self.options.http_timeout,
retries=self.options.retries,
raise_errors=False,
session=auth_session,
region_name=cloud_config.get_region_name(),
api_version=cloud_config.get_api_version('network'),
service_type=cloud_config.get_service_type('network'),
service_name=cloud_config.get_service_name('network'),
endpoint_type=interface,
auth=auth,
log_credentials=True)
return
@@ -981,89 +905,6 @@ class NeutronShell(app.App):
root_logger.addHandler(console)
return
def get_v2_auth(self, v2_auth_url):
return v2_auth.Password(
v2_auth_url,
username=self.options.os_username,
password=self.options.os_password,
tenant_id=self.options.os_tenant_id,
tenant_name=self.options.os_tenant_name)
def get_v3_auth(self, v3_auth_url):
project_id = self.options.os_project_id or self.options.os_tenant_id
project_name = (self.options.os_project_name or
self.options.os_tenant_name)
return v3_auth.Password(
v3_auth_url,
username=self.options.os_username,
password=self.options.os_password,
user_id=self.options.os_user_id,
user_domain_name=self.options.os_user_domain_name,
user_domain_id=self.options.os_user_domain_id,
project_id=project_id,
project_name=project_name,
project_domain_name=self.options.os_project_domain_name,
project_domain_id=self.options.os_project_domain_id
)
def _discover_auth_versions(self, session, auth_url):
# discover the API versions the server is supporting base on the
# given URL
try:
ks_discover = discover.Discover(session=session, auth_url=auth_url)
return (ks_discover.url_for('2.0'), ks_discover.url_for('3.0'))
except ks_exc.ClientException:
# Identity service may not support discover API version.
# Lets try to figure out the API version from the original URL.
url_parts = urlparse.urlparse(auth_url)
(scheme, netloc, path, params, query, fragment) = url_parts
path = path.lower()
if path.startswith('/v3'):
return (None, auth_url)
elif path.startswith('/v2'):
return (auth_url, None)
else:
# not enough information to determine the auth version
msg = _('Unable to determine the Keystone version '
'to authenticate with using the given '
'auth_url. Identity service may not support API '
'version discovery. Please provide a versioned '
'auth_url instead.')
raise exc.CommandError(msg)
def _get_keystone_session(self):
# first create a Keystone session
cacert = self.options.os_cacert or None
cert = self.options.os_cert or None
key = self.options.os_key or None
insecure = self.options.insecure or False
ks_session = session.Session.construct(dict(cacert=cacert,
cert=cert,
key=key,
insecure=insecure))
# discover the supported keystone versions using the given url
(v2_auth_url, v3_auth_url) = self._discover_auth_versions(
session=ks_session,
auth_url=self.options.os_auth_url)
# Determine which authentication plugin to use. First inspect the
# auth_url to see the supported version. If both v3 and v2 are
# supported, then use the highest version if possible.
user_domain_name = self.options.os_user_domain_name or None
user_domain_id = self.options.os_user_domain_id or None
project_domain_name = self.options.os_project_domain_name or None
project_domain_id = self.options.os_project_domain_id or None
domain_info = (user_domain_name or user_domain_id or
project_domain_name or project_domain_id)
if (v2_auth_url and not domain_info) or not v3_auth_url:
ks_session.auth = self.get_v2_auth(v2_auth_url)
else:
ks_session.auth = self.get_v3_auth(v3_auth_url)
return ks_session
def main(argv=sys.argv[1:]):
try:

View File

@@ -12,9 +12,8 @@
import uuid
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient import discover
from keystoneclient import session
from keystoneauth1 import plugin as ksa_plugin
from keystoneauth1 import session
from tempest_lib import base
import testtools
@@ -38,13 +37,22 @@ class LibraryTestBase(base.BaseTestCase):
class Libv2HTTPClientTestBase(LibraryTestBase):
def _get_client(self):
creds = func_base.credentials()
session_params = {}
ks_session = session.Session.construct(session_params)
ks_discover = discover.Discover(session=ks_session,
auth_url=creds['auth_url'])
# At the moment, we use keystone v2 API
v2_auth_url = ks_discover.url_for('2.0')
cloud_config = func_base.get_cloud_config()
# We're getting a session so we can find the v2 url via KSA
keystone_auth = cloud_config.get_auth()
(verify, cert) = cloud_config.get_requests_verify_args()
ks_session = session.Session(
auth=keystone_auth, verify=verify, cert=cert)
# for the old HTTPClient, we use keystone v2 API, regardless of
# whether v3 also exists or is configured
v2_auth_url = keystone_auth.get_endpoint(
ks_session, interface=ksa_plugin.AUTH_INTERFACE, version=(2, 0))
return v2_client.Client(username=creds['username'],
password=creds['password'],
tenant_name=creds['project_name'],
@@ -54,18 +62,14 @@ class Libv2HTTPClientTestBase(LibraryTestBase):
class Libv2SessionClientTestBase(LibraryTestBase):
def _get_client(self):
creds = func_base.credentials()
session_params = {}
ks_session = session.Session.construct(session_params)
ks_discover = discover.Discover(session=ks_session,
auth_url=creds['auth_url'])
# At the moment, we use keystone v2 API
v2_auth_url = ks_discover.url_for('2.0')
ks_session.auth = v2_auth.Password(
v2_auth_url,
username=creds['username'],
password=creds['password'],
tenant_name=creds['project_name'])
cloud_config = func_base.get_cloud_config()
keystone_auth = cloud_config.get_auth()
(verify, cert) = cloud_config.get_requests_verify_args()
ks_session = session.Session(
auth=keystone_auth,
verify=verify,
cert=cert)
return v2_client.Client(session=ks_session)

View File

@@ -1,434 +0,0 @@
# Copyright 2012 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import json
import logging
import uuid
import fixtures
from oslo_serialization import jsonutils
from requests_mock.contrib import fixture as mock_fixture
import testtools
from keystoneclient.auth.identity import v2 as ks_v2_auth
from keystoneclient.auth.identity import v3 as ks_v3_auth
from keystoneclient import exceptions as ks_exceptions
from keystoneclient import fixture as ks_fixture
from keystoneclient import session
from neutronclient import client
from neutronclient.common import exceptions
USERNAME = 'testuser'
USER_ID = 'testuser_id'
TENANT_NAME = 'testtenant'
TENANT_ID = 'testtenant_id'
PASSWORD = 'password'
ENDPOINT_URL = 'http://localurl'
PUBLIC_ENDPOINT_URL = '%s/public' % ENDPOINT_URL
ADMIN_ENDPOINT_URL = '%s/admin' % ENDPOINT_URL
INTERNAL_ENDPOINT_URL = '%s/internal' % ENDPOINT_URL
ENDPOINT_OVERRIDE = 'http://otherurl'
TOKENID = uuid.uuid4().hex
REGION = 'RegionOne'
NOAUTH = 'noauth'
KS_TOKEN_RESULT = ks_fixture.V2Token()
KS_TOKEN_RESULT.set_scope()
_s = KS_TOKEN_RESULT.add_service('network', 'Neutron Service')
_s.add_endpoint(ENDPOINT_URL, region=REGION)
ENDPOINTS_RESULT = {
'endpoints': [{
'type': 'network',
'name': 'Neutron Service',
'region': REGION,
'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL
}]
}
BASE_URL = "http://keystone.example.com:5000/"
V2_URL = "%sv2.0" % BASE_URL
V3_URL = "%sv3" % BASE_URL
_v2 = ks_fixture.V2Discovery(V2_URL)
_v3 = ks_fixture.V3Discovery(V3_URL)
V3_VERSION_LIST = jsonutils.dumps({'versions': {'values': [_v2, _v3]}})
V2_VERSION_ENTRY = {'version': _v2}
V3_VERSION_ENTRY = {'version': _v3}
def setup_keystone_v2(mrequests):
v2_token = ks_fixture.V2Token(token_id=TOKENID)
service = v2_token.add_service('network')
service.add_endpoint(PUBLIC_ENDPOINT_URL, region=REGION)
mrequests.register_uri('POST',
'%s/tokens' % (V2_URL),
json=v2_token)
auth_session = session.Session()
auth_plugin = ks_v2_auth.Password(V2_URL, 'xx', 'xx')
return auth_session, auth_plugin
def setup_keystone_v3(mrequests):
mrequests.register_uri('GET',
V3_URL,
json=V3_VERSION_ENTRY)
v3_token = ks_fixture.V3Token()
service = v3_token.add_service('network')
service.add_standard_endpoints(public=PUBLIC_ENDPOINT_URL,
admin=ADMIN_ENDPOINT_URL,
internal=INTERNAL_ENDPOINT_URL,
region=REGION)
mrequests.register_uri('POST',
'%s/auth/tokens' % (V3_URL),
text=json.dumps(v3_token),
headers={'X-Subject-Token': TOKENID})
auth_session = session.Session()
auth_plugin = ks_v3_auth.Password(V3_URL,
username='xx',
user_id='xx',
user_domain_name='xx',
user_domain_id='xx')
return auth_session, auth_plugin
AUTH_URL = V2_URL
class CLITestAuthNoAuth(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthNoAuth, self).setUp()
self.requests = self.useFixture(mock_fixture.Fixture())
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
endpoint_url=ENDPOINT_URL,
auth_strategy=NOAUTH,
region_name=REGION)
def test_get_noauth(self):
url = ENDPOINT_URL + '/resource'
self.requests.get(ENDPOINT_URL + '/resource')
self.client.do_request('/resource', 'GET')
self.assertEqual(url, self.requests.last_request.url)
self.assertEqual(ENDPOINT_URL, self.client.endpoint_url)
class CLITestAuthKeystone(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystone, self).setUp()
for var in ('http_proxy', 'HTTP_PROXY'):
self.useFixture(fixtures.EnvironmentVariableFixture(var))
self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
self.requests = self.useFixture(mock_fixture.Fixture())
self.client = client.construct_http_client(
username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
def test_reused_token_get_auth_info(self):
"""Test that Client.get_auth_info() works even if client was
instantiated with predefined token.
"""
token_id = uuid.uuid4().hex
client_ = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
token=token_id,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
expected = {'auth_token': token_id,
'auth_tenant_id': None,
'auth_user_id': None,
'endpoint_url': self.client.endpoint_url}
self.assertEqual(expected, client_.get_auth_info())
def test_get_token(self):
auth_session, auth_plugin = setup_keystone_v2(self.requests)
self.client = client.construct_http_client(
username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION,
session=auth_session,
auth=auth_plugin)
m = self.requests.get(PUBLIC_ENDPOINT_URL + '/resource',
request_headers={'X-Auth-Token': TOKENID})
self.client.do_request('/resource', 'GET')
self.assertTrue(m.called)
def test_refresh_token(self):
token_id = uuid.uuid4().hex
text = uuid.uuid4().hex
self.client.auth_token = token_id
self.client.endpoint_url = ENDPOINT_URL
res_url = ENDPOINT_URL + '/resource'
v2_url = AUTH_URL + '/tokens'
# token_id gives 401, KS_TOKEN_RESULT gives 200
self.requests.get(res_url,
request_headers={'X-Auth-Token': token_id},
status_code=401)
self.requests.get(
res_url,
text=text,
status_code=200,
request_headers={'X-Auth-Token': KS_TOKEN_RESULT.token_id})
self.requests.post(v2_url, json=KS_TOKEN_RESULT)
resp = self.client.do_request('/resource', 'GET')
self.assertEqual(text, resp[1])
self.assertEqual(3, len(self.requests.request_history))
self.assertEqual(res_url, self.requests.request_history[0].url)
self.assertEqual(v2_url, self.requests.request_history[1].url)
self.assertEqual(res_url, self.requests.request_history[2].url)
def test_refresh_token_no_auth_url(self):
self.client.auth_url = None
token_id = uuid.uuid4().hex
self.client.auth_token = token_id
self.client.endpoint_url = ENDPOINT_URL
self.requests.get(ENDPOINT_URL + '/resource', status_code=401)
self.assertRaises(exceptions.NoAuthURLProvided,
self.client.do_request,
'/resource',
'GET')
def test_get_endpoint_url_with_invalid_auth_url(self):
# Handle the case when auth_url is not provided
self.client.auth_url = None
self.assertRaises(exceptions.NoAuthURLProvided,
self.client._get_endpoint_url)
def test_get_endpoint_url(self):
token_id = uuid.uuid4().hex
self.client.auth_token = token_id
self.requests.get(AUTH_URL + '/tokens/%s/endpoints' % token_id,
json=ENDPOINTS_RESULT)
self.requests.get(ENDPOINT_URL + '/resource')
self.client.do_request('/resource', 'GET')
self.assertEqual(token_id,
self.requests.last_request.headers['X-Auth-Token'])
def test_use_given_endpoint_url(self):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
endpoint_url=ENDPOINT_OVERRIDE)
self.assertEqual(ENDPOINT_OVERRIDE, self.client.endpoint_url)
token_id = uuid.uuid4().hex
self.client.auth_token = token_id
self.requests.get(ENDPOINT_OVERRIDE + '/resource')
self.client.do_request('/resource', 'GET')
self.assertEqual(ENDPOINT_OVERRIDE, self.client.endpoint_url)
self.assertEqual(token_id,
self.requests.last_request.headers['X-Auth-Token'])
def test_get_endpoint_url_other(self):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='otherURL')
token_id = uuid.uuid4().hex
self.client.auth_token = token_id
self.requests.get(AUTH_URL + '/tokens/%s/endpoints' % token_id,
json=ENDPOINTS_RESULT)
self.assertRaises(exceptions.EndpointTypeNotFound,
self.client.do_request,
'/resource',
'GET')
def test_get_endpoint_url_failed(self):
token_id = uuid.uuid4().hex
self.client.auth_token = token_id
self.requests.get(AUTH_URL + '/tokens/%s/endpoints' % token_id,
status_code=401)
self.requests.post(AUTH_URL + '/tokens', json=KS_TOKEN_RESULT)
m = self.requests.get(ENDPOINT_URL + '/resource')
self.client.do_request('/resource', 'GET')
self.assertEqual(KS_TOKEN_RESULT.token_id,
m.last_request.headers['X-Auth-Token'])
def test_endpoint_type(self):
auth_session, auth_plugin = setup_keystone_v3(self.requests)
# Test default behavior is to choose public.
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
session=auth_session, auth=auth_plugin)
self.assertEqual(PUBLIC_ENDPOINT_URL, self.client.endpoint_url)
# Test admin url
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='adminURL',
session=auth_session, auth=auth_plugin)
self.assertEqual(ADMIN_ENDPOINT_URL, self.client.endpoint_url)
# Test public url
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='publicURL',
session=auth_session, auth=auth_plugin)
self.assertEqual(PUBLIC_ENDPOINT_URL, self.client.endpoint_url)
# Test internal url
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='internalURL',
session=auth_session, auth=auth_plugin)
self.assertEqual(INTERNAL_ENDPOINT_URL, self.client.endpoint_url)
# Test url that isn't found in the service catalog
self.client = client.construct_http_client(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='privateURL',
session=auth_session, auth=auth_plugin)
self.assertRaises(
ks_exceptions.EndpointNotFound,
getattr, self.client, 'endpoint_url')
def test_strip_credentials_from_log(self):
m = self.requests.post(AUTH_URL + '/tokens', json=KS_TOKEN_RESULT)
self.requests.get(ENDPOINT_URL + '/resource')
self.client.do_request('/resource', 'GET')
self.assertIn('REDACTED', self.logger.output)
self.assertNotIn(self.client.password, self.logger.output)
self.assertNotIn('REDACTED', m.last_request.body)
self.assertIn(self.client.password, m.last_request.body)
class CLITestAuthKeystoneWithId(CLITestAuthKeystone):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithId, self).setUp()
self.client = client.HTTPClient(user_id=USER_ID,
tenant_id=TENANT_ID,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
class CLITestAuthKeystoneWithIdandName(CLITestAuthKeystone):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithIdandName, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
user_id=USER_ID,
tenant_id=TENANT_ID,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
class TestKeystoneClientVersions(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(TestKeystoneClientVersions, self).setUp()
self.requests = self.useFixture(mock_fixture.Fixture())
def test_v2_auth(self):
auth_session, auth_plugin = setup_keystone_v2(self.requests)
self.client = client.construct_http_client(
username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION,
session=auth_session,
auth=auth_plugin)
m = self.requests.get(PUBLIC_ENDPOINT_URL + '/resource')
self.client.do_request('/resource', 'GET')
self.assertTrue(m.called)
def test_v3_auth(self):
auth_session, auth_plugin = setup_keystone_v3(self.requests)
self.client = client.construct_http_client(
user_id=USER_ID,
tenant_id=TENANT_ID,
password=PASSWORD,
auth_url=V3_URL,
region_name=REGION,
session=auth_session,
auth=auth_plugin)
m = self.requests.get(PUBLIC_ENDPOINT_URL + '/resource')
self.client.do_request('/resource', 'GET')
self.assertTrue(m.called)

View File

@@ -21,7 +21,6 @@ import testtools
from neutronclient import client
from neutronclient.common import exceptions
from neutronclient.tests.unit import test_auth
AUTH_TOKEN = 'test_token'
@@ -74,14 +73,6 @@ class TestHTTPClientMixin(object):
self._test_headers(headers, body=BODY, headers=headers)
class TestSessionClient(TestHTTPClientMixin, testtools.TestCase):
def initialize(self):
session, auth = test_auth.setup_keystone_v2(self.requests)
return [client.SessionClient,
client.SessionClient(session=session, auth=auth)]
class TestHTTPClient(TestHTTPClientMixin, testtools.TestCase):
def initialize(self):

View File

@@ -21,18 +21,11 @@ import sys
import fixtures
from mox3 import mox
import requests_mock
import six
import testtools
from testtools import matchers
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import session
from neutronclient.common import clientmanager
from neutronclient import shell as openstack_shell
from neutronclient.tests.unit import test_auth as auth
DEFAULT_USERNAME = 'username'
@@ -150,297 +143,6 @@ class ShellTest(testtools.TestCase):
self.assertThat(help_text,
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
def test_unknown_auth_strategy(self):
self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
stdout, stderr = self.shell('--os-auth-strategy fake quota-list')
self.assertFalse(stdout)
self.assertEqual('You must provide a service URL via '
'either --os-url or env[OS_URL]', stderr.strip())
@requests_mock.Mocker()
def test_auth(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V3_URL,
json=auth.V3_VERSION_ENTRY)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.V3_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=False, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IsA(v3_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.V3_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_auth_cert_and_key(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V3_URL,
json=auth.V3_VERSION_ENTRY)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.V3_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
raise_errors=False,
endpoint_type='publicURL', insecure=False, ca_cert=None, retries=0,
timeout=None,
auth=mox.IsA(v3_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--os-cert test '
'--os-key test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.V3_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_v2_auth(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V2_URL,
json=auth.V2_VERSION_ENTRY)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.V2_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=False, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IsA(v2_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.V2_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_failed_auth_version_discovery_v3_auth_url(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V3_URL,
status_code=405)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.V3_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=False, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IsA(v3_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-user-domain-name test '
'--os-tenant-name test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.V3_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_failed_auth_version_discovery_v2_auth_url(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V2_URL,
status_code=405)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.V2_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=False, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IsA(v2_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.V2_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_auth_version_discovery_v3(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.BASE_URL,
text=auth.V3_VERSION_LIST)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.BASE_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=False, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IsA(v3_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-user-domain-name test '
'--os-tenant-name test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.BASE_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_auth_version_discovery_v2(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.BASE_URL,
text=auth.V3_VERSION_LIST)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.BASE_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=False, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IsA(v2_auth.Password),
session=mox.IsA(session.Session),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.BASE_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_insecure_auth(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V2_URL,
json=auth.V2_VERSION_ENTRY)
neutron_shell = openstack_shell.NeutronShell('2.0')
self.addCleanup(self.mox.UnsetStubs)
self.mox.StubOutWithMock(clientmanager.ClientManager, '__init__')
self.mox.StubOutWithMock(neutron_shell, 'run_subcommand')
clientmanager.ClientManager.__init__(
token='', url='', auth_url=auth.V2_URL,
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='', api_version={'network': '2.0'},
auth_strategy='keystone', service_type='network',
endpoint_type='publicURL', insecure=True, ca_cert=None,
timeout=None,
raise_errors=False,
retries=0,
auth=mox.IgnoreArg(),
session=mox.IgnoreArg(),
log_credentials=True)
neutron_shell.run_subcommand(['quota-list'])
self.mox.ReplayAll()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--insecure '
'--os-auth-url %s '
'--os-auth-strategy keystone quota-list'
% auth.V2_URL)
neutron_shell.run(cmdline.split())
self.mox.VerifyAll()
def test_build_option_parser(self):
neutron_shell = openstack_shell.NeutronShell('2.0')
result = neutron_shell.build_option_parser('descr', '2.0')
@@ -465,7 +167,7 @@ class ShellTest(testtools.TestCase):
# Neither $OS_ENDPOINT_TYPE nor --os-endpoint-type
namespace = parser.parse_args([])
self.assertEqual('publicURL', namespace.os_endpoint_type)
self.assertEqual('public', namespace.os_endpoint_type)
# --endpoint-type but not $OS_ENDPOINT_TYPE
namespace = parser.parse_args(['--os-endpoint-type=admin'])

View File

@@ -1,179 +0,0 @@
# Copyright (C) 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import requests
import testtools
from mox3 import mox
import requests_mock
from neutronclient.client import HTTPClient
from neutronclient.common.clientmanager import ClientManager
from neutronclient.common import exceptions
from neutronclient import shell as openstack_shell
from neutronclient.tests.unit import test_auth as auth
AUTH_TOKEN = 'test_token'
END_URL = 'test_url'
METHOD = 'GET'
URL = 'http://test.test:1234/v2.0/'
CA_CERT = '/tmp/test/path'
class TestSSL(testtools.TestCase):
def setUp(self):
super(TestSSL, self).setUp()
self.useFixture(fixtures.EnvironmentVariable('OS_TOKEN', AUTH_TOKEN))
self.useFixture(fixtures.EnvironmentVariable('OS_URL', END_URL))
self.mox = mox.Mox()
self.addCleanup(self.mox.UnsetStubs)
@requests_mock.Mocker()
def test_ca_cert_passed(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V3_URL,
json=auth.V3_VERSION_ENTRY)
self.mox.StubOutWithMock(ClientManager, '__init__')
self.mox.StubOutWithMock(openstack_shell.NeutronShell, 'interact')
ClientManager.__init__(
ca_cert=CA_CERT,
# we are not really interested in other args
api_version=mox.IgnoreArg(),
auth_strategy=mox.IgnoreArg(),
auth_url=mox.IgnoreArg(),
service_type=mox.IgnoreArg(),
endpoint_type=mox.IgnoreArg(),
insecure=mox.IgnoreArg(),
password=mox.IgnoreArg(),
region_name=mox.IgnoreArg(),
tenant_id=mox.IgnoreArg(),
tenant_name=mox.IgnoreArg(),
token=mox.IgnoreArg(),
url=mox.IgnoreArg(),
username=mox.IgnoreArg(),
user_id=mox.IgnoreArg(),
retries=mox.IgnoreArg(),
raise_errors=mox.IgnoreArg(),
log_credentials=mox.IgnoreArg(),
timeout=mox.IgnoreArg(),
auth=mox.IgnoreArg(),
session=mox.IgnoreArg()
)
openstack_shell.NeutronShell.interact().AndReturn(0)
self.mox.ReplayAll()
cmdline = (
'--os-cacert %s --os-auth-url %s' %
(CA_CERT, auth.V3_URL))
openstack_shell.NeutronShell('2.0').run(cmdline.split())
self.mox.VerifyAll()
@requests_mock.Mocker()
def test_ca_cert_passed_as_env_var(self, mrequests):
# emulate Keystone version discovery
mrequests.register_uri('GET',
auth.V3_URL,
json=auth.V3_VERSION_ENTRY)
self.useFixture(fixtures.EnvironmentVariable('OS_CACERT', CA_CERT))
self.mox.StubOutWithMock(ClientManager, '__init__')
self.mox.StubOutWithMock(openstack_shell.NeutronShell, 'interact')
ClientManager.__init__(
ca_cert=CA_CERT,
# we are not really interested in other args
api_version=mox.IgnoreArg(),
auth_strategy=mox.IgnoreArg(),
auth_url=mox.IgnoreArg(),
service_type=mox.IgnoreArg(),
endpoint_type=mox.IgnoreArg(),
insecure=mox.IgnoreArg(),
password=mox.IgnoreArg(),
region_name=mox.IgnoreArg(),
tenant_id=mox.IgnoreArg(),
tenant_name=mox.IgnoreArg(),
token=mox.IgnoreArg(),
url=mox.IgnoreArg(),
username=mox.IgnoreArg(),
user_id=mox.IgnoreArg(),
retries=mox.IgnoreArg(),
raise_errors=mox.IgnoreArg(),
log_credentials=mox.IgnoreArg(),
timeout=mox.IgnoreArg(),
auth=mox.IgnoreArg(),
session=mox.IgnoreArg()
)
openstack_shell.NeutronShell.interact().AndReturn(0)
self.mox.ReplayAll()
cmdline = ('--os-auth-url %s' % auth.V3_URL)
openstack_shell.NeutronShell('2.0').run(cmdline.split())
self.mox.VerifyAll()
def test_client_manager_properly_creates_httpclient_instance(self):
self.mox.StubOutWithMock(HTTPClient, '__init__')
HTTPClient.__init__(
ca_cert=CA_CERT,
# we are not really interested in other args
auth_strategy=mox.IgnoreArg(),
auth_url=mox.IgnoreArg(),
endpoint_url=mox.IgnoreArg(),
insecure=mox.IgnoreArg(),
password=mox.IgnoreArg(),
region_name=mox.IgnoreArg(),
tenant_name=mox.IgnoreArg(),
token=mox.IgnoreArg(),
username=mox.IgnoreArg(),
user_id=mox.IgnoreArg(),
tenant_id=mox.IgnoreArg(),
timeout=mox.IgnoreArg(),
log_credentials=mox.IgnoreArg(),
service_type=mox.IgnoreArg(),
endpoint_type=mox.IgnoreArg()
)
self.mox.ReplayAll()
version = {'network': '2.0'}
ClientManager(ca_cert=CA_CERT,
api_version=version,
url=END_URL,
token=AUTH_TOKEN).neutron
self.mox.VerifyAll()
def test_proper_exception_is_raised_when_cert_validation_fails(self):
http = HTTPClient(token=AUTH_TOKEN, endpoint_url=END_URL)
self.mox.StubOutWithMock(HTTPClient, 'request')
HTTPClient.request(
URL, METHOD, headers=mox.IgnoreArg()
).AndRaise(requests.exceptions.SSLError)
self.mox.ReplayAll()
self.assertRaises(
exceptions.SslCertificateValidationError,
http._cs_request,
URL, METHOD
)
self.mox.VerifyAll()

View File

@@ -9,8 +9,9 @@ netaddr!=0.7.16,>=0.7.12
oslo.i18n>=1.5.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
oslo.utils!=2.6.0,>=2.4.0 # Apache-2.0
os-client-config!=1.6.2,>=1.4.0
keystoneauth1>=1.0.0
requests!=2.8.0,>=2.5.2
python-keystoneclient!=1.8.0,>=1.6.0
simplejson>=2.2.0
six>=1.9.0
Babel>=1.3

View File

@@ -8,7 +8,6 @@ discover
fixtures>=1.3.1
mox3>=0.7.0
mock>=1.2
os-client-config!=1.6.2,>=1.4.0
oslosphinx>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
python-subunit>=0.0.18