diff --git a/neutronclient/shell.py b/neutronclient/shell.py index d880c657c..0870599c3 100644 --- a/neutronclient/shell.py +++ b/neutronclient/shell.py @@ -894,7 +894,8 @@ class NeutronShell(app.App): """ cloud_config = os_client_config.OpenStackConfig().get_one_cloud( cloud=self.options.os_cloud, argparse=self.options, - network_api_version=self.api_version) + network_api_version=self.api_version, + verify=not self.options.insecure) verify, cert = cloud_config.get_requests_verify_args() # TODO(singhj): Remove dependancy on HTTPClient @@ -928,6 +929,7 @@ class NeutronShell(app.App): service_name=cloud_config.get_service_name('network'), endpoint_type=interface, auth=auth, + insecure=not verify, log_credentials=True) return diff --git a/neutronclient/tests/unit/test_shell.py b/neutronclient/tests/unit/test_shell.py index d343cdc26..a3326a17d 100644 --- a/neutronclient/tests/unit/test_shell.py +++ b/neutronclient/tests/unit/test_shell.py @@ -20,11 +20,13 @@ import re import sys import fixtures +from keystoneauth1 import session import mock import six import testtools from testtools import matchers +from neutronclient.common import clientmanager from neutronclient import shell as openstack_shell @@ -35,6 +37,13 @@ DEFAULT_TENANT_NAME = 'tenant_name' DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/' DEFAULT_TOKEN = '3bcc3d3a03f44e3d8377f9247b0ad155' DEFAULT_URL = 'http://quantum.example.org:9696/' +DEFAULT_REGION = 'regionOne' +DEFAULT_ENDPOINT_TYPE = 'public' +DEFAULT_API_VERSION = '2.0' +DEFAULT_SERVICE_TYPE = 'network' +DEFAULT_SERVICE_NAME = 'neutron' +DEFAULT_RETRIES = 3 +DEFAULT_TIMEOUT = 3.0 class ShellTest(testtools.TestCase): @@ -219,3 +228,128 @@ class ShellTest(testtools.TestCase): search_str = "Try 'neutron help port-create' for more information" self.assertTrue(any(search_str in string for string in stderr.split('\n'))) + + def _test_authenticate_user(self, expect_verify, expect_insecure, + **options): + base_options = {'os_cloud': None, + 'http_timeout': DEFAULT_TIMEOUT, + 'region_name': DEFAULT_REGION, + 'network_service_name': DEFAULT_SERVICE_NAME, + 'neutron_service_type': DEFAULT_SERVICE_TYPE} + + options.update(base_options) + if options.get('os_token'): + options.update({'os_token': 'token', 'os_url': 'url'}) + else: + options.update({'os_token': None, 'os_url': None}) + + with mock.patch.object(openstack_shell.NeutronShell, + 'run_subcommand'), \ + mock.patch.object(session, 'Session') as session_mock, \ + mock.patch.object(clientmanager, 'ClientManager') as cmgr_mock: + + shell = openstack_shell.NeutronShell(DEFAULT_API_VERSION) + shell.options = mock.Mock(spec=options.keys()) + for k, v in options.items(): + setattr(shell.options, k, v) + shell.options.os_endpoint_type = DEFAULT_ENDPOINT_TYPE + shell.options.retries = DEFAULT_RETRIES + + if not (options.get('os_token') and options.get('os_url')): + auth = mock.ANY + auth_session = mock.sentinel.session + session_mock.return_value = auth_session + else: + auth = None + auth_session = None + + shell.authenticate_user() + + if not (options.get('os_token') and options.get('os_url')): + session_mock.assert_called_once_with( + auth=mock.ANY, verify=expect_verify, + cert=options.get('cert'), + timeout=DEFAULT_TIMEOUT) + else: + self.assertFalse(session_mock.called) + + cmgr_mock.assert_called_once_with( + retries=DEFAULT_RETRIES, + raise_errors=False, + session=auth_session, + url=options.get('os_url'), + token=options.get('os_token'), + region_name=DEFAULT_REGION, + api_version=DEFAULT_API_VERSION, + service_type=DEFAULT_SERVICE_TYPE, + service_name=DEFAULT_SERVICE_NAME, + endpoint_type=DEFAULT_ENDPOINT_TYPE, + auth=auth, + insecure=expect_insecure, + log_credentials=True) + + def test_authenticate_secure_with_cacert_with_cert(self): + self._test_authenticate_user( + insecure=False, cacert='cacert', cert='cert', + expect_verify='cacert', expect_insecure=False) + + def test_authenticate_secure_with_cacert_with_cert_with_token(self): + self._test_authenticate_user( + os_token='token', + insecure=False, cacert='cacert', cert='cert', + expect_verify='cacert', expect_insecure=False) + + def test_authenticate_insecure_with_cacert_with_cert(self): + self._test_authenticate_user( + insecure=True, cacert='cacert', cert='cert', + expect_verify=False, expect_insecure=True) + + def test_authenticate_insecure_with_cacert_with_cert_with_token(self): + self._test_authenticate_user( + os_token='token', + insecure=True, cacert='cacert', cert='cert', + expect_verify=False, expect_insecure=True) + + def test_authenticate_secure_without_cacert_with_cert(self): + self._test_authenticate_user( + insecure=False, cert='cert', + expect_verify=True, expect_insecure=False) + + def test_authenticate_secure_without_cacert_with_cert_with_token(self): + self._test_authenticate_user( + os_token='token', + insecure=False, cert='cert', + expect_verify=True, expect_insecure=False) + + def test_authenticate_insecure_without_cacert_with_cert(self): + self._test_authenticate_user( + insecure=True, cert='cert', + expect_verify=False, expect_insecure=True) + + def test_authenticate_insecure_without_cacert_with_cert_with_token(self): + self._test_authenticate_user( + os_token='token', + insecure=True, cert='cert', + expect_verify=False, expect_insecure=True) + + def test_authenticate_secure_with_cacert_without_cert(self): + self._test_authenticate_user( + insecure=False, cacert='cacert', + expect_verify='cacert', expect_insecure=False) + + def test_authenticate_secure_with_cacert_without_cert_with_token(self): + self._test_authenticate_user( + os_token='token', + insecure=False, cacert='cacert', + expect_verify='cacert', expect_insecure=False) + + def test_authenticate_insecure_with_cacert_without_cert(self): + self._test_authenticate_user( + insecure=True, cacert='cacert', + expect_verify=False, expect_insecure=True) + + def test_authenticate_insecure_with_cacert_without_cert_with_token(self): + self._test_authenticate_user( + os_token='token', + insecure=True, cacert='cacert', + expect_verify=False, expect_insecure=True)