Update requests-mock syntax

With requests-mock 0.5 we can do away with register_uri(method, ..) and
just use the method name as a function. This feels much cleaner and
looks more like requests syntax.

In the same update there is query string decoding included in the
history, so we no longer have to manually parse the query strings out of
the url.

Change-Id: I43d31576d15b4be72350bebf00733c08a7fb3e6c
This commit is contained in:
Jamie Lennox
2014-10-28 11:04:00 +01:00
parent 9eec63c3b6
commit 97e777840e
12 changed files with 173 additions and 254 deletions

View File

@@ -107,7 +107,7 @@ class CommonIdentityTests(object):
# register responses such that if the discovery URL is hit more than
# once then the response will be invalid and not point to COMPUTE_ADMIN
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
self.requests.register_uri('GET', self.TEST_COMPUTE_ADMIN, resps)
self.requests.get(self.TEST_COMPUTE_ADMIN, resps)
body = 'SUCCESS'
self.stub_url('GET', ['path'], text=body)
@@ -132,7 +132,7 @@ class CommonIdentityTests(object):
# register responses such that if the discovery URL is hit more than
# once then the response will be invalid and not point to COMPUTE_ADMIN
resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
self.requests.register_uri('GET', self.TEST_COMPUTE_ADMIN, resps)
self.requests.get(self.TEST_COMPUTE_ADMIN, resps)
body = 'SUCCESS'
self.stub_url('GET', ['path'], text=body)

View File

@@ -431,8 +431,7 @@ class V3IdentityPlugin(utils.TestCase):
{'status_code': 200, 'json': self.TEST_RESPONSE_DICT,
'headers': {'X-Subject-Token': 'token2'}}]
self.requests.register_uri('POST', '%s/auth/tokens' % self.TEST_URL,
auth_responses)
self.requests.post('%s/auth/tokens' % self.TEST_URL, auth_responses)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)

View File

@@ -23,7 +23,7 @@ class TokenEndpointTest(utils.TestCase):
TEST_URL = 'http://server/prefix'
def test_basic_case(self):
self.requests.register_uri('GET', self.TEST_URL, text='body')
self.requests.get(self.TEST_URL, text='body')
a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session(auth=a)

View File

@@ -56,8 +56,7 @@ EXTENSION_LIST = _create_extension_list([EXTENSION_FOO, EXTENSION_BAR])
class ClientDiscoveryTests(utils.TestCase):
def test_discover_extensions_v2(self):
self.requests.register_uri('GET', "%s/extensions" % V2_URL,
text=EXTENSION_LIST)
self.requests.get("%s/extensions" % V2_URL, text=EXTENSION_LIST)
extensions = client.Client().discover_extensions(url=V2_URL)
self.assertIn(EXTENSION_ALIAS_FOO, extensions)
self.assertEqual(extensions[EXTENSION_ALIAS_FOO], EXTENSION_NAME_FOO)

View File

@@ -258,13 +258,11 @@ class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
# Get a token, then try to retrieve revocation list and get a 401.
# Get a new token, try to retrieve revocation list and return 200.
self.requests.register_uri('POST', "%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.post("%s/v2.0/tokens" % BASE_URI, text=FAKE_ADMIN_TOKEN)
text = self.examples.SIGNED_REVOCATION_LIST
self.requests.register_uri('GET', "%s/v2.0/tokens/revoked" % BASE_URI,
response_list=[{'status_code': 401},
{'text': text}])
self.requests.get("%s/v2.0/tokens/revoked" % BASE_URI,
response_list=[{'status_code': 401}, {'text': text}])
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
@@ -291,17 +289,17 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
super(DiabloAuthTokenMiddlewareTest, self).setUp(
expected_env=expected_env)
self.requests.register_uri('GET', "%s/" % BASE_URI,
text=VERSION_LIST_v2, status_code=300)
self.requests.get("%s/" % BASE_URI,
text=VERSION_LIST_v2,
status_code=300)
self.requests.register_uri('POST', "%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.post("%s/v2.0/tokens" % BASE_URI, text=FAKE_ADMIN_TOKEN)
self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
url = '%s/v2.0/tokens/%s' % (BASE_URI, self.token_id)
self.requests.register_uri('GET', url, text=token_response)
self.requests.get(url, text=token_response)
self.set_middleware()
@@ -856,8 +854,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
def test_invalid_revocation_list_raises_service_error(self):
self.requests.register_uri('GET', '%s/v2.0/tokens/revoked' % BASE_URI,
text='{}')
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, text='{}')
self.assertRaises(auth_token.ServiceError,
self.middleware.fetch_revocation_list)
@@ -872,8 +869,7 @@ class CommonAuthTokenMiddlewareTest(object):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
self.requests.register_uri('GET', invalid_uri, text="",
status_code=404)
self.requests.get(invalid_uri, text="", status_code=404)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
@@ -961,7 +957,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_memcache_set_invalid_uuid(self):
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
self.requests.register_uri('GET', invalid_uri, status_code=404)
self.requests.get(invalid_uri, status_code=404)
req = webob.Request.blank('/')
token = 'invalid-token'
@@ -1255,10 +1251,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_request_no_token_dummy(self):
cms._ensure_subprocess()
self.requests.register_uri('GET', "%s%s" % (BASE_URI, self.ca_path),
status_code=404)
self.requests.get("%s%s" % (BASE_URI, self.ca_path),
status_code=404)
url = "%s%s" % (BASE_URI, self.signing_path)
self.requests.register_uri('GET', url, status_code=404)
self.requests.get(url, status_code=404)
self.assertRaises(exceptions.CertificateConfigError,
self.middleware.verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED,
@@ -1267,7 +1263,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_cert(self):
data = 'FAKE CERT'
url = '%s%s' % (BASE_URI, self.signing_path)
self.requests.register_uri('GET', url, text=data)
self.requests.get(url, text=data)
self.middleware.fetch_signing_cert()
with open(self.middleware.signing_cert_file_name, 'r') as f:
@@ -1277,8 +1273,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_ca(self):
data = 'FAKE CA'
self.requests.register_uri('GET', "%s%s" % (BASE_URI, self.ca_path),
text=data)
self.requests.get("%s%s" % (BASE_URI, self.ca_path), text=data)
self.middleware.fetch_ca_cert()
with open(self.middleware.signing_ca_file_name, 'r') as f:
@@ -1293,11 +1288,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.conf['auth_port'] = 1234
self.conf['auth_admin_prefix'] = '/newadmin/'
self.requests.register_uri('GET',
"%s/newadmin%s" % (BASE_HOST, self.ca_path),
text='FAKECA')
self.requests.get("%s/newadmin%s" % (BASE_HOST, self.ca_path),
text='FAKECA')
url = "%s/newadmin%s" % (BASE_HOST, self.signing_path)
self.requests.register_uri('GET', url, text='FAKECERT')
self.requests.get(url, text='FAKECERT')
self.set_middleware(conf=self.conf)
@@ -1316,11 +1310,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.conf['auth_port'] = 1234
self.conf['auth_admin_prefix'] = ''
self.requests.register_uri('GET', "%s%s" % (BASE_HOST, self.ca_path),
text='FAKECA')
self.requests.register_uri('GET', "%s%s" % (BASE_HOST,
self.signing_path),
text='FAKECERT')
self.requests.get("%s%s" % (BASE_HOST, self.ca_path), text='FAKECA')
self.requests.get("%s%s" % (BASE_HOST, self.signing_path),
text='FAKECERT')
self.set_middleware(conf=self.conf)
@@ -1392,14 +1384,15 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_TOKEN_HASH_SHA256,
}
self.requests.register_uri('GET', "%s/" % BASE_URI,
text=VERSION_LIST_v2, status_code=300)
self.requests.get("%s/" % BASE_URI,
text=VERSION_LIST_v2,
status_code=300)
self.requests.register_uri('POST', "%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.post("%s/v2.0/tokens" % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.register_uri('GET', "%s/v2.0/tokens/revoked" % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests.get("%s/v2.0/tokens/revoked" % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_UNSCOPED,
@@ -1409,14 +1402,11 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_TOKEN_SCOPED_KEY,
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
text = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.register_uri('GET',
'%s/v2.0/tokens/%s' % (BASE_URI, token),
text=text)
self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
text=text)
self.requests.register_uri('GET',
'%s/v2.0/tokens/%s' % (BASE_URI,
ERROR_TOKEN),
text=network_error_response)
self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
text=network_error_response)
self.set_middleware()
@@ -1492,16 +1482,16 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': 'v2.0'
}
self.requests.register_uri('GET', '%s/' % BASE_URI,
text=VERSION_LIST_v3, status_code=300)
self.requests.get('%s/' % BASE_URI,
text=VERSION_LIST_v3,
status_code=300)
self.requests.register_uri('POST', '%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN)
token = self.examples.UUID_TOKEN_DEFAULT
url = '%s/v2.0/tokens/%s' % (BASE_URI, token)
response_body = self.examples.JSON_TOKEN_RESPONSES[token]
self.requests.register_uri('GET', url, text=response_body)
self.requests.get(url, text=response_body)
self.set_middleware(conf=conf)
@@ -1573,20 +1563,18 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
}
self.requests.register_uri('GET', BASE_URI,
text=VERSION_LIST_v3, status_code=300)
self.requests.get(BASE_URI, text=VERSION_LIST_v3, status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
self.requests.register_uri('POST', '%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests.post('%s/v2.0/tokens' % BASE_URI, text=FAKE_ADMIN_TOKEN)
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
self.requests.register_uri('GET', '%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests.register_uri('GET', '%s/v3/auth/tokens' % BASE_URI,
text=self.token_response)
self.requests.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response)
self.set_middleware()

View File

@@ -242,7 +242,7 @@ class AvailableVersionsTests(utils.TestCase):
for path, text in six.iteritems(examples):
url = "%s%s" % (BASE_URL, path)
self.requests.register_uri('GET', url, status_code=300, text=text)
self.requests.get(url, status_code=300, text=text)
versions = discover.available_versions(url)
for v in versions:
@@ -252,8 +252,7 @@ class AvailableVersionsTests(utils.TestCase):
matchers.Contains(n)))
def test_available_versions_individual(self):
self.requests.register_uri('GET', V3_URL, status_code=200,
text=V3_VERSION_ENTRY)
self.requests.get(V3_URL, status_code=200, text=V3_VERSION_ENTRY)
versions = discover.available_versions(V3_URL)
@@ -264,8 +263,7 @@ class AvailableVersionsTests(utils.TestCase):
self.assertIn('links', v)
def test_available_keystone_data(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
versions = discover.available_versions(BASE_URL)
self.assertEqual(2, len(versions))
@@ -280,7 +278,7 @@ class AvailableVersionsTests(utils.TestCase):
def test_available_cinder_data(self):
text = jsonutils.dumps(CINDER_EXAMPLES)
self.requests.register_uri('GET', BASE_URL, status_code=300, text=text)
self.requests.get(BASE_URL, status_code=300, text=text)
versions = discover.available_versions(BASE_URL)
self.assertEqual(2, len(versions))
@@ -296,7 +294,7 @@ class AvailableVersionsTests(utils.TestCase):
def test_available_glance_data(self):
text = jsonutils.dumps(GLANCE_EXAMPLES)
self.requests.register_uri('GET', BASE_URL, status_code=200, text=text)
self.requests.get(BASE_URL, status_code=200, text=text)
versions = discover.available_versions(BASE_URL)
self.assertEqual(5, len(versions))
@@ -313,10 +311,9 @@ class AvailableVersionsTests(utils.TestCase):
class ClientDiscoveryTests(utils.TestCase):
def assertCreatesV3(self, **kwargs):
self.requests.register_uri('POST',
'%s/auth/tokens' % V3_URL,
text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN})
self.requests.post('%s/auth/tokens' % V3_URL,
text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN})
kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar')
@@ -325,8 +322,7 @@ class ClientDiscoveryTests(utils.TestCase):
return keystone
def assertCreatesV2(self, **kwargs):
self.requests.register_uri('POST', "%s/tokens" % V2_URL,
text=V2_AUTH_RESPONSE)
self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
kwargs.setdefault('username', 'foo')
kwargs.setdefault('password', 'bar')
@@ -349,89 +345,73 @@ class ClientDiscoveryTests(utils.TestCase):
client.Client, **kwargs)
def test_discover_v3(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertCreatesV3(auth_url=BASE_URL)
def test_discover_v2(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V2_VERSION_LIST)
self.requests.register_uri('POST', "%s/tokens" % V2_URL,
text=V2_AUTH_RESPONSE)
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
self.requests.post("%s/tokens" % V2_URL, text=V2_AUTH_RESPONSE)
self.assertCreatesV2(auth_url=BASE_URL)
def test_discover_endpoint_v2(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V2_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
self.assertCreatesV2(endpoint=BASE_URL, token='fake-token')
def test_discover_endpoint_v3(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertCreatesV3(endpoint=BASE_URL, token='fake-token')
def test_discover_invalid_major_version(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=5)
def test_discover_200_response_fails(self):
self.requests.register_uri('GET', BASE_URL,
status_code=200, text='ok')
self.requests.get(BASE_URL, text='ok')
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_discover_minor_greater_than_available_fails(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.assertVersionNotAvailable(endpoint=BASE_URL, version=3.4)
def test_discover_individual_version_v2(self):
self.requests.register_uri('GET', V2_URL, status_code=200,
text=V2_VERSION_ENTRY)
self.requests.get(V2_URL, text=V2_VERSION_ENTRY)
self.assertCreatesV2(auth_url=V2_URL)
def test_discover_individual_version_v3(self):
self.requests.register_uri('GET', V3_URL, status_code=200,
text=V3_VERSION_ENTRY)
self.requests.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertCreatesV3(auth_url=V3_URL)
def test_discover_individual_endpoint_v2(self):
self.requests.register_uri('GET', V2_URL, status_code=200,
text=V2_VERSION_ENTRY)
self.requests.get(V2_URL, text=V2_VERSION_ENTRY)
self.assertCreatesV2(endpoint=V2_URL, token='fake-token')
def test_discover_individual_endpoint_v3(self):
self.requests.register_uri('GET', V3_URL, status_code=200,
text=V3_VERSION_ENTRY)
self.requests.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertCreatesV3(endpoint=V3_URL, token='fake-token')
def test_discover_fail_to_create_bad_individual_version(self):
self.requests.register_uri('GET', V2_URL, status_code=200,
text=V2_VERSION_ENTRY)
self.requests.register_uri('GET', V3_URL, status_code=200,
text=V3_VERSION_ENTRY)
self.requests.get(V2_URL, text=V2_VERSION_ENTRY)
self.requests.get(V3_URL, text=V3_VERSION_ENTRY)
self.assertVersionNotAvailable(auth_url=V2_URL, version=3)
self.assertVersionNotAvailable(auth_url=V3_URL, version=2)
def test_discover_unstable_versions(self):
version_list = fixture.DiscoveryList(BASE_URL, v3_status='beta')
self.requests.register_uri('GET', BASE_URL, status_code=300,
json=version_list)
self.requests.get(BASE_URL, status_code=300, json=version_list)
self.assertCreatesV2(auth_url=BASE_URL)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=3)
self.assertCreatesV3(auth_url=BASE_URL, unstable=True)
def test_discover_forwards_original_ip(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
ip = '192.168.1.1'
self.assertCreatesV3(auth_url=BASE_URL, original_ip=ip)
@@ -444,8 +424,7 @@ class ClientDiscoveryTests(utils.TestCase):
client.Client)
def test_discover_bad_response(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
json={'FOO': 'BAR'})
self.requests.get(BASE_URL, status_code=300, json={'FOO': 'BAR'})
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_discovery_ignore_invalid(self):
@@ -454,44 +433,40 @@ class ClientDiscoveryTests(utils.TestCase):
'media-types': V3_MEDIA_TYPES,
'status': 'stable',
'updated': UPDATED}]
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=_create_version_list(resp))
self.requests.get(BASE_URL, status_code=300,
text=_create_version_list(resp))
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_ignore_entry_without_links(self):
v3 = V3_VERSION.copy()
v3['links'] = []
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION]))
self.requests.get(BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION]))
self.assertCreatesV2(auth_url=BASE_URL)
def test_ignore_entry_without_status(self):
v3 = V3_VERSION.copy()
del v3['status']
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION]))
self.requests.get(BASE_URL, status_code=300,
text=_create_version_list([v3, V2_VERSION]))
self.assertCreatesV2(auth_url=BASE_URL)
def test_greater_version_than_required(self):
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.6')
self.requests.register_uri('GET', BASE_URL, status_code=200,
json=versions)
self.requests.get(BASE_URL, json=versions)
self.assertCreatesV3(auth_url=BASE_URL, version=(3, 4))
def test_lesser_version_than_required(self):
versions = fixture.DiscoveryList(BASE_URL, v3_id='v3.4')
self.requests.register_uri('GET', BASE_URL, status_code=200,
json=versions)
self.requests.get(BASE_URL, json=versions)
self.assertVersionNotAvailable(auth_url=BASE_URL, version=(3, 6))
def test_bad_response(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text="Ugly Duckling")
self.requests.get(BASE_URL, status_code=300, text="Ugly Duckling")
self.assertDiscoveryFailure(auth_url=BASE_URL)
def test_pass_client_arguments(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V2_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V2_VERSION_LIST)
kwargs = {'original_ip': '100', 'use_keyring': False,
'stale_duration': 15}
@@ -502,12 +477,11 @@ class ClientDiscoveryTests(utils.TestCase):
self.assertFalse(cl.use_keyring)
def test_overriding_stored_kwargs(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
self.requests.register_uri('POST', "%s/auth/tokens" % V3_URL,
text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN})
self.requests.post("%s/auth/tokens" % V3_URL,
text=V3_AUTH_RESPONSE,
headers={'X-Subject-Token': V3_TOKEN})
disc = discover.Discover(auth_url=BASE_URL, debug=False,
username='foo')
@@ -520,8 +494,7 @@ class ClientDiscoveryTests(utils.TestCase):
self.assertEqual(client.password, 'bar')
def test_available_versions(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_ENTRY)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_ENTRY)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.available_versions()
@@ -536,8 +509,7 @@ class ClientDiscoveryTests(utils.TestCase):
'updated': UPDATED}
versions = fixture.DiscoveryList()
versions.add_version(V4_VERSION)
self.requests.register_uri('GET', BASE_URL, status_code=300,
json=versions)
self.requests.get(BASE_URL, status_code=300, json=versions)
disc = discover.Discover(auth_url=BASE_URL)
self.assertRaises(exceptions.DiscoveryFailure,
@@ -545,16 +517,14 @@ class ClientDiscoveryTests(utils.TestCase):
def test_discovery_fail_for_missing_v3(self):
versions = fixture.DiscoveryList(v2=True, v3=False)
self.requests.register_uri('GET', BASE_URL, status_code=300,
json=versions)
self.requests.get(BASE_URL, status_code=300, json=versions)
disc = discover.Discover(auth_url=BASE_URL)
self.assertRaises(exceptions.DiscoveryFailure,
disc.create_client, version=(3, 0))
def _do_discovery_call(self, token=None, **kwargs):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
if not token:
token = uuid.uuid4().hex
@@ -581,8 +551,7 @@ class ClientDiscoveryTests(utils.TestCase):
class DiscoverQueryTests(utils.TestCase):
def test_available_keystone_data(self):
self.requests.register_uri('GET', BASE_URL, status_code=300,
text=V3_VERSION_LIST)
self.requests.get(BASE_URL, status_code=300, text=V3_VERSION_LIST)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
@@ -610,7 +579,7 @@ class DiscoverQueryTests(utils.TestCase):
def test_available_cinder_data(self):
text = jsonutils.dumps(CINDER_EXAMPLES)
self.requests.register_uri('GET', BASE_URL, status_code=300, text=text)
self.requests.get(BASE_URL, status_code=300, text=text)
v1_url = "%sv1/" % BASE_URL
v2_url = "%sv2/" % BASE_URL
@@ -641,7 +610,7 @@ class DiscoverQueryTests(utils.TestCase):
def test_available_glance_data(self):
text = jsonutils.dumps(GLANCE_EXAMPLES)
self.requests.register_uri('GET', BASE_URL, status_code=200, text=text)
self.requests.get(BASE_URL, text=text)
v1_url = "%sv1/" % BASE_URL
v2_url = "%sv2/" % BASE_URL
@@ -690,7 +659,7 @@ class DiscoverQueryTests(utils.TestCase):
'status': status,
'updated': UPDATED}]
text = jsonutils.dumps({'versions': version_list})
self.requests.register_uri('GET', BASE_URL, status_code=200, text=text)
self.requests.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL)
@@ -712,7 +681,7 @@ class DiscoverQueryTests(utils.TestCase):
'status': status,
'updated': UPDATED}]
text = jsonutils.dumps({'versions': version_list})
self.requests.register_uri('GET', BASE_URL, status_code=200, text=text)
self.requests.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL)
@@ -729,8 +698,7 @@ class DiscoverQueryTests(utils.TestCase):
status = 'abcdef'
version_list = fixture.DiscoveryList(BASE_URL, v2=False,
v3_status=status)
self.requests.register_uri('GET', BASE_URL, status_code=200,
json=version_list)
self.requests.get(BASE_URL, json=version_list)
disc = discover.Discover(auth_url=BASE_URL)
versions = disc.version_data()
@@ -759,8 +727,7 @@ class DiscoverQueryTests(utils.TestCase):
}]
text = jsonutils.dumps({'versions': version_list})
self.requests.register_uri('GET', BASE_URL, status_code=200,
text=text)
self.requests.get(BASE_URL, text=text)
disc = discover.Discover(auth_url=BASE_URL)

View File

@@ -64,8 +64,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
super(S3TokenMiddlewareTestGood, self).setUp()
self.middleware = s3_token.S3Token(FakeApp(), self.conf)
self.requests.register_uri('POST', self.TEST_URL,
status_code=201, json=GOOD_RESPONSE)
self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE)
# Ignore the request and pass to the next middleware in the
# pipeline if no path has been specified.
@@ -99,8 +98,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
TEST_URL = 'http://%s:%d/v2.0/s3tokens' % (self.TEST_HOST,
self.TEST_PORT)
self.requests.register_uri('POST', TEST_URL,
status_code=201, json=GOOD_RESPONSE)
self.requests.post(TEST_URL, status_code=201, json=GOOD_RESPONSE)
self.middleware = (
s3_token.filter_factory({'auth_protocol': 'http',
@@ -153,8 +151,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
{"message": "EC2 access key not found.",
"code": 401,
"title": "Unauthorized"}}
self.requests.register_uri('POST', self.TEST_URL,
status_code=403, json=ret)
self.requests.post(self.TEST_URL, status_code=403, json=ret)
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'
req.headers['X-Storage-Token'] = 'token'
@@ -186,8 +183,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
def test_bad_reply(self):
self.requests.register_uri('POST', self.TEST_URL,
status_code=201, text="<badreply>")
self.requests.post(self.TEST_URL, status_code=201, text="<badreply>")
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
req.headers['Authorization'] = 'access:signature'

View File

@@ -437,7 +437,7 @@ class SessionAuthTests(utils.TestCase):
def test_raises_exc_only_when_asked(self):
# A request that returns a HTTP error should by default raise an
# exception by default, if you specify raise_exc=False then it will not
self.requests.register_uri('GET', self.TEST_URL, status_code=401)
self.requests.get(self.TEST_URL, status_code=401)
sess = client_session.Session()
self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL)
@@ -449,9 +449,8 @@ class SessionAuthTests(utils.TestCase):
passed = CalledAuthPlugin()
sess = client_session.Session()
self.requests.register_uri('GET',
CalledAuthPlugin.ENDPOINT + 'path',
status_code=200)
self.requests.get(CalledAuthPlugin.ENDPOINT + 'path',
status_code=200)
endpoint_filter = {'service_type': 'identity'}
# no plugin with authenticated won't work
@@ -474,9 +473,8 @@ class SessionAuthTests(utils.TestCase):
sess = client_session.Session(fixed)
self.requests.register_uri('GET',
CalledAuthPlugin.ENDPOINT + 'path',
status_code=200)
self.requests.get(CalledAuthPlugin.ENDPOINT + 'path',
status_code=200)
resp = sess.get('path', auth=passed,
endpoint_filter={'service_type': 'identity'})
@@ -508,9 +506,9 @@ class SessionAuthTests(utils.TestCase):
auth = CalledAuthPlugin(invalidate=True)
sess = client_session.Session(auth=auth)
self.requests.register_uri('GET', self.TEST_URL,
[{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}])
self.requests.get(self.TEST_URL,
[{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}])
# allow_reauth=True is the default
resp = sess.get(self.TEST_URL, authenticated=True)
@@ -523,9 +521,9 @@ class SessionAuthTests(utils.TestCase):
auth = CalledAuthPlugin(invalidate=True)
sess = client_session.Session(auth=auth)
self.requests.register_uri('GET', self.TEST_URL,
[{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}])
self.requests.get(self.TEST_URL,
[{'text': 'Failed', 'status_code': 401},
{'text': 'Hello', 'status_code': 200}])
self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL,
authenticated=True, allow_reauth=False)
@@ -540,7 +538,7 @@ class SessionAuthTests(utils.TestCase):
override_url = override_base + path
resp_text = uuid.uuid4().hex
self.requests.register_uri('GET', override_url, text=resp_text)
self.requests.get(override_url, text=resp_text)
resp = sess.get(path,
endpoint_override=override_base,
@@ -560,7 +558,7 @@ class SessionAuthTests(utils.TestCase):
url = self.TEST_URL + path
resp_text = uuid.uuid4().hex
self.requests.register_uri('GET', url, text=resp_text)
self.requests.get(url, text=resp_text)
resp = sess.get(url,
endpoint_override='http://someother.url',
@@ -682,7 +680,7 @@ class AdapterTest(utils.TestCase):
adpt = adapter.Adapter(sess, endpoint_override=endpoint_override)
response = uuid.uuid4().hex
self.requests.register_uri('GET', endpoint_url, text=response)
self.requests.get(endpoint_url, text=response)
resp = adpt.get(path)

View File

@@ -128,8 +128,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
def test_initial_sp_call(self):
"""Test initial call, expect SOAP message."""
self.requests.register_uri(
'GET',
self.requests.get(
self.FEDERATION_AUTH_URL,
content=make_oneline(saml2_fixtures.SP_SOAP_RESPONSE))
a = self.saml2plugin._send_service_provider_request(self.session)
@@ -154,8 +153,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
str(self.saml2plugin.sp_response_consumer_url)))
def test_initial_sp_call_when_saml_authenticated(self):
self.requests.register_uri(
'GET',
self.requests.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
@@ -170,8 +168,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.saml2plugin.authenticated_response.headers['X-Subject-Token'])
def test_get_unscoped_token_when_authenticated(self):
self.requests.register_uri(
'GET',
self.requests.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER,
@@ -184,9 +181,8 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
def test_initial_sp_call_invalid_response(self):
"""Send initial SP HTTP request and receive wrong server response."""
self.requests.register_uri('GET',
self.FEDERATION_AUTH_URL,
text='NON XML RESPONSE')
self.requests.get(self.FEDERATION_AUTH_URL,
text='NON XML RESPONSE')
self.assertRaises(
exceptions.AuthorizationFailure,
@@ -194,9 +190,8 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.session)
def test_send_authn_req_to_idp(self):
self.requests.register_uri('POST',
self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.requests.post(self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL
self.saml2plugin.saml2_authn_request = etree.XML(
@@ -213,9 +208,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.assertEqual(idp_response, saml2_assertion_oneline, error)
def test_fail_basicauth_idp_authentication(self):
self.requests.register_uri('POST',
self.IDENTITY_PROVIDER_URL,
status_code=401)
self.requests.post(self.IDENTITY_PROVIDER_URL, status_code=401)
self.saml2plugin.sp_response_consumer_url = self.SHIB_CONSUMER_URL
self.saml2plugin.saml2_authn_request = etree.XML(
@@ -232,8 +225,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.IDENTITY_PROVIDER_URL)
def test_send_authn_response_to_sp(self):
self.requests.register_uri(
'POST',
self.requests.post(
self.SHIB_CONSUMER_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
@@ -263,7 +255,7 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.SHIB_CONSUMER_URL)
def test_consumer_url_mismatch(self):
self.requests.register_uri('POST', self.SHIB_CONSUMER_URL)
self.requests.post(self.SHIB_CONSUMER_URL)
invalid_consumer_url = uuid.uuid4().hex
self.assertRaises(
exceptions.ValidationError,
@@ -272,15 +264,13 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
invalid_consumer_url)
def test_custom_302_redirection(self):
self.requests.register_uri(
'POST',
self.requests.post(
self.SHIB_CONSUMER_URL,
text='BODY',
headers={'location': self.FEDERATION_AUTH_URL},
status_code=302)
self.requests.register_uri(
'GET',
self.requests.get(
self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER})
@@ -299,17 +289,14 @@ class AuthenticateviaSAML2Tests(utils.TestCase):
self.assertEqual('GET', response.request.method)
def test_end_to_end_workflow(self):
self.requests.register_uri(
'GET',
self.requests.get(
self.FEDERATION_AUTH_URL,
content=make_oneline(saml2_fixtures.SP_SOAP_RESPONSE))
self.requests.register_uri('POST',
self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.requests.post(self.IDENTITY_PROVIDER_URL,
content=saml2_fixtures.SAML2_ASSERTION)
self.requests.register_uri(
'POST',
self.requests.post(
self.SHIB_CONSUMER_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers={'X-Subject-Token': saml2_fixtures.UNSCOPED_TOKEN_HEADER,
@@ -476,8 +463,8 @@ class AuthenticateviaADFSTests(utils.TestCase):
def test_get_adfs_security_token(self):
"""Test ADFSUnscopedToken._get_adfs_security_token()."""
self.requests.register_uri(
'POST', self.IDENTITY_PROVIDER_URL,
self.requests.post(
self.IDENTITY_PROVIDER_URL,
content=make_oneline(self.ADFS_SECURITY_TOKEN_RESPONSE),
status_code=200)
@@ -539,9 +526,9 @@ class AuthenticateviaADFSTests(utils.TestCase):
An exceptions.AuthorizationFailure should be raised including
error message from the XML message indicating where was the problem.
"""
self.requests.register_uri(
'POST', self.IDENTITY_PROVIDER_URL,
content=make_oneline(self.ADFS_FAULT), status_code=500)
self.requests.post(self.IDENTITY_PROVIDER_URL,
content=make_oneline(self.ADFS_FAULT),
status_code=500)
self.adfsplugin._prepare_adfs_request()
self.assertRaises(exceptions.AuthorizationFailure,
@@ -558,10 +545,9 @@ class AuthenticateviaADFSTests(utils.TestCase):
and correctly raise exceptions.InternalServerError once it cannot
parse XML fault message
"""
self.requests.register_uri(
'POST', self.IDENTITY_PROVIDER_URL,
content=b'NOT XML',
status_code=500)
self.requests.post(self.IDENTITY_PROVIDER_URL,
content=b'NOT XML',
status_code=500)
self.adfsplugin._prepare_adfs_request()
self.assertRaises(exceptions.InternalServerError,
self.adfsplugin._get_adfs_security_token,
@@ -573,9 +559,9 @@ class AuthenticateviaADFSTests(utils.TestCase):
"""Test whether SP issues a cookie."""
cookie = uuid.uuid4().hex
self.requests.register_uri('POST', self.SP_ENDPOINT,
headers={"set-cookie": cookie},
status_code=302)
self.requests.post(self.SP_ENDPOINT,
headers={"set-cookie": cookie},
status_code=302)
self.adfsplugin.adfs_token = self._build_adfs_request()
self.adfsplugin._prepare_sp_request()
@@ -584,8 +570,7 @@ class AuthenticateviaADFSTests(utils.TestCase):
self.assertEqual(1, len(self.session.session.cookies))
def test_send_assertion_to_service_provider_bad_status(self):
self.requests.register_uri('POST', self.SP_ENDPOINT,
status_code=500)
self.requests.post(self.SP_ENDPOINT, status_code=500)
self.adfsplugin.adfs_token = etree.XML(
self.ADFS_SECURITY_TOKEN_RESPONSE)
@@ -605,10 +590,9 @@ class AuthenticateviaADFSTests(utils.TestCase):
self.session)
def test_check_valid_token_when_authenticated(self):
self.requests.register_uri(
'GET', self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers=client_fixtures.AUTH_RESPONSE_HEADERS)
self.requests.get(self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers=client_fixtures.AUTH_RESPONSE_HEADERS)
self.session.session.cookies = [object()]
self.adfsplugin._access_service_provider(self.session)
@@ -621,18 +605,15 @@ class AuthenticateviaADFSTests(utils.TestCase):
response.json()['token'])
def test_end_to_end_workflow(self):
self.requests.register_uri(
'POST', self.IDENTITY_PROVIDER_URL,
content=self.ADFS_SECURITY_TOKEN_RESPONSE,
status_code=200)
self.requests.register_uri(
'POST', self.SP_ENDPOINT,
headers={"set-cookie": 'x'},
status_code=302)
self.requests.register_uri(
'GET', self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers=client_fixtures.AUTH_RESPONSE_HEADERS)
self.requests.post(self.IDENTITY_PROVIDER_URL,
content=self.ADFS_SECURITY_TOKEN_RESPONSE,
status_code=200)
self.requests.post(self.SP_ENDPOINT,
headers={"set-cookie": 'x'},
status_code=302)
self.requests.get(self.FEDERATION_AUTH_URL,
json=saml2_fixtures.UNSCOPED_TOKEN,
headers=client_fixtures.AUTH_RESPONSE_HEADERS)
# NOTE(marek-denis): We need to mimic this until self.requests can
# issue cookies properly.

View File

@@ -61,9 +61,9 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase):
}
def test_get_version_local(self):
self.requests.register_uri('GET', "http://localhost:35357/",
status_code=300,
json=self.TEST_RESPONSE_DICT)
self.requests.get("http://localhost:35357/",
status_code=300,
json=self.TEST_RESPONSE_DICT)
cs = client.Client()
versions = cs.discover()

View File

@@ -349,8 +349,7 @@ class FederationProjectTests(utils.TestCase):
projects_json = {
self.collection_key: [self.new_ref(), self.new_ref()]
}
self.requests.register_uri('GET', self.URL,
json=projects_json, status_code=200)
self.requests.get(self.URL, json=projects_json)
returned_list = self.manager.list()
self.assertEqual(len(projects_ref), len(returned_list))
@@ -381,8 +380,7 @@ class FederationDomainTests(utils.TestCase):
domains_json = {
self.collection_key: domains_ref
}
self.requests.register_uri('GET', self.URL,
json=domains_json, status_code=200)
self.requests.get(self.URL, json=domains_json)
returned_list = self.manager.list()
self.assertEqual(len(domains_ref), len(returned_list))
for domain in returned_list:

View File

@@ -244,27 +244,22 @@ class CrudTests(object):
ref_list = ref_list or [self.new_ref(), self.new_ref()]
expected_path = self._get_expected_path(expected_path)
self.requests.register_uri('GET',
urlparse.urljoin(self.TEST_URL,
expected_path),
json=self.encode(ref_list))
self.requests.get(urlparse.urljoin(self.TEST_URL, expected_path),
json=self.encode(ref_list))
returned_list = self.manager.list(**filter_kwargs)
self.assertEqual(len(ref_list), len(returned_list))
[self.assertIsInstance(r, self.model) for r in returned_list]
# register_uri doesn't match the querystring component, so we have to
# explicitly test the querystring component passed by the manager
parts = urlparse.urlparse(self.requests.last_request.url)
qs_args = urlparse.parse_qs(parts.query)
qs_args = self.requests.last_request.qs
qs_args_expected = expected_query or filter_kwargs
for key, value in six.iteritems(qs_args_expected):
self.assertIn(key, qs_args)
# The httppretty.querystring value is a list
# Note we convert the value to a string, as the query string
# is always a string and the filter_kwargs may contain non-string
# values, for example a boolean, causing the comaprison to fail.
self.assertIn(str(value), qs_args[key])
# The querystring value is a list. Note we convert the value to a
# string and lower, as the query string is always a string and the
# filter_kwargs may contain non-string values, for example a
# boolean, causing the comaprison to fail.
self.assertIn(str(value).lower(), qs_args[key])
# Also check that no query string args exist which are not expected
for key in qs_args:
@@ -275,10 +270,8 @@ class CrudTests(object):
filter_kwargs = {uuid.uuid4().hex: uuid.uuid4().hex}
expected_path = self._get_expected_path()
self.requests.register_uri('GET',
urlparse.urljoin(self.TEST_URL,
expected_path),
json=self.encode(ref_list))
self.requests.get(urlparse.urljoin(self.TEST_URL, expected_path),
json=self.encode(ref_list))
self.manager.list(**filter_kwargs)
self.assertQueryStringContains(**filter_kwargs)