diff --git a/quark/plugin_modules/ports.py b/quark/plugin_modules/ports.py index 6570fa7..d0c1342 100644 --- a/quark/plugin_modules/ports.py +++ b/quark/plugin_modules/ports.py @@ -39,9 +39,12 @@ STRATEGY = network_strategy.STRATEGY # HACK(amir): RM9305: do not allow a tenant to associate a network to a port # that does not belong to them unless it is publicnet or servicenet -def _raise_if_unauthorized(tenant_id, net): +# NOTE(blogan): allow advanced services, such as lbaas, the ability +# to associate a network to a port that does not belong to them +def _raise_if_unauthorized(context, net): if (not STRATEGY.is_provider_network(net["id"]) and - net["tenant_id"] != tenant_id): + net["tenant_id"] != context.tenant_id and + not context.is_advsvc): raise exceptions.NotAuthorized() @@ -171,7 +174,7 @@ def create_port(context, port): if not net: raise exceptions.NetworkNotFound(net_id=net_id) - _raise_if_unauthorized(context.tenant_id, net) + _raise_if_unauthorized(context, net) # NOTE (Perkins): If a device_id is given, try to prevent multiple ports # from being created for a device already attached to the network diff --git a/quark/tests/functional/test_ports.py b/quark/tests/functional/test_ports.py index e4f2fb2..33598f0 100644 --- a/quark/tests/functional/test_ports.py +++ b/quark/tests/functional/test_ports.py @@ -497,3 +497,70 @@ class QuarkPortFixedIPOperations(BaseFunctionalTest): for ip in result['fixed_ips']: self.assertTrue(ip in fixed_ips, '%s not in %s' % (ip, expected['fixed_ips'])) + + +class QuarkAdvancedServiceCreatePort(BaseFunctionalTest): + + def __init__(self, *args, **kwargs): + super(QuarkAdvancedServiceCreatePort, self).__init__(*args, **kwargs) + cidr = "192.168.10.0/24" + ip_network = netaddr.IPNetwork(cidr) + network = dict(name="public", + network_plugin="BASE", + ipam_strategy="ANY") + self.net_info = {"network": network} + subnet_v4 = dict(ip_version=4, next_auto_assign_ip=2, + cidr=cidr, first_ip=ip_network.first, + last_ip=ip_network.last, ip_policy=None) + self.sub_info = {"subnet": subnet_v4} + + @contextlib.contextmanager + def _stubs(self, network_info, subnet_info): + with contextlib.nested( + mock.patch("neutron.common.rpc.get_notifier"), + mock.patch("neutron.quota.QUOTAS.limit_check")): + mac = {'mac_address_range': dict(cidr="AA:BB:CC")} + self.context.is_admin = True + macrng_api.create_mac_address_range(self.context, mac) + self.context.is_admin = False + # Setting context's tenant_id because this network needs to belong + # to a regular tenant, and the network create method does not + # care about the tenant_id on the network + self.context.tenant_id = 'joetenant' + network = network_api.create_network(self.context, network_info) + subnet_info['subnet']['network_id'] = network['id'] + subnet = subnet_api.create_subnet(self.context, subnet_info) + self.context.tenant_id = 'advsvc' + yield network, subnet + + def test_can_create_port_with_adv_svc(self): + with self._stubs(self.net_info, self.sub_info) as (network, subnet): + port_info = {'port': {'network_id': network['id'], + 'tenant_id': 'someoneelse'}} + self.context.is_admin = True + self.context.is_advsvc = True + port_mod = port_api.create_port(self.context, port_info) + self.assertIsNotNone(port_mod['id']) + self.assertNotEqual(port_mod['tenant_id'], network['tenant_id']) + + def test_cant_create_port_without_adv_svc(self): + with self._stubs(self.net_info, self.sub_info) as (network, subnet): + port_info = {'port': {'network_id': network['id'], + 'tenant_id': 'someoneelse'}} + self.context.is_admin = True + self.context.is_advsvc = False + self.assertRaises(q_exc.NotAuthorized, + port_api.create_port, self.context, port_info) + + def test_cant_create_port_without_admin(self): + with self._stubs(self.net_info, self.sub_info) as (network, subnet): + port_info = {'port': {'network_id': network['id'], + 'tenant_id': 'someoneelse'}} + self.context.is_admin = False + self.context.is_advsvc = True + # This is NetworkNotFound because prior to doing the authorized + # check, quark will first attempt to retrieve the network but + # since networks are scoped by tenant when it is not an admin, + # it will not be found + self.assertRaises(q_exc.NetworkNotFound, + port_api.create_port, self.context, port_info) diff --git a/quark/tests/plugin_modules/test_ports.py b/quark/tests/plugin_modules/test_ports.py index f821288..6ac6a25 100644 --- a/quark/tests/plugin_modules/test_ports.py +++ b/quark/tests/plugin_modules/test_ports.py @@ -1901,3 +1901,97 @@ class TestQuarkPortUpdateFiltering(test_quark_plugin.TestQuarkPlugin): mac_address=new_port["port"]["mac_address"], device_id=new_port["port"]["device_id"], security_groups=[]) + + +class TestQuarkPortCreateAsAdvancedService(test_quark_plugin.TestQuarkPlugin): + + @contextlib.contextmanager + def _stubs(self, port=None, network=None, addr=None, mac=None): + if network: + network["network_plugin"] = "BASE" + network["ipam_strategy"] = "ANY" + port_model = models.Port() + port_model.update(port['port']) + port_models = port_model + db_mod = "quark.db.api" + ipam = "quark.ipam.QuarkIpam" + with contextlib.nested( + mock.patch("%s.port_create" % db_mod), + mock.patch("%s.network_find" % db_mod), + mock.patch("%s.port_find" % db_mod), + mock.patch("%s.allocate_ip_address" % ipam), + mock.patch("%s.allocate_mac_address" % ipam), + mock.patch("%s.port_count_all" % db_mod), + ) as (port_create, net_find, port_find, alloc_ip, alloc_mac, + port_count): + port_create.return_value = port_models + net_find.return_value = network + port_find.return_value = None + alloc_ip.return_value = addr + alloc_mac.return_value = mac + port_count.return_value = 0 + yield port_create + + def test_advanced_service_create_port_other_tenant_network(self): + """NCP-1819 - Advanced service can create port on any network + + Tests when an advanced service creating a port on another tenant's + network does not fail AND the tenant_id is that of the context's. + """ + self.context.is_advsvc = True + network_id = "foobar" + network = dict(id=network_id, + tenant_id="other_tenant") + ip = dict() + mac = dict(address="AA:BB:CC:DD:EE:FF") + port_1 = dict(port=dict(mac_address="AA:BB:CC:DD:EE:00", + network_id=network_id, + tenant_id=self.context.tenant_id, device_id=5, + name="Fake")) + + with self._stubs(port=port_1, network=network, addr=ip, mac=mac): + port = self.plugin.create_port(self.context, port_1) + self.assertEqual(self.context.tenant_id, port['tenant_id']) + + def test_advsvc_can_create_port_with_another_tenant_id(self): + """NCP-1819 - Advanced Service can create port on another tenant's net + + Tests that an advanced service can create a port on another tenant's + network. + """ + another_tenant_id = 'im-another-tenant' + self.context.is_advsvc = True + network_id = "foobar" + network = dict(id=network_id, + tenant_id="other_tenant") + ip = dict() + mac = dict(address="AA:BB:CC:DD:EE:FF") + port_1 = dict(port=dict(mac_address="AA:BB:CC:DD:EE:00", + network_id=network_id, + tenant_id=another_tenant_id, device_id=5, + name="Fake")) + + with self._stubs(port=port_1, network=network, addr=ip, mac=mac): + port = self.plugin.create_port(self.context, port_1) + self.assertEqual(another_tenant_id, port['tenant_id']) + + def test_non_advsvc_cannot_create_port_another_network(self): + """NCP-1819 - Normal tenant port create should fail another's network + + Tests that a normal tenant creating a port on another tenant's network + should not be allowed and throws an exception. + """ + normal_tenant_id = "other_tenant" + network_id = "foobar" + network = dict(id=network_id, + tenant_id=normal_tenant_id) + ip = dict() + mac = dict(address="AA:BB:CC:DD:EE:FF") + port_1 = dict(port=dict(mac_address="AA:BB:CC:DD:EE:00", + network_id=network_id, + tenant_id=normal_tenant_id, device_id=5, + name="Fake")) + + with self._stubs(port=port_1, network=network, addr=ip, mac=mac): + with self.assertRaises(exceptions.NotAuthorized): + self.plugin.create_port(self.context, port_1)