Merge "Allows any valid ISO8601 timestamp in token expiry time format"
This commit is contained in:
@@ -26,7 +26,8 @@ from tempest_lib import exceptions
|
||||
from tempest_lib.services.identity.v2 import token_client as json_v2id
|
||||
from tempest_lib.services.identity.v3 import token_client as json_v3id
|
||||
|
||||
|
||||
ISO8601_FLOAT_SECONDS = '%Y-%m-%dT%H:%M:%S.%fZ'
|
||||
ISO8601_INT_SECONDS = '%Y-%m-%dT%H:%M:%SZ'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -174,6 +175,8 @@ class AuthProvider(object):
|
||||
|
||||
class KeystoneAuthProvider(AuthProvider):
|
||||
|
||||
EXPIRY_DATE_FORMATS = (ISO8601_FLOAT_SECONDS, ISO8601_INT_SECONDS)
|
||||
|
||||
token_expiry_threshold = datetime.timedelta(seconds=60)
|
||||
|
||||
def __init__(self, credentials, auth_url,
|
||||
@@ -223,14 +226,27 @@ class KeystoneAuthProvider(AuthProvider):
|
||||
token, auth_data = auth_func(**auth_params)
|
||||
return token, auth_data
|
||||
|
||||
def _parse_expiry_time(self, expiry_string):
|
||||
expiry = None
|
||||
for date_format in self.EXPIRY_DATE_FORMATS:
|
||||
try:
|
||||
expiry = datetime.datetime.strptime(
|
||||
expiry_string, date_format)
|
||||
except ValueError:
|
||||
pass
|
||||
if expiry is None:
|
||||
raise ValueError(
|
||||
"time data '{data}' does not match any of the"
|
||||
"expected formats: {formats}".format(
|
||||
data=expiry_string, formats=self.EXPIRY_DATE_FORMATS))
|
||||
return expiry
|
||||
|
||||
def get_token(self):
|
||||
return self.auth_data[0]
|
||||
|
||||
|
||||
class KeystoneV2AuthProvider(KeystoneAuthProvider):
|
||||
|
||||
EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
|
||||
|
||||
def _auth_client(self, auth_url):
|
||||
return json_v2id.TokenClientJSON(
|
||||
auth_url, disable_ssl_certificate_validation=self.dsvm,
|
||||
@@ -302,16 +318,13 @@ class KeystoneV2AuthProvider(KeystoneAuthProvider):
|
||||
|
||||
def is_expired(self, auth_data):
|
||||
_, access = auth_data
|
||||
expiry = datetime.datetime.strptime(access['token']['expires'],
|
||||
self.EXPIRY_DATE_FORMAT)
|
||||
expiry = self._parse_expiry_time(access['token']['expires'])
|
||||
return (expiry - self.token_expiry_threshold <=
|
||||
datetime.datetime.utcnow())
|
||||
|
||||
|
||||
class KeystoneV3AuthProvider(KeystoneAuthProvider):
|
||||
|
||||
EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
|
||||
|
||||
def _auth_client(self, auth_url):
|
||||
return json_v3id.V3TokenClientJSON(
|
||||
auth_url, disable_ssl_certificate_validation=self.dsvm,
|
||||
@@ -427,8 +440,7 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
|
||||
|
||||
def is_expired(self, auth_data):
|
||||
_, access = auth_data
|
||||
expiry = datetime.datetime.strptime(access['expires_at'],
|
||||
self.EXPIRY_DATE_FORMAT)
|
||||
expiry = self._parse_expiry_time(access['expires_at'])
|
||||
return (expiry - self.token_expiry_threshold <=
|
||||
datetime.datetime.utcnow())
|
||||
|
||||
|
||||
@@ -315,22 +315,23 @@ class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
|
||||
|
||||
def test_token_not_expired(self):
|
||||
expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1)
|
||||
auth_data = self._auth_data_with_expiry(
|
||||
expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
|
||||
self.assertFalse(self.auth_provider.is_expired(auth_data))
|
||||
self._verify_expiry(expiry_data=expiry_data, should_be_expired=False)
|
||||
|
||||
def test_token_expired(self):
|
||||
expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
|
||||
auth_data = self._auth_data_with_expiry(
|
||||
expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
|
||||
self.assertTrue(self.auth_provider.is_expired(auth_data))
|
||||
self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)
|
||||
|
||||
def test_token_not_expired_to_be_renewed(self):
|
||||
expiry_data = (datetime.datetime.utcnow() +
|
||||
self.auth_provider.token_expiry_threshold / 2)
|
||||
auth_data = self._auth_data_with_expiry(
|
||||
expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
|
||||
self.assertTrue(self.auth_provider.is_expired(auth_data))
|
||||
self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)
|
||||
|
||||
def _verify_expiry(self, expiry_data, should_be_expired):
|
||||
for expiry_format in self.auth_provider.EXPIRY_DATE_FORMATS:
|
||||
auth_data = self._auth_data_with_expiry(
|
||||
expiry_data.strftime(expiry_format))
|
||||
self.assertEqual(self.auth_provider.is_expired(auth_data),
|
||||
should_be_expired)
|
||||
|
||||
|
||||
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
|
||||
|
||||
Reference in New Issue
Block a user