Remove calls to policy.check from plugin logic

Blueprint make-authz-orthogonal

This patch implements part #3 of this blueprint, according to its
specification.
It does so by allowing the view generator in the API layer to strip
off fields which do not satify authorization policies.
Also, some checks in unit tests for plugins relied on the
capability of the plugin to invoke directly the policy engine.
This checks have been removed and replaced by equivalent unit tests.
Finally, this patch required changes to most test cases for API
extensions in order to ensure the resource attribute map was
updated with the extension's attributes

Change-Id: I1ef94a8a628d34697254b68d7a539bd1c636876e
This commit is contained in:
Salvatore Orlando 2013-05-02 19:39:30 +02:00
parent 391108a014
commit 13f9e02a64
37 changed files with 386 additions and 363 deletions

View File

@ -8,20 +8,6 @@
"external": "field:networks:router:external=True",
"default": "rule:admin_or_owner",
"extension:provider_network:view": "rule:admin_only",
"extension:provider_network:set": "rule:admin_only",
"extension:router:view": "rule:regular_user",
"extension:port_binding:view": "rule:admin_only",
"extension:port_binding:set": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"get_port:binding:capabilities": "rule:admin_only",
"create_port:binding:host_id": "rule:admin_only",
"update_port:binding:host_id": "rule:admin_only",
"subnets:private:read": "rule:admin_or_owner",
"subnets:private:write": "rule:admin_or_owner",
"subnets:shared:read": "rule:regular_user",
@ -34,6 +20,11 @@
"create_network": "",
"get_network": "rule:admin_or_owner or rule:shared or rule:external",
"get_network:router:external": "rule:regular_user",
"get_network:provider:network_type": "rule:admin_only",
"get_network:provider:physical_network": "rule:admin_only",
"get_network:provider:segmentation_id": "rule:admin_only",
"get_network:queue_id": "rule:admin_only",
"create_network:shared": "rule:admin_only",
"create_network:router:external": "rule:admin_only",
"create_network:provider:network_type": "rule:admin_only",
@ -49,13 +40,19 @@
"create_port:mac_address": "rule:admin_or_network_owner",
"create_port:fixed_ips": "rule:admin_or_network_owner",
"create_port:port_security_enabled": "rule:admin_or_network_owner",
"create_port:binding:host_id": "rule:admin_only",
"get_port": "rule:admin_or_owner",
"get_port:queue_id": "rule:admin_only",
"get_port:binding:vif_type": "rule:admin_only",
"get_port:binding:capabilities": "rule:admin_only",
"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"update_port": "rule:admin_or_owner",
"update_port:fixed_ips": "rule:admin_or_network_owner",
"update_port:port_security_enabled": "rule:admin_or_network_owner",
"update_port:binding:host_id": "rule:admin_only",
"delete_port": "rule:admin_or_owner",
"extension:service_type:view_extended": "rule:admin_only",
"create_service_type": "rule:admin_only",
"update_service_type": "rule:admin_only",
"delete_service_type": "rule:admin_only",
@ -63,7 +60,6 @@
"create_qos_queue": "rule:admin_only",
"get_qos_queue": "rule:admin_only",
"get_qos_queues": "rule:admin_only",
"update_agent": "rule:admin_only",
"delete_agent": "rule:admin_only",

View File

@ -116,17 +116,40 @@ class Controller(object):
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_sorting_attr_name, False)
def _is_visible(self, attr):
attr_val = self._attr_info.get(attr)
return attr_val and attr_val['is_visible']
def _is_visible(self, context, attr_name, data):
action = "%s:%s" % (self._plugin_handlers[self.SHOW], attr_name)
# Optimistically init authz_check to True
authz_check = True
try:
attr = (attributes.RESOURCE_ATTRIBUTE_MAP
[self._collection].get(attr_name))
if attr and attr.get('enforce_policy'):
authz_check = policy.check_if_exists(
context, action, data)
except KeyError:
# The extension was not configured for adding its resources
# to the global resource attribute map. Policy check should
# not be performed
LOG.debug(_("The resource %(resource)s was not found in the "
"RESOURCE_ATTRIBUTE_MAP; unable to perform authZ "
"check for attribute %(attr)s"),
{'resource': self._collection,
'attr': attr})
except exceptions.PolicyRuleNotFound:
LOG.debug(_("Policy rule:%(action)s not found. Assuming no "
"authZ check is defined for %(attr)s"),
{'action': action,
'attr': attr_name})
attr_val = self._attr_info.get(attr_name)
return attr_val and attr_val['is_visible'] and authz_check
def _view(self, data, fields_to_strip=None):
def _view(self, context, data, fields_to_strip=None):
# make sure fields_to_strip is iterable
if not fields_to_strip:
fields_to_strip = []
return dict(item for item in data.iteritems()
if (self._is_visible(item[0]) and
if (self._is_visible(context, item[0], data) and
item[0] not in fields_to_strip))
def _do_field_list(self, original_fields):
@ -204,7 +227,6 @@ class Controller(object):
obj_list = obj_getter(request.context, **kwargs)
obj_list = sorting_helper.sort(obj_list)
obj_list = pagination_helper.paginate(obj_list)
# Check authz
if do_authz:
# FIXME(salvatore-orlando): obj_getter might return references to
@ -216,7 +238,7 @@ class Controller(object):
obj,
plugin=self._plugin)]
collection = {self._collection:
[self._view(obj,
[self._view(request.context, obj,
fields_to_strip=fields_to_add)
for obj in obj_list]}
pagination_links = pagination_helper.get_links(obj_list)
@ -260,7 +282,8 @@ class Controller(object):
api_common.list_args(request, "fields"))
parent_id = kwargs.get(self._parent_id_name)
return {self._resource:
self._view(self._item(request,
self._view(request.context,
self._item(request,
id,
do_authz=True,
field_list=field_list,
@ -278,7 +301,8 @@ class Controller(object):
kwargs = {self._resource: item}
if parent_id:
kwargs[self._parent_id_name] = parent_id
objs.append(self._view(obj_creator(request.context,
objs.append(self._view(request.context,
obj_creator(request.context,
**kwargs)))
return objs
# Note(salvatore-orlando): broad catch as in theory a plugin
@ -367,7 +391,7 @@ class Controller(object):
# plugin does atomic bulk create operations
obj_creator = getattr(self._plugin, "%s_bulk" % action)
objs = obj_creator(request.context, body, **kwargs)
return notify({self._collection: [self._view(obj)
return notify({self._collection: [self._view(request.context, obj)
for obj in objs]})
else:
obj_creator = getattr(self._plugin, action)
@ -379,7 +403,8 @@ class Controller(object):
else:
kwargs.update({self._resource: body})
obj = obj_creator(request.context, **kwargs)
return notify({self._resource: self._view(obj)})
return notify({self._resource: self._view(request.context,
obj)})
def delete(self, request, id, **kwargs):
"""Deletes the specified entity."""
@ -411,7 +436,7 @@ class Controller(object):
notifier_method,
notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
result = {self._resource: self._view(obj)}
result = {self._resource: self._view(request.context, obj)}
self._send_dhcp_notification(request.context,
result,
notifier_method)
@ -459,7 +484,7 @@ class Controller(object):
if parent_id:
kwargs[self._parent_id_name] = parent_id
obj = obj_updater(request.context, id, **kwargs)
result = {self._resource: self._view(obj)}
result = {self._resource: self._view(request.context, obj)}
notifier_method = self._resource + '.update.end'
notifier_api.notify(request.context,
self._publisher_id,

View File

@ -79,10 +79,14 @@ class PortNotFound(NotFound):
"on network %(net_id)s")
class PolicyNotFound(NotFound):
class PolicyFileNotFound(NotFound):
message = _("Policy configuration policy.json could not be found")
class PolicyRuleNotFound(NotFound):
message = _("Requested rule:%(rule)s cannot be found")
class StateInvalid(BadRequest):
message = _("Unsupported port state: %(port_state)s")

View File

@ -34,7 +34,6 @@ from quantum.extensions import l3
from quantum.openstack.common import log as logging
from quantum.openstack.common.notifier import api as notifier_api
from quantum.openstack.common import uuidutils
from quantum import policy
LOG = logging.getLogger(__name__)
@ -784,11 +783,6 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
routers = self.get_sync_data(context.elevated(), [router_id])
l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
def _check_l3_view_auth(self, context, network):
return policy.check(context,
"extension:router:view",
network)
def _network_is_external(self, context, net_id):
try:
context.session.query(ExternalNetwork).filter_by(
@ -798,9 +792,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
return False
def _extend_network_dict_l3(self, context, network):
if self._check_l3_view_auth(context, network):
network[l3.EXTERNAL] = self._network_is_external(
context, network['id'])
network[l3.EXTERNAL] = self._network_is_external(
context, network['id'])
def _process_l3_create(self, context, net_data, net_id):
external = net_data.get(l3.EXTERNAL)

View File

@ -25,7 +25,6 @@ from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import portbindings
from quantum.openstack.common import log as logging
from quantum import policy
LOG = logging.getLogger(__name__)
@ -69,18 +68,6 @@ class PortBindingMixin(object):
None,
_port_result_filter_hook)
def _check_portbindings_view_auth(self, context, port):
#TODO(salv-orlando): Remove this as part of bp/make-authz-orthogonal
keys_to_delete = []
for key in port:
if key.startswith('binding'):
policy_rule = "get_port:%s" % key
if not policy.check(context, policy_rule, port):
keys_to_delete.append(key)
for key in keys_to_delete:
del port[key]
return port
def _process_portbindings_create_and_update(self, context, port_data,
port):
host = port_data.get(portbindings.HOST_ID)

View File

@ -29,7 +29,6 @@ from quantum.db import api as db
from quantum.db import model_base
from quantum.db import models_v2
from quantum.openstack.common import log as logging
from quantum import policy
LOG = logging.getLogger(__name__)
@ -198,14 +197,6 @@ class ServiceTypeManager(object):
context.session.add(ServiceDefinition(**svc_def))
return svc_type_db
def _check_service_type_view_auth(self, context, service_type):
# FIXME(salvatore-orlando): This should be achieved via policy
# engine without need for explicit checks in manager code.
# Also, the policy in this way does not make a lot of sense
return policy.check(context,
"extension:service_type:view_extended",
service_type)
def _get_service_type(self, context, svc_type_id):
try:
query = context.session.query(ServiceType)
@ -232,21 +223,17 @@ class ServiceTypeManager(object):
def _make_svc_def_dict(svc_def_db):
svc_def = {'service_class': svc_def_db['service_class']}
if self._check_service_type_view_auth(context,
svc_type.as_dict()):
svc_def.update({'plugin': svc_def_db['plugin'],
'driver': svc_def_db['driver']})
svc_def.update({'plugin': svc_def_db['plugin'],
'driver': svc_def_db['driver']})
return svc_def
res = {'id': svc_type['id'],
'name': svc_type['name'],
'default': svc_type['default'],
'num_instances': svc_type['num_instances'],
'service_definitions':
[_make_svc_def_dict(svc_def) for svc_def
in svc_type['service_definitions']]}
if self._check_service_type_view_auth(context,
svc_type.as_dict()):
res['num_instances'] = svc_type['num_instances']
# Field selection
if fields:
return dict(((k, v) for k, v in res.iteritems()

View File

@ -112,7 +112,10 @@ class Agent(object):
return [ex]
def get_extended_resources(self, version):
return {}
if version == "2.0":
return RESOURCE_ATTRIBUTE_MAP
else:
return {}
class AgentPluginBase(object):

View File

@ -46,6 +46,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'ports': {
VIF_TYPE: {'allow_post': False, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'is_visible': True},
HOST_ID: {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
@ -53,10 +54,12 @@ EXTENDED_ATTRIBUTES_2_0 = {
'enforce_policy': True},
PROFILE: {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'validate': {'type:dict': None},
'is_visible': True},
CAPABILITIES: {'allow_post': False, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'is_visible': True},
}
}

View File

@ -39,6 +39,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
'networks': {
PORTSECURITY: {'allow_post': True, 'allow_put': True,
'convert_to': attributes.convert_to_boolean,
'enforce_policy': True,
'default': True,
'is_visible': True},
},
@ -46,6 +47,7 @@ EXTENDED_ATTRIBUTES_2_0 = {
PORTSECURITY: {'allow_post': True, 'allow_put': True,
'convert_to': attributes.convert_to_boolean,
'default': attributes.ATTR_NOT_SPECIFIED,
'enforce_policy': True,
'is_visible': True},
}
}

View File

@ -265,7 +265,8 @@ class Securitygroup(extensions.ExtensionDescriptor):
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
return dict(EXTENDED_ATTRIBUTES_2_0.items() +
RESOURCE_ATTRIBUTE_MAP.items())
else:
return {}

View File

@ -30,12 +30,12 @@ from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
RESOURCE_NAME = "service-type"
RESOURCE_NAME = "service_type"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
SERVICE_ATTR = 'service_class'
PLUGIN_ATTR = 'plugin'
DRIVER_ATTR = 'driver'
EXT_ALIAS = RESOURCE_NAME
EXT_ALIAS = 'service-type'
# Attribute Map for Service Type Resource
RESOURCE_ATTRIBUTE_MAP = {
@ -190,18 +190,17 @@ class Servicetype(extensions.ExtensionDescriptor):
@classmethod
def get_resources(cls):
"""Returns Extended Resource for service type management."""
my_plurals = [(key.replace('-', '_'),
key[:-1].replace('-', '_')) for
key in RESOURCE_ATTRIBUTE_MAP.keys()]
my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
my_plurals.append(('service_definitions', 'service_definition'))
attributes.PLURALS.update(dict(my_plurals))
attr_map = RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME]
collection_name = COLLECTION_NAME.replace('_', '-')
controller = base.create_resource(
COLLECTION_NAME,
collection_name,
RESOURCE_NAME,
servicetype_db.ServiceTypeManager.get_instance(),
attr_map)
return [extensions.ResourceExtension(COLLECTION_NAME,
return [extensions.ResourceExtension(collection_name,
controller,
attr_map=attr_map)]

View File

@ -69,7 +69,6 @@ from quantum.openstack.common import lockutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.plugins.bigswitch.version import version_string_with_vcs
from quantum import policy
LOG = logging.getLogger(__name__)
@ -292,9 +291,6 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
supported_extension_aliases = ["router", "binding"]
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
def __init__(self):
LOG.info(_('QuantumRestProxy: Starting plugin. Version=%s'),
version_string_with_vcs())
@ -1243,13 +1239,9 @@ class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
return data
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _extend_port_dict_binding(self, context, port):
if self._check_view_auth(context, port, self.binding_view):
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
return port

View File

@ -49,7 +49,6 @@ from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.brocade.db import models as brocade_db
from quantum.plugins.brocade import vlanbm as vbm
from quantum import policy
from quantum import scheduler
@ -212,8 +211,6 @@ class BrocadePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self.supported_extension_aliases = ["binding", "security-group",
"agent", "agent_scheduler"]
self.binding_view = "extension:port_binding:view"
self.binding_set = "extension:port_binding:set"
self.physical_interface = (cfg.CONF.PHYSICAL_INTERFACE.
physical_interface)
@ -436,16 +433,12 @@ class BrocadePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
bport.vlan_id)
def _extend_port_dict_binding(self, context, port):
if self._check_view_auth(context, port, self.binding_view):
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_BRIDGE
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_BRIDGE
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
return port
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def get_plugin_version(self):
"""Get version number of the plugin."""
return PLUGIN_VERSION

View File

@ -33,7 +33,6 @@ from quantum.plugins.hyperv import agent_notifier_api
from quantum.plugins.hyperv.common import constants
from quantum.plugins.hyperv import db as hyperv_db
from quantum.plugins.hyperv import rpc_callbacks
from quantum import policy
DEFAULT_VLAN_RANGES = []
@ -150,11 +149,6 @@ class HyperVQuantumPlugin(db_base_plugin_v2.QuantumDbPluginV2,
__native_bulk_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
def __init__(self, configfile=None):
self._db = hyperv_db.HyperVPluginDB()
self._db.initialize()
@ -193,9 +187,6 @@ class HyperVQuantumPlugin(db_base_plugin_v2.QuantumDbPluginV2,
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _parse_network_vlan_ranges(self):
self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
cfg.CONF.HYPERV.network_vlan_ranges)
@ -255,12 +246,11 @@ class HyperVQuantumPlugin(db_base_plugin_v2.QuantumDbPluginV2,
return net
def _extend_network_dict_provider(self, context, network):
if self._check_view_auth(context, network, self.network_view):
binding = self._db.get_network_binding(
context.session, network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
p = self._network_providers_map[binding.network_type]
p.extend_network_dict(network, binding)
binding = self._db.get_network_binding(
context.session, network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
p = self._network_providers_map[binding.network_type]
p.extend_network_dict(network, binding)
def _check_provider_update(self, context, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
@ -318,8 +308,7 @@ class HyperVQuantumPlugin(db_base_plugin_v2.QuantumDbPluginV2,
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
if self._check_view_auth(context, port, self.binding_view):
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_HYPERV
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_HYPERV
return port
def create_port(self, context, port):

View File

@ -44,7 +44,6 @@ from quantum.openstack.common.rpc import proxy
from quantum.plugins.common import utils as plugin_utils
from quantum.plugins.linuxbridge.common import constants
from quantum.plugins.linuxbridge.db import l2network_db_v2 as db
from quantum import policy
LOG = logging.getLogger(__name__)
@ -214,9 +213,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._aliases = aliases
return self._aliases
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
def __init__(self):
self.extra_binding_dict = {
portbindings.VIF_TYPE: portbindings.VIF_TYPE_BRIDGE,
@ -264,27 +260,28 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
sys.exit(1)
LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
self._add_network(physical_network)
self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
# REVISIT(rkukura) Use core mechanism for attribute authorization
# when available.
def _add_network(self, physical_network):
if physical_network not in self.network_vlan_ranges:
self.network_vlan_ranges[physical_network] = []
def _extend_network_dict_provider(self, context, network):
if self._check_view_auth(context, network, self.network_view):
binding = db.get_network_binding(context.session, network['id'])
if binding.vlan_id == constants.FLAT_VLAN_ID:
network[provider.NETWORK_TYPE] = constants.TYPE_FLAT
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.vlan_id == constants.LOCAL_VLAN_ID:
network[provider.NETWORK_TYPE] = constants.TYPE_LOCAL
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
else:
network[provider.NETWORK_TYPE] = constants.TYPE_VLAN
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.vlan_id
binding = db.get_network_binding(context.session, network['id'])
if binding.vlan_id == constants.FLAT_VLAN_ID:
network[provider.NETWORK_TYPE] = constants.TYPE_FLAT
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.vlan_id == constants.LOCAL_VLAN_ID:
network[provider.NETWORK_TYPE] = constants.TYPE_LOCAL
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
else:
network[provider.NETWORK_TYPE] = constants.TYPE_VLAN
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.vlan_id
def _process_provider_create(self, context, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
@ -446,25 +443,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
return [self._fields(net, fields) for net in nets]
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
port = super(LinuxBridgePluginV2, self).get_port(context,
id,
fields)
return self._check_portbindings_view_auth(context, port)
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
res_ports = []
with context.session.begin(subtransactions=True):
ports = super(LinuxBridgePluginV2,
self).get_ports(context, filters, fields, sorts,
limit, marker, page_reverse)
for port in ports:
self._check_portbindings_view_auth(context, port)
res_ports.append(port)
return res_ports
def create_port(self, context, port):
session = context.session
port_data = port['port']
@ -482,7 +460,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._process_port_create_security_group(
context, port, sgids)
self.notify_security_groups_member_updated(context, port)
return self._check_portbindings_view_auth(context, port)
return port
def update_port(self, context, id, port):
original_port = self.get_port(context, id)
@ -506,7 +484,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
if need_port_update_notify:
self._notify_port_updated(context, updated_port)
return self._check_portbindings_view_auth(context, updated_port)
return updated_port
def delete_port(self, context, id, l3_port_check=True):

View File

@ -37,7 +37,6 @@ from quantum.plugins.mlnx import agent_notify_api
from quantum.plugins.mlnx.common import constants
from quantum.plugins.mlnx.db import mlnx_db_v2 as db
from quantum.plugins.mlnx import rpc_callbacks
from quantum import policy
LOG = logging.getLogger(__name__)
@ -105,12 +104,6 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.QuantumDbPluginV2,
sys.exit(1)
LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _enforce_set_auth(self, context, resource, action):
policy.enforce(context, action, resource)
def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
self._add_network(physical_network)
self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
@ -120,18 +113,17 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.QuantumDbPluginV2,
self.network_vlan_ranges[physical_network] = []
def _extend_network_dict_provider(self, context, network):
if self._check_view_auth(context, network, self.network_view):
binding = db.get_network_binding(context.session, network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
if binding.network_type == constants.TYPE_FLAT:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.network_type == constants.TYPE_LOCAL:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
else:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.segmentation_id
binding = db.get_network_binding(context.session, network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
if binding.network_type == constants.TYPE_FLAT:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.network_type == constants.TYPE_LOCAL:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
else:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.segmentation_id
def _set_tenant_network_type(self):
self.tenant_network_type = cfg.CONF.MLNX.tenant_network_type
@ -156,8 +148,6 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.QuantumDbPluginV2,
if not (network_type_set or physical_network_set or
segmentation_id_set):
return (None, None, None)
# Authorize before exposing plugin details to client
self._enforce_set_auth(context, attrs, self.network_set)
if not network_type_set:
msg = _("provider:network_type required")
@ -237,8 +227,6 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.QuantumDbPluginV2,
if not (network_type_set or physical_network_set or
segmentation_id_set):
return
# Authorize before exposing plugin details to client
self._enforce_set_auth(context, attrs, self.network_set)
msg = _("Plugin does not support updating provider attributes")
raise q_exc.InvalidInput(error_message=msg)
@ -346,18 +334,17 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.QuantumDbPluginV2,
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
if self._check_view_auth(context, port, self.binding_view):
port_binding = db.get_port_profile_binding(context.session,
port['id'])
if port_binding:
port[portbindings.VIF_TYPE] = port_binding.vnic_type
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
binding = db.get_network_binding(context.session,
port['network_id'])
fabric = binding.physical_network
port[portbindings.PROFILE] = {'physical_network': fabric}
port_binding = db.get_port_profile_binding(context.session,
port['id'])
if port_binding:
port[portbindings.VIF_TYPE] = port_binding.vnic_type
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
binding = db.get_network_binding(context.session,
port['network_id'])
fabric = binding.physical_network
port[portbindings.PROFILE] = {'physical_network': fabric}
return port
def create_port(self, context, port):

View File

@ -38,7 +38,6 @@ from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec.db import nec_plugin_base
from quantum.plugins.nec import ofc_manager
from quantum import policy
LOG = logging.getLogger(__name__)
@ -87,9 +86,6 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base,
self._aliases = aliases
return self._aliases
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
def __init__(self):
ndb.initialize()
self.ofc = ofc_manager.OFCManager()
@ -130,9 +126,6 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base,
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _update_resource_status(self, context, resource, id, status):
"""Update status of specified resource."""
request = {}
@ -365,11 +358,10 @@ class NECPluginV2(nec_plugin_base.NECPluginV2Base,
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
if self._check_view_auth(context, port, self.binding_view):
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
return port
def create_port(self, context, port):

View File

@ -64,7 +64,7 @@ from quantum.plugins.nicira import nvp_cluster
from quantum.plugins.nicira.nvp_plugin_version import PLUGIN_VERSION
from quantum.plugins.nicira import NvpApiClient
from quantum.plugins.nicira import nvplib
from quantum import policy
LOG = logging.getLogger("QuantumPlugin")
NVP_NOSNAT_RULES_ORDER = 10
@ -142,7 +142,6 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
# Map nova zones to cluster for easy retrieval
novazone_cluster_map = {}
provider_network_view = "extension:provider_network:view"
port_security_enabled_update = "update_port:port_security_enabled"
def __init__(self, loglevel=None):
@ -668,9 +667,6 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
nvp_exc.NvpNoMorePortsException:
webob.exc.HTTPBadRequest})
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _handle_provider_create(self, context, attrs):
# NOTE(salvatore-orlando): This method has been borrowed from
# the OpenvSwtich plugin, altough changed to match NVP specifics.
@ -720,17 +716,16 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
# which should be specified in physical_network
def _extend_network_dict_provider(self, context, network, binding=None):
if self._check_view_auth(context, network, self.provider_network_view):
if not binding:
binding = nicira_db.get_network_binding(context.session,
network['id'])
# With NVP plugin 'normal' overlay networks will have no binding
# TODO(salvatore-orlando) make sure users can specify a distinct
# phy_uuid as 'provider network' for STT net type
if binding:
network[pnet.NETWORK_TYPE] = binding.binding_type
network[pnet.PHYSICAL_NETWORK] = binding.phy_uuid
network[pnet.SEGMENTATION_ID] = binding.vlan_id
if not binding:
binding = nicira_db.get_network_binding(context.session,
network['id'])
# With NVP plugin 'normal' overlay networks will have no binding
# TODO(salvatore-orlando) make sure users can specify a distinct
# phy_uuid as 'provider network' for STT net type
if binding:
network[pnet.NETWORK_TYPE] = binding.binding_type
network[pnet.PHYSICAL_NETWORK] = binding.phy_uuid
network[pnet.SEGMENTATION_ID] = binding.vlan_id
def _handle_lswitch_selection(self, cluster, network,
network_binding, max_ports,
@ -2093,18 +2088,3 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
return
nvplib.delete_lqueue(self.cluster, id)
return super(NvpPluginV2, self).delete_qos_queue(context, id)
def get_qos_queue(self, context, id, fields=None):
if not self._check_view_auth(context, {}, ext_qos.qos_queue_get):
# don't want the user to find out that they guessed the right id
# so we raise not found if the policy.json file doesn't allow them
raise ext_qos.QueueNotFound(id=id)
return super(NvpPluginV2, self).get_qos_queue(context, id, fields)
def get_qos_queues(self, context, filters=None, fields=None):
if not self._check_view_auth(context, {'qos_queue': []},
ext_qos.qos_queue_list):
return []
return super(NvpPluginV2, self).get_qos_queues(context, filters,
fields)

View File

@ -27,7 +27,7 @@ from quantum import manager
from quantum import quota
RESOURCE_NAME = "network-gateway"
RESOURCE_NAME = "network_gateway"
COLLECTION_NAME = "%ss" % RESOURCE_NAME
EXT_ALIAS = RESOURCE_NAME
DEVICE_ID_ATTR = 'id'
@ -137,8 +137,8 @@ class Nvp_networkgw(object):
# register quotas for network gateways
quota.QUOTAS.register_resource_by_name(RESOURCE_NAME)
controller = base.create_resource(COLLECTION_NAME,
collection_name = COLLECTION_NAME.replace('_', '-')
controller = base.create_resource(collection_name,
RESOURCE_NAME,
plugin, params,
member_actions=member_actions)
@ -146,6 +146,12 @@ class Nvp_networkgw(object):
controller,
member_actions=member_actions)]
def get_extended_resources(self, version):
if version == "2.0":
return RESOURCE_ATTRIBUTE_MAP
else:
return {}
class NetworkGatewayPluginBase(object):

View File

@ -122,16 +122,19 @@ EXTENDED_ATTRIBUTES_2_0 = {
'allow_put': False,
'is_visible': False,
'default': 1,
'enforce_policy': True,
'convert_to': convert_to_unsigned_int_or_none},
QUEUE: {'allow_post': False,
'allow_put': False,
'is_visible': True,
'default': False}},
'default': False,
'enforce_policy': True}},
'networks': {QUEUE: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': False}}
'default': False,
'enforce_policy': True}}
}

View File

@ -138,27 +138,23 @@ class NVPQoSDbMixin(ext_qos.QueuePluginBase):
context.session.delete(binding)
def _extend_port_qos_queue(self, context, port):
if self._check_view_auth(context, {'qos_queue': None},
ext_qos.qos_queue_get):
filters = {'port_id': [port['id']]}
fields = ['queue_id']
port[ext_qos.QUEUE] = None
queue_id = self._get_port_queue_bindings(
context, filters, fields)
if queue_id:
port[ext_qos.QUEUE] = queue_id[0]['queue_id']
filters = {'port_id': [port['id']]}
fields = ['queue_id']
port[ext_qos.QUEUE] = None
queue_id = self._get_port_queue_bindings(
context, filters, fields)
if queue_id:
port[ext_qos.QUEUE] = queue_id[0]['queue_id']
return port
def _extend_network_qos_queue(self, context, network):
if self._check_view_auth(context, {'qos_queue': None},
ext_qos.qos_queue_get):
filters = {'network_id': [network['id']]}
fields = ['queue_id']
network[ext_qos.QUEUE] = None
queue_id = self._get_network_queue_bindings(
context, filters, fields)
if queue_id:
network[ext_qos.QUEUE] = queue_id[0]['queue_id']
filters = {'network_id': [network['id']]}
fields = ['queue_id']
network[ext_qos.QUEUE] = None
queue_id = self._get_network_queue_bindings(
context, filters, fields)
if queue_id:
network[ext_qos.QUEUE] = queue_id[0]['queue_id']
return network
def _make_qos_queue_dict(self, queue, fields=None):

View File

@ -51,7 +51,6 @@ from quantum.plugins.common import utils as plugin_utils
from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants
from quantum.plugins.openvswitch import ovs_db_v2
from quantum import policy
LOG = logging.getLogger(__name__)
@ -254,9 +253,6 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._aliases = aliases
return self._aliases
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
def __init__(self, configfile=None):
self.extra_binding_dict = {
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS,
@ -326,29 +322,22 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
sys.exit(1)
LOG.info(_("Tunnel ID ranges: %s"), self.tunnel_id_ranges)
# TODO(rkukura) Use core mechanism for attribute authorization
# when available.
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
def _extend_network_dict_provider(self, context, network):
if self._check_view_auth(context, network, self.network_view):
binding = ovs_db_v2.get_network_binding(context.session,
network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
if binding.network_type == constants.TYPE_GRE:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = binding.segmentation_id
elif binding.network_type == constants.TYPE_FLAT:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.network_type == constants.TYPE_VLAN:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.segmentation_id
elif binding.network_type == constants.TYPE_LOCAL:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
binding = ovs_db_v2.get_network_binding(context.session,
network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
if binding.network_type == constants.TYPE_GRE:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = binding.segmentation_id
elif binding.network_type == constants.TYPE_FLAT:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.network_type == constants.TYPE_VLAN:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.segmentation_id
elif binding.network_type == constants.TYPE_LOCAL:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
def _process_provider_create(self, context, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
@ -548,26 +537,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
port_data, port)
self._process_port_create_security_group(context, port, sgids)
self.notify_security_groups_member_updated(context, port)
return self._check_portbindings_view_auth(context, port)
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
port = super(OVSQuantumPluginV2, self).get_port(context,
id,
fields)
return self._check_portbindings_view_auth(context, port)
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
res_ports = []
with context.session.begin(subtransactions=True):
ports = super(OVSQuantumPluginV2,
self).get_ports(context, filters, fields, sorts,
limit, marker, page_reverse)
for port in ports:
self._check_portbindings_view_auth(context, port)
res_ports.append(port)
return res_ports
return port
def update_port(self, context, id, port):
session = context.session
@ -594,7 +564,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
binding.network_type,
binding.segmentation_id,
binding.physical_network)
return self._check_portbindings_view_auth(context, updated_port)
return updated_port
def delete_port(self, context, id, l3_port_check=True):

View File

@ -49,7 +49,7 @@ def init():
if not _POLICY_PATH:
_POLICY_PATH = utils.find_config_file({}, cfg.CONF.policy_file)
if not _POLICY_PATH:
raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file)
raise exceptions.PolicyFileNotFound(path=cfg.CONF.policy_file)
# pass _set_brain to read_cached_file so that the policy brain
# is reset only if the file has changed
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
@ -65,6 +65,8 @@ def get_resource_and_action(action):
def _set_rules(data):
default_rule = 'default'
LOG.debug(_("loading policies from file: %s"), _POLICY_PATH)
# TODO(salvatore-orlando): Ensure backward compatibility with
# folsom/grizzly style for extension rules (bp/make-authz-orthogonal)
policy.set_rules(policy.Rules.load_json(data, default_rule))
@ -110,6 +112,7 @@ def _build_match_rule(action, target):
match_rule = policy.RuleCheck('rule', action)
resource, is_write = get_resource_and_action(action)
# Attribute-based checks shall not be enforced on GETs
if is_write:
# assigning to variable with short name for improving readability
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
@ -160,6 +163,20 @@ class FieldCheck(policy.Check):
return target_value == self.value
def _prepare_check(context, action, target, plugin=None):
"""Prepare rule, target, and credentials for the policy engine."""
init()
# Compare with None to distinguish case in which target is {}
if target is None:
target = {}
# Update target only if plugin is provided
if plugin:
target = _build_target(action, target, plugin, context)
match_rule = _build_match_rule(action, target)
credentials = context.to_dict()
return match_rule, target, credentials
def check(context, action, target, plugin=None):
"""Verifies that the action is valid on the target in this context.
@ -174,14 +191,23 @@ def check(context, action, target, plugin=None):
:return: Returns True if access is permitted else False.
"""
init()
# Compare with None to distinguish case in which target is {}
if target is None:
target = {}
real_target = _build_target(action, target, plugin, context)
match_rule = _build_match_rule(action, real_target)
credentials = context.to_dict()
return policy.check(match_rule, real_target, credentials)
return policy.check(*(_prepare_check(context, action, target, plugin)))
def check_if_exists(context, action, target):
"""Verify if the action can be authorized, and raise if it is unknown.
Check whether the action can be performed on the target within this
context, and raise a PolicyRuleNotFound exception if the action is
not defined in the policy engine.
"""
# TODO(salvatore-orlando): Consider modifying oslo policy engine in
# order to allow to raise distinct exception when check fails and
# when policy is missing
# Raise if there's no match for requested action in the policy engine
if not policy._rules or action not in policy._rules:
raise exceptions.PolicyRuleNotFound(rule=action)
return policy.check(*(_prepare_check(context, action, target)))
def enforce(context, action, target, plugin=None):

View File

@ -48,19 +48,9 @@ class TestHyperVVirtualSwitchV2HTTPResponse(
class TestHyperVVirtualSwitchPortsV2(
test_plugin.TestPortsV2, HyperVQuantumPluginTestCase):
def test_port_vif_details(self):
plugin = QuantumManager.get_plugin()
with self.port(name='name') as port:
port_id = port['port']['id']
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = plugin.get_port(ctx, port_id)
self.assertTrue('status' in non_admin_port)
self.assertFalse('binding:vif_type' in non_admin_port)
def test_ports_vif_details(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
@ -72,16 +62,6 @@ class TestHyperVVirtualSwitchPortsV2(
for port in ports:
self.assertEqual(port['binding:vif_type'],
portbindings.VIF_TYPE_HYPERV)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self.assertTrue('status' in non_admin_port)
self.assertFalse('binding:vif_type' in non_admin_port)
class TestHyperVVirtualSwitchNetworksV2(

View File

@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum import context
from quantum.manager import QuantumManager
from quantum.plugins.mlnx.common import constants
from quantum.tests.unit import _test_extension_portbindings as test_bindings
from quantum.tests.unit import test_db_plugin as test_plugin
PLUGIN_NAME = ('quantum.plugins.mlnx.mlnx_plugin.MellanoxEswitchPlugin')
@ -39,24 +38,14 @@ class TestMlnxV2HTTPResponse(test_plugin.TestV2HTTPResponse,
class TestMlnxPortsV2(test_plugin.TestPortsV2,
MlnxPluginV2TestCase):
VIF_TYPE = constants.VIF_TYPE_DIRECT
HAS_PORT_FILTER = False
def test_port_vif_details(self):
plugin = QuantumManager.get_plugin()
with self.port(name='name') as port:
port_id = port['port']['id']
self.assertEqual(port['port']['binding:vif_type'],
self.VIF_TYPE)
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
non_admin_port = plugin.get_port(ctx, port_id)
self.assertIn('status', non_admin_port)
self.assertNotIn('binding:vif_type', non_admin_port)
pass
class TestMlnxNetworksV2(test_plugin.TestNetworksV2, MlnxPluginV2TestCase):
pass
class TestMlnxPortBinding(MlnxPluginV2TestCase,
test_bindings.PortBindingsTestCase):
VIF_TYPE = constants.VIF_TYPE_DIRECT
HAS_PORT_FILTER = False

View File

@ -22,6 +22,7 @@ import webtest
from quantum.api import extensions
from quantum.api.extensions import PluginAwareExtensionManager
from quantum.api.v2 import attributes
from quantum.common import config
from quantum.common.test_lib import test_config
from quantum import context
@ -43,6 +44,12 @@ _get_path = test_api_v2._get_path
class TestExtensionManager(object):
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
networkgw.RESOURCE_ATTRIBUTE_MAP)
return networkgw.Nvp_networkgw.get_resources()
def get_actions(self):

View File

@ -757,7 +757,7 @@ class TestNiciraQoSQueue(NiciraPluginV2TestCase):
quantum_context = context.Context('', 'not_admin')
port = self._update('ports', port['port']['id'], data,
quantum_context=quantum_context)
self.assertEqual(ext_qos.QUEUE not in port['port'], True)
self.assertFalse(ext_qos.QUEUE in port['port'])
def test_rxtx_factor(self):
with self.qos_queue(max=10) as q1:

View File

@ -21,11 +21,13 @@ from webob import exc
from quantum.api import extensions
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum import context
from quantum.db import agents_db
from quantum.db import dhcp_rpc_base
from quantum.db import l3_rpc_base
from quantum.extensions import agent
from quantum.extensions import agentscheduler
from quantum import manager
from quantum.openstack.common import timeutils
@ -179,10 +181,10 @@ class AgentSchedulerTestMixIn(object):
def _get_agent_id(self, agent_type, host):
agents = self._list_agents()
for agent in agents['agents']:
if (agent['agent_type'] == agent_type and
agent['host'] == host):
return agent['id']
for agent_data in agents['agents']:
if (agent_data['agent_type'] == agent_type and
agent_data['host'] == host):
return agent_data['id']
class OvsAgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin,
@ -194,12 +196,27 @@ class OvsAgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin,
'ovs_quantum_plugin.OVSQuantumPluginV2')
def setUp(self):
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
super(OvsAgentSchedulerTestCase, self).setUp(self.plugin_str)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
self.addCleanup(self.restore_attribute_map)
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_report_states(self):
self._register_agent_states()
agents = self._list_agents()
@ -757,11 +774,27 @@ class OvsDhcpAgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
'DhcpAgentNotifyAPI')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
super(OvsDhcpAgentNotifierTestCase, self).setUp(self.plugin_str)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(self.dhcp_notifier_cls_p.stop)
self.addCleanup(self.restore_attribute_map)
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_network_add_to_dhcp_agent_notification(self):
with mock.patch.object(self.dhcp_notifier, 'cast') as mock_dhcp:
@ -855,11 +888,27 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
super(OvsL3AgentNotifierTestCase, self).setUp(self.plugin_str)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(self.dhcp_notifier_cls_p.stop)
self.addCleanup(self.restore_attribute_map)
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_router_add_to_l3_agent_notification(self):
plugin = manager.QuantumManager.get_plugin()

View File

@ -21,6 +21,7 @@ import time
from oslo.config import cfg
from webob import exc
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum.common.test_lib import test_config
from quantum.common import topics
@ -48,6 +49,12 @@ DHCP_HOSTC = 'hostc'
class AgentTestExtensionManager(object):
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
return agent.Agent.get_resources()
def get_actions(self):
@ -128,10 +135,20 @@ class AgentDBTestCase(AgentDBTestMixIn,
'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
# Save the original RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
ext_mgr = AgentTestExtensionManager()
test_config['extension_manager'] = ext_mgr
self.addCleanup(self.restore_resource_attribute_map)
self.addCleanup(cfg.CONF.reset)
super(AgentDBTestCase, self).setUp()
def restore_resource_attribute_map(self):
# Restore the originak RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_create_agent(self):
data = {'agent': {}}
_req = self.new_create_request('agents', data, self.fmt)

View File

@ -34,6 +34,7 @@ from quantum.common import exceptions as q_exc
from quantum import context
from quantum.manager import QuantumManager
from quantum.openstack.common.notifier import api as notifer_api
from quantum.openstack.common import policy as common_policy
from quantum.openstack.common import uuidutils
from quantum.tests import base
from quantum.tests.unit import testlib_api
@ -1033,6 +1034,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
extra_environ=env,
expect_errors=expect_errors)
self.assertEqual(res.status_int, expected_code)
return res
def test_get_noauth(self):
self._test_get(None, _uuid(), 200)
@ -1050,6 +1052,18 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
tenant_id = _uuid()
self._test_get(tenant_id + "another", tenant_id, 200)
def test_get_keystone_strip_admin_only_attribute(self):
tenant_id = _uuid()
# Inject rule in policy engine
common_policy._rules['get_network:name'] = common_policy.parse_rule(
"rule:admin_only")
res = self._test_get(tenant_id, tenant_id, 200)
res = self.deserialize(res)
try:
self.assertNotIn('name', res['network'])
finally:
del common_policy._rules['get_network:name']
def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
expect_errors=False):
env = {}
@ -1209,7 +1223,7 @@ class V2Views(base.BaseTestCase):
data['fake'] = 'value'
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[collection]
controller = v2_base.Controller(None, collection, resource, attr_info)
res = controller._view(data)
res = controller._view(context.get_admin_context(), data)
self.assertTrue('fake' not in res)
for key in keys:
self.assertTrue(key in res)

View File

@ -24,6 +24,7 @@ import webob.exc as webexc
import quantum
from quantum.api import extensions
from quantum.api.v2 import attributes
from quantum.common import config
from quantum import manager
from quantum.plugins.common import constants
@ -93,7 +94,23 @@ class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
extattr.EXTENDED_ATTRIBUTES_2_0)
self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
self.addCleanup(cfg.CONF.reset)
self.addCleanup(self.restore_attribute_map)
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'

View File

@ -364,4 +364,9 @@ class TestPortSecurity(PortSecurityDBTestCase):
req.environ['quantum.context'] = context.Context(
'', 'not_network_owner')
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
# TODO(salvatore-orlando): Expected error is 404 because
# the current API controller always returns this error
# code for any policy check failures on update.
# It should be 404 when the caller cannot access the whole
# resource, and 403 when it cannot access a single attribute
self.assertEqual(res.status_int, 404)

View File

@ -40,6 +40,12 @@ def etcdir(*p):
class SecurityGroupTestExtensionManager(object):
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attr.RESOURCE_ATTRIBUTE_MAP.update(
ext_sg.RESOURCE_ATTRIBUTE_MAP)
return ext_sg.Securitygroup.get_resources()
def get_actions(self):

View File

@ -59,6 +59,12 @@ _get_path = test_api_v2._get_path
class L3TestExtensionManager(object):
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
l3.RESOURCE_ATTRIBUTE_MAP)
return l3.L3.get_resources()
def get_actions(self):

View File

@ -22,6 +22,7 @@ from webob import exc
import webtest
from quantum.api import extensions
from quantum.api.v2 import attributes
from quantum.common import config
from quantum.extensions import loadbalancer
from quantum import manager
@ -39,6 +40,12 @@ _get_path = test_api_v2._get_path
class LoadBalancerTestExtensionManager(object):
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
loadbalancer.RESOURCE_ATTRIBUTE_MAP)
return loadbalancer.Loadbalancer.get_resources()
def get_actions(self):

View File

@ -103,6 +103,12 @@ class PolicyTestCase(base.BaseTestCase):
result = policy.check(self.context, action, self.target)
self.assertEqual(result, False)
def test_check_if_exists_non_existent_action_raises(self):
action = "example:idonotexist"
self.assertRaises(exceptions.PolicyRuleNotFound,
policy.check_if_exists,
self.context, action, self.target)
def test_enforce_good_action(self):
action = "example:allowed"
result = policy.enforce(self.context, action, self.target)

View File

@ -26,6 +26,7 @@ import webob.exc as webexc
import webtest
from quantum.api import extensions
from quantum.api.v2 import attributes
from quantum import context
from quantum.db import api as db_api
from quantum.db import servicetype_db
@ -51,6 +52,13 @@ class TestServiceTypeExtensionManager(object):
"""Mock extensions manager."""
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
servicetype.RESOURCE_ATTRIBUTE_MAP)
attributes.RESOURCE_ATTRIBUTE_MAP.update(dp.RESOURCE_ATTRIBUTE_MAP)
return (servicetype.Servicetype.get_resources() +
dp.Dummy.get_resources())