Require admin context for interfaces on ext network

Currently any user can attach an interface to a neutron
external network, if the neutron plugin supports the port
binding extension.
In this case, nova will create neutron ports using the admin
client, thus bypassing neutron authZ checks for creating ports
on external networks.

This patch adds a check in nova to verify the API request has an
admin context when a request for an interface is made on a
neutron external network.

Conflicts:
	nova/exception.py

Change-Id: I5fb0bdcbf19eb82746ea3b192c1f65899bfb3c0b
Closes-Bug: 1284718
(cherry picked from commit 7d1b4117fd)
This commit is contained in:
Salvatore Orlando 2014-04-03 14:54:11 -07:00 committed by Aaron Rosen
parent ad3b509e7d
commit 1b69111f07
3 changed files with 41 additions and 6 deletions

View File

@ -593,6 +593,11 @@ class NetworkAmbiguous(Invalid):
"network ID(s) to select which one(s) to connect to,")
class ExternalNetworkAttachForbidden(NotAuthorized):
msg_fmt = _("It is not allowed to create an interface on "
"external network %(network_uuid)s")
class DatastoreNotFound(NotFound):
msg_fmt = _("Could not find the datastore reference(s) which the VM uses.")

View File

@ -144,6 +144,15 @@ class API(base.Base):
nets,
net_ids)
if not context.is_admin:
for net in nets:
# Perform this check here rather than in validate_networks to
# ensure the check is performed everytime allocate_for_instance
# is invoked
if net.get('router:external'):
raise exception.ExternalNetworkAttachForbidden(
network_uuid=net['id'])
return nets
def _create_port(self, port_client, instance, network_id, port_req_body,

View File

@ -180,8 +180,13 @@ class TestNeutronv2Base(test.TestCase):
self.nets4 = [{'id': 'his_netid4',
'name': 'his_netname4',
'tenant_id': 'his_tenantid'}]
self.nets = [self.nets1, self.nets2, self.nets3, self.nets4]
# A network request with external networks
self.nets5 = self.nets1 + [{'id': 'the-external-one',
'name': 'out-of-this-world',
'router:external': True,
'tenant_id': 'should-be-an-admin'}]
self.nets = [self.nets1, self.nets2, self.nets3,
self.nets4, self.nets5]
self.port_address = '10.0.1.2'
self.port_data1 = [{'network_id': 'my_netid1',
@ -1180,7 +1185,8 @@ class TestNeutronv2(TestNeutronv2Base):
api.get_fixed_ip_by_address,
self.context, address)
def _get_available_networks(self, prv_nets, pub_nets, req_ids=None):
def _get_available_networks(self, prv_nets, pub_nets,
req_ids=None, context=None):
api = neutronapi.API()
nets = prv_nets + pub_nets
if req_ids:
@ -1197,9 +1203,10 @@ class TestNeutronv2(TestNeutronv2Base):
**mox_list_params).AndReturn({'networks': pub_nets})
self.mox.ReplayAll()
rets = api._get_available_networks(self.context,
self.instance['project_id'],
req_ids)
rets = api._get_available_networks(
context if context else self.context,
self.instance['project_id'],
req_ids)
self.assertEqual(rets, nets)
def test_get_available_networks_all_private(self):
@ -1218,6 +1225,20 @@ class TestNeutronv2(TestNeutronv2Base):
req_ids = [net['id'] for net in (self.nets3[0], self.nets3[-1])]
self._get_available_networks(prv_nets, pub_nets, req_ids)
def test_get_available_networks_with_externalnet_fails(self):
req_ids = [net['id'] for net in self.nets5]
self.assertRaises(
exception.ExternalNetworkAttachForbidden,
self._get_available_networks,
self.nets5, pub_nets=[], req_ids=req_ids)
def test_get_available_networks_with_externalnet_admin_ctx(self):
admin_ctx = context.RequestContext('userid', 'my_tenantid',
is_admin=True)
req_ids = [net['id'] for net in self.nets5]
self._get_available_networks(self.nets5, pub_nets=[],
req_ids=req_ids, context=admin_ctx)
def test_get_floating_ip_pools(self):
api = neutronapi.API()
search_opts = {'router:external': True}