Allow parameter expansion in endpoint_override

Allow %(project_id)s and %(user_id)s in an endpoint_override string to
be replaced with the appropriate values from the authentication. This
makes it easier to use and specify overrides from the command line or
scripts where we may not know the project_id/user_id ahead of time.

Change-Id: Ia09f5a88cb6590113dab26796c58e708014e98f7
Closes-Bug: #1536874
This commit is contained in:
Jamie Lennox 2016-01-22 13:37:03 +11:00
parent 834a96f31e
commit 585c525403
2 changed files with 108 additions and 2 deletions

View File

@ -71,6 +71,28 @@ class _JSONEncoder(json.JSONEncoder):
return super(_JSONEncoder, self).default(o)
class _StringFormatter(object):
"""A String formatter that fetches values on demand"""
def __init__(self, session, auth):
self.session = session
self.auth = auth
def __getitem__(self, item):
if item == 'project_id':
value = self.session.get_project_id(self.auth)
elif item == 'user_id':
value = self.session.get_user_id(self.auth)
else:
raise AttributeError(item)
if not value:
raise ValueError("This type of authentication does not provide a "
"%s that can be substituted" % item)
return value
class Session(object):
"""Maintains client communication state and common functionality.
@ -302,7 +324,11 @@ class Session(object):
endpoint in the auth plugin. This will be
ignored if a fully qualified URL is
provided but take priority over an
endpoint_filter. (optional)
endpoint_filter. This string may contain
the values %(project_id)s and %(user_id)s
to have those values replaced by the
project_id/user_id of the current
authentication. (optional)
:param auth: The auth plugin to use when authenticating this request.
This will override the plugin that is attached to the
session (if any). (optional)
@ -360,7 +386,7 @@ class Session(object):
base_url = None
if endpoint_override:
base_url = endpoint_override
base_url = endpoint_override % _StringFormatter(self, auth)
elif endpoint_filter:
base_url = self.get_endpoint(auth, **endpoint_filter)

View File

@ -25,6 +25,7 @@ from keystoneauth1 import exceptions
from keystoneauth1 import plugin
from keystoneauth1 import session as client_session
from keystoneauth1.tests.unit import utils
from keystoneauth1 import token_endpoint
class SessionTests(utils.TestCase):
@ -366,12 +367,16 @@ class AuthPlugin(plugin.BaseAuthPlugin):
class CalledAuthPlugin(plugin.BaseAuthPlugin):
ENDPOINT = 'http://fakeendpoint/'
USER_ID = uuid.uuid4().hex
PROJECT_ID = uuid.uuid4().hex
def __init__(self, invalidate=True):
self.get_token_called = False
self.get_endpoint_called = False
self.endpoint_arguments = {}
self.invalidate_called = False
self.get_project_id_called = False
self.get_user_id_called = False
self._invalidate = invalidate
def get_token(self, session):
@ -387,6 +392,14 @@ class CalledAuthPlugin(plugin.BaseAuthPlugin):
self.invalidate_called = True
return self._invalidate
def get_project_id(self, session, **kwargs):
self.get_project_id_called = True
return self.PROJECT_ID
def get_user_id(self, session, **kwargs):
self.get_user_id_called = True
return self.USER_ID
class SessionAuthTests(utils.TestCase):
@ -574,6 +587,9 @@ class SessionAuthTests(utils.TestCase):
self.assertTrue(auth.get_token_called)
self.assertFalse(auth.get_endpoint_called)
self.assertFalse(auth.get_user_id_called)
self.assertFalse(auth.get_project_id_called)
def test_endpoint_override_ignore_full_url(self):
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
@ -594,6 +610,70 @@ class SessionAuthTests(utils.TestCase):
self.assertTrue(auth.get_token_called)
self.assertFalse(auth.get_endpoint_called)
self.assertFalse(auth.get_user_id_called)
self.assertFalse(auth.get_project_id_called)
def test_endpoint_override_does_id_replacement(self):
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
override_base = 'http://mytest/%(project_id)s/%(user_id)s'
path = 'path'
replacements = {'user_id': CalledAuthPlugin.USER_ID,
'project_id': CalledAuthPlugin.PROJECT_ID}
override_url = override_base % replacements + '/' + path
resp_text = uuid.uuid4().hex
self.requests_mock.get(override_url, text=resp_text)
resp = sess.get(path,
endpoint_override=override_base,
endpoint_filter={'service_type': 'identity'})
self.assertEqual(resp_text, resp.text)
self.assertEqual(override_url, self.requests_mock.last_request.url)
self.assertTrue(auth.get_token_called)
self.assertTrue(auth.get_user_id_called)
self.assertTrue(auth.get_project_id_called)
self.assertFalse(auth.get_endpoint_called)
def test_endpoint_override_fails_to_replace_if_none(self):
# The token_endpoint plugin doesn't know user_id or project_id
auth = token_endpoint.Token(uuid.uuid4().hex, uuid.uuid4().hex)
sess = client_session.Session(auth=auth)
override_base = 'http://mytest/%(project_id)s'
e = self.assertRaises(ValueError,
sess.get,
'/path',
endpoint_override=override_base,
endpoint_filter={'service_type': 'identity'})
self.assertIn('project_id', str(e))
override_base = 'http://mytest/%(user_id)s'
e = self.assertRaises(ValueError,
sess.get,
'/path',
endpoint_override=override_base,
endpoint_filter={'service_type': 'identity'})
self.assertIn('user_id', str(e))
def test_endpoint_override_fails_to_do_unknown_replacement(self):
auth = CalledAuthPlugin()
sess = client_session.Session(auth=auth)
override_base = 'http://mytest/%(unknown_id)s'
e = self.assertRaises(AttributeError,
sess.get,
'/path',
endpoint_override=override_base,
endpoint_filter={'service_type': 'identity'})
self.assertIn('unknown_id', str(e))
def test_user_and_project_id(self):
auth = AuthPlugin()
sess = client_session.Session(auth=auth)