Merge "Validate that network_id in port/subnet POST belong to the same tenant"

This commit is contained in:
Jenkins
2012-07-10 19:37:52 +00:00
committed by Gerrit Code Review
3 changed files with 57 additions and 4 deletions

View File

@@ -167,9 +167,20 @@ class Controller(object):
if self._collection in body:
# Have to account for bulk create
for item in body[self._collection]:
policy.enforce(request.context, action,
item[self._resource])
self._validate_network_tenant_ownership(
request,
item[self._resource],
)
policy.enforce(
request.context,
action,
item[self._resource],
)
else:
self._validate_network_tenant_ownership(
request,
body[self._resource]
)
policy.enforce(request.context, action, body[self._resource])
except exceptions.PolicyNotAuthorized:
raise webob.exc.HTTPForbidden()
@@ -294,6 +305,23 @@ class Controller(object):
return body
def _validate_network_tenant_ownership(self, request, resource_item):
if self._resource not in ('port', 'subnet'):
return
network_owner = self._plugin.get_network(
request.context,
resource_item['network_id'],
)['tenant_id']
if network_owner != resource_item['tenant_id']:
msg = _("Tenant %(tenant_id)s not allowed to "
"create %(resource)s on this network")
raise webob.exc.HTTPForbidden(msg % {
"tenant_id": resource_item['tenant_id'],
"resource": self._resource,
})
def create_resource(collection, resource, plugin, params):
controller = Controller(plugin, collection, resource, params)

View File

@@ -580,6 +580,7 @@ class JSONV2TestCase(APIv2TestCase):
return_value.update(initial_input['port'])
instance = self.plugin.return_value
instance.get_network.return_value = {'tenant_id': unicode(tenant_id)}
instance.create_port.return_value = return_value
res = self.api.post_json(_get_path('ports'), initial_input)

View File

@@ -118,7 +118,8 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
data = {'subnet': {'tenant_id': tenant_id,
'network_id': net_id,
'cidr': cidr,
'ip_version': ip_version}}
'ip_version': ip_version,
'tenant_id': self._tenant_id}}
if gateway_ip:
data['subnet']['gateway_ip'] = gateway_ip
if allocation_pools:
@@ -271,7 +272,6 @@ class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
class TestPortsV2(QuantumDbPluginV2TestCase):
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', 'ACTIVE')]
with self.port() as port:
@@ -282,6 +282,18 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(len(ips), 1)
self.assertEquals(ips[0]['ip_address'], '10.0.0.2')
def test_create_port_bad_tenant(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
'tenant_id': 'bad_tenant_id',
'admin_state_up': True,
'device_id': 'fake_device',
'fixed_ips': []}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEquals(res.status_int, 403)
def test_list_ports(self):
with contextlib.nested(self.port(), self.port()) as (port1, port2):
req = self.new_list_request('ports', 'json')
@@ -808,6 +820,18 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
def test_create_subnet_bad_tenant(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': 'bad_tenant_id',
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEquals(res.status_int, 403)
def test_create_subnet_defaults(self):
gateway = '10.0.0.1'
cidr = '10.0.0.0/24'