Server actions APIs scoped to project scope

As per the RBAC new direction, we will allow
project resources operation to be performed by
the project scoped token only and system user will
be allowed to perform system level operation only
not project resources specific.

Details about new direction can be found in community-wide
goal
- https://governance.openstack.org/tc/goals/selected/consistent-and-secure-rbac.html

This commit modify the server action APIs to be scoped
to project scope.

Fix the shelve-offload policy to pass the instance project
id as target.

Also modifying and adding tests for four cases:
1. enforce_scope=False + legacy rule (current default policies)
2. enforce_scope=False + No legacy rule
3. enforce_scope=True + legacy rule
4. enforce_scope=True + no legacy rule (end goal of new RBAC)

Partial implement blueprint policy-defaults-refresh-2

Change-Id: I5293e9aa9cb3b48f97a5a2cf272939ada1aea2db
This commit is contained in:
Ghanshyam Mann 2022-01-12 02:09:24 -06:00 committed by Ghanshyam
parent d7be635fb4
commit 20a07ee9a6
21 changed files with 508 additions and 499 deletions

View File

@ -64,9 +64,11 @@ class ShelveController(wsgi.Controller):
def _shelve_offload(self, req, id, body):
"""Force removal of a shelved instance from the compute node."""
context = req.environ["nova.context"]
context.can(shelve_policies.POLICY_ROOT % 'shelve_offload')
instance = common.get_instance(self.compute_api, context, id)
context.can(shelve_policies.POLICY_ROOT % 'shelve_offload',
target={'user_id': instance.user_id,
'project_id': instance.project_id})
try:
self.compute_api.shelve_offload(context, instance)
except exception.InstanceIsLocked as e:

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-admin-actions:%s'
admin_actions_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'reset_state',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Reset the state of a given server",
operations=[
{
@ -32,10 +32,10 @@ admin_actions_policies = [
'path': '/servers/{server_id}/action (os-resetState)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'inject_network_info',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Inject network information into the server",
operations=[
{
@ -43,7 +43,7 @@ admin_actions_policies = [
'path': '/servers/{server_id}/action (injectNetworkInfo)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-admin-password'
admin_password_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Change the administrative password for a server",
operations=[
{
@ -32,7 +32,7 @@ admin_password_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project'])
scope_types=['project'])
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-evacuate'
evacuate_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Evacuate a server from a failed host to a new host",
operations=[
{
@ -32,7 +32,7 @@ evacuate_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-lock-server:%s'
lock_server_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'lock',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Lock a server",
operations=[
{
@ -32,11 +32,11 @@ lock_server_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project']
scope_types=['project']
),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'unlock',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Unlock a server",
operations=[
{
@ -44,11 +44,11 @@ lock_server_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project']
scope_types=['project']
),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'unlock:unlock_override',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="""Unlock a server, regardless who locked the server.
This check is performed only after the check
@ -59,7 +59,7 @@ os_compute_api:os-lock-server:unlock passes""",
'method': 'POST'
}
],
scope_types=['system', 'project']
scope_types=['project']
),
]

View File

@ -38,7 +38,7 @@ DEPRECATED_POLICY = policy.DeprecatedRule(
multinic_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME % 'add',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="""Add a fixed IP address to a server.
This API is proxy calls to the Network service. This is
@ -49,11 +49,11 @@ deprecated.""",
'path': '/servers/{server_id}/action (addFixedIp)'
}
],
scope_types=['system', 'project'],
scope_types=['project'],
deprecated_rule=DEPRECATED_POLICY),
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME % 'remove',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="""Remove a fixed IP address from a server.
This API is proxy calls to the Network service. This is
@ -64,7 +64,7 @@ deprecated.""",
'path': '/servers/{server_id}/action (removeFixedIp)'
}
],
scope_types=['system', 'project'],
scope_types=['project'],
deprecated_rule=DEPRECATED_POLICY),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-pause-server:%s'
pause_server_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'pause',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Pause a server",
operations=[
{
@ -32,11 +32,11 @@ pause_server_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project']
scope_types=['project']
),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'unpause',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Unpause a paused server",
operations=[
{
@ -44,7 +44,7 @@ pause_server_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project']
scope_types=['project']
),
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-remote-consoles'
remote_consoles_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="""Generate a URL to access remove server console.
This policy is for ``POST /remote-consoles`` API and below Server actions APIs
@ -56,7 +56,7 @@ are deprecated:
'path': '/servers/{server_id}/remote-consoles'
},
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -37,7 +37,7 @@ DEPRECATED_POLICY = policy.DeprecatedRule(
rescue_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Rescue a server",
operations=[
{
@ -45,10 +45,10 @@ rescue_policies = [
'method': 'POST'
},
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=UNRESCUE_POLICY_NAME,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Unrescue a server",
operations=[
{
@ -56,7 +56,7 @@ rescue_policies = [
'method': 'POST'
}
],
scope_types=['system', 'project'],
scope_types=['project'],
deprecated_rule=DEPRECATED_POLICY
),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-shelve:%s'
shelve_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'shelve',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Shelve server",
operations=[
{
@ -32,10 +32,10 @@ shelve_policies = [
'path': '/servers/{server_id}/action (shelve)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'unshelve',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Unshelve (restore) shelved server",
operations=[
{
@ -43,10 +43,10 @@ shelve_policies = [
'path': '/servers/{server_id}/action (unshelve)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'shelve_offload',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Shelf-offload (remove) server",
operations=[
{
@ -54,7 +54,7 @@ shelve_policies = [
'path': '/servers/{server_id}/action (shelveOffload)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-suspend-server:%s'
suspend_server_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'resume',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Resume suspended server",
operations=[
{
@ -32,10 +32,10 @@ suspend_server_policies = [
'path': '/servers/{server_id}/action (resume)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'suspend',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Suspend server",
operations=[
{
@ -43,7 +43,7 @@ suspend_server_policies = [
'path': '/servers/{server_id}/action (suspend)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -40,40 +40,48 @@ class AdminActionsPolicyTest(base.BasePolicyTest):
uuid = uuids.fake_id
self.instance = fake_instance.fake_instance_obj(
self.project_member_context,
id=1, uuid=uuid, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
id=1, uuid=uuid, project_id=self.project_id,
vm_state=vm_states.ACTIVE, task_state=None,
launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin is able to change the service
self.admin_authorized_contexts = [
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to perform
# server admin actions
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to change the service
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
@mock.patch('nova.objects.Instance.save')
def test_reset_state_policy(self, mock_save):
rule_name = "os_compute_api:os-admin-actions:reset_state"
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._reset_state,
self.req, self.instance.uuid,
body={'os-resetState': {'state': 'active'}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name, self.controller._reset_state,
self.req, self.instance.uuid,
body={'os-resetState': {'state': 'active'}})
def test_inject_network_info_policy(self):
rule_name = "os_compute_api:os-admin-actions:inject_network_info"
with mock.patch.object(self.controller.compute_api,
"inject_network_info"):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller._inject_network_info,
self.req, self.instance.uuid, body={})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._inject_network_info,
self.req, self.instance.uuid, body={})
class AdminActionsNoLegacyNoScopePolicyTest(AdminActionsPolicyTest):
"""Test Admin Actions APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(AdminActionsNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule and scope diable, only project admin
# is able to perform server admin actions.
self.project_action_authorized_contexts = [self.project_admin_context]
class AdminActionsScopeTypePolicyTest(AdminActionsPolicyTest):
@ -90,27 +98,22 @@ class AdminActionsScopeTypePolicyTest(AdminActionsPolicyTest):
def setUp(self):
super(AdminActionsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enable, system admin will not be able to
# perform server admin actions.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class AdminActionsNoLegacyPolicyTest(AdminActionsScopeTypePolicyTest):
class AdminActionsScopeTypeNoLegacyPolicyTest(AdminActionsScopeTypePolicyTest):
"""Test Admin Actions APIs policies with system scope enabled,
and no more deprecated rules.
and no more deprecated rules which means scope + new defaults so
only project admin is able to perform admin action on their server.
"""
without_deprecated_rules = True
def setUp(self):
super(AdminActionsScopeTypePolicyTest, self).setUp()
# Check that system admin is able to perform the system level actions
# on server.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that non-system or non-admin is not able to perform the system
# level actions on server.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
super(AdminActionsScopeTypeNoLegacyPolicyTest, self).setUp()
# This is how our RBAC will looks like. With no legacy rule
# and scope enable, only project admin is able to perform
# server admin actions.
self.project_action_authorized_contexts = [self.project_admin_context]

View File

@ -47,28 +47,23 @@ class AdminPasswordPolicyTest(base.BasePolicyTest):
user_id=user_id, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to change the password
self.admin_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to change
# the password for their server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin is not able to change the password
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.set_admin_password')
def test_change_paassword_policy(self, mock_password):
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
self.rule_name,
self.controller.change_password,
self.req, self.instance.uuid,
body={'changePassword': {
'adminPass': '1234pass'}})
self.common_policy_auth(self.project_action_authorized_contexts,
self.rule_name,
self.controller.change_password,
self.req, self.instance.uuid,
body={'changePassword': {
'adminPass': '1234pass'}})
def test_change_password_overridden_policy_failed_with_other_user(self):
# Change the user_id in request context.
@ -93,6 +88,22 @@ class AdminPasswordPolicyTest(base.BasePolicyTest):
mock.ANY, '1234pass')
class AdminPasswordNoLegacyNoScopePolicyTest(AdminPasswordPolicyTest):
"""Test Admin Password APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(AdminPasswordNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to change the server password.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class AdminPasswordScopeTypePolicyTest(AdminPasswordPolicyTest):
"""Test Admin Password APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
@ -106,31 +117,26 @@ class AdminPasswordScopeTypePolicyTest(AdminPasswordPolicyTest):
def setUp(self):
super(AdminPasswordScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to change password.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class AdminPasswordNoLegacyPolicyTest(AdminPasswordPolicyTest):
class AdminPasswordScopeTypeNoLegacyTest(AdminPasswordScopeTypePolicyTest):
"""Test Admin Password APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_admin_or_owner APIs.
and no more deprecated rules which means scope + new defaults so
only project admin and member is able to change their server password.
"""
without_deprecated_rules = True
def setUp(self):
super(AdminPasswordNoLegacyPolicyTest, self).setUp()
super(AdminPasswordScopeTypeNoLegacyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system or projct admin or owner is able to change
# the password.
self.admin_authorized_contexts = [
self.system_admin_context,
# With scope enable and no legacy rule only project admin/member
# will be able to change password for the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system and non-admin/owner is not able to change the
# password.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_reader_context,
self.project_foo_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]

View File

@ -55,18 +55,12 @@ class EvacuatePolicyTest(base.BasePolicyTest):
id=1, uuid=uuid, user_id=user_id, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin is able to evacuate the server
self.admin_authorized_contexts = [
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to evacuate
# the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to evacuate the server
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
@mock.patch('nova.compute.api.API.evacuate')
def test_evacuate_policy(self, mock_evacuate):
@ -75,11 +69,10 @@ class EvacuatePolicyTest(base.BasePolicyTest):
'onSharedStorage': 'False',
'adminPass': 'admin_pass'}
}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._evacuate,
self.req, uuids.fake_id,
body=body)
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name, self.controller._evacuate,
self.req, uuids.fake_id,
body=body)
def test_evacuate_policy_failed_with_other_user(self):
rule_name = "os_compute_api:os-evacuate"
@ -112,6 +105,21 @@ class EvacuatePolicyTest(base.BasePolicyTest):
'MyNewPass', None)
class EvacuateNoLegacyNoScopePolicyTest(EvacuatePolicyTest):
"""Test Evacuate APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(EvacuateNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule and scope disable, only project admin
# will be able to evacuate server.
self.project_action_authorized_contexts = [self.project_admin_context]
class EvacuateScopeTypePolicyTest(EvacuatePolicyTest):
"""Test Evacuate APIs policies with system scope enabled.
@ -126,28 +134,21 @@ class EvacuateScopeTypePolicyTest(EvacuatePolicyTest):
def setUp(self):
super(EvacuateScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enable, system admin will not be able to
# evacuate the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class EvacuateNoLegacyPolicyTest(EvacuateScopeTypePolicyTest):
class EvacuateScopeTypeNoLegacyPolicyTest(EvacuateScopeTypePolicyTest):
"""Test Evacuate APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
and no more deprecated rules which means scope + new defaults.
"""
without_deprecated_rules = True
def setUp(self):
super(EvacuateNoLegacyPolicyTest, self).setUp()
# Check that system admin is able to evacuate server.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that non-system or non-admin is not able to evacuate
super(EvacuateScopeTypeNoLegacyPolicyTest, self).setUp()
# This is how our RBAC will looks like. With no legacy rule
# and scope enable, only project admin is able to evacuate
# server.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
self.project_action_authorized_contexts = [self.project_admin_context]

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import fixtures
import mock
from oslo_utils.fixture import uuidsentinel as uuids
@ -17,6 +19,7 @@ from oslo_utils import timeutils
from nova.api.openstack.compute import lock_server
from nova.compute import vm_states
import nova.conf
from nova import exception
from nova.policies import base as base_policy
from nova.policies import lock_server as ls_policies
@ -24,6 +27,8 @@ from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_instance
from nova.tests.unit.policies import base
CONF = nova.conf.CONF
class LockServerPolicyTest(base.BasePolicyTest):
"""Test Lock server APIs policies with all possible context.
@ -48,54 +53,39 @@ class LockServerPolicyTest(base.BasePolicyTest):
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to lock/unlock
# the server
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to lock,
# unlock the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to lock/unlock
# the server
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
# Check that admin is able to unlock the server which is
# locked by other
self.admin_authorized_contexts = [
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to override
# unlock, regardless who locked the server.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to unlock the server
# which is locked by other
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.lock')
def test_lock_server_policy(self, mock_lock):
rule_name = ls_policies.POLICY_ROOT % 'lock'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._lock,
self.req, self.instance.uuid,
body={'lock': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._lock,
self.req, self.instance.uuid,
body={'lock': {}})
@mock.patch('nova.compute.api.API.unlock')
def test_unlock_server_policy(self, mock_unlock):
rule_name = ls_policies.POLICY_ROOT % 'unlock'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._unlock,
self.req, self.instance.uuid,
body={'unlock': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._unlock,
self.req, self.instance.uuid,
body={'unlock': {}})
@mock.patch('nova.compute.api.API.unlock')
@mock.patch('nova.compute.api.API.is_expected_locked_by')
@ -104,12 +94,16 @@ class LockServerPolicyTest(base.BasePolicyTest):
rule = ls_policies.POLICY_ROOT % 'unlock'
self.policy.set_rules({rule: "@"}, overwrite=False)
rule_name = ls_policies.POLICY_ROOT % 'unlock:unlock_override'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller._unlock,
self.req, self.instance.uuid,
body={'unlock': {}})
if not CONF.oslo_policy.enforce_scope:
check_rule = rule_name
else:
check_rule = functools.partial(base.rule_if_system,
rule, rule_name)
self.common_policy_auth(self.project_admin_authorized_contexts,
check_rule,
self.controller._unlock,
self.req, self.instance.uuid,
body={'unlock': {}})
def test_lock_server_policy_failed_with_other_user(self):
# Change the user_id in request context.
@ -134,6 +128,24 @@ class LockServerPolicyTest(base.BasePolicyTest):
body={'lock': {}})
class LockServerNoLegacyNoScopePolicyTest(LockServerPolicyTest):
"""Test lock/unlock server APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(LockServerNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to lock/unlock the server and only project admin can
# override the unlock.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
self.project_admin_authorized_contexts = [self.project_admin_context]
class LockServerScopeTypePolicyTest(LockServerPolicyTest):
"""Test Lock Server APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
@ -147,49 +159,31 @@ class LockServerScopeTypePolicyTest(LockServerPolicyTest):
def setUp(self):
super(LockServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to lock/unlock the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class LockServerNoLegacyPolicyTest(LockServerScopeTypePolicyTest):
class LockServerScopeTypeNoLegacyPolicyTest(LockServerScopeTypePolicyTest):
"""Test Lock Server APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(LockServerNoLegacyPolicyTest, self).setUp()
# Check that system admin or and server owner is able to lock/unlock
# the server
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
super(LockServerScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to lock/unlock the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system/admin/owner is not able to lock/unlock
# the server
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
]
# Check that system admin is able to unlock the server which is
# locked by other
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that system non-admin is not able to unlock the server
# which is locked by other
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context,
self.other_project_reader_context,
]
self.project_admin_authorized_contexts = [self.project_admin_context]
class LockServerOverridePolicyTest(LockServerNoLegacyPolicyTest):
class LockServerOverridePolicyTest(LockServerScopeTypeNoLegacyPolicyTest):
"""Test Lock Server APIs policies with system and project scoped
but default to system roles only are allowed for project roles
if override by operators. This test is with system scope enable
@ -198,21 +192,11 @@ class LockServerOverridePolicyTest(LockServerNoLegacyPolicyTest):
def setUp(self):
super(LockServerOverridePolicyTest, self).setUp()
# Check that system admin or project scoped role as override above
# is able to unlock the server which is locked by other
self.admin_authorized_contexts = [
self.system_admin_context,
# We are overriding the 'unlock:unlock_override' policy
# to PROJECT_MEMBER so testing it with both admin as well
# as project member as allowed context.
self.project_admin_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system admin or project role is not able to
# unlock the server which is locked by other
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context,
self.other_project_reader_context,
]
def test_unlock_override_server_policy(self):
rule = ls_policies.POLICY_ROOT % 'unlock:unlock_override'
@ -220,6 +204,6 @@ class LockServerOverridePolicyTest(LockServerNoLegacyPolicyTest):
# make unlock allowed for everyone so that we can check unlock
# override policy.
ls_policies.POLICY_ROOT % 'unlock': "@",
rule: base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN}, overwrite=False)
rule: base_policy.PROJECT_MEMBER}, overwrite=False)
super(LockServerOverridePolicyTest,
self).test_unlock_override_server_policy()

View File

@ -45,40 +45,53 @@ class MultinicPolicyTest(base.BasePolicyTest):
id=1, uuid=uuid, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or owner is able to add/remove fixed ip.
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to
# add/remove fixed ip.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context
]
# Check that non-admin and non-owner is not able to add/remove
# fixed ip.
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_reader_context, self.project_foo_context]
@mock.patch('nova.compute.api.API.add_fixed_ip')
def test_add_fixed_ip_policy(self, mock_add):
rule_name = "os_compute_api:os-multinic:add"
body = dict(addFixedIp=dict(networkId='test_net'))
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name, self.controller._add_fixed_ip,
self.req, self.instance.uuid,
body=body)
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name, self.controller._add_fixed_ip,
self.req, self.instance.uuid,
body=body)
@mock.patch('nova.compute.api.API.remove_fixed_ip')
def test_remove_fixed_ip_policy(self, mock_remove):
rule_name = "os_compute_api:os-multinic:remove"
body = dict(removeFixedIp=dict(address='1.2.3.4'))
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name, self.controller._remove_fixed_ip,
self.req, self.instance.uuid,
body=body)
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name, self.controller._remove_fixed_ip,
self.req, self.instance.uuid,
body=body)
class MultinicNoLegacyNoScopePolicyTest(MultinicPolicyTest):
"""Test Multinic APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
rules_without_deprecation = {
policies.BASE_POLICY_NAME % 'add':
base_policy.PROJECT_MEMBER,
policies.BASE_POLICY_NAME % 'remove':
base_policy.PROJECT_MEMBER}
def setUp(self):
super(MultinicNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to add/remove the fixed ip.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class MultinicScopeTypePolicyTest(MultinicPolicyTest):
@ -95,33 +108,28 @@ class MultinicScopeTypePolicyTest(MultinicPolicyTest):
def setUp(self):
super(MultinicScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to add/remove
# the fixed ip.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class MultinicNoLegacyPolicyTest(MultinicScopeTypePolicyTest):
class MultinicScopeTypeNoLegacyPolicyTest(MultinicScopeTypePolicyTest):
"""Test Multinic APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True
rules_without_deprecation = {
policies.BASE_POLICY_NAME % 'add':
base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
base_policy.PROJECT_MEMBER,
policies.BASE_POLICY_NAME % 'remove':
base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN}
base_policy.PROJECT_MEMBER}
def setUp(self):
super(MultinicNoLegacyPolicyTest, self).setUp()
# Check that system admin or owner is able to
# add/delete Fixed IP to server.
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
self.project_admin_context, self.project_member_context,
]
# Check that non-system and non-admin/owner is not able
# to add/delete Fixed IP to server.
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.project_reader_context,
self.project_foo_context,
self.system_foo_context, self.other_project_member_context,
self.other_project_reader_context
]
super(MultinicScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to add/remove the fixed ip.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]

View File

@ -46,41 +46,32 @@ class PauseServerPolicyTest(base.BasePolicyTest):
user_id=user_id, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to pause/unpause
# the server
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to pause,
# unpause the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to pause/unpause
# the server
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.pause')
def test_pause_server_policy(self, mock_pause):
rule_name = ps_policies.POLICY_ROOT % 'pause'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._pause,
self.req, self.instance.uuid,
body={'pause': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._pause,
self.req, self.instance.uuid,
body={'pause': {}})
@mock.patch('nova.compute.api.API.unpause')
def test_unpause_server_policy(self, mock_unpause):
rule_name = ps_policies.POLICY_ROOT % 'unpause'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._unpause,
self.req, self.instance.uuid,
body={'unpause': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._unpause,
self.req, self.instance.uuid,
body={'unpause': {}})
def test_pause_server_policy_failed_with_other_user(self):
# Change the user_id in request context.
@ -105,6 +96,22 @@ class PauseServerPolicyTest(base.BasePolicyTest):
body={'pause': {}})
class PauseServerNoLegacyNoScopePolicyTest(PauseServerPolicyTest):
"""Test Pause/unpause server APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(PauseServerNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to pause/unpause the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class PauseServerScopeTypePolicyTest(PauseServerPolicyTest):
"""Test Pause Server APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
@ -118,28 +125,22 @@ class PauseServerScopeTypePolicyTest(PauseServerPolicyTest):
def setUp(self):
super(PauseServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to pause/unpause the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class PauseServerNoLegacyPolicyTest(PauseServerScopeTypePolicyTest):
class PauseServerScopeTypeNoLegacyPolicyTest(PauseServerScopeTypePolicyTest):
"""Test Pause Server APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(PauseServerNoLegacyPolicyTest, self).setUp()
# Check that system admin or server owner is able to pause/unpause
# the server
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
super(PauseServerScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to pause/unpause the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system/admin/owner is not able to pause/unpause
# the server
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
]

View File

@ -48,31 +48,38 @@ class RemoteConsolesPolicyTest(base.BasePolicyTest):
user_id=user_id, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to get server
# remote consoles.
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to get
# server remote consoles.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to get server
# remote consoles.
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
def test_create_console_policy(self):
rule_name = rc_policies.BASE_POLICY_NAME
body = {'remote_console': {'protocol': 'vnc', 'type': 'novnc'}}
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller.create,
self.req, self.instance.uuid,
body=body)
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller.create,
self.req, self.instance.uuid,
body=body)
class RemoteConsolesNoLegacyNoScopePolicyTest(RemoteConsolesPolicyTest):
"""Test Remote Consoles APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(RemoteConsolesNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able get server remote consoles.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class RemoteConsolesScopeTypePolicyTest(RemoteConsolesPolicyTest):
@ -88,9 +95,16 @@ class RemoteConsolesScopeTypePolicyTest(RemoteConsolesPolicyTest):
def setUp(self):
super(RemoteConsolesScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to get server
# remote console.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class RemoteConsolesNoLegacyPolicyTest(RemoteConsolesScopeTypePolicyTest):
class RemoteConsolesScopeTypeNoLegacyPolicyTest(
RemoteConsolesScopeTypePolicyTest):
"""Test Remote Consoles APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
@ -98,18 +112,8 @@ class RemoteConsolesNoLegacyPolicyTest(RemoteConsolesScopeTypePolicyTest):
without_deprecated_rules = True
def setUp(self):
super(RemoteConsolesNoLegacyPolicyTest, self).setUp()
# Check that system admin or and server owner is able to get server
# remote consoles.
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
super(RemoteConsolesScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to get server remote console.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system/admin/owner is not able to get server
# remote consoles.
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
]

View File

@ -48,40 +48,32 @@ class RescueServerPolicyTest(base.BasePolicyTest):
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to rescue/unrescue
# the sevrer
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to rescue,
# unrescue the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to rescue/unrescue
# the server
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.rescue')
def test_rescue_server_policy(self, mock_rescue):
rule_name = rs_policies.BASE_POLICY_NAME
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._rescue,
self.req, self.instance.uuid,
body={'rescue': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._rescue,
self.req, self.instance.uuid,
body={'rescue': {}})
@mock.patch('nova.compute.api.API.unrescue')
def test_unrescue_server_policy(self, mock_unrescue):
rule_name = rs_policies.UNRESCUE_POLICY_NAME
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._unrescue,
self.req, self.instance.uuid,
body={'unrescue': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._unrescue,
self.req, self.instance.uuid,
body={'unrescue': {}})
def test_rescue_server_policy_failed_with_other_user(self):
# Change the user_id in request context.
@ -106,6 +98,27 @@ class RescueServerPolicyTest(base.BasePolicyTest):
body={'rescue': {}})
class RescueServerNoLegacyNoScopePolicyTest(RescueServerPolicyTest):
"""Test rescue/unrescue server APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
rules_without_deprecation = {
rs_policies.UNRESCUE_POLICY_NAME:
base_policy.PROJECT_MEMBER,
rs_policies.BASE_POLICY_NAME:
base_policy.PROJECT_MEMBER}
def setUp(self):
super(RescueServerNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to rescue/unrescue the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class RescueServerScopeTypePolicyTest(RescueServerPolicyTest):
"""Test Rescue Server APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
@ -119,9 +132,15 @@ class RescueServerScopeTypePolicyTest(RescueServerPolicyTest):
def setUp(self):
super(RescueServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to rescue/unrescue the
# server.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class RescueServerNoLegacyPolicyTest(RescueServerScopeTypePolicyTest):
class RescueServerScopeTypeNoLegacyPolicyTest(RescueServerScopeTypePolicyTest):
"""Test Rescue Server APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
@ -129,23 +148,13 @@ class RescueServerNoLegacyPolicyTest(RescueServerScopeTypePolicyTest):
without_deprecated_rules = True
rules_without_deprecation = {
rs_policies.UNRESCUE_POLICY_NAME:
base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
base_policy.PROJECT_MEMBER,
rs_policies.BASE_POLICY_NAME:
base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN}
base_policy.PROJECT_MEMBER}
def setUp(self):
super(RescueServerNoLegacyPolicyTest, self).setUp()
# Check that system admin or and server owner is able to
# rescue/unrescue the sevrer
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
super(RescueServerScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to rescue/unrescue the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system/admin/owner is not able to rescue/unrescue
# the server
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
]

View File

@ -43,63 +43,48 @@ class ShelveServerPolicyTest(base.BasePolicyTest):
id=1, uuid=uuids.fake_id, project_id=self.project_id,
user_id=user_id, vm_state=vm_states.ACTIVE)
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to shelve/unshelve
# the server
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to shelve,
# unshelve the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to shelve/unshelve
# the server
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
# Check that admin is able to shelve offload the server.
self.admin_authorized_contexts = [
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to shelve
# offload the server.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to shelve offload the server.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.shelve')
def test_shelve_server_policy(self, mock_shelve):
rule_name = policies.POLICY_ROOT % 'shelve'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._shelve,
self.req, self.instance.uuid,
body={'shelve': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._shelve,
self.req, self.instance.uuid,
body={'shelve': {}})
@mock.patch('nova.compute.api.API.unshelve')
def test_unshelve_server_policy(self, mock_unshelve):
rule_name = policies.POLICY_ROOT % 'unshelve'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._unshelve,
self.req, self.instance.uuid,
body={'unshelve': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._unshelve,
self.req, self.instance.uuid,
body={'unshelve': {}})
@mock.patch('nova.compute.api.API.shelve_offload')
def test_shelve_offload_server_policy(self, mock_offload):
rule_name = policies.POLICY_ROOT % 'shelve_offload'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller._shelve_offload,
self.req, self.instance.uuid,
body={'shelveOffload': {}})
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name,
self.controller._shelve_offload,
self.req, self.instance.uuid,
body={'shelveOffload': {}})
def test_shelve_server_policy_failed_with_other_user(self):
# Change the user_id in request context.
@ -124,6 +109,23 @@ class ShelveServerPolicyTest(base.BasePolicyTest):
body={'shelve': {}})
class ShelveServerNoLegacyNoScopePolicyTest(ShelveServerPolicyTest):
"""Test shelve/unshelve server APIs policies with no legacy deprecated
rules and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(ShelveServerNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to shelve/unshelve the server and only project admin can
# shelve offload the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
self.project_admin_authorized_contexts = [self.project_admin_context]
class ShelveServerScopeTypePolicyTest(ShelveServerPolicyTest):
"""Test Shelve Server APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
@ -137,41 +139,26 @@ class ShelveServerScopeTypePolicyTest(ShelveServerPolicyTest):
def setUp(self):
super(ShelveServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to shelve/unshelve the
# server.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class ShelveServerNoLegacyPolicyTest(ShelveServerScopeTypePolicyTest):
class ShelveServerScopeTypeNoLegacyPolicyTest(ShelveServerScopeTypePolicyTest):
"""Test Shelve Server APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(ShelveServerNoLegacyPolicyTest, self).setUp()
# Check that system admin or and owner is able to shelve/unshelve
# the server.
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
super(ShelveServerScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to shelve/unshelve the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system/admin/owner is not able to shelve/unshelve
# the server.
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
]
# Check that system admin is able to shelve offload the server.
self.admin_authorized_contexts = [
self.system_admin_context
]
# Check that non system admin is not able to shelve offload the server
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_admin_authorized_contexts = [self.project_admin_context]

View File

@ -44,40 +44,32 @@ class SuspendServerPolicyTest(base.BasePolicyTest):
user_id=user_id, vm_state=vm_states.ACTIVE)
self.mock_get.return_value = self.instance
# Check that admin or and server owner is able to suspend/resume
# the server
self.admin_or_owner_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to suspend
# resume the server.
self.project_action_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
# Check that non-admin/owner is not able to suspend/resume
# the server
self.admin_or_owner_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.suspend')
def test_suspend_server_policy(self, mock_suspend):
rule_name = policies.POLICY_ROOT % 'suspend'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._suspend,
self.req, self.instance.uuid,
body={'suspend': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._suspend,
self.req, self.instance.uuid,
body={'suspend': {}})
@mock.patch('nova.compute.api.API.resume')
def test_resume_server_policy(self, mock_resume):
rule_name = policies.POLICY_ROOT % 'resume'
self.common_policy_check(self.admin_or_owner_authorized_contexts,
self.admin_or_owner_unauthorized_contexts,
rule_name,
self.controller._resume,
self.req, self.instance.uuid,
body={'resume': {}})
self.common_policy_auth(self.project_action_authorized_contexts,
rule_name,
self.controller._resume,
self.req, self.instance.uuid,
body={'resume': {}})
def test_suspend_server_policy_failed_with_other_user(self):
# Change the user_id in request context.
@ -102,6 +94,22 @@ class SuspendServerPolicyTest(base.BasePolicyTest):
body={'suspend': {}})
class SuspendServerNoLegacyNoScopePolicyTest(SuspendServerPolicyTest):
"""Test suspend server APIs policies with no legacy deprecated rules
and no scope checks which means new defaults only.
"""
without_deprecated_rules = True
def setUp(self):
super(SuspendServerNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to suspend/resume the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class SuspendServerScopeTypePolicyTest(SuspendServerPolicyTest):
"""Test Suspend Server APIs policies with system scope enabled.
This class set the nova.conf [oslo_policy] enforce_scope to True
@ -115,28 +123,24 @@ class SuspendServerScopeTypePolicyTest(SuspendServerPolicyTest):
def setUp(self):
super(SuspendServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin to suspend/resume server.
self.project_action_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class SuspendServerNoLegacyPolicyTest(SuspendServerScopeTypePolicyTest):
"""Test Suspend Server APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
class SuspendServerScopeTypeNoLegacyTest(SuspendServerScopeTypePolicyTest):
"""Test suspend/resume server APIs policies with system scope enabled,
and no more deprecated rules which means scope + new defaults so
only project admin and member is able to suspend/resume server.
"""
without_deprecated_rules = True
def setUp(self):
super(SuspendServerNoLegacyPolicyTest, self).setUp()
# Check that system admin or and server owner is able to
# suspend/resume the server.
self.admin_or_owner_authorized_contexts = [
self.system_admin_context,
super(SuspendServerScopeTypeNoLegacyTest, self).setUp()
# With scope enable and no legacy rule only project admin/member
# will be able to suspend/resume the server.
self.project_action_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system/admin/owner is not able to suspend/resume
# the server.
self.admin_or_owner_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context, self.project_reader_context,
self.project_foo_context,
self.other_project_reader_context,
]