Add advsvc role to neutron policy file

Add in a default "advsvc" user and the logic in the Neutron policy
infrastructure which will allow this user to create/get/update/delete
ports on other tenants networks, as well as view other tenants
networks. This is for the use case of letting advanced services have
a user to put ports on other tenants networks. By default, we do not
define any roles for the policy "context_is_advsvc", but rely on
operators to specify the likely value of "role advsvc".

DocImpact

Closes-Bug: #1331836

Change-Id: I94cb3383eb1fed793934719603f888dbbdbbd85a
Co-Authored-By: Susanne Balle <sleipnir012@gmail.com>
This commit is contained in:
Kyle Mestery 2014-06-18 11:04:52 +00:00 committed by Kyle Mestery
parent 20f8f9422b
commit d4f00659eb
7 changed files with 84 additions and 26 deletions

View File

@ -1,6 +1,7 @@
{
"context_is_admin": "role:admin",
"admin_or_owner": "rule:context_is_admin or tenant_id:%(tenant_id)s",
"context_is_advsvc": "role:advsvc",
"admin_or_network_owner": "rule:context_is_admin or tenant_id:%(network:tenant_id)s",
"admin_only": "rule:context_is_admin",
"regular_user": "",
@ -15,7 +16,7 @@
"delete_subnet": "rule:admin_or_network_owner",
"create_network": "",
"get_network": "rule:admin_or_owner or rule:shared or rule:external",
"get_network": "rule:admin_or_owner or rule:shared or rule:external or rule:context_is_advsvc",
"get_network:router:external": "rule:regular_user",
"get_network:segments": "rule:admin_only",
"get_network:provider:network_type": "rule:admin_only",
@ -38,25 +39,25 @@
"delete_network": "rule:admin_or_owner",
"create_port": "",
"create_port:mac_address": "rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:admin_or_network_owner",
"create_port:port_security_enabled": "rule:admin_or_network_owner",
"create_port:mac_address": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:port_security_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"create_port:binding:host_id": "rule:admin_only",
"create_port:binding:profile": "rule:admin_only",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner",
"get_port": "rule:admin_or_owner",
"create_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:vif_details": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"update_port": "rule:admin_or_owner",
"update_port:fixed_ips": "rule:admin_or_network_owner",
"update_port:port_security_enabled": "rule:admin_or_network_owner",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:port_security_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:binding:host_id": "rule:admin_only",
"update_port:binding:profile": "rule:admin_only",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner",
"delete_port": "rule:admin_or_owner",
"update_port:mac_learning_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_router:ha": "rule:admin_only",
"create_router": "rule:regular_user",

View File

@ -652,7 +652,7 @@ class Controller(object):
def _validate_network_tenant_ownership(self, request, resource_item):
# TODO(salvatore-orlando): consider whether this check can be folded
# in the policy engine
if (request.context.is_admin or
if (request.context.is_admin or request.context.is_advsvc or
self._resource not in ('port', 'subnet')):
return
network = self._plugin.get_network(

View File

@ -65,6 +65,7 @@ class ContextBase(common_context.RequestContext):
self.timestamp = timestamp
self._session = None
self.roles = roles or []
self.is_advsvc = policy.check_is_advsvc(self)
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
elif self.is_admin and load_admin_roles:

View File

@ -70,13 +70,18 @@ class CommonDbMixin(object):
"""
return weakref.proxy(self)
def model_query_scope(self, context, model):
# NOTE(jkoelker) non-admin queries are scoped to their tenant_id
# NOTE(salvatore-orlando): unless the model allows for shared objects
# NOTE(mestery): Or the user has the advsvc role
return ((not context.is_admin and hasattr(model, 'tenant_id')) and
(not context.is_advsvc and hasattr(model, 'tenant_id')))
def _model_query(self, context, model):
query = context.session.query(model)
# define basic filter condition for model query
# NOTE(jkoelker) non-admin queries are scoped to their tenant_id
# NOTE(salvatore-orlando): unless the model allows for shared objects
query_filter = None
if not context.is_admin and hasattr(model, 'tenant_id'):
if self.model_query_scope(context, model):
if hasattr(model, 'shared'):
query_filter = ((model.tenant_id == context.tenant_id) |
(model.shared == sql.true()))

View File

@ -57,8 +57,9 @@ class External_net_db_mixin(object):
def _network_filter_hook(self, context, original_model, conditions):
if conditions is not None and not hasattr(conditions, '__iter__'):
conditions = (conditions, )
# Apply the external network filter only in non-admin context
if not context.is_admin and hasattr(original_model, 'tenant_id'):
# Apply the external network filter only in non-admin and non-advsvc
# context
if self.model_query_scope(context, original_model):
conditions = expr.or_(ExternalNetwork.network_id != expr.null(),
*conditions)
return conditions

View File

@ -39,6 +39,7 @@ LOG = log.getLogger(__name__)
_POLICY_PATH = None
_POLICY_CACHE = {}
ADMIN_CTX_POLICY = 'context_is_admin'
ADVSVC_CTX_POLICY = 'context_is_advsvc'
# Maps deprecated 'extension' policies to new-style policies
DEPRECATED_POLICY_MAP = {
'extension:provider_network':
@ -416,6 +417,19 @@ def check_is_admin(context):
return policy.check(admin_policy, target, credentials)
def check_is_advsvc(context):
"""Verify context has advsvc rights according to policy settings."""
init()
# the target is user-self
credentials = context.to_dict()
target = credentials
# Backward compatibility: if ADVSVC_CTX_POLICY is not
# found, default to validating role:advsvc
advsvc_policy = (ADVSVC_CTX_POLICY in policy._rules
and ADVSVC_CTX_POLICY or 'role:advsvc')
return policy.check(advsvc_policy, target, credentials)
def _extract_roles(rule, roles):
if isinstance(rule, policy.RoleCheck):
roles.append(rule.match.lower())

View File

@ -40,7 +40,7 @@ class PolicyFileTestCase(base.BaseTestCase):
policy.reset()
self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake', is_admin=False)
self.target = {}
self.target = {'tenant_id': 'fake'}
self.tempdir = self.useFixture(fixtures.TempDir())
def test_modified_policy_reloads(self):
@ -62,6 +62,7 @@ class PolicyFileTestCase(base.BaseTestCase):
# sleep(1)
policy._POLICY_CACHE = {}
policy.init()
self.target = {'tenant_id': 'fake_tenant'}
self.assertRaises(exceptions.PolicyNotAuthorized,
policy.enforce,
self.context,
@ -233,6 +234,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"context_is_admin": "role:admin",
"context_is_advsvc": "role:advsvc",
"admin_or_network_owner": "rule:context_is_admin or "
"tenant_id:%(network:tenant_id)s",
"admin_or_owner": ("rule:context_is_admin or "
@ -247,11 +249,13 @@ class NeutronPolicyTestCase(base.BaseTestCase):
"create_network:shared": "rule:admin_only",
"update_network": '@',
"update_network:shared": "rule:admin_only",
"get_network": "rule:admin_or_owner or "
"rule:shared or "
"rule:external",
"create_port:mac": "rule:admin_or_network_owner",
"get_network": "rule:admin_or_owner or rule:shared or "
"rule:external or rule:context_is_advsvc",
"create_port:mac": "rule:admin_or_network_owner or "
"rule:context_is_advsvc",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
"get_port": "rule:admin_or_owner or rule:context_is_advsvc",
"delete_port": "rule:admin_or_owner or rule:context_is_advsvc",
"create_something": "rule:admin_or_owner",
"create_something:attr": "rule:admin_or_owner",
"create_something:attr:sub_attr_1": "rule:admin_or_owner",
@ -282,9 +286,9 @@ class NeutronPolicyTestCase(base.BaseTestCase):
fake_manager_instance = fake_manager.return_value
fake_manager_instance.plugin = plugin_klass()
def _test_action_on_attr(self, context, action, attr, value,
def _test_action_on_attr(self, context, action, obj, attr, value,
exception=None, **kwargs):
action = "%s_network" % action
action = "%s_%s" % (action, obj)
target = {'tenant_id': 'the_owner', attr: value}
if kwargs:
target.update(kwargs)
@ -298,7 +302,14 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def _test_nonadmin_action_on_attr(self, action, attr, value,
exception=None, **kwargs):
user_context = context.Context('', "user", roles=['user'])
self._test_action_on_attr(user_context, action, attr,
self._test_action_on_attr(user_context, action, "network", attr,
value, exception, **kwargs)
def _test_advsvc_action_on_attr(self, action, obj, attr, value,
exception=None, **kwargs):
user_context = context.Context('', "user",
roles=['user', 'advsvc'])
self._test_action_on_attr(user_context, action, obj, attr,
value, exception, **kwargs)
def test_nonadmin_write_on_private_fails(self):
@ -313,6 +324,31 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self._test_nonadmin_action_on_attr('create', 'shared', True,
exceptions.PolicyNotAuthorized)
def test_advsvc_get_network_works(self):
self._test_advsvc_action_on_attr('get', 'network', 'shared', False)
def test_advsvc_create_network_fails(self):
self._test_advsvc_action_on_attr('create', 'network', 'shared', False,
exceptions.PolicyNotAuthorized)
def test_advsvc_create_port_works(self):
self._test_advsvc_action_on_attr('create', 'port:mac', 'shared', False)
def test_advsvc_get_port_works(self):
self._test_advsvc_action_on_attr('get', 'port', 'shared', False)
def test_advsvc_update_port_works(self):
kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
self._test_advsvc_action_on_attr('update', 'port', 'shared', True,
**kwargs)
def test_advsvc_delete_port_works(self):
self._test_advsvc_action_on_attr('delete', 'port', 'shared', False)
def test_advsvc_create_subnet_fails(self):
self._test_advsvc_action_on_attr('create', 'subnet', 'shared', False,
exceptions.PolicyNotAuthorized)
def test_nonadmin_read_on_shared_succeeds(self):
self._test_nonadmin_action_on_attr('get', 'shared', True)