Create a model for storing session tokens.
This commit is contained in:
@@ -76,6 +76,8 @@ DEFINE_string('vpn_key_suffix',
|
|||||||
'-key',
|
'-key',
|
||||||
'Suffix to add to project name for vpn key')
|
'Suffix to add to project name for vpn key')
|
||||||
|
|
||||||
|
DEFINE_integer('auth_token_ttl', 3600, 'Seconds for auth tokens to linger')
|
||||||
|
|
||||||
# UNUSED
|
# UNUSED
|
||||||
DEFINE_string('node_availability_zone',
|
DEFINE_string('node_availability_zone',
|
||||||
'nova',
|
'nova',
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
@@ -64,6 +65,12 @@ class ModelTestCase(test.TrialTestCase):
|
|||||||
daemon.save()
|
daemon.save()
|
||||||
return daemon
|
return daemon
|
||||||
|
|
||||||
|
def create_session_token(self):
|
||||||
|
session_token = model.SessionToken('tk12341234')
|
||||||
|
session_token['user'] = 'testuser'
|
||||||
|
session_token.save()
|
||||||
|
return session_token
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_create_instance(self):
|
def test_create_instance(self):
|
||||||
"""store with create_instace, then test that a load finds it"""
|
"""store with create_instace, then test that a load finds it"""
|
||||||
@@ -202,3 +209,91 @@ class ModelTestCase(test.TrialTestCase):
|
|||||||
if x.identifier == 'testhost:nova-testdaemon':
|
if x.identifier == 'testhost:nova-testdaemon':
|
||||||
found = True
|
found = True
|
||||||
self.assertTrue(found)
|
self.assertTrue(found)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_create_session_token(self):
|
||||||
|
"""create"""
|
||||||
|
d = yield self.create_session_token()
|
||||||
|
d = model.SessionToken(d.token)
|
||||||
|
self.assertFalse(d.is_new_record())
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_delete_session_token(self):
|
||||||
|
"""create, then destroy, then make sure loads a new record"""
|
||||||
|
instance = yield self.create_session_token()
|
||||||
|
yield instance.destroy()
|
||||||
|
newinst = yield model.SessionToken(instance.token)
|
||||||
|
self.assertTrue(newinst.is_new_record())
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_added_to_set(self):
|
||||||
|
"""create, then check that it is included in list"""
|
||||||
|
instance = yield self.create_session_token()
|
||||||
|
found = False
|
||||||
|
for x in model.SessionToken.all():
|
||||||
|
if x.identifier == instance.token:
|
||||||
|
found = True
|
||||||
|
self.assert_(found)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_associates_user(self):
|
||||||
|
"""create, then check that it is listed for the user"""
|
||||||
|
instance = yield self.create_session_token()
|
||||||
|
found = False
|
||||||
|
for x in model.SessionToken.associated_to('user', 'testuser'):
|
||||||
|
if x.identifier == instance.identifier:
|
||||||
|
found = True
|
||||||
|
self.assertTrue(found)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_generation(self):
|
||||||
|
instance = yield model.SessionToken.generate('username', 'TokenType')
|
||||||
|
self.assertFalse(instance.is_new_record())
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_find_generated_session_token(self):
|
||||||
|
instance = yield model.SessionToken.generate('username', 'TokenType')
|
||||||
|
found = yield model.SessionToken.lookup(instance.identifier)
|
||||||
|
self.assert_(found)
|
||||||
|
|
||||||
|
def test_update_session_token_expiry(self):
|
||||||
|
instance = model.SessionToken('tk12341234')
|
||||||
|
oldtime = datetime.utcnow()
|
||||||
|
instance['expiry'] = oldtime.strftime(utils.TIME_FORMAT)
|
||||||
|
instance.update_expiry()
|
||||||
|
expiry = utils.parse_isotime(instance['expiry'])
|
||||||
|
self.assert_(expiry > datetime.utcnow())
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_lookup_when_expired(self):
|
||||||
|
instance = yield model.SessionToken.generate("testuser")
|
||||||
|
instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT)
|
||||||
|
instance.save()
|
||||||
|
inst = model.SessionToken.lookup(instance.identifier)
|
||||||
|
self.assertFalse(inst)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_lookup_when_not_expired(self):
|
||||||
|
instance = yield model.SessionToken.generate("testuser")
|
||||||
|
inst = model.SessionToken.lookup(instance.identifier)
|
||||||
|
self.assert_(inst)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_is_expired_when_expired(self):
|
||||||
|
instance = yield model.SessionToken.generate("testuser")
|
||||||
|
instance['expiry'] = datetime.utcnow().strftime(utils.TIME_FORMAT)
|
||||||
|
self.assert_(instance.is_expired())
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_is_expired_when_not_expired(self):
|
||||||
|
instance = yield model.SessionToken.generate("testuser")
|
||||||
|
self.assertFalse(instance.is_expired())
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_session_token_ttl(self):
|
||||||
|
instance = yield model.SessionToken.generate("testuser")
|
||||||
|
now = datetime.utcnow()
|
||||||
|
delta = timedelta(hours=1)
|
||||||
|
instance['expiry'] = (now + delta).strftime(utils.TIME_FORMAT)
|
||||||
|
# give 5 seconds of fuzziness
|
||||||
|
self.assert_(abs(instance.ttl() - FLAGS.auth_token_ttl) < 5)
|
||||||
|
|||||||
Reference in New Issue
Block a user