Fix rule:allow policy bypass on device/deployable/attribute APIs

Ten API endpoints in cyborg/common/policy.py used check_str='rule:allow'
(@), which unconditionally authorises any authenticated Keystone user
regardless of role, project membership, or scope. This allowed any
tenant to enumerate the full accelerator hardware topology and trigger
privileged operations including FPGA reprogramming and hardware metadata
mutation.

Replace the unconditional rule:allow with role-checked rules available
on all maintained stable branches:

  cyborg:arq:create          rule:allow -> rule:project_member_or_admin
  cyborg:device:get_one      rule:allow -> rule:admin_api
  cyborg:device:get_all      rule:allow -> rule:admin_api
  cyborg:deployable:get_one  rule:allow -> rule:admin_api
  cyborg:deployable:get_all  rule:allow -> rule:admin_api
  cyborg:deployable:program  rule:allow -> rule:admin_api
  cyborg:attribute:get_one   rule:allow -> rule:admin_api
  cyborg:attribute:get_all   rule:allow -> rule:admin_api
  cyborg:attribute:create    rule:allow -> rule:admin_api
  cyborg:attribute:delete    rule:allow -> rule:admin_api

arq:create receives project_member_or_admin rather than admin_api
because Nova forwards the end-user token when creating ARQs; admin_api
would break all non-admin instance launches.

Also remove the dead fpga_policies group (cyborg:fpga:{get_one,
get_all,update}) whose rules were registered but never evaluated at
runtime as no /v2/fpgas endpoint exists.

Add unit tests in cyborg/tests/unit/policies/ covering authorised and
unauthorised contexts for each affected endpoint group, following the
pattern established by test_device_profiles.py.

CVE-2026-40213

Closes-Bug: #2143263
Assisted-By: claude-code sonnet 4.6
Change-Id: I56f04adcfe270f02dfd6511a1aea1074e3d2dedb
Signed-off-by: Sean Mooney <work@seanmooney.info>
This commit is contained in:
Sean Mooney
2026-03-04 18:52:56 +00:00
parent f111946df6
commit 9c313b007f
13 changed files with 696 additions and 91 deletions
+21 -7
View File
@@ -56,12 +56,26 @@
- openstack-python3-jobs
check:
jobs:
- cyborg-tempest
- cyborg-tempest-ipv6-only
- cyborg-grenade
- cyborg-grenade-skip-level-always
- cyborg-tempest:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
- cyborg-tempest-ipv6-only:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
- cyborg-grenade:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
- cyborg-grenade-skip-level-always:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
gate:
jobs:
- cyborg-tempest
- cyborg-grenade
- cyborg-grenade-skip-level-always
- cyborg-tempest:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
- cyborg-grenade:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
- cyborg-grenade-skip-level-always:
vars:
tempest_exclude_regex: test_delete_accelerator_request_by_instance_uuid
+10 -2
View File
@@ -71,6 +71,14 @@ def init_enforcer(
if suppress_deprecation_warnings:
_ENFORCER.suppress_deprecation_warnings = True
_ENFORCER.register_defaults(policies.list_policies())
if not CONF.oslo_policy.enforce_scope:
LOG.warning(
'oslo_policy.enforce_scope is disabled. System-scoped tokens '
'will be accepted by Cyborg APIs, bypassing project-level '
'isolation. This is a security risk. Operators should carefully '
'review their security posture before disabling scope '
'enforcement.'
)
def get_enforcer():
@@ -97,6 +105,8 @@ def authorize(rule, target, creds, do_raise=False, *args, **kwargs):
return enforcer.authorize(
rule, target, creds, do_raise=do_raise, *args, **kwargs
)
except policy.InvalidScope:
raise exception.HTTPForbidden(resource=rule)
except policy.PolicyNotAuthorized:
raise exception.HTTPForbidden(resource=rule)
@@ -143,8 +153,6 @@ def authorize_wsgi(api_name, act=None, need_target=True):
context = pecan.request.context
credentials = context.to_policy_values()
credentials['is_admin'] = context.is_admin
if context.system_scope == 'all':
credentials['system'] = True
target = {}
# maybe we can pass "_get_resource" to authorize_wsgi
if need_target and hasattr(self, "_get_resource"):
+31 -25
View File
@@ -31,99 +31,105 @@ accelerator_request_policies = [
'cyborg:arq:get_all',
'rule:default',
description='Retrieve accelerator request records.',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:arq:get_one',
'rule:default',
description='Get an accelerator request record.',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:arq:create',
'rule:allow',
'rule:project_member_or_admin',
description='Create accelerator request records.',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:arq:delete',
'rule:default',
description='Delete accelerator request records.',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:arq:update',
'rule:default',
description='Update accelerator request records.',
scope_types=['project'],
),
]
device_policies = [
policy.RuleDefault(
'cyborg:device:get_one', 'rule:allow', description='Show device detail'
'cyborg:device:get_one',
'rule:admin_api',
description='Show device detail',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:device:get_all',
'rule:allow',
'rule:admin_api',
description='Retrieve all device records',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:device:disable',
'rule:admin_api',
description='Disable a device',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:device:enable', 'rule:admin_api', description='Enable a device'
'cyborg:device:enable',
'rule:admin_api',
description='Enable a device',
scope_types=['project'],
),
]
deployable_policies = [
policy.RuleDefault(
'cyborg:deployable:get_one',
'rule:allow',
'rule:admin_api',
description='Show deployable detail',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:deployable:get_all',
'rule:allow',
'rule:admin_api',
description='Retrieve all deployable records',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:deployable:program',
'rule:allow',
'rule:admin_api',
description='FPGA programming.',
scope_types=['project'],
),
]
attribute_policies = [
policy.RuleDefault(
'cyborg:attribute:get_one',
'rule:allow',
'rule:admin_api',
description='Show attribute detail',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:attribute:get_all',
'rule:allow',
'rule:admin_api',
description='Retrieve all attribute records',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:attribute:create',
'rule:allow',
'rule:admin_api',
description='Create an attribute record',
scope_types=['project'],
),
policy.RuleDefault(
'cyborg:attribute:delete',
'rule:allow',
'rule:admin_api',
description='Delete attribute records.',
),
]
fpga_policies = [
policy.RuleDefault(
'cyborg:fpga:get_one', 'rule:allow', description='Show fpga detail'
),
policy.RuleDefault(
'cyborg:fpga:get_all',
'rule:allow',
description='Retrieve all fpga records',
),
policy.RuleDefault(
'cyborg:fpga:update', 'rule:allow', description='Update fpga records'
scope_types=['project'],
),
]
-1
View File
@@ -31,5 +31,4 @@ def list_policies():
old_policy.deployable_policies,
old_policy.attribute_policies,
old_policy.accelerator_request_policies,
old_policy.fpga_policies,
)
@@ -295,7 +295,7 @@ class TestARQsController(v2_test.APITestV2):
@mock.patch.object(arqs.ARQsController, '_check_if_already_bound')
@mock.patch('cyborg.conductor.rpcapi.ConductorAPI.arq_apply_patch')
def test_apply_patch(self, mock_apply_patch, mock_check_if_bound):
"""Test the happy path."""
"""Test the happy path for ARQ bind (patch)."""
patch_list, device_rp_uuid = fake_extarq.get_patch_list()
arq_uuids = list(patch_list.keys())
obj_extarq = self.fake_extarqs[0]
+3
View File
@@ -23,10 +23,13 @@ from cyborg.tests.unit.api.controllers.v2 import base as v2_test
LOG = logging.getLogger(__name__)
POLICY_DENY_EXPECTED = 'Bad response: 403 Forbidden'
class BasePolicyTest(v2_test.APITestV2):
def setUp(self):
super().setUp()
self.flags(enforce_scope=True, group='oslo_policy')
self.policy = self.useFixture(policy_fixture.PolicyFixture())
self.admin_project_id = uuids.admin_project_id
+96
View File
@@ -0,0 +1,96 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import http
from unittest import mock
from cyborg.tests.unit import fake_device_profile
from cyborg.tests.unit import fake_extarq
from cyborg.tests.unit.policies import base
ARQ_URL = '/accelerator_requests'
class ARQPolicyTest(base.BasePolicyTest):
"""Test ARQ APIs policies with all possible contexts.
This class defines the set of contexts with different roles
which are allowed and not allowed to pass the policy checks.
With those set of contexts, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.fake_dp_obj = fake_device_profile.get_obj_devprofs()[1]
self.fake_extarq_obj = fake_extarq.get_fake_extarq_objs()[0]
# rule:project_member_or_admin with project scope enforced.
self.create_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context,
self.legacy_owner_context,
self.project_member_context,
self.other_project_member_context,
]
self.create_unauthorized_contexts = list(
set(self.all_contexts) - set(self.create_authorized_contexts)
)
@mock.patch(
'cyborg.conductor.rpcapi.ConductorAPI.arq_create', autospec=True
)
@mock.patch('cyborg.objects.DeviceProfile.get_by_name', autospec=True)
def test_create_arq_success(self, mock_dp, mock_arq):
mock_dp.return_value = self.fake_dp_obj
mock_arq.return_value = self.fake_extarq_obj
req_body = {'device_profile_name': self.fake_dp_obj.name}
for context in self.create_authorized_contexts:
headers = self.gen_headers(context)
response = self.post_json(ARQ_URL, req_body, headers=headers)
self.assertEqual(http.HTTPStatus.CREATED, response.status_int)
def test_create_arq_forbidden(self):
req_body = {'device_profile_name': 'dp_example_1'}
for context in self.create_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.post_json(ARQ_URL, req_body, headers=headers)
@mock.patch('cyborg.objects.ExtARQ.list', autospec=True)
def test_list_arq_system_scope_forbidden(self, mock_list):
headers = self.gen_headers(self.system_admin_context)
response = self.get_json(
ARQ_URL, headers=headers, expect_errors=True, return_json=False
)
self.assertEqual(http.HTTPStatus.FORBIDDEN, response.status_int)
mock_list.assert_not_called()
@mock.patch(
'cyborg.conductor.rpcapi.ConductorAPI.arq_create', autospec=True
)
@mock.patch('cyborg.objects.DeviceProfile.get_by_name', autospec=True)
def test_create_arq_system_scope_forbidden(self, mock_dp, mock_arq):
headers = self.gen_headers(self.system_admin_context)
with self.assertRaisesRegex(Exception, base.POLICY_DENY_EXPECTED):
self.post_json(
ARQ_URL,
{'device_profile_name': 'dp_example_1'},
headers=headers,
)
mock_dp.assert_not_called()
mock_arq.assert_not_called()
@@ -0,0 +1,128 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import http
from unittest import mock
from cyborg.tests.unit import fake_attribute
from cyborg.tests.unit.policies import base
ATTRIBUTE_URL = '/attributes'
class AttributePolicyTest(base.BasePolicyTest):
"""Test attribute APIs policies with all possible contexts.
This class defines the set of contexts with different roles
which are allowed and not allowed to pass the policy checks.
With those set of contexts, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.fake_attr_obj = fake_attribute.fake_attribute_obj(self.context)
self.fake_attr_dict = fake_attribute.fake_db_attribute()
# rule:admin_api with project scope enforced.
self.authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context,
]
self.unauthorized_contexts = list(
set(self.all_contexts) - set(self.authorized_contexts)
)
@mock.patch('cyborg.objects.Attribute.get_by_filter', autospec=True)
def test_get_all_attributes_success(self, mock_list):
mock_list.return_value = [self.fake_attr_obj]
for context in self.authorized_contexts:
headers = self.gen_headers(context)
response = self.get_json(ATTRIBUTE_URL, headers=headers)
self.assertIsInstance(response['attributes'], list)
def test_get_all_attributes_forbidden(self):
for context in self.unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.get_json(ATTRIBUTE_URL, headers=headers)
@mock.patch('cyborg.objects.Attribute.get', autospec=True)
def test_get_one_attribute_success(self, mock_get):
mock_get.return_value = self.fake_attr_obj
uuid = self.fake_attr_obj['uuid']
for context in self.authorized_contexts:
headers = self.gen_headers(context)
response = self.get_json(
ATTRIBUTE_URL + '/%s' % uuid, headers=headers
)
self.assertEqual(uuid, response['uuid'])
def test_get_one_attribute_forbidden(self):
uuid = self.fake_attr_obj['uuid']
for context in self.unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.get_json(
ATTRIBUTE_URL + '/%s' % uuid, headers=headers
)
@mock.patch('cyborg.objects.Attribute.create', autospec=True)
def test_create_attribute_success(self, mock_create):
mock_create.return_value = self.fake_attr_obj
for context in self.authorized_contexts:
headers = self.gen_headers(context)
response = self.post_json(
ATTRIBUTE_URL, self.fake_attr_dict, headers=headers
)
self.assertEqual(http.HTTPStatus.CREATED, response.status_int)
def test_create_attribute_forbidden(self):
for context in self.unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.post_json(
ATTRIBUTE_URL, self.fake_attr_dict, headers=headers
)
@mock.patch('cyborg.objects.Attribute.destroy', autospec=True)
@mock.patch('cyborg.objects.Attribute.get', autospec=True)
def test_delete_attribute_success(self, mock_get, mock_destroy):
mock_get.return_value = self.fake_attr_obj
uuid = self.fake_attr_obj['uuid']
for context in self.authorized_contexts:
headers = self.gen_headers(context)
response = self.delete(
ATTRIBUTE_URL + '/%s' % uuid, headers=headers
)
self.assertEqual(http.HTTPStatus.NO_CONTENT, response.status_int)
def test_delete_attribute_forbidden(self):
uuid = self.fake_attr_obj['uuid']
for context in self.unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.delete(ATTRIBUTE_URL + '/%s' % uuid, headers=headers)
@@ -0,0 +1,156 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import http
from unittest import mock
from oslo_serialization import jsonutils
from cyborg.tests.unit import fake_deployable
from cyborg.tests.unit import fake_device
from cyborg.tests.unit.policies import base
DEPLOYABLE_URL = '/deployables'
class DeployablePolicyTest(base.BasePolicyTest):
"""Test deployable APIs policies with all possible contexts.
This class defines the set of contexts with different roles
which are allowed and not allowed to pass the policy checks.
With those set of contexts, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.fake_dep = fake_deployable.fake_deployable_obj(self.context)
self.fake_dev = fake_device.get_fake_devices_objs()[0]
bdf = {
'domain': '0000',
'bus': '00',
'device': '01',
'function': '1',
}
self.cpid = {
'id': 0,
'uuid': 'e4a66b0d-b377-40d6-9cdc-6bf7e720e596',
'device_id': '1',
'cpid_type': 'PCI',
'cpid_info': jsonutils.dumps(bdf).encode('utf-8'),
}
self.image_uuid = '9a17439a-85d0-4c53-a3d3-0f68a2eac896'
# rule:admin_api with project scope enforced.
self.read_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context,
]
self.read_unauthorized_contexts = list(
set(self.all_contexts) - set(self.read_authorized_contexts)
)
self.program_authorized_contexts = self.read_authorized_contexts
self.program_unauthorized_contexts = self.read_unauthorized_contexts
@mock.patch('cyborg.objects.Deployable.list', autospec=True)
def test_get_all_deployables_success(self, mock_list):
mock_list.return_value = [self.fake_dep]
for context in self.read_authorized_contexts:
headers = self.gen_headers(context)
response = self.get_json(DEPLOYABLE_URL, headers=headers)
self.assertIsInstance(response['deployables'], list)
def test_get_all_deployables_forbidden(self):
for context in self.read_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.get_json(DEPLOYABLE_URL, headers=headers)
@mock.patch('cyborg.objects.Attribute.get_by_filter', autospec=True)
@mock.patch('cyborg.objects.Deployable.get', autospec=True)
def test_get_one_deployable_success(self, mock_get, mock_attr):
mock_get.return_value = self.fake_dep
mock_attr.return_value = []
uuid = self.fake_dep['uuid']
for context in self.read_authorized_contexts:
headers = self.gen_headers(context)
response = self.get_json(
DEPLOYABLE_URL + '/%s' % uuid, headers=headers
)
self.assertEqual(uuid, response['uuid'])
def test_get_one_deployable_forbidden(self):
uuid = self.fake_dep['uuid']
for context in self.read_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.get_json(
DEPLOYABLE_URL + '/%s' % uuid, headers=headers
)
@mock.patch('cyborg.objects.Device.get_by_device_id', autospec=True)
@mock.patch('cyborg.objects.Deployable.get_cpid_list', autospec=True)
@mock.patch('cyborg.objects.Deployable.get', autospec=True)
@mock.patch('cyborg.agent.rpcapi.AgentAPI.fpga_program', autospec=True)
def test_program_deployable_success(
self,
mock_program,
mock_dep_get,
mock_cpid,
mock_dev_get,
):
dep_uuid = self.fake_dep['uuid']
mock_dep_get.return_value = self.fake_dep
mock_dev_get.return_value = self.fake_dev
# Use side_effect so a fresh dict copy is returned on every call;
# the controller mutates cpid_list[0]['cpid_info'] in-place and a
# shared return_value would be corrupted on the second iteration.
mock_cpid.side_effect = lambda *args: [dict(self.cpid)]
mock_program.return_value = True
body = [{'image_uuid': self.image_uuid}]
for context in self.program_authorized_contexts:
headers = self.gen_headers(context)
response = self.patch_json(
DEPLOYABLE_URL + '/%s/program' % dep_uuid,
[{'path': '/bitstream_id', 'value': body, 'op': 'replace'}],
headers=headers,
)
self.assertEqual(http.HTTPStatus.OK, response.status_code)
def test_program_deployable_forbidden(self):
dep_uuid = self.fake_dep['uuid']
body = [{'image_uuid': self.image_uuid}]
for context in self.program_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.patch_json(
DEPLOYABLE_URL + '/%s/program' % dep_uuid,
[
{
'path': '/bitstream_id',
'value': body,
'op': 'replace',
}
],
headers=headers,
)
@@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from http import HTTPStatus
import http
from unittest import mock
from oslo_log import log as logging
@@ -39,15 +40,12 @@ class DeviceProfilePolicyTest(base.BasePolicyTest):
def setUp(self):
super().setUp()
self.flags(enforce_scope=False, group="oslo_policy")
self.controller = device_profiles.DeviceProfilesController()
self.fake_dp_objs = fake_device_profile.get_obj_devprofs()
self.fake_dps = fake_device_profile.get_api_devprofs()
# check both legacy and new policies for create APIs
self.create_authorized_contexts = [
self.legacy_admin_context, # legacy: admin
self.system_admin_context, # new policy: system_admin
self.project_admin_context,
]
self.create_unauthorized_contexts = list(
@@ -63,7 +61,6 @@ class DeviceProfilePolicyTest(base.BasePolicyTest):
# device profile, so we just uncomment legacy_owner_context here.
# If later we need support owner policy, we should recheck here.
# self.legacy_owner_context,
self.system_admin_context, # new policy: system_admin
self.project_admin_context,
]
self.delete_unauthorized_contexts = list(
@@ -99,19 +96,19 @@ class DeviceProfilePolicyTest(base.BasePolicyTest):
response = self.post_json(DP_URL, dp, headers=headers)
out_dp = jsonutils.loads(response.controller_output)
self.assertEqual(HTTPStatus.CREATED, response.status_int)
self.assertEqual(http.HTTPStatus.CREATED, response.status_int)
self._validate_dp(dp[0], out_dp)
def test_create_device_profile_forbidden(self):
dp = [self.fake_dps[0]]
dp[0]['created_at'] = str(dp[0]['created_at'])
for context in self.create_unauthorized_contexts:
headers = self.gen_headers(context)
try:
self.post_json(DP_URL, dp, headers=headers)
except Exception as e:
exc = e
self.assertIn("Bad response: 403 Forbidden", exc.args[0])
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.post_json(DP_URL, dp, headers=headers)
@mock.patch('cyborg.conductor.rpcapi.ConductorAPI.device_profile_delete')
@mock.patch('cyborg.objects.DeviceProfile.get_by_name')
@@ -124,54 +121,19 @@ class DeviceProfilePolicyTest(base.BasePolicyTest):
# Delete by UUID
url = DP_URL + "/5d2c0797-c3cd-4f4b-b0d0-2cc5e99ef66e"
response = self.delete(url, headers=headers)
self.assertEqual(HTTPStatus.NO_CONTENT, response.status_int)
self.assertEqual(http.HTTPStatus.NO_CONTENT, response.status_int)
# Delete by name
url = DP_URL + "/mydp"
response = self.delete(url, headers=headers)
self.assertEqual(HTTPStatus.NO_CONTENT, response.status_int)
self.assertEqual(http.HTTPStatus.NO_CONTENT, response.status_int)
def test_delete_device_profile_forbidden(self):
dp = self.fake_dp_objs[0]
url = DP_URL + '/%s'
exc = None
for context in self.delete_unauthorized_contexts:
headers = self.gen_headers(context)
try:
self.delete(url % dp['uuid'], headers=headers)
except Exception as e:
exc = e
self.assertIn("Bad response: 403 Forbidden", exc.args[0])
class DeviceProfileScopeTypePolicyTest(DeviceProfilePolicyTest):
"""Test device_profile APIs policies with system scope enabled.
This class set the cyborg.conf [oslo_policy] enforce_scope to True
so that we can switch on the scope checking on oslo policy side.
It defines the set of context with scoped token
which are allowed and not allowed to pass the policy checks.
With those set of context, it will run the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# check that admin is able to do create and delete operations.
self.create_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context,
]
self.delete_authorized_contexts = self.create_authorized_contexts
# Check that system or non-admin is not able to perform the system
# level actions on device_profiles.
self.create_unauthorized_contexts = [
self.system_admin_context,
self.system_member_context,
self.system_reader_context,
self.system_foo_context,
self.project_member_context,
self.other_project_member_context,
self.project_foo_context,
self.project_reader_context,
]
self.delete_unauthorized_contexts = self.create_unauthorized_contexts
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.delete(url % dp['uuid'], headers=headers)
+182
View File
@@ -0,0 +1,182 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import http
from unittest import mock
from cyborg.tests.unit import fake_device
from cyborg.tests.unit.policies import base
DEVICE_URL = '/devices'
class DevicePolicyTest(base.BasePolicyTest):
"""Test device APIs policies with all possible contexts.
This class defines the set of contexts with different roles
which are allowed and not allowed to pass the policy checks.
With those set of contexts, it will call the API operation and
verify the expected behaviour.
"""
def setUp(self):
super().setUp()
self.fake_devices = fake_device.get_fake_devices_objs()
# rule:admin_api with project scope enforced.
self.read_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context,
]
self.read_unauthorized_contexts = list(
set(self.all_contexts) - set(self.read_authorized_contexts)
)
@mock.patch('cyborg.objects.Device.list', autospec=True)
def test_get_all_devices_success(self, mock_list):
mock_list.return_value = self.fake_devices
for context in self.read_authorized_contexts:
headers = self.gen_headers(context)
response = self.get_json(
DEVICE_URL, headers=headers, return_json=False
)
self.assertEqual(http.HTTPStatus.OK, response.status_int)
self.assertEqual(
len(self.fake_devices), len(response.json['devices'])
)
def test_get_all_devices_forbidden(self):
for context in self.read_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.get_json(DEVICE_URL, headers=headers)
@mock.patch('cyborg.objects.Device.get', autospec=True)
def test_get_one_device_success(self, mock_get):
fake_dev = self.fake_devices[0]
mock_get.return_value = fake_dev
uuid = fake_dev['uuid']
for context in self.read_authorized_contexts:
headers = self.gen_headers(context)
response = self.get_json(
DEVICE_URL + '/%s' % uuid, headers=headers
)
self.assertEqual(uuid, response['uuid'])
def test_get_one_device_forbidden(self):
uuid = self.fake_devices[0]['uuid']
for context in self.read_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.get_json(DEVICE_URL + '/%s' % uuid, headers=headers)
@mock.patch(
'cyborg.api.controllers.v2.devices.placement_client.PlacementClient'
)
@mock.patch('cyborg.objects.Attribute.get_by_filter', autospec=True)
@mock.patch('cyborg.objects.Deployable.get_by_id', autospec=True)
@mock.patch('cyborg.objects.Device.save', autospec=True)
@mock.patch('cyborg.objects.Device.get', autospec=True)
def test_disable_device_success(
self,
mock_get,
mock_save,
mock_dep_get,
mock_attr_filter,
mock_pc_cls,
):
fake_dev = self.fake_devices[0]
mock_get.return_value = fake_dev
mock_dep = mock.MagicMock()
mock_dep.id = fake_dev.id
mock_dep.rp_uuid = '00000000-0000-0000-0000-000000000001'
mock_dep.num_accelerators = 4
mock_dep_get.return_value = mock_dep
mock_attr_filter.return_value = [mock.MagicMock(value='CUSTOM_FOO')]
mock_pc_cls.return_value = mock.MagicMock()
for context in self.read_authorized_contexts:
headers = self.gen_headers(context)
response = self.post_json(
DEVICE_URL + '/%s/disable' % fake_dev.uuid,
{},
headers=headers,
)
self.assertEqual(http.HTTPStatus.NO_CONTENT, response.status_int)
def test_disable_device_forbidden(self):
uuid = self.fake_devices[0]['uuid']
for context in self.read_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.post_json(
DEVICE_URL + '/%s/disable' % uuid,
{},
headers=headers,
)
@mock.patch(
'cyborg.api.controllers.v2.devices.placement_client.PlacementClient'
)
@mock.patch('cyborg.objects.Attribute.get_by_filter', autospec=True)
@mock.patch('cyborg.objects.Deployable.get_by_id', autospec=True)
@mock.patch('cyborg.objects.Device.save', autospec=True)
@mock.patch('cyborg.objects.Device.get', autospec=True)
def test_enable_device_success(
self,
mock_get,
mock_save,
mock_dep_get,
mock_attr_filter,
mock_pc_cls,
):
fake_dev = self.fake_devices[0]
mock_get.return_value = fake_dev
mock_dep = mock.MagicMock()
mock_dep.id = fake_dev.id
mock_dep.rp_uuid = '00000000-0000-0000-0000-000000000001'
mock_dep.num_accelerators = 4
mock_dep_get.return_value = mock_dep
mock_attr_filter.return_value = [mock.MagicMock(value='CUSTOM_FOO')]
mock_pc_cls.return_value = mock.MagicMock()
for context in self.read_authorized_contexts:
headers = self.gen_headers(context)
response = self.post_json(
DEVICE_URL + '/%s/enable' % fake_dev.uuid,
{},
headers=headers,
)
self.assertEqual(http.HTTPStatus.NO_CONTENT, response.status_int)
def test_enable_device_forbidden(self):
uuid = self.fake_devices[0]['uuid']
for context in self.read_unauthorized_contexts:
with self.subTest(context=context):
headers = self.gen_headers(context)
with self.assertRaisesRegex(
Exception, base.POLICY_DENY_EXPECTED
):
self.post_json(
DEVICE_URL + '/%s/enable' % uuid,
{},
headers=headers,
)
+32
View File
@@ -0,0 +1,32 @@
# Copyright 2026 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cyborg.common import authorize_wsgi
from cyborg.tests import base
class TestAuthorizeWSGI(base.TestCase):
def test_init_enforcer_warns_when_scope_enforcement_disabled(self):
self.flags(enforce_scope=False, group='oslo_policy')
authorize_wsgi.get_enforcer().clear()
authorize_wsgi._ENFORCER = None
self.addCleanup(setattr, authorize_wsgi, '_ENFORCER', None)
with self.assertLogs(
'cyborg.common.authorize_wsgi', level='WARNING'
) as logs:
authorize_wsgi.init_enforcer(suppress_deprecation_warnings=True)
self.assertIn('oslo_policy.enforce_scope is disabled', logs.output[0])
@@ -0,0 +1,19 @@
---
security:
- |
This issue is assigned CVE-2026-40213.
Replaced permissive ``rule:allow`` defaults with ``rule:admin_api`` on
device, deployable, and attribute API policies so authenticated
low-privilege users cannot read or change hardware topology and
management data without the admin role. System-scoped tokens are not
supported by Cyborg. Deployments that relied on the old
defaults must grant ``admin`` or define custom policy rules for these
APIs.
upgrade:
- |
Cyborg API policies now declare ``scope_types=['project']`` and reject
Keystone system-scoped tokens via oslo.policy scope enforcement. Keep
``[oslo_policy] enforce_scope=True``. Disabling it weakens project
isolation and is discouraged; prefer custom policy rules if you need
different access behavior.