diff --git a/keystone/test/system/common.py b/keystone/test/system/common.py index fb620f4cba..b33537b287 100644 --- a/keystone/test/system/common.py +++ b/keystone/test/system/common.py @@ -10,7 +10,7 @@ class HttpTestCase(unittest.TestCase): # Initialize a connection connection = httplib.HTTPConnection(host, port, timeout=3) - + # Perform the request connection.request(method, path, body, headers) @@ -129,18 +129,41 @@ class KeystoneTestCase(RestfulTestCase): } } - def service_request(self, port=5000, headers={}, **kwargs): + def setUp(self): + """Prepare keystone for system tests""" + # Authenticate as admin user to establish admin_token + r = self.admin_request(method='POST', path='/tokens', + json=self.admin_credentials) + self.admin_token = r.json['auth']['token']['id'] + + def service_request(self, path='', port=5000, headers={}, **kwargs): """Returns a request to the service API""" + path = KeystoneTestCase._prepend_path(path) + if self.service_token: headers['X-Auth-Token'] = self.service_token - return self.restful_request(port=port, headers=headers, **kwargs) + return self.restful_request(port=port, path=path, headers=headers, + **kwargs) - def admin_request(self, port=5001, headers={}, **kwargs): + def admin_request(self, path='', port=5001, headers={}, **kwargs): """Returns a request to the admin API""" + path = KeystoneTestCase._prepend_path(path) + if self.admin_token: headers['X-Auth-Token'] = self.admin_token - return self.restful_request(port=port, headers=headers, **kwargs) + return self.restful_request(port=port, path=path, headers=headers, **kwargs) + + @staticmethod + def _prepend_path(path): + """Prepend the given path with the API version""" + return '/v2.0' + str(path) + + @staticmethod + def _uuid(): + """Generate and return a unique identifier""" + import uuid + return str(uuid.uuid4()) diff --git a/keystone/test/system/test_auth.py b/keystone/test/system/test_auth.py new file mode 100644 index 0000000000..697c928afb --- /dev/null +++ b/keystone/test/system/test_auth.py @@ -0,0 +1,63 @@ +import unittest +from common import KeystoneTestCase + +class TestAdminAuthentication(KeystoneTestCase): + """Test admin-side user authentication""" + + def setUp(self): + """Empty method to prevent KeystoneTestCase from authenticating""" + pass + + def test_bootstrapped_admin_user(self): + """Bootstrap script should create an 'admin' user with 'Admin' role""" + # Authenticate as admin + r = self.admin_request(method='POST', path='/tokens', + json=self.admin_credentials) + + # Assert we get back a token with an expiration date + self.assertTrue(r.json['auth']['token']['id']) + self.assertTrue(r.json['auth']['token']['expires']) + +class TestServiceAuthentication(KeystoneTestCase): + """Test service-side user authentication""" + + user_id = KeystoneTestCase._uuid() + + def setUp(self): + super(TestServiceAuthentication, self).setUp() + + # Create a user + self.admin_request(method='PUT', path='/users', + json={ + 'user': { + 'id': self.user_id, + 'password': 'secrete', + 'email': 'user@openstack.org', + 'enabled': True, + } + }) + + def tearDown(self): + # Delete user + self.admin_request(method='DELETE', path='/users/%s' % self.user_id) + + def test_user_auth(self): + # Authenticate as user to get a token + r = self.service_request(method='POST', path='/tokens', + json={ + 'passwordCredentials': { + 'username': self.user_id, + 'password': 'secrete', + } + }) + self.service_token = r.json['auth']['token']['id'] + + """In the real world, the service user would then pass his/her token + to some service that depends on keystone, which would then need to + user keystone to validate the provided token.""" + + # Admin independently validates the user token + self.admin_request(path='/tokens/%s' % self.service_token) + +if __name__ == '__main__': + unittest.main() diff --git a/keystone/test/system/test_issue_85.py b/keystone/test/system/test_issue_85.py index 9542ac556b..76490232f9 100644 --- a/keystone/test/system/test_issue_85.py +++ b/keystone/test/system/test_issue_85.py @@ -4,54 +4,61 @@ from common import KeystoneTestCase class TestIssue85(KeystoneTestCase): """Illustrates github issue #85""" - def test_disabling_tenant_disables_token(self): - """Disabling a tenant should invalidate previously-issued tokens""" - # Authenticate as admin - r = self.admin_request(method='POST', path='/v2.0/tokens', - json=self.admin_credentials) - self.admin_token = r.json['auth']['token']['id'] - - user_id = 'user' - tenant_id = 'tenant' + tenant_id = KeystoneTestCase._uuid() + user_id = KeystoneTestCase._uuid() + + def setUp(self): + super(TestIssue85, self).setUp() # Create a tenant - self.admin_request(method='POST', path='/v2.0/tenants', + self.admin_request(method='POST', path='/tenants', json={ 'tenant':{ - 'id': tenant_id, + 'id': self.tenant_id, 'description': 'description', 'enabled': True, } }) # Create a user - self.admin_request(method='PUT', path='/v2.0/users', + self.admin_request(method='PUT', path='/users', json={ 'user':{ - 'id': user_id, + 'id': self.user_id, 'password': 'secrete', 'email': 'user@openstack.org', 'enabled': True, - 'tenant_id': tenant_id, + 'tenant_id': 'tenant', } }) + + def tearDown(self): + # Delete user + self.admin_request(method='DELETE', path='/users/%s' % + self.user_id) + # Delete tenant + self.admin_request(method='DELETE', path='/tenants/%s' % + self.tenant_id) + + def test_disabling_tenant_disables_token(self): + """Disabling a tenant should invalidate previously-issued tokens""" # Authenticate as user to get a token - r = self.service_request(method='POST', path='/v2.0/tokens', + r = self.service_request(method='POST', path='/tokens', json={ - 'passwordCredentials':{ - 'username': user_id, + 'passwordCredentials': { + 'username': self.user_id, 'password': 'secrete', } }) self.service_token = r.json['auth']['token']['id'] # Validate tenant token - self.admin_request(path='/v2.0/tokens/%s' % self.service_token) + self.admin_request(path='/tokens/%s' % self.service_token) # Disable tenant r = self.admin_request(method='PUT', - path='/v2.0/tenants/%s' % tenant_id, + path='/tenants/%s' % self.tenant_id, json={ 'tenant': { 'description': 'description', @@ -62,7 +69,7 @@ class TestIssue85(KeystoneTestCase): # Assert tenant token invalidated # Commented this out because it will fail this test -# self.admin_request(path='/v2.0/tokens/%s' % self.service_token, +# self.admin_request(path='/tokens/%s' % self.service_token, # expect_exception=True) if __name__ == '__main__': diff --git a/keystone/test/system/test_request_specs.py b/keystone/test/system/test_request_specs.py index ee6d050a5c..f828d865bf 100644 --- a/keystone/test/system/test_request_specs.py +++ b/keystone/test/system/test_request_specs.py @@ -6,8 +6,8 @@ class TestUrlHandling(KeystoneTestCase): def test_optional_trailing_slash(self): """Same response returned regardless of a trailing slash in the url.""" - r1 = self.admin_request(path='/v2.0/') - r2 = self.admin_request(path='/v2.0') + r1 = self.service_request(path='/') + r2 = self.service_request(path='') self.assertEqual(r1.read(), r2.read()) class TestContentTypes(KeystoneTestCase): @@ -15,41 +15,39 @@ class TestContentTypes(KeystoneTestCase): def test_default_content_type(self): """Service returns JSON without being asked to""" - r = self.admin_request(path='/v2.0') + r = self.service_request() self.assertTrue('application/json' in r.getheader('Content-Type')) def test_xml_extension(self): """Service responds to .xml URL extension""" - r = self.admin_request(path='/v2.0.xml') + r = self.service_request(path='.xml') self.assertTrue('application/xml' in r.getheader('Content-Type')) def test_json_extension(self): """Service responds to .json URL extension""" - r = self.admin_request(path='/v2.0.json') + r = self.service_request(path='.json') self.assertTrue('application/json' in r.getheader('Content-Type')) def test_xml_accept_header(self): """Service responds to xml Accept header""" - r = self.admin_request(path='/v2.0', - headers={'Accept': 'application/xml'}) + r = self.service_request(headers={'Accept': 'application/xml'}) self.assertTrue('application/xml' in r.getheader('Content-Type')) def test_json_accept_header(self): """Service responds to json Accept header""" - r = self.admin_request(path='/v2.0', - headers={'Accept': 'application/json'}) + r = self.service_request(headers={'Accept': 'application/json'}) self.assertTrue('application/json' in r.getheader('Content-Type')) def test_xml_extension_overrides_conflicting_header(self): """Service returns XML when Accept header conflicts with extension""" - r = self.admin_request(path='/v2.0.xml', + r = self.service_request(path='.xml', headers={'Accept': 'application/json'}) self.assertTrue('application/xml' in r.getheader('Content-Type')) def test_json_extension_overrides_conflicting_header(self): """Service returns JSON when Accept header conflicts with extension""" - r = self.admin_request(path='/v2.0.json', + r = self.service_request(path='.json', headers={'Accept': 'application/xml'}) self.assertTrue('application/json' in r.getheader('Content-Type'))