Merge "Ensure only one worker creates neturon_pg_drop"
This commit is contained in:
commit
19372a3cd8
@ -264,20 +264,25 @@ class OVNMechanismDriver(api.MechanismDriver):
|
|||||||
"""
|
"""
|
||||||
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
|
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
|
||||||
ovn_conf.get_ovn_nb_connection(), self.nb_schema_helper, self)
|
ovn_conf.get_ovn_nb_connection(), self.nb_schema_helper, self)
|
||||||
|
# Only one server should try to create the port group
|
||||||
|
idl.set_lock('pg_drop_creation')
|
||||||
with ovsdb_monitor.short_living_ovsdb_api(
|
with ovsdb_monitor.short_living_ovsdb_api(
|
||||||
impl_idl_ovn.OvsdbNbOvnIdl, idl) as pre_ovn_nb_api:
|
impl_idl_ovn.OvsdbNbOvnIdl, idl) as pre_ovn_nb_api:
|
||||||
try:
|
try:
|
||||||
create_default_drop_port_group(pre_ovn_nb_api)
|
create_default_drop_port_group(pre_ovn_nb_api)
|
||||||
except RuntimeError as re:
|
except RuntimeError as re:
|
||||||
if pre_ovn_nb_api.get_port_group(
|
# If we don't get the lock, and the port group didn't exist
|
||||||
ovn_const.OVN_DROP_PORT_GROUP_NAME):
|
# when we tried to create it, it might still have been
|
||||||
LOG.debug(
|
# created by another server and we just haven't gotten the
|
||||||
"Port Group %(port_group)s already exists, "
|
# update yet.
|
||||||
"ignoring RuntimeError %(error)s", {
|
LOG.info("Waiting for Port Group %(pg)s to be created",
|
||||||
'port_group': ovn_const.OVN_DROP_PORT_GROUP_NAME,
|
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
|
||||||
'error': re})
|
if not idl.neutron_pg_drop_event.wait():
|
||||||
else:
|
LOG.error("Port Group %(pg)s was not created in time",
|
||||||
raise
|
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
|
||||||
|
raise re
|
||||||
|
LOG.info("Porg Group %(pg)s was created by another server",
|
||||||
|
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def should_post_fork_initialize(worker_class):
|
def should_post_fork_initialize(worker_class):
|
||||||
@ -292,7 +297,6 @@ class OVNMechanismDriver(api.MechanismDriver):
|
|||||||
return
|
return
|
||||||
|
|
||||||
self._post_fork_event.clear()
|
self._post_fork_event.clear()
|
||||||
self._wait_for_pg_drop_event()
|
|
||||||
self._ovn_client_inst = None
|
self._ovn_client_inst = None
|
||||||
|
|
||||||
if worker_class == neutron.wsgi.WorkerService:
|
if worker_class == neutron.wsgi.WorkerService:
|
||||||
@ -344,23 +348,6 @@ class OVNMechanismDriver(api.MechanismDriver):
|
|||||||
self.hash_ring_group))
|
self.hash_ring_group))
|
||||||
self._maintenance_thread.start()
|
self._maintenance_thread.start()
|
||||||
|
|
||||||
def _wait_for_pg_drop_event(self):
|
|
||||||
"""Wait for event that occurs when neutron_pg_drop Port Group exists.
|
|
||||||
|
|
||||||
The method creates a short living connection to the Northbound
|
|
||||||
database. It waits for CREATE event caused by the Port Group.
|
|
||||||
Such event occurs when:
|
|
||||||
1) The Port Group doesn't exist and is created by other process.
|
|
||||||
2) The Port Group already exists and event is emitted when DB copy
|
|
||||||
is available to the IDL.
|
|
||||||
"""
|
|
||||||
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
|
|
||||||
ovn_conf.get_ovn_nb_connection(), self.nb_schema_helper, self,
|
|
||||||
pg_only=True)
|
|
||||||
with ovsdb_monitor.short_living_ovsdb_api(
|
|
||||||
impl_idl_ovn.OvsdbNbOvnIdl, idl) as ovn_nb_api:
|
|
||||||
ovn_nb_api.idl.neutron_pg_drop_event.wait()
|
|
||||||
|
|
||||||
def _create_security_group_precommit(self, resource, event, trigger,
|
def _create_security_group_precommit(self, resource, event, trigger,
|
||||||
payload):
|
payload):
|
||||||
context = payload.context
|
context = payload.context
|
||||||
|
@ -763,9 +763,7 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
|
|||||||
if uuidutils.is_uuid_like(pg_name):
|
if uuidutils.is_uuid_like(pg_name):
|
||||||
pg_name = utils.ovn_port_group_name(pg_name)
|
pg_name = utils.ovn_port_group_name(pg_name)
|
||||||
try:
|
try:
|
||||||
for pg in self._tables['Port_Group'].rows.values():
|
return self.lookup('Port_Group', pg_name, default=None)
|
||||||
if pg.name == pg_name:
|
|
||||||
return pg
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# TODO(dalvarez): This except block is added for backwards compat
|
# TODO(dalvarez): This except block is added for backwards compat
|
||||||
# with old OVN schemas (<=2.9) where Port Groups are not present.
|
# with old OVN schemas (<=2.9) where Port Groups are not present.
|
||||||
|
@ -445,12 +445,12 @@ class FIPAddDeleteEvent(row_event.RowEvent):
|
|||||||
|
|
||||||
class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
|
class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
|
||||||
"""WaitEvent for neutron_pg_drop Create event."""
|
"""WaitEvent for neutron_pg_drop Create event."""
|
||||||
def __init__(self):
|
def __init__(self, timeout=None):
|
||||||
table = 'Port_Group'
|
table = 'Port_Group'
|
||||||
events = (self.ROW_CREATE,)
|
events = (self.ROW_CREATE,)
|
||||||
conditions = (('name', '=', ovn_const.OVN_DROP_PORT_GROUP_NAME),)
|
conditions = (('name', '=', ovn_const.OVN_DROP_PORT_GROUP_NAME),)
|
||||||
super(NeutronPgDropPortGroupCreated, self).__init__(
|
super(NeutronPgDropPortGroupCreated, self).__init__(
|
||||||
events, table, conditions)
|
events, table, conditions, timeout=timeout)
|
||||||
self.event_name = 'PortGroupCreated'
|
self.event_name = 'PortGroupCreated'
|
||||||
|
|
||||||
|
|
||||||
@ -693,9 +693,15 @@ class OvnInitPGNbIdl(OvnIdl):
|
|||||||
self.cond_change(
|
self.cond_change(
|
||||||
'Port_Group',
|
'Port_Group',
|
||||||
[['name', '==', ovn_const.OVN_DROP_PORT_GROUP_NAME]])
|
[['name', '==', ovn_const.OVN_DROP_PORT_GROUP_NAME]])
|
||||||
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated()
|
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated(
|
||||||
|
timeout=ovn_conf.get_ovn_ovsdb_timeout())
|
||||||
self.notify_handler.watch_event(self.neutron_pg_drop_event)
|
self.notify_handler.watch_event(self.neutron_pg_drop_event)
|
||||||
|
|
||||||
|
def notify(self, event, row, updates=None):
|
||||||
|
# Go ahead and process events even if the lock is contended so we can
|
||||||
|
# know that some other server has created the drop group
|
||||||
|
self.notify_handler.notify(event, row, updates)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_server(cls, connection_string, helper, driver, pg_only=False):
|
def from_server(cls, connection_string, helper, driver, pg_only=False):
|
||||||
if pg_only:
|
if pg_only:
|
||||||
|
@ -154,23 +154,29 @@ class TestOVNMechanismDriver(TestOVNMechanismDriverBase):
|
|||||||
def test__create_neutron_pg_drop_created_meanwhile(
|
def test__create_neutron_pg_drop_created_meanwhile(
|
||||||
self, m_ovsdb_api_con, m_from_server):
|
self, m_ovsdb_api_con, m_from_server):
|
||||||
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
|
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
|
||||||
m_ovsdb_api.get_port_group.side_effect = [None, 'foo']
|
m_ovsdb_api.get_port_group.return_value = None
|
||||||
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
|
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
|
||||||
RuntimeError())
|
RuntimeError())
|
||||||
self.mech_driver._create_neutron_pg_drop()
|
idl = m_from_server.return_value
|
||||||
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
|
idl.neutron_pg_drop_event.wait.return_value = True
|
||||||
|
result = self.mech_driver._create_neutron_pg_drop()
|
||||||
|
idl.neutron_pg_drop_event.wait.assert_called_once()
|
||||||
|
# If sommething else creates the port group, just return
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
|
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
|
||||||
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
|
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
|
||||||
def test__create_neutron_pg_drop_error(
|
def test__create_neutron_pg_drop_error(
|
||||||
self, m_ovsdb_api_con, m_from_server):
|
self, m_ovsdb_api_con, m_from_server):
|
||||||
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
|
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
|
||||||
m_ovsdb_api.get_port_group.side_effect = [None, None]
|
m_ovsdb_api.get_port_group.return_value = None
|
||||||
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
|
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
|
||||||
RuntimeError())
|
RuntimeError())
|
||||||
|
idl = m_from_server.return_value
|
||||||
|
idl.neutron_pg_drop_event.wait.return_value = False
|
||||||
self.assertRaises(RuntimeError,
|
self.assertRaises(RuntimeError,
|
||||||
self.mech_driver._create_neutron_pg_drop)
|
self.mech_driver._create_neutron_pg_drop)
|
||||||
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
|
idl.neutron_pg_drop_event.wait.assert_called_once()
|
||||||
|
|
||||||
def test__get_max_tunid_no_key_set(self):
|
def test__get_max_tunid_no_key_set(self):
|
||||||
self.mech_driver.nb_ovn.nb_global.options.get.return_value = None
|
self.mech_driver.nb_ovn.nb_global.options.get.return_value = None
|
||||||
|
Loading…
Reference in New Issue
Block a user