From 6887246449c64e48b0a99e731d0c37f775fcaa99 Mon Sep 17 00:00:00 2001 From: Wu Wenxiang Date: Tue, 30 Jul 2013 23:29:02 +0800 Subject: [PATCH] Refactor verify signing dir logic Add a function AuthProtocol::verify_signing_dir() in keystoneclient/middleware/auth_token.py. It will be called both in init stage and in fetching signing cert scenario. Fixes bug 1201577 Change-Id: Ice0e8a93ba9d85c20de84cf8da3e9b0f2274b740 --- keystoneclient/middleware/auth_token.py | 43 ++++++++++++++++--------- tests/test_auth_token_middleware.py | 19 +++++++++++ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/keystoneclient/middleware/auth_token.py b/keystoneclient/middleware/auth_token.py index dc3d17f1d..1f67b55df 100644 --- a/keystoneclient/middleware/auth_token.py +++ b/keystoneclient/middleware/auth_token.py @@ -383,20 +383,7 @@ class AuthProtocol(object): self.signing_dirname = tempfile.mkdtemp(prefix='keystone-signing-') self.LOG.info('Using %s as cache directory for signing certificate' % self.signing_dirname) - if os.path.exists(self.signing_dirname): - if not os.access(self.signing_dirname, os.W_OK): - raise ConfigurationError( - 'unable to access signing_dir %s' % self.signing_dirname) - if os.stat(self.signing_dirname).st_uid != os.getuid(): - self.LOG.warning( - 'signing_dir is not owned by %s' % os.getuid()) - current_mode = stat.S_IMODE(os.stat(self.signing_dirname).st_mode) - if current_mode != stat.S_IRWXU: - self.LOG.warning( - 'signing_dir mode is %s instead of %s' % - (oct(current_mode), oct(stat.S_IRWXU))) - else: - os.makedirs(self.signing_dirname, stat.S_IRWXU) + self.verify_signing_dir() val = '%s/signing_cert.pem' % self.signing_dirname self.signing_cert_file_name = val @@ -1145,6 +1132,22 @@ class AuthProtocol(object): formatted = cms.token_to_cms(signed_text) return self.cms_verify(formatted) + def verify_signing_dir(self): + if os.path.exists(self.signing_dirname): + if not os.access(self.signing_dirname, os.W_OK): + raise ConfigurationError( + 'unable to access signing_dir %s' % self.signing_dirname) + if os.stat(self.signing_dirname).st_uid != os.getuid(): + self.LOG.warning( + 'signing_dir is not owned by %s' % os.getuid()) + current_mode = stat.S_IMODE(os.stat(self.signing_dirname).st_mode) + if current_mode != stat.S_IRWXU: + self.LOG.warning( + 'signing_dir mode is %s instead of %s' % + (oct(current_mode), oct(stat.S_IRWXU))) + else: + os.makedirs(self.signing_dirname, stat.S_IRWXU) + @property def token_revocation_list_fetched_time(self): if not self._token_revocation_list_fetched_time: @@ -1210,11 +1213,19 @@ class AuthProtocol(object): def fetch_signing_cert(self): response, data = self._http_request('GET', '/v2.0/certificates/signing') - try: - #todo check response + + def write_cert_file(data): certfile = open(self.signing_cert_file_name, 'w') certfile.write(data) certfile.close() + + try: + #todo check response + try: + write_cert_file(data) + except IOError: + self.verify_signing_dir() + write_cert_file(data) except (AssertionError, KeyError): self.LOG.warn( "Unexpected response from keystone service: %s", data) diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py index 7f58e8f37..10d45b085 100644 --- a/tests/test_auth_token_middleware.py +++ b/tests/test_auth_token_middleware.py @@ -18,10 +18,12 @@ import datetime import iso8601 import os import shutil +import stat import string import sys import tempfile import testtools +import uuid import fixtures import webob @@ -910,6 +912,23 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.middleware.verify_signed_token( self.token_dict['signed_token_scoped']) + def test_verify_signing_dir_create_while_missing(self): + tmp_name = uuid.uuid4().hex + test_parent_signing_dir = "/tmp/%s" % tmp_name + self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2) + self.middleware.signing_cert_file_name = "%s/test.pem" %\ + self.middleware.signing_dirname + self.middleware.verify_signing_dir() + # NOTE(wu_wenxiang): Verify if the signing dir was created as expected. + self.assertTrue(os.path.isdir(self.middleware.signing_dirname)) + self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK)) + self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid, + os.getuid()) + self.assertEqual( + stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode), + stat.S_IRWXU) + shutil.rmtree(test_parent_signing_dir) + def test_cert_file_missing(self): self.assertFalse(self.middleware.cert_file_missing( "openstack: /tmp/haystack: No such file or directory",