Refactored barbican.py for better testability
We heavily refactored barbican.py and its respective tests in order to ensure better testing practices. The tests no longer rely on executing through cliff. Cliff was responsible for suppressing errors. Change-Id: I820c90a0d197d6470b36b5767b9f983572a7ff0f
This commit is contained in:
parent
28a45f591c
commit
5ef33c2da8
|
@ -21,7 +21,8 @@ import sys
|
|||
|
||||
from cliff import app
|
||||
from cliff import commandmanager
|
||||
from keystoneclient.auth import identity
|
||||
from keystoneclient.auth.identity import v3
|
||||
from keystoneclient.auth.identity import v2
|
||||
from keystoneclient import session
|
||||
import six
|
||||
|
||||
|
@ -43,6 +44,138 @@ class Barbican(app.App):
|
|||
**kwargs
|
||||
)
|
||||
|
||||
def check_auth_arguments(self, args, api_version=None, raise_exc=False):
|
||||
"""Verifies that we have the correct arguments for authentication
|
||||
|
||||
Supported Keystone v3 combinations:
|
||||
- Project Id
|
||||
- Project Name + Project Domain Name
|
||||
- Project Name + Project Domain Id
|
||||
Supported Keystone v2 combinations:
|
||||
- Tenant Id
|
||||
- Tenant Name
|
||||
"""
|
||||
successful = True
|
||||
v3_arg_combinations = [
|
||||
args.os_project_id,
|
||||
args.os_project_name and args.os_project_domain_name,
|
||||
args.os_project_name and args.os_project_domain_id
|
||||
]
|
||||
v2_arg_combinations = [args.os_tenant_id, args.os_tenant_name]
|
||||
|
||||
# Keystone V3
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
if not any(v3_arg_combinations):
|
||||
msg = ('ERROR: please specify the following --os-project-id or'
|
||||
' (--os-project-name and --os-project-domain-name) or '
|
||||
' (--os-project-name and --os-project-domain-id)')
|
||||
successful = False
|
||||
# Keystone V2
|
||||
else:
|
||||
if not any(v2_arg_combinations):
|
||||
msg = ('ERROR: please specify --os-tenant-id or'
|
||||
'--os-tenant-name')
|
||||
successful = False
|
||||
|
||||
if not successful and raise_exc:
|
||||
raise Exception(msg)
|
||||
|
||||
return successful
|
||||
|
||||
def build_kwargs_based_on_version(self, args, api_version=None):
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
kwargs = {
|
||||
'project_id': args.os_project_id,
|
||||
'project_name': args.os_project_name,
|
||||
'user_domain_id': args.os_user_domain_id,
|
||||
'user_domain_name': args.os_user_domain_name,
|
||||
'project_domain_id': args.os_project_domain_id,
|
||||
'project_domain_name': args.os_project_name
|
||||
}
|
||||
else:
|
||||
kwargs = {
|
||||
'tenant_name': args.os_tenant_name,
|
||||
'tenant_id': args.os_tenant_id
|
||||
}
|
||||
|
||||
# Return a dictionary with only the populated (not None) values
|
||||
return dict((k, v) for (k, v) in six.iteritems(kwargs) if v)
|
||||
|
||||
def create_keystone_session(
|
||||
self, args, api_version, kwargs_dict, auth_type
|
||||
):
|
||||
# Make sure we have the correct arguments to function
|
||||
self.check_auth_arguments(args, api_version, raise_exc=True)
|
||||
|
||||
kwargs = self.build_kwargs_based_on_version(args, api_version)
|
||||
kwargs.update(kwargs_dict)
|
||||
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
method = v3.Token if auth_type == 'token' else v3.Password
|
||||
else:
|
||||
method = v2.Token if auth_type == 'token' else v2.Password
|
||||
|
||||
auth = method(**kwargs)
|
||||
|
||||
return session.Session(auth=auth, verify=not args.insecure)
|
||||
|
||||
def create_client(self, args):
|
||||
created_client = None
|
||||
|
||||
api_version = args.os_identity_api_version
|
||||
if args.no_auth and args.os_auth_url:
|
||||
raise Exception(
|
||||
'ERROR: argument --os-auth-url/-A: not allowed '
|
||||
'with argument --no-auth/-N'
|
||||
)
|
||||
|
||||
if args.no_auth:
|
||||
if not all([args.endpoint, args.os_tenant_id or
|
||||
args.os_project_id]):
|
||||
raise Exception(
|
||||
'ERROR: please specify --endpoint and '
|
||||
'--os-project-id (or --os-tenant-id)')
|
||||
created_client = client.Client(
|
||||
endpoint=args.endpoint,
|
||||
project_id=args.os_tenant_id or args.os_project_id,
|
||||
verify=not args.insecure
|
||||
)
|
||||
# Token-based authentication
|
||||
elif args.os_auth_token:
|
||||
if not args.os_auth_url:
|
||||
raise Exception('ERROR: please specify --os-auth-url')
|
||||
token_kwargs = {
|
||||
'auth_url': args.os_auth_url,
|
||||
'token': args.os_auth_token
|
||||
}
|
||||
session = self.create_keystone_session(
|
||||
args, api_version, token_kwargs, auth_type='token'
|
||||
)
|
||||
created_client = client.Client(
|
||||
session=session,
|
||||
endpoint=args.endpoint
|
||||
)
|
||||
|
||||
# Password-based authentication
|
||||
elif args.os_auth_url:
|
||||
password_kwargs = {
|
||||
'auth_url': args.os_auth_url,
|
||||
'password': args.os_password,
|
||||
'user_id': args.os_user_id,
|
||||
'username': args.os_username
|
||||
}
|
||||
session = self.create_keystone_session(
|
||||
args, api_version, password_kwargs, auth_type='password'
|
||||
)
|
||||
created_client = client.Client(
|
||||
session=session,
|
||||
endpoint=args.endpoint
|
||||
)
|
||||
else:
|
||||
raise Exception('ERROR: please specify authentication credentials')
|
||||
|
||||
return created_client
|
||||
|
||||
def build_option_parser(self, description, version, argparse_kwargs=None):
|
||||
"""Introduces global arguments for the application.
|
||||
This is inherited from the framework.
|
||||
|
@ -122,69 +255,6 @@ class Barbican(app.App):
|
|||
session.Session.register_cli_options(parser)
|
||||
return parser
|
||||
|
||||
def _assert_no_auth_and_auth_url_mutually_exclusive(self, no_auth,
|
||||
auth_url):
|
||||
if no_auth and auth_url:
|
||||
raise Exception("ERROR: argument --os-auth-url/-A: not allowed "
|
||||
"with argument --no-auth/-N")
|
||||
|
||||
def _check_auth_arguments(self, args, api_version=None, raise_exc=False):
|
||||
"""Verifies that we have the correct arguments for authentication
|
||||
|
||||
Supported Keystone v3 combinations:
|
||||
- Project Id
|
||||
- Project Name + Project Domain Name
|
||||
- Project Name + Project Domain Id
|
||||
Support Keystone v2 combinations:
|
||||
- Tenant Id
|
||||
- Tenant Name
|
||||
"""
|
||||
successful = True
|
||||
v3_arg_combinations = [
|
||||
args.os_project_id,
|
||||
args.os_project_name and args.os_project_domain_name,
|
||||
args.os_project_name and args.os_project_domain_id
|
||||
]
|
||||
v2_arg_combinations = [args.os_tenant_id, args.os_tenant_name]
|
||||
|
||||
# Keystone V3
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
if not any(v3_arg_combinations):
|
||||
msg = ('ERROR: please specify the following --os-project-id or'
|
||||
'--os-project-name and --os-project-domain-name or '
|
||||
'--os-project-name and --os-project-domain-id')
|
||||
successful = False
|
||||
# Keystone V2
|
||||
else:
|
||||
if not any(v2_arg_combinations):
|
||||
msg = ('ERROR: please specify --os-tenant-id or'
|
||||
'--os-tenant-name')
|
||||
successful = False
|
||||
|
||||
if not successful and raise_exc:
|
||||
raise Exception(msg)
|
||||
|
||||
return successful
|
||||
|
||||
def _build_kwargs_based_on_version(self, args, api_version=None):
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
kwargs = {
|
||||
'project_id': args.os_project_id,
|
||||
'project_name': args.os_project_name,
|
||||
'user_domain_id': args.os_user_domain_id,
|
||||
'user_domain_name': args.os_user_domain_name,
|
||||
'project_domain_id': args.os_project_id,
|
||||
'project_domain_name': args.os_project_name
|
||||
}
|
||||
else:
|
||||
kwargs = {
|
||||
'tenant_name': args.os_tenant_name,
|
||||
'tenant_id': args.os_tenant_id
|
||||
}
|
||||
|
||||
# Return a dictionary with only the populated (not None) values
|
||||
return dict((k, v) for (k, v) in six.iteritems(kwargs) if v)
|
||||
|
||||
def initialize_app(self, argv):
|
||||
"""Initializes the application.
|
||||
Checks if the minimal parameters are provided and creates the client
|
||||
|
@ -192,86 +262,14 @@ class Barbican(app.App):
|
|||
This is inherited from the framework.
|
||||
"""
|
||||
args = self.options
|
||||
self._assert_no_auth_and_auth_url_mutually_exclusive(args.no_auth,
|
||||
args.os_auth_url)
|
||||
self.client = self.create_client(args)
|
||||
|
||||
# Aliasing as we use this a number of times
|
||||
api_version = args.os_identity_api_version
|
||||
|
||||
# TODO(jmvrbanac): Split out these conditionals into discrete functions
|
||||
if args.no_auth:
|
||||
if not all([args.endpoint, args.os_tenant_id or
|
||||
args.os_project_id]):
|
||||
raise Exception(
|
||||
'ERROR: please specify --endpoint and '
|
||||
'--os-project-id(or --os-tenant-id)')
|
||||
self.client = client.Client(endpoint=args.endpoint,
|
||||
project_id=args.os_tenant_id or
|
||||
args.os_project_id,
|
||||
verify=not args.insecure)
|
||||
# Token-based authentication
|
||||
elif args.os_auth_token:
|
||||
if not args.os_auth_url:
|
||||
raise Exception('ERROR: please specify --os-auth-url')
|
||||
|
||||
# Make sure we have the correct arguments to function
|
||||
self._check_auth_arguments(args, api_version, raise_exc=True)
|
||||
|
||||
kwargs = self._build_kwargs_based_on_version(args, api_version)
|
||||
kwargs.update({
|
||||
'auth_url': args.os_auth_url,
|
||||
'token': args.os_auth_token
|
||||
})
|
||||
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
auth = identity.v3.Token(**kwargs)
|
||||
else:
|
||||
auth = identity.v2.Token(**kwargs)
|
||||
|
||||
ks_session = session.Session(auth=auth, verify=not args.insecure)
|
||||
self.client = client.Client(
|
||||
session=ks_session,
|
||||
endpoint=args.endpoint
|
||||
)
|
||||
# Password-based authentication
|
||||
elif all([args.os_auth_url, args.os_user_id or args.os_username,
|
||||
args.os_password, args.os_tenant_name or args.os_tenant_id or
|
||||
args.os_project_name or args.os_project_id]):
|
||||
kwargs = dict()
|
||||
kwargs['auth_url'] = args.os_auth_url
|
||||
kwargs['password'] = args.os_password
|
||||
if args.os_user_id:
|
||||
kwargs['user_id'] = args.os_user_id
|
||||
if args.os_username:
|
||||
kwargs['username'] = args.os_username
|
||||
|
||||
if not api_version or api_version == _DEFAULT_IDENTITY_API_VERSION:
|
||||
if args.os_project_id:
|
||||
kwargs['project_id'] = args.os_project_id
|
||||
if args.os_project_name:
|
||||
kwargs['project_name'] = args.os_project_name
|
||||
if args.os_user_domain_id:
|
||||
kwargs['user_domain_id'] = args.os_user_domain_id
|
||||
if args.os_user_domain_name:
|
||||
kwargs['user_domain_name'] = args.os_user_domain_name
|
||||
if args.os_project_domain_id:
|
||||
kwargs['project_domain_id'] = args.os_project_domain_id
|
||||
if args.os_project_domain_name:
|
||||
kwargs['project_domain_name'] = args.os_project_domain_name
|
||||
auth = identity.v3.Password(**kwargs)
|
||||
else:
|
||||
if args.os_tenant_id:
|
||||
kwargs['tenant_id'] = args.os_tenant_id
|
||||
if args.os_tenant_name:
|
||||
kwargs['tenant_name'] = args.os_tenant_name
|
||||
auth = identity.v2.Password(**kwargs)
|
||||
|
||||
ks_session = session.Session(auth=auth, verify=not args.insecure)
|
||||
self.client = client.Client(session=ks_session,
|
||||
endpoint=args.endpoint)
|
||||
else:
|
||||
def run(self, argv):
|
||||
# If no arguments are provided, usage is displayed
|
||||
if not argv:
|
||||
self.stderr.write(self.parser.format_usage())
|
||||
raise Exception('ERROR: please specify authentication credentials')
|
||||
return 1
|
||||
return super(Barbican, self).run(argv)
|
||||
|
||||
|
||||
def main(argv=sys.argv[1:]):
|
||||
|
|
|
@ -12,72 +12,63 @@
|
|||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from requests_mock.contrib import fixture
|
||||
import six
|
||||
import testtools
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from barbicanclient.tests import keystone_client_fixtures
|
||||
from barbicanclient.tests import test_client
|
||||
import barbicanclient.barbican
|
||||
from barbicanclient.barbican import Barbican
|
||||
|
||||
|
||||
class WhenTestingBarbicanCLI(test_client.BaseEntityResource):
|
||||
|
||||
def setUp(self):
|
||||
self._setUp('barbican')
|
||||
self.global_file = six.StringIO()
|
||||
self.captured_stdout = six.StringIO()
|
||||
self.captured_stderr = six.StringIO()
|
||||
self.barbican = Barbican(
|
||||
stdout=self.captured_stdout,
|
||||
stderr=self.captured_stderr
|
||||
)
|
||||
self.parser = self.barbican.build_option_parser('desc', 'vers')
|
||||
|
||||
def barbican(self, argstr):
|
||||
"""Source: Keystone client's shell method in test_shell.py"""
|
||||
clean_env = {}
|
||||
_old_env, os.environ = os.environ, clean_env.copy()
|
||||
exit_code = 1
|
||||
try:
|
||||
stdout = self.global_file
|
||||
_barbican = barbicanclient.barbican.Barbican(stdout=stdout,
|
||||
stderr=stdout)
|
||||
exit_code = _barbican.run(argv=argstr.split())
|
||||
except Exception as exception:
|
||||
exit_message = exception.message
|
||||
except SystemExit as sys_exit_exception:
|
||||
exit_code = sys_exit_exception.code
|
||||
finally:
|
||||
out = stdout.getvalue()
|
||||
os.environ = _old_env
|
||||
return exit_code, out
|
||||
def assert_client_raises(self, args, message):
|
||||
argv, remainder = self.parser.parse_known_args(args.split())
|
||||
e = self.assertRaises(
|
||||
Exception, self.barbican.create_client, argv
|
||||
)
|
||||
self.assertIn(message, str(e))
|
||||
|
||||
def test_should_show_usage_error_with_no_args(self):
|
||||
args = ""
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(1, exit_code)
|
||||
self.assertIn('usage:', out)
|
||||
def create_and_assert_client(self, args):
|
||||
argv, remainder = self.parser.parse_known_args(args.split())
|
||||
|
||||
client = self.barbican.create_client(argv)
|
||||
self.assertIsNotNone(client)
|
||||
return client
|
||||
|
||||
def test_should_show_usage_with_help_flag(self):
|
||||
args = "-h"
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(0, exit_code)
|
||||
self.assertIn('usage: ', out)
|
||||
e = self.assertRaises(SystemExit, self.parser.parse_known_args, ['-h'])
|
||||
self.assertEqual(0, e.code)
|
||||
self.assertIn('usage', self.captured_stdout.getvalue())
|
||||
|
||||
def test_should_show_usage_with_no_args(self):
|
||||
exit_code = self.barbican.run([])
|
||||
self.assertEquals(1, exit_code)
|
||||
self.assertIn('usage', self.captured_stderr.getvalue())
|
||||
|
||||
def test_should_error_if_noauth_and_authurl_both_specified(self):
|
||||
args = "--no-auth --os-auth-url http://localhost:5000/v3"
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(1, exit_code)
|
||||
self.assertIn(
|
||||
message = (
|
||||
'ERROR: argument --os-auth-url/-A: not allowed with '
|
||||
'argument --no-auth/-N', out)
|
||||
'argument --no-auth/-N'
|
||||
)
|
||||
self.assert_client_raises(args, message)
|
||||
|
||||
def _expect_error_with_invalid_noauth_args(self, args):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(1, exit_code)
|
||||
expected_err_msg = 'ERROR: please specify --endpoint '\
|
||||
'and --os-project-id(or --os-tenant-id)\n'
|
||||
self.assertIn(expected_err_msg, out)
|
||||
expected_err_msg = (
|
||||
'ERROR: please specify --endpoint '
|
||||
'and --os-project-id (or --os-tenant-id)'
|
||||
)
|
||||
self.assert_client_raises(args, expected_err_msg)
|
||||
|
||||
def test_should_error_if_noauth_and_missing_endpoint_tenantid_args(self):
|
||||
self._expect_error_with_invalid_noauth_args("--no-auth secret list")
|
||||
|
@ -88,39 +79,34 @@ class WhenTestingBarbicanCLI(test_client.BaseEntityResource):
|
|||
self._expect_error_with_invalid_noauth_args(
|
||||
"--no-auth --os-project-id 123 secret list")
|
||||
|
||||
def _expect_success_code(self, args):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(0, exit_code)
|
||||
|
||||
def _expect_failure_code(self, args, code=1):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(code, exit_code)
|
||||
|
||||
def _assert_status_code_and_msg(self, args, expected_msg, code=1):
|
||||
exit_code, out = self.barbican(args)
|
||||
self.assertEqual(code, exit_code)
|
||||
self.assertIn(expected_msg, out)
|
||||
|
||||
def test_should_succeed_if_noauth_with_valid_args_specified(self):
|
||||
args = (
|
||||
'--no-auth --endpoint {0} --os-tenant-id {1}'
|
||||
'secret list'.format(self.endpoint, self.project_id)
|
||||
)
|
||||
list_secrets_url = '{0}/v1/secrets'.format(self.endpoint)
|
||||
|
||||
self.responses.get(list_secrets_url, json={"secrets": [], "total": 0})
|
||||
|
||||
self._expect_success_code(
|
||||
"--no-auth --endpoint {0} --os-tenant-id {1} secret list".
|
||||
format(self.endpoint, self.project_id))
|
||||
client = self.create_and_assert_client(args)
|
||||
secret_list = client.secrets.list()
|
||||
self.assertTrue(self.responses._adapter.called)
|
||||
self.assertEqual(1, self.responses._adapter.call_count)
|
||||
self.assertEqual([], secret_list)
|
||||
|
||||
def test_should_error_if_required_keystone_auth_arguments_are_missing(
|
||||
self):
|
||||
expected_error_msg = 'ERROR: please specify authentication credentials'
|
||||
self._assert_status_code_and_msg(
|
||||
expected_error_msg = (
|
||||
'ERROR: please specify the following --os-project-id or'
|
||||
' (--os-project-name and --os-project-domain-name) or '
|
||||
' (--os-project-name and --os-project-domain-id)'
|
||||
)
|
||||
self.assert_client_raises(
|
||||
'--os-auth-url http://localhost:35357/v2.0 secret list',
|
||||
expected_error_msg)
|
||||
self._assert_status_code_and_msg('--os-auth-url '
|
||||
'http://localhost:35357/v2.0 '
|
||||
'--os-username barbican '
|
||||
'--os-password barbican '
|
||||
'secret list', expected_error_msg)
|
||||
self.assert_client_raises(
|
||||
'--os-auth-url http://localhost:35357/v2.0 --os-username barbican '
|
||||
'--os-password barbican secret list',
|
||||
expected_error_msg
|
||||
)
|
||||
|
||||
|
||||
class TestBarbicanWithKeystonePasswordAuth(
|
||||
|
|
Loading…
Reference in New Issue