Merge "Added policy checks for add interface and remove interface"

This commit is contained in:
Jenkins 2012-09-10 00:21:07 +00:00 committed by Gerrit Code Review
commit 0a2d8ebba4
3 changed files with 91 additions and 4 deletions

View File

@ -12,6 +12,8 @@
"extension:router:view": [["rule:regular_user"]],
"extension:router:set": [["rule:admin_only"]],
"extension:router:add_router_interface": [["rule:admin_only"]],
"extension:router:remove_router_interface": [["rule:admin_or_owner"]],
"subnets:private:read": [["rule:admin_or_owner"]],
"subnets:private:write": [["rule:admin_or_owner"]],

View File

@ -274,12 +274,19 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
pass
def add_router_interface(self, context, router_id, interface_info):
# make sure router exists - will raise if not
self._get_router(context, router_id)
# make sure router exists
router = self._get_router(context, router_id)
if not interface_info:
msg = "Either subnet_id or port_id must be specified"
raise q_exc.BadRequest(resource='router', msg=msg)
try:
policy.enforce(context,
"extension:router:add_router_interface",
self._make_router_dict(router))
except q_exc.PolicyNotAuthorized:
raise l3.RouterNotFound(router_id=router_id)
if 'port_id' in interface_info:
if 'subnet_id' in interface_info:
msg = "cannot specify both subnet-id and port-id"
@ -327,6 +334,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
def remove_router_interface(self, context, router_id, interface_info):
# make sure router exists
router = self._get_router(context, router_id)
try:
policy.enforce(context,
"extension:router:remove_router_interface",
self._make_router_dict(router))
except q_exc.PolicyNotAuthorized:
raise l3.RouterNotFound(router_id=router_id)
if not interface_info:
msg = "Either subnet_id or port_id must be specified"

View File

@ -329,8 +329,9 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
return self.deserialize('json', res)
@contextlib.contextmanager
def router(self, name='router1', admin_status_up=True, fmt='json'):
res = self._create_router(fmt, _uuid(), name=name,
def router(self, name='router1', admin_status_up=True,
fmt='json', tenant_id=_uuid()):
res = self._create_router(fmt, tenant_id, name=name,
admin_state_up=admin_status_up)
router = self.deserialize(fmt, res)
yield router
@ -387,6 +388,41 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
body = self._show('ports', r_port_id,
expected_code=exc.HTTPNotFound.code)
def test_router_add_interface_subnet_with_bad_tenant(self):
with mock.patch('quantum.context.Context.to_dict') as tdict:
tenant_id = _uuid()
admin_context = {'roles': ['admin']}
tenant_context = {'tenant_id': 'bad_tenant',
'roles': []}
tdict.return_value = admin_context
with self.router(tenant_id=tenant_id) as r:
with self.network(tenant_id=tenant_id) as n:
with self.subnet(network=n) as s:
tdict.return_value = tenant_context
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
err_code)
tdict.return_value = admin_context
body = self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self.assertTrue('port_id' in body)
tdict.return_value = tenant_context
self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None,
err_code)
tdict.return_value = admin_context
body = self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None)
def test_router_add_interface_port(self):
with self.router() as r:
with self.port(no_delete=True) as p:
@ -407,6 +443,42 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
None,
p['port']['id'])
def test_router_add_interface_port_bad_tenant(self):
with mock.patch('quantum.context.Context.to_dict') as tdict:
tenant_id = _uuid()
admin_context = {'roles': ['admin']}
tenant_context = {'tenant_id': 'bad_tenant',
'roles': []}
tdict.return_value = admin_context
with self.router() as r:
with self.port(no_delete=True) as p:
tdict.return_value = tenant_context
err_code = exc.HTTPNotFound.code
body = self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
err_code)
tdict.return_value = admin_context
body = self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
tdict.return_value = tenant_context
# clean-up
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'],
err_code)
tdict.return_value = admin_context
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_router_add_interface_dup_subnet1(self):
with self.router() as r:
with self.subnet() as s: