Merge "Refactored barbican.py for better testability"
This commit is contained in:
@@ -21,7 +21,8 @@ import sys
|
||||
|
||||
from cliff import app
|
||||
from cliff import commandmanager
|
||||
from keystoneclient.auth import identity
|
||||
from keystoneclient.auth.identity import v3
|
||||
from keystoneclient.auth.identity import v2
|
||||
from keystoneclient import session
|
||||
import six
|
||||
|
||||
@@ -43,6 +44,138 @@ class Barbican(app.App):
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def check_auth_arguments(self, args, api_version=None, raise_exc=False):
|
||||
"""Verifies that we have the correct arguments for authentication
|
||||
|
||||
Supported Keystone v3 combinations:
|
||||
- Project Id
|
||||
- Project Name + Project Domain Name
|
||||
- Project Name + Project Domain Id
|
||||
Supported Keystone v2 combinations:
|
||||
- Tenant Id
|
||||
- Tenant Name
|
||||
"""
|
||||
successful = True
|
||||
v3_arg_combinations = [
|
||||
args.os_project_id,
|
||||
args.os_project_name and args.os_project_domain_name,
|
||||
args.os_project_name and args.os_project_domain_id
|
||||
]
|
||||
v2_arg_combinations = [args.os_tenant_id, args.os_tenant_name]
|
||||
|
||||
# Keystone V3
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
if not any(v3_arg_combinations):
|
||||
msg = ('ERROR: please specify the following --os-project-id or'
|
||||
' (--os-project-name and --os-project-domain-name) or '
|
||||
' (--os-project-name and --os-project-domain-id)')
|
||||
successful = False
|
||||
# Keystone V2
|
||||
else:
|
||||
if not any(v2_arg_combinations):
|
||||
msg = ('ERROR: please specify --os-tenant-id or'
|
||||
'--os-tenant-name')
|
||||
successful = False
|
||||
|
||||
if not successful and raise_exc:
|
||||
raise Exception(msg)
|
||||
|
||||
return successful
|
||||
|
||||
def build_kwargs_based_on_version(self, args, api_version=None):
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
kwargs = {
|
||||
'project_id': args.os_project_id,
|
||||
'project_name': args.os_project_name,
|
||||
'user_domain_id': args.os_user_domain_id,
|
||||
'user_domain_name': args.os_user_domain_name,
|
||||
'project_domain_id': args.os_project_domain_id,
|
||||
'project_domain_name': args.os_project_name
|
||||
}
|
||||
else:
|
||||
kwargs = {
|
||||
'tenant_name': args.os_tenant_name,
|
||||
'tenant_id': args.os_tenant_id
|
||||
}
|
||||
|
||||
# Return a dictionary with only the populated (not None) values
|
||||
return dict((k, v) for (k, v) in six.iteritems(kwargs) if v)
|
||||
|
||||
def create_keystone_session(
|
||||
self, args, api_version, kwargs_dict, auth_type
|
||||
):
|
||||
# Make sure we have the correct arguments to function
|
||||
self.check_auth_arguments(args, api_version, raise_exc=True)
|
||||
|
||||
kwargs = self.build_kwargs_based_on_version(args, api_version)
|
||||
kwargs.update(kwargs_dict)
|
||||
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
method = v3.Token if auth_type == 'token' else v3.Password
|
||||
else:
|
||||
method = v2.Token if auth_type == 'token' else v2.Password
|
||||
|
||||
auth = method(**kwargs)
|
||||
|
||||
return session.Session(auth=auth, verify=not args.insecure)
|
||||
|
||||
def create_client(self, args):
|
||||
created_client = None
|
||||
|
||||
api_version = args.os_identity_api_version
|
||||
if args.no_auth and args.os_auth_url:
|
||||
raise Exception(
|
||||
'ERROR: argument --os-auth-url/-A: not allowed '
|
||||
'with argument --no-auth/-N'
|
||||
)
|
||||
|
||||
if args.no_auth:
|
||||
if not all([args.endpoint, args.os_tenant_id or
|
||||
args.os_project_id]):
|
||||
raise Exception(
|
||||
'ERROR: please specify --endpoint and '
|
||||
'--os-project-id (or --os-tenant-id)')
|
||||
created_client = client.Client(
|
||||
endpoint=args.endpoint,
|
||||
project_id=args.os_tenant_id or args.os_project_id,
|
||||
verify=not args.insecure
|
||||
)
|
||||
# Token-based authentication
|
||||
elif args.os_auth_token:
|
||||
if not args.os_auth_url:
|
||||
raise Exception('ERROR: please specify --os-auth-url')
|
||||
token_kwargs = {
|
||||
'auth_url': args.os_auth_url,
|
||||
'token': args.os_auth_token
|
||||
}
|
||||
session = self.create_keystone_session(
|
||||
args, api_version, token_kwargs, auth_type='token'
|
||||
)
|
||||
created_client = client.Client(
|
||||
session=session,
|
||||
endpoint=args.endpoint
|
||||
)
|
||||
|
||||
# Password-based authentication
|
||||
elif args.os_auth_url:
|
||||
password_kwargs = {
|
||||
'auth_url': args.os_auth_url,
|
||||
'password': args.os_password,
|
||||
'user_id': args.os_user_id,
|
||||
'username': args.os_username
|
||||
}
|
||||
session = self.create_keystone_session(
|
||||
args, api_version, password_kwargs, auth_type='password'
|
||||
)
|
||||
created_client = client.Client(
|
||||
session=session,
|
||||
endpoint=args.endpoint
|
||||
)
|
||||
else:
|
||||
raise Exception('ERROR: please specify authentication credentials')
|
||||
|
||||
return created_client
|
||||
|
||||
def build_option_parser(self, description, version, argparse_kwargs=None):
|
||||
"""Introduces global arguments for the application.
|
||||
This is inherited from the framework.
|
||||
@@ -122,69 +255,6 @@ class Barbican(app.App):
|
||||
session.Session.register_cli_options(parser)
|
||||
return parser
|
||||
|
||||
def _assert_no_auth_and_auth_url_mutually_exclusive(self, no_auth,
|
||||
auth_url):
|
||||
if no_auth and auth_url:
|
||||
raise Exception("ERROR: argument --os-auth-url/-A: not allowed "
|
||||
"with argument --no-auth/-N")
|
||||
|
||||
def _check_auth_arguments(self, args, api_version=None, raise_exc=False):
|
||||
"""Verifies that we have the correct arguments for authentication
|
||||
|
||||
Supported Keystone v3 combinations:
|
||||
- Project Id
|
||||
- Project Name + Project Domain Name
|
||||
- Project Name + Project Domain Id
|
||||
Support Keystone v2 combinations:
|
||||
- Tenant Id
|
||||
- Tenant Name
|
||||
"""
|
||||
successful = True
|
||||
v3_arg_combinations = [
|
||||
args.os_project_id,
|
||||
args.os_project_name and args.os_project_domain_name,
|
||||
args.os_project_name and args.os_project_domain_id
|
||||
]
|
||||
v2_arg_combinations = [args.os_tenant_id, args.os_tenant_name]
|
||||
|
||||
# Keystone V3
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
if not any(v3_arg_combinations):
|
||||
msg = ('ERROR: please specify the following --os-project-id or'
|
||||
'--os-project-name and --os-project-domain-name or '
|
||||
'--os-project-name and --os-project-domain-id')
|
||||
successful = False
|
||||
# Keystone V2
|
||||
else:
|
||||
if not any(v2_arg_combinations):
|
||||
msg = ('ERROR: please specify --os-tenant-id or'
|
||||
'--os-tenant-name')
|
||||
successful = False
|
||||
|
||||
if not successful and raise_exc:
|
||||
raise Exception(msg)
|
||||
|
||||
return successful
|
||||
|
||||
def _build_kwargs_based_on_version(self, args, api_version=None):
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
kwargs = {
|
||||
'project_id': args.os_project_id,
|
||||
'project_name': args.os_project_name,
|
||||
'user_domain_id': args.os_user_domain_id,
|
||||
'user_domain_name': args.os_user_domain_name,
|
||||
'project_domain_id': args.os_project_id,
|
||||
'project_domain_name': args.os_project_name
|
||||
}
|
||||
else:
|
||||
kwargs = {
|
||||
'tenant_name': args.os_tenant_name,
|
||||
'tenant_id': args.os_tenant_id
|
||||
}
|
||||
|
||||
# Return a dictionary with only the populated (not None) values
|
||||
return dict((k, v) for (k, v) in six.iteritems(kwargs) if v)
|
||||
|
||||
def initialize_app(self, argv):
|
||||
"""Initializes the application.
|
||||
Checks if the minimal parameters are provided and creates the client
|
||||
@@ -192,86 +262,14 @@ class Barbican(app.App):
|
||||
This is inherited from the framework.
|
||||
"""
|
||||
args = self.options
|
||||
self._assert_no_auth_and_auth_url_mutually_exclusive(args.no_auth,
|
||||
args.os_auth_url)
|
||||
self.client = self.create_client(args)
|
||||
|
||||
# Aliasing as we use this a number of times
|
||||
api_version = args.os_identity_api_version
|
||||
|
||||
# TODO(jmvrbanac): Split out these conditionals into discrete functions
|
||||
if args.no_auth:
|
||||
if not all([args.endpoint, args.os_tenant_id or
|
||||
args.os_project_id]):
|
||||
raise Exception(
|
||||
'ERROR: please specify --endpoint and '
|
||||
'--os-project-id(or --os-tenant-id)')
|
||||
self.client = client.Client(endpoint=args.endpoint,
|
||||
project_id=args.os_tenant_id or
|
||||
args.os_project_id,
|
||||
verify=not args.insecure)
|
||||
# Token-based authentication
|
||||
elif args.os_auth_token:
|
||||
if not args.os_auth_url:
|
||||
raise Exception('ERROR: please specify --os-auth-url')
|
||||
|
||||
# Make sure we have the correct arguments to function
|
||||
self._check_auth_arguments(args, api_version, raise_exc=True)
|
||||
|
||||
kwargs = self._build_kwargs_based_on_version(args, api_version)
|
||||
kwargs.update({
|
||||
'auth_url': args.os_auth_url,
|
||||
'token': args.os_auth_token
|
||||
})
|
||||
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
auth = identity.v3.Token(**kwargs)
|
||||
else:
|
||||
auth = identity.v2.Token(**kwargs)
|
||||
|
||||
ks_session = session.Session(auth=auth, verify=not args.insecure)
|
||||
self.client = client.Client(
|
||||
session=ks_session,
|
||||
endpoint=args.endpoint
|
||||
)
|
||||
# Password-based authentication
|
||||
elif all([args.os_auth_url, args.os_user_id or args.os_username,
|
||||
args.os_password, args.os_tenant_name or args.os_tenant_id or
|
||||
args.os_project_name or args.os_project_id]):
|
||||
kwargs = dict()
|
||||
kwargs['auth_url'] = args.os_auth_url
|
||||
kwargs['password'] = args.os_password
|
||||
if args.os_user_id:
|
||||
kwargs['user_id'] = args.os_user_id
|
||||
if args.os_username:
|
||||
kwargs['username'] = args.os_username
|
||||
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
if args.os_project_id:
|
||||
kwargs['project_id'] = args.os_project_id
|
||||
if args.os_project_name:
|
||||
kwargs['project_name'] = args.os_project_name
|
||||
if args.os_user_domain_id:
|
||||
kwargs['user_domain_id'] = args.os_user_domain_id
|
||||
if args.os_user_domain_name:
|
||||
kwargs['user_domain_name'] = args.os_user_domain_name
|
||||
if args.os_project_domain_id:
|
||||
kwargs['project_domain_id'] = args.os_project_domain_id
|
||||
if args.os_project_domain_name:
|
||||
kwargs['project_domain_name'] = args.os_project_domain_name
|
||||
auth = identity.v3.Password(**kwargs)
|
||||
else:
|
||||
if args.os_tenant_id:
|
||||
kwargs['tenant_id'] = args.os_tenant_id
|
||||
if args.os_tenant_name:
|
||||
kwargs['tenant_name'] = args.os_tenant_name
|
||||
auth = identity.v2.Password(**kwargs)
|
||||
|
||||
ks_session = session.Session(auth=auth, verify=not args.insecure)
|
||||
self.client = client.Client(session=ks_session,
|
||||
endpoint=args.endpoint)
|
||||
else:
|
||||
def run(self, argv):
|
||||
# If no arguments are provided, usage is displayed
|
||||
if not argv:
|
||||
self.stderr.write(self.parser.format_usage())
|
||||
raise Exception('ERROR: please specify authentication credentials')
|
||||
return 1
|
||||
return super(Barbican, self).run(argv)
|
||||
|
||||
|
||||
def main(argv=sys.argv[1:]):
|
||||
|
||||
@@ -12,72 +12,63 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from requests_mock.contrib import fixture
|
||||
import six
|
||||
import testtools
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from barbicanclient.tests import keystone_client_fixtures
|
||||
from barbicanclient.tests import test_client
|
||||
import barbicanclient.barbican
|
||||
from barbicanclient.barbican import Barbican
|
||||
|
||||
|
||||
class WhenTestingBarbicanCLI(test_client.BaseEntityResource):
|
||||
|
||||
def setUp(self):
|
||||
self._setUp('barbican')
|
||||
self.global_file = six.StringIO()
|
||||
self.captured_stdout = six.StringIO()
|
||||
self.captured_stderr = six.StringIO()
|
||||
self.barbican = Barbican(
|
||||
stdout=self.captured_stdout,
|
||||
stderr=self.captured_stderr
|
||||
)
|
||||
self.parser = self.barbican.build_option_parser('desc', 'vers')
|
||||
|
||||
def barbican(self, argstr):
|
||||
"""Source: Keystone client's shell method in test_shell.py"""
|
||||
clean_env = {}
|
||||
_old_env, os.environ = os.environ, clean_env.copy()
|
||||
exit_code = 1
|
||||
try:
|
||||
stdout = self.global_file
|
||||
_barbican = barbicanclient.barbican.Barbican(stdout=stdout,
|
||||
stderr=stdout)
|
||||
exit_code = _barbican.run(argv=argstr.split())
|
||||
except Exception as exception:
|
||||
exit_message = exception.message
|
||||
except SystemExit as sys_exit_exception:
|
||||
exit_code = sys_exit_exception.code
|
||||
finally:
|
||||
out = stdout.getvalue()
|
||||
os.environ = _old_env
|
||||
return exit_code, out
|
||||
def assert_client_raises(self, args, message):
|
||||
argv, remainder = self.parser.parse_known_args(args.split())
|
||||
e = self.assertRaises(
|
||||
Exception, self.barbican.create_client, argv
|
||||
)
|
||||
self.assertIn(message, str(e))
|
||||
|
||||
def test_should_show_usage_error_with_no_args(self):
|
||||
args = ""
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(1, exit_code)
|
||||
self.assertIn('usage:', out)
|
||||
def create_and_assert_client(self, args):
|
||||
argv, remainder = self.parser.parse_known_args(args.split())
|
||||
|
||||
client = self.barbican.create_client(argv)
|
||||
self.assertIsNotNone(client)
|
||||
return client
|
||||
|
||||
def test_should_show_usage_with_help_flag(self):
|
||||
args = "-h"
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(0, exit_code)
|
||||
self.assertIn('usage: ', out)
|
||||
e = self.assertRaises(SystemExit, self.parser.parse_known_args, ['-h'])
|
||||
self.assertEqual(0, e.code)
|
||||
self.assertIn('usage', self.captured_stdout.getvalue())
|
||||
|
||||
def test_should_show_usage_with_no_args(self):
|
||||
exit_code = self.barbican.run([])
|
||||
self.assertEquals(1, exit_code)
|
||||
self.assertIn('usage', self.captured_stderr.getvalue())
|
||||
|
||||
def test_should_error_if_noauth_and_authurl_both_specified(self):
|
||||
args = "--no-auth --os-auth-url http://localhost:5000/v3"
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(1, exit_code)
|
||||
self.assertIn(
|
||||
message = (
|
||||
'ERROR: argument --os-auth-url/-A: not allowed with '
|
||||
'argument --no-auth/-N', out)
|
||||
'argument --no-auth/-N'
|
||||
)
|
||||
self.assert_client_raises(args, message)
|
||||
|
||||
def _expect_error_with_invalid_noauth_args(self, args):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(1, exit_code)
|
||||
expected_err_msg = 'ERROR: please specify --endpoint '\
|
||||
'and --os-project-id(or --os-tenant-id)\n'
|
||||
self.assertIn(expected_err_msg, out)
|
||||
expected_err_msg = (
|
||||
'ERROR: please specify --endpoint '
|
||||
'and --os-project-id (or --os-tenant-id)'
|
||||
)
|
||||
self.assert_client_raises(args, expected_err_msg)
|
||||
|
||||
def test_should_error_if_noauth_and_missing_endpoint_tenantid_args(self):
|
||||
self._expect_error_with_invalid_noauth_args("--no-auth secret list")
|
||||
@@ -88,39 +79,34 @@ class WhenTestingBarbicanCLI(test_client.BaseEntityResource):
|
||||
self._expect_error_with_invalid_noauth_args(
|
||||
"--no-auth --os-project-id 123 secret list")
|
||||
|
||||
def _expect_success_code(self, args):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(0, exit_code)
|
||||
|
||||
def _expect_failure_code(self, args, code=1):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(code, exit_code)
|
||||
|
||||
def _assert_status_code_and_msg(self, args, expected_msg, code=1):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(code, exit_code)
|
||||
self.assertIn(expected_msg, out)
|
||||
|
||||
def test_should_succeed_if_noauth_with_valid_args_specified(self):
|
||||
args = (
|
||||
'--no-auth --endpoint {0} --os-tenant-id {1}'
|
||||
'secret list'.format(self.endpoint, self.project_id)
|
||||
)
|
||||
list_secrets_url = '{0}/v1/secrets'.format(self.endpoint)
|
||||
|
||||
self.responses.get(list_secrets_url, json={"secrets": [], "total": 0})
|
||||
|
||||
self._expect_success_code(
|
||||
"--no-auth --endpoint {0} --os-tenant-id {1} secret list".
|
||||
format(self.endpoint, self.project_id))
|
||||
client = self.create_and_assert_client(args)
|
||||
secret_list = client.secrets.list()
|
||||
self.assertTrue(self.responses._adapter.called)
|
||||
self.assertEqual(1, self.responses._adapter.call_count)
|
||||
self.assertEqual([], secret_list)
|
||||
|
||||
def test_should_error_if_required_keystone_auth_arguments_are_missing(
|
||||
self):
|
||||
expected_error_msg = 'ERROR: please specify authentication credentials'
|
||||
self._assert_status_code_and_msg(
|
||||
expected_error_msg = (
|
||||
'ERROR: please specify the following --os-project-id or'
|
||||
' (--os-project-name and --os-project-domain-name) or '
|
||||
' (--os-project-name and --os-project-domain-id)'
|
||||
)
|
||||
self.assert_client_raises(
|
||||
'--os-auth-url http://localhost:35357/v2.0 secret list',
|
||||
expected_error_msg)
|
||||
self._assert_status_code_and_msg('--os-auth-url '
|
||||
'http://localhost:35357/v2.0 '
|
||||
'--os-username barbican '
|
||||
'--os-password barbican '
|
||||
'secret list', expected_error_msg)
|
||||
self.assert_client_raises(
|
||||
'--os-auth-url http://localhost:35357/v2.0 --os-username barbican '
|
||||
'--os-password barbican secret list',
|
||||
expected_error_msg
|
||||
)
|
||||
|
||||
|
||||
class TestBarbicanWithKeystonePasswordAuth(
|
||||
|
||||
Reference in New Issue
Block a user