Merge "Make L3 HA interface creation concurrency safe"
This commit is contained in:
commit
ab614a10a7
@ -30,12 +30,14 @@ import signal
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from eventlet.green import subprocess
|
||||
import netaddr
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import importutils
|
||||
@ -45,6 +47,7 @@ from stevedore import driver
|
||||
|
||||
from neutron._i18n import _, _LE
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.db import api as db_api
|
||||
|
||||
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -551,3 +554,88 @@ def port_rule_masking(port_min, port_max):
|
||||
_hex_format(mask)))
|
||||
|
||||
return rules
|
||||
|
||||
|
||||
def create_object_with_dependency(creator, dep_getter, dep_creator,
                                  dep_id_attr):
    """Create an object bound to a dependency while tolerating races.

    :param creator: callable invoked with the dependency (the result of
        either dep_getter or dep_creator); its return value is the created
        object.
    :param dep_getter: callable returning the existing dependency, or a
        falsy value (e.g. None) when the dependency does not exist.
    :param dep_creator: callable that creates the dependency. It may raise
        DBDuplicateEntry to signal that the dependency was created
        concurrently, in which case the whole process restarts so the
        concurrently created one is picked up by dep_getter.
    :param dep_id_attr: name of the attribute on the dependency used to
        detect whether the dependency changed during object creation.
    :returns: tuple of (created object, dependency used/created).

    If creator fails, the dependency is looked up again. When it has
    disappeared or its dep_id_attr value changed, the failure is swallowed
    and the process restarts; otherwise the creator's exception propagates.
    Retrying stops after neutron.db.api.MAX_RETRIES attempts, at which point
    the exception encountered on the last attempt is raised.

    TODO(kevinbenton): currently this does not try to delete the dependency
    it created. This matches the semantics of the HA network logic it is used
    for but it should be modified to cleanup in the future.
    """
    max_attempts = db_api.MAX_RETRIES
    result = None
    dependency = None
    dep_id = None
    # attempts are numbered 1..max_attempts; on the final attempt the
    # handlers below leave reraise enabled so the error surfaces
    for attempt in range(1, max_attempts + 1):
        may_retry = attempt < max_attempts
        try:
            dependency = dep_getter() or dep_creator()
            dep_id = getattr(dependency, dep_id_attr)
        except db_exc.DBDuplicateEntry:
            # somebody else created the dependency at the same time
            with excutils.save_and_reraise_exception() as ctx:
                if may_retry:
                    # back off for a random 0-1 second interval so that a
                    # concurrent worker does not retry in lock-step with us
                    time.sleep(random.uniform(0, 1))
                    ctx.reraise = False
            # only reached when the exception was suppressed above
            continue
        try:
            result = creator(dependency)
        except Exception:
            with excutils.save_and_reraise_exception() as ctx:
                if may_retry:
                    # find out whether the dependency we used went away or
                    # was replaced while the object was being created
                    dependency = dep_getter()
                    dep_changed = (
                        not dependency or
                        dep_id != getattr(dependency, dep_id_attr))
                    if dep_changed:
                        ctx.reraise = False
        else:
            break
    return result, dependency
|
||||
|
||||
|
||||
def transaction_guard(f):
    """Decorator refusing to run a method while a transaction is active.

    Various Neutron methods that modify resources assume they are never
    called inside of a transaction: they perform operations that expect all
    data to already be committed to the database (e.g. ML2 postcommit calls)
    and/or they have side effects on external systems. Calling them in a
    transaction can lead to consistency errors on failures, since those side
    effects will not be reverted by a DB rollback.

    If you receive this error, you must alter your code to handle the fact
    that the thing you are calling can have side effects, so relying on a
    transaction to undo it on failure is not possible.

    :param f: method taking (self, context, ...) to protect.
    :returns: wrapper that raises RuntimeError when context.session.is_active
        is truthy and otherwise returns the result of calling f.
    """
    @functools.wraps(f)
    def inner(self, context, *args, **kwargs):
        # happy path first: no active session means it is safe to proceed
        if not context.session.is_active:
            return f(self, context, *args, **kwargs)
        raise RuntimeError(_("Method cannot be called within a "
                             "transaction."))
    return inner
|
||||
|
@ -258,6 +258,17 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||
ha_network = L3HARouterNetwork(tenant_id=tenant_id,
|
||||
network_id=network_id)
|
||||
context.session.add(ha_network)
|
||||
# we need to check if someone else just inserted at exactly the
|
||||
# same time as us because there is no constraint in L3HARouterNetwork
|
||||
# that prevents multiple networks per tenant
|
||||
with context.session.begin(subtransactions=True):
|
||||
items = (context.session.query(L3HARouterNetwork).
|
||||
filter_by(tenant_id=tenant_id).all())
|
||||
if len(items) > 1:
|
||||
# we need to throw an error so our network is deleted
|
||||
# and the process is started over where the existing
|
||||
# network will be selected.
|
||||
raise db_exc.DBDuplicateEntry(columns=['tenant_id'])
|
||||
return ha_network
|
||||
|
||||
def _add_ha_network_settings(self, network):
|
||||
@ -425,6 +436,19 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||
return super(L3_HA_NAT_db_mixin,
|
||||
self)._get_device_owner(context, router)
|
||||
|
||||
@n_utils.transaction_guard
def _create_ha_interfaces_and_ensure_network(self, context, router_db):
    """Attach HA interfaces to the tenant's HA network, tolerating deletes.

    Delegates to n_utils.create_object_with_dependency, using the tenant's
    HA network as the dependency (compared by its 'network_id' attribute)
    and self._create_ha_interfaces as the object creator.

    :returns: whatever create_object_with_dependency returns, i.e. a tuple
        of (result of _create_ha_interfaces, HA network used/created).
    """
    tenant_id = router_db.tenant_id
    return n_utils.create_object_with_dependency(
        # creator: called with the HA network as its last argument
        functools.partial(self._create_ha_interfaces, context, router_db),
        # dep_getter: looks up the tenant's existing HA network
        functools.partial(self.get_ha_network, context, tenant_id),
        # dep_creator: builds the HA network when none exists
        functools.partial(self._create_ha_network, context, tenant_id),
        'network_id')
|
||||
|
||||
def create_router(self, context, router):
|
||||
is_ha = self._is_ha(router['router'])
|
||||
router['router']['ha'] = is_ha
|
||||
@ -433,14 +457,12 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||
if is_ha:
|
||||
try:
|
||||
router_db = self._get_router(context, router_dict['id'])
|
||||
ha_network = self.get_ha_network(context,
|
||||
router_db.tenant_id)
|
||||
if not ha_network:
|
||||
ha_network = self._create_ha_network(context,
|
||||
router_db.tenant_id)
|
||||
# the following returns interfaces and the network we only
|
||||
# care about the network
|
||||
ha_network = self._create_ha_interfaces_and_ensure_network(
|
||||
context, router_db)[1]
|
||||
|
||||
self._set_vr_id(context, router_db, ha_network)
|
||||
self._create_ha_interfaces(context, router_db, ha_network)
|
||||
self._notify_ha_interfaces_updated(context, router_db.id)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
@ -509,12 +531,9 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
||||
self._unbind_ha_router(context, router_id)
|
||||
|
||||
if requested_ha_state:
|
||||
if not ha_network:
|
||||
ha_network = self._create_ha_network(context,
|
||||
router_db.tenant_id)
|
||||
|
||||
ha_network = self._create_ha_interfaces_and_ensure_network(
|
||||
context, router_db)[1]
|
||||
self._set_vr_id(context, router_db, ha_network)
|
||||
self._create_ha_interfaces(context, router_db, ha_network)
|
||||
self._notify_ha_interfaces_updated(context, router_db.id)
|
||||
else:
|
||||
self._delete_ha_interfaces(context, router_db.id)
|
||||
|
@ -13,8 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
|
||||
from eventlet import greenthread
|
||||
from oslo_config import cfg
|
||||
from oslo_db import api as oslo_db_api
|
||||
@ -93,28 +91,6 @@ SERVICE_PLUGINS_REQUIRED_DRIVERS = {
|
||||
}
|
||||
|
||||
|
||||
def transaction_guard(f):
    """Decorator refusing to run a method while a transaction is active.

    Many of ML2's methods that modify resources assume they are never called
    inside of a transaction, because they perform operations that expect all
    data to be committed to the database (all postcommit calls). Calling
    them in a transaction can lead to consistency errors on failures since
    the ML2 drivers will not be notified of a DB rollback.

    If you receive this error, you must alter your code to handle the fact
    that ML2 calls have side effects, so using transactions to undo on
    failures is not possible.

    :param f: method taking (self, context, ...) to protect.
    :returns: wrapper that raises RuntimeError when context.session.is_active
        is truthy and otherwise returns the result of calling f.
    """
    @functools.wraps(f)
    def inner(self, context, *args, **kwargs):
        # happy path first: no active session means it is safe to proceed
        if not context.session.is_active:
            return f(self, context, *args, **kwargs)
        raise RuntimeError(_("Method cannot be called within a "
                             "transaction."))
    return inner
|
||||
|
||||
|
||||
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
dvr_mac_db.DVRDbMixin,
|
||||
external_net_db.External_net_db_mixin,
|
||||
@ -813,7 +789,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
LOG.exception(_LE("Exception auto-deleting subnet %s"),
|
||||
subnet_id)
|
||||
|
||||
@transaction_guard
|
||||
@utils.transaction_guard
|
||||
def delete_network(self, context, id):
|
||||
# REVISIT(rkukura) The super(Ml2Plugin, self).delete_network()
|
||||
# function is not used because it auto-deletes ports and
|
||||
@ -948,7 +924,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.mechanism_manager.update_subnet_postcommit(mech_context)
|
||||
return updated_subnet
|
||||
|
||||
@transaction_guard
|
||||
@utils.transaction_guard
|
||||
def delete_subnet(self, context, id):
|
||||
# REVISIT(rkukura) The super(Ml2Plugin, self).delete_subnet()
|
||||
# function is not used because it deallocates the subnet's addresses
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import abc
|
||||
import collections
|
||||
import functools
|
||||
import itertools
|
||||
import random
|
||||
|
||||
@ -255,13 +256,24 @@ class L3Scheduler(object):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _add_port_from_net(self, plugin, ctxt, router_id, tenant_id, ha_net):
|
||||
"""small wrapper function to unpack network id from ha_network"""
|
||||
return plugin.add_ha_port(ctxt, router_id, ha_net.network.id,
|
||||
tenant_id)
|
||||
|
||||
def create_ha_port_and_bind(self, plugin, context, router_id,
|
||||
tenant_id, agent):
|
||||
"""Creates and binds a new HA port for this agent."""
|
||||
ha_network = plugin.get_ha_network(context, tenant_id)
|
||||
ctxt = context.elevated()
|
||||
creator = functools.partial(self._add_port_from_net,
|
||||
plugin, ctxt, router_id, tenant_id)
|
||||
dep_getter = functools.partial(plugin.get_ha_network, ctxt, tenant_id)
|
||||
dep_creator = functools.partial(plugin._create_ha_network,
|
||||
ctxt, tenant_id)
|
||||
dep_id_attr = 'network_id'
|
||||
try:
|
||||
port_binding = plugin.add_ha_port(context.elevated(), router_id,
|
||||
ha_network.network.id, tenant_id)
|
||||
port_binding = utils.create_object_with_dependency(
|
||||
creator, dep_getter, dep_creator, dep_id_attr)[0]
|
||||
with db_api.autonested_transaction(context.session):
|
||||
port_binding.l3_agent_id = agent['id']
|
||||
except db_exc.DBDuplicateEntry:
|
||||
|
@ -14,9 +14,11 @@
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_utils import uuidutils
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
import testtools
|
||||
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
@ -584,6 +586,82 @@ class L3HATestCase(L3HATestFramework):
|
||||
networks_after = self.core_plugin.get_networks(self.admin_ctx)
|
||||
self.assertEqual(networks_before, networks_after)
|
||||
|
||||
def test_create_ha_interfaces_and_ensure_network_net_exists(self):
    # fetch the DB record of a freshly created router, then verify that
    # ensuring the HA network for it never falls back to creating one
    router_id = self._create_router()['id']
    router_db = self.plugin._get_router(self.admin_ctx, router_id)
    patcher = mock.patch.object(self.plugin, '_create_ha_network')
    with patcher as create:
        self.plugin._create_ha_interfaces_and_ensure_network(
            self.admin_ctx, router_db)
        self.assertFalse(create.called)
|
||||
|
||||
def test_create_ha_interfaces_and_ensure_network_concurrent_create(self):
    # start from a non-ha router so the create-ha-interfaces call can be
    # invoked by hand further down
    router_id = self._create_router(ha=False)['id']
    router_db = self.plugin._get_router(self.admin_ctx, router_id)
    real_create_ha_network = self.plugin._create_ha_network
    created_nets = []

    def _create_ha_network(*args, **kwargs):
        # really create the network, record it, then fail with a duplicate
        # error to mimic another worker having created it just before us
        network = real_create_ha_network(*args, **kwargs)
        created_nets.append(network)
        raise db_exc.DBDuplicateEntry(columns=['tenant_id'])

    with mock.patch.object(self.plugin, '_create_ha_network',
                           new=_create_ha_network):
        outcome = self.plugin._create_ha_interfaces_and_ensure_network(
            self.admin_ctx, router_db)
        net = outcome[1]
    # the network handed back must be the "concurrently" created one
    self.assertEqual([net], created_nets)
|
||||
|
||||
def _test_ensure_with_patched_int_create(self, _create_ha_interfaces):
    # start from a non-ha router so the create-ha-interfaces call can be
    # invoked by hand with the supplied replacement patched in
    router_id = self._create_router(ha=False)['id']
    router_db = self.plugin._get_router(self.admin_ctx, router_id)
    patcher = mock.patch.object(self.plugin, '_create_ha_interfaces',
                                new=_create_ha_interfaces)
    with patcher:
        self.plugin._create_ha_interfaces_and_ensure_network(
            self.admin_ctx, router_db)
        # the replacement marks itself via a 'called' attribute
        self.assertTrue(_create_ha_interfaces.called)
|
||||
|
||||
def test_create_ha_interfaces_and_ensure_network_concurrent_delete(self):
    real_create_ha_interfaces = self.plugin._create_ha_interfaces

    def _create_ha_interfaces(ctx, rdb, ha_net):
        # on the first attempt only, delete the HA network out from under
        # the caller to simulate a concurrent delete
        first_attempt = not getattr(_create_ha_interfaces, 'called', False)
        if first_attempt:
            _create_ha_interfaces.called = True
            self.core_plugin.delete_network(self.admin_ctx,
                                            ha_net['network_id'])
        return real_create_ha_interfaces(ctx, rdb, ha_net)

    self._test_ensure_with_patched_int_create(_create_ha_interfaces)
|
||||
|
||||
def test_create_ha_interfaces_and_ensure_network_concurrent_swap(self):
    real_create_ha_interfaces = self.plugin._create_ha_interfaces

    def _create_ha_interfaces(ctx, rdb, ha_net):
        # on the first attempt only, delete the HA network and create a
        # new one for the tenant to simulate a concurrent swap
        first_attempt = not getattr(_create_ha_interfaces, 'called', False)
        if first_attempt:
            _create_ha_interfaces.called = True
            self.core_plugin.delete_network(self.admin_ctx,
                                            ha_net['network_id'])
            self.plugin._create_ha_network(self.admin_ctx,
                                           rdb.tenant_id)
        return real_create_ha_interfaces(ctx, rdb, ha_net)

    self._test_ensure_with_patched_int_create(_create_ha_interfaces)
|
||||
|
||||
def test_create_ha_network_tenant_binding_raises_duplicate(self):
    # creating a router gives us an HA network to bind against
    tenant_id = self._create_router()['tenant_id']
    network = self.plugin.get_ha_network(self.admin_ctx, tenant_id)
    # the first binding for tenant 't1' succeeds
    self.plugin._create_ha_network_tenant_binding(
        self.admin_ctx, 't1', network['network_id'])
    # an identical second binding must be rejected as a duplicate
    with testtools.ExpectedException(db_exc.DBDuplicateEntry):
        self.plugin._create_ha_network_tenant_binding(
            self.admin_ctx, 't1', network['network_id'])
|
||||
|
||||
def test_create_ha_interfaces_binding_failure_rolls_back_ports(self):
|
||||
router = self._create_router()
|
||||
network = self.plugin.get_ha_network(self.admin_ctx,
|
||||
|
Loading…
Reference in New Issue
Block a user