Move policy enforcement into REST API layer for v2.1 floating_ip_dns
This patch moves policy enforcement into the REST API layer for the v2.1 floating_ip_dns extension and adds unit tests. Partially implements blueprint v3-api-policy Change-Id: Ib6134fee5d7ce7096ca7a5eff6e22597bda6ee88
parent
fbf1f5acae
commit
3fe4163a2b
|
@ -27,7 +27,7 @@ from nova import network
|
|||
|
||||
|
||||
ALIAS = "os-floating-ip-dns"
|
||||
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
|
||||
authorize = extensions.os_compute_authorizer(ALIAS)
|
||||
|
||||
|
||||
def _translate_dns_entry_view(dns_entry):
|
||||
|
@ -84,7 +84,7 @@ class FloatingIPDNSDomainController(wsgi.Controller):
|
|||
|
||||
def __init__(self):
|
||||
super(FloatingIPDNSDomainController, self).__init__()
|
||||
self.network_api = network.API()
|
||||
self.network_api = network.API(skip_policy_check=True)
|
||||
|
||||
@extensions.expected_errors(501)
|
||||
def index(self, req):
|
||||
|
@ -165,7 +165,7 @@ class FloatingIPDNSEntryController(wsgi.Controller):
|
|||
|
||||
def __init__(self):
|
||||
super(FloatingIPDNSEntryController, self).__init__()
|
||||
self.network_api = network.API()
|
||||
self.network_api = network.API(skip_policy_check=True)
|
||||
|
||||
@extensions.expected_errors((404, 501))
|
||||
def show(self, req, domain_id, id):
|
||||
|
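Review bot: With `network.API(skip_policy_check=True)` in both controller constructors, the `authorize` call inside each handler becomes the only policy check on these endpoints. None of the handler bodies is visible in these hunks (`index` and `show` begin at the hunk edge and continue outside it). Every handler in `FloatingIPDNSDomainController` and `FloatingIPDNSEntryController` should be confirmed to call `authorize` before it touches `self.network_api`, since a handler that omits the call would now run with no policy check at all. A comment on the constructor line would record that dependency for later maintainers, for example `# Policy is enforced by authorize() in each handler; the network API's own check is skipped.`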
|
|
@ -397,3 +397,79 @@ class FloatingIpDNSTestV2(FloatingIpDNSTestV21):
|
|||
|
||||
def _bad_request(self):
|
||||
return webob.exc.HTTPUnprocessableEntity
|
||||
|
||||
|
||||
class FloatingIPDNSDomainPolicyEnforcementV21(test.NoDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(FloatingIPDNSDomainPolicyEnforcementV21, self).setUp()
|
||||
self.controller = fipdns_v21.FloatingIPDNSDomainController()
|
||||
self.rule_name = "compute_extension:v3:os-floating-ip-dns"
|
||||
self.policy.set_rules({self.rule_name: "project:non_fake"})
|
||||
self.req = fakes.HTTPRequest.blank('')
|
||||
|
||||
def test_get_floating_ip_dns_policy_failed(self):
|
||||
exc = self.assertRaises(
|
||||
exception.PolicyNotAuthorized,
|
||||
self.controller.index, self.req)
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." % self.rule_name,
|
||||
exc.format_message())
|
||||
|
||||
def test_update_floating_ip_dns_policy_failed(self):
|
||||
body = {'domain_entry':
|
||||
{'scope': 'public',
|
||||
'project': 'testproject'}}
|
||||
exc = self.assertRaises(
|
||||
exception.PolicyNotAuthorized,
|
||||
self.controller.update, self.req, _quote_domain(domain), body=body)
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." % self.rule_name,
|
||||
exc.format_message())
|
||||
|
||||
def test_delete_floating_ip_dns_policy_failed(self):
|
||||
exc = self.assertRaises(
|
||||
exception.PolicyNotAuthorized,
|
||||
self.controller.delete, self.req, _quote_domain(domain))
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." % self.rule_name,
|
||||
exc.format_message())
|
||||
|
||||
|
||||
class FloatingIPDNSEntryPolicyEnforcementV21(test.NoDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(FloatingIPDNSEntryPolicyEnforcementV21, self).setUp()
|
||||
self.controller = fipdns_v21.FloatingIPDNSEntryController()
|
||||
self.rule_name = "compute_extension:v3:os-floating-ip-dns"
|
||||
self.policy.set_rules({self.rule_name: "project:non_fake"})
|
||||
self.req = fakes.HTTPRequest.blank('')
|
||||
|
||||
def test_show_floating_ip_dns_entry_policy_failed(self):
|
||||
exc = self.assertRaises(
|
||||
exception.PolicyNotAuthorized,
|
||||
self.controller.show, self.req,
|
||||
_quote_domain(domain), test_ipv4_address)
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." % self.rule_name,
|
||||
exc.format_message())
|
||||
|
||||
def test_update_floating_ip_dns_policy_failed(self):
|
||||
body = {'dns_entry':
|
||||
{'ip': test_ipv4_address,
|
||||
'dns_type': 'A'}}
|
||||
exc = self.assertRaises(
|
||||
exception.PolicyNotAuthorized,
|
||||
self.controller.update, self.req, _quote_domain(domain),
|
||||
name, body=body)
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." % self.rule_name,
|
||||
exc.format_message())
|
||||
|
||||
def test_delete_floating_ip_dns_policy_failed(self):
|
||||
exc = self.assertRaises(
|
||||
exception.PolicyNotAuthorized,
|
||||
self.controller.delete, self.req, _quote_domain(domain), name)
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." % self.rule_name,
|
||||
exc.format_message())
|
||||
|
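Review bot: The rule string `"compute_extension:v3:os-floating-ip-dns"` is hard-coded in both `setUp` methods, so the tests are tied to a literal rather than to the extension's alias. Building it from the module under test, `self.rule_name = "compute_extension:v3:" + fipdns_v21.ALIAS`, in one shared base class would remove the duplicated `setUp` and keep the two classes from drifting apart. The cases cover `index`, `update` and `delete` on the domain controller and `show`, `update` and `delete` on the entry controller. If either controller exposes further handlers (not visible on this page), each needs its own denial test, because the REST layer is now the only enforcement point. The tests also rely on `domain`, `name`, `test_ipv4_address` and `_quote_domain`, which are defined outside this hunk.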
|