diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py index 927aa840..aea476e8 100644 --- a/octavia_tempest_plugin/common/constants.py +++ b/octavia_tempest_plugin/common/constants.py @@ -183,6 +183,7 @@ PATH = 'PATH' # RBAC options ADVANCED = 'advanced' +KEYSTONE_DEFAULT_ROLES = 'keystone_default_roles' OWNERADMIN = 'owner_or_admin' NONE = 'none' diff --git a/octavia_tempest_plugin/config.py b/octavia_tempest_plugin/config.py index f44bf960..fb4cb36c 100644 --- a/octavia_tempest_plugin/config.py +++ b/octavia_tempest_plugin/config.py @@ -86,6 +86,12 @@ OctaviaGroup = [ cfg.StrOpt('admin_role', default='load-balancer_admin', help='The load balancing admin RBAC role.'), + cfg.StrOpt('observer_role', + default='load-balancer_observer', + help='The load balancing observer RBAC role.'), + cfg.StrOpt('global_observer_role', + default='load-balancer_global_observer', + help='The load balancing global observer RBAC role.'), cfg.IntOpt('scp_connection_timeout', default=5, help='Timeout in seconds to wait for a ' @@ -97,10 +103,13 @@ OctaviaGroup = [ default='octavia', help='The provider driver to use for the tests.'), cfg.StrOpt('RBAC_test_type', default=const.ADVANCED, - choices=[const.ADVANCED, const.OWNERADMIN, const.NONE], + choices=[const.ADVANCED, const.KEYSTONE_DEFAULT_ROLES, + const.OWNERADMIN, const.NONE], help='Type of RBAC tests to run. "advanced" runs the octavia ' 'default RBAC tests. "owner_or_admin" runs the legacy ' - 'owner or admin tests. "none" disables the RBAC tests.'), + 'owner or admin tests. "keystone_default_roles" runs the ' + 'tests using only the keystone default roles. "none" ' + 'disables the RBAC tests.'), cfg.DictOpt('enabled_provider_drivers', help=('A comma separated list of dictionaries of the ' 'enabled provider driver names and descriptions. ' @@ -217,6 +226,15 @@ OctaviaGroup = [ default='/opt/octavia-tempest-plugin/test_server.bin', help='Filesystem path to the test web server that will be ' 'installed in the web server VMs.'), + # RBAC related options + # Note: Also see the enforce_scope section (from tempest) for Octavia API + # scope checking setting. + cfg.BoolOpt('enforce_new_defaults', + default=False, + help='Does the load-balancer service API policies enforce ' + 'the new keystone default roles? This configuration ' + 'value should be same as octavia.conf: ' + '[oslo_policy].enforce_new_defaults option.'), ] lb_feature_enabled_group = cfg.OptGroup(name='loadbalancer-feature-enabled', @@ -261,3 +279,15 @@ LBFeatureEnabledGroup = [ "the tempest instance have access to the log files " "specified in the tempest configuration."), ] + +# Extending this enforce_scope group defined in tempest +enforce_scope_group = cfg.OptGroup(name="enforce_scope", + title="OpenStack Services with " + "enforce scope") +EnforceScopeGroup = [ + cfg.BoolOpt('octavia', + default=False, + help='Does the load-balancer service API policies enforce ' + 'scope? This configuration value should be same as ' + 'octavia.conf: [oslo_policy].enforce_scope option.'), +] diff --git a/octavia_tempest_plugin/plugin.py b/octavia_tempest_plugin/plugin.py index ec093e7a..655cdba7 100644 --- a/octavia_tempest_plugin/plugin.py +++ b/octavia_tempest_plugin/plugin.py @@ -38,6 +38,8 @@ class OctaviaTempestPlugin(plugins.TempestPlugin): config.register_opt_group(conf, project_config.lb_feature_enabled_group, project_config.LBFeatureEnabledGroup) + config.register_opt_group(conf, project_config.enforce_scope_group, + project_config.EnforceScopeGroup) def get_opt_lists(self): return [ diff --git a/octavia_tempest_plugin/tests/RBAC_tests.py b/octavia_tempest_plugin/tests/RBAC_tests.py new file mode 100644 index 00000000..b24e5c43 --- /dev/null +++ b/octavia_tempest_plugin/tests/RBAC_tests.py @@ -0,0 +1,472 @@ +# Copyright 2021 Red Hat, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +from oslo_log import log as logging +from tempest import config +from tempest.lib import exceptions +from tempest import test + +from octavia_tempest_plugin.common import constants +from octavia_tempest_plugin.tests import waiters + +CONF = config.CONF +LOG = logging.getLogger(__name__) + + +class RBACTestsMixin(test.BaseTestCase): + + def _check_allowed(self, client_str, method_str, allowed_list, + *args, **kwargs): + """Test an API call allowed RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param allowed_list: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + for cred in allowed_list: + try: + cred_obj = getattr(self, cred) + except AttributeError: + # TODO(johnsom) Remove once scoped tokens is the default. + if ((cred == 'os_system_admin' or cred == 'os_system_reader') + and not CONF.enforce_scope.octavia): + LOG.info('Skipping %s allowed RBAC test because ' + 'enforce_scope.octavia is not True', cred) + continue + else: + self.fail('Credential {} "expected_allowed" for RBAC ' + 'testing was not created by tempest ' + 'credentials setup. This is likely a bug in the ' + 'test.'.format(cred)) + client = getattr(cred_obj, client_str) + method = getattr(client, method_str) + try: + method(*args, **kwargs) + except exceptions.Forbidden as e: + self.fail('Method {}.{} failed to allow access via RBAC using ' + 'credential {}. Error: {}'.format( + client_str, method_str, cred, str(e))) + + def _check_disallowed(self, client_str, method_str, allowed_list, + status_method=None, obj_id=None, *args, **kwargs): + """Test an API call disallowed RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param allowed_list: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param status_method: The service client method that will provide + the object status for a status change waiter. + :param obj_id: The ID of the object to check for the expected status + update. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + expected_disallowed = (set(self.allocated_credentials) - + set(allowed_list)) + for cred in expected_disallowed: + cred_obj = getattr(self, cred) + client = getattr(cred_obj, client_str) + method = getattr(client, method_str) + + # Unfortunately tempest uses testtools assertRaises[1] which means + # we cannot use the unittest assertRaises context[2] with msg= to + # give a useful error. + # Also, testtools doesn't work with subTest[3], so we can't use + # that to expose the failing credential. + # This all means the exception raised testtools assertRaises + # is less than useful. + # TODO(johnsom) Remove this try block once testtools is useful. + # [1] https://testtools.readthedocs.io/en/latest/ + # api.html#testtools.TestCase.assertRaises + # [2] https://docs.python.org/3/library/ + # unittest.html#unittest.TestCase.assertRaises + # [3] https://github.com/testing-cabal/testtools/issues/235 + try: + method(*args, **kwargs) + except exceptions.Forbidden: + if status_method: + waiters.wait_for_status( + status_method, obj_id, + constants.PROVISIONING_STATUS, constants.ACTIVE, + CONF.load_balancer.check_interval, + CONF.load_balancer.check_timeout) + + continue + self.fail('Method {}.{} failed to deny access via RBAC using ' + 'credential {}.'.format(client_str, method_str, cred)) + + def _list_get_RBAC_enforcement(self, client_str, method_str, + expected_allowed, *args, **kwargs): + """Test an API call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + + allowed_list = copy.deepcopy(expected_allowed) + # os_admin is a special case as it is valid with the old defaults, + # but will not be with the new defaults and/or token scoping. + # The old keystone role "admin" becomes project scoped "admin" + # instead of being a global admin. + # To keep the tests simple, handle that edge case here. + # TODO(johnsom) Once token scope is default, remove this. + if ('os_system_admin' in expected_allowed and + not CONF.load_balancer.enforce_new_defaults and + not CONF.enforce_scope.octavia): + allowed_list.append('os_admin') + + # #### Test that disallowed credentials cannot access the API. + self._check_disallowed(client_str, method_str, allowed_list, + None, None, *args, **kwargs) + + # #### Test that allowed credentials can access the API. + self._check_allowed(client_str, method_str, allowed_list, + *args, **kwargs) + + def check_show_RBAC_enforcement(self, client_str, method_str, + expected_allowed, *args, **kwargs): + """Test an API show call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + self._list_get_RBAC_enforcement(client_str, method_str, + expected_allowed, *args, **kwargs) + + def check_list_RBAC_enforcement(self, client_str, method_str, + expected_allowed, *args, **kwargs): + """Test an API list call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + self._list_get_RBAC_enforcement(client_str, method_str, + expected_allowed, *args, **kwargs) + + def _CUD_RBAC_enforcement(self, client_str, method_str, expected_allowed, + status_method=None, obj_id=None, + *args, **kwargs): + """Test an API create/update/delete call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param status_method: The service client method that will provide + the object status for a status change waiter. + :param obj_id: The ID of the object to check for the expected status + update. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + + allowed_list = copy.deepcopy(expected_allowed) + # os_admin is a special case as it is valid with the old defaults, + # but will not be with the new defaults and/or token scoping. + # The old keystone role "admin" becomes project scoped "admin" + # instead of being a global admin. + # To keep the tests simple, handle that edge case here. + # TODO(johnsom) Once token scope is default, remove this. + if ('os_system_admin' in expected_allowed and + not CONF.load_balancer.enforce_new_defaults and + not CONF.enforce_scope.octavia): + allowed_list.append('os_admin') + + # #### Test that disallowed credentials cannot access the API. + self._check_disallowed(client_str, method_str, allowed_list, + status_method, obj_id, *args, **kwargs) + + def check_create_RBAC_enforcement( + self, client_str, method_str, expected_allowed, + status_method=None, obj_id=None, *args, **kwargs): + """Test an API create call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param status_method: The service client method that will provide + the object status for a status change waiter. + :param obj_id: The ID of the object to check for the expected status + update. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed, + status_method, obj_id, *args, **kwargs) + + def check_delete_RBAC_enforcement( + self, client_str, method_str, expected_allowed, + status_method=None, obj_id=None, *args, **kwargs): + """Test an API delete call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param status_method: The service client method that will provide + the object status for a status change waiter. + :param obj_id: The ID of the object to check for the expected status + update. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed, + status_method, obj_id, *args, **kwargs) + + def check_update_RBAC_enforcement( + self, client_str, method_str, expected_allowed, + status_method=None, obj_id=None, *args, **kwargs): + """Test an API update call RBAC enforcement. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param status_method: The service client method that will provide + the object status for a status change waiter. + :param obj_id: The ID of the object to check for the expected status + update. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed, + status_method, obj_id, *args, **kwargs) + + def check_list_RBAC_enforcement_count( + self, client_str, method_str, expected_allowed, expected_count, + *args, **kwargs): + """Test an API list call RBAC enforcement result count. + + List APIs will return the object list for the project associated + with the token used to access the API. This means most credentials + will have access, but will get differing results. + + This test will query the list API using a list of credentials and + will validate that only the expected count of results are returned. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param expected_count: The number of results expected in the list + returned from the API. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + + allowed_list = copy.deepcopy(expected_allowed) + # os_admin is a special case as it is valid with the old defaults, + # but will not be with the new defaults and/or token scoping. + # The old keystone role "admin" becomes project scoped "admin" + # instead of being a global admin. + # To keep the tests simple, handle that edge case here. + # TODO(johnsom) Once token scope is default, remove this. + if ('os_system_admin' in expected_allowed and + not CONF.load_balancer.enforce_new_defaults and + not CONF.enforce_scope.octavia): + allowed_list.append('os_admin') + + for cred in allowed_list: + try: + cred_obj = getattr(self, cred) + except AttributeError: + # TODO(johnsom) Remove once scoped tokens is the default. + if ((cred == 'os_system_admin' or cred == 'os_system_reader') + and not CONF.enforce_scope.octavia): + LOG.info('Skipping %s allowed RBAC test because ' + 'enforce_scope.octavia is not True', cred) + continue + else: + self.fail('Credential {} "expected_allowed" for RBAC ' + 'testing was not created by tempest ' + 'credentials setup. This is likely a bug in the ' + 'test.'.format(cred)) + client = getattr(cred_obj, client_str) + method = getattr(client, method_str) + try: + result = method(*args, **kwargs) + except exceptions.Forbidden as e: + self.fail('Method {}.{} failed to allow access via RBAC using ' + 'credential {}. Error: {}'.format( + client_str, method_str, cred, str(e))) + self.assertEqual(expected_count, len(result), message='Credential ' + '{} saw {} objects when {} was expected.'.format( + cred, len(result), expected_count)) + + def check_list_IDs_RBAC_enforcement( + self, client_str, method_str, expected_allowed, expected_ids, + *args, **kwargs): + """Test an API list call RBAC enforcement result contains IDs. + + List APIs will return the object list for the project associated + with the token used to access the API. This means most credentials + will have access, but will get differing results. + + This test will query the list API using a list of credentials and + will validate that the expected object Ids in included in the results. + + :param client_str: The service client to use for the test, without the + credential. Example: 'amphora_client' + :param method_str: The method on the client to call for the test. + Example: 'list_amphorae' + :param expected_allowed: The list of credentials expected to be + allowed. Example: ['os_roles_lb_member']. + :param expected_ids: The list of object IDs to validate are included + in the returned list from the API. + :param args: Any positional parameters needed by the method. + :param kwargs: Any named parameters needed by the method. + :raises AssertionError: Raised if the RBAC tests fail. + :raises Forbidden: Raised if a credential that should have access does + not and is denied. + :raises InvalidScope: Raised if a credential that should have the + correct scope for access is denied. + :returns: None on success + """ + + allowed_list = copy.deepcopy(expected_allowed) + # os_admin is a special case as it is valid with the old defaults, + # but will not be with the new defaults and/or token scoping. + # The old keystone role "admin" becomes project scoped "admin" + # instead of being a global admin. + # To keep the tests simple, handle that edge case here. + # TODO(johnsom) Once token scope is default, remove this. + if ('os_system_admin' in expected_allowed and + not CONF.load_balancer.enforce_new_defaults and + not CONF.enforce_scope.octavia): + allowed_list.append('os_admin') + + for cred in allowed_list: + try: + cred_obj = getattr(self, cred) + except AttributeError: + # TODO(johnsom) Remove once scoped tokens is the default. + if ((cred == 'os_system_admin' or cred == 'os_system_reader') + and not CONF.enforce_scope.octavia): + LOG.info('Skipping %s allowed RBAC test because ' + 'enforce_scope.octavia is not True', cred) + continue + else: + self.fail('Credential {} "expected_allowed" for RBAC ' + 'testing was not created by tempest ' + 'credentials setup. This is likely a bug in the ' + 'test.'.format(cred)) + client = getattr(cred_obj, client_str) + method = getattr(client, method_str) + try: + result = method(*args, **kwargs) + except exceptions.Forbidden as e: + self.fail('Method {}.{} failed to allow access via RBAC using ' + 'credential {}. Error: {}'.format( + client_str, method_str, cred, str(e))) + result_ids = [lb[constants.ID] for lb in result] + self.assertTrue(set(expected_ids).issubset(set(result_ids))) diff --git a/octavia_tempest_plugin/tests/api/v2/test_amphora.py b/octavia_tempest_plugin/tests/api/v2/test_amphora.py index 146096dd..a1a64416 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_amphora.py +++ b/octavia_tempest_plugin/tests/api/v2/test_amphora.py @@ -20,7 +20,6 @@ from dateutil import parser from tempest import config from tempest.lib.common.utils import data_utils from tempest.lib import decorators -from tempest.lib import exceptions from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.tests import test_base @@ -90,12 +89,17 @@ class AmphoraAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.lb_build_interval, CONF.load_balancer.lb_build_timeout) - # Test that a user, without the load balancer member role, cannot - # list amphorae + # Test RBAC for list amphorae + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.amphora_client.list_amphorae) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'amphora_client', 'list_amphorae', expected_allowed) # Get an actual list of the amphorae amphorae = self.lb_admin_amphora_client.list_amphorae() @@ -112,13 +116,11 @@ class AmphoraAPITest(test_base.LoadBalancerBaseTest): self.assertEqual(self._expected_amp_count(amphorae), len(amphorae)) self.assertEqual(self.lb_id, amphorae[0][const.LOADBALANCER_ID]) - # Test that a different user, with load balancer member role, cannot - # see this amphora - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.amphora_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_amphora, - amphora_id=amphorae[0][const.ID]) + # Test RBAC for show amphora + if expected_allowed: + self.check_show_RBAC_enforcement( + 'amphora_client', 'show_amphora', expected_allowed, + amphora_id=amphorae[0][const.ID]) show_amphora_response_fields = const.SHOW_AMPHORA_RESPONSE_FIELDS if self.lb_admin_amphora_client.is_version_supported( @@ -175,13 +177,18 @@ class AmphoraAPITest(test_base.LoadBalancerBaseTest): loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id)) amphora_1 = amphorae[0] - # Test that a user without the load balancer admin role cannot - # create a flavor + # Test RBAC for update an amphora + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.amphora_client.update_amphora_config, - amphora_1[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'amphora_client', 'update_amphora_config', expected_allowed, + None, None, amphora_1[const.ID]) self.lb_admin_amphora_client.update_amphora_config(amphora_1[const.ID]) @@ -205,15 +212,18 @@ class AmphoraAPITest(test_base.LoadBalancerBaseTest): loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id)) amphora_1 = amphorae[0] - # Test RBAC not authorized for non-admin role - if not CONF.load_balancer.RBAC_test_type == const.NONE: - self.assertRaises(exceptions.Forbidden, - self.os_primary.amphora_client.amphora_failover, - amphora_1[const.ID]) - self.assertRaises( - exceptions.Forbidden, - self.os_roles_lb_member.amphora_client.amphora_failover, - amphora_1[const.ID]) + # Test RBAC for failover an amphora + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'amphora_client', 'amphora_failover', expected_allowed, + None, None, amphora_1[const.ID]) self.lb_admin_amphora_client.amphora_failover(amphora_1[const.ID]) diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py index 29426c25..8ea68089 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py +++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py @@ -104,12 +104,18 @@ class AvailabilityZoneAPITest(test_base.LoadBalancerBaseTest): self.availability_zone_profile_id} # Test that a user without the load balancer admin role cannot - # create an availability zone + # create an availability zone. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises(exceptions.Forbidden, - self.os_primary.availability_zone_client - .create_availability_zone, - **availability_zone_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'availability_zone_client', 'create_availability_zone', + expected_allowed, **availability_zone_kwargs) # Happy path availability_zone = ( @@ -220,11 +226,26 @@ class AvailabilityZoneAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer role cannot # list availability zones. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = [ + 'os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_client - .list_availability_zones) + expected_allowed = [ + 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'availability_zone_client', 'list_availability_zones', + expected_allowed) # Check the default sort order (by ID) availability_zones = ( @@ -359,12 +380,26 @@ class AvailabilityZoneAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer role cannot # show availability zone details. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = [ + 'os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_client - .show_availability_zone, - availability_zone[const.NAME]) + expected_allowed = [ + 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'availability_zone_client', 'show_availability_zone', + expected_allowed, availability_zone[const.NAME]) result = self.mem_availability_zone_client.show_availability_zone( availability_zone[const.NAME]) @@ -420,13 +455,18 @@ class AvailabilityZoneAPITest(test_base.LoadBalancerBaseTest): const.ENABLED: False} # Test that a user without the load balancer role cannot - # show availability zone details. + # update availability zone details. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_client - .update_availability_zone, - availability_zone[const.NAME], + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'availability_zone_client', 'update_availability_zone', + expected_allowed, None, None, availability_zone[const.NAME], **availability_zone_updated_kwargs) updated_availability_zone = ( @@ -493,12 +533,17 @@ class AvailabilityZoneAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # delete an availability zone. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_client - .delete_availability_zone, - availability_zone[const.NAME]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'availability_zone_client', 'delete_availability_zone', + expected_allowed, None, None, availability_zone[const.NAME]) # Happy path self.lb_admin_availability_zone_client.delete_availability_zone( diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py index 4a12057a..4831ede6 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py +++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py @@ -15,7 +15,6 @@ from tempest import config from tempest.lib import decorators -from tempest.lib import exceptions from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.tests import test_base @@ -45,13 +44,17 @@ class AvailabilityZoneCapabilitiesAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # list provider availability zone capabilities. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - os_primary_capabilities_client = ( - self.os_primary.availability_zone_capabilities_client) - self.assertRaises( - exceptions.Forbidden, - (os_primary_capabilities_client - .list_availability_zone_capabilities), + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'availability_zone_capabilities_client', + 'list_availability_zone_capabilities', expected_allowed, CONF.load_balancer.provider) # Check for an expected availability zone capability for the diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py index f5e4b7e1..079d1258 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py +++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py @@ -75,13 +75,19 @@ class AvailabilityZoneProfileAPITest(test_base.LoadBalancerBaseTest): } # Test that a user without the load balancer admin role cannot - # create an availability zone profile profile + # create an availability zone profile. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_profile_client - .create_availability_zone_profile, - **availability_zone_profile_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'availability_zone_profile_client', + 'create_availability_zone_profile', + expected_allowed, **availability_zone_profile_kwargs) # Happy path availability_zone_profile = ( @@ -225,11 +231,17 @@ class AvailabilityZoneProfileAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # list availability zone profiles. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_profile_client - .list_availability_zone_profiles) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'availability_zone_profile_client', + 'list_availability_zone_profiles', expected_allowed) # Check the default sort order (by ID) profiles = (self.lb_admin_availability_zone_profile_client @@ -379,12 +391,18 @@ class AvailabilityZoneProfileAPITest(test_base.LoadBalancerBaseTest): availability_zone_profile[const.ID]) # Test that a user without the load balancer admin role cannot - # show an availability zone profile profile + # show an availability zone profile + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_profile_client - .show_availability_zone_profile, + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'availability_zone_profile_client', + 'show_availability_zone_profile', expected_allowed, availability_zone_profile[const.ID]) result = ( @@ -475,13 +493,19 @@ class AvailabilityZoneProfileAPITest(test_base.LoadBalancerBaseTest): } # Test that a user without the load balancer admin role cannot - # create an availability zone profile profile + # update an availability zone profile. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_profile_client - .update_availability_zone_profile, - availability_zone_profile[const.ID], + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'availability_zone_profile_client', + 'update_availability_zone_profile', expected_allowed, + None, None, availability_zone_profile[const.ID], **availability_zone_profile_updated_kwargs) result = ( @@ -551,13 +575,19 @@ class AvailabilityZoneProfileAPITest(test_base.LoadBalancerBaseTest): availability_zone_profile[const.ID]) # Test that a user without the load balancer admin role cannot - # delete an availability zone profile profile + # delete an availability zone profile. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.availability_zone_profile_client - .delete_availability_zone_profile, - availability_zone_profile[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'availability_zone_profile_client', + 'delete_availability_zone_profile', expected_allowed, + None, None, availability_zone_profile[const.ID]) # Happy path (self.lb_admin_availability_zone_profile_client diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor.py b/octavia_tempest_plugin/tests/api/v2/test_flavor.py index 43331215..c30f830a 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_flavor.py +++ b/octavia_tempest_plugin/tests/api/v2/test_flavor.py @@ -87,11 +87,18 @@ class FlavorAPITest(test_base.LoadBalancerBaseTest): const.FLAVOR_PROFILE_ID: self.flavor_profile_id} # Test that a user without the load balancer admin role cannot - # create a flavor + # create a flavor. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises(exceptions.Forbidden, - self.os_primary.flavor_client.create_flavor, - **flavor_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'flavor_client', 'create_flavor', + expected_allowed, None, None, **flavor_kwargs) # Happy path flavor = self.lb_admin_flavor_client.create_flavor(**flavor_kwargs) @@ -185,10 +192,25 @@ class FlavorAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer role cannot # list flavors. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = [ + 'os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_client.list_flavors) + expected_allowed = [ + 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'flavor_client', 'list_flavors', expected_allowed) # Check the default sort order (by ID) flavors = self.mem_flavor_client.list_flavors() @@ -299,10 +321,25 @@ class FlavorAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer role cannot # show flavor details. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = [ + 'os_admin', 'os_primary', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_client.show_flavor, + expected_allowed = [ + 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'flavor_client', 'show_flavor', expected_allowed, flavor[const.ID]) result = self.mem_flavor_client.show_flavor(flavor[const.ID]) @@ -354,11 +391,17 @@ class FlavorAPITest(test_base.LoadBalancerBaseTest): const.ENABLED: False} # Test that a user without the load balancer role cannot - # show flavor details. + # update flavor details. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_client.update_flavor, + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'flavor_client', 'update_flavor', expected_allowed, None, None, flavor[const.ID], **flavor_updated_kwargs) updated_flavor = self.lb_admin_flavor_client.update_flavor( @@ -413,11 +456,17 @@ class FlavorAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # delete a flavor. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_client.delete_flavor, - flavor[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'flavor_client', 'delete_flavor', expected_allowed, + None, None, flavor[const.ID]) # Happy path self.lb_admin_flavor_client.delete_flavor(flavor[const.ID]) diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py index 7f9da51a..5b488336 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py +++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py @@ -14,7 +14,6 @@ from tempest import config from tempest.lib import decorators -from tempest.lib import exceptions from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.tests import test_base @@ -43,12 +42,17 @@ class FlavorCapabilitiesAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # list provider flavor capabilities. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - os_primary_capabilities_client = ( - self.os_primary.flavor_capabilities_client) - self.assertRaises( - exceptions.Forbidden, - os_primary_capabilities_client.list_flavor_capabilities, + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'flavor_capabilities_client', + 'list_flavor_capabilities', expected_allowed, CONF.load_balancer.provider) # Check for an expected flavor capability for the configured provider diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py index 8f9c7d30..eec8679b 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py +++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py @@ -60,11 +60,17 @@ class FlavorProfileAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # create a flavor profile + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_profile_client.create_flavor_profile, - **flavor_profile_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'flavor_profile_client', 'create_flavor_profile', + expected_allowed, None, None, **flavor_profile_kwargs) # Happy path flavor_profile = ( @@ -174,10 +180,17 @@ class FlavorProfileAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # list flavor profiles. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_profile_client.list_flavor_profiles) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'flavor_profile_client', 'list_flavor_profiles', + expected_allowed) # Check the default sort order (by ID) profiles = self.lb_admin_flavor_profile_client.list_flavor_profiles() @@ -295,12 +308,18 @@ class FlavorProfileAPITest(test_base.LoadBalancerBaseTest): flavor_profile[const.ID]) # Test that a user without the load balancer admin role cannot - # show a flavor profile + # show a flavor profile. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_profile_client.show_flavor_profile, - flavor_profile[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'flavor_profile_client', 'show_flavor_profile', + expected_allowed, flavor_profile[const.ID]) result = ( self.lb_admin_flavor_profile_client.show_flavor_profile( @@ -367,12 +386,19 @@ class FlavorProfileAPITest(test_base.LoadBalancerBaseTest): } # Test that a user without the load balancer admin role cannot - # create a flavor profile + # update a flavor profile. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_profile_client.update_flavor_profile, - flavor_profile[const.ID], **flavor_profile_updated_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'flavor_profile_client', 'update_flavor_profile', + expected_allowed, None, None, flavor_profile[const.ID], + **flavor_profile_updated_kwargs) result = self.lb_admin_flavor_profile_client.update_flavor_profile( flavor_profile[const.ID], **flavor_profile_updated_kwargs) @@ -428,11 +454,17 @@ class FlavorProfileAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer admin role cannot # delete a flavor profile + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.flavor_profile_client.delete_flavor_profile, - flavor_profile[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'flavor_profile_client', 'delete_flavor_profile', + expected_allowed, None, None, flavor_profile[const.ID]) # Happy path self.lb_admin_flavor_profile_client.delete_flavor_profile( diff --git a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py index ff07c73f..0b4036d3 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py +++ b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py @@ -277,11 +277,21 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the loadbalancer role cannot # create a healthmonitor + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.healthmonitor_client.create_healthmonitor, - **hm_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'healthmonitor_client', 'create_healthmonitor', + expected_allowed, + status_method=self.mem_lb_client.show_loadbalancer, + obj_id=self.lb_id, **hm_kwargs) hm = self.mem_healthmonitor_client.create_healthmonitor(**hm_kwargs) self.addCleanup( @@ -488,6 +498,9 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): * List the healthmonitors filtering to one of the three. * List the healthmonitors filtered, one field, and sorted. """ + # IDs of health monitors created in the test + test_ids = [] + if (pool_algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not self.mem_listener_client.is_version_supported( self.api_version, '2.13')): @@ -614,6 +627,7 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(hm1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -658,6 +672,7 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(hm2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -702,19 +717,68 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(hm3[const.ID]) - # Test that a different user cannot list healthmonitors - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.healthmonitor_client - primary = member2_client.list_healthmonitors( - query_params='pool_id={pool_id}'.format(pool_id=pool1_id)) - self.assertEqual(0, len(primary)) + # Test that a different users cannot see the lb_member healthmonitors + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_primary', 'os_roles_lb_member2', + 'os_roles_lb_observer'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', + 'os_roles_lb_member2', 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement_count( + 'healthmonitor_client', 'list_healthmonitors', + expected_allowed, 0) + + # Test credentials that should see these healthmonitors can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'healthmonitor_client', 'list_healthmonitors', + expected_allowed, test_ids) # Test that users without the lb member role cannot list healthmonitors + # Note: non-owners can still call this API, they will just get the list + # of health monitors for their project (zero). The above tests + # are intended to cover the cross project use case. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2', + 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.healthmonitor_client.list_healthmonitors) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'healthmonitor_client', 'list_healthmonitors', + expected_allowed) # Check the default sort order, created_at hms = self.mem_healthmonitor_client.list_healthmonitors() @@ -1125,33 +1189,24 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): for item in equal_items: self.assertEqual(hm_kwargs[item], hm[item]) - # Test that a user with lb_admin role can see the healthmonitor + # Test that the appropriate users can see or not see the health + # monitors based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - healthmonitor_client = self.os_roles_lb_admin.healthmonitor_client - hm_adm = healthmonitor_client.show_healthmonitor(hm[const.ID]) - self.assertEqual(hm_name, hm_adm[const.NAME]) - - # Test that a user with cloud admin role can see the healthmonitor - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.healthmonitor_client.show_healthmonitor( - hm[const.ID]) - self.assertEqual(hm_name, adm[const.NAME]) - - # Test that a different user, with loadbalancer member role, cannot - # see this healthmonitor - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.healthmonitor_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_healthmonitor, - hm[const.ID]) - - # Test that a user, without the loadbalancer member role, cannot - # show healthmonitors - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.healthmonitor_client.show_healthmonitor, - hm[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'healthmonitor_client', 'show_healthmonitor', + expected_allowed, hm[const.ID]) @decorators.idempotent_id('2417164b-ec03-4488-afd2-60b096dc0077') def test_LC_HTTP_healthmonitor_update(self): @@ -1417,27 +1472,21 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): self.assertCountEqual(hm_kwargs[const.TAGS], hm[const.TAGS]) # Test that a user, without the loadbalancer member role, cannot - # use this command + # update this healthmonitor. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.healthmonitor_client.update_healthmonitor, - hm[const.ID], admin_state_up=True) - - # Assert we didn't go into PENDING_* - hm_check = self.mem_healthmonitor_client.show_healthmonitor( - hm[const.ID]) - self.assertEqual(const.ACTIVE, - hm_check[const.PROVISIONING_STATUS]) - self.assertFalse(hm_check[const.ADMIN_STATE_UP]) - - # Test that a user, without the loadbalancer member role, cannot - # update this healthmonitor - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.healthmonitor_client - self.assertRaises(exceptions.Forbidden, - member2_client.update_healthmonitor, - hm[const.ID], admin_state_up=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'healthmonitor_client', 'update_healthmonitor', + expected_allowed, None, None, hm[const.ID], + admin_state_up=True) # Assert we didn't go into PENDING_* hm_check = self.mem_healthmonitor_client.show_healthmonitor( @@ -1725,21 +1774,21 @@ class HealthMonitorAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) - # Test that a user without the loadbalancer role cannot - # delete this healthmonitor + # Test that a user without the loadbalancer role cannot delete this + # healthmonitor. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.healthmonitor_client.delete_healthmonitor, - hm[const.ID]) - - # Test that a different user, with the loadbalancer member role - # cannot delete this healthmonitor - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.healthmonitor_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_healthmonitor, - hm[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'healthmonitor_client', 'delete_healthmonitor', + expected_allowed, None, None, hm[const.ID]) self.mem_healthmonitor_client.delete_healthmonitor(hm[const.ID]) diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py index 440a5338..6e1406b3 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py +++ b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py @@ -19,7 +19,6 @@ from dateutil import parser from tempest import config from tempest.lib.common.utils import data_utils from tempest.lib import decorators -from tempest.lib import exceptions from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.tests import test_base @@ -135,11 +134,21 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer role cannot # create a l7policy + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7policy_client.create_l7policy, - **l7policy_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'l7policy_client', 'create_l7policy', + expected_allowed, + status_method=self.mem_lb_client.show_loadbalancer, + obj_id=self.lb_id, **l7policy_kwargs) l7policy = self.mem_l7policy_client.create_l7policy(**l7policy_kwargs) @@ -208,6 +217,9 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): * List the l7policies filtering to one of the three. * List the l7policies filtered, one field, and sorted. """ + # IDs of L7 policies created in the test + test_ids = [] + listener_name = data_utils.rand_name( "lb_member_listener2_l7policy-list") listener_kwargs = { @@ -263,6 +275,7 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(l7policy1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -303,6 +316,7 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(l7policy2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -344,21 +358,70 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(l7policy3[const.ID]) - # Test that a different user cannot list l7policies - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7policy_client - primary = member2_client.list_l7policies( + # Test that a different users cannot see the lb_member l7policies + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_primary', 'os_roles_lb_member2', + 'os_roles_lb_observer'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', + 'os_roles_lb_member2', 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement_count( + 'l7policy_client', 'list_l7policies', + expected_allowed, 0) + + # Test credentials that should see these l7policies can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'l7policy_client', 'list_l7policies', + expected_allowed, test_ids, query_params='listener_id={listener_id}'.format( listener_id=listener_id)) - self.assertEqual(0, len(primary)) - # Test that a user without the lb member role cannot list load - # balancers + # Test that users without the lb member role cannot list l7policies + # Note: non-owners can still call this API, they will just get the list + # of L7 policies for their project (zero). The above tests + # are intended to cover the cross project use case. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2', + 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7policy_client.list_l7policies) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'l7policy_client', 'list_l7policies', + expected_allowed) # Check the default sort order, created_at l7policies = self.mem_l7policy_client.list_l7policies( @@ -585,33 +648,24 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): self.assertIsNone(l7policy.pop(const.REDIRECT_URL, None)) self.assertIsNone(l7policy.pop(const.REDIRECT_POOL_ID, None)) - # Test that a user with lb_admin role can see the l7policy + # Test that the appropriate users can see or not see the L7 policies + # based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - l7policy_client = self.os_roles_lb_admin.l7policy_client - l7policy_adm = l7policy_client.show_l7policy(l7policy[const.ID]) - self.assertEqual(l7policy_name, l7policy_adm[const.NAME]) - - # Test that a user with cloud admin role can see the l7policy - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.l7policy_client.show_l7policy( - l7policy[const.ID]) - self.assertEqual(l7policy_name, adm[const.NAME]) - - # Test that a different user, with load balancer member role, cannot - # see this l7policy - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7policy_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_l7policy, - l7policy[const.ID]) - - # Test that a user, without the load balancer member role, cannot - # show l7policies - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7policy_client.show_l7policy, - l7policy[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'l7policy_client', 'show_l7policy', + expected_allowed, l7policy[const.ID]) @decorators.idempotent_id('08f73b22-550b-4e5a-b3d6-2ec03251ca13') def test_l7policy_update(self): @@ -703,28 +757,22 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): self.assertCountEqual(l7policy_kwargs[const.TAGS], l7policy[const.TAGS]) - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that a user, without the loadbalancer member role, cannot + # update this L7 policy. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7policy_client.update_l7policy, - l7policy[const.ID], admin_state_up=True) - - # Assert we didn't go into PENDING_* - l7policy_check = self.mem_l7policy_client.show_l7policy( - l7policy[const.ID]) - self.assertEqual(const.ACTIVE, - l7policy_check[const.PROVISIONING_STATUS]) - self.assertFalse(l7policy_check[const.ADMIN_STATE_UP]) - - # Test that a user, without the load balancer member role, cannot - # update this l7policy - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7policy_client - self.assertRaises(exceptions.Forbidden, - member2_client.update_l7policy, - l7policy[const.ID], admin_state_up=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'l7policy_client', 'update_l7policy', + expected_allowed, None, None, l7policy[const.ID], + admin_state_up=True) # Assert we didn't go into PENDING_* l7policy_check = self.mem_l7policy_client.show_l7policy( @@ -820,21 +868,21 @@ class L7PolicyAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) - # Test that a user without the load balancer role cannot - # delete this l7policy + # Test that a user without the loadbalancer role cannot delete this + # L7 policy. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7policy_client.delete_l7policy, - l7policy[const.ID]) - - # Test that a different user, with the load balancer member role - # cannot delete this l7policy - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7policy_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_l7policy, - l7policy[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'l7policy_client', 'delete_l7policy', + expected_allowed, None, None, l7policy[const.ID]) self.mem_l7policy_client.delete_l7policy(l7policy[const.ID]) diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py index e44c9f84..adaeaad0 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py +++ b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py @@ -19,7 +19,6 @@ from dateutil import parser from tempest import config from tempest.lib.common.utils import data_utils from tempest.lib import decorators -from tempest.lib import exceptions from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.tests import test_base @@ -141,13 +140,23 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): const.TAGS: l7_rule_tags }) - # Test that a user without the load balancer role cannot - # create a l7rule + # Test that a user without the loadbalancer role cannot + # create an L7 rule. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7rule_client.create_l7rule, - **l7rule_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'l7rule_client', 'create_l7rule', + expected_allowed, + status_method=self.mem_lb_client.show_loadbalancer, + obj_id=self.lb_id, **l7rule_kwargs) l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs) self.addClassResourceCleanup( @@ -211,6 +220,9 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): * List the l7rules filtering to one of the three. * List the l7rules filtered, one field, and sorted. """ + # IDs of L7 rules created in the test + test_ids = [] + l7policy_name = data_utils.rand_name("lb_member_l7policy2_l7rule-list") l7policy = self.mem_l7policy_client.create_l7policy( name=l7policy_name, listener_id=self.listener_id, @@ -260,6 +272,7 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(l7rule1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -298,6 +311,7 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(l7rule2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -336,23 +350,47 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(l7rule3[const.ID]) - # Test that a different user cannot list l7rules - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7rule_client - self.assertRaises( - exceptions.Forbidden, - member2_client.list_l7rules, - l7policy_id) - - # Test that a user without the lb l7rule role cannot list load - # balancers + # Test credentials that should see these L7 rules can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7rule_client.list_l7rules, + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'l7rule_client', 'list_l7rules', expected_allowed, test_ids, l7policy_id) + # Test that users without the lb member role cannot list L7 rules. + # Note: The parent policy ID blocks non-owners from listing + # L7 Rules. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'l7rule_client', 'list_l7rules', expected_allowed, l7policy_id) + # Check the default sort order, created_at l7rules = self.mem_l7rule_client.list_l7rules(l7policy_id) self.assertEqual(l7rule1[const.VALUE], @@ -521,34 +559,25 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): for item in equal_items: self.assertEqual(l7rule_kwargs[item], l7rule[item]) - # Test that a user with lb_admin role can see the l7rule + # Test that the appropriate users can see or not see the L7 rule + # based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - l7rule_client = self.os_roles_lb_admin.l7rule_client - l7rule_adm = l7rule_client.show_l7rule( - l7rule[const.ID], l7policy_id=self.l7policy_id) - self.assertEqual(l7rule_kwargs[const.KEY], l7rule_adm[const.KEY]) - - # Test that a user with cloud admin role can see the l7rule - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.l7rule_client.show_l7rule( - l7rule[const.ID], l7policy_id=self.l7policy_id) - self.assertEqual(l7rule_kwargs[const.KEY], adm[const.KEY]) - - # Test that a different user, with load balancer member role, cannot - # see this l7rule - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7rule_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_l7rule, - l7rule[const.ID], l7policy_id=self.l7policy_id) - - # Test that a user, without the load balancer member role, cannot - # show l7rules - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7rule_client.show_l7rule, - l7rule[const.ID], l7policy_id=self.l7policy_id) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'l7rule_client', 'show_l7rule', + expected_allowed, l7rule[const.ID], + l7policy_id=self.l7policy_id) @decorators.idempotent_id('f8cee23b-89b6-4f3a-a842-1463daf42cf7') def test_l7rule_update(self): @@ -618,29 +647,22 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): for item in equal_items: self.assertEqual(l7rule_kwargs[item], l7rule[item]) - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that a user, without the loadbalancer member role, cannot + # update this L7 rule. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7rule_client.update_l7rule, - l7rule[const.ID], l7policy_id=self.l7policy_id, - admin_state_up=True) - - # Assert we didn't go into PENDING_* - l7rule_check = self.mem_l7rule_client.show_l7rule( - l7rule[const.ID], l7policy_id=self.l7policy_id) - self.assertEqual(const.ACTIVE, l7rule_check[const.PROVISIONING_STATUS]) - self.assertFalse(l7rule_check[const.ADMIN_STATE_UP]) - - # Test that a user, without the load balancer member role, cannot - # update this l7rule - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7rule_client - self.assertRaises(exceptions.Forbidden, - member2_client.update_l7rule, - l7rule[const.ID], l7policy_id=self.l7policy_id, - admin_state_up=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'l7rule_client', 'update_l7rule', + expected_allowed, None, None, l7rule[const.ID], + l7policy_id=self.l7policy_id, admin_state_up=True) # Assert we didn't go into PENDING_* l7rule_check = self.mem_l7rule_client.show_l7rule( @@ -727,21 +749,22 @@ class L7RuleAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) - # Test that a user without the load balancer role cannot - # delete this l7rule + # Test that a user without the loadbalancer role cannot delete this + # L7 rule. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.l7rule_client.delete_l7rule, - l7rule[const.ID], l7policy_id=self.l7policy_id) - - # Test that a different user, with the load balancer member role - # cannot delete this l7rule - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.l7rule_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_l7rule, - l7rule[const.ID], l7policy_id=self.l7policy_id) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'l7rule_client', 'delete_l7rule', + expected_allowed, None, None, l7rule[const.ID], + l7policy_id=self.l7policy_id) self.mem_l7rule_client.delete_l7rule(l7rule[const.ID], l7policy_id=self.l7policy_id) diff --git a/octavia_tempest_plugin/tests/api/v2/test_listener.py b/octavia_tempest_plugin/tests/api/v2/test_listener.py index c688fddd..a482dc00 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_listener.py +++ b/octavia_tempest_plugin/tests/api/v2/test_listener.py @@ -143,13 +143,23 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs}) - # Test that a user without the load balancer role cannot - # create a listener + # Test that a user without the loadbalancer role cannot + # create a listener. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.listener_client.create_listener, - **listener_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'listener_client', 'create_listener', + expected_allowed, + status_method=self.mem_lb_client.show_loadbalancer, + obj_id=self.lb_id, **listener_kwargs) listener = self.mem_listener_client.create_listener(**listener_kwargs) @@ -378,6 +388,9 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): * List the listeners filtering to one of the three. * List the listeners filtered, one field, and sorted. """ + # IDs of listeners created in the test + test_ids = [] + lb_name = data_utils.rand_name("lb_member_lb2_listener-list") lb = self.mem_lb_client.create_loadbalancer( name=lb_name, provider=CONF.load_balancer.provider, @@ -427,6 +440,7 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(listener1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -465,6 +479,7 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(listener2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -503,6 +518,7 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(listener3[const.ID]) if not CONF.load_balancer.test_with_noop: # Wait for the enabled listeners to come ONLINE @@ -517,18 +533,67 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) - # Test that a different user cannot list listeners - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.listener_client - primary = member2_client.list_listeners( - query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) - self.assertEqual(0, len(primary)) - - # Test that a user without the lb member role cannot list listeners + # Test that a different users cannot see the lb_member listeners. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_primary', 'os_roles_lb_member2', + 'os_roles_lb_observer'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', + 'os_roles_lb_member2', 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.listener_client.list_listeners) + expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement_count( + 'listener_client', 'list_listeners', expected_allowed, 0, + query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) + + # Test credentials that should see these listeners can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'listener_client', 'list_listeners', expected_allowed, + test_ids, + query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) + + # Test that users without the lb member role cannot list listeners. + # Note: non-owners can still call this API, they will just get the list + # of health monitors for their project (zero). The above tests + # are intended to cover the cross project use case. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2', + 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'listener_client', 'list_listeners', expected_allowed, + query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) # Check the default sort order, created_at listeners = self.mem_listener_client.list_listeners( @@ -786,33 +851,24 @@ class ListenerAPITest(test_base.LoadBalancerBaseTest): self.api_version, '2.12'): self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS]) - # Test that a user with lb_admin role can see the listener + # Test that the appropriate users can see or not see the listener + # based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - listener_client = self.os_roles_lb_admin.listener_client - listener_adm = listener_client.show_listener(listener[const.ID]) - self.assertEqual(listener_name, listener_adm[const.NAME]) - - # Test that a user with cloud admin role can see the listener - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.listener_client.show_listener( - listener[const.ID]) - self.assertEqual(listener_name, adm[const.NAME]) - - # Test that a different user, with load balancer member role, cannot - # see this listener - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.listener_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_listener, - listener[const.ID]) - - # Test that a user, without the load balancer member role, cannot - # show listeners - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.listener_client.show_listener, - listener[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'listener_client', 'show_listener', + expected_allowed, listener[const.ID]) @decorators.idempotent_id('aaae0298-5778-4c7e-a27a-01549a71b319') def test_http_listener_update(self): diff --git a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py index 0cf35769..ba1187c2 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py +++ b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import testtools import time from uuid import UUID @@ -78,13 +79,25 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): self._setup_lb_network_kwargs(lb_kwargs, ip_version, use_fixed_ip=True) - # Test that a user without the load balancer role cannot - # create a load balancer + # Test that a user without the loadbalancer role cannot + # create a load balancer. + lb_kwargs_with_project_id = copy.deepcopy(lb_kwargs) + lb_kwargs_with_project_id[const.PROJECT_ID] = ( + self.os_roles_lb_member.credentials.project_id) + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.create_loadbalancer, - **lb_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'loadbalancer_client', 'create_loadbalancer', + expected_allowed, None, None, **lb_kwargs_with_project_id) lb = self.mem_lb_client.create_loadbalancer(**lb_kwargs) @@ -173,21 +186,21 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.lb_build_interval, CONF.load_balancer.lb_build_timeout) - # Test that a user without the load balancer role cannot - # delete this load balancer + # Test that a user without the loadbalancer role cannot delete this + # load balancer. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.delete_loadbalancer, - lb[const.ID]) - - # Test that a different user, with the load balancer member role - # cannot delete this load balancer - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_loadbalancer, - lb[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'loadbalancer_client', 'delete_loadbalancer', + expected_allowed, None, None, lb[const.ID]) self.mem_lb_client.delete_loadbalancer(lb[const.ID]) @@ -222,21 +235,21 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): # TODO(johnsom) Add other objects when we have clients for them - # Test that a user without the load balancer role cannot - # delete this load balancer + # Test that a user without the loadbalancer role cannot delete this + # load balancer. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.delete_loadbalancer, - lb[const.ID], cascade=True) - - # Test that a different user, with the load balancer member role - # cannot delete this load balancer - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_loadbalancer, - lb[const.ID], cascade=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'loadbalancer_client', 'delete_loadbalancer', + expected_allowed, None, None, lb[const.ID], cascade=True) self.mem_lb_client.delete_loadbalancer(lb[const.ID], cascade=True) @@ -267,6 +280,8 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): * List the load balancers filtering to one of the three. * List the load balancers filtered, one field, and sorted. """ + # IDs of load balancers created in the test + test_ids = [] # Get a list of pre-existing LBs to filter from test data pretest_lbs = self.mem_lb_client.list_loadbalancers() # Store their IDs for easy access @@ -313,6 +328,7 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): const.ONLINE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(lb1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one @@ -356,6 +372,7 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): const.ONLINE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(lb2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one @@ -394,19 +411,67 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.lb_build_interval, CONF.load_balancer.lb_build_timeout) + test_ids.append(lb3[const.ID]) - # Test that a different user cannot list load balancers - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - primary = member2_client.list_loadbalancers() - self.assertEqual(0, len(primary)) - - # Test that a user without the lb member role cannot list load - # balancers + # Test that a different users cannot see the lb_member load balancers. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_primary', 'os_roles_lb_member2', + 'os_roles_lb_observer'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', + 'os_roles_lb_member2', 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.list_loadbalancers) + expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement_count( + 'loadbalancer_client', 'list_loadbalancers', + expected_allowed, 0) + + # Test credentials that should see these load balancers can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'loadbalancer_client', 'list_loadbalancers', + expected_allowed, test_ids) + + # Test that users without the lb member role cannot list load balancers + # Note: non-owners can still call this API, they will just get the list + # of load balancers for their project (zero). The above tests + # are intended to cover the cross project use case. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2', + 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'loadbalancer_client', 'list_loadbalancers', expected_allowed) # Check the default sort order, created_at lbs = self.mem_lb_client.list_loadbalancers() @@ -566,33 +631,24 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): self.assertEqual(lb_kwargs[const.VIP_SUBNET_ID], lb[const.VIP_SUBNET_ID]) - # Test that a user with lb_admin role can see the load balanacer + # Test that the appropriate users can see or not see the load + # balancer based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - lb_client = self.os_roles_lb_admin.loadbalancer_client - lb_adm = lb_client.show_loadbalancer(lb[const.ID]) - self.assertEqual(lb_name, lb_adm[const.NAME]) - - # Test that a user with cloud admin role can see the load balanacer - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.loadbalancer_client.show_loadbalancer( - lb[const.ID]) - self.assertEqual(lb_name, adm[const.NAME]) - - # Test that a different user, with load balancer member role, cannot - # see this load balancer - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_loadbalancer, - lb[const.ID]) - - # Test that a user, without the load balancer member role, cannot - # show load balancers - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.show_loadbalancer, - lb[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'loadbalancer_client', 'show_loadbalancer', + expected_allowed, lb[const.ID]) # Attempt to clean up so that one full test run doesn't start 10+ # amps before the cleanup phase fires @@ -679,26 +735,22 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): new_description = data_utils.arbitrary_string(size=255, base_text='new') - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that a user, without the loadbalancer member role, cannot + # update this load balancer. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.update_loadbalancer, - lb[const.ID], admin_state_up=True) - - # Assert we didn't go into PENDING_* - lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID]) - self.assertEqual(const.ACTIVE, lb_check[const.PROVISIONING_STATUS]) - self.assertFalse(lb_check[const.ADMIN_STATE_UP]) - - # Test that a user, without the load balancer member role, cannot - # update this load balancer - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - self.assertRaises(exceptions.Forbidden, - member2_client.update_loadbalancer, - lb[const.ID], admin_state_up=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'loadbalancer_client', 'update_loadbalancer', + expected_allowed, None, None, lb[const.ID], + admin_state_up=True) # Assert we didn't go into PENDING_* lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID]) @@ -775,21 +827,24 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.lb_build_interval, CONF.load_balancer.lb_build_timeout) - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that the appropriate users can see or not see the load + # balancer stats based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.get_loadbalancer_stats, - lb[const.ID]) - - # Test that a different user, with the load balancer role, cannot see - # the load balancer stats - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - self.assertRaises(exceptions.Forbidden, - member2_client.get_loadbalancer_stats, - lb[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'loadbalancer_client', 'get_loadbalancer_stats', + expected_allowed, lb[const.ID]) stats = self.mem_lb_client.get_loadbalancer_stats(lb[const.ID]) @@ -843,21 +898,24 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) - # Test that a user, without the load balancer member role, cannot - # use this method + # Test that the appropriate users can see or not see the load + # balancer status based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.loadbalancer_client.get_loadbalancer_status, - lb[const.ID]) - - # Test that a different user, with load balancer role, cannot see - # the load balancer status - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.loadbalancer_client - self.assertRaises(exceptions.Forbidden, - member2_client.get_loadbalancer_status, - lb[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'loadbalancer_client', 'get_loadbalancer_status', + expected_allowed, lb[const.ID]) status = self.mem_lb_client.get_loadbalancer_status(lb[const.ID]) @@ -917,6 +975,20 @@ class LoadBalancerAPITest(test_base.LoadBalancerBaseTest): self.mem_lb_client.failover_loadbalancer, lb[const.ID]) + # Test that a user without the load balancer admin role cannot + # failover a load balancer. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'loadbalancer_client', 'failover_loadbalancer', + expected_allowed, None, None, lb[const.ID]) + # Assert we didn't go into PENDING_* lb = self.mem_lb_client.show_loadbalancer(lb[const.ID]) self.assertEqual(const.ACTIVE, lb[const.PROVISIONING_STATUS]) diff --git a/octavia_tempest_plugin/tests/api/v2/test_member.py b/octavia_tempest_plugin/tests/api/v2/test_member.py index 66e367e2..a5607dab 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_member.py +++ b/octavia_tempest_plugin/tests/api/v2/test_member.py @@ -892,6 +892,24 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): self.os_primary.member_client.create_member, **member_kwargs) + # Test that a user without the loadbalancer role cannot + # create a member. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'member_client', 'create_member', + expected_allowed, + status_method=self.mem_lb_client.show_loadbalancer, + obj_id=self.lb_id, **member_kwargs) + member = self.mem_member_client.create_member(**member_kwargs) waiters.wait_for_status( @@ -1048,6 +1066,9 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): * List the members filtering to one of the three. * List the members filtered, one field, and sorted. """ + # IDs of members created in the test + test_ids = [] + if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not self.mem_listener_client.is_version_supported( self.api_version, '2.13')): @@ -1124,6 +1145,7 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(member1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -1162,6 +1184,7 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(member2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -1200,22 +1223,45 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.check_interval, CONF.load_balancer.check_timeout) + test_ids.append(member3[const.ID]) - # Test that a different user cannot list members - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.member_client - self.assertRaises( - exceptions.Forbidden, - member2_client.list_members, - pool_id) - - # Test that a user without the lb member role cannot list load - # balancers + # Test credentials that should see these members can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.member_client.list_members, - pool_id) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'member_client', 'list_members', expected_allowed, + test_ids, pool_id) + + # Test that users without the lb member role cannot list members + # Note: The parent pool ID blocks non-owners from listing members. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'member_client', 'list_members', expected_allowed, pool_id) # Check the default sort order, created_at members = self.mem_member_client.list_members(pool_id) @@ -1740,34 +1786,25 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): for item in equal_items: self.assertEqual(member_kwargs[item], member[item]) - # Test that a user with lb_admin role can see the member + # Test that the appropriate users can see or not see the member + # based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - member_client = self.os_roles_lb_admin.member_client - member_adm = member_client.show_member( - member[const.ID], pool_id=pool_id) - self.assertEqual(member_name, member_adm[const.NAME]) - - # Test that a user with cloud admin role can see the member - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.member_client.show_member( - member[const.ID], pool_id=pool_id) - self.assertEqual(member_name, adm[const.NAME]) - - # Test that a different user, with load balancer member role, cannot - # see this member - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.member_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_member, - member[const.ID], pool_id=pool_id) - - # Test that a user, without the load balancer member role, cannot - # show members - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.member_client.show_member, - member[const.ID], pool_id=pool_id) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'member_client', 'show_member', + expected_allowed, member[const.ID], + pool_id=pool_id) @decorators.idempotent_id('65680d48-1d49-4959-a7d1-677797e54f6b') def test_HTTP_LC_alt_monitor_member_update(self): @@ -2206,30 +2243,22 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): for item in equal_items: self.assertEqual(member_kwargs[item], member[item]) - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that a user, without the loadbalancer member role, cannot + # update this member. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.member_client.update_member, - member[const.ID], pool_id=pool_id, admin_state_up=True) - - # Assert we didn't go into PENDING_* - member_check = self.mem_member_client.show_member( - member[const.ID], pool_id=pool_id) - self.assertEqual(const.ACTIVE, - member_check[const.PROVISIONING_STATUS]) - self.assertEqual(member_kwargs[const.ADMIN_STATE_UP], - member_check[const.ADMIN_STATE_UP]) - - # Test that a user, without the load balancer member role, cannot - # update this member - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.member_client - self.assertRaises(exceptions.Forbidden, - member2_client.update_member, - member[const.ID], pool_id=pool_id, - admin_state_up=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'member_client', 'update_member', + expected_allowed, None, None, member[const.ID], + pool_id=pool_id, admin_state_up=True) # Assert we didn't go into PENDING_* member_check = self.mem_member_client.show_member( @@ -2672,12 +2701,21 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): member2_kwargs.pop(const.POOL_ID) batch_update_list = [member2_kwargs, member3_kwargs] - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that a user, without the loadbalancer member role, cannot + # batch update this member. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.member_client.update_members, + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'member_client', 'update_members', + expected_allowed, None, None, pool_id=pool_id, members_list=batch_update_list) # Assert we didn't go into PENDING_* @@ -2908,21 +2946,22 @@ class MemberAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) - # Test that a user without the load balancer role cannot - # delete this member + # Test that a user without the loadbalancer role cannot delete this + # member. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.member_client.delete_member, - member[const.ID], pool_id=pool_id) - - # Test that a different user, with the load balancer member role - # cannot delete this member - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.member_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_member, - member[const.ID], pool_id=pool_id) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'member_client', 'delete_member', + expected_allowed, None, None, member[const.ID], + pool_id=pool_id) self.mem_member_client.delete_member(member[const.ID], pool_id=pool_id) diff --git a/octavia_tempest_plugin/tests/api/v2/test_pool.py b/octavia_tempest_plugin/tests/api/v2/test_pool.py index 3ab48924..61a17cbb 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_pool.py +++ b/octavia_tempest_plugin/tests/api/v2/test_pool.py @@ -401,13 +401,23 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): else: pool_kwargs[const.LOADBALANCER_ID] = self.lb_id - # Test that a user without the load balancer role cannot - # create a pool + # Test that a user without the loadbalancer role cannot + # create a pool. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.pool_client.create_pool, - **pool_kwargs) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_create_RBAC_enforcement( + 'pool_client', 'create_pool', + expected_allowed, + status_method=self.mem_lb_client.show_loadbalancer, + obj_id=self.lb_id, **pool_kwargs) # This is a special case as the reference driver does not support # SOURCE-IP-PORT. Since it runs with not_implemented_is_error, we must @@ -585,6 +595,9 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): * List the pools filtering to one of the three. * List the pools filtered, one field, and sorted. """ + # IDs of pools created in the test + test_ids = [] + if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not self.mem_listener_client.is_version_supported( self.api_version, '2.13')): @@ -655,6 +668,7 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(pool1[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -694,6 +708,7 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(pool2[const.ID]) # Time resolution for created_at is only to the second, and we need to # ensure that each object has a distinct creation time. Delaying one # second is both a simple and a reliable way to accomplish this. @@ -733,20 +748,68 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): const.ACTIVE, CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) + test_ids.append(pool3[const.ID]) - # Test that a different user cannot list pools - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.pool_client - primary = member2_client.list_pools( - query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) - self.assertEqual(0, len(primary)) - - # Test that a user without the lb member role cannot list load - # balancers + # Test that a different users cannot see the lb_member pools. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_primary', 'os_roles_lb_member2', + 'os_roles_lb_observer'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', + 'os_roles_lb_member2', 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.pool_client.list_pools) + expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement_count( + 'pool_client', 'list_pools', expected_allowed, 0, + query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) + + # Test credentials that should see these pools can see them. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_member', + 'os_roles_lb_global_observer'] + if expected_allowed: + self.check_list_IDs_RBAC_enforcement( + 'pool_client', 'list_pools', expected_allowed, test_ids, + query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) + + # Test that users without the lb member role cannot list pools. + # Note: non-owners can still call this API, they will just get the list + # of pools for their project (zero). The above tests + # are intended to cover the cross project use case. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2', + 'os_roles_lb_observer', + 'os_roles_lb_global_observer'] + # Note: os_admin is here because it evaluaties to "project_admin" + # in oslo_policy and since keystone considers "project_admin" + # a superscope of "project_reader". This means it can read + # objects in the "admin" credential's project. + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.ADVANCED: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'pool_client', 'list_pools', expected_allowed, + query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id)) # Check the default sort order, created_at pools = self.mem_pool_client.list_pools( @@ -1062,33 +1125,24 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP, pool[const.SESSION_PERSISTENCE][const.TYPE]) - # Test that a user with lb_admin role can see the pool + # Test that the appropriate users can see or not see the pool + # based on the API RBAC. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - pool_client = self.os_roles_lb_admin.pool_client - pool_adm = pool_client.show_pool(pool[const.ID]) - self.assertEqual(pool_name, pool_adm[const.NAME]) - - # Test that a user with cloud admin role can see the pool - if not CONF.load_balancer.RBAC_test_type == const.NONE: - adm = self.os_admin.pool_client.show_pool( - pool[const.ID]) - self.assertEqual(pool_name, adm[const.NAME]) - - # Test that a different user, with load balancer member role, cannot - # see this pool - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.pool_client - self.assertRaises(exceptions.Forbidden, - member2_client.show_pool, - pool[const.ID]) - - # Test that a user, without the load balancer member role, cannot - # show pools - if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.pool_client.show_pool, - pool[const.ID]) + expected_allowed = ['os_system_admin', 'os_system_reader', + 'os_roles_lb_admin', + 'os_roles_lb_global_observer', + 'os_roles_lb_member'] + if expected_allowed: + self.check_show_RBAC_enforcement( + 'pool_client', 'show_pool', + expected_allowed, pool[const.ID]) @decorators.idempotent_id('d73755fe-ba3a-4248-9543-8e167a5aa7f4') def test_HTTP_LC_pool_update(self): @@ -1306,28 +1360,22 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP, pool[const.SESSION_PERSISTENCE][const.TYPE]) - # Test that a user, without the load balancer member role, cannot - # use this command + # Test that a user, without the loadbalancer member role, cannot + # update this pool. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.pool_client.update_pool, - pool[const.ID], admin_state_up=True) - - # Assert we didn't go into PENDING_* - pool_check = self.mem_pool_client.show_pool( - pool[const.ID]) - self.assertEqual(const.ACTIVE, - pool_check[const.PROVISIONING_STATUS]) - self.assertFalse(pool_check[const.ADMIN_STATE_UP]) - - # Test that a user, without the load balancer member role, cannot - # update this pool - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.pool_client - self.assertRaises(exceptions.Forbidden, - member2_client.update_pool, - pool[const.ID], admin_state_up=True) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_update_RBAC_enforcement( + 'pool_client', 'update_pool', + expected_allowed, None, None, pool[const.ID], + admin_state_up=True) # Assert we didn't go into PENDING_* pool_check = self.mem_pool_client.show_pool( @@ -1609,21 +1657,21 @@ class PoolAPITest(test_base.LoadBalancerBaseTest): CONF.load_balancer.build_interval, CONF.load_balancer.build_timeout) - # Test that a user without the load balancer role cannot - # delete this pool + # Test that a user without the loadbalancer role cannot delete this + # pool. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = ['os_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_system_admin', 'os_roles_lb_member'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.pool_client.delete_pool, - pool[const.ID]) - - # Test that a different user, with the load balancer member role - # cannot delete this pool - if not CONF.load_balancer.RBAC_test_type == const.NONE: - member2_client = self.os_roles_lb_member2.pool_client - self.assertRaises(exceptions.Forbidden, - member2_client.delete_pool, - pool[const.ID]) + expected_allowed = ['os_system_admin', 'os_roles_lb_admin', + 'os_roles_lb_member'] + if expected_allowed: + self.check_delete_RBAC_enforcement( + 'pool_client', 'delete_pool', + expected_allowed, None, None, pool[const.ID]) self.mem_pool_client.delete_pool(pool[const.ID]) diff --git a/octavia_tempest_plugin/tests/api/v2/test_provider.py b/octavia_tempest_plugin/tests/api/v2/test_provider.py index 917b3655..f85d5982 100644 --- a/octavia_tempest_plugin/tests/api/v2/test_provider.py +++ b/octavia_tempest_plugin/tests/api/v2/test_provider.py @@ -15,7 +15,6 @@ from oslo_log import log as logging from tempest import config from tempest.lib import decorators -from tempest.lib import exceptions from octavia_tempest_plugin.common import constants as const from octavia_tempest_plugin.tests import test_base @@ -44,10 +43,25 @@ class ProviderAPITest(test_base.LoadBalancerBaseTest): # Test that a user without the load balancer role cannot # list providers. + expected_allowed = [] + if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN: + expected_allowed = [ + 'os_admin', 'os_primary', 'os_roles_lb_admin', + 'os_roles_lb_observer', 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES: + expected_allowed = ['os_admin', 'os_primary', 'os_system_admin', + 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', + 'os_roles_lb_member', 'os_roles_lb_member2'] if CONF.load_balancer.RBAC_test_type == const.ADVANCED: - self.assertRaises( - exceptions.Forbidden, - self.os_primary.provider_client.list_providers) + expected_allowed = [ + 'os_system_admin', 'os_system_reader', 'os_roles_lb_observer', + 'os_roles_lb_global_observer', 'os_roles_lb_admin', + 'os_roles_lb_member', 'os_roles_lb_member2'] + if expected_allowed: + self.check_list_RBAC_enforcement( + 'provider_client', 'list_providers', expected_allowed) providers = self.mem_provider_client.list_providers() diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py index 887c6446..26e44ee7 100644 --- a/octavia_tempest_plugin/tests/test_base.py +++ b/octavia_tempest_plugin/tests/test_base.py @@ -33,6 +33,7 @@ import tenacity from octavia_tempest_plugin import clients from octavia_tempest_plugin.common import cert_utils from octavia_tempest_plugin.common import constants as const +from octavia_tempest_plugin.tests import RBAC_tests from octavia_tempest_plugin.tests import validators from octavia_tempest_plugin.tests import waiters @@ -45,15 +46,43 @@ RETRY_BACKOFF = 1 RETRY_MAX = 5 -class LoadBalancerBaseTest(validators.ValidatorsMixin, test.BaseTestCase): +class LoadBalancerBaseTest(validators.ValidatorsMixin, + RBAC_tests.RBACTestsMixin, test.BaseTestCase): """Base class for load balancer tests.""" - # Setup cls.os_roles_lb_member. cls.os_primary, cls.os_roles_lb_member, - # and cls.os_roles_lb_admin credentials. - credentials = ['admin', 'primary', - ['lb_member', CONF.load_balancer.member_role], - ['lb_member2', CONF.load_balancer.member_role], - ['lb_admin', CONF.load_balancer.admin_role]] + if CONF.load_balancer.enforce_new_defaults: + credentials = [ + 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role], + ['lb_observer', CONF.load_balancer.observer_role, 'reader'], + ['lb_global_observer', CONF.load_balancer.global_observer_role, + 'reader'], + ['lb_member', CONF.load_balancer.member_role, 'member'], + ['lb_member2', CONF.load_balancer.member_role, 'member'], + ['lb_member_not_default_member', CONF.load_balancer.member_role]] + else: + credentials = [ + 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role], + ['lb_observer', CONF.load_balancer.observer_role, 'reader'], + ['lb_global_observer', CONF.load_balancer.global_observer_role, + 'reader'], + ['lb_member', CONF.load_balancer.member_role], + ['lb_member2', CONF.load_balancer.member_role]] + + # If scope enforcement is enabled, add in the system scope credentials. + # The project scope is already handled by the above credentials. + if CONF.enforce_scope.octavia: + credentials.extend(['system_admin', 'system_reader']) + + # A tuple of credentials that will be allocated by tempest using the + # 'credentials' list above. These are used to build RBAC test lists. + allocated_creds = [] + for cred in credentials: + if isinstance(cred, list): + allocated_creds.append('os_roles_' + cred[0]) + else: + allocated_creds.append('os_' + cred) + # Tests shall not mess with the list of allocated credentials + allocated_credentials = tuple(allocated_creds) client_manager = clients.ManagerV2 webserver1_response = 1 @@ -102,6 +131,31 @@ class LoadBalancerBaseTest(validators.ValidatorsMixin, test.BaseTestCase): cls.set_network_resources() super(LoadBalancerBaseTest, cls).setup_credentials() + # Log the user roles for this test run + role_name_cache = {} + for cred in cls.credentials: + user_roles = [] + if isinstance(cred, list): + user_name = cred[0] + cred_obj = getattr(cls, 'os_roles_' + cred[0]) + else: + user_name = cred + cred_obj = getattr(cls, 'os_' + cred) + params = {'user.id': cred_obj.credentials.user_id, + 'project.id': cred_obj.credentials.project_id} + roles = cls.os_admin.role_assignments_client.list_role_assignments( + **params)['role_assignments'] + for role in roles: + role_id = role['role']['id'] + try: + role_name = role_name_cache[role_id] + except KeyError: + role_name = cls.os_admin.roles_v3_client.show_role( + role_id)['role']['name'] + role_name_cache[role_id] = role_name + user_roles.append([role_name, role['scope']]) + LOG.info("User %s has roles: %s", user_name, user_roles) + @classmethod def setup_clients(cls): """Setup client aliases.""" diff --git a/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml new file mode 100644 index 00000000..d2d5d231 --- /dev/null +++ b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml @@ -0,0 +1,17 @@ +--- +features: + - | + Added API test support for keystone default roles and scoped tokens. +issues: + - | + Currently the API tests will not pass with the + keystone_default_roles-policy.yaml override file. This is due to the + tempest framework credentials do not yet support token scopes. + This issue is tracked in https://bugs.launchpad.net/tempest/+bug/1917168 + Once that bug is fixed, octavia-tempest-plugin can be updated to use the + required scope in the test credentials. +upgrade: + - | + Two new tempest.conf settings enable/disable keystone default roles and + scoped token testing, [enforce_scope] octavia = True/False and + [load_balancer] enforce_new_defaults = True/False. diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml index 20aa0b4a..08263e35 100644 --- a/zuul.d/jobs.yaml +++ b/zuul.d/jobs.yaml @@ -455,6 +455,23 @@ - ^releasenotes/.*$ - ^octavia_tempest_plugin/tests/(?!api/|\w+\.py).* +- job: + name: octavia-v2-dsvm-noop-api-scoped-tokens + parent: octavia-v2-dsvm-noop-api + vars: + devstack_local_conf: + post-config: + $OCTAVIA_CONF: + oslo_policy: + enforce_scope: True + enforce_new_defaults: True + test-config: + "$TEMPEST_CONFIG": + enforce_scope: + octavia: True + load_balancer: + enforce_new_defaults: True + - job: name: octavia-v2-dsvm-noop-py2-api parent: octavia-v2-dsvm-noop-api diff --git a/zuul.d/projects.yaml b/zuul.d/projects.yaml index d7a5ea45..b3782b98 100644 --- a/zuul.d/projects.yaml +++ b/zuul.d/projects.yaml @@ -12,6 +12,7 @@ - octavia-v2-dsvm-noop-api-stable-victoria - octavia-v2-dsvm-noop-api-stable-ussuri - octavia-v2-dsvm-noop-api-stable-train + - octavia-v2-dsvm-noop-api-scoped-tokens - octavia-v2-dsvm-scenario - octavia-v2-dsvm-scenario-stable-victoria - octavia-v2-dsvm-scenario-stable-ussuri @@ -57,6 +58,7 @@ - octavia-v2-dsvm-noop-api-stable-victoria - octavia-v2-dsvm-noop-api-stable-ussuri - octavia-v2-dsvm-noop-api-stable-train + - octavia-v2-dsvm-noop-api-scoped-tokens - octavia-v2-dsvm-scenario - octavia-v2-dsvm-scenario-stable-victoria - octavia-v2-dsvm-scenario-stable-ussuri