AIM Policy Driver - Part 1

This is the first, in what is expected to be a series patches (not
necessarily sequential), that implements a new GBP Policy Driver. This
new Policy Driver is meant to be used in conjunction with the new
APIC mechanism driver, and the ACI Integration Module (AIM) library
that are being developed in parallel. This new Policy Driver is being
called the AIM Mapping driver.

The are at least a couple of goals that are foremost in the design of
this driver -

(1) Transactional consistency - the driver performs all orchestration
operations in the pre-commit hook that is part of the same transaction
as that which is created by the GBP plugin in response to the API call.
Apart from the implementation in this new driver, some refactoring of
the local_api module was required to optionally avoid creating a new
transaction every time the driver orchestrated changes to other parts
of the policy model, and/or Neutron resources.

(2) Asynchronous behavior - the driver will not directly interact
with external backends. As stated before, it will interface with Neutron
and/or AIM, and appropriately populate the status of the GBP resources
using the status and status_details attributes available for each GBP
resource (this does not happen in this patch).

The AIM driver attempts to reuse as much of the existing implementation
as possible. Towards this end, some parts of the Implicit Policy, and
the Resource Mapping drivers have been refactored to allow the code to
be shared and reused. The AIM driver effectively reuses these two existing
policy drivers as libraries, but is self-sufficient and should be the
only one configured.

Wherever possible, an attempt is being made to implement the mapping to
the Neutron resources in a base class, which in future, can help to build
a replacement for the current Resource Mapping driver with the same
transactional consistency goal.

A new “ensure_tenant” hook is being added to the Policy Driver interface.
This allows each driver to perform the tenant related bookkeeping prior
to proceeding with any orchestration for a resource itself. The hook is
invoked from the GBP plugin for the create operation of each resource.
This invocation happens prior to the transaction that is started to
create the resource.

The APIC mechanism driver uses a name-mapper module to map GBP resource
names to qualified names that are used by the AIM library. This AIM
policy driver holds a reference to that same name-mapper module (and
subsequently the cache that it uses) to perform name mapping for the
GBP resources.

In addition to the UTs that test the new code, specific UTs have been
added for validating the transactional consistency by testing the
rollback of created/updated/deleted resources if a downstream
operation fails.

Change-Id: I945d700c1a5e670de48d9c0d22e456e2d45f78a8
This commit is contained in:
Sumit Naiksatam
2016-03-04 00:40:43 -08:00
committed by Amit Bose
parent 6e307e0a38
commit fb3aa33f04
18 changed files with 1675 additions and 559 deletions

View File

@@ -34,6 +34,15 @@ from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
LOG = logging.getLogger(__name__)
class dummy_context_mgr(object):
def __enter__(self):
return None
def __exit__(self, exc_type, exc_value, traceback):
return False
class LocalAPI(object):
"""API for interacting with the neutron Plugins directly."""
@@ -92,10 +101,11 @@ class LocalAPI(object):
return self._cached_agent_notifier
def _create_resource(self, plugin, context, resource, attrs,
do_notify=True):
do_notify=True, clean_session=True):
# REVISIT(rkukura): Do create.start notification?
# REVISIT(rkukura): Check authorization?
with utils.clean_session(context.session):
with utils.clean_session(context.session) if clean_session else (
dummy_context_mgr()):
reservation = None
if plugin in [self._group_policy_plugin,
self._servicechain_plugin]:
@@ -134,10 +144,11 @@ class LocalAPI(object):
return obj
def _update_resource(self, plugin, context, resource, resource_id, attrs,
do_notify=True):
do_notify=True, clean_session=True):
# REVISIT(rkukura): Do update.start notification?
# REVISIT(rkukura): Check authorization?
with utils.clean_session(context.session):
with utils.clean_session(context.session) if clean_session else (
dummy_context_mgr()):
obj_getter = getattr(plugin, 'get_' + resource)
orig_obj = obj_getter(context, resource_id)
action = 'update_' + resource
@@ -154,10 +165,11 @@ class LocalAPI(object):
return obj
def _delete_resource(self, plugin, context, resource, resource_id,
do_notify=True):
do_notify=True, clean_session=True):
# REVISIT(rkukura): Do delete.start notification?
# REVISIT(rkukura): Check authorization?
with utils.clean_session(context.session):
with utils.clean_session(context.session) if clean_session else (
dummy_context_mgr()):
obj_getter = getattr(plugin, 'get_' + resource)
obj = obj_getter(context, resource_id)
action = 'delete_' + resource
@@ -172,14 +184,18 @@ class LocalAPI(object):
{resource: obj},
resource + '.delete.end')
def _get_resource(self, plugin, context, resource, resource_id):
with utils.clean_session(context.session):
def _get_resource(self, plugin, context, resource, resource_id,
clean_session=True):
with utils.clean_session(context.session) if clean_session else (
dummy_context_mgr()):
obj_getter = getattr(plugin, 'get_' + resource)
obj = obj_getter(context, resource_id)
return obj
def _get_resources(self, plugin, context, resource_plural, filters=None):
with utils.clean_session(context.session):
def _get_resources(self, plugin, context, resource_plural, filters=None,
clean_session=True):
with utils.clean_session(context.session) if clean_session else (
dummy_context_mgr()):
obj_getter = getattr(plugin, 'get_' + resource_plural)
obj = obj_getter(context, filters)
return obj
@@ -191,71 +207,80 @@ class LocalAPI(object):
# neutronclient is also a possibility, but presents significant
# issues to unit testing as well as overhead and failure modes.
def _get_port(self, plugin_context, port_id):
def _get_port(self, plugin_context, port_id, clean_session=True):
return self._get_resource(self._core_plugin, plugin_context, 'port',
port_id)
port_id, clean_session=clean_session)
def _get_ports(self, plugin_context, filters=None):
def _get_ports(self, plugin_context, filters=None, clean_session=True):
filters = filters or {}
return self._get_resources(self._core_plugin, plugin_context, 'ports',
filters)
filters, clean_session=clean_session)
def _create_port(self, plugin_context, attrs):
def _create_port(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._core_plugin, plugin_context, 'port',
attrs)
attrs, clean_session=clean_session)
def _update_port(self, plugin_context, port_id, attrs):
def _update_port(self, plugin_context, port_id, attrs, clean_session=True):
return self._update_resource(self._core_plugin, plugin_context, 'port',
port_id, attrs)
port_id, attrs,
clean_session=clean_session)
def _delete_port(self, plugin_context, port_id):
def _delete_port(self, plugin_context, port_id, clean_session=True):
try:
self._delete_resource(self._core_plugin,
plugin_context, 'port', port_id)
plugin_context, 'port', port_id,
clean_session=clean_session)
except n_exc.PortNotFound:
LOG.warning(_LW('Port %s already deleted'), port_id)
def _get_subnet(self, plugin_context, subnet_id):
def _get_subnet(self, plugin_context, subnet_id, clean_session=True):
return self._get_resource(self._core_plugin, plugin_context, 'subnet',
subnet_id)
subnet_id, clean_session=clean_session)
def _get_subnets(self, plugin_context, filters=None):
def _get_subnets(self, plugin_context, filters=None, clean_session=True):
filters = filters or {}
return self._get_resources(self._core_plugin, plugin_context,
'subnets', filters)
'subnets', filters,
clean_session=clean_session)
def _create_subnet(self, plugin_context, attrs):
def _create_subnet(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._core_plugin, plugin_context,
'subnet', attrs)
'subnet', attrs,
clean_session=clean_session)
def _update_subnet(self, plugin_context, subnet_id, attrs):
def _update_subnet(self, plugin_context, subnet_id, attrs,
clean_session=True):
return self._update_resource(self._core_plugin, plugin_context,
'subnet', subnet_id, attrs)
'subnet', subnet_id, attrs,
clean_session=clean_session)
def _delete_subnet(self, plugin_context, subnet_id):
def _delete_subnet(self, plugin_context, subnet_id, clean_session=True):
try:
self._delete_resource(self._core_plugin, plugin_context, 'subnet',
subnet_id)
subnet_id, clean_session=clean_session)
except n_exc.SubnetNotFound:
LOG.warning(_LW('Subnet %s already deleted'), subnet_id)
def _get_network(self, plugin_context, network_id):
def _get_network(self, plugin_context, network_id, clean_session=True):
return self._get_resource(self._core_plugin, plugin_context, 'network',
network_id)
network_id, clean_session=clean_session)
def _get_networks(self, plugin_context, filters=None):
def _get_networks(self, plugin_context, filters=None, clean_session=True):
filters = filters or {}
return self._get_resources(
self._core_plugin, plugin_context, 'networks', filters)
self._core_plugin, plugin_context, 'networks', filters,
clean_session=clean_session)
def _create_network(self, plugin_context, attrs):
def _create_network(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._core_plugin, plugin_context,
'network', attrs)
'network', attrs, True,
clean_session=clean_session)
def _delete_network(self, plugin_context, network_id):
def _delete_network(self, plugin_context, network_id, clean_session=True):
try:
self._delete_resource(self._core_plugin, plugin_context,
'network', network_id)
'network', network_id,
clean_session=clean_session)
except n_exc.NetworkNotFound:
LOG.warning(_LW('Network %s already deleted'), network_id)
@@ -323,57 +348,66 @@ class LocalAPI(object):
except l3.RouterNotFound:
LOG.warning(_LW('Router %s already deleted'), router_id)
def _get_sg(self, plugin_context, sg_id):
def _get_sg(self, plugin_context, sg_id, clean_session=True):
return self._get_resource(
self._core_plugin, plugin_context, 'security_group', sg_id)
self._core_plugin, plugin_context, 'security_group', sg_id,
clean_session=clean_session)
def _get_sgs(self, plugin_context, filters=None):
def _get_sgs(self, plugin_context, filters=None, clean_session=True):
filters = filters or {}
return self._get_resources(
self._core_plugin, plugin_context, 'security_groups', filters)
self._core_plugin, plugin_context, 'security_groups', filters,
clean_session=clean_session)
def _create_sg(self, plugin_context, attrs):
def _create_sg(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._core_plugin, plugin_context,
'security_group', attrs)
'security_group', attrs,
clean_session=clean_session)
def _update_sg(self, plugin_context, sg_id, attrs):
def _update_sg(self, plugin_context, sg_id, attrs, clean_session=True):
return self._update_resource(self._core_plugin, plugin_context,
'security_group', sg_id, attrs)
'security_group', sg_id, attrs,
clean_session=clean_session)
def _delete_sg(self, plugin_context, sg_id):
def _delete_sg(self, plugin_context, sg_id, clean_session=True):
try:
self._delete_resource(self._core_plugin, plugin_context,
'security_group', sg_id)
'security_group', sg_id,
clean_session=clean_session)
except ext_sg.SecurityGroupNotFound:
LOG.warning(_LW('Security Group %s already deleted'), sg_id)
def _get_sg_rule(self, plugin_context, sg_rule_id):
def _get_sg_rule(self, plugin_context, sg_rule_id, clean_session=True):
return self._get_resource(
self._core_plugin, plugin_context, 'security_group_rule',
sg_rule_id)
sg_rule_id, clean_session=clean_session)
def _get_sg_rules(self, plugin_context, filters=None):
def _get_sg_rules(self, plugin_context, filters=None, clean_session=True):
filters = filters or {}
return self._get_resources(
self._core_plugin, plugin_context, 'security_group_rules', filters)
self._core_plugin, plugin_context, 'security_group_rules', filters,
clean_session=clean_session)
def _create_sg_rule(self, plugin_context, attrs):
def _create_sg_rule(self, plugin_context, attrs, clean_session=True):
try:
return self._create_resource(self._core_plugin, plugin_context,
'security_group_rule', attrs)
'security_group_rule', attrs,
clean_session=clean_session)
except ext_sg.SecurityGroupRuleExists as ex:
LOG.warning(_LW('Security Group already exists %s'), ex.message)
return
def _update_sg_rule(self, plugin_context, sg_rule_id, attrs):
def _update_sg_rule(self, plugin_context, sg_rule_id, attrs,
clean_session=True):
return self._update_resource(self._core_plugin, plugin_context,
'security_group_rule', sg_rule_id,
attrs)
attrs, clean_session=clean_session)
def _delete_sg_rule(self, plugin_context, sg_rule_id):
def _delete_sg_rule(self, plugin_context, sg_rule_id, clean_session=True):
try:
self._delete_resource(self._core_plugin, plugin_context,
'security_group_rule', sg_rule_id)
'security_group_rule', sg_rule_id,
clean_session=clean_session)
except ext_sg.SecurityGroupRuleNotFound:
LOG.warning(_LW('Security Group Rule %s already deleted'),
sg_rule_id)
@@ -402,51 +436,65 @@ class LocalAPI(object):
except l3.FloatingIPNotFound:
LOG.warning(_LW('Floating IP %s Already deleted'), fip_id)
def _get_l2_policy(self, plugin_context, l2p_id):
def _get_l2_policy(self, plugin_context, l2p_id, clean_session=True):
return self._get_resource(self._group_policy_plugin, plugin_context,
'l2_policy', l2p_id)
'l2_policy', l2p_id,
clean_session=clean_session)
def _get_l2_policies(self, plugin_context, filters=None):
def _get_l2_policies(self, plugin_context, filters=None,
clean_session=True):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'l2_policies', filters)
'l2_policies', filters,
clean_session=clean_session)
def _create_l2_policy(self, plugin_context, attrs):
def _create_l2_policy(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._group_policy_plugin, plugin_context,
'l2_policy', attrs, False)
'l2_policy', attrs, False,
clean_session=clean_session)
def _update_l2_policy(self, plugin_context, l2p_id, attrs):
def _update_l2_policy(self, plugin_context, l2p_id, attrs,
clean_session=True):
return self._update_resource(self._group_policy_plugin, plugin_context,
'l2_policy', l2p_id, attrs, False)
'l2_policy', l2p_id, attrs, False,
clean_session=clean_session)
def _delete_l2_policy(self, plugin_context, l2p_id):
def _delete_l2_policy(self, plugin_context, l2p_id, clean_session=True):
try:
self._delete_resource(self._group_policy_plugin,
plugin_context, 'l2_policy', l2p_id, False)
plugin_context, 'l2_policy', l2p_id, False,
clean_session=clean_session)
except gp_ext.L2PolicyNotFound:
LOG.warning(_LW('L2 Policy %s already deleted'), l2p_id)
def _get_l3_policy(self, plugin_context, l3p_id):
def _get_l3_policy(self, plugin_context, l3p_id, clean_session=True):
return self._get_resource(self._group_policy_plugin, plugin_context,
'l3_policy', l3p_id)
'l3_policy', l3p_id,
clean_session=clean_session)
def _get_l3_policies(self, plugin_context, filters=None):
def _get_l3_policies(self, plugin_context, filters=None,
clean_session=True):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'l3_policies', filters)
'l3_policies', filters,
clean_session=clean_session)
def _create_l3_policy(self, plugin_context, attrs):
def _create_l3_policy(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._group_policy_plugin, plugin_context,
'l3_policy', attrs, False)
'l3_policy', attrs, False,
clean_session=clean_session)
def _update_l3_policy(self, plugin_context, l3p_id, attrs):
def _update_l3_policy(self, plugin_context, l3p_id, attrs,
clean_session=True):
return self._update_resource(self._group_policy_plugin, plugin_context,
'l3_policy', l3p_id, attrs, False)
'l3_policy', l3p_id, attrs, False,
clean_session=clean_session)
def _delete_l3_policy(self, plugin_context, l3p_id):
def _delete_l3_policy(self, plugin_context, l3p_id, clean_session=True):
try:
self._delete_resource(self._group_policy_plugin,
plugin_context, 'l3_policy', l3p_id, False)
plugin_context, 'l3_policy', l3p_id, False,
clean_session=clean_session)
except gp_ext.L3PolicyNotFound:
LOG.warning(_LW('L3 Policy %s already deleted'), l3p_id)
@@ -571,51 +619,67 @@ class LocalAPI(object):
except sc_ext.ServiceChainSpecNotFound:
LOG.warning(_LW("servicechain spec %s already deleted"), scs_id)
def _get_policy_target(self, plugin_context, pt_id):
def _get_policy_target(self, plugin_context, pt_id, clean_session=True):
return self._get_resource(self._group_policy_plugin, plugin_context,
'policy_target', pt_id)
'policy_target', pt_id,
clean_session=clean_session)
def _get_policy_targets(self, plugin_context, filters=None):
def _get_policy_targets(self, plugin_context, filters=None,
clean_session=True):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'policy_targets', filters)
'policy_targets', filters,
clean_session=clean_session)
def _create_policy_target(self, plugin_context, attrs):
def _create_policy_target(self, plugin_context, attrs, clean_session=True):
return self._create_resource(self._group_policy_plugin, plugin_context,
'policy_target', attrs, False)
'policy_target', attrs, False,
clean_session=clean_session)
def _update_policy_target(self, plugin_context, pt_id, attrs):
def _update_policy_target(self, plugin_context, pt_id, attrs,
clean_session=True):
return self._update_resource(self._group_policy_plugin, plugin_context,
'policy_target', pt_id, attrs, False)
'policy_target', pt_id, attrs, False,
clean_session=clean_session)
def _delete_policy_target(self, plugin_context, pt_id):
def _delete_policy_target(self, plugin_context, pt_id, clean_session=True):
try:
self._delete_resource(self._group_policy_plugin, plugin_context,
'policy_target', pt_id, False)
'policy_target', pt_id, False,
clean_session=clean_session)
except gp_ext.PolicyTargetNotFound:
LOG.warning(_LW('Policy Rule Set %s already deleted'), pt_id)
def _get_policy_target_group(self, plugin_context, ptg_id):
def _get_policy_target_group(self, plugin_context, ptg_id,
clean_session=True):
return self._get_resource(self._group_policy_plugin, plugin_context,
'policy_target_group', ptg_id)
'policy_target_group', ptg_id,
clean_session=clean_session)
def _get_policy_target_groups(self, plugin_context, filters=None):
def _get_policy_target_groups(self, plugin_context, filters=None,
clean_session=True):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'policy_target_groups', filters)
'policy_target_groups', filters,
clean_session=clean_session)
def _create_policy_target_group(self, plugin_context, attrs):
def _create_policy_target_group(self, plugin_context, attrs,
clean_session=True):
return self._create_resource(self._group_policy_plugin, plugin_context,
'policy_target_group', attrs, False)
'policy_target_group', attrs, False,
clean_session=clean_session)
def _update_policy_target_group(self, plugin_context, ptg_id, attrs):
def _update_policy_target_group(self, plugin_context, ptg_id, attrs,
clean_session=True):
return self._update_resource(self._group_policy_plugin, plugin_context,
'policy_target_group', ptg_id, attrs,
False)
False, clean_session=clean_session)
def _delete_policy_target_group(self, plugin_context, ptg_id):
def _delete_policy_target_group(self, plugin_context, ptg_id,
clean_session=True):
try:
self._delete_resource(self._group_policy_plugin, plugin_context,
'policy_target_group', ptg_id)
'policy_target_group', ptg_id,
clean_session=clean_session)
except sc_ext.ServiceChainSpecNotFound:
LOG.warning(_LW("Policy Target Group %s already deleted"), ptg_id)

View File

@@ -156,6 +156,16 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
for assoc in assocs:
context.session.delete(assoc)
def _remove_subnets_from_policy_target_group(self, context, ptg_id):
with context.session.begin(subtransactions=True):
ptg_db = self._get_policy_target_group(context, ptg_id)
assocs = (context.session.query(PTGToSubnetAssociation).
filter_by(policy_target_group_id=ptg_id).all())
ptg_db.update({'subnets': []})
for assoc in assocs:
context.session.delete(assoc)
return []
def _set_network_for_l2_policy(self, context, l2p_id, network_id):
with context.session.begin(subtransactions=True):
l2p_db = self._get_l2_policy(context, l2p_id)

View File

@@ -23,6 +23,13 @@ LOG = None
NAME_TYPE_TENANT = 'tenant'
NAME_TYPE_NETWORK = 'network'
NAME_TYPE_POLICY_TARGET_GROUP = 'policy_target_group'
NAME_TYPE_L3_POLICY = 'l3_policy'
NAME_TYPE_L2_POLICY = 'l2_policy'
NAME_TYPE_POLICY_RULE_SET = 'policy_rule_set'
NAME_TYPE_POLICY_RULE = 'policy_rule'
NAME_TYPE_EXTERNAL_SEGMENT = 'external_segment'
NAME_TYPE_EXTERNAL_POLICY = 'external_policy'
NAME_TYPE_NAT_POOL = 'nat_pool'
MAX_APIC_NAME_LENGTH = 46
@@ -129,5 +136,47 @@ class APICNameMapper(object):
policy_target_group_name=None):
return policy_target_group_name
@mapper(NAME_TYPE_L3_POLICY)
def l3_policy(self, context, l3_policy_id):
l3_policy = context._plugin.get_l3_policy(context._plugin_context,
l3_policy_id)
return l3_policy['name']
@mapper(NAME_TYPE_L2_POLICY)
def l2_policy(self, context, l2_policy_id):
l2_policy = context._plugin.get_l2_policy(context._plugin_context,
l2_policy_id)
return l2_policy['name']
@mapper(NAME_TYPE_POLICY_RULE_SET)
def policy_rule_set(self, context, policy_rule_set_id):
policy_rule_set = context._plugin.get_policy_rule_set(
context._plugin_context, policy_rule_set_id)
return policy_rule_set['name']
@mapper(NAME_TYPE_POLICY_RULE)
def policy_rule(self, context, policy_rule_id):
policy_rule = context._plugin.get_policy_rule(context._plugin_context,
policy_rule_id)
return policy_rule['name']
@mapper(NAME_TYPE_EXTERNAL_SEGMENT)
def external_segment(self, context, external_segment_id):
external_segment = context._plugin.get_external_segment(
context._plugin_context, external_segment_id)
return external_segment['name']
@mapper(NAME_TYPE_EXTERNAL_POLICY)
def external_policy(self, context, external_policy_id):
external_policy = context._plugin.get_external_policy(
context._plugin_context, external_policy_id)
return external_policy['name']
@mapper(NAME_TYPE_NAT_POOL)
def nat_pool(self, context, nat_pool_id):
nat_pool = context._plugin.get_nat_pool(context._plugin_context,
nat_pool_id)
return nat_pool['name']
def delete_apic_name(self, session, object_id):
self.db.delete_apic_name(session, object_id)

View File

@@ -21,6 +21,7 @@ from neutron.plugins.ml2 import managers as ml2_managers
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron.quota import resource_registry
from oslo_log import log
from sqlalchemy import inspect
from gbpservice.neutron.plugins.ml2plus import managers
@@ -70,6 +71,22 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin):
self._verify_service_plugins_requirements()
LOG.info(_LI("Modular L2 Plugin (extended) initialization complete"))
def _ml2_md_extend_network_dict(self, result, netdb):
session = inspect(netdb).session
with session.begin(subtransactions=True):
self.extension_manager.extend_network_dict(session, netdb, result)
def _ml2_md_extend_port_dict(self, result, portdb):
session = inspect(portdb).session
with session.begin(subtransactions=True):
self.extension_manager.extend_port_dict(session, portdb, result)
def _ml2_md_extend_subnet_dict(self, result, subnetdb):
session = inspect(subnetdb).session
with session.begin(subtransactions=True):
self.extension_manager.extend_subnet_dict(
session, subnetdb, result)
def create_network(self, context, network):
self._ensure_tenant(context, network[attributes.NETWORK])
return super(Ml2PlusPlugin, self).create_network(context, network)

View File

@@ -0,0 +1,287 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from aim import aim_manager
from aim.api import resource as aim_resource
from aim import context as aim_context
from neutron._i18n import _LI
from neutron import context as nctx
from neutron import manager
from oslo_concurrency import lockutils
from oslo_log import helpers as log
from oslo_log import log as logging
from gbpservice.neutron.extensions import group_policy as gpolicy
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as aim_md)
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import model
from gbpservice.neutron.services.grouppolicy.common import exceptions as gpexc
from gbpservice.neutron.services.grouppolicy.drivers import (
neutron_resources as nrd)
LOG = logging.getLogger(__name__)
APIC_OWNED = 'apic_owned_'
class ExplicitSubnetAssociationNotSupported(gpexc.GroupPolicyBadRequest):
message = _("Explicit subnet association not supported by APIC driver.")
class AIMMappingDriver(nrd.CommonNeutronBase):
"""AIM Mapping Orchestration driver.
This driver maps GBP resources to the ACI-Integration-Module (AIM).
"""
@log.log_method_call
def initialize(self):
LOG.info(_LI("APIC AIM Policy Driver initializing"))
self.db = model.DbModel()
self.aim = aim_manager.AimManager()
super(AIMMappingDriver, self).initialize()
self._apic_aim_mech_driver = None
@property
def aim_mech_driver(self):
if not self._apic_aim_mech_driver:
ml2plus_plugin = manager.NeutronManager.get_plugin()
self._apic_aim_mech_driver = (
ml2plus_plugin.mechanism_manager.mech_drivers['apic_aim'].obj)
return self._apic_aim_mech_driver
@property
def name_mapper(self):
return self.aim_mech_driver.name_mapper
def _aim_tenant_name(self, context):
session = context._plugin_context.session
tenant_id = context.current['tenant_id']
tenant_name = self.name_mapper.tenant(session, tenant_id)
LOG.info(_LI("Mapped tenant_id %(id)s to %(apic_name)s"),
{'id': tenant_id, 'apic_name': tenant_name})
return tenant_name
def _aim_endpoint_group(self, context, bd_name=None, bd_tenant_name=None):
session = context._plugin_context.session
tenant_name = self._aim_tenant_name(context)
id = context.current['id']
name = context.current['name']
epg_name = self.name_mapper.policy_target_group(session, id, name)
LOG.info(_LI("Mapped ptg_id %(id)s with name %(name)s to "
"%(apic_name)s"),
{'id': id, 'name': name, 'apic_name': epg_name})
epg = aim_resource.EndpointGroup(tenant_name=str(tenant_name),
name=str(epg_name),
app_profile_name=aim_md.AP_NAME,
bd_name=bd_name,
bd_tenant_name=bd_tenant_name)
return epg
def _aim_bridge_domain(self, context, network_id, network_name):
session = context._plugin_context.session
tenant_name = self._aim_tenant_name(context)
bd_name = self.name_mapper.network(session, network_id, network_name)
LOG.info(_LI("Mapped network_id %(id)s with name %(name)s to "
"%(apic_name)s"),
{'id': network_id, 'name': network_name,
'apic_name': bd_name})
bd = aim_resource.BridgeDomain(tenant_name=str(tenant_name),
name=str(bd_name))
return bd
def _get_l2p_subnets(self, context, l2p_id, clean_session=False):
plugin_context = context._plugin_context
l2p = context._plugin.get_l2_policy(plugin_context, l2p_id)
# REVISIT: The following should be a get_subnets call via local API
return self._core_plugin.get_subnets_by_network(
plugin_context, l2p['network_id'])
def _sync_ptg_subnets(self, context, l2p):
l2p_subnets = [x['id'] for x in
self._get_l2p_subnets(context, l2p['id'])]
ptgs = context._plugin.get_policy_target_groups(
nctx.get_admin_context(), {'l2_policy_id': [l2p['id']]})
for sub in l2p_subnets:
# Add to PTG
for ptg in ptgs:
if sub not in ptg['subnets']:
try:
(context._plugin.
_add_subnet_to_policy_target_group(
nctx.get_admin_context(), ptg['id'], sub))
except gpolicy.PolicyTargetGroupNotFound as e:
LOG.warning(e)
def _use_implicit_subnet(self, context, force_add=False,
clean_session=False):
"""Implicit subnet for AIM.
The first PTG in a L2P will allocate a new subnet from the L3P.
Any subsequent PTG in the same L2P will use the same subnet.
Additional subnets will be allocated as and when the currently used
subnet runs out of IP addresses.
"""
l2p_id = context.current['l2_policy_id']
with lockutils.lock(l2p_id, external=True):
subs = self._get_l2p_subnets(context, l2p_id)
subs = set([x['id'] for x in subs])
added = []
if not subs or force_add:
l2p = context._plugin.get_l2_policy(context._plugin_context,
l2p_id)
name = APIC_OWNED + l2p['name']
added = super(
AIMMappingDriver, self)._use_implicit_subnet(
context, subnet_specifics={'name': name},
is_proxy=False, clean_session=clean_session)
context.add_subnets(subs - set(context.current['subnets']))
for subnet in added:
self._sync_ptg_subnets(context, l2p)
@log.log_method_call
def ensure_tenant(self, plugin_context, tenant_id):
self.aim_mech_driver.ensure_tenant(plugin_context, tenant_id)
@log.log_method_call
def create_policy_target_group_precommit(self, context):
if context.current['subnets']:
raise ExplicitSubnetAssociationNotSupported()
ptg_db = context._plugin._get_policy_target_group(
context._plugin_context, context.current['id'])
session = context._plugin_context.session
if not context.current['l2_policy_id']:
self._create_implicit_l2_policy(context, clean_session=False)
ptg_db['l2_policy_id'] = l2p_id = context.current['l2_policy_id']
else:
l2p_id = context.current['l2_policy_id']
l2p_db = context._plugin._get_l2_policy(
context._plugin_context, l2p_id)
net = self._get_network(
context._plugin_context, l2p_db['network_id'],
clean_session=False)
self._use_implicit_subnet(context)
aim_ctx = aim_context.AimContext(session)
bd_name = str(self.name_mapper.network(
session, net['id'], net['name']))
bd_tenant_name = str(self._aim_tenant_name(context))
epg = self._aim_endpoint_group(context, bd_name, bd_tenant_name)
self.aim.create(aim_ctx, epg)
@log.log_method_call
def update_policy_target_group_precommit(self, context):
# TODO(Sumit): Implement
pass
@log.log_method_call
def delete_policy_target_group_precommit(self, context):
plugin_context = context._plugin_context
ptg_db = context._plugin._get_policy_target_group(
context._plugin_context, context.current['id'])
session = context._plugin_context.session
aim_ctx = aim_context.AimContext(session)
epg = self._aim_endpoint_group(context)
self.aim.delete(aim_ctx, epg)
self.name_mapper.delete_apic_name(session, context.current['id'])
# REVISIT(Sumit): Delete app_profile if this is last PTG
subnet_ids = [assoc['subnet_id'] for assoc in ptg_db['subnets']]
context._plugin._remove_subnets_from_policy_target_group(
context._plugin_context, ptg_db['id'])
if subnet_ids:
for subnet_id in subnet_ids:
if not context._plugin._get_ptgs_for_subnet(
context._plugin_context, subnet_id):
self._cleanup_subnet(plugin_context, subnet_id,
clean_session=False)
if ptg_db['l2_policy_id']:
l2p_id = ptg_db['l2_policy_id']
ptg_db.update({'l2_policy_id': None})
l2p_db = context._plugin._get_l2_policy(
context._plugin_context, l2p_id)
if not l2p_db['policy_target_groups']:
self._cleanup_l2_policy(context, l2p_id, clean_session=False)
@log.log_method_call
def create_policy_target_precommit(self, context):
if not context.current['port_id']:
ptg = context._plugin.get_policy_target_group(
context._plugin_context,
context.current['policy_target_group_id'])
subnets = self._get_subnets(
context._plugin_context, {'id': ptg['subnets']},
clean_session=False)
self._use_implicit_port(context, subnets=subnets,
clean_session=False)
@log.log_method_call
def update_policy_target_precommit(self, context):
# TODO(Sumit): Implement
pass
@log.log_method_call
def delete_policy_target_precommit(self, context):
pt_db = context._plugin._get_policy_target(
context._plugin_context, context.current['id'])
if pt_db['port_id']:
self._cleanup_port(context._plugin_context, pt_db['port_id'])
@log.log_method_call
def delete_l3_policy_precommit(self, context):
# TODO(Sumit): Implement
pass
@log.log_method_call
def create_policy_rule_precommit(self, context):
pass
# TODO(sumit): uncomment the following when AIM supports TenantFilter
# aim_context = aim_manager.AimContext(context._plugin_context.session)
# tenant = context.current['tenant_id']
# pr_id = context.current['id']
# pr_name = context.current['name']
# rn = self.mapper.tenant_filter(tenant, pr_id, name=pr_name)
# tf = aim_resource.TenantFilter(tenant_rn=tenant, rn=rn)
# self.aim.create(aim_context, tf)
# pr_db = context._plugin_context.session.query(
# gpdb.PolicyRule).get(context.current['id'])
# context._plugin_context.session.expunge(pr_db)
# TODO(sumit): uncomment the following line when the GBP resource
# is appropriately extended to hold AIM references
# pr_db['aim_id'] = rn
# context._plugin_context.session.add(pr_db)
@log.log_method_call
def delete_policy_rule_precommit(self, context):
pass
# TODO(sumit): uncomment the following when AIM supports TenantFilter
# aim_context = aim_manager.AimContext(context._plugin_context.session)
# tenant = context.current['tenant_id']
# pr_id = context.current['id']
# rn = self.mapper.tenant_filter(tenant, pr_id)
# tf = aim_resource.TenantFilter(tenant_rn=tenant, rn=rn)
# self.aim.delete(aim_context, tf)

View File

@@ -3423,7 +3423,8 @@ class ApicMappingDriver(api.ResourceMappingDriver,
entry=apic_manager.CP_ENTRY + '-' + str(x), **entry)
x += 1
def _get_l3p_allocated_subnets(self, context, l3p_id):
def _get_l3p_allocated_subnets(self, context, l3p_id,
clean_session=True):
l2ps = self._get_l2_policies(context._plugin_context,
{'l3_policy_id': [l3p_id]})
subnets = [x['cidr'] for x in

View File

@@ -78,14 +78,7 @@ class OwnedL3Policy(model_base.BASEV2):
nullable=False, primary_key=True)
class ImplicitPolicyDriver(api.PolicyDriver, local_api.LocalAPI):
"""Implicit Policy driver for Group Policy plugin.
This driver ensures that the l2_policy_id attribute of
PolicyTargetGroup references an L2Policy instance and that the
l3_policy_id attribute of L2Policy references an L3Policy instance
when the default value of None is specified.
"""
class ImplicitPolicyBase(api.PolicyDriver, local_api.LocalAPI):
@log.log_method_call
def initialize(self):
@@ -103,6 +96,135 @@ class ImplicitPolicyDriver(api.PolicyDriver, local_api.LocalAPI):
gpproxy.default_proxy_subnet_prefix_length)
self._default_es_name = gpip.default_external_segment_name
def _create_implicit_l3_policy(self, context, clean_session=True):
tenant_id = context.current['tenant_id']
filter = {'tenant_id': [tenant_id],
'name': [self._default_l3p_name]}
l3ps = self._get_l3_policies(context._plugin_context, filter,
clean_session)
l3p = l3ps and l3ps[0]
if not l3p:
attrs = {'tenant_id': tenant_id,
'name': self._default_l3p_name,
'description': _("Implicitly created L3 policy"),
'ip_version': self._default_ip_version,
'ip_pool': self._default_ip_pool,
'shared': context.current.get('shared', False),
'subnet_prefix_length':
self._default_subnet_prefix_length}
if self._proxy_group_enabled:
attrs['proxy_ip_pool'] = (
self._default_proxy_ip_pool)
attrs['proxy_subnet_prefix_length'] = (
self._default_proxy_subnet_prefix_length)
try:
l3p = self._create_l3_policy(context._plugin_context, attrs,
clean_session)
self._mark_l3_policy_owned(context._plugin_context.session,
l3p['id'])
except exc.DefaultL3PolicyAlreadyExists:
with excutils.save_and_reraise_exception(
reraise=False) as ctxt:
LOG.debug("Possible concurrent creation of default L3 "
"policy for tenant %s", tenant_id)
l3ps = self._get_l3_policies(context._plugin_context,
filter, clean_session)
l3p = l3ps and l3ps[0]
if not l3p:
LOG.warning(_LW(
"Caught DefaultL3PolicyAlreadyExists, "
"but default L3 policy not concurrently "
"created for tenant %s"), tenant_id)
ctxt.reraise = True
except exc.OverlappingIPPoolsInSameTenantNotAllowed:
with excutils.save_and_reraise_exception():
LOG.info(_LI("Caught "
"OverlappingIPPoolsinSameTenantNotAllowed "
"during creation of default L3 policy for "
"tenant %s"), tenant_id)
context.current['l3_policy_id'] = l3p['id']
def _use_implicit_l3_policy(self, context):
self._create_implicit_l3_policy(context)
context.set_l3_policy_id(context.current['l3_policy_id'])
def _create_implicit_l2_policy(self, context, clean_session=True):
attrs = {'tenant_id': context.current['tenant_id'],
'name': context.current['name'],
'description': _("Implicitly created L2 policy"),
'l3_policy_id': None,
'shared': context.current.get('shared', False),
'network_id': None}
if context.current.get('proxied_group_id'):
# The L3P has to be the same as the proxied group
group = context._plugin.get_policy_target_group(
context._plugin_context, context.current['proxied_group_id'])
l2p = context._plugin.get_l2_policy(
context._plugin_context, group['l2_policy_id'])
attrs['l3_policy_id'] = l2p['l3_policy_id']
l2p = self._create_l2_policy(context._plugin_context, attrs,
clean_session)
context.current['l2_policy_id'] = l2p['id']
self._mark_l2_policy_owned(context._plugin_context.session, l2p['id'])
def _use_implicit_l2_policy(self, context):
self._create_implicit_l2_policy(context)
context.set_l2_policy_id(context.current['l2_policy_id'])
def _mark_l2_policy_owned(self, session, l2p_id):
with session.begin(subtransactions=True):
owned = OwnedL2Policy(l2_policy_id=l2p_id)
session.add(owned)
def _l2_policy_is_owned(self, session, l2p_id):
with session.begin(subtransactions=True):
return (session.query(OwnedL2Policy).
filter_by(l2_policy_id=l2p_id).
first() is not None)
def _mark_l3_policy_owned(self, session, l3p_id):
with session.begin(subtransactions=True):
owned = OwnedL3Policy(l3_policy_id=l3p_id)
session.add(owned)
def _l3_policy_is_owned(self, session, l3p_id):
with session.begin(subtransactions=True):
return (session.query(OwnedL3Policy).
filter_by(l3_policy_id=l3p_id).
first() is not None)
def _cleanup_l3_policy(self, context, l3p_id, clean_session=True):
if self._l3_policy_is_owned(context._plugin_context.session, l3p_id):
# REVISIT(rkukura): Add check_unused parameter to
# local_api._delete_l3_policy()?
context._plugin.delete_l3_policy(context._plugin_context, l3p_id,
check_unused=True)
def _cleanup_l2_policy(self, context, l2p_id, clean_session=True):
if self._l2_policy_is_owned(context._plugin_context.session, l2p_id):
try:
self._delete_l2_policy(context._plugin_context, l2p_id,
clean_session)
except gbp_ext.L2PolicyInUse:
LOG.info(_LI(
"Cannot delete implicit L2 Policy %s because it's "
"in use."), l2p_id)
class ImplicitPolicyDriver(ImplicitPolicyBase):
"""Implicit Policy driver for Group Policy plugin.
This driver ensures that the l2_policy_id attribute of
PolicyTargetGroup references an L2Policy instance and that the
l3_policy_id attribute of L2Policy references an L3Policy instance
when the default value of None is specified.
"""
@log.log_method_call
def initialize(self):
super(ImplicitPolicyDriver, self).initialize()
@log.log_method_call
def create_policy_target_group_postcommit(self, context):
if not context.current['l2_policy_id']:
@@ -184,82 +306,6 @@ class ImplicitPolicyDriver(api.PolicyDriver, local_api.LocalAPI):
def update_l3_policy_postcommit(self, context):
pass
def _use_implicit_l2_policy(self, context):
attrs = {'tenant_id': context.current['tenant_id'],
'name': context.current['name'],
'description': _("Implicitly created L2 policy"),
'l3_policy_id': None,
'shared': context.current.get('shared', False),
'network_id': None}
if context.current.get('proxied_group_id'):
# The L3P has to be the same as the proxied group
group = context._plugin.get_policy_target_group(
context._plugin_context, context.current['proxied_group_id'])
l2p = context._plugin.get_l2_policy(
context._plugin_context, group['l2_policy_id'])
attrs['l3_policy_id'] = l2p['l3_policy_id']
l2p = self._create_l2_policy(context._plugin_context, attrs)
l2p_id = l2p['id']
self._mark_l2_policy_owned(context._plugin_context.session, l2p_id)
context.set_l2_policy_id(l2p_id)
def _cleanup_l2_policy(self, context, l2p_id):
if self._l2_policy_is_owned(context._plugin_context.session, l2p_id):
try:
self._delete_l2_policy(context._plugin_context, l2p_id)
except gbp_ext.L2PolicyInUse:
LOG.info(_LI(
"Cannot delete implicit L2 Policy %s because it's "
"in use."), l2p_id)
def _use_implicit_l3_policy(self, context):
tenant_id = context.current['tenant_id']
filter = {'tenant_id': [tenant_id],
'name': [self._default_l3p_name]}
l3ps = self._get_l3_policies(context._plugin_context, filter)
l3p = l3ps and l3ps[0]
if not l3p:
attrs = {'tenant_id': tenant_id,
'name': self._default_l3p_name,
'description': _("Implicitly created L3 policy"),
'ip_version': self._default_ip_version,
'ip_pool': self._default_ip_pool,
'shared': context.current.get('shared', False),
'subnet_prefix_length':
self._default_subnet_prefix_length}
if self._proxy_group_enabled:
attrs['proxy_ip_pool'] = (
self._default_proxy_ip_pool)
attrs['proxy_subnet_prefix_length'] = (
self._default_proxy_subnet_prefix_length)
try:
l3p = self._create_l3_policy(context._plugin_context, attrs)
self._mark_l3_policy_owned(context._plugin_context.session,
l3p['id'])
except exc.DefaultL3PolicyAlreadyExists:
with excutils.save_and_reraise_exception(
reraise=False) as ctxt:
LOG.debug("Possible concurrent creation of default L3 "
"policy for tenant %s", tenant_id)
l3ps = self._get_l3_policies(context._plugin_context,
filter)
l3p = l3ps and l3ps[0]
if not l3p:
LOG.warning(_LW(
"Caught DefaultL3PolicyAlreadyExists, "
"but default L3 policy not concurrently "
"created for tenant %s"), tenant_id)
ctxt.reraise = True
except exc.OverlappingIPPoolsInSameTenantNotAllowed:
with excutils.save_and_reraise_exception():
LOG.info(_LI("Caught "
"OverlappingIPPoolsinSameTenantNotAllowed "
"during creation of default L3 policy for "
"tenant %s"), tenant_id)
context.current['l3_policy_id'] = l3p['id']
context.set_l3_policy_id(l3p['id'])
def _use_implicit_external_segment(self, context):
if not self._default_es_name:
return
@@ -278,32 +324,3 @@ class ImplicitPolicyDriver(api.PolicyDriver, local_api.LocalAPI):
if default:
# Set default ES
context.set_external_segment(default['id'])
def _cleanup_l3_policy(self, context, l3p_id):
if self._l3_policy_is_owned(context._plugin_context.session, l3p_id):
# REVISIT(rkukura): Add check_unused parameter to
# local_api._delete_l3_policy()?
context._plugin.delete_l3_policy(context._plugin_context, l3p_id,
check_unused=True)
def _mark_l2_policy_owned(self, session, l2p_id):
with session.begin(subtransactions=True):
owned = OwnedL2Policy(l2_policy_id=l2p_id)
session.add(owned)
def _l2_policy_is_owned(self, session, l2p_id):
with session.begin(subtransactions=True):
return (session.query(OwnedL2Policy).
filter_by(l2_policy_id=l2p_id).
first() is not None)
def _mark_l3_policy_owned(self, session, l3p_id):
with session.begin(subtransactions=True):
owned = OwnedL3Policy(l3_policy_id=l3p_id)
session.add(owned)
def _l3_policy_is_owned(self, session, l3p_id):
with session.begin(subtransactions=True):
return (session.query(OwnedL3Policy).
filter_by(l3_policy_id=l3p_id).
first() is not None)

View File

@@ -0,0 +1,72 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import helpers as log
from oslo_log import log as logging
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
from gbpservice.neutron.services.grouppolicy.drivers import (
implicit_policy as ipd)
from gbpservice.neutron.services.grouppolicy.drivers import (
resource_mapping as rmd)
LOG = logging.getLogger(__name__)
class CommonNeutronBase(ipd.ImplicitPolicyBase, rmd.OwnedResourcesOperations,
rmd.ImplicitResourceOperations):
"""Neutron Resources' Orchestration driver.
This driver realizes GBP's network semantics by orchestrating
the necessary Neutron resources.
"""
@log.log_method_call
def initialize(self):
# REVISIT: Check if this is still required
self._cached_agent_notifier = None
super(CommonNeutronBase, self).initialize()
@log.log_method_call
def create_l2_policy_precommit(self, context):
l2p_db = context._plugin._get_l2_policy(
context._plugin_context, context.current['id'])
if not context.current['l3_policy_id']:
self._create_implicit_l3_policy(context, clean_session=False)
l2p_db['l3_policy_id'] = context.current['l3_policy_id']
if not context.current['network_id']:
self._use_implicit_network(context, clean_session=False)
l2p_db['network_id'] = context.current['network_id']
@log.log_method_call
def update_l2_policy_precommit(self, context):
if (context.current['inject_default_route'] !=
context.original['inject_default_route']):
raise exc.UnsettingInjectDefaultRouteOfL2PolicyNotSupported()
if (context.current['l3_policy_id'] !=
context.original['l3_policy_id']):
raise exc.L3PolicyUpdateOfL2PolicyNotSupported()
@log.log_method_call
def delete_l2_policy_precommit(self, context):
l2p_db = context._plugin._get_l2_policy(
context._plugin_context, context.current['id'])
if l2p_db['network_id']:
network_id = l2p_db['network_id']
l2p_db.update({'network_id': None})
self._cleanup_network(context._plugin_context, network_id,
clean_session=False)
if l2p_db['l3_policy_id']:
l3p_id = l2p_db['l3_policy_id']
l2p_db.update({'l3_policy_id': None})
self._cleanup_l3_policy(context, l3p_id, clean_session=False)

View File

@@ -110,8 +110,328 @@ class CidrInUse(exc.GroupPolicyInternalError):
message = _("CIDR %(cidr)s in-use within L3 policy %(l3p_id)s")
class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
nsp_manager.NetworkServicePolicyMappingMixin):
class OwnedResourcesOperations(object):
def _mark_port_owned(self, session, port_id):
with session.begin(subtransactions=True):
owned = OwnedPort(port_id=port_id)
session.add(owned)
def _port_is_owned(self, session, port_id):
with session.begin(subtransactions=True):
return (session.query(OwnedPort).
filter_by(port_id=port_id).
first() is not None)
def _mark_subnet_owned(self, session, subnet_id):
with session.begin(subtransactions=True):
owned = OwnedSubnet(subnet_id=subnet_id)
session.add(owned)
def _subnet_is_owned(self, session, subnet_id):
with session.begin(subtransactions=True):
return (session.query(OwnedSubnet).
filter_by(subnet_id=subnet_id).
first() is not None)
def _mark_network_owned(self, session, network_id):
with session.begin(subtransactions=True):
owned = OwnedNetwork(network_id=network_id)
session.add(owned)
def _network_is_owned(self, session, network_id):
with session.begin(subtransactions=True):
return (session.query(OwnedNetwork).
filter_by(network_id=network_id).
first() is not None)
def _mark_router_owned(self, session, router_id):
with session.begin(subtransactions=True):
owned = OwnedRouter(router_id=router_id)
session.add(owned)
def _router_is_owned(self, session, router_id):
with session.begin(subtransactions=True):
return (session.query(OwnedRouter).
filter_by(router_id=router_id).
first() is not None)
class ImplicitResourceOperations(local_api.LocalAPI):
def _create_implicit_network(self, context, clean_session=True, **kwargs):
attrs = {'tenant_id': context.current['tenant_id'],
'name': context.current['name'], 'admin_state_up': True,
'shared': context.current.get('shared', False)}
attrs.update(**kwargs)
network = self._create_network(context._plugin_context, attrs,
clean_session)
network_id = network['id']
self._mark_network_owned(context._plugin_context.session, network_id)
return network
def _use_implicit_network(self, context, clean_session=True):
network = self._create_implicit_network(
context, clean_session, name='l2p_' + context.current['name'])
context.set_network_id(network['id'])
def _cleanup_network(self, plugin_context, network_id, clean_session=True):
if self._network_is_owned(plugin_context.session, network_id):
self._delete_network(plugin_context, network_id, clean_session)
def _generate_subnets_from_cidrs(self, context, l2p, l3p, cidrs,
subnet_specifics, clean_session=True):
for usable_cidr in cidrs:
try:
attrs = {'tenant_id': context.current['tenant_id'],
'name': 'ptg_' + context.current['name'],
'network_id': l2p['network_id'],
'ip_version': l3p['ip_version'],
'cidr': usable_cidr,
'enable_dhcp': True,
'gateway_ip': attributes.ATTR_NOT_SPECIFIED,
'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
'dns_nameservers': (
cfg.CONF.resource_mapping.dns_nameservers or
attributes.ATTR_NOT_SPECIFIED),
'host_routes': attributes.ATTR_NOT_SPECIFIED}
attrs.update(subnet_specifics)
subnet = self._create_subnet(
context._plugin_context, attrs,
clean_session=clean_session)
yield subnet
except n_exc.BadRequest:
# This is expected (CIDR overlap within network) until
# we have a proper subnet allocation algorithm. We
# ignore the exception and repeat with the next CIDR.
pass
def _get_ptg_cidrs(self, context, ptgs, ptg_dicts=None,
clean_session=True):
cidrs = []
if ptg_dicts:
ptgs = ptg_dicts
else:
ptgs = context._plugin.get_policy_target_groups(
context._plugin_context.elevated(), filters={'id': ptgs})
subnets = []
for ptg in ptgs:
subnets.extend(ptg['subnets'])
if subnets:
cidrs = [x['cidr'] for x in self._get_subnets(
context._plugin_context.elevated(), {'id': subnets},
clean_session=clean_session)]
return cidrs
def _get_l3p_allocated_subnets(self, context, l3p_id, clean_session=True):
ptgs = context._plugin._get_l3p_ptgs(
context._plugin_context.elevated(), l3p_id)
return self._get_ptg_cidrs(context, None, ptg_dicts=ptgs,
clean_session=clean_session)
def _validate_and_add_subnet(self, context, subnet, l3p_id,
clean_session=True):
subnet_id = subnet['id']
session = context._plugin_context.session
with utils.clean_session(session) if clean_session else (
local_api.dummy_context_mgr()):
with session.begin(subtransactions=True):
LOG.debug("starting validate_and_add_subnet transaction for "
"subnet %s", subnet_id)
ptgs = context._plugin._get_l3p_ptgs(
context._plugin_context.elevated(), l3p_id)
allocated = netaddr.IPSet(
iterable=self._get_ptg_cidrs(context, None,
ptg_dicts=ptgs,
clean_session=clean_session))
cidr = subnet['cidr']
if cidr in allocated:
LOG.debug("CIDR %s in-use for L3P %s, allocated: %s",
cidr, l3p_id, allocated)
raise CidrInUse(cidr=cidr, l3p_id=l3p_id)
context.add_subnet(subnet_id)
LOG.debug("ending validate_and_add_subnet transaction for "
"subnet %s", subnet_id)
def _use_l2_proxy_implicit_subnets(self, context,
subnet_specifics, l2p, l3p,
clean_session=True):
LOG.debug("allocate subnets for L2 Proxy %s",
context.current['id'])
proxied = context._plugin.get_policy_target_group(
context._plugin_context, context.current['proxied_group_id'])
subnets = self._get_subnets(context._plugin_context,
{'id': proxied['subnets']},
clean_session=clean_session)
# Use the same subnets as the Proxied PTG
generator = self._generate_subnets_from_cidrs(
context, l2p, l3p, [x['cidr'] for x in subnets],
subnet_specifics, clean_session=clean_session)
# Unroll the generator
subnets = [x for x in generator]
subnet_ids = [x['id'] for x in subnets]
for subnet_id in subnet_ids:
self._mark_subnet_owned(
context._plugin_context.session, subnet_id)
context.add_subnet(subnet_id)
return subnets
def _use_normal_implicit_subnet(self, context, is_proxy, prefix_len,
subnet_specifics, l2p, l3p,
clean_session=True):
LOG.debug("allocate subnets for L3 Proxy or normal PTG %s",
context.current['id'])
# REVISIT(rkukura): The folowing is a temporary allocation
# algorithm that should be replaced with use of a neutron
# subnet pool.
pool = netaddr.IPSet(
iterable=[l3p['proxy_ip_pool'] if is_proxy else
l3p['ip_pool']])
prefixlen = prefix_len or (
l3p['proxy_subnet_prefix_length'] if is_proxy
else l3p['subnet_prefix_length'])
l3p_id = l3p['id']
allocated = netaddr.IPSet(
iterable=self._get_l3p_allocated_subnets(
context, l3p_id, clean_session=clean_session))
available = pool - allocated
available.compact()
for cidr in sorted(available.iter_cidrs(),
key=operator.attrgetter('prefixlen'),
reverse=True):
if prefixlen < cidr.prefixlen:
# Close the loop, no remaining subnet is big enough
# for this allocation
break
generator = self._generate_subnets_from_cidrs(
context, l2p, l3p, cidr.subnet(prefixlen),
subnet_specifics, clean_session=clean_session)
for subnet in generator:
LOG.debug("Trying subnet %s for PTG %s", subnet,
context.current['id'])
subnet_id = subnet['id']
try:
self._mark_subnet_owned(context._plugin_context.session,
subnet_id)
self._validate_and_add_subnet(context, subnet, l3p_id,
clean_session=clean_session)
LOG.debug("Using subnet %s for PTG %s", subnet,
context.current['id'])
return [subnet]
except CidrInUse:
# This exception is expected when a concurrent
# request has beat this one to calling
# _validate_and_add_subnet() using the same
# available CIDR. We delete the subnet and try the
# next available CIDR.
self._delete_subnet(context._plugin_context,
subnet['id'],
clean_session=clean_session)
except n_exc.InvalidInput:
# This exception is not expected. We catch this
# here so that it isn't caught below and handled
# as if the CIDR is already in use.
self._delete_subnet(context._plugin_context,
subnet['id'],
clean_session=clean_session)
raise exc.GroupPolicyInternalError()
raise exc.NoSubnetAvailable()
def _use_implicit_subnet(self, context, is_proxy=False, prefix_len=None,
subnet_specifics=None, clean_session=True):
subnet_specifics = subnet_specifics or {}
l2p_id = context.current['l2_policy_id']
l2p = context._plugin.get_l2_policy(context._plugin_context, l2p_id)
l3p_id = l2p['l3_policy_id']
l3p = context._plugin.get_l3_policy(context._plugin_context, l3p_id)
if (is_proxy and
context.current['proxy_type'] == proxy_ext.PROXY_TYPE_L2):
# In case of L2 proxy
return self._use_l2_proxy_implicit_subnets(
context, subnet_specifics, l2p, l3p,
clean_session=clean_session)
else:
# In case of non proxy PTG or L3 Proxy
return self._use_normal_implicit_subnet(
context, is_proxy, prefix_len, subnet_specifics, l2p, l3p,
clean_session=clean_session)
def _cleanup_subnet(self, plugin_context, subnet_id, router_id=None,
clean_session=True):
interface_info = {'subnet_id': subnet_id}
if router_id:
try:
self._remove_router_interface(plugin_context, router_id,
interface_info)
except ext_l3.RouterInterfaceNotFoundForSubnet:
LOG.debug("Ignoring RouterInterfaceNotFoundForSubnet cleaning "
"up subnet: %s", subnet_id)
if self._subnet_is_owned(plugin_context.session, subnet_id):
self._delete_subnet(plugin_context, subnet_id,
clean_session=clean_session)
def _get_default_security_group(self, plugin_context, ptg_id,
tenant_id, clean_session=True):
port_name = DEFAULT_SG_PREFIX % ptg_id
filters = {'name': [port_name], 'tenant_id': [tenant_id]}
default_group = self._get_sgs(plugin_context, filters,
clean_session=clean_session)
return default_group[0]['id'] if default_group else None
def _use_implicit_port(self, context, subnets=None, clean_session=True):
ptg_id = context.current['policy_target_group_id']
ptg = context._plugin.get_policy_target_group(
context._plugin_context, ptg_id)
l2p_id = ptg['l2_policy_id']
l2p = context._plugin.get_l2_policy(context._plugin_context, l2p_id)
sg_id = self._get_default_security_group(
context._plugin_context, ptg_id, context.current['tenant_id'],
clean_session=clean_session)
last = exc.NoSubnetAvailable()
subnets = subnets or self._get_subnets(context._plugin_context,
{'id': ptg['subnets']},
clean_session=clean_session)
for subnet in subnets:
try:
attrs = {'tenant_id': context.current['tenant_id'],
'name': 'pt_' + context.current['name'],
'network_id': l2p['network_id'],
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': [{'subnet_id': subnet['id']}],
'device_id': '',
'device_owner': '',
'security_groups': [sg_id] if sg_id else None,
'admin_state_up': True}
if context.current.get('group_default_gateway'):
attrs['fixed_ips'][0]['ip_address'] = subnet['gateway_ip']
attrs.update(context.current.get('port_attributes', {}))
port = self._create_port(context._plugin_context, attrs,
clean_session=clean_session)
port_id = port['id']
self._mark_port_owned(context._plugin_context.session, port_id)
context.set_port_id(port_id)
return
except n_exc.IpAddressGenerationFailure as ex:
LOG.warning(_LW("No more address available in subnet %s"),
subnet['id'])
last = ex
raise last
def _cleanup_port(self, plugin_context, port_id):
if self._port_is_owned(plugin_context.session, port_id):
try:
self._delete_port(plugin_context, port_id)
except n_exc.PortNotFound:
LOG.warning(_LW("Port %s is missing") % port_id)
class ResourceMappingDriver(api.PolicyDriver, ImplicitResourceOperations,
nsp_manager.NetworkServicePolicyMappingMixin,
OwnedResourcesOperations):
"""Resource Mapping driver for Group Policy plugin.
This driver implements group policy semantics by mapping group
@@ -1299,49 +1619,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
l3p = context._plugin.get_l3_policy(context._plugin_context, l3p_id)
return l3p
def _use_implicit_port(self, context, subnets=None):
ptg_id = context.current['policy_target_group_id']
ptg = context._plugin.get_policy_target_group(
context._plugin_context, ptg_id)
l2p_id = ptg['l2_policy_id']
l2p = context._plugin.get_l2_policy(context._plugin_context, l2p_id)
sg_id = self._get_default_security_group(
context._plugin_context, ptg_id, context.current['tenant_id'])
last = exc.NoSubnetAvailable()
subnets = subnets or self._get_subnets(context._plugin_context,
{'id': ptg['subnets']})
for subnet in subnets:
try:
attrs = {'tenant_id': context.current['tenant_id'],
'name': 'pt_' + context.current['name'],
'network_id': l2p['network_id'],
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': [{'subnet_id': subnet['id']}],
'device_id': '',
'device_owner': '',
'security_groups': [sg_id] if sg_id else None,
'admin_state_up': True}
if context.current.get('group_default_gateway'):
attrs['fixed_ips'][0]['ip_address'] = subnet['gateway_ip']
attrs.update(context.current.get('port_attributes', {}))
port = self._create_port(context._plugin_context, attrs)
port_id = port['id']
self._mark_port_owned(context._plugin_context.session, port_id)
context.set_port_id(port_id)
return
except n_exc.IpAddressGenerationFailure as ex:
LOG.warning(_LW("No more address available in subnet %s"),
subnet['id'])
last = ex
raise last
def _cleanup_port(self, plugin_context, port_id):
if self._port_is_owned(plugin_context.session, port_id):
try:
self._delete_port(plugin_context, port_id)
except n_exc.PortNotFound:
LOG.warning(_LW("Port %s is missing") % port_id)
def _plug_router_to_external_segment(self, context, es_dict):
es_list = context._plugin.get_external_segments(
context._plugin_context, filters={'id': es_dict.keys()})
@@ -1386,125 +1663,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
self._remove_router_gw_interface(context._plugin_context,
router_id, interface_info)
def _use_implicit_subnet(self, context, is_proxy=False, prefix_len=None,
subnet_specifics=None):
subnet_specifics = subnet_specifics or {}
l2p_id = context.current['l2_policy_id']
l2p = context._plugin.get_l2_policy(context._plugin_context, l2p_id)
l3p_id = l2p['l3_policy_id']
l3p = context._plugin.get_l3_policy(context._plugin_context, l3p_id)
if (is_proxy and
context.current['proxy_type'] == proxy_ext.PROXY_TYPE_L2):
# In case of L2 proxy
return self._use_l2_proxy_implicit_subnets(
context, subnet_specifics, l2p, l3p)
else:
# In case of non proxy PTG or L3 Proxy
return self._use_normal_implicit_subnet(
context, is_proxy, prefix_len, subnet_specifics, l2p, l3p)
def _use_l2_proxy_implicit_subnets(self, context,
subnet_specifics, l2p, l3p):
LOG.debug("allocate subnets for L2 Proxy %s",
context.current['id'])
proxied = context._plugin.get_policy_target_group(
context._plugin_context, context.current['proxied_group_id'])
subnets = self._get_subnets(context._plugin_context,
{'id': proxied['subnets']})
# Use the same subnets as the Proxied PTG
generator = self._generate_subnets_from_cidrs(
context, l2p, l3p, [x['cidr'] for x in subnets],
subnet_specifics)
# Unroll the generator
subnets = [x for x in generator]
subnet_ids = [x['id'] for x in subnets]
for subnet_id in subnet_ids:
self._mark_subnet_owned(
context._plugin_context.session, subnet_id)
context.add_subnet(subnet_id)
return subnets
def _use_normal_implicit_subnet(self, context, is_proxy, prefix_len,
subnet_specifics, l2p, l3p):
LOG.debug("allocate subnets for L3 Proxy or normal PTG %s",
context.current['id'])
# REVISIT(rkukura): The folowing is a temporary allocation
# algorithm that should be replaced with use of a neutron
# subnet pool.
pool = netaddr.IPSet(
iterable=[l3p['proxy_ip_pool'] if is_proxy else
l3p['ip_pool']])
prefixlen = prefix_len or (
l3p['proxy_subnet_prefix_length'] if is_proxy
else l3p['subnet_prefix_length'])
l3p_id = l3p['id']
allocated = netaddr.IPSet(
iterable=self._get_l3p_allocated_subnets(context, l3p_id))
available = pool - allocated
available.compact()
for cidr in sorted(available.iter_cidrs(),
key=operator.attrgetter('prefixlen'),
reverse=True):
if prefixlen < cidr.prefixlen:
# Close the loop, no remaining subnet is big enough
# for this allocation
break
generator = self._generate_subnets_from_cidrs(
context, l2p, l3p, cidr.subnet(prefixlen),
subnet_specifics)
for subnet in generator:
LOG.debug("Trying subnet %s for PTG %s", subnet,
context.current['id'])
subnet_id = subnet['id']
try:
self._mark_subnet_owned(context._plugin_context.session,
subnet_id)
self._validate_and_add_subnet(context, subnet,
l3p_id)
LOG.debug("Using subnet %s for PTG %s", subnet,
context.current['id'])
return [subnet]
except CidrInUse:
# This exception is expected when a concurrent
# request has beat this one to calling
# _validate_and_add_subnet() using the same
# available CIDR. We delete the subnet and try the
# next available CIDR.
self._delete_subnet(context._plugin_context,
subnet['id'])
except n_exc.InvalidInput:
# This exception is not expected. We catch this
# here so that it isn't caught below and handled
# as if the CIDR is already in use.
self._delete_subnet(context._plugin_context,
subnet['id'])
raise exc.GroupPolicyInternalError()
raise exc.NoSubnetAvailable()
def _validate_and_add_subnet(self, context, subnet, l3p_id):
subnet_id = subnet['id']
session = context._plugin_context.session
with utils.clean_session(session):
with session.begin(subtransactions=True):
LOG.debug("starting validate_and_add_subnet transaction for "
"subnet %s", subnet_id)
ptgs = context._plugin._get_l3p_ptgs(
context._plugin_context.elevated(), l3p_id)
allocated = netaddr.IPSet(
iterable=self._get_ptg_cidrs(context, None,
ptg_dicts=ptgs))
cidr = subnet['cidr']
if cidr in allocated:
LOG.debug("CIDR %s in-use for L3P %s, allocated: %s",
cidr, l3p_id, allocated)
raise CidrInUse(cidr=cidr, l3p_id=l3p_id)
context.add_subnet(subnet_id)
LOG.debug("ending validate_and_add_subnet transaction for "
"subnet %s", subnet_id)
def _use_implicit_nat_pool_subnet(self, context):
es = context._plugin.get_external_segment(
context._plugin_context, context.current['external_segment_id'])
@@ -1535,32 +1693,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
LOG.exception(_LE("Adding subnet to router failed"))
raise exc.GroupPolicyInternalError()
def _generate_subnets_from_cidrs(self, context, l2p, l3p, cidrs,
subnet_specifics):
for usable_cidr in cidrs:
try:
attrs = {'tenant_id': context.current['tenant_id'],
'name': 'ptg_' + context.current['name'],
'network_id': l2p['network_id'],
'ip_version': l3p['ip_version'],
'cidr': usable_cidr,
'enable_dhcp': True,
'gateway_ip': attributes.ATTR_NOT_SPECIFIED,
'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
'dns_nameservers': (
cfg.CONF.resource_mapping.dns_nameservers or
attributes.ATTR_NOT_SPECIFIED),
'host_routes': attributes.ATTR_NOT_SPECIFIED}
attrs.update(subnet_specifics)
subnet = self._create_subnet(context._plugin_context,
attrs)
yield subnet
except n_exc.BadRequest:
# This is expected (CIDR overlap within network) until
# we have a proper subnet allocation algorithm. We
# ignore the exception and repeat with the next CIDR.
pass
def _stitch_ptg_to_l3p(self, context, ptg, l3p, subnet_ids):
if l3p['routers']:
router_id = l3p['routers'][0]
@@ -1627,37 +1759,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
self._delete_subnet(context._plugin_context, subnet_id)
raise exc.GroupPolicyInternalError()
def _cleanup_subnet(self, plugin_context, subnet_id, router_id):
interface_info = {'subnet_id': subnet_id}
if router_id:
try:
self._remove_router_interface(plugin_context, router_id,
interface_info)
except ext_l3.RouterInterfaceNotFoundForSubnet:
LOG.debug("Ignoring RouterInterfaceNotFoundForSubnet cleaning "
"up subnet: %s", subnet_id)
if self._subnet_is_owned(plugin_context.session, subnet_id):
self._delete_subnet(plugin_context, subnet_id)
def _create_implicit_network(self, context, **kwargs):
attrs = {'tenant_id': context.current['tenant_id'],
'name': context.current['name'], 'admin_state_up': True,
'shared': context.current.get('shared', False)}
attrs.update(**kwargs)
network = self._create_network(context._plugin_context, attrs)
network_id = network['id']
self._mark_network_owned(context._plugin_context.session, network_id)
return network
def _use_implicit_network(self, context):
network = self._create_implicit_network(
context, name='l2p_' + context.current['name'])
context.set_network_id(network['id'])
def _cleanup_network(self, plugin_context, network_id):
if self._network_is_owned(plugin_context.session, network_id):
self._delete_network(plugin_context, network_id)
def _use_implicit_router(self, context, router_name=None):
attrs = {'tenant_id': context.current['tenant_id'],
'name': router_name or ('l3p_' + context.current['name']),
@@ -1806,50 +1907,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
fip = self._create_fip(plugin_context, attrs)
return fip['id']
def _mark_port_owned(self, session, port_id):
with session.begin(subtransactions=True):
owned = OwnedPort(port_id=port_id)
session.add(owned)
def _port_is_owned(self, session, port_id):
with session.begin(subtransactions=True):
return (session.query(OwnedPort).
filter_by(port_id=port_id).
first() is not None)
def _mark_subnet_owned(self, session, subnet_id):
with session.begin(subtransactions=True):
owned = OwnedSubnet(subnet_id=subnet_id)
session.add(owned)
def _subnet_is_owned(self, session, subnet_id):
with session.begin(subtransactions=True):
return (session.query(OwnedSubnet).
filter_by(subnet_id=subnet_id).
first() is not None)
def _mark_network_owned(self, session, network_id):
with session.begin(subtransactions=True):
owned = OwnedNetwork(network_id=network_id)
session.add(owned)
def _network_is_owned(self, session, network_id):
with session.begin(subtransactions=True):
return (session.query(OwnedNetwork).
filter_by(network_id=network_id).
first() is not None)
def _mark_router_owned(self, session, router_id):
with session.begin(subtransactions=True):
owned = OwnedRouter(router_id=router_id)
session.add(owned)
def _router_is_owned(self, session, router_id):
with session.begin(subtransactions=True):
return (session.query(OwnedRouter).
filter_by(router_id=router_id).
first() is not None)
def _set_policy_rule_set_sg_mapping(
self, session, policy_rule_set_id, consumed_sg_id, provided_sg_id):
with session.begin(subtransactions=True):
@@ -2155,13 +2212,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
context._plugin_context, filters={'id': child_rule_ids})
self._apply_policy_rule_set_rules(context, child, child_rules)
def _get_default_security_group(self, plugin_context, ptg_id,
tenant_id):
port_name = DEFAULT_SG_PREFIX % ptg_id
filters = {'name': [port_name], 'tenant_id': [tenant_id]}
default_group = self._get_sgs(plugin_context, filters)
return default_group[0]['id'] if default_group else None
def _update_default_security_group(self, plugin_context, ptg_id,
tenant_id, subnets=None):
@@ -2209,22 +2259,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
if sg_id:
self._delete_sg(plugin_context, sg_id)
def _get_ptg_cidrs(self, context, ptgs, ptg_dicts=None):
cidrs = []
if ptg_dicts:
ptgs = ptg_dicts
else:
ptgs = context._plugin.get_policy_target_groups(
context._plugin_context.elevated(), filters={'id': ptgs})
subnets = []
for ptg in ptgs:
subnets.extend(ptg['subnets'])
if subnets:
cidrs = [x['cidr'] for x in self._get_subnets(
context._plugin_context.elevated(), {'id': subnets})]
return cidrs
def _get_ep_cidrs(self, context, eps):
cidrs = []
eps = context._plugin.get_external_policies(
@@ -2569,11 +2603,6 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI,
master_ips = [x['ip_address'] for x in master_port['fixed_ips']]
return master_mac, master_ips
def _get_l3p_allocated_subnets(self, context, l3p_id):
ptgs = context._plugin._get_l3p_ptgs(
context._plugin_context.elevated(), l3p_id)
return self._get_ptg_cidrs(context, None, ptg_dicts=ptgs)
def _check_nat_pool_subnet_in_use(self, plugin_context, nat_pool):
if not self._subnet_is_owned(plugin_context.session,
nat_pool['subnet_id']):

View File

@@ -1260,6 +1260,23 @@ class PolicyDriver(object):
"""
pass
# REVISIT(rkukura): Is this needed for all operations, or just for
# create operations? If its needed for all operations, should the
# method be specific to the resource and operation, and include
# the request data (i.e. update_network_pretransaction(self,
# data))?
def ensure_tenant(self, plugin_context, tenant_id):
"""Ensure tenant known before creating resource.
:param plugin_context: Plugin request context.
:param tenant_id: Tenant owning resource about to be created.
Called before the start of a transaction creating any new core
resource, allowing any needed tenant-specific processing to be
performed.
"""
pass
@six.add_metaclass(abc.ABCMeta)
class ExtensionDriver(object):

View File

@@ -456,6 +456,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_policy_target(self, context, policy_target):
self._ensure_tenant(context, policy_target['policy_target'])
self._add_fixed_ips_to_port_attributes(policy_target)
session = context.session
with session.begin(subtransactions=True):
@@ -480,9 +481,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
result['id'])
self.delete_policy_target(context, result['id'])
# Strip the extra port attributes
result.pop('port_attributes', None)
return result
return self.get_policy_target(context, result['id'])
@log.log_method_call
def update_policy_target(self, context, policy_target_id, policy_target):
@@ -507,7 +506,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_policy_target_postcommit(
policy_context)
return updated_policy_target
return self.get_policy_target(context, policy_target_id)
@log.log_method_call
def delete_policy_target(self, context, policy_target_id):
@@ -545,6 +544,8 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_policy_target_group(self, context, policy_target_group):
self._ensure_tenant(context,
policy_target_group['policy_target_group'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -569,7 +570,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
result['id'])
self.delete_policy_target_group(context, result['id'])
return result
return self.get_policy_target_group(context, result['id'])
@log.log_method_call
def update_policy_target_group(self, context, policy_target_group_id,
@@ -608,7 +609,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_policy_target_group_postcommit(
policy_context)
return updated_policy_target_group
return self.get_policy_target_group(context, policy_target_group_id)
@log.log_method_call
def delete_policy_target_group(self, context, policy_target_group_id):
@@ -683,6 +684,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_l2_policy(self, context, l2_policy):
self._ensure_tenant(context, l2_policy['l2_policy'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -704,7 +706,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
result['id'])
self.delete_l2_policy(context, result['id'])
return result
return self.get_l2_policy(context, result['id'])
@log.log_method_call
def update_l2_policy(self, context, l2_policy_id, l2_policy):
@@ -726,7 +728,8 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_l2_policy_postcommit(
policy_context)
return updated_l2_policy
return self.get_l2_policy(context, l2_policy_id)
@log.log_method_call
def delete_l2_policy(self, context, l2_policy_id):
@@ -764,6 +767,8 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_network_service_policy(self, context, network_service_policy):
self._ensure_tenant(
context, network_service_policy['network_service_policy'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -790,7 +795,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
result['id'])
self.delete_network_service_policy(context, result['id'])
return result
return self.get_network_service_policy(context, result['id'])
@log.log_method_call
def update_network_service_policy(self, context, network_service_policy_id,
@@ -818,7 +823,8 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_network_service_policy_postcommit(
policy_context)
return updated_network_service_policy
return self.get_network_service_policy(context,
network_service_policy_id)
@log.log_method_call
def delete_network_service_policy(
@@ -860,6 +866,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_l3_policy(self, context, l3_policy):
self._ensure_tenant(context, l3_policy['l3_policy'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -883,7 +890,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
result['id'])
self.delete_l3_policy(context, result['id'])
return result
return self.get_l3_policy(context, result['id'])
@log.log_method_call
def update_l3_policy(self, context, l3_policy_id, l3_policy):
@@ -907,7 +914,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_l3_policy_postcommit(
policy_context)
return updated_l3_policy
return self.get_l3_policy(context, l3_policy_id)
@log.log_method_call
def delete_l3_policy(self, context, l3_policy_id, check_unused=False):
@@ -950,6 +957,8 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_policy_classifier(self, context, policy_classifier):
self._ensure_tenant(context,
policy_classifier['policy_classifier'])
session = context.session
with session.begin(subtransactions=True):
result = super(
@@ -974,7 +983,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
" failed, deleting policy_classifier %s"), result['id'])
self.delete_policy_classifier(context, result['id'])
return result
return self.get_policy_classifier(context, result['id'])
@log.log_method_call
def update_policy_classifier(self, context, id, policy_classifier):
@@ -998,7 +1007,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_policy_classifier_postcommit(
policy_context)
return updated_policy_classifier
return self.get_policy_classifier(context, id)
@log.log_method_call
def delete_policy_classifier(self, context, id):
@@ -1037,6 +1046,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_policy_action(self, context, policy_action):
self._ensure_tenant(context, policy_action['policy_action'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -1061,7 +1071,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
"failed, deleting policy_action %s"), result['id'])
self.delete_policy_action(context, result['id'])
return result
return self.get_policy_action(context, result['id'])
@log.log_method_call
def update_policy_action(self, context, id, policy_action):
@@ -1086,7 +1096,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_policy_action_postcommit(
policy_context)
return updated_policy_action
return self.get_policy_action(context, id)
@log.log_method_call
def delete_policy_action(self, context, id):
@@ -1123,6 +1133,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_policy_rule(self, context, policy_rule):
self._ensure_tenant(context, policy_rule['policy_rule'])
session = context.session
with session.begin(subtransactions=True):
result = super(
@@ -1146,7 +1157,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
" failed, deleting policy_rule %s"), result['id'])
self.delete_policy_rule(context, result['id'])
return result
return self.get_policy_rule(context, result['id'])
@log.log_method_call
def update_policy_rule(self, context, id, policy_rule):
@@ -1169,7 +1180,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_policy_rule_postcommit(
policy_context)
return updated_policy_rule
return self.get_policy_rule(context, id)
@log.log_method_call
def delete_policy_rule(self, context, id):
@@ -1207,6 +1218,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_policy_rule_set(self, context, policy_rule_set):
self._ensure_tenant(context, policy_rule_set['policy_rule_set'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -1231,7 +1243,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
"failed, deleting policy_rule_set %s"), result['id'])
self.delete_policy_rule_set(context, result['id'])
return result
return self.get_policy_rule_set(context, result['id'])
@log.log_method_call
def update_policy_rule_set(self, context, id, policy_rule_set):
@@ -1255,7 +1267,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_policy_rule_set_postcommit(
policy_context)
return updated_policy_rule_set
return self.get_policy_rule_set(context, id)
@log.log_method_call
def delete_policy_rule_set(self, context, id):
@@ -1292,6 +1304,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_external_segment(self, context, external_segment):
self._ensure_tenant(context, external_segment['external_segment'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -1319,7 +1332,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
"%s"), result['id'])
self.delete_external_segment(context, result['id'])
return result
return self.get_external_segment(context, result['id'])
@log.log_method_call
def update_external_segment(self, context, external_segment_id,
@@ -1349,7 +1362,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_external_segment_postcommit(
policy_context)
return updated_external_segment
return self.get_external_segment(context, external_segment_id)
@log.log_method_call
def delete_external_segment(self, context, external_segment_id):
@@ -1391,6 +1404,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_external_policy(self, context, external_policy):
self._ensure_tenant(context, external_policy['external_policy'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin,
@@ -1415,7 +1429,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
"%s"), result['id'])
self.delete_external_policy(context, result['id'])
return result
return self.get_external_policy(context, result['id'])
@log.log_method_call
def update_external_policy(self, context, external_policy_id,
@@ -1442,7 +1456,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
self.policy_driver_manager.update_external_policy_postcommit(
policy_context)
return updated_external_policy
return self.get_external_policy(context, external_policy_id)
@log.log_method_call
def delete_external_policy(self, context, external_policy_id,
@@ -1481,6 +1495,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
@log.log_method_call
def create_nat_pool(self, context, nat_pool):
self._ensure_tenant(context, nat_pool['nat_pool'])
session = context.session
with session.begin(subtransactions=True):
result = super(GroupPolicyPlugin, self).create_nat_pool(
@@ -1502,7 +1517,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
"nat_pool %s"), result['id'])
self.delete_nat_pool(context, result['id'])
return result
return self.get_nat_pool(context, result['id'])
@log.log_method_call
def update_nat_pool(self, context, nat_pool_id, nat_pool):
@@ -1523,7 +1538,7 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
update_nat_pool_precommit(policy_context))
self.policy_driver_manager.update_nat_pool_postcommit(policy_context)
return updated_nat_pool
return self.get_nat_pool(context, nat_pool_id)
@log.log_method_call
def delete_nat_pool(self, context, nat_pool_id, check_unused=False):
@@ -1572,3 +1587,12 @@ class GroupPolicyPlugin(group_policy_mapping_db.GroupPolicyMappingDbPlugin):
def _is_service_target(self, context, pt_id):
return bool(ncp_model.get_service_targets_count(
context.session, pt_id))
def _ensure_tenant(self, context, resource):
# TODO(Sumit): This check is ideally not required, but a bunch of UTs
# are not setup correctly to populate the tenant_id, hence we
# temporarily need to perform this check. This will go with the fix
# for the deprecated get_tenant_id_for_create method.
if 'tenant_id' in resource:
tenant_id = resource['tenant_id']
self.policy_driver_manager.ensure_tenant(context, tenant_id)

View File

@@ -17,6 +17,7 @@ from oslo_log import log
import stevedore
from gbpservice.neutron.services.grouppolicy.common import exceptions as gp_exc
from gbpservice.neutron.services.grouppolicy import group_policy_driver_api
LOG = log.getLogger(__name__)
@@ -135,6 +136,16 @@ class PolicyDriverManager(stevedore.named.NamedExtensionManager):
method=method_name
)
def ensure_tenant(self, plugin_context, tenant_id):
for driver in self.ordered_policy_drivers:
if isinstance(driver.obj, group_policy_driver_api.PolicyDriver):
try:
driver.obj.ensure_tenant(plugin_context, tenant_id)
except Exception:
LOG.exception(_LE("Policy driver '%s' failed in "
"ensure_tenant"), driver.name)
raise gp_exc.GroupPolicyDriverError(method="ensure_tenant")
def create_policy_target_precommit(self, context):
self._call_on_drivers("create_policy_target_precommit", context)

View File

@@ -1108,17 +1108,21 @@ class TestGroupResources(GroupPolicyDbTestCase):
policy_actions=[pa1_id, pa2_id])
npa1_id = self.create_policy_action()['policy_action']['id']
npa2_id = self.create_policy_action()['policy_action']['id']
actions_list = [npa1_id, npa2_id]
attrs = cm.get_create_policy_rule_default_attrs(
policy_actions=[npa1_id, npa2_id])
policy_actions=actions_list)
data = {'policy_rule': {'policy_actions': [npa1_id, npa2_id]}}
data = {'policy_rule': {'policy_actions': actions_list}}
req = self.new_update_request('policy_rules', data,
pr['policy_rule']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertItemsEqual(actions_list,
res['policy_rule']['policy_actions'])
del attrs['policy_actions']
for k, v in attrs.iteritems():
self.assertEqual(res['policy_rule'][k], v)
self.assertEqual(v, res['policy_rule'][k])
def test_delete_policy_rule(self):
ctx = context.get_admin_context()

View File

@@ -0,0 +1,354 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from aim import aim_manager
from aim.api import resource as aim_resource
from aim import context as aim_context
from aim.db import model_base as aim_model_base
from keystoneclient.v3 import client as ksc_client
from neutron import context as nctx
from neutron.db import api as db_api
from oslo_log import log as logging
import webob.exc
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as aim_md)
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import apic_mapper
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import model
from gbpservice.neutron.services.grouppolicy import config
from gbpservice.neutron.tests.unit.plugins.ml2plus import (
test_apic_aim as test_aim_md)
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_neutron_resources_driver as test_nr_base)
ML2PLUS_PLUGIN = 'gbpservice.neutron.plugins.ml2plus.plugin.Ml2PlusPlugin'
class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase):
def setUp(self, policy_drivers=None, core_plugin=None, ml2_options=None,
sc_plugin=None, **kwargs):
core_plugin = core_plugin or ML2PLUS_PLUGIN
policy_drivers = policy_drivers or ['aim_mapping']
ml2_opts = ml2_options or {'mechanism_drivers': ['logger', 'apic_aim'],
'extension_drivers': ['apic_aim'],
'type_drivers': ['opflex', 'local', 'vlan'],
'tenant_network_types': ['opflex']}
super(AIMBaseTestCase, self).setUp(
policy_drivers=policy_drivers, core_plugin=core_plugin,
ml2_options=ml2_opts, sc_plugin=sc_plugin)
config.cfg.CONF.set_override('network_vlan_ranges',
['physnet1:1000:1099'],
group='ml2_type_vlan')
self.saved_keystone_client = ksc_client.Client
ksc_client.Client = test_aim_md.FakeKeystoneClient
self._tenant_id = 'test-tenant'
self._neutron_context = nctx.Context(
'', kwargs.get('tenant_id', self._tenant_id),
is_admin_context=False)
self._neutron_admin_context = nctx.get_admin_context()
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self._aim = aim_manager.AimManager()
self._aim_context = aim_context.AimContext(
self._neutron_context.session)
self._db = model.DbModel()
self._name_mapper = apic_mapper.APICNameMapper(self._db, logging)
def tearDown(self):
ksc_client.Client = self.saved_keystone_client
super(AIMBaseTestCase, self).tearDown()
class TestL2Policy(test_nr_base.TestL2Policy, AIMBaseTestCase):
pass
class TestPolicyTargetGroup(AIMBaseTestCase):
def test_policy_target_group_lifecycle_implicit_l2p(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
ptg_id = ptg['id']
self.show_policy_target_group(ptg_id, expected_res_status=200)
self.show_l2_policy(ptg['l2_policy_id'], expected_res_status=200)
req = self.new_show_request('subnets', ptg['subnets'][0], fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['subnet']['id'])
ptg_name = ptg['name']
aim_epg_name = str(self._name_mapper.policy_target_group(
self._neutron_context.session, ptg_id, ptg_name))
aim_tenant_name = str(self._name_mapper.tenant(
self._neutron_context.session, self._tenant_id))
aim_app_profile_name = aim_md.AP_NAME
aim_app_profiles = self._aim.find(
self._aim_context, aim_resource.ApplicationProfile,
tenant_name=aim_tenant_name, name=aim_app_profile_name)
self.assertEqual(1, len(aim_app_profiles))
aim_epgs = self._aim.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self.assertEqual(1, len(aim_epgs))
self.assertEqual(aim_epg_name, aim_epgs[0].name)
self.assertEqual(aim_tenant_name, aim_epgs[0].tenant_name)
self.delete_policy_target_group(ptg_id, expected_res_status=204)
self.show_policy_target_group(ptg_id, expected_res_status=404)
# Implicitly created subnet should be deleted
req = self.new_show_request('subnets', ptg['subnets'][0], fmt=self.fmt)
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
# Implicitly created L2P should be deleted
self.show_l2_policy(ptg['l2_policy_id'], expected_res_status=404)
aim_epgs = self._aim.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self.assertEqual(0, len(aim_epgs))
def test_policy_target_group_lifecycle_explicit_l2p(self):
# TODO(Sumit): Refactor the common parts of this and the implicit test
l2p = self.create_l2_policy(name="l2p1")['l2_policy']
l2p_id = l2p['id']
ptg = self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p_id)['policy_target_group']
ptg_id = ptg['id']
self.show_policy_target_group(ptg_id, expected_res_status=200)
self.assertEqual(l2p_id, ptg['l2_policy_id'])
self.show_l2_policy(ptg['l2_policy_id'], expected_res_status=200)
req = self.new_show_request('subnets', ptg['subnets'][0], fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['subnet']['id'])
ptg_name = ptg['name']
aim_epg_name = str(self._name_mapper.policy_target_group(
self._neutron_context.session, ptg_id, ptg_name))
aim_tenant_name = str(self._name_mapper.tenant(
self._neutron_context.session, self._tenant_id))
aim_app_profile_name = aim_md.AP_NAME
aim_app_profiles = self._aim.find(
self._aim_context, aim_resource.ApplicationProfile,
tenant_name=aim_tenant_name, name=aim_app_profile_name)
self.assertEqual(1, len(aim_app_profiles))
aim_epgs = self._aim.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self.assertEqual(1, len(aim_epgs))
self.assertEqual(aim_epg_name, aim_epgs[0].name)
self.assertEqual(aim_tenant_name, aim_epgs[0].tenant_name)
self.delete_policy_target_group(ptg_id, expected_res_status=204)
self.show_policy_target_group(ptg_id, expected_res_status=404)
# Implicitly created subnet should be deleted
req = self.new_show_request('subnets', ptg['subnets'][0], fmt=self.fmt)
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
# Explicitly created L2P should not be deleted
self.show_l2_policy(ptg['l2_policy_id'], expected_res_status=200)
aim_epgs = self._aim.find(
self._aim_context, aim_resource.EndpointGroup, name=aim_epg_name)
self.assertEqual(0, len(aim_epgs))
def test_ptg_delete_no_subnet_delete(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
ptg_id = ptg['id']
ptg2 = self.create_policy_target_group(
name="ptg2", l2_policy_id=ptg['l2_policy_id'])[
'policy_target_group']
self.assertEqual(ptg['subnets'], ptg2['subnets'])
self.show_l2_policy(ptg['l2_policy_id'], expected_res_status=200)
req = self.new_show_request('subnets', ptg['subnets'][0], fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['subnet']['id'])
self.delete_policy_target_group(ptg_id, expected_res_status=204)
self.show_policy_target_group(ptg_id, expected_res_status=404)
# Implicitly created subnet should not be deleted
req = self.new_show_request('subnets', ptg['subnets'][0], fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['subnet']['id'])
class TestPolicyTargetGroupRollback(AIMBaseTestCase):
def test_policy_target_group_create_fail(self):
# REVISIT(Sumit): This exception should be raised from the deepest
# point. Currently this is the deepest point.
self._gbp_plugin.policy_driver_manager.policy_drivers[
'aim_mapping'].obj._validate_and_add_subnet = mock.Mock(
side_effect=Exception)
self.create_policy_target_group(name="ptg1", expected_res_status=500)
self.assertEqual([], self._plugin.get_subnets(self._context))
self.assertEqual([], self._plugin.get_networks(self._context))
self.assertEqual([], self._gbp_plugin.get_policy_target_groups(
self._context))
self.assertEqual([], self._gbp_plugin.get_l2_policies(self._context))
self.assertEqual([], self._gbp_plugin.get_l3_policies(self._context))
def test_policy_target_group_update_fail(self):
# REVISIT(Sumit): This exception should be raised from the deepest
# point. Currently this is the deepest point.
self._gbp_plugin.policy_driver_manager.policy_drivers[
'aim_mapping'].obj.update_policy_target_group_precommit = (
mock.Mock(side_effect=Exception))
ptg = self.create_policy_target_group(name="ptg1")
ptg_id = ptg['policy_target_group']['id']
self.update_policy_target_group(ptg_id, expected_res_status=500,
name="new name")
new_ptg = self.show_policy_target_group(ptg_id,
expected_res_status=200)
self.assertEqual(ptg['policy_target_group']['name'],
new_ptg['policy_target_group']['name'])
def test_policy_target_group_delete_fail(self):
# REVISIT(Sumit): This exception should be raised from the deepest
# point. Currently this is the deepest point.
self._gbp_plugin.policy_driver_manager.policy_drivers[
'aim_mapping'].obj.delete_l3_policy_precommit = mock.Mock(
side_effect=Exception)
ptg = self.create_policy_target_group(name="ptg1")
ptg_id = ptg['policy_target_group']['id']
l2p_id = ptg['policy_target_group']['l2_policy_id']
subnet_id = ptg['policy_target_group']['subnets'][0]
l2p = self.show_l2_policy(l2p_id, expected_res_status=200)
l3p_id = l2p['l2_policy']['l3_policy_id']
self.delete_policy_target_group(ptg_id, expected_res_status=500)
req = self.new_show_request('subnets', subnet_id, fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['subnet']['id'])
self.show_policy_target_group(ptg_id, expected_res_status=200)
self.show_l2_policy(l2p_id, expected_res_status=200)
self.show_l3_policy(l3p_id, expected_res_status=200)
class TestPolicyTarget(AIMBaseTestCase):
def test_policy_target_lifecycle_implicit_port(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
ptg_id = ptg['id']
pt = self.create_policy_target(
name="pt1", policy_target_group_id=ptg_id)['policy_target']
pt_id = pt['id']
self.show_policy_target(pt_id, expected_res_status=200)
req = self.new_show_request('ports', pt['port_id'], fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['port']['id'])
self.update_policy_target(pt_id, expected_res_status=200,
name="new name")
new_pt = self.show_policy_target(pt_id, expected_res_status=200)
self.assertEqual('new name', new_pt['policy_target']['name'])
self.delete_policy_target(pt_id, expected_res_status=204)
self.show_policy_target(pt_id, expected_res_status=404)
req = self.new_show_request('ports', pt['port_id'], fmt=self.fmt)
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
class TestPolicyTargetRollback(AIMBaseTestCase):
def test_policy_target_create_fail(self):
# REVISIT(Sumit): This exception should be raised from the deepest
# point. Currently this is the deepest point.
self._gbp_plugin.policy_driver_manager.policy_drivers[
'aim_mapping'].obj._mark_port_owned = mock.Mock(
side_effect=Exception)
ptg_id = self.create_policy_target_group(
name="ptg1")['policy_target_group']['id']
self.create_policy_target(name="pt1",
policy_target_group_id=ptg_id,
expected_res_status=500)
self.assertEqual([],
self._gbp_plugin.get_policy_targets(self._context))
self.assertEqual([], self._plugin.get_ports(self._context))
def test_policy_target_update_fail(self):
# REVISIT(Sumit): This exception should be raised from the deepest
# point. Currently this is the deepest point.
self._gbp_plugin.policy_driver_manager.policy_drivers[
'aim_mapping'].obj.update_policy_target_precommit = mock.Mock(
side_effect=Exception)
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
ptg_id = ptg['id']
pt = self.create_policy_target(
name="pt1", policy_target_group_id=ptg_id)['policy_target']
pt_id = pt['id']
self.update_policy_target(pt_id, expected_res_status=500,
name="new name")
new_pt = self.show_policy_target(pt_id, expected_res_status=200)
self.assertEqual(pt['name'], new_pt['policy_target']['name'])
def test_policy_target_delete_fail(self):
# REVISIT(Sumit): This exception should be raised from the deepest
# point. Currently this is the deepest point.
self._gbp_plugin.policy_driver_manager.policy_drivers[
'aim_mapping'].obj._delete_port = mock.Mock(
side_effect=Exception)
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
ptg_id = ptg['id']
pt = self.create_policy_target(
name="pt1", policy_target_group_id=ptg_id)['policy_target']
pt_id = pt['id']
port_id = pt['port_id']
self.delete_policy_target(pt_id, expected_res_status=500)
self.show_policy_target(pt_id, expected_res_status=200)
req = self.new_show_request('ports', port_id, fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['port']['id'])
class TestPolicyRule(AIMBaseTestCase):
def _test_policy_rule_lifecycle(self):
# TODO(Sumit): Enable this test when the AIM driver is ready
action1 = self.create_policy_action(
action_type='redirect')['policy_action']
classifier = self.create_policy_classifier(
protocol='TCP', port_range="22",
direction='bi')['policy_classifier']
pr = self.create_policy_rule(
name="pr1", policy_classifier_id=classifier['id'],
policy_actions=[action1['id']])['policy_rule']
pr_id = pr['id']
self.show_policy_rule(pr_id, expected_res_status=200)
tenant = pr['tenant_id']
pr_id = pr['id']
pr_name = pr['name']
rn = self._aim_mapper.tenant_filter(tenant, pr_id, name=pr_name)
aim_pr = self._aim.find(
self._aim_context, aim_resource.TenantFilter, rn=rn)
self.assertEqual(1, len(aim_pr))
self.assertEqual(rn, aim_pr[0].rn)
self.assertEqual(tenant, aim_pr[0].tenant_rn)
self.delete_policy_rule(pr_id, expected_res_status=204)
self.show_policy_rule(pr_id, expected_res_status=404)
aim_pr = self._aim.find(
self._aim_context, aim_resource.TenantFilter, rn=rn)
self.assertEqual(0, len(aim_pr))

View File

@@ -57,7 +57,7 @@ def get_status_for_test(self, context):
getattr(context, resource_name)['status_details'] = NEW_STATUS_DETAILS
class GroupPolicyPluginTestCase(tgpmdb.GroupPolicyMappingDbTestCase):
class GroupPolicyPluginTestBase(tgpmdb.GroupPolicyMappingDbTestCase):
def setUp(self, core_plugin=None, gp_plugin=None, ml2_options=None,
sc_plugin=None):
@@ -67,29 +67,10 @@ class GroupPolicyPluginTestCase(tgpmdb.GroupPolicyMappingDbTestCase):
for opt, val in ml2_opts.items():
cfg.CONF.set_override(opt, val, 'ml2')
core_plugin = core_plugin or test_plugin.PLUGIN_NAME
super(GroupPolicyPluginTestCase, self).setUp(core_plugin=core_plugin,
super(GroupPolicyPluginTestBase, self).setUp(core_plugin=core_plugin,
gp_plugin=gp_plugin,
sc_plugin=sc_plugin)
def test_reverse_on_delete(self):
manager = self.plugin.policy_driver_manager
ctx = context.get_admin_context()
drivers = manager.ordered_policy_drivers
first, second = mock.Mock(), mock.Mock()
first.obj, second.obj = FakeDriver(), FakeDriver()
try:
manager.ordered_policy_drivers = [first, second]
manager.reverse_ordered_policy_drivers = [second, first]
ordered_obj = [first.obj, second.obj]
ctx.call_order = []
manager._call_on_drivers('nodelete', ctx)
self.assertEqual(ordered_obj, ctx.call_order)
ctx.call_order = []
manager._call_on_drivers('delete', ctx)
self.assertEqual(ordered_obj[::-1], ctx.call_order)
finally:
manager.ordered_policy_drivers = drivers
def _create_l2_policy_on_shared(self, **kwargs):
l3p = self.create_l3_policy(shared=True)['l3_policy']
return self.create_l2_policy(l3_policy_id=l3p['id'],
@@ -172,6 +153,28 @@ class GroupPolicyPluginTestCase(tgpmdb.GroupPolicyMappingDbTestCase):
return self.deserialize(self.fmt, res)
class GroupPolicyPluginTestCase(GroupPolicyPluginTestBase):
def test_reverse_on_delete(self):
manager = self.plugin.policy_driver_manager
ctx = context.get_admin_context()
drivers = manager.ordered_policy_drivers
first, second = mock.Mock(), mock.Mock()
first.obj, second.obj = FakeDriver(), FakeDriver()
try:
manager.ordered_policy_drivers = [first, second]
manager.reverse_ordered_policy_drivers = [second, first]
ordered_obj = [first.obj, second.obj]
ctx.call_order = []
manager._call_on_drivers('nodelete', ctx)
self.assertEqual(ordered_obj, ctx.call_order)
ctx.call_order = []
manager._call_on_drivers('delete', ctx)
self.assertEqual(ordered_obj[::-1], ctx.call_order)
finally:
manager.ordered_policy_drivers = drivers
class TestL3Policy(GroupPolicyPluginTestCase):
def _get_es_dict(self, es, addr=None):
@@ -947,8 +950,18 @@ class TestResourceStatusChange(GroupPolicyPluginTestCase):
resource = self.create_policy_rule(policy_classifier_id=pc_id)
else:
resource = getattr(self, "create_" + resource_singular)()
self.assertIsNone(resource[resource_singular]['status'])
self.assertIsNone(resource[resource_singular]['status_details'])
self.assertEqual(NEW_STATUS, resource[resource_singular]['status'])
self.assertEqual(NEW_STATUS_DETAILS,
resource[resource_singular]['status_details'])
# Reset status directly in the DB to test that GET works
reset_status = {resource_singular: {'status': None,
'status_details': None}}
neutron_context = context.Context('', self._tenant_id)
getattr(gpmdb.GroupPolicyMappingDbPlugin,
"update_" + resource_singular)(
self._gbp_plugin, neutron_context,
resource[resource_singular]['id'], reset_status)
req = self.new_show_request(
resource_name, resource[resource_singular]['id'], fmt=self.fmt,
@@ -960,7 +973,6 @@ class TestResourceStatusChange(GroupPolicyPluginTestCase):
self.assertEqual(NEW_STATUS_DETAILS,
res[resource_singular]['status_details'])
elif not gplugin.STATUS_SET.intersection(set(fields)):
neutron_context = context.Context('', self._tenant_id)
db_obj = getattr(
gpmdb.GroupPolicyMappingDbPlugin, "get_" + resource_singular)(
self._gbp_plugin, neutron_context,
@@ -991,7 +1003,15 @@ class TestResourceStatusChange(GroupPolicyPluginTestCase):
getattr(self, create_method)(),
getattr(self, create_method)()]
neutron_context = context.Context('', self._tenant_id)
reset_status = {resource_singular: {'status': None,
'status_details': None}}
for obj in objs:
# Reset status directly in the DB to test that GET works
getattr(gpmdb.GroupPolicyMappingDbPlugin,
"update_" + resource_singular)(
self._gbp_plugin, neutron_context,
obj[resource_singular]['id'], reset_status)
req = self.new_show_request(
resource_name, obj[resource_singular]['id'], fmt=self.fmt,
fields=fields)
@@ -1002,7 +1022,6 @@ class TestResourceStatusChange(GroupPolicyPluginTestCase):
self.assertEqual(NEW_STATUS_DETAILS,
res[resource_singular]['status_details'])
elif not gplugin.STATUS_SET.intersection(set(fields)):
neutron_context = context.Context('', self._tenant_id)
db_obj = getattr(
gpmdb.GroupPolicyMappingDbPlugin,
"get_" + resource_singular)(

View File

@@ -0,0 +1,137 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron import context as nctx
from neutron.db import api as db_api
from neutron.db import model_base
from neutron import manager
from neutron.plugins.common import constants as pconst
from neutron.tests.unit.plugins.ml2 import test_plugin as n_test_plugin
import webob.exc
from gbpservice.neutron.services.grouppolicy import config
from gbpservice.neutron.services.servicechain.plugins.msc import (
config as sc_cfg)
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_grouppolicy_plugin as test_plugin)
CORE_PLUGIN = ('gbpservice.neutron.tests.unit.services.grouppolicy.'
'test_resource_mapping.NoL3NatSGTestPlugin')
class CommonNeutronBaseTestCase(test_plugin.GroupPolicyPluginTestBase):
def setUp(self, policy_drivers=None,
core_plugin=n_test_plugin.PLUGIN_NAME, ml2_options=None,
sc_plugin=None):
policy_drivers = policy_drivers or ['neutron_resources']
config.cfg.CONF.set_override('policy_drivers',
policy_drivers,
group='group_policy')
sc_cfg.cfg.CONF.set_override('servicechain_drivers',
['dummy'], group='servicechain')
config.cfg.CONF.set_override('allow_overlapping_ips', True)
super(CommonNeutronBaseTestCase, self).setUp(core_plugin=core_plugin,
ml2_options=ml2_options,
sc_plugin=sc_plugin)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
res = mock.patch('neutron.db.l3_db.L3_NAT_dbonly_mixin.'
'_check_router_needs_rescheduling').start()
res.return_value = None
self._plugin = manager.NeutronManager.get_plugin()
self._plugin.remove_networks_from_down_agents = mock.Mock()
self._plugin.is_agent_down = mock.Mock(return_value=False)
self._context = nctx.get_admin_context()
plugins = manager.NeutronManager.get_service_plugins()
self._gbp_plugin = plugins.get(pconst.GROUP_POLICY)
self._l3_plugin = plugins.get(pconst.L3_ROUTER_NAT)
config.cfg.CONF.set_override('debug', True)
config.cfg.CONF.set_override('verbose', True)
def get_plugin_context(self):
return self._plugin, self._context
class TestL2Policy(CommonNeutronBaseTestCase):
def test_l2_policy_lifecycle(self):
l2p = self.create_l2_policy(name="l2p1")
l2p_id = l2p['l2_policy']['id']
network_id = l2p['l2_policy']['network_id']
l3p_id = l2p['l2_policy']['l3_policy_id']
self.assertIsNotNone(network_id)
self.assertIsNotNone(l3p_id)
req = self.new_show_request('networks', network_id, fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['network']['id'])
self.show_l3_policy(l3p_id, expected_res_status=200)
self.show_l2_policy(l2p_id, expected_res_status=200)
self.update_l2_policy(l2p_id, expected_res_status=200,
name="new name")
self.delete_l2_policy(l2p_id, expected_res_status=204)
self.show_l2_policy(l2p_id, expected_res_status=404)
req = self.new_show_request('networks', network_id, fmt=self.fmt)
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
self.show_l3_policy(l3p_id, expected_res_status=404)
class TestL2PolicyRollback(CommonNeutronBaseTestCase):
def setUp(self, policy_drivers=None,
core_plugin=n_test_plugin.PLUGIN_NAME, ml2_options=None,
sc_plugin=None):
policy_drivers = policy_drivers or ['neutron_resources',
'dummy']
super(TestL2PolicyRollback, self).setUp(policy_drivers=policy_drivers,
core_plugin=core_plugin,
ml2_options=ml2_options,
sc_plugin=sc_plugin)
self.dummy_driver = manager.NeutronManager.get_service_plugins()[
'GROUP_POLICY'].policy_driver_manager.policy_drivers['dummy'].obj
def test_l2_policy_create_fail(self):
self.dummy_driver.create_l2_policy_precommit = mock.Mock(
side_effect=Exception)
self.create_l2_policy(name="l2p1", expected_res_status=500)
self.assertEqual([], self._plugin.get_networks(self._context))
self.assertEqual([], self._gbp_plugin.get_l2_policies(self._context))
self.assertEqual([], self._gbp_plugin.get_l3_policies(self._context))
def test_l2_policy_update_fail(self):
self.dummy_driver.update_l2_policy_precommit = mock.Mock(
side_effect=Exception)
l2p = self.create_l2_policy(name="l2p1")
l2p_id = l2p['l2_policy']['id']
self.update_l2_policy(l2p_id, expected_res_status=500,
name="new name")
new_l2p = self.show_l2_policy(l2p_id, expected_res_status=200)
self.assertEqual(l2p['l2_policy']['name'],
new_l2p['l2_policy']['name'])
def test_l2_policy_delete_fail(self):
self.dummy_driver.delete_l2_policy_precommit = mock.Mock(
side_effect=Exception)
l2p = self.create_l2_policy(name="l2p1")
l2p_id = l2p['l2_policy']['id']
network_id = l2p['l2_policy']['network_id']
l3p_id = l2p['l2_policy']['l3_policy_id']
self.delete_l2_policy(l2p_id, expected_res_status=500)
req = self.new_show_request('networks', network_id, fmt=self.fmt)
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNotNone(res['network']['id'])
self.show_l3_policy(l3p_id, expected_res_status=200)
self.show_l2_policy(l2p_id, expected_res_status=200)

View File

@@ -565,8 +565,10 @@ class NodeCompositionPluginTestCase(
# Verify notification issued for created PT in the provider
pt = self.create_policy_target(
policy_target_group_id=provider['id'])['policy_target']
pt['port_attributes'] = {}
self.assertEqual(1, add.call_count)
add.assert_called_with(mock.ANY, pt)
del pt['port_attributes']
# Verify notification issued for deleted PT in the provider
self.delete_policy_target(pt['id'])

View File

@@ -52,7 +52,9 @@ gbpservice.neutron.group_policy.policy_drivers =
dummy = gbpservice.neutron.services.grouppolicy.drivers.dummy_driver:NoopDriver
implicit_policy = gbpservice.neutron.services.grouppolicy.drivers.implicit_policy:ImplicitPolicyDriver
resource_mapping = gbpservice.neutron.services.grouppolicy.drivers.resource_mapping:ResourceMappingDriver
neutron_resources = gbpservice.neutron.services.grouppolicy.drivers.neutron_resources:CommonNeutronBase
chain_mapping = gbpservice.neutron.services.grouppolicy.drivers.chain_mapping:ChainMappingDriver
aim_mapping = gbpservice.neutron.services.grouppolicy.drivers.cisco.apic.aim_mapping:AIMMappingDriver
apic = gbpservice.neutron.services.grouppolicy.drivers.cisco.apic.apic_mapping:ApicMappingDriver
odl = gbpservice.neutron.services.grouppolicy.drivers.odl.odl_mapping:OdlMappingDriver
oneconvergence_gbp_driver = gbpservice.neutron.services.grouppolicy.drivers.oneconvergence.nvsd_gbp_driver:NvsdGbpDriver