|
|
|
@ -16,11 +16,13 @@ import mock
|
|
|
|
|
import re |
|
|
|
|
|
|
|
|
|
import netaddr |
|
|
|
|
from neutron.agent.linux import utils as linux_utils |
|
|
|
|
from neutron.tests import base as tests_base |
|
|
|
|
from neutron_lib.api.definitions import portbindings |
|
|
|
|
from neutron_lib import constants |
|
|
|
|
from oslo_config import cfg |
|
|
|
|
from oslo_utils import uuidutils |
|
|
|
|
from ovsdbapp.backend.ovs_idl import event |
|
|
|
|
|
|
|
|
|
from networking_ovn.common import config |
|
|
|
|
from networking_ovn.common import constants as ovn_const |
|
|
|
@ -776,3 +778,50 @@ class TestMetadataPorts(base.TestOVNFunctionalBase):
|
|
|
|
|
self.assertIsNone(self._check_subnet_dhcp_options(subnet['id'], |
|
|
|
|
'2001:db8::/64')) |
|
|
|
|
self._check_metadata_port(self.n1_id, []) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ConnectionInactivityProbeSetEvent(event.WaitEvent): |
|
|
|
|
"""Wait for a Connection (NB/SB) to have the inactivity probe set""" |
|
|
|
|
ONETIME = False |
|
|
|
|
def __init__(self, target, inactivity_probe): |
|
|
|
|
table = 'Connection' |
|
|
|
|
events = (self.ROW_UPDATE,) |
|
|
|
|
super(ConnectionInactivityProbeSetEvent, self).__init__(events, table, |
|
|
|
|
None) |
|
|
|
|
self.event_name = "ConnectionEvent" |
|
|
|
|
self.target = target |
|
|
|
|
self.inactivity_probe = inactivity_probe |
|
|
|
|
def match_fn(self, event, row, old): |
|
|
|
|
return row.target in self.target |
|
|
|
|
def run(self, event, row, old): |
|
|
|
|
if (row.inactivity_probe and |
|
|
|
|
row.inactivity_probe[0] == self.inactivity_probe): |
|
|
|
|
self.event.set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSetInactivityProbe(base.TestOVNFunctionalBase): |
|
|
|
|
|
|
|
|
|
def setUp(self, *args): |
|
|
|
|
super(TestSetInactivityProbe, self).setUp(*args) |
|
|
|
|
self.dbs = [(config.get_ovn_nb_connection(), 'ptcp:1000:1.2.3.4'), |
|
|
|
|
(config.get_ovn_sb_connection(), 'ptcp:1001:1.2.3.4')] |
|
|
|
|
linux_utils.execute( |
|
|
|
|
['ovn-nbctl', '--db=%s' % self.dbs[0][0], |
|
|
|
|
'set-connection', self.dbs[0][1]], run_as_root=True) |
|
|
|
|
linux_utils.execute( |
|
|
|
|
['ovn-sbctl', '--db=%s' % self.dbs[1][0], |
|
|
|
|
'set-connection', self.dbs[1][1]], run_as_root=True) |
|
|
|
|
|
|
|
|
|
def test_1(self): |
|
|
|
|
mock.patch.object(config, 'get_ovn_ovsdb_probe_interval', |
|
|
|
|
return_value='2500').start() |
|
|
|
|
nb_connection = ConnectionInactivityProbeSetEvent(self.dbs[0][1], 2500) |
|
|
|
|
sb_connection = ConnectionInactivityProbeSetEvent(self.dbs[1][1], 2500) |
|
|
|
|
self.nb_api.idl.notify_handler.watch_event(nb_connection) |
|
|
|
|
self.sb_api.idl.notify_handler.watch_event(sb_connection) |
|
|
|
|
with mock.patch.object(utils, 'connection_config_to_target_string') \ |
|
|
|
|
as mock_target: |
|
|
|
|
mock_target.side_effect = [self.dbs[0][1], self.dbs[1][1]] |
|
|
|
|
self.mech_driver._set_inactivity_probe() |
|
|
|
|
self.assertTrue(nb_connection.wait()) |
|
|
|
|
self.assertTrue(sb_connection.wait()) |
|
|
|
|