RBAC: Fix port query and deletion for network owner
Network owner should be able to get all ports and delete ports on network as policy allowed. But current code fails to support this. Current model query for Port is still based on tenant_id, it forgets to check for network owner when context tenant_id is not port owner. For port_delete action, policy will generate checking rules for port attributes, such as: rule:delete_port:binding:vif_details rule:delete_port:binding:vif_type This doesn't make sense, only single policy rule "rule:delete_port" is enough to check. This patch fixes this issue. Co-Authored-By: Kevin Benton <kevinbenton@buttewifi.com> Change-Id: I55328cb43207654b9bb4cfb732923982d020ab0a Closes-Bug: #1498790
This commit is contained in:
parent
cab6c7c67a
commit
67abf5f9f0
@ -707,12 +707,10 @@ class Controller(object):
|
||||
network_owner = network['tenant_id']
|
||||
|
||||
if network_owner != resource_item['tenant_id']:
|
||||
msg = _("Tenant %(tenant_id)s not allowed to "
|
||||
"create %(resource)s on this network")
|
||||
raise webob.exc.HTTPForbidden(msg % {
|
||||
"tenant_id": resource_item['tenant_id'],
|
||||
"resource": self._resource,
|
||||
})
|
||||
# NOTE(kevinbenton): we raise a 404 to hide the existence of the
|
||||
# network from the tenant since they don't have access to it.
|
||||
msg = _('The resource could not be found.')
|
||||
raise webob.exc.HTTPNotFound(msg)
|
||||
|
||||
|
||||
def create_resource(collection, resource, plugin, params, allow_bulk=False,
|
||||
|
@ -312,3 +312,11 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
|
||||
return [{'subnet_id': ip["subnet_id"],
|
||||
'ip_address': ip["ip_address"]}
|
||||
for ip in ips]
|
||||
|
||||
def _port_filter_hook(self, context, original_model, conditions):
|
||||
# Apply the port filter only in non-admin and non-advsvc context
|
||||
if self.model_query_scope(context, original_model):
|
||||
conditions |= (
|
||||
(context.tenant_id == models_v2.Network.tenant_id) &
|
||||
(models_v2.Network.id == models_v2.Port.network_id))
|
||||
return conditions
|
||||
|
@ -1424,3 +1424,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
|
||||
device_id=device_id)
|
||||
if tenant_id != router['tenant_id']:
|
||||
raise n_exc.DeviceIDNotOwnedByTenant(device_id=device_id)
|
||||
|
||||
db_base_plugin_common.DbBasePluginCommon.register_model_query_hook(
|
||||
models_v2.Port,
|
||||
"port",
|
||||
None,
|
||||
'_port_filter_hook',
|
||||
None)
|
||||
|
@ -417,8 +417,6 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
|
||||
def delete_port(self, context, port_id):
|
||||
query = (context.session.query(models_v2.Port).
|
||||
enable_eagerloads(False).filter_by(id=port_id))
|
||||
if not context.is_admin:
|
||||
query = query.filter_by(tenant_id=context.tenant_id)
|
||||
# Use of the ORM mapper is needed for ensuring appropriate resource
|
||||
# tracking; otherwise SQL Alchemy events won't be triggered.
|
||||
# For more info check 'caveats' in doc/source/devref/quota.rst
|
||||
|
@ -60,10 +60,13 @@ def refresh(policy_file=None):
|
||||
|
||||
|
||||
def get_resource_and_action(action, pluralized=None):
|
||||
"""Extract resource and action (write, read) from api operation."""
|
||||
"""Return resource and enforce_attr_based_check(boolean) per
|
||||
resource and action extracted from api operation.
|
||||
"""
|
||||
data = action.split(':', 1)[0].split('_', 1)
|
||||
resource = pluralized or ("%ss" % data[-1])
|
||||
return (resource, data[0] != 'get')
|
||||
enforce_attr_based_check = data[0] not in ('get', 'delete')
|
||||
return (resource, enforce_attr_based_check)
|
||||
|
||||
|
||||
def set_rules(policies, overwrite=True):
|
||||
@ -150,9 +153,9 @@ def _build_match_rule(action, target, pluralized):
|
||||
(e.g.: create_router:external_gateway_info:network_id)
|
||||
"""
|
||||
match_rule = policy.RuleCheck('rule', action)
|
||||
resource, is_write = get_resource_and_action(action, pluralized)
|
||||
# Attribute-based checks shall not be enforced on GETs
|
||||
if is_write:
|
||||
resource, enforce_attr_based_check = get_resource_and_action(
|
||||
action, pluralized)
|
||||
if enforce_attr_based_check:
|
||||
# assigning to variable with short name for improving readability
|
||||
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
|
||||
if resource in res_map:
|
||||
|
@ -276,11 +276,6 @@ class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
|
||||
@test.attr(type='smoke')
|
||||
@test.idempotent_id('86c3529b-1231-40de-803c-beefbeefbeef')
|
||||
def test_tenant_can_delete_port_on_own_network(self):
|
||||
# TODO(kevinbenton): make adjustments to the db lookup to
|
||||
# make this work.
|
||||
msg = "Non-admin cannot currently delete other's ports."
|
||||
raise self.skipException(msg)
|
||||
# pylint: disable=unreachable
|
||||
net = self.create_network() # owned by self.client
|
||||
self.client.create_rbac_policy(
|
||||
object_type='network', object_id=net['id'],
|
||||
|
@ -1121,6 +1121,29 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
|
||||
self._test_list_resources('port', [port2],
|
||||
neutron_context=n_context)
|
||||
|
||||
def test_list_ports_for_network_owner(self):
|
||||
with self.network(tenant_id='tenant_1') as network:
|
||||
with self.subnet(network) as subnet:
|
||||
with self.port(subnet, tenant_id='tenant_1') as port1,\
|
||||
self.port(subnet, tenant_id='tenant_2') as port2:
|
||||
# network owner request, should return all ports
|
||||
port_res = self._list_ports(
|
||||
'json', set_context=True, tenant_id='tenant_1')
|
||||
port_list = self.deserialize('json', port_res)['ports']
|
||||
port_ids = [p['id'] for p in port_list]
|
||||
self.assertEqual(2, len(port_list))
|
||||
self.assertIn(port1['port']['id'], port_ids)
|
||||
self.assertIn(port2['port']['id'], port_ids)
|
||||
|
||||
# another tenant request, only return ports belong to it
|
||||
port_res = self._list_ports(
|
||||
'json', set_context=True, tenant_id='tenant_2')
|
||||
port_list = self.deserialize('json', port_res)['ports']
|
||||
port_ids = [p['id'] for p in port_list]
|
||||
self.assertEqual(1, len(port_list))
|
||||
self.assertNotIn(port1['port']['id'], port_ids)
|
||||
self.assertIn(port2['port']['id'], port_ids)
|
||||
|
||||
def test_list_ports_with_sort_native(self):
|
||||
if self._skip_native_sorting:
|
||||
self.skipTest("Skip test for not implemented sorting feature")
|
||||
@ -1226,6 +1249,16 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
|
||||
self._show('ports', port['port']['id'],
|
||||
expected_code=webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_delete_port_by_network_owner(self):
|
||||
with self.network(tenant_id='tenant_1') as network:
|
||||
with self.subnet(network) as subnet:
|
||||
with self.port(subnet, tenant_id='tenant_2') as port:
|
||||
self._delete(
|
||||
'ports', port['port']['id'],
|
||||
neutron_context=context.Context('', 'tenant_1'))
|
||||
self._show('ports', port['port']['id'],
|
||||
expected_code=webob.exc.HTTPNotFound.code)
|
||||
|
||||
def test_update_port(self):
|
||||
with self.port() as port:
|
||||
data = {'port': {'admin_state_up': False}}
|
||||
|
Loading…
x
Reference in New Issue
Block a user