Merge "Fix auth_token middleware to fetch revocation list as admin."
This commit is contained in:
commit
84da6be591
@ -99,6 +99,7 @@
|
||||
#key_size = 1024
|
||||
#valid_days = 3650
|
||||
#ca_password = None
|
||||
#token_format = PKI
|
||||
|
||||
[ldap]
|
||||
# url = ldap://localhost
|
||||
|
@ -772,10 +772,14 @@ class AuthProtocol(object):
|
||||
f.write(value)
|
||||
|
||||
def fetch_revocation_list(self):
|
||||
response, data = self._http_request('GET', '/v2.0/tokens/revoked')
|
||||
headers = {'X-Auth-Token': self.get_admin_token()}
|
||||
response, data = self._json_request('GET', '/v2.0/tokens/revoked',
|
||||
additional_headers=headers)
|
||||
if response.status != 200:
|
||||
raise ServiceError('Unable to fetch token revocation list.')
|
||||
return self.cms_verify(data)
|
||||
if (not 'signed' in data):
|
||||
raise ServiceError('Revocation list inmproperly formatted.')
|
||||
return self.cms_verify(data['signed'])
|
||||
|
||||
def fetch_signing_cert(self):
|
||||
response, data = self._http_request('GET',
|
||||
|
@ -551,7 +551,7 @@ class TokenController(wsgi.Application):
|
||||
config.CONF.signing.certfile,
|
||||
config.CONF.signing.keyfile)
|
||||
|
||||
return signed_text
|
||||
return {'signed': signed_text}
|
||||
|
||||
def endpoints(self, context, token_id):
|
||||
"""Return a list of endpoints available to the token."""
|
||||
|
@ -32,8 +32,8 @@ from keystone import config
|
||||
from keystone import test
|
||||
|
||||
|
||||
#The data for these tests are signed using openssl and are stored in files
|
||||
# in the signing subdirectory. IN order to keep the values consistent between
|
||||
# The data for these tests are signed using openssl and are stored in files
|
||||
# in the signing subdirectory. In order to keep the values consistent between
|
||||
# the tests and the signed documents, we read them in for use in the tests.
|
||||
def setUpModule(self):
|
||||
signing_path = os.path.join(os.path.dirname(__file__), 'signing')
|
||||
@ -47,7 +47,8 @@ def setUpModule(self):
|
||||
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
|
||||
self.REVOCATION_LIST = jsonutils.loads(f.read())
|
||||
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
|
||||
self.SIGNED_REVOCATION_LIST = f.read()
|
||||
self.VALID_SIGNED_REVOCATION_LIST =\
|
||||
jsonutils.dumps({'signed': f.read()})
|
||||
|
||||
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED] = {
|
||||
'access': {
|
||||
@ -225,7 +226,7 @@ class FakeHTTPConnection(object):
|
||||
last_requested_url = ''
|
||||
|
||||
def __init__(self, *args):
|
||||
pass
|
||||
self.send_valid_revocation_list = True
|
||||
|
||||
def request(self, method, path, **kwargs):
|
||||
"""Fakes out several http responses.
|
||||
@ -319,6 +320,9 @@ class BaseAuthTokenMiddlewareTest(test.TestCase):
|
||||
self.middleware.token_revocation_list = jsonutils.dumps(
|
||||
{"revoked": [], "extra": "success"})
|
||||
|
||||
globals()['SIGNED_REVOCATION_LIST'] =\
|
||||
globals()['VALID_SIGNED_REVOCATION_LIST']
|
||||
|
||||
super(BaseAuthTokenMiddlewareTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
@ -478,6 +482,11 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
|
||||
self.middleware._token_revocation_list = None
|
||||
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
|
||||
|
||||
def test_invalid_revocation_list_raises_service_error(self):
|
||||
globals()['SIGNED_REVOCATION_LIST'] = "{}"
|
||||
with self.assertRaises(auth_token.ServiceError):
|
||||
self.middleware.fetch_revocation_list()
|
||||
|
||||
def test_fetch_revocation_list(self):
|
||||
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
|
||||
self.assertEqual(fetched_list, REVOCATION_LIST)
|
||||
|
@ -220,11 +220,15 @@ class RestfulTestCase(test.TestCase):
|
||||
|
||||
def public_request(self, port=None, **kwargs):
|
||||
kwargs['port'] = port or self._public_port()
|
||||
return self.restful_request(**kwargs)
|
||||
response = self.restful_request(**kwargs)
|
||||
self.assertValidResponseHeaders(response)
|
||||
return response
|
||||
|
||||
def admin_request(self, port=None, **kwargs):
|
||||
kwargs['port'] = port or self._admin_port()
|
||||
return self.restful_request(**kwargs)
|
||||
response = self.restful_request(**kwargs)
|
||||
self.assertValidResponseHeaders(response)
|
||||
return response
|
||||
|
||||
def get_scoped_token(self):
|
||||
"""Convenience method so that we can test authenticated requests."""
|
||||
@ -621,6 +625,25 @@ class JsonTestCase(RestfulTestCase, CoreApiTests):
|
||||
r = self.admin_request(path=path, expected_status=401)
|
||||
self.assertValidErrorResponse(r)
|
||||
|
||||
def test_fetch_revocation_list_nonadmin_fails(self):
|
||||
r = self.admin_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/revoked',
|
||||
expected_status=401)
|
||||
|
||||
def test_fetch_revocation_list_admin_200(self):
|
||||
token = self.get_scoped_token()
|
||||
r = self.restful_request(
|
||||
method='GET',
|
||||
path='/v2.0/tokens/revoked',
|
||||
token=token,
|
||||
expected_status=200,
|
||||
port=self._admin_port())
|
||||
self.assertValidRevocationListResponse(r)
|
||||
|
||||
def assertValidRevocationListResponse(self, response):
|
||||
self.assertIsNotNone(response.body['signed'])
|
||||
|
||||
|
||||
class XmlTestCase(RestfulTestCase, CoreApiTests):
|
||||
xmlns = 'http://docs.openstack.org/identity/api/v2.0'
|
||||
|
@ -7,3 +7,8 @@ driver = keystone.identity.backends.kvs.Identity
|
||||
[catalog]
|
||||
driver = keystone.catalog.backends.templated.TemplatedCatalog
|
||||
template_file = default_catalog.templates
|
||||
|
||||
[signing]
|
||||
certfile = signing/signing_cert.pem
|
||||
keyfile = signing/private_key.pem
|
||||
ca_certs = signing/cacert.pem
|
||||
|
Loading…
Reference in New Issue
Block a user