Make auth_token middleware fetching respect prefix

In auth_token middleware auth_prefix is only added to requests that go
through the _json_request method. This doesn't include the two
certificate fetching functions. Manually add the auth_prefix to both
those requests.

Closes-Bug: #1211615
Change-Id: I25d1b401598c9a443ddef0fc3259ba859aee8c76
This commit is contained in:
Jamie Lennox
2013-08-13 13:25:39 +10:00
parent d8673ff449
commit becec90286
2 changed files with 102 additions and 14 deletions

View File

@@ -1221,8 +1221,9 @@ class AuthProtocol(object):
return self.cms_verify(data['signed']) return self.cms_verify(data['signed'])
def fetch_signing_cert(self): def fetch_signing_cert(self):
response, data = self._http_request('GET', path = self.auth_admin_prefix.rstrip('/')
'/v2.0/certificates/signing') path += '/v2.0/certificates/signing'
response, data = self._http_request('GET', path)
def write_cert_file(data): def write_cert_file(data):
certfile = open(self.signing_cert_file_name, 'w') certfile = open(self.signing_cert_file_name, 'w')
@@ -1242,8 +1243,9 @@ class AuthProtocol(object):
raise ServiceError('invalid json response') raise ServiceError('invalid json response')
def fetch_ca_cert(self): def fetch_ca_cert(self):
response, data = self._http_request('GET', path = self.auth_admin_prefix.rstrip('/') + '/v2.0/certificates/ca'
'/v2.0/certificates/ca') response, data = self._http_request('GET', path)
try: try:
#todo check response #todo check response
certfile = open(self.ca_file_name, 'w') certfile = open(self.ca_file_name, 'w')

View File

@@ -475,6 +475,31 @@ class BaseFakeHTTPConnection(object):
return status, body return status, body
class CertificateHTTPConnection(BaseFakeHTTPConnection):
signing_cert_data = 'SIGNING CERT'
ca_cert_data = 'SIGNING CA'
def __init__(self, *args, **kwargs):
self.response = None
def request(self, method, path, **kwargs):
CertificateHTTPConnection.last_requested_url = path
if method == 'GET' and path == '/testadmin/v2.0/certificates/signing':
self.response = FakeHTTPResponse(200, self.signing_cert_data)
elif method == 'GET' and path == '/testadmin/v2.0/certificates/ca':
self.response = FakeHTTPResponse(200, self.ca_cert_data)
else:
self.response = FakeHTTPResponse(404, '')
def getresponse(self):
return self.response
def close(self):
pass
class FakeHTTPConnection(BaseFakeHTTPConnection): class FakeHTTPConnection(BaseFakeHTTPConnection):
"""Emulate a fake Keystone v2 server.""" """Emulate a fake Keystone v2 server."""
@@ -1262,6 +1287,16 @@ class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
def setUp(self): def setUp(self):
super(CertDownloadMiddlewareTest, self).setUp() super(CertDownloadMiddlewareTest, self).setUp()
self.base_dir = tempfile.mkdtemp() self.base_dir = tempfile.mkdtemp()
self.cert_dir = os.path.join(self.base_dir, 'certs')
os.mkdir(self.cert_dir)
self.conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_protocol': 'http',
'auth_admin_prefix': '/testadmin',
'signing_dir': self.cert_dir,
}
def tearDown(self): def tearDown(self):
shutil.rmtree(self.base_dir) shutil.rmtree(self.base_dir)
@@ -1271,20 +1306,71 @@ class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
# so invocation of /usr/bin/openssl succeeds. This time we give it # so invocation of /usr/bin/openssl succeeds. This time we give it
# an empty directory, so it fails. # an empty directory, so it fails.
def test_request_no_token_dummy(self): def test_request_no_token_dummy(self):
cert_dir = os.path.join(self.base_dir, 'certs') self.set_middleware(fake_http=self.fake_http, conf=self.conf)
os.mkdir(cert_dir)
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_protocol': 'http',
'auth_admin_prefix': '/testadmin',
'signing_dir': cert_dir,
}
self.set_middleware(fake_http=self.fake_http, conf=conf)
self.assertRaises(cms.subprocess.CalledProcessError, self.assertRaises(cms.subprocess.CalledProcessError,
self.middleware.verify_signed_token, self.middleware.verify_signed_token,
self.token_dict['signed_token_scoped']) self.token_dict['signed_token_scoped'])
def test_fetch_signing_cert(self):
self.set_middleware(fake_http=CertificateHTTPConnection,
conf=self.conf)
self.middleware.fetch_signing_cert()
with open(self.middleware.signing_cert_file_name, 'r') as f:
self.assertEqual(f.read(),
CertificateHTTPConnection.signing_cert_data)
self.assertEqual('/testadmin/v2.0/certificates/signing',
self.middleware.http_client_class.last_requested_url)
def test_fetch_signing_ca(self):
self.set_middleware(fake_http=CertificateHTTPConnection,
conf=self.conf)
self.middleware.fetch_ca_cert()
with open(self.middleware.ca_file_name, 'r') as f:
self.assertEqual(f.read(), CertificateHTTPConnection.ca_cert_data)
self.assertEqual('/testadmin/v2.0/certificates/ca',
self.middleware.http_client_class.last_requested_url)
def test_prefix_trailing_slash(self):
self.conf['auth_admin_prefix'] = '/newadmin/'
self.set_middleware(fake_http=CertificateHTTPConnection,
conf=self.conf)
# the requests will return a 404, but it doesn't matter
self.middleware.fetch_ca_cert()
self.assertEqual('/newadmin/v2.0/certificates/ca',
self.middleware.http_client_class.last_requested_url)
self.middleware.fetch_signing_cert()
self.assertEqual('/newadmin/v2.0/certificates/signing',
self.middleware.http_client_class.last_requested_url)
def test_without_prefix(self):
self.conf['auth_admin_prefix'] = ''
self.set_middleware(fake_http=CertificateHTTPConnection,
conf=self.conf)
# the requests will return a 404, but it doesn't matter
self.middleware.fetch_ca_cert()
self.assertEqual('/v2.0/certificates/ca',
self.middleware.http_client_class.last_requested_url)
self.middleware.fetch_signing_cert()
self.assertEqual('/v2.0/certificates/signing',
self.middleware.http_client_class.last_requested_url)
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""v2 token specific tests. """v2 token specific tests.