From 80389bb4ee8a9cd52401187b278e339c69d9f295 Mon Sep 17 00:00:00 2001 From: Bence Romsics Date: Wed, 22 May 2019 16:42:34 +0200 Subject: [PATCH] Retry trunk status updates failing with StaleDataError This is an approximate partial fix to #1828375. update_trunk_status and update_subport_bindings rpc messages are processed concurrently and possibly out of order on the server side. Therefore they may race with each other. The status update race combined with 1) the versioning feature of sqlalchemy used in the standardattributes table and 2) the less than serializable isolation level of some DB backends (like MySQL InnoDB) does raise StaleDataErrors and by that leaves some trunk subports in DOWN status. This change retries the trunk status update (to BUILD) blindly when StaleDataError was caught. In my local testbed this practically fixes #1828375. However theoretically the retry may cover up other real errors (when the cause of the StaleDataError was a different status not just a different revision count). To the best of my understanding a proper fix would entail guaranteeing the in order processing of the above rpc messages - which likely won't ever happen. I'm not sure at all if this change is worth merging - let me know what you think. Conflicts: neutron/services/trunk/rpc/server.py neutron/tests/unit/services/trunk/rpc/test_server.py Change-Id: Ie581809f24f9547b55a87423dac7db933862d66a Partial-Bug: #1828375 (cherry picked from commit 618e24e241241c7323702a815542f11e91fd32a1) (cherry picked from commit d090fb9a3cbad6c4c3f5c524907a94cb5f95b205) --- neutron/services/trunk/rpc/server.py | 23 ++++-- .../unit/services/trunk/rpc/test_server.py | 77 +++++++++++++++++++ 2 files changed, 95 insertions(+), 5 deletions(-) diff --git a/neutron/services/trunk/rpc/server.py b/neutron/services/trunk/rpc/server.py index 7004632257e..0652b99081a 100644 --- a/neutron/services/trunk/rpc/server.py +++ b/neutron/services/trunk/rpc/server.py @@ -19,6 +19,7 @@ from neutron_lib.plugins import directory from oslo_log import helpers as log_helpers from oslo_log import log as logging import oslo_messaging +from sqlalchemy.orm import exc from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks.producer import registry @@ -115,11 +116,23 @@ class TrunkSkeleton(object): trunk_port = self.core_plugin.get_port(context, trunk_port_id) trunk_host = trunk_port.get(portbindings.HOST_ID) - # NOTE(status_police) Set the trunk in BUILD state before processing - # subport bindings. The trunk will stay in BUILD state until an - # attempt has been made to bind all subports passed here and the - # agent acknowledges the operation was successful. - trunk.update(status=trunk_consts.BUILD_STATUS) + tries = 3 + for try_cnt in range(tries): + try: + # NOTE(status_police) Set the trunk in BUILD state before + # processing subport bindings. The trunk will stay in BUILD + # state until an attempt has been made to bind all subports + # passed here and the agent acknowledges the operation was + # successful. + trunk.update(status=trunk_consts.BUILD_STATUS) + break + except exc.StaleDataError as e: + if try_cnt < tries - 1: + LOG.debug("Got StaleDataError exception: %s", e) + continue + else: + # re-raise when all tries failed + raise for port_id in port_ids: try: diff --git a/neutron/tests/unit/services/trunk/rpc/test_server.py b/neutron/tests/unit/services/trunk/rpc/test_server.py index e94a04ecf55..cf3ae161d78 100644 --- a/neutron/tests/unit/services/trunk/rpc/test_server.py +++ b/neutron/tests/unit/services/trunk/rpc/test_server.py @@ -16,6 +16,7 @@ from neutron_lib.api.definitions import portbindings from neutron_lib.plugins import directory from oslo_config import cfg import oslo_messaging +from sqlalchemy.orm import exc from neutron.api.rpc.callbacks import events from neutron.api.rpc.callbacks import resources @@ -149,6 +150,82 @@ class TrunkSkeletonTest(test_plugin.Ml2PluginV2TestCase): self.assertEqual(trunk.status, constants.ERROR_STATUS) self.assertEqual([], updated_subports[trunk.id]) + def test_udate_subport_bindings_staledataerror(self): + with self.port() as _parent_port: + parent_port = _parent_port + trunk = self._create_test_trunk(parent_port) + port_data = {portbindings.HOST_ID: 'trunk_host_id'} + self.core_plugin.update_port( + self.context, parent_port['port']['id'], {'port': port_data}) + subports = [] + for vid in range(0, 3): + with self.port() as new_port: + new_port[portbindings.HOST_ID] = 'trunk_host_id' + obj = trunk_obj.SubPort( + context=self.context, + trunk_id=trunk['id'], + port_id=new_port['port']['id'], + segmentation_type='vlan', + segmentation_id=vid) + subports.append(obj) + + test_obj = server.TrunkSkeleton() + test_obj._trunk_plugin = self.trunk_plugin + test_obj._core_plugin = self.core_plugin + self.mock_update_port.return_value = {portbindings.VIF_TYPE: + portbindings.VIF_TYPE_BINDING_FAILED} + mock_trunk_obj = mock.Mock(port_id=parent_port['port']['id']) + mock_trunk_obj.update.side_effect = exc.StaleDataError + + with mock.patch.object( + trunk_obj.Trunk, + 'get_object', + return_value=mock_trunk_obj): + self.assertRaises( + exc.StaleDataError, + test_obj.update_subport_bindings, + self.context, + subports=subports) + self.assertEqual(3, mock_trunk_obj.update.call_count) + + def test_udate_subport_bindings_noretryerror(self): + with self.port() as _parent_port: + parent_port = _parent_port + trunk = self._create_test_trunk(parent_port) + port_data = {portbindings.HOST_ID: 'trunk_host_id'} + self.core_plugin.update_port( + self.context, parent_port['port']['id'], {'port': port_data}) + subports = [] + for vid in range(0, 3): + with self.port() as new_port: + new_port[portbindings.HOST_ID] = 'trunk_host_id' + obj = trunk_obj.SubPort( + context=self.context, + trunk_id=trunk['id'], + port_id=new_port['port']['id'], + segmentation_type='vlan', + segmentation_id=vid) + subports.append(obj) + + test_obj = server.TrunkSkeleton() + test_obj._trunk_plugin = self.trunk_plugin + test_obj._core_plugin = self.core_plugin + self.mock_update_port.return_value = {portbindings.VIF_TYPE: + portbindings.VIF_TYPE_BINDING_FAILED} + mock_trunk_obj = mock.Mock(port_id=parent_port['port']['id']) + mock_trunk_obj.update.side_effect = KeyError + + with mock.patch.object( + trunk_obj.Trunk, + 'get_object', + return_value=mock_trunk_obj): + self.assertRaises( + KeyError, + test_obj.update_subport_bindings, + self.context, + subports=subports) + self.assertEqual(1, mock_trunk_obj.update.call_count) + def test_update_subport_bindings_exception(self): with self.port() as _parent_port: parent_port = _parent_port