diff --git a/etc/policy.json b/etc/policy.json index 6cbb374febf..c975189796c 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -12,6 +12,8 @@ "extension:router:view": [["rule:regular_user"]], "extension:router:set": [["rule:admin_only"]], + "extension:router:add_router_interface": [["rule:admin_only"]], + "extension:router:remove_router_interface": [["rule:admin_or_owner"]], "subnets:private:read": [["rule:admin_or_owner"]], "subnets:private:write": [["rule:admin_or_owner"]], diff --git a/quantum/db/l3_db.py b/quantum/db/l3_db.py index f65fca81bf1..980fb0e5d67 100644 --- a/quantum/db/l3_db.py +++ b/quantum/db/l3_db.py @@ -274,12 +274,19 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): pass def add_router_interface(self, context, router_id, interface_info): - # make sure router exists - will raise if not - self._get_router(context, router_id) + # make sure router exists + router = self._get_router(context, router_id) if not interface_info: msg = "Either subnet_id or port_id must be specified" raise q_exc.BadRequest(resource='router', msg=msg) + try: + policy.enforce(context, + "extension:router:add_router_interface", + self._make_router_dict(router)) + except q_exc.PolicyNotAuthorized: + raise l3.RouterNotFound(router_id=router_id) + if 'port_id' in interface_info: if 'subnet_id' in interface_info: msg = "cannot specify both subnet-id and port-id" @@ -327,6 +334,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): def remove_router_interface(self, context, router_id, interface_info): # make sure router exists router = self._get_router(context, router_id) + try: + policy.enforce(context, + "extension:router:remove_router_interface", + self._make_router_dict(router)) + except q_exc.PolicyNotAuthorized: + raise l3.RouterNotFound(router_id=router_id) if not interface_info: msg = "Either subnet_id or port_id must be specified" diff --git a/quantum/tests/unit/test_l3_plugin.py b/quantum/tests/unit/test_l3_plugin.py index b96dffe03f1..fae16af4122 100644 --- a/quantum/tests/unit/test_l3_plugin.py +++ b/quantum/tests/unit/test_l3_plugin.py @@ -329,8 +329,9 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): return self.deserialize('json', res) @contextlib.contextmanager - def router(self, name='router1', admin_status_up=True, fmt='json'): - res = self._create_router(fmt, _uuid(), name=name, + def router(self, name='router1', admin_status_up=True, + fmt='json', tenant_id=_uuid()): + res = self._create_router(fmt, tenant_id, name=name, admin_state_up=admin_status_up) router = self.deserialize(fmt, res) yield router @@ -387,6 +388,41 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): body = self._show('ports', r_port_id, expected_code=exc.HTTPNotFound.code) + def test_router_add_interface_subnet_with_bad_tenant(self): + with mock.patch('quantum.context.Context.to_dict') as tdict: + tenant_id = _uuid() + admin_context = {'roles': ['admin']} + tenant_context = {'tenant_id': 'bad_tenant', + 'roles': []} + tdict.return_value = admin_context + with self.router(tenant_id=tenant_id) as r: + with self.network(tenant_id=tenant_id) as n: + with self.subnet(network=n) as s: + tdict.return_value = tenant_context + err_code = exc.HTTPNotFound.code + self._router_interface_action('add', + r['router']['id'], + s['subnet']['id'], + None, + err_code) + tdict.return_value = admin_context + body = self._router_interface_action('add', + r['router']['id'], + s['subnet']['id'], + None) + self.assertTrue('port_id' in body) + tdict.return_value = tenant_context + self._router_interface_action('remove', + r['router']['id'], + s['subnet']['id'], + None, + err_code) + tdict.return_value = admin_context + body = self._router_interface_action('remove', + r['router']['id'], + s['subnet']['id'], + None) + def test_router_add_interface_port(self): with self.router() as r: with self.port(no_delete=True) as p: @@ -407,6 +443,42 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): None, p['port']['id']) + def test_router_add_interface_port_bad_tenant(self): + with mock.patch('quantum.context.Context.to_dict') as tdict: + tenant_id = _uuid() + admin_context = {'roles': ['admin']} + tenant_context = {'tenant_id': 'bad_tenant', + 'roles': []} + tdict.return_value = admin_context + with self.router() as r: + with self.port(no_delete=True) as p: + tdict.return_value = tenant_context + err_code = exc.HTTPNotFound.code + body = self._router_interface_action('add', + r['router']['id'], + None, + p['port']['id'], + err_code) + tdict.return_value = admin_context + body = self._router_interface_action('add', + r['router']['id'], + None, + p['port']['id']) + + tdict.return_value = tenant_context + # clean-up + self._router_interface_action('remove', + r['router']['id'], + None, + p['port']['id'], + err_code) + + tdict.return_value = admin_context + self._router_interface_action('remove', + r['router']['id'], + None, + p['port']['id']) + def test_router_add_interface_dup_subnet1(self): with self.router() as r: with self.subnet() as s: