Refactor Keystone integration tools

- add support for remote cloud files
- refactor keystone credentials and session fixtures
- update test cases
- refactor tripleo and shiftstack credentials fixtures
- allow to configure default cloud name(s) for tripleo and shiftstack
- get cloud name from environment or RC file before than configured one

Change-Id: If0306e4ac4bdaa45c195265191d62703ab80512e
This commit is contained in:
Federico Ressi 2022-07-20 11:37:49 +02:00
parent 18e30f0d98
commit 1db14b7927
18 changed files with 806 additions and 916 deletions

View File

@ -20,6 +20,8 @@ from tobiko.openstack.keystone import _resource
from tobiko.openstack.keystone import _services
from tobiko.openstack.keystone import _session
KeystoneClient = _client.KeystoneClient
KeystoneClientFixture = _client.KeystoneClientFixture
keystone_client = _client.keystone_client
get_keystone_client = _client.get_keystone_client
find_service = _client.find_service
@ -27,26 +29,28 @@ find_endpoint = _client.find_endpoint
find_service_endpoint = _client.find_service_endpoint
list_endpoints = _client.list_endpoints
list_services = _client.list_services
KeystoneClientFixture = _client.KeystoneClientFixture
CloudsFileKeystoneCredentialsFixture = (
_clouds_file.CloudsFileKeystoneCredentialsFixture)
default_keystone_credentials = _credentials.default_keystone_credentials
get_keystone_credentials = _credentials.get_keystone_credentials
has_keystone_credentials = _credentials.has_keystone_credentials
keystone_credentials = _credentials.keystone_credentials
register_default_keystone_credentials = (
_credentials.register_default_keystone_credentials)
skip_unless_has_keystone_credentials = (
_credentials.skip_unless_has_keystone_credentials)
DefaultKeystoneCredentialsFixture = (
_credentials.DefaultKeystoneCredentialsFixture)
ConfigKeystoneCredentialsFixture = (
_credentials.ConfigKeystoneCredentialsFixture)
DelegateKeystoneCredentialsFixture = (
_credentials.DelegateKeystoneCredentialsFixture)
KeystoneCredentials = _credentials.KeystoneCredentials
KeystoneCredentialsFixture = _credentials.KeystoneCredentialsFixture
KeystoneCredentialsType = _credentials.KeystoneCredentialsType
NoSuchKeystoneCredentials = _credentials.NoSuchKeystoneCredentials
EnvironKeystoneCredentialsFixture = \
_credentials.EnvironKeystoneCredentialsFixture
InvalidKeystoneCredentials = _credentials.InvalidKeystoneCredentials
DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = \
_credentials.DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES
get_keystone_resource_id = _resource.get_keystone_resource_id
get_project_id = _resource.get_project_id
@ -60,6 +64,7 @@ is_service_missing = _services.is_service_missing
skip_if_missing_service = _services.skip_if_missing_service
keystone_session = _session.keystone_session
KeystoneSession = _session.KeystoneSession
KeystoneSessionType = _session.KeystoneSessionType
KeystoneSessionFixture = _session.KeystoneSessionFixture
KeystoneSessionManager = _session.KeystoneSessionManager

View File

@ -13,13 +13,17 @@
# under the License.
from __future__ import absolute_import
import functools
import json
import os
import typing
from oslo_log import log
import tobiko
from tobiko.openstack.keystone import _credentials
from tobiko.shell import find
from tobiko.shell import sh
LOG = log.getLogger(__name__)
@ -33,182 +37,195 @@ class CloudsFileNotFoundError(tobiko.TobikoException):
message = "No such clouds file(s): {clouds_files!s}"
class DefaultCloudsFileConfig(tobiko.SharedFixture):
cloud_name = None
clouds_file_dirs = None
clouds_file_names = None
clouds_files = None
def setup_fixture(self):
keystone_conf = tobiko.tobiko_config().keystone
self.cloud_name = keystone_conf.cloud_name
self.clouds_file_dirs = keystone_conf.clouds_file_dirs
self.clouds_file_names = keystone_conf.clouds_file_names
self.clouds_files = self.list_cloud_files()
def list_cloud_files(self):
cloud_files = []
for directory in self.clouds_file_dirs:
directory = tobiko.tobiko_config_path(directory)
if os.path.isdir(directory):
for file_name in self.clouds_file_names:
file_name = os.path.join(directory, file_name)
if os.path.isfile(file_name):
cloud_files.append(file_name)
return cloud_files
CloudsFileContentType = typing.Mapping[str, typing.Any]
class CloudsFileKeystoneCredentialsFixture(
_credentials.KeystoneCredentialsFixture):
cloud_name = None
clouds_content = None
clouds_file = None
def __init__(self,
credentials: _credentials.KeystoneCredentials = None,
connection: sh.ShellConnectionType = None,
environ: typing.Dict[str, str] = None,
cloud_name: str = None,
directories: typing.Iterable[str] = None,
filenames: typing.Iterable[str] = None):
super().__init__(credentials=credentials,
connection=connection,
environ=environ)
self._cloud_name = cloud_name
if directories is not None:
directories = list(directories)
self._directories = directories
if filenames is not None:
filenames = list(filenames)
self._filenames = filenames
config = tobiko.required_fixture(DefaultCloudsFileConfig)
default_cloud_name: typing.Optional[str] = None
def __init__(self, credentials=None, cloud_name=None,
clouds_content=None, clouds_file=None, clouds_files=None):
super(CloudsFileKeystoneCredentialsFixture, self).__init__(
credentials=credentials)
@property
def cloud_name(self) -> typing.Optional[str]:
if self._cloud_name is None:
self._cloud_name = self._get_cloud_name()
return self._cloud_name
config = self.config
if cloud_name is None:
cloud_name = config.cloud_name
self.cloud_name = cloud_name
@property
def directories(self) -> typing.List[str]:
if self._directories is None:
directories = [self.connection.get_config_path(directory)
for directory in self._get_directories()]
self._directories = directories
return self._directories
if clouds_content is not None:
self.clouds_content = dict(clouds_content)
@property
def filenames(self) -> typing.List[str]:
if self._filenames is None:
self._filenames = self._get_filenames()
return self._filenames
if clouds_file is not None:
self.clouds_file = clouds_file
def _get_credentials(self) -> _credentials.KeystoneCredentials:
try:
filenames = find.find_files(path=self.directories,
name=self.filenames,
max_depth=1,
type='f',
ssh_client=self.connection.ssh_client)
except find.FilesNotFound as ex:
raise _credentials.NoSuchKeystoneCredentials(
reason=('Cloud files not found:\n'
f" login: {self.login}\n"
f" directories: {self.directories}\n"
f" filenames: {self.filenames}\n"
f" error: {ex}\n")) from ex
if clouds_files is None:
clouds_files = config.clouds_files
self.clouds_files = list(clouds_files)
if self.cloud_name is None:
raise _credentials.NoSuchKeystoneCredentials(
reason=(f"[{self.fixture_name}] Clouds name not found at"
f" {self.login!r}"))
def get_credentials(self):
cloud_name = self._get_cloud_name()
if cloud_name is None:
return None
for filename in filenames:
file_spec = f"{self.login}:{filename}"
content = load_clouds_file_content(
connection=self.connection,
filename=filename)
try:
return parse_credentials(
file_spec=file_spec,
content=content,
cloud_name=self.cloud_name)
except _credentials.NoSuchKeystoneCredentials:
LOG.debug(f'Cloud with name {self.cloud_name} not found '
f'in {file_spec}')
raise _credentials.NoSuchKeystoneCredentials(
reason=(f"[{self.fixture_name}] Keystone credentials not found "
f"for cloud name {self.cloud_name!r} in files "
f"{filenames!r} (login={self.login})"))
clouds_content = self._get_clouds_content()
clouds_section = clouds_content.get("clouds")
if clouds_section is None:
message = ("'clouds' section not found in clouds file "
"{!r}").format(self.clouds_file)
raise ValueError(message)
clouds_config = clouds_section.get(cloud_name)
if clouds_config is None:
message = ("No such cloud with name {!r} in file "
"{!r}").format(cloud_name, self.clouds_file)
raise ValueError(message)
auth = clouds_config.get("auth")
if auth is None:
message = ("No such 'auth' section in cloud file {!r} for cloud "
"name {!r}").format(self.clouds_file, self.cloud_name)
raise ValueError(message)
auth_url = auth.get("auth_url")
if not auth_url:
message = ("No such 'auth_url' in file {!r} for cloud name "
"{!r}").format(self.clouds_file, self.cloud_name)
raise ValueError(message)
username = auth.get('username') or auth.get('user_id')
password = auth.get('password')
cacert = clouds_config.get('cacert')
project_name = (auth.get('project_name') or
auth.get('tenant_namer') or
auth.get('project_id') or
auth.get_env('tenant_id'))
api_version = (int(clouds_config.get("identity_api_version", 0)) or
_credentials.api_version_from_url(auth_url))
if api_version == 2:
return _credentials.keystone_credentials(
api_version=api_version,
auth_url=auth_url,
username=username,
password=password,
project_name=project_name)
else:
domain_name = (auth.get("domain_name") or
auth.get("domain_id"))
user_domain_name = (auth.get("user_domain_name") or
auth.get("user_domain_id"))
project_domain_name = auth.get("project_domain_name")
project_domain_id = auth.get("project_domain_id")
trust_id = auth.get("trust_id")
return _credentials.keystone_credentials(
api_version=api_version,
auth_url=auth_url,
username=username,
password=password,
project_name=project_name,
domain_name=domain_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id,
cacert=cacert,
trust_id=trust_id)
def _get_cloud_name(self):
cloud_name = self.cloud_name
if cloud_name is None:
cloud_name = os.environ.get("OS_CLOUD")
def _get_cloud_name(self) -> typing.Optional[str]:
for var_name in ['OS_CLOUD', 'OS_CLOUDNAME']:
cloud_name = self.environ.get(var_name)
if cloud_name:
LOG.debug("Got cloud name from 'OS_CLOUD' environment "
"variable: %r", cloud_name)
self.cloud_name = cloud_name
else:
LOG.debug("Undefined environment variable: 'OS_CLOUD'")
return cloud_name or None
LOG.debug(f"Got cloud name from '{var_name}' environment "
f"variable: {cloud_name}", )
return cloud_name
return self._get_default_cloud_name()
def _get_clouds_content(self):
clouds_content = self.clouds_content
if clouds_content is None:
clouds_file = self._get_clouds_file()
with open(clouds_file, 'r') as f:
_, suffix = os.path.splitext(clouds_file)
if suffix in JSON_SUFFIXES:
LOG.debug('Load JSON clouds file: %r', clouds_file)
clouds_content = json.load(f)
else:
LOG.debug('Load YAML clouds file: %r', clouds_file)
clouds_content = tobiko.load_yaml(f)
LOG.debug('Clouds file content loaded from %r:\n%s',
clouds_file, json.dumps(clouds_content,
indent=4,
sort_keys=True))
self.clouds_content = clouds_content
@staticmethod
def _get_default_cloud_name() -> typing.Optional[str]:
return tobiko.tobiko_config().keystone.cloud_name
if not clouds_content:
message = "Invalid clouds file content: {!r}".format(
clouds_content)
raise ValueError(message)
return clouds_content
@staticmethod
def _get_directories() -> typing.List[str]:
return tobiko.tobiko_config().keystone.clouds_file_dirs
def _get_clouds_file(self):
clouds_file = self.clouds_file
if clouds_file:
clouds_files = [self.clouds_file]
@staticmethod
def _get_filenames() -> typing.List[str]:
return tobiko.tobiko_config().keystone.clouds_file_names
def parse_credentials(file_spec: str,
cloud_name: str,
content: CloudsFileContentType):
clouds_section = content.get("clouds")
if clouds_section is None:
raise _credentials.NoSuchKeystoneCredentials(
reason=f"'clouds' section not found in {file_spec!r}")
clouds_config = clouds_section.get(cloud_name)
if clouds_config is None:
raise _credentials.NoSuchKeystoneCredentials(
reason=f"cloud name {cloud_name!r} not found in {file_spec!r}")
auth = clouds_config.get("auth")
if auth is None:
raise _credentials.NoSuchKeystoneCredentials(
reason=f"'auth' section not found for cloud name "
f"{cloud_name!r} in {file_spec!r}")
auth_url = auth.get("auth_url")
if not auth_url:
raise _credentials.NoSuchKeystoneCredentials(
reason=f"'auth_url' is {auth_url!r} for cloud name "
f"{cloud_name!r} in {file_spec!r}")
username = auth.get('username') or auth.get('user_id')
if not username:
raise _credentials.NoSuchKeystoneCredentials(
reason=f"'username' is {username!r} for cloud name "
f"{cloud_name!r} in {file_spec!r}")
password = auth.get('password')
if not password:
raise _credentials.NoSuchKeystoneCredentials(
reason=f"'password' is {password!r} for cloud name "
f"{cloud_name!r} in {file_spec!r}")
cacert = clouds_config.get('cacert')
project_name = (auth.get('project_name') or
auth.get('tenant_namer') or
auth.get('project_id') or
auth.get_env('tenant_id'))
api_version = (int(clouds_config.get("identity_api_version", 0)) or
_credentials.api_version_from_url(auth_url))
if api_version == 2:
return _credentials.keystone_credentials(
api_version=api_version,
auth_url=auth_url,
username=username,
password=password,
project_name=project_name)
else:
domain_name = (auth.get("domain_name") or
auth.get("domain_id"))
user_domain_name = (auth.get("user_domain_name") or
auth.get("user_domain_id"))
project_domain_name = auth.get("project_domain_name")
project_domain_id = auth.get("project_domain_id")
trust_id = auth.get("trust_id")
return _credentials.keystone_credentials(
api_version=api_version,
auth_url=auth_url,
username=username,
password=password,
project_name=project_name,
domain_name=domain_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id,
cacert=cacert,
trust_id=trust_id)
@functools.lru_cache()
def load_clouds_file_content(connection: sh.ShellConnection,
filename: str) \
-> CloudsFileContentType:
with connection.open_file(filename, 'r') as f:
_, suffix = os.path.splitext(filename)
if suffix in JSON_SUFFIXES:
LOG.debug(f'Load JSON clouds file: {filename!r}')
return json.load(f)
else:
clouds_files = list(self.clouds_files)
for filename in clouds_files:
if os.path.exists(filename):
LOG.debug('Found clouds file at %r', filename)
self.clouds_file = clouds_file = filename
break
else:
raise CloudsFileNotFoundError(clouds_files=', '.join(clouds_files))
return clouds_file
_credentials.DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES.insert(
0, CloudsFileKeystoneCredentialsFixture)
LOG.debug(f'Load YAML clouds file: {filename!r}')
return tobiko.load_yaml(f)

View File

@ -13,50 +13,61 @@
# under the License.
from __future__ import absolute_import
import collections
import os
import functools
import json
import sys
import typing # noqa
import typing
from oslo_log import log
import testtools
import tobiko
from tobiko.shell import sh
from tobiko.shell import ssh
LOG = log.getLogger(__name__)
class KeystoneCredentials(collections.namedtuple(
'KeystoneCredentials', ['api_version',
'auth_url',
'username',
'password',
'project_name',
'domain_name',
'user_domain_name',
'project_domain_name',
'project_domain_id',
'cacert',
'trust_id'])):
_REQUIRED_CREDENTIALS_PARAMS = (
'auth_url', 'username', 'password', 'project_name')
def to_dict(self):
class KeystoneCredentials(typing.NamedTuple):
auth_url: str
username: str
password: str
project_name: str
api_version: typing.Optional[int] = None
domain_name: typing.Optional[str] = None
user_domain_name: typing.Optional[str] = None
project_domain_name: typing.Optional[str] = None
project_domain_id: typing.Optional[str] = None
cacert: typing.Optional[str] = None
trust_id: typing.Optional[str] = None
def to_dict(self) -> typing.Dict[str, typing.Any]:
# pylint: disable=no-member
return {k: v
for k, v in self._asdict().items()
if v is not None}
def to_json(self, indent=4, sort_keys=True) -> str:
return json.dumps(self.to_dict(),
sort_keys=sort_keys,
indent=indent)
def __repr__(self):
params = self.to_dict()
if 'password' in params:
params['password'] = '***'
return 'keystone_credentials({!s})'.format(
", ".join("{!s}={!r}".format(k, v)
for k, v in sorted(params.items())))
params_dump = ', '.join(f"{name}={value!r}"
for name, value in sorted(params.items()))
return f'keystone_credentials({params_dump})'
required_params = ('auth_url', 'username', 'password', 'project_name')
def validate(self, required_params=None):
required_params = required_params or self.required_params
def validate(self, required_params: typing.Iterable[str] = None):
if required_params is None:
required_params = _REQUIRED_CREDENTIALS_PARAMS
missing_params = [p
for p in required_params
if not getattr(self, p)]
@ -66,96 +77,123 @@ class KeystoneCredentials(collections.namedtuple(
raise InvalidKeystoneCredentials(credentials=self, reason=reason)
class NoSuchCredentialsError(tobiko.TobikoException):
message = "No such credentials from any of: {fixtures}"
class NoSuchKeystoneCredentials(tobiko.ObjectNotFound):
message = "no such credentials. {reason}"
class KeystoneCredentialsFixture(tobiko.SharedFixture):
credentials: typing.Optional[KeystoneCredentials] = None
def __init__(self,
credentials: typing.Optional[KeystoneCredentials] = None):
super(KeystoneCredentialsFixture, self).__init__()
if credentials is not None:
self.credentials = credentials
credentials: KeystoneCredentials = None,
connection: sh.ShellConnectionType = None,
environ: typing.Dict[str, str] = None):
super().__init__()
self.credentials = credentials
self._connection = connection
self._environ = environ
@property
def connection(self) -> sh.ShellConnection:
if self._connection is None:
self._connection = self._get_connection()
self.addCleanup(self._cleanup_connection)
if not isinstance(self._connection, sh.ShellConnection):
self._connection = sh.shell_connection(self._connection)
return self._connection
@property
def environ(self) -> typing.Dict[str, str]:
if self._environ is None:
environ = self._get_environ()
self._environ = {
name: value
for name, value in environ.items()
if name.startswith('OS_')}
self.addCleanup(self._cleanup_environ)
return self._environ
@property
def login(self):
return self.connection.login
def setup_fixture(self):
self.setup_credentials()
assert self.credentials is not None
self.credentials.validate()
def setup_credentials(self):
credentials = self.credentials
if credentials is None:
credentials = self.get_credentials()
if credentials is not None:
try:
credentials.validate()
except InvalidKeystoneCredentials as ex:
LOG.info("No such valid credentials from %r (%r)",
self, ex)
else:
self.addCleanup(self.cleanup_credentials)
self.credentials = credentials
def cleanup_credentials(self):
del self.credentials
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
def setup_credentials(self) -> KeystoneCredentials:
if self.credentials is None:
LOG.debug('Getting credentials...\n'
f" login: {self.login}\n"
f" fixture: {tobiko.get_fixture_name(self)}\n")
self.credentials = self._get_credentials()
assert self.credentials is not None
credentials_dump = json.dumps(self.credentials.to_dict(),
sort_keys=True,
indent=4)
LOG.debug('Got credentials:\n'
f" login: {self.login}\n"
f" fixture: {tobiko.get_fixture_name(self)}\n"
" credentials:\n"
f"{credentials_dump}\n")
self.addCleanup(self._cleanup_credentials)
return self.credentials
def _get_credentials(self) -> KeystoneCredentials:
raise NoSuchKeystoneCredentials(
reason=f"[{self.fixture_name}] credentials not assigned")
KeystoneCredentialsType = typing.Union[None,
KeystoneCredentials,
KeystoneCredentialsFixture,
str,
typing.Type]
def _cleanup_credentials(self):
self.credentials = None
def _get_connection(self) -> sh.ShellConnectionType:
return sh.local_shell_connection()
def _cleanup_connection(self):
self._connection = None
def _get_environ(self) -> typing.Dict[str, str]:
return self.connection.get_environ()
def _cleanup_environ(self):
self._environ = None
def __repr__(self) -> str:
return f'<{self.fixture_name} {self.login}>'
def get_keystone_credentials(obj: KeystoneCredentialsType = None) -> \
typing.Optional[KeystoneCredentials]:
if obj is None:
return default_keystone_credentials()
if isinstance(obj, KeystoneCredentials):
return obj
if tobiko.is_fixture(obj):
obj = tobiko.get_fixture(obj)
if isinstance(obj, KeystoneCredentialsFixture):
obj = tobiko.setup_fixture(obj).credentials
return get_keystone_credentials(obj)
raise TypeError(f"Can't get {KeystoneCredentials} object from {obj}")
KeystoneCredentialsType = typing.Union[
KeystoneCredentials,
KeystoneCredentialsFixture,
typing.Type[KeystoneCredentialsFixture]]
def default_keystone_credentials() -> typing.Optional[KeystoneCredentials]:
credentials = tobiko.setup_fixture(
DefaultKeystoneCredentialsFixture).credentials
if credentials is not None:
tobiko.check_valid_type(credentials, KeystoneCredentials)
def keystone_credentials(credentials: KeystoneCredentialsType = None,
**params) \
-> KeystoneCredentials:
if credentials is None:
if params:
credentials = KeystoneCredentials(**params)
params = {}
else:
credentials = default_keystone_credentials()
assert credentials is not None
if tobiko.is_fixture(credentials):
credentials = tobiko.get_fixture(credentials)
if isinstance(credentials, KeystoneCredentialsFixture):
credentials = tobiko.setup_fixture(credentials).credentials
assert isinstance(credentials, KeystoneCredentials)
if params:
params = credentials.to_dict()
params.update(**params)
credentials = KeystoneCredentials(**params)
return credentials
def keystone_credentials(api_version=None,
auth_url=None,
username=None,
password=None,
project_name=None,
domain_name=None,
user_domain_name=None,
project_domain_name=None,
project_domain_id=None,
cacert=None,
trust_id=None,
cls=KeystoneCredentials) -> KeystoneCredentials:
return cls(api_version=api_version,
auth_url=auth_url,
username=username,
password=password,
project_name=project_name,
domain_name=domain_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id,
cacert=cacert,
trust_id=trust_id)
def default_keystone_credentials() -> KeystoneCredentials:
return tobiko.setup_fixture(
DelegateKeystoneCredentialsFixture).credentials
class InvalidKeystoneCredentials(tobiko.TobikoException):
@ -164,29 +202,12 @@ class InvalidKeystoneCredentials(tobiko.TobikoException):
class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
environ: typing.Optional[typing.Dict[str, str]] = None
def __init__(self,
credentials: typing.Optional[KeystoneCredentials] = None,
environ: typing.Optional[typing.Dict[str, str]] = None):
super(EnvironKeystoneCredentialsFixture, self).__init__(
credentials=credentials)
if environ is not None:
self.environ = environ
def setup_fixture(self):
if self.environ is None:
self.environ = self.get_environ()
super(EnvironKeystoneCredentialsFixture, self).setup_fixture()
def get_environ(self) -> typing.Optional[typing.Dict[str, str]]:
return dict(os.environ)
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
def _get_credentials(self) -> KeystoneCredentials:
auth_url = self.get_env('OS_AUTH_URL')
if not auth_url:
LOG.debug("OS_AUTH_URL environment variable not defined")
return None
raise NoSuchKeystoneCredentials(
reason=(f"[{self.fixture_name}] OS_AUTH_URL environment "
f"variable is {auth_url!r}"))
api_version = (
self.get_int_env('OS_IDENTITY_API_VERSION') or
@ -232,11 +253,7 @@ class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
trust_id=trust_id)
def get_env(self, name) -> typing.Optional[str]:
environ = self.environ
if environ is None:
return None
else:
return environ.get(name)
return self.environ.get(name)
def get_int_env(self, name) -> typing.Optional[int]:
value = self.get_env(name=name)
@ -246,13 +263,16 @@ class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
return int(value)
def has_keystone_credentials(obj=None) -> bool:
@functools.lru_cache()
def has_keystone_credentials(obj: KeystoneCredentialsType = None) -> bool:
try:
credentials = get_keystone_credentials(obj)
except NoSuchCredentialsError:
credentials = keystone_credentials(obj)
except NoSuchKeystoneCredentials:
LOG.debug('Openstack Keystone credentials not found', exc_info=True)
return False
else:
return credentials is not None
LOG.debug(f'Openstack Keystone credentials found: {credentials!r}')
return True
def skip_unless_has_keystone_credentials(*args, **kwargs):
@ -262,14 +282,13 @@ def skip_unless_has_keystone_credentials(*args, **kwargs):
class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture):
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
from tobiko import config
conf = config.CONF.tobiko.keystone
def _get_credentials(self) -> KeystoneCredentials:
conf = tobiko.tobiko_config().keystone
auth_url = conf.auth_url
if not auth_url:
LOG.debug("auth_url option not defined in 'keystone' section of "
"tobiko.conf")
return None
raise NoSuchKeystoneCredentials(
reason="'auth_url' option not defined in 'keystone' section "
"of 'tobiko.conf' file")
api_version = (conf.api_version or
api_version_from_url(auth_url))
@ -294,39 +313,75 @@ class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture):
trust_id=conf.trust_id)
DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = [
EnvironKeystoneCredentialsFixture,
ConfigKeystoneCredentialsFixture]
class DelegateKeystoneCredentialsFixture(KeystoneCredentialsFixture):
def __init__(self,
delegates: typing.Iterable[KeystoneCredentialsFixture] = None,
credentials: KeystoneCredentials = None,
connection: sh.ShellConnectionType = None,
environ: typing.Dict[str, str] = None):
super().__init__(credentials=credentials,
connection=connection,
environ=environ)
if delegates is not None:
delegates = list(delegates)
self._delegates = delegates
class DefaultKeystoneCredentialsFixture(KeystoneCredentialsFixture):
@property
def delegates(self) -> typing.List[KeystoneCredentialsFixture]:
if self._delegates is None:
self._delegates = self._get_delegates()
return self._delegates
fixtures = DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES
@staticmethod
def _get_delegates() -> typing.List[KeystoneCredentialsFixture]:
from tobiko.openstack.keystone import _clouds_file
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
errors = []
for fixture in self.fixtures:
delegates: typing.List[KeystoneCredentialsFixture] = []
keystone_conf = tobiko.tobiko_config().keystone
hosts = keystone_conf.clouds_file_hosts
if hosts:
for host in hosts:
delegates.append(
_clouds_file.CloudsFileKeystoneCredentialsFixture(
connection=host))
proxy_client = ssh.ssh_proxy_client()
if proxy_client is not None:
delegates.append(
_clouds_file.CloudsFileKeystoneCredentialsFixture(
connection=proxy_client))
delegates.append(
tobiko.get_fixture(EnvironKeystoneCredentialsFixture))
delegates.append(
tobiko.get_fixture(ConfigKeystoneCredentialsFixture))
return delegates
def _get_credentials(self) -> KeystoneCredentials:
for delegate in self.delegates:
try:
credentials = tobiko.setup_fixture(fixture).credentials
except Exception as ex:
LOG.debug("Error getting credentials from %r: %s",
tobiko.get_fixture_name(fixture), ex)
errors.append(tobiko.exc_info())
continue
return tobiko.setup_fixture(delegate).credentials
except NoSuchKeystoneCredentials as ex:
LOG.debug(f'Got no credentials from {delegate!r}:\n'
f' {ex}\n')
raise NoSuchKeystoneCredentials(
reason=f'[{self.fixture_name}] no credentials from '
f'delegates: {self.delegates!r}')
if credentials:
LOG.info("Got default credentials from fixture %r: %r",
fixture, credentials)
return credentials
else:
LOG.debug('Got no credentials from %r', fixture)
if len(errors) == 1:
errors[0].reraise()
elif errors:
raise testtools.MultipleExceptions(errors)
raise NoSuchCredentialsError(fixtures=self.fixtures)
def register_default_keystone_credentials(
credentials: KeystoneCredentialsFixture,
delegate: DelegateKeystoneCredentialsFixture = None,
position: int = None):
if delegate is None:
delegate = tobiko.get_fixture(DelegateKeystoneCredentialsFixture)
tobiko.check_valid_type(credentials, KeystoneCredentialsFixture)
if position is None:
delegate.delegates.append(credentials)
else:
delegate.delegates.insert(position, credentials)
def api_version_from_url(auth_url) -> typing.Optional[int]:
@ -342,8 +397,8 @@ def api_version_from_url(auth_url) -> typing.Optional[int]:
return None
def print_credentials():
credentials = default_keystone_credentials()
def print_credentials(credentials: KeystoneCredentialsType = None):
credentials = keystone_credentials(credentials)
tobiko.dump_yaml(dict(credentials.to_dict()),
sys.stdout,
indent=4,

View File

@ -27,143 +27,149 @@ from tobiko import http
LOG = log.getLogger(__name__)
KEYSTONE_SESSION_CLASSES = _session.Session,
KeystoneSession = typing.Union[_session.Session]
class KeystoneSessionFixture(tobiko.SharedFixture):
session: typing.Optional[_session.Session] = None
credentials: _credentials.KeystoneCredentialsType = None
VALID_CREDENTIALS_TYPES = (_credentials.KeystoneCredentials,
_credentials.KeystoneCredentialsFixture,
type, str)
type)
def __init__(self,
credentials: _credentials.KeystoneCredentialsType = None,
session: typing.Optional[_session.Session] = None):
session: KeystoneSession = None):
super(KeystoneSessionFixture, self).__init__()
if credentials is not None:
tobiko.check_valid_type(credentials, *self.VALID_CREDENTIALS_TYPES)
self.credentials = credentials
if session is not None:
self.session = session
self._credentials = credentials
self.session = session
@property
def credentials(self) -> _credentials.KeystoneCredentials:
if self._credentials is None:
self._credentials = self._get_credentials()
elif not isinstance(self._credentials,
_credentials.KeystoneCredentials):
self._credentials = _credentials.keystone_credentials(
self._credentials)
return self._credentials
def setup_fixture(self):
self.setup_session()
def setup_session(self):
session = self.session
if session is None:
credentials = _credentials.get_keystone_credentials(
self.credentials)
if self.session is None:
self.session = self._get_session()
LOG.debug("Create Keystone session from credentials "
f"{credentials}")
credentials.validate()
loader = loading.get_plugin_loader('password')
params = credentials.to_dict()
# api version parameter is not accepted
params.pop('api_version', None)
params.pop('cacert', None)
auth = loader.load_from_options(**params)
self.session = session = _session.Session(auth=auth, verify=False)
http.setup_http_session(session)
self.credentials = credentials
def _get_session(self) -> KeystoneSession:
credentials = self.credentials
LOG.debug("Create Keystone session from credentials "
f"{credentials}")
credentials.validate()
loader = loading.get_plugin_loader('password')
params = credentials.to_dict()
# api version parameter is not accepted
params.pop('api_version', None)
params.pop('cacert', None)
auth = loader.load_from_options(**params)
session = _session.Session(auth=auth, verify=False)
http.setup_http_session(session)
return session
@staticmethod
def _get_credentials() -> _credentials.KeystoneCredentials:
return _credentials.default_keystone_credentials()
KeystoneSessionType = typing.Union[None,
_session.Session,
typing.Type,
str,
KeystoneSessionFixture]
KeystoneSessionType = typing.Union[KeystoneSession,
KeystoneSessionFixture,
typing.Type[KeystoneSessionFixture]]
def keystone_session(obj: KeystoneSessionType) -> _session.Session:
def keystone_session(obj: KeystoneSessionType = None) -> KeystoneSession:
if obj is None:
return default_keystone_session()
if tobiko.is_fixture(obj):
obj = tobiko.get_fixture(obj)
if isinstance(obj, KeystoneSessionFixture):
obj = tobiko.setup_fixture(obj).session
if isinstance(obj, _session.Session):
return obj
raise TypeError(f"Can't get {_session.Session} object from {obj}")
return tobiko.check_valid_type(obj, KEYSTONE_SESSION_CLASSES)
InitSessionType = typing.Optional[typing.Callable]
InitSessionType = typing.Callable[[_credentials.KeystoneCredentials],
KeystoneSessionFixture]
class KeystoneSessionManager(object):
def __init__(self):
self.sessions: typing.Dict[typing.Any,
self.sessions: typing.Dict[_credentials.KeystoneCredentials,
KeystoneSessionFixture] = {}
def get_session(self,
credentials: typing.Any = None,
credentials: _credentials.KeystoneCredentialsType = None,
init_session: InitSessionType = None,
shared: bool = True) \
-> KeystoneSessionFixture:
credentials = _credentials.keystone_credentials(credentials)
if shared:
shared_key, session = self.get_shared_session(credentials)
else:
shared_key = session = None
if session is None:
return self.create_session(credentials=credentials,
init_session=init_session,
shared=shared,
shared_key=shared_key)
else:
return session
def get_shared_session(self, credentials: typing.Any) \
-> typing.Tuple[typing.Any,
typing.Optional[KeystoneSessionFixture]]:
if tobiko.is_fixture(credentials):
key = tobiko.get_fixture_name(credentials)
else:
key = credentials
return key, self.sessions.get(key)
def create_session(self,
credentials: typing.Any = None,
init_session: InitSessionType = None,
shared: bool = True,
shared_key: typing.Any = None) \
-> KeystoneSessionFixture:
if init_session is None:
init_session = KeystoneSessionFixture
assert callable(init_session)
LOG.debug('Initialize Keystone session: %r(credentials=%r)',
init_session, credentials)
session: KeystoneSessionFixture = init_session(
credentials=credentials)
tobiko.check_valid_type(session, KeystoneSessionFixture)
session = self.sessions.get(credentials)
if session is not None:
return session
session = self.create_session(credentials=credentials,
init_session=init_session)
if shared:
self.sessions[shared_key] = session
self.sessions[credentials] = session
return session
def create_session(self,
credentials: _credentials.KeystoneCredentials,
init_session: InitSessionType = None) \
-> KeystoneSessionFixture:
if init_session is None:
init_session = self.init_session
assert callable(init_session)
LOG.debug('Initialize Keystone session:\n'
f" init_session: {init_session}\n"
f" credentials: {credentials}\n")
session = init_session(credentials)
LOG.debug('Got new Keystone session:\n'
f" init_session: {init_session}\n"
f" credentials: {credentials}\n"
f" session: {session}\n")
return tobiko.check_valid_type(session, KeystoneSessionFixture)
SESSIONS = KeystoneSessionManager()
@staticmethod
def init_session(credentials: _credentials.KeystoneCredentials) \
-> KeystoneSessionFixture:
return KeystoneSessionFixture(credentials=credentials)
KEYSTONE_SESSION_MANAGER = KeystoneSessionManager()
def default_keystone_session(
shared: bool = True,
init_session: InitSessionType = None,
manager: typing.Optional[KeystoneSessionManager] = None) -> \
_session.Session:
return get_keystone_session(shared=shared, init_session=init_session,
manager: KeystoneSessionManager = None) -> \
KeystoneSession:
return get_keystone_session(shared=shared,
init_session=init_session,
manager=manager)
def get_keystone_session(
credentials: typing.Any = None,
credentials: _credentials.KeystoneCredentialsType = None,
shared: bool = True,
init_session: typing.Any = None,
manager: typing.Optional[KeystoneSessionManager] = None) -> \
_session.Session:
manager: KeystoneSessionManager = None) -> \
KeystoneSession:
if manager is None:
manager = SESSIONS
session = manager.get_session(credentials=credentials, shared=shared,
manager = KEYSTONE_SESSION_MANAGER
session = manager.get_session(credentials=credentials,
shared=shared,
init_session=init_session)
tobiko.check_valid_type(session, KeystoneSessionFixture)
return tobiko.setup_fixture(session).session

View File

@ -54,6 +54,9 @@ OPTIONS = [
default=None,
help=("Cloud name used pick authentication parameters from "
"clouds.*")),
cfg.ListOpt('clouds_file_hosts',
default=['localhost'],
help="Host login from where to search for clouds file"),
cfg.ListOpt('clouds_file_dirs',
default=['.', '~/.config/openstack', '/etc/openstack'],
help="Directories where to look for clouds files"),

View File

@ -54,7 +54,7 @@ def execute(cmd, *args, **kwargs):
def _param_list(*args, **kwargs):
if not any(param in kwargs for param in ['os-token', 'os-username']):
credentials = keystone.get_keystone_credentials()
credentials = keystone.keystone_credentials()
tmp_auth = {}
tmp_auth['os-auth-url'] = credentials.auth_url
tmp_auth['os-password'] = credentials.password

View File

@ -13,37 +13,33 @@
# under the License.
from __future__ import absolute_import
import typing
import tobiko
from tobiko.openstack import keystone
from tobiko.shiftstack import _clouds_file
from tobiko import tripleo
def load_shiftstack_rcfile() -> typing.Dict[str, str]:
conf = tobiko.tobiko_config().shiftstack
return tripleo.fetch_os_env(*conf.rcfile)
class ShiftstackKeystoneCredentialsFixture(
keystone.CloudsFileKeystoneCredentialsFixture):
tripleo.UndercloudCloudsFileKeystoneCredentialsFixture):
clouds_file_fixture = tobiko.required_fixture(
_clouds_file.ShiftStackCloudsFileFixture, setup=False)
@staticmethod
def _get_default_cloud_name() -> typing.Optional[str]:
return tobiko.tobiko_config().shiftstack.cloud_name
def __init__(self,
cloud_name: str = None,
clouds_file: str = None):
if clouds_file is None:
clouds_file = self.clouds_file_fixture.local_clouds_file_path
if cloud_name is None:
cloud_name = tobiko.tobiko_config().shiftstack.cloud_name
super().__init__(clouds_file=clouds_file,
cloud_name=cloud_name)
def setup_fixture(self):
tobiko.setup_fixture(self.clouds_file_fixture)
super().setup_fixture()
def _get_environ(self) -> typing.Dict[str, str]:
return load_shiftstack_rcfile()
def shiftstack_keystone_session():
return keystone.get_keystone_session(
credentials=ShiftstackKeystoneCredentialsFixture)
def shiftstack_keystone_session() -> keystone.KeystoneSession:
credentials = shiftstack_keystone_credentials()
return keystone.get_keystone_session(credentials=credentials)
def shiftstack_keystone_credentials():
return tobiko.setup_fixture(
ShiftstackKeystoneCredentialsFixture).credentials
def shiftstack_keystone_credentials() -> keystone.KeystoneCredentialsFixture:
return tobiko.get_fixture(ShiftstackKeystoneCredentialsFixture)

View File

@ -13,37 +13,46 @@
# under the License.
from __future__ import absolute_import
import functools
from oslo_log import log
import tobiko
from tobiko.openstack import keystone
from tobiko.shiftstack import _keystone
from tobiko import tripleo
LOG = log.getLogger(__name__)
class HasShiftstackFixture(tobiko.SharedFixture):
def check_shiftstack():
try:
tripleo.check_overcloud()
except tripleo.OvercloudNotFound as ex:
raise ShiftstackNotFound(
reason=f'Overcloud not found ({ex})') from ex
def __init__(self,
has_shiftstack: bool = None):
# pylint: disable=redefined-outer-name
super(HasShiftstackFixture, self).__init__()
self.has_shiftstack = has_shiftstack
def setup_fixture(self):
if self.has_shiftstack is None:
try:
_keystone.shiftstack_keystone_session()
except Exception:
LOG.debug('Shifstack credentials not found', exc_info=1)
self.has_shiftstack = False
else:
LOG.debug('Shifstack credentials was found')
self.has_shiftstack = True
try:
_keystone.shiftstack_keystone_session()
except keystone.NoSuchKeystoneCredentials as ex:
raise ShiftstackNotFound(
reason=f'Keystone credentials not found ({ex})') from ex
class ShiftstackNotFound(tobiko.ObjectNotFound):
message = 'shiftstack not found: {reason}'
@functools.lru_cache()
def has_shiftstack() -> bool:
return tobiko.setup_fixture(HasShiftstackFixture).has_shiftstack
try:
check_shiftstack()
except ShiftstackNotFound as ex:
LOG.debug(f'Shiftstack not found: {ex}')
return False
else:
LOG.debug('Shiftstack found')
return True
def skip_unless_has_shiftstack():

View File

@ -30,6 +30,10 @@ OPTIONS = [
cfg.StrOpt('cloud_name',
default='shiftstack',
help="Keystone credentials cloud name"),
cfg.ListOpt('rcfile',
default=['./shiftstackrc'],
help="Path to the RC file used to populate OS_* environment "
"variables")
]

View File

@ -21,6 +21,7 @@ import pandas as pd
import testtools
from tobiko import config
from tobiko.openstack import keystone
from tobiko.openstack import metalsmith
from tobiko import tripleo
from tobiko.tripleo import pacemaker
@ -44,6 +45,24 @@ class OvercloudKeystoneCredentialsTest(testtools.TestCase):
env.get('OS_TENANT_ID') or
env.get('OS_PROJECT_ID'))
def test_overcloud_keystone_credentials(self):
fixture = tripleo.overcloud_keystone_credentials()
self.assertIsInstance(fixture,
keystone.KeystoneCredentialsFixture)
credentials = keystone.keystone_credentials(fixture)
credentials.validate()
def test_overcloud_keystone_session(self):
session = tripleo.overcloud_keystone_session()
client = keystone.get_keystone_client(session=session)
endpoints = keystone.list_endpoints(client=client)
self.assertNotEqual([], endpoints)
def test_overcloud_keystone_client(self):
client = tripleo.overcloud_keystone_client()
_services = keystone.list_services(client=client)
self.assertTrue(_services)
@tripleo.skip_if_missing_overcloud
class OvercloudMetalsmithApiTest(testtools.TestCase):

View File

@ -58,8 +58,10 @@ class UndercloudSshConnectionTest(testtools.TestCase):
class UndercloudKeystoneClientTest(testtools.TestCase):
def test_undercloud_keystone_credentials(self):
credentials = tripleo.undercloud_keystone_credentials()
self.assertIsInstance(credentials, keystone.KeystoneCredentials)
fixture = tripleo.undercloud_keystone_credentials()
self.assertIsInstance(fixture,
keystone.KeystoneCredentialsFixture)
credentials = keystone.keystone_credentials(fixture)
credentials.validate()
def test_undercloud_keystone_session(self):

View File

@ -1,311 +0,0 @@
# Copyright (c) 2019 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import json
import os
import tempfile
import typing # noqa
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack.keystone import _clouds_file
from tobiko.tests.unit import openstack
from tobiko.tests.unit.openstack.keystone import test_credentials
def make_clouds_content(cloud_name, api_version=None, auth=None):
content = {}
if api_version is not None:
content['identity_api_version'] = api_version
if auth is not None:
content['auth'] = auth
return {'clouds': {cloud_name: content}}
class CloudsFileFixture(tobiko.SharedFixture):
cloud_name = None # type: str
api_version = None # type: str
auth = None # type: typing.Dict[str, typing.Any]
clouds_content = None
clouds_file = None
suffix = '.yaml'
create_file = True
def __init__(self, cloud_name=None, api_version=None, auth=None,
clouds_file=None, suffix=None, create_file=None,
clouds_content=None):
super(CloudsFileFixture, self).__init__()
if cloud_name is not None:
self.cloud_name = cloud_name
if api_version is not None:
self.api_version = api_version
if auth is not None:
self.auth = auth
if clouds_file is not None:
self.clouds_file = clouds_file
if suffix is not None:
self.suffix = suffix
if create_file is not None:
self.create_file = create_file
if clouds_content is not None:
self.clouds_content = clouds_content
def setup_fixture(self):
clouds_content = self.clouds_content
if clouds_content is None:
self.clouds_content = clouds_content = make_clouds_content(
cloud_name=self.cloud_name, api_version=self.api_version,
auth=self.auth)
if self.create_file:
clouds_file = self.clouds_file
if clouds_file is None:
fd, clouds_file = tempfile.mkstemp(suffix=self.suffix)
self.addCleanup(os.remove, clouds_file)
self.clouds_file = clouds_file
clouds_stream = os.fdopen(fd, 'wt')
else:
clouds_stream = os.open(clouds_file, 'wt')
try:
if self.suffix in _clouds_file.JSON_SUFFIXES:
json.dump(clouds_content, clouds_stream)
elif self.suffix in _clouds_file.YAML_SUFFIXES:
tobiko.dump_yaml(clouds_content, clouds_stream)
finally:
clouds_stream.close()
class V2CloudsFileFixture(CloudsFileFixture):
cloud_name = 'V2-TEST_CLOUD'
auth = test_credentials.V2_PARAMS
class V3CloudsFileFixture(CloudsFileFixture):
cloud_name = 'V3-TEST_CLOUD'
auth = test_credentials.V3_PARAMS
class CloudsFileKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
config = tobiko.required_fixture(
_clouds_file.DefaultCloudsFileConfig)
def test_init(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture()
self.assertEqual(self.config.cloud_name, fixture.cloud_name)
self.assertIsNone(fixture.clouds_content)
self.assertIsNone(fixture.clouds_file)
self.assertEqual(self.config.clouds_files, fixture.clouds_files)
def test_init_with_cloud_name(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name')
self.assertEqual('cloud-name', fixture.cloud_name)
def test_init_with_clouds_content(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_content={})
self.assertEqual({}, fixture.clouds_content)
def test_init_with_clouds_file(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file='cloud-file')
self.assertEqual('cloud-file', fixture.clouds_file)
def test_init_with_clouds_files(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_files=['a', 'b', 'd'])
self.assertEqual(['a', 'b', 'd'], fixture.clouds_files)
def test_setup_from_default_clouds_files(self):
file_fixture = self.useFixture(V3CloudsFileFixture())
self.patch(self.config, 'clouds_files',
['/a', file_fixture.clouds_file, '/c'])
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_json(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.json'))
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_yaml(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.yaml'))
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_from_yml(self):
file_fixture = self.useFixture(V3CloudsFileFixture(suffix='.yml'))
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V3_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_v2_credentials(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
credentials_fixture = self.useFixture(
keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name=file_fixture.cloud_name,
clouds_file=file_fixture.clouds_file))
self.assertEqual(file_fixture.clouds_content,
credentials_fixture.clouds_content)
self.assertEqual(test_credentials.V2_PARAMS,
credentials_fixture.credentials.to_dict())
def test_setup_with_cloud_name(self):
file_fixture = self.useFixture(V3CloudsFileFixture())
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name',
clouds_file=file_fixture.clouds_file)
ex = self.assertRaises(ValueError, tobiko.setup_fixture,
credentials_fixture)
self.assertEqual("No such cloud with name 'cloud-name' in file " +
repr(file_fixture.clouds_file), str(ex))
def test_setup_with_cloud_name_from_env(self):
self.patch(self.config, 'cloud_name', None)
file_fixture = self.useFixture(V2CloudsFileFixture())
self.patch(os, 'environ', {'OS_CLOUD': file_fixture.cloud_name})
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file)
self.assertIsNone(credentials_fixture.cloud_name)
tobiko.setup_fixture(credentials_fixture)
self.assertEqual(file_fixture.cloud_name,
credentials_fixture.cloud_name)
def test_setup_with_empty_cloud_name(self):
file_fixture = self.useFixture(V2CloudsFileFixture())
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file,
cloud_name='')
self.assertIsNone(credentials_fixture.credentials)
self.assertEqual('', credentials_fixture.cloud_name)
tobiko.setup_fixture(credentials_fixture)
self.assertIsNone(credentials_fixture.credentials)
self.assertEqual('', credentials_fixture.cloud_name)
def test_setup_with_empty_cloud_name_from_env(self):
self.patch(self.config, 'cloud_name', None)
file_fixture = self.useFixture(V2CloudsFileFixture())
self.patch(os, 'environ', {'OS_CLOUD': ''})
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file)
self.assertIsNone(credentials_fixture.credentials)
self.assertIsNone(credentials_fixture.cloud_name)
tobiko.setup_fixture(credentials_fixture)
self.assertIsNone(credentials_fixture.credentials)
self.assertIsNone(credentials_fixture.cloud_name)
def test_setup_with_no_cloud_name(self):
self.patch(self.config, 'cloud_name', None)
file_fixture = self.useFixture(V2CloudsFileFixture())
credentials_fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file=file_fixture.clouds_file)
self.assertIsNone(credentials_fixture.credentials)
self.assertIsNone(credentials_fixture.cloud_name)
tobiko.setup_fixture(credentials_fixture)
self.assertIsNone(credentials_fixture.credentials)
self.assertIsNone(credentials_fixture.cloud_name)
def test_setup_with_no_clouds_section(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name', clouds_content={'other_data': None},
clouds_file='clouds-file')
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual({'other_data': None}, fixture.clouds_content)
self.assertEqual("'clouds' section not found in clouds file "
"'clouds-file'", str(ex))
def test_setup_with_empty_clouds_content(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name', clouds_content={})
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual({}, fixture.clouds_content)
self.assertEqual('Invalid clouds file content: {}', str(ex))
def test_setup_with_no_auth(self):
clouds_content = make_clouds_content('cloud-name')
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name',
clouds_content=clouds_content,
clouds_file='cloud-file')
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual(
"No such 'auth' section in cloud file 'cloud-file' for cloud "
"name 'cloud-name'", str(ex))
def test_setup_with_no_auth_url(self):
clouds_content = make_clouds_content('cloud-name', auth={})
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name',
clouds_content=clouds_content,
clouds_file='cloud-file')
ex = self.assertRaises(ValueError, tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual(
"No such 'auth_url' in file 'cloud-file' for cloud name "
"'cloud-name'", str(ex))
def test_setup_without_clouds_file(self):
self.patch(self.config, 'clouds_files', ['/a', '/b', '/c'])
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
cloud_name='cloud-name')
ex = self.assertRaises(_clouds_file.CloudsFileNotFoundError,
tobiko.setup_fixture, fixture)
self.assertEqual('cloud-name', fixture.cloud_name)
self.assertEqual("No such clouds file(s): /a, /b, /c", str(ex))
def test_setup_with_non_existing_clouds_file(self):
fixture = keystone.CloudsFileKeystoneCredentialsFixture(
clouds_file='/a.yaml',
cloud_name='cloud-name')
ex = self.assertRaises(_clouds_file.CloudsFileNotFoundError,
tobiko.setup_fixture, fixture)
self.assertEqual("No such clouds file(s): /a.yaml", str(ex))

View File

@ -21,7 +21,6 @@ import testtools
import tobiko
from tobiko import config
from tobiko.openstack import keystone
from tobiko.openstack.keystone import _credentials
from tobiko.tests.unit import openstack
@ -127,45 +126,46 @@ class KeystoneCredentialsTest(openstack.OpenstackTest):
class EnvironKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
def test_init(self):
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture = keystone.EnvironKeystoneCredentialsFixture()
self.assertIsNone(fixture.credentials)
def test_setup_with_no_credentials(self):
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture.setUp()
self.assertIsNone(fixture.credentials)
fixture = keystone.EnvironKeystoneCredentialsFixture()
self.assertRaises(keystone.NoSuchKeystoneCredentials,
tobiko.setup_fixture,
fixture)
def test_setup_v2(self):
self.patch(os, 'environ', V2_ENVIRON)
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture = keystone.EnvironKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_v2_with_tenant_name(self):
self.patch(os, 'environ', V2_ENVIRON_WITH_TENANT_NAME)
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture = keystone.EnvironKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_v2_with_api_version(self):
self.patch(os, 'environ', V2_ENVIRON_WITH_VERSION)
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture = keystone.EnvironKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_v3(self):
self.patch(os, 'environ', V3_ENVIRON)
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture = keystone.EnvironKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V3_PARAMS, fixture.credentials.to_dict())
def test_setup_v3_without_api_version(self):
self.patch(os, 'environ', V3_ENVIRON_WITH_VERSION)
fixture = _credentials.EnvironKeystoneCredentialsFixture()
fixture = keystone.EnvironKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V3_PARAMS, fixture.credentials.to_dict())
@ -178,73 +178,76 @@ class ConfigKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
return self.patch(config.CONF.tobiko, 'keystone', credentials)
def test_init(self):
fixture = _credentials.ConfigKeystoneCredentialsFixture()
fixture = keystone.ConfigKeystoneCredentialsFixture()
self.assertIsNone(fixture.credentials)
def test_setup_v2(self):
self.patch_config(V2_PARAMS, api_version=None)
fixture = _credentials.ConfigKeystoneCredentialsFixture()
fixture = keystone.ConfigKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_v2_with_api_version(self):
self.patch_config(V2_PARAMS, api_version=2)
fixture = _credentials.ConfigKeystoneCredentialsFixture()
fixture = keystone.ConfigKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_v3(self):
self.patch_config(V3_PARAMS, api_version=None)
fixture = _credentials.ConfigKeystoneCredentialsFixture()
fixture = keystone.ConfigKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V3_PARAMS, fixture.credentials.to_dict())
def test_setup_v3_with_api_version(self):
self.patch_config(V3_PARAMS, api_version=3)
fixture = _credentials.ConfigKeystoneCredentialsFixture()
fixture = keystone.ConfigKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V3_PARAMS, fixture.credentials.to_dict())
class DefaultKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
class DelegateKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
def setUp(self):
super(DefaultKeystoneCredentialsFixtureTest, self).setUp()
super().setUp()
self.patch_config({})
self.patch(os, 'environ', {})
tobiko.remove_fixture(_credentials.ConfigKeystoneCredentialsFixture)
tobiko.remove_fixture(_credentials.EnvironKeystoneCredentialsFixture)
tobiko.remove_fixture(keystone.ConfigKeystoneCredentialsFixture)
tobiko.remove_fixture(keystone.EnvironKeystoneCredentialsFixture)
def patch_config(self, params, **kwargs):
credentials = make_credentials(params, **kwargs)
return self.patch(config.CONF.tobiko, 'keystone', credentials)
keystone_conf = tobiko.tobiko_config().keystone
for name, value in credentials.to_dict().items():
self.patch(keystone_conf, name, value)
tobiko.tobiko_config().ssh.proxy_jump = None
def test_init(self):
fixture = keystone.DefaultKeystoneCredentialsFixture()
fixture = keystone.DelegateKeystoneCredentialsFixture()
self.assertIsNone(fixture.credentials)
def test_setup_from_environ(self):
self.patch(os, 'environ', V2_ENVIRON)
fixture = keystone.DefaultKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_from_config(self):
self.patch_config(V2_PARAMS)
fixture = keystone.DefaultKeystoneCredentialsFixture()
fixture = keystone.DelegateKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_from_environ_and_confif(self):
def test_setup_from_environ(self):
self.patch(os, 'environ', V2_ENVIRON)
fixture = keystone.DelegateKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V2_PARAMS, fixture.credentials.to_dict())
def test_setup_from_environ_and_config(self):
self.patch(os, 'environ', V3_ENVIRON)
self.patch_config(V2_PARAMS)
fixture = keystone.DefaultKeystoneCredentialsFixture()
fixture = keystone.DelegateKeystoneCredentialsFixture()
fixture.setUp()
fixture.credentials.validate()
self.assertEqual(V3_PARAMS, fixture.credentials.to_dict())
@ -252,41 +255,15 @@ class DefaultKeystoneCredentialsFixtureTest(openstack.OpenstackTest):
class SkipUnlessHasKeystoneCredentialsTest(openstack.OpenstackTest):
def setUp(self):
super(SkipUnlessHasKeystoneCredentialsTest, self).setUp()
self.default_credentials = tobiko.setup_fixture(
keystone.DefaultKeystoneCredentialsFixture)
def patch_has_keystone_credentials(self, return_value: bool):
# pylint: disable=protected-access
from tobiko.openstack.keystone import _credentials
return self.patch(_credentials,
'has_keystone_credentials',
return_value=return_value)
def test_skip_method_unless_has_keystone_credentials_without_creds(self):
self.patch(self.default_credentials, 'credentials', None)
@keystone.skip_unless_has_keystone_credentials()
def decorated_func():
self.fail('Not skipped')
self.assertFalse(keystone.has_keystone_credentials())
self.assertRaises(self.skipException, decorated_func)
def test_skip_class_unless_has_keystone_credentials_without_creds(self):
self.patch(self.default_credentials, 'credentials', None)
@keystone.skip_unless_has_keystone_credentials()
class SkipTest(testtools.TestCase):
def test_skip(self):
super(SkipTest, self).setUp()
self.fail('Not skipped')
self.assertFalse(keystone.has_keystone_credentials())
test_case = SkipTest('test_skip')
test_result = tobiko.run_test(test_case)
tobiko.assert_test_case_was_skipped(
test_case, test_result, skip_reason='Missing Keystone credentials')
def test_skip_method_unless_has_keystone_credentials_with_creds(self):
credentials = make_credentials({})
self.patch(self.default_credentials, 'credentials', credentials)
def test_skip_method_unless_has_keystone_credentials(self):
has_keystone_credentials = self.patch_has_keystone_credentials(True)
call_args = []
@ -294,13 +271,16 @@ class SkipUnlessHasKeystoneCredentialsTest(openstack.OpenstackTest):
def decorated_func(*args, **kwargs):
call_args.append([args, kwargs])
self.assertEqual(credentials, keystone.get_keystone_credentials())
has_keystone_credentials.assert_not_called()
decorated_func(1, 2, a=1, b=2)
# pylint: disable=no-member
has_keystone_credentials.assert_called_once()
self.assertEqual(call_args, [[(1, 2), {'a': 1, 'b': 2}]])
def test_skip_class_unless_has_keystone_credentials_with_creds(self):
credentials = make_credentials({})
self.patch(self.default_credentials, 'credentials', credentials)
def test_skip_class_unless_has_keystone_credentials(self):
has_keystone_credentials = self.patch_has_keystone_credentials(True)
calls = []
@ -310,6 +290,41 @@ class SkipUnlessHasKeystoneCredentialsTest(openstack.OpenstackTest):
def test_skip(self):
calls.append(True)
self.assertEqual(credentials, keystone.get_keystone_credentials())
has_keystone_credentials.assert_not_called()
tobiko.run_test(SkipTest('test_skip'))
has_keystone_credentials.assert_called_once()
self.assertEqual(calls, [True])
def test_skip_method_unless_has_keystone_credentials_without_creds(self):
has_keystone_credentials = self.patch_has_keystone_credentials(False)
@keystone.skip_unless_has_keystone_credentials()
def decorated_func():
self.fail('Not skipped')
has_keystone_credentials.assert_not_called()
self.assertRaises(self.skipException, decorated_func)
has_keystone_credentials.assert_called_once()
def test_skip_class_unless_has_keystone_credentials_without_creds(self):
has_keystone_credentials = self.patch_has_keystone_credentials(False)
@keystone.skip_unless_has_keystone_credentials()
class SkipTest(testtools.TestCase):
def test_skip(self):
super(SkipTest, self).setUp()
self.fail('Not skipped')
test_case = SkipTest('test_skip')
has_keystone_credentials.assert_not_called()
test_result = tobiko.run_test(test_case)
tobiko.assert_test_case_was_skipped(
test_case, test_result,
skip_reason='Missing Keystone credentials')
has_keystone_credentials.assert_called_once()

View File

@ -17,7 +17,6 @@ from __future__ import absolute_import
from keystoneauth1 import session as keystonesession
import mock
import tobiko
from tobiko.openstack import keystone
from tobiko.tests.unit import openstack
@ -42,34 +41,23 @@ DEFAULT_CREDENTIALS = keystone.keystone_credentials(
class CredentialsFixture(keystone.KeystoneCredentialsFixture):
credentials = CREDENTIALS
def _get_credentials(self) -> keystone.KeystoneCredentials:
return CREDENTIALS
class DefaultCredentialsFixture(CredentialsFixture):
credentials = DEFAULT_CREDENTIALS
def _get_credentials(self) -> keystone.KeystoneCredentials:
return DEFAULT_CREDENTIALS
class KeystoneSessionFixtureTest(openstack.OpenstackTest):
default_credentials_fixture = (
'tobiko.openstack.keystone._credentials.'
'DefaultKeystoneCredentialsFixture')
def setUp(self):
super(KeystoneSessionFixtureTest, self).setUp()
from tobiko.openstack.keystone import _credentials
from tobiko.openstack.keystone import _session
tobiko.remove_fixture(self.default_credentials_fixture)
self.patch(_credentials, 'DefaultKeystoneCredentialsFixture',
DefaultCredentialsFixture)
self.patch(_session, 'SESSIONS',
_session.KeystoneSessionManager())
def test_init(self, credentials=None):
def test_init(self,
credentials: keystone.KeystoneCredentialsType = None):
# pylint: disable=protected-access
session = keystone.KeystoneSessionFixture(credentials=credentials)
self.assertIs(credentials or None, session.credentials)
self.assertIs(credentials, session._credentials)
def test_init_with_credentials(self):
self.test_init(credentials=CREDENTIALS)
@ -80,15 +68,12 @@ class KeystoneSessionFixtureTest(openstack.OpenstackTest):
def test_init_with_credentials_fixture_type(self):
self.test_init(credentials=CredentialsFixture)
def test_setup(self, credentials=None):
def test_setup(self,
credentials: keystone.KeystoneCredentialsType = None):
session = keystone.KeystoneSessionFixture(credentials=credentials)
session.setUp()
if tobiko.is_fixture(credentials):
credentials = tobiko.get_fixture(credentials)
self.assertIs(credentials.credentials, session.credentials)
else:
self.assertIs(credentials or DEFAULT_CREDENTIALS,
session.credentials)
self.assertEqual(keystone.keystone_credentials(credentials),
session.credentials)
def test_setup_with_credentials(self):
self.test_setup(credentials=CREDENTIALS)
@ -107,19 +92,22 @@ class KeystoneSessionManagerTest(openstack.OpenstackTest):
self.assertTrue(manager)
self.assertEqual({}, manager.sessions)
def test_get_session(self, credentials=None, shared=True):
def test_get_session(self,
credentials: keystone.KeystoneCredentialsType = None,
shared=True):
manager = keystone.KeystoneSessionManager()
session = manager.get_session(credentials=credentials,
shared=shared)
self.assertIs(credentials or None, session.credentials)
self.assertIsNotNone(session.credentials)
self.assertIs(keystone.keystone_credentials(credentials),
session.credentials)
self.assertIsInstance(session, keystone.KeystoneSessionFixture)
if shared:
self.assertIs(session, manager.get_session(
credentials=credentials))
shared_session = manager.get_session(credentials=credentials)
if shared in [True, None]:
self.assertIs(session, shared_session)
else:
self.assertIsNot(session, manager.get_session(
credentials=credentials))
self.assertIsNot(session, shared_session)
def test_get_session_with_credentials(self):
self.test_get_session(credentials=CREDENTIALS)
@ -141,14 +129,18 @@ class KeystoneSessionManagerTest(openstack.OpenstackTest):
session = manager.get_session(credentials=CREDENTIALS,
init_session=init_session)
self.assertIs(mock_session, session)
init_session.assert_called_once_with(credentials=CREDENTIALS)
init_session.assert_called_once_with(CREDENTIALS)
class GetKeystomeSessionTest(openstack.OpenstackTest):
def test_get_keystone_session(self, credentials=None, shared=True):
session1 = keystone.get_keystone_session(credentials=credentials,
shared=shared)
def test_get_keystone_session(
self,
credentials: keystone.KeystoneCredentialsType = None,
shared=True):
session1 = keystone.get_keystone_session(
credentials=credentials,
shared=shared)
session2 = keystone.get_keystone_session(credentials=credentials,
shared=shared)
if shared:

View File

@ -34,11 +34,17 @@ run_playbook_from_undercloud = _ansible.run_playbook_from_undercloud
OvercloudKeystoneCredentialsFixture = \
overcloud.OvercloudKeystoneCredentialsFixture
OvercloudNotFound = overcloud.OvercloudNotFound
OvercloudVersionMismatch = overcloud.OvercloudVersionMismatch
check_overcloud = overcloud.check_overcloud
find_overcloud_node = overcloud.find_overcloud_node
has_overcloud = overcloud.has_overcloud
list_overcloud_nodes = overcloud.list_overcloud_nodes
load_overcloud_rcfile = overcloud.load_overcloud_rcfile
overcloud_host_config = overcloud.overcloud_host_config
overcloud_keystone_client = overcloud.overcloud_keystone_client
overcloud_keystone_credentials = overcloud.overcloud_keystone_credentials
overcloud_keystone_session = overcloud.overcloud_keystone_session
overcloud_node_ip_address = overcloud.overcloud_node_ip_address
overcloud_ssh_client = overcloud.overcloud_ssh_client
overcloud_version = overcloud.overcloud_version
@ -50,6 +56,11 @@ get_rhosp_version = _rhosp.get_rhosp_version
TripleoTopology = topology.TripleoTopology
UndercloudKeystoneCredentialsFixture = \
undercloud.UndercloudKeystoneCredentialsFixture
UndercloudCloudsFileKeystoneCredentialsFixture = \
undercloud.UndercloudCloudsFileKeystoneCredentialsFixture
fetch_os_env = undercloud.fetch_os_env
load_undercloud_rcfile = undercloud.load_undercloud_rcfile
has_undercloud = undercloud.has_undercloud
skip_unless_has_undercloud = undercloud.skip_unlsess_has_undercloud

View File

@ -36,7 +36,8 @@ LOG = log.getLogger(__name__)
def load_overcloud_rcfile() -> typing.Dict[str, str]:
return _undercloud.fetch_os_env(*CONF.tobiko.tripleo.overcloud_rcfile)
conf = tobiko.tobiko_config().tripleo
return _undercloud.fetch_os_env(*conf.overcloud_rcfile)
@functools.lru_cache()
@ -67,17 +68,24 @@ def skip_unless_has_overcloud(min_version: tobiko.VersionType = None,
error_type=(OvercloudNotFound, OvercloudVersionMismatch))
class OvercloudKeystoneCredentialsFixture(
keystone.EnvironKeystoneCredentialsFixture):
class OvercloudKeystoneCredentialsFixtureBase(
_undercloud.UndercloudKeystoneCredentialsFixtureBase):
def get_environ(self) -> typing.Dict[str, str]:
LOG.debug('Looking for credentials from TripleO undercloud host...')
if _undercloud.has_undercloud():
return load_overcloud_rcfile()
else:
LOG.debug("TripleO undercloud host not available for fetching "
'credentials files.')
return {}
def _get_environ(self) -> typing.Dict[str, str]:
return load_overcloud_rcfile()
class OvercloudKeystoneCredentialsFixture(
OvercloudKeystoneCredentialsFixtureBase,
keystone.DelegateKeystoneCredentialsFixture):
@staticmethod
def _get_delegates() -> typing.List[keystone.KeystoneCredentialsFixture]:
return [
tobiko.get_fixture(
OvercloudCloudsFileKeystoneCredentialsFixture),
tobiko.get_fixture(
OvercloudEnvironKeystoneCredentialsFixture)]
def list_overcloud_nodes(**params):
@ -238,11 +246,6 @@ class OvercloudHostConfig(tobiko.SharedFixture):
return parameters
def setup_overcloud_keystone_crederntials():
keystone.DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES.append(
OvercloudKeystoneCredentialsFixture)
def get_overcloud_nodes_dataframe(
oc_node_df_function: typing.Callable[[ssh.SSHClientType],
typing.Any]):
@ -304,3 +307,39 @@ class OvercloudNotFound(tobiko.ObjectNotFound):
class OvercloudVersionMismatch(tobiko.VersionMismatch):
message = 'overcloud version mismatch: {version} {cause}'
class OvercloudCloudsFileKeystoneCredentialsFixture(
OvercloudKeystoneCredentialsFixtureBase,
keystone.CloudsFileKeystoneCredentialsFixture):
@staticmethod
def _get_default_cloud_name() -> typing.Optional[str]:
return tobiko.tobiko_config().tripleo.overcloud_cloud_name
class OvercloudEnvironKeystoneCredentialsFixture(
OvercloudKeystoneCredentialsFixtureBase,
keystone.EnvironKeystoneCredentialsFixture):
pass
def overcloud_keystone_session() -> keystone.KeystoneSession:
credentials = overcloud_keystone_credentials()
return keystone.get_keystone_session(credentials=credentials)
def overcloud_keystone_credentials() -> keystone.KeystoneCredentialsFixture:
return tobiko.get_fixture(OvercloudKeystoneCredentialsFixture)
def overcloud_keystone_client() -> keystone.KeystoneClient:
session = overcloud_keystone_session()
return keystone.get_keystone_client(session=session)
def setup_overcloud_keystone_credentials():
if has_overcloud():
keystone.register_default_keystone_credentials(
credentials=overcloud_keystone_credentials(),
position=0)

View File

@ -49,7 +49,8 @@ class InvalidRCFile(tobiko.TobikoException):
message = "Invalid RC file: {rcfile}"
def fetch_os_env(rcfile, *rcfiles) -> typing.Dict[str, str]:
@functools.lru_cache()
def fetch_os_env(rcfile: str, *rcfiles: str) -> typing.Dict[str, str]:
rcfiles = (rcfile,) + rcfiles
LOG.debug('Fetching OS environment variables from TripleO undercloud '
f'host files: {",".join(rcfiles)}')
@ -81,26 +82,38 @@ def fetch_os_env(rcfile, *rcfiles) -> typing.Dict[str, str]:
def load_undercloud_rcfile() -> typing.Dict[str, str]:
return fetch_os_env(*CONF.tobiko.tripleo.undercloud_rcfile)
conf = tobiko.tobiko_config().tripleo
return fetch_os_env(*conf.undercloud_rcfile)
class EnvironUndercloudKeystoneCredentialsFixture(
keystone.EnvironKeystoneCredentialsFixture):
def get_environ(self) -> typing.Dict[str, str]:
class UndercloudKeystoneCredentialsFixtureBase(
keystone.KeystoneCredentialsFixture):
def _get_credentials(self) -> keystone.KeystoneCredentials:
if not has_undercloud():
raise keystone.NoSuchKeystoneCredentials()
return super()._get_credentials()
def _get_connection(self) -> sh.ShellConnectionType:
return undercloud_ssh_client()
def _get_environ(self) -> typing.Dict[str, str]:
return load_undercloud_rcfile()
class CloudsFileUndercloudKeystoneCredentialsFixture(
class UndercloudCloudsFileKeystoneCredentialsFixture(
UndercloudKeystoneCredentialsFixtureBase,
keystone.CloudsFileKeystoneCredentialsFixture):
def __init__(self, credentials=None, cloud_name=None,
clouds_content=None, clouds_file=None, clouds_files=None):
cloud_name = cloud_name or load_undercloud_rcfile()['OS_CLOUD']
@staticmethod
def _get_default_cloud_name() -> typing.Optional[str]:
return tobiko.tobiko_config().tripleo.undercloud_cloud_name
super(CloudsFileUndercloudKeystoneCredentialsFixture, self).__init__(
credentials=credentials, cloud_name=cloud_name,
clouds_content=clouds_content, clouds_file=clouds_file,
clouds_files=clouds_files)
class UndercloudEnvironKeystoneCredentialsFixture(
UndercloudKeystoneCredentialsFixtureBase,
keystone.EnvironKeystoneCredentialsFixture):
pass
@functools.lru_cache()
@ -110,7 +123,11 @@ def has_undercloud(min_version: tobiko.VersionType = None,
check_undercloud(min_version=min_version,
max_version=max_version)
except (UndercloudNotFound, UndercloudVersionMismatch) as ex:
LOG.debug(f'TripleO undercloud host not found: {ex.cause}')
LOG.debug(f'TripleO undercloud host not found:\n'
f'{ex}')
return False
except Exception:
LOG.exception('Error looking for undercloud host')
return False
else:
LOG.debug('TripleO undercloud host found')
@ -177,22 +194,26 @@ def undercloud_keystone_client():
return keystone.get_keystone_client(session=session)
def _get_keystone_credentials():
environ = load_undercloud_rcfile()
if 'OS_CLOUD' in environ:
credentials = CloudsFileUndercloudKeystoneCredentialsFixture
else:
credentials = EnvironUndercloudKeystoneCredentialsFixture
return credentials
class UndercloudKeystoneCredentialsFixture(
UndercloudKeystoneCredentialsFixtureBase,
keystone.DelegateKeystoneCredentialsFixture):
@staticmethod
def _get_delegates() -> typing.List[keystone.KeystoneCredentialsFixture]:
return [
tobiko.get_fixture(
UndercloudCloudsFileKeystoneCredentialsFixture),
tobiko.get_fixture(
UndercloudEnvironKeystoneCredentialsFixture)]
def undercloud_keystone_session():
return keystone.get_keystone_session(
credentials=_get_keystone_credentials())
def undercloud_keystone_session() -> keystone.KeystoneSession:
credentials = undercloud_keystone_credentials()
return keystone.get_keystone_session(credentials=credentials)
def undercloud_keystone_credentials():
return tobiko.setup_fixture(_get_keystone_credentials()).credentials
def undercloud_keystone_credentials() -> keystone.KeystoneCredentialsFixture:
return tobiko.get_fixture(UndercloudKeystoneCredentialsFixture)
@functools.lru_cache()
@ -201,7 +222,6 @@ def undercloud_version() -> tobiko.Version:
return _rhosp.get_rhosp_version(connection=ssh_client)
@functools.lru_cache()
def check_undercloud(min_version: tobiko.Version = None,
max_version: tobiko.Version = None):
try:

View File

@ -37,6 +37,11 @@ OPTIONS = [
cfg.ListOpt('undercloud_rcfile',
default=['~/stackrc'],
help="Undercloud RC filename"),
cfg.StrOpt('undercloud_cloud_name',
default='undercloud',
help='undercloud cloud name to be used for loading credentials '
'from the undercloud clouds files'),
# Overcloud options
cfg.IntOpt('overcloud_ssh_port',
@ -51,6 +56,10 @@ OPTIONS = [
cfg.ListOpt('overcloud_rcfile',
default=['~/overcloudrc', '~/qe-Cloud-0rc'],
help="Overcloud RC filenames"),
cfg.StrOpt('overcloud_cloud_name',
default='overcloud',
help='overcloud cloud name to be used for loading credentials '
'from the overcloud clouds files'),
cfg.IntOpt('overcloud_ip_version',
help=("Default IP address version to be used to connect to "
"overcloud nodes ")),
@ -61,7 +70,6 @@ OPTIONS = [
cfg.StrOpt('inventory_file',
default='.ansible/inventory/tripleo.yaml',
help="path to where to export tripleo inventory file"),
]
@ -80,5 +88,5 @@ def setup_tobiko_config(conf):
from tobiko.tripleo import topology
_ansible.setup_undercloud_ansible_playbook()
overcloud.setup_overcloud_keystone_crederntials()
overcloud.setup_overcloud_keystone_credentials()
topology.setup_tripleo_topology()