Rename requests mock object in testing

It has been mentioned a number of times that the self.requests naming
for the requests_mock object is confusing between whether you are
actually sending a request or are mocking a request.

Rename all entries of the requests object to requests_mock.

This cleans up a couple of entries where the older register_uri format
was being used in favour of using the HTTP method as the requests_mock
method.

Change-Id: I315085b4088130b510f9dbd696011d983598372c
This commit is contained in:
Jamie Lennox
2015-03-16 14:58:28 +11:00
parent fc1f5a7963
commit 4822d8bb9d
21 changed files with 246 additions and 229 deletions

View File

@@ -113,7 +113,7 @@ class CommonIdentityTests(object):
# register responses such that if the discovery URL is hit more than # register responses such that if the discovery URL is hit more than
# once then the response will be invalid and not point to COMPUTE_ADMIN # once then the response will be invalid and not point to COMPUTE_ADMIN
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}] resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
self.requests.get(self.TEST_COMPUTE_ADMIN, resps) self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
body = 'SUCCESS' body = 'SUCCESS'
self.stub_url('GET', ['path'], text=body) self.stub_url('GET', ['path'], text=body)
@@ -138,7 +138,7 @@ class CommonIdentityTests(object):
# register responses such that if the discovery URL is hit more than # register responses such that if the discovery URL is hit more than
# once then the response will be invalid and not point to COMPUTE_ADMIN # once then the response will be invalid and not point to COMPUTE_ADMIN
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}] resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
self.requests.get(self.TEST_COMPUTE_ADMIN, resps) self.requests_mock.get(self.TEST_COMPUTE_ADMIN, resps)
body = 'SUCCESS' body = 'SUCCESS'
self.stub_url('GET', ['path'], text=body) self.stub_url('GET', ['path'], text=body)
@@ -419,4 +419,5 @@ class GenericAuthPluginTests(utils.TestCase):
self.assertIsNone(self.session.get_token()) self.assertIsNone(self.session.get_token())
self.assertEqual(self.auth.headers, self.assertEqual(self.auth.headers,
self.session.get_auth_headers()) self.session.get_auth_headers())
self.assertNotIn('X-Auth-Token', self.requests.last_request.headers) self.assertNotIn('X-Auth-Token',
self.requests_mock.last_request.headers)

View File

@@ -200,7 +200,8 @@ class V2IdentityPlugin(utils.TestCase):
resp = s.get('/path', endpoint_filter=endpoint_filter) resp = s.get('/path', endpoint_filter=endpoint_filter)
self.assertEqual(resp.status_code, 200) self.assertEqual(resp.status_code, 200)
self.assertEqual(self.requests.last_request.url, base_url + '/path') self.assertEqual(self.requests_mock.last_request.url,
base_url + '/path')
def test_service_url(self): def test_service_url(self):
endpoint_filter = {'service_type': 'compute', endpoint_filter = {'service_type': 'compute',

View File

@@ -385,7 +385,8 @@ class V3IdentityPlugin(utils.TestCase):
resp = s.get('/path', endpoint_filter=endpoint_filter) resp = s.get('/path', endpoint_filter=endpoint_filter)
self.assertEqual(resp.status_code, 200) self.assertEqual(resp.status_code, 200)
self.assertEqual(self.requests.last_request.url, base_url + '/path') self.assertEqual(self.requests_mock.last_request.url,
base_url + '/path')
def test_service_url(self): def test_service_url(self):
endpoint_filter = {'service_type': 'compute', endpoint_filter = {'service_type': 'compute',
@@ -448,7 +449,8 @@ class V3IdentityPlugin(utils.TestCase):
{'status_code': 200, 'json': self.TEST_RESPONSE_DICT, {'status_code': 200, 'json': self.TEST_RESPONSE_DICT,
'headers': {'X-Subject-Token': 'token2'}}] 'headers': {'X-Subject-Token': 'token2'}}]
self.requests.post('%s/auth/tokens' % self.TEST_URL, auth_responses) self.requests_mock.post('%s/auth/tokens' % self.TEST_URL,
auth_responses)
a = v3.Password(self.TEST_URL, username=self.TEST_USER, a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS) password=self.TEST_PASS)
@@ -488,7 +490,7 @@ class V3IdentityPlugin(utils.TestCase):
auth_url = self.TEST_URL + '/auth/tokens' auth_url = self.TEST_URL + '/auth/tokens'
self.assertEqual(auth_url, a.token_url) self.assertEqual(auth_url, a.token_url)
self.assertEqual(auth_url + '?nocatalog', self.assertEqual(auth_url + '?nocatalog',
self.requests.last_request.url) self.requests_mock.last_request.url)
def test_symbols(self): def test_symbols(self):
self.assertIs(v3.AuthMethod, v3_base.AuthMethod) self.assertIs(v3.AuthMethod, v3_base.AuthMethod)

View File

@@ -23,7 +23,7 @@ class TokenEndpointTest(utils.TestCase):
TEST_URL = 'http://server/prefix' TEST_URL = 'http://server/prefix'
def test_basic_case(self): def test_basic_case(self):
self.requests.get(self.TEST_URL, text='body') self.requests_mock.get(self.TEST_URL, text='body')
a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session(auth=a) s = session.Session(auth=a)

View File

@@ -128,7 +128,7 @@ class GenericPluginTestCase(utils.TestCase):
auth_ref = auth.get_auth_ref(self.session) auth_ref = auth.get_auth_ref(self.session)
self.assertIsInstance(auth_ref, access.AccessInfoV3) self.assertIsInstance(auth_ref, access.AccessInfoV3)
self.assertEqual(self.TEST_URL + 'v3/auth/tokens', self.assertEqual(self.TEST_URL + 'v3/auth/tokens',
self.requests.last_request.url) self.requests_mock.last_request.url)
self.assertIsInstance(auth._plugin, self.V3_PLUGIN_CLASS) self.assertIsInstance(auth._plugin, self.V3_PLUGIN_CLASS)
return auth return auth
@@ -137,7 +137,7 @@ class GenericPluginTestCase(utils.TestCase):
auth_ref = auth.get_auth_ref(self.session) auth_ref = auth.get_auth_ref(self.session)
self.assertIsInstance(auth_ref, access.AccessInfoV2) self.assertIsInstance(auth_ref, access.AccessInfoV2)
self.assertEqual(self.TEST_URL + 'v2.0/tokens', self.assertEqual(self.TEST_URL + 'v2.0/tokens',
self.requests.last_request.url) self.requests_mock.last_request.url)
self.assertIsInstance(auth._plugin, self.V2_PLUGIN_CLASS) self.assertIsInstance(auth._plugin, self.V2_PLUGIN_CLASS)
return auth return auth

View File

@@ -56,7 +56,7 @@ EXTENSION_LIST = _create_extension_list([EXTENSION_FOO, EXTENSION_BAR])
class ClientDiscoveryTests(utils.TestCase): class ClientDiscoveryTests(utils.TestCase):
def test_discover_extensions_v2(self): def test_discover_extensions_v2(self):
self.requests.get("%s/extensions" % V2_URL, text=EXTENSION_LIST) self.requests_mock.get("%s/extensions" % V2_URL, text=EXTENSION_LIST)
extensions = client.Client().discover_extensions(url=V2_URL) extensions = client.Client().discover_extensions(url=V2_URL)
self.assertIn(EXTENSION_ALIAS_FOO, extensions) self.assertIn(EXTENSION_ALIAS_FOO, extensions)
self.assertEqual(extensions[EXTENSION_ALIAS_FOO], EXTENSION_NAME_FOO) self.assertEqual(extensions[EXTENSION_ALIAS_FOO], EXTENSION_NAME_FOO)

View File

@@ -222,7 +222,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.response_status = None self.response_status = None
self.response_headers = None self.response_headers = None
self.requests = self.useFixture(mock_fixture.Fixture()) self.requests_mock = self.useFixture(mock_fixture.Fixture())
def set_middleware(self, expected_env=None, conf=None): def set_middleware(self, expected_env=None, conf=None):
"""Configure the class ready to call the auth_token middleware. """Configure the class ready to call the auth_token middleware.
@@ -259,10 +259,10 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
def assertLastPath(self, path): def assertLastPath(self, path):
if path: if path:
parts = urlparse.urlparse(self.requests.last_request.url) parts = urlparse.urlparse(self.requests_mock.last_request.url)
self.assertEqual(path, parts.path) self.assertEqual(path, parts.path)
else: else:
self.assertIsNone(self.requests.last_request) self.assertIsNone(self.requests_mock.last_request)
class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@@ -275,17 +275,19 @@ class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
# Get a token, then try to retrieve revocation list and get a 401. # Get a token, then try to retrieve revocation list and get a 401.
# Get a new token, try to retrieve revocation list and return 200. # Get a new token, try to retrieve revocation list and return 200.
self.requests.post("%s/v2.0/tokens" % BASE_URI, text=FAKE_ADMIN_TOKEN) self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
text = self.examples.SIGNED_REVOCATION_LIST text = self.examples.SIGNED_REVOCATION_LIST
self.requests.get("%s/v2.0/tokens/revoked" % BASE_URI, self.requests_mock.get("%s/v2.0/tokens/revoked" % BASE_URI,
response_list=[{'status_code': 401}, {'text': text}]) response_list=[{'status_code': 401},
{'text': text}])
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, self.examples.REVOCATION_LIST) self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
# Check that 4 requests have been made # Check that 4 requests have been made
self.assertEqual(len(self.requests.request_history), 4) self.assertEqual(len(self.requests_mock.request_history), 4)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@@ -306,17 +308,18 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
super(DiabloAuthTokenMiddlewareTest, self).setUp( super(DiabloAuthTokenMiddlewareTest, self).setUp(
expected_env=expected_env) expected_env=expected_env)
self.requests.get("%s/" % BASE_URI, self.requests_mock.get("%s/" % BASE_URI,
text=VERSION_LIST_v2, text=VERSION_LIST_v2,
status_code=300) status_code=300)
self.requests.post("%s/v2.0/tokens" % BASE_URI, text=FAKE_ADMIN_TOKEN) self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.token_id = self.examples.VALID_DIABLO_TOKEN self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id] token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
url = '%s/v2.0/tokens/%s' % (BASE_URI, self.token_id) url = '%s/v2.0/tokens/%s' % (BASE_URI, self.token_id)
self.requests.get(url, text=token_response) self.requests_mock.get(url, text=token_response)
self.set_middleware() self.set_middleware()
@@ -871,7 +874,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual(self.middleware.token_revocation_list, in_memory_list) self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
def test_invalid_revocation_list_raises_service_error(self): def test_invalid_revocation_list_raises_service_error(self):
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, text='{}') self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI, text='{}')
self.assertRaises(auth_token.ServiceError, self.assertRaises(auth_token.ServiceError,
self.middleware.fetch_revocation_list) self.middleware.fetch_revocation_list)
@@ -886,7 +889,7 @@ class CommonAuthTokenMiddlewareTest(object):
# remember because we are testing the middleware we stub the connection # remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned # to the keystone server, but this is not what gets returned
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
self.requests.get(invalid_uri, text="", status_code=404) self.requests_mock.get(invalid_uri, text="", status_code=404)
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token' req.headers['X-Auth-Token'] = 'invalid-token'
@@ -974,7 +977,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_memcache_set_invalid_uuid(self): def test_memcache_set_invalid_uuid(self):
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
self.requests.get(invalid_uri, status_code=404) self.requests_mock.get(invalid_uri, status_code=404)
req = webob.Request.blank('/') req = webob.Request.blank('/')
token = 'invalid-token' token = 'invalid-token'
@@ -1267,10 +1270,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_request_no_token_dummy(self): def test_request_no_token_dummy(self):
cms._ensure_subprocess() cms._ensure_subprocess()
self.requests.get("%s%s" % (BASE_URI, self.ca_path), self.requests_mock.get("%s%s" % (BASE_URI, self.ca_path),
status_code=404) status_code=404)
url = "%s%s" % (BASE_URI, self.signing_path) url = "%s%s" % (BASE_URI, self.signing_path)
self.requests.get(url, status_code=404) self.requests_mock.get(url, status_code=404)
self.assertRaises(exceptions.CertificateConfigError, self.assertRaises(exceptions.CertificateConfigError,
self.middleware.verify_signed_token, self.middleware.verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED, self.examples.SIGNED_TOKEN_SCOPED,
@@ -1279,7 +1282,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_cert(self): def test_fetch_signing_cert(self):
data = 'FAKE CERT' data = 'FAKE CERT'
url = '%s%s' % (BASE_URI, self.signing_path) url = '%s%s' % (BASE_URI, self.signing_path)
self.requests.get(url, text=data) self.requests_mock.get(url, text=data)
self.middleware.fetch_signing_cert() self.middleware.fetch_signing_cert()
with open(self.middleware.signing_cert_file_name, 'r') as f: with open(self.middleware.signing_cert_file_name, 'r') as f:
@@ -1289,7 +1292,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_ca(self): def test_fetch_signing_ca(self):
data = 'FAKE CA' data = 'FAKE CA'
self.requests.get("%s%s" % (BASE_URI, self.ca_path), text=data) self.requests_mock.get("%s%s" % (BASE_URI, self.ca_path), text=data)
self.middleware.fetch_ca_cert() self.middleware.fetch_ca_cert()
with open(self.middleware.signing_ca_file_name, 'r') as f: with open(self.middleware.signing_ca_file_name, 'r') as f:
@@ -1304,10 +1307,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.conf['auth_port'] = 1234 self.conf['auth_port'] = 1234
self.conf['auth_admin_prefix'] = '/newadmin/' self.conf['auth_admin_prefix'] = '/newadmin/'
self.requests.get("%s/newadmin%s" % (BASE_HOST, self.ca_path), self.requests_mock.get("%s/newadmin%s" % (BASE_HOST, self.ca_path),
text='FAKECA') text='FAKECA')
url = "%s/newadmin%s" % (BASE_HOST, self.signing_path) url = "%s/newadmin%s" % (BASE_HOST, self.signing_path)
self.requests.get(url, text='FAKECERT') self.requests_mock.get(url, text='FAKECERT')
self.set_middleware(conf=self.conf) self.set_middleware(conf=self.conf)
@@ -1326,8 +1329,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.conf['auth_port'] = 1234 self.conf['auth_port'] = 1234
self.conf['auth_admin_prefix'] = '' self.conf['auth_admin_prefix'] = ''
self.requests.get("%s%s" % (BASE_HOST, self.ca_path), text='FAKECA') self.requests_mock.get("%s%s" % (BASE_HOST, self.ca_path),
self.requests.get("%s%s" % (BASE_HOST, self.signing_path), text='FAKECA')
self.requests_mock.get("%s%s" % (BASE_HOST, self.signing_path),
text='FAKECERT') text='FAKECERT')
self.set_middleware(conf=self.conf) self.set_middleware(conf=self.conf)
@@ -1400,14 +1404,14 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_TOKEN_HASH_SHA256, self.examples.REVOKED_TOKEN_HASH_SHA256,
} }
self.requests.get("%s/" % BASE_URI, self.requests_mock.get("%s/" % BASE_URI,
text=VERSION_LIST_v2, text=VERSION_LIST_v2,
status_code=300) status_code=300)
self.requests.post("%s/v2.0/tokens" % BASE_URI, self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN) text=FAKE_ADMIN_TOKEN)
self.requests.get("%s/v2.0/tokens/revoked" % BASE_URI, self.requests_mock.get("%s/v2.0/tokens/revoked" % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST) text=self.examples.SIGNED_REVOCATION_LIST)
for token in (self.examples.UUID_TOKEN_DEFAULT, for token in (self.examples.UUID_TOKEN_DEFAULT,
@@ -1418,10 +1422,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_TOKEN_SCOPED_KEY, self.examples.SIGNED_TOKEN_SCOPED_KEY,
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,): self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
text = self.examples.JSON_TOKEN_RESPONSES[token] text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token), self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
text=text) text=text)
self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN), self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
text=network_error_response) text=network_error_response)
self.set_middleware() self.set_middleware()
@@ -1498,16 +1502,17 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': 'v2.0' 'auth_version': 'v2.0'
} }
self.requests.get('%s/' % BASE_URI, self.requests_mock.get('%s/' % BASE_URI,
text=VERSION_LIST_v3, text=VERSION_LIST_v3,
status_code=300) status_code=300)
self.requests.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN) self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
token = self.examples.UUID_TOKEN_DEFAULT token = self.examples.UUID_TOKEN_DEFAULT
url = '%s/v2.0/tokens/%s' % (BASE_URI, token) url = '%s/v2.0/tokens/%s' % (BASE_URI, token)
response_body = self.examples.JSON_TOKEN_RESPONSES[token] response_body = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.get(url, text=response_body) self.requests_mock.get(url, text=response_body)
self.set_middleware(conf=conf) self.set_middleware(conf=conf)
@@ -1579,17 +1584,18 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH, self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
} }
self.requests.get(BASE_URI, text=VERSION_LIST_v3, status_code=300) self.requests_mock.get(BASE_URI, text=VERSION_LIST_v3, status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token # TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set. # regardless of the auth_version that is set.
self.requests.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN) self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2 # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST) text=self.examples.SIGNED_REVOCATION_LIST)
self.requests.get('%s/v3/auth/tokens' % BASE_URI, self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response) text=self.token_response)
self.set_middleware() self.set_middleware()

View File

@@ -242,7 +242,7 @@ class AvailableVersionsTests(utils.TestCase):
for path, text in six.iteritems(examples): for path, text in six.iteritems(examples):
url = "%s%s" % (BASE_URL, path) url = "%s%s" % (BASE_URL, path)
self.requests.get(url, status_code=300, text=text) self.requests_mock.get(url, status_code=300, text=text)
versions = discover.available_versions(url) versions = discover.available_versions(url)
for v in versions: for v in versions:
@@ -252,7 +252,7 @@ class AvailableVersionsTests(utils.TestCase):
matchers.Contains(n))) matchers.Contains(n)))
def test_available_versions_individual(self): def test_available_versions_individual(self):
self.requests.get(V3_URL, status_code=200, text=V3_VERSION_ENTRY) self.requests_mock.get(V3_URL, status_code=200, text=V3_VERSION_ENTRY)
versions = discover.available_versions(V3_URL) versions = discover.available_versions(V3_URL)
@@ -263,7 +263,7 @@ class AvailableVersionsTests(utils.TestCase):
self.assertIn('links', v) self.assertIn('links', v)
def test_available_keystone_data(self): def test_available_keystone_data(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
versions = discover.available_versions(BASE_URL) versions = discover.available_versions(BASE_URL)
self.assertEqual(2, len(versions)) self.assertEqual(2, len(versions))
@@ -278,7 +278,7 @@ class AvailableVersionsTests(utils.TestCase):
def test_available_cinder_data(self): def test_available_cinder_data(self):
text = jsonutils.dumps(CINDER_EXAMPLES) text = jsonutils.dumps(CINDER_EXAMPLES)
self.requests.get(BASE_URL, status_code=300, text=text) self.requests_mock.get(BASE_URL, status_code=300, text=text)
versions = discover.available_versions(BASE_URL) versions = discover.available_versions(BASE_URL)
self.assertEqual(2, len(versions)) self.assertEqual(2, len(versions))
@@ -294,7 +294,7 @@ class AvailableVersionsTests(utils.TestCase):
def test_available_glance_data(self): def test_available_glance_data(self):
text = jsonutils.dumps(GLANCE_EXAMPLES) text = jsonutils.dumps(GLANCE_EXAMPLES)
self.requests.get(BASE_URL, status_code=200, text=text) self.requests_mock.get(BASE_URL, status_code=200, text=text)
versions = discover.available_versions(BASE_URL) versions = discover.available_versions(BASE_URL)
self.assertEqual(5, len(versions)) self.assertEqual(5, len(versions))
@@ -311,7 +311,7 @@ class AvailableVersionsTests(utils.TestCase):
class ClientDiscoveryTests(utils.TestCase): class ClientDiscoveryTests(utils.TestCase):
def assertCreatesV3(self, **kwargs): def assertCreatesV3(self, **kwargs):
self.requests.post('%s/auth/tokens' % V3_URL, self.requests_mock.post('%s/auth/tokens' % V3_URL,
text=V3_AUTH_RESPONSE, text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN}) headers={'X-Subject-Token': V3_TOKEN})
@@ -322,7 +322,7 @@ class ClientDiscoveryTests(utils.TestCase):
return keystone return keystone
def assertCreatesV2(self, **kwargs): def assertCreatesV2(self, **kwargs):
self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE) self.requests_mock.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
kwargs.setdefault('username', 'foo') kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar') kwargs.setdefault('password', 'bar')
@@ -345,78 +345,78 @@ class ClientDiscoveryTests(utils.TestCase):
client.Client, **kwargs) client.Client, **kwargs)
def test_discover_v3(self): def test_discover_v3(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertCreatesV3(auth_url=BASE_URL) self.assertCreatesV3(auth_url=BASE_URL)
def test_discover_v2(self): def test_discover_v2(self):
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE) self.requests_mock.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
self.assertCreatesV2(auth_url=BASE_URL) self.assertCreatesV2(auth_url=BASE_URL)
def test_discover_endpoint_v2(self): def test_discover_endpoint_v2(self):
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
self.assertCreatesV2(endpoint=BASE_URL, token='fake-token') self.assertCreatesV2(endpoint=BASE_URL, token='fake-token')
def test_discover_endpoint_v3(self): def test_discover_endpoint_v3(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertCreatesV3(endpoint=BASE_URL, token='fake-token') self.assertCreatesV3(endpoint=BASE_URL, token='fake-token')
def test_discover_invalid_major_version(self): def test_discover_invalid_major_version(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=5) self.assertVersionNotAvailable(auth_url=BASE_URL, version=5)
def test_discover_200_response_fails(self): def test_discover_200_response_fails(self):
self.requests.get(BASE_URL, text='ok') self.requests_mock.get(BASE_URL, text='ok')
self.assertDiscoveryFailure(auth_url=BASE_URL) self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_discover_minor_greater_than_available_fails(self): def test_discover_minor_greater_than_available_fails(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertVersionNotAvailable(endpoint=BASE_URL, version=3.4) self.assertVersionNotAvailable(endpoint=BASE_URL, version=3.4)
def test_discover_individual_version_v2(self): def test_discover_individual_version_v2(self):
self.requests.get(V2_URL, text=V2_VERSION_ENTRY) self.requests_mock.get(V2_URL, text=V2_VERSION_ENTRY)
self.assertCreatesV2(auth_url=V2_URL) self.assertCreatesV2(auth_url=V2_URL)
def test_discover_individual_version_v3(self): def test_discover_individual_version_v3(self):
self.requests.get(V3_URL, text=V3_VERSION_ENTRY) self.requests_mock.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertCreatesV3(auth_url=V3_URL) self.assertCreatesV3(auth_url=V3_URL)
def test_discover_individual_endpoint_v2(self): def test_discover_individual_endpoint_v2(self):
self.requests.get(V2_URL, text=V2_VERSION_ENTRY) self.requests_mock.get(V2_URL, text=V2_VERSION_ENTRY)
self.assertCreatesV2(endpoint=V2_URL, token='fake-token') self.assertCreatesV2(endpoint=V2_URL, token='fake-token')
def test_discover_individual_endpoint_v3(self): def test_discover_individual_endpoint_v3(self):
self.requests.get(V3_URL, text=V3_VERSION_ENTRY) self.requests_mock.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertCreatesV3(endpoint=V3_URL, token='fake-token') self.assertCreatesV3(endpoint=V3_URL, token='fake-token')
def test_discover_fail_to_create_bad_individual_version(self): def test_discover_fail_to_create_bad_individual_version(self):
self.requests.get(V2_URL, text=V2_VERSION_ENTRY) self.requests_mock.get(V2_URL, text=V2_VERSION_ENTRY)
self.requests.get(V3_URL, text=V3_VERSION_ENTRY) self.requests_mock.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertVersionNotAvailable(auth_url=V2_URL, version=3) self.assertVersionNotAvailable(auth_url=V2_URL, version=3)
self.assertVersionNotAvailable(auth_url=V3_URL, version=2) self.assertVersionNotAvailable(auth_url=V3_URL, version=2)
def test_discover_unstable_versions(self): def test_discover_unstable_versions(self):
version_list = fixture.DiscoveryList(BASE_URL, v3_status='beta') version_list = fixture.DiscoveryList(BASE_URL, v3_status='beta')
self.requests.get(BASE_URL, status_code=300, json=version_list) self.requests_mock.get(BASE_URL, status_code=300, json=version_list)
self.assertCreatesV2(auth_url=BASE_URL) self.assertCreatesV2(auth_url=BASE_URL)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=3) self.assertVersionNotAvailable(auth_url=BASE_URL, version=3)
self.assertCreatesV3(auth_url=BASE_URL, unstable=True) self.assertCreatesV3(auth_url=BASE_URL, unstable=True)
def test_discover_forwards_original_ip(self): def test_discover_forwards_original_ip(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
ip = '192.168.1.1' ip = '192.168.1.1'
self.assertCreatesV3(auth_url=BASE_URL, original_ip=ip) self.assertCreatesV3(auth_url=BASE_URL, original_ip=ip)
self.assertThat(self.requests.last_request.headers['forwarded'], self.assertThat(self.requests_mock.last_request.headers['forwarded'],
matchers.Contains(ip)) matchers.Contains(ip))
def test_discover_bad_args(self): def test_discover_bad_args(self):
@@ -424,7 +424,7 @@ class ClientDiscoveryTests(utils.TestCase):
client.Client) client.Client)
def test_discover_bad_response(self): def test_discover_bad_response(self):
self.requests.get(BASE_URL, status_code=300, json={'FOO': 'BAR'}) self.requests_mock.get(BASE_URL, status_code=300, json={'FOO': 'BAR'})
self.assertDiscoveryFailure(auth_url=BASE_URL) self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_discovery_ignore_invalid(self): def test_discovery_ignore_invalid(self):
@@ -433,40 +433,40 @@ class ClientDiscoveryTests(utils.TestCase):
'media-types': V3_MEDIA_TYPES, 'media-types': V3_MEDIA_TYPES,
'status': 'stable', 'status': 'stable',
'updated': UPDATED}] 'updated': UPDATED}]
self.requests.get(BASE_URL, status_code=300, self.requests_mock.get(BASE_URL, status_code=300,
text=_create_version_list(resp)) text=_create_version_list(resp))
self.assertDiscoveryFailure(auth_url=BASE_URL) self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_ignore_entry_without_links(self): def test_ignore_entry_without_links(self):
v3 = V3_VERSION.copy() v3 = V3_VERSION.copy()
v3['links'] = [] v3['links'] = []
self.requests.get(BASE_URL, status_code=300, self.requests_mock.get(BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION])) text=_create_version_list([v3, V2_VERSION]))
self.assertCreatesV2(auth_url=BASE_URL) self.assertCreatesV2(auth_url=BASE_URL)
def test_ignore_entry_without_status(self): def test_ignore_entry_without_status(self):
v3 = V3_VERSION.copy() v3 = V3_VERSION.copy()
del v3['status'] del v3['status']
self.requests.get(BASE_URL, status_code=300, self.requests_mock.get(BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION])) text=_create_version_list([v3, V2_VERSION]))
self.assertCreatesV2(auth_url=BASE_URL) self.assertCreatesV2(auth_url=BASE_URL)
def test_greater_version_than_required(self): def test_greater_version_than_required(self):
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.6') versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.6')
self.requests.get(BASE_URL, json=versions) self.requests_mock.get(BASE_URL, json=versions)
self.assertCreatesV3(auth_url=BASE_URL, version=(3, 4)) self.assertCreatesV3(auth_url=BASE_URL, version=(3, 4))
def test_lesser_version_than_required(self): def test_lesser_version_than_required(self):
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.4') versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.4')
self.requests.get(BASE_URL, json=versions) self.requests_mock.get(BASE_URL, json=versions)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=(3, 6)) self.assertVersionNotAvailable(auth_url=BASE_URL, version=(3, 6))
def test_bad_response(self): def test_bad_response(self):
self.requests.get(BASE_URL, status_code=300, text="Ugly Duckling") self.requests_mock.get(BASE_URL, status_code=300, text="Ugly Duckling")
self.assertDiscoveryFailure(auth_url=BASE_URL) self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_pass_client_arguments(self): def test_pass_client_arguments(self):
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
kwargs = {'original_ip': '100', 'use_keyring': False, kwargs = {'original_ip': '100', 'use_keyring': False,
'stale_duration': 15} 'stale_duration': 15}
@@ -477,9 +477,9 @@ class ClientDiscoveryTests(utils.TestCase):
self.assertFalse(cl.use_keyring) self.assertFalse(cl.use_keyring)
def test_overriding_stored_kwargs(self): def test_overriding_stored_kwargs(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.requests.post("%s/auth/tokens" % V3_URL, self.requests_mock.post("%s/auth/tokens" % V3_URL,
text=V3_AUTH_RESPONSE, text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN}) headers={'X-Subject-Token': V3_TOKEN})
@@ -494,7 +494,9 @@ class ClientDiscoveryTests(utils.TestCase):
self.assertEqual(client.password, 'bar') self.assertEqual(client.password, 'bar')
def test_available_versions(self): def test_available_versions(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_ENTRY) self.requests_mock.get(BASE_URL,
status_code=300,
text=V3_VERSION_ENTRY)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
versions = disc.available_versions() versions = disc.available_versions()
@@ -509,7 +511,7 @@ class ClientDiscoveryTests(utils.TestCase):
'updated': UPDATED} 'updated': UPDATED}
versions = fixture.DiscoveryList() versions = fixture.DiscoveryList()
versions.add_version(V4_VERSION) versions.add_version(V4_VERSION)
self.requests.get(BASE_URL, status_code=300, json=versions) self.requests_mock.get(BASE_URL, status_code=300, json=versions)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
self.assertRaises(exceptions.DiscoveryFailure, self.assertRaises(exceptions.DiscoveryFailure,
@@ -517,14 +519,14 @@ class ClientDiscoveryTests(utils.TestCase):
def test_discovery_fail_for_missing_v3(self): def test_discovery_fail_for_missing_v3(self):
versions = fixture.DiscoveryList(v2=True, v3=False) versions = fixture.DiscoveryList(v2=True, v3=False)
self.requests.get(BASE_URL, status_code=300, json=versions) self.requests_mock.get(BASE_URL, status_code=300, json=versions)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
self.assertRaises(exceptions.DiscoveryFailure, self.assertRaises(exceptions.DiscoveryFailure,
disc.create_client, version=(3, 0)) disc.create_client, version=(3, 0))
def _do_discovery_call(self, token=None, **kwargs): def _do_discovery_call(self, token=None, **kwargs):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
if not token: if not token:
token = uuid.uuid4().hex token = uuid.uuid4().hex
@@ -536,7 +538,7 @@ class ClientDiscoveryTests(utils.TestCase):
# will default to true as there is a plugin on the session # will default to true as there is a plugin on the session
discover.Discover(s, auth_url=BASE_URL, **kwargs) discover.Discover(s, auth_url=BASE_URL, **kwargs)
self.assertEqual(BASE_URL, self.requests.last_request.url) self.assertEqual(BASE_URL, self.requests_mock.last_request.url)
def test_setting_authenticated_true(self): def test_setting_authenticated_true(self):
token = uuid.uuid4().hex token = uuid.uuid4().hex
@@ -545,13 +547,14 @@ class ClientDiscoveryTests(utils.TestCase):
def test_setting_authenticated_false(self): def test_setting_authenticated_false(self):
self._do_discovery_call(authenticated=False) self._do_discovery_call(authenticated=False)
self.assertNotIn('X-Auth-Token', self.requests.last_request.headers) self.assertNotIn('X-Auth-Token',
self.requests_mock.last_request.headers)
class DiscoverQueryTests(utils.TestCase): class DiscoverQueryTests(utils.TestCase):
def test_available_keystone_data(self): def test_available_keystone_data(self):
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST) self.requests_mock.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data() versions = disc.version_data()
@@ -579,7 +582,7 @@ class DiscoverQueryTests(utils.TestCase):
def test_available_cinder_data(self): def test_available_cinder_data(self):
text = jsonutils.dumps(CINDER_EXAMPLES) text = jsonutils.dumps(CINDER_EXAMPLES)
self.requests.get(BASE_URL, status_code=300, text=text) self.requests_mock.get(BASE_URL, status_code=300, text=text)
v1_url = "%sv1/" % BASE_URL v1_url = "%sv1/" % BASE_URL
v2_url = "%sv2/" % BASE_URL v2_url = "%sv2/" % BASE_URL
@@ -610,7 +613,7 @@ class DiscoverQueryTests(utils.TestCase):
def test_available_glance_data(self): def test_available_glance_data(self):
text = jsonutils.dumps(GLANCE_EXAMPLES) text = jsonutils.dumps(GLANCE_EXAMPLES)
self.requests.get(BASE_URL, text=text) self.requests_mock.get(BASE_URL, text=text)
v1_url = "%sv1/" % BASE_URL v1_url = "%sv1/" % BASE_URL
v2_url = "%sv2/" % BASE_URL v2_url = "%sv2/" % BASE_URL
@@ -659,7 +662,7 @@ class DiscoverQueryTests(utils.TestCase):
'status': status, 'status': status,
'updated': UPDATED}] 'updated': UPDATED}]
text = jsonutils.dumps({'versions': version_list}) text = jsonutils.dumps({'versions': version_list})
self.requests.get(BASE_URL, text=text) self.requests_mock.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
@@ -681,7 +684,7 @@ class DiscoverQueryTests(utils.TestCase):
'status': status, 'status': status,
'updated': UPDATED}] 'updated': UPDATED}]
text = jsonutils.dumps({'versions': version_list}) text = jsonutils.dumps({'versions': version_list})
self.requests.get(BASE_URL, text=text) self.requests_mock.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
@@ -698,7 +701,7 @@ class DiscoverQueryTests(utils.TestCase):
status = 'abcdef' status = 'abcdef'
version_list = fixture.DiscoveryList(BASE_URL, v2=False, version_list = fixture.DiscoveryList(BASE_URL, v2=False,
v3_status=status) v3_status=status)
self.requests.get(BASE_URL, json=version_list) self.requests_mock.get(BASE_URL, json=version_list)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data() versions = disc.version_data()
@@ -727,7 +730,7 @@ class DiscoverQueryTests(utils.TestCase):
}] }]
text = jsonutils.dumps({'versions': version_list}) text = jsonutils.dumps({'versions': version_list})
self.requests.get(BASE_URL, text=text) self.requests_mock.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL) disc = discover.Discover(auth_url=BASE_URL)

View File

@@ -68,8 +68,8 @@ class ClientTest(utils.TestCase):
self.stub_url('GET', text=RESPONSE_BODY) self.stub_url('GET', text=RESPONSE_BODY)
resp, body = cl.get("/hi") resp, body = cl.get("/hi")
self.assertEqual(self.requests.last_request.method, 'GET') self.assertEqual(self.requests_mock.last_request.method, 'GET')
self.assertEqual(self.requests.last_request.url, self.TEST_URL) self.assertEqual(self.requests_mock.last_request.url, self.TEST_URL)
self.assertRequestHeaderEqual('X-Auth-Token', 'token') self.assertRequestHeaderEqual('X-Auth-Token', 'token')
self.assertRequestHeaderEqual('User-Agent', httpclient.USER_AGENT) self.assertRequestHeaderEqual('User-Agent', httpclient.USER_AGENT)
@@ -108,8 +108,8 @@ class ClientTest(utils.TestCase):
self.stub_url('POST') self.stub_url('POST')
cl.post("/hi", body=[1, 2, 3]) cl.post("/hi", body=[1, 2, 3])
self.assertEqual(self.requests.last_request.method, 'POST') self.assertEqual(self.requests_mock.last_request.method, 'POST')
self.assertEqual(self.requests.last_request.body, '[1, 2, 3]') self.assertEqual(self.requests_mock.last_request.body, '[1, 2, 3]')
self.assertRequestHeaderEqual('X-Auth-Token', 'token') self.assertRequestHeaderEqual('X-Auth-Token', 'token')
self.assertRequestHeaderEqual('Content-Type', 'application/json') self.assertRequestHeaderEqual('Content-Type', 'application/json')
@@ -164,7 +164,7 @@ class BasicRequestTests(utils.TestCase):
if not url: if not url:
url = self.url url = self.url
self.requests.register_uri(method, url, text=response, self.requests_mock.register_uri(method, url, text=response,
status_code=status_code) status_code=status_code)
return httpclient.request(url, method, **kwargs) return httpclient.request(url, method, **kwargs)
@@ -176,7 +176,7 @@ class BasicRequestTests(utils.TestCase):
self.request(method=method, status_code=status, response=response) self.request(method=method, status_code=status, response=response)
self.assertEqual(self.requests.last_request.method, method) self.assertEqual(self.requests_mock.last_request.method, method)
logger_message = self.logger_message.getvalue() logger_message = self.logger_message.getvalue()

View File

@@ -64,7 +64,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
super(S3TokenMiddlewareTestGood, self).setUp() super(S3TokenMiddlewareTestGood, self).setUp()
self.middleware = s3_token.S3Token(FakeApp(), self.conf) self.middleware = s3_token.S3Token(FakeApp(), self.conf)
self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE) self.requests_mock.post(self.TEST_URL,
status_code=201,
json=GOOD_RESPONSE)
# Ignore the request and pass to the next middleware in the # Ignore the request and pass to the next middleware in the
# pipeline if no path has been specified. # pipeline if no path has been specified.
@@ -98,7 +100,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
TEST_URL = 'http://%s:%d/v2.0/s3tokens' % (self.TEST_HOST, TEST_URL = 'http://%s:%d/v2.0/s3tokens' % (self.TEST_HOST,
self.TEST_PORT) self.TEST_PORT)
self.requests.post(TEST_URL, status_code=201, json=GOOD_RESPONSE) self.requests_mock.post(TEST_URL, status_code=201, json=GOOD_RESPONSE)
self.middleware = ( self.middleware = (
s3_token.filter_factory({'auth_protocol': 'http', s3_token.filter_factory({'auth_protocol': 'http',
@@ -151,7 +153,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
{"message": "EC2 access key not found.", {"message": "EC2 access key not found.",
"code": 401, "code": 401,
"title": "Unauthorized"}} "title": "Unauthorized"}}
self.requests.post(self.TEST_URL, status_code=403, json=ret) self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
req = webob.Request.blank('/v1/AUTH_cfa/c/o') req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature' req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token' req.headers['X-Storage-Token'] = 'token'
@@ -183,7 +185,9 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
self.assertEqual(resp.status_int, s3_invalid_req.status_int) self.assertEqual(resp.status_int, s3_invalid_req.status_int)
def test_bad_reply(self): def test_bad_reply(self):
self.requests.post(self.TEST_URL, status_code=201, text="<badreply>") self.requests_mock.post(self.TEST_URL,
status_code=201,
text="<badreply>")
req = webob.Request.blank('/v1/AUTH_cfa/c/o') req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature' req.headers['Authorization'] = 'access:signature'

View File

@@ -38,7 +38,7 @@ class SessionTests(utils.TestCase):
self.stub_url('GET', text='response') self.stub_url('GET', text='response')
resp = session.get(self.TEST_URL) resp = session.get(self.TEST_URL)
self.assertEqual('GET', self.requests.last_request.method) self.assertEqual('GET', self.requests_mock.last_request.method)
self.assertEqual(resp.text, 'response') self.assertEqual(resp.text, 'response')
self.assertTrue(resp.ok) self.assertTrue(resp.ok)
@@ -47,7 +47,7 @@ class SessionTests(utils.TestCase):
self.stub_url('POST', text='response') self.stub_url('POST', text='response')
resp = session.post(self.TEST_URL, json={'hello': 'world'}) resp = session.post(self.TEST_URL, json={'hello': 'world'})
self.assertEqual('POST', self.requests.last_request.method) self.assertEqual('POST', self.requests_mock.last_request.method)
self.assertEqual(resp.text, 'response') self.assertEqual(resp.text, 'response')
self.assertTrue(resp.ok) self.assertTrue(resp.ok)
self.assertRequestBodyIs(json={'hello': 'world'}) self.assertRequestBodyIs(json={'hello': 'world'})
@@ -57,7 +57,7 @@ class SessionTests(utils.TestCase):
self.stub_url('HEAD') self.stub_url('HEAD')
resp = session.head(self.TEST_URL) resp = session.head(self.TEST_URL)
self.assertEqual('HEAD', self.requests.last_request.method) self.assertEqual('HEAD', self.requests_mock.last_request.method)
self.assertTrue(resp.ok) self.assertTrue(resp.ok)
self.assertRequestBodyIs('') self.assertRequestBodyIs('')
@@ -66,7 +66,7 @@ class SessionTests(utils.TestCase):
self.stub_url('PUT', text='response') self.stub_url('PUT', text='response')
resp = session.put(self.TEST_URL, json={'hello': 'world'}) resp = session.put(self.TEST_URL, json={'hello': 'world'})
self.assertEqual('PUT', self.requests.last_request.method) self.assertEqual('PUT', self.requests_mock.last_request.method)
self.assertEqual(resp.text, 'response') self.assertEqual(resp.text, 'response')
self.assertTrue(resp.ok) self.assertTrue(resp.ok)
self.assertRequestBodyIs(json={'hello': 'world'}) self.assertRequestBodyIs(json={'hello': 'world'})
@@ -76,7 +76,7 @@ class SessionTests(utils.TestCase):
self.stub_url('DELETE', text='response') self.stub_url('DELETE', text='response')
resp = session.delete(self.TEST_URL) resp = session.delete(self.TEST_URL)
self.assertEqual('DELETE', self.requests.last_request.method) self.assertEqual('DELETE', self.requests_mock.last_request.method)
self.assertTrue(resp.ok) self.assertTrue(resp.ok)
self.assertEqual(resp.text, 'response') self.assertEqual(resp.text, 'response')
@@ -85,7 +85,7 @@ class SessionTests(utils.TestCase):
self.stub_url('PATCH', text='response') self.stub_url('PATCH', text='response')
resp = session.patch(self.TEST_URL, json={'hello': 'world'}) resp = session.patch(self.TEST_URL, json={'hello': 'world'})
self.assertEqual('PATCH', self.requests.last_request.method) self.assertEqual('PATCH', self.requests_mock.last_request.method)
self.assertTrue(resp.ok) self.assertTrue(resp.ok)
self.assertEqual(resp.text, 'response') self.assertEqual(resp.text, 'response')
self.assertRequestBodyIs(json={'hello': 'world'}) self.assertRequestBodyIs(json={'hello': 'world'})
@@ -202,7 +202,7 @@ class SessionTests(utils.TestCase):
m.assert_called_with(2.0) m.assert_called_with(2.0)
# we count retries so there will be one initial request + 3 retries # we count retries so there will be one initial request + 3 retries
self.assertThat(self.requests.request_history, self.assertThat(self.requests_mock.request_history,
matchers.HasLength(retries + 1)) matchers.HasLength(retries + 1))
def test_uses_tcp_keepalive_by_default(self): def test_uses_tcp_keepalive_by_default(self):
@@ -234,13 +234,13 @@ class RedirectTests(utils.TestCase):
redirect_kwargs.setdefault('text', self.DEFAULT_REDIRECT_BODY) redirect_kwargs.setdefault('text', self.DEFAULT_REDIRECT_BODY)
for s, d in zip(self.REDIRECT_CHAIN, self.REDIRECT_CHAIN[1:]): for s, d in zip(self.REDIRECT_CHAIN, self.REDIRECT_CHAIN[1:]):
self.requests.register_uri(method, s, status_code=status_code, self.requests_mock.register_uri(method, s, status_code=status_code,
headers={'Location': d}, headers={'Location': d},
**redirect_kwargs) **redirect_kwargs)
final_kwargs.setdefault('status_code', 200) final_kwargs.setdefault('status_code', 200)
final_kwargs.setdefault('text', self.DEFAULT_RESP_BODY) final_kwargs.setdefault('text', self.DEFAULT_RESP_BODY)
self.requests.register_uri(method, self.REDIRECT_CHAIN[-1], self.requests_mock.register_uri(method, self.REDIRECT_CHAIN[-1],
**final_kwargs) **final_kwargs)
def assertResponse(self, resp): def assertResponse(self, resp):
@@ -405,7 +405,7 @@ class SessionAuthTests(utils.TestCase):
base_url = AuthPlugin.SERVICE_URLS[service_type][interface] base_url = AuthPlugin.SERVICE_URLS[service_type][interface]
uri = "%s/%s" % (base_url.rstrip('/'), path.lstrip('/')) uri = "%s/%s" % (base_url.rstrip('/'), path.lstrip('/'))
self.requests.register_uri(method, uri, **kwargs) self.requests_mock.register_uri(method, uri, **kwargs)
def test_auth_plugin_default_with_plugin(self): def test_auth_plugin_default_with_plugin(self):
self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON) self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON)
@@ -446,7 +446,7 @@ class SessionAuthTests(utils.TestCase):
endpoint_filter={'service_type': service_type, endpoint_filter={'service_type': service_type,
'interface': interface}) 'interface': interface})
self.assertEqual(self.requests.last_request.url, self.assertEqual(self.requests_mock.last_request.url,
AuthPlugin.SERVICE_URLS['compute']['public'] + path) AuthPlugin.SERVICE_URLS['compute']['public'] + path)
self.assertEqual(resp.text, body) self.assertEqual(resp.text, body)
self.assertEqual(resp.status_code, status) self.assertEqual(resp.status_code, status)
@@ -468,7 +468,7 @@ class SessionAuthTests(utils.TestCase):
def test_raises_exc_only_when_asked(self): def test_raises_exc_only_when_asked(self):
# A request that returns a HTTP error should by default raise an # A request that returns a HTTP error should by default raise an
# exception by default, if you specify raise_exc=False then it will not # exception by default, if you specify raise_exc=False then it will not
self.requests.get(self.TEST_URL, status_code=401) self.requests_mock.get(self.TEST_URL, status_code=401)
sess = client_session.Session() sess = client_session.Session()
self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL) self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL)
@@ -480,7 +480,7 @@ class SessionAuthTests(utils.TestCase):
passed = CalledAuthPlugin() passed = CalledAuthPlugin()
sess = client_session.Session() sess = client_session.Session()
self.requests.get(CalledAuthPlugin.ENDPOINT + 'path', self.requests_mock.get(CalledAuthPlugin.ENDPOINT + 'path',
status_code=200) status_code=200)
endpoint_filter = {'service_type': 'identity'} endpoint_filter = {'service_type': 'identity'}
@@ -504,7 +504,7 @@ class SessionAuthTests(utils.TestCase):
sess = client_session.Session(fixed) sess = client_session.Session(fixed)
self.requests.get(CalledAuthPlugin.ENDPOINT + 'path', self.requests_mock.get(CalledAuthPlugin.ENDPOINT + 'path',
status_code=200) status_code=200)
resp = sess.get('path', auth=passed, resp = sess.get('path', auth=passed,
@@ -537,7 +537,7 @@ class SessionAuthTests(utils.TestCase):
auth = CalledAuthPlugin(invalidate=True) auth = CalledAuthPlugin(invalidate=True)
sess = client_session.Session(auth=auth) sess = client_session.Session(auth=auth)
self.requests.get(self.TEST_URL, self.requests_mock.get(self.TEST_URL,
[{'text': 'Failed', 'status_code': 401}, [{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}]) {'text': 'Hello', 'status_code': 200}])
@@ -552,7 +552,7 @@ class SessionAuthTests(utils.TestCase):
auth = CalledAuthPlugin(invalidate=True) auth = CalledAuthPlugin(invalidate=True)
sess = client_session.Session(auth=auth) sess = client_session.Session(auth=auth)
self.requests.get(self.TEST_URL, self.requests_mock.get(self.TEST_URL,
[{'text': 'Failed', 'status_code': 401}, [{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}]) {'text': 'Hello', 'status_code': 200}])
@@ -569,14 +569,14 @@ class SessionAuthTests(utils.TestCase):
override_url = override_base + path override_url = override_base + path
resp_text = uuid.uuid4().hex resp_text = uuid.uuid4().hex
self.requests.get(override_url, text=resp_text) self.requests_mock.get(override_url, text=resp_text)
resp = sess.get(path, resp = sess.get(path,
endpoint_override=override_base, endpoint_override=override_base,
endpoint_filter={'service_type': 'identity'}) endpoint_filter={'service_type': 'identity'})
self.assertEqual(resp_text, resp.text) self.assertEqual(resp_text, resp.text)
self.assertEqual(override_url, self.requests.last_request.url) self.assertEqual(override_url, self.requests_mock.last_request.url)
self.assertTrue(auth.get_token_called) self.assertTrue(auth.get_token_called)
self.assertFalse(auth.get_endpoint_called) self.assertFalse(auth.get_endpoint_called)
@@ -589,14 +589,14 @@ class SessionAuthTests(utils.TestCase):
url = self.TEST_URL + path url = self.TEST_URL + path
resp_text = uuid.uuid4().hex resp_text = uuid.uuid4().hex
self.requests.get(url, text=resp_text) self.requests_mock.get(url, text=resp_text)
resp = sess.get(url, resp = sess.get(url,
endpoint_override='http://someother.url', endpoint_override='http://someother.url',
endpoint_filter={'service_type': 'identity'}) endpoint_filter={'service_type': 'identity'})
self.assertEqual(resp_text, resp.text) self.assertEqual(resp_text, resp.text)
self.assertEqual(url, self.requests.last_request.url) self.assertEqual(url, self.requests_mock.last_request.url)
self.assertTrue(auth.get_token_called) self.assertTrue(auth.get_token_called)
self.assertFalse(auth.get_endpoint_called) self.assertFalse(auth.get_endpoint_called)
@@ -718,12 +718,12 @@ class AdapterTest(utils.TestCase):
adpt = adapter.Adapter(sess, endpoint_override=endpoint_override) adpt = adapter.Adapter(sess, endpoint_override=endpoint_override)
response = uuid.uuid4().hex response = uuid.uuid4().hex
self.requests.get(endpoint_url, text=response) self.requests_mock.get(endpoint_url, text=response)
resp = adpt.get(path) resp = adpt.get(path)
self.assertEqual(response, resp.text) self.assertEqual(response, resp.text)
self.assertEqual(endpoint_url, self.requests.last_request.url) self.assertEqual(endpoint_url, self.requests_mock.last_request.url)
self.assertEqual(endpoint_override, adpt.get_endpoint()) self.assertEqual(endpoint_override, adpt.get_endpoint())
@@ -760,7 +760,7 @@ class AdapterTest(utils.TestCase):
self.assertEqual(retries, m.call_count) self.assertEqual(retries, m.call_count)
# we count retries so there will be one initial request + 2 retries # we count retries so there will be one initial request + 2 retries
self.assertThat(self.requests.request_history, self.assertThat(self.requests_mock.request_history,
matchers.HasLength(retries + 1)) matchers.HasLength(retries + 1))
def test_user_and_project_id(self): def test_user_and_project_id(self):

View File

@@ -48,7 +48,7 @@ class TestCase(testtools.TestCase):
self.time_patcher = mock.patch.object(time, 'time', lambda: 1234) self.time_patcher = mock.patch.object(time, 'time', lambda: 1234)
self.time_patcher.start() self.time_patcher.start()
self.requests = self.useFixture(fixture.Fixture()) self.requests_mock = self.useFixture(fixture.Fixture())
def tearDown(self): def tearDown(self):
self.time_patcher.stop() self.time_patcher.stop()
@@ -71,10 +71,10 @@ class TestCase(testtools.TestCase):
url = base_url url = base_url
url = url.replace("/?", "?") url = url.replace("/?", "?")
self.requests.register_uri(method, url, **kwargs) self.requests_mock.register_uri(method, url, **kwargs)
def assertRequestBodyIs(self, body=None, json=None): def assertRequestBodyIs(self, body=None, json=None):
last_request_body = self.requests.last_request.body last_request_body = self.requests_mock.last_request.body
if json: if json:
val = jsonutils.loads(last_request_body) val = jsonutils.loads(last_request_body)
self.assertEqual(json, val) self.assertEqual(json, val)
@@ -87,7 +87,7 @@ class TestCase(testtools.TestCase):
The qs parameter should be of the format \'foo=bar&abc=xyz\' The qs parameter should be of the format \'foo=bar&abc=xyz\'
""" """
expected = urlparse.parse_qs(qs, keep_blank_values=True) expected = urlparse.parse_qs(qs, keep_blank_values=True)
parts = urlparse.urlparse(self.requests.last_request.url) parts = urlparse.urlparse(self.requests_mock.last_request.url)
querystring = urlparse.parse_qs(parts.query, keep_blank_values=True) querystring = urlparse.parse_qs(parts.query, keep_blank_values=True)
self.assertEqual(expected, querystring) self.assertEqual(expected, querystring)
@@ -101,7 +101,7 @@ class TestCase(testtools.TestCase):
verified is that the parameter is present. verified is that the parameter is present.
""" """
parts = urlparse.urlparse(self.requests.last_request.url) parts = urlparse.urlparse(self.requests_mock.last_request.url)
qs = urlparse.parse_qs(parts.query, keep_blank_values=True) qs = urlparse.parse_qs(parts.query, keep_blank_values=True)
for k, v in six.iteritems(kwargs): for k, v in six.iteritems(kwargs):
@@ -113,7 +113,7 @@ class TestCase(testtools.TestCase):
The request must have already been made. The request must have already been made.
""" """
headers = self.requests.last_request.headers headers = self.requests_mock.last_request.headers
self.assertEqual(headers.get(name), val) self.assertEqual(headers.get(name), val)

View File

@@ -159,13 +159,13 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
cl = client.Client(auth_url=self.TEST_URL, cl = client.Client(auth_url=self.TEST_URL,
token=fake_token) token=fake_token)
json_body = jsonutils.loads(self.requests.last_request.body) json_body = jsonutils.loads(self.requests_mock.last_request.body)
self.assertEqual(json_body['auth']['token']['id'], fake_token) self.assertEqual(json_body['auth']['token']['id'], fake_token)
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(self.TEST_TOKEN, token) self.assertEqual(self.TEST_TOKEN, token)
def test_authenticate_success_token_scoped(self): def test_authenticate_success_token_scoped(self):
@@ -236,7 +236,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(self.TEST_TOKEN, token) self.assertEqual(self.TEST_TOKEN, token)
# then override that token and the new token shall be used # then override that token and the new token shall be used
@@ -245,7 +245,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(fake_token, token) self.assertEqual(fake_token, token)
# if we clear that overridden token then we fall back to the original # if we clear that overridden token then we fall back to the original
@@ -254,5 +254,5 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(self.TEST_TOKEN, token) self.assertEqual(self.TEST_TOKEN, token)

View File

@@ -80,9 +80,9 @@ class ShellTests(utils.TestCase):
return out return out
def assert_called(self, method, path, base_url=TEST_URL): def assert_called(self, method, path, base_url=TEST_URL):
self.assertEqual(method, self.requests.last_request.method) self.assertEqual(method, self.requests_mock.last_request.method)
self.assertEqual(base_url + path.lstrip('/'), self.assertEqual(base_url + path.lstrip('/'),
self.requests.last_request.url) self.requests_mock.last_request.url)
def test_user_list(self): def test_user_list(self):
self.stub_url('GET', ['users'], json={'users': []}) self.stub_url('GET', ['users'], json={'users': []})
@@ -394,7 +394,7 @@ class ShellTests(utils.TestCase):
def called_anytime(method, path, json=None): def called_anytime(method, path, json=None):
test_url = self.TEST_URL.strip('/') test_url = self.TEST_URL.strip('/')
for r in self.requests.request_history: for r in self.requests_mock.request_history:
if not r.method == method: if not r.method == method:
continue continue
if not r.url == test_url + path: if not r.url == test_url + path:

View File

@@ -226,13 +226,13 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
cl = client.Client(auth_url=self.TEST_URL, cl = client.Client(auth_url=self.TEST_URL,
token=fake_token) token=fake_token)
body = jsonutils.loads(self.requests.last_request.body) body = jsonutils.loads(self.requests_mock.last_request.body)
self.assertEqual(body['auth']['identity']['token']['id'], fake_token) self.assertEqual(body['auth']['identity']['token']['id'], fake_token)
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(self.TEST_TOKEN, token) self.assertEqual(self.TEST_TOKEN, token)
def test_authenticate_success_token_domain_scoped(self): def test_authenticate_success_token_domain_scoped(self):
@@ -330,7 +330,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(self.TEST_TOKEN, token) self.assertEqual(self.TEST_TOKEN, token)
# then override that token and the new token shall be used # then override that token and the new token shall be used
@@ -339,7 +339,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(fake_token, token) self.assertEqual(fake_token, token)
# if we clear that overridden token then we fall back to the original # if we clear that overridden token then we fall back to the original
@@ -348,5 +348,5 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
resp, body = cl.get(fake_url) resp, body = cl.get(fake_url)
self.assertEqual(fake_resp, body) self.assertEqual(fake_resp, body)
token = self.requests.last_request.headers.get('X-Auth-Token') token = self.requests_mock.last_request.headers.get('X-Auth-Token')
self.assertEqual(self.TEST_TOKEN, token) self.assertEqual(self.TEST_TOKEN, token)

View File

@@ -128,7 +128,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
def test_initial_sp_call(self): def test_initial_sp_call(self):
"""Test initial call, expect SOAP message.""" """Test initial call, expect SOAP message."""
self.requests.get( self.requests_mock.get(
self.FEDERATION_AUTH_URL, self.FEDERATION_AUTH_URL,
content=make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)) content=make_oneline(saml2_fixtures.SP_SOAP_RESPONSE))
a = self.saml2plugin._send_service_provider_request(self.session) a = self.saml2plugin._send_service_provider_request(self.session)
@@ -153,7 +153,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
str(self.saml2plugin.sp_response_consumer_url))) str(self.saml2plugin.sp_response_consumer_url)))
def test_initial_sp_call_when_saml_authenticated(self): def test_initial_sp_call_when_saml_authenticated(self):
self.requests.get( self.requests_mock.get(
self.FEDERATION_AUTH_URL, self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER}) headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
@@ -168,7 +168,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.saml2plugin.authenticated_response.headers['X-Subject-Token']) self.saml2plugin.authenticated_response.headers['X-Subject-Token'])
def test_get_unscoped_token_when_authenticated(self): def test_get_unscoped_token_when_authenticated(self):
self.requests.get( self.requests_mock.get(
self.FEDERATION_AUTH_URL, self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER, headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER,
@@ -181,7 +181,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
def test_initial_sp_call_invalid_response(self): def test_initial_sp_call_invalid_response(self):
"""Send initial SP HTTP request and receive wrong server response.""" """Send initial SP HTTP request and receive wrong server response."""
self.requests.get(self.FEDERATION_AUTH_URL, self.requests_mock.get(self.FEDERATION_AUTH_URL,
text='NON XML RESPONSE') text='NON XML RESPONSE')
self.assertRaises( self.assertRaises(
@@ -190,7 +190,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.session) self.session)
def test_send_authn_req_to_idp(self): def test_send_authn_req_to_idp(self):
self.requests.post(self.IDENTITY_PROVIDER_URL, self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION) content=saml2_fixtures.SAML2_ASSERTION)
self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL
@@ -208,7 +208,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.assertEqual(idp_response, saml2_assertion_oneline, error) self.assertEqual(idp_response, saml2_assertion_oneline, error)
def test_fail_basicauth_idp_authentication(self): def test_fail_basicauth_idp_authentication(self):
self.requests.post(self.IDENTITY_PROVIDER_URL, status_code=401) self.requests_mock.post(self.IDENTITY_PROVIDER_URL, status_code=401)
self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL
self.saml2plugin.saml2_authn_request = etree.XML( self.saml2plugin.saml2_authn_request = etree.XML(
@@ -225,7 +225,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.IDENTITY_PROVIDER_URL) self.IDENTITY_PROVIDER_URL)
def test_send_authn_response_to_sp(self): def test_send_authn_response_to_sp(self):
self.requests.post( self.requests_mock.post(
self.SHIB_CONSUMER_URL, self.SHIB_CONSUMER_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER}) headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
@@ -255,7 +255,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.SHIB_CONSUMER_URL) self.SHIB_CONSUMER_URL)
def test_consumer_url_mismatch(self): def test_consumer_url_mismatch(self):
self.requests.post(self.SHIB_CONSUMER_URL) self.requests_mock.post(self.SHIB_CONSUMER_URL)
invalid_consumer_url = uuid.uuid4().hex invalid_consumer_url = uuid.uuid4().hex
self.assertRaises( self.assertRaises(
exceptions.ValidationError, exceptions.ValidationError,
@@ -264,13 +264,13 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
invalid_consumer_url) invalid_consumer_url)
def test_custom_302_redirection(self): def test_custom_302_redirection(self):
self.requests.post( self.requests_mock.post(
self.SHIB_CONSUMER_URL, self.SHIB_CONSUMER_URL,
text='BODY', text='BODY',
headers={'location': self.FEDERATION_AUTH_URL}, headers={'location': self.FEDERATION_AUTH_URL},
status_code=302) status_code=302)
self.requests.get( self.requests_mock.get(
self.FEDERATION_AUTH_URL, self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER}) headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
@@ -289,14 +289,14 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.assertEqual('GET', response.request.method) self.assertEqual('GET', response.request.method)
def test_end_to_end_workflow(self): def test_end_to_end_workflow(self):
self.requests.get( self.requests_mock.get(
self.FEDERATION_AUTH_URL, self.FEDERATION_AUTH_URL,
content=make_oneline(saml2_fixtures.SP_SOAP_RESPONSE)) content=make_oneline(saml2_fixtures.SP_SOAP_RESPONSE))
self.requests.post(self.IDENTITY_PROVIDER_URL, self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION) content=saml2_fixtures.SAML2_ASSERTION)
self.requests.post( self.requests_mock.post(
self.SHIB_CONSUMER_URL, self.SHIB_CONSUMER_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER, headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER,
@@ -463,7 +463,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
def test_get_adfs_security_token(self): def test_get_adfs_security_token(self):
"""Test ADFSUnscopedToken._get_adfs_security_token().""" """Test ADFSUnscopedToken._get_adfs_security_token()."""
self.requests.post( self.requests_mock.post(
self.IDENTITY_PROVIDER_URL, self.IDENTITY_PROVIDER_URL,
content=make_oneline(self.ADFS_SECURITY_TOKEN_RESPONSE), content=make_oneline(self.ADFS_SECURITY_TOKEN_RESPONSE),
status_code=200) status_code=200)
@@ -526,7 +526,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
An exceptions.AuthorizationFailure should be raised including An exceptions.AuthorizationFailure should be raised including
error message from the XML message indicating where was the problem. error message from the XML message indicating where was the problem.
""" """
self.requests.post(self.IDENTITY_PROVIDER_URL, self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=make_oneline(self.ADFS_FAULT), content=make_oneline(self.ADFS_FAULT),
status_code=500) status_code=500)
@@ -545,7 +545,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
and correctly raise exceptions.InternalServerError once it cannot and correctly raise exceptions.InternalServerError once it cannot
parse XML fault message parse XML fault message
""" """
self.requests.post(self.IDENTITY_PROVIDER_URL, self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=b'NOT XML', content=b'NOT XML',
status_code=500) status_code=500)
self.adfsplugin._prepare_adfs_request() self.adfsplugin._prepare_adfs_request()
@@ -559,7 +559,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
"""Test whether SP issues a cookie.""" """Test whether SP issues a cookie."""
cookie = uuid.uuid4().hex cookie = uuid.uuid4().hex
self.requests.post(self.SP_ENDPOINT, self.requests_mock.post(self.SP_ENDPOINT,
headers={"set-cookie": cookie}, headers={"set-cookie": cookie},
status_code=302) status_code=302)
@@ -570,7 +570,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
self.assertEqual(1, len(self.session.session.cookies)) self.assertEqual(1, len(self.session.session.cookies))
def test_send_assertion_to_service_provider_bad_status(self): def test_send_assertion_to_service_provider_bad_status(self):
self.requests.post(self.SP_ENDPOINT, status_code=500) self.requests_mock.post(self.SP_ENDPOINT, status_code=500)
self.adfsplugin.adfs_token = etree.XML( self.adfsplugin.adfs_token = etree.XML(
self.ADFS_SECURITY_TOKEN_RESPONSE) self.ADFS_SECURITY_TOKEN_RESPONSE)
@@ -590,7 +590,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
self.session) self.session)
def test_check_valid_token_when_authenticated(self): def test_check_valid_token_when_authenticated(self):
self.requests.get(self.FEDERATION_AUTH_URL, self.requests_mock.get(self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers=client_fixtures.AUTH_RESPONSE_HEADERS) headers=client_fixtures.AUTH_RESPONSE_HEADERS)
@@ -605,17 +605,17 @@ class AuthenticateviaADFSTests(utils.TestCase):
response.json()['token']) response.json()['token'])
def test_end_to_end_workflow(self): def test_end_to_end_workflow(self):
self.requests.post(self.IDENTITY_PROVIDER_URL, self.requests_mock.post(self.IDENTITY_PROVIDER_URL,
content=self.ADFS_SECURITY_TOKEN_RESPONSE, content=self.ADFS_SECURITY_TOKEN_RESPONSE,
status_code=200) status_code=200)
self.requests.post(self.SP_ENDPOINT, self.requests_mock.post(self.SP_ENDPOINT,
headers={"set-cookie": 'x'}, headers={"set-cookie": 'x'},
status_code=302) status_code=302)
self.requests.get(self.FEDERATION_AUTH_URL, self.requests_mock.get(self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN, json=saml2_fixtures.UNSCOPED_TOKEN,
headers=client_fixtures.AUTH_RESPONSE_HEADERS) headers=client_fixtures.AUTH_RESPONSE_HEADERS)
# NOTE(marek-denis): We need to mimic this until self.requests can # NOTE(marek-denis): We need to mimic this until self.requests_mock can
# issue cookies properly. # issue cookies properly.
self.session.session.cookies = [object()] self.session.session.cookies = [object()]
token, token_json = self.adfsplugin._get_unscoped_token(self.session) token, token_json = self.adfsplugin._get_unscoped_token(self.session)

View File

@@ -61,7 +61,7 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase):
} }
def test_get_version_local(self): def test_get_version_local(self):
self.requests.get("http://localhost:35357/", self.requests_mock.get("http://localhost:35357/",
status_code=300, status_code=300,
json=self.TEST_RESPONSE_DICT) json=self.TEST_RESPONSE_DICT)

View File

@@ -350,7 +350,7 @@ class FederationProjectTests(utils.TestCase):
projects_json = { projects_json = {
self.collection_key: [self.new_ref(), self.new_ref()] self.collection_key: [self.new_ref(), self.new_ref()]
} }
self.requests.get(self.URL, json=projects_json) self.requests_mock.get(self.URL, json=projects_json)
returned_list = self.manager.list() returned_list = self.manager.list()
self.assertEqual(len(projects_ref), len(returned_list)) self.assertEqual(len(projects_ref), len(returned_list))
@@ -381,7 +381,7 @@ class FederationDomainTests(utils.TestCase):
domains_json = { domains_json = {
self.collection_key: domains_ref self.collection_key: domains_ref
} }
self.requests.get(self.URL, json=domains_json) self.requests_mock.get(self.URL, json=domains_json)
returned_list = self.manager.list() returned_list = self.manager.list()
self.assertEqual(len(domains_ref), len(returned_list)) self.assertEqual(len(domains_ref), len(returned_list))
for domain in returned_list: for domain in returned_list:

View File

@@ -183,7 +183,7 @@ class RequestTokenTests(TokenTests):
# Assert that the project id is in the header # Assert that the project id is in the header
self.assertRequestHeaderEqual('requested-project-id', project_id) self.assertRequestHeaderEqual('requested-project-id', project_id)
req_headers = self.requests.last_request.headers req_headers = self.requests_mock.last_request.headers
oauth_client = oauth1.Client(consumer_key, oauth_client = oauth1.Client(consumer_key,
client_secret=consumer_secret, client_secret=consumer_secret,
@@ -223,7 +223,7 @@ class AccessTokenTests(TokenTests):
self.assertEqual(access_secret, access_token.secret) self.assertEqual(access_secret, access_token.secret)
self.assertEqual(expires_at, access_token.expires) self.assertEqual(expires_at, access_token.expires)
req_headers = self.requests.last_request.headers req_headers = self.requests_mock.last_request.headers
oauth_client = oauth1.Client(consumer_key, oauth_client = oauth1.Client(consumer_key,
client_secret=consumer_secret, client_secret=consumer_secret,
resource_owner_key=request_key, resource_owner_key=request_key,
@@ -275,7 +275,7 @@ class AuthenticateWithOAuthTests(TokenTests):
self.assertRequestBodyIs(json=OAUTH_REQUEST_BODY) self.assertRequestBodyIs(json=OAUTH_REQUEST_BODY)
# Assert that the headers have the same oauthlib data # Assert that the headers have the same oauthlib data
req_headers = self.requests.last_request.headers req_headers = self.requests_mock.last_request.headers
oauth_client = oauth1.Client(consumer_key, oauth_client = oauth1.Client(consumer_key,
client_secret=consumer_secret, client_secret=consumer_secret,
resource_owner_key=access_key, resource_owner_key=access_key,

View File

@@ -241,7 +241,7 @@ class UserTests(utils.TestCase, utils.CrudTests):
} }
self.assertEqual(self.TEST_URL + '/users/test/password', self.assertEqual(self.TEST_URL + '/users/test/password',
self.requests.last_request.url) self.requests_mock.last_request.url)
self.assertRequestBodyIs(json=exp_req_body) self.assertRequestBodyIs(json=exp_req_body)
self.assertNotIn(old_password, self.logger.output) self.assertNotIn(old_password, self.logger.output)
self.assertNotIn(new_password, self.logger.output) self.assertNotIn(new_password, self.logger.output)

View File

@@ -250,14 +250,14 @@ class CrudTests(object):
ref_list = ref_list or [self.new_ref(), self.new_ref()] ref_list = ref_list or [self.new_ref(), self.new_ref()]
expected_path = self._get_expected_path(expected_path) expected_path = self._get_expected_path(expected_path)
self.requests.get(urlparse.urljoin(self.TEST_URL, expected_path), self.requests_mock.get(urlparse.urljoin(self.TEST_URL, expected_path),
json=self.encode(ref_list)) json=self.encode(ref_list))
returned_list = self.manager.list(**filter_kwargs) returned_list = self.manager.list(**filter_kwargs)
self.assertEqual(len(ref_list), len(returned_list)) self.assertEqual(len(ref_list), len(returned_list))
[self.assertIsInstance(r, self.model) for r in returned_list] [self.assertIsInstance(r, self.model) for r in returned_list]
qs_args = self.requests.last_request.qs qs_args = self.requests_mock.last_request.qs
qs_args_expected = expected_query or filter_kwargs qs_args_expected = expected_query or filter_kwargs
for key, value in six.iteritems(qs_args_expected): for key, value in six.iteritems(qs_args_expected):
self.assertIn(key, qs_args) self.assertIn(key, qs_args)
@@ -276,7 +276,7 @@ class CrudTests(object):
filter_kwargs = {uuid.uuid4().hex: uuid.uuid4().hex} filter_kwargs = {uuid.uuid4().hex: uuid.uuid4().hex}
expected_path = self._get_expected_path() expected_path = self._get_expected_path()
self.requests.get(urlparse.urljoin(self.TEST_URL, expected_path), self.requests_mock.get(urlparse.urljoin(self.TEST_URL, expected_path),
json=self.encode(ref_list)) json=self.encode(ref_list))
self.manager.list(**filter_kwargs) self.manager.list(**filter_kwargs)