Merge "Fix discovery cache sharing"

This commit is contained in:
Zuul 2019-08-13 15:14:40 +00:00 committed by Gerrit Code Review
commit c7ba2b7b7f
3 changed files with 60 additions and 40 deletions

View File

@ -226,21 +226,10 @@ class _OpenStackCloudMixin(object):
for key, value in kwargs.items():
params['auth'][key] = value
# TODO(mordred) Replace this chunk with the next patch that allows
# passing a Session to CloudRegion.
# Closure to pass to OpenStackConfig to ensure the new cloud shares
# the Session with the current cloud. This will ensure that version
# discovery cache will be re-used.
def session_constructor(*args, **kwargs):
# We need to pass our current keystone session to the Session
# Constructor, otherwise the new auth plugin doesn't get used.
return keystoneauth1.session.Session(
session=self.session,
discovery_cache=self.config._discovery_cache)
cloud_config = config.get_one(
session_constructor=session_constructor,
**params)
cloud_config = config.get_one(**params)
# Attach the discovery cache from the old session so we won't
# double discover.
cloud_config._discovery_cache = self.session._discovery_cache
# Override the cloud name so that logging/location work right
cloud_config._name = self.name
cloud_config.config['profile'] = self.name

View File

@ -420,24 +420,34 @@ class TestCase(base.TestCase):
self.calls = []
self._uri_registry.clear()
def get_keystone_v3_token(self, catalog='catalog-v3.json'):
catalog_file = os.path.join(self.fixtures_directory, catalog)
with open(catalog_file, 'r') as tokens_file:
return dict(
method='POST',
uri='https://identity.example.com/v3/auth/tokens',
headers={
'X-Subject-Token': self.getUniqueString('KeystoneToken')
},
text=tokens_file.read()
)
def get_keystone_v3_discovery(self):
with open(self.discovery_json, 'r') as discovery_file:
return dict(
method='GET',
uri='https://identity.example.com/',
text=discovery_file.read(),
)
def use_keystone_v3(self, catalog='catalog-v3.json'):
self.adapter = self.useFixture(rm_fixture.Fixture())
self.calls = []
self._uri_registry.clear()
with open(self.discovery_json, 'r') as discovery_file, \
open(os.path.join(
self.fixtures_directory, catalog), 'r') as tokens_file:
self.__do_register_uris([
dict(method='GET', uri='https://identity.example.com/',
text=discovery_file.read()),
dict(method='POST',
uri='https://identity.example.com/v3/auth/tokens',
headers={
'X-Subject-Token':
self.getUniqueString('KeystoneToken')},
text=tokens_file.read()
),
])
self.__do_register_uris([
self.get_keystone_v3_discovery(),
self.get_keystone_v3_token(catalog),
])
self._make_test_cloud(identity_api_version='3')
def use_keystone_v2(self):

View File

@ -56,20 +56,41 @@ class TestShade(base.TestCase):
def test_connect_as(self):
# Do initial auth/catalog steps
# TODO(mordred) This only tests the constructor steps. Discovery
# cache sharing is broken. We need to get discovery_cache option
# plumbed through
# keystoneauth1.loading.base.BaseLoader.load_from_options
self.cloud.connect_as(project_name='test_project')
# This should authenticate a second time, but should not
# need a second identity discovery
self.register_uris([
self.get_keystone_v3_token(),
self.get_nova_discovery_mock_dict(),
dict(
method='GET',
uri=self.get_mock_url(
'compute', 'public', append=['servers', 'detail']),
json={'servers': []},
),
])
c2 = self.cloud.connect_as(project_name='test_project')
self.assertEqual(c2.list_servers(), [])
self.assert_calls()
def test_connect_as_context(self):
# Do initial auth/catalog steps
# TODO(mordred) This only tests the constructor steps. Discovery
# cache sharing is broken. We need to get discovery_cache option
# plumbed through
# keystoneauth1.loading.base.BaseLoader.load_from_options
with self.cloud.connect_as(project_name='test_project'):
pass
# This should authenticate a second time, but should not
# need a second identity discovery
self.register_uris([
self.get_keystone_v3_token(),
self.get_nova_discovery_mock_dict(),
dict(
method='GET',
uri=self.get_mock_url(
'compute', 'public', append=['servers', 'detail']),
json={'servers': []},
),
])
with self.cloud.connect_as(project_name='test_project') as c2:
self.assertEqual(c2.list_servers(), [])
self.assert_calls()
@mock.patch.object(connection.Connection, 'search_images')
def test_get_images(self, mock_search):