Improve keystone client 'lazyness'
Change-Id: Ib6ac662344edaf5167da532991e34c430385001f
This commit is contained in:
parent
1dfc6ccc6b
commit
5996055ab6
|
@ -17,10 +17,13 @@ from tobiko.openstack.keystone import _credentials
|
||||||
from tobiko.openstack.keystone import _session
|
from tobiko.openstack.keystone import _session
|
||||||
|
|
||||||
keystone_credentials = _credentials.keystone_credentials
|
keystone_credentials = _credentials.keystone_credentials
|
||||||
|
get_keystone_credentials = _credentials.get_keystone_credentials
|
||||||
default_keystone_credentials = _credentials.default_keystone_credentials
|
default_keystone_credentials = _credentials.default_keystone_credentials
|
||||||
KeystoneCredentials = _credentials.KeystoneCredentials
|
KeystoneCredentials = _credentials.KeystoneCredentials
|
||||||
|
KeystoneCredentialsFixture = _credentials.KeystoneCredentialsFixture
|
||||||
InvalidKeystoneCredentials = _credentials.InvalidKeystoneCredentials
|
InvalidKeystoneCredentials = _credentials.InvalidKeystoneCredentials
|
||||||
|
|
||||||
|
keystone_session = _session.keystone_session
|
||||||
KeystoneSessionFixture = _session.KeystoneSessionFixture
|
KeystoneSessionFixture = _session.KeystoneSessionFixture
|
||||||
KeystoneSessionManager = _session.KeystoneSessionManager
|
KeystoneSessionManager = _session.KeystoneSessionManager
|
||||||
get_keystone_session = _session.get_keystone_session
|
get_keystone_session = _session.get_keystone_session
|
||||||
|
|
|
@ -23,8 +23,26 @@ import tobiko
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_keystone_credentials(obj=None):
|
||||||
|
if not obj:
|
||||||
|
return default_keystone_credentials()
|
||||||
|
if tobiko.is_fixture(obj):
|
||||||
|
obj = tobiko.get_fixture(obj)
|
||||||
|
if isinstance(obj, KeystoneCredentialsFixture):
|
||||||
|
obj = tobiko.setup_fixture(obj).credentials
|
||||||
|
if isinstance(obj, KeystoneCredentials):
|
||||||
|
return obj
|
||||||
|
|
||||||
|
message = "Can't get {!r} object from {!r}".format(
|
||||||
|
KeystoneCredentials, obj)
|
||||||
|
raise TypeError(message)
|
||||||
|
|
||||||
|
|
||||||
def default_keystone_credentials():
|
def default_keystone_credentials():
|
||||||
return tobiko.setup_fixture(DefaultKeystoneCredentialsFixture).credentials
|
credentials = tobiko.setup_fixture(DefaultKeystoneCredentialsFixture
|
||||||
|
).credentials
|
||||||
|
tobiko.check_valid_type(credentials, KeystoneCredentials)
|
||||||
|
return credentials
|
||||||
|
|
||||||
|
|
||||||
class KeystoneCredentials(collections.namedtuple(
|
class KeystoneCredentials(collections.namedtuple(
|
||||||
|
@ -93,16 +111,47 @@ class InvalidKeystoneCredentials(tobiko.TobikoException):
|
||||||
message = "invalid Keystone credentials; {reason!s}; {credentials!r}"
|
message = "invalid Keystone credentials; {reason!s}; {credentials!r}"
|
||||||
|
|
||||||
|
|
||||||
class EnvironKeystoneCredentialsFixture(tobiko.SharedFixture):
|
class KeystoneCredentialsFixture(tobiko.SharedFixture):
|
||||||
|
|
||||||
credentials = None
|
credentials = None
|
||||||
|
|
||||||
|
def __init__(self, credentials=None):
|
||||||
|
super(KeystoneCredentialsFixture, self).__init__()
|
||||||
|
if credentials:
|
||||||
|
self.credentials = credentials
|
||||||
|
|
||||||
def setup_fixture(self):
|
def setup_fixture(self):
|
||||||
|
self.setup_credentials()
|
||||||
|
|
||||||
|
def setup_credentials(self):
|
||||||
|
credentials = self.credentials
|
||||||
|
if not self.credentials:
|
||||||
|
credentials = self.get_credentials()
|
||||||
|
if credentials:
|
||||||
|
try:
|
||||||
|
credentials.validate()
|
||||||
|
except InvalidKeystoneCredentials as ex:
|
||||||
|
LOG.info("No such valid credentials from environment: %r",
|
||||||
|
ex)
|
||||||
|
else:
|
||||||
|
self.addCleanup(self.cleanup_credentials)
|
||||||
|
self.credentials = credentials
|
||||||
|
|
||||||
|
def cleanup_credentials(self):
|
||||||
|
del self.credentials
|
||||||
|
|
||||||
|
def get_credentials(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
||||||
|
|
||||||
|
def get_credentials(self):
|
||||||
from tobiko import config
|
from tobiko import config
|
||||||
auth_url = config.get_env('OS_AUTH_URL')
|
auth_url = config.get_env('OS_AUTH_URL')
|
||||||
if not auth_url:
|
if not auth_url:
|
||||||
LOG.debug("OS_AUTH_URL environment variable not defined")
|
LOG.debug("OS_AUTH_URL environment variable not defined")
|
||||||
return
|
return None
|
||||||
|
|
||||||
api_version = (
|
api_version = (
|
||||||
config.get_int_env('OS_IDENTITY_API_VERSION') or
|
config.get_int_env('OS_IDENTITY_API_VERSION') or
|
||||||
|
@ -117,7 +166,7 @@ class EnvironKeystoneCredentialsFixture(tobiko.SharedFixture):
|
||||||
config.get_env('OS_PROJECT_ID') or
|
config.get_env('OS_PROJECT_ID') or
|
||||||
config.get_env('OS_TENANT_ID'))
|
config.get_env('OS_TENANT_ID'))
|
||||||
if api_version == 2:
|
if api_version == 2:
|
||||||
credentials = keystone_credentials(
|
return keystone_credentials(
|
||||||
api_version=api_version,
|
api_version=api_version,
|
||||||
auth_url=auth_url,
|
auth_url=auth_url,
|
||||||
username=username,
|
username=username,
|
||||||
|
@ -135,7 +184,7 @@ class EnvironKeystoneCredentialsFixture(tobiko.SharedFixture):
|
||||||
project_domain_id = (
|
project_domain_id = (
|
||||||
config.get_env('OS_PROJECT_DOMAIN_ID'))
|
config.get_env('OS_PROJECT_DOMAIN_ID'))
|
||||||
trust_id = config.get_env('OS_TRUST_ID')
|
trust_id = config.get_env('OS_TRUST_ID')
|
||||||
credentials = keystone_credentials(
|
return keystone_credentials(
|
||||||
api_version=api_version,
|
api_version=api_version,
|
||||||
auth_url=auth_url,
|
auth_url=auth_url,
|
||||||
username=username,
|
username=username,
|
||||||
|
@ -146,38 +195,30 @@ class EnvironKeystoneCredentialsFixture(tobiko.SharedFixture):
|
||||||
project_domain_name=project_domain_name,
|
project_domain_name=project_domain_name,
|
||||||
project_domain_id=project_domain_id,
|
project_domain_id=project_domain_id,
|
||||||
trust_id=trust_id)
|
trust_id=trust_id)
|
||||||
try:
|
|
||||||
credentials.validate()
|
|
||||||
except InvalidKeystoneCredentials as ex:
|
|
||||||
LOG.info("No such valid credentials from environment: %r", ex)
|
|
||||||
else:
|
|
||||||
self.credentials = credentials
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigKeystoneCredentialsFixture(tobiko.SharedFixture):
|
class ConfigKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
||||||
|
|
||||||
credentials = None
|
def get_credentials(self):
|
||||||
|
|
||||||
def setup_fixture(self):
|
|
||||||
from tobiko import config
|
from tobiko import config
|
||||||
conf = config.CONF.tobiko.keystone
|
conf = config.CONF.tobiko.keystone
|
||||||
auth_url = conf.auth_url
|
auth_url = conf.auth_url
|
||||||
if not auth_url:
|
if not auth_url:
|
||||||
LOG.debug("auth_url option not defined in 'keystone' section of "
|
LOG.debug("auth_url option not defined in 'keystone' section of "
|
||||||
"tobiko.conf")
|
"tobiko.conf")
|
||||||
return
|
return None
|
||||||
|
|
||||||
api_version = (conf.api_version or
|
api_version = (conf.api_version or
|
||||||
api_version_from_url(auth_url))
|
api_version_from_url(auth_url))
|
||||||
if api_version == 2:
|
if api_version == 2:
|
||||||
credentials = keystone_credentials(
|
return keystone_credentials(
|
||||||
api_version=api_version,
|
api_version=api_version,
|
||||||
auth_url=auth_url,
|
auth_url=auth_url,
|
||||||
username=conf.username,
|
username=conf.username,
|
||||||
password=conf.password,
|
password=conf.password,
|
||||||
project_name=conf.project_name)
|
project_name=conf.project_name)
|
||||||
else:
|
else:
|
||||||
credentials = keystone_credentials(
|
return keystone_credentials(
|
||||||
api_version=api_version,
|
api_version=api_version,
|
||||||
auth_url=auth_url,
|
auth_url=auth_url,
|
||||||
username=conf.username,
|
username=conf.username,
|
||||||
|
@ -188,12 +229,6 @@ class ConfigKeystoneCredentialsFixture(tobiko.SharedFixture):
|
||||||
project_domain_name=conf.project_domain_name,
|
project_domain_name=conf.project_domain_name,
|
||||||
project_domain_id=conf.project_domain_id,
|
project_domain_id=conf.project_domain_id,
|
||||||
trust_id=conf.trust_id)
|
trust_id=conf.trust_id)
|
||||||
try:
|
|
||||||
credentials.validate()
|
|
||||||
except InvalidKeystoneCredentials as ex:
|
|
||||||
LOG.info("No such valid credentials from tobiko.conf: %r", ex)
|
|
||||||
else:
|
|
||||||
self.credentials = credentials
|
|
||||||
|
|
||||||
|
|
||||||
DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = [
|
DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = [
|
||||||
|
@ -201,32 +236,30 @@ DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES = [
|
||||||
ConfigKeystoneCredentialsFixture]
|
ConfigKeystoneCredentialsFixture]
|
||||||
|
|
||||||
|
|
||||||
class DefaultKeystoneCredentialsFixture(tobiko.SharedFixture):
|
class DefaultKeystoneCredentialsFixture(KeystoneCredentialsFixture):
|
||||||
|
|
||||||
fixtures = DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES
|
fixtures = DEFAULT_KEYSTONE_CREDENTIALS_FIXTURES
|
||||||
credentials = None
|
|
||||||
|
|
||||||
def setup_fixture(self):
|
def get_credentials(self):
|
||||||
for fixture in self.fixtures:
|
for fixture in self.fixtures:
|
||||||
try:
|
try:
|
||||||
credentials = tobiko.setup_fixture(fixture).credentials
|
credentials = tobiko.setup_fixture(fixture).credentials
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Error setting up fixture %r", fixture)
|
LOG.exception("Error setting up fixture %r", fixture)
|
||||||
else:
|
continue
|
||||||
if credentials:
|
|
||||||
LOG.info("Got default credentials from %r: %r",
|
if credentials:
|
||||||
fixture, credentials)
|
LOG.info("Got default credentials from fixture %r: %r",
|
||||||
self.credentials = credentials
|
fixture, credentials)
|
||||||
return credentials
|
return credentials
|
||||||
raise RuntimeError('Unable to found any valid credentials')
|
|
||||||
|
|
||||||
|
|
||||||
def api_version_from_url(auth_url):
|
def api_version_from_url(auth_url):
|
||||||
if auth_url.endswith('/v2.0'):
|
if auth_url.endswith('/v2.0'):
|
||||||
LOG.info('Got Keystone API version 2 from auth_url: %r', auth_url)
|
LOG.debug('Got Keystone API version 2 from auth_url: %r', auth_url)
|
||||||
return 2
|
return 2
|
||||||
elif auth_url.endswith('/v3'):
|
elif auth_url.endswith('/v3'):
|
||||||
LOG.info('Got Keystone API version 3 from auth_url: %r', auth_url)
|
LOG.debug('Got Keystone API version 3 from auth_url: %r', auth_url)
|
||||||
return 3
|
return 3
|
||||||
else:
|
else:
|
||||||
LOG.warning('Unable to get Keystone API version from auth_url: %r',
|
LOG.warning('Unable to get Keystone API version from auth_url: %r',
|
||||||
|
|
|
@ -24,57 +24,80 @@ from tobiko.openstack.keystone import _credentials
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def keystone_session(obj):
|
||||||
|
if not obj:
|
||||||
|
return default_keystone_session()
|
||||||
|
if tobiko.is_fixture(obj):
|
||||||
|
obj = tobiko.get_fixture(obj)
|
||||||
|
if isinstance(obj, KeystoneSessionFixture):
|
||||||
|
obj = tobiko.setup_fixture(obj).session
|
||||||
|
if isinstance(obj, _session.Session):
|
||||||
|
return obj
|
||||||
|
raise TypeError("Can't get {!r} object from {!r}".format(
|
||||||
|
_session.Session, obj))
|
||||||
|
|
||||||
|
|
||||||
class KeystoneSessionFixture(tobiko.SharedFixture):
|
class KeystoneSessionFixture(tobiko.SharedFixture):
|
||||||
|
|
||||||
session = None
|
session = None
|
||||||
credentials = None
|
credentials = None
|
||||||
credentials_fixture = None
|
|
||||||
|
|
||||||
def __init__(self, credentials=None):
|
VALID_CREDENTIALS_TYPES = (_credentials.KeystoneCredentials,
|
||||||
|
_credentials.KeystoneCredentialsFixture,
|
||||||
|
type, str)
|
||||||
|
|
||||||
|
def __init__(self, credentials=None, session=None):
|
||||||
super(KeystoneSessionFixture, self).__init__()
|
super(KeystoneSessionFixture, self).__init__()
|
||||||
if credentials:
|
if credentials:
|
||||||
if tobiko.is_fixture(credentials):
|
tobiko.check_valid_type(credentials, *self.VALID_CREDENTIALS_TYPES)
|
||||||
self.credentials_fixture = credentials
|
self.credentials = credentials
|
||||||
else:
|
if session:
|
||||||
self.credentials = credentials
|
self.session = session
|
||||||
|
|
||||||
def setup_fixture(self):
|
def setup_fixture(self):
|
||||||
self.setup_credentials()
|
self.setup_session()
|
||||||
self.setup_session(credentials=self.credentials)
|
|
||||||
|
|
||||||
def setup_credentials(self):
|
def setup_session(self):
|
||||||
credentials_fixture = self.credentials_fixture
|
session = self.session
|
||||||
if credentials_fixture:
|
if not session:
|
||||||
self.credentials = tobiko.setup_fixture(
|
credentials = _credentials.get_keystone_credentials(
|
||||||
credentials_fixture).credentials
|
self.credentials)
|
||||||
elif not self.credentials:
|
|
||||||
self.credentials = _credentials.default_keystone_credentials()
|
|
||||||
|
|
||||||
def setup_session(self, credentials):
|
LOG.debug("Create Keystone session with credentials %r",
|
||||||
LOG.debug("Create session for credentials %r", credentials)
|
credentials)
|
||||||
loader = loading.get_plugin_loader('password')
|
credentials.validate()
|
||||||
params = credentials.to_dict()
|
loader = loading.get_plugin_loader('password')
|
||||||
del params['api_version'] # parameter not required
|
params = credentials.to_dict()
|
||||||
auth = loader.load_from_options(**params)
|
del params['api_version'] # parameter not required
|
||||||
self.session = _session.Session(auth=auth, verify=False)
|
auth = loader.load_from_options(**params)
|
||||||
|
self.session = _session.Session(auth=auth, verify=False)
|
||||||
|
self.credentials = credentials
|
||||||
|
|
||||||
|
|
||||||
class KeystoneSessionManager(object):
|
class KeystoneSessionManager(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._sessions = {}
|
self.sessions = {}
|
||||||
|
|
||||||
def get_session(self, credentials=None, shared=True, init_session=None):
|
def get_session(self, credentials=None, init_session=None, shared=True):
|
||||||
if shared:
|
if shared:
|
||||||
|
shared_key, session = self.get_shared_session(credentials)
|
||||||
|
else:
|
||||||
|
shared_key = session = None
|
||||||
|
return session or self.create_session(credentials=credentials,
|
||||||
|
init_session=init_session,
|
||||||
|
shared=shared,
|
||||||
|
shared_key=shared_key)
|
||||||
|
|
||||||
|
def get_shared_session(self, credentials):
|
||||||
|
if tobiko.is_fixture(credentials):
|
||||||
|
key = tobiko.get_fixture_name(credentials)
|
||||||
|
else:
|
||||||
key = credentials
|
key = credentials
|
||||||
if credentials:
|
return key, self.sessions.get(key)
|
||||||
if tobiko.is_fixture(credentials):
|
|
||||||
key = tobiko.get_fixture_name(credentials)
|
|
||||||
|
|
||||||
session = self._sessions.get(key)
|
|
||||||
if session:
|
|
||||||
return session
|
|
||||||
|
|
||||||
|
def create_session(self, credentials=None, init_session=None, shared=True,
|
||||||
|
shared_key=None):
|
||||||
init_session = init_session or KeystoneSessionFixture
|
init_session = init_session or KeystoneSessionFixture
|
||||||
assert callable(init_session)
|
assert callable(init_session)
|
||||||
LOG.debug('Initialize Keystone session: %r(credentials=%r)',
|
LOG.debug('Initialize Keystone session: %r(credentials=%r)',
|
||||||
|
@ -82,17 +105,21 @@ class KeystoneSessionManager(object):
|
||||||
session = init_session(credentials=credentials)
|
session = init_session(credentials=credentials)
|
||||||
|
|
||||||
if shared:
|
if shared:
|
||||||
self._sessions[key] = session
|
self.sessions[shared_key] = session
|
||||||
return session
|
return session
|
||||||
|
|
||||||
|
|
||||||
SESSIONS = KeystoneSessionManager()
|
SESSIONS = KeystoneSessionManager()
|
||||||
|
|
||||||
|
|
||||||
|
def default_keystone_session(shared=True, init_session=None, manager=None):
|
||||||
|
return get_keystone_session(shared=shared, init_session=init_session,
|
||||||
|
manager=manager)
|
||||||
|
|
||||||
|
|
||||||
def get_keystone_session(credentials=None, shared=True, init_session=None,
|
def get_keystone_session(credentials=None, shared=True, init_session=None,
|
||||||
manager=None):
|
manager=None):
|
||||||
manager = manager or SESSIONS
|
manager = manager or SESSIONS
|
||||||
session = manager.get_session(credentials=credentials, shared=shared,
|
session = manager.get_session(credentials=credentials, shared=shared,
|
||||||
init_session=init_session)
|
init_session=init_session)
|
||||||
session.setUp()
|
return tobiko.setup_fixture(session).session
|
||||||
return session.session
|
|
||||||
|
|
|
@ -14,8 +14,6 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
import inspect
|
|
||||||
|
|
||||||
from keystoneauth1 import session as keystonesession
|
from keystoneauth1 import session as keystonesession
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
|
@ -34,22 +32,6 @@ CREDENTIALS = keystone.keystone_credentials(
|
||||||
project_domain_name='Default')
|
project_domain_name='Default')
|
||||||
|
|
||||||
|
|
||||||
class CredentialsFixture(tobiko.SharedFixture):
|
|
||||||
|
|
||||||
credentials = None
|
|
||||||
|
|
||||||
def setup_fixture(self):
|
|
||||||
self.credentials = CREDENTIALS
|
|
||||||
|
|
||||||
|
|
||||||
class DefaultCredentialsFixture(tobiko.SharedFixture):
|
|
||||||
|
|
||||||
credentials = None
|
|
||||||
|
|
||||||
def setup_fixture(self):
|
|
||||||
self.credentials = DEFAULT_CREDENTIALS
|
|
||||||
|
|
||||||
|
|
||||||
DEFAULT_CREDENTIALS = keystone.keystone_credentials(
|
DEFAULT_CREDENTIALS = keystone.keystone_credentials(
|
||||||
api_version=2,
|
api_version=2,
|
||||||
auth_url='http://127.0.0.1:5000/identiy/v2.0',
|
auth_url='http://127.0.0.1:5000/identiy/v2.0',
|
||||||
|
@ -58,23 +40,17 @@ DEFAULT_CREDENTIALS = keystone.keystone_credentials(
|
||||||
password='this is a secret')
|
password='this is a secret')
|
||||||
|
|
||||||
|
|
||||||
class CheckSessionCredentialsMixin(object):
|
class CredentialsFixture(keystone.KeystoneCredentialsFixture):
|
||||||
|
|
||||||
def check_session_credentials(self, session, credentials):
|
credentials = CREDENTIALS
|
||||||
if credentials:
|
|
||||||
if tobiko.is_fixture(credentials):
|
|
||||||
self.assertIsNone(session.credentials)
|
|
||||||
self.assertIs(credentials, session.credentials_fixture)
|
|
||||||
else:
|
|
||||||
self.assertIs(credentials, session.credentials)
|
|
||||||
self.assertIsNone(session.credentials_fixture)
|
|
||||||
else:
|
|
||||||
self.assertIsNone(session.credentials)
|
|
||||||
self.assertIsNone(session.credentials_fixture)
|
|
||||||
|
|
||||||
|
|
||||||
class KeystoneSessionFixtureTest(CheckSessionCredentialsMixin,
|
class DefaultCredentialsFixture(CredentialsFixture):
|
||||||
openstack.OpenstackTest):
|
|
||||||
|
credentials = DEFAULT_CREDENTIALS
|
||||||
|
|
||||||
|
|
||||||
|
class KeystoneSessionFixtureTest(openstack.OpenstackTest):
|
||||||
|
|
||||||
default_credentials_fixture = (
|
default_credentials_fixture = (
|
||||||
'tobiko.openstack.keystone._credentials.'
|
'tobiko.openstack.keystone._credentials.'
|
||||||
|
@ -83,15 +59,17 @@ class KeystoneSessionFixtureTest(CheckSessionCredentialsMixin,
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(KeystoneSessionFixtureTest, self).setUp()
|
super(KeystoneSessionFixtureTest, self).setUp()
|
||||||
from tobiko.openstack.keystone import _credentials
|
from tobiko.openstack.keystone import _credentials
|
||||||
|
from tobiko.openstack.keystone import _session
|
||||||
|
|
||||||
tobiko.remove_fixture(self.default_credentials_fixture)
|
tobiko.remove_fixture(self.default_credentials_fixture)
|
||||||
self.patch(_credentials, 'DefaultKeystoneCredentialsFixture',
|
self.patch(_credentials, 'DefaultKeystoneCredentialsFixture',
|
||||||
DefaultCredentialsFixture)
|
DefaultCredentialsFixture)
|
||||||
|
self.patch(_session, 'SESSIONS',
|
||||||
|
_session.KeystoneSessionManager())
|
||||||
|
|
||||||
def test_init(self, credentials=None):
|
def test_init(self, credentials=None):
|
||||||
session = keystone.KeystoneSessionFixture(credentials=credentials)
|
session = keystone.KeystoneSessionFixture(credentials=credentials)
|
||||||
self.check_session_credentials(session=session,
|
self.assertIs(credentials or None, session.credentials)
|
||||||
credentials=credentials)
|
|
||||||
|
|
||||||
def test_init_with_credentials(self):
|
def test_init_with_credentials(self):
|
||||||
self.test_init(credentials=CREDENTIALS)
|
self.test_init(credentials=CREDENTIALS)
|
||||||
|
@ -105,15 +83,12 @@ class KeystoneSessionFixtureTest(CheckSessionCredentialsMixin,
|
||||||
def test_setup(self, credentials=None):
|
def test_setup(self, credentials=None):
|
||||||
session = keystone.KeystoneSessionFixture(credentials=credentials)
|
session = keystone.KeystoneSessionFixture(credentials=credentials)
|
||||||
session.setUp()
|
session.setUp()
|
||||||
if credentials:
|
if tobiko.is_fixture(credentials):
|
||||||
if tobiko.is_fixture(credentials):
|
credentials = tobiko.get_fixture(credentials)
|
||||||
if inspect.isclass(credentials):
|
self.assertIs(credentials.credentials, session.credentials)
|
||||||
credentials = tobiko.get_fixture(credentials)
|
|
||||||
self.assertIs(credentials.credentials, session.credentials)
|
|
||||||
else:
|
|
||||||
self.assertIs(credentials, session.credentials)
|
|
||||||
else:
|
else:
|
||||||
self.assertEqual(DEFAULT_CREDENTIALS, session.credentials)
|
self.assertIs(credentials or DEFAULT_CREDENTIALS,
|
||||||
|
session.credentials)
|
||||||
|
|
||||||
def test_setup_with_credentials(self):
|
def test_setup_with_credentials(self):
|
||||||
self.test_setup(credentials=CREDENTIALS)
|
self.test_setup(credentials=CREDENTIALS)
|
||||||
|
@ -125,21 +100,20 @@ class KeystoneSessionFixtureTest(CheckSessionCredentialsMixin,
|
||||||
self.test_setup(credentials=CredentialsFixture)
|
self.test_setup(credentials=CredentialsFixture)
|
||||||
|
|
||||||
|
|
||||||
class KeystoneSessionManagerTest(CheckSessionCredentialsMixin,
|
class KeystoneSessionManagerTest(openstack.OpenstackTest):
|
||||||
openstack.OpenstackTest):
|
|
||||||
|
|
||||||
def test_init(self):
|
def test_init(self):
|
||||||
manager = keystone.KeystoneSessionManager()
|
manager = keystone.KeystoneSessionManager()
|
||||||
|
|
||||||
self.assertTrue(manager)
|
self.assertTrue(manager)
|
||||||
|
self.assertEqual({}, manager.sessions)
|
||||||
|
|
||||||
def test_get_session(self, credentials=None, shared=True):
|
def test_get_session(self, credentials=None, shared=True):
|
||||||
manager = keystone.KeystoneSessionManager()
|
manager = keystone.KeystoneSessionManager()
|
||||||
session = manager.get_session(credentials=credentials,
|
session = manager.get_session(credentials=credentials,
|
||||||
shared=shared)
|
shared=shared)
|
||||||
|
self.assertIs(credentials or None, session.credentials)
|
||||||
|
|
||||||
self.assertIsInstance(session, keystone.KeystoneSessionFixture)
|
self.assertIsInstance(session, keystone.KeystoneSessionFixture)
|
||||||
self.check_session_credentials(session=session,
|
|
||||||
credentials=credentials)
|
|
||||||
if shared:
|
if shared:
|
||||||
self.assertIs(session, manager.get_session(
|
self.assertIs(session, manager.get_session(
|
||||||
credentials=credentials))
|
credentials=credentials))
|
||||||
|
|
Loading…
Reference in New Issue