diff --git a/neutron/common/utils.py b/neutron/common/utils.py
index 95269057c55..3a33ac8c2bc 100644
--- a/neutron/common/utils.py
+++ b/neutron/common/utils.py
@@ -30,12 +30,14 @@ import signal
 import socket
 import sys
 import tempfile
+import time
 import uuid
 
 from eventlet.green import subprocess
 import netaddr
 from oslo_concurrency import lockutils
 from oslo_config import cfg
+from oslo_db import exception as db_exc
 from oslo_log import log as logging
 from oslo_utils import excutils
 from oslo_utils import importutils
@@ -45,6 +47,7 @@ from stevedore import driver
 
 from neutron._i18n import _, _LE
 from neutron.common import constants as n_const
+from neutron.db import api as db_api
 
 TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 LOG = logging.getLogger(__name__)
@@ -551,3 +554,88 @@ def port_rule_masking(port_min, port_max):
                                     _hex_format(mask)))
     return rules
+
+
+def create_object_with_dependency(creator, dep_getter, dep_creator,
+                                  dep_id_attr):
+    """Creates an object that binds to a dependency while handling races.
+
+    creator is a function that is expected to take the result of either
+    dep_getter or dep_creator.
+
+    The result of dep_getter and dep_creator must have an attribute of
+    dep_id_attr that is used to determine if the dependency changed during
+    object creation.
+
+    dep_getter should return None if the dependency does not exist.
+
+    dep_creator can raise a DBDuplicateEntry to indicate that a concurrent
+    create of the dependency occurred, in which case the process will restart
+    to get the concurrently created one.
+
+    This function will return both the created object and the dependency it
+    used/created.
+
+    This function protects against all of the cases where the dependency can
+    be concurrently removed by catching exceptions and restarting the
+    process of creating the dependency if one no longer exists. It will
+    give up after neutron.db.api.MAX_RETRIES and raise the exception it
+    encounters after that.
+
+    TODO(kevinbenton): currently this does not try to delete the dependency
+    it created. This matches the semantics of the HA network logic it is used
+    for, but it should be modified to clean up in the future.
+    """
+    result, dependency, dep_id = None, None, None
+    for attempts in range(1, db_api.MAX_RETRIES + 1):
+        # we go to max + 1 here so the exception handlers can raise their
+        # errors at the end
+        try:
+            dependency = dep_getter() or dep_creator()
+            dep_id = getattr(dependency, dep_id_attr)
+        except db_exc.DBDuplicateEntry:
+            # dependency was concurrently created.
+            with excutils.save_and_reraise_exception() as ctx:
+                if attempts < db_api.MAX_RETRIES:
+                    # sleep for a random time between 0 and 1 second to
+                    # make sure a concurrent worker doesn't retry again
+                    # at exactly the same time
+                    time.sleep(random.uniform(0, 1))
+                    ctx.reraise = False
+            continue
+        try:
+            result = creator(dependency)
+            break
+        except Exception:
+            with excutils.save_and_reraise_exception() as ctx:
+                # check if the dependency we tried to use was removed during
+                # object creation
+                if attempts < db_api.MAX_RETRIES:
+                    dependency = dep_getter()
+                    if not dependency or dep_id != getattr(dependency,
+                                                           dep_id_attr):
+                        ctx.reraise = False
+    return result, dependency
+
+
+def transaction_guard(f):
+    """Ensures that the context passed in is not in a transaction.
+
+    Various Neutron methods modifying resources have assumptions that they
+    will not be called inside of a transaction because they perform
+    operations that expect all data to be committed to the database (e.g.
+    ML2 postcommit calls) and/or they have side effects on external systems.
+    So calling them in a transaction can lead to consistency errors on
+    failures, since the side effect will not be reverted on a DB rollback.
+
+    If you receive this error, you must alter your code to handle the fact
+    that the thing you are calling can have side effects, so using
+    transactions to undo on failures is not possible.
+    """
+    @functools.wraps(f)
+    def inner(self, context, *args, **kwargs):
+        if context.session.is_active:
+            raise RuntimeError(_("Method cannot be called within a "
+                                 "transaction."))
+        return f(self, context, *args, **kwargs)
+    return inner
diff --git a/neutron/db/l3_hamode_db.py b/neutron/db/l3_hamode_db.py
index 60da7b0c852..6f120cb9fbd 100644
--- a/neutron/db/l3_hamode_db.py
+++ b/neutron/db/l3_hamode_db.py
@@ -258,6 +258,17 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
         ha_network = L3HARouterNetwork(tenant_id=tenant_id,
                                        network_id=network_id)
         context.session.add(ha_network)
+        # we need to check if someone else just inserted at exactly the
+        # same time as us because there is no constraint in L3HARouterNetwork
+        # that prevents multiple networks per tenant
+        with context.session.begin(subtransactions=True):
+            items = (context.session.query(L3HARouterNetwork).
+                     filter_by(tenant_id=tenant_id).all())
+            if len(items) > 1:
+                # we need to throw an error so our network is deleted
+                # and the process is started over where the existing
+                # network will be selected.
+                raise db_exc.DBDuplicateEntry(columns=['tenant_id'])
         return ha_network
 
     def _add_ha_network_settings(self, network):
@@ -425,6 +436,19 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
         return super(L3_HA_NAT_db_mixin, self)._get_device_owner(context,
                                                                   router)
 
+    @n_utils.transaction_guard
+    def _create_ha_interfaces_and_ensure_network(self, context, router_db):
+        """Attach interfaces to a network while tolerating network deletes."""
+        creator = functools.partial(self._create_ha_interfaces,
+                                    context, router_db)
+        dep_getter = functools.partial(self.get_ha_network,
+                                       context, router_db.tenant_id)
+        dep_creator = functools.partial(self._create_ha_network,
+                                        context, router_db.tenant_id)
+        dep_id_attr = 'network_id'
+        return n_utils.create_object_with_dependency(
+            creator, dep_getter, dep_creator, dep_id_attr)
+
     def create_router(self, context, router):
         is_ha = self._is_ha(router['router'])
         router['router']['ha'] = is_ha
@@ -433,14 +457,12 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
         if is_ha:
             try:
                 router_db = self._get_router(context, router_dict['id'])
-                ha_network = self.get_ha_network(context,
-                                                 router_db.tenant_id)
-                if not ha_network:
-                    ha_network = self._create_ha_network(context,
-                                                         router_db.tenant_id)
+                # the following returns interfaces and the network; we only
+                # care about the network
+                ha_network = self._create_ha_interfaces_and_ensure_network(
+                    context, router_db)[1]
                 self._set_vr_id(context, router_db, ha_network)
-                self._create_ha_interfaces(context, router_db, ha_network)
                 self._notify_ha_interfaces_updated(context, router_db.id)
             except Exception:
                 with excutils.save_and_reraise_exception():
@@ -509,12 +531,9 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
             self._unbind_ha_router(context, router_id)
 
         if requested_ha_state:
-            if not ha_network:
-                ha_network = self._create_ha_network(context,
-                                                     router_db.tenant_id)
-
+            ha_network = self._create_ha_interfaces_and_ensure_network(
+                context, router_db)[1]
             self._set_vr_id(context, router_db, ha_network)
-            self._create_ha_interfaces(context, router_db, ha_network)
             self._notify_ha_interfaces_updated(context, router_db.id)
         else:
             self._delete_ha_interfaces(context, router_db.id)
diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py
index 2150791d5ac..f11f8b87527 100644
--- a/neutron/plugins/ml2/plugin.py
+++ b/neutron/plugins/ml2/plugin.py
@@ -13,8 +13,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import functools
-
 from eventlet import greenthread
 from oslo_config import cfg
 from oslo_db import api as oslo_db_api
@@ -93,28 +91,6 @@ SERVICE_PLUGINS_REQUIRED_DRIVERS = {
 }
 
 
-def transaction_guard(f):
-    """Ensures that the context passed in is not in a transaction.
-
-    Many of ML2's methods modifying resources have assumptions that they will
-    not be called inside of a transaction because they perform operations that
-    expect all data to be committed to the database (all postcommit calls).
-    Calling them in a transaction can lead to consistency errors on failures
-    since the ML2 drivers will not be notified of a DB rollback.
-
-    If you receive this error, you must alter your code to handle the fact that
-    ML2 calls have side effects so using transactions to undo on failures is
-    not possible.
-    """
-    @functools.wraps(f)
-    def inner(self, context, *args, **kwargs):
-        if context.session.is_active:
-            raise RuntimeError(_("Method cannot be called within a "
-                                 "transaction."))
-        return f(self, context, *args, **kwargs)
-    return inner
-
-
 class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 dvr_mac_db.DVRDbMixin,
                 external_net_db.External_net_db_mixin,
@@ -813,7 +789,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                     LOG.exception(_LE("Exception auto-deleting subnet %s"),
                                   subnet_id)
 
-    @transaction_guard
+    @utils.transaction_guard
     def delete_network(self, context, id):
         # REVISIT(rkukura) The super(Ml2Plugin, self).delete_network()
         # function is not used because it auto-deletes ports and
@@ -948,7 +924,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             self.mechanism_manager.update_subnet_postcommit(mech_context)
         return updated_subnet
 
-    @transaction_guard
+    @utils.transaction_guard
     def delete_subnet(self, context, id):
         # REVISIT(rkukura) The super(Ml2Plugin, self).delete_subnet()
         # function is not used because it deallocates the subnet's addresses
diff --git a/neutron/scheduler/l3_agent_scheduler.py b/neutron/scheduler/l3_agent_scheduler.py
index a9a399b6278..13145bf7059 100644
--- a/neutron/scheduler/l3_agent_scheduler.py
+++ b/neutron/scheduler/l3_agent_scheduler.py
@@ -15,6 +15,7 @@
 
 import abc
 import collections
+import functools
 import itertools
 import random
 
@@ -255,13 +256,24 @@ class L3Scheduler(object):
                 return False
         return True
 
+    def _add_port_from_net(self, plugin, ctxt, router_id, tenant_id, ha_net):
+        """small wrapper function to unpack network id from ha_network"""
+        return plugin.add_ha_port(ctxt, router_id, ha_net.network.id,
+                                  tenant_id)
+
     def create_ha_port_and_bind(self, plugin, context, router_id,
                                 tenant_id, agent):
         """Creates and binds a new HA port for this agent."""
-        ha_network = plugin.get_ha_network(context, tenant_id)
+        ctxt = context.elevated()
+        creator = functools.partial(self._add_port_from_net,
+                                    plugin, ctxt, router_id, tenant_id)
+        dep_getter = functools.partial(plugin.get_ha_network, ctxt, tenant_id)
+        dep_creator = functools.partial(plugin._create_ha_network,
+                                        ctxt, tenant_id)
+        dep_id_attr = 'network_id'
         try:
-            port_binding = plugin.add_ha_port(context.elevated(), router_id,
-                                              ha_network.network.id, tenant_id)
+            port_binding = utils.create_object_with_dependency(
+                creator, dep_getter, dep_creator, dep_id_attr)[0]
             with db_api.autonested_transaction(context.session):
                 port_binding.l3_agent_id = agent['id']
         except db_exc.DBDuplicateEntry:
diff --git a/neutron/tests/unit/db/test_l3_hamode_db.py b/neutron/tests/unit/db/test_l3_hamode_db.py
index bab8dc9db02..30e08f44f52 100644
--- a/neutron/tests/unit/db/test_l3_hamode_db.py
+++ b/neutron/tests/unit/db/test_l3_hamode_db.py
@@ -14,9 +14,11 @@
 
 import mock
 from oslo_config import cfg
+from oslo_db import exception as db_exc
 from oslo_utils import uuidutils
 import sqlalchemy as sa
 from sqlalchemy import orm
+import testtools
 
 from neutron.api.rpc.handlers import l3_rpc
 from neutron.api.v2 import attributes
@@ -584,6 +586,82 @@ class L3HATestCase(L3HATestFramework):
         networks_after = self.core_plugin.get_networks(self.admin_ctx)
         self.assertEqual(networks_before, networks_after)
 
+    def test_create_ha_interfaces_and_ensure_network_net_exists(self):
+        router = self._create_router()
+        router_db = self.plugin._get_router(self.admin_ctx, router['id'])
+        with mock.patch.object(self.plugin, '_create_ha_network') as create:
+            self.plugin._create_ha_interfaces_and_ensure_network(
+                self.admin_ctx, router_db)
+            self.assertFalse(create.called)
+
+    def test_create_ha_interfaces_and_ensure_network_concurrent_create(self):
+        # create a non-ha router so we can manually invoke the create ha
+        # interfaces call down below
+        router = self._create_router(ha=False)
+        router_db = self.plugin._get_router(self.admin_ctx, router['id'])
+        orig_create = self.plugin._create_ha_network
+        created_nets = []
+
+        def _create_ha_network(*args, **kwargs):
+            # create the network and then raise the error to simulate another
+            # worker creating the network before us.
+            created_nets.append(orig_create(*args, **kwargs))
+            raise db_exc.DBDuplicateEntry(columns=['tenant_id'])
+        with mock.patch.object(self.plugin, '_create_ha_network',
+                               new=_create_ha_network):
+            net = self.plugin._create_ha_interfaces_and_ensure_network(
+                self.admin_ctx, router_db)[1]
+        # ensure that it used the concurrently created network
+        self.assertEqual([net], created_nets)
+
+    def _test_ensure_with_patched_int_create(self, _create_ha_interfaces):
+        # create a non-ha router so we can manually invoke the create ha
+        # interfaces call down below
+        router = self._create_router(ha=False)
+        router_db = self.plugin._get_router(self.admin_ctx, router['id'])
+        with mock.patch.object(self.plugin, '_create_ha_interfaces',
+                               new=_create_ha_interfaces):
+            self.plugin._create_ha_interfaces_and_ensure_network(
+                self.admin_ctx, router_db)
+            self.assertTrue(_create_ha_interfaces.called)
+
+    def test_create_ha_interfaces_and_ensure_network_concurrent_delete(self):
+        orig_create = self.plugin._create_ha_interfaces
+
+        def _create_ha_interfaces(ctx, rdb, ha_net):
+            # concurrent delete on the first attempt
+            if not getattr(_create_ha_interfaces, 'called', False):
+                setattr(_create_ha_interfaces, 'called', True)
+                self.core_plugin.delete_network(self.admin_ctx,
+                                                ha_net['network_id'])
+            return orig_create(ctx, rdb, ha_net)
+        self._test_ensure_with_patched_int_create(_create_ha_interfaces)
+
+    def test_create_ha_interfaces_and_ensure_network_concurrent_swap(self):
+        orig_create = self.plugin._create_ha_interfaces
+
+        def _create_ha_interfaces(ctx, rdb, ha_net):
+            # concurrent delete and recreate on the first attempt
+            if not getattr(_create_ha_interfaces, 'called', False):
+                setattr(_create_ha_interfaces, 'called', True)
+                self.core_plugin.delete_network(self.admin_ctx,
+                                                ha_net['network_id'])
+                self.plugin._create_ha_network(self.admin_ctx,
+                                               rdb.tenant_id)
+            return orig_create(ctx, rdb, ha_net)
+
+        self._test_ensure_with_patched_int_create(_create_ha_interfaces)
+
+    def test_create_ha_network_tenant_binding_raises_duplicate(self):
+        router = self._create_router()
+        network = self.plugin.get_ha_network(self.admin_ctx,
+                                             router['tenant_id'])
+        self.plugin._create_ha_network_tenant_binding(
+            self.admin_ctx, 't1', network['network_id'])
+        with testtools.ExpectedException(db_exc.DBDuplicateEntry):
+            self.plugin._create_ha_network_tenant_binding(
+                self.admin_ctx, 't1', network['network_id'])
+
     def test_create_ha_interfaces_binding_failure_rolls_back_ports(self):
         router = self._create_router()
         network = self.plugin.get_ha_network(self.admin_ctx,