Add type hints to keystone package

Change-Id: Ib71fc3f901401a7cd453b3cfa6fbbf1c0f10cf05
changes/24/761724/5
Federico Ressi 2 years ago
parent ad0340fc8a
commit 142585bab6

@ -52,6 +52,7 @@ is_service_missing = _services.is_service_missing
skip_if_missing_service = _services.skip_if_missing_service
keystone_session = _session.keystone_session
KeystoneSessionType = _session.KeystoneSessionType
KeystoneSessionFixture = _session.KeystoneSessionFixture
KeystoneSessionManager = _session.KeystoneSessionManager
get_keystone_session = _session.get_keystone_session

@ -70,25 +70,65 @@ class NoSuchCredentialsError(tobiko.TobikoException):
message = "No such credentials from any of: {fixtures}"
def get_keystone_credentials(obj=None) -> KeystoneCredentials:
if not obj:
class KeystoneCredentialsFixture(tobiko.SharedFixture):
credentials: typing.Optional[KeystoneCredentials] = None
def __init__(self,
credentials: typing.Optional[KeystoneCredentials] = None):
super(KeystoneCredentialsFixture, self).__init__()
if credentials is not None:
self.credentials = credentials
def setup_fixture(self):
self.setup_credentials()
def setup_credentials(self):
credentials = self.credentials
if credentials is None:
credentials = self.get_credentials()
if credentials is not None:
try:
credentials.validate()
except InvalidKeystoneCredentials as ex:
LOG.info("No such valid credentials from %r (%r)",
self, ex)
else:
self.addCleanup(self.cleanup_credentials)
self.credentials = credentials
def cleanup_credentials(self):
del self.credentials
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
return self.credentials
KeystoneCredentialsType = typing.Union[None,
KeystoneCredentials,
KeystoneCredentialsFixture,
str,
typing.Type]
def get_keystone_credentials(obj: KeystoneCredentialsType = None) -> \
typing.Optional[KeystoneCredentials]:
if obj is None:
return default_keystone_credentials()
if isinstance(obj, KeystoneCredentials):
return obj
if tobiko.is_fixture(obj):
obj = tobiko.get_fixture(obj)
if isinstance(obj, KeystoneCredentialsFixture):
obj = tobiko.setup_fixture(obj).credentials
if isinstance(obj, KeystoneCredentials):
return obj
message = "Can't get {!r} object from {!r}".format(
KeystoneCredentials, obj)
raise TypeError(message)
return get_keystone_credentials(obj)
raise TypeError(f"Can't get {KeystoneCredentials} object from {obj}")
def default_keystone_credentials() -> KeystoneCredentials:
def default_keystone_credentials() -> typing.Optional[KeystoneCredentials]:
credentials = tobiko.setup_fixture(
DefaultKeystoneCredentialsFixture).credentials
if credentials:
if credentials is not None:
tobiko.check_valid_type(credentials, KeystoneCredentials)
return credentials
@ -120,40 +160,6 @@ class InvalidKeystoneCredentials(tobiko.TobikoException):
message = "invalid Keystone credentials; {reason!s}; {credentials!r}"
class KeystoneCredentialsFixture(tobiko.SharedFixture):
credentials: typing.Optional[KeystoneCredentials] = None
def __init__(self,
credentials: typing.Optional[KeystoneCredentials] = None):
super(KeystoneCredentialsFixture, self).__init__()
if credentials:
self.credentials = credentials
def setup_fixture(self):
self.setup_credentials()
def setup_credentials(self):
credentials = self.credentials
if not self.credentials:
credentials = self.get_credentials()
if credentials:
try:
credentials.validate()
except InvalidKeystoneCredentials as ex:
LOG.info("No such valid credentials from %r (%r)",
self, ex)
else:
self.addCleanup(self.cleanup_credentials)
self.credentials = credentials
def cleanup_credentials(self):
del self.credentials
def get_credentials(self) -> typing.Optional[KeystoneCredentials]:
return self.credentials
class EnvironKeystoneCredentialsFixture(KeystoneCredentialsFixture):
environ: typing.Optional[typing.Dict[str, str]] = None

@ -13,6 +13,8 @@
# under the License.
from __future__ import absolute_import
import typing
from keystoneauth1 import loading
from keystoneauth1 import session as _session
from oslo_log import log
@ -25,34 +27,23 @@ from tobiko import http
LOG = log.getLogger(__name__)
def keystone_session(obj):
if not obj:
return default_keystone_session()
if tobiko.is_fixture(obj):
obj = tobiko.get_fixture(obj)
if isinstance(obj, KeystoneSessionFixture):
obj = tobiko.setup_fixture(obj).session
if isinstance(obj, _session.Session):
return obj
raise TypeError("Can't get {!r} object from {!r}".format(
_session.Session, obj))
class KeystoneSessionFixture(tobiko.SharedFixture):
session = None
credentials = None
session: typing.Optional[_session.Session] = None
credentials: _credentials.KeystoneCredentialsType = None
VALID_CREDENTIALS_TYPES = (_credentials.KeystoneCredentials,
_credentials.KeystoneCredentialsFixture,
type, str)
def __init__(self, credentials=None, session=None):
def __init__(self,
credentials: _credentials.KeystoneCredentialsType = None,
session: typing.Optional[_session.Session] = None):
super(KeystoneSessionFixture, self).__init__()
if credentials:
if credentials is not None:
tobiko.check_valid_type(credentials, *self.VALID_CREDENTIALS_TYPES)
self.credentials = credentials
if session:
if session is not None:
self.session = session
def setup_fixture(self):
@ -60,54 +51,91 @@ class KeystoneSessionFixture(tobiko.SharedFixture):
def setup_session(self):
session = self.session
if not session:
if session is None:
credentials = _credentials.get_keystone_credentials(
self.credentials)
LOG.debug("Create Keystone session with credentials %r",
credentials)
LOG.debug("Create Keystone session from credentials "
f"{credentials}")
credentials.validate()
loader = loading.get_plugin_loader('password')
params = credentials.to_dict()
# api version parameter is not accepted
params.pop('api_version', None)
auth = loader.load_from_options(**params)
self.session = session = _session.Session(
auth=auth, verify=False)
self.session = session = _session.Session(auth=auth, verify=False)
http.setup_http_session(session)
self.credentials = credentials
KeystoneSessionType = typing.Union[None,
_session.Session,
typing.Type,
str,
KeystoneSessionFixture]
def keystone_session(obj: KeystoneSessionType) -> _session.Session:
if obj is None:
return default_keystone_session()
if tobiko.is_fixture(obj):
obj = tobiko.get_fixture(obj)
if isinstance(obj, KeystoneSessionFixture):
obj = tobiko.setup_fixture(obj).session
if isinstance(obj, _session.Session):
return obj
raise TypeError(f"Can't get {_session.Session} object from {obj}")
InitSessionType = typing.Optional[typing.Callable]
class KeystoneSessionManager(object):
def __init__(self):
self.sessions = {}
def get_session(self, credentials=None, init_session=None, shared=True):
self.sessions: typing.Dict[typing.Any,
KeystoneSessionFixture] = {}
def get_session(self,
credentials: typing.Any = None,
init_session: InitSessionType = None,
shared: bool = True) \
-> KeystoneSessionFixture:
if shared:
shared_key, session = self.get_shared_session(credentials)
else:
shared_key = session = None
return session or self.create_session(credentials=credentials,
init_session=init_session,
shared=shared,
shared_key=shared_key)
if session is None:
return self.create_session(credentials=credentials,
init_session=init_session,
shared=shared,
shared_key=shared_key)
else:
return session
def get_shared_session(self, credentials):
def get_shared_session(self, credentials: typing.Any) \
-> typing.Tuple[typing.Any,
typing.Optional[KeystoneSessionFixture]]:
if tobiko.is_fixture(credentials):
key = tobiko.get_fixture_name(credentials)
else:
key = credentials
return key, self.sessions.get(key)
def create_session(self, credentials=None, init_session=None, shared=True,
shared_key=None):
init_session = init_session or KeystoneSessionFixture
def create_session(self,
credentials: typing.Any = None,
init_session: InitSessionType = None,
shared: bool = True,
shared_key: typing.Any = None) \
-> KeystoneSessionFixture:
if init_session is None:
init_session = KeystoneSessionFixture
assert callable(init_session)
LOG.debug('Initialize Keystone session: %r(credentials=%r)',
init_session, credentials)
session = init_session(credentials=credentials)
session: KeystoneSessionFixture = init_session(
credentials=credentials)
tobiko.check_valid_type(session, KeystoneSessionFixture)
if shared:
self.sessions[shared_key] = session
return session
@ -116,14 +144,24 @@ class KeystoneSessionManager(object):
SESSIONS = KeystoneSessionManager()
def default_keystone_session(shared=True, init_session=None, manager=None):
def default_keystone_session(
shared: bool = True,
init_session: InitSessionType = None,
manager: typing.Optional[KeystoneSessionManager] = None) -> \
_session.Session:
return get_keystone_session(shared=shared, init_session=init_session,
manager=manager)
def get_keystone_session(credentials=None, shared=True, init_session=None,
manager=None):
manager = manager or SESSIONS
def get_keystone_session(
credentials: typing.Any = None,
shared: bool = True,
init_session: typing.Any = None,
manager: typing.Optional[KeystoneSessionManager] = None) -> \
_session.Session:
if manager is None:
manager = SESSIONS
session = manager.get_session(credentials=credentials, shared=shared,
init_session=init_session)
tobiko.check_valid_type(session, KeystoneSessionFixture)
return tobiko.setup_fixture(session).session

@ -79,7 +79,7 @@ class KeystoneClientAPITest(testtools.TestCase):
keystone.find_service,
name='never-never-land')
def test_find_service_with_defaulkt(self):
def test_find_service_with_default(self):
service = keystone.find_service(name='never-never-land',
default=None)
self.assertIsNone(service)

@ -134,7 +134,8 @@ class KeystoneSessionManagerTest(openstack.OpenstackTest):
self.test_get_session(credentials=CredentialsFixture)
def test_get_session_with_init_session(self):
mock_session = mock.MagicMock(specs=keystone.KeystoneSessionFixture)
mock_session = mock.MagicMock(spec=keystone.KeystoneSessionFixture)
self.assertIsInstance(mock_session, keystone.KeystoneSessionFixture)
init_session = mock.MagicMock(return_value=mock_session)
manager = keystone.KeystoneSessionManager()
session = manager.get_session(credentials=CREDENTIALS,

Loading…
Cancel
Save