diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py index a9b5e6953..b38dbfa1e 100644 --- a/tests/test_auth_token_middleware.py +++ b/tests/test_auth_token_middleware.py @@ -25,6 +25,7 @@ import testtools import uuid import fixtures +import httpretty import mock import webob @@ -36,9 +37,6 @@ from keystoneclient.openstack.common import timeutils import client_fixtures -SIGNED_REVOCATION_LIST = None -VALID_SIGNED_REVOCATION_LIST = client_fixtures.SIGNED_REVOCATION_LIST - EXPECTED_V2_DEFAULT_ENV_RESPONSE = { 'HTTP_X_IDENTITY_STATUS': 'Confirmed', 'HTTP_X_TENANT_ID': 'tenant_id1', @@ -51,39 +49,48 @@ EXPECTED_V2_DEFAULT_ENV_RESPONSE = { 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat) } -FAKE_RESPONSE_STACK = [] -VERSION_LIST_v3 = { +BASE_HOST = 'https://keystone.example.com:1234' +BASE_URI = '%s/testadmin' % BASE_HOST +FAKE_ADMIN_TOKEN_ID = 'admin_token2' +FAKE_ADMIN_TOKEN = jsonutils.dumps( + {'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID, + 'expires': '2022-10-03T16:58:01Z'}}}) + + +VERSION_LIST_v3 = jsonutils.dumps({ "versions": { "values": [ { "id": "v3.0", "status": "stable", "updated": "2013-03-06T00:00:00Z", - "links": [] + "links": [{'href': '%s/v3' % BASE_URI, 'rel': 'self'}] }, { "id": "v2.0", - "status": "beta", + "status": "stable", "updated": "2011-11-19T00:00:00Z", - "links": [] + "links": [{'href': '%s/v2.0' % BASE_URI, 'rel': 'self'}] } ] } -} +}) -VERSION_LIST_v2 = { +VERSION_LIST_v2 = jsonutils.dumps({ "versions": { "values": [ { "id": "v2.0", - "status": "beta", + "status": "stable", "updated": "2011-11-19T00:00:00Z", "links": [] } ] } -} +}) + +ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2' class NoModuleFinder(object): @@ -140,178 +147,14 @@ class FakeSwiftOldMemcacheClient(memorycache.Client): sup.set(key, value, timeout, min_compress_len) -class FakeHTTPResponse(object): - def __init__(self, status, body): - self.status = status - self.body = body - - def read(self): - return self.body - - -class BaseFakeHTTPConnection(object): - - def _user_token_responses(self, token_id): - """Emulate user token responses. - - Return success if the token is in the list we know - about. If the request is for revoked tokens, then return - the revoked list, else if a different token is provided, - return 404 indicating an unknown (therefore unauthorized) token. - - """ - if token_id in client_fixtures.JSON_TOKEN_RESPONSES.keys(): - status = 200 - body = client_fixtures.JSON_TOKEN_RESPONSES[token_id] - elif token_id == "revoked": - status = 200 - body = SIGNED_REVOCATION_LIST - else: - status = 404 - body = str() - return status, body - - def fake_v2_responses(self, path): - token_id = path.rsplit('/', 1)[1] - return self._user_token_responses(token_id) - - def fake_v3_responses(self, path, **kwargs): - headers = kwargs.get('headers') - token_id = headers['X-Subject-Token'] - return self._user_token_responses(token_id) - - def fake_v2_admin_token(self, path): - status = 200 - body = jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2', - 'expires': '2022-10-03T16:58:01Z'} - }, - }) - return status, body - - -class CertificateHTTPConnection(BaseFakeHTTPConnection): - - signing_cert_data = 'SIGNING CERT' - ca_cert_data = 'SIGNING CA' - - def __init__(self, *args, **kwargs): - self.response = None - - def request(self, method, path, **kwargs): - CertificateHTTPConnection.last_requested_url = path - - if method == 'GET' and path == '/testadmin/v2.0/certificates/signing': - self.response = FakeHTTPResponse(200, self.signing_cert_data) - elif method == 'GET' and path == '/testadmin/v2.0/certificates/ca': - self.response = FakeHTTPResponse(200, self.ca_cert_data) - else: - self.response = FakeHTTPResponse(404, '') - - def getresponse(self): - return self.response - - def close(self): - pass - - -class FakeHTTPConnection(BaseFakeHTTPConnection): - """Emulate a fake Keystone v2 server.""" - - def __init__(self, *args, **kwargs): - self.send_valid_revocation_list = True - self.resp = None - - def request(self, method, path, **kwargs): - """Fakes out several http responses. - - Support the following requests: - - - Create admin token ('POST /testadmin/v2.0/tokens') - - Get versions ('GET /testadmin/') - - Get v2 user token responses (see fake_v2_responses) - - """ - FakeHTTPConnection.last_requested_url = path - if method == 'POST' and path == '/testadmin/v2.0/tokens': - status, body = self.fake_v2_admin_token(path) - else: - if path == '/testadmin/': - # It's a GET versions call - status = 300 - body = jsonutils.dumps(VERSION_LIST_v2) - else: - status, body = self.fake_v2_responses(path) - - self.resp = FakeHTTPResponse(status, body) - - def getresponse(self): - # If self.resp is set then this is just the response to - # the earlier request. If it is not set, then we expect - # a stack of responses to have been pre-prepared - if self.resp: - return self.resp - else: - if len(FAKE_RESPONSE_STACK): - return FAKE_RESPONSE_STACK.pop() - return FakeHTTPResponse( - 500, jsonutils.dumps('UNEXPECTED RESPONSE')) - - def close(self): - pass - - -class v3FakeHTTPConnection(FakeHTTPConnection): - """Emulate a fake Keystone v3 server.""" - - def request(self, method, path, **kwargs): - """Fakes out several http responses. - - Support the following requests: - - - Create admin token ('POST /testadmin/v2.0/tokens') - - Get versions ('GET /testadmin/') - - Get v2 user token responses (see fake_v2_responses) - - Get v3 user token responses (see fake_v3_responses) - - """ - v3FakeHTTPConnection.last_requested_url = path - if method == 'POST' and path == '/testadmin/v2.0/tokens': - status, body = self.fake_v2_admin_token(path) - else: - if path == '/testadmin/': - # It's a GET versions call - status = 300 - body = jsonutils.dumps(VERSION_LIST_v3) - elif path.split('/')[2] == 'v2.0': - status, body = self.fake_v2_responses(path) - else: - status, body = self.fake_v3_responses(path, **kwargs) - - self.resp = FakeHTTPResponse(status, body) - - -class RaisingHTTPConnection(FakeHTTPConnection): - """An HTTPConnection that always raises.""" - - def request(self, method, path, **kwargs): - raise AssertionError("HTTP request was called.") - - -class RaisingHTTPNetworkError(FakeHTTPConnection): - """An HTTPConnection that always raises network error.""" - - def request(self, method, path, **kwargs): - raise auth_token.NetworkError("Network connection error.") - - class FakeApp(object): """This represents a WSGI app protected by the auth_token middleware.""" + def __init__(self, expected_env=None): - expected_env = expected_env or {} self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) - self.expected_env.update(expected_env) + + if expected_env: + self.expected_env.update(expected_env) def __call__(self, env, start_response): for k, v in self.expected_env.items(): @@ -322,13 +165,12 @@ class FakeApp(object): return resp(env, start_response) -class v3FakeApp(object): +class v3FakeApp(FakeApp): """This represents a v3 WSGI app protected by the auth_token middleware.""" + def __init__(self, expected_env=None): - expected_env = expected_env or {} - # We should always get back the same v2 items - self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) - # ...and with v3 additions, these are for the DEFAULT TOKEN + + # with v3 additions, these are for the DEFAULT TOKEN v3_default_env_additions = { 'HTTP_X_PROJECT_ID': 'tenant_id1', 'HTTP_X_PROJECT_NAME': 'tenant_name1', @@ -337,16 +179,11 @@ class v3FakeApp(object): 'HTTP_X_USER_DOMAIN_ID': 'domain_id1', 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1' } - self.expected_env.update(v3_default_env_additions) - # And finally update for anything passed in - self.expected_env.update(expected_env) - def __call__(self, env, start_response): - for k, v in self.expected_env.items(): - assert env[k] == v, '%s != %s' % (env[k], v) - resp = webob.Response() - resp.body = 'SUCCESS' - return resp(env, start_response) + if expected_env: + v3_default_env_additions.update(expected_env) + + super(v3FakeApp, self).__init__(v3_default_env_additions) class BaseAuthTokenMiddlewareTest(testtools.TestCase): @@ -362,65 +199,26 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): this to specify, for instance, v3 format. """ - def setUp(self, expected_env=None, auth_version=None, - fake_app=None, fake_http=None, token_dict=None): + def setUp(self, expected_env=None, auth_version=None, fake_app=None): testtools.TestCase.setUp(self) - expected_env = expected_env or {} - if token_dict: - self.token_dict = token_dict - else: - self.token_dict = { - 'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT, - 'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED, - 'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED, - 'signed_token_scoped_expired': - client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED, - 'revoked_token': client_fixtures.REVOKED_TOKEN, - 'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH - } + self.expected_env = expected_env or dict() + self.fake_app = fake_app or FakeApp + self.middleware = None self.conf = { 'auth_host': 'keystone.example.com', 'auth_port': 1234, + 'auth_protocol': 'https', 'auth_admin_prefix': '/testadmin', 'signing_dir': client_fixtures.CERTDIR, 'auth_version': auth_version } - # Base assumes v2 for fake app and http, can be overridden for - # child classes by called set_middleware() directly - self.fake_app = fake_app or FakeApp - self.fake_http = fake_http or FakeHTTPConnection - self.set_middleware(self.fake_app, self.fake_http, - expected_env, self.conf) - # self.middleware.verify_signed_token will call - # _ensure_subprocess, but we need - # cms.subprocess.CalledProcessError to be imported first. - # Explicitly call _ensure_subprocess to make sure the import - # happens before we need it regardless of test order. - cms._ensure_subprocess() - self.response_status = None self.response_headers = None - signed_list = 'SIGNED_REVOCATION_LIST' - valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST' - globals()[signed_list] = globals()[valid_signed_list] - - def set_fake_http(self, http_handler): - """Configure the http handler for the auth_token middleware. - - Allows tests to override the default handler on specific tests, - e.g. to use v2 for those parts of auth_token that still use v2 - tokens while running the v3 test class, i.e. getting an admin - token or revocation list. - - """ - self.middleware.http_client_class = http_handler - - def set_middleware(self, fake_app=None, fake_http=None, - expected_env=None, conf=None): + def set_middleware(self, fake_app=None, expected_env=None, conf=None): """Configure the class ready to call the auth_token middleware. Set up the various fake items needed to run the middleware. @@ -428,11 +226,17 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): function to override the class defaults. """ - conf = conf or self.conf - if fake_http: - conf['http_handler'] = fake_http - fake_app = fake_app or self.fake_app - self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf) + if conf: + self.conf.update(conf) + + if not fake_app: + fake_app = self.fake_app + + if expected_env: + self.expected_env.update(expected_env) + + self.middleware = auth_token.AuthProtocol(fake_app(self.expected_env), + self.conf) self.middleware._iso8601 = iso8601 self.middleware.revoked_file_name = tempfile.mkstemp()[1] self.middleware.token_revocation_list = jsonutils.dumps( @@ -440,15 +244,22 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): def tearDown(self): testtools.TestCase.tearDown(self) - try: - os.remove(self.middleware.revoked_file_name) - except OSError: - pass + if self.middleware: + try: + os.remove(self.middleware.revoked_file_name) + except OSError: + pass def start_fake_response(self, status, headers): self.response_status = int(status.split(' ', 1)[0]) self.response_headers = dict(headers) + def assertLastPath(self, path): + if path: + self.assertEqual(path, httpretty.httpretty.last_request.path) + else: + self.assertIsInstance(httpretty.httpretty.last_request, + httpretty.core.HTTPrettyRequestEmpty) if tuple(sys.version_info)[0:2] < (2, 7): @@ -478,58 +289,31 @@ if tuple(sys.version_info)[0:2] < (2, 7): BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest -class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): - """Auth Token middleware test setup that allows the tests to define - a stack of responses to HTTP requests in the test and get those - responses back in sequence for testing. - - Example:: - - resp1 = FakeHTTPResponse(401, jsonutils.dumps('')) - resp2 = FakeHTTPResponse(200, jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - }) - FAKE_RESPONSE_STACK.append(resp1) - FAKE_RESPONSE_STACK.append(resp2) - - ... do your testing code here ... - - """ - - def setUp(self): - super(StackResponseAuthTokenMiddlewareTest, self).setUp() +class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): + @httpretty.activate def test_fetch_revocation_list_with_expire(self): - # first response to revocation list should return 401 Unauthorized - # to pretend to be an expired token - resp1 = FakeHTTPResponse(200, jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - })) - resp2 = FakeHTTPResponse(401, jsonutils.dumps('')) - resp3 = FakeHTTPResponse(200, jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - })) - resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST) + self.set_middleware() - # first get_admin_token() call - FAKE_RESPONSE_STACK.append(resp1) - # request revocation list, get "unauthorized" due to simulated expired - # token - FAKE_RESPONSE_STACK.append(resp2) - # request a new admin_token - FAKE_RESPONSE_STACK.append(resp3) - # request revocation list, get the revocation list properly - FAKE_RESPONSE_STACK.append(resp4) + # Get a token, then try to retrieve revocation list and get a 401. + # Get a new token, try to retrieve revocation list and return 200. + httpretty.register_uri(httpretty.POST, "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + responses = [httpretty.Response(body='', status=401), + httpretty.Response( + body=client_fixtures.SIGNED_REVOCATION_LIST)] + + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/revoked" % BASE_URI, + responses=responses) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST) + # Check that 4 requests have been made + self.assertEqual(len(httpretty.httpretty.latest_requests), 4) + class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): """Auth Token middleware should understand Diablo keystone responses.""" @@ -541,12 +325,38 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): # now deprecated (diablo-compat) 'HTTP_X_TENANT': 'tenant_id1', } + super(DiabloAuthTokenMiddlewareTest, self).setUp( expected_env=expected_env) + httpretty.httpretty.reset() + httpretty.enable() + + httpretty.register_uri(httpretty.GET, + "%s/" % BASE_URI, + body=VERSION_LIST_v2, + status=300) + + httpretty.register_uri(httpretty.POST, + "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + self.token_id = client_fixtures.VALID_DIABLO_TOKEN + token_response = client_fixtures.JSON_TOKEN_RESPONSES[self.token_id] + + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id), + body=token_response) + + self.set_middleware() + + def tearDown(self): + httpretty.disable() + super(DiabloAuthTokenMiddlewareTest, self).tearDown() + def test_valid_diablo_response(self): req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = client_fixtures.VALID_DIABLO_TOKEN + req.headers['X-Auth-Token'] = self.token_id self.middleware(req.environ, self.start_fake_response) self.assertEqual(self.response_status, 200) self.assertTrue('keystone.token_info' in req.environ) @@ -571,9 +381,6 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest): def test_not_use_cache_from_env(self): env = {'swift.cache': 'CACHE_TEST'} conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': 'localhost:11211' } self.set_middleware(conf=conf) @@ -581,22 +388,14 @@ class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest): self.assertNotEqual(self.middleware._cache, 'CACHE_TEST') -class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): +class CommonAuthTokenMiddlewareTest(object): def test_init_does_not_call_http(self): conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, 'revocation_cache_time': 1 } - self.set_fake_http(RaisingHTTPConnection) - self.set_middleware(conf=conf, fake_http=RaisingHTTPConnection) - - def assert_valid_last_url(self, token_id): - # Default version (v2) has id in the token, override this - # method for v3 and other versions - self.assertEqual("/testadmin/v2.0/tokens/%s" % token_id, - self.middleware.http_client_class.last_requested_url) + self.set_middleware(conf=conf) + self.assertLastPath(None) def assert_valid_request_200(self, token, with_catalog=True): req = webob.Request.blank('/') @@ -613,14 +412,12 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assert_valid_last_url(self.token_dict['uuid_token_default']) def test_valid_signed_request(self): - self.middleware.http_client_class.last_requested_url = '' self.assert_valid_request_200( self.token_dict['signed_token_scoped']) self.assertEqual(self.middleware.conf['auth_admin_prefix'], "/testadmin") #ensure that signed requests do not generate HTTP traffic - self.assertEqual( - '', self.middleware.http_client_class.last_requested_url) + self.assertLastPath(None) def test_revoked_token_receives_401(self): self.middleware.token_revocation_list = self.get_revocation_list_json() @@ -707,7 +504,6 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): def test_get_revocation_list_returns_fetched_list(self): # auth_token uses v2 to fetch this, so don't allow the v3 # tests to override the fake http connection - self.set_fake_http(FakeHTTPConnection) self.middleware.token_revocation_list_fetched_time = None os.remove(self.middleware.revoked_file_name) self.assertEqual(self.middleware.token_revocation_list, @@ -723,18 +519,26 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertEqual(self.middleware.token_revocation_list, in_memory_list) def test_invalid_revocation_list_raises_service_error(self): - globals()['SIGNED_REVOCATION_LIST'] = "{}" + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/revoked" % BASE_URI, + body="{}", + status=200) + self.assertRaises(auth_token.ServiceError, self.middleware.fetch_revocation_list) def test_fetch_revocation_list(self): # auth_token uses v2 to fetch this, so don't allow the v3 # tests to override the fake http connection - self.set_fake_http(FakeHTTPConnection) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST) def test_request_invalid_uuid_token(self): + # remember because we are testing the middleware we stub the connection + # to the keystone server, but this is not what gets returned + invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI + httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404) + req = webob.Request.blank('/') req.headers['X-Auth-Token'] = 'invalid-token' self.middleware(req.environ, self.start_fake_response) @@ -799,6 +603,9 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): return self.middleware._cache_get(token_id, ignore_expires=True) def test_memcache(self): + # NOTE(jamielennox): it appears that httpretty can mess with the + # memcache socket. Just disable it as it's not required here anyway. + httpretty.disable() req = webob.Request.blank('/') token = self.token_dict['signed_token_scoped'] req.headers['X-Auth-Token'] = token @@ -806,6 +613,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertNotEqual(self._get_cached_token(token), None) def test_expired(self): + httpretty.disable() req = webob.Request.blank('/') token = self.token_dict['signed_token_scoped_expired'] req.headers['X-Auth-Token'] = token @@ -813,8 +621,11 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertEqual(self.response_status, 401) def test_memcache_set_invalid_uuid(self): + invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI + httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404) + req = webob.Request.blank('/') - token = uuid.uuid4().hex + token = 'invalid-token' req.headers['X-Auth-Token'] = token self.middleware(req.environ, self.start_fake_response) self.assertRaises(auth_token.InvalidUserToken, @@ -829,6 +640,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self._get_cached_token, token) def test_memcache_set_expired(self, extra_conf={}, extra_environ={}): + httpretty.disable() token_cache_time = 10 conf = { 'token_cache_time': token_cache_time, @@ -864,9 +676,6 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): def test_use_cache_from_env(self): env = {'swift.cache': 'CACHE_TEST'} conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'cache': 'swift.cache', 'memcached_servers': ['localhost:11211'] } @@ -883,10 +692,8 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertFalse(auth_token.will_expire_soon(fortyseconds)) def test_encrypt_cache_data(self): + httpretty.disable() conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'encrypt', 'memcache_secret_key': 'mysecret' @@ -899,10 +706,8 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertEqual(self.middleware._cache_get(token), data[0]) def test_sign_cache_data(self): + httpretty.disable() conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'mac', 'memcache_secret_key': 'mysecret' @@ -915,10 +720,8 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertEqual(self.middleware._cache_get(token), data[0]) def test_no_memcache_protection(self): + httpretty.disable() conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_secret_key': 'mysecret' } @@ -932,44 +735,29 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): def test_assert_valid_memcache_protection_config(self): # test missing memcache_secret_key conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'Encrypt' } self.assertRaises(Exception, self.set_middleware, conf) # test invalue memcache_security_strategy conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'whatever' } self.assertRaises(Exception, self.set_middleware, conf) # test missing memcache_secret_key conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'mac' } self.assertRaises(Exception, self.set_middleware, conf) conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'Encrypt', 'memcache_secret_key': '' } self.assertRaises(Exception, self.set_middleware, conf) conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'memcached_servers': ['localhost:11211'], 'memcache_security_strategy': 'mAc', 'memcache_secret_key': '' @@ -978,9 +766,6 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): def test_config_revocation_cache_timeout(self): conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'revocation_cache_time': 24 } middleware = auth_token.AuthProtocol(self.fake_app, conf) @@ -994,24 +779,22 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): get_http_connection. """ req = webob.Request.blank('/') - token = self.token_dict['uuid_token_default'] - req.headers['X-Auth-Token'] = token - self.set_fake_http(RaisingHTTPNetworkError) + req.headers['X-Auth-Token'] = ERROR_TOKEN self.middleware.http_request_max_retries = 0 self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self._get_cached_token(token), None) + self.assertEqual(self._get_cached_token(ERROR_TOKEN), None) + self.assert_valid_last_url(ERROR_TOKEN) def test_http_request_max_retries(self): times_retry = 10 req = webob.Request.blank('/') - token = self.token_dict['uuid_token_default'] - req.headers['X-Auth-Token'] = token + req.headers['X-Auth-Token'] = ERROR_TOKEN - self.set_middleware(conf=dict(http_request_max_retries=times_retry)) + conf = {'http_request_max_retries': times_retry} + self.set_middleware(conf=conf) with mock.patch('time.sleep') as mock_obj: - self.set_fake_http(RaisingHTTPNetworkError) self.middleware(req.environ, self.start_fake_response) self.assertEqual(mock_obj.call_count, times_retry) @@ -1023,16 +806,15 @@ class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest): self.base_dir = tempfile.mkdtemp() self.cert_dir = os.path.join(self.base_dir, 'certs') os.mkdir(self.cert_dir) - - self.conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_protocol': 'http', - 'auth_admin_prefix': '/testadmin', + conf = { 'signing_dir': self.cert_dir, } + self.set_middleware(conf=conf) + + httpretty.enable() def tearDown(self): + httpretty.disable() shutil.rmtree(self.base_dir) super(CertDownloadMiddlewareTest, self).tearDown() @@ -1040,73 +822,95 @@ class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest): # so invocation of /usr/bin/openssl succeeds. This time we give it # an empty directory, so it fails. def test_request_no_token_dummy(self): - self.set_middleware(fake_http=self.fake_http, conf=self.conf) + cms._ensure_subprocess() + + httpretty.register_uri(httpretty.GET, + "%s/v2.0/certificates/ca" % BASE_URI, + status=404) + httpretty.register_uri(httpretty.GET, + "%s/v2.0/certificates/signing" % BASE_URI, + status=404) self.assertRaises(cms.subprocess.CalledProcessError, self.middleware.verify_signed_token, - self.token_dict['signed_token_scoped']) + client_fixtures.SIGNED_TOKEN_SCOPED) def test_fetch_signing_cert(self): - self.set_middleware(fake_http=CertificateHTTPConnection, - conf=self.conf) - + data = 'FAKE CERT' + httpretty.register_uri(httpretty.GET, + "%s/v2.0/certificates/signing" % BASE_URI, + body=data) self.middleware.fetch_signing_cert() with open(self.middleware.signing_cert_file_name, 'r') as f: - self.assertEqual(f.read(), - CertificateHTTPConnection.signing_cert_data) + self.assertEqual(f.read(), data) - self.assertEqual('/testadmin/v2.0/certificates/signing', - self.middleware.http_client_class.last_requested_url) + self.assertEqual("/testadmin/v2.0/certificates/signing", + httpretty.httpretty.last_request.path) def test_fetch_signing_ca(self): - self.set_middleware(fake_http=CertificateHTTPConnection, - conf=self.conf) - + data = 'FAKE CA' + httpretty.register_uri(httpretty.GET, + "%s/v2.0/certificates/ca" % BASE_URI, + body=data) self.middleware.fetch_ca_cert() with open(self.middleware.ca_file_name, 'r') as f: - self.assertEqual(f.read(), CertificateHTTPConnection.ca_cert_data) + self.assertEqual(f.read(), data) - self.assertEqual('/testadmin/v2.0/certificates/ca', - self.middleware.http_client_class.last_requested_url) + self.assertEqual("/testadmin/v2.0/certificates/ca", + httpretty.httpretty.last_request.path) def test_prefix_trailing_slash(self): self.conf['auth_admin_prefix'] = '/newadmin/' - self.set_middleware(fake_http=CertificateHTTPConnection, - conf=self.conf) - # the requests will return a 404, but it doesn't matter + httpretty.register_uri(httpretty.GET, + "%s/newadmin/v2.0/certificates/ca" % BASE_HOST, + body='FAKECA') + httpretty.register_uri(httpretty.GET, + "%s/newadmin/v2.0/certificates/signing" % + BASE_HOST, body='FAKECERT') + + self.set_middleware(conf=self.conf) self.middleware.fetch_ca_cert() self.assertEqual('/newadmin/v2.0/certificates/ca', - self.middleware.http_client_class.last_requested_url) + httpretty.httpretty.last_request.path) self.middleware.fetch_signing_cert() self.assertEqual('/newadmin/v2.0/certificates/signing', - self.middleware.http_client_class.last_requested_url) + httpretty.httpretty.last_request.path) def test_without_prefix(self): self.conf['auth_admin_prefix'] = '' - self.set_middleware(fake_http=CertificateHTTPConnection, - conf=self.conf) + httpretty.register_uri(httpretty.GET, + "%s/v2.0/certificates/ca" % BASE_HOST, + body='FAKECA') + httpretty.register_uri(httpretty.GET, + "%s/v2.0/certificates/signing" % BASE_HOST, + body='FAKECERT') - # the requests will return a 404, but it doesn't matter + self.set_middleware(conf=self.conf) self.middleware.fetch_ca_cert() self.assertEqual('/v2.0/certificates/ca', - self.middleware.http_client_class.last_requested_url) + httpretty.httpretty.last_request.path) self.middleware.fetch_signing_cert() self.assertEqual('/v2.0/certificates/signing', - self.middleware.http_client_class.last_requested_url) + httpretty.httpretty.last_request.path) -class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): +def network_error_response(method, uri, headers): + raise auth_token.NetworkError("Network connection error.") + + +class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + CommonAuthTokenMiddlewareTest): """v2 token specific tests. There are some differences between how the auth-token middleware handles @@ -1122,6 +926,55 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class, but now, since they really are v2 specifc, they are included here. """ + + def setUp(self): + super(v2AuthTokenMiddlewareTest, self).setUp() + + self.token_dict = { + 'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT, + 'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED, + 'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED, + 'signed_token_scoped_expired': + client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED, + 'revoked_token': client_fixtures.REVOKED_TOKEN, + 'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH + } + + httpretty.httpretty.reset() + httpretty.enable() + + httpretty.register_uri(httpretty.GET, + "%s/" % BASE_URI, + body=VERSION_LIST_v2, + status=300) + + httpretty.register_uri(httpretty.POST, + "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/revoked" % BASE_URI, + body=client_fixtures.SIGNED_REVOCATION_LIST, + status=200) + + for token in (client_fixtures.UUID_TOKEN_DEFAULT, + client_fixtures.UUID_TOKEN_UNSCOPED, + client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG): + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/%s" % (BASE_URI, token), + body= + client_fixtures.JSON_TOKEN_RESPONSES[token]) + + httpretty.register_uri(httpretty.GET, + '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN), + body=network_error_response) + + self.set_middleware() + + def tearDown(self): + httpretty.disable() + super(v2AuthTokenMiddlewareTest, self).tearDown() + def assert_unscoped_default_tenant_auto_scopes(self, token): """Unscoped v2 requests with a default tenant should "auto-scope." @@ -1135,6 +988,9 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertEqual(body, ['SUCCESS']) self.assertTrue('keystone.token_info' in req.environ) + def assert_valid_last_url(self, token_id): + self.assertLastPath("/testadmin/v2.0/tokens/%s" % token_id) + def test_default_tenant_uuid_token(self): self.assert_unscoped_default_tenant_auto_scopes( client_fixtures.UUID_TOKEN_DEFAULT) @@ -1170,6 +1026,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertFalse(req.headers.get('X-Service-Catalog')) self.assertEqual(body, ['SUCCESS']) + +class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): + + @httpretty.activate def test_valid_uuid_request_forced_to_2_0(self): """Test forcing auth_token to use lower api version. @@ -1181,13 +1041,27 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): """ conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'signing_dir': client_fixtures.CERTDIR, 'auth_version': 'v2.0' } - self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf) + + httpretty.register_uri(httpretty.GET, + "%s/" % BASE_URI, + body=VERSION_LIST_v3, + status=300) + + httpretty.register_uri(httpretty.POST, + "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + token = client_fixtures.UUID_TOKEN_DEFAULT + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/%s" % (BASE_URI, token), + body= + client_fixtures.JSON_TOKEN_RESPONSES[token]) + + self.set_middleware(conf=conf) + # This tests will only work is auth_token has chosen to use the # lower, v2, api version req = webob.Request.blank('/') @@ -1196,20 +1070,18 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): self.assertEqual(self.response_status, 200) self.assertEqual("/testadmin/v2.0/tokens/%s" % client_fixtures.UUID_TOKEN_DEFAULT, - v3FakeHTTPConnection.last_requested_url) + httpretty.httpretty.last_request.path) def test_invalid_auth_version_request(self): conf = { - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', 'signing_dir': client_fixtures.CERTDIR, 'auth_version': 'v1.0' # v1.0 is no longer supported } self.assertRaises(Exception, self.set_middleware, conf) -class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest): +class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + CommonAuthTokenMiddlewareTest): """Test auth_token middleware with v3 tokens. Re-execute the AuthTokenMiddlewareTest class tests, but with the @@ -1234,7 +1106,11 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest): """ def setUp(self): - token_dict = { + super(v3AuthTokenMiddlewareTest, self).setUp( + auth_version='v3.0', + fake_app=v3FakeApp) + + self.token_dict = { 'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT, 'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED, 'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED, @@ -1243,17 +1119,58 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest): 'revoked_token': client_fixtures.REVOKED_v3_TOKEN, 'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH } - super(v3AuthTokenMiddlewareTest, self).setUp( - auth_version='v3.0', - fake_app=v3FakeApp, - fake_http=v3FakeHTTPConnection, - token_dict=token_dict) + + httpretty.httpretty.reset() + httpretty.enable() + + httpretty.register_uri(httpretty.GET, + "%s" % BASE_URI, + body=VERSION_LIST_v3, + status=300) + + # TODO(jamielennox): auth_token middleware uses a v2 admin token + # regardless of the auth_version that is set. + httpretty.register_uri(httpretty.POST, + "%s/v2.0/tokens" % BASE_URI, + body=FAKE_ADMIN_TOKEN) + + # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 + httpretty.register_uri(httpretty.GET, + "%s/v2.0/tokens/revoked" % BASE_URI, + body=client_fixtures.SIGNED_REVOCATION_LIST, + status=200) + + httpretty.register_uri(httpretty.GET, + "%s/v3/auth/tokens" % BASE_URI, + body=self.token_response) + + self.set_middleware() + + def tearDown(self): + httpretty.disable() + super(v3AuthTokenMiddlewareTest, self).tearDown() + + def token_response(self, request, uri, headers): + auth_id = request.headers.get('X-Auth-Token') + token_id = request.headers.get('X-Subject-Token') + self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) + headers.pop('status') + + status = 200 + response = "" + + if token_id == ERROR_TOKEN: + raise auth_token.NetworkError("Network connection error.") + + try: + response = client_fixtures.JSON_TOKEN_RESPONSES[token_id] + except KeyError: + status = 404 + + return status, headers, response def assert_valid_last_url(self, token_id): - # Token ID is not part of the url in v3, so override - # this assert test in the base class - self.assertEqual('/testadmin/v3/auth/tokens', - v3FakeHTTPConnection.last_requested_url) + self.assertLastPath('/testadmin/v3/auth/tokens') def test_valid_unscoped_uuid_request(self): # Remove items that won't be in an unscoped token @@ -1271,8 +1188,7 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest): self.set_middleware(expected_env=delta_expected_env) self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED, with_catalog=False) - self.assertEqual('/testadmin/v3/auth/tokens', - v3FakeHTTPConnection.last_requested_url) + self.assertLastPath('/testadmin/v3/auth/tokens') def test_domain_scoped_uuid_request(self): # Modify items compared to default token for a domain scope @@ -1290,8 +1206,7 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest): self.set_middleware(expected_env=delta_expected_env) self.assert_valid_request_200( client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED) - self.assertEqual('/testadmin/v3/auth/tokens', - v3FakeHTTPConnection.last_requested_url) + self.assertLastPath('/testadmin/v3/auth/tokens') class TokenEncodingTest(testtools.TestCase):