[ovn]: Create neutron_pg_drop Port Group on init

The patch adds a short living connection in pre-fork routine that
creates neutron_pg_drop Port Group. Later after workers are spawned,
each worker also creates a short living connection and waits for an
event that the Port Group was created.

The short living IDLs limit its tables only for relevant tables so it
doesn't fetch the whole OVS DB to the local copy.

Closes-bug: #1866068

 Conflicts:
	neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
	neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
	neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py

Change-Id: I1f5af36b8c3d5650f890edfed3c33dc206869824
Signed-off-by: Jakub Libosvar <libosvar@redhat.com>
(cherry picked from commit d7c23431ad)
This commit is contained in:
Jakub Libosvar 2020-03-18 14:27:17 +00:00 committed by Terry Wilson
parent d8f1f1118d
commit a6106ac2bd
7 changed files with 259 additions and 17 deletions

View File

@ -54,6 +54,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron import service
from neutron.services.qos.drivers.ovn import driver as qos_driver
@ -222,6 +223,33 @@ class OVNMechanismDriver(api.MechanismDriver):
"""Pre-initialize the ML2/OVN driver."""
atexit.register(self._clean_hash_ring)
signal.signal(signal.SIGTERM, self._clean_hash_ring)
self._create_neutron_pg_drop()
def _create_neutron_pg_drop(self):
"""Create neutron_pg_drop Port Group.
The method creates a short living connection to the Northbound
database. Because of multiple controllers can attempt to create the
Port Group at the same time the transaction can fail and raise
RuntimeError. In such case, we make sure the Port Group was created,
otherwise the error is something else and it's raised to the caller.
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), 'OVN_Northbound', self)
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as pre_ovn_nb_api:
try:
create_default_drop_port_group(pre_ovn_nb_api)
except RuntimeError as re:
if pre_ovn_nb_api.get_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME):
LOG.debug(
"Port Group %(port_group)s already exists, "
"ignoring RuntimeError %(error)s", {
'port_group': ovn_const.OVN_DROP_PORT_GROUP_NAME,
'error': re})
else:
raise
@staticmethod
def should_post_fork_initialize(worker_class):
@ -236,6 +264,7 @@ class OVNMechanismDriver(api.MechanismDriver):
return
self._post_fork_event.clear()
self._wait_for_pg_drop_event()
self._ovn_client_inst = None
if worker_class == neutron.wsgi.WorkerService:
@ -293,6 +322,23 @@ class OVNMechanismDriver(api.MechanismDriver):
self.hash_ring_group))
self._maintenance_thread.start()
def _wait_for_pg_drop_event(self):
"""Wait for event that occurs when neutron_pg_drop Port Group exists.
The method creates a short living connection to the Northbound
database. It waits for CREATE event caused by the Port Group.
Such event occurs when:
1) The Port Group doesn't exist and is created by other process.
2) The Port Group already exists and event is emitted when DB copy
is available to the IDL.
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), 'OVN_Northbound', self,
pg_only=True)
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as ovn_nb_api:
ovn_nb_api.idl.neutron_pg_drop_event.wait()
def _create_security_group_precommit(self, resource, event, trigger,
**kwargs):
ovn_revision_numbers_db.create_initial_revision(
@ -1231,3 +1277,25 @@ def get_availability_zones(cls, context, _driver, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
return list(_driver.list_availability_zones(context, filters).values())
def create_default_drop_port_group(nb_idl):
pg_name = ovn_const.OVN_DROP_PORT_GROUP_NAME
if nb_idl.get_port_group(pg_name):
LOG.debug("Port Group %s already exists", pg_name)
return
with nb_idl.transaction(check_error=True) as txn:
# If drop Port Group doesn't exist yet, create it.
txn.add(nb_idl.pg_add(pg_name, acls=[], may_exist=True))
# Add ACLs to this Port Group so that all traffic is dropped.
acls = ovn_acl.add_acls_for_drop_port_group(pg_name)
for acl in acls:
txn.add(nb_idl.pg_acl_add(may_exist=True, **acl))
ports_with_pg = set()
for pg in nb_idl.get_port_groups().values():
ports_with_pg.update(pg['ports'])
if ports_with_pg:
# Add the ports to the default Port Group
txn.add(nb_idl.pg_add_ports(pg_name, list(ports_with_pg)))

View File

@ -2027,24 +2027,7 @@ class OVNClient(object):
db_rev.bump_revision(
context, security_group, ovn_const.TYPE_SECURITY_GROUPS)
def create_default_drop_port_group(self, ports=None):
pg_name = ovn_const.OVN_DROP_PORT_GROUP_NAME
with self._nb_idl.transaction(check_error=True) as txn:
if not self._nb_idl.get_port_group(pg_name):
# If drop Port Group doesn't exist yet, create it.
txn.add(self._nb_idl.pg_add(pg_name, acls=[], may_exist=True))
# Add ACLs to this Port Group so that all traffic is dropped.
acls = ovn_acl.add_acls_for_drop_port_group(pg_name)
for acl in acls:
txn.add(self._nb_idl.pg_acl_add(may_exist=True, **acl))
if ports:
ports_ids = [port['id'] for port in ports]
# Add the ports to the default Port Group
txn.add(self._nb_idl.pg_add_ports(pg_name, ports_ids))
def _add_port_to_drop_port_group(self, port, txn):
self.create_default_drop_port_group()
txn.add(self._nb_idl.pg_add_ports(ovn_const.OVN_DROP_PORT_GROUP_NAME,
port))

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import contextlib
import datetime
from neutron_lib import context as neutron_context
@ -346,6 +347,17 @@ class FIPAddDeleteEvent(row_event.RowEvent):
self.driver.delete_mac_binding_entries(row.external_ip)
class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
"""WaitEvent for neutron_pg_drop Create event."""
def __init__(self):
table = 'Port_Group'
events = (self.ROW_CREATE,)
conditions = (('name', '=', ovn_const.OVN_DROP_PORT_GROUP_NAME),)
super(NeutronPgDropPortGroupCreated, self).__init__(
events, table, conditions)
self.event_name = 'PortGroupCreated'
class OvnDbNotifyHandler(backports.RowEventHandler):
def __init__(self, driver):
super(OvnDbNotifyHandler, self).__init__()
@ -549,6 +561,54 @@ class OvnSbIdl(OvnIdlDistributedLock):
PortBindingChassisUpdateEvent(self.driver)])
class OvnInitPGNbIdl(OvnIdl):
"""Very limited OVN NB IDL.
This IDL is intended to be used only in initialization phase with short
living DB connections.
"""
tables = ['Port_Group', 'Logical_Switch_Port', 'ACL']
def __init__(self, driver, remote, schema):
super(OvnInitPGNbIdl, self).__init__(driver, remote, schema)
self.cond_change(
'Port_Group',
[['name', '==', ovn_const.OVN_DROP_PORT_GROUP_NAME]])
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated()
self.notify_handler.watch_event(self.neutron_pg_drop_event)
@classmethod
def from_server(
cls, connection_string, schema_name, driver, pg_only=False):
_check_and_set_ssl_files(schema_name)
helper = idlutils.get_schema_helper(connection_string, schema_name)
if pg_only:
helper.register_table('Port_Group')
else:
for table in cls.tables:
helper.register_table(table)
return cls(driver, connection_string, helper)
@contextlib.contextmanager
def short_living_ovsdb_api(api_class, idl):
"""Context manager for short living connections to the database.
:param api_class: Class implementing the database calls
(e.g. from the impl_idl module)
:param idl: An instance of IDL class (e.g. instance of OvnNbIdl)
"""
conn = connection.Connection(
idl, timeout=ovn_conf.get_ovn_ovsdb_timeout())
api = api_class(conn)
try:
yield api
finally:
api.ovsdb_connection.stop()
def _check_and_set_ssl_files(schema_name):
if schema_name == 'OVN_Southbound':
priv_key_file = ovn_conf.get_ovn_sb_private_key()

View File

@ -285,6 +285,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.addCleanup(self._collect_processes_logs)
self.addCleanup(self.stop)
self.mech_driver.pre_fork_initialize(
mock.ANY, mock.ANY, trigger_cls.trigger)
# mech_driver.post_fork_initialize creates the IDL connections
self.mech_driver.post_fork_initialize(

View File

@ -19,11 +19,14 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from oslo_config import cfg
from oslo_utils import uuidutils
from ovsdbapp.tests.functional import base as ovs_base
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.common import utils as n_utils
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver import mech_driver
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.tests import base as tests_base
from neutron.tests.functional import base
@ -707,3 +710,55 @@ class TestAgentApi(base.TestOVNFunctionalBase):
def test_agent_show_ovn_controller(self):
self.assertTrue(self.plugin.get_agent(self.context,
self.controller_agent))
class TestCreateDefaultDropPortGroup(ovs_base.FunctionalTestCase,
base.BaseLoggingTestCase):
schemas = ['OVN_Southbound', 'OVN_Northbound']
PG_NAME = ovn_const.OVN_DROP_PORT_GROUP_NAME
def setUp(self):
super(TestCreateDefaultDropPortGroup, self).setUp()
self.api = impl_idl_ovn.OvsdbNbOvnIdl(
self.connection['OVN_Northbound'])
self.addCleanup(self.api.pg_del(self.PG_NAME, if_exists=True).execute,
check_error=True)
def test_port_group_exists(self):
"""Test new port group is not added or modified.
If Port Group was not existent, acls would be added.
"""
self.api.pg_add(
self.PG_NAME, acls=[], may_exist=True).execute(check_error=True)
mech_driver.create_default_drop_port_group(self.api)
port_group = self.api.get_port_group(self.PG_NAME)
self.assertFalse(port_group.acls)
def _test_pg_with_ports(self, expected_ports=None):
expected_ports = expected_ports or []
mech_driver.create_default_drop_port_group(self.api)
port_group = self.api.get_port_group(self.PG_NAME)
self.assertItemsEqual(
expected_ports, [port.name for port in port_group.ports])
def test_with_ports_available(self):
expected_ports = ['port1', 'port2']
testing_pg = 'testing'
testing_ls = 'testing'
with self.api.transaction(check_error=True) as txn:
txn.add(self.api.pg_add(
testing_pg,
external_ids={ovn_const.OVN_SG_EXT_ID_KEY: 'foo'}))
txn.add(self.api.ls_add(testing_ls))
port_uuids = [txn.add(self.api.lsp_add(testing_ls, port))
for port in expected_ports]
txn.add(self.api.pg_add_ports(testing_pg, port_uuids))
self.addCleanup(self.api.pg_del(testing_pg, if_exists=True).execute,
check_error=True)
self._test_pg_with_ports(expected_ports)
def test_without_ports(self):
self._test_pg_with_ports(expected_ports=[])

View File

@ -26,6 +26,7 @@ from ovs import poller
from ovs.stream import Stream
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import idlutils
import testtools
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import hash_ring_manager
@ -606,3 +607,32 @@ class TestChassisEvent(base.BaseTestCase):
# after it became a Gateway chassis
self._test_handle_ha_chassis_group_changes_create(
self.event.ROW_UPDATE)
class TestShortLivingOvsdbApi(base.BaseTestCase):
def test_context(self):
api_class = mock.Mock()
idl = mock.Mock()
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl) as api:
self.assertEqual(api_class.return_value, api)
api.ovsdb_connection.stop.assert_called_once_with()
def test_context_error(self):
api_class = mock.Mock()
idl = mock.Mock()
exc = RuntimeError()
try:
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl) as api:
self.assertEqual(api_class.return_value, api)
raise exc
except RuntimeError as re:
self.assertIs(exc, re)
api.ovsdb_connection.stop.assert_called_once_with()
def test_api_class_error(self):
api_class = mock.Mock(side_effect=RuntimeError())
idl = mock.Mock()
with testtools.ExpectedException(RuntimeError):
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl):
# Make sure it never enter the api context
raise Exception("API class instantiated but it should not")

View File

@ -51,6 +51,7 @@ from neutron.db import segments_db
from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent
from neutron.plugins.ml2.drivers.ovn.mech_driver import mech_driver
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron.services.revisions import revision_plugin
from neutron.tests.unit.extensions import test_segment
@ -111,6 +112,49 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
p.start()
self.addCleanup(p.stop)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_non_existing(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = None
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(1, m_ovsdb_api.get_port_group.call_count)
self.assertTrue(m_ovsdb_api.transaction.return_value.__enter__.called)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_existing(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = 'foo'
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(1, m_ovsdb_api.get_port_group.call_count)
self.assertFalse(m_ovsdb_api.transaction.return_value.__enter__.called)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_created_meanwhile(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.side_effect = [None, 'foo']
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_error(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.side_effect = [None, None]
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
self.assertRaises(RuntimeError,
self.mech_driver._create_neutron_pg_drop)
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
@mock.patch.object(ovn_revision_numbers_db, 'bump_revision')
def test__create_security_group(self, mock_bump):
self.mech_driver._create_security_group(