Fix bug with some OS options not being passed to client

Fixes a regression that is preventing swiftclient from
authenticating using keystone v3 options.

swiftclient/shell.py processes the dict of command
line options to set up a child dict ('os_options')
of keystone auth specific options (line 960). The
processing includes stripping --os- prefixes from
the command line options before adding keys to
os_options.

A recent patch https://review.openstack.org/#/c/85453/
introduced a duplication of this option processing
in service.py (line 79) which replaces the os_options
created in shell.py, but omits keystone v3 related
options. Consequently the keystone v3 options are
not being passed to the client get_auth() method.

This patch adds the keystone v3 related options to
the option processing in service.py.

For pragmatic reasons (i.e. fixing the bug quickly)
the option processing code has not been removed from
parse_args in shell.py. It is likely that the code in
parse_args is now redundant, but all code paths between
parse_args and process_options should be inspected and
test coverage added before removing that code.

Unit tests have been added in test_shell.py to verify
that command line options are correctly passed to the
client get_auth method.

The MockHttpTest class is re-used in test_shell.py,
so it is moved from test_swiftclient.py
to tests/unit/utils.py

Closes-bug: #1372465

Change-Id: I4fed013cdb8936509609d06093337cc147ade0d6
This commit is contained in:
Alistair Coles 2014-09-22 11:52:44 +01:00
parent 8f1b394325
commit f90d7f28e5
5 changed files with 408 additions and 105 deletions

@ -285,6 +285,23 @@ def get_keystoneclient_2_0(auth_url, user, key, os_options, **kwargs):
return get_auth_keystone(auth_url, user, key, os_options, **kwargs)
def _import_keystone_client(auth_version):
# the attempted imports are encapsulated in this function to allow
# mocking for tests
try:
if auth_version in AUTH_VERSIONS_V3:
from keystoneclient.v3 import client as ksclient
else:
from keystoneclient.v2_0 import client as ksclient
from keystoneclient import exceptions
return ksclient, exceptions
except ImportError:
sys.exit('''
Auth versions 2.0 and 3 require python-keystoneclient, install it or use Auth
version 1.0 which requires ST_AUTH, ST_USER, and ST_KEY environment
variables to be set or overridden with -A, -U, or -K.''')
def get_auth_keystone(auth_url, user, key, os_options, **kwargs):
"""
Authenticate against a keystone server.
@ -296,17 +313,7 @@ def get_auth_keystone(auth_url, user, key, os_options, **kwargs):
auth_version = kwargs.get('auth_version', '2.0')
debug = logger.isEnabledFor(logging.DEBUG) and True or False
try:
if auth_version in AUTH_VERSIONS_V3:
from keystoneclient.v3 import client as ksclient
else:
from keystoneclient.v2_0 import client as ksclient
from keystoneclient import exceptions
except ImportError:
sys.exit('''
Auth versions 2.0 and 3 require python-keystoneclient, install it or use Auth
version 1.0 which requires ST_AUTH, ST_USER, and ST_KEY environment
variables to be set or overridden with -A, -U, or -K.''')
ksclient, exceptions = _import_keystone_client(auth_version)
try:
_ksclient = ksclient.Client(

@ -77,8 +77,10 @@ class SwiftError(Exception):
def process_options(options):
if not (options['auth'] and options['user'] and options['key']):
# Use 2.0 auth if none of the old args are present
if (not (options.get('auth') and options.get('user')
and options.get('key'))
and options.get('auth_version') != '3'):
# Use keystone 2.0 auth if any of the old-style args are missing
options['auth_version'] = '2.0'
# Use new-style args if old ones not present
@ -91,8 +93,15 @@ def process_options(options):
# Specific OpenStack options
options['os_options'] = {
'user_id': options['os_user_id'],
'user_domain_id': options['os_user_domain_id'],
'user_domain_name': options['os_user_domain_name'],
'tenant_id': options['os_tenant_id'],
'tenant_name': options['os_tenant_name'],
'project_id': options['os_project_id'],
'project_name': options['os_project_name'],
'project_domain_id': options['os_project_domain_id'],
'project_domain_name': options['os_project_domain_name'],
'service_type': options['os_service_type'],
'endpoint_type': options['os_endpoint_type'],
'auth_token': options['os_auth_token'],
@ -111,9 +120,16 @@ _default_global_options = {
"key": environ.get('ST_KEY'),
"retries": 5,
"os_username": environ.get('OS_USERNAME'),
"os_user_id": environ.get('OS_USER_ID'),
"os_user_domain_name": environ.get('OS_USER_DOMAIN_NAME'),
"os_user_domain_id": environ.get('OS_USER_DOMAIN_ID'),
"os_password": environ.get('OS_PASSWORD'),
"os_tenant_id": environ.get('OS_TENANT_ID'),
"os_tenant_name": environ.get('OS_TENANT_NAME'),
"os_project_name": environ.get('OS_PROJECT_NAME'),
"os_project_id": environ.get('OS_PROJECT_ID'),
"os_project_domain_name": environ.get('OS_PROJECT_DOMAIN_NAME'),
"os_project_domain_id": environ.get('OS_PROJECT_DOMAIN_ID'),
"os_auth_url": environ.get('OS_AUTH_URL'),
"os_auth_token": environ.get('OS_AUTH_TOKEN'),
"os_storage_url": environ.get('OS_STORAGE_URL'),

@ -21,10 +21,12 @@ import unittest
import six
import swiftclient
from swiftclient.service import SwiftError
import swiftclient.shell
import swiftclient.utils
from os.path import basename, dirname
from tests.unit.test_swiftclient import MockHttpTest
if six.PY2:
BUILTIN_OPEN = '__builtin__.open'
@ -38,6 +40,40 @@ mocked_os_environ = {
}
def _make_args(cmd, opts, os_opts, separator='-', flags=None, cmd_args=None):
"""
Construct command line arguments for given options.
"""
args = [""]
flags = flags or []
for k, v in opts.items():
arg = "--" + k.replace("_", "-")
args = args + [arg, v]
for k, v in os_opts.items():
arg = "--os" + separator + k.replace("_", separator)
args = args + [arg, v]
for flag in flags:
args.append('--%s' % flag)
args = args + [cmd]
if cmd_args:
args = args + cmd_args
return args
def _make_env(opts, os_opts):
"""
Construct a dict of environment variables for given options.
"""
env = {}
for k, v in opts.items():
key = 'ST_' + k.upper().replace('-', '_')
env[key] = v
for k, v in os_opts.items():
key = 'OS_' + k.upper().replace('-', '_')
env[key] = v
return env
@mock.patch.dict(os.environ, mocked_os_environ)
class TestShell(unittest.TestCase):
def __init__(self, *args, **kwargs):
@ -413,33 +449,6 @@ class TestParsing(unittest.TestCase):
result[0], result[1] = swiftclient.shell.parse_args(parser, args)
return fake_command
def _make_args(self, cmd, opts, os_opts, separator='-'):
"""
Construct command line arguments for given options.
"""
args = [""]
for k, v in opts.items():
arg = "--" + k.replace("_", "-")
args = args + [arg, v]
for k, v in os_opts.items():
arg = "--os" + separator + k.replace("_", separator)
args = args + [arg, v]
args = args + [cmd]
return args
def _make_env(self, opts, os_opts):
"""
Construct a dict of environment variables for given options.
"""
env = {}
for k, v in opts.items():
key = 'ST_' + k.upper()
env[key] = v
for k, v in os_opts.items():
key = 'OS_' + k.upper()
env[key] = v
return env
def _verify_opts(self, actual_opts, opts, os_opts={}, os_opts_dict={}):
"""
Check parsed options are correct.
@ -502,7 +511,7 @@ class TestParsing(unittest.TestCase):
# username with domain is sufficient in args because keystone will
# assume user is in default domain
args = self._make_args("stat", opts, os_opts, '-')
args = _make_args("stat", opts, os_opts, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@ -516,7 +525,7 @@ class TestParsing(unittest.TestCase):
all_os_opts = os_opts.copy()
all_os_opts.update(os_opts_dict)
args = self._make_args("stat", opts, all_os_opts, '-')
args = _make_args("stat", opts, all_os_opts, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@ -528,7 +537,7 @@ class TestParsing(unittest.TestCase):
os_opts_dict = {"storage_url": "http://example.com:8080/v1",
"auth_token": "0123abcd"}
args = self._make_args("stat", opts, os_opts_dict, '-')
args = _make_args("stat", opts, os_opts_dict, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@ -558,7 +567,7 @@ class TestParsing(unittest.TestCase):
all_os_opts.update(os_opts_dict)
# check using hyphen separator
args = self._make_args("stat", opts, all_os_opts, '-')
args = _make_args("stat", opts, all_os_opts, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@ -566,7 +575,7 @@ class TestParsing(unittest.TestCase):
self._verify_opts(result[0], opts, os_opts, os_opts_dict)
# check using underscore separator
args = self._make_args("stat", opts, all_os_opts, '_')
args = _make_args("stat", opts, all_os_opts, '_')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@ -574,8 +583,8 @@ class TestParsing(unittest.TestCase):
self._verify_opts(result[0], opts, os_opts, os_opts_dict)
# check using environment variables
args = self._make_args("stat", {}, {})
env = self._make_env(opts, all_os_opts)
args = _make_args("stat", {}, {})
env = _make_env(opts, all_os_opts)
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch.dict(os.environ, env):
@ -584,7 +593,7 @@ class TestParsing(unittest.TestCase):
self._verify_opts(result[0], opts, os_opts, os_opts_dict)
# check again using OS_AUTH_VERSION instead of ST_AUTH_VERSION
env = self._make_env({}, all_os_opts)
env = _make_env({}, all_os_opts)
env.update({'OS_AUTH_VERSION': '3'})
result = [None, None]
fake_command = self._make_fake_command(result)
@ -600,7 +609,7 @@ class TestParsing(unittest.TestCase):
os_opts = {"password": "secret",
"username": "user",
"auth_url": "http://example.com:5000/v3"}
args = self._make_args("stat", opts, os_opts)
args = _make_args("stat", opts, os_opts)
with mock.patch('swiftclient.shell.st_stat', fake_command):
swiftclient.shell.main(args)
self.assertEqual(['stat'], result[1])
@ -613,37 +622,37 @@ class TestParsing(unittest.TestCase):
opts = {"auth_version": "3"}
os_opts = {"password": "secret",
"auth_url": "http://example.com:5000/v3"}
args = self._make_args("stat", opts, os_opts)
args = _make_args("stat", opts, os_opts)
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"auth_url": "http://example.com:5000/v3"}
args = self._make_args("stat", opts, os_opts)
args = _make_args("stat", opts, os_opts)
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"password": "secret"}
args = self._make_args("stat", opts, os_opts)
args = _make_args("stat", opts, os_opts)
self.assertRaises(SystemExit, swiftclient.shell.main, args)
def test_insufficient_env_vars_v3(self):
args = self._make_args("stat", {}, {})
args = _make_args("stat", {}, {})
opts = {"auth_version": "3"}
os_opts = {"password": "secret",
"auth_url": "http://example.com:5000/v3"}
env = self._make_env(opts, os_opts)
env = _make_env(opts, os_opts)
with mock.patch.dict(os.environ, env):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"auth_url": "http://example.com:5000/v3"}
env = self._make_env(opts, os_opts)
env = _make_env(opts, os_opts)
with mock.patch.dict(os.environ, env):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"password": "secret"}
env = self._make_env(opts, os_opts)
env = _make_env(opts, os_opts)
with mock.patch.dict(os.environ, env):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
@ -651,7 +660,7 @@ class TestParsing(unittest.TestCase):
# --help returns condensed help message
opts = {"help": ""}
os_opts = {}
args = self._make_args("stat", opts, os_opts)
args = _make_args("stat", opts, os_opts)
mock_stdout = six.StringIO()
with mock.patch('sys.stdout', mock_stdout):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
@ -665,7 +674,7 @@ class TestParsing(unittest.TestCase):
# "password": "secret",
# "username": "user",
# "auth_url": "http://example.com:5000/v3"}
args = self._make_args("", opts, os_opts)
args = _make_args("", opts, os_opts)
mock_stdout = six.StringIO()
with mock.patch('sys.stdout', mock_stdout):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
@ -675,10 +684,272 @@ class TestParsing(unittest.TestCase):
## --os-help return os options help
opts = {}
args = self._make_args("", opts, os_opts)
args = _make_args("", opts, os_opts)
mock_stdout = six.StringIO()
with mock.patch('sys.stdout', mock_stdout):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
out = mock_stdout.getvalue()
self.assertTrue(out.find('[--key <api_key>]') > 0)
self.assertTrue(out.find('--os-username=<auth-user-name>') > 0)
class FakeKeystone(object):
'''
Fake keystone client module. Returns given endpoint url and auth token.
'''
def __init__(self, endpoint, token):
self.calls = []
self.auth_version = None
self.endpoint = endpoint
self.token = token
class _Client():
def __init__(self, endpoint, token, **kwargs):
self.auth_token = token
self.endpoint = endpoint
self.service_catalog = self.ServiceCatalog(endpoint)
class ServiceCatalog(object):
def __init__(self, endpoint):
self.calls = []
self.endpoint_url = endpoint
def url_for(self, **kwargs):
self.calls.append(kwargs)
return self.endpoint_url
def Client(self, **kwargs):
self.calls.append(kwargs)
self.client = self._Client(endpoint=self.endpoint, token=self.token,
**kwargs)
return self.client
class Unauthorized(Exception):
pass
class AuthorizationFailure(Exception):
pass
class EndpointNotFound(Exception):
pass
def _make_fake_import_keystone_client(fake_import):
def _fake_import_keystone_client(auth_version):
fake_import.auth_version = auth_version
return fake_import, fake_import
return _fake_import_keystone_client
class TestKeystoneOptions(MockHttpTest):
"""
Tests to check that options are passed from the command line or
environment variables through to the keystone client interface.
"""
all_os_opts = {'password': 'secret',
'username': 'user',
'auth-url': 'http://example.com:5000/v3',
'user-domain-name': 'userdomain',
'user-id': 'userid',
'user-domain-id': 'userdomainid',
'tenant-name': 'tenantname',
'tenant-id': 'tenantid',
'project-name': 'projectname',
'project-id': 'projectid',
'project-domain-id': 'projectdomainid',
'project-domain-name': 'projectdomain',
'cacert': 'foo'}
catalog_opts = {'service-type': 'my-object-store',
'endpoint-type': 'public',
'region-name': 'my-region'}
flags = ['insecure', 'debug']
# options that are given default values in code if missing from CLI
defaults = {'auth-version': '2.0',
'service-type': 'object-store',
'endpoint-type': 'publicURL'}
def _build_os_opts(self, keys):
os_opts = {}
for k in keys:
os_opts[k] = self.all_os_opts.get(k, self.catalog_opts.get(k))
return os_opts
def _test_options_passed_to_keystone(self, cmd, opts, os_opts,
flags=None, use_env=False,
cmd_args=None, no_auth=False):
flags = flags or []
if use_env:
# set up fake environment variables and make a minimal command line
env = _make_env(opts, os_opts)
args = _make_args(cmd, {}, {}, separator='-', flags=flags,
cmd_args=cmd_args)
else:
# set up empty environment and make full command line
env = {}
args = _make_args(cmd, opts, os_opts, separator='-', flags=flags,
cmd_args=cmd_args)
ks_endpoint = 'http://example.com:8080/v1/AUTH_acc'
ks_token = 'fake_auth_token'
fake_ks = FakeKeystone(endpoint=ks_endpoint, token=ks_token)
# fake_conn will check that storage_url and auth_token are as expected
endpoint = os_opts.get('storage-url', ks_endpoint)
token = os_opts.get('auth-token', ks_token)
fake_conn = self.fake_http_connection(204, headers={},
storage_url=endpoint,
auth_token=token)
with mock.patch('swiftclient.client._import_keystone_client',
_make_fake_import_keystone_client(fake_ks)):
with mock.patch('swiftclient.client.http_connection', fake_conn):
with mock.patch.dict(os.environ, env, clear=True):
try:
swiftclient.shell.main(args)
except SystemExit as e:
self.fail('Unexpected SystemExit: %s' % e)
except SwiftError as err:
self.fail('Unexpected SwiftError: %s' % err)
if no_auth:
# check that keystone client was not used and terminate tests
self.assertIsNone(getattr(fake_ks, 'auth_version'))
self.assertEqual(len(fake_ks.calls), 0)
return
# check correct auth version was passed to _import_keystone_client
key = 'auth-version'
expected = opts.get(key, self.defaults.get(key))
self.assertEqual(expected, fake_ks.auth_version)
# check args passed to keystone Client __init__
self.assertEqual(len(fake_ks.calls), 1)
actual_args = fake_ks.calls[0]
for key in self.all_os_opts.keys():
expected = os_opts.get(key, self.defaults.get(key))
key = key.replace('-', '_')
self.assertTrue(key in actual_args,
'Expected key %s not found in args %s'
% (key, actual_args))
self.assertEqual(expected, actual_args[key],
'Expected %s for key %s, found %s'
% (expected, key, actual_args[key]))
for flag in flags:
self.assertTrue(flag in actual_args)
self.assertTrue(actual_args[flag])
# check args passed to ServiceCatalog.url_for() method
self.assertEqual(len(fake_ks.client.service_catalog.calls), 1)
actual_args = fake_ks.client.service_catalog.calls[0]
for key in self.catalog_opts.keys():
expected = os_opts.get(key, self.defaults.get(key))
key = key.replace('-', '_')
if key == 'region_name':
key = 'filter_value'
self.assertTrue(key in actual_args,
'Expected key %s not found in args %s'
% (key, actual_args))
self.assertEqual(expected, actual_args[key],
'Expected %s for key %s, found %s'
% (expected, key, actual_args[key]))
key, v = 'attr', 'region'
self.assertTrue(key in actual_args,
'Expected key %s not found in args %s'
% (key, actual_args))
self.assertEqual(v, actual_args[key],
'Expected %s for key %s, found %s'
% (v, key, actual_args[key]))
def _test_options(self, opts, os_opts, flags=None, no_auth=False):
# repeat test for different commands using env and command line options
for cmd in ('stat', 'post'):
self._test_options_passed_to_keystone(cmd, opts, os_opts,
flags=flags, no_auth=no_auth)
self._test_options_passed_to_keystone(cmd, opts, os_opts,
flags=flags, use_env=True,
no_auth=no_auth)
def test_all_args_passed_to_keystone(self):
# check that all possible command line args are passed to keystone
opts = {'auth-version': '3'}
os_opts = dict(self.all_os_opts)
os_opts.update(self.catalog_opts)
self._test_options(opts, os_opts, flags=self.flags)
opts = {'auth-version': '2.0'}
self._test_options(opts, os_opts, flags=self.flags)
opts = {}
self._test_options(opts, os_opts, flags=self.flags)
def test_catalog_options_and_flags_not_required_v3(self):
# check that all possible command line args are passed to keystone
opts = {'auth-version': '3'}
os_opts = dict(self.all_os_opts)
self._test_options(opts, os_opts, flags=None)
def test_ok_option_combinations_v3(self):
opts = {'auth-version': '3'}
keys = ('username', 'password', 'tenant-name', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
keys = ('user-id', 'password', 'tenant-name', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
keys = ('user-id', 'password', 'tenant-id', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
keys = ('user-id', 'password', 'project-name', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
keys = ('user-id', 'password', 'project-id', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
def test_ok_option_combinations_v2(self):
opts = {'auth-version': '2.0'}
keys = ('username', 'password', 'tenant-name', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
keys = ('username', 'password', 'tenant-id', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
# allow auth_version to default to 2.0
opts = {}
keys = ('username', 'password', 'tenant-name', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
keys = ('username', 'password', 'tenant-id', 'auth-url')
os_opts = self._build_os_opts(keys)
self._test_options(opts, os_opts)
def test_url_and_token_provided_on_command_line(self):
endpoint = 'http://alternate.com:8080/v1/AUTH_another'
token = 'alternate_auth_token'
os_opts = {'auth-token': token,
'storage-url': endpoint}
opts = {'auth-version': '3'}
self._test_options(opts, os_opts, no_auth=True)
opts = {'auth-version': '2.0'}
self._test_options(opts, os_opts, no_auth=True)
def test_url_provided_on_command_line(self):
endpoint = 'http://alternate.com:8080/v1/AUTH_another'
os_opts = {'username': 'username',
'password': 'password',
'project-name': 'projectname',
'auth-url': 'http://example.com:5000/v3',
'storage-url': endpoint}
opts = {'auth-version': '3'}
self._test_options(opts, os_opts)
opts = {'auth-version': '2.0'}
self._test_options(opts, os_opts)

@ -30,7 +30,7 @@ from six.moves.urllib.parse import urlparse
from six.moves import reload_module
# TODO: mock http connection class with more control over headers
from .utils import fake_http_connect, fake_get_auth_keystone
from .utils import MockHttpTest, fake_get_auth_keystone
from swiftclient import client as c
import swiftclient.utils
@ -103,51 +103,6 @@ class TestJsonImport(testtools.TestCase):
self.assertEqual(loads, c.json_loads)
class MockHttpTest(testtools.TestCase):
def setUp(self):
super(MockHttpTest, self).setUp()
def fake_http_connection(*args, **kwargs):
_orig_http_connection = c.http_connection
return_read = kwargs.get('return_read')
query_string = kwargs.get('query_string')
storage_url = kwargs.get('storage_url')
def wrapper(url, proxy=None, cacert=None, insecure=False,
ssl_compression=True):
if storage_url:
self.assertEqual(storage_url, url)
parsed, _conn = _orig_http_connection(url, proxy=proxy)
conn = fake_http_connect(*args, **kwargs)()
def request(method, url, *args, **kwargs):
if query_string:
self.assertTrue(url.endswith('?' + query_string))
if url.endswith('invalid_cert') and not insecure:
from swiftclient import client as c
raise c.ClientException("invalid_certificate")
return
conn.request = request
conn.has_been_read = False
_orig_read = conn.read
def read(*args, **kwargs):
conn.has_been_read = True
return _orig_read(*args, **kwargs)
conn.read = return_read or read
return parsed, conn
return wrapper
self.fake_http_connection = fake_http_connection
def tearDown(self):
super(MockHttpTest, self).tearDown()
reload_module(c)
class MockHttpResponse():
def __init__(self, status=0):
self.status = status

@ -14,6 +14,9 @@
# limitations under the License.
from requests import RequestException
from time import sleep
import testtools
from six.moves import reload_module
from swiftclient import client as c
def fake_get_auth_keystone(os_options, exc=None, **kwargs):
@ -156,3 +159,54 @@ def fake_http_connect(*code_iter, **kwargs):
return fake_conn
return connect
class MockHttpTest(testtools.TestCase):
def setUp(self):
super(MockHttpTest, self).setUp()
def fake_http_connection(*args, **kwargs):
_orig_http_connection = c.http_connection
return_read = kwargs.get('return_read')
query_string = kwargs.get('query_string')
storage_url = kwargs.get('storage_url')
auth_token = kwargs.get('auth_token')
def wrapper(url, proxy=None, cacert=None, insecure=False,
ssl_compression=True):
if storage_url:
self.assertEqual(storage_url, url)
parsed, _conn = _orig_http_connection(url, proxy=proxy)
conn = fake_http_connect(*args, **kwargs)()
def request(method, url, *args, **kwargs):
if auth_token:
headers = args[1]
self.assertTrue('X-Auth-Token' in headers)
actual_token = headers.get('X-Auth-Token')
self.assertEqual(auth_token, actual_token)
if query_string:
self.assertTrue(url.endswith('?' + query_string))
if url.endswith('invalid_cert') and not insecure:
from swiftclient import client as c
raise c.ClientException("invalid_certificate")
return
conn.request = request
conn.has_been_read = False
_orig_read = conn.read
def read(*args, **kwargs):
conn.has_been_read = True
return _orig_read(*args, **kwargs)
conn.read = return_read or read
return parsed, conn
return wrapper
self.fake_http_connection = fake_http_connection
def tearDown(self):
super(MockHttpTest, self).tearDown()
reload_module(c)