- Added 'automatic' admin authentication to KeystoneTestCase using bootstrapped user

- Added system tests for admin & service authentication
- Abstracted '/v2.0' path prefix away from system tests
- Added simple uuid function to generate data for system tests (random number gen w/ seeds might work better?)
- Refactored issue #85 tests with setUp & tearDown methods
This commit is contained in:
Dolph Mathews 2011-07-15 14:56:31 -05:00
parent 4a5a8dc898
commit dad61a5438
4 changed files with 127 additions and 36 deletions

View File

@ -10,7 +10,7 @@ class HttpTestCase(unittest.TestCase):
# Initialize a connection
connection = httplib.HTTPConnection(host, port, timeout=3)
# Perform the request
connection.request(method, path, body, headers)
@ -129,18 +129,41 @@ class KeystoneTestCase(RestfulTestCase):
}
}
def service_request(self, port=5000, headers={}, **kwargs):
def setUp(self):
"""Prepare keystone for system tests"""
# Authenticate as admin user to establish admin_token
r = self.admin_request(method='POST', path='/tokens',
json=self.admin_credentials)
self.admin_token = r.json['auth']['token']['id']
def service_request(self, path='', port=5000, headers={}, **kwargs):
"""Returns a request to the service API"""
path = KeystoneTestCase._prepend_path(path)
if self.service_token:
headers['X-Auth-Token'] = self.service_token
return self.restful_request(port=port, headers=headers, **kwargs)
return self.restful_request(port=port, path=path, headers=headers,
**kwargs)
def admin_request(self, port=5001, headers={}, **kwargs):
def admin_request(self, path='', port=5001, headers={}, **kwargs):
"""Returns a request to the admin API"""
path = KeystoneTestCase._prepend_path(path)
if self.admin_token:
headers['X-Auth-Token'] = self.admin_token
return self.restful_request(port=port, headers=headers, **kwargs)
return self.restful_request(port=port, path=path, headers=headers, **kwargs)
@staticmethod
def _prepend_path(path):
"""Prepend the given path with the API version"""
return '/v2.0' + str(path)
@staticmethod
def _uuid():
"""Generate and return a unique identifier"""
import uuid
return str(uuid.uuid4())

View File

@ -0,0 +1,63 @@
import unittest
from common import KeystoneTestCase
class TestAdminAuthentication(KeystoneTestCase):
"""Test admin-side user authentication"""
def setUp(self):
"""Empty method to prevent KeystoneTestCase from authenticating"""
pass
def test_bootstrapped_admin_user(self):
"""Bootstrap script should create an 'admin' user with 'Admin' role"""
# Authenticate as admin
r = self.admin_request(method='POST', path='/tokens',
json=self.admin_credentials)
# Assert we get back a token with an expiration date
self.assertTrue(r.json['auth']['token']['id'])
self.assertTrue(r.json['auth']['token']['expires'])
class TestServiceAuthentication(KeystoneTestCase):
"""Test service-side user authentication"""
user_id = KeystoneTestCase._uuid()
def setUp(self):
super(TestServiceAuthentication, self).setUp()
# Create a user
self.admin_request(method='PUT', path='/users',
json={
'user': {
'id': self.user_id,
'password': 'secrete',
'email': 'user@openstack.org',
'enabled': True,
}
})
def tearDown(self):
# Delete user
self.admin_request(method='DELETE', path='/users/%s' % self.user_id)
def test_user_auth(self):
# Authenticate as user to get a token
r = self.service_request(method='POST', path='/tokens',
json={
'passwordCredentials': {
'username': self.user_id,
'password': 'secrete',
}
})
self.service_token = r.json['auth']['token']['id']
"""In the real world, the service user would then pass his/her token
to some service that depends on keystone, which would then need to
user keystone to validate the provided token."""
# Admin independently validates the user token
self.admin_request(path='/tokens/%s' % self.service_token)
if __name__ == '__main__':
unittest.main()

View File

@ -4,54 +4,61 @@ from common import KeystoneTestCase
class TestIssue85(KeystoneTestCase):
"""Illustrates github issue #85"""
def test_disabling_tenant_disables_token(self):
"""Disabling a tenant should invalidate previously-issued tokens"""
# Authenticate as admin
r = self.admin_request(method='POST', path='/v2.0/tokens',
json=self.admin_credentials)
self.admin_token = r.json['auth']['token']['id']
user_id = 'user'
tenant_id = 'tenant'
tenant_id = KeystoneTestCase._uuid()
user_id = KeystoneTestCase._uuid()
def setUp(self):
super(TestIssue85, self).setUp()
# Create a tenant
self.admin_request(method='POST', path='/v2.0/tenants',
self.admin_request(method='POST', path='/tenants',
json={
'tenant':{
'id': tenant_id,
'id': self.tenant_id,
'description': 'description',
'enabled': True,
}
})
# Create a user
self.admin_request(method='PUT', path='/v2.0/users',
self.admin_request(method='PUT', path='/users',
json={
'user':{
'id': user_id,
'id': self.user_id,
'password': 'secrete',
'email': 'user@openstack.org',
'enabled': True,
'tenant_id': tenant_id,
'tenant_id': 'tenant',
}
})
def tearDown(self):
# Delete user
self.admin_request(method='DELETE', path='/users/%s' %
self.user_id)
# Delete tenant
self.admin_request(method='DELETE', path='/tenants/%s' %
self.tenant_id)
def test_disabling_tenant_disables_token(self):
"""Disabling a tenant should invalidate previously-issued tokens"""
# Authenticate as user to get a token
r = self.service_request(method='POST', path='/v2.0/tokens',
r = self.service_request(method='POST', path='/tokens',
json={
'passwordCredentials':{
'username': user_id,
'passwordCredentials': {
'username': self.user_id,
'password': 'secrete',
}
})
self.service_token = r.json['auth']['token']['id']
# Validate tenant token
self.admin_request(path='/v2.0/tokens/%s' % self.service_token)
self.admin_request(path='/tokens/%s' % self.service_token)
# Disable tenant
r = self.admin_request(method='PUT',
path='/v2.0/tenants/%s' % tenant_id,
path='/tenants/%s' % self.tenant_id,
json={
'tenant': {
'description': 'description',
@ -62,7 +69,7 @@ class TestIssue85(KeystoneTestCase):
# Assert tenant token invalidated
# Commented this out because it will fail this test
# self.admin_request(path='/v2.0/tokens/%s' % self.service_token,
# self.admin_request(path='/tokens/%s' % self.service_token,
# expect_exception=True)
if __name__ == '__main__':

View File

@ -6,8 +6,8 @@ class TestUrlHandling(KeystoneTestCase):
def test_optional_trailing_slash(self):
"""Same response returned regardless of a trailing slash in the url."""
r1 = self.admin_request(path='/v2.0/')
r2 = self.admin_request(path='/v2.0')
r1 = self.service_request(path='/')
r2 = self.service_request(path='')
self.assertEqual(r1.read(), r2.read())
class TestContentTypes(KeystoneTestCase):
@ -15,41 +15,39 @@ class TestContentTypes(KeystoneTestCase):
def test_default_content_type(self):
"""Service returns JSON without being asked to"""
r = self.admin_request(path='/v2.0')
r = self.service_request()
self.assertTrue('application/json' in r.getheader('Content-Type'))
def test_xml_extension(self):
"""Service responds to .xml URL extension"""
r = self.admin_request(path='/v2.0.xml')
r = self.service_request(path='.xml')
self.assertTrue('application/xml' in r.getheader('Content-Type'))
def test_json_extension(self):
"""Service responds to .json URL extension"""
r = self.admin_request(path='/v2.0.json')
r = self.service_request(path='.json')
self.assertTrue('application/json' in r.getheader('Content-Type'))
def test_xml_accept_header(self):
"""Service responds to xml Accept header"""
r = self.admin_request(path='/v2.0',
headers={'Accept': 'application/xml'})
r = self.service_request(headers={'Accept': 'application/xml'})
self.assertTrue('application/xml' in r.getheader('Content-Type'))
def test_json_accept_header(self):
"""Service responds to json Accept header"""
r = self.admin_request(path='/v2.0',
headers={'Accept': 'application/json'})
r = self.service_request(headers={'Accept': 'application/json'})
self.assertTrue('application/json' in r.getheader('Content-Type'))
def test_xml_extension_overrides_conflicting_header(self):
"""Service returns XML when Accept header conflicts with extension"""
r = self.admin_request(path='/v2.0.xml',
r = self.service_request(path='.xml',
headers={'Accept': 'application/json'})
self.assertTrue('application/xml' in r.getheader('Content-Type'))
def test_json_extension_overrides_conflicting_header(self):
"""Service returns JSON when Accept header conflicts with extension"""
r = self.admin_request(path='/v2.0.json',
r = self.service_request(path='.json',
headers={'Accept': 'application/xml'})
self.assertTrue('application/json' in r.getheader('Content-Type'))