Use requests module for HTTP/HTTPS

* Implement correct certificate verification
* Add requests to tools/pip-requires
* Fix OS_CACERT env var help text
* Add info to README
* Rework tests to use requests

Pinned requests module to < 1.0 as 1.0.2 is now current in pipi
as of 17Dec2012.

Change-Id: I120d2c12d6f20ebe2fd7182ec8988cc73f623b80
This commit is contained in:
Dean Troyer
2012-11-16 17:43:05 -06:00
parent 581264757e
commit 51dc6a0cef
24 changed files with 947 additions and 716 deletions

View File

@@ -75,6 +75,17 @@ OS_REGION_NAME``)::
If a region is not specified and multiple regions are returned by the If a region is not specified and multiple regions are returned by the
Identity service, the client may not access the same region consistently. Identity service, the client may not access the same region consistently.
If you need to connect to a server that is TLS-enabled (the auth URL begins
with 'https') and it uses a certificate from a private CA or a self-signed
certificate you will need to specify the path to an appropriate CA certificate
to use to validate the server certificate with ``--os-cacert`` or an
environment variable::
export OS_CACERT=/etc/ssl/my-root-cert.pem
Certificate verification can be turned off using ``--insecure``. This should
be used with caution.
You'll find complete documentation on the shell by running ``keystone help``:: You'll find complete documentation on the shell by running ``keystone help``::
usage: keystone [--os-username <auth-user-name>] usage: keystone [--os-username <auth-user-name>]
@@ -85,8 +96,8 @@ You'll find complete documentation on the shell by running ``keystone help``::
[--os-identity-api-version <identity-api-version>] [--os-identity-api-version <identity-api-version>]
[--os-token <service-token>] [--os-token <service-token>]
[--os-endpoint <service-endpoint>] [--os-endpoint <service-endpoint>]
[--os-cacert <ca-certificate>] [--os-cert <certificate>] [--os-cacert <ca-certificate>] [--insecure]
[--os-key <key>] [--insecure] [--os-cert <certificate>] [--os-key <key>] [--no-cache]
<subcommand> ... <subcommand> ...
Command-line interface to the OpenStack Identity API. Command-line interface to the OpenStack Identity API.
@@ -167,13 +178,14 @@ You'll find complete documentation on the shell by running ``keystone help``::
from the service catalog (via authentication). from the service catalog (via authentication).
Defaults to env[OS_SERVICE_ENDPOINT] Defaults to env[OS_SERVICE_ENDPOINT]
--os-cacert <ca-certificate> --os-cacert <ca-certificate>
Defaults to env[OS_CACERT] Specify a CA bundle file to use in verifying a TLS
(https) server certificate. Defaults to env[OS_CACERT]
--insecure Explicitly allow keystoneclient to perform "insecure"
TLS (https) requests. The server's certificate will
not be verified against any certificate authorities.
This option should be used with caution.
--os-cert <certificate> --os-cert <certificate>
Defaults to env[OS_CERT] Defaults to env[OS_CERT]
--os-key <key> Defaults to env[OS_KEY] --os-key <key> Defaults to env[OS_KEY]
--insecure Explicitly allow keystoneclient to perform "insecure"
SSL (https) requests. The server's certificate will
not be verified against any certificate authorities.
This option should be used with caution.
See "keystone help COMMAND" for help on a specific command. See "keystone help COMMAND" for help on a specific command.

View File

@@ -82,7 +82,7 @@ class Manager(object):
def _head(self, url): def _head(self, url):
resp, body = self.api.head(url) resp, body = self.api.head(url)
return resp.status == 204 return resp.status_code == 204
def _create(self, url, body, response_key, return_raw=False): def _create(self, url, body, response_key, return_raw=False):
resp, body = self.api.post(url, body=body) resp, body = self.api.post(url, body=body)

View File

@@ -10,9 +10,10 @@ OpenStack Client interface. Handles the REST calls and responses.
import copy import copy
import logging import logging
import sys
import urlparse import urlparse
import httplib2 import requests
try: try:
import json import json
@@ -42,22 +43,20 @@ except ImportError:
keyring_available = False keyring_available = False
class HTTPClient(httplib2.Http): class HTTPClient(object):
USER_AGENT = 'python-keystoneclient' USER_AGENT = 'python-keystoneclient'
requests_config = {
'danger_mode': False,
}
def __init__(self, username=None, tenant_id=None, tenant_name=None, def __init__(self, username=None, tenant_id=None, tenant_name=None,
password=None, auth_url=None, region_name=None, timeout=None, password=None, auth_url=None, region_name=None, timeout=None,
endpoint=None, token=None, cacert=None, key=None, endpoint=None, token=None, cacert=None, key=None,
cert=None, insecure=False, original_ip=None, debug=False, cert=None, insecure=False, original_ip=None, debug=False,
auth_ref=None, use_keyring=False, force_new_token=False, auth_ref=None, use_keyring=False, force_new_token=False,
stale_duration=None): stale_duration=None):
super(HTTPClient, self).__init__(timeout=timeout, ca_certs=cacert)
if cert:
if key:
self.add_certificate(key=key, cert=cert, domain='')
else:
self.add_certificate(key=cert, cert=cert, domain='')
self.version = 'v2.0' self.version = 'v2.0'
# set baseline defaults # set baseline defaults
self.username = None self.username = None
@@ -94,10 +93,16 @@ class HTTPClient(httplib2.Http):
self.password = password self.password = password
self.original_ip = original_ip self.original_ip = original_ip
self.region_name = region_name self.region_name = region_name
if cacert:
# httplib2 overrides self.verify_cert = cacert
self.force_exception_to_status_code = True else:
self.disable_ssl_certificate_validation = insecure self.verify_cert = True
if insecure:
self.verify_cert = False
self.cert = cert
if cert and key:
self.cert = (cert, key,)
self.domain = ''
# logging setup # logging setup
self.debug_log = debug self.debug_log = debug
@@ -105,6 +110,7 @@ class HTTPClient(httplib2.Http):
ch = logging.StreamHandler() ch = logging.StreamHandler()
_logger.setLevel(logging.DEBUG) _logger.setLevel(logging.DEBUG)
_logger.addHandler(ch) _logger.addHandler(ch)
self.requests_config['verbose'] = sys.stderr
# keyring setup # keyring setup
self.use_keyring = use_keyring and keyring_available self.use_keyring = use_keyring and keyring_available
@@ -277,13 +283,17 @@ class HTTPClient(httplib2.Http):
header = ' -H "%s: %s"' % (element, kwargs['headers'][element]) header = ' -H "%s: %s"' % (element, kwargs['headers'][element])
string_parts.append(header) string_parts.append(header)
_logger.debug("REQ: %s\n" % "".join(string_parts)) _logger.debug("REQ: %s" % "".join(string_parts))
if 'body' in kwargs: if 'body' in kwargs:
_logger.debug("REQ BODY: %s\n" % (kwargs['body'])) _logger.debug("REQ BODY: %s\n" % (kwargs['body']))
def http_log_resp(self, resp, body): def http_log_resp(self, resp):
if self.debug_log: if self.debug_log:
_logger.debug("RESP: %s\nRESP BODY: %s\n", resp, body) _logger.debug(
"RESP: [%s] %s\nRESP BODY: %s\n",
resp.status_code,
resp.headers,
resp.text)
def serialize(self, entity): def serialize(self, entity):
return json.dumps(entity) return json.dumps(entity)
@@ -291,7 +301,7 @@ class HTTPClient(httplib2.Http):
def request(self, url, method, **kwargs): def request(self, url, method, **kwargs):
""" Send an http request with the specified characteristics. """ Send an http request with the specified characteristics.
Wrapper around httplib2.Http.request to handle tasks such as Wrapper around requests.request to handle tasks such as
setting headers, JSON encoding/decoding, and error handling. setting headers, JSON encoding/decoding, and error handling.
""" """
# Copy the kwargs so we can reuse the original in case of redirects # Copy the kwargs so we can reuse the original in case of redirects
@@ -303,26 +313,37 @@ class HTTPClient(httplib2.Http):
self.original_ip, self.USER_AGENT) self.original_ip, self.USER_AGENT)
if 'body' in kwargs: if 'body' in kwargs:
request_kwargs['headers']['Content-Type'] = 'application/json' request_kwargs['headers']['Content-Type'] = 'application/json'
request_kwargs['body'] = self.serialize(kwargs['body']) request_kwargs['data'] = self.serialize(kwargs['body'])
del request_kwargs['body']
if self.cert:
request_kwargs['cert'] = self.cert
self.http_log_req((url, method,), request_kwargs) self.http_log_req((url, method,), request_kwargs)
resp, body = super(HTTPClient, self).request(url, resp = requests.request(
method, method,
**request_kwargs) url,
self.http_log_resp(resp, body) verify=self.verify_cert,
config=self.requests_config,
**request_kwargs)
if resp.status in (400, 401, 403, 404, 408, 409, 413, 500, 501): self.http_log_resp(resp)
_logger.debug("Request returned failure status: %s", resp.status)
raise exceptions.from_response(resp, body) if resp.status_code in (400, 401, 403, 404, 408, 409, 413, 500, 501):
elif resp.status in (301, 302, 305): _logger.debug(
"Request returned failure status: %s",
resp.status_code)
raise exceptions.from_response(resp, resp.text)
elif resp.status_code in (301, 302, 305):
# Redirected. Reissue the request to the new location. # Redirected. Reissue the request to the new location.
return self.request(resp['location'], method, **kwargs) return self.request(resp.headers['location'], method, **kwargs)
if body: if resp.text:
try: try:
body = json.loads(body) body = json.loads(resp.text)
except ValueError: except ValueError:
_logger.debug("Could not decode JSON from body: %s" % body) body = None
_logger.debug("Could not decode JSON from body: %s"
% resp.text)
else: else:
_logger.debug("No body was returned.") _logger.debug("No body was returned.")
body = None body = None

View File

@@ -130,15 +130,15 @@ _code_map = dict((c.http_status, c) for c in [BadRequest,
def from_response(response, body): def from_response(response, body):
""" """
Return an instance of an ClientException or subclass Return an instance of an ClientException or subclass
based on an httplib2 response. based on an requests response.
Usage:: Usage::
resp, body = http.request(...) resp = requests.request(...)
if resp.status != 200: if resp.status_code != 200:
raise exception_from_response(resp, body) raise exception_from_response(resp, resp.text)
""" """
cls = _code_map.get(response.status, ClientException) cls = _code_map.get(response.status_code, ClientException)
if body: if body:
if hasattr(body, 'keys'): if hasattr(body, 'keys'):
error = body[body.keys()[0]] error = body[body.keys()[0]]
@@ -149,6 +149,6 @@ def from_response(response, body):
# probably couldn't communicate with Keystone at all. # probably couldn't communicate with Keystone at all.
message = "Unable to communicate with identity service: %s." % body message = "Unable to communicate with identity service: %s." % body
details = None details = None
return cls(code=response.status, message=message, details=details) return cls(code=response.status_code, message=message, details=details)
else: else:
return cls(code=response.status) return cls(code=response.status_code)

View File

@@ -84,7 +84,7 @@ class Client(client.HTTPClient):
resp, body = httpclient.request(url, "GET", resp, body = httpclient.request(url, "GET",
headers={'Accept': headers={'Accept':
'application/json'}) 'application/json'})
if resp.status in (200, 204): # in some cases we get No Content if resp.status_code in (200, 204): # some cases we get No Content
try: try:
results = {} results = {}
if 'version' in body: if 'version' in body:
@@ -113,10 +113,10 @@ class Client(client.HTTPClient):
return results return results
except KeyError: except KeyError:
raise exceptions.AuthorizationFailure() raise exceptions.AuthorizationFailure()
elif resp.status == 305: elif resp.status_code == 305:
return self._check_keystone_versions(resp['location']) return self._check_keystone_versions(resp['location'])
else: else:
raise exceptions.from_response(resp, body) raise exceptions.from_response(resp, resp.text)
except Exception as e: except Exception as e:
_logger.exception(e) _logger.exception(e)
@@ -145,7 +145,7 @@ class Client(client.HTTPClient):
resp, body = httpclient.request("%sextensions" % url, "GET", resp, body = httpclient.request("%sextensions" % url, "GET",
headers={'Accept': headers={'Accept':
'application/json'}) 'application/json'})
if resp.status in (200, 204): # in some cases we get No Content if resp.status_code in (200, 204): # some cases we get No Content
try: try:
results = {} results = {}
if 'extensions' in body: if 'extensions' in body:
@@ -171,10 +171,10 @@ class Client(client.HTTPClient):
return results return results
except KeyError: except KeyError:
raise exceptions.AuthorizationFailure() raise exceptions.AuthorizationFailure()
elif resp.status == 305: elif resp.status_code == 305:
return self._check_keystone_extensions(resp['location']) return self._check_keystone_extensions(resp['location'])
else: else:
raise exceptions.from_response(resp, body) raise exceptions.from_response(resp, resp.text)
except Exception as e: except Exception as e:
_logger.exception(e) _logger.exception(e)

View File

@@ -20,7 +20,6 @@ Command-line interface to the OpenStack Identity API.
import argparse import argparse
import getpass import getpass
import httplib2
import os import os
import sys import sys
@@ -153,10 +152,21 @@ class OpenStackIdentityShell(object):
parser.add_argument('--os-cacert', parser.add_argument('--os-cacert',
metavar='<ca-certificate>', metavar='<ca-certificate>',
default=env('OS_CACERT', default=None), default=env('OS_CACERT', default=None),
help='Defaults to env[OS_CACERT]') help='Specify a CA bundle file to use in '
'verifying a TLS (https) server certificate. '
'Defaults to env[OS_CACERT]')
parser.add_argument('--os_cacert', parser.add_argument('--os_cacert',
help=argparse.SUPPRESS) help=argparse.SUPPRESS)
parser.add_argument('--insecure',
default=False,
action="store_true",
help='Explicitly allow keystoneclient to perform '
'"insecure" TLS (https) requests. The '
'server\'s certificate will not be verified '
'against any certificate authorities. This '
'option should be used with caution.')
parser.add_argument('--os-cert', parser.add_argument('--os-cert',
metavar='<certificate>', metavar='<certificate>',
default=env('OS_CERT'), default=env('OS_CERT'),
@@ -171,15 +181,6 @@ class OpenStackIdentityShell(object):
parser.add_argument('--os_key', parser.add_argument('--os_key',
help=argparse.SUPPRESS) help=argparse.SUPPRESS)
parser.add_argument('--insecure',
default=False,
action="store_true",
help='Explicitly allow keystoneclient to perform '
'"insecure" SSL (https) requests. The '
'server\'s certificate will not be verified '
'against any certificate authorities. This '
'option should be used with caution.')
parser.add_argument('--os-cache', parser.add_argument('--os-cache',
default=env('OS_CACHE', default=False), default=env('OS_CACHE', default=False),
action='store_true', action='store_true',
@@ -297,10 +298,6 @@ class OpenStackIdentityShell(object):
# Parse args again and call whatever callback was selected # Parse args again and call whatever callback was selected
args = subcommand_parser.parse_args(argv) args = subcommand_parser.parse_args(argv)
# Deal with global arguments
if args.debug:
httplib2.debuglevel = 1
# Short-circuit and deal with help command right away. # Short-circuit and deal with help command right away.
if args.func == self.do_help: if args.func == self.do_help:
self.do_help(args) self.do_help(args)
@@ -472,8 +469,5 @@ def main():
OpenStackIdentityShell().main(sys.argv[1:]) OpenStackIdentityShell().main(sys.argv[1:])
except Exception as e: except Exception as e:
if httplib2.debuglevel == 1: print >> sys.stderr, e
raise # dump stack.
else:
print >> sys.stderr, e
sys.exit(1) sys.exit(1)

View File

@@ -1,21 +1,25 @@
import httplib2 import copy
import json import json
import mock import mock
import requests
from keystoneclient.v2_0 import client from keystoneclient.v2_0 import client
from tests import client_fixtures from tests import client_fixtures
from tests import utils from tests import utils
fake_response = httplib2.Response({"status": 200}) fake_response = utils.TestResponse({
fake_body = json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN) "status_code": 200,
mock_request = mock.Mock(return_value=(fake_response, fake_body)) "text": json.dumps(client_fixtures.PROJECT_SCOPED_TOKEN)
})
mock_request = mock.Mock(return_value=(fake_response))
class KeystoneclientTest(utils.TestCase): class KeystoneclientTest(utils.TestCase):
def test_scoped_init(self): def test_scoped_init(self):
with mock.patch.object(httplib2.Http, "request", mock_request): with mock.patch.object(requests, "request", mock_request):
cl = client.Client(username='exampleuser', cl = client.Client(username='exampleuser',
password='password', password='password',
auth_url='http://somewhere/') auth_url='http://somewhere/')
@@ -23,7 +27,7 @@ class KeystoneclientTest(utils.TestCase):
self.assertTrue(cl.auth_ref.scoped) self.assertTrue(cl.auth_ref.scoped)
def test_auth_ref_load(self): def test_auth_ref_load(self):
with mock.patch.object(httplib2.Http, "request", mock_request): with mock.patch.object(requests, "request", mock_request):
cl = client.Client(username='exampleuser', cl = client.Client(username='exampleuser',
password='password', password='password',
auth_url='http://somewhere/') auth_url='http://somewhere/')

View File

@@ -1,14 +1,17 @@
import httplib2
import mock import mock
import requests
from keystoneclient import client from keystoneclient import client
from keystoneclient import exceptions from keystoneclient import exceptions
from tests import utils from tests import utils
fake_response = httplib2.Response({"status": 200}) FAKE_RESPONSE = utils.TestResponse({
fake_body = '{"hi": "there"}' "status_code": 200,
mock_request = mock.Mock(return_value=(fake_response, fake_body)) "text": '{"hi": "there"}',
})
MOCK_REQUEST = mock.Mock(return_value=(FAKE_RESPONSE))
def get_client(): def get_client():
@@ -36,39 +39,47 @@ class ClientTest(utils.TestCase):
def test_get(self): def test_get(self):
cl = get_authed_client() cl = get_authed_client()
with mock.patch.object(httplib2.Http, "request", mock_request): with mock.patch.object(requests, "request", MOCK_REQUEST):
with mock.patch('time.time', mock.Mock(return_value=1234)): with mock.patch('time.time', mock.Mock(return_value=1234)):
resp, body = cl.get("/hi") resp, body = cl.get("/hi")
headers = {"X-Auth-Token": "token", headers = {"X-Auth-Token": "token",
"User-Agent": cl.USER_AGENT} "User-Agent": cl.USER_AGENT}
mock_request.assert_called_with("http://127.0.0.1:5000/hi", MOCK_REQUEST.assert_called_with(
"GET", headers=headers) "GET",
"http://127.0.0.1:5000/hi",
headers=headers,
**self.TEST_REQUEST_BASE)
# Automatic JSON parsing # Automatic JSON parsing
self.assertEqual(body, {"hi": "there"}) self.assertEqual(body, {"hi": "there"})
def test_get_error(self): def test_get_error(self):
cl = get_authed_client() cl = get_authed_client()
fake_err_response = httplib2.Response({"status": 400}) fake_err_response = utils.TestResponse({
fake_err_body = 'Some evil plaintext string' "status_code": 400,
err_mock_request = mock.Mock(return_value=(fake_err_response, "text": 'Some evil plaintext string',
fake_err_body)) })
err_MOCK_REQUEST = mock.Mock(return_value=(fake_err_response))
with mock.patch.object(httplib2.Http, "request", err_mock_request): with mock.patch.object(requests, "request", err_MOCK_REQUEST):
self.assertRaises(exceptions.BadRequest, cl.get, '/hi') self.assertRaises(exceptions.BadRequest, cl.get, '/hi')
def test_post(self): def test_post(self):
cl = get_authed_client() cl = get_authed_client()
with mock.patch.object(httplib2.Http, "request", mock_request): with mock.patch.object(requests, "request", MOCK_REQUEST):
cl.post("/hi", body=[1, 2, 3]) cl.post("/hi", body=[1, 2, 3])
headers = { headers = {
"X-Auth-Token": "token", "X-Auth-Token": "token",
"Content-Type": "application/json", "Content-Type": "application/json",
"User-Agent": cl.USER_AGENT "User-Agent": cl.USER_AGENT
} }
mock_request.assert_called_with("http://127.0.0.1:5000/hi", "POST", MOCK_REQUEST.assert_called_with(
headers=headers, body='[1, 2, 3]') "POST",
"http://127.0.0.1:5000/hi",
headers=headers,
data='[1, 2, 3]',
**self.TEST_REQUEST_BASE)
def test_forwarded_for(self): def test_forwarded_for(self):
ORIGINAL_IP = "10.100.100.1" ORIGINAL_IP = "10.100.100.1"
@@ -76,10 +87,10 @@ class ClientTest(utils.TestCase):
tenant_id="tenant", auth_url="auth_test", tenant_id="tenant", auth_url="auth_test",
original_ip=ORIGINAL_IP) original_ip=ORIGINAL_IP)
with mock.patch.object(httplib2.Http, "request", mock_request): with mock.patch.object(requests, "request", MOCK_REQUEST):
res = cl.request('/', 'GET') res = cl.request('/', 'GET')
args, kwargs = mock_request.call_args args, kwargs = MOCK_REQUEST.call_args
self.assertIn( self.assertIn(
('Forwarded', "for=%s;by=%s" % (ORIGINAL_IP, cl.USER_AGENT)), ('Forwarded', "for=%s;by=%s" % (ORIGINAL_IP, cl.USER_AGENT)),
kwargs['headers'].items()) kwargs['headers'].items())

View File

@@ -1,13 +1,17 @@
import httplib2 import copy
import mock import mock
import requests
from keystoneclient import client from keystoneclient import client
from tests import utils from tests import utils
FAKE_RESPONSE = httplib2.Response({"status": 200}) FAKE_RESPONSE = utils.TestResponse({
FAKE_BODY = '{"hi": "there"}' "status_code": 200,
MOCK_REQUEST = mock.Mock(return_value=(FAKE_RESPONSE, FAKE_BODY)) "text": '{"hi": "there"}',
})
MOCK_REQUEST = mock.Mock(return_value=(FAKE_RESPONSE))
def get_client(): def get_client():
@@ -29,26 +33,38 @@ class ClientTest(utils.TestCase):
def test_get(self): def test_get(self):
cl = get_authed_client() cl = get_authed_client()
with mock.patch.object(httplib2.Http, "request", MOCK_REQUEST): with mock.patch.object(requests, "request", MOCK_REQUEST):
with mock.patch('time.time', mock.Mock(return_value=1234)): with mock.patch('time.time', mock.Mock(return_value=1234)):
resp, body = cl.get("/hi") resp, body = cl.get("/hi")
headers = {"X-Auth-Token": "token", headers = {"X-Auth-Token": "token",
"User-Agent": cl.USER_AGENT} "User-Agent": cl.USER_AGENT}
MOCK_REQUEST.assert_called_with("https://127.0.0.1:5000/hi", kwargs = copy.copy(self.TEST_REQUEST_BASE)
"GET", headers=headers) kwargs['cert'] = ('cert.pem', 'key.pem')
kwargs['verify'] = 'ca.pem'
MOCK_REQUEST.assert_called_with(
"GET",
"https://127.0.0.1:5000/hi",
headers=headers,
**kwargs)
# Automatic JSON parsing # Automatic JSON parsing
self.assertEqual(body, {"hi": "there"}) self.assertEqual(body, {"hi": "there"})
def test_post(self): def test_post(self):
cl = get_authed_client() cl = get_authed_client()
with mock.patch.object(httplib2.Http, "request", MOCK_REQUEST): with mock.patch.object(requests, "request", MOCK_REQUEST):
cl.post("/hi", body=[1, 2, 3]) cl.post("/hi", body=[1, 2, 3])
headers = { headers = {
"X-Auth-Token": "token", "X-Auth-Token": "token",
"Content-Type": "application/json", "Content-Type": "application/json",
"User-Agent": cl.USER_AGENT "User-Agent": cl.USER_AGENT
} }
MOCK_REQUEST.assert_called_with("https://127.0.0.1:5000/hi", kwargs = copy.copy(self.TEST_REQUEST_BASE)
"POST", headers=headers, kwargs['cert'] = ('cert.pem', 'key.pem')
body='[1, 2, 3]') kwargs['verify'] = 'ca.pem'
MOCK_REQUEST.assert_called_with(
"POST",
"https://127.0.0.1:5000/hi",
headers=headers,
data='[1, 2, 3]',
**kwargs)

View File

@@ -1,6 +1,7 @@
import os import os
import mock import mock
import httplib2
import requests
from keystoneclient import shell as openstack_shell from keystoneclient import shell as openstack_shell
from keystoneclient.v2_0 import shell as shell_v2_0 from keystoneclient.v2_0 import shell as shell_v2_0
@@ -42,11 +43,6 @@ class ShellTest(utils.TestCase):
def test_help_unknown_command(self): def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, shell, 'help foofoo') self.assertRaises(exceptions.CommandError, shell, 'help foofoo')
def test_debug(self):
httplib2.debuglevel = 0
shell('--debug help')
assert httplib2.debuglevel == 1
def test_shell_args(self): def test_shell_args(self):
do_tenant_mock = mock.MagicMock() do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.v2_0.shell.do_user_list', with mock.patch('keystoneclient.v2_0.shell.do_user_list',

View File

@@ -1,7 +1,7 @@
import time import time
import httplib2
import mox import mox
import requests
import unittest2 as unittest import unittest2 as unittest
from keystoneclient.v2_0 import client from keystoneclient.v2_0 import client
@@ -16,6 +16,10 @@ class TestCase(unittest.TestCase):
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0')
TEST_REQUEST_BASE = {
'config': {'danger_mode': False},
'verify': True,
}
TEST_SERVICE_CATALOG = [{ TEST_SERVICE_CATALOG = [{
"endpoints": [{ "endpoints": [{
@@ -69,7 +73,7 @@ class TestCase(unittest.TestCase):
self.mox = mox.Mox() self.mox = mox.Mox()
self._original_time = time.time self._original_time = time.time
time.time = lambda: 1234 time.time = lambda: 1234
httplib2.Http.request = self.mox.CreateMockAnything() requests.request = self.mox.CreateMockAnything()
self.client = client.Client(username=self.TEST_USER, self.client = client.Client(username=self.TEST_USER,
token=self.TEST_TOKEN, token=self.TEST_TOKEN,
tenant_name=self.TEST_TENANT_NAME, tenant_name=self.TEST_TENANT_NAME,
@@ -89,16 +93,43 @@ class UnauthenticatedTestCase(unittest.TestCase):
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0')
TEST_REQUEST_BASE = {
'config': {'danger_mode': False},
'verify': True,
}
def setUp(self): def setUp(self):
super(UnauthenticatedTestCase, self).setUp() super(UnauthenticatedTestCase, self).setUp()
self.mox = mox.Mox() self.mox = mox.Mox()
self._original_time = time.time self._original_time = time.time
time.time = lambda: 1234 time.time = lambda: 1234
httplib2.Http.request = self.mox.CreateMockAnything() requests.request = self.mox.CreateMockAnything()
def tearDown(self): def tearDown(self):
time.time = self._original_time time.time = self._original_time
super(UnauthenticatedTestCase, self).tearDown() super(UnauthenticatedTestCase, self).tearDown()
self.mox.UnsetStubs() self.mox.UnsetStubs()
self.mox.VerifyAll() self.mox.VerifyAll()
class TestResponse(requests.Response):
""" Class used to wrap requests.Response and provide some
convenience to initialize with a dict """
def __init__(self, data):
self._text = None
super(TestResponse, self)
if isinstance(data, dict):
self.status_code = data.get('status_code', None)
self.headers = data.get('headers', None)
# Fake the text attribute to streamline Response creation
self._text = data.get('text', None)
else:
self.status_code = data
def __eq__(self, other):
return self.__dict__ == other.__dict__
@property
def text(self):
return self._text

View File

@@ -1,23 +1,13 @@
import httplib2 import copy
import json import json
import requests
from keystoneclient.v2_0 import client from keystoneclient.v2_0 import client
from keystoneclient import exceptions from keystoneclient import exceptions
from tests import utils from tests import utils
def to_http_response(resp_dict):
"""
Utility function to convert a python dictionary
(e.g. {'status':status, 'body': body, 'headers':headers}
to an httplib2 response.
"""
resp = httplib2.Response(resp_dict)
for k, v in resp_dict['headers'].items():
resp[k] = v
return resp
class AuthenticateAgainstKeystoneTests(utils.TestCase): class AuthenticateAgainstKeystoneTests(utils.TestCase):
def setUp(self): def setUp(self):
super(AuthenticateAgainstKeystoneTests, self).setUp() super(AuthenticateAgainstKeystoneTests, self).setUp()
@@ -55,9 +45,9 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
_cred = 'passwordCredentials' _cred = 'passwordCredentials'
_pass = 'password' _pass = 'password'
self.TEST_REQUEST_BODY[_auth][_cred][_pass] = 'bad_key' self.TEST_REQUEST_BODY[_auth][_cred][_pass] = 'bad_key'
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 401, "status_code": 401,
"body": json.dumps({ "text": json.dumps({
"unauthorized": { "unauthorized": {
"message": "Unauthorized", "message": "Unauthorized",
"code": "401", "code": "401",
@@ -65,11 +55,12 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
}), }),
}) })
httplib2.Http.request(self.TEST_URL + "/tokens", kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_REQUEST_HEADERS
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
headers=self.TEST_REQUEST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) self.TEST_URL + "/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
# Workaround for issue with assertRaises on python2.6 # Workaround for issue with assertRaises on python2.6
@@ -90,28 +81,30 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
"headers": { "headers": {
'location': self.TEST_ADMIN_URL + "/tokens", 'location': self.TEST_ADMIN_URL + "/tokens",
}, },
"status": 305, "status_code": 305,
"body": "Use proxy", "text": "Use proxy",
}, },
{ {
"headers": {}, "headers": {},
"status": 200, "status_code": 200,
"body": correct_response, "text": correct_response,
}, },
] ]
responses = [(to_http_response(resp), resp['body']) responses = [(utils.TestResponse(resp))
for resp in dict_responses] for resp in dict_responses]
httplib2.Http.request(self.TEST_URL + "/tokens", kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_REQUEST_HEADERS
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
headers=self.TEST_REQUEST_HEADERS) \ requests.request('POST',
.AndReturn(responses[0]) self.TEST_URL + "/tokens",
httplib2.Http.request(self.TEST_ADMIN_URL + "/tokens", **kwargs).AndReturn(responses[0])
'POST', kwargs = copy.copy(self.TEST_REQUEST_BASE)
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
.AndReturn(responses[1]) requests.request('POST',
self.TEST_ADMIN_URL + "/tokens",
**kwargs).AndReturn(responses[1])
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client(username=self.TEST_USER, cs = client.Client(username=self.TEST_USER,
@@ -126,16 +119,17 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
self.TEST_RESPONSE_DICT["access"]["token"]["id"]) self.TEST_RESPONSE_DICT["access"]["token"]["id"])
def test_authenticate_success_password_scoped(self): def test_authenticate_success_password_scoped(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT), "text": json.dumps(self.TEST_RESPONSE_DICT),
}) })
httplib2.Http.request(self.TEST_URL + "/tokens", kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_REQUEST_HEADERS
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
headers=self.TEST_REQUEST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) self.TEST_URL + "/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client(username=self.TEST_USER, cs = client.Client(username=self.TEST_USER,
@@ -151,16 +145,17 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
def test_authenticate_success_password_unscoped(self): def test_authenticate_success_password_unscoped(self):
del self.TEST_RESPONSE_DICT['access']['serviceCatalog'] del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
del self.TEST_REQUEST_BODY['auth']['tenantId'] del self.TEST_REQUEST_BODY['auth']['tenantId']
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT), "text": json.dumps(self.TEST_RESPONSE_DICT),
}) })
httplib2.Http.request(self.TEST_URL + "/tokens", kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_REQUEST_HEADERS
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
headers=self.TEST_REQUEST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) self.TEST_URL + "/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client(username=self.TEST_USER, cs = client.Client(username=self.TEST_USER,
@@ -174,16 +169,17 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
del self.TEST_REQUEST_BODY['auth']['passwordCredentials'] del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN} self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT), "text": json.dumps(self.TEST_RESPONSE_DICT),
}) })
httplib2.Http.request(self.TEST_URL + "/tokens", kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_REQUEST_HEADERS
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
headers=self.TEST_REQUEST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) self.TEST_URL + "/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN, cs = client.Client(token=self.TEST_TOKEN,
@@ -201,16 +197,17 @@ class AuthenticateAgainstKeystoneTests(utils.TestCase):
del self.TEST_RESPONSE_DICT['access']['serviceCatalog'] del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN} self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT), "text": json.dumps(self.TEST_RESPONSE_DICT),
}) })
httplib2.Http.request(self.TEST_URL + "/tokens", kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_REQUEST_HEADERS
body=json.dumps(self.TEST_REQUEST_BODY), kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
headers=self.TEST_REQUEST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) self.TEST_URL + "/tokens",
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN, cs = client.Client(token=self.TEST_TOKEN,

View File

@@ -1,22 +1,11 @@
import httplib2 import copy
import json import json
import requests
from keystoneclient.generic import client from keystoneclient.generic import client
from tests import utils from tests import utils
def to_http_response(resp_dict):
"""
Utility function to convert a python dictionary
(e.g. {'status':status, 'body': body, 'headers':headers}
to an httplib2 response.
"""
resp = httplib2.Response(resp_dict)
for k, v in resp_dict['headers'].items():
resp[k] = v
return resp
class DiscoverKeystoneTests(utils.UnauthenticatedTestCase): class DiscoverKeystoneTests(utils.UnauthenticatedTestCase):
def setUp(self): def setUp(self):
super(DiscoverKeystoneTests, self).setUp() super(DiscoverKeystoneTests, self).setUp()
@@ -58,15 +47,16 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase):
} }
def test_get_versions(self): def test_get_versions(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT), "text": json.dumps(self.TEST_RESPONSE_DICT),
}) })
httplib2.Http.request(self.TEST_ROOT_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('GET',
.AndReturn((resp, resp['body'])) self.TEST_ROOT_URL,
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client() cs = client.Client()
@@ -80,15 +70,15 @@ class DiscoverKeystoneTests(utils.UnauthenticatedTestCase):
['href']) ['href'])
def test_get_version_local(self): def test_get_version_local(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT), "text": json.dumps(self.TEST_RESPONSE_DICT),
}) })
kwargs = copy.copy(self.TEST_REQUEST_BASE)
httplib2.Http.request("http://localhost:35357", kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ "http://localhost:35357",
.AndReturn((resp, resp['body'])) **kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cs = client.Client() cs = client.Client()

View File

@@ -1,7 +1,8 @@
import copy
import urlparse import urlparse
import json import json
import httplib2 import requests
from keystoneclient.v2_0 import ec2 from keystoneclient.v2_0 import ec2
from tests import utils from tests import utils
@@ -35,18 +36,19 @@ class EC2Tests(utils.TestCase):
"enabled": True, "enabled": True,
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
url = urlparse.urljoin(self.TEST_URL, url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2' % user_id) 'v2.0/users/%s/credentials/OS-EC2' % user_id)
httplib2.Http.request(url, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_POST_HEADERS
body=json.dumps(req_body), kwargs['data'] = json.dumps(req_body)
headers=self.TEST_POST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) url,
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cred = self.client.ec2.create(user_id, tenant_id) cred = self.client.ec2.create(user_id, tenant_id)
@@ -68,18 +70,19 @@ class EC2Tests(utils.TestCase):
"enabled": True, "enabled": True,
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
url = urlparse.urljoin(self.TEST_URL, url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2/%s' % 'v2.0/users/%s/credentials/OS-EC2/%s' %
(user_id, 'access')) (user_id, 'access'))
httplib2.Http.request(url, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('GET',
.AndReturn((resp, resp['body'])) url,
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
cred = self.client.ec2.get(user_id, 'access') cred = self.client.ec2.get(user_id, 'access')
@@ -113,17 +116,18 @@ class EC2Tests(utils.TestCase):
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
url = urlparse.urljoin(self.TEST_URL, url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2' % user_id) 'v2.0/users/%s/credentials/OS-EC2' % user_id)
httplib2.Http.request(url, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('GET',
.AndReturn((resp, resp['body'])) url,
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
creds = self.client.ec2.list(user_id) creds = self.client.ec2.list(user_id)
@@ -138,18 +142,19 @@ class EC2Tests(utils.TestCase):
def test_delete(self): def test_delete(self):
user_id = 'usr' user_id = 'usr'
access = 'access' access = 'access'
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": "", "text": "",
}) })
url = urlparse.urljoin(self.TEST_URL, url = urlparse.urljoin(self.TEST_URL,
'v2.0/users/%s/credentials/OS-EC2/%s' % 'v2.0/users/%s/credentials/OS-EC2/%s' %
(user_id, access)) (user_id, access))
httplib2.Http.request(url, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'DELETE', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('DELETE',
.AndReturn((resp, resp['body'])) url,
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.ec2.delete(user_id, access) self.client.ec2.delete(user_id, access)

View File

@@ -1,7 +1,8 @@
import copy
import urlparse import urlparse
import json import json
import httplib2 import requests
from keystoneclient.v2_0 import endpoints from keystoneclient.v2_0 import endpoints
from tests import utils from tests import utils
@@ -59,17 +60,18 @@ class EndpointTests(utils.TestCase):
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/endpoints'), kwargs['headers'] = self.TEST_POST_HEADERS
'POST', kwargs['data'] = json.dumps(req_body)
body=json.dumps(req_body), requests.request('POST',
headers=self.TEST_POST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/endpoints'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
endpoint = self.client.endpoints.create( endpoint = self.client.endpoints.create(
@@ -82,30 +84,32 @@ class EndpointTests(utils.TestCase):
self.assertTrue(isinstance(endpoint, endpoints.Endpoint)) self.assertTrue(isinstance(endpoint, endpoints.Endpoint))
def test_delete(self): def test_delete(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": "", "text": "",
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/endpoints/8f953'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'DELETE', requests.request('DELETE',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/endpoints/8f953'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.endpoints.delete('8f953') self.client.endpoints.delete('8f953')
def test_list(self): def test_list(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_ENDPOINTS), "text": json.dumps(self.TEST_ENDPOINTS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/endpoints'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/endpoints'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
endpoint_list = self.client.endpoints.list() endpoint_list = self.client.endpoints.list()

View File

@@ -1,7 +1,8 @@
import copy
import urlparse import urlparse
import json import json
import httplib2 import requests
from keystoneclient.v2_0 import roles from keystoneclient.v2_0 import roles
from tests import utils from tests import utils
@@ -46,17 +47,18 @@ class RoleTests(utils.TestCase):
"id": 3, "id": 3,
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/OS-KSADM/roles'), kwargs['headers'] = self.TEST_POST_HEADERS
'POST', kwargs['data'] = json.dumps(req_body)
body=json.dumps(req_body), requests.request('POST',
headers=self.TEST_POST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/OS-KSADM/roles'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
role = self.client.roles.create(req_body['role']['name']) role = self.client.roles.create(req_body['role']['name'])
@@ -65,31 +67,35 @@ class RoleTests(utils.TestCase):
self.assertEqual(role.name, req_body['role']['name']) self.assertEqual(role.name, req_body['role']['name'])
def test_delete(self): def test_delete(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": "", "text": "",
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/roles/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'DELETE', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('DELETE',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/roles/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.roles.delete(1) self.client.roles.delete(1)
def test_get(self): def test_get(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps({ "text": json.dumps({
'role': self.TEST_ROLES['roles']['values'][0], 'role': self.TEST_ROLES['roles']['values'][0],
}), }),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/roles/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/roles/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
role = self.client.roles.get(1) role = self.client.roles.get(1)
@@ -98,109 +104,116 @@ class RoleTests(utils.TestCase):
self.assertEqual(role.name, 'admin') self.assertEqual(role.name, 'admin')
def test_list(self): def test_list(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_ROLES), "text": json.dumps(self.TEST_ROLES),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/OS-KSADM/roles'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/OS-KSADM/roles'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
role_list = self.client.roles.list() role_list = self.client.roles.list()
[self.assertTrue(isinstance(r, roles.Role)) for r in role_list] [self.assertTrue(isinstance(r, roles.Role)) for r in role_list]
def test_roles_for_user(self): def test_roles_for_user(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_ROLES), "text": json.dumps(self.TEST_ROLES),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users/foo/roles'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/users/foo/roles'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
role_list = self.client.roles.roles_for_user('foo') role_list = self.client.roles.roles_for_user('foo')
[self.assertTrue(isinstance(r, roles.Role)) for r in role_list] [self.assertTrue(isinstance(r, roles.Role)) for r in role_list]
def test_roles_for_user_tenant(self): def test_roles_for_user_tenant(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_ROLES), "text": json.dumps(self.TEST_ROLES),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/barrr/users/foo/roles'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/barrr/users/foo/roles'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
role_list = self.client.roles.roles_for_user('foo', 'barrr') role_list = self.client.roles.roles_for_user('foo', 'barrr')
[self.assertTrue(isinstance(r, roles.Role)) for r in role_list] [self.assertTrue(isinstance(r, roles.Role)) for r in role_list]
def test_add_user_role(self): def test_add_user_role(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'PUT', requests.request('PUT',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.roles.add_user_role('foo', 'barrr') self.client.roles.add_user_role('foo', 'barrr')
def test_add_user_role_tenant(self): def test_add_user_role_tenant(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'PUT', requests.request('PUT',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.roles.add_user_role('foo', 'barrr', '4') self.client.roles.add_user_role('foo', 'barrr', '4')
def test_remove_user_role(self): def test_remove_user_role(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'DELETE', requests.request('DELETE',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.roles.remove_user_role('foo', 'barrr') self.client.roles.remove_user_role('foo', 'barrr')
def test_remove_user_role_tenant(self): def test_remove_user_role_tenant(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'DELETE', requests.request('DELETE',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.roles.remove_user_role('foo', 'barrr', '4') self.client.roles.remove_user_role('foo', 'barrr', '4')

View File

@@ -1,7 +1,8 @@
import copy
import urlparse import urlparse
import json import json
import httplib2 import requests
from keystoneclient.v2_0 import services from keystoneclient.v2_0 import services
from tests import utils from tests import utils
@@ -54,17 +55,18 @@ class ServiceTests(utils.TestCase):
"id": 3, "id": 3,
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/OS-KSADM/services'), kwargs['headers'] = self.TEST_POST_HEADERS
'POST', kwargs['data'] = json.dumps(req_body)
body=json.dumps(req_body), requests.request('POST',
headers=self.TEST_POST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/OS-KSADM/services'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
service = self.client.services.create( service = self.client.services.create(
@@ -76,30 +78,34 @@ class ServiceTests(utils.TestCase):
self.assertEqual(service.name, req_body['OS-KSADM:service']['name']) self.assertEqual(service.name, req_body['OS-KSADM:service']['name'])
def test_delete(self): def test_delete(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": "", "text": "",
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/services/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'DELETE', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('DELETE',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/services/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.services.delete(1) self.client.services.delete(1)
def test_get(self): def test_get(self):
test_services = self.TEST_SERVICES['OS-KSADM:services']['values'][0] test_services = self.TEST_SERVICES['OS-KSADM:services']['values'][0]
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps({'OS-KSADM:service': test_services}), "text": json.dumps({'OS-KSADM:service': test_services}),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/services/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL,
'v2.0/OS-KSADM/services/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
service = self.client.services.get(1) service = self.client.services.get(1)
@@ -109,16 +115,17 @@ class ServiceTests(utils.TestCase):
self.assertEqual(service.type, 'compute') self.assertEqual(service.type, 'compute')
def test_list(self): def test_list(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_SERVICES), "text": json.dumps(self.TEST_SERVICES),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/OS-KSADM/services'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/OS-KSADM/services'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
service_list = self.client.services.list() service_list = self.client.services.list()

View File

@@ -1,7 +1,8 @@
import copy
import urlparse import urlparse
import json import json
import httplib2 import requests
from keystoneclient.v2_0 import tenants from keystoneclient.v2_0 import tenants
from tests import utils from tests import utils
@@ -61,16 +62,17 @@ class TenantTests(utils.TestCase):
"description": "Like tenant 9, but better.", "description": "Like tenant 9, but better.",
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_POST_HEADERS
body=json.dumps(req_body), kwargs['data'] = json.dumps(req_body)
headers=self.TEST_POST_HEADERS) \ requests.request('POST',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant = self.client.tenants.create(req_body['tenant']['name'], tenant = self.client.tenants.create(req_body['tenant']['name'],
@@ -82,31 +84,33 @@ class TenantTests(utils.TestCase):
self.assertEqual(tenant.description, "Like tenant 9, but better.") self.assertEqual(tenant.description, "Like tenant 9, but better.")
def test_delete(self): def test_delete(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": "", "text": "",
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'DELETE', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('DELETE',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/tenants/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.tenants.delete(1) self.client.tenants.delete(1)
def test_get(self): def test_get(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps({ "text": json.dumps({
'tenant': self.TEST_TENANTS['tenants']['values'][2], 'tenant': self.TEST_TENANTS['tenants']['values'][2],
}), }),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/tenants/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request('GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/tenants/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
t = self.client.tenants.get(1) t = self.client.tenants.get(1)
@@ -115,64 +119,67 @@ class TenantTests(utils.TestCase):
self.assertEqual(t.name, 'admin') self.assertEqual(t.name, 'admin')
def test_list(self): def test_list(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_TENANTS), "text": json.dumps(self.TEST_TENANTS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL, 'v2.0/tenants'),
.AndReturn((resp, resp['body'])) **kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant_list = self.client.tenants.list() tenant_list = self.client.tenants.list()
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_list_limit(self): def test_list_limit(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_TENANTS), "text": json.dumps(self.TEST_TENANTS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants?limit=1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants?limit=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant_list = self.client.tenants.list(limit=1) tenant_list = self.client.tenants.list(limit=1)
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_list_marker(self): def test_list_marker(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_TENANTS), "text": json.dumps(self.TEST_TENANTS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants?marker=1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants?marker=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant_list = self.client.tenants.list(marker=1) tenant_list = self.client.tenants.list(marker=1)
[self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list] [self.assertTrue(isinstance(t, tenants.Tenant)) for t in tenant_list]
def test_list_limit_marker(self): def test_list_limit_marker(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_TENANTS), "text": json.dumps(self.TEST_TENANTS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants?marker=1&limit=1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request('GET',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants?marker=1&limit=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant_list = self.client.tenants.list(limit=1, marker=1) tenant_list = self.client.tenants.list(limit=1, marker=1)
@@ -195,17 +202,18 @@ class TenantTests(utils.TestCase):
"description": "I changed you!", "description": "I changed you!",
}, },
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4'), kwargs['headers'] = self.TEST_POST_HEADERS
'POST', kwargs['data'] = json.dumps(req_body)
body=json.dumps(req_body), requests.request('POST',
headers=self.TEST_POST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant = self.client.tenants.update(req_body['tenant']['id'], tenant = self.client.tenants.update(req_body['tenant']['id'],
@@ -235,17 +243,18 @@ class TenantTests(utils.TestCase):
"description": "", "description": "",
}, },
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4'), kwargs['headers'] = self.TEST_POST_HEADERS
'POST', kwargs['data'] = json.dumps(req_body)
body=json.dumps(req_body), requests.request('POST',
headers=self.TEST_POST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
tenant = self.client.tenants.update(req_body['tenant']['id'], tenant = self.client.tenants.update(req_body['tenant']['id'],
@@ -259,31 +268,33 @@ class TenantTests(utils.TestCase):
self.assertFalse(tenant.enabled) self.assertFalse(tenant.enabled)
def test_add_user(self): def test_add_user(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'PUT', requests.request('PUT',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.tenants.add_user('4', 'foo', 'barrr') self.client.tenants.add_user('4', 'foo', 'barrr')
def test_remove_user(self): def test_remove_user(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'DELETE', requests.request('DELETE',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.tenants.remove_user('4', 'foo', 'barrr') self.client.tenants.remove_user('4', 'foo', 'barrr')
@@ -297,16 +308,17 @@ class TenantTests(utils.TestCase):
"enabled": False, "enabled": False,
}, },
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'PUT', requests.request('PUT',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
# make tenant object with manager # make tenant object with manager
@@ -324,16 +336,17 @@ class TenantTests(utils.TestCase):
"enabled": False, "enabled": False,
}, },
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": '', "text": '',
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'DELETE', requests.request('DELETE',
headers=self.TEST_REQUEST_HEADERS) \ urlparse.urljoin(self.TEST_URL,
.AndReturn((resp, resp['body'])) 'v2.0/tenants/4/users/foo/roles/OS-KSADM/barrr'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
# make tenant object with manager # make tenant object with manager

View File

@@ -1,6 +1,7 @@
import copy
import urlparse import urlparse
import httplib2 import requests
from tests import utils from tests import utils
@@ -17,15 +18,16 @@ class TokenTests(utils.TestCase):
'User-Agent': 'python-keystoneclient'} 'User-Agent': 'python-keystoneclient'}
def test_delete(self): def test_delete(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": ""}) "text": ""})
req = httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
urlparse.urljoin(self.TEST_URL, 'v2.0/tokens/1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
requests.request(
'DELETE', 'DELETE',
headers=self.TEST_REQUEST_HEADERS) urlparse.urljoin(self.TEST_URL, 'v2.0/tokens/1'),
req.AndReturn((resp, resp['body'])) **kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()

View File

@@ -1,7 +1,8 @@
import copy
import urlparse import urlparse
import json import json
import httplib2 import requests
from keystoneclient.v2_0 import users from keystoneclient.v2_0 import users
from tests import utils from tests import utils
@@ -58,16 +59,18 @@ class UserTests(utils.TestCase):
"email": "test@example.com", "email": "test@example.com",
} }
} }
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(resp_body), "text": json.dumps(resp_body),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'POST', kwargs['headers'] = self.TEST_POST_HEADERS
body=json.dumps(req_body), kwargs['data'] = json.dumps(req_body)
headers=self.TEST_POST_HEADERS) \ requests.request(
.AndReturn((resp, resp['body'])) 'POST',
urlparse.urljoin(self.TEST_URL, 'v2.0/users'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
user = self.client.users.create(req_body['user']['name'], user = self.client.users.create(req_body['user']['name'],
@@ -81,30 +84,35 @@ class UserTests(utils.TestCase):
self.assertEqual(user.email, "test@example.com") self.assertEqual(user.email, "test@example.com")
def test_delete(self): def test_delete(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 204, "status_code": 204,
"body": "", "text": "",
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
'DELETE', kwargs = copy.copy(self.TEST_REQUEST_BASE)
headers=self.TEST_REQUEST_HEADERS) \ kwargs['headers'] = self.TEST_REQUEST_HEADERS
.AndReturn((resp, resp['body'])) requests.request(
'DELETE',
urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
self.client.users.delete(1) self.client.users.delete(1)
def test_get(self): def test_get(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps({ "text": json.dumps({
'user': self.TEST_USERS['users']['values'][0], 'user': self.TEST_USERS['users']['values'][0],
}) })
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/users/1'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'GET', kwargs['headers'] = self.TEST_REQUEST_HEADERS
headers=self.TEST_REQUEST_HEADERS) \ requests.request(
.AndReturn((resp, resp['body'])) 'GET',
urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
u = self.client.users.get(1) u = self.client.users.get(1)
@@ -113,105 +121,146 @@ class UserTests(utils.TestCase):
self.assertEqual(u.name, 'admin') self.assertEqual(u.name, 'admin')
def test_list(self): def test_list(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_USERS), "text": json.dumps(self.TEST_USERS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request(
headers=self.TEST_REQUEST_HEADERS) \ 'GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/users'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
user_list = self.client.users.list() user_list = self.client.users.list()
[self.assertTrue(isinstance(u, users.User)) for u in user_list] [self.assertTrue(isinstance(u, users.User)) for u in user_list]
def test_list_limit(self): def test_list_limit(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_USERS), "text": json.dumps(self.TEST_USERS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users?limit=1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request(
headers=self.TEST_REQUEST_HEADERS) \ 'GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/users?limit=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
user_list = self.client.users.list(limit=1) user_list = self.client.users.list(limit=1)
[self.assertTrue(isinstance(u, users.User)) for u in user_list] [self.assertTrue(isinstance(u, users.User)) for u in user_list]
def test_list_marker(self): def test_list_marker(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_USERS), "text": json.dumps(self.TEST_USERS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users?marker=1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request(
headers=self.TEST_REQUEST_HEADERS) \ 'GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/users?marker=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
user_list = self.client.users.list(marker=1) user_list = self.client.users.list(marker=1)
[self.assertTrue(isinstance(u, users.User)) for u in user_list] [self.assertTrue(isinstance(u, users.User)) for u in user_list]
def test_list_limit_marker(self): def test_list_limit_marker(self):
resp = httplib2.Response({ resp = utils.TestResponse({
"status": 200, "status_code": 200,
"body": json.dumps(self.TEST_USERS), "text": json.dumps(self.TEST_USERS),
}) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, kwargs = copy.copy(self.TEST_REQUEST_BASE)
'v2.0/users?marker=1&limit=1'), kwargs['headers'] = self.TEST_REQUEST_HEADERS
'GET', requests.request(
headers=self.TEST_REQUEST_HEADERS) \ 'GET',
.AndReturn((resp, resp['body'])) urlparse.urljoin(self.TEST_URL, 'v2.0/users?marker=1&limit=1'),
**kwargs).AndReturn((resp))
self.mox.ReplayAll() self.mox.ReplayAll()
user_list = self.client.users.list(limit=1, marker=1) user_list = self.client.users.list(limit=1, marker=1)
[self.assertTrue(isinstance(u, users.User)) for u in user_list] [self.assertTrue(isinstance(u, users.User)) for u in user_list]
def test_update(self): def test_update(self):
req_1 = {"user": {"password": "swordfish", "id": 2}} req_1 = {
req_2 = {"user": {"id": 2, "user": {
"email": "gabriel@example.com", "id": 2,
"name": "gabriel"}} "email": "gabriel@example.com",
req_3 = {"user": {"tenantId": 1, "id": 2}} "name": "gabriel",
req_4 = {"user": {"enabled": False, "id": 2}} }
# Keystone basically echoes these back... including the password :-/ }
resp_1 = httplib2.Response({"status": 200, "body": json.dumps(req_1)}) req_2 = {
resp_2 = httplib2.Response({"status": 200, "body": json.dumps(req_2)}) "user": {
resp_3 = httplib2.Response({"status": 200, "body": json.dumps(req_3)}) "id": 2,
resp_4 = httplib2.Response({"status": 200, "body": json.dumps(req_3)}) "password": "swordfish",
}
}
req_3 = {
"user": {
"id": 2,
"tenantId": 1,
}
}
req_4 = {
"user": {
"id": 2,
"enabled": False,
}
}
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users/2'), # Keystone basically echoes these back... including the password :-/
'PUT', resp_1 = utils.TestResponse({
body=json.dumps(req_2), "status_code": 200,
headers=self.TEST_POST_HEADERS) \ "text": json.dumps(req_1)
.AndReturn((resp_2, resp_2['body'])) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, resp_2 = utils.TestResponse({
'v2.0/users/2/OS-KSADM/password'), "status_code": 200,
'PUT', "text": json.dumps(req_2)
body=json.dumps(req_1), })
headers=self.TEST_POST_HEADERS) \ resp_3 = utils.TestResponse({
.AndReturn((resp_1, resp_1['body'])) "status_code": 200,
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, "text": json.dumps(req_3)
'v2.0/users/2/OS-KSADM/tenant'), })
'PUT', resp_4 = utils.TestResponse({
body=json.dumps(req_3), "status_code": 200,
headers=self.TEST_POST_HEADERS) \ "text": json.dumps(req_4)
.AndReturn((resp_3, resp_3['body'])) })
httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
'v2.0/users/2/OS-KSADM/enabled'), kwargs = copy.copy(self.TEST_REQUEST_BASE)
'PUT', kwargs['headers'] = self.TEST_POST_HEADERS
body=json.dumps(req_4), kwargs['data'] = json.dumps(req_1)
headers=self.TEST_POST_HEADERS) \ requests.request(
.AndReturn((resp_4, resp_4['body'])) 'PUT',
urlparse.urljoin(self.TEST_URL, 'v2.0/users/2'),
**kwargs).AndReturn((resp_1))
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_2)
requests.request(
'PUT',
urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/password'),
**kwargs).AndReturn((resp_2))
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_3)
requests.request(
'PUT',
urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/tenant'),
**kwargs).AndReturn((resp_3))
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_4)
requests.request(
'PUT',
urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/enabled'),
**kwargs).AndReturn((resp_4))
self.mox.ReplayAll() self.mox.ReplayAll()
user = self.client.users.update(2, user = self.client.users.update(2,
@@ -222,17 +271,26 @@ class UserTests(utils.TestCase):
user = self.client.users.update_enabled(2, False) user = self.client.users.update_enabled(2, False)
def test_update_own_password(self): def test_update_own_password(self):
req_1 = {'user': {'password': 'ABCD', 'original_password': 'DCBA'}} req_body = {
'user': {
'password': 'ABCD', 'original_password': 'DCBA'
}
}
resp_body = {
'access': {}
}
resp = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp_body)
})
resp_1 = {'access': {}} kwargs = copy.copy(self.TEST_REQUEST_BASE)
resp_1 = httplib2.Response({'status': 200, 'body': json.dumps(resp_1)}) kwargs['headers'] = self.TEST_POST_HEADERS
kwargs['data'] = json.dumps(req_body)
httplib2.Http.request(urlparse.urljoin(self.TEST_URL, requests.request(
'v2.0/OS-KSCRUD/users/123'), 'PATCH',
'PATCH', urlparse.urljoin(self.TEST_URL, 'v2.0/OS-KSCRUD/users/123'),
body=json.dumps(req_1), **kwargs).AndReturn((resp))
headers=self.TEST_POST_HEADERS) \
.AndReturn((resp_1, resp_1['body']))
self.mox.ReplayAll() self.mox.ReplayAll()

View File

@@ -1,7 +1,9 @@
import httplib2 import copy
import urlparse import urlparse
import uuid import uuid
import requests
from keystoneclient.v3 import projects from keystoneclient.v3 import projects
from tests.v3 import utils from tests.v3 import utils
@@ -26,19 +28,20 @@ class ProjectTests(utils.TestCase, utils.CrudTests):
ref_list = [self.new_ref(), self.new_ref()] ref_list = [self.new_ref(), self.new_ref()]
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 200, "status-code": 200,
'body': self.serialize(ref_list), "text": self.serialize(ref_list),
}) })
method = 'GET' method = 'GET'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/users/%s/%s' % (user_id, self.collection_key)), 'v3/users/%s/%s' % (user_id, self.collection_key)),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
returned_list = self.manager.list(user=user_id) returned_list = self.manager.list(user=user_id)
@@ -49,19 +52,20 @@ class ProjectTests(utils.TestCase, utils.CrudTests):
ref_list = [self.new_ref(), self.new_ref()] ref_list = [self.new_ref(), self.new_ref()]
domain_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 200, "status_code": 200,
'body': self.serialize(ref_list), "text": self.serialize(ref_list),
}) })
method = 'GET' method = 'GET'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/%s?domain_id=%s' % (self.collection_key, domain_id)), 'v3/%s?domain_id=%s' % (self.collection_key, domain_id)),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
returned_list = self.manager.list(domain=domain_id) returned_list = self.manager.list(domain=domain_id)

View File

@@ -1,7 +1,9 @@
import httplib2 import copy
import urlparse import urlparse
import uuid import uuid
import requests
from keystoneclient import exceptions from keystoneclient import exceptions
from keystoneclient.v3 import roles from keystoneclient.v3 import roles
from tests.v3 import utils from tests.v3 import utils
@@ -25,20 +27,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex
ref = self.new_ref() ref = self.new_ref()
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 201, "status_code": 201,
'body': '', "text": '',
}) })
method = 'PUT' method = 'PUT'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/domains/%s/users/%s/%s/%s' % ( 'v3/domains/%s/users/%s/%s/%s' % (
domain_id, user_id, self.collection_key, ref['id'])), domain_id, user_id, self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.grant(role=ref['id'], domain=domain_id, user=user_id) self.manager.grant(role=ref['id'], domain=domain_id, user=user_id)
@@ -47,20 +50,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex
ref_list = [self.new_ref(), self.new_ref()] ref_list = [self.new_ref(), self.new_ref()]
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 200, "status_code": 200,
'body': self.serialize(ref_list), "text": self.serialize(ref_list),
}) })
method = 'GET' method = 'GET'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/domains/%s/users/%s/%s' % ( 'v3/domains/%s/users/%s/%s' % (
domain_id, user_id, self.collection_key)), domain_id, user_id, self.collection_key)),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.list(domain=domain_id, user=user_id) self.manager.list(domain=domain_id, user=user_id)
@@ -69,20 +73,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex
ref = self.new_ref() ref = self.new_ref()
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 200, "status_code": 200,
'body': '', "text": '',
}) })
method = 'HEAD' method = 'HEAD'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/domains/%s/users/%s/%s/%s' % ( 'v3/domains/%s/users/%s/%s/%s' % (
domain_id, user_id, self.collection_key, ref['id'])), domain_id, user_id, self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.check(role=ref['id'], domain=domain_id, user=user_id) self.manager.check(role=ref['id'], domain=domain_id, user=user_id)
@@ -91,20 +96,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex domain_id = uuid.uuid4().hex
ref = self.new_ref() ref = self.new_ref()
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 204, "status_code": 204,
'body': '', "text": '',
}) })
method = 'DELETE' method = 'DELETE'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/domains/%s/users/%s/%s/%s' % ( 'v3/domains/%s/users/%s/%s/%s' % (
domain_id, user_id, self.collection_key, ref['id'])), domain_id, user_id, self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.revoke(role=ref['id'], domain=domain_id, user=user_id) self.manager.revoke(role=ref['id'], domain=domain_id, user=user_id)
@@ -113,20 +119,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex project_id = uuid.uuid4().hex
ref = self.new_ref() ref = self.new_ref()
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 201, "status_code": 201,
'body': '', "text": '',
}) })
method = 'PUT' method = 'PUT'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/projects/%s/users/%s/%s/%s' % ( 'v3/projects/%s/users/%s/%s/%s' % (
project_id, user_id, self.collection_key, ref['id'])), project_id, user_id, self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.grant(role=ref['id'], project=project_id, user=user_id) self.manager.grant(role=ref['id'], project=project_id, user=user_id)
@@ -135,20 +142,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex project_id = uuid.uuid4().hex
ref_list = [self.new_ref(), self.new_ref()] ref_list = [self.new_ref(), self.new_ref()]
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 200, "status_code": 200,
'body': self.serialize(ref_list), "text": self.serialize(ref_list),
}) })
method = 'GET' method = 'GET'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/projects/%s/users/%s/%s' % ( 'v3/projects/%s/users/%s/%s' % (
project_id, user_id, self.collection_key)), project_id, user_id, self.collection_key)),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.list(project=project_id, user=user_id) self.manager.list(project=project_id, user=user_id)
@@ -157,20 +165,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex project_id = uuid.uuid4().hex
ref = self.new_ref() ref = self.new_ref()
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 200, "status_code": 200,
'body': '', "text": '',
}) })
method = 'HEAD' method = 'HEAD'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/projects/%s/users/%s/%s/%s' % ( 'v3/projects/%s/users/%s/%s/%s' % (
project_id, user_id, self.collection_key, ref['id'])), project_id, user_id, self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.check(role=ref['id'], project=project_id, user=user_id) self.manager.check(role=ref['id'], project=project_id, user=user_id)
@@ -179,20 +188,21 @@ class RoleTests(utils.TestCase, utils.CrudTests):
user_id = uuid.uuid4().hex user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex project_id = uuid.uuid4().hex
ref = self.new_ref() ref = self.new_ref()
resp = httplib2.Response({ resp = utils.TestResponse({
'status': 204, "status_code": 204,
'body': '', "text": '',
}) })
method = 'DELETE' method = 'DELETE'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/projects/%s/users/%s/%s/%s' % ( 'v3/projects/%s/users/%s/%s/%s' % (
project_id, user_id, self.collection_key, ref['id'])), project_id, user_id, self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.revoke(role=ref['id'], project=project_id, user=user_id) self.manager.revoke(role=ref['id'], project=project_id, user=user_id)

View File

@@ -1,10 +1,11 @@
import copy
import json import json
import uuid import uuid
import time import time
import urlparse import urlparse
import httplib2
import mox import mox
import requests
import unittest2 as unittest import unittest2 as unittest
from keystoneclient.v3 import client from keystoneclient.v3 import client
@@ -24,6 +25,12 @@ def parameterize(ref):
return params return params
class TestClient(client.Client):
def serialize(self, entity):
return json.dumps(entity, sort_keys=True)
class TestCase(unittest.TestCase): class TestCase(unittest.TestCase):
TEST_TENANT_NAME = 'aTenant' TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken' TEST_TOKEN = 'aToken'
@@ -32,18 +39,22 @@ class TestCase(unittest.TestCase):
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')
TEST_REQUEST_BASE = {
'config': {'danger_mode': False},
'verify': True,
}
def setUp(self): def setUp(self):
super(TestCase, self).setUp() super(TestCase, self).setUp()
self.mox = mox.Mox() self.mox = mox.Mox()
self._original_time = time.time self._original_time = time.time
time.time = lambda: 1234 time.time = lambda: 1234
httplib2.Http.request = self.mox.CreateMockAnything() requests.request = self.mox.CreateMockAnything()
self.client = client.Client(username=self.TEST_USER, self.client = TestClient(username=self.TEST_USER,
token=self.TEST_TOKEN, token=self.TEST_TOKEN,
tenant_name=self.TEST_TENANT_NAME, tenant_name=self.TEST_TENANT_NAME,
auth_url=self.TEST_URL, auth_url=self.TEST_URL,
endpoint=self.TEST_URL) endpoint=self.TEST_URL)
def tearDown(self): def tearDown(self):
time.time = self._original_time time.time = self._original_time
@@ -58,13 +69,17 @@ class UnauthenticatedTestCase(unittest.TestCase):
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')
TEST_REQUEST_BASE = {
'config': {'danger_mode': False},
'verify': True,
}
def setUp(self): def setUp(self):
super(UnauthenticatedTestCase, self).setUp() super(UnauthenticatedTestCase, self).setUp()
self.mox = mox.Mox() self.mox = mox.Mox()
self._original_time = time.time self._original_time = time.time
time.time = lambda: 1234 time.time = lambda: 1234
httplib2.Http.request = self.mox.CreateMockAnything() requests.request = self.mox.CreateMockAnything()
def tearDown(self): def tearDown(self):
time.time = self._original_time time.time = self._original_time
@@ -107,22 +122,23 @@ class CrudTests(object):
def test_create(self, ref=None): def test_create(self, ref=None):
ref = ref or self.new_ref() ref = ref or self.new_ref()
resp = httplib2.Response({ resp = TestResponse({
'status': 201, "status_code": 201,
'body': self.serialize(ref), "text": self.serialize(ref),
}) })
method = 'POST' method = 'POST'
req_ref = ref.copy() req_ref = ref.copy()
req_ref.pop('id') req_ref.pop('id')
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
kwargs['data'] = self.serialize(req_ref)
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/%s' % self.collection_key), 'v3/%s' % self.collection_key),
method, **kwargs).AndReturn((resp))
body=self.serialize(req_ref),
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
returned = self.manager.create(**parameterize(req_ref)) returned = self.manager.create(**parameterize(req_ref))
@@ -135,18 +151,20 @@ class CrudTests(object):
def test_get(self, ref=None): def test_get(self, ref=None):
ref = ref or self.new_ref() ref = ref or self.new_ref()
resp = httplib2.Response({ resp = TestResponse({
'status': 200, "status_code": 200,
'body': self.serialize(ref), "text": self.serialize(ref),
}) })
method = 'GET' method = 'GET'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/%s/%s' % (self.collection_key, ref['id'])), 'v3/%s/%s' % (self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
returned = self.manager.get(ref['id']) returned = self.manager.get(ref['id'])
@@ -159,20 +177,20 @@ class CrudTests(object):
def test_list(self, ref_list=None, expected_path=None, **filter_kwargs): def test_list(self, ref_list=None, expected_path=None, **filter_kwargs):
ref_list = ref_list or [self.new_ref(), self.new_ref()] ref_list = ref_list or [self.new_ref(), self.new_ref()]
resp = TestResponse({
resp = httplib2.Response({ "status_code": 200,
'status': 200, "text": self.serialize(ref_list),
'body': self.serialize(ref_list),
}) })
method = 'GET' method = 'GET'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
expected_path or 'v3/%s' % self.collection_key), expected_path or 'v3/%s' % self.collection_key),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
returned_list = self.manager.list(**filter_kwargs) returned_list = self.manager.list(**filter_kwargs)
@@ -183,21 +201,21 @@ class CrudTests(object):
ref = ref or self.new_ref() ref = ref or self.new_ref()
req_ref = ref.copy() req_ref = ref.copy()
del req_ref['id'] del req_ref['id']
resp = TestResponse({
resp = httplib2.Response({ "status_code": 200,
'status': 200, "text": self.serialize(ref),
'body': self.serialize(ref),
}) })
method = 'PATCH' method = 'PATCH'
httplib2.Http.request( kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
kwargs['data'] = self.serialize(req_ref)
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/%s/%s' % (self.collection_key, ref['id'])), 'v3/%s/%s' % (self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
body=self.serialize(req_ref),
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
returned = self.manager.update(ref['id'], **parameterize(req_ref)) returned = self.manager.update(ref['id'], **parameterize(req_ref))
@@ -210,18 +228,43 @@ class CrudTests(object):
def test_delete(self, ref=None): def test_delete(self, ref=None):
ref = ref or self.new_ref() ref = ref or self.new_ref()
method = 'DELETE' resp = TestResponse({
resp = httplib2.Response({ "status_code": 204,
'status': 204, "text": '',
'body': '',
}) })
httplib2.Http.request(
method = 'DELETE'
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = self.headers[method]
requests.request(
method,
urlparse.urljoin( urlparse.urljoin(
self.TEST_URL, self.TEST_URL,
'v3/%s/%s' % (self.collection_key, ref['id'])), 'v3/%s/%s' % (self.collection_key, ref['id'])),
method, **kwargs).AndReturn((resp))
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll() self.mox.ReplayAll()
self.manager.delete(ref['id']) self.manager.delete(ref['id'])
class TestResponse(requests.Response):
""" Class used to wrap requests.Response and provide some
convenience to initialize with a dict """
def __init__(self, data):
self._text = None
super(TestResponse, self)
if isinstance(data, dict):
self.status_code = data.get('status_code', None)
self.headers = data.get('headers', None)
# Fake the text attribute to streamline Response creation
self._text = data.get('text', None)
else:
self.status_code = data
def __eq__(self, other):
return self.__dict__ == other.__dict__
@property
def text(self):
return self._text

View File

@@ -1,4 +1,4 @@
argparse argparse
httplib2>=0.7
prettytable prettytable
requests<1.0
simplejson simplejson