Rework TLS option handling

Move all of this into ClientManager.__init__(), and only use the
cli_options dict as source.  That should already have anything
that os-client-config wants to do applied.  Then we fix it for
the legacy clients.

* verify/insecure/cacert
* cert/key

Rename ClientManager attributes:
* _verify -> verify
* _cacert -> cacert
* _cert -> cert
* _insecure -> removed, use 'not verify'
* _interface -> interface
* _region_name -> region_name

Change-Id: Ic6dd81d5129498b306ac2f63515426dcc6ce20c2
This commit is contained in:
Dean Troyer 2016-05-10 17:32:10 -05:00
parent a1229651a9
commit e584cef537
2 changed files with 111 additions and 39 deletions

View File

@ -70,7 +70,6 @@ class ClientManager(object):
self,
cli_options=None,
api_version=None,
verify=True,
pw_func=None,
):
"""Set up a ClientManager
@ -79,10 +78,6 @@ class ClientManager(object):
Options collected from the command-line, environment, or wherever
:param api_version:
Dict of API versions: key is API name, value is the version
:param verify:
TLS certificate verification; may be a boolean to enable or disable
server certificate verification, or a filename of a CA certificate
bundle to be used in verification (implies True)
:param pw_func:
Callback function for asking the user for a password. The function
takes an optional string for the prompt ('Password: ' on None) and
@ -93,32 +88,44 @@ class ClientManager(object):
self._api_version = api_version
self._pw_callback = pw_func
self._url = self._cli_options.auth.get('url')
self._region_name = self._cli_options.region_name
self._interface = self._cli_options.interface
self.region_name = self._cli_options.region_name
self.interface = self._cli_options.interface
self.timing = self._cli_options.timing
self._auth_ref = None
self.session = None
# verify is the Requests-compatible form
self._verify = verify
# also store in the form used by the legacy client libs
self._cacert = None
if isinstance(verify, bool):
self._insecure = not verify
# self.verify is the Requests-compatible form
# self.cacert is the form used by the legacy client libs
# self.insecure is not needed, use 'not self.verify'
# NOTE(dtroyer): Per bug https://bugs.launchpad.net/bugs/1447784
# --insecure overrides any --os-cacert setting
if self._cli_options.insecure:
# Handle --insecure
self.verify = False
self.cacert = None
else:
self._cacert = verify
self._insecure = False
if (self._cli_options.cacert is not None
and self._cli_options.cacert != ''):
# --cacert implies --verify here
self.verify = self._cli_options.cacert
self.cacert = self._cli_options.cacert
else:
# Fall through also gets --verify
self.verify = True
self.cacert = None
# Set up client certificate and key
# NOTE(cbrandily): This converts client certificate/key to requests
# cert argument: None (no client certificate), a path
# to client certificate or a tuple with client
# certificate/key paths.
self._cert = self._cli_options.cert
if self._cert and self._cli_options.key:
self._cert = self._cert, self._cli_options.key
self.cert = self._cli_options.cert
if self.cert and self._cli_options.key:
self.cert = self.cert, self._cli_options.key
# Get logging from root logger
root_logger = logging.getLogger('')
@ -215,8 +222,8 @@ class ClientManager(object):
self.session = osc_session.TimingSession(
auth=self.auth,
session=request_session,
verify=self._verify,
cert=self._cert,
verify=self.verify,
cert=self.cert,
user_agent=USER_AGENT,
)

View File

@ -51,6 +51,9 @@ class FakeOptions(object):
for option in auth.OPTIONS_LIST:
setattr(self, option.replace('-', '_'), None)
self.auth_type = None
self.verify = True
self.cacert = None
self.insecure = None
self.identity_api_version = '2.0'
self.timing = None
self.region_name = None
@ -111,7 +114,6 @@ class TestClientManager(utils.TestCase):
region_name=fakes.REGION_NAME,
),
api_version=API_VERSION,
verify=True
)
client_manager.setup_auth()
client_manager.auth_ref
@ -126,14 +128,13 @@ class TestClientManager(utils.TestCase):
)
self.assertEqual(
fakes.INTERFACE,
client_manager._interface,
client_manager.interface,
)
self.assertEqual(
fakes.REGION_NAME,
client_manager._region_name,
client_manager.region_name,
)
self.assertFalse(client_manager._insecure)
self.assertTrue(client_manager._verify)
self.assertTrue(client_manager.verify)
self.assertTrue(client_manager.is_network_endpoint_enabled())
def test_client_manager_password(self):
@ -148,7 +149,6 @@ class TestClientManager(utils.TestCase):
),
),
api_version=API_VERSION,
verify=False,
)
client_manager.setup_auth()
client_manager.auth_ref
@ -169,8 +169,7 @@ class TestClientManager(utils.TestCase):
client_manager.auth,
auth_v2.Password,
)
self.assertTrue(client_manager._insecure)
self.assertFalse(client_manager._verify)
self.assertTrue(client_manager.verify)
# These need to stick around until the old-style clients are gone
self.assertEqual(
@ -200,7 +199,6 @@ class TestClientManager(utils.TestCase):
auth_type='v3password',
),
api_version={"identity": "3"},
verify=False,
)
client_manager.setup_auth()
client_manager.auth_ref
@ -225,6 +223,28 @@ class TestClientManager(utils.TestCase):
headers=headers,
text=text)
def test_client_manager_password_verify(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
verify=True,
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertTrue(client_manager.verify)
self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_network_endpoint_enabled())
def test_client_manager_password_verify_ca(self):
client_manager = clientmanager.ClientManager(
@ -236,32 +256,80 @@ class TestClientManager(utils.TestCase):
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
cacert='cafile',
),
api_version=API_VERSION,
verify='cafile',
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertFalse(client_manager._insecure)
self.assertTrue(client_manager._verify)
self.assertEqual('cafile', client_manager._cacert)
# Test that client_manager.verify is Requests-compatible,
# i.e. it contains the value of cafile here
self.assertTrue(client_manager.verify)
self.assertEqual('cafile', client_manager.verify)
self.assertEqual('cafile', client_manager.cacert)
self.assertTrue(client_manager.is_network_endpoint_enabled())
def test_client_manager_password_verify_insecure(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
insecure=True,
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertFalse(client_manager.verify)
self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_network_endpoint_enabled())
def test_client_manager_password_verify_insecure_ca(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
insecure=True,
cacert='cafile',
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
# insecure overrides cacert
self.assertFalse(client_manager.verify)
self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_network_endpoint_enabled())
def test_client_manager_password_no_cert(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions())
self.assertIsNone(client_manager._cert)
self.assertIsNone(client_manager.cert)
def test_client_manager_password_client_cert(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(cert='cert'))
self.assertEqual('cert', client_manager._cert)
self.assertEqual('cert', client_manager.cert)
def test_client_manager_password_client_cert_and_key(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(cert='cert', key='key'))
self.assertEqual(('cert', 'key'), client_manager._cert)
self.assertEqual(('cert', 'key'), client_manager.cert)
def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name):
auth_params['auth_type'] = auth_plugin_name
@ -269,7 +337,6 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(**auth_params),
api_version={"identity": api_version},
verify=True
)
client_manager.setup_auth()
client_manager.auth_ref
@ -315,7 +382,6 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(os_auth_plugin=''),
api_version=API_VERSION,
verify=True,
)
self.assertRaises(
exc.CommandError,
@ -334,7 +400,6 @@ class TestClientManager(utils.TestCase):
),
),
api_version=API_VERSION,
verify=False,
)
self.assertFalse(client_manager._auth_setup_completed)
client_manager.setup_auth()