Merge "Retry port create/update on duplicate db records" into stable/liberty
This commit is contained in:
commit
a0fed179c3
|
@ -19,6 +19,7 @@ from oslo_config import cfg
|
|||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import session
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import uuidutils
|
||||
from sqlalchemy import exc
|
||||
|
||||
|
@ -37,6 +38,17 @@ retry_db_errors = oslo_db_api.wrap_db_retry(
|
|||
)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def exc_to_retry(exceptions):
|
||||
try:
|
||||
yield
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception() as ctx:
|
||||
if isinstance(e, exceptions):
|
||||
ctx.reraise = False
|
||||
raise db_exc.RetryRequest(e)
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
|
||||
|
|
|
@ -1000,7 +1000,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
attrs['status'] = const.PORT_STATUS_DOWN
|
||||
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
|
||||
session.begin(subtransactions=True):
|
||||
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
|
||||
result = super(Ml2Plugin, self).create_port(context, port)
|
||||
self.extension_manager.process_create_port(context, attrs, result)
|
||||
|
@ -1135,7 +1136,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
session = context.session
|
||||
bound_mech_contexts = []
|
||||
|
||||
with session.begin(subtransactions=True):
|
||||
with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
|
||||
session.begin(subtransactions=True):
|
||||
port_db, binding = db.get_locked_port_and_binding(session, id)
|
||||
if not port_db:
|
||||
raise exc.PortNotFound(port_id=id)
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_db import exception as db_exc
|
||||
import testtools
|
||||
|
||||
from neutron.db import api as db_api
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class TestExceptionToRetryContextManager(base.BaseTestCase):
|
||||
|
||||
def test_translates_single_exception(self):
|
||||
with testtools.ExpectedException(db_exc.RetryRequest):
|
||||
with db_api.exc_to_retry(ValueError):
|
||||
raise ValueError()
|
||||
|
||||
def test_translates_multiple_exception_types(self):
|
||||
with testtools.ExpectedException(db_exc.RetryRequest):
|
||||
with db_api.exc_to_retry((ValueError, TypeError)):
|
||||
raise TypeError()
|
||||
|
||||
def test_passes_other_exceptions(self):
|
||||
with testtools.ExpectedException(ValueError):
|
||||
with db_api.exc_to_retry(TypeError):
|
||||
raise ValueError()
|
||||
|
||||
def test_inner_exception_preserved_in_retryrequest(self):
|
||||
try:
|
||||
exc = ValueError('test')
|
||||
with db_api.exc_to_retry(ValueError):
|
||||
raise exc
|
||||
except db_exc.RetryRequest as e:
|
||||
self.assertEqual(exc, e.inner_exc)
|
|
@ -749,6 +749,54 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||
self.assertRaises(
|
||||
exc.PortNotFound, plugin.get_port, ctx, port['port']['id'])
|
||||
|
||||
def test_port_create_resillient_to_duplicate_records(self):
|
||||
|
||||
def make_port():
|
||||
with self.port():
|
||||
pass
|
||||
|
||||
self._test_operation_resillient_to_ipallocation_failure(make_port)
|
||||
|
||||
def test_port_update_resillient_to_duplicate_records(self):
|
||||
with self.port() as p:
|
||||
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}}
|
||||
req = self.new_update_request('ports', data, p['port']['id'])
|
||||
|
||||
def do_request():
|
||||
self.assertEqual(200, req.get_response(self.api).status_int)
|
||||
|
||||
self._test_operation_resillient_to_ipallocation_failure(do_request)
|
||||
|
||||
def _test_operation_resillient_to_ipallocation_failure(self, func):
|
||||
from sqlalchemy import event
|
||||
|
||||
class IPAllocationsGrenade(object):
|
||||
insert_ip_called = False
|
||||
except_raised = False
|
||||
|
||||
def execute(self, con, curs, stmt, *args, **kwargs):
|
||||
if 'INSERT INTO ipallocations' in stmt:
|
||||
self.insert_ip_called = True
|
||||
|
||||
def commit(self, con):
|
||||
# we blow up on commit to simulate another thread/server
|
||||
# stealing our IP before our transaction was done
|
||||
if self.insert_ip_called and not self.except_raised:
|
||||
self.except_raised = True
|
||||
raise db_exc.DBDuplicateEntry()
|
||||
|
||||
listener = IPAllocationsGrenade()
|
||||
engine = db_api.get_engine()
|
||||
event.listen(engine, 'before_cursor_execute', listener.execute)
|
||||
event.listen(engine, 'commit', listener.commit)
|
||||
self.addCleanup(event.remove, engine, 'before_cursor_execute',
|
||||
listener.execute)
|
||||
self.addCleanup(event.remove, engine, 'commit',
|
||||
listener.commit)
|
||||
func()
|
||||
# make sure that the grenade went off during the commit
|
||||
self.assertTrue(listener.except_raised)
|
||||
|
||||
|
||||
class TestMl2PluginOnly(Ml2PluginV2TestCase):
|
||||
"""For testing methods that don't call drivers"""
|
||||
|
|
Loading…
Reference in New Issue