Refactor verify signing dir logic

Add a function AuthProtocol::verify_signing_dir() in
keystoneclient/middleware/auth_token.py. It will be called both in init
stage and in fetching signing cert scenario.

Fixes bug 1201577

Change-Id: Ice0e8a93ba9d85c20de84cf8da3e9b0f2274b740
This commit is contained in:
Wu Wenxiang
2013-07-30 23:29:02 +08:00
parent f952c817e0
commit 6887246449
2 changed files with 46 additions and 16 deletions

View File

@@ -383,20 +383,7 @@ class AuthProtocol(object):
self.signing_dirname = tempfile.mkdtemp(prefix='keystone-signing-') self.signing_dirname = tempfile.mkdtemp(prefix='keystone-signing-')
self.LOG.info('Using %s as cache directory for signing certificate' % self.LOG.info('Using %s as cache directory for signing certificate' %
self.signing_dirname) self.signing_dirname)
if os.path.exists(self.signing_dirname): self.verify_signing_dir()
if not os.access(self.signing_dirname, os.W_OK):
raise ConfigurationError(
'unable to access signing_dir %s' % self.signing_dirname)
if os.stat(self.signing_dirname).st_uid != os.getuid():
self.LOG.warning(
'signing_dir is not owned by %s' % os.getuid())
current_mode = stat.S_IMODE(os.stat(self.signing_dirname).st_mode)
if current_mode != stat.S_IRWXU:
self.LOG.warning(
'signing_dir mode is %s instead of %s' %
(oct(current_mode), oct(stat.S_IRWXU)))
else:
os.makedirs(self.signing_dirname, stat.S_IRWXU)
val = '%s/signing_cert.pem' % self.signing_dirname val = '%s/signing_cert.pem' % self.signing_dirname
self.signing_cert_file_name = val self.signing_cert_file_name = val
@@ -1145,6 +1132,22 @@ class AuthProtocol(object):
formatted = cms.token_to_cms(signed_text) formatted = cms.token_to_cms(signed_text)
return self.cms_verify(formatted) return self.cms_verify(formatted)
def verify_signing_dir(self):
if os.path.exists(self.signing_dirname):
if not os.access(self.signing_dirname, os.W_OK):
raise ConfigurationError(
'unable to access signing_dir %s' % self.signing_dirname)
if os.stat(self.signing_dirname).st_uid != os.getuid():
self.LOG.warning(
'signing_dir is not owned by %s' % os.getuid())
current_mode = stat.S_IMODE(os.stat(self.signing_dirname).st_mode)
if current_mode != stat.S_IRWXU:
self.LOG.warning(
'signing_dir mode is %s instead of %s' %
(oct(current_mode), oct(stat.S_IRWXU)))
else:
os.makedirs(self.signing_dirname, stat.S_IRWXU)
@property @property
def token_revocation_list_fetched_time(self): def token_revocation_list_fetched_time(self):
if not self._token_revocation_list_fetched_time: if not self._token_revocation_list_fetched_time:
@@ -1210,11 +1213,19 @@ class AuthProtocol(object):
def fetch_signing_cert(self): def fetch_signing_cert(self):
response, data = self._http_request('GET', response, data = self._http_request('GET',
'/v2.0/certificates/signing') '/v2.0/certificates/signing')
try:
#todo check response def write_cert_file(data):
certfile = open(self.signing_cert_file_name, 'w') certfile = open(self.signing_cert_file_name, 'w')
certfile.write(data) certfile.write(data)
certfile.close() certfile.close()
try:
#todo check response
try:
write_cert_file(data)
except IOError:
self.verify_signing_dir()
write_cert_file(data)
except (AssertionError, KeyError): except (AssertionError, KeyError):
self.LOG.warn( self.LOG.warn(
"Unexpected response from keystone service: %s", data) "Unexpected response from keystone service: %s", data)

View File

@@ -18,10 +18,12 @@ import datetime
import iso8601 import iso8601
import os import os
import shutil import shutil
import stat
import string import string
import sys import sys
import tempfile import tempfile
import testtools import testtools
import uuid
import fixtures import fixtures
import webob import webob
@@ -910,6 +912,23 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
self.middleware.verify_signed_token( self.middleware.verify_signed_token(
self.token_dict['signed_token_scoped']) self.token_dict['signed_token_scoped'])
def test_verify_signing_dir_create_while_missing(self):
tmp_name = uuid.uuid4().hex
test_parent_signing_dir = "/tmp/%s" % tmp_name
self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
self.middleware.signing_cert_file_name = "%s/test.pem" %\
self.middleware.signing_dirname
self.middleware.verify_signing_dir()
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
os.getuid())
self.assertEqual(
stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
stat.S_IRWXU)
shutil.rmtree(test_parent_signing_dir)
def test_cert_file_missing(self): def test_cert_file_missing(self):
self.assertFalse(self.middleware.cert_file_missing( self.assertFalse(self.middleware.cert_file_missing(
"openstack: /tmp/haystack: No such file or directory", "openstack: /tmp/haystack: No such file or directory",