|
|
|
@ -16,6 +16,7 @@ from __future__ import absolute_import
|
|
|
|
|
import collections
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import typing # noqa
|
|
|
|
|
|
|
|
|
|
from oslo_log import log
|
|
|
|
|
import testtools
|
|
|
|
@ -27,28 +28,6 @@ import tobiko
|
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_keystone_credentials(obj=None):
|
|
|
|
|
if not obj:
|
|
|
|
|
return default_keystone_credentials()
|
|
|
|
|
if tobiko.is_fixture(obj):
|
|
|
|
|
obj = tobiko.get_fixture(obj)
|
|
|
|
|
if isinstance(obj, KeystoneCredentialsFixture):
|
|
|
|
|
obj = tobiko.setup_fixture(obj).credentials
|
|
|
|
|
if isinstance(obj, KeystoneCredentials):
|
|
|
|
|
return obj
|
|
|
|
|
|
|
|
|
|
message = "Can't get {!r} object from {!r}".format(
|
|
|
|
|
KeystoneCredentials, obj)
|
|
|
|
|
raise TypeError(message)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def default_keystone_credentials():
|
|
|
|
|
credentials = tobiko.setup_fixture(DefaultKeystoneCredentialsFixture
|
|
|
|
|
).credentials
|
|
|
|
|
tobiko.check_valid_type(credentials, KeystoneCredentials)
|
|
|
|
|
return credentials
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class KeystoneCredentials(collections.namedtuple(
|
|
|
|
|
'KeystoneCredentials', ['api_version',
|
|
|
|
|
'auth_url',
|
|
|
|
@ -87,6 +66,33 @@ class KeystoneCredentials(collections.namedtuple(
|
|
|
|
|
raise InvalidKeystoneCredentials(credentials=self, reason=reason)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NoSuchCredentialsError(tobiko.TobikoException):
|
|
|
|
|
message = "No such credentials from any of: {fixtures}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_keystone_credentials(obj=None) -> KeystoneCredentials:
|
|
|
|
|
if not obj:
|
|
|
|
|
return default_keystone_credentials()
|
|
|
|
|
if tobiko.is_fixture(obj):
|
|
|
|
|
obj = tobiko.get_fixture(obj)
|
|
|
|
|
if isinstance(obj, KeystoneCredentialsFixture):
|
|
|
|
|
obj = tobiko.setup_fixture(obj).credentials
|
|
|
|
|
if isinstance(obj, KeystoneCredentials):
|
|
|
|
|
return obj
|
|
|
|
|
|
|
|
|
|
message = "Can't get {!r} object from {!r}".format(
|
|
|
|
|
KeystoneCredentials, obj)
|
|
|
|
|
raise TypeError(message)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def default_keystone_credentials() -> KeystoneCredentials:
|
|
|
|
|
credentials = tobiko.setup_fixture(
|
|
|
|
|
DefaultKeystoneCredentialsFixture).credentials
|
|
|
|
|
if credentials:
|
|
|
|
|
tobiko.check_valid_type(credentials, KeystoneCredentials)
|
|
|
|
|
return credentials
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def keystone_credentials(api_version=None,
|
|
|
|
|
auth_url=None,
|
|
|
|
|
username=None,
|
|
|
|
@ -97,7 +103,7 @@ def keystone_credentials(api_version=None,
|
|
|
|
|
project_domain_name=None,
|
|
|
|
|
project_domain_id=None,
|
|
|
|
|
trust_id=None,
|
|
|
|
|
cls=KeystoneCredentials):
|
|
|
|
|
cls=KeystoneCredentials) -> KeystoneCredentials:
|
|
|
|
|
return cls(api_version=api_version,
|
|
|
|
|
auth_url=auth_url,
|
|
|
|
|
username=username,
|
|
|
|
@ -116,9 +122,10 @@ class InvalidKeystoneCredentials(tobiko.TobikoException):
|
|
|
|
|
|
|
|
|
|
class KeystoneCredentialsFixture(tobiko.SharedFixture):
|
|
|
|
|
|
|
|
|
|
credentials = None
|
|
|
|
|
credentials: typing.Optional[KeystoneCredentials] = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, credentials=None):
|
|
|
|
|
def __init__(self,
|
|
|
|
|
credentials: typing.Optional[KeystoneCredentials] = None):
|
|
|
|
|
super(KeystoneCredentialsFixture, self).__init__()
|
|
|
|
|
if credentials:
|
|
|
|
|
self.credentials = credentials
|
|
|
|
@ -143,15 +150,17 @@ class KeystoneCredentialsFixture(tobiko.SharedFixture):
|
|
|
|
|
def cleanup_credentials(self):
|
|
|
|
|
del self.credentials
|
|
|
|
|
|
|
|
|
|
def get_credentials(self):
|
|
|
|
|
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
|
|
|
|
|
return self.credentials
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
|
|
|
|
|
|
|
|
|
environ = None
|
|
|
|
|
environ: typing.Optional[typing.Dict[str, str]] = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, credentials=None, environ=None):
|
|
|
|
|
def __init__(self,
|
|
|
|
|
credentials: typing.Optional[KeystoneCredentials] = None,
|
|
|
|
|
environ: typing.Optional[typing.Dict[str, str]] = None):
|
|
|
|
|
super(EnvironKeystoneCredentialsFixture, self).__init__(
|
|
|
|
|
credentials=credentials)
|
|
|
|
|
if environ is not None:
|
|
|
|
@ -162,10 +171,10 @@ class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
|
|
|
|
self.environ = self.get_environ()
|
|
|
|
|
super(EnvironKeystoneCredentialsFixture, self).setup_fixture()
|
|
|
|
|
|
|
|
|
|
def get_environ(self):
|
|
|
|
|
return os.environ
|
|
|
|
|
def get_environ(self) -> typing.Optional[typing.Dict[str, str]]:
|
|
|
|
|
return dict(os.environ)
|
|
|
|
|
|
|
|
|
|
def get_credentials(self):
|
|
|
|
|
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
|
|
|
|
|
auth_url = self.get_env('OS_AUTH_URL')
|
|
|
|
|
if not auth_url:
|
|
|
|
|
LOG.debug("OS_AUTH_URL environment variable not defined")
|
|
|
|
@ -214,19 +223,38 @@ class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
|
|
|
|
project_domain_id=project_domain_id,
|
|
|
|
|
trust_id=trust_id)
|
|
|
|
|
|
|
|
|
|
def get_env(self, name):
|
|
|
|
|
return self.environ.get(name, None)
|
|
|
|
|
def get_env(self, name) -> typing.Optional[str]:
|
|
|
|
|
environ = self.environ
|
|
|
|
|
if environ is None:
|
|
|
|
|
return None
|
|
|
|
|
else:
|
|
|
|
|
return environ.get(name)
|
|
|
|
|
|
|
|
|
|
def get_int_env(self, name):
|
|
|
|
|
def get_int_env(self, name) -> typing.Optional[int]:
|
|
|
|
|
value = self.get_env(name=name)
|
|
|
|
|
if value is not None:
|
|
|
|
|
value = int(value)
|
|
|
|
|
return value
|
|
|
|
|
if value is None:
|
|
|
|
|
return None
|
|
|
|
|
else:
|
|
|
|
|
return int(value)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def has_keystone_credentials(obj=None) -> bool:
|
|
|
|
|
try:
|
|
|
|
|
credentials = get_keystone_credentials(obj)
|
|
|
|
|
except NoSuchCredentialsError:
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
return credentials is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def skip_unless_has_keystone_credentials(*args, **kwargs):
|
|
|
|
|
return tobiko.skip_unless('Missing Keystone credentials',
|
|
|
|
|
has_keystone_credentials, *args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
|
|
|
|
|
|
|
|
|
def get_credentials(self):
|
|
|
|
|
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
|
|
|
|
|
from tobiko import config
|
|
|
|
|
conf = config.CONF.tobiko.keystone
|
|
|
|
|
auth_url = conf.auth_url
|
|
|
|
@ -267,7 +295,7 @@ class DefaultKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
|
|
|
|
|
|
|
|
|
fixtures = DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES
|
|
|
|
|
|
|
|
|
|
def get_credentials(self):
|
|
|
|
|
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
|
|
|
|
|
errors = []
|
|
|
|
|
for fixture in self.fixtures:
|
|
|
|
|
try:
|
|
|
|
@ -290,12 +318,10 @@ class DefaultKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
|
|
|
|
elif errors:
|
|
|
|
|
raise testtools.MultipleExceptions(errors)
|
|
|
|
|
|
|
|
|
|
raise ValueError("No such credentials from any of: \n " +
|
|
|
|
|
'\n '.join(tobiko.get_fixture_name(fixture)
|
|
|
|
|
for fixture in self.fixtures))
|
|
|
|
|
raise NoSuchCredentialsError(fixtures=self.fixtures)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def api_version_from_url(auth_url):
|
|
|
|
|
def api_version_from_url(auth_url) -> typing.Optional[int]:
|
|
|
|
|
if auth_url.endswith('/v2.0'):
|
|
|
|
|
LOG.debug('Got Keystone API version 2 from auth_url: %r', auth_url)
|
|
|
|
|
return 2
|
|
|
|
|