Move policy enforcement into REST API layer for v2.1 networks related

This patch moves policy enforcement into the REST API layer for the v2.1
networks, networks_associate and tenant_networks extensions and adds the
related unit tests.

Partially implements bp v3-api-policy
DocImpact

Change-Id: I4bf02b400d19d4c13ffe79452789c869114353aa
This commit is contained in:
He Jie Xu 2015-01-26 18:09:31 +08:00
parent 5fc7fdd66d
commit 2831eb337b
5 changed files with 173 additions and 10 deletions


@ -28,9 +28,7 @@ from nova.objects import base as base_obj
from nova.objects import fields as obj_fields
ALIAS = 'os-networks'
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
authorize_view = extensions.extension_authorizer('compute',
'v3:' + ALIAS + ':view')
authorize = extensions.os_compute_authorizer(ALIAS)
def network_dict(context, network):
@ -81,12 +79,12 @@ def network_dict(context, network):
class NetworkController(wsgi.Controller):
def __init__(self, network_api=None):
self.network_api = network_api or network.API()
self.network_api = network_api or network.API(skip_policy_check=True)
@extensions.expected_errors(())
def index(self, req):
context = req.environ['nova.context']
authorize_view(context)
authorize(context, action='view')
networks = self.network_api.get_all(context)
result = [network_dict(context, net_ref) for net_ref in networks]
return {'networks': result}
@ -111,7 +109,7 @@ class NetworkController(wsgi.Controller):
@extensions.expected_errors(404)
def show(self, req, id):
context = req.environ['nova.context']
authorize_view(context)
authorize(context, action='view')
try:
network = self.network_api.get(context, id)
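Review bot: With `network.API(skip_policy_check=True)` on the replaced `__init__` line, the controller's own `authorize()` call becomes the only policy check for every `self.network_api` call in this class, so an action that reaches the API without first calling `authorize(context)` is now unprotected instead of double-checked. Only `index` and `show` are visible in these hunks; `create`, `delete`, `add` and `_disassociate_host_and_project` lie outside this view and are guarded only by the new enforcement tests in the test_networks section. The same pattern appears in the networks_associate and tenant_networks sections of this commit. A comment on the constructor would make the contract explicit: `# Policy is enforced in this controller via authorize(); the network API layer must not check it again.` Since a caller-supplied `network_api` is used as-is, it would also be worth stating that an injected API is expected to have been built with `skip_policy_check=True`.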


@ -22,14 +22,14 @@ from nova import network
ALIAS = "os-networks-associate"
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
authorize = extensions.os_compute_authorizer(ALIAS)
class NetworkAssociateActionController(wsgi.Controller):
"""Network Association API Controller."""
def __init__(self, network_api=None):
self.network_api = network_api or network.API()
self.network_api = network_api or network.API(skip_policy_check=True)
@wsgi.action("disassociate_host")
@wsgi.response(202)


@ -48,7 +48,7 @@ ALIAS = 'os-tenant-networks'
QUOTAS = quota.QUOTAS
LOG = logging.getLogger(__name__)
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
authorize = extensions.os_compute_authorizer(ALIAS)
def network_dict(network):
@ -62,7 +62,7 @@ def network_dict(network):
class TenantNetworkController(wsgi.Controller):
def __init__(self, network_api=None):
self.network_api = nova.network.API()
self.network_api = nova.network.API(skip_policy_check=True)
self._default_networks = []
def _refresh_default_networks(self):
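Review bot: The `network_api` parameter of `TenantNetworkController.__init__` is still discarded on the changed line: both before and after the patch the constructor unconditionally builds `nova.network.API(...)`, unlike the other two controllers in this commit which use `network_api or network.API(skip_policy_check=True)`. A test that injects a fake network API through this parameter therefore silently runs against the real API. For consistency with the sibling controllers, `self.network_api = network_api or nova.network.API(skip_policy_check=True)` would honour the parameter while keeping the new skip_policy_check behaviour as the default.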


@ -649,3 +649,115 @@ class NetworksAssociateTestV2(NetworksAssociateTestV21):
def _test_network_neutron_associate_host_validation_failed(self, body):
pass
class NetworksEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(NetworksEnforcementV21, self).setUp()
self.controller = networks_v21.NetworkController()
self.req = fakes.HTTPRequest.blank('')
def test_show_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks:view'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.show, self.req, fakes.FAKE_UUID)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_index_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks:view'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.index, self.req)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_create_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.create, self.req, body=NEW_NETWORK)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_delete_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.delete, self.req, fakes.FAKE_UUID)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_add_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.add, self.req,
body={'id': fakes.FAKE_UUID})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_disassociate_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._disassociate_host_and_project,
self.req, fakes.FAKE_UUID, body={'network': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
class NetworksAssociateEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(NetworksAssociateEnforcementV21, self).setUp()
self.controller = (networks_associate_v21.
NetworkAssociateActionController())
self.req = fakes.HTTPRequest.blank('')
def test_disassociate_host_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks-associate'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._disassociate_host_only,
self.req, fakes.FAKE_UUID, body={'disassociate_host': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_disassociate_project_only_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks-associate'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._disassociate_project_only,
self.req, fakes.FAKE_UUID, body={'disassociate_project': {}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_disassociate_host_only_policy_failed(self):
rule_name = 'compute_extension:v3:os-networks-associate'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller._associate_host,
self.req, fakes.FAKE_UUID, body={'associate_host': 'fake_host'})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
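Review bot: `test_disassociate_host_only_policy_failed`, the last test in the hunk, actually exercises `self.controller._associate_host`, while `test_disassociate_host_policy_failed` earlier in `NetworksAssociateEnforcementV21` is the one that calls `_disassociate_host_only`; the names are swapped relative to the methods under test and will mislead whoever reads a failure report. Rename the last test to `test_associate_host_policy_failed` (and, optionally, the first to `test_disassociate_host_only_policy_failed`). The nine new tests across `NetworksEnforcementV21` and `NetworksAssociateEnforcementV21` also repeat the same set_rules/assertRaises/assertEqual block verbatim; a small per-class helper taking the controller method and its arguments would keep each rule name in one place.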


@ -256,3 +256,56 @@ class TenantNetworksTestV2(TenantNetworksTestV21):
def test_network_create_empty_body(self):
self.assertRaises(webob.exc.HTTPUnprocessableEntity,
self.controller.create, self.req, {})
class TenantNetworksEnforcementV21(test.NoDBTestCase):
def setUp(self):
super(TenantNetworksEnforcementV21, self).setUp()
self.controller = networks_v21.TenantNetworkController()
self.req = fakes.HTTPRequest.blank('')
def test_create_policy_failed(self):
rule_name = 'compute_extension:v3:os-tenant-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.create,
self.req, body={'network': {'label': 'test',
'cidr': '10.0.0.0/32'}})
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_index_policy_failed(self):
rule_name = 'compute_extension:v3:os-tenant-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.index,
self.req)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_delete_policy_failed(self):
rule_name = 'compute_extension:v3:os-tenant-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.delete,
self.req, fakes.FAKE_UUID)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
def test_show_policy_failed(self):
rule_name = 'compute_extension:v3:os-tenant-networks'
self.policy.set_rules({rule_name: "project:non_fake"})
exc = self.assertRaises(
exception.PolicyNotAuthorized,
self.controller.show,
self.req, fakes.FAKE_UUID)
self.assertEqual(
"Policy doesn't allow %s to be performed." % rule_name,
exc.format_message())
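Review bot: The four `TenantNetworksEnforcementV21` tests each repeat the same rule name, `set_rules` call, `assertRaises` and message assertion, differing only in the controller method and its arguments; the rule name `'compute_extension:v3:os-tenant-networks'` is spelled out four times, so a typo in one copy would turn that test into a false pass against a rule that does not exist. Factoring the block into a helper keeps the rule in one place and reduces each test to a single call. The enclosing module and its imports are outside this hunk.
```python
class TenantNetworksEnforcementV21(test.NoDBTestCase):
    # … unchanged: setUp …

    def _assert_policy_failed(self, method, *args, **kwargs):
        """Set a deny rule for this extension and check `method` fails closed."""
        rule_name = 'compute_extension:v3:os-tenant-networks'
        self.policy.set_rules({rule_name: "project:non_fake"})
        exc = self.assertRaises(exception.PolicyNotAuthorized,
                                method, self.req, *args, **kwargs)
        self.assertEqual(
            "Policy doesn't allow %s to be performed." % rule_name,
            exc.format_message())

    def test_create_policy_failed(self):
        self._assert_policy_failed(
            self.controller.create,
            body={'network': {'label': 'test', 'cidr': '10.0.0.0/32'}})

    def test_index_policy_failed(self):
        self._assert_policy_failed(self.controller.index)

    def test_delete_policy_failed(self):
        self._assert_policy_failed(self.controller.delete, fakes.FAKE_UUID)

    def test_show_policy_failed(self):
        self._assert_policy_failed(self.controller.show, fakes.FAKE_UUID)
```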