Add Chassis creation wait event in "TestAgentMonitor"

Added a wait event in "TestAgentMonitor" setUp method, in order to wait
for the Chassis register creation event. This patch also adds an active
wait to check the number of "AgentCache" agents present in the local
cache, before starting the tests.

Closes-Bug: #1952508
Change-Id: I75c759ce94e027790b18411617e08ae5bb46bcef
(cherry picked from commit 1af63e8812)
This commit is contained in:
Rodolfo Alonso Hernandez 2021-11-27 11:26:13 +00:00 committed by yatin
parent 2f9f70bb22
commit 6a9e52a80c
1 changed files with 20 additions and 10 deletions

View File

@ -45,6 +45,15 @@ class WaitForDataPathBindingCreateEvent(event.WaitEvent):
events, table, conditions, timeout=15)
class WaitForChassisPrivateCreateEvent(event.WaitEvent):
event_name = 'WaitForChassisPrivateCreateEvent'
def __init__(self, chassis_name, chassis_table):
events = (self.ROW_CREATE,)
conditions = (('name', '=', chassis_name),)
super().__init__(events, chassis_table, conditions, timeout=15)
class DistributedLockTestEvent(event.WaitEvent):
ONETIME = False
COUNTER = 0
@ -303,16 +312,17 @@ class TestAgentMonitor(base.TestOVNFunctionalBase):
self.mock_ovsdb_idl = mock.Mock()
self.handler = self.sb_api.idl.notify_handler
self.mock_ovsdb_idl = mock.Mock()
self.chassis_name = self.add_fake_chassis(self.FAKE_CHASSIS_HOST,
external_ids={'ovn-cms-options': 'enable-chassis-as-gw'})
def test_network_agent_present(self):
chassis_row = self.sb_api.db_find(
'Chassis', ('name', '=', self.chassis_name)).execute(
check_error=True)
self.assertTrue(chassis_row)
self.assertEqual(neutron_agent.ControllerGatewayAgent,
type(neutron_agent.AgentCache()[self.chassis_name]))
chassis_name = uuidutils.generate_uuid()
row_event = WaitForChassisPrivateCreateEvent(
chassis_name, self.mech_driver.agent_chassis_table)
self.mech_driver.sb_ovn.idl.notify_handler.watch_event(row_event)
self.chassis_name = self.add_fake_chassis(
self.FAKE_CHASSIS_HOST,
external_ids={'ovn-cms-options': 'enable-chassis-as-gw'},
name=chassis_name)
self.assertTrue(row_event.wait())
n_utils.wait_until_true(
lambda: len(list(neutron_agent.AgentCache())) == 1)
def test_agent_change_controller(self):
self.assertEqual(neutron_agent.ControllerGatewayAgent,