Browse Source

Merge "Handle events in separate IDL instance"

tags/7.0.0.0b1
Zuul 1 week ago
parent
commit
cb1d0d91d8

+ 33
- 18
networking_ovn/octavia/ovn_driver.py View File

@@ -29,7 +29,6 @@ from ovs.stream import Stream
29 29
 from ovsdbapp.backend.ovs_idl import connection
30 30
 from ovsdbapp.backend.ovs_idl import event as row_event
31 31
 from ovsdbapp.backend.ovs_idl import idlutils
32
-from ovsdbapp.schema.ovn_northbound import impl_idl as idl_ovn
33 32
 from six.moves import queue as Queue
34 33
 from stevedore import driver
35 34
 import tenacity
@@ -38,6 +37,7 @@ from networking_ovn._i18n import _
38 37
 from networking_ovn.common import config as ovn_cfg
39 38
 from networking_ovn.common import constants as ovn_const
40 39
 from networking_ovn.common import utils as ovn_utils
40
+from networking_ovn.ovsdb import impl_idl_ovn
41 41
 from networking_ovn.ovsdb import ovsdb_monitor
42 42
 
43 43
 CONF = cfg.CONF  # Gets Octavia Conf as it runs under o-api domain
@@ -81,6 +81,7 @@ OVN_NATIVE_LB_PROTOCOLS = [constants.PROTOCOL_TCP,
81 81
                            constants.PROTOCOL_UDP, ]
82 82
 OVN_NATIVE_LB_ALGORITHMS = [constants.LB_ALGORITHM_ROUND_ROBIN, ]
83 83
 EXCEPTION_MSG = "Exception occurred during %s"
84
+OVN_EVENT_LOCK_NAME = "neutron_ovn_octavia_event_lock"
84 85
 
85 86
 
86 87
 def get_network_driver():
@@ -140,20 +141,22 @@ class LogicalSwitchPortUpdateEvent(row_event.RowEvent):
140 141
 
141 142
 
142 143
 class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
143
-
144 144
     SCHEMA = "OVN_Northbound"
145
+    TABLES = ('Logical_Switch', 'Load_Balancer', 'Logical_Router',
146
+              'Logical_Switch_Port', 'Logical_Router_Port',
147
+              'Gateway_Chassis')
145 148
 
146
-    def __init__(self):
147
-        self.tables = ('Logical_Switch', 'Load_Balancer', 'Logical_Router',
148
-                       'Logical_Switch_Port', 'Logical_Router_Port',
149
-                       'Gateway_Chassis')
149
+    def __init__(self, event_lock_name=None):
150 150
         self.conn_string = ovn_cfg.get_ovn_nb_connection()
151 151
         helper = self._get_ovsdb_helper(self.conn_string)
152
-        for table in self.tables:
152
+        for table in OvnNbIdlForLb.TABLES:
153 153
             helper.register_table(table)
154 154
         super(OvnNbIdlForLb, self).__init__(
155 155
             driver=None, remote=self.conn_string, schema=helper)
156
-        self.event_lock_name = "neutron_ovn_octavia_event_lock"
156
+        self.event_lock_name = event_lock_name
157
+        if self.event_lock_name:
158
+            self.set_lock(self.event_lock_name)
159
+        atexit.register(self.stop)
157 160
 
158 161
     @tenacity.retry(
159 162
         wait=tenacity.wait_exponential(max=180),
@@ -164,11 +167,12 @@ class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
164 167
     def start(self):
165 168
         self.conn = connection.Connection(
166 169
             self, timeout=ovn_cfg.get_ovn_ovsdb_timeout())
167
-        return idl_ovn.OvnNbApiIdlImpl(self.conn)
170
+        return impl_idl_ovn.OvsdbNbOvnIdl(self.conn)
168 171
 
169 172
     def stop(self):
170
-        # Close the running connection
171
-        if not self.conn.stop(timeout=ovn_cfg.get_ovn_ovsdb_timeout()):
173
+        # Close the running connection if it has been initalized
174
+        if ((hasattr(self, 'conn') and not
175
+             self.conn.stop(timeout=ovn_cfg.get_ovn_ovsdb_timeout()))):
172 176
             LOG.debug("Connection terminated to OvnNb "
173 177
                       "but a thread is still alive")
174 178
         # complete the shutdown for the event handler
@@ -179,6 +183,8 @@ class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
179 183
 
180 184
 class OvnProviderHelper(object):
181 185
 
186
+    ovn_nbdb_api_for_events = None
187
+    ovn_nb_idl_for_events = None
182 188
     ovn_nbdb_api = None
183 189
 
184 190
     def __init__(self):
@@ -189,6 +195,8 @@ class OvnProviderHelper(object):
189 195
         self._octavia_driver_lib = o_driver_lib.DriverLibrary()
190 196
         self._check_and_set_ssl_files()
191 197
         self._init_lb_actions()
198
+        self.events = [LogicalRouterPortEvent(self),
199
+                       LogicalSwitchPortUpdateEvent(self)]
192 200
         self.start()
193 201
 
194 202
     def _init_lb_actions(self):
@@ -229,18 +237,24 @@ class OvnProviderHelper(object):
229 237
             Stream.ssl_set_ca_cert_file(ca_cert_file)
230 238
 
231 239
     def start(self):
232
-        self.ovn_nb_idl_for_lb = OvnNbIdlForLb()
240
+        # NOTE(mjozefcz): This API is only for handling octavia API requests.
233 241
         if not OvnProviderHelper.ovn_nbdb_api:
234
-            OvnProviderHelper.ovn_nbdb_api = self.ovn_nb_idl_for_lb.start()
235
-        self.events = [LogicalRouterPortEvent(self),
236
-                       LogicalSwitchPortUpdateEvent(self)]
237
-        self.ovn_nb_idl_for_lb.notify_handler.watch_events(self.events)
242
+            OvnProviderHelper.ovn_nbdb_api = OvnNbIdlForLb().start()
243
+
244
+        # NOTE(mjozefcz): This API is only for handling OVSDB events!
245
+        if not OvnProviderHelper.ovn_nbdb_api_for_events:
246
+            OvnProviderHelper.ovn_nb_idl_for_events = OvnNbIdlForLb(
247
+                event_lock_name=OVN_EVENT_LOCK_NAME)
248
+            (OvnProviderHelper.ovn_nb_idl_for_events.notify_handler.
249
+             watch_events(self.events))
250
+            OvnProviderHelper.ovn_nbdb_api_for_events = (
251
+                OvnProviderHelper.ovn_nb_idl_for_events.start())
238 252
         self.helper_thread.start()
239 253
 
240 254
     def shutdown(self):
241 255
         self.requests.put({'type': REQ_TYPE_EXIT})
242 256
         self.helper_thread.join()
243
-        self.ovn_nb_idl_for_lb.notify_handler.unwatch_events(self.events)
257
+        self.ovn_nb_idl_for_events.notify_handler.unwatch_events(self.events)
244 258
 
245 259
     @staticmethod
246 260
     def _map_val(row, col, key):
@@ -436,7 +450,8 @@ class OvnProviderHelper(object):
436 450
         return self.ovn_nbdb_api.lookup('Load_Balancer', lb_id)
437 451
 
438 452
     def _find_ovn_lb_with_pool_key(self, pool_key):
439
-        lbs = self.ovn_nbdb_api.db_list_rows('Load_Balancer').execute()
453
+        lbs = self.ovn_nbdb_api.db_list_rows(
454
+            'Load_Balancer').execute(check_error=True)
440 455
         for lb in lbs:
441 456
             if pool_key in lb.external_ids:
442 457
                 return lb

+ 1
- 0
networking_ovn/tests/functional/octavia/test_ovn_driver.py View File

@@ -41,6 +41,7 @@ class TestOctaviaOvnProviderDriver(base.TestOVNFunctionalBase):
41 41
         # use the old object.
42 42
         idl_ovn.OvnNbApiIdlImpl.ovsdb_connection = None
43 43
         ovn_driver.OvnProviderHelper.ovn_nbdb_api = None
44
+        ovn_driver.OvnProviderHelper.ovn_nbdb_api_for_events = None
44 45
         # TODO(mjozefcz): Use octavia listeners to provide needed
45 46
         # sockets and modify tests in order to verify if fake
46 47
         # listener (status) has received valid value.

+ 66
- 2
networking_ovn/tests/unit/octavia/test_ovn_driver.py View File

@@ -11,6 +11,8 @@
11 11
 #    License for the specific language governing permissions and limitations
12 12
 #    under the License.
13 13
 #
14
+import os
15
+
14 16
 import mock
15 17
 from neutron.tests import base
16 18
 from octavia_lib.api.drivers import data_models
@@ -18,12 +20,18 @@ from octavia_lib.api.drivers import driver_lib
18 20
 from octavia_lib.api.drivers import exceptions
19 21
 from octavia_lib.common import constants
20 22
 from oslo_utils import uuidutils
23
+from ovs.db import idl as ovs_idl
21 24
 from ovsdbapp.backend.ovs_idl import idlutils
22 25
 
23 26
 from networking_ovn.common import constants as ovn_const
24 27
 from networking_ovn.octavia import ovn_driver
25 28
 from networking_ovn.tests.unit import fakes
26 29
 
30
+basedir = os.path.dirname(os.path.abspath(__file__))
31
+schema_files = {
32
+    'OVN_Northbound': os.path.join(basedir.replace('octavia', 'ovsdb'),
33
+                                   'schemas', 'ovn-nb.ovsschema')}
34
+
27 35
 
28 36
 # TODO(mjozefcz): Move it to unittest fakes.
29 37
 class MockedLB(data_models.LoadBalancer):
@@ -37,6 +45,50 @@ class MockedLB(data_models.LoadBalancer):
37 45
         return self.__sizeof__()
38 46
 
39 47
 
48
+class TestOvnNbIdlForLb(base.BaseTestCase):
49
+    def setUp(self):
50
+        super(TestOvnNbIdlForLb, self).setUp()
51
+        self.mock_gsh = mock.patch.object(
52
+            idlutils, 'get_schema_helper',
53
+            side_effect=lambda x, y: ovs_idl.SchemaHelper(
54
+                location=schema_files['OVN_Northbound'])).start()
55
+        self.idl = ovn_driver.OvnNbIdlForLb()
56
+
57
+    def test__get_ovsdb_helper(self):
58
+        self.mock_gsh.reset_mock()
59
+        self.idl._get_ovsdb_helper('foo')
60
+        self.mock_gsh.assert_called_once_with('foo', 'OVN_Northbound')
61
+
62
+    def test_start(self):
63
+        with mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection',
64
+                        side_effect=lambda x, timeout: mock.Mock()):
65
+            idl1 = ovn_driver.OvnNbIdlForLb()
66
+            ret1 = idl1.start()
67
+            id1 = id(ret1.ovsdb_connection)
68
+            idl2 = ovn_driver.OvnNbIdlForLb()
69
+            ret2 = idl2.start()
70
+            id2 = id(ret2.ovsdb_connection)
71
+            self.assertNotEqual(id1, id2)
72
+
73
+    @mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection')
74
+    def test_stop(self, mock_conn):
75
+        mock_conn.stop.return_value = False
76
+        with (
77
+            mock.patch.object(
78
+                self.idl.notify_handler, 'shutdown')) as mock_notify, (
79
+                mock.patch.object(self.idl, 'close')) as mock_close:
80
+            self.idl.start()
81
+            self.idl.stop()
82
+        mock_notify.assert_called_once_with()
83
+        mock_close.assert_called_once_with()
84
+
85
+    def test_setlock(self):
86
+        with mock.patch.object(ovn_driver.OvnNbIdlForLb,
87
+                               'set_lock') as set_lock:
88
+            self.idl = ovn_driver.OvnNbIdlForLb(event_lock_name='foo')
89
+        set_lock.assert_called_once_with('foo')
90
+
91
+
40 92
 class TestOvnOctaviaBase(base.BaseTestCase):
41 93
 
42 94
     def setUp(self):
@@ -1287,12 +1339,24 @@ class TestOvnProviderHelper(TestOvnOctaviaBase):
1287 1339
                           '172.26.21.20:80': '192.168.2.149:1010'}))]
1288 1340
         self.helper.ovn_nbdb_api.assert_has_calls(calls)
1289 1341
 
1290
-    def test_single_ovsdb_connection(self):
1342
+    @mock.patch.object(ovn_driver, 'atexit')
1343
+    def test_ovsdb_connections(self, mock_atexit):
1344
+        ovn_driver.OvnProviderHelper.ovn_nbdb_api = None
1345
+        ovn_driver.OvnProviderHelper.ovn_nbdb_api_for_events = None
1291 1346
         prov_helper1 = ovn_driver.OvnProviderHelper()
1292 1347
         prov_helper2 = ovn_driver.OvnProviderHelper()
1293
-        self.assertIs(prov_helper1.ovn_nbdb_api, prov_helper2.ovn_nbdb_api)
1348
+        # One connection for API requests
1349
+        self.assertIs(prov_helper1.ovn_nbdb_api,
1350
+                      prov_helper2.ovn_nbdb_api)
1351
+        # One connection to handle events
1352
+        self.assertIs(prov_helper1.ovn_nbdb_api_for_events,
1353
+                      prov_helper2.ovn_nbdb_api_for_events)
1294 1354
         prov_helper2.shutdown()
1295 1355
         prov_helper1.shutdown()
1356
+        # Assert at_exit calls
1357
+        mock_atexit.assert_has_calls([
1358
+            mock.call.register(prov_helper1.shutdown),
1359
+            mock.call.register(prov_helper2.shutdown)])
1296 1360
 
1297 1361
     def test_create_vip_port_vip_selected(self):
1298 1362
         expected_dict = {

Loading…
Cancel
Save