Handle concurrent implicit L3P creation

In the implicit_policy driver, when creating a default L3 policy,
raise DefaultL3PolicyAlreadyExists if an L3 policy named 'default'
already exists.

If DefaultL3PolicyAlreadyExists is raised when the implicit_policy
driver attempts to create the default L3 policy for a tenant, query
again to see if a default L3 policy has been concurrently created, and
if so, use that. This requires adding local_api wrappers for
postcommit group policy resource CRUD operations called in the
implicit_policy driver, so that clean DB sessions are used.

Also, fix the resource_mapping driver's
delete_policy_target_group_postcommit to gracefully handle partially
constructed states, such as a null L2 policy or a subnet not attached
to a router.

Closes-bug: 1462024
Partial-bug: 1417312

Change-Id: I09f29eef22edb45290070aae30e97c93c77ea341
This commit is contained in:
Robert Kukura
2015-09-16 14:50:26 -04:00
parent 3becb34638
commit 7acac86b65
5 changed files with 232 additions and 98 deletions

View File

@@ -23,6 +23,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from gbpservice.common import utils
from gbpservice.neutron.extensions import group_policy as gp_ext
from gbpservice.neutron.extensions import servicechain as sc_ext
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
@@ -53,6 +54,17 @@ class LocalAPI(object):
raise exc.GroupPolicyDeploymentError()
return l3_plugin
@property
def _group_policy_plugin(self):
# REVISIT(rkukura): Need initialization method after all
# plugins are loaded to grab and store plugin.
plugins = manager.NeutronManager.get_service_plugins()
group_policy_plugin = plugins.get(pconst.GROUP_POLICY)
if not group_policy_plugin:
LOG.error(_("No GroupPolicy service plugin found."))
raise exc.GroupPolicyDeploymentError()
return group_policy_plugin
@property
def _servicechain_plugin(self):
# REVISIT(rkukura): Need initialization method after all
@@ -75,7 +87,8 @@ class LocalAPI(object):
dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
return self._cached_agent_notifier
def _create_resource(self, plugin, context, resource, attrs):
def _create_resource(self, plugin, context, resource, attrs,
do_notify=True):
# REVISIT(rkukura): Do create.start notification?
# REVISIT(rkukura): Check authorization?
# REVISIT(rkukura): Do quota?
@@ -83,16 +96,18 @@ class LocalAPI(object):
action = 'create_' + resource
obj_creator = getattr(plugin, action)
obj = obj_creator(context, {resource: attrs})
self._nova_notifier.send_network_change(action, {},
{resource: obj})
# REVISIT(rkukura): Do create.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context,
{resource: obj},
resource + '.create.end')
if do_notify:
self._nova_notifier.send_network_change(action, {},
{resource: obj})
# REVISIT(rkukura): Do create.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context,
{resource: obj},
resource + '.create.end')
return obj
def _update_resource(self, plugin, context, resource, resource_id, attrs):
def _update_resource(self, plugin, context, resource, resource_id, attrs,
do_notify=True):
# REVISIT(rkukura): Do update.start notification?
# REVISIT(rkukura): Check authorization?
with utils.clean_session(context.session):
@@ -101,16 +116,18 @@ class LocalAPI(object):
action = 'update_' + resource
obj_updater = getattr(plugin, action)
obj = obj_updater(context, resource_id, {resource: attrs})
self._nova_notifier.send_network_change(action, orig_obj,
{resource: obj})
# REVISIT(rkukura): Do update.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context,
{resource: obj},
resource + '.update.end')
if do_notify:
self._nova_notifier.send_network_change(action, orig_obj,
{resource: obj})
# REVISIT(rkukura): Do update.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context,
{resource: obj},
resource + '.update.end')
return obj
def _delete_resource(self, plugin, context, resource, resource_id):
def _delete_resource(self, plugin, context, resource, resource_id,
do_notify=True):
# REVISIT(rkukura): Do delete.start notification?
# REVISIT(rkukura): Check authorization?
with utils.clean_session(context.session):
@@ -119,13 +136,14 @@ class LocalAPI(object):
action = 'delete_' + resource
obj_deleter = getattr(plugin, action)
obj_deleter(context, resource_id)
self._nova_notifier.send_network_change(action, {},
{resource: obj})
# REVISIT(rkukura): Do delete.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context,
{resource: obj},
resource + '.delete.end')
if do_notify:
self._nova_notifier.send_network_change(action, {},
{resource: obj})
# REVISIT(rkukura): Do delete.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(context,
{resource: obj},
resource + '.delete.end')
def _get_resource(self, plugin, context, resource, resource_id):
with utils.clean_session(context.session):
@@ -133,9 +151,9 @@ class LocalAPI(object):
obj = obj_getter(context, resource_id)
return obj
def _get_resources(self, plugin, context, resource, filters=None):
def _get_resources(self, plugin, context, resource_plural, filters=None):
with utils.clean_session(context.session):
obj_getter = getattr(plugin, 'get_' + resource + 's')
obj_getter = getattr(plugin, 'get_' + resource_plural)
obj = obj_getter(context, filters)
return obj
@@ -152,7 +170,7 @@ class LocalAPI(object):
def _get_ports(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._core_plugin, plugin_context, 'port',
return self._get_resources(self._core_plugin, plugin_context, 'ports',
filters)
def _create_port(self, plugin_context, attrs):
@@ -176,8 +194,8 @@ class LocalAPI(object):
def _get_subnets(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._core_plugin, plugin_context, 'subnet',
filters)
return self._get_resources(self._core_plugin, plugin_context,
'subnets', filters)
def _create_subnet(self, plugin_context, attrs):
return self._create_resource(self._core_plugin, plugin_context,
@@ -201,7 +219,7 @@ class LocalAPI(object):
def _get_networks(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(
self._core_plugin, plugin_context, 'network', filters)
self._core_plugin, plugin_context, 'networks', filters)
def _create_network(self, plugin_context, attrs):
return self._create_resource(self._core_plugin, plugin_context,
@@ -220,7 +238,7 @@ class LocalAPI(object):
def _get_routers(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._l3_plugin, plugin_context, 'router',
return self._get_resources(self._l3_plugin, plugin_context, 'routers',
filters)
def _create_router(self, plugin_context, attrs):
@@ -265,7 +283,7 @@ class LocalAPI(object):
def _get_sgs(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(
self._core_plugin, plugin_context, 'security_group', filters)
self._core_plugin, plugin_context, 'security_groups', filters)
def _create_sg(self, plugin_context, attrs):
return self._create_resource(self._core_plugin, plugin_context,
@@ -290,7 +308,7 @@ class LocalAPI(object):
def _get_sg_rules(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(
self._core_plugin, plugin_context, 'security_group_rule', filters)
self._core_plugin, plugin_context, 'security_group_rules', filters)
def _create_sg_rule(self, plugin_context, attrs):
try:
@@ -319,7 +337,7 @@ class LocalAPI(object):
def _get_fips(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(
self._l3_plugin, plugin_context, 'floatingip', filters)
self._l3_plugin, plugin_context, 'floatingips', filters)
def _create_fip(self, plugin_context, attrs):
return self._create_resource(self._l3_plugin, plugin_context,
@@ -336,59 +354,123 @@ class LocalAPI(object):
except l3.FloatingIPNotFound:
LOG.warn(_('Floating IP %s Already deleted'), fip_id)
def _get_l2_policy(self, plugin_context, l2p_id):
return self._get_resource(self._group_policy_plugin, plugin_context,
'l2_policy', l2p_id)
def _get_l2_policies(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'l2_policies', filters)
def _create_l2_policy(self, plugin_context, attrs):
return self._create_resource(self._group_policy_plugin, plugin_context,
'l2_policy', attrs, False)
def _update_l2_policy(self, plugin_context, l2p_id, attrs):
return self._update_resource(self._group_policy_plugin, plugin_context,
'l2_policy', l2p_id, attrs, False)
def _delete_l2_policy(self, plugin_context, l2p_id):
try:
self._delete_resource(self._group_policy_plugin,
plugin_context, 'l2_policy', l2p_id, False)
except gp_ext.L2PolicyNotFound:
LOG.warn(_('L2 Policy %s already deleted'), l2p_id)
def _get_l3_policy(self, plugin_context, l3p_id):
return self._get_resource(self._group_policy_plugin, plugin_context,
'l3_policy', l3p_id)
def _get_l3_policies(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'l3_policies', filters)
def _create_l3_policy(self, plugin_context, attrs):
return self._create_resource(self._group_policy_plugin, plugin_context,
'l3_policy', attrs, False)
def _update_l3_policy(self, plugin_context, l3p_id, attrs):
return self._update_resource(self._group_policy_plugin, plugin_context,
'l3_policy', l3p_id, attrs, False)
def _delete_l3_policy(self, plugin_context, l3p_id):
try:
self._delete_resource(self._group_policy_plugin,
plugin_context, 'l3_policy', l3p_id, False)
except gp_ext.L3PolicyNotFound:
LOG.warn(_('L3 Policy %s already deleted'), l3p_id)
def _get_external_segment(self, plugin_context, es_id):
return self._get_resource(self._group_policy_plugin, plugin_context,
'external_segment', es_id)
def _get_external_segments(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(self._group_policy_plugin, plugin_context,
'external_segments', filters)
def _create_external_segment(self, plugin_context, attrs):
return self._create_resource(self._group_policy_plugin, plugin_context,
'external_segment', attrs, False)
def _update_external_segment(self, plugin_context, es_id, attrs):
return self._update_resource(self._group_policy_plugin, plugin_context,
'external_segment', es_id, attrs, False)
def _delete_external_segment(self, plugin_context, es_id):
try:
self._delete_resource(self._group_policy_plugin, plugin_context,
'external_segment', es_id, False)
except gp_ext.ExternalSegmentNotFound:
LOG.warn(_('External Segment %s already deleted'), es_id)
def _get_servicechain_instance(self, plugin_context, sci_id):
return self._get_resource(
self._servicechain_plugin, plugin_context, 'servicechain_instance',
sci_id)
return self._get_resource(self._servicechain_plugin, plugin_context,
'servicechain_instance', sci_id)
def _get_servicechain_instances(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(
self._servicechain_plugin, plugin_context, 'servicechain_instance',
filters)
return self._get_resources(self._servicechain_plugin, plugin_context,
'servicechain_instances', filters)
def _create_servicechain_instance(self, plugin_context, attrs):
return self._create_resource(
self._servicechain_plugin, plugin_context,
'servicechain_instance', attrs)
return self._create_resource(self._servicechain_plugin, plugin_context,
'servicechain_instance', attrs, False)
def _update_servicechain_instance(self, plugin_context, sci_id, attrs):
return self._update_resource(self._servicechain_plugin, plugin_context,
'servicechain_instance', sci_id, attrs)
'servicechain_instance', sci_id, attrs,
False)
def _delete_servicechain_instance(self, plugin_context, sci_id):
try:
self._delete_resource(self._servicechain_plugin, plugin_context,
'servicechain_instance', sci_id)
'servicechain_instance', sci_id, False)
except sc_ext.ServiceChainInstanceNotFound:
# SC could have been already deleted
LOG.warn(_("servicechain %s already deleted"), sci_id)
def _get_servicechain_spec(self, plugin_context, scs_id):
return self._get_resource(
self._servicechain_plugin, plugin_context, 'servicechain_spec',
scs_id)
return self._get_resource(self._servicechain_plugin, plugin_context,
'servicechain_spec', scs_id)
def _get_servicechain_specs(self, plugin_context, filters=None):
filters = filters or {}
return self._get_resources(
self._servicechain_plugin, plugin_context, 'servicechain_spec',
filters)
return self._get_resources(self._servicechain_plugin, plugin_context,
'servicechain_specs', filters)
def _create_servicechain_spec(self, plugin_context, attrs):
return self._create_resource(
self._servicechain_plugin, plugin_context,
'servicechain_spec', attrs)
return self._create_resource(self._servicechain_plugin, plugin_context,
'servicechain_spec', attrs, False)
def _update_servicechain_spec(self, plugin_context, scs_id, attrs):
return self._update_resource(self._servicechain_plugin, plugin_context,
'servicechain_spec', scs_id, attrs)
'servicechain_spec', scs_id, attrs, False)
def _delete_servicechain_spec(self, context, scs_id):
def _delete_servicechain_spec(self, plugin_context, scs_id):
try:
self._delete_resource(self._servicechain_plugin,
context._plugin_context,
self._delete_resource(self._servicechain_plugin, plugin_context,
'servicechain_spec', scs_id)
except sc_ext.ServiceChainSpecNotFound:
# SC could have been already deleted
LOG.warn(_("servicechain spec %s already deleted"), scs_id)
LOG.warn(_("servicechain spec %s already deleted"), scs_id)

View File

@@ -186,9 +186,14 @@ class ImplicitSubnetNotSupported(GroupPolicyBadRequest):
message = _("RMD doesn't support implicit external subnet creation.")
class DefaultL3PolicyAlreadyExists(GroupPolicyBadRequest):
message = _("Default L3 Policy with name %(l3p_name)s already "
"exists and is visible for this tenant.")
class DefaultExternalSegmentAlreadyExists(GroupPolicyBadRequest):
message = _("Default External Segment with name %(es_name)s already "
"exists and is visible for this tenant")
"exists and is visible for this tenant.")
class InvalidCrossTenantReference(GroupPolicyBadRequest):

View File

@@ -14,8 +14,10 @@ from neutron.common import log
from neutron.db import model_base
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy as sa
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.services.grouppolicy import (
group_policy_driver_api as api)
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
@@ -73,7 +75,7 @@ class OwnedL3Policy(model_base.BASEV2):
nullable=False, primary_key=True)
class ImplicitPolicyDriver(api.PolicyDriver):
class ImplicitPolicyDriver(api.PolicyDriver, local_api.LocalAPI):
"""Implicit Policy driver for Group Policy plugin.
This driver ensures that the l2_policy_id attribute of
@@ -150,6 +152,20 @@ class ImplicitPolicyDriver(api.PolicyDriver):
def update_external_policy_postcommit(self, context):
pass
@log.log
def create_l3_policy_precommit(self, context):
if self._default_l3p_name == context.current['name']:
LOG.debug("Creating default L3 policy: %s", context.current)
tenant_id = context.current['tenant_id']
filter = {'tenant_id': [tenant_id],
'name': [self._default_l3p_name]}
l3ps = context._plugin.get_l3_policies(context._plugin_context,
filter)
if [x for x in l3ps if x['id'] != context.current['id']]:
LOG.debug("Rejecting default L3 policy: %s", context.current)
raise exc.DefaultL3PolicyAlreadyExists(
l3p_name=self._default_l3p_name)
@log.log
def create_l3_policy_postcommit(self, context):
if not context.current['external_segments']:
@@ -160,45 +176,60 @@ class ImplicitPolicyDriver(api.PolicyDriver):
pass
def _use_implicit_l2_policy(self, context):
attrs = {'l2_policy':
{'tenant_id': context.current['tenant_id'],
'name': context.current['name'],
'description': _("Implicitly created L2 policy"),
'l3_policy_id': None,
'shared': context.current.get('shared', False),
'network_id': None}}
l2p = context._plugin.create_l2_policy(context._plugin_context, attrs)
attrs = {'tenant_id': context.current['tenant_id'],
'name': context.current['name'],
'description': _("Implicitly created L2 policy"),
'l3_policy_id': None,
'shared': context.current.get('shared', False),
'network_id': None}
l2p = self._create_l2_policy(context._plugin_context, attrs)
l2p_id = l2p['id']
self._mark_l2_policy_owned(context._plugin_context.session, l2p_id)
context.set_l2_policy_id(l2p_id)
def _cleanup_l2_policy(self, context, l2p_id):
if self._l2_policy_is_owned(context._plugin_context.session, l2p_id):
context._plugin.delete_l2_policy(context._plugin_context, l2p_id)
self._delete_l2_policy(context._plugin_context, l2p_id)
def _use_implicit_l3_policy(self, context):
filter = {'tenant_id': [context.current['tenant_id']],
tenant_id = context.current['tenant_id']
filter = {'tenant_id': [tenant_id],
'name': [self._default_l3p_name]}
l3ps = context._plugin.get_l3_policies(context._plugin_context, filter)
l3ps = self._get_l3_policies(context._plugin_context, filter)
l3p = l3ps and l3ps[0]
if not l3p:
# REVISIT(rkukura): Concurrency could result in multiple
# default L3Ps for the same tenant. A DB table mapping
# tenant_id to default l3_policy_id may be needed to
# ensure a single default L3 policy is used per tenant.
attrs = {'l3_policy':
{'tenant_id': context.current['tenant_id'],
'name': self._default_l3p_name,
'description': _("Implicitly created L3 policy"),
'ip_version': self._default_ip_version,
'ip_pool': self._default_ip_pool,
'shared': context.current.get('shared', False),
'subnet_prefix_length':
self._default_subnet_prefix_length}}
l3p = context._plugin.create_l3_policy(context._plugin_context,
attrs)
self._mark_l3_policy_owned(context._plugin_context.session,
l3p['id'])
attrs = {'tenant_id': tenant_id,
'name': self._default_l3p_name,
'description': _("Implicitly created L3 policy"),
'ip_version': self._default_ip_version,
'ip_pool': self._default_ip_pool,
'shared': context.current.get('shared', False),
'subnet_prefix_length':
self._default_subnet_prefix_length}
try:
l3p = self._create_l3_policy(context._plugin_context, attrs)
self._mark_l3_policy_owned(context._plugin_context.session,
l3p['id'])
except exc.DefaultL3PolicyAlreadyExists:
with excutils.save_and_reraise_exception(
reraise=False) as ctxt:
LOG.debug("Possible concurrent creation of default L3 "
"policy for tenant %s", tenant_id)
l3ps = self._get_l3_policies(context._plugin_context,
filter)
l3p = l3ps and l3ps[0]
if not l3p:
LOG.warning(_("Caught DefaultL3PolicyAlreadyExists, "
"but default L3 policy not concurrently "
"created for tenant %s"), tenant_id)
ctxt.reraise = True
except exc.OverlappingIPPoolsInSameTenantNotAllowed:
with excutils.save_and_reraise_exception():
LOG.info(_("Caught "
"OverlappingIPPoolsinSameTenantNotAllowed "
"during creation of default L3 policy for "
"tenant %s"), tenant_id)
context.current['l3_policy_id'] = l3p['id']
context.set_l3_policy_id(l3p['id'])
@@ -207,8 +238,7 @@ class ImplicitPolicyDriver(api.PolicyDriver):
return
filter = {'name': [self._default_es_name]}
ess = context._plugin.get_external_segments(context._plugin_context,
filter)
ess = self._get_external_segments(context._plugin_context, filter)
# Multiple default ES may exist, this can happen when a per-tenant
# default ES gets his shared attribute flipped. Always prefer the
# specific tenant's ES if any.
@@ -224,6 +254,8 @@ class ImplicitPolicyDriver(api.PolicyDriver):
def _cleanup_l3_policy(self, context, l3p_id):
if self._l3_policy_is_owned(context._plugin_context.session, l3p_id):
# REVISIT(rkukura): Add check_unused parameter to
# local_api._delete_l3_policy()?
context._plugin.delete_l3_policy(context._plugin_context, l3p_id,
check_unused=True)

View File

@@ -22,6 +22,7 @@ from neutron.common import log
from neutron import context as n_context
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import l3 as ext_l3
from neutron.extensions import securitygroup as ext_sg
from oslo_config import cfg
from oslo_log import log as logging
@@ -846,9 +847,11 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
context.current['consumed_policy_rule_sets'])
l2p_id = context.current['l2_policy_id']
router_id = self._get_routerid_for_l2policy(context, l2p_id)
for subnet_id in context.current['subnets']:
self._cleanup_subnet(context._plugin_context, subnet_id, router_id)
if l2p_id:
router_id = self._get_routerid_for_l2policy(context, l2p_id)
for subnet_id in context.current['subnets']:
self._cleanup_subnet(context._plugin_context, subnet_id,
router_id)
self._delete_default_security_group(
context._plugin_context, context.current['id'],
context.current['tenant_id'])
@@ -1720,8 +1723,12 @@ class ResourceMappingDriver(api.PolicyDriver, local_api.LocalAPI):
def _cleanup_subnet(self, plugin_context, subnet_id, router_id):
interface_info = {'subnet_id': subnet_id}
if router_id:
self._remove_router_interface(plugin_context, router_id,
interface_info)
try:
self._remove_router_interface(plugin_context, router_id,
interface_info)
except ext_l3.RouterInterfaceNotFoundForSubnet:
LOG.debug("Ignoring RouterInterfaceNotFoundForSubnet cleaning "
"up subnet: %s", subnet_id)
if self._subnet_is_owned(plugin_context.session, subnet_id):
self._delete_subnet(plugin_context, subnet_id)

View File

@@ -251,6 +251,14 @@ class TestImplicitL3Policy(ImplicitPolicyTestCase):
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
def test_single_default_policy(self):
# Verify only one default L3 policy can be created per tenant.
l3p = self.create_l3_policy(name='default')
self.assertEqual('default', l3p['l3_policy']['name'])
res = self.create_l3_policy(name='default', expected_res_status=400)
self.assertEqual('DefaultL3PolicyAlreadyExists',
res['NeutronError']['type'])
def test_update_from_implicit(self):
# Create L2 policy with implicit L3 policy.
l2p = self.create_l2_policy()