diff --git a/tests/test_base.py b/tests/test_base.py index ac33b81f5..a781a7ced 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -13,6 +13,7 @@ # under the License. from keystoneclient import base +from keystoneclient.v2_0 import client from keystoneclient.v2_0 import roles from tests import utils @@ -35,6 +36,12 @@ class BaseTest(utils.TestCase): self.assertEqual(base.getid(TmpObject), 4) def test_resource_lazy_getattr(self): + self.client = client.Client(username=self.TEST_USER, + token=self.TEST_TOKEN, + tenant_name=self.TEST_TENANT_NAME, + auth_url='http://127.0.0.1:5000', + endpoint='http://127.0.0.1:5000') + self.client.get = self.mox.CreateMockAnything() self.client.get('/OS-KSADM/roles/1').AndRaise(AttributeError) self.mox.ReplayAll() @@ -78,6 +85,11 @@ class ManagerTest(utils.TestCase): def setUp(self): super(ManagerTest, self).setUp() + self.client = client.Client(username=self.TEST_USER, + token=self.TEST_TOKEN, + tenant_name=self.TEST_TENANT_NAME, + auth_url='http://127.0.0.1:5000', + endpoint='http://127.0.0.1:5000') self.mgr = base.Manager(self.client) self.mgr.resource_class = base.Resource diff --git a/tests/test_http.py b/tests/test_http.py index e5c7cbcee..6cd2e4804 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -14,11 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -import json -import mock - import httpretty -import requests import testtools from testtools import matchers @@ -26,12 +22,7 @@ from keystoneclient import exceptions from keystoneclient import httpclient from tests import utils - -FAKE_RESPONSE = utils.TestResponse({ - "status_code": 200, - "text": '{"hi": "there"}', -}) -MOCK_REQUEST = mock.Mock(return_value=(FAKE_RESPONSE)) +RESPONSE_BODY = '{"hi": "there"}' def get_client(): @@ -61,6 +52,8 @@ class FakeLog(object): class ClientTest(utils.TestCase): + TEST_URL = 'http://127.0.0.1:5000/hi' + def test_unauthorized_client_requests(self): cl = get_client() self.assertRaises(exceptions.AuthorizationFailure, cl.get, '/hi') @@ -68,34 +61,34 @@ class ClientTest(utils.TestCase): self.assertRaises(exceptions.AuthorizationFailure, cl.put, '/hi') self.assertRaises(exceptions.AuthorizationFailure, cl.delete, '/hi') + @httpretty.activate def test_get(self): cl = get_authed_client() - with mock.patch.object(requests, "request", MOCK_REQUEST): - with mock.patch('time.time', mock.Mock(return_value=1234)): - resp, body = cl.get("/hi") - headers = {"X-Auth-Token": "token", - "User-Agent": httpclient.USER_AGENT} - MOCK_REQUEST.assert_called_with( - "GET", - "http://127.0.0.1:5000/hi", - headers=headers, - **self.TEST_REQUEST_BASE) - # Automatic JSON parsing - self.assertEqual(body, {"hi": "there"}) + self.stub_url(httpretty.GET, body=RESPONSE_BODY) + resp, body = cl.get("/hi") + self.assertEqual(httpretty.last_request().method, 'GET') + self.assertEqual(httpretty.last_request().path, '/hi') + + req_headers = httpretty.last_request().headers + + self.assertEqual(req_headers.getheader('X-Auth-Token'), 'token') + self.assertEqual(req_headers.getheader('User-Agent'), + httpclient.USER_AGENT) + + # Automatic JSON parsing + self.assertEqual(body, {"hi": "there"}) + + @httpretty.activate def test_get_error_with_plaintext_resp(self): cl = get_authed_client() + self.stub_url(httpretty.GET, status=400, + body='Some evil plaintext string') - fake_err_response = utils.TestResponse({ - "status_code": 400, - "text": 'Some evil plaintext string', - }) - err_MOCK_REQUEST = mock.Mock(return_value=(fake_err_response)) - - with mock.patch.object(requests, "request", err_MOCK_REQUEST): - self.assertRaises(exceptions.BadRequest, cl.get, '/hi') + self.assertRaises(exceptions.BadRequest, cl.get, '/hi') + @httpretty.activate def test_get_error_with_json_resp(self): cl = get_authed_client() err_response = { @@ -105,53 +98,45 @@ class ClientTest(utils.TestCase): "message": "Error message string" } } - fake_err_response = utils.TestResponse({ - "status_code": 400, - "text": json.dumps(err_response), - "headers": {"Content-Type": "application/json"}, - }) - err_MOCK_REQUEST = mock.Mock(return_value=(fake_err_response)) - - with mock.patch.object(requests, "request", err_MOCK_REQUEST): - exc_raised = False - try: - cl.get('/hi') - except exceptions.BadRequest as exc: - exc_raised = True - self.assertEqual(exc.message, "Error message string") - self.assertTrue(exc_raised, 'Exception not raised.') + self.stub_url(httpretty.GET, status=400, json=err_response) + exc_raised = False + try: + cl.get('/hi') + except exceptions.BadRequest as exc: + exc_raised = True + self.assertEqual(exc.message, "Error message string") + self.assertTrue(exc_raised, 'Exception not raised.') + @httpretty.activate def test_post(self): cl = get_authed_client() - with mock.patch.object(requests, "request", MOCK_REQUEST): - cl.post("/hi", body=[1, 2, 3]) - headers = { - "X-Auth-Token": "token", - "Content-Type": "application/json", - "User-Agent": httpclient.USER_AGENT - } - MOCK_REQUEST.assert_called_with( - "POST", - "http://127.0.0.1:5000/hi", - headers=headers, - data='[1, 2, 3]', - **self.TEST_REQUEST_BASE) + self.stub_url(httpretty.POST) + cl.post("/hi", body=[1, 2, 3]) + self.assertEqual(httpretty.last_request().method, 'POST') + self.assertEqual(httpretty.last_request().body, '[1, 2, 3]') + + req_headers = httpretty.last_request().headers + + self.assertEqual(req_headers.getheader('X-Auth-Token'), 'token') + self.assertEqual(req_headers.getheader('Content-Type'), + 'application/json') + self.assertEqual(req_headers.getheader('User-Agent'), + httpclient.USER_AGENT) + + @httpretty.activate def test_forwarded_for(self): ORIGINAL_IP = "10.100.100.1" cl = httpclient.HTTPClient(username="username", password="password", tenant_id="tenant", auth_url="auth_test", original_ip=ORIGINAL_IP) - with mock.patch.object(requests, "request", MOCK_REQUEST): - cl.request('/', 'GET') + self.stub_url(httpretty.GET) - args, kwargs = MOCK_REQUEST.call_args - self.assertIn( - ('Forwarded', "for=%s;by=%s" % (ORIGINAL_IP, - httpclient.USER_AGENT)), - kwargs['headers'].items()) + cl.request(self.TEST_URL, 'GET') + self.assertEqual(httpretty.last_request().headers['Forwarded'], + "for=%s;by=%s" % (ORIGINAL_IP, httpclient.USER_AGENT)) def test_client_deprecated(self): # Can resolve symbols from the keystoneclient.client module. @@ -174,11 +159,6 @@ class BasicRequestTests(testtools.TestCase): def setUp(self): super(BasicRequestTests, self).setUp() self.logger = FakeLog() - httpretty.enable() - - def tearDown(self): - httpretty.disable() - super(BasicRequestTests, self).tearDown() def request(self, method='GET', response='Test Response', status=200, url=None, **kwargs): @@ -190,10 +170,7 @@ class BasicRequestTests(testtools.TestCase): return httpclient.request(url, method, debug=True, logger=self.logger, **kwargs) - @property - def last_request(self): - return httpretty.httpretty.last_request - + @httpretty.activate def test_basic_params(self): method = 'GET' response = 'Test Response' @@ -201,7 +178,7 @@ class BasicRequestTests(testtools.TestCase): self.request(method=method, status=status, response=response) - self.assertEqual(self.last_request.method, method) + self.assertEqual(httpretty.last_request().method, method) self.assertThat(self.logger.debug_log, matchers.Contains('curl')) self.assertThat(self.logger.debug_log, matchers.Contains('-X %s' % @@ -211,18 +188,20 @@ class BasicRequestTests(testtools.TestCase): self.assertThat(self.logger.debug_log, matchers.Contains(str(status))) self.assertThat(self.logger.debug_log, matchers.Contains(response)) + @httpretty.activate def test_headers(self): headers = {'key': 'val', 'test': 'other'} self.request(headers=headers) for k, v in headers.iteritems(): - self.assertEqual(self.last_request.headers[k], v) + self.assertEqual(httpretty.last_request().headers[k], v) for header in headers.iteritems(): self.assertThat(self.logger.debug_log, matchers.Contains('-H "%s: %s"' % header)) + @httpretty.activate def test_body(self): data = "BODY DATA" self.request(response=data) diff --git a/tests/test_https.py b/tests/test_https.py index c253ec392..5bbccba86 100644 --- a/tests/test_https.py +++ b/tests/test_https.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import copy import mock import requests @@ -20,13 +19,15 @@ import requests from keystoneclient import httpclient from tests import utils - FAKE_RESPONSE = utils.TestResponse({ "status_code": 200, "text": '{"hi": "there"}', }) MOCK_REQUEST = mock.Mock(return_value=(FAKE_RESPONSE)) +REQUEST_URL = 'https://127.0.0.1:5000/hi' +RESPONSE_BODY = '{"hi": "there"}' + def get_client(): cl = httpclient.HTTPClient(username="username", password="password", @@ -44,44 +45,49 @@ def get_authed_client(): class ClientTest(utils.TestCase): + def setUp(self): + super(ClientTest, self).setUp() + self.request_patcher = mock.patch.object(requests, 'request', + self.mox.CreateMockAnything()) + self.request_patcher.start() + + def tearDown(self): + self.request_patcher.stop() + super(ClientTest, self).tearDown() + def test_get(self): cl = get_authed_client() with mock.patch.object(requests, "request", MOCK_REQUEST): - with mock.patch('time.time', mock.Mock(return_value=1234)): - resp, body = cl.get("/hi") - headers = {"X-Auth-Token": "token", - "User-Agent": httpclient.USER_AGENT} - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['cert'] = ('cert.pem', 'key.pem') - kwargs['verify'] = 'ca.pem' - MOCK_REQUEST.assert_called_with( - "GET", - "https://127.0.0.1:5000/hi", - headers=headers, - **kwargs) - # Automatic JSON parsing - self.assertEqual(body, {"hi": "there"}) + resp, body = cl.get("/hi") + + # this may become too tightly couple later + mock_args, mock_kwargs = MOCK_REQUEST.call_args + + self.assertEqual(mock_args[0], 'GET') + self.assertEqual(mock_args[1], REQUEST_URL) + self.assertEqual(mock_kwargs['headers']['X-Auth-Token'], 'token') + self.assertEqual(mock_kwargs['cert'], ('cert.pem', 'key.pem')) + self.assertEqual(mock_kwargs['verify'], 'ca.pem') + + # Automatic JSON parsing + self.assertEqual(body, {"hi": "there"}) def test_post(self): cl = get_authed_client() with mock.patch.object(requests, "request", MOCK_REQUEST): cl.post("/hi", body=[1, 2, 3]) - headers = { - "X-Auth-Token": "token", - "Content-Type": "application/json", - "User-Agent": httpclient.USER_AGENT - } - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['cert'] = ('cert.pem', 'key.pem') - kwargs['verify'] = 'ca.pem' - MOCK_REQUEST.assert_called_with( - "POST", - "https://127.0.0.1:5000/hi", - headers=headers, - data='[1, 2, 3]', - **kwargs) + + # this may become too tightly couple later + mock_args, mock_kwargs = MOCK_REQUEST.call_args + + self.assertEqual(mock_args[0], 'POST') + self.assertEqual(mock_args[1], REQUEST_URL) + self.assertEqual(mock_kwargs['data'], '[1, 2, 3]') + self.assertEqual(mock_kwargs['headers']['X-Auth-Token'], 'token') + self.assertEqual(mock_kwargs['cert'], ('cert.pem', 'key.pem')) + self.assertEqual(mock_kwargs['verify'], 'ca.pem') def test_post_auth(self): with mock.patch.object(requests, "request", MOCK_REQUEST): @@ -92,17 +98,13 @@ class ClientTest(utils.TestCase): cl.management_url = "https://127.0.0.1:5000" cl.auth_token = "token" cl.post("/hi", body=[1, 2, 3]) - headers = { - "X-Auth-Token": "token", - "Content-Type": "application/json", - "User-Agent": httpclient.USER_AGENT - } - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['cert'] = ('cert.pem', 'key.pem') - kwargs['verify'] = 'ca.pem' - MOCK_REQUEST.assert_called_with( - "POST", - "https://127.0.0.1:5000/hi", - headers=headers, - data='[1, 2, 3]', - **kwargs) + + # this may become too tightly couple later + mock_args, mock_kwargs = MOCK_REQUEST.call_args + + self.assertEqual(mock_args[0], 'POST') + self.assertEqual(mock_args[1], REQUEST_URL) + self.assertEqual(mock_kwargs['data'], '[1, 2, 3]') + self.assertEqual(mock_kwargs['headers']['X-Auth-Token'], 'token') + self.assertEqual(mock_kwargs['cert'], ('cert.pem', 'key.pem')) + self.assertEqual(mock_kwargs['verify'], 'ca.pem') diff --git a/tests/utils.py b/tests/utils.py index 87ddeba4c..24e8bed41 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,14 +12,16 @@ # License for the specific language governing permissions and limitations # under the License. +import sys import time +import httpretty import mock from mox3 import mox import requests import testtools -from keystoneclient.v2_0 import client +from keystoneclient.openstack.common import jsonutils class TestCase(testtools.TestCase): @@ -29,110 +31,63 @@ class TestCase(testtools.TestCase): TEST_TENANT_NAME = 'aTenant' TEST_TOKEN = 'aToken' TEST_USER = 'test' - TEST_ROOT_URL = 'http://127.0.0.1:5000/' - TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') - TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' - TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') - TEST_REQUEST_BASE = { - 'verify': True, - } - TEST_SERVICE_CATALOG = [{ - "endpoints": [{ - "adminURL": "http://cdn.admin-nets.local:8774/v1.0", - "region": "RegionOne", - "internalURL": "http://127.0.0.1:8774/v1.0", - "publicURL": "http://cdn.admin-nets.local:8774/v1.0/" - }], - "type": "nova_compat", - "name": "nova_compat" - }, { - "endpoints": [{ - "adminURL": "http://nova/novapi/admin", - "region": "RegionOne", - "internalURL": "http://nova/novapi/internal", - "publicURL": "http://nova/novapi/public" - }], - "type": "compute", - "name": "nova" - }, { - "endpoints": [{ - "adminURL": "http://glance/glanceapi/admin", - "region": "RegionOne", - "internalURL": "http://glance/glanceapi/internal", - "publicURL": "http://glance/glanceapi/public" - }], - "type": "image", - "name": "glance" - }, { - "endpoints": [{ - "adminURL": "http://127.0.0.1:35357/v2.0", - "region": "RegionOne", - "internalURL": "http://127.0.0.1:5000/v2.0", - "publicURL": "http://127.0.0.1:5000/v2.0" - }], - "type": "identity", - "name": "keystone" - }, { - "endpoints": [{ - "adminURL": "http://swift/swiftapi/admin", - "region": "RegionOne", - "internalURL": "http://swift/swiftapi/internal", - "publicURL": "http://swift/swiftapi/public" - }], - "type": "object-store", - "name": "swift" - }] + TEST_ROOT_URL = 'http://127.0.0.1:5000/' def setUp(self): super(TestCase, self).setUp() self.mox = mox.Mox() - self.request_patcher = mock.patch.object(requests, 'request', - self.mox.CreateMockAnything()) - self.time_patcher = mock.patch.object(time, 'time', - lambda: 1234) - self.request_patcher.start() + self.time_patcher = mock.patch.object(time, 'time', lambda: 1234) self.time_patcher.start() - self.client = client.Client(username=self.TEST_USER, - token=self.TEST_TOKEN, - tenant_name=self.TEST_TENANT_NAME, - auth_url=self.TEST_URL, - endpoint=self.TEST_URL) def tearDown(self): - self.request_patcher.stop() self.time_patcher.stop() self.mox.UnsetStubs() self.mox.VerifyAll() super(TestCase, self).tearDown() + def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs): + if not base_url: + base_url = self.TEST_URL -class UnauthenticatedTestCase(testtools.TestCase): - """Class used as base for unauthenticated calls.""" - TEST_ROOT_URL = 'http://127.0.0.1:5000/' - TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') - TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' - TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') - TEST_REQUEST_BASE = { - 'verify': True, - } + if json: + kwargs['body'] = jsonutils.dumps(json) + kwargs['content_type'] = 'application/json' - def setUp(self): - super(UnauthenticatedTestCase, self).setUp() - self.mox = mox.Mox() - self.request_patcher = mock.patch.object(requests, 'request', - self.mox.CreateMockAnything()) - self.time_patcher = mock.patch.object(time, 'time', - lambda: 1234) - self.request_patcher.start() - self.time_patcher.start() + if parts: + url = '/'.join([p.strip('/') for p in [base_url] + parts]) + else: + url = base_url - def tearDown(self): - self.request_patcher.stop() - self.time_patcher.stop() - self.mox.UnsetStubs() - self.mox.VerifyAll() - super(UnauthenticatedTestCase, self).tearDown() + httpretty.register_uri(method, url, **kwargs) + + def assertRequestBodyIs(self, body=None, json=None): + if json: + val = jsonutils.loads(httpretty.last_request().body) + self.assertEqual(json, val) + elif body: + self.assertEqual(body, httpretty.last_request().body) + + def assertQueryStringIs(self, val): + self.assertEqual(httpretty.last_request().querystring, val) + + +if tuple(sys.version_info)[0:2] < (2, 7): + + def assertDictEqual(self, d1, d2, msg=None): + # Simple version taken from 2.7 + self.assertIsInstance(d1, dict, + 'First argument is not a dictionary') + self.assertIsInstance(d2, dict, + 'Second argument is not a dictionary') + if d1 != d2: + if msg: + self.fail(msg) + else: + standardMsg = '%r != %r' % (d1, d2) + self.fail(standardMsg) + + TestCase.assertDictEqual = assertDictEqual class TestResponse(requests.Response): diff --git a/tests/v2_0/fakes.py b/tests/v2_0/fakes.py index 8152322e6..b02c9faa4 100644 --- a/tests/v2_0/fakes.py +++ b/tests/v2_0/fakes.py @@ -16,7 +16,7 @@ import urlparse from tests import fakes -from tests import utils +from tests.v2_0 import utils class FakeHTTPClient(fakes.FakeClient): diff --git a/tests/v2_0/test_access.py b/tests/v2_0/test_access.py index 65154d525..3eaa11a93 100644 --- a/tests/v2_0/test_access.py +++ b/tests/v2_0/test_access.py @@ -17,8 +17,8 @@ import datetime from keystoneclient import access from keystoneclient.openstack.common import timeutils from tests import client_fixtures as token_data -from tests import utils from tests.v2_0 import client_fixtures +from tests.v2_0 import utils UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN diff --git a/tests/v2_0/test_auth.py b/tests/v2_0/test_auth.py index 19cca5cdf..4d3a2116d 100644 --- a/tests/v2_0/test_auth.py +++ b/tests/v2_0/test_auth.py @@ -12,17 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import copy import datetime import json -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.openstack.common import timeutils from keystoneclient.v2_0 import client - -from tests import utils +from tests.v2_0 import utils class AuthenticateAgainstKeystoneTests(utils.TestCase): @@ -52,76 +50,50 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): "tenantId": self.TEST_TENANT_ID, }, } - self.TEST_REQUEST_HEADERS = { - 'Content-Type': 'application/json', - 'User-Agent': 'python-keystoneclient', - } + @httpretty.activate def test_authenticate_success_expired(self): # Build an expired token self.TEST_RESPONSE_DICT['access']['token']['expires'] = \ (timeutils.utcnow() - datetime.timedelta(1)).isoformat() - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - - cs = client.Client(tenant_id=self.TEST_TENANT_ID, - auth_url=self.TEST_URL, - username=self.TEST_USER, - password=self.TEST_TOKEN) - self.assertEqual(cs.management_url, - self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3] - ['endpoints'][0]["adminURL"]) + exp_resp = httpretty.Response(body=json.dumps(self.TEST_RESPONSE_DICT), + content_type='application/json') # Build a new response - self.mox.ResetAll() TEST_TOKEN = "abcdef" self.TEST_RESPONSE_DICT['access']['token']['expires'] = \ '2020-01-01T00:00:10.000123Z' self.TEST_RESPONSE_DICT['access']['token']['id'] = TEST_TOKEN - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + new_resp = httpretty.Response(body=json.dumps(self.TEST_RESPONSE_DICT), + content_type='application/json') + + # return expired first, and then the new response + self.stub_auth(responses=[exp_resp, new_resp]) + + cs = client.Client(tenant_id=self.TEST_TENANT_ID, + auth_url=self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_TOKEN) + + self.assertEqual(cs.management_url, + self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3] + ['endpoints'][0]["adminURL"]) self.assertEqual(cs.auth_token, TEST_TOKEN) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_failure(self): _auth = 'auth' _cred = 'passwordCredentials' _pass = 'password' self.TEST_REQUEST_BODY[_auth][_cred][_pass] = 'bad_key' - resp = utils.TestResponse({ - "status_code": 401, - "text": json.dumps({ - "unauthorized": { - "message": "Unauthorized", - "code": "401", - }, - }), - }) + error = {"unauthorized": {"message": "Unauthorized", + "code": "401"}} - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(status=401, json=error) # Workaround for issue with assertRaises on python2.6 # where with assertRaises(exceptions.Unauthorized): doesn't work @@ -133,39 +105,15 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): auth_url=self.TEST_URL) self.assertRaises(exceptions.Unauthorized, client_create_wrapper) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_auth_redirect(self): - correct_response = json.dumps(self.TEST_RESPONSE_DICT) - dict_responses = [ - { - "headers": { - 'location': self.TEST_ADMIN_URL + "/tokens", - }, - "status_code": 305, - "text": "Use proxy", - }, - { - "headers": {}, - "status_code": 200, - "text": correct_response, - }, - ] - responses = [(utils.TestResponse(resp)) - for resp in dict_responses] + self.stub_auth(status=305, body='Use Proxy', + location=self.TEST_ADMIN_URL + "/tokens") - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn(responses[0]) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_ADMIN_URL + "/tokens", - **kwargs).AndReturn(responses[1]) - self.mox.ReplayAll() + self.stub_auth(base_url=self.TEST_ADMIN_URL, + json=self.TEST_RESPONSE_DICT) cs = client.Client(username=self.TEST_USER, password=self.TEST_TOKEN, @@ -177,20 +125,11 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][0]["adminURL"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_DICT["access"]["token"]["id"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_password_scoped(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(username=self.TEST_USER, password=self.TEST_TOKEN, @@ -201,22 +140,14 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][0]["adminURL"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_DICT["access"]["token"]["id"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_password_unscoped(self): del self.TEST_RESPONSE_DICT['access']['serviceCatalog'] del self.TEST_REQUEST_BODY['auth']['tenantId'] - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(username=self.TEST_USER, password=self.TEST_TOKEN, @@ -224,23 +155,13 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): self.assertEqual(cs.auth_token, self.TEST_RESPONSE_DICT["access"]["token"]["id"]) self.assertFalse('serviceCatalog' in cs.service_catalog.catalog) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_token_scoped(self): del self.TEST_REQUEST_BODY['auth']['passwordCredentials'] self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN} - self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(token=self.TEST_TOKEN, tenant_id=self.TEST_TENANT_ID, @@ -250,28 +171,20 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][0]["adminURL"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_DICT["access"]["token"]["id"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_token_unscoped(self): del self.TEST_REQUEST_BODY['auth']['passwordCredentials'] del self.TEST_REQUEST_BODY['auth']['tenantId'] del self.TEST_RESPONSE_DICT['access']['serviceCatalog'] self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN} - self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY) - requests.request('POST', - self.TEST_URL + "/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(token=self.TEST_TOKEN, auth_url=self.TEST_URL) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_DICT["access"]["token"]["id"]) self.assertFalse('serviceCatalog' in cs.service_catalog.catalog) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) diff --git a/tests/v2_0/test_client.py b/tests/v2_0/test_client.py index d92cb4caf..6c93fcdef 100644 --- a/tests/v2_0/test_client.py +++ b/tests/v2_0/test_client.py @@ -13,98 +13,91 @@ # under the License. import json -import mock -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.v2_0 import client -from tests import utils from tests.v2_0 import client_fixtures +from tests.v2_0 import utils class KeystoneClientTest(utils.TestCase): - def setUp(self): - super(KeystoneClientTest, self).setUp() - - scoped_fake_resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN) - }) - self.scoped_mock_req = mock.Mock(return_value=scoped_fake_resp) - - unscoped_fake_resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.UNSCOPED_TOKEN) - }) - self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp) + @httpretty.activate def test_unscoped_init(self): - with mock.patch.object(requests, "request", self.unscoped_mock_req): - c = client.Client(username='exampleuser', - password='password', - auth_url='http://somewhere/') - self.assertIsNotNone(c.auth_ref) - self.assertFalse(c.auth_ref.scoped) - self.assertFalse(c.auth_ref.domain_scoped) - self.assertFalse(c.auth_ref.project_scoped) - self.assertIsNone(c.auth_ref.trust_id) - self.assertFalse(c.auth_ref.trust_scoped) + self.stub_auth(json=client_fixtures.UNSCOPED_TOKEN) + c = client.Client(username='exampleuser', + password='password', + auth_url=self.TEST_URL) + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.scoped) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + self.assertIsNone(c.auth_ref.trust_id) + self.assertFalse(c.auth_ref.trust_scoped) + + @httpretty.activate def test_scoped_init(self): - with mock.patch.object(requests, "request", self.scoped_mock_req): - c = client.Client(username='exampleuser', - password='password', - tenant_name='exampleproject', - auth_url='http://somewhere/') - self.assertIsNotNone(c.auth_ref) - self.assertTrue(c.auth_ref.scoped) - self.assertTrue(c.auth_ref.project_scoped) - self.assertFalse(c.auth_ref.domain_scoped) - self.assertIsNone(c.auth_ref.trust_id) - self.assertFalse(c.auth_ref.trust_scoped) + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN) + c = client.Client(username='exampleuser', + password='password', + tenant_name='exampleproject', + auth_url=self.TEST_URL) + self.assertIsNotNone(c.auth_ref) + self.assertTrue(c.auth_ref.scoped) + self.assertTrue(c.auth_ref.project_scoped) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertIsNone(c.auth_ref.trust_id) + self.assertFalse(c.auth_ref.trust_scoped) + + @httpretty.activate def test_auth_ref_load(self): - with mock.patch.object(requests, "request", self.scoped_mock_req): - cl = client.Client(username='exampleuser', - password='password', - tenant_name='exampleproject', - auth_url='http://somewhere/') - cache = json.dumps(cl.auth_ref) - new_client = client.Client(auth_ref=json.loads(cache)) - self.assertIsNotNone(new_client.auth_ref) - self.assertTrue(new_client.auth_ref.scoped) - self.assertTrue(new_client.auth_ref.project_scoped) - self.assertFalse(new_client.auth_ref.domain_scoped) - self.assertIsNone(new_client.auth_ref.trust_id) - self.assertFalse(new_client.auth_ref.trust_scoped) - self.assertEquals(new_client.username, 'exampleuser') - self.assertIsNone(new_client.password) - self.assertEqual(new_client.management_url, - 'http://admin:35357/v2.0') + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN) + cl = client.Client(username='exampleuser', + password='password', + tenant_name='exampleproject', + auth_url=self.TEST_URL) + cache = json.dumps(cl.auth_ref) + new_client = client.Client(auth_ref=json.loads(cache)) + self.assertIsNotNone(new_client.auth_ref) + self.assertTrue(new_client.auth_ref.scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertIsNone(new_client.auth_ref.trust_id) + self.assertFalse(new_client.auth_ref.trust_scoped) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v2.0') + + @httpretty.activate def test_auth_ref_load_with_overridden_arguments(self): - with mock.patch.object(requests, "request", self.scoped_mock_req): - cl = client.Client(username='exampleuser', - password='password', - tenant_name='exampleproject', - auth_url='http://somewhere/') - cache = json.dumps(cl.auth_ref) - new_auth_url = "http://new-public:5000/v2.0" - new_client = client.Client(auth_ref=json.loads(cache), - auth_url=new_auth_url) - self.assertIsNotNone(new_client.auth_ref) - self.assertTrue(new_client.auth_ref.scoped) - self.assertTrue(new_client.auth_ref.scoped) - self.assertTrue(new_client.auth_ref.project_scoped) - self.assertFalse(new_client.auth_ref.domain_scoped) - self.assertIsNone(new_client.auth_ref.trust_id) - self.assertFalse(new_client.auth_ref.trust_scoped) - self.assertEquals(new_client.auth_url, new_auth_url) - self.assertEquals(new_client.username, 'exampleuser') - self.assertIsNone(new_client.password) - self.assertEqual(new_client.management_url, - 'http://admin:35357/v2.0') + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN) + + cl = client.Client(username='exampleuser', + password='password', + tenant_name='exampleproject', + auth_url=self.TEST_URL) + cache = json.dumps(cl.auth_ref) + new_auth_url = "http://new-public:5000/v2.0" + new_client = client.Client(auth_ref=json.loads(cache), + auth_url=new_auth_url) + self.assertIsNotNone(new_client.auth_ref) + self.assertTrue(new_client.auth_ref.scoped) + self.assertTrue(new_client.auth_ref.scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertIsNone(new_client.auth_ref.trust_id) + self.assertFalse(new_client.auth_ref.trust_scoped) + self.assertEquals(new_client.auth_url, new_auth_url) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v2.0') def test_init_err_no_auth_url(self): self.assertRaises(exceptions.AuthorizationFailure, diff --git a/tests/v2_0/test_discovery.py b/tests/v2_0/test_discovery.py index 47ddca49d..c85d9dea0 100644 --- a/tests/v2_0/test_discovery.py +++ b/tests/v2_0/test_discovery.py @@ -12,12 +12,10 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import requests +import httpretty from keystoneclient.generic import client -from tests import utils +from tests.v2_0 import utils class DiscoverKeystoneTests(utils.UnauthenticatedTestCase): @@ -55,23 +53,11 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase): }], }, } - self.TEST_REQUEST_HEADERS = { - 'User-Agent': 'python-keystoneclient', - 'Accept': 'application/json', - } + @httpretty.activate def test_get_versions(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - self.TEST_ROOT_URL, - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, base_url=self.TEST_ROOT_URL, + json=self.TEST_RESPONSE_DICT) cs = client.Client() versions = cs.discover(self.TEST_ROOT_URL) @@ -83,17 +69,10 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase): self.TEST_RESPONSE_DICT['versions']['values'][0]['links'][0] ['href']) + @httpretty.activate def test_get_version_local(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - "http://localhost:35357", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, base_url="http://localhost:35357/", + json=self.TEST_RESPONSE_DICT) cs = client.Client() versions = cs.discover() diff --git a/tests/v2_0/test_ec2.py b/tests/v2_0/test_ec2.py index 3d2048d1f..30b7ead7a 100644 --- a/tests/v2_0/test_ec2.py +++ b/tests/v2_0/test_ec2.py @@ -12,29 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import urlparse - -import requests +import httpretty from keystoneclient.v2_0 import ec2 -from tests import utils +from tests.v2_0 import utils class EC2Tests(utils.TestCase): - def setUp(self): - super(EC2Tests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } + @httpretty.activate def test_create(self): user_id = 'usr' tenant_id = 'tnt' @@ -50,20 +36,8 @@ class EC2Tests(utils.TestCase): "enabled": True, } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - url = urlparse.urljoin(self.TEST_URL, - 'v2.0/users/%s/credentials/OS-EC2' % user_id) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - url, - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['users', user_id, 'credentials', + 'OS-EC2'], json=resp_body) cred = self.client.ec2.create(user_id, tenant_id) self.assertTrue(isinstance(cred, ec2.EC2)) @@ -71,7 +45,9 @@ class EC2Tests(utils.TestCase): self.assertEqual(cred.enabled, True) self.assertEqual(cred.access, 'access') self.assertEqual(cred.secret, 'secret') + self.assertRequestBodyIs(json=req_body) + @httpretty.activate def test_get(self): user_id = 'usr' tenant_id = 'tnt' @@ -84,20 +60,8 @@ class EC2Tests(utils.TestCase): "enabled": True, } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - url = urlparse.urljoin(self.TEST_URL, - 'v2.0/users/%s/credentials/OS-EC2/%s' % - (user_id, 'access')) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - url, - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users', user_id, 'credentials', + 'OS-EC2', 'access'], json=resp_body) cred = self.client.ec2.get(user_id, 'access') self.assertTrue(isinstance(cred, ec2.EC2)) @@ -106,6 +70,7 @@ class EC2Tests(utils.TestCase): self.assertEqual(cred.access, 'access') self.assertEqual(cred.secret, 'secret') + @httpretty.activate def test_list(self): user_id = 'usr' tenant_id = 'tnt' @@ -129,20 +94,8 @@ class EC2Tests(utils.TestCase): ] } } - - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - url = urlparse.urljoin(self.TEST_URL, - 'v2.0/users/%s/credentials/OS-EC2' % user_id) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - url, - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users', user_id, 'credentials', + 'OS-EC2'], json=resp_body) creds = self.client.ec2.list(user_id) self.assertTrue(len(creds), 2) @@ -153,22 +106,10 @@ class EC2Tests(utils.TestCase): self.assertEqual(cred.access, 'access') self.assertEqual(cred.secret, 'secret') + @httpretty.activate def test_delete(self): user_id = 'usr' access = 'access' - resp = utils.TestResponse({ - "status_code": 204, - "text": "", - }) - - url = urlparse.urljoin(self.TEST_URL, - 'v2.0/users/%s/credentials/OS-EC2/%s' % - (user_id, access)) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - url, - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['users', user_id, 'credentials', + 'OS-EC2', access], status=204) self.client.ec2.delete(user_id, access) diff --git a/tests/v2_0/test_endpoints.py b/tests/v2_0/test_endpoints.py index 99b6539dd..dfbbc6129 100644 --- a/tests/v2_0/test_endpoints.py +++ b/tests/v2_0/test_endpoints.py @@ -12,28 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import urlparse - -import requests +import httpretty from keystoneclient.v2_0 import endpoints -from tests import utils +from tests.v2_0 import utils class EndpointTests(utils.TestCase): def setUp(self): super(EndpointTests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } self.TEST_ENDPOINTS = { 'endpoints': [ { @@ -53,6 +40,7 @@ class EndpointTests(utils.TestCase): ] } + @httpretty.activate def test_create(self): req_body = { "endpoint": { @@ -74,19 +62,7 @@ class EndpointTests(utils.TestCase): } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, - 'v2.0/endpoints'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['endpoints'], json=resp_body) endpoint = self.client.endpoints.create( region=req_body['endpoint']['region'], @@ -96,35 +72,16 @@ class EndpointTests(utils.TestCase): service_id=req_body['endpoint']['service_id'] ) self.assertTrue(isinstance(endpoint, endpoints.Endpoint)) + self.assertRequestBodyIs(json=req_body) + @httpretty.activate def test_delete(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": "", - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/endpoints/8f953'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['endpoints', '8f953'], status=204) self.client.endpoints.delete('8f953') + @httpretty.activate def test_list(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_ENDPOINTS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/endpoints'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['endpoints'], json=self.TEST_ENDPOINTS) endpoint_list = self.client.endpoints.list() [self.assertTrue(isinstance(r, endpoints.Endpoint)) diff --git a/tests/v2_0/test_roles.py b/tests/v2_0/test_roles.py index 1cd299fcd..21b9ec581 100644 --- a/tests/v2_0/test_roles.py +++ b/tests/v2_0/test_roles.py @@ -12,28 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import urlparse - -import requests +import httpretty from keystoneclient.v2_0 import roles -from tests import utils +from tests.v2_0 import utils class RoleTests(utils.TestCase): def setUp(self): super(RoleTests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } self.TEST_ROLES = { "roles": { "values": [ @@ -49,6 +36,7 @@ class RoleTests(utils.TestCase): }, } + @httpretty.activate def test_create(self): req_body = { "role": { @@ -61,173 +49,76 @@ class RoleTests(utils.TestCase): "id": 3, } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/roles'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['OS-KSADM', 'roles'], json=resp_body) role = self.client.roles.create(req_body['role']['name']) + self.assertRequestBodyIs(json=req_body) self.assertTrue(isinstance(role, roles.Role)) self.assertEqual(role.id, 3) self.assertEqual(role.name, req_body['role']['name']) + @httpretty.activate def test_delete(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": "", - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/roles/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['OS-KSADM', 'roles', '1'], status=204) self.client.roles.delete(1) + @httpretty.activate def test_get(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps({ - 'role': self.TEST_ROLES['roles']['values'][0], - }), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/roles/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['OS-KSADM', 'roles', '1'], + json={'role': self.TEST_ROLES['roles']['values'][0]}) role = self.client.roles.get(1) self.assertTrue(isinstance(role, roles.Role)) self.assertEqual(role.id, 1) self.assertEqual(role.name, 'admin') + @httpretty.activate def test_list(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_ROLES), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/roles'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['OS-KSADM', 'roles'], + json=self.TEST_ROLES) role_list = self.client.roles.list() [self.assertTrue(isinstance(r, roles.Role)) for r in role_list] + @httpretty.activate def test_roles_for_user(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_ROLES), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/users/foo/roles'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users', 'foo', 'roles'], + json=self.TEST_ROLES) role_list = self.client.roles.roles_for_user('foo') [self.assertTrue(isinstance(r, roles.Role)) for r in role_list] + @httpretty.activate def test_roles_for_user_tenant(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_ROLES), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/barrr/users/foo/roles'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['tenants', 'barrr', 'users', 'foo', + 'roles'], json=self.TEST_ROLES) role_list = self.client.roles.roles_for_user('foo', 'barrr') [self.assertTrue(isinstance(r, roles.Role)) for r in role_list] + @httpretty.activate def test_add_user_role(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('PUT', - urlparse.urljoin(self.TEST_URL, - 'v2.0/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, ['users', 'foo', 'roles', 'OS-KSADM', + 'barrr'], status=204) self.client.roles.add_user_role('foo', 'barrr') + @httpretty.activate def test_add_user_role_tenant(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('PUT', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, ['tenants', '4', 'users', 'foo', 'roles', + 'OS-KSADM', 'barrr'], status=204) self.client.roles.add_user_role('foo', 'barrr', '4') + @httpretty.activate def test_remove_user_role(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['users', 'foo', 'roles', 'OS-KSADM', + 'barrr'], status=204) self.client.roles.remove_user_role('foo', 'barrr') + @httpretty.activate def test_remove_user_role_tenant(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['tenants', '4', 'users', 'foo', + 'roles', 'OS-KSADM', 'barrr'], + status=204) self.client.roles.remove_user_role('foo', 'barrr', '4') diff --git a/tests/v2_0/test_service_catalog.py b/tests/v2_0/test_service_catalog.py index 13c800fb5..c337b692f 100644 --- a/tests/v2_0/test_service_catalog.py +++ b/tests/v2_0/test_service_catalog.py @@ -17,8 +17,8 @@ import copy from keystoneclient import access from keystoneclient import exceptions -from tests import utils from tests.v2_0 import client_fixtures +from tests.v2_0 import utils class ServiceCatalogTest(utils.TestCase): diff --git a/tests/v2_0/test_services.py b/tests/v2_0/test_services.py index e7beac98e..2c953e5a4 100644 --- a/tests/v2_0/test_services.py +++ b/tests/v2_0/test_services.py @@ -12,28 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import urlparse - -import requests +import httpretty from keystoneclient.v2_0 import services -from tests import utils +from tests.v2_0 import utils class ServiceTests(utils.TestCase): def setUp(self): super(ServiceTests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } self.TEST_SERVICES = { "OS-KSADM:services": { "values": [ @@ -53,6 +40,7 @@ class ServiceTests(utils.TestCase): }, } + @httpretty.activate def test_create(self): req_body = { "OS-KSADM:service": { @@ -69,19 +57,7 @@ class ServiceTests(utils.TestCase): "id": 3, } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/services'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['OS-KSADM', 'services'], json=resp_body) service = self.client.services.create( req_body['OS-KSADM:service']['name'], @@ -90,37 +66,21 @@ class ServiceTests(utils.TestCase): self.assertTrue(isinstance(service, services.Service)) self.assertEqual(service.id, 3) self.assertEqual(service.name, req_body['OS-KSADM:service']['name']) + self.assertRequestBodyIs(json=req_body) + @httpretty.activate def test_delete(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": "", - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/services/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, ['OS-KSADM', 'services', '1'], + status=204) self.client.services.delete(1) + @httpretty.activate def test_get(self): test_services = self.TEST_SERVICES['OS-KSADM:services']['values'][0] - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps({'OS-KSADM:service': test_services}), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/services/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['OS-KSADM', 'services', '1'], + json={'OS-KSADM:service': test_services}) service = self.client.services.get(1) self.assertTrue(isinstance(service, services.Service)) @@ -128,19 +88,10 @@ class ServiceTests(utils.TestCase): self.assertEqual(service.name, 'nova') self.assertEqual(service.type, 'compute') + @httpretty.activate def test_list(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_SERVICES), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/OS-KSADM/services'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['OS-KSADM', 'services'], + json=self.TEST_SERVICES) service_list = self.client.services.list() [self.assertTrue(isinstance(r, services.Service)) diff --git a/tests/v2_0/test_shell.py b/tests/v2_0/test_shell.py index 6ad29a8d8..847265e91 100644 --- a/tests/v2_0/test_shell.py +++ b/tests/v2_0/test_shell.py @@ -21,8 +21,8 @@ from testtools import matchers from keystoneclient import httpclient -from tests import utils from tests.v2_0 import fakes +from tests.v2_0 import utils DEFAULT_USERNAME = 'username' diff --git a/tests/v2_0/test_tenants.py b/tests/v2_0/test_tenants.py index 1e12087a2..02797a83a 100644 --- a/tests/v2_0/test_tenants.py +++ b/tests/v2_0/test_tenants.py @@ -12,29 +12,16 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import urlparse - -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.v2_0 import tenants -from tests import utils +from tests.v2_0 import utils class TenantTests(utils.TestCase): def setUp(self): super(TenantTests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } self.TEST_TENANTS = { "tenants": { "values": [ @@ -68,6 +55,7 @@ class TenantTests(utils.TestCase): }, } + @httpretty.activate def test_create(self): req_body = { "tenant": { @@ -86,18 +74,8 @@ class TenantTests(utils.TestCase): "extravalue01": "metadata01", } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) + self.stub_url(httpretty.POST, ['tenants'], json=resp_body) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() tenant = self.client.tenants.create( req_body['tenant']['name'], req_body['tenant']['description'], @@ -109,7 +87,9 @@ class TenantTests(utils.TestCase): self.assertEqual(tenant.name, "tenantX") self.assertEqual(tenant.description, "Like tenant 9, but better.") self.assertEqual(tenant.extravalue01, "metadata01") + self.assertRequestBodyIs(json=req_body) + @httpretty.activate def test_duplicate_create(self): req_body = { "tenant": { @@ -125,18 +105,7 @@ class TenantTests(utils.TestCase): "title": "Conflict", } } - resp = utils.TestResponse({ - "status_code": 409, - "text": json.dumps(resp_body), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['tenants'], status=409, json=resp_body) def create_duplicate_tenant(): self.client.tenants.create(req_body['tenant']['name'], @@ -145,108 +114,53 @@ class TenantTests(utils.TestCase): self.assertRaises(exceptions.Conflict, create_duplicate_tenant) + @httpretty.activate def test_delete(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": "", - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, 'v2.0/tenants/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['tenants', '1'], status=204) self.client.tenants.delete(1) + @httpretty.activate def test_get(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps({ - 'tenant': self.TEST_TENANTS['tenants']['values'][2], - }), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/tenants/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + resp = {'tenant': self.TEST_TENANTS['tenants']['values'][2]} + self.stub_url(httpretty.GET, ['tenants', '1'], json=resp) t = self.client.tenants.get(1) self.assertTrue(isinstance(t, tenants.Tenant)) self.assertEqual(t.id, 1) self.assertEqual(t.name, 'admin') + @httpretty.activate def test_list(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_TENANTS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['tenants'], json=self.TEST_TENANTS) tenant_list = self.client.tenants.list() [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] + @httpretty.activate def test_list_limit(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_TENANTS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants?limit=1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['tenants'], json=self.TEST_TENANTS) tenant_list = self.client.tenants.list(limit=1) + self.assertQueryStringIs({'limit': ['1']}) [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] + @httpretty.activate def test_list_marker(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_TENANTS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants?marker=1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['tenants'], json=self.TEST_TENANTS) tenant_list = self.client.tenants.list(marker=1) + self.assertQueryStringIs({'marker': ['1']}) [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] + @httpretty.activate def test_list_limit_marker(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_TENANTS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants?marker=1&limit=1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['tenants'], json=self.TEST_TENANTS) tenant_list = self.client.tenants.list(limit=1, marker=1) + self.assertQueryStringIs({'marker': ['1'], 'limit': ['1']}) [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] + @httpretty.activate def test_update(self): req_body = { "tenant": { @@ -267,19 +181,8 @@ class TenantTests(utils.TestCase): "extravalue01": "metadataChanged", }, } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['tenants', '4'], json=resp_body) tenant = self.client.tenants.update( req_body['tenant']['id'], @@ -289,12 +192,14 @@ class TenantTests(utils.TestCase): extravalue01=req_body['tenant']['extravalue01'], name="dont overwrite priors") self.assertTrue(isinstance(tenant, tenants.Tenant)) + self.assertRequestBodyIs(json=req_body) self.assertEqual(tenant.id, 4) self.assertEqual(tenant.name, "tenantX") self.assertEqual(tenant.description, "I changed you!") self.assertFalse(tenant.enabled) self.assertEqual(tenant.extravalue01, "metadataChanged") + @httpretty.activate def test_update_empty_description(self): req_body = { "tenant": { @@ -312,63 +217,40 @@ class TenantTests(utils.TestCase): "description": "", }, } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request('POST', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['tenants', '4'], json=resp_body) tenant = self.client.tenants.update(req_body['tenant']['id'], req_body['tenant']['name'], req_body['tenant']['description'], req_body['tenant']['enabled']) self.assertTrue(isinstance(tenant, tenants.Tenant)) + self.assertRequestBodyIs(json=req_body) self.assertEqual(tenant.id, 4) self.assertEqual(tenant.name, "tenantX") self.assertEqual(tenant.description, "") self.assertFalse(tenant.enabled) + @httpretty.activate def test_add_user(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('PUT', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, ['tenants', '4', 'users', 'foo', 'roles', + 'OS-KSADM', 'barrr'], status=204) self.client.tenants.add_user('4', 'foo', 'barrr') + @httpretty.activate def test_remove_user(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, ['tenants', '4', 'users', 'foo', + 'roles', 'OS-KSADM', 'barrr'], + status=204) self.client.tenants.remove_user('4', 'foo', 'barrr') + @httpretty.activate def test_tenant_add_user(self): + self.stub_url(httpretty.PUT, ['tenants', '4', 'users', 'foo', 'roles', + 'OS-KSADM', 'barrr'], + status=204) + req_body = { "tenant": { "id": 4, @@ -377,26 +259,18 @@ class TenantTests(utils.TestCase): "enabled": False, }, } - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('PUT', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - # make tenant object with manager tenant = self.client.tenants.resource_class(self.client.tenants, req_body['tenant']) tenant.add_user('foo', 'barrr') self.assertTrue(isinstance(tenant, tenants.Tenant)) + @httpretty.activate def test_tenant_remove_user(self): + self.stub_url(httpretty.DELETE, ['tenants', '4', 'users', 'foo', + 'roles', 'OS-KSADM', 'barrr'], + status=204) + req_body = { "tenant": { "id": 4, @@ -405,18 +279,6 @@ class TenantTests(utils.TestCase): "enabled": False, }, } - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('DELETE', - urlparse.urljoin(self.TEST_URL, - 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() # make tenant object with manager tenant = self.client.tenants.resource_class(self.client.tenants, diff --git a/tests/v2_0/test_tokens.py b/tests/v2_0/test_tokens.py index e1d896098..6424d2856 100644 --- a/tests/v2_0/test_tokens.py +++ b/tests/v2_0/test_tokens.py @@ -12,37 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import urlparse +import httpretty -import requests - -from tests import utils +from tests.v2_0 import utils class TokenTests(utils.TestCase): - def setUp(self): - super(TokenTests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient'} - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient'} - + @httpretty.activate def test_delete(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": ""}) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'DELETE', - urlparse.urljoin(self.TEST_URL, 'v2.0/tokens/1'), - **kwargs).AndReturn((resp)) - - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['tokens', '1'], status=204) self.client.tokens.delete(1) diff --git a/tests/v2_0/test_users.py b/tests/v2_0/test_users.py index 88b9aa84f..288b55549 100644 --- a/tests/v2_0/test_users.py +++ b/tests/v2_0/test_users.py @@ -12,28 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import urlparse - -import requests +import httpretty from keystoneclient.v2_0 import users -from tests import utils +from tests.v2_0 import utils class UserTests(utils.TestCase): def setUp(self): super(UserTests, self).setUp() - self.TEST_REQUEST_HEADERS = { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - self.TEST_POST_HEADERS = { - 'Content-Type': 'application/json', - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } self.TEST_USERS = { "users": { "values": [ @@ -53,6 +40,7 @@ class UserTests(utils.TestCase): } } + @httpretty.activate def test_create(self): req_body = { "user": { @@ -63,6 +51,7 @@ class UserTests(utils.TestCase): "enabled": True, } } + resp_body = { "user": { "name": "gabriel", @@ -73,19 +62,8 @@ class UserTests(utils.TestCase): "email": "test@example.com", } } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request( - 'POST', - urlparse.urljoin(self.TEST_URL, 'v2.0/users'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.POST, ['users'], json=resp_body) user = self.client.users.create(req_body['user']['name'], req_body['user']['password'], @@ -96,112 +74,59 @@ class UserTests(utils.TestCase): self.assertEqual(user.id, 3) self.assertEqual(user.name, "gabriel") self.assertEqual(user.email, "test@example.com") + self.assertRequestBodyIs(json=req_body) + @httpretty.activate def test_delete(self): - resp = utils.TestResponse({ - "status_code": 204, - "text": "", - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'DELETE', - urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + self.stub_url(httpretty.DELETE, ['users', '1'], status=204) self.client.users.delete(1) + @httpretty.activate def test_get(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps({ - 'user': self.TEST_USERS['users']['values'][0], - }) - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users', '1'], + json={'user': self.TEST_USERS['users']['values'][0]}) u = self.client.users.get(1) self.assertTrue(isinstance(u, users.User)) self.assertEqual(u.id, 1) self.assertEqual(u.name, 'admin') + @httpretty.activate def test_list(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_USERS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/users'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users'], json=self.TEST_USERS) user_list = self.client.users.list() [self.assertTrue(isinstance(u, users.User)) for u in user_list] + @httpretty.activate def test_list_limit(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_USERS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/users?limit=1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users'], json=self.TEST_USERS) user_list = self.client.users.list(limit=1) + self.assertEqual(httpretty.last_request().querystring, + {'limit': ['1']}) [self.assertTrue(isinstance(u, users.User)) for u in user_list] + @httpretty.activate def test_list_marker(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_USERS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/users?marker=foo'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users'], json=self.TEST_USERS) user_list = self.client.users.list(marker='foo') + self.assertDictEqual(httpretty.last_request().querystring, + {'marker': ['foo']}) [self.assertTrue(isinstance(u, users.User)) for u in user_list] + @httpretty.activate def test_list_limit_marker(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_USERS), - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request( - 'GET', - urlparse.urljoin(self.TEST_URL, 'v2.0/users?marker=foo&limit=1'), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.GET, ['users'], json=self.TEST_USERS) user_list = self.client.users.list(limit=1, marker='foo') + + self.assertDictEqual(httpretty.last_request().querystring, + {'marker': ['foo'], 'limit': ['1']}) [self.assertTrue(isinstance(u, users.User)) for u in user_list] + @httpretty.activate def test_update(self): req_1 = { "user": { @@ -229,61 +154,26 @@ class UserTests(utils.TestCase): } } - # Keystone basically echoes these back... including the password :-/ - resp_1 = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(req_1) - }) - resp_2 = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(req_2) - }) - resp_3 = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(req_3) - }) - resp_4 = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(req_4) - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_1) - requests.request( - 'PUT', - urlparse.urljoin(self.TEST_URL, 'v2.0/users/2'), - **kwargs).AndReturn((resp_1)) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_2) - requests.request( - 'PUT', - urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/password'), - **kwargs).AndReturn((resp_2)) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_3) - requests.request( - 'PUT', - urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/tenant'), - **kwargs).AndReturn((resp_3)) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_4) - requests.request( - 'PUT', - urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/enabled'), - **kwargs).AndReturn((resp_4)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, ['users', '2'], json=req_1) + self.stub_url(httpretty.PUT, ['users', '2', 'OS-KSADM', 'password'], + json=req_2) + self.stub_url(httpretty.PUT, ['users', '2', 'OS-KSADM', 'tenant'], + json=req_3) + self.stub_url(httpretty.PUT, ['users', '2', 'OS-KSADM', 'enabled'], + json=req_4) self.client.users.update(2, name='gabriel', email='gabriel@example.com') + self.assertRequestBodyIs(json=req_1) self.client.users.update_password(2, 'swordfish') + self.assertRequestBodyIs(json=req_2) self.client.users.update_tenant(2, 1) + self.assertRequestBodyIs(json=req_3) self.client.users.update_enabled(2, False) + self.assertRequestBodyIs(json=req_4) + @httpretty.activate def test_update_own_password(self): req_body = { 'user': { @@ -293,20 +183,9 @@ class UserTests(utils.TestCase): resp_body = { 'access': {} } - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(resp_body) - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_POST_HEADERS - kwargs['data'] = json.dumps(req_body) - requests.request( - 'PATCH', - urlparse.urljoin(self.TEST_URL, 'v2.0/OS-KSCRUD/users/123'), - **kwargs).AndReturn((resp)) - - self.mox.ReplayAll() + self.stub_url(httpretty.PATCH, ['OS-KSCRUD', 'users', '123'], + json=resp_body) self.client.user_id = '123' self.client.users.update_own_password('DCBA', 'ABCD') + self.assertRequestBodyIs(json=req_body) diff --git a/tests/v2_0/utils.py b/tests/v2_0/utils.py new file mode 100644 index 000000000..870f06c65 --- /dev/null +++ b/tests/v2_0/utils.py @@ -0,0 +1,90 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import httpretty + +from keystoneclient.v2_0 import client +from tests import utils + +TestResponse = utils.TestResponse + + +class UnauthenticatedTestCase(utils.TestCase): + """Class used as base for unauthenticated calls.""" + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') + + +class TestCase(UnauthenticatedTestCase): + + TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "adminURL": "http://cdn.admin-nets.local:8774/v1.0", + "region": "RegionOne", + "internalURL": "http://127.0.0.1:8774/v1.0", + "publicURL": "http://cdn.admin-nets.local:8774/v1.0/" + }], + "type": "nova_compat", + "name": "nova_compat" + }, { + "endpoints": [{ + "adminURL": "http://nova/novapi/admin", + "region": "RegionOne", + "internalURL": "http://nova/novapi/internal", + "publicURL": "http://nova/novapi/public" + }], + "type": "compute", + "name": "nova" + }, { + "endpoints": [{ + "adminURL": "http://glance/glanceapi/admin", + "region": "RegionOne", + "internalURL": "http://glance/glanceapi/internal", + "publicURL": "http://glance/glanceapi/public" + }], + "type": "image", + "name": "glance" + }, { + "endpoints": [{ + "adminURL": "http://127.0.0.1:35357/v2.0", + "region": "RegionOne", + "internalURL": "http://127.0.0.1:5000/v2.0", + "publicURL": "http://127.0.0.1:5000/v2.0" + }], + "type": "identity", + "name": "keystone" + }, { + "endpoints": [{ + "adminURL": "http://swift/swiftapi/admin", + "region": "RegionOne", + "internalURL": "http://swift/swiftapi/internal", + "publicURL": "http://swift/swiftapi/public" + }], + "type": "object-store", + "name": "swift" + }] + + def setUp(self): + super(TestCase, self).setUp() + self.client = client.Client(username=self.TEST_USER, + token=self.TEST_TOKEN, + tenant_name=self.TEST_TENANT_NAME, + auth_url=self.TEST_URL, + endpoint=self.TEST_URL) + + def stub_auth(self, **kwargs): + self.stub_url(httpretty.POST, ['tokens'], **kwargs) diff --git a/tests/v3/client_fixtures.py b/tests/v3/client_fixtures.py index 72cb10e91..179916bc2 100644 --- a/tests/v3/client_fixtures.py +++ b/tests/v3/client_fixtures.py @@ -162,8 +162,10 @@ PROJECT_SCOPED_TOKEN = { } } +AUTH_SUBJECT_TOKEN = '3e2813b7ba0b4006840c3825860b86ed' + AUTH_RESPONSE_HEADERS = { - 'X-Subject-Token': '3e2813b7ba0b4006840c3825860b86ed' + 'X-Subject-Token': AUTH_SUBJECT_TOKEN } AUTH_RESPONSE_BODY = { diff --git a/tests/v3/test_access.py b/tests/v3/test_access.py index e57c83766..6c9fe9978 100644 --- a/tests/v3/test_access.py +++ b/tests/v3/test_access.py @@ -16,8 +16,8 @@ import datetime from keystoneclient import access from keystoneclient.openstack.common import timeutils -from tests import utils from tests.v3 import client_fixtures +from tests.v3 import utils TOKEN_RESPONSE = utils.TestResponse({ "headers": client_fixtures.AUTH_RESPONSE_HEADERS diff --git a/tests/v3/test_auth.py b/tests/v3/test_auth.py index b17d34afb..1cd5407af 100644 --- a/tests/v3/test_auth.py +++ b/tests/v3/test_auth.py @@ -12,9 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.v3 import client @@ -82,53 +80,31 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): 'X-Subject-Token': self.TEST_TOKEN } + @httpretty.activate def test_authenticate_success(self): TEST_TOKEN = "abcdef" - self.TEST_RESPONSE_HEADERS['X-Subject-Token'] = TEST_TOKEN ident = self.TEST_REQUEST_BODY['auth']['identity'] del ident['password']['user']['domain'] del ident['password']['user']['name'] ident['password']['user']['id'] = self.TEST_USER - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT, subject_token=TEST_TOKEN) cs = client.Client(user_id=self.TEST_USER, password=self.TEST_TOKEN, project_id=self.TEST_TENANT_ID, auth_url=self.TEST_URL) self.assertEqual(cs.auth_token, TEST_TOKEN) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_failure(self): ident = self.TEST_REQUEST_BODY['auth']['identity'] ident['password']['user']['password'] = 'bad_key' - resp = utils.TestResponse({ - "status_code": 401, - "text": json.dumps({ - "unauthorized": { - "message": "Unauthorized", - "code": "401", - }, - }), - }) + error = {"unauthorized": {"message": "Unauthorized", + "code": "401"}} - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(status=401, json=error) # Workaround for issue with assertRaises on python2.6 # where with assertRaises(exceptions.Unauthorized): doesn't work @@ -141,40 +117,15 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): auth_url=self.TEST_URL) self.assertRaises(exceptions.Unauthorized, client_create_wrapper) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_auth_redirect(self): - correct_response = json.dumps(self.TEST_RESPONSE_DICT, sort_keys=True) - dict_responses = [ - { - "headers": { - 'location': self.TEST_ADMIN_URL + "/auth/tokens", - 'X-Subject-Token': self.TEST_TOKEN, - }, - "status_code": 305, - "text": "Use proxy", - }, - { - "headers": {'X-Subject-Token': self.TEST_TOKEN}, - "status_code": 200, - "text": correct_response, - }, - ] - responses = [(utils.TestResponse(resp)) - for resp in dict_responses] + self.stub_auth(status=305, body='Use proxy', + location=self.TEST_ADMIN_URL + '/auth/tokens') - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn(responses[0]) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_ADMIN_URL + "/auth/tokens", - **kwargs).AndReturn(responses[1]) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT, + base_url=self.TEST_ADMIN_URL) cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME, username=self.TEST_USER, @@ -188,20 +139,9 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + @httpretty.activate def test_authenticate_success_domain_username_password_scoped(self): - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME, username=self.TEST_USER, @@ -214,6 +154,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + @httpretty.activate def test_authenticate_success_userid_password_domain_scoped(self): ident = self.TEST_REQUEST_BODY['auth']['identity'] del ident['password']['user']['domain'] @@ -231,20 +172,7 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): token['domain']['id'] = self.TEST_DOMAIN_ID token['domain']['name'] = self.TEST_DOMAIN_NAME - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(user_id=self.TEST_USER, password=self.TEST_TOKEN, @@ -257,27 +185,16 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][2]["url"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_userid_password_project_scoped(self): ident = self.TEST_REQUEST_BODY['auth']['identity'] del ident['password']['user']['domain'] del ident['password']['user']['name'] ident['password']['user']['id'] = self.TEST_USER - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(user_id=self.TEST_USER, password=self.TEST_TOKEN, @@ -290,23 +207,14 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][2]["url"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_password_unscoped(self): del self.TEST_RESPONSE_DICT['token']['catalog'] del self.TEST_REQUEST_BODY['auth']['scope'] - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(user_domain_name=self.TEST_DOMAIN_NAME, username=self.TEST_USER, @@ -315,7 +223,9 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) self.assertFalse('catalog' in cs.service_catalog.catalog) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_token_domain_scoped(self): ident = self.TEST_REQUEST_BODY['auth']['identity'] del ident['password'] @@ -335,19 +245,8 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): token['domain']['name'] = self.TEST_DOMAIN_NAME self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(token=self.TEST_TOKEN, domain_id=self.TEST_DOMAIN_ID, @@ -359,7 +258,9 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][2]["url"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_token_project_scoped(self): ident = self.TEST_REQUEST_BODY['auth']['identity'] del ident['password'] @@ -367,19 +268,8 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ident['token'] = {} ident['token']['id'] = self.TEST_TOKEN self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(token=self.TEST_TOKEN, project_id=self.TEST_TENANT_ID, @@ -391,7 +281,9 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): ['endpoints'][2]["url"]) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) + @httpretty.activate def test_authenticate_success_token_unscoped(self): ident = self.TEST_REQUEST_BODY['auth']['identity'] del ident['password'] @@ -401,22 +293,12 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase): del self.TEST_REQUEST_BODY['auth']['scope'] del self.TEST_RESPONSE_DICT['token']['catalog'] self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN - resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(self.TEST_RESPONSE_DICT), - "headers": self.TEST_RESPONSE_HEADERS, - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True) - requests.request('POST', - self.TEST_URL + "/auth/tokens", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_auth(json=self.TEST_RESPONSE_DICT) cs = client.Client(token=self.TEST_TOKEN, auth_url=self.TEST_URL) self.assertEqual(cs.auth_token, self.TEST_RESPONSE_HEADERS["X-Subject-Token"]) self.assertFalse('catalog' in cs.service_catalog.catalog) + self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY) diff --git a/tests/v3/test_client.py b/tests/v3/test_client.py index 5a15a0d34..a49e4f5cb 100644 --- a/tests/v3/test_client.py +++ b/tests/v3/test_client.py @@ -13,148 +13,122 @@ # under the License. import json -import mock -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.v3 import client -from tests import utils from tests.v3 import client_fixtures +from tests.v3 import utils class KeystoneClientTest(utils.TestCase): - def setUp(self): - super(KeystoneClientTest, self).setUp() - - domain_scoped_fake_resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.DOMAIN_SCOPED_TOKEN), - "headers": client_fixtures.AUTH_RESPONSE_HEADERS - }) - self.domain_scoped_mock_req = mock.Mock( - return_value=domain_scoped_fake_resp) - - project_scoped_fake_resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN), - "headers": client_fixtures.AUTH_RESPONSE_HEADERS - }) - self.project_scoped_mock_req = mock.Mock( - return_value=project_scoped_fake_resp) - - unscoped_fake_resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.UNSCOPED_TOKEN), - "headers": client_fixtures.AUTH_RESPONSE_HEADERS - }) - self.unscoped_mock_req = mock.Mock(return_value=unscoped_fake_resp) - - trust_fake_resp = utils.TestResponse({ - "status_code": 200, - "text": json.dumps(client_fixtures.TRUST_TOKEN), - "headers": client_fixtures.AUTH_RESPONSE_HEADERS - }) - self.trust_mock_req = mock.Mock(return_value=trust_fake_resp) + @httpretty.activate def test_unscoped_init(self): - with mock.patch.object(requests, "request", self.unscoped_mock_req): - c = client.Client(user_domain_name='exampledomain', - username='exampleuser', - password='password', - auth_url='http://somewhere/') - self.assertIsNotNone(c.auth_ref) - self.assertFalse(c.auth_ref.domain_scoped) - self.assertFalse(c.auth_ref.project_scoped) - self.assertEquals(c.auth_user_id, - 'c4da488862bd435c9e6c0275a0d0e49a') + self.stub_auth(json=client_fixtures.UNSCOPED_TOKEN) + c = client.Client(user_domain_name='exampledomain', + username='exampleuser', + password='password', + auth_url=self.TEST_URL) + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + self.assertEquals(c.auth_user_id, + 'c4da488862bd435c9e6c0275a0d0e49a') + + @httpretty.activate def test_domain_scoped_init(self): - with mock.patch.object(requests, - "request", - self.domain_scoped_mock_req): - c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', - password='password', - domain_name='exampledomain', - auth_url='http://somewhere/') - self.assertIsNotNone(c.auth_ref) - self.assertTrue(c.auth_ref.domain_scoped) - self.assertFalse(c.auth_ref.project_scoped) - self.assertEquals(c.auth_user_id, - 'c4da488862bd435c9e6c0275a0d0e49a') - self.assertEquals(c.auth_domain_id, - '8e9283b7ba0b1038840c3842058b86ab') + self.stub_auth(json=client_fixtures.DOMAIN_SCOPED_TOKEN) + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + domain_name='exampledomain', + auth_url=self.TEST_URL) + self.assertIsNotNone(c.auth_ref) + self.assertTrue(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + self.assertEquals(c.auth_user_id, + 'c4da488862bd435c9e6c0275a0d0e49a') + self.assertEquals(c.auth_domain_id, + '8e9283b7ba0b1038840c3842058b86ab') + + @httpretty.activate def test_project_scoped_init(self): - with mock.patch.object(requests, - "request", - self.project_scoped_mock_req): - c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', - password='password', - user_domain_name='exampledomain', - project_name='exampleproject', - auth_url='http://somewhere/') - self.assertIsNotNone(c.auth_ref) - self.assertFalse(c.auth_ref.domain_scoped) - self.assertTrue(c.auth_ref.project_scoped) - self.assertEquals(c.auth_user_id, - 'c4da488862bd435c9e6c0275a0d0e49a') - self.assertEquals(c.auth_tenant_id, - '225da22d3ce34b15877ea70b2a575f58') + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN), + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + user_domain_name='exampledomain', + project_name='exampleproject', + auth_url=self.TEST_URL) + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertTrue(c.auth_ref.project_scoped) + self.assertEquals(c.auth_user_id, + 'c4da488862bd435c9e6c0275a0d0e49a') + self.assertEquals(c.auth_tenant_id, + '225da22d3ce34b15877ea70b2a575f58') + + @httpretty.activate def test_auth_ref_load(self): - with mock.patch.object(requests, - "request", - self.project_scoped_mock_req): - c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', - password='password', - project_id='225da22d3ce34b15877ea70b2a575f58', - auth_url='http://somewhere/') - cache = json.dumps(c.auth_ref) - new_client = client.Client(auth_ref=json.loads(cache)) - self.assertIsNotNone(new_client.auth_ref) - self.assertFalse(new_client.auth_ref.domain_scoped) - self.assertTrue(new_client.auth_ref.project_scoped) - self.assertEquals(new_client.username, 'exampleuser') - self.assertIsNone(new_client.password) - self.assertEqual(new_client.management_url, - 'http://admin:35357/v3') + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN) + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + project_id='225da22d3ce34b15877ea70b2a575f58', + auth_url=self.TEST_URL) + cache = json.dumps(c.auth_ref) + new_client = client.Client(auth_ref=json.loads(cache)) + self.assertIsNotNone(new_client.auth_ref) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v3') + + @httpretty.activate def test_auth_ref_load_with_overridden_arguments(self): - with mock.patch.object(requests, - "request", - self.project_scoped_mock_req): - c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', - password='password', - project_id='225da22d3ce34b15877ea70b2a575f58', - auth_url='http://somewhere/') - cache = json.dumps(c.auth_ref) - new_auth_url = "http://new-public:5000/v3" - new_client = client.Client(auth_ref=json.loads(cache), - auth_url=new_auth_url) - self.assertIsNotNone(new_client.auth_ref) - self.assertFalse(new_client.auth_ref.domain_scoped) - self.assertTrue(new_client.auth_ref.project_scoped) - self.assertEquals(new_client.auth_url, new_auth_url) - self.assertEquals(new_client.username, 'exampleuser') - self.assertIsNone(new_client.password) - self.assertEqual(new_client.management_url, - 'http://admin:35357/v3') + new_auth_url = 'https://newkeystone.com/v3' + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN) + self.stub_auth(json=client_fixtures.PROJECT_SCOPED_TOKEN, + base_url=new_auth_url) + + c = client.Client(user_id='c4da488862bd435c9e6c0275a0d0e49a', + password='password', + project_id='225da22d3ce34b15877ea70b2a575f58', + auth_url=self.TEST_URL) + cache = json.dumps(c.auth_ref) + new_client = client.Client(auth_ref=json.loads(cache), + auth_url=new_auth_url) + self.assertIsNotNone(new_client.auth_ref) + self.assertFalse(new_client.auth_ref.domain_scoped) + self.assertTrue(new_client.auth_ref.project_scoped) + self.assertEquals(new_client.auth_url, new_auth_url) + self.assertEquals(new_client.username, 'exampleuser') + self.assertIsNone(new_client.password) + self.assertEqual(new_client.management_url, + 'http://admin:35357/v3') + + @httpretty.activate def test_trust_init(self): - with mock.patch.object(requests, "request", self.trust_mock_req): - c = client.Client(user_domain_name='exampledomain', - username='exampleuser', - password='password', - auth_url='http://somewhere/', - trust_id='fe0aef') - self.assertIsNotNone(c.auth_ref) - self.assertFalse(c.auth_ref.domain_scoped) - self.assertFalse(c.auth_ref.project_scoped) - self.assertEqual(c.auth_ref.trust_id, 'fe0aef') - self.assertTrue(c.auth_ref.trust_scoped) - self.assertEquals(c.auth_user_id, '0ca8f6') + self.stub_auth(json=client_fixtures.TRUST_TOKEN) + + c = client.Client(user_domain_name='exampledomain', + username='exampleuser', + password='password', + auth_url=self.TEST_URL, + trust_id='fe0aef') + self.assertIsNotNone(c.auth_ref) + self.assertFalse(c.auth_ref.domain_scoped) + self.assertFalse(c.auth_ref.project_scoped) + self.assertEqual(c.auth_ref.trust_id, 'fe0aef') + self.assertTrue(c.auth_ref.trust_scoped) + self.assertEquals(c.auth_user_id, '0ca8f6') def test_init_err_no_auth_url(self): self.assertRaises(exceptions.AuthorizationFailure, diff --git a/tests/v3/test_credentials.py b/tests/v3/test_credentials.py index 9a6887264..25ec1b2bd 100644 --- a/tests/v3/test_credentials.py +++ b/tests/v3/test_credentials.py @@ -21,7 +21,6 @@ from tests.v3 import utils class CredentialTests(utils.TestCase, utils.CrudTests): def setUp(self): super(CredentialTests, self).setUp() - self.additionalSetUp() self.key = 'credential' self.collection_key = 'credentials' self.model = credentials.Credential diff --git a/tests/v3/test_discover.py b/tests/v3/test_discover.py index 51278d295..40538bc15 100644 --- a/tests/v3/test_discover.py +++ b/tests/v3/test_discover.py @@ -12,12 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. -import copy import json -import requests + +import httpretty from keystoneclient.generic import client -from tests import utils +from tests.v3 import utils class DiscoverKeystoneTests(utils.UnauthenticatedTestCase): @@ -66,17 +66,11 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase): 'Accept': 'application/json', } + @httpretty.activate def test_get_version_local(self): - resp = utils.TestResponse({ - "status_code": 300, - "text": json.dumps(self.TEST_RESPONSE_DICT), - }) - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.TEST_REQUEST_HEADERS - requests.request('GET', - "http://localhost:35357", - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + httpretty.register_uri(httpretty.GET, "http://localhost:35357/", + status=300, + body=json.dumps(self.TEST_RESPONSE_DICT)) cs = client.Client() versions = cs.discover() diff --git a/tests/v3/test_domains.py b/tests/v3/test_domains.py index 2fedba55e..ab4500fbf 100644 --- a/tests/v3/test_domains.py +++ b/tests/v3/test_domains.py @@ -21,7 +21,6 @@ from tests.v3 import utils class DomainTests(utils.TestCase, utils.CrudTests): def setUp(self): super(DomainTests, self).setUp() - self.additionalSetUp() self.key = 'domain' self.collection_key = 'domains' self.model = domains.Domain diff --git a/tests/v3/test_endpoints.py b/tests/v3/test_endpoints.py index c28fff762..1534b90bd 100644 --- a/tests/v3/test_endpoints.py +++ b/tests/v3/test_endpoints.py @@ -21,7 +21,6 @@ from tests.v3 import utils class EndpointTests(utils.TestCase, utils.CrudTests): def setUp(self): super(EndpointTests, self).setUp() - self.additionalSetUp() self.key = 'endpoint' self.collection_key = 'endpoints' self.model = endpoints.Endpoint diff --git a/tests/v3/test_groups.py b/tests/v3/test_groups.py index 08b762e25..5e9b9adbb 100644 --- a/tests/v3/test_groups.py +++ b/tests/v3/test_groups.py @@ -14,11 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import urlparse import uuid -import requests +import httpretty from keystoneclient.v3 import groups from tests.v3 import utils @@ -27,7 +25,6 @@ from tests.v3 import utils class GroupTests(utils.TestCase, utils.CrudTests): def setUp(self): super(GroupTests, self).setUp() - self.additionalSetUp() self.key = 'group' self.collection_key = 'groups' self.model = groups.Group @@ -38,50 +35,31 @@ class GroupTests(utils.TestCase, utils.CrudTests): kwargs.setdefault('name', uuid.uuid4().hex) return kwargs + @httpretty.activate def test_list_groups_for_user(self): user_id = uuid.uuid4().hex ref_list = [self.new_ref(), self.new_ref()] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/users/%s/%s' % ( - user_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['users', user_id, self.collection_key], + status=200, entity=ref_list) returned_list = self.manager.list(user=user_id) self.assertTrue(len(returned_list)) [self.assertTrue(isinstance(r, self.model)) for r in returned_list] + @httpretty.activate def test_list_groups_for_domain(self): ref_list = [self.new_ref(), self.new_ref()] - domain_id = uuid.uuid4().hex - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/%s?domain_id=%s' % (self.collection_key, domain_id)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + [self.collection_key], + status=200, entity=ref_list) returned_list = self.manager.list(domain=domain_id) self.assertTrue(len(returned_list)) [self.assertTrue(isinstance(r, self.model)) for r in returned_list] + + self.assertEqual(httpretty.last_request().querystring, + {'domain_id': [domain_id]}) diff --git a/tests/v3/test_policies.py b/tests/v3/test_policies.py index ec88c7f6f..b2ce1eabb 100644 --- a/tests/v3/test_policies.py +++ b/tests/v3/test_policies.py @@ -21,7 +21,6 @@ from tests.v3 import utils class PolicyTests(utils.TestCase, utils.CrudTests): def setUp(self): super(PolicyTests, self).setUp() - self.additionalSetUp() self.key = 'policy' self.collection_key = 'policies' self.model = policies.Policy diff --git a/tests/v3/test_projects.py b/tests/v3/test_projects.py index 08ab0bbaf..ad696dc3d 100644 --- a/tests/v3/test_projects.py +++ b/tests/v3/test_projects.py @@ -12,11 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import urlparse import uuid -import requests +import httpretty from keystoneclient.v3 import projects from tests.v3 import utils @@ -25,7 +23,6 @@ from tests.v3 import utils class ProjectTests(utils.TestCase, utils.CrudTests): def setUp(self): super(ProjectTests, self).setUp() - self.additionalSetUp() self.key = 'project' self.collection_key = 'projects' self.model = projects.Project @@ -38,50 +35,30 @@ class ProjectTests(utils.TestCase, utils.CrudTests): kwargs.setdefault('name', uuid.uuid4().hex) return kwargs + @httpretty.activate def test_list_projects_for_user(self): ref_list = [self.new_ref(), self.new_ref()] - user_id = uuid.uuid4().hex - resp = utils.TestResponse({ - "status-code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/users/%s/%s' % (user_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['users', user_id, self.collection_key], + entity=ref_list) returned_list = self.manager.list(user=user_id) self.assertTrue(len(returned_list)) [self.assertTrue(isinstance(r, self.model)) for r in returned_list] + @httpretty.activate def test_list_projects_for_domain(self): ref_list = [self.new_ref(), self.new_ref()] - domain_id = uuid.uuid4().hex - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/%s?domain_id=%s' % (self.collection_key, domain_id)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, [self.collection_key], + entity=ref_list) returned_list = self.manager.list(domain=domain_id) self.assertTrue(len(returned_list)) [self.assertTrue(isinstance(r, self.model)) for r in returned_list] + + self.assertEqual(httpretty.last_request().querystring, + {'domain_id': [domain_id]}) diff --git a/tests/v3/test_roles.py b/tests/v3/test_roles.py index d694a3ef1..59db0800a 100644 --- a/tests/v3/test_roles.py +++ b/tests/v3/test_roles.py @@ -14,11 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import urlparse import uuid -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.v3 import roles @@ -28,7 +26,6 @@ from tests.v3 import utils class RoleTests(utils.TestCase, utils.CrudTests): def setUp(self): super(RoleTests, self).setUp() - self.additionalSetUp() self.key = 'role' self.collection_key = 'roles' self.model = roles.Role @@ -39,376 +36,213 @@ class RoleTests(utils.TestCase, utils.CrudTests): kwargs.setdefault('name', uuid.uuid4().hex) return kwargs + @httpretty.activate def test_domain_role_grant(self): user_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 201, - "text": '', - }) - method = 'PUT' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/users/%s/%s/%s' % ( - domain_id, user_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, + ['domains', domain_id, 'users', user_id, + self.collection_key, ref['id']], + status=201) self.manager.grant(role=ref['id'], domain=domain_id, user=user_id) + @httpretty.activate def test_domain_group_role_grant(self): group_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 201, - "text": '', - }) - method = 'PUT' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/groups/%s/%s/%s' % ( - domain_id, group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, + ['domains', domain_id, 'groups', group_id, + self.collection_key, ref['id']], + status=201) self.manager.grant(role=ref['id'], domain=domain_id, group=group_id) + @httpretty.activate def test_domain_role_list(self): user_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref_list = [self.new_ref(), self.new_ref()] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/users/%s/%s' % ( - domain_id, user_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['domains', domain_id, 'users', user_id, + self.collection_key], entity=ref_list) self.manager.list(domain=domain_id, user=user_id) + @httpretty.activate def test_domain_group_role_list(self): group_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref_list = [self.new_ref(), self.new_ref()] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/groups/%s/%s' % ( - domain_id, group_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['domains', domain_id, 'groups', group_id, + self.collection_key], entity=ref_list) self.manager.list(domain=domain_id, group=group_id) + @httpretty.activate def test_domain_role_check(self): user_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'HEAD' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/users/%s/%s/%s' % ( - domain_id, user_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.HEAD, + ['domains', domain_id, 'users', user_id, + self.collection_key, ref['id']], + status=204) self.manager.check(role=ref['id'], domain=domain_id, user=user_id) + @httpretty.activate def test_domain_group_role_check(self): return group_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'HEAD' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/groups/%s/%s/%s' % ( - domain_id, group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.HEAD, + ['domains', domain_id, 'groups', group_id, + self.collection_key, ref['id']], + status=204) self.manager.check(role=ref['id'], domain=domain_id, group=group_id) + @httpretty.activate def test_domain_role_revoke(self): user_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'DELETE' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/users/%s/%s/%s' % ( - domain_id, user_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, + ['domains', domain_id, 'users', user_id, + self.collection_key, ref['id']], + status=204) self.manager.revoke(role=ref['id'], domain=domain_id, user=user_id) + @httpretty.activate def test_domain_group_role_revoke(self): group_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'DELETE' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/domains/%s/groups/%s/%s/%s' % ( - domain_id, group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, + ['domains', domain_id, 'groups', group_id, + self.collection_key, ref['id']], + status=204) self.manager.revoke(role=ref['id'], domain=domain_id, group=group_id) + @httpretty.activate def test_project_role_grant(self): user_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 201, - "text": '', - }) - method = 'PUT' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/users/%s/%s/%s' % ( - project_id, user_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, + ['projects', project_id, 'users', user_id, + self.collection_key, ref['id']], + status=201) self.manager.grant(role=ref['id'], project=project_id, user=user_id) + @httpretty.activate def test_project_group_role_grant(self): group_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 201, - "text": '', - }) - method = 'PUT' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/groups/%s/%s/%s' % ( - project_id, group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, + ['projects', project_id, 'groups', group_id, + self.collection_key, ref['id']], + status=201) self.manager.grant(role=ref['id'], project=project_id, group=group_id) + @httpretty.activate def test_project_role_list(self): user_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref_list = [self.new_ref(), self.new_ref()] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/users/%s/%s' % ( - project_id, user_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['projects', project_id, 'users', user_id, + self.collection_key], entity=ref_list) self.manager.list(project=project_id, user=user_id) + @httpretty.activate def test_project_group_role_list(self): group_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref_list = [self.new_ref(), self.new_ref()] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/groups/%s/%s' % ( - project_id, group_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['projects', project_id, 'groups', group_id, + self.collection_key], entity=ref_list) self.manager.list(project=project_id, group=group_id) + @httpretty.activate def test_project_role_check(self): user_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 200, - "text": '', - }) - method = 'HEAD' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/users/%s/%s/%s' % ( - project_id, user_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.HEAD, + ['projects', project_id, 'users', user_id, + self.collection_key, ref['id']], + status=200) self.manager.check(role=ref['id'], project=project_id, user=user_id) + @httpretty.activate def test_project_group_role_check(self): group_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 200, - "text": '', - }) - method = 'HEAD' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/groups/%s/%s/%s' % ( - project_id, group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.HEAD, + ['projects', project_id, 'groups', group_id, + self.collection_key, ref['id']], + status=200) self.manager.check(role=ref['id'], project=project_id, group=group_id) + @httpretty.activate def test_project_role_revoke(self): user_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'DELETE' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/users/%s/%s/%s' % ( - project_id, user_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, + ['projects', project_id, 'users', user_id, + self.collection_key, ref['id']], + status=204) self.manager.revoke(role=ref['id'], project=project_id, user=user_id) + @httpretty.activate def test_project_group_role_revoke(self): group_id = uuid.uuid4().hex project_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'DELETE' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/projects/%s/groups/%s/%s/%s' % ( - project_id, group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, + ['projects', project_id, 'groups', group_id, + self.collection_key, ref['id']], + status=204) self.manager.revoke(role=ref['id'], project=project_id, group=group_id) + @httpretty.activate def test_domain_project_role_grant_fails(self): user_id = uuid.uuid4().hex project_id = uuid.uuid4().hex diff --git a/tests/v3/test_services.py b/tests/v3/test_services.py index 1be525062..be73b3b1a 100644 --- a/tests/v3/test_services.py +++ b/tests/v3/test_services.py @@ -21,7 +21,6 @@ from tests.v3 import utils class ServiceTests(utils.TestCase, utils.CrudTests): def setUp(self): super(ServiceTests, self).setUp() - self.additionalSetUp() self.key = 'service' self.collection_key = 'services' self.model = services.Service diff --git a/tests/v3/test_trusts.py b/tests/v3/test_trusts.py index f378cc6de..d97ead143 100644 --- a/tests/v3/test_trusts.py +++ b/tests/v3/test_trusts.py @@ -13,18 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid from keystoneclient import exceptions from keystoneclient.openstack.common import timeutils from keystoneclient.v3.contrib import trusts from tests.v3 import utils -import uuid class TrustTests(utils.TestCase, utils.CrudTests): def setUp(self): super(TrustTests, self).setUp() - self.additionalSetUp() self.key = 'trust' self.collection_key = 'trusts' self.model = trusts.Trust diff --git a/tests/v3/test_users.py b/tests/v3/test_users.py index 9982feab2..1c28ef5a1 100644 --- a/tests/v3/test_users.py +++ b/tests/v3/test_users.py @@ -14,11 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import urlparse import uuid -import requests +import httpretty from keystoneclient import exceptions from keystoneclient.v3 import users @@ -28,7 +26,6 @@ from tests.v3 import utils class UserTests(utils.TestCase, utils.CrudTests): def setUp(self): super(UserTests, self).setUp() - self.additionalSetUp() self.key = 'user' self.collection_key = 'users' self.model = users.User @@ -43,25 +40,13 @@ class UserTests(utils.TestCase, utils.CrudTests): kwargs.setdefault('default_project_id', uuid.uuid4().hex) return kwargs + @httpretty.activate def test_add_user_to_group(self): group_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - - method = 'PUT' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/groups/%s/%s/%s' % ( - group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.PUT, + ['groups', group_id, self.collection_key, ref['id']], + status=204) self.manager.add_to_group(user=ref['id'], group=group_id) self.assertRaises(exceptions.ValidationError, @@ -69,49 +54,27 @@ class UserTests(utils.TestCase, utils.CrudTests): user=ref['id'], group=None) + @httpretty.activate def test_list_users_in_group(self): group_id = uuid.uuid4().hex ref_list = [self.new_ref(), self.new_ref()] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/groups/%s/%s' % ( - group_id, self.collection_key)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, + ['groups', group_id, self.collection_key], + entity=ref_list) returned_list = self.manager.list(group=group_id) self.assertTrue(len(returned_list)) [self.assertTrue(isinstance(r, self.model)) for r in returned_list] + @httpretty.activate def test_check_user_in_group(self): group_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'HEAD' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/groups/%s/%s/%s' % ( - group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.HEAD, + ['groups', group_id, self.collection_key, ref['id']], + status=204) self.manager.check_in_group(user=ref['id'], group=group_id) @@ -120,25 +83,14 @@ class UserTests(utils.TestCase, utils.CrudTests): user=ref['id'], group=None) + @httpretty.activate def test_remove_user_from_group(self): group_id = uuid.uuid4().hex ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 204, - "text": '', - }) - method = 'DELETE' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/groups/%s/%s/%s' % ( - group_id, self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_url(httpretty.DELETE, + ['groups', group_id, self.collection_key, ref['id']], + status=204) self.manager.remove_from_group(user=ref['id'], group=group_id) self.assertRaises(exceptions.ValidationError, @@ -146,29 +98,17 @@ class UserTests(utils.TestCase, utils.CrudTests): user=ref['id'], group=None) + @httpretty.activate def test_create_with_project(self): # Can create a user with the deprecated project option rather than # default_project_id. ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 201, - "text": self.serialize(ref), - }) - method = 'POST' + self.stub_entity(httpretty.POST, [self.collection_key], + status=201, entity=ref) + req_ref = ref.copy() req_ref.pop('id') - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - kwargs['data'] = self.serialize(req_ref) - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/%s' % self.collection_key), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - param_ref = req_ref.copy() # Use deprecated project_id rather than new default_project_id. param_ref['project_id'] = param_ref.pop('default_project_id') @@ -181,31 +121,22 @@ class UserTests(utils.TestCase, utils.CrudTests): getattr(returned, attr), ref[attr], 'Expected different %s' % attr) + self.assertEntityRequestBodyIs(req_ref) + @httpretty.activate def test_create_with_project_and_default_project(self): # Can create a user with the deprecated project and default_project_id. # The backend call should only pass the default_project_id. ref = self.new_ref() - resp = utils.TestResponse({ - "status_code": 201, - "text": self.serialize(ref), - }) - method = 'POST' + self.stub_entity(httpretty.POST, + [self.collection_key], + status=201, entity=ref) + req_ref = ref.copy() req_ref.pop('id') - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - kwargs['data'] = self.serialize(req_ref) - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/%s' % self.collection_key), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - param_ref = req_ref.copy() + # Add the deprecated project_id in the call, the value will be ignored. param_ref['project_id'] = 'project' params = utils.parameterize(param_ref) @@ -217,31 +148,21 @@ class UserTests(utils.TestCase, utils.CrudTests): getattr(returned, attr), ref[attr], 'Expected different %s' % attr) + self.assertEntityRequestBodyIs(req_ref) + @httpretty.activate def test_update_with_project(self): # Can update a user with the deprecated project option rather than # default_project_id. ref = self.new_ref() req_ref = ref.copy() - del req_ref['id'] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref), - }) - - method = 'PATCH' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - kwargs['data'] = self.serialize(req_ref) - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/%s/%s' % (self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + req_ref.pop('id') param_ref = req_ref.copy() + + self.stub_entity(httpretty.PATCH, + [self.collection_key, ref['id']], + status=200, entity=ref) + # Use deprecated project_id rather than new default_project_id. param_ref['project_id'] = param_ref.pop('default_project_id') params = utils.parameterize(param_ref) @@ -253,29 +174,19 @@ class UserTests(utils.TestCase, utils.CrudTests): getattr(returned, attr), ref[attr], 'Expected different %s' % attr) + self.assertEntityRequestBodyIs(req_ref) + @httpretty.activate def test_update_with_project_and_default_project(self, ref=None): ref = self.new_ref() req_ref = ref.copy() - del req_ref['id'] - resp = utils.TestResponse({ - "status_code": 200, - "text": self.serialize(ref), - }) - - method = 'PATCH' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - kwargs['data'] = self.serialize(req_ref) - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - 'v3/%s/%s' % (self.collection_key, ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() - + req_ref.pop('id') param_ref = req_ref.copy() + + self.stub_entity(httpretty.PATCH, + [self.collection_key, ref['id']], + status=200, entity=ref) + # Add the deprecated project_id in the call, the value will be ignored. param_ref['project_id'] = 'project' params = utils.parameterize(param_ref) @@ -287,3 +198,4 @@ class UserTests(utils.TestCase, utils.CrudTests): getattr(returned, attr), ref[attr], 'Expected different %s' % attr) + self.assertEntityRequestBodyIs(req_ref) diff --git a/tests/v3/utils.py b/tests/v3/utils.py index 8358f8282..87cba88d1 100644 --- a/tests/v3/utils.py +++ b/tests/v3/utils.py @@ -12,21 +12,16 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import json -import time import urlparse import uuid -import mock -from mox3 import mox -import requests -import testtools - -from .. import utils +import httpretty +from keystoneclient.openstack.common import jsonutils from keystoneclient.v3 import client +from tests import utils + TestResponse = utils.TestResponse @@ -44,26 +39,16 @@ def parameterize(ref): return params -class TestClient(client.Client): +class UnauthenticatedTestCase(utils.TestCase): + """Class used as base for unauthenticated calls.""" - def serialize(self, entity): - return json.dumps(entity, sort_keys=True) - - -class TestCase(testtools.TestCase): - TEST_DOMAIN_ID = '1' - TEST_DOMAIN_NAME = 'aDomain' - TEST_TENANT_ID = '1' - TEST_TENANT_NAME = 'aTenant' - TEST_TOKEN = 'aToken' - TEST_USER = 'test' TEST_ROOT_URL = 'http://127.0.0.1:5000/' TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') - TEST_REQUEST_BASE = { - 'verify': True, - } + + +class TestCase(UnauthenticatedTestCase): TEST_SERVICE_CATALOG = [{ "endpoints": [{ @@ -145,56 +130,21 @@ class TestCase(testtools.TestCase): def setUp(self): super(TestCase, self).setUp() - self.mox = mox.Mox() - self.request_patcher = mock.patch.object(requests, 'request', - self.mox.CreateMockAnything()) - self.time_patcher = mock.patch.object(time, 'time', - lambda: 1234) - self.request_patcher.start() - self.time_patcher.start() - self.client = TestClient(username=self.TEST_USER, - token=self.TEST_TOKEN, - tenant_name=self.TEST_TENANT_NAME, - auth_url=self.TEST_URL, - endpoint=self.TEST_URL) + self.client = client.Client(username=self.TEST_USER, + token=self.TEST_TOKEN, + tenant_name=self.TEST_TENANT_NAME, + auth_url=self.TEST_URL, + endpoint=self.TEST_URL) - def tearDown(self): - self.request_patcher.stop() - self.time_patcher.stop() - self.mox.UnsetStubs() - self.mox.VerifyAll() - super(TestCase, self).tearDown() + def stub_auth(self, subject_token=None, **kwargs): + if not subject_token: + subject_token = self.TEST_TOKEN + + self.stub_url(httpretty.POST, ['auth', 'tokens'], + X_Subject_Token=subject_token, **kwargs) -class UnauthenticatedTestCase(testtools.TestCase): - """Class used as base for unauthenticated calls.""" - TEST_ROOT_URL = 'http://127.0.0.1:5000/' - TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') - TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' - TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') - TEST_REQUEST_BASE = { - 'verify': True, - } - - def setUp(self): - super(UnauthenticatedTestCase, self).setUp() - self.mox = mox.Mox() - - self.request_patcher = mock.patch.object(requests, 'request', - self.mox.CreateMockAnything()) - self.time_patcher = mock.patch.object(time, 'time', - lambda: 1234) - self.request_patcher.start() - - def tearDown(self): - self.request_patcher.stop() - self.time_patcher.stop() - self.mox.UnsetStubs() - self.mox.VerifyAll() - super(UnauthenticatedTestCase, self).tearDown() - - -class CrudTests(testtools.TestCase): +class CrudTests(object): key = None collection_key = None model = None @@ -205,34 +155,36 @@ class CrudTests(testtools.TestCase): kwargs.setdefault('id', uuid.uuid4().hex) return kwargs - def additionalSetUp(self): - self.headers = { - 'GET': { - 'X-Auth-Token': 'aToken', - 'User-Agent': 'python-keystoneclient', - } - } - - self.headers['HEAD'] = self.headers['GET'].copy() - self.headers['DELETE'] = self.headers['GET'].copy() - self.headers['PUT'] = self.headers['GET'].copy() - self.headers['POST'] = self.headers['GET'].copy() - self.headers['POST']['Content-Type'] = 'application/json' - self.headers['PATCH'] = self.headers['POST'].copy() - - def serialize(self, entity): + def encode(self, entity): if isinstance(entity, dict): - return json.dumps({self.key: entity}, sort_keys=True) + return {self.key: entity} if isinstance(entity, list): - return json.dumps({self.collection_key: entity}, sort_keys=True) - raise NotImplementedError('Are you sure you want to serialize that?') + return {self.collection_key: entity} + raise NotImplementedError('Are you sure you want to encode that?') - def _req_path(self): - if self.path_prefix: - return 'v3/%s/%s' % (self.path_prefix, self.collection_key) - else: - return 'v3/%s' % self.collection_key + def stub_entity(self, method, parts=None, entity=None, id=None, **kwargs): + if entity: + entity = self.encode(entity) + kwargs['json'] = entity + if not parts: + parts = [self.collection_key] + + if self.path_prefix: + parts.insert(0, self.path_prefix) + + if id: + if not parts: + parts = [] + + parts.append(id) + + self.stub_url(method, parts=parts, **kwargs) + + def assertEntityRequestBodyIs(self, entity): + self.assertRequestBodyIs(json=self.encode(entity)) + + @httpretty.activate def test_create(self, ref=None, req_ref=None): ref = ref or self.new_ref() manager_ref = ref.copy() @@ -244,24 +196,8 @@ class CrudTests(testtools.TestCase): # from datetime object to timestamp string) req_ref = req_ref or ref.copy() req_ref.pop('id') - data = self.serialize(req_ref) - resp = TestResponse({ - "status_code": 201, - "text": data, - }) - method = 'POST' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - kwargs['data'] = data - - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - self._req_path()), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.POST, entity=req_ref, status=201) returned = self.manager.create(**parameterize(manager_ref)) self.assertTrue(isinstance(returned, self.model)) @@ -270,24 +206,13 @@ class CrudTests(testtools.TestCase): getattr(returned, attr), req_ref[attr], 'Expected different %s' % attr) + self.assertEntityRequestBodyIs(req_ref) + @httpretty.activate def test_get(self, ref=None): ref = ref or self.new_ref() - resp = TestResponse({ - "status_code": 200, - "text": self.serialize(ref), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - '%s/%s' % (self._req_path(), ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, id=ref['id'], entity=ref) returned = self.manager.get(ref['id']) self.assertTrue(isinstance(returned, self.model)) @@ -297,47 +222,31 @@ class CrudTests(testtools.TestCase): ref[attr], 'Expected different %s' % attr) + @httpretty.activate def test_list(self, ref_list=None, expected_path=None, **filter_kwargs): ref_list = ref_list or [self.new_ref(), self.new_ref()] - resp = TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - expected_path or self._req_path()), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + if not expected_path: + if self.path_prefix: + expected_path = 'v3/%s/%s' % (self.path_prefix, + self.collection_key) + else: + expected_path = 'v3/%s' % self.collection_key + + httpretty.register_uri(httpretty.GET, + urlparse.urljoin(self.TEST_URL, expected_path), + body=jsonutils.dumps(self.encode(ref_list))) returned_list = self.manager.list(**filter_kwargs) self.assertTrue(len(returned_list)) [self.assertTrue(isinstance(r, self.model)) for r in returned_list] + @httpretty.activate def test_find(self, ref=None): ref = ref or self.new_ref() ref_list = [ref] - resp = TestResponse({ - "status_code": 200, - "text": self.serialize(ref_list), - }) - method = 'GET' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - query = '?name=%s' % ref['name'] if hasattr(ref, 'name') else '' - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - '%s%s' % (self._req_path(), query)), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.GET, entity=ref_list) returned = self.manager.find(name=getattr(ref, 'name', None)) self.assertTrue(isinstance(returned, self.model)) @@ -347,26 +256,19 @@ class CrudTests(testtools.TestCase): ref[attr], 'Expected different %s' % attr) + if hasattr(ref, 'name'): + self.assertQueryStringIs({'name': ref['name']}) + else: + self.assertQueryStringIs({}) + + @httpretty.activate def test_update(self, ref=None): ref = ref or self.new_ref() - req_ref = ref.copy() - del req_ref['id'] - resp = TestResponse({ - "status_code": 200, - "text": self.serialize(ref), - }) - method = 'PATCH' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - kwargs['data'] = self.serialize(req_ref) - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - '%s/%s' % (self._req_path(), ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.PATCH, id=ref['id'], entity=ref) + + req_ref = ref.copy() + req_ref.pop('id') returned = self.manager.update(ref['id'], **parameterize(req_ref)) self.assertTrue(isinstance(returned, self.model)) @@ -375,23 +277,11 @@ class CrudTests(testtools.TestCase): getattr(returned, attr), ref[attr], 'Expected different %s' % attr) + self.assertEntityRequestBodyIs(req_ref) + @httpretty.activate def test_delete(self, ref=None): ref = ref or self.new_ref() - resp = TestResponse({ - "status_code": 204, - "text": '', - }) - - method = 'DELETE' - kwargs = copy.copy(self.TEST_REQUEST_BASE) - kwargs['headers'] = self.headers[method] - requests.request( - method, - urlparse.urljoin( - self.TEST_URL, - '%s/%s' % (self._req_path(), ref['id'])), - **kwargs).AndReturn((resp)) - self.mox.ReplayAll() + self.stub_entity(httpretty.DELETE, id=ref['id'], status=204) self.manager.delete(ref['id'])