diff --git a/swiftclient/client.py b/swiftclient/client.py index 7ce72182..3bc9c756 100644 --- a/swiftclient/client.py +++ b/swiftclient/client.py @@ -285,6 +285,23 @@ def get_keystoneclient_2_0(auth_url, user, key, os_options, **kwargs): return get_auth_keystone(auth_url, user, key, os_options, **kwargs) +def _import_keystone_client(auth_version): + # the attempted imports are encapsulated in this function to allow + # mocking for tests + try: + if auth_version in AUTH_VERSIONS_V3: + from keystoneclient.v3 import client as ksclient + else: + from keystoneclient.v2_0 import client as ksclient + from keystoneclient import exceptions + return ksclient, exceptions + except ImportError: + sys.exit(''' +Auth versions 2.0 and 3 require python-keystoneclient, install it or use Auth +version 1.0 which requires ST_AUTH, ST_USER, and ST_KEY environment +variables to be set or overridden with -A, -U, or -K.''') + + def get_auth_keystone(auth_url, user, key, os_options, **kwargs): """ Authenticate against a keystone server. @@ -296,17 +313,7 @@ def get_auth_keystone(auth_url, user, key, os_options, **kwargs): auth_version = kwargs.get('auth_version', '2.0') debug = logger.isEnabledFor(logging.DEBUG) and True or False - try: - if auth_version in AUTH_VERSIONS_V3: - from keystoneclient.v3 import client as ksclient - else: - from keystoneclient.v2_0 import client as ksclient - from keystoneclient import exceptions - except ImportError: - sys.exit(''' -Auth versions 2.0 and 3 require python-keystoneclient, install it or use Auth -version 1.0 which requires ST_AUTH, ST_USER, and ST_KEY environment -variables to be set or overridden with -A, -U, or -K.''') + ksclient, exceptions = _import_keystone_client(auth_version) try: _ksclient = ksclient.Client( diff --git a/swiftclient/service.py b/swiftclient/service.py index fa39eccc..365485af 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -77,8 +77,10 @@ class SwiftError(Exception): def process_options(options): - if not (options['auth'] and options['user'] and options['key']): - # Use 2.0 auth if none of the old args are present + if (not (options.get('auth') and options.get('user') + and options.get('key')) + and options.get('auth_version') != '3'): + # Use keystone 2.0 auth if any of the old-style args are missing options['auth_version'] = '2.0' # Use new-style args if old ones not present @@ -91,8 +93,15 @@ def process_options(options): # Specific OpenStack options options['os_options'] = { + 'user_id': options['os_user_id'], + 'user_domain_id': options['os_user_domain_id'], + 'user_domain_name': options['os_user_domain_name'], 'tenant_id': options['os_tenant_id'], 'tenant_name': options['os_tenant_name'], + 'project_id': options['os_project_id'], + 'project_name': options['os_project_name'], + 'project_domain_id': options['os_project_domain_id'], + 'project_domain_name': options['os_project_domain_name'], 'service_type': options['os_service_type'], 'endpoint_type': options['os_endpoint_type'], 'auth_token': options['os_auth_token'], @@ -111,9 +120,16 @@ _default_global_options = { "key": environ.get('ST_KEY'), "retries": 5, "os_username": environ.get('OS_USERNAME'), + "os_user_id": environ.get('OS_USER_ID'), + "os_user_domain_name": environ.get('OS_USER_DOMAIN_NAME'), + "os_user_domain_id": environ.get('OS_USER_DOMAIN_ID'), "os_password": environ.get('OS_PASSWORD'), "os_tenant_id": environ.get('OS_TENANT_ID'), "os_tenant_name": environ.get('OS_TENANT_NAME'), + "os_project_name": environ.get('OS_PROJECT_NAME'), + "os_project_id": environ.get('OS_PROJECT_ID'), + "os_project_domain_name": environ.get('OS_PROJECT_DOMAIN_NAME'), + "os_project_domain_id": environ.get('OS_PROJECT_DOMAIN_ID'), "os_auth_url": environ.get('OS_AUTH_URL'), "os_auth_token": environ.get('OS_AUTH_TOKEN'), "os_storage_url": environ.get('OS_STORAGE_URL'), diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index 80d63d96..031b689f 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -21,10 +21,12 @@ import unittest import six import swiftclient +from swiftclient.service import SwiftError import swiftclient.shell import swiftclient.utils from os.path import basename, dirname +from tests.unit.test_swiftclient import MockHttpTest if six.PY2: BUILTIN_OPEN = '__builtin__.open' @@ -38,6 +40,40 @@ mocked_os_environ = { } +def _make_args(cmd, opts, os_opts, separator='-', flags=None, cmd_args=None): + """ + Construct command line arguments for given options. + """ + args = [""] + flags = flags or [] + for k, v in opts.items(): + arg = "--" + k.replace("_", "-") + args = args + [arg, v] + for k, v in os_opts.items(): + arg = "--os" + separator + k.replace("_", separator) + args = args + [arg, v] + for flag in flags: + args.append('--%s' % flag) + args = args + [cmd] + if cmd_args: + args = args + cmd_args + return args + + +def _make_env(opts, os_opts): + """ + Construct a dict of environment variables for given options. + """ + env = {} + for k, v in opts.items(): + key = 'ST_' + k.upper().replace('-', '_') + env[key] = v + for k, v in os_opts.items(): + key = 'OS_' + k.upper().replace('-', '_') + env[key] = v + return env + + @mock.patch.dict(os.environ, mocked_os_environ) class TestShell(unittest.TestCase): def __init__(self, *args, **kwargs): @@ -413,33 +449,6 @@ class TestParsing(unittest.TestCase): result[0], result[1] = swiftclient.shell.parse_args(parser, args) return fake_command - def _make_args(self, cmd, opts, os_opts, separator='-'): - """ - Construct command line arguments for given options. - """ - args = [""] - for k, v in opts.items(): - arg = "--" + k.replace("_", "-") - args = args + [arg, v] - for k, v in os_opts.items(): - arg = "--os" + separator + k.replace("_", separator) - args = args + [arg, v] - args = args + [cmd] - return args - - def _make_env(self, opts, os_opts): - """ - Construct a dict of environment variables for given options. - """ - env = {} - for k, v in opts.items(): - key = 'ST_' + k.upper() - env[key] = v - for k, v in os_opts.items(): - key = 'OS_' + k.upper() - env[key] = v - return env - def _verify_opts(self, actual_opts, opts, os_opts={}, os_opts_dict={}): """ Check parsed options are correct. @@ -502,7 +511,7 @@ class TestParsing(unittest.TestCase): # username with domain is sufficient in args because keystone will # assume user is in default domain - args = self._make_args("stat", opts, os_opts, '-') + args = _make_args("stat", opts, os_opts, '-') result = [None, None] fake_command = self._make_fake_command(result) with mock.patch('swiftclient.shell.st_stat', fake_command): @@ -516,7 +525,7 @@ class TestParsing(unittest.TestCase): all_os_opts = os_opts.copy() all_os_opts.update(os_opts_dict) - args = self._make_args("stat", opts, all_os_opts, '-') + args = _make_args("stat", opts, all_os_opts, '-') result = [None, None] fake_command = self._make_fake_command(result) with mock.patch('swiftclient.shell.st_stat', fake_command): @@ -528,7 +537,7 @@ class TestParsing(unittest.TestCase): os_opts_dict = {"storage_url": "http://example.com:8080/v1", "auth_token": "0123abcd"} - args = self._make_args("stat", opts, os_opts_dict, '-') + args = _make_args("stat", opts, os_opts_dict, '-') result = [None, None] fake_command = self._make_fake_command(result) with mock.patch('swiftclient.shell.st_stat', fake_command): @@ -558,7 +567,7 @@ class TestParsing(unittest.TestCase): all_os_opts.update(os_opts_dict) # check using hyphen separator - args = self._make_args("stat", opts, all_os_opts, '-') + args = _make_args("stat", opts, all_os_opts, '-') result = [None, None] fake_command = self._make_fake_command(result) with mock.patch('swiftclient.shell.st_stat', fake_command): @@ -566,7 +575,7 @@ class TestParsing(unittest.TestCase): self._verify_opts(result[0], opts, os_opts, os_opts_dict) # check using underscore separator - args = self._make_args("stat", opts, all_os_opts, '_') + args = _make_args("stat", opts, all_os_opts, '_') result = [None, None] fake_command = self._make_fake_command(result) with mock.patch('swiftclient.shell.st_stat', fake_command): @@ -574,8 +583,8 @@ class TestParsing(unittest.TestCase): self._verify_opts(result[0], opts, os_opts, os_opts_dict) # check using environment variables - args = self._make_args("stat", {}, {}) - env = self._make_env(opts, all_os_opts) + args = _make_args("stat", {}, {}) + env = _make_env(opts, all_os_opts) result = [None, None] fake_command = self._make_fake_command(result) with mock.patch.dict(os.environ, env): @@ -584,7 +593,7 @@ class TestParsing(unittest.TestCase): self._verify_opts(result[0], opts, os_opts, os_opts_dict) # check again using OS_AUTH_VERSION instead of ST_AUTH_VERSION - env = self._make_env({}, all_os_opts) + env = _make_env({}, all_os_opts) env.update({'OS_AUTH_VERSION': '3'}) result = [None, None] fake_command = self._make_fake_command(result) @@ -600,7 +609,7 @@ class TestParsing(unittest.TestCase): os_opts = {"password": "secret", "username": "user", "auth_url": "http://example.com:5000/v3"} - args = self._make_args("stat", opts, os_opts) + args = _make_args("stat", opts, os_opts) with mock.patch('swiftclient.shell.st_stat', fake_command): swiftclient.shell.main(args) self.assertEqual(['stat'], result[1]) @@ -613,37 +622,37 @@ class TestParsing(unittest.TestCase): opts = {"auth_version": "3"} os_opts = {"password": "secret", "auth_url": "http://example.com:5000/v3"} - args = self._make_args("stat", opts, os_opts) + args = _make_args("stat", opts, os_opts) self.assertRaises(SystemExit, swiftclient.shell.main, args) os_opts = {"username": "user", "auth_url": "http://example.com:5000/v3"} - args = self._make_args("stat", opts, os_opts) + args = _make_args("stat", opts, os_opts) self.assertRaises(SystemExit, swiftclient.shell.main, args) os_opts = {"username": "user", "password": "secret"} - args = self._make_args("stat", opts, os_opts) + args = _make_args("stat", opts, os_opts) self.assertRaises(SystemExit, swiftclient.shell.main, args) def test_insufficient_env_vars_v3(self): - args = self._make_args("stat", {}, {}) + args = _make_args("stat", {}, {}) opts = {"auth_version": "3"} os_opts = {"password": "secret", "auth_url": "http://example.com:5000/v3"} - env = self._make_env(opts, os_opts) + env = _make_env(opts, os_opts) with mock.patch.dict(os.environ, env): self.assertRaises(SystemExit, swiftclient.shell.main, args) os_opts = {"username": "user", "auth_url": "http://example.com:5000/v3"} - env = self._make_env(opts, os_opts) + env = _make_env(opts, os_opts) with mock.patch.dict(os.environ, env): self.assertRaises(SystemExit, swiftclient.shell.main, args) os_opts = {"username": "user", "password": "secret"} - env = self._make_env(opts, os_opts) + env = _make_env(opts, os_opts) with mock.patch.dict(os.environ, env): self.assertRaises(SystemExit, swiftclient.shell.main, args) @@ -651,7 +660,7 @@ class TestParsing(unittest.TestCase): # --help returns condensed help message opts = {"help": ""} os_opts = {} - args = self._make_args("stat", opts, os_opts) + args = _make_args("stat", opts, os_opts) mock_stdout = six.StringIO() with mock.patch('sys.stdout', mock_stdout): self.assertRaises(SystemExit, swiftclient.shell.main, args) @@ -665,7 +674,7 @@ class TestParsing(unittest.TestCase): # "password": "secret", # "username": "user", # "auth_url": "http://example.com:5000/v3"} - args = self._make_args("", opts, os_opts) + args = _make_args("", opts, os_opts) mock_stdout = six.StringIO() with mock.patch('sys.stdout', mock_stdout): self.assertRaises(SystemExit, swiftclient.shell.main, args) @@ -675,10 +684,272 @@ class TestParsing(unittest.TestCase): ## --os-help return os options help opts = {} - args = self._make_args("", opts, os_opts) + args = _make_args("", opts, os_opts) mock_stdout = six.StringIO() with mock.patch('sys.stdout', mock_stdout): self.assertRaises(SystemExit, swiftclient.shell.main, args) out = mock_stdout.getvalue() self.assertTrue(out.find('[--key <api_key>]') > 0) self.assertTrue(out.find('--os-username=<auth-user-name>') > 0) + + +class FakeKeystone(object): + ''' + Fake keystone client module. Returns given endpoint url and auth token. + ''' + def __init__(self, endpoint, token): + self.calls = [] + self.auth_version = None + self.endpoint = endpoint + self.token = token + + class _Client(): + def __init__(self, endpoint, token, **kwargs): + self.auth_token = token + self.endpoint = endpoint + self.service_catalog = self.ServiceCatalog(endpoint) + + class ServiceCatalog(object): + def __init__(self, endpoint): + self.calls = [] + self.endpoint_url = endpoint + + def url_for(self, **kwargs): + self.calls.append(kwargs) + return self.endpoint_url + + def Client(self, **kwargs): + self.calls.append(kwargs) + self.client = self._Client(endpoint=self.endpoint, token=self.token, + **kwargs) + return self.client + + class Unauthorized(Exception): + pass + + class AuthorizationFailure(Exception): + pass + + class EndpointNotFound(Exception): + pass + + +def _make_fake_import_keystone_client(fake_import): + def _fake_import_keystone_client(auth_version): + fake_import.auth_version = auth_version + return fake_import, fake_import + + return _fake_import_keystone_client + + +class TestKeystoneOptions(MockHttpTest): + """ + Tests to check that options are passed from the command line or + environment variables through to the keystone client interface. + """ + all_os_opts = {'password': 'secret', + 'username': 'user', + 'auth-url': 'http://example.com:5000/v3', + 'user-domain-name': 'userdomain', + 'user-id': 'userid', + 'user-domain-id': 'userdomainid', + 'tenant-name': 'tenantname', + 'tenant-id': 'tenantid', + 'project-name': 'projectname', + 'project-id': 'projectid', + 'project-domain-id': 'projectdomainid', + 'project-domain-name': 'projectdomain', + 'cacert': 'foo'} + catalog_opts = {'service-type': 'my-object-store', + 'endpoint-type': 'public', + 'region-name': 'my-region'} + flags = ['insecure', 'debug'] + + # options that are given default values in code if missing from CLI + defaults = {'auth-version': '2.0', + 'service-type': 'object-store', + 'endpoint-type': 'publicURL'} + + def _build_os_opts(self, keys): + os_opts = {} + for k in keys: + os_opts[k] = self.all_os_opts.get(k, self.catalog_opts.get(k)) + return os_opts + + def _test_options_passed_to_keystone(self, cmd, opts, os_opts, + flags=None, use_env=False, + cmd_args=None, no_auth=False): + flags = flags or [] + if use_env: + # set up fake environment variables and make a minimal command line + env = _make_env(opts, os_opts) + args = _make_args(cmd, {}, {}, separator='-', flags=flags, + cmd_args=cmd_args) + else: + # set up empty environment and make full command line + env = {} + args = _make_args(cmd, opts, os_opts, separator='-', flags=flags, + cmd_args=cmd_args) + ks_endpoint = 'http://example.com:8080/v1/AUTH_acc' + ks_token = 'fake_auth_token' + fake_ks = FakeKeystone(endpoint=ks_endpoint, token=ks_token) + # fake_conn will check that storage_url and auth_token are as expected + endpoint = os_opts.get('storage-url', ks_endpoint) + token = os_opts.get('auth-token', ks_token) + fake_conn = self.fake_http_connection(204, headers={}, + storage_url=endpoint, + auth_token=token) + + with mock.patch('swiftclient.client._import_keystone_client', + _make_fake_import_keystone_client(fake_ks)): + with mock.patch('swiftclient.client.http_connection', fake_conn): + with mock.patch.dict(os.environ, env, clear=True): + try: + swiftclient.shell.main(args) + except SystemExit as e: + self.fail('Unexpected SystemExit: %s' % e) + except SwiftError as err: + self.fail('Unexpected SwiftError: %s' % err) + + if no_auth: + # check that keystone client was not used and terminate tests + self.assertIsNone(getattr(fake_ks, 'auth_version')) + self.assertEqual(len(fake_ks.calls), 0) + return + + # check correct auth version was passed to _import_keystone_client + key = 'auth-version' + expected = opts.get(key, self.defaults.get(key)) + self.assertEqual(expected, fake_ks.auth_version) + + # check args passed to keystone Client __init__ + self.assertEqual(len(fake_ks.calls), 1) + actual_args = fake_ks.calls[0] + for key in self.all_os_opts.keys(): + expected = os_opts.get(key, self.defaults.get(key)) + key = key.replace('-', '_') + self.assertTrue(key in actual_args, + 'Expected key %s not found in args %s' + % (key, actual_args)) + self.assertEqual(expected, actual_args[key], + 'Expected %s for key %s, found %s' + % (expected, key, actual_args[key])) + for flag in flags: + self.assertTrue(flag in actual_args) + self.assertTrue(actual_args[flag]) + + # check args passed to ServiceCatalog.url_for() method + self.assertEqual(len(fake_ks.client.service_catalog.calls), 1) + actual_args = fake_ks.client.service_catalog.calls[0] + for key in self.catalog_opts.keys(): + expected = os_opts.get(key, self.defaults.get(key)) + key = key.replace('-', '_') + if key == 'region_name': + key = 'filter_value' + self.assertTrue(key in actual_args, + 'Expected key %s not found in args %s' + % (key, actual_args)) + self.assertEqual(expected, actual_args[key], + 'Expected %s for key %s, found %s' + % (expected, key, actual_args[key])) + key, v = 'attr', 'region' + self.assertTrue(key in actual_args, + 'Expected key %s not found in args %s' + % (key, actual_args)) + self.assertEqual(v, actual_args[key], + 'Expected %s for key %s, found %s' + % (v, key, actual_args[key])) + + def _test_options(self, opts, os_opts, flags=None, no_auth=False): + # repeat test for different commands using env and command line options + for cmd in ('stat', 'post'): + self._test_options_passed_to_keystone(cmd, opts, os_opts, + flags=flags, no_auth=no_auth) + self._test_options_passed_to_keystone(cmd, opts, os_opts, + flags=flags, use_env=True, + no_auth=no_auth) + + def test_all_args_passed_to_keystone(self): + # check that all possible command line args are passed to keystone + opts = {'auth-version': '3'} + os_opts = dict(self.all_os_opts) + os_opts.update(self.catalog_opts) + self._test_options(opts, os_opts, flags=self.flags) + + opts = {'auth-version': '2.0'} + self._test_options(opts, os_opts, flags=self.flags) + + opts = {} + self._test_options(opts, os_opts, flags=self.flags) + + def test_catalog_options_and_flags_not_required_v3(self): + # check that all possible command line args are passed to keystone + opts = {'auth-version': '3'} + os_opts = dict(self.all_os_opts) + self._test_options(opts, os_opts, flags=None) + + def test_ok_option_combinations_v3(self): + opts = {'auth-version': '3'} + keys = ('username', 'password', 'tenant-name', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + keys = ('user-id', 'password', 'tenant-name', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + keys = ('user-id', 'password', 'tenant-id', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + keys = ('user-id', 'password', 'project-name', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + keys = ('user-id', 'password', 'project-id', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + def test_ok_option_combinations_v2(self): + opts = {'auth-version': '2.0'} + keys = ('username', 'password', 'tenant-name', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + keys = ('username', 'password', 'tenant-id', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + # allow auth_version to default to 2.0 + opts = {} + keys = ('username', 'password', 'tenant-name', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + keys = ('username', 'password', 'tenant-id', 'auth-url') + os_opts = self._build_os_opts(keys) + self._test_options(opts, os_opts) + + def test_url_and_token_provided_on_command_line(self): + endpoint = 'http://alternate.com:8080/v1/AUTH_another' + token = 'alternate_auth_token' + os_opts = {'auth-token': token, + 'storage-url': endpoint} + opts = {'auth-version': '3'} + self._test_options(opts, os_opts, no_auth=True) + + opts = {'auth-version': '2.0'} + self._test_options(opts, os_opts, no_auth=True) + + def test_url_provided_on_command_line(self): + endpoint = 'http://alternate.com:8080/v1/AUTH_another' + os_opts = {'username': 'username', + 'password': 'password', + 'project-name': 'projectname', + 'auth-url': 'http://example.com:5000/v3', + 'storage-url': endpoint} + opts = {'auth-version': '3'} + self._test_options(opts, os_opts) + + opts = {'auth-version': '2.0'} + self._test_options(opts, os_opts) diff --git a/tests/unit/test_swiftclient.py b/tests/unit/test_swiftclient.py index facd6d96..1a5c7721 100644 --- a/tests/unit/test_swiftclient.py +++ b/tests/unit/test_swiftclient.py @@ -30,7 +30,7 @@ from six.moves.urllib.parse import urlparse from six.moves import reload_module # TODO: mock http connection class with more control over headers -from .utils import fake_http_connect, fake_get_auth_keystone +from .utils import MockHttpTest, fake_get_auth_keystone from swiftclient import client as c import swiftclient.utils @@ -103,51 +103,6 @@ class TestJsonImport(testtools.TestCase): self.assertEqual(loads, c.json_loads) -class MockHttpTest(testtools.TestCase): - - def setUp(self): - super(MockHttpTest, self).setUp() - - def fake_http_connection(*args, **kwargs): - _orig_http_connection = c.http_connection - return_read = kwargs.get('return_read') - query_string = kwargs.get('query_string') - storage_url = kwargs.get('storage_url') - - def wrapper(url, proxy=None, cacert=None, insecure=False, - ssl_compression=True): - if storage_url: - self.assertEqual(storage_url, url) - - parsed, _conn = _orig_http_connection(url, proxy=proxy) - conn = fake_http_connect(*args, **kwargs)() - - def request(method, url, *args, **kwargs): - if query_string: - self.assertTrue(url.endswith('?' + query_string)) - if url.endswith('invalid_cert') and not insecure: - from swiftclient import client as c - raise c.ClientException("invalid_certificate") - return - conn.request = request - - conn.has_been_read = False - _orig_read = conn.read - - def read(*args, **kwargs): - conn.has_been_read = True - return _orig_read(*args, **kwargs) - conn.read = return_read or read - - return parsed, conn - return wrapper - self.fake_http_connection = fake_http_connection - - def tearDown(self): - super(MockHttpTest, self).tearDown() - reload_module(c) - - class MockHttpResponse(): def __init__(self, status=0): self.status = status diff --git a/tests/unit/utils.py b/tests/unit/utils.py index cb671cf4..09b31c13 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -14,6 +14,9 @@ # limitations under the License. from requests import RequestException from time import sleep +import testtools +from six.moves import reload_module +from swiftclient import client as c def fake_get_auth_keystone(os_options, exc=None, **kwargs): @@ -156,3 +159,54 @@ def fake_http_connect(*code_iter, **kwargs): return fake_conn return connect + + +class MockHttpTest(testtools.TestCase): + + def setUp(self): + super(MockHttpTest, self).setUp() + + def fake_http_connection(*args, **kwargs): + _orig_http_connection = c.http_connection + return_read = kwargs.get('return_read') + query_string = kwargs.get('query_string') + storage_url = kwargs.get('storage_url') + auth_token = kwargs.get('auth_token') + + def wrapper(url, proxy=None, cacert=None, insecure=False, + ssl_compression=True): + if storage_url: + self.assertEqual(storage_url, url) + + parsed, _conn = _orig_http_connection(url, proxy=proxy) + conn = fake_http_connect(*args, **kwargs)() + + def request(method, url, *args, **kwargs): + if auth_token: + headers = args[1] + self.assertTrue('X-Auth-Token' in headers) + actual_token = headers.get('X-Auth-Token') + self.assertEqual(auth_token, actual_token) + if query_string: + self.assertTrue(url.endswith('?' + query_string)) + if url.endswith('invalid_cert') and not insecure: + from swiftclient import client as c + raise c.ClientException("invalid_certificate") + return + conn.request = request + + conn.has_been_read = False + _orig_read = conn.read + + def read(*args, **kwargs): + conn.has_been_read = True + return _orig_read(*args, **kwargs) + conn.read = return_read or read + + return parsed, conn + return wrapper + self.fake_http_connection = fake_http_connection + + def tearDown(self): + super(MockHttpTest, self).tearDown() + reload_module(c)