Update OpenStack clouds file support

Move clouds files support to its own dedicated module.
Improve unit test testing all corner cases.
Use exceptions to handle errors.
Improve error handling.

Change-Id: Ic705a275be766ba20a4779ed7b27dc4c48f46fcf
This commit is contained in:
Federico Ressi 2019-09-16 17:17:47 +02:00
parent a6014b919f
commit 3c319914e4
6 changed files with 500 additions and 166 deletions

View File

@ -14,6 +14,7 @@
from __future__ import absolute_import from __future__ import absolute_import
from tobiko.openstack.keystone import _client from tobiko.openstack.keystone import _client
from tobiko.openstack.keystone import _clouds_file
from tobiko.openstack.keystone import _credentials from tobiko.openstack.keystone import _credentials
from tobiko.openstack.keystone import _services from tobiko.openstack.keystone import _services
from tobiko.openstack.keystone import _session from tobiko.openstack.keystone import _session
@ -27,6 +28,9 @@ list_endpoints = _client.list_endpoints
list_services = _client.list_services list_services = _client.list_services
KeystoneClientFixture = _client.KeystoneClientFixture KeystoneClientFixture = _client.KeystoneClientFixture
CloudsFileKeystoneCredentialsFixture = (
_clouds_file.CloudsFileKeystoneCredentialsFixture)
keystone_credentials = _credentials.keystone_credentials keystone_credentials = _credentials.keystone_credentials
get_keystone_credentials = _credentials.get_keystone_credentials get_keystone_credentials = _credentials.get_keystone_credentials
default_keystone_credentials = _credentials.default_keystone_credentials default_keystone_credentials = _credentials.default_keystone_credentials

View File

@ -0,0 +1,186 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import json
import os
import appdirs
from oslo_log import log
import yaml
from tobiko.openstack.keystone import _credentials
LOG = log.getLogger(__name__)
APPDIRS = appdirs.AppDirs('openstack', 'OpenStack', multipath='/etc')
CONFIG_SEARCH_PATH = [os.getcwd(),
APPDIRS.user_config_dir,
os.path.expanduser('~/.config/openstack'),
APPDIRS.site_config_dir,
'/etc/openstack']
YAML_SUFFIXES = ('.yaml', '.yml')
JSON_SUFFIXES = ('.json',)
DEFAULT_CLOUDS_FILES = [
os.path.join(d, 'clouds' + s)
for d in CONFIG_SEARCH_PATH
for s in YAML_SUFFIXES + JSON_SUFFIXES]
try:
FileNotFound = FileNotFoundError
except NameError:
FileNotFound = OSError
class CloudsFileKeystoneCredentialsFixture(
_credentials.KeystoneCredentialsFixture):
cloud_name = None
clouds_content = None
clouds_file = None
def __init__(self, credentials=None, cloud_name=None,
clouds_content=None, clouds_file=None, clouds_files=None):
super(CloudsFileKeystoneCredentialsFixture, self).__init__(
credentials=credentials)
if cloud_name is not None:
self.cloud_name = cloud_name
if clouds_content is not None:
self.clouds_content = dict(clouds_content)
if clouds_file is not None:
self.clouds_file = clouds_file
if clouds_files is None:
self.clouds_files = tuple(DEFAULT_CLOUDS_FILES)
else:
self.clouds_files = tuple(clouds_files)
def get_credentials(self):
cloud_name = self._get_cloud_name()
clouds_content = self._get_clouds_content()
clouds_section = clouds_content.get("clouds")
if clouds_section is None:
message = ("'clouds' section not found in clouds file "
"{!r}").format(self.clouds_file)
raise ValueError(message)
clouds_config = clouds_section.get(cloud_name)
if clouds_config is None:
message = ("No such cloud with name {!r} in file "
"{!r}").format(cloud_name, self.clouds_file)
raise ValueError(message)
auth = clouds_config.get("auth")
if auth is None:
message = ("No such 'auth' section in cloud file {!r} for cloud "
"name {!r}").format(self.clouds_file, self.cloud_name)
raise ValueError(message)
auth_url = auth.get("auth_url")
if not auth_url:
message = ("No such 'auth_url' in file {!r} for cloud name "
"{!r}").format(self.clouds_file, self.cloud_name)
raise ValueError(message)
api_version = (int(clouds_config.get("identity_api_version", 0)) or
_credentials.api_version_from_url(auth_url))
if api_version == 2:
return _credentials.keystone_credentials(
api_version=api_version,
auth_url=auth_url,
username=auth.get("username"),
password=auth.get("password"),
project_name=auth.get("project_name"))
else:
return _credentials.keystone_credentials(
api_version=api_version,
auth_url=auth_url,
username=auth.get("username"),
password=auth.get("password"),
project_name=auth.get("project_name"),
domain_name=auth.get("domain_name"),
user_domain_name=auth.get("user_domain_name"),
project_domain_name=auth.get("project_domain_name"),
project_domain_id=auth.get("project_domain_id"),
trust_id=auth.get("trust_id"))
def _get_cloud_name(self):
cloud_name = self.cloud_name
if cloud_name is None:
cloud_name = os.environ.get("OS_CLOUD")
if cloud_name:
LOG.debug("Got cloud name from 'OS_CLOUD' environment "
"variable: %r", cloud_name)
self.cloud_name = cloud_name
else:
message = "Undefined environment variable: 'OS_CLOUD'"
raise ValueError(message)
if not cloud_name:
message = "Invalid cloud name: {!r}".format(cloud_name)
raise ValueError(message)
return cloud_name
def _get_clouds_content(self):
clouds_content = self.clouds_content
if clouds_content is None:
clouds_file = self._get_clouds_file()
with open(clouds_file, 'r') as f:
_, suffix = os.path.splitext(clouds_file)
if suffix in JSON_SUFFIXES:
LOG.debug('Load JSON clouds file: %r', clouds_file)
clouds_content = json.load(f)
elif suffix in YAML_SUFFIXES:
LOG.debug('Load YAML clouds file: %r', clouds_file)
clouds_content = yaml.safe_load(f)
else:
message = 'Invalid clouds file suffix: {!r}'.format(
suffix)
raise ValueError(message)
LOG.debug('Clouds file content loaded from %r:\n%r',
clouds_file, json.dumps(clouds_content,
indent=4,
sort_keys=True))
self.clouds_content = clouds_content
if not clouds_content:
message = "Invalid clouds file content: {!r}".format(
clouds_content)
raise ValueError(message)
return clouds_content
def _get_clouds_file(self):
clouds_file = self.clouds_file
if not clouds_file:
clouds_files = self.clouds_files
for filename in clouds_files:
if os.path.exists(filename):
LOG.debug('Found clouds file at %r', filename)
self.clouds_file = clouds_file = filename
break
else:
message = 'No such clouds file: {!s}'.format(
', '.join(repr(f) for f in clouds_files))
raise FileNotFound(message)
if not os.path.exists(clouds_file):
message = 'Cloud file not found: {!r}'.format(clouds_file)
raise FileNotFound(message)
return clouds_file
_credentials.DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES.insert(
0, CloudsFileKeystoneCredentialsFixture)

View File

@ -1,44 +0,0 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import os
import appdirs
APPDIRS = appdirs.AppDirs('openstack', 'OpenStack', multipath='/etc')
CONFIG_HOME = APPDIRS.user_config_dir
CACHE_PATH = APPDIRS.user_cache_dir
UNIX_CONFIG_HOME = os.path.join(
os.path.expanduser(os.path.join('~', '.config')), 'openstack')
UNIX_SITE_CONFIG_HOME = '/etc/openstack'
SITE_CONFIG_HOME = APPDIRS.site_config_dir
CONFIG_SEARCH_PATH = [
os.getcwd(),
CONFIG_HOME, UNIX_CONFIG_HOME,
SITE_CONFIG_HOME, UNIX_SITE_CONFIG_HOME
]
YAML_SUFFIXES = ('.yaml', '.yml')
JSON_SUFFIXES = ('.json',)
def get_cloud_config_files():
return [
os.path.join(d, 'clouds' + s)
for d in CONFIG_SEARCH_PATH
for s in YAML_SUFFIXES + JSON_SUFFIXES]

View File

@ -14,7 +14,6 @@
from __future__ import absolute_import from __future__ import absolute_import
import collections import collections
import json
import os import os
import sys import sys
@ -22,9 +21,6 @@ from oslo_log import log
import yaml import yaml
import tobiko import tobiko
from tobiko.openstack.keystone import _config_files
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -225,58 +221,6 @@ class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
return value return value
class CloudsFileKeystoneCredentialsFixture(EnvironKeystoneCredentialsFixture):
def __init__(self, credentials=None, environ=None, clouds_files=None):
super(CloudsFileKeystoneCredentialsFixture, self).__init__(
credentials=credentials, environ=environ)
self.clouds_files = (
clouds_files or _config_files.get_cloud_config_files())
def _load_yaml_json_file(self, filelist):
for path in filelist:
if os.path.exists(path):
with open(path, 'r') as f:
if path.endswith('json'):
return path, json.load(f)
else:
return path, yaml.safe_load(f)
return None, {}
def get_credentials(self):
cloud_name = self.get_env("OS_CLOUD")
if not cloud_name:
LOG.debug('No OS_CLOUD env variable')
return None
file_name, clouds_config = self._load_yaml_json_file(self.clouds_files)
clouds_config = clouds_config.get("clouds")
if not clouds_config:
LOG.debug('No clouds configs found in any of %s',
self.clouds_files)
return None
config = clouds_config.get(cloud_name)
if not config:
LOG.debug("No %s cloud config found in cloud configs file %s",
cloud_name, file_name)
return None
auth = config.get("auth", {})
return keystone_credentials(
api_version=int(config.get("identity_api_version")),
auth_url=auth.get("auth_url"),
username=auth.get("username"),
password=auth.get("password"),
project_name=auth.get("project_name"),
domain_name=auth.get("domain_name"),
user_domain_name=auth.get("user_domain_name"),
project_domain_name=auth.get("project_domain_name"),
project_domain_id=auth.get("project_domain_id"),
trust_id=auth.get("trust_id"))
class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture): class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture):
def get_credentials(self): def get_credentials(self):
@ -312,7 +256,6 @@ class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture):
DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = [ DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = [
CloudsFileKeystoneCredentialsFixture,
EnvironKeystoneCredentialsFixture, EnvironKeystoneCredentialsFixture,
ConfigKeystoneCredentialsFixture] ConfigKeystoneCredentialsFixture]

View File

@ -0,0 +1,309 @@
# Copyright (c) 2019 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import json
import os
import tempfile
import yaml
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack.keystone import _clouds_file
from tobiko.tests.unit import openstack
from tobiko.tests.unit.openstack.keystone import test_credentials
def make_clouds_content(cloud_name, api_version=None, auth=None):
content = {}
if api_version is not None:
content['identity_api_version'] = api_version
if auth is not None:
content['auth'] = auth
return {'clouds': {cloud_name: content}}
class CloudsFileFixture(tobiko.SharedFixture):
cloud_name = None
api_version = None
auth = None
clouds_content = None
clouds_file = None
suffix = '.yaml'
create_file = True
def __init__(self, cloud_name=None, api_version=None, auth=None,
clouds_file=None, suffix=None, create_file=None,
clouds_content=None):
super(CloudsFileFixture, self).__init__()
if cloud_name is not None:
self.cloud_name = cloud_name
if api_version is not None:
self.api_version = api_version
if auth is not None:
self.auth = auth
if clouds_file is not None:
self.clouds_file = clouds_file
if suffix is not None:
self.suffix = suffix
if create_file is not None:
self.create_file = create_file
if clouds_content is not None:
self.clouds_content = clouds_content
def setup_fixture(self):
clouds_content = self.clouds_content
if clouds_content is None:
self.clouds_content = clouds_content = make_clouds_content(
cloud_name=self.cloud_name, api_version=self.api_version,
auth=self.auth)
if self.create_file:
clouds_file = self.clouds_file
if clouds_file is None:
fd, clouds_file = tempfile.mkstemp(suffix=self.suffix)
self.addCleanup(os.remove, clouds_file)
self.clouds_file = clouds_file
clouds_stream = os.fdopen(fd, 'wt')
else:
clouds_stream = os.open(clouds_file, 'wt')
try:
if self.suffix in _clouds_file.JSON_SUFFIXES:
json.dump(clouds_content, clouds_stream)
elif self.suffix in _clouds_file.YAML_SUFFIXES:
yaml.safe_dump(clouds_content, clouds_stream)
finally:
clouds_stream.close()
class V2CloudsFileFixture(CloudsFileFixture):
cloud_name = 'V2-TEST_CLOUD'
auth = test_credentials.V2_PARAMS
class V3CloudsFileFixture(CloudsFileFixture):
cloud_name = 'V3-TEST_CLOUD'
auth = test_credentials.V3_PARAMS
class CloudsFileKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
def test_init(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture()
self.assertIsNone(fixture.cloud_name)
self.assertIsNone(fixture.clouds_content)
self.assertIsNone(fixture.clouds_file)
self.assertEqual(tuple(_clouds_file.DEFAULT_CLOUDS_FILES),
fixture.clouds_files)
def test_init_with_cloud_name(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name')
self.assertEqual('cloud-name', fixture.cloud_name)
def test_init_with_clouds_content(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_content={})
self.assertEqual({}, fixture.clouds_content)
def test_init_with_clouds_file(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file='cloud-file')
self.assertEqual('cloud-file', fixture.clouds_file)
def test_init_with_clouds_files(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_files=['a', 'b', 'd'])
self.assertEqual(('a', 'b', 'd'), fixture.clouds_files)
def test_setup_from_default_clouds_files(self):
file_fixture = self.useFixture(V3CloudsFileFixture())
self.patch(_clouds_file, 'DEFAULT_CLOUDS_FILES',
['/a', file_fixture.clouds_file, '/c'])
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_json(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.json'))
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_yaml(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.yaml'))
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_yml(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.yml'))
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_invalid_suffix(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.txt'))
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file)
ex = self.assertRaises(ValueError, tobiko.setup_fixture,
credentials_fixture)
self.assertEqual("Invalid clouds file suffix: '.txt'", str(ex))
def test_setup_v2_credentials(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V2_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_with_cloud_name(self):
file_fixture = self.useFixture(V3CloudsFileFixture())
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name',
clouds_file=file_fixture.clouds_file)
ex = self.assertRaises(ValueError, tobiko.setup_fixture,
credentials_fixture)
self.assertEqual("No such cloud with name 'cloud-name' in file " +
repr(file_fixture.clouds_file), str(ex))
def test_setup_with_cloud_name_from_env(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
self.patch(os, 'environ', {'OS_CLOUD': file_fixture.cloud_name})
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file)
self.assertIsNone(credentials_fixture.cloud_name)
tobiko.setup_fixture(credentials_fixture)
self.assertEqual(file_fixture.cloud_name,
credentials_fixture.cloud_name)
def test_setup_with_empty_cloud_name(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file,
cloud_name='')
self.assertEqual('', credentials_fixture.cloud_name)
ex = self.assertRaises(ValueError, tobiko.setup_fixture,
credentials_fixture)
self.assertEqual("Invalid cloud name: ''", str(ex))
def test_setup_with_empty_cloud_name_from_env(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
self.patch(os, 'environ', {'OS_CLOUD': ''})
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file)
self.assertIsNone(credentials_fixture.cloud_name)
ex = self.assertRaises(ValueError, tobiko.setup_fixture,
credentials_fixture)
self.assertEqual("Undefined environment variable: 'OS_CLOUD'", str(ex))
def test_setup_with_no_cloud_name(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file)
self.assertIsNone(credentials_fixture.cloud_name)
ex = self.assertRaises(ValueError, tobiko.setup_fixture,
credentials_fixture)
self.assertEqual("Undefined environment variable: 'OS_CLOUD'", str(ex))
def test_setup_with_no_clouds_section(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name', clouds_content={'other_data': None},
clouds_file='clouds-file')
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual({'other_data': None}, fixture.clouds_content)
self.assertEqual("'clouds' section not found in clouds file "
"'clouds-file'", str(ex))
def test_setup_with_empty_clouds_content(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name', clouds_content={})
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual({}, fixture.clouds_content)
self.assertEqual('Invalid clouds file content: {}', str(ex))
def test_setup_with_no_auth(self):
clouds_content = make_clouds_content('cloud-name')
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name',
clouds_content=clouds_content,
clouds_file='cloud-file')
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual(
"No such 'auth' section in cloud file 'cloud-file' for cloud "
"name 'cloud-name'", str(ex))
def test_setup_with_no_auth_url(self):
clouds_content = make_clouds_content('cloud-name', auth={})
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name',
clouds_content=clouds_content,
clouds_file='cloud-file')
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual(
"No such 'auth_url' in file 'cloud-file' for cloud name "
"'cloud-name'", str(ex))
def test_setup_without_clouds_file(self):
self.patch(_clouds_file, 'DEFAULT_CLOUDS_FILES', ['/a', '/b', '/c'])
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name')
ex = self.assertRaises(_clouds_file.FileNotFound, tobiko.setup_fixture,
fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual("No such clouds file: '/a', '/b', '/c'", str(ex))
def test_setup_with_non_existing_clouds_file(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file='/a.yaml',
cloud_name='cloud-name')
ex = self.assertRaises(_clouds_file.FileNotFound, tobiko.setup_fixture,
fixture)
self.assertEqual("Cloud file not found: '/a.yaml'", str(ex))

View File

@ -14,18 +14,15 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import json
import os import os
import mock
import yaml
import tobiko import tobiko
from tobiko import config from tobiko import config
from tobiko.openstack import keystone from tobiko.openstack import keystone
from tobiko.openstack.keystone import _credentials from tobiko.openstack.keystone import _credentials
from tobiko.tests.unit import openstack from tobiko.tests.unit import openstack
V2_PARAMS = { V2_PARAMS = {
'api_version': 2, 'api_version': 2,
'project_name': 'demo', 'project_name': 'demo',
@ -65,17 +62,6 @@ V3_ENVIRON = {
'OS_USER_DOMAIN_NAME': 'Default', 'OS_USER_DOMAIN_NAME': 'Default',
'OS_PROJECT_DOMAIN_NAME': 'Default'} 'OS_PROJECT_DOMAIN_NAME': 'Default'}
CLOUDS_CONFIG = {
'clouds': {
'test-cloud': {
'auth': {'auth_url': V3_PARAMS['auth_url'],
'password': V3_PARAMS['password'],
'project_domain_name': V3_PARAMS['project_domain_name'],
'project_name': V3_PARAMS['project_name'],
'user_domain_name': V3_PARAMS['user_domain_name'],
'username': V3_PARAMS['username']},
'identity_api_version': str(V3_PARAMS['api_version'])}}}
V3_ENVIRON_WITH_VERSION = dict(V3_ENVIRON, OS_IDENTITY_API_VERSION='3') V3_ENVIRON_WITH_VERSION = dict(V3_ENVIRON, OS_IDENTITY_API_VERSION='3')
@ -178,56 +164,6 @@ class EnvironKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
self.assertEqual(V3_PARAMS, fixture.credentials.to_dict()) self.assertEqual(V3_PARAMS, fixture.credentials.to_dict())
class CloudsFileKeystoneCredentialsFixtureTestJson(openstack.OpenstackTest):
clouds_file_name = "/tmp/test-cloud-file.json"
json_cloud_data = json.dumps(CLOUDS_CONFIG)
def setUp(self):
super(CloudsFileKeystoneCredentialsFixtureTestJson, self).setUp()
self.patch(os, 'environ', {'OS_CLOUD': 'test-cloud'})
self.patch(os.path, 'exists', return_value=True)
mocked_open = mock.mock_open(read_data=self.json_cloud_data)
self.patch(_credentials, "open", mocked_open)
def test_setup_from_clouds_config_file(self):
fixture = _credentials.CloudsFileKeystoneCredentialsFixture(
clouds_files=[self.clouds_file_name])
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(
V3_PARAMS, fixture.credentials.to_dict())
def test_setup_from_file_no_os_cloud_env_set(self):
self.patch(os, 'environ', {})
fixture = _credentials.CloudsFileKeystoneCredentialsFixture(
clouds_files=[self.clouds_file_name])
fixture.setUp()
self.assertIsNone(fixture.credentials)
def test_setup_from_file_no_clouds_config_in_file(self):
mocked_open = mock.mock_open(read_data=json.dumps({}))
self.patch(_credentials, "open", mocked_open)
fixture = _credentials.CloudsFileKeystoneCredentialsFixture(
clouds_files=[self.clouds_file_name])
fixture.setUp()
self.assertIsNone(fixture.credentials)
def test_setup_from_file_no_specified_cloud_config_in_file(self):
self.patch(os, 'environ', {'OS_CLOUD': 'some-other-cloud'})
fixture = _credentials.CloudsFileKeystoneCredentialsFixture(
clouds_files=[self.clouds_file_name])
fixture.setUp()
self.assertIsNone(fixture.credentials)
class CloudsFileKeystoneCredentialsFixtureTestYaml(
CloudsFileKeystoneCredentialsFixtureTestJson):
clouds_file_name = "/tmp/test-cloud-file.yaml"
json_cloud_data = yaml.dump(CLOUDS_CONFIG)
class ConfigKeystoneCredentialsFixtureTest(openstack.OpenstackTest): class ConfigKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
def patch_config(self, params, **kwargs): def patch_config(self, params, **kwargs):