Merge "Expose auth methods on the adapter"

This commit is contained in:
Jenkins
2014-09-12 09:47:06 +00:00
committed by Gerrit Code Review
2 changed files with 88 additions and 28 deletions

View File

@@ -52,19 +52,21 @@ class Adapter(object):
self.user_agent = user_agent
self.auth = auth
def _set_endpoint_filter_kwargs(self, kwargs):
if self.service_type:
kwargs.setdefault('service_type', self.service_type)
if self.service_name:
kwargs.setdefault('service_name', self.service_name)
if self.interface:
kwargs.setdefault('interface', self.interface)
if self.region_name:
kwargs.setdefault('region_name', self.region_name)
if self.version:
kwargs.setdefault('version', self.version)
def request(self, url, method, **kwargs):
endpoint_filter = kwargs.setdefault('endpoint_filter', {})
if self.service_type:
endpoint_filter.setdefault('service_type', self.service_type)
if self.service_name:
endpoint_filter.setdefault('service_name', self.service_name)
if self.interface:
endpoint_filter.setdefault('interface', self.interface)
if self.region_name:
endpoint_filter.setdefault('region_name', self.region_name)
if self.version:
endpoint_filter.setdefault('version', self.version)
self._set_endpoint_filter_kwargs(endpoint_filter)
if self.endpoint_override:
kwargs.setdefault('endpoint_override', self.endpoint_override)
@@ -76,6 +78,37 @@ class Adapter(object):
return self.session.request(url, method, **kwargs)
def get_token(self, auth=None):
"""Return a token as provided by the auth plugin.
:param auth: The auth plugin to use for token. Overrides the plugin
on the session. (optional)
:type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
:raises AuthorizationFailure: if a new token fetch fails.
:returns string: A valid token.
"""
return self.session.get_token(auth or self.auth)
def get_endpoint(self, auth=None, **kwargs):
"""Get an endpoint as provided by the auth plugin.
:param auth: The auth plugin to use for token. Overrides the plugin on
the session. (optional)
:type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
:raises MissingAuthPlugin: if a plugin is not available.
:returns string: An endpoint if available or None.
"""
self._set_endpoint_filter_kwargs(kwargs)
return self.session.get_endpoint(auth or self.auth, **kwargs)
def invalidate(self, auth=None):
"""Invalidate an authentication plugin."""
return self.session.invalidate(auth or self.auth)
def get(self, url, **kwargs):
return self.request(url, 'GET', **kwargs)

View File

@@ -552,13 +552,10 @@ class AdapterTest(utils.TestCase):
TEST_URL = CalledAuthPlugin.ENDPOINT
def test_setting_variables(self):
response = uuid.uuid4().hex
self.stub_url('GET', text=response)
def _create_loaded_adapter(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess,
return adapter.Adapter(sess,
auth=auth,
service_type=self.SERVICE_TYPE,
service_name=self.SERVICE_NAME,
@@ -567,23 +564,36 @@ class AdapterTest(utils.TestCase):
user_agent=self.USER_AGENT,
version=self.VERSION)
def _verify_endpoint_called(self, adpt):
self.assertEqual(self.SERVICE_TYPE,
adpt.auth.endpoint_arguments['service_type'])
self.assertEqual(self.SERVICE_NAME,
adpt.auth.endpoint_arguments['service_name'])
self.assertEqual(self.INTERFACE,
adpt.auth.endpoint_arguments['interface'])
self.assertEqual(self.REGION_NAME,
adpt.auth.endpoint_arguments['region_name'])
self.assertEqual(self.VERSION,
adpt.auth.endpoint_arguments['version'])
def test_setting_variables_on_request(self):
response = uuid.uuid4().hex
self.stub_url('GET', text=response)
adpt = self._create_loaded_adapter()
resp = adpt.get('/')
self.assertEqual(resp.text, response)
self.assertEqual(self.SERVICE_TYPE,
auth.endpoint_arguments['service_type'])
self.assertEqual(self.SERVICE_NAME,
auth.endpoint_arguments['service_name'])
self.assertEqual(self.INTERFACE,
auth.endpoint_arguments['interface'])
self.assertEqual(self.REGION_NAME,
auth.endpoint_arguments['region_name'])
self.assertEqual(self.VERSION,
auth.endpoint_arguments['version'])
self.assertTrue(auth.get_token_called)
self._verify_endpoint_called(adpt)
self.assertTrue(adpt.auth.get_token_called)
self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT)
def test_setting_variables_on_get_endpoint(self):
adpt = self._create_loaded_adapter()
url = adpt.get_endpoint()
self.assertEqual(self.TEST_URL, url)
self._verify_endpoint_called(adpt)
def test_legacy_binding(self):
key = uuid.uuid4().hex
val = uuid.uuid4().hex
@@ -647,6 +657,23 @@ class AdapterTest(utils.TestCase):
self.assertEqual(response, resp.text)
self.assertEqual(endpoint_url, self.requests.last_request.url)
def test_adapter_invalidate(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
adpt.invalidate()
self.assertTrue(auth.invalidate_called)
def test_adapter_get_token(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
adpt = adapter.Adapter(sess, auth=auth)
self.assertEqual(self.TEST_TOKEN, adpt.get_token())
self.assertTrue(auth.get_token_called)
class ConfLoadingTests(utils.TestCase):