Browse Source

Merge "[OVN] Wait for WaitForDataPathBindingCreateEvent event in functional tests" into stable/ussuri

changes/15/741915/1
Zuul 2 years ago committed by Gerrit Code Review
parent
commit
f3418f852a
  1. 17
      neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py

17
neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py

@ -38,6 +38,17 @@ class WaitForMACBindingDeleteEvent(event.WaitEvent):
events, table, conditions, timeout=15)
class WaitForDataPathBindingCreateEvent(event.WaitEvent):
event_name = 'WaitForDataPathBindingCreateEvent'
def __init__(self, net_name):
table = 'Datapath_Binding'
events = (self.ROW_CREATE,)
conditions = (('external_ids', '=', {'name2': net_name}),)
super(WaitForDataPathBindingCreateEvent, self).__init__(
events, table, conditions, timeout=15)
class DistributedLockTestEvent(event.WaitEvent):
ONETIME = False
COUNTER = 0
@ -124,10 +135,12 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
* Delete the FIP.
* Check that the MAC_Binding entry gets deleted.
"""
self._make_network(self.fmt, 'network1', True)
net_name = 'network1'
self._make_network(self.fmt, net_name, True)
row_event = WaitForDataPathBindingCreateEvent(net_name)
dp = self.sb_api.db_find(
'Datapath_Binding',
('external_ids', '=', {'name2': 'network1'})).execute()
('external_ids', '=', {'name2': net_name})).execute()
macb_id = self.sb_api.db_create('MAC_Binding', datapath=dp[0]['_uuid'],
ip='100.0.0.21').execute()
port = self.create_port()

Loading…
Cancel
Save