RM7831
Fixes #166 Implements attribute checking for ports, networks and subnets for both create and update methods.
This commit is contained in:
@@ -77,7 +77,7 @@ def create_network(context, network):
|
||||
pnet_type, phys_net, seg_id = _adapt_provider_nets(context, network)
|
||||
|
||||
ipam_strategy = utils.pop_param(net_attrs, "ipam_strategy", None)
|
||||
if not ipam_strategy:
|
||||
if not ipam_strategy or not context.is_admin:
|
||||
ipam_strategy = CONF.QUARK.default_ipam_strategy
|
||||
|
||||
if not ipam.IPAM_REGISTRY.is_valid_strategy(ipam_strategy):
|
||||
@@ -134,7 +134,11 @@ def update_network(context, id, network):
|
||||
net = db_api.network_find(context, id=id, scope=db_api.ONE)
|
||||
if not net:
|
||||
raise exceptions.NetworkNotFound(net_id=id)
|
||||
net = db_api.network_update(context, net, **network["network"])
|
||||
net_dict = network["network"]
|
||||
utils.pop_param(net_dict, "network_plugin")
|
||||
if not context.is_admin and "ipam_strategy" in net_dict:
|
||||
utils.pop_param(net_dict, "ipam_strategy")
|
||||
net = db_api.network_update(context, net, **net_dict)
|
||||
|
||||
return v._make_network_dict(net)
|
||||
|
||||
|
@@ -44,6 +44,10 @@ def create_port(context, port):
|
||||
neutron/api/v2/attributes.py. All keys will be populated.
|
||||
"""
|
||||
LOG.info("create_port for tenant %s" % context.tenant_id)
|
||||
port_attrs = port["port"]
|
||||
|
||||
admin_only = ["mac_address", "device_owner", "bridge", "admin_state_up"]
|
||||
utils.filter_body(context, port_attrs, admin_only=admin_only)
|
||||
|
||||
port_attrs = port["port"]
|
||||
mac_address = utils.pop_param(port_attrs, "mac_address", None)
|
||||
@@ -199,9 +203,16 @@ def update_port(context, id, port):
|
||||
if not port_db:
|
||||
raise exceptions.PortNotFound(port_id=id)
|
||||
|
||||
fixed_ips = port["port"].pop("fixed_ips", None)
|
||||
if fixed_ips is not None:
|
||||
port_dict = port["port"]
|
||||
fixed_ips = port_dict.pop("fixed_ips", None)
|
||||
|
||||
admin_only = ["mac_address", "device_owner", "bridge", "admin_state_up",
|
||||
"device_id"]
|
||||
always_filter = ["network_id", "backend_key"]
|
||||
utils.filter_body(context, port_dict, admin_only=admin_only,
|
||||
always_filter=always_filter)
|
||||
|
||||
if fixed_ips is not None:
|
||||
# NOTE(mdietz): we want full control over IPAM since
|
||||
# we're allocating by subnet instead of
|
||||
# network.
|
||||
@@ -256,20 +267,20 @@ def update_port(context, id, port):
|
||||
|
||||
# Need to return all existing addresses and the new ones
|
||||
if addresses:
|
||||
port["port"]["addresses"] = port_db["ip_addresses"]
|
||||
port["port"]["addresses"].extend(addresses)
|
||||
port_dict["addresses"] = port_db["ip_addresses"]
|
||||
port_dict["addresses"].extend(addresses)
|
||||
|
||||
group_ids, security_groups = v.make_security_group_list(
|
||||
context, port["port"].pop("security_groups", None))
|
||||
context, port_dict.pop("security_groups", None))
|
||||
net_driver = registry.DRIVER_REGISTRY.get_driver(
|
||||
port_db.network["network_plugin"])
|
||||
net_driver.update_port(context, port_id=port_db.backend_key,
|
||||
security_groups=group_ids)
|
||||
|
||||
port["port"]["security_groups"] = security_groups
|
||||
port_dict["security_groups"] = security_groups
|
||||
|
||||
with context.session.begin():
|
||||
port = db_api.port_update(context, port_db, **port["port"])
|
||||
port = db_api.port_update(context, port_db, **port_dict)
|
||||
|
||||
# NOTE(mdietz): fix for issue 112, we wanted the IPs to be in
|
||||
# allocated_at order, so get a fresh object every time
|
||||
|
@@ -109,6 +109,17 @@ def create_subnet(context, subnet):
|
||||
|
||||
sub_attrs = subnet["subnet"]
|
||||
|
||||
always_pop = ["enable_dhcp", "ip_version", "first_ip", "last_ip",
|
||||
"_cidr"]
|
||||
admin_only = ["segment_id", "do_not_use", "created_at",
|
||||
"next_auto_assign_ip"]
|
||||
for attr in always_pop:
|
||||
if attr in sub_attrs:
|
||||
sub_attrs.pop(attr)
|
||||
if not context.is_admin:
|
||||
for attr in admin_only:
|
||||
if attr in sub_attrs:
|
||||
sub_attrs.pop(attr)
|
||||
_validate_subnet_cidr(context, net_id, sub_attrs["cidr"])
|
||||
|
||||
cidr = netaddr.IPNetwork(sub_attrs["cidr"])
|
||||
@@ -132,9 +143,6 @@ def create_subnet(context, subnet):
|
||||
host_routes = utils.pop_param(sub_attrs, "host_routes", [])
|
||||
allocation_pools = utils.pop_param(sub_attrs, "allocation_pools", None)
|
||||
|
||||
if not context.is_admin and "segment_id" in sub_attrs:
|
||||
sub_attrs.pop("segment_id")
|
||||
|
||||
sub_attrs["network"] = net
|
||||
|
||||
new_subnet = db_api.subnet_create(context, **sub_attrs)
|
||||
@@ -198,6 +206,11 @@ def update_subnet(context, id, subnet):
|
||||
raise exceptions.SubnetNotFound(id=id)
|
||||
|
||||
s = subnet["subnet"]
|
||||
always_pop = ["_cidr", "cidr", "first_ip", "last_ip", "ip_version",
|
||||
"segment_id", "network_id"]
|
||||
admin_only = ["do_not_use", "created_at", "tenant_id",
|
||||
"next_auto_assign_ip", "enable_dhcp"]
|
||||
utils.filter_body(context, s, admin_only, always_pop)
|
||||
|
||||
dns_ips = utils.pop_param(s, "dns_nameservers", [])
|
||||
host_routes = utils.pop_param(s, "host_routes", [])
|
||||
|
@@ -166,15 +166,30 @@ class TestQuarkUpdateNetwork(test_quark_plugin.TestQuarkPlugin):
|
||||
|
||||
def test_update_network(self):
|
||||
net = dict(id=1)
|
||||
new_net = net.copy()
|
||||
new_net["ipam_strategy"] = "BOTH_REQUIRED"
|
||||
with self._stubs(net=net) as net_update:
|
||||
self.plugin.update_network(self.context, 1, dict(network=net))
|
||||
self.assertTrue(net_update.called)
|
||||
self.plugin.update_network(self.context, 1, dict(network=new_net))
|
||||
net_update.assert_called_once_with(
|
||||
self.context, net, id=net["id"])
|
||||
|
||||
def test_update_network_not_found_fails(self):
|
||||
with self._stubs(net=None):
|
||||
with self.assertRaises(exceptions.NetworkNotFound):
|
||||
self.plugin.update_network(self.context, 1, None)
|
||||
|
||||
def test_update_network_admin_set_ipam_strategy(self):
|
||||
net = dict(id=1)
|
||||
new_net = net.copy()
|
||||
new_net["ipam_strategy"] = "BOTH_REQUIRED"
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs(net=net) as net_update:
|
||||
self.plugin.update_network(admin_ctx, 1, dict(network=new_net))
|
||||
net_update.assert_called_once_with(
|
||||
admin_ctx, net, ipam_strategy=new_net["ipam_strategy"],
|
||||
id=net["id"])
|
||||
|
||||
|
||||
class TestQuarkDeleteNetwork(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
@@ -328,16 +343,18 @@ class TestQuarkCreateNetwork(test_quark_plugin.TestQuarkPlugin):
|
||||
def test_create_network_with_ipam_strategy(self):
|
||||
net = dict(id="abcdef", name="public", admin_state_up=True,
|
||||
tenant_id=0, ipam_strategy="BOTH")
|
||||
admin_context = self.context.elevated()
|
||||
with self._stubs(net=net):
|
||||
res = self.plugin.create_network(self.context, dict(network=net))
|
||||
res = self.plugin.create_network(admin_context, dict(network=net))
|
||||
self.assertEqual(res["ipam_strategy"], net["ipam_strategy"])
|
||||
|
||||
def test_create_network_with_bad_ipam_strategy_raises(self):
|
||||
net = dict(id="abcdef", name="public", admin_state_up=True,
|
||||
tenant_id=0, ipam_strategy="BUSTED")
|
||||
admin_context = self.context.elevated()
|
||||
with self._stubs(net=net):
|
||||
with self.assertRaises(q_exc.InvalidIpamStrategy):
|
||||
self.plugin.create_network(self.context, dict(network=net))
|
||||
self.plugin.create_network(admin_context, dict(network=net))
|
||||
|
||||
|
||||
class TestQuarkDiagnoseNetworks(test_quark_plugin.TestQuarkPlugin):
|
||||
|
@@ -20,6 +20,7 @@ import mock
|
||||
from neutron.api.v2 import attributes as neutron_attrs
|
||||
from neutron.common import exceptions
|
||||
from neutron.extensions import securitygroup as sg_ext
|
||||
from oslo.config import cfg
|
||||
|
||||
from quark.db import models
|
||||
from quark import exceptions as q_exc
|
||||
@@ -921,3 +922,145 @@ class TestPortBadNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
|
||||
|
||||
with self.assertRaises(Exception): # noqa
|
||||
self.plugin.create_port(self.context, port)
|
||||
|
||||
|
||||
class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, network=None, addr=None, mac=None):
|
||||
network["network_plugin"] = "BASE"
|
||||
network["ipam_strategy"] = "ANY"
|
||||
|
||||
db_mod = "quark.db.api"
|
||||
ipam = "quark.ipam.QuarkIpam"
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("%s.port_create" % db_mod),
|
||||
mock.patch("%s.network_find" % db_mod),
|
||||
mock.patch("%s.allocate_ip_address" % ipam),
|
||||
mock.patch("%s.allocate_mac_address" % ipam),
|
||||
mock.patch("neutron.openstack.common.uuidutils.generate_uuid"),
|
||||
mock.patch("quark.plugin_views._make_port_dict"),
|
||||
) as (port_create, net_find, alloc_ip, alloc_mac, gen_uuid, make_port):
|
||||
net_find.return_value = network
|
||||
alloc_ip.return_value = addr
|
||||
alloc_mac.return_value = mac
|
||||
gen_uuid.return_value = 1
|
||||
yield port_create, alloc_mac, net_find
|
||||
|
||||
def test_create_port_attribute_filtering(self):
|
||||
network = dict(id=1)
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
port_name = "foobar"
|
||||
ip = dict()
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name, device_owner="quark_tests",
|
||||
bridge="quark_bridge", admin_state_up=False))
|
||||
|
||||
port_create_dict = {}
|
||||
port_create_dict["port"] = port["port"].copy()
|
||||
port_create_dict["port"]["mac_address"] = "DE:AD:BE:EF:00:00"
|
||||
port_create_dict["port"]["device_owner"] = "ignored"
|
||||
port_create_dict["port"]["bridge"] = "ignored"
|
||||
port_create_dict["port"]["admin_state_up"] = "ignored"
|
||||
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
self.plugin.create_port(self.context, port_create_dict)
|
||||
alloc_mac.assert_called_once_with(
|
||||
self.context, network["id"], 1,
|
||||
cfg.CONF.QUARK.ipam_reuse_after,
|
||||
mac_address=None)
|
||||
port_create.assert_called_once_with(
|
||||
self.context, addresses=[], network_id=network["id"],
|
||||
tenant_id="fake", uuid=1, name="foobar",
|
||||
mac_address=alloc_mac()["address"], backend_key=1, id=1,
|
||||
security_groups=[], device_id=2)
|
||||
|
||||
def test_create_port_attribute_filtering_admin(self):
|
||||
network = dict(id=1)
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
port_name = "foobar"
|
||||
ip = dict()
|
||||
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name, device_owner="quark_tests",
|
||||
bridge="quark_bridge", admin_state_up=False))
|
||||
|
||||
expected_mac = "DE:AD:BE:EF:00:00"
|
||||
expected_bridge = "new_bridge"
|
||||
expected_device_owner = "new_device_owner"
|
||||
expected_admin_state = "new_state"
|
||||
|
||||
port_create_dict = {}
|
||||
port_create_dict["port"] = port["port"].copy()
|
||||
port_create_dict["port"]["mac_address"] = expected_mac
|
||||
port_create_dict["port"]["device_owner"] = expected_device_owner
|
||||
port_create_dict["port"]["bridge"] = expected_bridge
|
||||
port_create_dict["port"]["admin_state_up"] = expected_admin_state
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
self.plugin.create_port(admin_ctx, port_create_dict)
|
||||
|
||||
alloc_mac.assert_called_once_with(
|
||||
admin_ctx, network["id"], 1,
|
||||
cfg.CONF.QUARK.ipam_reuse_after,
|
||||
mac_address=expected_mac)
|
||||
|
||||
port_create.assert_called_once_with(
|
||||
admin_ctx, bridge=expected_bridge, uuid=1, name="foobar",
|
||||
admin_state_up=expected_admin_state, network_id=1,
|
||||
tenant_id="fake", id=1, device_owner=expected_device_owner,
|
||||
mac_address=mac["address"], device_id=2, backend_key=1,
|
||||
security_groups=[], addresses=[])
|
||||
|
||||
|
||||
class TestQuarkPortUpdateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self):
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.port_find"),
|
||||
mock.patch("quark.db.api.port_update"),
|
||||
mock.patch("quark.drivers.registry.DriverRegistry.get_driver"),
|
||||
mock.patch("quark.plugin_views._make_port_dict"),
|
||||
) as (port_find, port_update, get_driver, make_port):
|
||||
yield port_find, port_update
|
||||
|
||||
def test_update_port_attribute_filtering(self):
|
||||
new_port = {}
|
||||
new_port["port"] = {
|
||||
"mac_address": "DD:EE:FF:00:00:00", "device_owner": "new_owner",
|
||||
"bridge": "new_bridge", "admin_state_up": False, "device_id": 3,
|
||||
"network_id": 10, "backend_key": 1234, "name": "new_name"}
|
||||
|
||||
with self._stubs() as (port_find, port_update):
|
||||
self.plugin.update_port(self.context, 1, new_port)
|
||||
port_update.assert_called_once_with(
|
||||
self.context,
|
||||
port_find(),
|
||||
name="new_name",
|
||||
security_groups=[])
|
||||
|
||||
def test_update_port_attribute_filtering_admin(self):
|
||||
new_port = {}
|
||||
new_port["port"] = {
|
||||
"mac_address": "DD:EE:FF:00:00:00", "device_owner": "new_owner",
|
||||
"bridge": "new_bridge", "admin_state_up": False, "device_id": 3,
|
||||
"network_id": 10, "backend_key": 1234, "name": "new_name"}
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs() as (port_find, port_update):
|
||||
self.plugin.update_port(admin_ctx, 1, new_port)
|
||||
port_update.assert_called_once_with(
|
||||
admin_ctx,
|
||||
port_find(),
|
||||
name="new_name",
|
||||
bridge=new_port["port"]["bridge"],
|
||||
admin_state_up=new_port["port"]["admin_state_up"],
|
||||
device_owner=new_port["port"]["device_owner"],
|
||||
mac_address=new_port["port"]["mac_address"],
|
||||
device_id=new_port["port"]["device_id"],
|
||||
security_groups=[])
|
||||
|
@@ -1009,10 +1009,11 @@ class TestSubnetsNotification(test_quark_plugin.TestQuarkPlugin):
|
||||
s = dict(network_id=1, cidr="192.168.10.0/24",
|
||||
tenant_id=1, id=1, created_at="123")
|
||||
with self._stubs(s) as notify:
|
||||
self.plugin.create_subnet(self.context, dict(subnet=s))
|
||||
admin_ctx = self.context.elevated()
|
||||
self.plugin.create_subnet(admin_ctx, dict(subnet=s))
|
||||
notify.assert_called_once_with("network")
|
||||
notify.return_value.info.assert_called_once_with(
|
||||
self.context,
|
||||
admin_ctx,
|
||||
"ip_block.create",
|
||||
dict(tenant_id=s["tenant_id"],
|
||||
ip_block_id=s["id"],
|
||||
@@ -1099,3 +1100,111 @@ class TestQuarkDiagnoseSubnets(test_quark_plugin.TestQuarkPlugin):
|
||||
with self._stubs(subnets=subnet, routes=[route]):
|
||||
actual = self.plugin.diagnose_subnet(self.context, subnet_id, None)
|
||||
self.assertEqual(subnet["id"], actual["subnets"]["id"])
|
||||
|
||||
|
||||
class TestQuarkCreateSubnetAttrFilters(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self):
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.subnet_create"),
|
||||
mock.patch("quark.db.api.network_find"),
|
||||
mock.patch("quark.db.api.dns_create"),
|
||||
mock.patch("quark.db.api.route_create"),
|
||||
mock.patch("quark.plugin_views._make_subnet_dict")
|
||||
) as (subnet_create, net_find, dns_create, route_create, sub_dict):
|
||||
yield subnet_create, net_find
|
||||
|
||||
def test_create_subnet(self):
|
||||
subnet = {"subnet": {
|
||||
"network_id": 1, "tenant_id": self.context.tenant_id,
|
||||
"ip_version": 4, "cidr": "172.16.0.0/24",
|
||||
"gateway_ip": "0.0.0.0",
|
||||
"dns_nameservers": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"host_routes": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"enable_dhcp": None, "first_ip": 0, "last_ip": 1,
|
||||
"next_auto_assign_ip": 10}}
|
||||
|
||||
with self._stubs() as (subnet_create, net_find):
|
||||
self.plugin.create_subnet(self.context, subnet)
|
||||
self.assertEqual(subnet_create.call_count, 1)
|
||||
subnet_create.assert_called_once_with(
|
||||
self.context, network_id=subnet["subnet"]["network_id"],
|
||||
tenant_id=subnet["subnet"]["tenant_id"],
|
||||
cidr=subnet["subnet"]["cidr"], network=net_find())
|
||||
|
||||
def test_create_subnet_admin(self):
|
||||
subnet = {"subnet": {
|
||||
"network_id": 1, "tenant_id": self.context.tenant_id,
|
||||
"ip_version": 4, "cidr": "172.16.0.0/24",
|
||||
"gateway_ip": "0.0.0.0",
|
||||
"dns_nameservers": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"host_routes": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"enable_dhcp": None, "first_ip": 0, "last_ip": 1,
|
||||
"next_auto_assign_ip": 10}}
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs() as (subnet_create, net_find):
|
||||
self.plugin.create_subnet(admin_ctx, subnet)
|
||||
self.assertEqual(subnet_create.call_count, 1)
|
||||
subnet_create.assert_called_once_with(
|
||||
admin_ctx, network_id=subnet["subnet"]["network_id"],
|
||||
tenant_id=subnet["subnet"]["tenant_id"],
|
||||
cidr=subnet["subnet"]["cidr"], network=net_find(),
|
||||
next_auto_assign_ip=subnet["subnet"]["next_auto_assign_ip"])
|
||||
|
||||
|
||||
class TestQuarkUpdateSubnetAttrFilters(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self):
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.subnet_find"),
|
||||
mock.patch("quark.db.api.subnet_update"),
|
||||
mock.patch("quark.db.api.dns_create"),
|
||||
mock.patch("quark.db.api.route_find"),
|
||||
mock.patch("quark.db.api.route_update"),
|
||||
mock.patch("quark.db.api.route_create"),
|
||||
mock.patch("quark.plugin_views._make_subnet_dict")
|
||||
) as (subnet_find, subnet_update, dns_create, route_find,
|
||||
route_update, route_create, make_subnet):
|
||||
yield subnet_update, subnet_find
|
||||
|
||||
def test_update_subnet_attr_filters(self):
|
||||
subnet = {"subnet": {
|
||||
"network_id": 1, "tenant_id": self.context.tenant_id,
|
||||
"ip_version": 4, "cidr": "172.16.0.0/24",
|
||||
"gateway_ip": "0.0.0.0",
|
||||
"dns_nameservers": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"host_routes": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"enable_dhcp": None, "first_ip": 0, "last_ip": 1,
|
||||
"next_auto_assign_ip": 10, "do_not_use": False}}
|
||||
|
||||
with self._stubs() as (subnet_update, subnet_find):
|
||||
self.plugin.update_subnet(self.context, 1, subnet)
|
||||
|
||||
# NOTE(mdietz): the assertion here shows that, without admin,
|
||||
# all of the attributes passed above are stripped
|
||||
# from the request body. Otherwise, the attributes
|
||||
# above would be passed as keyword arguments to the
|
||||
# subnet_update db api call.
|
||||
subnet_update.assert_called_once_with(
|
||||
self.context, subnet_find())
|
||||
|
||||
def test_update_subnet_attr_filters_admin(self):
|
||||
subnet = {"subnet": {
|
||||
"network_id": 1, "tenant_id": self.context.tenant_id,
|
||||
"ip_version": 4, "cidr": "172.16.0.0/24",
|
||||
"gateway_ip": "0.0.0.0",
|
||||
"dns_nameservers": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"host_routes": neutron_attrs.ATTR_NOT_SPECIFIED,
|
||||
"enable_dhcp": False, "first_ip": 0, "last_ip": 1,
|
||||
"next_auto_assign_ip": 10, "do_not_use": True}}
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs() as (subnet_update, subnet_find):
|
||||
self.plugin.update_subnet(admin_ctx, 1, subnet)
|
||||
subnet_update.assert_called_once_with(
|
||||
admin_ctx, subnet_find(),
|
||||
next_auto_assign_ip=subnet["subnet"]["next_auto_assign_ip"],
|
||||
tenant_id=subnet["subnet"]["tenant_id"],
|
||||
enable_dhcp=subnet["subnet"]["enable_dhcp"],
|
||||
do_not_use=subnet["subnet"]["do_not_use"])
|
||||
|
@@ -31,6 +31,15 @@ from neutron.openstack.common import log as logging
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def filter_body(context, body, admin_only=None, always_filter=None):
|
||||
if not context.is_admin and admin_only:
|
||||
for attr in admin_only:
|
||||
pop_param(body, attr)
|
||||
if always_filter:
|
||||
for attr in always_filter:
|
||||
pop_param(body, attr)
|
||||
|
||||
|
||||
def attr_specified(param):
|
||||
return param is not attributes.ATTR_NOT_SPECIFIED
|
||||
|
||||
|
Reference in New Issue
Block a user