Fixes #151
Makes all MAC address range operations admin-only. Additionally, moves the one admin-only check in the API extension into the plugin module to make it more easily testable. Finally, moves a brace in nvp_driver because the latest version of flake8 after a venv rebuild inexplicably decided said brace was visually indented incorrectly. We should probably pin the flake8 version, but we can do that in a separate patch.
This commit is contained in:
@@ -53,8 +53,6 @@ class MacAddressRangesController(wsgi.Controller):
|
|||||||
|
|
||||||
def index(self, request):
|
def index(self, request):
|
||||||
context = request.context
|
context = request.context
|
||||||
if not context.is_admin:
|
|
||||||
raise webob.exc.HTTPForbidden()
|
|
||||||
return {"mac_address_ranges":
|
return {"mac_address_ranges":
|
||||||
self._plugin.get_mac_address_ranges(context)}
|
self._plugin.get_mac_address_ranges(context)}
|
||||||
|
|
||||||
|
|||||||
@@ -246,8 +246,7 @@ class NVPDriver(base.BaseDriver):
|
|||||||
'recieved': {
|
'recieved': {
|
||||||
'packets': stats['rx_packets'],
|
'packets': stats['rx_packets'],
|
||||||
'bytes': stats['rx_bytes'],
|
'bytes': stats['rx_bytes'],
|
||||||
'errors': stats['rx_errors']
|
'errors': stats['rx_errors']},
|
||||||
},
|
|
||||||
'transmitted': {
|
'transmitted': {
|
||||||
'packets': stats['tx_packets'],
|
'packets': stats['tx_packets'],
|
||||||
'bytes': stats['tx_bytes'],
|
'bytes': stats['tx_bytes'],
|
||||||
|
|||||||
@@ -14,6 +14,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import netaddr
|
import netaddr
|
||||||
|
from neutron.common import exceptions
|
||||||
from neutron.openstack.common import log as logging
|
from neutron.openstack.common import log as logging
|
||||||
|
|
||||||
from quark.db import api as db_api
|
from quark.db import api as db_api
|
||||||
@@ -63,6 +64,9 @@ def get_mac_address_range(context, id, fields=None):
|
|||||||
LOG.info("get_mac_address_range %s for tenant %s fields %s" %
|
LOG.info("get_mac_address_range %s for tenant %s fields %s" %
|
||||||
(id, context.tenant_id, fields))
|
(id, context.tenant_id, fields))
|
||||||
|
|
||||||
|
if not context.is_admin:
|
||||||
|
raise exceptions.NotAuthorized()
|
||||||
|
|
||||||
mac_address_range = db_api.mac_address_range_find(
|
mac_address_range = db_api.mac_address_range_find(
|
||||||
context, id=id, scope=db_api.ONE)
|
context, id=id, scope=db_api.ONE)
|
||||||
|
|
||||||
@@ -74,12 +78,18 @@ def get_mac_address_range(context, id, fields=None):
|
|||||||
|
|
||||||
def get_mac_address_ranges(context):
|
def get_mac_address_ranges(context):
|
||||||
LOG.info("get_mac_address_ranges for tenant %s" % context.tenant_id)
|
LOG.info("get_mac_address_ranges for tenant %s" % context.tenant_id)
|
||||||
|
if not context.is_admin:
|
||||||
|
raise exceptions.NotAuthorized()
|
||||||
|
|
||||||
ranges = db_api.mac_address_range_find(context)
|
ranges = db_api.mac_address_range_find(context)
|
||||||
return [v._make_mac_range_dict(m) for m in ranges]
|
return [v._make_mac_range_dict(m) for m in ranges]
|
||||||
|
|
||||||
|
|
||||||
def create_mac_address_range(context, mac_range):
|
def create_mac_address_range(context, mac_range):
|
||||||
LOG.info("create_mac_address_range for tenant %s" % context.tenant_id)
|
LOG.info("create_mac_address_range for tenant %s" % context.tenant_id)
|
||||||
|
if not context.is_admin:
|
||||||
|
raise exceptions.NotAuthorized()
|
||||||
|
|
||||||
cidr = mac_range["mac_address_range"]["cidr"]
|
cidr = mac_range["mac_address_range"]["cidr"]
|
||||||
cidr, first_address, last_address = _to_mac_range(cidr)
|
cidr, first_address, last_address = _to_mac_range(cidr)
|
||||||
with context.session.begin():
|
with context.session.begin():
|
||||||
@@ -104,6 +114,9 @@ def delete_mac_address_range(context, id):
|
|||||||
"""
|
"""
|
||||||
LOG.info("delete_mac_address_range %s for tenant %s" %
|
LOG.info("delete_mac_address_range %s for tenant %s" %
|
||||||
(id, context.tenant_id))
|
(id, context.tenant_id))
|
||||||
|
if not context.is_admin:
|
||||||
|
raise exceptions.NotAuthorized()
|
||||||
|
|
||||||
with context.session.begin():
|
with context.session.begin():
|
||||||
mar = db_api.mac_address_range_find(context, id=id, scope=db_api.ONE)
|
mar = db_api.mac_address_range_find(context, id=id, scope=db_api.ONE)
|
||||||
if not mar:
|
if not mar:
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import contextlib
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
import netaddr
|
import netaddr
|
||||||
|
from neutron.common import exceptions
|
||||||
|
|
||||||
from quark import exceptions as quark_exceptions
|
from quark import exceptions as quark_exceptions
|
||||||
from quark.plugin_modules import mac_address_ranges
|
from quark.plugin_modules import mac_address_ranges
|
||||||
@@ -27,9 +28,12 @@ class TestQuarkGetMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
|||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _stubs(self, mac_range):
|
def _stubs(self, mac_range):
|
||||||
db_mod = "quark.db.api"
|
db_mod = "quark.db.api"
|
||||||
|
old_context = self.context
|
||||||
|
self.context = self.context.elevated()
|
||||||
with mock.patch("%s.mac_address_range_find" % db_mod) as mar_find:
|
with mock.patch("%s.mac_address_range_find" % db_mod) as mar_find:
|
||||||
mar_find.return_value = mac_range
|
mar_find.return_value = mac_range
|
||||||
yield
|
yield
|
||||||
|
self.context = old_context
|
||||||
|
|
||||||
def test_find_mac_ranges(self):
|
def test_find_mac_ranges(self):
|
||||||
mar = dict(id=1, cidr="AA:BB:CC/24")
|
mar = dict(id=1, cidr="AA:BB:CC/24")
|
||||||
@@ -55,9 +59,12 @@ class TestQuarkCreateMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
|||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _stubs(self, mac_range):
|
def _stubs(self, mac_range):
|
||||||
db_mod = "quark.db.api"
|
db_mod = "quark.db.api"
|
||||||
|
old_context = self.context
|
||||||
|
self.context = self.context.elevated()
|
||||||
with mock.patch("%s.mac_address_range_create" % db_mod) as mar_create:
|
with mock.patch("%s.mac_address_range_create" % db_mod) as mar_create:
|
||||||
mar_create.return_value = mac_range
|
mar_create.return_value = mac_range
|
||||||
yield
|
yield
|
||||||
|
self.context = old_context
|
||||||
|
|
||||||
def test_create_range(self):
|
def test_create_range(self):
|
||||||
mar = dict(mac_address_range=dict(id=1, cidr="AA:BB:CC/24"))
|
mar = dict(mac_address_range=dict(id=1, cidr="AA:BB:CC/24"))
|
||||||
@@ -124,12 +131,15 @@ class TestQuarkDeleteMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
|||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _stubs(self, mac_range):
|
def _stubs(self, mac_range):
|
||||||
db_mod = "quark.db.api"
|
db_mod = "quark.db.api"
|
||||||
|
old_context = self.context
|
||||||
|
self.context = self.context.elevated()
|
||||||
with contextlib.nested(
|
with contextlib.nested(
|
||||||
mock.patch("%s.mac_address_range_find" % db_mod),
|
mock.patch("%s.mac_address_range_find" % db_mod),
|
||||||
mock.patch("%s.mac_address_range_delete" % db_mod),
|
mock.patch("%s.mac_address_range_delete" % db_mod),
|
||||||
) as (mar_find, mar_delete):
|
) as (mar_find, mar_delete):
|
||||||
mar_find.return_value = mac_range
|
mar_find.return_value = mac_range
|
||||||
yield mar_delete
|
yield mar_delete
|
||||||
|
self.context = old_context
|
||||||
|
|
||||||
def test_mac_address_range_delete_not_found(self):
|
def test_mac_address_range_delete_not_found(self):
|
||||||
with self._stubs(None):
|
with self._stubs(None):
|
||||||
@@ -152,3 +162,22 @@ class TestQuarkDeleteMacAddressRanges(test_quark_plugin.TestQuarkPlugin):
|
|||||||
resp = self.plugin.delete_mac_address_range(self.context, 1)
|
resp = self.plugin.delete_mac_address_range(self.context, 1)
|
||||||
self.assertIsNone(resp)
|
self.assertIsNone(resp)
|
||||||
mar_delete.assert_called_once_with(self.context, mar)
|
mar_delete.assert_called_once_with(self.context, mar)
|
||||||
|
|
||||||
|
|
||||||
|
class TestQuarkMacAddressCRUDNotAdminRaises(test_quark_plugin.TestQuarkPlugin):
|
||||||
|
def test_mac_ranges_index_fails(self):
|
||||||
|
with self.assertRaises(exceptions.NotAuthorized):
|
||||||
|
self.plugin.get_mac_address_ranges(self.context)
|
||||||
|
|
||||||
|
def test_show_mac_range_fails(self):
|
||||||
|
with self.assertRaises(exceptions.NotAuthorized):
|
||||||
|
self.plugin.get_mac_address_range(self.context, 1)
|
||||||
|
|
||||||
|
def test_create_mac_range_fails(self):
|
||||||
|
with self.assertRaises(exceptions.NotAuthorized):
|
||||||
|
self.plugin.create_mac_address_range(
|
||||||
|
self.context, {"mac_address_range": 1})
|
||||||
|
|
||||||
|
def test_delete_mac_range_fails(self):
|
||||||
|
with self.assertRaises(exceptions.NotAuthorized):
|
||||||
|
self.plugin.delete_mac_address_range(self.context, 1)
|
||||||
|
|||||||
Reference in New Issue
Block a user