Modify remaining APIs as per RBAC new guidelines

As per the RBAC new direction, we will allow
project resources operation to be performed by
the project scoped token only and system user will
be allowed to perform system level operation only
not project resources specific.

Details about new direction can be found in community-wide
goal
- https://governance.openstack.org/tc/goals/selected/consistent-and-secure-rbac.html

This commit modify remaining APIs as per the new guidelines.

Also, allow all project admin to list the other project limits. This is
what we allowed in legacy policy and until we have domain admin or other
way to list other project resources/info, we will keep that behaviour.

Also modifying and adding tests for four cases:
1. enforce_scope=False + legacy rule (current default policies)
2. enforce_scope=False + No legacy rule
3. enforce_scope=True + legacy rule
4. enforce_scope=True + no legacy rule (end goal of new RBAC)

Partial implement blueprint policy-defaults-refresh-2

Change-Id: I006d47aa2f4678a06c78057bcf407302abbe4907
This commit is contained in:
Ghanshyam Mann 2022-02-13 19:56:27 -06:00
parent 20a07ee9a6
commit ab084d4d1d
36 changed files with 648 additions and 817 deletions

View File

@ -39,6 +39,11 @@ class AssistedVolumeSnapshotsController(wsgi.Controller):
def create(self, req, body):
"""Creates a new snapshot."""
context = req.environ['nova.context']
# NOTE(gmann) We pass empty target to policy enforcement. This API
# is called by cinder which does not have correct project_id.
# By passing the empty target, we make sure that we do not check
# the requester project_id and allow users with
# allowed role to create snapshot.
context.can(avs_policies.POLICY_ROOT % 'create', target={})
snapshot = body['snapshot']
@ -69,6 +74,11 @@ class AssistedVolumeSnapshotsController(wsgi.Controller):
def delete(self, req, id):
"""Delete a snapshot."""
context = req.environ['nova.context']
# NOTE(gmann) We pass empty target to policy enforcement. This API
# is called by cinder which does not have correct project_id.
# By passing the empty target, we make sure that we do not check
# the requester project_id and allow users with allowed role to
# delete snapshot.
context.can(avs_policies.POLICY_ROOT % 'delete', target={})
delete_metadata = {}

View File

@ -30,7 +30,7 @@ class ConsoleAuthTokensController(wsgi.Controller):
def _show(self, req, id, rdp_only):
"""Checks a console auth token and returns the related connect info."""
context = req.environ['nova.context']
context.can(cat_policies.BASE_POLICY_NAME, target={})
context.can(cat_policies.BASE_POLICY_NAME)
token = id
if not token:

View File

@ -78,8 +78,7 @@ class LimitsController(wsgi.Controller):
project_id = context.project_id
if 'tenant_id' in req.GET:
project_id = req.GET.get('tenant_id')
context.can(limits_policies.OTHER_PROJECT_LIMIT_POLICY_NAME,
target={'project_id': project_id})
context.can(limits_policies.OTHER_PROJECT_LIMIT_POLICY_NAME)
quotas = QUOTAS.get_project_quotas(context, project_id,
usages=True)

View File

@ -89,7 +89,7 @@ class MigrationsController(wsgi.Controller):
sort_dirs=None, sort_keys=None, limit=None, marker=None,
allow_changes_since=False, allow_changes_before=False):
context = req.environ['nova.context']
context.can(migrations_policies.POLICY_ROOT % 'index', target={})
context.can(migrations_policies.POLICY_ROOT % 'index')
search_opts = {}
search_opts.update(req.GET)
if 'changes-since' in search_opts:

View File

@ -73,6 +73,11 @@ class ServerExternalEventsController(wsgi.Controller):
def create(self, req, body):
"""Creates a new instance event."""
context = req.environ['nova.context']
# NOTE(gmann) We pass empty target to policy enforcement. This API
# is called by neutron which does not have correct project_id where
# server belongs to. By passing the empty target, we make sure that
# we do not check the requester project_id and allow users with
# allowed role to create external event.
context.can(see_policies.POLICY_ROOT % 'create', target={})
response_events = []

View File

@ -24,7 +24,14 @@ POLICY_ROOT = 'os_compute_api:os-assisted-volume-snapshots:%s'
assisted_volume_snapshots_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'create',
check_str=base.SYSTEM_ADMIN,
# TODO(gmann): This is internal API policy and called by
# cinder. Add 'service' role in this policy so that cinder
# can call it with user having 'service' role (not having
# correct project_id). That is for phase-2 of RBAC goal and until
# then, we keep it open for all admin in any project. We cannot
# default it to PROJECT_ADMIN which has the project_id in
# check_str and will fail if cinder call it with other project_id.
check_str=base.ADMIN,
description="Create an assisted volume snapshot",
operations=[
{
@ -32,10 +39,17 @@ assisted_volume_snapshots_policies = [
'method': 'POST'
}
],
scope_types=['system']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'delete',
check_str=base.SYSTEM_ADMIN,
# TODO(gmann): This is internal API policy and called by
# cinder. Add 'service' role in this policy so that cinder
# can call it with user having 'service' role (not having
# correct project_id). That is for phase-2 of RBAC goal and until
# then, we keep it open for all admin in any project. We cannot
# default it to PROJECT_ADMIN which has the project_id in
# check_str and will fail if cinder call it with other project_id.
check_str=base.ADMIN,
description="Delete an assisted volume snapshot",
operations=[
{
@ -43,7 +57,7 @@ assisted_volume_snapshots_policies = [
'method': 'DELETE'
}
],
scope_types=['system']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-console-auth-tokens'
console_auth_tokens_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.SYSTEM_READER,
check_str=base.PROJECT_ADMIN,
description="Show console connection information for a given console "
"authentication token",
operations=[
@ -33,7 +33,7 @@ console_auth_tokens_policies = [
'path': '/os-console-auth-tokens/{console_token}'
}
],
scope_types=['system'])
scope_types=['project'])
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-console-output'
console_output_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description='Show console output for a server',
operations=[
{
@ -32,7 +32,7 @@ console_output_policies = [
'path': '/servers/{server_id}/action (os-getConsoleOutput)'
}
],
scope_types=['system', 'project'])
scope_types=['project'])
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-create-backup'
create_backup_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description='Create a back up of a server',
operations=[
{
@ -32,7 +32,7 @@ create_backup_policies = [
'path': '/servers/{server_id}/action (createBackup)'
}
],
scope_types=['system', 'project'])
scope_types=['project'])
]

View File

@ -36,7 +36,7 @@ DEPRECATED_POLICY = policy.DeprecatedRule(
deferred_delete_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME % 'restore',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Restore a soft deleted server",
operations=[
{
@ -44,11 +44,11 @@ deferred_delete_policies = [
'path': '/servers/{server_id}/action (restore)'
},
],
scope_types=['system', 'project'],
scope_types=['project'],
deprecated_rule=DEPRECATED_POLICY),
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME % 'force',
check_str=base.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
check_str=base.PROJECT_MEMBER,
description="Force delete a server before deferred cleanup",
operations=[
{
@ -56,7 +56,7 @@ deferred_delete_policies = [
'path': '/servers/{server_id}/action (forceDelete)'
}
],
scope_types=['system', 'project'],
scope_types=['project'],
deprecated_rule=DEPRECATED_POLICY)
]

View File

@ -25,8 +25,8 @@ POLICY_ROOT = 'os_compute_api:os-flavor-access:%s'
# NOTE(gmann): Deprecating this policy explicitly as old defaults
# admin or owner is not suitable for that which should be admin (Bug#1867840)
# but changing that will break old deployment so let's keep supporting
# the old default also and new default can be SYSTEM_READER
# SYSTEM_READER rule in base class is defined with the deprecated rule of admin
# the old default also and new default can be System Admin.
# System Admin rule in base class is defined with the deprecated rule of admin
# not admin or owner which is the main reason that we need to explicitly
# deprecate this policy here.
DEPRECATED_REASON = """
@ -45,7 +45,7 @@ DEPRECATED_FLAVOR_ACCESS_POLICY = policy.DeprecatedRule(
flavor_access_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'add_tenant_access',
check_str=base.SYSTEM_ADMIN,
check_str=base.ADMIN,
description="Add flavor access to a tenant",
operations=[
{
@ -56,7 +56,7 @@ flavor_access_policies = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'remove_tenant_access',
check_str=base.SYSTEM_ADMIN,
check_str=base.ADMIN,
description="Remove flavor access from a tenant",
operations=[
{
@ -67,7 +67,7 @@ flavor_access_policies = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.SYSTEM_READER,
check_str=base.ADMIN,
description="""List flavor access information
Allows access to the full list of tenants that have access

View File

@ -25,7 +25,7 @@ POLICY_ROOT = 'os_compute_api:os-flavor-manage:%s'
flavor_manage_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'create',
check_str=base.SYSTEM_ADMIN,
check_str=base.ADMIN,
description="Create a flavor",
operations=[
{
@ -36,7 +36,7 @@ flavor_manage_policies = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'update',
check_str=base.SYSTEM_ADMIN,
check_str=base.ADMIN,
description="Update a flavor",
operations=[
{
@ -47,7 +47,7 @@ flavor_manage_policies = [
scope_types=['system']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'delete',
check_str=base.SYSTEM_ADMIN,
check_str=base.ADMIN,
description="Delete a flavor",
operations=[
{

View File

@ -36,7 +36,7 @@ DEPRECATED_POLICY = policy.DeprecatedRule(
instance_usage_audit_log_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME % 'list',
check_str=base.SYSTEM_READER,
check_str=base.ADMIN,
description="List all usage audits.",
operations=[
{
@ -48,7 +48,7 @@ instance_usage_audit_log_policies = [
deprecated_rule=DEPRECATED_POLICY),
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME % 'show',
check_str=base.SYSTEM_READER,
check_str=base.ADMIN,
description="List all usage audits occurred before "
"a specified time for all servers on all compute hosts where "
"usage auditing is configured",

View File

@ -23,7 +23,7 @@ POLICY_ROOT = 'os_compute_api:os-keypairs:%s'
keypairs_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'index',
check_str='(' + base.SYSTEM_READER + ') or user_id:%(user_id)s',
check_str='(' + base.ADMIN + ') or user_id:%(user_id)s',
description="List all keypairs",
operations=[
{
@ -34,7 +34,7 @@ keypairs_policies = [
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'create',
check_str='(' + base.SYSTEM_ADMIN + ') or user_id:%(user_id)s',
check_str='(' + base.ADMIN + ') or user_id:%(user_id)s',
description="Create a keypair",
operations=[
{
@ -45,7 +45,7 @@ keypairs_policies = [
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'delete',
check_str='(' + base.SYSTEM_ADMIN + ') or user_id:%(user_id)s',
check_str='(' + base.ADMIN + ') or user_id:%(user_id)s',
description="Delete a keypair",
operations=[
{
@ -56,7 +56,7 @@ keypairs_policies = [
scope_types=['system', 'project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'show',
check_str='(' + base.SYSTEM_READER + ') or user_id:%(user_id)s',
check_str='(' + base.ADMIN + ') or user_id:%(user_id)s',
description="Show details of a keypair",
operations=[
{

View File

@ -46,10 +46,10 @@ limits_policies = [
'path': '/limits'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=OTHER_PROJECT_LIMIT_POLICY_NAME,
check_str=base.SYSTEM_READER,
check_str=base.PROJECT_ADMIN,
description="""Show rate and absolute limits of other project.
This policy only checks if the user has access to the requested
@ -61,7 +61,7 @@ os_compute_api:limits passes""",
'path': '/limits'
}
],
scope_types=['system'],
scope_types=['project'],
deprecated_rule=DEPRECATED_POLICY),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-migrate-server:%s'
migrate_server_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'migrate',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Cold migrate a server to a host",
operations=[
{
@ -32,10 +32,10 @@ migrate_server_policies = [
'path': '/servers/{server_id}/action (migrate)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'migrate_live',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Live migrate a server to a new host without a reboot",
operations=[
{
@ -43,7 +43,7 @@ migrate_server_policies = [
'path': '/servers/{server_id}/action (os-migrateLive)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:os-migrations:%s'
migrations_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'index',
check_str=base.SYSTEM_READER,
check_str=base.PROJECT_ADMIN,
description="List migrations",
operations=[
{
@ -32,7 +32,7 @@ migrations_policies = [
'path': '/os-migrations'
}
],
scope_types=['system']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ BASE_POLICY_NAME = 'os_compute_api:os-server-diagnostics'
server_diagnostics_policies = [
policy.DocumentedRuleDefault(
name=BASE_POLICY_NAME,
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Show the usage data for a server",
operations=[
{
@ -32,7 +32,7 @@ server_diagnostics_policies = [
'path': '/servers/{server_id}/diagnostics'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -24,7 +24,15 @@ POLICY_ROOT = 'os_compute_api:os-server-external-events:%s'
server_external_events_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'create',
check_str=base.SYSTEM_ADMIN,
# TODO(gmann): This is internal API policy and supposed to be called
# by neutron, cinder, ironic, and cyborg (may be other openstack
# services in future). Add 'service' role in this policy so that
# neutron can call it with user having 'service' role (not having
# server's project_id). That is for phase-2 of RBAC goal and until
# then, we keep it open for all admin in any project. We cannot
# default it to PROJECT_ADMIN which has the project_id in
# check_str and will fail if neutron call it with other project_id.
check_str=base.ADMIN,
description="Create one or more external events",
operations=[
{
@ -32,7 +40,7 @@ server_external_events_policies = [
'path': '/os-server-external-events'
}
],
scope_types=['system']),
scope_types=['project']),
]

View File

@ -24,7 +24,7 @@ POLICY_ROOT = 'os_compute_api:servers:migrations:%s'
servers_migrations_policies = [
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'show',
check_str=base.SYSTEM_READER,
check_str=base.PROJECT_ADMIN,
description="Show details for an in-progress live migration for a "
"given server",
operations=[
@ -33,10 +33,10 @@ servers_migrations_policies = [
'path': '/servers/{server_id}/migrations/{migration_id}'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'force_complete',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Force an in-progress live migration for a given server "
"to complete",
operations=[
@ -46,10 +46,10 @@ servers_migrations_policies = [
'/action (force_complete)'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'delete',
check_str=base.SYSTEM_ADMIN,
check_str=base.PROJECT_ADMIN,
description="Delete(Abort) an in-progress live migration",
operations=[
{
@ -57,10 +57,10 @@ servers_migrations_policies = [
'path': '/servers/{server_id}/migrations/{migration_id}'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
policy.DocumentedRuleDefault(
name=POLICY_ROOT % 'index',
check_str=base.SYSTEM_READER,
check_str=base.PROJECT_ADMIN,
description="Lists in-progress live migrations for a given server",
operations=[
{
@ -68,7 +68,7 @@ servers_migrations_policies = [
'path': '/servers/{server_id}/migrations'
}
],
scope_types=['system', 'project']),
scope_types=['project']),
]

View File

@ -212,8 +212,7 @@ class LimitsControllerTestV21(BaseLimitTestSuite):
fake_req.get_response(self.controller)
self.assertEqual(2, self.mock_can.call_count)
self.mock_can.assert_called_with(
l_policies.OTHER_PROJECT_LIMIT_POLICY_NAME,
target={"project_id": tenant_id})
l_policies.OTHER_PROJECT_LIMIT_POLICY_NAME)
mock_get_quotas.assert_called_once_with(context,
tenant_id, usages=True)

View File

@ -32,18 +32,12 @@ class AssistedVolumeSnapshotPolicyTest(base.BasePolicyTest):
super(AssistedVolumeSnapshotPolicyTest, self).setUp()
self.controller = snapshots.AssistedVolumeSnapshotsController()
self.req = fakes.HTTPRequest.blank('')
# Check that admin is able to take volume snapshot.
self.admin_authorized_contexts = [
# By default, legacy rule are enable and scope check is disabled.
# system admin, legacy admin, and project admin is able to
# take volume snapshot.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to take volume snapshot.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
@mock.patch('nova.compute.api.API.volume_snapshot_create')
def test_assisted_create_policy(self, mock_create):
@ -52,10 +46,9 @@ class AssistedVolumeSnapshotPolicyTest(base.BasePolicyTest):
'create_info': {'type': 'qcow2',
'new_file': 'new_file',
'snapshot_id': 'snapshot_id'}}}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.create,
self.req, body=body)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.create,
self.req, body=body)
@mock.patch('nova.compute.api.API.volume_snapshot_delete')
def test_assisted_delete_policy(self, mock_delete):
@ -64,11 +57,20 @@ class AssistedVolumeSnapshotPolicyTest(base.BasePolicyTest):
'delete_info': jsonutils.dumps({'volume_id': '1'}),
}
req = fakes.HTTPRequest.blank('?%s' % urllib.parse.urlencode(params))
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller.delete,
req, 1)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name,
self.controller.delete,
req, 1)
class AssistedSnapshotNoLegacyNoScopePolicyTest(
AssistedVolumeSnapshotPolicyTest):
"""Test Assisted Snapshot APIs policies with no legacy deprecated rules
and no scope checks.
"""
without_deprecated_rules = True
class AssistedSnapshotScopeTypePolicyTest(AssistedVolumeSnapshotPolicyTest):
@ -85,16 +87,15 @@ class AssistedSnapshotScopeTypePolicyTest(AssistedVolumeSnapshotPolicyTest):
super(AssistedSnapshotScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system admin is able to take volume snapshot.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that non-system or non-admin is not able to take volume
# snapshot.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# With scope check enabled, system admin is not able to
# take volume snapshot.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class AssistedSnapshotScopeTypeNoLegacyPolicyTest(
AssistedSnapshotScopeTypePolicyTest):
"""Test os-volume-attachments APIs policies with system scope enabled,
and no legacy deprecated rules.
"""
without_deprecated_rules = True

View File

@ -31,33 +31,29 @@ class ConsoleAuthTokensPolicyTest(base.BasePolicyTest):
self.controller = console_auth_tokens.ConsoleAuthTokensController()
self.req = fakes.HTTPRequest.blank('', version='2.31')
# Check that system reader is able to get console connection
# information.
# With legacy rule, any admin can get console connection
# NOTE(gmann): Until old default rule which is admin_api is
# deprecated and not removed, project admin and legacy admin
# will be able to get console. This make sure that existing
# tokens will keep working even we have changed this policy defaults
# to reader role.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.legacy_admin_context,
# tokens will keep working.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to get console connection
# information.
self.reader_unauthorized_contexts = [
self.system_foo_context, self.other_project_member_context,
self.project_foo_context, self.project_member_context,
self.project_reader_context,
self.other_project_reader_context,
]
@mock.patch('nova.objects.ConsoleAuthToken.validate')
def test_console_connect_info_token_policy(self, mock_validate):
rule_name = "os_compute_api:os-console-auth-tokens"
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.show,
self.req, fakes.FAKE_UUID)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.show,
self.req, fakes.FAKE_UUID)
class ConsoleAuthTokensNoLegacyNoScopeTest(ConsoleAuthTokensPolicyTest):
"""Test Console Auth Tokens API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
class ConsoleAuthTokensScopeTypePolicyTest(ConsoleAuthTokensPolicyTest):
@ -75,17 +71,14 @@ class ConsoleAuthTokensScopeTypePolicyTest(ConsoleAuthTokensPolicyTest):
super(ConsoleAuthTokensScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system reader is able to get console connection
# information.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system-reader is not able to get console connection
# information.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context,
self.other_project_reader_context,
]
# With scope enabled, system admin is not allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class ConsoleAuthTokensScopeTypeNoLegacyPolicyTest(
ConsoleAuthTokensScopeTypePolicyTest):
"""Test Console Auth Tokens APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True

View File

@ -43,30 +43,37 @@ class ConsoleOutputPolicyTest(base.BasePolicyTest):
id=1, uuid=uuid, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or owner is able to get the server console.
self.admin_authorized_contexts = [
# With legacy rule, any admin and role in project
# can get the server console.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context
]
# Check that non-admin and non-owner is not able to get the server
# console.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_reader_context, self.project_foo_context]
@mock.patch('nova.compute.api.API.get_console_output')
def test_console_output_policy(self, mock_console):
mock_console.return_value = '\n'.join([str(i) for i in range(2)])
rule_name = "os_compute_api:os-console-output"
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.get_console_output,
self.req, self.instance.uuid,
body={'os-getConsoleOutput': {}})
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller.get_console_output,
self.req, self.instance.uuid,
body={'os-getConsoleOutput': {}})
class ConsoleOutputNoLegacyNoScopePolicyTest(ConsoleOutputPolicyTest):
"""Test Server Console Output APIs policies with no legacy deprecated
rule and no scope check.
"""
without_deprecated_rules = True
def setUp(self):
super(ConsoleOutputNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member is able to
# get the server console.
self.project_member_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class ConsoleOutputScopeTypePolicyTest(ConsoleOutputPolicyTest):
@ -83,31 +90,24 @@ class ConsoleOutputScopeTypePolicyTest(ConsoleOutputPolicyTest):
def setUp(self):
super(ConsoleOutputScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin.
self.project_member_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class ConsoleOutputNoLegacyPolicyTest(ConsoleOutputPolicyTest):
class ConsoleOutputScopeTypeNoLegacyPolicyTest(
ConsoleOutputScopeTypePolicyTest):
"""Test Console Output APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_admin_or_owner APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(ConsoleOutputNoLegacyPolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
super(ConsoleOutputScopeTypeNoLegacyPolicyTest, self).setUp()
# Check that system or projct admin or owner is able to
# With scope enable and no legacy rule, only project admin/member can
# get the server console.
self.admin_authorized_contexts = [
self.system_admin_context,
self.project_member_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system and non-admin/owner is not able to
# get the server console.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_reader_context,
self.project_foo_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]

View File

@ -43,20 +43,14 @@ class CreateBackupPolicyTest(base.BasePolicyTest):
id=1, uuid=uuid, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or owner is able to create server backup.
self.admin_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to create
# server backup.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context
]
# Check that non-admin and non-owner is not able to create server
# backup.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_reader_context, self.project_foo_context]
@mock.patch('nova.compute.api.API.backup')
def test_create_backup_policy(self, mock_backup):
@ -68,11 +62,26 @@ class CreateBackupPolicyTest(base.BasePolicyTest):
'rotation': 1,
},
}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._create_backup,
self.req, self.instance.uuid,
body=body)
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller._create_backup,
self.req, self.instance.uuid,
body=body)
class CreateBackupNoLegacyNoScopePolicyTest(CreateBackupPolicyTest):
"""Test Create Backup server APIs policies with no legacy deprecated rules
and no scope checks.
"""
without_deprecated_rules = True
def setUp(self):
super(CreateBackupNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member will be
# able to create the server backup.
self.project_member_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class CreateBackupScopeTypePolicyTest(CreateBackupPolicyTest):
@ -89,31 +98,22 @@ class CreateBackupScopeTypePolicyTest(CreateBackupPolicyTest):
def setUp(self):
super(CreateBackupScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system users to create the server.
self.project_member_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class CreateBackupNoLegacyPolicyTest(CreateBackupPolicyTest):
class CreateBackupScopeTypeNoLegacyPolicyTest(CreateBackupScopeTypePolicyTest):
"""Test Create Backup APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_admin_or_owner APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(CreateBackupNoLegacyPolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system or projct admin or owner is able to create
# server backup.
self.admin_authorized_contexts = [
self.system_admin_context,
super(CreateBackupScopeTypeNoLegacyPolicyTest, self).setUp()
# With scope enable and no legacy rule, only project admin/member
# will be able to create the server backup.
self.project_member_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system and non-admin/owner is not able to
# create server backup.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_reader_context,
self.project_foo_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]

View File

@ -47,37 +47,29 @@ class DeferredDeletePolicyTest(base.BasePolicyTest):
id=1, uuid=uuid, user_id=user_id, vm_state=vm_states.ACTIVE,
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin or owner is able to force delete or restore server.
self.admin_authorized_contexts = [
# With legacy rule and no scope checks, all admin, project members
# project reader or other project role(because legacy rule allow server
# owner- having same project id and no role check) is able to force
# delete or restore server.
self.project_member_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context
]
# Check that non-admin and non-owner is not able to force delete or
# restore server.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_reader_context, self.project_foo_context]
@mock.patch('nova.compute.api.API.restore')
def test_restore_server_policy(self, mock_restore):
rule_name = dd_policies.BASE_POLICY_NAME % 'restore'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._restore,
self.req, self.instance.uuid,
body={'restore': {}})
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller._restore,
self.req, self.instance.uuid,
body={'restore': {}})
def test_force_delete_server_policy(self):
rule_name = dd_policies.BASE_POLICY_NAME % 'force'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._force_delete,
self.req, self.instance.uuid,
body={'forceDelete': {}})
self.common_policy_auth(self.project_member_authorized_contexts,
rule_name, self.controller._force_delete,
self.req, self.instance.uuid,
body={'forceDelete': {}})
def test_force_delete_server_policy_failed_with_other_user(self):
rule_name = dd_policies.BASE_POLICY_NAME % 'force'
@ -103,6 +95,27 @@ class DeferredDeletePolicyTest(base.BasePolicyTest):
self.req.environ['nova.context'], self.instance)
class DeferredDeleteNoLegacyNoScopePolicyTest(DeferredDeletePolicyTest):
"""Test Deferred Delete server APIs policies with no legacy deprecated
rule and no scope check.
"""
without_deprecated_rules = True
rules_without_deprecation = {
dd_policies.BASE_POLICY_NAME % 'restore':
base_policy.PROJECT_MEMBER,
dd_policies.BASE_POLICY_NAME % 'force':
base_policy.PROJECT_MEMBER}
def setUp(self):
super(DeferredDeleteNoLegacyNoScopePolicyTest, self).setUp()
# With no legacy rule, only project admin or member is able to force
# delete or restore server.
self.project_member_authorized_contexts = [
self.project_admin_context, self.project_member_context]
class DeferredDeleteScopeTypePolicyTest(DeferredDeletePolicyTest):
"""Test Deferred Delete APIs policies with system scope enabled.
@ -117,36 +130,29 @@ class DeferredDeleteScopeTypePolicyTest(DeferredDeletePolicyTest):
def setUp(self):
super(DeferredDeleteScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Scope enable will not allow system admin.
self.project_member_authorized_contexts = [
self.legacy_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context]
class DeferredDeleteNoLegacyPolicyTest(DeferredDeletePolicyTest):
class DeferredDeleteScopeTypeNoLegacyPolicyTest(
DeferredDeleteScopeTypePolicyTest):
"""Test Deferred Delete APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_admin_or_owner APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True
rules_without_deprecation = {
dd_policies.BASE_POLICY_NAME % 'restore':
base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
base_policy.PROJECT_MEMBER,
dd_policies.BASE_POLICY_NAME % 'force':
base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN}
base_policy.PROJECT_MEMBER}
def setUp(self):
super(DeferredDeleteNoLegacyPolicyTest, self).setUp()
super(DeferredDeleteScopeTypeNoLegacyPolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system or projct admin or owner is able to
# force delete or restore server.
self.admin_authorized_contexts = [
self.system_admin_context,
# With scope enable and no legacy rule, only project admin/member is
# able to force delete or restore server.
self.project_member_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system and non-admin/owner is not able to
# force delete or restore server.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_reader_context,
self.project_foo_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]

View File

@ -49,62 +49,61 @@ class FlavorAccessPolicyTest(base.BasePolicyTest):
self.stub_out('nova.objects.flavor._get_projects_from_db',
lambda context, flavorid: [])
# Check that admin is able to add/remove flavor access
# to a tenant.
# With legacy rule and no scope checks, all admin is able to
# add/remove flavor access to a tenant.
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to add/remove flavor access
# to a tenant.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# Check that everyone is able to list flavor access
# information which is nothing but bug#1867840.
self.reader_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.reader_unauthorized_contexts = [
]
# With legacy rule, anyone can access flavor access info.
self.admin_index_authorized_contexts = self.all_contexts
def test_list_flavor_access_policy(self):
rule_name = fa_policy.BASE_POLICY_NAME
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller_index.index,
self.req, '1')
self.common_policy_auth(self.admin_index_authorized_contexts,
rule_name, self.controller_index.index,
self.req, '1')
@mock.patch('nova.objects.Flavor.add_access')
def test_add_tenant_access_policy(self, mock_add):
rule_name = fa_policy.POLICY_ROOT % "add_tenant_access"
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller._add_tenant_access,
self.req, '1',
body={'addTenantAccess': {'tenant': 't1'}})
self.common_policy_auth(self.admin_authorized_contexts,
rule_name,
self.controller._add_tenant_access,
self.req, '1',
body={'addTenantAccess': {'tenant': 't1'}})
@mock.patch('nova.objects.Flavor.remove_access')
def test_remove_tenant_access_policy(self, mock_remove):
rule_name = fa_policy.POLICY_ROOT % "remove_tenant_access"
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller._remove_tenant_access,
self.req, '1',
body={'removeTenantAccess': {'tenant': 't1'}})
self.common_policy_auth(self.admin_authorized_contexts,
rule_name,
self.controller._remove_tenant_access,
self.req, '1',
body={'removeTenantAccess': {'tenant': 't1'}})
class FlavorAccessNoLegacyNoScopeTest(FlavorAccessPolicyTest):
"""Test Flavor Access API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
rules_without_deprecation = {
fa_policy.POLICY_ROOT % "add_tenant_access":
base_policy.ADMIN,
fa_policy.POLICY_ROOT % "remove_tenant_access":
base_policy.ADMIN,
fa_policy.BASE_POLICY_NAME:
base_policy.ADMIN}
def setUp(self):
super(FlavorAccessNoLegacyNoScopeTest, self).setUp()
# with no legacy rule means all admin is able to list access info.
self.admin_index_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
class FlavorAccessScopeTypePolicyTest(FlavorAccessPolicyTest):
@ -122,81 +121,29 @@ class FlavorAccessScopeTypePolicyTest(FlavorAccessPolicyTest):
super(FlavorAccessScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system admin is able to add/remove flavor access
# to a tenant.
# Scope checks remove project users power.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that non-system-admin is not able to add/remove flavor access
# to a tenant.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# Check that system user is able to list flavor access
# information.
self.reader_authorized_contexts = [
self.system_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context]
# Check that non-system is not able to list flavor access
# information.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.other_project_member_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_reader_context,
]
self.admin_index_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context]
class FlavorAccessNoLegacyPolicyTest(FlavorAccessPolicyTest):
class FlavorAccessScopeTypeNoLegacyPolicyTest(FlavorAccessScopeTypePolicyTest):
"""Test FlavorAccess APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_redear APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True
rules_without_deprecation = {
fa_policy.POLICY_ROOT % "add_tenant_access":
base_policy.SYSTEM_ADMIN,
base_policy.ADMIN,
fa_policy.POLICY_ROOT % "remove_tenant_access":
base_policy.SYSTEM_ADMIN,
base_policy.ADMIN,
fa_policy.BASE_POLICY_NAME:
base_policy.SYSTEM_READER}
base_policy.ADMIN}
def setUp(self):
super(FlavorAccessNoLegacyPolicyTest, self).setUp()
super(FlavorAccessScopeTypeNoLegacyPolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system admin is able to add/remove flavor access
# to a tenant.
self.admin_authorized_contexts = [
self.admin_index_authorized_contexts = [
self.system_admin_context]
# Check that non-system-admin is not able to add/remove flavor access
# to a tenant.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# Check that system reader is able to list flavor access
# information.
self.reader_authorized_contexts = [
self.system_admin_context,
self.system_member_context, self.system_reader_context]
# Check that non-system-reader is not able to list flavor access
# information.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.other_project_member_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.system_foo_context,
self.other_project_reader_context,
]

View File

@ -31,18 +31,11 @@ class FlavorManagePolicyTest(base.BasePolicyTest):
super(FlavorManagePolicyTest, self).setUp()
self.controller = flavor_manage.FlavorManageController()
self.req = fakes.HTTPRequest.blank('')
# Check that admin is able to manage the flavors.
# With legacy rule and no scope checks, all admin can manage
# the flavors.
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to manage the flavors.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
def test_create_flavor_policy(self):
rule_name = fm_policies.POLICY_ROOT % 'create'
@ -67,29 +60,34 @@ class FlavorManagePolicyTest(base.BasePolicyTest):
"disk": 1,
}
}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._create,
self.req, body=body)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name, self.controller._create,
self.req, body=body)
@mock.patch('nova.objects.Flavor.get_by_flavor_id')
@mock.patch('nova.objects.Flavor.save')
def test_update_flavor_policy(self, mock_save, mock_get):
rule_name = fm_policies.POLICY_ROOT % 'update'
req = fakes.HTTPRequest.blank('', version='2.55')
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._update,
req, uuids.fake_id,
body={'flavor': {'description': None}})
self.common_policy_auth(self.admin_authorized_contexts,
rule_name, self.controller._update,
req, uuids.fake_id,
body={'flavor': {'description': None}})
@mock.patch('nova.objects.Flavor.destroy')
def test_delete_flavor_policy(self, mock_delete):
rule_name = fm_policies.POLICY_ROOT % 'delete'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._delete,
self.req, uuids.fake_id)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name, self.controller._delete,
self.req, uuids.fake_id)
class FlavorManageNoLegacyNoScopeTest(FlavorManagePolicyTest):
"""Test Flavor Access API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
class FlavorManageScopeTypePolicyTest(FlavorManagePolicyTest):
@ -106,23 +104,15 @@ class FlavorManageScopeTypePolicyTest(FlavorManagePolicyTest):
super(FlavorManageScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system admin is able to manage the flavors.
# With scope enable, only system admin is able to manage
# the flavors.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that non-system-admin is not able to manage the flavors.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
class FlavorManageNoLegacyPolicyTest(FlavorManageScopeTypePolicyTest):
class FlavorManageScopeTypeNoLegacyPolicyTest(
FlavorManageScopeTypePolicyTest):
"""Test Flavor Manage APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system_admin_or_owner APIs.
and no more deprecated rules.
"""
without_deprecated_rules = True

View File

@ -35,37 +35,37 @@ class InstanceUsageAuditLogPolicyTest(base.BasePolicyTest):
self.controller.host_api.task_log_get_all = mock.MagicMock()
self.controller.host_api.service_get_all = mock.MagicMock()
# Check that admin is able to get instance usage audit log.
# NOTE(gmann): Until old default rule which is admin_api is
# deprecated and not removed, project admin and legacy admin
# will be able to get instance usage audit log. This make sure
# that existing tokens will keep working even we have changed
# this policy defaults to reader role.
self.reader_authorized_contexts = [
# With legacy rule, all admin_api will be able to get instance usage
# audit log.
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-admin is not able to get instance usage audit log.
self.reader_unauthorized_contexts = [
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
self.project_admin_context]
def test_show_policy(self):
rule_name = iual_policies.BASE_POLICY_NAME % 'show'
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.show,
self.req, '2020-03-25 14:40:00')
self.common_policy_auth(self.admin_authorized_contexts,
rule_name, self.controller.show,
self.req, '2020-03-25 14:40:00')
def test_index_policy(self):
rule_name = iual_policies.BASE_POLICY_NAME % 'list'
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.index,
self.req)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name, self.controller.index,
self.req)
class InstanceUsageNoLegacyNoScopeTest(InstanceUsageAuditLogPolicyTest):
"""Test Instance Usage API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
rules_without_deprecation = {
iual_policies.BASE_POLICY_NAME % 'list':
base_policy.ADMIN,
iual_policies.BASE_POLICY_NAME % 'show':
base_policy.ADMIN,
}
class InstanceUsageScopeTypePolicyTest(InstanceUsageAuditLogPolicyTest):
@ -83,29 +83,20 @@ class InstanceUsageScopeTypePolicyTest(InstanceUsageAuditLogPolicyTest):
super(InstanceUsageScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system reader is able to get instance usage audit log.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system-admin is not able to get instance
# usage audit log.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# Scope checks remove project users power.
self.admin_authorized_contexts = [
self.system_admin_context]
class InstanceUsageNoLegacyPolicyTest(InstanceUsageScopeTypePolicyTest):
class InstanceUsageScopeTypeNoLegacyPolicyTest(
InstanceUsageScopeTypePolicyTest):
"""Test Instance Usage Audit Log APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True
rules_without_deprecation = {
iual_policies.BASE_POLICY_NAME % 'list':
base_policy.SYSTEM_READER,
base_policy.ADMIN,
iual_policies.BASE_POLICY_NAME % 'show':
base_policy.SYSTEM_READER,
base_policy.ADMIN,
}

View File

@ -43,87 +43,57 @@ class KeypairsPolicyTest(base.BasePolicyTest):
self.other_project_member_context,
self.other_project_reader_context,
]
self.everyone_unauthorized_contexts = []
# Check that admin is able to create, delete and get
# other users keypairs.
self.admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to create, delete and get
# other users keypairs.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
# Check that system reader is able to get
# other users keypairs.
self.system_reader_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system reader is not able to get
# other users keypairs.
self.system_reader_unauthorized_contexts = [
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.KeypairAPI.get_key_pairs')
def test_index_keypairs_policy(self, mock_get):
rule_name = policies.POLICY_ROOT % 'index'
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.index,
self.req)
self.common_policy_auth(self.everyone_authorized_contexts,
rule_name,
self.controller.index,
self.req)
@mock.patch('nova.compute.api.KeypairAPI.get_key_pairs')
def test_index_others_keypairs_policy(self, mock_get):
req = fakes.HTTPRequest.blank('?user_id=user2', version='2.10')
rule_name = policies.POLICY_ROOT % 'index'
self.common_policy_check(self.system_reader_authorized_contexts,
self.system_reader_unauthorized_contexts,
rule_name,
self.controller.index,
req)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name,
self.controller.index,
req)
@mock.patch('nova.compute.api.KeypairAPI.get_key_pair')
def test_show_keypairs_policy(self, mock_get):
rule_name = policies.POLICY_ROOT % 'show'
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.show,
self.req, fakes.FAKE_UUID)
self.common_policy_auth(self.everyone_authorized_contexts,
rule_name,
self.controller.show,
self.req, fakes.FAKE_UUID)
@mock.patch('nova.compute.api.KeypairAPI.get_key_pair')
def test_show_others_keypairs_policy(self, mock_get):
# Change the user_id in request context.
req = fakes.HTTPRequest.blank('?user_id=user2', version='2.10')
rule_name = policies.POLICY_ROOT % 'show'
self.common_policy_check(self.system_reader_authorized_contexts,
self.system_reader_unauthorized_contexts,
rule_name,
self.controller.show,
req, fakes.FAKE_UUID)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name,
self.controller.show,
req, fakes.FAKE_UUID)
@mock.patch('nova.compute.api.KeypairAPI.create_key_pair')
def test_create_keypairs_policy(self, mock_create):
rule_name = policies.POLICY_ROOT % 'create'
mock_create.return_value = (test_keypair.fake_keypair, 'FAKE_KEY')
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.create,
self.req,
body={'keypair': {'name': 'create_test'}})
self.common_policy_auth(self.everyone_authorized_contexts,
rule_name,
self.controller.create,
self.req,
body={'keypair': {'name': 'create_test'}})
@mock.patch('nova.compute.api.KeypairAPI.create_key_pair')
def test_create_others_keypairs_policy(self, mock_create):
@ -132,31 +102,39 @@ class KeypairsPolicyTest(base.BasePolicyTest):
rule_name = policies.POLICY_ROOT % 'create'
mock_create.return_value = (test_keypair.fake_keypair, 'FAKE_KEY')
body = {'keypair': {'name': 'test2', 'user_id': 'user2'}}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller.create,
req, body=body)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name,
self.controller.create,
req, body=body)
@mock.patch('nova.compute.api.KeypairAPI.delete_key_pair')
def test_delete_keypairs_policy(self, mock_delete):
rule_name = policies.POLICY_ROOT % 'delete'
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name,
self.controller.delete,
self.req, fakes.FAKE_UUID)
self.common_policy_auth(self.everyone_authorized_contexts,
rule_name,
self.controller.delete,
self.req, fakes.FAKE_UUID)
@mock.patch('nova.compute.api.KeypairAPI.delete_key_pair')
def test_delete_others_keypairs_policy(self, mock_delete):
# Change the user_id in request context.
req = fakes.HTTPRequest.blank('?user_id=user2', version='2.10')
rule_name = policies.POLICY_ROOT % 'delete'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name,
self.controller.delete,
req, fakes.FAKE_UUID)
self.common_policy_auth(self.admin_authorized_contexts,
rule_name,
self.controller.delete,
req, fakes.FAKE_UUID)
class KeypairsNoLegacyNoScopeTest(KeypairsPolicyTest):
"""Test Keypairs API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
def setUp(self):
super(KeypairsNoLegacyNoScopeTest, self).setUp()
class KeypairsScopeTypePolicyTest(KeypairsPolicyTest):
@ -180,35 +158,3 @@ class KeypairsNoLegacyPolicyTest(KeypairsScopeTypePolicyTest):
access system APIs.
"""
without_deprecated_rules = True
def setUp(self):
super(KeypairsNoLegacyPolicyTest, self).setUp()
# Check that system admin is able to create, delete and get
# other users keypairs.
self.admin_authorized_contexts = [
self.system_admin_context]
# Check that system non-admin is not able to create, delete and get
# other users keypairs.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# Check that system reader is able to get
# other users keypairs.
self.system_reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system reader is not able to get
# other users keypairs.
self.system_reader_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]

View File

@ -10,15 +10,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import mock
from nova.api.openstack.compute import limits
import nova.conf
from nova.policies import base as base_policy
from nova.policies import limits as limits_policies
from nova import quota
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit.policies import base
CONF = nova.conf.CONF
class LimitsPolicyTest(base.BasePolicyTest):
"""Test Limits APIs policies with all possible context.
@ -55,48 +60,52 @@ class LimitsPolicyTest(base.BasePolicyTest):
mock_get_project_quotas.start()
# Check that everyone is able to get their limits
self.everyone_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.project_member_context, self.other_project_member_context,
self.project_foo_context, self.project_reader_context,
self.other_project_reader_context,
]
self.everyone_unauthorized_contexts = []
self.everyone_authorized_contexts = self.all_contexts
# Check that system reader is able to get other projects limit.
# NOTE(gmann): Until old default rule which is admin_api is
# deprecated and not removed, project admin and legacy admin
# will be able to get limit. This make sure that existing
# tokens will keep working even we have changed this policy defaults
# to reader role.
self.reader_authorized_contexts = [
# With legacy rule, any admin is able to get other projects limit.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-admin is not able to get other projects limit.
self.reader_unauthorized_contexts = [
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
self.project_admin_context]
def test_get_limits_policy(self):
rule_name = limits_policies.BASE_POLICY_NAME
self.common_policy_check(self.everyone_authorized_contexts,
self.everyone_unauthorized_contexts,
rule_name, self.controller.index,
self.req)
self.common_policy_auth(self.everyone_authorized_contexts,
rule_name, self.controller.index,
self.req)
def test_get_other_limits_policy(self):
rule = limits_policies.BASE_POLICY_NAME
self.policy.set_rules({rule: "@"}, overwrite=False)
req = fakes.HTTPRequest.blank('/?tenant_id=faketenant')
rule_name = limits_policies.OTHER_PROJECT_LIMIT_POLICY_NAME
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.index,
req)
if not CONF.oslo_policy.enforce_scope:
check_rule = rule_name
else:
check_rule = functools.partial(base.rule_if_system,
rule, rule_name)
self.common_policy_auth(self.project_admin_authorized_contexts,
check_rule, self.controller.index,
req)
class LimitsNoLegacyNoScopeTest(LimitsPolicyTest):
"""Test Flavor Access API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
rules_without_deprecation = {
limits_policies.OTHER_PROJECT_LIMIT_POLICY_NAME:
base_policy.PROJECT_ADMIN}
def setUp(self):
super(LimitsNoLegacyNoScopeTest, self).setUp()
# Even with no legacy rule, any admin can get other project
# limits.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
class LimitsScopeTypePolicyTest(LimitsPolicyTest):
@ -114,22 +123,18 @@ class LimitsScopeTypePolicyTest(LimitsPolicyTest):
super(LimitsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system reader is able to get other projects limit.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system reader is not able toget other
# projects limit.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.system_foo_context,
self.project_admin_context, self.project_member_context,
# With Scope enable, system users no longer allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
self.everyone_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
self.project_foo_context, self.other_project_reader_context
]
class LimitsNoLegacyPolicyTest(LimitsScopeTypePolicyTest):
class LimitsScopeTypeNoLegacyPolicyTest(LimitsScopeTypePolicyTest):
"""Test Limits APIs policies with system scope enabled,
and no more deprecated rules that allow the legacy admin API to
access system APIs.
@ -137,4 +142,17 @@ class LimitsNoLegacyPolicyTest(LimitsScopeTypePolicyTest):
without_deprecated_rules = True
rules_without_deprecation = {
limits_policies.OTHER_PROJECT_LIMIT_POLICY_NAME:
base_policy.SYSTEM_READER}
base_policy.PROJECT_ADMIN}
def setUp(self):
super(LimitsScopeTypeNoLegacyPolicyTest, self).setUp()
# With no legacy and scope enable, only project level admin
# will get other projects limit.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
self.everyone_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.project_member_context, self.project_reader_context,
self.other_project_member_context,
self.project_foo_context, self.other_project_reader_context
]

View File

@ -47,28 +47,19 @@ class MigrateServerPolicyTest(base.BasePolicyTest):
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin is able to migrate the server.
self.admin_authorized_contexts = [
# With legacy rule, any admin is able to migrate
# the server.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context
]
# Check that non-admin is not able to migrate the server
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_admin_context]
@mock.patch('nova.compute.api.API.resize')
def test_migrate_server_policy(self, mock_resize):
rule_name = ms_policies.POLICY_ROOT % 'migrate'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._migrate,
self.req, self.instance.uuid,
body={'migrate': None})
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller._migrate,
self.req, self.instance.uuid,
body={'migrate': None})
@mock.patch('nova.compute.api.API.live_migrate')
def test_migrate_live_server_policy(self, mock_live_migrate):
@ -78,11 +69,23 @@ class MigrateServerPolicyTest(base.BasePolicyTest):
'block_migration': "False",
'disk_over_commit': "False"}
}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._migrate_live,
self.req, self.instance.uuid,
body=body)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller._migrate_live,
self.req, self.instance.uuid,
body=body)
class MigrateServerNoLegacyNoScopeTest(MigrateServerPolicyTest):
"""Test Server Migrations API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
def setUp(self):
super(MigrateServerNoLegacyNoScopeTest, self).setUp()
self.project_admin_authorized_contexts = [
self.project_admin_context]
class MigrateServerScopeTypePolicyTest(MigrateServerPolicyTest):
@ -99,32 +102,27 @@ class MigrateServerScopeTypePolicyTest(MigrateServerPolicyTest):
def setUp(self):
super(MigrateServerScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enabled, system admin is not allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class MigrateServerNoLegacyPolicyTest(MigrateServerScopeTypePolicyTest):
class MigrateServerScopeTypeNoLegacyPolicyTest(
MigrateServerScopeTypePolicyTest):
"""Test Migrate Server APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(MigrateServerNoLegacyPolicyTest, self).setUp()
# Check that system admin is able to migrate the server.
self.admin_authorized_contexts = [
self.system_admin_context
]
# Check that non system admin is not able to migrate the server
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
super(MigrateServerScopeTypeNoLegacyPolicyTest, self).setUp()
# with no legacy rule and scope enable., only project admin is able to
# migrate the server.
self.project_admin_authorized_contexts = [self.project_admin_context]
class MigrateServerOverridePolicyTest(MigrateServerNoLegacyPolicyTest):
class MigrateServerOverridePolicyTest(
MigrateServerScopeTypeNoLegacyPolicyTest):
"""Test Migrate Server APIs policies with system and project scoped
but default to system roles only are allowed for project roles
if override by operators. This test is with system scope enable
@ -138,21 +136,11 @@ class MigrateServerOverridePolicyTest(MigrateServerNoLegacyPolicyTest):
# NOTE(gmann): override the rule to project member and verify it
# work as policy is system and projct scoped.
self.policy.set_rules({
rule_migrate: base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
rule_live_migrate: base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN},
rule_migrate: base_policy.PROJECT_MEMBER,
rule_live_migrate: base_policy.PROJECT_MEMBER},
overwrite=False)
# Check that system admin or project scoped role as override above
# Check that project member role as override above
# is able to migrate the server
self.admin_authorized_contexts = [
self.system_admin_context,
self.project_admin_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system admin or project role is not able to
# migrate the server
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]

View File

@ -32,27 +32,25 @@ class MigrationsPolicyTest(base.BasePolicyTest):
self.controller = migrations.MigrationsController()
self.req = fakes.HTTPRequest.blank('')
# Check that admin is able to list migrations.
self.reader_authorized_contexts = [
# With legacy rule, any admin is able to list migrations.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context, self.system_member_context,
self.system_reader_context
]
# Check that non-admin is not able to list migrations.
self.reader_unauthorized_contexts = [
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_admin_context]
@mock.patch('nova.compute.api.API.get_migrations')
def test_list_migrations_policy(self, mock_migration):
rule_name = migrations_policies.POLICY_ROOT % 'index'
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.index,
self.req)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.index,
self.req)
class MigrationsNoLegacyNoScopeTest(MigrationsPolicyTest):
"""Test Migrations API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
class MigrationsScopeTypePolicyTest(MigrationsPolicyTest):
@ -70,15 +68,14 @@ class MigrationsScopeTypePolicyTest(MigrationsPolicyTest):
super(MigrationsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that system reader is able to list migrations.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non system reader is not able to list migrations.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
# With scope enabled, system admin is not allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class MigrationsScopeTypeNoLegacyPolicyTest(
MigrationsScopeTypePolicyTest):
"""Test Migrations APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True

View File

@ -46,26 +46,29 @@ class ServerDiagnosticsPolicyTest(base.BasePolicyTest):
task_state=None, launched_at=timeutils.utcnow())
self.mock_get.return_value = self.instance
# Check that admin is able to get server diagnostics.
self.admin_authorized_contexts = [
# With legacy rule, any admin is able get server diagnostics.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context
]
# Check that non-admin is not able to get server diagnostics.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
self.project_admin_context]
def test_server_diagnostics_policy(self):
rule_name = policies.BASE_POLICY_NAME
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.index,
self.req, self.instance.uuid)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.index,
self.req, self.instance.uuid)
class ServerDiagnosticsNoLegacyNoScopeTest(ServerDiagnosticsPolicyTest):
"""Test Server Diagnostics API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
def setUp(self):
super(ServerDiagnosticsNoLegacyNoScopeTest, self).setUp()
self.project_admin_authorized_contexts = [
self.project_admin_context]
class ServerDiagnosticsScopeTypePolicyTest(ServerDiagnosticsPolicyTest):
@ -82,9 +85,12 @@ class ServerDiagnosticsScopeTypePolicyTest(ServerDiagnosticsPolicyTest):
def setUp(self):
super(ServerDiagnosticsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enabled, system admin is not allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class ServerDiagnosticsNoLegacyPolicyTest(
class ServerDiagnosticsScopeTypeNoLegacyPolicyTest(
ServerDiagnosticsScopeTypePolicyTest):
"""Test Server Diagnostics APIs policies with system scope enabled,
and no more deprecated rules.
@ -92,23 +98,14 @@ class ServerDiagnosticsNoLegacyPolicyTest(
without_deprecated_rules = True
def setUp(self):
super(ServerDiagnosticsNoLegacyPolicyTest, self).setUp()
# Check that system admin is able to get server diagnostics.
self.admin_authorized_contexts = [
self.system_admin_context
]
# Check that non system admin is not able to get server diagnostics.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
super(ServerDiagnosticsScopeTypeNoLegacyPolicyTest, self).setUp()
# with no legacy rule and scope enable., only project admin is able to
# get server diagnostics.
self.project_admin_authorized_contexts = [self.project_admin_context]
class ServerDiagnosticsOverridePolicyTest(ServerDiagnosticsNoLegacyPolicyTest):
class ServerDiagnosticsOverridePolicyTest(
ServerDiagnosticsScopeTypeNoLegacyPolicyTest):
"""Test Server Diagnostics APIs policies with system and project scoped
but default to system roles only are allowed for project roles
if override by operators. This test is with system scope enable
@ -119,22 +116,12 @@ class ServerDiagnosticsOverridePolicyTest(ServerDiagnosticsNoLegacyPolicyTest):
super(ServerDiagnosticsOverridePolicyTest, self).setUp()
rule = policies.BASE_POLICY_NAME
# NOTE(gmann): override the rule to project member and verify it
# work as policy is system and projct scoped.
# work as policy is projct scoped.
self.policy.set_rules({
rule: base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN},
rule: base_policy.PROJECT_MEMBER},
overwrite=False)
# Check that system admin or project scoped role as override above
# Check that project member role as override above
# is able to get server diagnostics.
self.admin_authorized_contexts = [
self.system_admin_context,
self.project_admin_authorized_contexts = [
self.project_admin_context, self.project_member_context]
# Check that non-system admin or project role is not able to
# get server diagnostics.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context,
self.other_project_reader_context,
]

View File

@ -33,20 +33,12 @@ class ServerExternalEventsPolicyTest(base.BasePolicyTest):
self.controller = ev.ServerExternalEventsController()
self.req = fakes.HTTPRequest.blank('')
# Check that admin is able to create the server external events.
self.admin_authorized_contexts = [
# With legacy rule and no scope checks, all admin can
# create the server external events.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context
]
# Check that non-admin is not able to create the server
# external events.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.external_instance_event')
@mock.patch('nova.objects.InstanceMappingList.get_by_instance_uuids')
@ -58,10 +50,18 @@ class ServerExternalEventsPolicyTest(base.BasePolicyTest):
'server_uuid': uuids.fake_id,
'status': 'completed'}]
}
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.create,
self.req, body=body)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.create,
self.req, body=body)
class ServerExternalEventsNoLegacyNoScopeTest(
ServerExternalEventsPolicyTest):
"""Test Server External Events API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
class ServerExternalEventsScopeTypePolicyTest(ServerExternalEventsPolicyTest):
@ -79,23 +79,12 @@ class ServerExternalEventsScopeTypePolicyTest(ServerExternalEventsPolicyTest):
super(ServerExternalEventsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# Check that admin is able to create the server external events.
self.admin_authorized_contexts = [
self.system_admin_context,
]
# Check that non-admin is not able to create the server
# external events.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
# With scope checks, system admin is not allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class ServerExternalEventsNoLegacyPolicyTest(
class ServerExternalEventsScopeTypeNoLegacyPolicyTest(
ServerExternalEventsScopeTypePolicyTest):
"""Test Server External Events APIs policies with system scope enabled,
and no more deprecated rules.

View File

@ -45,42 +45,18 @@ class ServerMigrationsPolicyTest(base.BasePolicyTest):
vm_state=vm_states.ACTIVE)
self.mock_get.return_value = self.instance
# Check that admin is able to perform operations
# With legacy rule, any admin is able to perform operations
# for server migrations.
self.admin_authorized_contexts = [
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.project_admin_context]
# Check that non-admin is not able to perform operations
# for server migrations.
self.admin_unauthorized_contexts = [
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
# Check that system-reader are able to perform operations
# for server migrations.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.legacy_admin_context,
self.project_admin_context]
# Check that non-system-reader are not able to perform operations
# for server migrations.
self.reader_unauthorized_contexts = [
self.system_foo_context, self.other_project_member_context,
self.project_foo_context, self.project_member_context,
self.project_reader_context,
self.other_project_reader_context,
]
@mock.patch('nova.compute.api.API.get_migrations_in_progress_by_instance')
def test_list_server_migrations_policy(self, mock_get):
rule_name = policies.POLICY_ROOT % 'index'
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.index,
self.req, self.instance.uuid)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.index,
self.req, self.instance.uuid)
@mock.patch('nova.api.openstack.compute.server_migrations.output')
@mock.patch('nova.compute.api.API.get_migration_by_id_and_instance')
@ -90,27 +66,37 @@ class ServerMigrationsPolicyTest(base.BasePolicyTest):
migration_type='live-migration',
status='running',
)
self.common_policy_check(self.reader_authorized_contexts,
self.reader_unauthorized_contexts,
rule_name, self.controller.show,
self.req, self.instance.uuid, 11111)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.show,
self.req, self.instance.uuid, 11111)
@mock.patch('nova.compute.api.API.live_migrate_abort')
def test_delete_server_migrations_policy(self, mock_delete):
rule_name = policies.POLICY_ROOT % 'delete'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller.delete,
self.req, self.instance.uuid, 11111)
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller.delete,
self.req, self.instance.uuid, 11111)
@mock.patch('nova.compute.api.API.live_migrate_force_complete')
def test_force_delete_server_migrations_policy(self, mock_force):
rule_name = policies.POLICY_ROOT % 'force_complete'
self.common_policy_check(self.admin_authorized_contexts,
self.admin_unauthorized_contexts,
rule_name, self.controller._force_complete,
self.req, self.instance.uuid, 11111,
body={"force_complete": None})
self.common_policy_auth(self.project_admin_authorized_contexts,
rule_name, self.controller._force_complete,
self.req, self.instance.uuid, 11111,
body={"force_complete": None})
class ServerMigrationsNoLegacyNoScopeTest(ServerMigrationsPolicyTest):
"""Test Server Migrations API policies with deprecated rules
disabled, but scope checking still disabled.
"""
without_deprecated_rules = True
def setUp(self):
super(ServerMigrationsNoLegacyNoScopeTest, self).setUp()
self.project_admin_authorized_contexts = [
self.project_admin_context]
class ServerMigrationsScopeTypePolicyTest(ServerMigrationsPolicyTest):
@ -126,48 +112,27 @@ class ServerMigrationsScopeTypePolicyTest(ServerMigrationsPolicyTest):
def setUp(self):
super(ServerMigrationsScopeTypePolicyTest, self).setUp()
self.flags(enforce_scope=True, group="oslo_policy")
# With scope enabled, system admin is not allowed.
self.project_admin_authorized_contexts = [
self.legacy_admin_context, self.project_admin_context]
class ServerMigrationsNoLegacyPolicyTest(ServerMigrationsScopeTypePolicyTest):
class ServerMigrationsScopeTypeNoLegacyPolicyTest(
ServerMigrationsScopeTypePolicyTest):
"""Test Server Migrations APIs policies with system scope enabled,
and no more deprecated rules.
"""
without_deprecated_rules = True
def setUp(self):
super(ServerMigrationsNoLegacyPolicyTest, self).setUp()
super(ServerMigrationsScopeTypeNoLegacyPolicyTest, self).setUp()
# Check that admin is able to perform operations
# for server migrations.
self.admin_authorized_contexts = [
self.system_admin_context
]
# Check that non-admin is not able to perform operations
# for server migrations.
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context, self.project_member_context,
self.project_reader_context, self.project_foo_context,
self.other_project_member_context,
self.other_project_reader_context,
]
# Check that system reader is able to perform operations
# for server migrations.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context]
# Check that non-system-reader is not able to perform operations
# for server migrations.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.project_admin_context,
self.system_foo_context, self.project_member_context,
self.other_project_member_context,
self.other_project_reader_context,
self.project_foo_context, self.project_reader_context
]
self.project_admin_authorized_contexts = [self.project_admin_context]
class ServerMigrationsOverridePolicyTest(ServerMigrationsNoLegacyPolicyTest):
class ServerMigrationsOverridePolicyTest(
ServerMigrationsScopeTypeNoLegacyPolicyTest):
"""Test Server Migrations APIs policies with system and project scoped
but default to system roles only are allowed for project roles
if override by operators. This test is with system scope enable
@ -181,38 +146,16 @@ class ServerMigrationsOverridePolicyTest(ServerMigrationsNoLegacyPolicyTest):
rule_force = policies.POLICY_ROOT % 'force_complete'
rule_delete = policies.POLICY_ROOT % 'delete'
# NOTE(gmann): override the rule to project member and verify it
# work as policy is system and projct scoped.
# work as policy is project scoped.
self.policy.set_rules({
rule_show: base_policy.PROJECT_READER_OR_SYSTEM_READER,
rule_list: base_policy.PROJECT_READER_OR_SYSTEM_READER,
rule_force: base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN,
rule_delete: base_policy.PROJECT_MEMBER_OR_SYSTEM_ADMIN},
rule_show: base_policy.PROJECT_READER,
rule_list: base_policy.PROJECT_READER,
rule_force: base_policy.PROJECT_READER,
rule_delete: base_policy.PROJECT_READER},
overwrite=False)
# Check that system admin or project scoped role as override above
# Check that project reader as override above
# is able to migrate the server
self.admin_authorized_contexts = [
self.system_admin_context,
self.project_admin_context, self.project_member_context]
# Check that non-system admin or project role is not able to
# migrate the server
self.admin_unauthorized_contexts = [
self.legacy_admin_context, self.system_member_context,
self.system_reader_context, self.system_foo_context,
self.other_project_member_context,
self.project_foo_context, self.project_reader_context,
self.other_project_reader_context,
]
# Check that system reader is able to perform operations
# for server migrations.
self.reader_authorized_contexts = [
self.system_admin_context, self.system_member_context,
self.system_reader_context, self.project_admin_context,
self.project_member_context, self.project_reader_context]
# Check that non-system-reader is not able to perform operations
# for server migrations.
self.reader_unauthorized_contexts = [
self.legacy_admin_context, self.system_foo_context,
self.other_project_member_context, self.project_foo_context,
self.other_project_reader_context,
]
self.project_admin_authorized_contexts = [
self.project_admin_context, self.project_member_context,
self.project_reader_context]