 c3c6edbe8a
			
		
	
	c3c6edbe8a
	
	
	
		
			
			The ksc auth plugins do not have support for the original token-endpoint (aka token flow) auth where the user supplies a token (possibly the Keystone admin_token) and an API endpoint. This is used for bootstrapping Keystone but also has other uses when a scoped user token is provided. The api.auth:TokenEndpoint class is required to provide the same interface methods so all of the special-case code branches to support token-endpoint can be removed. Some additional cleanups related to ClientManager and creating the Compute client also were done to streamline using sessions. Change-Id: I1a6059afa845a591eff92567ca346c09010a93af
		
			
				
	
	
		
			241 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #   Copyright 2012-2013 OpenStack Foundation
 | |
| #
 | |
| #   Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #   not use this file except in compliance with the License. You may obtain
 | |
| #   a copy of the License at
 | |
| #
 | |
| #        http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #   Unless required by applicable law or agreed to in writing, software
 | |
| #   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #   License for the specific language governing permissions and limitations
 | |
| #   under the License.
 | |
| #
 | |
| import mock
 | |
| from requests_mock.contrib import fixture
 | |
| 
 | |
| from keystoneclient.auth.identity import v2 as auth_v2
 | |
| from keystoneclient.openstack.common import jsonutils
 | |
| from keystoneclient import service_catalog
 | |
| 
 | |
| from openstackclient.api import auth
 | |
| from openstackclient.common import clientmanager
 | |
| from openstackclient.common import exceptions as exc
 | |
| from openstackclient.tests import fakes
 | |
| from openstackclient.tests import utils
 | |
| 
 | |
| 
 | |
| API_VERSION = {"identity": "2.0"}
 | |
| 
 | |
| AUTH_REF = {'version': 'v2.0'}
 | |
| AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access'])
 | |
| SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF)
 | |
| 
 | |
| 
 | |
| class Container(object):
 | |
|     attr = clientmanager.ClientCache(lambda x: object())
 | |
| 
 | |
|     def __init__(self):
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class FakeOptions(object):
 | |
|     def __init__(self, **kwargs):
 | |
|         for option in auth.OPTIONS_LIST:
 | |
|             setattr(self, 'os_' + option.replace('-', '_'), None)
 | |
|         self.os_auth_plugin = None
 | |
|         self.os_identity_api_version = '2.0'
 | |
|         self.timing = None
 | |
|         self.os_region_name = None
 | |
|         self.os_url = None
 | |
|         self.__dict__.update(kwargs)
 | |
| 
 | |
| 
 | |
| class TestClientCache(utils.TestCase):
 | |
| 
 | |
|     def test_singleton(self):
 | |
|         # NOTE(dtroyer): Verify that the ClientCache descriptor only invokes
 | |
|         # the factory one time and always returns the same value after that.
 | |
|         c = Container()
 | |
|         self.assertEqual(c.attr, c.attr)
 | |
| 
 | |
| 
 | |
| class TestClientManager(utils.TestCase):
 | |
|     def setUp(self):
 | |
|         super(TestClientManager, self).setUp()
 | |
|         self.mock = mock.Mock()
 | |
|         self.requests = self.useFixture(fixture.Fixture())
 | |
|         # fake v2password token retrieval
 | |
|         self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
 | |
|         # fake v3password token retrieval
 | |
|         self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
 | |
|                        url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
 | |
|         # fake password version endpoint discovery
 | |
|         self.stub_auth(json=fakes.TEST_VERSIONS,
 | |
|                        url=fakes.AUTH_URL,
 | |
|                        verb='GET')
 | |
| 
 | |
|     def test_client_manager_token_endpoint(self):
 | |
| 
 | |
|         client_manager = clientmanager.ClientManager(
 | |
|             auth_options=FakeOptions(os_token=fakes.AUTH_TOKEN,
 | |
|                                      os_url=fakes.AUTH_URL,
 | |
|                                      os_auth_plugin='token_endpoint'),
 | |
|             api_version=API_VERSION,
 | |
|             verify=True
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             fakes.AUTH_URL,
 | |
|             client_manager._url,
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             fakes.AUTH_TOKEN,
 | |
|             client_manager._token,
 | |
|         )
 | |
|         self.assertIsInstance(
 | |
|             client_manager.auth,
 | |
|             auth.TokenEndpoint,
 | |
|         )
 | |
|         self.assertFalse(client_manager._insecure)
 | |
|         self.assertTrue(client_manager._verify)
 | |
| 
 | |
|     def test_client_manager_token(self):
 | |
| 
 | |
|         client_manager = clientmanager.ClientManager(
 | |
|             auth_options=FakeOptions(os_token=fakes.AUTH_TOKEN,
 | |
|                                      os_auth_url=fakes.AUTH_URL,
 | |
|                                      os_auth_plugin='v2token'),
 | |
|             api_version=API_VERSION,
 | |
|             verify=True
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             fakes.AUTH_TOKEN,
 | |
|             client_manager._token,
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             fakes.AUTH_URL,
 | |
|             client_manager._auth_url,
 | |
|         )
 | |
|         self.assertIsInstance(
 | |
|             client_manager.auth,
 | |
|             auth_v2.Token,
 | |
|         )
 | |
|         self.assertFalse(client_manager._insecure)
 | |
|         self.assertTrue(client_manager._verify)
 | |
| 
 | |
|     def test_client_manager_password(self):
 | |
| 
 | |
|         client_manager = clientmanager.ClientManager(
 | |
|             auth_options=FakeOptions(os_auth_url=fakes.AUTH_URL,
 | |
|                                      os_username=fakes.USERNAME,
 | |
|                                      os_password=fakes.PASSWORD),
 | |
|             api_version=API_VERSION,
 | |
|             verify=False,
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             fakes.AUTH_URL,
 | |
|             client_manager._auth_url,
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             fakes.USERNAME,
 | |
|             client_manager._username,
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             fakes.PASSWORD,
 | |
|             client_manager._password,
 | |
|         )
 | |
|         self.assertIsInstance(
 | |
|             client_manager.auth,
 | |
|             auth_v2.Password,
 | |
|         )
 | |
|         self.assertTrue(client_manager._insecure)
 | |
|         self.assertFalse(client_manager._verify)
 | |
| 
 | |
|         # These need to stick around until the old-style clients are gone
 | |
|         self.assertEqual(
 | |
|             AUTH_REF,
 | |
|             client_manager.auth_ref,
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             fakes.AUTH_TOKEN,
 | |
|             client_manager._token,
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             dir(SERVICE_CATALOG),
 | |
|             dir(client_manager._service_catalog),
 | |
|         )
 | |
| 
 | |
|     def stub_auth(self, json=None, url=None, verb=None, **kwargs):
 | |
|         subject_token = fakes.AUTH_TOKEN
 | |
|         base_url = fakes.AUTH_URL
 | |
|         if json:
 | |
|             text = jsonutils.dumps(json)
 | |
|             headers = {'X-Subject-Token': subject_token,
 | |
|                        'Content-Type': 'application/json'}
 | |
|         if not url:
 | |
|             url = '/'.join([base_url, 'tokens'])
 | |
|         url = url.replace("/?", "?")
 | |
|         if not verb:
 | |
|             verb = 'POST'
 | |
|         self.requests.register_uri(verb,
 | |
|                                    url,
 | |
|                                    headers=headers,
 | |
|                                    text=text)
 | |
| 
 | |
|     def test_client_manager_password_verify_ca(self):
 | |
| 
 | |
|         client_manager = clientmanager.ClientManager(
 | |
|             auth_options=FakeOptions(os_auth_url=fakes.AUTH_URL,
 | |
|                                      os_username=fakes.USERNAME,
 | |
|                                      os_password=fakes.PASSWORD,
 | |
|                                      os_auth_plugin='v2password'),
 | |
|             api_version=API_VERSION,
 | |
|             verify='cafile',
 | |
|         )
 | |
| 
 | |
|         self.assertFalse(client_manager._insecure)
 | |
|         self.assertTrue(client_manager._verify)
 | |
|         self.assertEqual('cafile', client_manager._cacert)
 | |
| 
 | |
|     def _select_auth_plugin(self, auth_params, api_version, auth_plugin):
 | |
|         auth_params['os_auth_plugin'] = auth_plugin
 | |
|         auth_params['os_identity_api_version'] = api_version
 | |
|         client_manager = clientmanager.ClientManager(
 | |
|             auth_options=FakeOptions(**auth_params),
 | |
|             api_version=API_VERSION,
 | |
|             verify=True
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             auth_plugin,
 | |
|             client_manager._auth_plugin,
 | |
|         )
 | |
| 
 | |
|     def test_client_manager_select_auth_plugin(self):
 | |
|         # test token auth
 | |
|         params = dict(os_token=fakes.AUTH_TOKEN,
 | |
|                       os_auth_url=fakes.AUTH_URL)
 | |
|         self._select_auth_plugin(params, '2.0', 'v2token')
 | |
|         self._select_auth_plugin(params, '3', 'v3token')
 | |
|         self._select_auth_plugin(params, 'XXX', 'token')
 | |
|         # test token/endpoint auth
 | |
|         params = dict(os_token=fakes.AUTH_TOKEN, os_url='test')
 | |
|         self._select_auth_plugin(params, 'XXX', 'token_endpoint')
 | |
|         # test password auth
 | |
|         params = dict(os_auth_url=fakes.AUTH_URL,
 | |
|                       os_username=fakes.USERNAME,
 | |
|                       os_password=fakes.PASSWORD)
 | |
|         self._select_auth_plugin(params, '2.0', 'v2password')
 | |
|         self._select_auth_plugin(params, '3', 'v3password')
 | |
|         self._select_auth_plugin(params, 'XXX', 'password')
 | |
| 
 | |
|     def test_client_manager_select_auth_plugin_failure(self):
 | |
|         self.assertRaises(exc.CommandError,
 | |
|                           clientmanager.ClientManager,
 | |
|                           auth_options=FakeOptions(os_auth_plugin=''),
 | |
|                           api_version=API_VERSION,
 | |
|                           verify=True)
 |