Merge "[ovn]: Create neutron_pg_drop Port Group on init"

This commit is contained in:
Zuul 2020-05-08 22:54:47 +00:00 committed by Gerrit Code Review
commit 0fce58fb07
7 changed files with 259 additions and 17 deletions

View File

@ -50,6 +50,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.services.qos.drivers.ovn import driver as qos_driver
from neutron.services.segments import db as segment_service_db
@ -207,11 +208,39 @@ class OVNMechanismDriver(api.MechanismDriver):
"""Pre-initialize the ML2/OVN driver."""
atexit.register(self._clean_hash_ring)
signal.signal(signal.SIGTERM, self._clean_hash_ring)
self._create_neutron_pg_drop()
def _create_neutron_pg_drop(self):
"""Create neutron_pg_drop Port Group.
The method creates a short living connection to the Northbound
database. Because of multiple controllers can attempt to create the
Port Group at the same time the transaction can fail and raise
RuntimeError. In such case, we make sure the Port Group was created,
otherwise the error is something else and it's raised to the caller.
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), 'OVN_Northbound', self)
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as pre_ovn_nb_api:
try:
create_default_drop_port_group(pre_ovn_nb_api)
except RuntimeError as re:
if pre_ovn_nb_api.get_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME):
LOG.debug(
"Port Group %(port_group)s already exists, "
"ignoring RuntimeError %(error)s", {
'port_group': ovn_const.OVN_DROP_PORT_GROUP_NAME,
'error': re})
else:
raise
def post_fork_initialize(self, resource, event, trigger, payload=None):
# NOTE(rtheis): This will initialize all workers (API, RPC,
# plugin service and OVN) with OVN IDL connections.
self._post_fork_event.clear()
self._wait_for_pg_drop_event()
self._ovn_client_inst = None
is_maintenance = (ovn_utils.get_method_class(trigger) ==
@ -265,6 +294,23 @@ class OVNMechanismDriver(api.MechanismDriver):
self.hash_ring_group))
self._maintenance_thread.start()
def _wait_for_pg_drop_event(self):
"""Wait for event that occurs when neutron_pg_drop Port Group exists.
The method creates a short living connection to the Northbound
database. It waits for CREATE event caused by the Port Group.
Such event occurs when:
1) The Port Group doesn't exist and is created by other process.
2) The Port Group already exists and event is emitted when DB copy
is available to the IDL.
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), 'OVN_Northbound', self,
pg_only=True)
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as ovn_nb_api:
ovn_nb_api.idl.neutron_pg_drop_event.wait()
def _create_security_group_precommit(self, resource, event, trigger,
**kwargs):
ovn_revision_numbers_db.create_initial_revision(
@ -1155,3 +1201,25 @@ def delete_agent(self, context, id, _driver=None):
get_agent(self, None, id, _driver=_driver)
raise n_exc.BadRequest(resource='agent',
msg='OVN agents cannot be deleted')
def create_default_drop_port_group(nb_idl):
pg_name = ovn_const.OVN_DROP_PORT_GROUP_NAME
if nb_idl.get_port_group(pg_name):
LOG.debug("Port Group %s already exists", pg_name)
return
with nb_idl.transaction(check_error=True) as txn:
# If drop Port Group doesn't exist yet, create it.
txn.add(nb_idl.pg_add(pg_name, acls=[], may_exist=True))
# Add ACLs to this Port Group so that all traffic is dropped.
acls = ovn_acl.add_acls_for_drop_port_group(pg_name)
for acl in acls:
txn.add(nb_idl.pg_acl_add(may_exist=True, **acl))
ports_with_pg = set()
for pg in nb_idl.get_port_groups().values():
ports_with_pg.update(pg['ports'])
if ports_with_pg:
# Add the ports to the default Port Group
txn.add(nb_idl.pg_add_ports(pg_name, list(ports_with_pg)))

View File

@ -2077,24 +2077,7 @@ class OVNClient(object):
db_rev.bump_revision(
context, security_group, ovn_const.TYPE_SECURITY_GROUPS)
def create_default_drop_port_group(self, ports=None):
pg_name = ovn_const.OVN_DROP_PORT_GROUP_NAME
with self._nb_idl.transaction(check_error=True) as txn:
if not self._nb_idl.get_port_group(pg_name):
# If drop Port Group doesn't exist yet, create it.
txn.add(self._nb_idl.pg_add(pg_name, acls=[], may_exist=True))
# Add ACLs to this Port Group so that all traffic is dropped.
acls = ovn_acl.add_acls_for_drop_port_group(pg_name)
for acl in acls:
txn.add(self._nb_idl.pg_acl_add(may_exist=True, **acl))
if ports:
ports_ids = [port['id'] for port in ports]
# Add the ports to the default Port Group
txn.add(self._nb_idl.pg_add_ports(pg_name, ports_ids))
def _add_port_to_drop_port_group(self, port, txn):
self.create_default_drop_port_group()
txn.add(self._nb_idl.pg_add_ports(ovn_const.OVN_DROP_PORT_GROUP_NAME,
port))

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import contextlib
import datetime
from neutron_lib import context as neutron_context
@ -346,6 +347,17 @@ class FIPAddDeleteEvent(row_event.RowEvent):
self.driver.delete_mac_binding_entries(row.external_ip)
class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
"""WaitEvent for neutron_pg_drop Create event."""
def __init__(self):
table = 'Port_Group'
events = (self.ROW_CREATE,)
conditions = (('name', '=', ovn_const.OVN_DROP_PORT_GROUP_NAME),)
super(NeutronPgDropPortGroupCreated, self).__init__(
events, table, conditions)
self.event_name = 'PortGroupCreated'
class OvnDbNotifyHandler(event.RowEventHandler):
def __init__(self, driver):
super(OvnDbNotifyHandler, self).__init__()
@ -538,6 +550,54 @@ class OvnSbIdl(OvnIdlDistributedLock):
PortBindingChassisUpdateEvent(self.driver)])
class OvnInitPGNbIdl(OvnIdl):
"""Very limited OVN NB IDL.
This IDL is intended to be used only in initialization phase with short
living DB connections.
"""
tables = ['Port_Group', 'Logical_Switch_Port', 'ACL']
def __init__(self, driver, remote, schema):
super(OvnInitPGNbIdl, self).__init__(driver, remote, schema)
self.cond_change(
'Port_Group',
[['name', '==', ovn_const.OVN_DROP_PORT_GROUP_NAME]])
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated()
self.notify_handler.watch_event(self.neutron_pg_drop_event)
@classmethod
def from_server(
cls, connection_string, schema_name, driver, pg_only=False):
_check_and_set_ssl_files(schema_name)
helper = idlutils.get_schema_helper(connection_string, schema_name)
if pg_only:
helper.register_table('Port_Group')
else:
for table in cls.tables:
helper.register_table(table)
return cls(driver, connection_string, helper)
@contextlib.contextmanager
def short_living_ovsdb_api(api_class, idl):
"""Context manager for short living connections to the database.
:param api_class: Class implementing the database calls
(e.g. from the impl_idl module)
:param idl: An instance of IDL class (e.g. instance of OvnNbIdl)
"""
conn = connection.Connection(
idl, timeout=ovn_conf.get_ovn_ovsdb_timeout())
api = api_class(conn)
try:
yield api
finally:
api.ovsdb_connection.stop()
def _check_and_set_ssl_files(schema_name):
if schema_name == 'OVN_Southbound':
priv_key_file = ovn_conf.get_ovn_sb_private_key()

View File

@ -278,6 +278,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.addCleanup(self._collect_processes_logs)
self.addCleanup(self.stop)
self.mech_driver.pre_fork_initialize(
mock.ANY, mock.ANY, trigger_cls.trigger)
# mech_driver.post_fork_initialize creates the IDL connections
self.mech_driver.post_fork_initialize(

View File

@ -18,11 +18,14 @@ import mock
from neutron_lib.api.definitions import portbindings
from oslo_config import cfg
from oslo_utils import uuidutils
from ovsdbapp.tests.functional import base as ovs_base
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.common import utils as n_utils
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver import mech_driver
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.tests import base as tests_base
from neutron.tests.functional import base
@ -594,3 +597,55 @@ class TestExternalPorts(base.TestOVNFunctionalBase):
def test_external_port_update_switchdev_vnic_macvtap(self):
self._test_external_port_update_switchdev(portbindings.VNIC_MACVTAP)
class TestCreateDefaultDropPortGroup(ovs_base.FunctionalTestCase,
base.BaseLoggingTestCase):
schemas = ['OVN_Southbound', 'OVN_Northbound']
PG_NAME = ovn_const.OVN_DROP_PORT_GROUP_NAME
def setUp(self):
super(TestCreateDefaultDropPortGroup, self).setUp()
self.api = impl_idl_ovn.OvsdbNbOvnIdl(
self.connection['OVN_Northbound'])
self.addCleanup(self.api.pg_del(self.PG_NAME, if_exists=True).execute,
check_error=True)
def test_port_group_exists(self):
"""Test new port group is not added or modified.
If Port Group was not existent, acls would be added.
"""
self.api.pg_add(
self.PG_NAME, acls=[], may_exist=True).execute(check_error=True)
mech_driver.create_default_drop_port_group(self.api)
port_group = self.api.get_port_group(self.PG_NAME)
self.assertFalse(port_group.acls)
def _test_pg_with_ports(self, expected_ports=None):
expected_ports = expected_ports or []
mech_driver.create_default_drop_port_group(self.api)
port_group = self.api.get_port_group(self.PG_NAME)
self.assertItemsEqual(
expected_ports, [port.name for port in port_group.ports])
def test_with_ports_available(self):
expected_ports = ['port1', 'port2']
testing_pg = 'testing'
testing_ls = 'testing'
with self.api.transaction(check_error=True) as txn:
txn.add(self.api.pg_add(
testing_pg,
external_ids={ovn_const.OVN_SG_EXT_ID_KEY: 'foo'}))
txn.add(self.api.ls_add(testing_ls))
port_uuids = [txn.add(self.api.lsp_add(testing_ls, port))
for port in expected_ports]
txn.add(self.api.pg_add_ports(testing_pg, port_uuids))
self.addCleanup(self.api.pg_del(testing_pg, if_exists=True).execute,
check_error=True)
self._test_pg_with_ports(expected_ports)
def test_without_ports(self):
self._test_pg_with_ports(expected_ports=[])

View File

@ -26,6 +26,7 @@ from ovs import poller
from ovs.stream import Stream
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import idlutils
import testtools
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import hash_ring_manager
@ -601,3 +602,32 @@ class TestChassisEvent(base.BaseTestCase):
# after it became a Gateway chassis
self._test_handle_ha_chassis_group_changes_create(
self.event.ROW_UPDATE)
class TestShortLivingOvsdbApi(base.BaseTestCase):
def test_context(self):
api_class = mock.Mock()
idl = mock.Mock()
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl) as api:
self.assertEqual(api_class.return_value, api)
api.ovsdb_connection.stop.assert_called_once_with()
def test_context_error(self):
api_class = mock.Mock()
idl = mock.Mock()
exc = RuntimeError()
try:
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl) as api:
self.assertEqual(api_class.return_value, api)
raise exc
except RuntimeError as re:
self.assertIs(exc, re)
api.ovsdb_connection.stop.assert_called_once_with()
def test_api_class_error(self):
api_class = mock.Mock(side_effect=RuntimeError())
idl = mock.Mock()
with testtools.ExpectedException(RuntimeError):
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl):
# Make sure it never enter the api context
raise Exception("API class instantiated but it should not")

View File

@ -48,6 +48,7 @@ from neutron.db import provisioning_blocks
from neutron.db import securitygroups_db
from neutron.plugins.ml2.drivers.ovn.mech_driver import mech_driver
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron.services.revisions import revision_plugin
from neutron.tests.unit.extensions import test_segment
@ -108,6 +109,49 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
p.start()
self.addCleanup(p.stop)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_non_existing(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = None
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(1, m_ovsdb_api.get_port_group.call_count)
self.assertTrue(m_ovsdb_api.transaction.return_value.__enter__.called)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_existing(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = 'foo'
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(1, m_ovsdb_api.get_port_group.call_count)
self.assertFalse(m_ovsdb_api.transaction.return_value.__enter__.called)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_created_meanwhile(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.side_effect = [None, 'foo']
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_error(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.side_effect = [None, None]
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
self.assertRaises(RuntimeError,
self.mech_driver._create_neutron_pg_drop)
self.assertEqual(2, m_ovsdb_api.get_port_group.call_count)
@mock.patch.object(ovn_revision_numbers_db, 'bump_revision')
def test__create_security_group(self, mock_bump):
self.mech_driver._create_security_group(