[apic-mapping] support for traffic stitching plumber

Partially implements blueprint node-centric-chain-plugin

(cherry picked from commit fe7c901330)

Change-Id: I8e08d9423379b4defa5483c58bcf53f511c77d77
This commit is contained in:
Ivar Lazzaro
2015-08-18 22:35:56 -07:00
committed by Sumit Naiksatam
parent d0dc5d3ae9
commit 86625cd58d
9 changed files with 1493 additions and 145 deletions

View File

@@ -159,6 +159,13 @@ class GroupPolicyMappingDbPlugin(gpdb.GroupPolicyDbPlugin):
ptg_db.subnets.append(assoc)
return [subnet.subnet_id for subnet in ptg_db.subnets]
def _remove_subnets_from_policy_target_groups(self, context, subnet_ids):
with context.session.begin(subtransactions=True):
assocs = context.session.query(PTGToSubnetAssociation).filter(
PTGToSubnetAssociation.subnet_id.in_(subnet_ids)).all()
for assoc in assocs:
context.session.delete(assoc)
def _set_network_for_l2_policy(self, context, l2p_id, network_id):
with context.session.begin(subtransactions=True):
l2p_db = self._get_l2_policy(context, l2p_id)

View File

@@ -31,6 +31,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from gbpservice.neutron.db.grouppolicy import group_policy_mapping_db as gpdb
from gbpservice.neutron.extensions import driver_proxy_group as proxy_group
from gbpservice.neutron.extensions import group_policy as gpolicy
from gbpservice.neutron.services.grouppolicy.common import constants as g_const
from gbpservice.neutron.services.grouppolicy.common import exceptions as gpexc
@@ -116,6 +117,7 @@ IMPLICIT_PREFIX = 'implicit-'
ANY_PREFIX = 'any-'
PROMISCUOUS_SUFFIX = 'promiscuous'
APIC_OWNED = 'apic_owned_'
APIC_OWNED_RES = 'apic_owned_res_'
PROMISCUOUS_TYPES = [n_constants.DEVICE_OWNER_DHCP,
n_constants.DEVICE_OWNER_LOADBALANCER]
ALLOWING_ACTIONS = [g_const.GP_ACTION_ALLOW, g_const.GP_ACTION_REDIRECT]
@@ -205,7 +207,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
port = port_context.current
# retrieve PTG from a given Port
ptg = self._port_id_to_ptg(context, port['id'])
ptg, pt = self._port_id_to_ptg(context, port['id'])
l2p = self._network_id_to_l2p(context, port['network_id'])
if not ptg and not l2p:
return
@@ -213,17 +215,19 @@ class ApicMappingDriver(api.ResourceMappingDriver):
context._plugin_context = context
l2_policy_id = l2p['id']
ptg_tenant = self._tenant_by_sharing_policy(ptg or l2p)
if ptg:
ptg_tenant = self._tenant_by_sharing_policy(ptg)
endpoint_group_name = self.name_mapper.policy_target_group(
context, ptg)
else:
ptg_tenant = self._tenant_by_sharing_policy(l2p)
endpoint_group_name = self.name_mapper.l2_policy(
context, l2p, prefix=SHADOW_PREFIX)
def is_port_promiscuous(port):
return (port['device_owner'] in PROMISCUOUS_TYPES or
port['name'].endswith(PROMISCUOUS_SUFFIX))
port['name'].endswith(PROMISCUOUS_SUFFIX)) or (
pt and pt.get('group_default_gateway'))
details = {'device': kwargs.get('device'),
'port_id': port_id,
@@ -237,22 +241,44 @@ class ApicMappingDriver(api.ResourceMappingDriver):
'ptg_tenant': self.apic_manager.apic.fvTenant.name(
ptg_tenant),
'endpoint_group_name': str(endpoint_group_name),
'promiscuous_mode': is_port_promiscuous(port)}
'promiscuous_mode': is_port_promiscuous(port),
'extra_ips': [],
'floating_ip': [],
'ip_mapping': []}
if port['device_owner'].startswith('compute:') and port['device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
l3_policy = context._plugin.get_l3_policy(context,
l2p['l3_policy_id'])
self._add_ip_mapping_details(context, port_id, l3_policy, details)
details['floating_ip'], details['ip_mapping'] = (
self._get_ip_mapping_details(context, port_id, l3_policy))
self._add_network_details(context, port, details)
self._add_vrf_details(context, details)
is_chain_end = bool(pt and pt.get('proxy_gateway') and
ptg.get('proxied_group_id') and
not ptg.get('proxy_group'))
if is_chain_end:
# is a relevant proxy_gateway, push all the addresses from this
# chain to this PT
l3_policy = context._plugin.get_l3_policy(
context, l2p['l3_policy_id'])
while ptg['proxied_group_id']:
proxied = self._gbp_plugin.get_policy_target_group(
context, ptg['proxied_group_id'])
for port in self._get_ptg_ports(proxied):
details['extra_ips'].extend([x['ip_address'] for x in
port['fixed_ips']])
fips, ipms = self._get_ip_mapping_details(
context, port['id'], l3_policy)
details['floating_ip'].extend(fips)
details['ip_mapping'].extend(ipms)
ptg = proxied
return details
def _add_ip_mapping_details(self, context, port_id, l3_policy, details):
def _get_ip_mapping_details(self, context, port_id, l3_policy):
""" Add information about IP mapping for DNAT/SNAT """
if not l3_policy['external_segments']:
return
return [], []
fips = self._get_fips(context, filters={'port_id': [port_id]})
ipms = []
ess = context._plugin.get_external_segments(context._plugin_context,
@@ -275,8 +301,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
for f in fips_in_es:
f['nat_epg_name'] = nat_epg_name
f['nat_epg_tenant'] = nat_epg_tenant
details['floating_ip'] = fips
details['ip_mapping'] = ipms
return fips, ipms
def _add_network_details(self, context, port, details):
details['allowed_address_pairs'] = port['allowed_address_pairs']
@@ -301,10 +326,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
pass
def create_policy_rule_precommit(self, context):
if ('policy_actions' in context.current and
len(context.current['policy_actions']) != 1):
# TODO(ivar): to be fixed when redirect is supported
raise ExactlyOneActionPerRuleIsSupportedOnApicDriver()
self._validate_one_action_per_pr(context)
def create_policy_rule_postcommit(self, context, transaction=None):
action = context._plugin.get_policy_action(
@@ -348,6 +370,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
if not self.name_mapper._is_apic_reference(context.current):
if context.current['child_policy_rule_sets']:
raise HierarchicalContractsNotSupported()
self._reject_multiple_redirects_in_prs(context)
else:
self.name_mapper.has_valid_name(context.current)
@@ -368,13 +391,28 @@ class ApicMappingDriver(api.ResourceMappingDriver):
def create_policy_target_postcommit(self, context):
if not context.current['port_id']:
self._use_implicit_port(context)
port = self._core_plugin.get_port(context._plugin_context,
context.current['port_id'])
ptg = self.gbp_plugin.get_policy_target_group(
context._plugin_context,
context.current['policy_target_group_id'])
subnets = self._get_subnets(
context._plugin_context, {'id': ptg['subnets']})
owned = []
reserved = []
for subnet in subnets:
if not subnet['name'].startswith(APIC_OWNED_RES):
owned.append(subnet)
elif subnet['name'] == APIC_OWNED_RES + ptg['id']:
reserved.append(subnet)
self._use_implicit_port(context, subnets=reserved or owned)
port = self._get_port(context._plugin_context,
context.current['port_id'])
if self._is_port_bound(port):
self._notify_port_update(context._plugin_context, port['id'])
if self._may_have_fip(context):
self._associate_fip_to_pt(context)
self._notify_head_chain_ports(
context.current['policy_target_group_id'])
def _may_have_fip(self, context):
ptg = context._plugin.get_policy_target_group(
@@ -388,6 +426,16 @@ class ApicMappingDriver(api.ResourceMappingDriver):
if not self.name_mapper._is_apic_reference(context.current):
if context.current['subnets']:
raise ExplicitSubnetAssociationNotSupported()
if context.current.get('proxied_group_id'):
# goes in same L2P as proxied group
proxied = context._plugin.get_policy_target_group(
context._plugin_context,
context.current['proxied_group_id'])
db_group = context._plugin._get_policy_target_group(
context._plugin_context, context.current['id'])
db_group.l2_policy_id = proxied['l2_policy_id']
context.current['l2_policy_id'] = proxied['l2_policy_id']
self._validate_ptg_prss(context, context.current)
else:
self.name_mapper.has_valid_name(context.current)
@@ -416,9 +464,38 @@ class ApicMappingDriver(api.ResourceMappingDriver):
self._configure_epg_implicit_contract(
context, context.current, l2p, epg, transaction=trs)
l3p = context._plugin.get_l3_policy(
context._plugin_context, l2_policy_object['l3_policy_id'])
if context.current.get('proxied_group_id'):
self._stitch_proxy_ptg_to_l3p(context, l3p)
self._handle_network_service_policy(context)
# Handle redirect action if any
consumed_prs = context.current['consumed_policy_rule_sets']
provided_prs = context.current['provided_policy_rule_sets']
if provided_prs and not context.current.get('proxied_group_id'):
policy_rule_sets = (consumed_prs + provided_prs)
self._handle_redirect_action(context, policy_rule_sets)
self._manage_ptg_policy_rule_sets(
context, context.current['provided_policy_rule_sets'],
context.current['consumed_policy_rule_sets'], [], [])
self._set_proxy_any_contract(context.current)
# Mirror Contracts
if context.current.get('proxied_group_id'):
proxied = context._plugin.get_policy_target_group(
context._plugin_context.elevated(),
context.current['proxied_group_id'])
updated = context._plugin.update_policy_target_group(
context._plugin_context.elevated(),
context.current['id'], {
'policy_target_group': {
'provided_policy_rule_sets': dict(
(x, '') for x in proxied[
'provided_policy_rule_sets']),
'consumed_policy_rule_sets': dict(
(x, '') for x in proxied[
'consumed_policy_rule_sets'])}})
context.current.update(updated)
def create_l2_policy_precommit(self, context):
if not self.name_mapper._is_apic_reference(context.current):
@@ -502,10 +579,6 @@ class ApicMappingDriver(api.ResourceMappingDriver):
self.apic_manager.delete_tenant_filter(policy_rule, owner=tenant,
transaction=trs)
def delete_policy_rule_set_precommit(self, context):
# Intercept Parent Call
pass
def delete_policy_rule_set_postcommit(self, context):
if not self.name_mapper._is_apic_reference(context.current):
tenant = self._tenant_by_sharing_policy(context.current)
@@ -540,6 +613,36 @@ class ApicMappingDriver(api.ResourceMappingDriver):
context.current)
self.apic_manager.delete_epg_for_network(tenant, ptg)
# Place back proxied PTG, if any
if context.current.get('proxied_group_id'):
proxied = context._plugin.get_policy_target_group(
context._plugin_context,
context.current['proxied_group_id'])
ptg_name = self.name_mapper.policy_target_group(
context, proxied)
l2_policy_object = context._plugin.get_l2_policy(
context._plugin_context, context.current['l2_policy_id'])
l2_policy = self.name_mapper.l2_policy(
context, l2_policy_object)
bd_owner = self._tenant_by_sharing_policy(l2_policy_object)
tenant = self._tenant_by_sharing_policy(proxied)
self.apic_manager.ensure_epg_created(
tenant, ptg_name, bd_owner=bd_owner, bd_name=l2_policy)
# Delete shadow BD
shadow_bd = self.name_mapper.policy_target_group(
context, proxied, prefix=SHADOW_PREFIX)
self.apic_manager.delete_bd_on_apic(tenant, shadow_bd)
# Delete PTG specific subnets
subnets = self._core_plugin.get_subnets(
context._plugin_context, {'name': [APIC_OWNED +
context.current['id']]})
self.gbp_plugin._remove_subnets_from_policy_target_groups(
nctx.get_admin_context(), [x['id'] for x in subnets])
for subnet in subnets:
self._cleanup_subnet(context._plugin_context, subnet['id'],
None)
self._unset_any_contract(context.current)
def delete_l2_policy_precommit(self, context):
if not self.name_mapper._is_apic_reference(context.current):
@@ -613,6 +716,8 @@ class ApicMappingDriver(api.ResourceMappingDriver):
to_remove)
self._apply_policy_rule_set_rules(context, context.current, to_add)
self._handle_redirect_action(context, [context.current['id']])
def update_policy_target_precommit(self, context):
if (context.original['policy_target_group_id'] !=
context.current['policy_target_group_id']):
@@ -626,7 +731,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
context.current['port_id'])
def update_policy_rule_precommit(self, context):
self._reject_multiple_redirects_in_rule(context)
self._validate_one_action_per_pr(context)
old_redirect = self._get_redirect_action(context, context.original)
new_redirect = self._get_redirect_action(context, context.current)
if not old_redirect and new_redirect:
@@ -640,9 +745,10 @@ class ApicMappingDriver(api.ResourceMappingDriver):
def update_policy_rule_postcommit(self, context):
self._update_policy_rule_on_apic(context)
super(ApicMappingDriver, self).update_policy_rule_postcommit(context)
def update_policy_action_postcommit(self, context):
pass
self._handle_redirect_spec_id_update(context)
def _update_policy_rule_on_apic(self, context):
self._delete_policy_rule_from_apic(context, transaction=None)
@@ -657,6 +763,13 @@ class ApicMappingDriver(api.ResourceMappingDriver):
raise ExplicitSubnetAssociationNotSupported()
self._reject_shared_update(context, 'policy_target_group')
if set(context.original['subnets']) != set(
context.current['subnets']):
raise ExplicitSubnetAssociationNotSupported()
self._reject_shared_update(context, 'policy_target_group')
self._validate_ptg_prss(context, context.current)
self._stash_ptg_modified_chains(context)
def update_policy_target_group_postcommit(self, context):
if not self.name_mapper._is_apic_reference(context.current):
# TODO(ivar): refactor parent to avoid code duplication
@@ -682,12 +795,41 @@ class ApicMappingDriver(api.ResourceMappingDriver):
set(orig_consumed_policy_rule_sets) - set(
curr_consumed_policy_rule_sets))
self._handle_nsp_update_on_ptg(context)
self._cleanup_redirect_action(context)
if self._is_redirect_in_policy_rule_sets(
context, new_provided_policy_rule_sets) and not (
context.current.get('proxied_group_id')):
policy_rule_sets = (curr_consumed_policy_rule_sets +
curr_provided_policy_rule_sets)
self._handle_redirect_action(context, policy_rule_sets)
self._manage_ptg_policy_rule_sets(
context, new_provided_policy_rule_sets,
new_consumed_policy_rule_sets,
removed_provided_policy_rule_sets,
removed_consumed_policy_rule_sets)
# Set same contracts to proxy group
# Refresh current after the above operations took place
current = context._plugin.get_policy_target_group(
context._plugin_context, context.current['id'])
if current.get('proxy_group_id'):
proxy = context._plugin.get_policy_target_group(
context._plugin_context.elevated(),
current['proxy_group_id'])
context._plugin.update_policy_target_group(
context._plugin_context.elevated(),
proxy['id'], {
'policy_target_group': {
'provided_policy_rule_sets': dict(
(x, '') for x in current[
'provided_policy_rule_sets']),
'consumed_policy_rule_sets': dict(
(x, '') for x in current[
'consumed_policy_rule_sets'])}})
def update_l3_policy_precommit(self, context):
self._reject_apic_name_change(context)
if not self.name_mapper._is_apic_reference(context.current):
@@ -769,6 +911,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
self._remove_policy_rule_set_rules(
context, prs, [(rule, context.original)])
self._apply_policy_rule_set_rules(context, prs, [rule])
self._handle_classifier_update_notification(context)
def create_external_segment_precommit(self, context):
if context.current['port_address_translation']:
@@ -1159,7 +1302,7 @@ class ApicMappingDriver(api.ResourceMappingDriver):
filters={'id': ids})
def _port_to_ptg_network(self, context, port_id):
ptg = self._port_id_to_ptg(context, port_id)
ptg, _ = self._port_id_to_ptg(context, port_id)
if not ptg:
# Not GBP port
return None, None
@@ -1167,18 +1310,17 @@ class ApicMappingDriver(api.ResourceMappingDriver):
return ptg, network
def _port_id_to_pt(self, context, port_id):
pt = (context.session.query(gpdb.PolicyTargetMapping).
filter_by(port_id=port_id).first())
if pt:
db_utils = gpdb.GroupPolicyMappingDbPlugin()
return db_utils._make_policy_target_dict(pt)
pts = self.gbp_plugin.get_policy_targets(
context, {'port_id': [port_id]})
if pts:
return pts[0]
def _port_id_to_ptg(self, context, port_id):
pt = self._port_id_to_pt(context, port_id)
if pt:
return self.gbp_plugin.get_policy_target_group(
context, pt['policy_target_group_id'])
return
context, pt['policy_target_group_id']), pt
return None, None
def _l2p_id_to_network(self, context, l2p_id):
l2_policy = self.gbp_plugin.get_l2_policy(context, l2p_id)
@@ -1467,7 +1609,15 @@ class ApicMappingDriver(api.ResourceMappingDriver):
object):
return apic_manager.TENANT_COMMON
else:
return self.name_mapper.tenant(object)
if object.get('proxied_group_id'): # Then it's a proxy PTG
# Even though they may belong to a different tenant,
# the proxy PTGs will be created on the L2P's tenant to
# make APIC happy
l2p = self.gbp_plugin.get_l2_policy(
nctx.get_admin_context(), object['l2_policy_id'])
return self.name_mapper.tenant(l2p)
else:
return self.name_mapper.tenant(object)
def _get_nat_epg_for_es(self, context, es):
return ("NAT-epg-%s" %
@@ -1532,26 +1682,58 @@ class ApicMappingDriver(api.ResourceMappingDriver):
with lockutils.lock(l2p_id, external=True):
subs = self._get_l2p_subnets(context._plugin_context, l2p_id)
subs = set([x['id'] for x in subs])
added = None
added = []
# Always add a new subnet to L3 proxies
is_proxy = bool(context.current.get('proxied_group_id'))
force_add = force_add or is_proxy
if not subs or force_add:
l2p = context._plugin.get_l2_policy(context._plugin_context,
l2p_id)
if is_proxy:
name = APIC_OWNED_RES + context.current['id']
else:
name = APIC_OWNED + l2p['name']
added = super(
ApicMappingDriver, self)._use_implicit_subnet(
context, mark_as_owned=False,
subnet_specifics={'name': APIC_OWNED + l2p['name']})
subs.add(added['id'])
context, subnet_specifics={'name': name},
add_to_ptg=False, is_proxy=is_proxy)
subs |= set([x['id'] for x in added])
context.add_subnets(subs - set(context.current['subnets']))
if added:
self.process_subnet_added(context._plugin_context, added)
l3p_id = l2p['l3_policy_id']
l3p = context._plugin.get_l3_policy(context._plugin_context,
l3p_id)
for subnet in added:
self.process_subnet_added(context._plugin_context, subnet)
for router_id in l3p['routers']:
# Use admin context because router and subnet may
# be in different tenants
self._plug_router_to_subnet(nctx.get_admin_context(),
added['id'], router_id)
for subnet in added:
self._plug_router_to_subnet(nctx.get_admin_context(),
subnet['id'], router_id)
def _stitch_proxy_ptg_to_l3p(self, context, l3p):
"""Stitch proxy PTGs properly."""
# Proxied PTG is moved to a shadow BD (no routing, learning ON?)
tenant = self._tenant_by_sharing_policy(context.current)
ctx_owner = self._tenant_by_sharing_policy(l3p)
l3_policy_name = self.name_mapper.l3_policy(context, l3p)
proxied = context._plugin.get_policy_target_group(
context._plugin_context, context.current['proxied_group_id'])
bd_name = self.name_mapper.policy_target_group(
context, proxied, prefix=SHADOW_PREFIX)
ptg_name = self.name_mapper.policy_target_group(context, proxied)
is_l2 = context.current['proxy_type'] == proxy_group.PROXY_TYPE_L2
with self.apic_manager.apic.transaction(None) as trs:
# Create shadow BD to host the proxied EPG
self.apic_manager.ensure_bd_created_on_apic(
tenant, bd_name, ctx_owner=ctx_owner, ctx_name=l3_policy_name,
allow_broadcast=is_l2, unicast_route=False, transaction=trs)
# Move current PTG to different BD
self.apic_manager.ensure_epg_created(
tenant, ptg_name, bd_owner=tenant, bd_name=bd_name,
transaction=trs)
# Notify proxied ports
self._notify_proxy_gateways(proxied['id'])
def _sync_epg_subnets(self, plugin_context, l2p):
l2p_subnets = [x['id'] for x in
@@ -1740,25 +1922,6 @@ class ApicMappingDriver(api.ResourceMappingDriver):
if action['action_type'] == g_const.GP_ACTION_REDIRECT:
return action
def _validate_new_prs_redirect(self, context, prs):
if self._prss_redirect_rules(context._plugin_context.session,
[prs['id']]) > 1:
raise gpexc.MultipleRedirectActionsNotSupportedForPRS()
def _prss_redirect_rules(self, session, prs_ids):
if len(prs_ids) == 0:
# No result will be found in this case
return 0
query = (session.query(gpdb.gpdb.PolicyAction).
join(gpdb.gpdb.PolicyRuleActionAssociation).
join(gpdb.gpdb.PolicyRule).
join(gpdb.gpdb.PRSToPRAssociation).
filter(
gpdb.gpdb.PRSToPRAssociation.policy_rule_set_id.in_(prs_ids)).
filter(gpdb.gpdb.PolicyAction.action_type ==
g_const.GP_ACTION_REDIRECT))
return query.count()
def _multiple_pr_redirect_action_number(self, session, pr_ids):
# Given a set of rules, gives the total number of redirect actions
# found
@@ -1866,8 +2029,8 @@ class ApicMappingDriver(api.ResourceMappingDriver):
if router_id: # router connecting to ES's subnet exists
router = self._get_router(context._plugin_context, router_id)
else:
router_id = self._use_implicit_router(context,
l3p['name'] + '-' + es['name'])
router_id = self._use_implicit_router(
context, l3p['name'] + '-' + es['name'])
router = self._create_router_gw_for_external_segment(
context._plugin_context, es, es_dict, router_id)
if not es_dict[es['id']] or not es_dict[es['id']][0]:
@@ -1918,18 +2081,9 @@ class ApicMappingDriver(api.ResourceMappingDriver):
self._plug_router_to_subnet(plugin_context, subnet_id, router_id)
def _plug_router_to_subnet(self, plugin_context, subnet_id, router_id):
interface_info = {'subnet_id': subnet_id}
if router_id:
try:
self._add_router_interface(plugin_context, router_id,
interface_info)
return
except n_exc.IpAddressInUse as e:
LOG.debug(_("Will try to use create and use port - %s"), e)
except n_exc.BadRequest:
LOG.exception(_("Adding subnet to router failed"))
return
# Allocate port and use it as router interface
# This will avoid gateway_ip to be used
subnet = self._get_subnet(plugin_context, subnet_id)
attrs = {'tenant_id': subnet['tenant_id'],
'network_id': subnet['network_id'],
@@ -2052,4 +2206,98 @@ class ApicMappingDriver(api.ResourceMappingDriver):
def _reject_apic_name_change(self, context):
if self.name_mapper._is_apic_reference(context.original):
if context.original['name'] != context.current['name']:
raise CannotUpdateApicName()
raise CannotUpdateApicName()
def _get_ptg_ports(self, ptg):
context = nctx.get_admin_context()
pts = self._gbp_plugin.get_policy_targets(
context, {'id': ptg['policy_targets']})
port_ids = [x['port_id'] for x in pts]
return self._get_ports(context, {'id': port_ids})
def _notify_head_chain_ports(self, ptg_id):
context = nctx.get_admin_context()
ptg = self._gbp_plugin.get_policy_target_group(context, ptg_id)
# to avoid useless double notification exit now if no proxy
if not ptg.get('proxy_group_id'):
return
# Notify proxy gateway pts
while ptg['proxy_group_id']:
ptg = self._gbp_plugin.get_policy_target_group(
context, ptg['proxy_group_id'])
self._notify_proxy_gateways(ptg['id'], plugin_context=context)
def _notify_proxy_gateways(self, group_id, plugin_context=None):
plugin_context = plugin_context or nctx.get_admin_context()
proxy_pts = self._gbp_plugin.get_policy_targets(
plugin_context, {'policy_target_group_id': [group_id],
'proxy_gateway': [True]})
ports = self._get_ports(plugin_context,
{'id': [x['port_id'] for x in proxy_pts]})
for port in ports:
if self._is_port_bound(port):
self._notify_port_update(plugin_context, port['id'])
def _validate_one_action_per_pr(self, context):
if ('policy_actions' in context.current and
len(context.current['policy_actions']) != 1):
raise ExactlyOneActionPerRuleIsSupportedOnApicDriver()
def _create_any_contract(self, origin_ptg_id, transaction=None):
tenant = apic_manager.TENANT_COMMON
contract = ANY_PREFIX + origin_ptg_id
with self.apic_manager.apic.transaction(transaction) as trs:
self.apic_manager.create_contract(
contract, owner=tenant, transaction=trs)
attrs = {'etherT': 'unspecified'}
self._associate_service_filter(
tenant, contract, contract, contract, transaction=trs, **attrs)
return contract
def _delete_any_contract(self, origin_ptg_id, transaction=None):
tenant = apic_manager.TENANT_COMMON
contract = ANY_PREFIX + origin_ptg_id
with self.apic_manager.apic.transaction(transaction) as trs:
self.apic_manager.delete_contract(
contract, owner=tenant, transaction=trs)
def _get_origin_ptg(self, ptg):
context = nctx.get_admin_context()
while ptg['proxied_group_id']:
ptg = self.gbp_plugin.get_policy_target_group(
context, ptg['proxied_group_id'])
return ptg
def _set_proxy_any_contract(self, proxy_group):
if proxy_group.get('proxied_group_id'):
tenant = apic_manager.TENANT_COMMON
context = nctx.get_admin_context()
origin = self.gbp_plugin.get_policy_target_group(
context, proxy_group['proxied_group_id'])
if not origin['proxied_group_id']:
# That's the first proxy, it's a special case for we need to
# create the ANY contract
any_contract = self._create_any_contract(origin['id'])
name = self.name_mapper.policy_target_group(
context, origin)
ptg_tenant = self._tenant_by_sharing_policy(origin)
self.apic_manager.set_contract_for_epg(
ptg_tenant, name, any_contract, provider=True,
contract_owner=tenant)
else:
origin = self._get_origin_ptg(origin)
any_contract = ANY_PREFIX + origin['id']
name = self.name_mapper.policy_target_group(
context, proxy_group)
ptg_tenant = self._tenant_by_sharing_policy(proxy_group)
self.apic_manager.set_contract_for_epg(
ptg_tenant, name, any_contract, provider=False,
contract_owner=tenant)
def _unset_any_contract(self, proxy_group):
if proxy_group.get('proxied_group_id'):
context = nctx.get_admin_context()
origin = self.gbp_plugin.get_policy_target_group(
context, proxy_group['proxied_group_id'])
if not origin['proxied_group_id']:
self._delete_any_contract(origin['id'])

View File

@@ -19,6 +19,7 @@ import sqlalchemy as sa
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.extensions import driver_proxy_group as pg_ext
from gbpservice.neutron.extensions import group_policy as gbp_ext
from gbpservice.neutron.services.grouppolicy import (
group_policy_driver_api as api)
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
@@ -215,7 +216,11 @@ class ImplicitPolicyDriver(api.PolicyDriver, local_api.LocalAPI):
def _cleanup_l2_policy(self, context, l2p_id):
if self._l2_policy_is_owned(context._plugin_context.session, l2p_id):
self._delete_l2_policy(context._plugin_context, l2p_id)
try:
self._delete_l2_policy(context._plugin_context, l2p_id)
except gbp_ext.L2PolicyInUse:
LOG.info(_("Cannot delete implicit L2 Policy %s because it's "
"in use."), l2p_id)
def _use_implicit_l3_policy(self, context):
tenant_id = context.current['tenant_id']

View File

@@ -59,20 +59,13 @@ group_policy_opts = [
help=_("Name of the Tenant that will own the service chain "
"instances for this driver. Leave empty for provider "
"owned chains."), default=''),
]
cfg.CONF.register_opts(group_policy_opts, "resource_mapping")
opts = [
cfg.ListOpt('dns_nameservers',
default=[],
help=_("List of DNS nameservers to be configured for the "
"PTG subnets")),
]
cfg.CONF.register_opts(opts, "resource_mapping")
cfg.CONF.register_opts(group_policy_opts, "resource_mapping")
class OwnedPort(model_base.BASEV2):
@@ -505,6 +498,7 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
self._reject_cross_tenant_ptg_l2p(context)
self._validate_ptg_subnets(context)
self._validate_nat_pool_for_nsp(context)
self._validate_ptg_prss(context, context.current)
self._validate_proxy_ptg(context)
self._validate_ptg_prss(context, context.current)
@@ -696,16 +690,15 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
if set(context.original['subnets']) - set(context.current['subnets']):
raise exc.PolicyTargetGroupSubnetRemovalNotSupported()
new_subnets = list(set(context.current['subnets']) -
set(context.original['subnets']))
self._validate_ptg_subnets(context, new_subnets)
self._reject_cross_tenant_ptg_l2p(context)
self._validate_ptg_subnets(context, context.current['subnets'])
self._validate_ptg_prss(context, context.current)
self._reject_cross_tenant_ptg_l2p(context)
if (context.current['network_service_policy_id'] !=
context.original['network_service_policy_id']):
self._validate_nat_pool_for_nsp(context)
self._stash_ptg_modified_chains(context)
def _stash_ptg_modified_chains(self, context):
#Update service chain instance when any ruleset is changed
orig_provided_policy_rule_sets = context.original[
'provided_policy_rule_sets']
@@ -776,15 +769,7 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
set(curr_consumed_policy_rule_sets) - set(
orig_consumed_policy_rule_sets))
old_nsp = context.original.get("network_service_policy_id")
new_nsp = context.current.get("network_service_policy_id")
if old_nsp != new_nsp:
if old_nsp:
self._cleanup_network_service_policy(
context,
context.original)
if new_nsp:
self._handle_network_service_policy(context)
self._handle_nsp_update_on_ptg(context)
# Only the ones set in context in precommit operation will be deleted
self._cleanup_redirect_action(context)
@@ -826,6 +811,17 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
context._plugin_context, context.current['id'],
context.current['tenant_id'], subnets=new_subnets)
def _handle_nsp_update_on_ptg(self, context):
old_nsp = context.original.get("network_service_policy_id")
new_nsp = context.current.get("network_service_policy_id")
if old_nsp != new_nsp:
if old_nsp:
self._cleanup_network_service_policy(
context,
context.original)
if new_nsp:
self._handle_network_service_policy(context)
@log.log
def delete_policy_target_group_precommit(self, context):
context.nsp_cleanup_ipaddress = self._get_ptg_policy_ipaddress_mapping(
@@ -1012,6 +1008,9 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
self._update_policy_rule_sg_rules(context, pr_sets,
policy_rule, context.original, context.current)
self._handle_classifier_update_notification(context)
def _handle_classifier_update_notification(self, context):
# Invoke Service chain update notify hook if protocol or port or
# direction is updated. The SC side will have to reclassify the chain
# and update the traffic steering programming
@@ -1559,7 +1558,7 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
l3p = context._plugin.get_l3_policy(context._plugin_context, l3p_id)
return l3p
def _use_implicit_port(self, context):
def _use_implicit_port(self, context, subnets=None):
ptg_id = context.current['policy_target_group_id']
ptg = context._plugin.get_policy_target_group(
context._plugin_context, ptg_id)
@@ -1568,8 +1567,9 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
sg_id = self._get_default_security_group(
context._plugin_context, ptg_id, context.current['tenant_id'])
last = exc.NoSubnetAvailable()
for subnet in self._get_subnets(context._plugin_context,
{'id': ptg['subnets']}):
subnets = subnets or self._get_subnets(context._plugin_context,
{'id': ptg['subnets']})
for subnet in subnets:
try:
attrs = {'tenant_id': context.current['tenant_id'],
'name': 'pt_' + context.current['name'],
@@ -1645,7 +1645,7 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
router_id, interface_info)
def _use_implicit_subnet(self, context, is_proxy=False, prefix_len=None,
mark_as_owned=True, subnet_specifics=None):
add_to_ptg=True, subnet_specifics=None):
# REVISIT(rkukura): This is a temporary allocation algorithm
# that depends on an exception being raised when the subnet
# being created is already in use.
@@ -1668,14 +1668,14 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
context, l2p, l3p, [x['cidr'] for x in subnets],
subnet_specifics)
# Unroll the generator
subnet_ids = [x['id'] for x in generator]
if mark_as_owned:
for subnet_id in subnet_ids:
self._mark_subnet_owned(
context._plugin_context.session, subnet_id)
subnets = [x for x in generator]
subnet_ids = [x['id'] for x in subnets]
for subnet_id in subnet_ids:
self._mark_subnet_owned(
context._plugin_context.session, subnet_id)
if add_to_ptg:
context.add_subnet(subnet_id)
return
return subnets
else:
# In case of non proxy PTG or L3 Proxy
LOG.debug("allocate subnets for L2 Proxy or normal PTG %s",
@@ -1706,11 +1706,11 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
for subnet in generator:
subnet_id = subnet['id']
try:
if mark_as_owned:
self._mark_subnet_owned(
context._plugin_context.session, subnet_id)
self._mark_subnet_owned(
context._plugin_context.session, subnet_id)
if add_to_ptg:
context.add_subnet(subnet_id)
return subnet
return [subnet]
except n_exc.InvalidInput:
# This exception is not expected. We catch this
# here so that it isn't caught below and handled
@@ -2084,6 +2084,8 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
if not ptgs_providing_prs:
continue
ptgs_providing_prs = context._plugin.get_policy_target_groups(
context._plugin_context.elevated(), {'id': ptgs_providing_prs})
parent_classifier_id = None
parent_spec_id = None
if policy_rule_set['parent_id']:
@@ -2124,11 +2126,13 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
or None)
for ptg_providing_prs in ptgs_providing_prs:
# REVISIT(Magesh): There may be concurrency issues here
self._create_or_update_chain(
context, ptg_providing_prs,
if not ptg_providing_prs.get('proxied_group_id'):
self._create_or_update_chain(
context, ptg_providing_prs['id'],
SCI_CONSUMER_NOT_AVAILABLE, spec_id,
parent_spec_id, classifier_id,
hierarchial_classifier_mismatch, policy_rule_set)
hierarchial_classifier_mismatch,
policy_rule_set)
def _create_or_update_chain(self, context, provider, consumer, spec_id,
parent_spec_id, classifier_id,

View File

@@ -134,7 +134,7 @@ class NodePlumberBase(object):
'description': TARGET_DESCRIPTION % (relationship,
node['id'],
instance['id']),
'name': SERVICE_TARGET_NAME_PREFIX + '_%s_%s_%s' % (
'name': SERVICE_TARGET_NAME_PREFIX + '%s_%s_%s' % (
relationship, node['id'][:5], instance['id'][:5]),
'port_id': None}
data.update(extra_data)

View File

@@ -130,9 +130,15 @@ class ApiManagerMixin(object):
if res.status_int != 204:
return self.deserialize(self.fmt, res)
def _get_object(self, type, id, api):
def _get_object(self, type, id, api, expected_res_status=None):
req = self.new_show_request(type, id, self.fmt)
return self.deserialize(self.fmt, req.get_response(api))
res = req.get_response(api)
if expected_res_status:
self.assertEqual(res.status_int, expected_res_status)
elif res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
def _bind_port_to_host(self, port_id, host, data=None):
plugin = manager.NeutronManager.get_plugin()

View File

@@ -77,7 +77,7 @@ class ApicMappingTestCase(
test_rmd.ResourceMappingTestCase,
mocked.ControllerMixin, mocked.ConfigMixin):
def setUp(self):
def setUp(self, sc_plugin=None):
self.agent_conf = AGENT_CONF
cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
config.cfg.CONF.set_override('enable_security_group', False,
@@ -103,7 +103,8 @@ class ApicMappingTestCase(
nova_client.return_value = vm
super(ApicMappingTestCase, self).setUp(
policy_drivers=['implicit_policy', 'apic'],
core_plugin=test_plugin.PLUGIN_NAME, ml2_options=ml2_opts)
core_plugin=test_plugin.PLUGIN_NAME,
ml2_options=ml2_opts, sc_plugin=sc_plugin)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
plugin = manager.NeutronManager.get_plugin()
@@ -735,6 +736,38 @@ class TestL3Policy(ApicMappingTestCase):
self.assertEqual('OnlyOneAddressIsAllowedPerExternalSegment',
res['NeutronError']['type'])
def test_router_interface_no_gateway(self):
self._mock_external_dict([('supported', '192.168.0.2/24')])
es = self.create_external_segment(
name='supported', cidr='192.168.0.0/24')['external_segment']
l3p = self.create_l3_policy(
external_segments={es['id']: ['192.168.0.2']},
expected_res_status=201)['l3_policy']
l2p = self.create_l2_policy(l3_policy_id=l3p['id'])['l2_policy']
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
l3p = self.show_l3_policy(l3p['id'])['l3_policy']
self.assertEqual(1, len(l3p['routers']))
subnet = self._show_subnet(ptg['subnets'][0])['subnet']
router_ports = self._list(
'ports',
query_params='device_id=%s' % l3p['routers'][0])['ports']
self.assertEqual(2, len(router_ports))
for port in router_ports:
self.assertEqual(1, len(port['fixed_ips']))
self.assertNotEqual(subnet['gateway_ip'],
port['fixed_ips'][0]['ip_address'])
# One of the two ports is in subnet
self.assertNotEqual(router_ports[0]['fixed_ips'][0]['subnet_id'],
router_ports[1]['fixed_ips'][0]['subnet_id'])
self.assertTrue(
router_ports[0]['fixed_ips'][0]['subnet_id'] == subnet['id'] or
router_ports[1]['fixed_ips'][0]['subnet_id'] == subnet['id'])
def _test_l3p_plugged_to_es_at_creation(self, shared_es, shared_l3p):
# Verify L3P is correctly plugged to ES on APIC during create
self._mock_external_dict([('supported', '192.168.0.2/24')])

View File

@@ -136,15 +136,16 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
sc_instances = self.deserialize(self.fmt, res)
self.assertEqual(len(sc_instances['servicechain_instances']), 0)
def _check_call_list(self, expected, observed):
def _check_call_list(self, expected, observed, check_all=True):
for call in expected:
self.assertTrue(call in observed,
msg='Call not found, expected:\n%s\nobserved:'
'\n%s' % (str(call), str(observed)))
observed.remove(call)
self.assertFalse(
len(observed),
msg='There are more calls than expected: %s' % str(observed))
if check_all:
self.assertFalse(
len(observed),
msg='There are more calls than expected: %s' % str(observed))
def _create_network(self, fmt, name, admin_state_up, **kwargs):
"""Override the routine for allowing the router:external attribute."""
@@ -384,34 +385,35 @@ class ResourceMappingTestCase(test_plugin.GroupPolicyPluginTestCase):
filter_by(policy_target_group_id=ptg_id).
all())
def _create_service_profile(self, node_type='LOADBALANCER'):
data = {'service_profile': {'service_type': node_type,
'tenant_id': self._tenant_id}}
scn_req = self.new_create_request(SERVICE_PROFILES, data, self.fmt)
node = self.deserialize(self.fmt, scn_req.get_response(self.ext_api))
scn_id = node['service_profile']['id']
return scn_id
def _create_service_profile(self, node_type='LOADBALANCER', shared=False):
data = {'service_type': node_type, 'shared': shared}
profile = self.create_service_profile(expected_res_status=201,
is_admin_context=shared,
**data)
scp_id = profile['service_profile']['id']
return scp_id
def _create_servicechain_node(self, node_type="LOADBALANCER"):
profile_id = self._create_service_profile(node_type)
data = {'servicechain_node': {'service_profile_id': profile_id,
'tenant_id': self._tenant_id,
'config': "{}"}}
scn_req = self.new_create_request(SERVICECHAIN_NODES, data, self.fmt)
node = self.deserialize(self.fmt, scn_req.get_response(self.ext_api))
def _create_servicechain_node(self, node_type="LOADBALANCER",
shared=False):
profile_id = self._create_service_profile(node_type, shared=shared)
data = {'service_profile_id': profile_id,
'config': "{}", 'shared': shared}
node = self.create_servicechain_node(expected_res_status=201,
is_admin_context=shared,
**data)
scn_id = node['servicechain_node']['id']
return scn_id
def _create_servicechain_spec(self, node_types=[]):
if not node_types:
node_types = ['LOADBALANCER']
def _create_servicechain_spec(self, node_types=None, shared=False):
node_types = node_types or ['LOADBALANCER']
node_ids = []
for node_type in node_types:
node_ids.append(self._create_servicechain_node(node_type))
data = {'servicechain_spec': {'tenant_id': self._tenant_id,
'nodes': node_ids}}
scs_req = self.new_create_request(SERVICECHAIN_SPECS, data, self.fmt)
spec = self.deserialize(self.fmt, scs_req.get_response(self.ext_api))
node_ids.append(self._create_servicechain_node(
node_type, shared=shared))
data = {'nodes': node_ids, 'shared': shared}
spec = self.create_servicechain_spec(expected_res_status=201,
is_admin_context=shared,
**data)
scs_id = spec['servicechain_spec']['id']
return scs_id

File diff suppressed because it is too large Load Diff