Browse Source

Retry trunk status updates failing with StaleDataError

This is an approximate partial fix to #1828375.

update_trunk_status and update_subport_bindings rpc messages are
processed concurrently and possibly out of order on the server side.
Therefore they may race with each other.

The status update race combined with
1) the versioning feature of sqlalchemy used in the standardattributes
   table and
2) the less than serializable isolation level of some DB backends (like
   MySQL InnoDB)
does raise StaleDataErrors and by that leaves some trunk subports in
DOWN status.

This change retries the trunk status update (to BUILD) blindly when
StaleDataError was caught. In my local testbed this practically
fixes #1828375.

However theoretically the retry may cover up other real errors (when the
cause of the StaleDataError was a different status not just a different
revision count).

To the best of my understanding a proper fix would entail guaranteeing
the in order processing of the above rpc messages - which likely won't
ever happen.

I'm not sure at all if this change is worth merging - let me know what
you think.

Conflicts:
    neutron/services/trunk/rpc/server.py
    neutron/tests/unit/services/trunk/rpc/test_server.py

Change-Id: Ie581809f24f9547b55a87423dac7db933862d66a
Partial-Bug: #1828375
(cherry picked from commit 618e24e241)
(cherry picked from commit d090fb9a3c)
changes/45/673745/1
Bence Romsics 4 months ago
parent
commit
10191fd817

+ 18
- 5
neutron/services/trunk/rpc/server.py View File

@@ -19,6 +19,7 @@ from neutron_lib.plugins import directory
19 19
 from oslo_log import helpers as log_helpers
20 20
 from oslo_log import log as logging
21 21
 import oslo_messaging
22
+from sqlalchemy.orm import exc
22 23
 
23 24
 from neutron.api.rpc.callbacks import events
24 25
 from neutron.api.rpc.callbacks.producer import registry
@@ -115,11 +116,23 @@ class TrunkSkeleton(object):
115 116
         trunk_port = self.core_plugin.get_port(context, trunk_port_id)
116 117
         trunk_host = trunk_port.get(portbindings.HOST_ID)
117 118
 
118
-        # NOTE(status_police) Set the trunk in BUILD state before processing
119
-        # subport bindings. The trunk will stay in BUILD state until an
120
-        # attempt has been made to bind all subports passed here and the
121
-        # agent acknowledges the operation was successful.
122
-        trunk.update(status=trunk_consts.BUILD_STATUS)
119
+        tries = 3
120
+        for try_cnt in range(tries):
121
+            try:
122
+                # NOTE(status_police) Set the trunk in BUILD state before
123
+                # processing subport bindings. The trunk will stay in BUILD
124
+                # state until an attempt has been made to bind all subports
125
+                # passed here and the agent acknowledges the operation was
126
+                # successful.
127
+                trunk.update(status=trunk_consts.BUILD_STATUS)
128
+                break
129
+            except exc.StaleDataError as e:
130
+                if try_cnt < tries - 1:
131
+                    LOG.debug("Got StaleDataError exception: %s", e)
132
+                    continue
133
+                else:
134
+                    # re-raise when all tries failed
135
+                    raise
123 136
 
124 137
         for port_id in port_ids:
125 138
             try:

+ 77
- 0
neutron/tests/unit/services/trunk/rpc/test_server.py View File

@@ -16,6 +16,7 @@ from neutron_lib.api.definitions import portbindings
16 16
 from neutron_lib.plugins import directory
17 17
 from oslo_config import cfg
18 18
 import oslo_messaging
19
+from sqlalchemy.orm import exc
19 20
 
20 21
 from neutron.api.rpc.callbacks import events
21 22
 from neutron.api.rpc.callbacks import resources
@@ -149,6 +150,82 @@ class TrunkSkeletonTest(test_plugin.Ml2PluginV2TestCase):
149 150
         self.assertEqual(trunk.status, constants.ERROR_STATUS)
150 151
         self.assertEqual([], updated_subports[trunk.id])
151 152
 
153
+    def test_udate_subport_bindings_staledataerror(self):
154
+        with self.port() as _parent_port:
155
+            parent_port = _parent_port
156
+        trunk = self._create_test_trunk(parent_port)
157
+        port_data = {portbindings.HOST_ID: 'trunk_host_id'}
158
+        self.core_plugin.update_port(
159
+            self.context, parent_port['port']['id'], {'port': port_data})
160
+        subports = []
161
+        for vid in range(0, 3):
162
+            with self.port() as new_port:
163
+                new_port[portbindings.HOST_ID] = 'trunk_host_id'
164
+                obj = trunk_obj.SubPort(
165
+                    context=self.context,
166
+                    trunk_id=trunk['id'],
167
+                    port_id=new_port['port']['id'],
168
+                    segmentation_type='vlan',
169
+                    segmentation_id=vid)
170
+                subports.append(obj)
171
+
172
+        test_obj = server.TrunkSkeleton()
173
+        test_obj._trunk_plugin = self.trunk_plugin
174
+        test_obj._core_plugin = self.core_plugin
175
+        self.mock_update_port.return_value = {portbindings.VIF_TYPE:
176
+                                         portbindings.VIF_TYPE_BINDING_FAILED}
177
+        mock_trunk_obj = mock.Mock(port_id=parent_port['port']['id'])
178
+        mock_trunk_obj.update.side_effect = exc.StaleDataError
179
+
180
+        with mock.patch.object(
181
+                trunk_obj.Trunk,
182
+                'get_object',
183
+                return_value=mock_trunk_obj):
184
+            self.assertRaises(
185
+                exc.StaleDataError,
186
+                test_obj.update_subport_bindings,
187
+                self.context,
188
+                subports=subports)
189
+            self.assertEqual(3, mock_trunk_obj.update.call_count)
190
+
191
+    def test_udate_subport_bindings_noretryerror(self):
192
+        with self.port() as _parent_port:
193
+            parent_port = _parent_port
194
+        trunk = self._create_test_trunk(parent_port)
195
+        port_data = {portbindings.HOST_ID: 'trunk_host_id'}
196
+        self.core_plugin.update_port(
197
+            self.context, parent_port['port']['id'], {'port': port_data})
198
+        subports = []
199
+        for vid in range(0, 3):
200
+            with self.port() as new_port:
201
+                new_port[portbindings.HOST_ID] = 'trunk_host_id'
202
+                obj = trunk_obj.SubPort(
203
+                    context=self.context,
204
+                    trunk_id=trunk['id'],
205
+                    port_id=new_port['port']['id'],
206
+                    segmentation_type='vlan',
207
+                    segmentation_id=vid)
208
+                subports.append(obj)
209
+
210
+        test_obj = server.TrunkSkeleton()
211
+        test_obj._trunk_plugin = self.trunk_plugin
212
+        test_obj._core_plugin = self.core_plugin
213
+        self.mock_update_port.return_value = {portbindings.VIF_TYPE:
214
+                                         portbindings.VIF_TYPE_BINDING_FAILED}
215
+        mock_trunk_obj = mock.Mock(port_id=parent_port['port']['id'])
216
+        mock_trunk_obj.update.side_effect = KeyError
217
+
218
+        with mock.patch.object(
219
+                trunk_obj.Trunk,
220
+                'get_object',
221
+                return_value=mock_trunk_obj):
222
+            self.assertRaises(
223
+                KeyError,
224
+                test_obj.update_subport_bindings,
225
+                self.context,
226
+                subports=subports)
227
+            self.assertEqual(1, mock_trunk_obj.update.call_count)
228
+
152 229
     def test_update_subport_bindings_exception(self):
153 230
         with self.port() as _parent_port:
154 231
             parent_port = _parent_port

Loading…
Cancel
Save