Retry trunk status updates failing with StaleDataError
This is an approximate partial fix to #1828375. update_trunk_status and update_subport_bindings rpc messages are processed concurrently and possibly out of order on the server side. Therefore they may race with each other. The status update race combined with 1) the versioning feature of sqlalchemy used in the standardattributes table and 2) the less than serializable isolation level of some DB backends (like MySQL InnoDB) does raise StaleDataErrors and by that leaves some trunk subports in DOWN status. This change retries the trunk status update (to BUILD) blindly when StaleDataError was caught. In my local testbed this practically fixes #1828375. However theoretically the retry may cover up other real errors (when the cause of the StaleDataError was a different status not just a different revision count). To the best of my understanding a proper fix would entail guaranteeing the in order processing of the above rpc messages - which likely won't ever happen. I'm not sure at all if this change is worth merging - let me know what you think. Conflicts: neutron/services/trunk/rpc/server.py neutron/tests/unit/services/trunk/rpc/test_server.py Change-Id: Ie581809f24f9547b55a87423dac7db933862d66a Partial-Bug: #1828375 (cherry picked from commit618e24e241
) (cherry picked from commitd090fb9a3c
)
This commit is contained in:
parent
41db09c434
commit
80389bb4ee
|
@ -19,6 +19,7 @@ from neutron_lib.plugins import directory
|
||||||
from oslo_log import helpers as log_helpers
|
from oslo_log import helpers as log_helpers
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
|
from sqlalchemy.orm import exc
|
||||||
|
|
||||||
from neutron.api.rpc.callbacks import events
|
from neutron.api.rpc.callbacks import events
|
||||||
from neutron.api.rpc.callbacks.producer import registry
|
from neutron.api.rpc.callbacks.producer import registry
|
||||||
|
@ -115,11 +116,23 @@ class TrunkSkeleton(object):
|
||||||
trunk_port = self.core_plugin.get_port(context, trunk_port_id)
|
trunk_port = self.core_plugin.get_port(context, trunk_port_id)
|
||||||
trunk_host = trunk_port.get(portbindings.HOST_ID)
|
trunk_host = trunk_port.get(portbindings.HOST_ID)
|
||||||
|
|
||||||
# NOTE(status_police) Set the trunk in BUILD state before processing
|
tries = 3
|
||||||
# subport bindings. The trunk will stay in BUILD state until an
|
for try_cnt in range(tries):
|
||||||
# attempt has been made to bind all subports passed here and the
|
try:
|
||||||
# agent acknowledges the operation was successful.
|
# NOTE(status_police) Set the trunk in BUILD state before
|
||||||
trunk.update(status=trunk_consts.BUILD_STATUS)
|
# processing subport bindings. The trunk will stay in BUILD
|
||||||
|
# state until an attempt has been made to bind all subports
|
||||||
|
# passed here and the agent acknowledges the operation was
|
||||||
|
# successful.
|
||||||
|
trunk.update(status=trunk_consts.BUILD_STATUS)
|
||||||
|
break
|
||||||
|
except exc.StaleDataError as e:
|
||||||
|
if try_cnt < tries - 1:
|
||||||
|
LOG.debug("Got StaleDataError exception: %s", e)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
# re-raise when all tries failed
|
||||||
|
raise
|
||||||
|
|
||||||
for port_id in port_ids:
|
for port_id in port_ids:
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -16,6 +16,7 @@ from neutron_lib.api.definitions import portbindings
|
||||||
from neutron_lib.plugins import directory
|
from neutron_lib.plugins import directory
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
|
from sqlalchemy.orm import exc
|
||||||
|
|
||||||
from neutron.api.rpc.callbacks import events
|
from neutron.api.rpc.callbacks import events
|
||||||
from neutron.api.rpc.callbacks import resources
|
from neutron.api.rpc.callbacks import resources
|
||||||
|
@ -149,6 +150,82 @@ class TrunkSkeletonTest(test_plugin.Ml2PluginV2TestCase):
|
||||||
self.assertEqual(trunk.status, constants.ERROR_STATUS)
|
self.assertEqual(trunk.status, constants.ERROR_STATUS)
|
||||||
self.assertEqual([], updated_subports[trunk.id])
|
self.assertEqual([], updated_subports[trunk.id])
|
||||||
|
|
||||||
|
def test_udate_subport_bindings_staledataerror(self):
|
||||||
|
with self.port() as _parent_port:
|
||||||
|
parent_port = _parent_port
|
||||||
|
trunk = self._create_test_trunk(parent_port)
|
||||||
|
port_data = {portbindings.HOST_ID: 'trunk_host_id'}
|
||||||
|
self.core_plugin.update_port(
|
||||||
|
self.context, parent_port['port']['id'], {'port': port_data})
|
||||||
|
subports = []
|
||||||
|
for vid in range(0, 3):
|
||||||
|
with self.port() as new_port:
|
||||||
|
new_port[portbindings.HOST_ID] = 'trunk_host_id'
|
||||||
|
obj = trunk_obj.SubPort(
|
||||||
|
context=self.context,
|
||||||
|
trunk_id=trunk['id'],
|
||||||
|
port_id=new_port['port']['id'],
|
||||||
|
segmentation_type='vlan',
|
||||||
|
segmentation_id=vid)
|
||||||
|
subports.append(obj)
|
||||||
|
|
||||||
|
test_obj = server.TrunkSkeleton()
|
||||||
|
test_obj._trunk_plugin = self.trunk_plugin
|
||||||
|
test_obj._core_plugin = self.core_plugin
|
||||||
|
self.mock_update_port.return_value = {portbindings.VIF_TYPE:
|
||||||
|
portbindings.VIF_TYPE_BINDING_FAILED}
|
||||||
|
mock_trunk_obj = mock.Mock(port_id=parent_port['port']['id'])
|
||||||
|
mock_trunk_obj.update.side_effect = exc.StaleDataError
|
||||||
|
|
||||||
|
with mock.patch.object(
|
||||||
|
trunk_obj.Trunk,
|
||||||
|
'get_object',
|
||||||
|
return_value=mock_trunk_obj):
|
||||||
|
self.assertRaises(
|
||||||
|
exc.StaleDataError,
|
||||||
|
test_obj.update_subport_bindings,
|
||||||
|
self.context,
|
||||||
|
subports=subports)
|
||||||
|
self.assertEqual(3, mock_trunk_obj.update.call_count)
|
||||||
|
|
||||||
|
def test_udate_subport_bindings_noretryerror(self):
|
||||||
|
with self.port() as _parent_port:
|
||||||
|
parent_port = _parent_port
|
||||||
|
trunk = self._create_test_trunk(parent_port)
|
||||||
|
port_data = {portbindings.HOST_ID: 'trunk_host_id'}
|
||||||
|
self.core_plugin.update_port(
|
||||||
|
self.context, parent_port['port']['id'], {'port': port_data})
|
||||||
|
subports = []
|
||||||
|
for vid in range(0, 3):
|
||||||
|
with self.port() as new_port:
|
||||||
|
new_port[portbindings.HOST_ID] = 'trunk_host_id'
|
||||||
|
obj = trunk_obj.SubPort(
|
||||||
|
context=self.context,
|
||||||
|
trunk_id=trunk['id'],
|
||||||
|
port_id=new_port['port']['id'],
|
||||||
|
segmentation_type='vlan',
|
||||||
|
segmentation_id=vid)
|
||||||
|
subports.append(obj)
|
||||||
|
|
||||||
|
test_obj = server.TrunkSkeleton()
|
||||||
|
test_obj._trunk_plugin = self.trunk_plugin
|
||||||
|
test_obj._core_plugin = self.core_plugin
|
||||||
|
self.mock_update_port.return_value = {portbindings.VIF_TYPE:
|
||||||
|
portbindings.VIF_TYPE_BINDING_FAILED}
|
||||||
|
mock_trunk_obj = mock.Mock(port_id=parent_port['port']['id'])
|
||||||
|
mock_trunk_obj.update.side_effect = KeyError
|
||||||
|
|
||||||
|
with mock.patch.object(
|
||||||
|
trunk_obj.Trunk,
|
||||||
|
'get_object',
|
||||||
|
return_value=mock_trunk_obj):
|
||||||
|
self.assertRaises(
|
||||||
|
KeyError,
|
||||||
|
test_obj.update_subport_bindings,
|
||||||
|
self.context,
|
||||||
|
subports=subports)
|
||||||
|
self.assertEqual(1, mock_trunk_obj.update.call_count)
|
||||||
|
|
||||||
def test_update_subport_bindings_exception(self):
|
def test_update_subport_bindings_exception(self):
|
||||||
with self.port() as _parent_port:
|
with self.port() as _parent_port:
|
||||||
parent_port = _parent_port
|
parent_port = _parent_port
|
||||||
|
|
Loading…
Reference in New Issue