Expose auth methods on the adapter

Provide access to get_token, get_endpoint and invalidate to the adapter.
The adapter is essentially created per individual client and it can be
useful to know things like the endpoint that requests will be sent to
based on the parameters that are included in the endpoint_filter.

This essentially allows us to emulate the management_url and auth_token
properties of the existing clients.

Change-Id: Ic01bc52bb38e8fb72e7a6d93bfd2944b11d0b070
This commit is contained in:
Jamie Lennox
2014-08-29 17:21:00 +10:00
parent 0e61719043
commit 4be8e8db3f
2 changed files with 88 additions and 28 deletions

View File

@@ -52,19 +52,21 @@ class Adapter(object):
self.user_agent = user_agent self.user_agent = user_agent
self.auth = auth self.auth = auth
def _set_endpoint_filter_kwargs(self, kwargs):
if self.service_type:
kwargs.setdefault('service_type', self.service_type)
if self.service_name:
kwargs.setdefault('service_name', self.service_name)
if self.interface:
kwargs.setdefault('interface', self.interface)
if self.region_name:
kwargs.setdefault('region_name', self.region_name)
if self.version:
kwargs.setdefault('version', self.version)
def request(self, url, method, **kwargs): def request(self, url, method, **kwargs):
endpoint_filter = kwargs.setdefault('endpoint_filter', {}) endpoint_filter = kwargs.setdefault('endpoint_filter', {})
self._set_endpoint_filter_kwargs(endpoint_filter)
if self.service_type:
endpoint_filter.setdefault('service_type', self.service_type)
if self.service_name:
endpoint_filter.setdefault('service_name', self.service_name)
if self.interface:
endpoint_filter.setdefault('interface', self.interface)
if self.region_name:
endpoint_filter.setdefault('region_name', self.region_name)
if self.version:
endpoint_filter.setdefault('version', self.version)
if self.endpoint_override: if self.endpoint_override:
kwargs.setdefault('endpoint_override', self.endpoint_override) kwargs.setdefault('endpoint_override', self.endpoint_override)
@@ -76,6 +78,37 @@ class Adapter(object):
return self.session.request(url, method, **kwargs) return self.session.request(url, method, **kwargs)
def get_token(self, auth=None):
"""Return a token as provided by the auth plugin.
:param auth: The auth plugin to use for token. Overrides the plugin
on the session. (optional)
:type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
:raises AuthorizationFailure: if a new token fetch fails.
:returns string: A valid token.
"""
return self.session.get_token(auth or self.auth)
def get_endpoint(self, auth=None, **kwargs):
"""Get an endpoint as provided by the auth plugin.
:param auth: The auth plugin to use for token. Overrides the plugin on
the session. (optional)
:type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
:raises MissingAuthPlugin: if a plugin is not available.
:returns string: An endpoint if available or None.
"""
self._set_endpoint_filter_kwargs(kwargs)
return self.session.get_endpoint(auth or self.auth, **kwargs)
def invalidate(self, auth=None):
"""Invalidate an authentication plugin."""
return self.session.invalidate(auth or self.auth)
def get(self, url, **kwargs): def get(self, url, **kwargs):
return self.request(url, 'GET', **kwargs) return self.request(url, 'GET', **kwargs)

View File

@@ -552,13 +552,10 @@ class AdapterTest(utils.TestCase):
TEST_URL = CalledAuthPlugin.ENDPOINT TEST_URL = CalledAuthPlugin.ENDPOINT
def test_setting_variables(self): def _create_loaded_adapter(self):
response = uuid.uuid4().hex
self.stub_url('GET', text=response)
auth = CalledAuthPlugin() auth = CalledAuthPlugin()
sess = client_session.Session() sess = client_session.Session()
adpt = adapter.Adapter(sess, return adapter.Adapter(sess,
auth=auth, auth=auth,
service_type=self.SERVICE_TYPE, service_type=self.SERVICE_TYPE,
service_name=self.SERVICE_NAME, service_name=self.SERVICE_NAME,
@@ -567,23 +564,36 @@ class AdapterTest(utils.TestCase):
user_agent=self.USER_AGENT, user_agent=self.USER_AGENT,
version=self.VERSION) version=self.VERSION)
def _verify_endpoint_called(self, adpt):
self.assertEqual(self.SERVICE_TYPE,
adpt.auth.endpoint_arguments['service_type'])
self.assertEqual(self.SERVICE_NAME,
adpt.auth.endpoint_arguments['service_name'])
self.assertEqual(self.INTERFACE,
adpt.auth.endpoint_arguments['interface'])
self.assertEqual(self.REGION_NAME,
adpt.auth.endpoint_arguments['region_name'])
self.assertEqual(self.VERSION,
adpt.auth.endpoint_arguments['version'])
def test_setting_variables_on_request(self):
response = uuid.uuid4().hex
self.stub_url('GET', text=response)
adpt = self._create_loaded_adapter()
resp = adpt.get('/') resp = adpt.get('/')
self.assertEqual(resp.text, response) self.assertEqual(resp.text, response)
self.assertEqual(self.SERVICE_TYPE, self._verify_endpoint_called(adpt)
auth.endpoint_arguments['service_type']) self.assertTrue(adpt.auth.get_token_called)
self.assertEqual(self.SERVICE_NAME,
auth.endpoint_arguments['service_name'])
self.assertEqual(self.INTERFACE,
auth.endpoint_arguments['interface'])
self.assertEqual(self.REGION_NAME,
auth.endpoint_arguments['region_name'])
self.assertEqual(self.VERSION,
auth.endpoint_arguments['version'])
self.assertTrue(auth.get_token_called)
self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT) self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT)
def test_setting_variables_on_get_endpoint(self):
adpt = self._create_loaded_adapter()
url = adpt.get_endpoint()
self.assertEqual(self.TEST_URL, url)
self._verify_endpoint_called(adpt)
def test_legacy_binding(self): def test_legacy_binding(self):
key = uuid.uuid4().hex key = uuid.uuid4().hex
val = uuid.uuid4().hex val = uuid.uuid4().hex
@@ -647,6 +657,23 @@ class AdapterTest(utils.TestCase):
self.assertEqual(response, resp.text) self.assertEqual(response, resp.text)
self.assertEqual(endpoint_url, self.requests.last_request.url) self.assertEqual(endpoint_url, self.requests.last_request.url)
def test_adapter_invalidate(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
adpt.invalidate()
self.assertTrue(auth.invalidate_called)
def test_adapter_get_token(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
self.assertEqual(self.TEST_TOKEN, adpt.get_token())
self.assertTrue(auth.get_token_called)
class ConfLoadingTests(utils.TestCase): class ConfLoadingTests(utils.TestCase):