diff --git a/barbicanclient/barbican.py b/barbicanclient/barbican.py index 6ae37520..e1b3a732 100644 --- a/barbicanclient/barbican.py +++ b/barbicanclient/barbican.py @@ -21,7 +21,8 @@ import sys from cliff import app from cliff import commandmanager -from keystoneclient.auth import identity +from keystoneclient.auth.identity import v3 +from keystoneclient.auth.identity import v2 from keystoneclient import session import six @@ -43,6 +44,138 @@ class Barbican(app.App): **kwargs ) + def check_auth_arguments(self, args, api_version=None, raise_exc=False): + """Verifies that we have the correct arguments for authentication + + Supported Keystone v3 combinations: + - Project Id + - Project Name + Project Domain Name + - Project Name + Project Domain Id + Supported Keystone v2 combinations: + - Tenant Id + - Tenant Name + """ + successful = True + v3_arg_combinations = [ + args.os_project_id, + args.os_project_name and args.os_project_domain_name, + args.os_project_name and args.os_project_domain_id + ] + v2_arg_combinations = [args.os_tenant_id, args.os_tenant_name] + + # Keystone V3 + if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: + if not any(v3_arg_combinations): + msg = ('ERROR: please specify the following --os-project-id or' + ' (--os-project-name and --os-project-domain-name) or ' + ' (--os-project-name and --os-project-domain-id)') + successful = False + # Keystone V2 + else: + if not any(v2_arg_combinations): + msg = ('ERROR: please specify --os-tenant-id or' + '--os-tenant-name') + successful = False + + if not successful and raise_exc: + raise Exception(msg) + + return successful + + def build_kwargs_based_on_version(self, args, api_version=None): + if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: + kwargs = { + 'project_id': args.os_project_id, + 'project_name': args.os_project_name, + 'user_domain_id': args.os_user_domain_id, + 'user_domain_name': args.os_user_domain_name, + 'project_domain_id': args.os_project_domain_id, + 'project_domain_name': args.os_project_name + } + else: + kwargs = { + 'tenant_name': args.os_tenant_name, + 'tenant_id': args.os_tenant_id + } + + # Return a dictionary with only the populated (not None) values + return dict((k, v) for (k, v) in six.iteritems(kwargs) if v) + + def create_keystone_session( + self, args, api_version, kwargs_dict, auth_type + ): + # Make sure we have the correct arguments to function + self.check_auth_arguments(args, api_version, raise_exc=True) + + kwargs = self.build_kwargs_based_on_version(args, api_version) + kwargs.update(kwargs_dict) + + if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: + method = v3.Token if auth_type == 'token' else v3.Password + else: + method = v2.Token if auth_type == 'token' else v2.Password + + auth = method(**kwargs) + + return session.Session(auth=auth, verify=not args.insecure) + + def create_client(self, args): + created_client = None + + api_version = args.os_identity_api_version + if args.no_auth and args.os_auth_url: + raise Exception( + 'ERROR: argument --os-auth-url/-A: not allowed ' + 'with argument --no-auth/-N' + ) + + if args.no_auth: + if not all([args.endpoint, args.os_tenant_id or + args.os_project_id]): + raise Exception( + 'ERROR: please specify --endpoint and ' + '--os-project-id (or --os-tenant-id)') + created_client = client.Client( + endpoint=args.endpoint, + project_id=args.os_tenant_id or args.os_project_id, + verify=not args.insecure + ) + # Token-based authentication + elif args.os_auth_token: + if not args.os_auth_url: + raise Exception('ERROR: please specify --os-auth-url') + token_kwargs = { + 'auth_url': args.os_auth_url, + 'token': args.os_auth_token + } + session = self.create_keystone_session( + args, api_version, token_kwargs, auth_type='token' + ) + created_client = client.Client( + session=session, + endpoint=args.endpoint + ) + + # Password-based authentication + elif args.os_auth_url: + password_kwargs = { + 'auth_url': args.os_auth_url, + 'password': args.os_password, + 'user_id': args.os_user_id, + 'username': args.os_username + } + session = self.create_keystone_session( + args, api_version, password_kwargs, auth_type='password' + ) + created_client = client.Client( + session=session, + endpoint=args.endpoint + ) + else: + raise Exception('ERROR: please specify authentication credentials') + + return created_client + def build_option_parser(self, description, version, argparse_kwargs=None): """Introduces global arguments for the application. This is inherited from the framework. @@ -122,69 +255,6 @@ class Barbican(app.App): session.Session.register_cli_options(parser) return parser - def _assert_no_auth_and_auth_url_mutually_exclusive(self, no_auth, - auth_url): - if no_auth and auth_url: - raise Exception("ERROR: argument --os-auth-url/-A: not allowed " - "with argument --no-auth/-N") - - def _check_auth_arguments(self, args, api_version=None, raise_exc=False): - """Verifies that we have the correct arguments for authentication - - Supported Keystone v3 combinations: - - Project Id - - Project Name + Project Domain Name - - Project Name + Project Domain Id - Support Keystone v2 combinations: - - Tenant Id - - Tenant Name - """ - successful = True - v3_arg_combinations = [ - args.os_project_id, - args.os_project_name and args.os_project_domain_name, - args.os_project_name and args.os_project_domain_id - ] - v2_arg_combinations = [args.os_tenant_id, args.os_tenant_name] - - # Keystone V3 - if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: - if not any(v3_arg_combinations): - msg = ('ERROR: please specify the following --os-project-id or' - '--os-project-name and --os-project-domain-name or ' - '--os-project-name and --os-project-domain-id') - successful = False - # Keystone V2 - else: - if not any(v2_arg_combinations): - msg = ('ERROR: please specify --os-tenant-id or' - '--os-tenant-name') - successful = False - - if not successful and raise_exc: - raise Exception(msg) - - return successful - - def _build_kwargs_based_on_version(self, args, api_version=None): - if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: - kwargs = { - 'project_id': args.os_project_id, - 'project_name': args.os_project_name, - 'user_domain_id': args.os_user_domain_id, - 'user_domain_name': args.os_user_domain_name, - 'project_domain_id': args.os_project_id, - 'project_domain_name': args.os_project_name - } - else: - kwargs = { - 'tenant_name': args.os_tenant_name, - 'tenant_id': args.os_tenant_id - } - - # Return a dictionary with only the populated (not None) values - return dict((k, v) for (k, v) in six.iteritems(kwargs) if v) - def initialize_app(self, argv): """Initializes the application. Checks if the minimal parameters are provided and creates the client @@ -192,86 +262,14 @@ class Barbican(app.App): This is inherited from the framework. """ args = self.options - self._assert_no_auth_and_auth_url_mutually_exclusive(args.no_auth, - args.os_auth_url) + self.client = self.create_client(args) - # Aliasing as we use this a number of times - api_version = args.os_identity_api_version - - # TODO(jmvrbanac): Split out these conditionals into discrete functions - if args.no_auth: - if not all([args.endpoint, args.os_tenant_id or - args.os_project_id]): - raise Exception( - 'ERROR: please specify --endpoint and ' - '--os-project-id(or --os-tenant-id)') - self.client = client.Client(endpoint=args.endpoint, - project_id=args.os_tenant_id or - args.os_project_id, - verify=not args.insecure) - # Token-based authentication - elif args.os_auth_token: - if not args.os_auth_url: - raise Exception('ERROR: please specify --os-auth-url') - - # Make sure we have the correct arguments to function - self._check_auth_arguments(args, api_version, raise_exc=True) - - kwargs = self._build_kwargs_based_on_version(args, api_version) - kwargs.update({ - 'auth_url': args.os_auth_url, - 'token': args.os_auth_token - }) - - if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: - auth = identity.v3.Token(**kwargs) - else: - auth = identity.v2.Token(**kwargs) - - ks_session = session.Session(auth=auth, verify=not args.insecure) - self.client = client.Client( - session=ks_session, - endpoint=args.endpoint - ) - # Password-based authentication - elif all([args.os_auth_url, args.os_user_id or args.os_username, - args.os_password, args.os_tenant_name or args.os_tenant_id or - args.os_project_name or args.os_project_id]): - kwargs = dict() - kwargs['auth_url'] = args.os_auth_url - kwargs['password'] = args.os_password - if args.os_user_id: - kwargs['user_id'] = args.os_user_id - if args.os_username: - kwargs['username'] = args.os_username - - if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION: - if args.os_project_id: - kwargs['project_id'] = args.os_project_id - if args.os_project_name: - kwargs['project_name'] = args.os_project_name - if args.os_user_domain_id: - kwargs['user_domain_id'] = args.os_user_domain_id - if args.os_user_domain_name: - kwargs['user_domain_name'] = args.os_user_domain_name - if args.os_project_domain_id: - kwargs['project_domain_id'] = args.os_project_domain_id - if args.os_project_domain_name: - kwargs['project_domain_name'] = args.os_project_domain_name - auth = identity.v3.Password(**kwargs) - else: - if args.os_tenant_id: - kwargs['tenant_id'] = args.os_tenant_id - if args.os_tenant_name: - kwargs['tenant_name'] = args.os_tenant_name - auth = identity.v2.Password(**kwargs) - - ks_session = session.Session(auth=auth, verify=not args.insecure) - self.client = client.Client(session=ks_session, - endpoint=args.endpoint) - else: + def run(self, argv): + # If no arguments are provided, usage is displayed + if not argv: self.stderr.write(self.parser.format_usage()) - raise Exception('ERROR: please specify authentication credentials') + return 1 + return super(Barbican, self).run(argv) def main(argv=sys.argv[1:]): diff --git a/barbicanclient/tests/test_barbican.py b/barbicanclient/tests/test_barbican.py index 251d174b..896f0865 100644 --- a/barbicanclient/tests/test_barbican.py +++ b/barbicanclient/tests/test_barbican.py @@ -12,72 +12,63 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - -import os -import sys - -from requests_mock.contrib import fixture import six -import testtools -import uuid -import json from barbicanclient.tests import keystone_client_fixtures from barbicanclient.tests import test_client -import barbicanclient.barbican +from barbicanclient.barbican import Barbican class WhenTestingBarbicanCLI(test_client.BaseEntityResource): def setUp(self): self._setUp('barbican') - self.global_file = six.StringIO() + self.captured_stdout = six.StringIO() + self.captured_stderr = six.StringIO() + self.barbican = Barbican( + stdout=self.captured_stdout, + stderr=self.captured_stderr + ) + self.parser = self.barbican.build_option_parser('desc', 'vers') - def barbican(self, argstr): - """Source: Keystone client's shell method in test_shell.py""" - clean_env = {} - _old_env, os.environ = os.environ, clean_env.copy() - exit_code = 1 - try: - stdout = self.global_file - _barbican = barbicanclient.barbican.Barbican(stdout=stdout, - stderr=stdout) - exit_code = _barbican.run(argv=argstr.split()) - except Exception as exception: - exit_message = exception.message - except SystemExit as sys_exit_exception: - exit_code = sys_exit_exception.code - finally: - out = stdout.getvalue() - os.environ = _old_env - return exit_code, out + def assert_client_raises(self, args, message): + argv, remainder = self.parser.parse_known_args(args.split()) + e = self.assertRaises( + Exception, self.barbican.create_client, argv + ) + self.assertIn(message, str(e)) - def test_should_show_usage_error_with_no_args(self): - args = "" - exit_code, out = self.barbican(args) - self.assertEqual(1, exit_code) - self.assertIn('usage:', out) + def create_and_assert_client(self, args): + argv, remainder = self.parser.parse_known_args(args.split()) + + client = self.barbican.create_client(argv) + self.assertIsNotNone(client) + return client def test_should_show_usage_with_help_flag(self): - args = "-h" - exit_code, out = self.barbican(args) - self.assertEqual(0, exit_code) - self.assertIn('usage: ', out) + e = self.assertRaises(SystemExit, self.parser.parse_known_args, ['-h']) + self.assertEqual(0, e.code) + self.assertIn('usage', self.captured_stdout.getvalue()) + + def test_should_show_usage_with_no_args(self): + exit_code = self.barbican.run([]) + self.assertEquals(1, exit_code) + self.assertIn('usage', self.captured_stderr.getvalue()) def test_should_error_if_noauth_and_authurl_both_specified(self): args = "--no-auth --os-auth-url http://localhost:5000/v3" - exit_code, out = self.barbican(args) - self.assertEqual(1, exit_code) - self.assertIn( + message = ( 'ERROR: argument --os-auth-url/-A: not allowed with ' - 'argument --no-auth/-N', out) + 'argument --no-auth/-N' + ) + self.assert_client_raises(args, message) def _expect_error_with_invalid_noauth_args(self, args): - exit_code, out = self.barbican(args) - self.assertEqual(1, exit_code) - expected_err_msg = 'ERROR: please specify --endpoint '\ - 'and --os-project-id(or --os-tenant-id)\n' - self.assertIn(expected_err_msg, out) + expected_err_msg = ( + 'ERROR: please specify --endpoint ' + 'and --os-project-id (or --os-tenant-id)' + ) + self.assert_client_raises(args, expected_err_msg) def test_should_error_if_noauth_and_missing_endpoint_tenantid_args(self): self._expect_error_with_invalid_noauth_args("--no-auth secret list") @@ -88,39 +79,34 @@ class WhenTestingBarbicanCLI(test_client.BaseEntityResource): self._expect_error_with_invalid_noauth_args( "--no-auth --os-project-id 123 secret list") - def _expect_success_code(self, args): - exit_code, out = self.barbican(args) - self.assertEqual(0, exit_code) - - def _expect_failure_code(self, args, code=1): - exit_code, out = self.barbican(args) - self.assertEqual(code, exit_code) - - def _assert_status_code_and_msg(self, args, expected_msg, code=1): - exit_code, out = self.barbican(args) - self.assertEqual(code, exit_code) - self.assertIn(expected_msg, out) - def test_should_succeed_if_noauth_with_valid_args_specified(self): + args = ( + '--no-auth --endpoint {0} --os-tenant-id {1}' + 'secret list'.format(self.endpoint, self.project_id) + ) list_secrets_url = '{0}/v1/secrets'.format(self.endpoint) - self.responses.get(list_secrets_url, json={"secrets": [], "total": 0}) - - self._expect_success_code( - "--no-auth --endpoint {0} --os-tenant-id {1} secret list". - format(self.endpoint, self.project_id)) + client = self.create_and_assert_client(args) + secret_list = client.secrets.list() + self.assertTrue(self.responses._adapter.called) + self.assertEqual(1, self.responses._adapter.call_count) + self.assertEqual([], secret_list) def test_should_error_if_required_keystone_auth_arguments_are_missing( self): - expected_error_msg = 'ERROR: please specify authentication credentials' - self._assert_status_code_and_msg( + expected_error_msg = ( + 'ERROR: please specify the following --os-project-id or' + ' (--os-project-name and --os-project-domain-name) or ' + ' (--os-project-name and --os-project-domain-id)' + ) + self.assert_client_raises( '--os-auth-url http://localhost:35357/v2.0 secret list', expected_error_msg) - self._assert_status_code_and_msg('--os-auth-url ' - 'http://localhost:35357/v2.0 ' - '--os-username barbican ' - '--os-password barbican ' - 'secret list', expected_error_msg) + self.assert_client_raises( + '--os-auth-url http://localhost:35357/v2.0 --os-username barbican ' + '--os-password barbican secret list', + expected_error_msg + ) class TestBarbicanWithKeystonePasswordAuth(