456 lines
18 KiB
Python
456 lines
18 KiB
Python
# -*- encoding: utf-8 -*-
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""
|
|
Tests for ACL. Checks whether certain kinds of requests
|
|
are blocked or allowed to be processed.
|
|
"""
|
|
|
|
import abc
|
|
from unittest import mock
|
|
|
|
import ddt
|
|
from keystonemiddleware import auth_token
|
|
from oslo_config import cfg
|
|
|
|
from ironic.api.controllers.v1 import versions as api_versions
|
|
from ironic.common import exception
|
|
from ironic.conductor import rpcapi
|
|
from ironic.tests.unit.api import base
|
|
from ironic.tests.unit.db import utils as db_utils
|
|
|
|
|
|
class TestACLBase(base.BaseApiTest):
|
|
|
|
def setUp(self):
|
|
super(TestACLBase, self).setUp()
|
|
|
|
self.environ = {}
|
|
self.format_data = {}
|
|
self._create_test_data()
|
|
self.fake_token = None
|
|
mock_auth = mock.patch.object(
|
|
auth_token.AuthProtocol, 'process_request',
|
|
autospec=True)
|
|
self.mock_auth = mock_auth.start()
|
|
self.addCleanup(mock_auth.stop)
|
|
|
|
topic = mock.patch.object(
|
|
rpcapi.ConductorAPI, 'get_topic_for', autospec=True)
|
|
self.mock_topic = topic.start()
|
|
self.mock_topic.side_effect = exception.TemporaryFailure
|
|
self.addCleanup(topic.stop)
|
|
rtopic = mock.patch.object(rpcapi.ConductorAPI, 'get_random_topic',
|
|
autospec=True)
|
|
self.mock_random_topic = rtopic.start()
|
|
self.mock_random_topic.side_effect = exception.TemporaryFailure
|
|
self.addCleanup(rtopic.stop)
|
|
self._set_test_config()
|
|
|
|
def _make_app(self):
|
|
cfg.CONF.set_override('auth_strategy', 'keystone')
|
|
return super(TestACLBase, self)._make_app()
|
|
|
|
@abc.abstractmethod
|
|
def _create_test_data(self):
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _set_test_config(self):
|
|
pass
|
|
|
|
def _check_skip(self, **kwargs):
|
|
if kwargs.get('skip_reason'):
|
|
self.skipTest(kwargs.get('skip_reason'))
|
|
|
|
def _fake_process_request(self, request, auth_token_request):
|
|
pass
|
|
|
|
def _test_request(self, path, params=None, headers=None, method='get',
|
|
body=None, assert_status=None,
|
|
assert_dict_contains=None,
|
|
assert_list_length=None,
|
|
deprecated=None):
|
|
path = path.format(**self.format_data)
|
|
self.mock_auth.side_effect = self._fake_process_request
|
|
|
|
# always request the latest api version
|
|
version = api_versions.max_version_string()
|
|
rheaders = {
|
|
'X-OpenStack-Ironic-API-Version': version
|
|
}
|
|
# NOTE(TheJulia): Logging the test request to aid
|
|
# in troubleshooting ACL testing. This is a pattern
|
|
# followed in API unit testing in ironic, and
|
|
# really does help.
|
|
print('API ACL Testing Path %s %s' % (method, path))
|
|
if headers:
|
|
for k, v in headers.items():
|
|
rheaders[k] = v.format(**self.format_data)
|
|
if method == 'get':
|
|
response = self.get_json(
|
|
path,
|
|
headers=rheaders,
|
|
expect_errors=True,
|
|
extra_environ=self.environ,
|
|
path_prefix=''
|
|
)
|
|
elif method == 'put':
|
|
response = self.put_json(
|
|
path,
|
|
headers=rheaders,
|
|
expect_errors=True,
|
|
extra_environ=self.environ,
|
|
path_prefix='',
|
|
params=body
|
|
)
|
|
elif method == 'post':
|
|
response = self.post_json(
|
|
path,
|
|
headers=rheaders,
|
|
expect_errors=True,
|
|
extra_environ=self.environ,
|
|
path_prefix='',
|
|
params=body
|
|
)
|
|
elif method == 'patch':
|
|
response = self.patch_json(
|
|
path,
|
|
params=body,
|
|
headers=rheaders,
|
|
expect_errors=True,
|
|
extra_environ=self.environ,
|
|
path_prefix=''
|
|
)
|
|
elif method == 'delete':
|
|
response = self.delete(
|
|
path,
|
|
headers=rheaders,
|
|
expect_errors=True,
|
|
extra_environ=self.environ,
|
|
path_prefix=''
|
|
)
|
|
else:
|
|
assert False, 'Unimplemented test method: %s' % method
|
|
# Once miggrated:
|
|
# Items will return:
|
|
# 403 - Trying to access something that is generally denied.
|
|
# Example: PATCH /v1/nodes/<uuid> as a reader.
|
|
# 404 - Trying to access something where we don't have permissions
|
|
# in a project scope. This is particularly true where implied
|
|
# permissions or assocation exists. Ports are attempted to be
|
|
# accessed when the underlying node is inaccessible as owner
|
|
# nor node matches.
|
|
# Example: GET /v1/portgroups or /v1/nodes/<uuid>/ports
|
|
# 500 - Attempting to access something such an system scoped endpoint
|
|
# with a project scoped request. Example: /v1/conductors.
|
|
if not (bool(deprecated)
|
|
and ('404' in response.status
|
|
or '500' in response.status
|
|
or '403' in response.status)
|
|
and cfg.CONF.oslo_policy.enforce_scope
|
|
and cfg.CONF.oslo_policy.enforce_new_defaults):
|
|
self.assertEqual(assert_status, response.status_int)
|
|
else:
|
|
self.assertTrue(
|
|
('404' in response.status
|
|
or '500' in response.status
|
|
or '403' in response.status))
|
|
# We can't check the contents of the response if there is no
|
|
# response.
|
|
return
|
|
if not bool(deprecated):
|
|
self.assertIsNotNone(assert_status,
|
|
'Tests must include an assert_status')
|
|
|
|
if assert_dict_contains:
|
|
for k, v in assert_dict_contains.items():
|
|
self.assertIn(k, response)
|
|
print(k)
|
|
print(v)
|
|
if str(v) == "None":
|
|
# Compare since the variable loaded from the
|
|
# json ends up being null in json or None.
|
|
self.assertIsNone(response.json[k])
|
|
elif str(v) == "{}":
|
|
# Special match for signifying a dictonary.
|
|
self.assertEqual({}, response.json[k])
|
|
elif isinstance(v, dict):
|
|
# The value from the YAML can be a dictionary,
|
|
# which cannot be formatted, so we're likely doing
|
|
# direct matching.
|
|
self.assertEqual(str(v), str(response.json[k]))
|
|
else:
|
|
self.assertEqual(v.format(**self.format_data),
|
|
response.json[k])
|
|
|
|
if assert_list_length:
|
|
for root, length in assert_list_length.items():
|
|
# root - object to look inside
|
|
# length - number of expected elements which will be
|
|
# important for owner/lessee testing.
|
|
items = response.json[root]
|
|
self.assertIsInstance(items, list)
|
|
if not (bool(deprecated)
|
|
and cfg.CONF.oslo_policy.enforce_scope):
|
|
self.assertEqual(length, len(items))
|
|
else:
|
|
# If we have scope enforcement, we likely have different
|
|
# views, such as "other" admins being subjected to
|
|
# a filtered view in these cases.
|
|
self.assertEqual(0, len(items))
|
|
|
|
# NOTE(TheJulia): API tests in Ironic tend to have a pattern
|
|
# to print request and response data to aid in development
|
|
# and troubleshooting. As such the prints should remain,
|
|
# at least until we are through primary development of the
|
|
# this test suite.
|
|
print('ACL Test GOT %s' % response)
|
|
|
|
|
|
@ddt.ddt
|
|
class TestRBACBasic(TestACLBase):
|
|
|
|
def _create_test_data(self):
|
|
fake_db_node = db_utils.create_test_node(chassis_id=None)
|
|
self.format_data['node_uuid'] = fake_db_node['uuid']
|
|
|
|
@ddt.file_data('test_acl_basic.yaml')
|
|
@ddt.unpack
|
|
def test_basic(self, **kwargs):
|
|
self._check_skip(**kwargs)
|
|
self._test_request(**kwargs)
|
|
|
|
|
|
@ddt.ddt
|
|
class TestRBACModelBeforeScopesBase(TestACLBase):
|
|
|
|
def _create_test_data(self):
|
|
allocated_node_id = 31
|
|
fake_db_allocation = db_utils.create_test_allocation(
|
|
node_id=allocated_node_id,
|
|
resource_class="CUSTOM_TEST")
|
|
fake_db_node = db_utils.create_test_node(
|
|
chassis_id=None,
|
|
driver='fake-driverz',
|
|
owner='z')
|
|
fake_db_node_alloced = db_utils.create_test_node(
|
|
id=allocated_node_id,
|
|
chassis_id=None,
|
|
allocation_id=fake_db_allocation['id'],
|
|
uuid='22e26c0b-03f2-4d2e-ae87-c02d7f33c000',
|
|
driver='fake-driverz',
|
|
owner='z')
|
|
fake_vif_port_id = "ee21d58f-5de2-4956-85ff-33935ea1ca00"
|
|
fake_db_port = db_utils.create_test_port(
|
|
node_id=fake_db_node['id'],
|
|
internal_info={'tenant_vif_port_id': fake_vif_port_id})
|
|
fake_db_portgroup = db_utils.create_test_portgroup(
|
|
uuid="6eb02b44-18a3-4659-8c0b-8d2802581ae4",
|
|
node_id=fake_db_node['id'])
|
|
fake_db_chassis = db_utils.create_test_chassis(
|
|
drivers=['fake-hardware', 'fake-driverz', 'fake-driver'])
|
|
fake_db_deploy_template = db_utils.create_test_deploy_template()
|
|
fake_db_conductor = db_utils.create_test_conductor()
|
|
fake_db_volume_target = db_utils.create_test_volume_target(
|
|
node_id=fake_db_allocation['id'])
|
|
fake_db_volume_connector = db_utils.create_test_volume_connector(
|
|
node_id=fake_db_allocation['id'])
|
|
# Trait name aligns with create_test_node_trait.
|
|
fake_trait = 'trait'
|
|
fake_setting = 'FAKE_SETTING'
|
|
db_utils.create_test_bios_setting(
|
|
node_id=fake_db_node['id'],
|
|
name=fake_setting,
|
|
value=fake_setting)
|
|
db_utils.create_test_node_trait(
|
|
node_id=fake_db_node['id'])
|
|
|
|
# dedicated node for portgroup addition test to avoid
|
|
# false positives with test runners.
|
|
db_utils.create_test_node(
|
|
uuid='18a552fb-dcd2-43bf-9302-e4c93287be11')
|
|
self.format_data.update({
|
|
'node_ident': fake_db_node['uuid'],
|
|
'allocated_node_ident': fake_db_node_alloced['uuid'],
|
|
'port_ident': fake_db_port['uuid'],
|
|
'portgroup_ident': fake_db_portgroup['uuid'],
|
|
'chassis_ident': fake_db_chassis['uuid'],
|
|
'deploy_template_ident': fake_db_deploy_template['uuid'],
|
|
'allocation_ident': fake_db_allocation['uuid'],
|
|
'conductor_ident': fake_db_conductor['hostname'],
|
|
'vif_ident': fake_vif_port_id,
|
|
# Can't use the same fake-driver as other tests can
|
|
# pollute a global method cache in the API that is in the
|
|
# test runner, resulting in false positives.
|
|
'driver_name': 'fake-driverz',
|
|
'bios_setting': fake_setting,
|
|
'trait': fake_trait,
|
|
'volume_target_ident': fake_db_volume_target['uuid'],
|
|
'volume_connector_ident': fake_db_volume_connector['uuid'],
|
|
})
|
|
|
|
|
|
@ddt.ddt
|
|
class TestRBACModelBeforeScopes(TestRBACModelBeforeScopesBase):
|
|
|
|
def _set_test_config(self):
|
|
# NOTE(TheJulia): Sets default test conditions, in the event
|
|
# oslo_policy defaults change.
|
|
cfg.CONF.set_override('enforce_scope', False, group='oslo_policy')
|
|
cfg.CONF.set_override('enforce_new_defaults', False,
|
|
group='oslo_policy')
|
|
|
|
@ddt.file_data('test_rbac_legacy.yaml')
|
|
@ddt.unpack
|
|
def test_rbac_legacy(self, **kwargs):
|
|
self._check_skip(**kwargs)
|
|
self._test_request(**kwargs)
|
|
|
|
|
|
@ddt.ddt
|
|
class TestRBACScoped(TestRBACModelBeforeScopes):
|
|
"""Test Scoped RBAC access using our existing access policy."""
|
|
|
|
def _set_test_config(self):
|
|
# NOTE(TheJulia): This test class is as like a canary.
|
|
# The operational intent is for it to kind of provide
|
|
# a safety net as we're changing policy rules so we can
|
|
# incremently disable the ones we *know* will no longer work
|
|
# while we also enable the new ones in another test class with
|
|
# the appropriate scope friendly chagnges. In other words, two
|
|
# test changes will be needed for each which should also reduce
|
|
# risk of accidential policy changes. It may just be Julia being
|
|
# super risk-adverse, just let her roll with it and we will delete
|
|
# this class later.
|
|
# NOTE(TheJulia): This test class runs with test_rbac_legacy.yaml!
|
|
cfg.CONF.set_override('enforce_scope', True, group='oslo_policy')
|
|
cfg.CONF.set_override('enforce_new_defaults', True,
|
|
group='oslo_policy')
|
|
|
|
@ddt.file_data('test_rbac_legacy.yaml')
|
|
def test_scoped_canary(self, **kwargs):
|
|
self._check_skip(**kwargs)
|
|
self._test_request(**kwargs)
|
|
|
|
|
|
@ddt.ddt
|
|
class TestRBACScopedRequests(TestRBACModelBeforeScopesBase):
|
|
|
|
@ddt.file_data('test_rbac_system_scoped.yaml')
|
|
@ddt.unpack
|
|
def test_system_scoped(self, **kwargs):
|
|
self._check_skip(**kwargs)
|
|
self._test_request(**kwargs)
|
|
|
|
|
|
@ddt.ddt
|
|
class TestRBACProjectScoped(TestACLBase):
|
|
|
|
def setUp(self):
|
|
super(TestRBACProjectScoped, self).setUp()
|
|
|
|
cfg.CONF.set_override('enforce_scope', True, group='oslo_policy')
|
|
cfg.CONF.set_override('enforce_new_defaults', True,
|
|
group='oslo_policy')
|
|
|
|
def _create_test_data(self):
|
|
owner_node_ident = '1ab63b9e-66d7-4cd7-8618-dddd0f9f7881'
|
|
lessee_node_ident = '38d5abed-c585-4fce-a57e-a2ffc2a2ec6f'
|
|
owner_project_id = '70e5e25a-2ca2-4cb1-8ae8-7d8739cee205'
|
|
lessee_project_id = 'f11853c7-fa9c-4db3-a477-c9d8e0dbbf13'
|
|
unowned_node = db_utils.create_test_node(chassis_id=None)
|
|
|
|
# owned node - since the tests use the same node for
|
|
# owner/lesse checks
|
|
owned_node = db_utils.create_test_node(
|
|
uuid=owner_node_ident,
|
|
owner=owner_project_id,
|
|
last_error='meow',
|
|
reservation='lolcats')
|
|
owned_node_port = db_utils.create_test_port(
|
|
uuid='ebe30f19-358d-41e1-8d28-fd7357a0164c',
|
|
node_id=owned_node['id'],
|
|
address='00:00:00:00:00:01')
|
|
db_utils.create_test_port(
|
|
uuid='21a3c5a7-1e14-44dc-a9dd-0c84d5477a57',
|
|
node_id=owned_node['id'],
|
|
address='00:00:00:00:00:02')
|
|
owner_pgroup = db_utils.create_test_portgroup(
|
|
uuid='b16efcf3-2990-41a1-bc1d-5e2c16f3d5fc',
|
|
node_id=owned_node['id'],
|
|
name='magicfoo',
|
|
address='01:03:09:ff:01:01')
|
|
db_utils.create_test_volume_target(
|
|
uuid='a265e2f0-e97f-4177-b1c0-8298add53086',
|
|
node_id=owned_node['id'])
|
|
db_utils.create_test_volume_connector(
|
|
uuid='65ea0296-219b-4635-b0c8-a6e055da878d',
|
|
node_id=owned_node['id'],
|
|
connector_id='iqn.2012-06.org.openstack.magic')
|
|
|
|
# Leased nodes
|
|
leased_node = db_utils.create_test_node(
|
|
uuid=lessee_node_ident,
|
|
owner=owner_project_id,
|
|
lessee=lessee_project_id,
|
|
last_error='meow',
|
|
reservation='lolcats')
|
|
fake_db_volume_target = db_utils.create_test_volume_target(
|
|
node_id=leased_node['id'])
|
|
fake_db_volume_connector = db_utils.create_test_volume_connector(
|
|
node_id=leased_node['id'])
|
|
fake_db_port = db_utils.create_test_port(
|
|
node_id=leased_node['id'])
|
|
fake_db_portgroup = db_utils.create_test_portgroup(
|
|
node_id=leased_node['id'])
|
|
fake_trait = 'CUSTOM_MEOW'
|
|
fake_vif_port_id = "0e21d58f-5de2-4956-85ff-33935ea1ca01"
|
|
|
|
# Random objects that shouldn't be project visible
|
|
other_port = db_utils.create_test_port(
|
|
uuid='abfd8dbb-1732-449a-b760-2224035c6b99',
|
|
address='00:00:00:00:00:ff')
|
|
|
|
other_node = db_utils.create_test_node(
|
|
uuid='573208e5-cd41-4e26-8f06-ef44022b3793')
|
|
other_pgroup = db_utils.create_test_portgroup(
|
|
uuid='5810f41c-6585-41fc-b9c9-a94f50d421b5',
|
|
node_id=other_node['id'],
|
|
name='corgis_rule_the_world',
|
|
address='ff:ff:ff:ff:ff:0f')
|
|
|
|
self.format_data.update({
|
|
'node_ident': unowned_node['uuid'],
|
|
'owner_node_ident': owner_node_ident,
|
|
'lessee_node_ident': lessee_node_ident,
|
|
'allocated_node_ident': lessee_node_ident,
|
|
'volume_target_ident': fake_db_volume_target['uuid'],
|
|
'volume_connector_ident': fake_db_volume_connector['uuid'],
|
|
'lessee_port_ident': fake_db_port['uuid'],
|
|
'lessee_portgroup_ident': fake_db_portgroup['uuid'],
|
|
'trait': fake_trait,
|
|
'vif_ident': fake_vif_port_id,
|
|
'ind_component': 'component',
|
|
'ind_ident': 'magic_light',
|
|
'owner_port_ident': owned_node_port['uuid'],
|
|
'other_port_ident': other_port['uuid'],
|
|
'owner_portgroup_ident': owner_pgroup['uuid'],
|
|
'other_portgroup_ident': other_pgroup['uuid']})
|
|
|
|
@ddt.file_data('test_rbac_project_scoped.yaml')
|
|
@ddt.unpack
|
|
def test_project_scoped(self, **kwargs):
|
|
self._check_skip(**kwargs)
|
|
self._test_request(**kwargs)
|