Privatize Everything

auth_token middleware has always been in a weird state where the
keystone developers decided the internals were private but other people
would use them.

Make everything about auth_token middleware private.

Change-Id: I83f8d65a7d6903553d77e3f9916ea753447dba3f
This commit is contained in:
Jamie Lennox 2014-07-02 09:48:41 +10:00 committed by Morgan Fainberg
parent 8e626d5b77
commit 4b6dc3f1bd
2 changed files with 350 additions and 339 deletions

File diff suppressed because it is too large Load Diff

View File

@ -250,15 +250,15 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.fake_app(self.expected_env), self.conf)
self.middleware._iso8601 = iso8601
with tempfile.NamedTemporaryFile(dir=self.middleware.signing_dirname,
with tempfile.NamedTemporaryFile(dir=self.middleware._signing_dirname,
delete=False) as f:
pass
self.middleware.revoked_file_name = f.name
self.middleware._revoked_file_name = f.name
self.addCleanup(cleanup_revoked_file,
self.middleware.revoked_file_name)
self.middleware._revoked_file_name)
self.middleware.token_revocation_list = jsonutils.dumps(
self.middleware._token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
def start_fake_response(self, status, headers):
@ -295,8 +295,8 @@ class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
"%s/v2.0/tokens/revoked" % BASE_URI,
responses=responses)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
fetched = jsonutils.loads(self.middleware._fetch_revocation_list())
self.assertEqual(fetched, self.examples.REVOCATION_LIST)
# Check that 4 requests have been made
self.assertEqual(len(httpretty.httpretty.latest_requests), 4)
@ -436,10 +436,10 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_will_expire_soon(self):
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
seconds=10)
self.assertTrue(auth_token.will_expire_soon(tenseconds))
self.assertTrue(auth_token._will_expire_soon(tenseconds))
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
seconds=40)
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
self.assertFalse(auth_token._will_expire_soon(fortyseconds))
def test_token_is_v2_accepts_v2(self):
token = self.examples.UUID_TOKEN_DEFAULT
@ -557,7 +557,7 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_uri': 'https://keystone.example.com:1234',
}
middleware = auth_token.AuthProtocol(self.fake_app, conf)
self.assertEqual(middleware.token_revocation_list_cache_timeout,
self.assertEqual(middleware._token_revocation_list_cache_timeout,
datetime.timedelta(seconds=24))
@ -581,7 +581,7 @@ class CommonAuthTokenMiddlewareTest(object):
}
self.set_middleware(conf=conf)
expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234'
self.assertEqual(expected_auth_uri, self.middleware.auth_uri)
self.assertEqual(expected_auth_uri, self.middleware._auth_uri)
def assert_valid_request_200(self, token, with_catalog=True):
req = webob.Request.blank('/')
@ -614,7 +614,7 @@ class CommonAuthTokenMiddlewareTest(object):
def _test_cache_revoked(self, token, revoked_form=None):
# When the token is cached and revoked, 401 is returned.
self.middleware.check_revocations_for_cached = True
self.middleware._check_revocations_for_cached = True
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
@ -624,7 +624,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual(200, self.response_status)
# Put it in revocation list.
self.middleware.token_revocation_list = self.get_revocation_list_json(
self.middleware._token_revocation_list = self.get_revocation_list_json(
token_ids=[revoked_form or token])
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(401, self.response_status)
@ -647,7 +647,8 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertLastPath(None)
def test_revoked_token_receives_401(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
self.middleware(req.environ, self.start_fake_response)
@ -656,7 +657,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_revoked_token_receives_401_sha256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
self.middleware.token_revocation_list = (
self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
@ -675,7 +676,8 @@ class CommonAuthTokenMiddlewareTest(object):
# considered revoked so returns 401.
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
self.middleware(req.environ, self.start_fake_response)
@ -696,7 +698,7 @@ class CommonAuthTokenMiddlewareTest(object):
# Put the token in the revocation list.
token_hashed = cms.cms_hash_token(token)
self.middleware.token_revocation_list = self.get_revocation_list_json(
self.middleware._token_revocation_list = self.get_revocation_list_json(
token_ids=[token_hashed])
# First, request is using the hashed token, is valid so goes in
@ -723,50 +725,52 @@ class CommonAuthTokenMiddlewareTest(object):
def test_is_signed_token_revoked_returns_false(self):
#explicitly setting an empty revocation list here to document intent
self.middleware.token_revocation_list = jsonutils.dumps(
self.middleware._token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
result = self.middleware.is_signed_token_revoked(
result = self.middleware._is_signed_token_revoked(
[self.token_dict['revoked_token_hash']])
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
result = self.middleware.is_signed_token_revoked(
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
result = self.middleware._is_signed_token_revoked(
[self.token_dict['revoked_token_hash']])
self.assertTrue(result)
def test_is_signed_token_revoked_returns_true_sha256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
self.middleware.token_revocation_list = (
self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
result = self.middleware.is_signed_token_revoked(
result = self.middleware._is_signed_token_revoked(
[self.token_dict['revoked_token_hash_sha256']])
self.assertTrue(result)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
self.assertRaises(auth_token.InvalidUserToken,
self.middleware.verify_signed_token,
self.middleware._verify_signed_token,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash']])
def test_verify_signed_token_raises_exception_for_revoked_token_s256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
self.middleware.token_revocation_list = (
self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
self.assertRaises(auth_token.InvalidUserToken,
self.middleware.verify_signed_token,
self.middleware._verify_signed_token,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash_sha256'],
self.token_dict['revoked_token_hash']])
def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self):
self.middleware.token_revocation_list = (
self.middleware._token_revocation_list = (
self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
self.assertRaises(auth_token.InvalidUserToken,
self.middleware.verify_pkiz_token,
self.middleware._verify_pkiz_token,
self.token_dict['revoked_token_pkiz'],
[self.token_dict['revoked_token_pkiz_hash']])
@ -774,15 +778,17 @@ class CommonAuthTokenMiddlewareTest(object):
json.loads(text)
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
text = self.middleware.verify_signed_token(
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
text = self.middleware._verify_signed_token(
self.token_dict['signed_token_scoped'],
[self.token_dict['signed_token_scoped_hash']])
self.assertIsValidJSON(text)
def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
text = self.middleware.verify_pkiz_token(
self.middleware._token_revocation_list = (
self.get_revocation_list_json())
text = self.middleware._verify_pkiz_token(
self.token_dict['signed_token_scoped_pkiz'],
[self.token_dict['signed_token_scoped_hash']])
self.assertIsValidJSON(text)
@ -790,9 +796,9 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self):
self.conf['hash_algorithms'] = ['sha256', 'md5']
self.set_middleware()
self.middleware.token_revocation_list = (
self.middleware._token_revocation_list = (
self.get_revocation_list_json(mode='sha256'))
text = self.middleware.verify_signed_token(
text = self.middleware._verify_signed_token(
self.token_dict['signed_token_scoped'],
[self.token_dict['signed_token_scoped_hash_sha256'],
self.token_dict['signed_token_scoped_hash']])
@ -801,64 +807,65 @@ class CommonAuthTokenMiddlewareTest(object):
def test_verify_signing_dir_create_while_missing(self):
tmp_name = uuid.uuid4().hex
test_parent_signing_dir = "/tmp/%s" % tmp_name
self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
self.middleware.signing_cert_file_name = (
"%s/test.pem" % self.middleware.signing_dirname)
self.middleware.verify_signing_dir()
self.middleware._signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
self.middleware._signing_cert_file_name = (
"%s/test.pem" % self.middleware._signing_dirname)
self.middleware._verify_signing_dir()
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
self.assertTrue(os.path.isdir(self.middleware._signing_dirname))
self.assertTrue(os.access(self.middleware._signing_dirname, os.W_OK))
self.assertEqual(os.stat(self.middleware._signing_dirname).st_uid,
os.getuid())
self.assertEqual(
stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
stat.S_IMODE(os.stat(self.middleware._signing_dirname).st_mode),
stat.S_IRWXU)
shutil.rmtree(test_parent_signing_dir)
def test_get_token_revocation_list_fetched_time_returns_min(self):
self.middleware.token_revocation_list_fetched_time = None
self.middleware.revoked_file_name = ''
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
self.middleware._token_revocation_list_fetched_time = None
self.middleware._revoked_file_name = ''
self.assertEqual(self.middleware._token_revocation_list_fetched_time,
datetime.datetime.min)
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
self.middleware.token_revocation_list_fetched_time = None
mtime = os.path.getmtime(self.middleware.revoked_file_name)
self.middleware._token_revocation_list_fetched_time = None
mtime = os.path.getmtime(self.middleware._revoked_file_name)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
self.middleware.token_revocation_list_fetched_time)
self.middleware._token_revocation_list_fetched_time)
@testtools.skipUnless(TimezoneFixture.supported(),
'TimezoneFixture not supported')
def test_get_token_revocation_list_fetched_time_returns_utc(self):
with TimezoneFixture('UTC-1'):
self.middleware.token_revocation_list = jsonutils.dumps(
self.middleware._token_revocation_list = jsonutils.dumps(
self.examples.REVOCATION_LIST)
self.middleware.token_revocation_list_fetched_time = None
fetched_time = self.middleware.token_revocation_list_fetched_time
self.middleware._token_revocation_list_fetched_time = None
fetched_time = self.middleware._token_revocation_list_fetched_time
self.assertTrue(timeutils.is_soon(fetched_time, 1))
def test_get_token_revocation_list_fetched_time_returns_value(self):
expected = self.middleware._token_revocation_list_fetched_time
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
self.assertEqual(self.middleware._token_revocation_list_fetched_time,
expected)
def test_get_revocation_list_returns_fetched_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list,
self.middleware._token_revocation_list_fetched_time = None
os.remove(self.middleware._revoked_file_name)
self.assertEqual(self.middleware._token_revocation_list,
self.examples.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
self.assertEqual(self.middleware.token_revocation_list,
self.middleware._token_revocation_list)
self.assertEqual(self.middleware._token_revocation_list,
self.middleware._token_revocation_list_prop)
def test_get_revocation_list_returns_current_list_from_disk(self):
in_memory_list = self.middleware.token_revocation_list
self.middleware._token_revocation_list = None
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
in_memory_list = self.middleware._token_revocation_list
self.middleware._token_revocation_list_prop = None
self.assertEqual(self.middleware._token_revocation_list,
in_memory_list)
def test_invalid_revocation_list_raises_service_error(self):
httpretty.register_uri(httpretty.GET,
@ -867,13 +874,13 @@ class CommonAuthTokenMiddlewareTest(object):
status=200)
self.assertRaises(auth_token.ServiceError,
self.middleware.fetch_revocation_list)
self.middleware._fetch_revocation_list)
def test_fetch_revocation_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
fetched = jsonutils.loads(self.middleware._fetch_revocation_list())
self.assertEqual(fetched, self.examples.REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection
@ -923,12 +930,12 @@ class CommonAuthTokenMiddlewareTest(object):
def debug(self, msg=None, *args, **kwargs):
self.debugmsg = msg
self.middleware.LOG = FakeLog()
self.middleware.delay_auth_decision = False
self.middleware._LOG = FakeLog()
self.middleware._delay_auth_decision = False
self.assertRaises(auth_token.InvalidUserToken,
self.middleware._get_user_token_from_header, {})
self.assertIsNotNone(self.middleware.LOG.msg)
self.assertIsNotNone(self.middleware.LOG.debugmsg)
self.assertIsNotNone(self.middleware._LOG.msg)
self.assertIsNotNone(self.middleware._LOG.debugmsg)
def test_request_no_token_http(self):
req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
@ -1042,7 +1049,7 @@ class CommonAuthTokenMiddlewareTest(object):
"""
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = ERROR_TOKEN
self.middleware.http_request_max_retries = 0
self.middleware._http_request_max_retries = 0
self.middleware(req.environ, self.start_fake_response)
self.assertIsNone(self._get_cached_token(ERROR_TOKEN))
self.assert_valid_last_url(ERROR_TOKEN)
@ -1277,7 +1284,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
"%s%s" % (BASE_URI, self.signing_path),
status=404)
self.assertRaises(exceptions.CertificateConfigError,
self.middleware.verify_signed_token,
self.middleware._verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED,
[self.examples.SIGNED_TOKEN_SCOPED_HASH])
@ -1286,9 +1293,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_URI, self.signing_path),
body=data)
self.middleware.fetch_signing_cert()
self.middleware._fetch_signing_cert()
with open(self.middleware.signing_cert_file_name, 'r') as f:
with open(self.middleware._signing_cert_file_name, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual("/testadmin%s" % self.signing_path,
@ -1299,9 +1306,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
httpretty.register_uri(httpretty.GET,
"%s%s" % (BASE_URI, self.ca_path),
body=data)
self.middleware.fetch_ca_cert()
self.middleware._fetch_ca_cert()
with open(self.middleware.signing_ca_file_name, 'r') as f:
with open(self.middleware._signing_ca_file_name, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual("/testadmin%s" % self.ca_path,
@ -1323,12 +1330,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.set_middleware(conf=self.conf)
self.middleware.fetch_ca_cert()
self.middleware._fetch_ca_cert()
self.assertEqual('/newadmin%s' % self.ca_path,
httpretty.last_request().path)
self.middleware.fetch_signing_cert()
self.middleware._fetch_signing_cert()
self.assertEqual('/newadmin%s' % self.signing_path,
httpretty.last_request().path)
@ -1349,12 +1356,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.set_middleware(conf=self.conf)
self.middleware.fetch_ca_cert()
self.middleware._fetch_ca_cert()
self.assertEqual(self.ca_path,
httpretty.last_request().path)
self.middleware.fetch_signing_cert()
self.middleware._fetch_signing_cert()
self.assertEqual(self.signing_path,
httpretty.last_request().path)
@ -1715,10 +1722,10 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
class TokenEncodingTest(testtools.TestCase):
def test_unquoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
self.assertEqual('foo%20bar', auth_token._safe_quote('foo bar'))
def test_quoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))
self.assertEqual('foo%20bar', auth_token._safe_quote('foo%20bar'))
class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
@ -1790,25 +1797,25 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
def test_no_data(self):
data = {}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
auth_token._confirm_token_not_expired,
data)
def test_bad_data(self):
data = {'my_happy_token_dict': 'woo'}
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
auth_token._confirm_token_not_expired,
data)
def test_v2_token_not_expired(self):
data = self.create_v2_token_fixture()
expected_expires = data['access']['token']['expires']
actual_expires = auth_token.confirm_token_not_expired(data)
actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v2_token_expired(self):
data = self.create_v2_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
auth_token._confirm_token_not_expired,
data)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@ -1819,7 +1826,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
data = self.create_v2_token_fixture(
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@ -1831,19 +1838,19 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires='2000-01-01T00:05:10.000123+05:00')
data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
auth_token._confirm_token_not_expired,
data)
def test_v3_token_not_expired(self):
data = self.create_v3_token_fixture()
expected_expires = data['token']['expires_at']
actual_expires = auth_token.confirm_token_not_expired(data)
actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
def test_v3_token_expired(self):
data = self.create_v3_token_fixture(expires=self.one_hour_ago)
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
auth_token._confirm_token_not_expired,
data)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@ -1855,7 +1862,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires='2000-01-01T00:05:10.000123-05:00')
expected_expires = '2000-01-01T05:05:10.000123Z'
actual_expires = auth_token.confirm_token_not_expired(data)
actual_expires = auth_token._confirm_token_not_expired(data)
self.assertEqual(actual_expires, expected_expires)
@mock.patch('keystonemiddleware.openstack.common.timeutils.utcnow')
@ -1866,7 +1873,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
data = self.create_v3_token_fixture(
expires='2000-01-01T00:05:10.000123+05:00')
self.assertRaises(auth_token.InvalidUserToken,
auth_token.confirm_token_not_expired,
auth_token._confirm_token_not_expired,
data)
def test_cached_token_not_expired(self):