ovn: Use ovsdb-client to create neutron_pg_drop

Previously we used short living OVN database connection to create
neutron_pg_drop Port Group before workers were spawned. The
pre_fork_initialize actually happens after the api workers are spawned
anyways and it blocks spawning of other workers, such as maintenance,
rpc or periodic. If the OVN database was large it may take several
minutes to connect to the database at scale and this blocks spawning of
other workers. That means connecting to OVN in pre_fork is not a good
idea.

This patch replaces the mechanism by using ovsdb-client to send a
transaction without connecting to the database and downloading the whole
content. The command does following, everything is on the server side:

 1) With timeout 0 it waits for neutron_pg_drop Port Group. If the PG is
    present, the transaction finishes and nothing happens.

 2) If the PG is not present, it times out immediately and commits new
    entries that effectivelly creates neutron_pg_drop Port Group with
    implicit ACLs to block ingress and egress traffic.

Closes-Bug: #1991579
Co-Authored-By: Terry Wilson <twilson@redhat.com>

Change-Id: I27af495f96a3ea88dd31345dbfb55f1be8faabd6
This commit is contained in:
Jakub Libosvar 2022-10-03 19:32:02 +00:00
parent f764bf3738
commit 50eee19723
9 changed files with 315 additions and 291 deletions

View File

@ -27,6 +27,7 @@ from neutron_lib import context as n_context
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from neutron_lib.utils import net as n_utils
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
@ -56,6 +57,70 @@ PortExtraDHCPValidation = collections.namedtuple(
'PortExtraDHCPValidation', ['failed', 'invalid_ipv4', 'invalid_ipv6'])
class OvsdbClientCommand(object):
_CONNECTION = 0
_PRIVATE_KEY = 1
_CERTIFICATE = 2
_CA_AUTHORITY = 3
OVN_Northbound = "OVN_Northbound"
OVN_Southbound = "OVN_Southbound"
_db_settings = {
OVN_Northbound: {
_CONNECTION: ovn_conf.get_ovn_nb_connection,
_PRIVATE_KEY: ovn_conf.get_ovn_nb_private_key,
_CERTIFICATE: ovn_conf.get_ovn_nb_certificate,
_CA_AUTHORITY: ovn_conf.get_ovn_nb_ca_cert,
},
OVN_Southbound: {
_CONNECTION: ovn_conf.get_ovn_sb_connection,
_PRIVATE_KEY: ovn_conf.get_ovn_sb_private_key,
_CERTIFICATE: ovn_conf.get_ovn_sb_certificate,
_CA_AUTHORITY: ovn_conf.get_ovn_sb_ca_cert,
},
}
@classmethod
def run(cls, command):
"""Run custom ovsdb protocol command.
:param command: JSON object of ovsdb protocol command
"""
try:
db = command[0]
except IndexError:
raise KeyError(
_("%s or %s schema must be specified in the command %s" % (
cls.OVN_Northbound, cls.OVN_Southbound, command)))
if db not in (cls.OVN_Northbound, cls.OVN_Southbound):
raise KeyError(
_("%s or %s schema must be specified in the command %s" % (
cls.OVN_Northbound, cls.OVN_Southbound, command)))
cmd = ['ovsdb-client',
cls.COMMAND,
cls._db_settings[db][cls._CONNECTION](),
'--timeout',
str(ovn_conf.get_ovn_ovsdb_timeout())]
if cls._db_settings[db][cls._PRIVATE_KEY]():
cmd += ['-p', cls._db_settings[db][cls._PRIVATE_KEY](),
'-c', cls._db_settings[db][cls._CERTIFICATE](),
'-C', cls._db_settings[db][cls._CA_AUTHORITY]()]
cmd.append(jsonutils.dumps(command))
return processutils.execute(
*cmd,
log_errors=processutils.LOG_FINAL_ERROR)
class OvsdbClientTransactCommand(OvsdbClientCommand):
COMMAND = 'transact'
def ovn_name(id):
# The name of the OVN entry will be neutron-<UUID>
# This is due to the fact that the OVN application checks if the name
@ -713,3 +778,56 @@ def retry(max_=None):
reraise=True)(func)(*args, **kwargs)
return wrapper
return inner
def create_neutron_pg_drop():
"""Create neutron_pg_drop Port Group.
It uses ovsdb-client to send to server transact command using ovsdb
protocol that checks if the neutron_pg_drop row exists. If it exists
it times out immediatelly. If it doesn't exist then it creates the
Port_Group and default ACLs to drop all ingress and egress traffic.
"""
command = [
"OVN_Northbound", {
"op": "wait",
"timeout": 0,
"table": "Port_Group",
"where": [
["name", "==", constants.OVN_DROP_PORT_GROUP_NAME]
],
"until": "==",
"rows": []
}, {
"op": "insert",
"table": "ACL",
"row": {
"action": "drop",
"direction": "to-lport",
"match": "outport == @neutron_pg_drop && ip",
"priority": 1001
},
"uuid-name": "droptoport"
}, {
"op": "insert",
"table": "ACL",
"row": {
"action": "drop",
"direction": "from-lport",
"match": "inport == @neutron_pg_drop && ip",
"priority": 1001
},
"uuid-name": "dropfromport"
}, {
"op": "insert",
"table": "Port_Group",
"row": {
"name": constants.OVN_DROP_PORT_GROUP_NAME,
"acls": ["set", [
["named-uuid", "droptoport"],
["named-uuid", "dropfromport"]
]]
}
}]
OvsdbClientTransactCommand.run(command)

View File

@ -37,7 +37,6 @@ from neutron_lib.placement import utils as place_utils
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.utils import helpers
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_db import exception as os_db_exc
from oslo_log import log
@ -62,7 +61,6 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron import service
from neutron.services.logapi.drivers.ovn import driver as log_driver
@ -287,47 +285,7 @@ class OVNMechanismDriver(api.MechanismDriver):
"""Pre-initialize the ML2/OVN driver."""
atexit.register(self._clean_hash_ring)
signal.signal(signal.SIGTERM, self._clean_hash_ring)
self._create_neutron_pg_drop()
def _create_neutron_pg_drop(self):
"""Create neutron_pg_drop Port Group.
The method creates a short living connection to the Northbound
database. Because of multiple controllers can attempt to create the
Port Group at the same time the transaction can fail and raise
RuntimeError. In such case, we make sure the Port Group was created,
otherwise the error is something else and it's raised to the caller.
"""
idl = ovsdb_monitor.OvnInitPGNbIdl.from_server(
ovn_conf.get_ovn_nb_connection(), self.nb_schema_helper, self)
# Only one server should try to create the port group
idl.set_lock('pg_drop_creation')
with ovsdb_monitor.short_living_ovsdb_api(
impl_idl_ovn.OvsdbNbOvnIdl, idl) as pre_ovn_nb_api:
try:
create_default_drop_port_group(pre_ovn_nb_api)
except KeyError:
# Due to a bug in python-ovs, we can send transactions before
# the initial OVSDB is populated in memory. This can break
# the AddCommand post_commit method which tries to return a
# row looked up by the newly commited row's uuid. Since we
# don't care about the return value from the PgAddCommand, we
# can just catch the KeyError and continue. This can be
# removed when the python-ovs bug is resolved.
pass
except RuntimeError as re:
# If we don't get the lock, and the port group didn't exist
# when we tried to create it, it might still have been
# created by another server and we just haven't gotten the
# update yet.
LOG.info("Waiting for Port Group %(pg)s to be created",
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
if not idl.neutron_pg_drop_event.wait():
LOG.error("Port Group %(pg)s was not created in time",
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
raise re
LOG.info("Porg Group %(pg)s was created by another server",
{'pg': ovn_const.OVN_DROP_PORT_GROUP_NAME})
ovn_utils.create_neutron_pg_drop()
@staticmethod
def should_post_fork_initialize(worker_class):
@ -1178,19 +1136,17 @@ class OVNMechanismDriver(api.MechanismDriver):
def delete_mac_binding_entries(self, external_ip):
"""Delete all MAC_Binding entries associated to this IP address"""
cmd = ['ovsdb-client', 'transact', ovn_conf.get_ovn_sb_connection(),
'--timeout', str(ovn_conf.get_ovn_ovsdb_timeout())]
cmd = [
"OVN_Southbound", {
"op": "delete",
"table": "MAC_Binding",
"where": [
["ip", "==", external_ip]
]
}
]
if ovn_conf.get_ovn_sb_private_key():
cmd += ['-p', ovn_conf.get_ovn_sb_private_key(), '-c',
ovn_conf.get_ovn_sb_certificate(), '-C',
ovn_conf.get_ovn_sb_ca_cert()]
cmd += ['["OVN_Southbound", {"op": "delete", "table": "MAC_Binding", '
'"where": [["ip", "==", "%s"]]}]' % external_ip]
return processutils.execute(*cmd,
log_errors=processutils.LOG_FINAL_ERROR)
return ovn_utils.OvsdbClientTransactCommand.run(cmd)
def update_segment_host_mapping(self, host, phy_nets):
"""Update SegmentHostMapping in DB"""
@ -1414,28 +1370,6 @@ def delete_agent(self, context, id, _driver=None):
if_exists=True).execute(check_error=True)
def create_default_drop_port_group(nb_idl):
pg_name = ovn_const.OVN_DROP_PORT_GROUP_NAME
if nb_idl.get_port_group(pg_name):
LOG.debug("Port Group %s already exists", pg_name)
return
with nb_idl.transaction(check_error=True) as txn:
# If drop Port Group doesn't exist yet, create it.
txn.add(nb_idl.pg_add(pg_name, acls=[], may_exist=True))
# Add ACLs to this Port Group so that all traffic is dropped.
acls = ovn_acl.add_acls_for_drop_port_group(pg_name)
for acl in acls:
txn.add(nb_idl.pg_acl_add(may_exist=True, **acl))
ports_with_pg = set()
for pg in nb_idl.get_sg_port_groups().values():
ports_with_pg.update(pg['ports'])
if ports_with_pg:
# Add the ports to the default Port Group
txn.add(nb_idl.pg_add_ports(pg_name, list(ports_with_pg)))
def get_availability_zones(cls, context, _driver, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):

View File

@ -33,7 +33,6 @@ from neutron_lib.plugins import directory
from neutron_lib.plugins import utils as p_utils
from neutron_lib.utils import helpers
from neutron_lib.utils import net as n_net
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log
from oslo_utils import excutils
@ -1687,16 +1686,16 @@ class OVNClient(object):
is refer to patch:
https://review.opendev.org/c/openstack/neutron/+/812805
"""
cmd = ['ovsdb-client', 'transact', ovn_conf.get_ovn_sb_connection(),
'--timeout', str(ovn_conf.get_ovn_ovsdb_timeout())]
if ovn_conf.get_ovn_sb_private_key():
cmd += ['-p', ovn_conf.get_ovn_sb_private_key(), '-c',
ovn_conf.get_ovn_sb_certificate(), '-C',
ovn_conf.get_ovn_sb_ca_cert()]
cmd += ['["OVN_Southbound", {"op": "delete", "table": "MAC_Binding", '
'"where": [["mac", "==", "%s"]]}]' % mac]
return processutils.execute(*cmd,
log_errors=processutils.LOG_FINAL_ERROR)
cmd = [
"OVN_Southbound", {
"op": "delete",
"table": "MAC_Binding",
"where": [
["mac", "==", mac]
]
}
]
return utils.OvsdbClientTransactCommand.run(cmd)
def _delete_lrouter_port(self, context, port_id, router_id=None, txn=None):
"""Delete a logical router port."""

View File

@ -13,7 +13,6 @@
# under the License.
import abc
import contextlib
import datetime
from neutron_lib import context as neutron_context
@ -581,17 +580,6 @@ class FIPAddDeleteEvent(row_event.RowEvent):
self.driver.delete_mac_binding_entries(row.external_ip)
class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
"""WaitEvent for neutron_pg_drop Create event."""
def __init__(self, timeout=None):
table = 'Port_Group'
events = (self.ROW_CREATE,)
conditions = (('name', '=', ovn_const.OVN_DROP_PORT_GROUP_NAME),)
super(NeutronPgDropPortGroupCreated, self).__init__(
events, table, conditions, timeout=timeout)
self.event_name = 'PortGroupCreated'
class OvnDbNotifyHandler(row_event.RowEventHandler):
def __init__(self, driver):
self.driver = driver
@ -837,56 +825,6 @@ class OvnSbIdl(OvnIdlDistributedLock):
PortBindingChassisUpdateEvent(self.driver)])
class OvnInitPGNbIdl(OvnIdl):
"""Very limited OVN NB IDL.
This IDL is intended to be used only in initialization phase with short
living DB connections.
"""
tables = ['Port_Group', 'Logical_Switch_Port', 'ACL']
def __init__(self, driver, remote, schema):
super(OvnInitPGNbIdl, self).__init__(driver, remote, schema)
self.set_table_condition(
'Port_Group', [['name', '==', ovn_const.OVN_DROP_PORT_GROUP_NAME]])
self.neutron_pg_drop_event = NeutronPgDropPortGroupCreated(
timeout=ovn_conf.get_ovn_ovsdb_timeout())
self.notify_handler.watch_event(self.neutron_pg_drop_event)
def notify(self, event, row, updates=None):
# Go ahead and process events even if the lock is contended so we can
# know that some other server has created the drop group
self.notify_handler.notify(event, row, updates)
@classmethod
def from_server(cls, connection_string, helper, driver, pg_only=False):
if pg_only:
helper.register_table('Port_Group')
else:
for table in cls.tables:
helper.register_table(table)
return cls(driver, connection_string, helper)
@contextlib.contextmanager
def short_living_ovsdb_api(api_class, idl):
"""Context manager for short living connections to the database.
:param api_class: Class implementing the database calls
(e.g. from the impl_idl module)
:param idl: An instance of IDL class (e.g. instance of OvnNbIdl)
"""
conn = connection.Connection(
idl, timeout=ovn_conf.get_ovn_ovsdb_timeout())
api = api_class(conn)
try:
yield api
finally:
api.ovsdb_connection.stop()
def _check_and_set_ssl_files(schema_name):
if schema_name == 'OVN_Southbound':
priv_key_file = ovn_conf.get_ovn_sb_private_key()

View File

@ -0,0 +1,60 @@
# Copyright 2022 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.tests.functional import base
class TestCreateNeutronPgDrop(base.TestOVNFunctionalBase):
def test_already_existing(self):
# Make sure pre-fork initialize created the table
existing_pg = self.nb_api.pg_get(
ovn_const.OVN_DROP_PORT_GROUP_NAME).execute()
self.assertIsNotNone(existing_pg)
# make an attempt to create it again
utils.create_neutron_pg_drop()
pg = self.nb_api.pg_get(ovn_const.OVN_DROP_PORT_GROUP_NAME).execute()
self.assertEqual(existing_pg.uuid, pg.uuid)
def test_non_existing(self):
# Delete the neutron_pg_drop created by pre-fork initialize
self.nb_api.pg_del(ovn_const.OVN_DROP_PORT_GROUP_NAME).execute()
pg = self.nb_api.pg_get(ovn_const.OVN_DROP_PORT_GROUP_NAME).execute()
self.assertIsNone(pg)
utils.create_neutron_pg_drop()
pg = self.nb_api.pg_get(ovn_const.OVN_DROP_PORT_GROUP_NAME).execute()
self.assertIsNotNone(pg)
directions = ['to-lport', 'from-lport']
matches = ['outport == @neutron_pg_drop && ip',
'inport == @neutron_pg_drop && ip']
# Make sure ACLs are correct
self.assertEqual(2, len(pg.acls))
acl1, acl2 = pg.acls
self.assertEqual('drop', acl1.action)
self.assertIn(acl1.direction, directions)
directions.remove(acl1.direction)
self.assertIn(acl1.match, matches)
matches.remove(acl1.match)
self.assertEqual(directions[0], acl2.direction)
self.assertEqual('drop', acl2.action)
self.assertEqual(matches[0], acl2.match)

View File

@ -27,15 +27,12 @@ from neutron_lib.exceptions import agent as agent_exc
from oslo_config import cfg
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import event
from ovsdbapp.tests.functional import base as ovs_base
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.common import utils as n_utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver import mech_driver
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.tests import base as tests_base
@ -794,59 +791,6 @@ class TestExternalPorts(base.TestOVNFunctionalBase):
self._test_external_port_update_switchdev(portbindings.VNIC_MACVTAP)
class TestCreateDefaultDropPortGroup(base.BaseLoggingTestCase,
ovs_base.FunctionalTestCase):
schemas = ['OVN_Southbound', 'OVN_Northbound']
PG_NAME = ovn_const.OVN_DROP_PORT_GROUP_NAME
def setUp(self):
super(TestCreateDefaultDropPortGroup, self).setUp()
ovn_conf.register_opts()
self.api = impl_idl_ovn.OvsdbNbOvnIdl(
self.connection['OVN_Northbound'])
self.addCleanup(self.api.pg_del(self.PG_NAME, if_exists=True).execute,
check_error=True)
def test_port_group_exists(self):
"""Test new port group is not added or modified.
If Port Group was not existent, acls would be added.
"""
self.api.pg_add(
self.PG_NAME, acls=[], may_exist=True).execute(check_error=True)
mech_driver.create_default_drop_port_group(self.api)
port_group = self.api.get_port_group(self.PG_NAME)
self.assertFalse(port_group.acls)
def _test_pg_with_ports(self, expected_ports=None):
expected_ports = expected_ports or []
mech_driver.create_default_drop_port_group(self.api)
port_group = self.api.get_port_group(self.PG_NAME)
self.assertCountEqual(
expected_ports, [port.name for port in port_group.ports])
def test_with_ports_available(self):
expected_ports = ['port1', 'port2']
testing_pg = 'testing'
testing_ls = 'testing'
with self.api.transaction(check_error=True) as txn:
txn.add(self.api.pg_add(
testing_pg,
external_ids={ovn_const.OVN_SG_EXT_ID_KEY: 'foo'}))
txn.add(self.api.ls_add(testing_ls))
port_uuids = [txn.add(self.api.lsp_add(testing_ls, port))
for port in expected_ports]
txn.add(self.api.pg_add_ports(testing_pg, port_uuids))
self.addCleanup(self.api.pg_del(testing_pg, if_exists=True).execute,
check_error=True)
self._test_pg_with_ports(expected_ports)
def test_without_ports(self):
self._test_pg_with_ports(expected_ports=[])
class TestSecurityGroup(base.TestOVNFunctionalBase):
def setUp(self):

View File

@ -15,6 +15,7 @@
from collections import namedtuple
from os import path
import shlex
from unittest import mock
import fixtures
@ -22,7 +23,9 @@ import neutron_lib
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_const
from oslo_concurrency import processutils
from oslo_config import cfg
import testtools
from neutron.common.ovn import constants
from neutron.common.ovn import utils
@ -810,3 +813,116 @@ class TestRetryDecorator(base.BaseTestCase):
# number of exceptions + one successful call
self.assertEqual(number_of_exceptions + 1, method.call_count)
class TestOvsdbClientCommand(base.BaseTestCase):
class OvsdbClientTestCommand(utils.OvsdbClientCommand):
COMMAND = 'test'
def setUp(self):
super().setUp()
self.nb_connection = 'ovn_nb_connection'
self.sb_connection = 'ovn_sb_connection'
ovn_conf.register_opts()
ovn_conf.cfg.CONF.set_default(
'ovn_nb_connection',
self.nb_connection,
group='ovn')
ovn_conf.cfg.CONF.set_default(
'ovn_sb_connection',
self.sb_connection,
group='ovn')
self.m_exec = mock.patch.object(processutils, 'execute').start()
def assert_exec_call(self, expected):
self.m_exec.assert_called_with(
*shlex.split(expected), log_errors=processutils.LOG_FINAL_ERROR)
def test_run_northbound(self):
expected = ('ovsdb-client %s %s --timeout 180 '
'\'["OVN_Northbound", "foo"]\'' % (
self.OvsdbClientTestCommand.COMMAND,
self.nb_connection))
self.OvsdbClientTestCommand.run(['OVN_Northbound', 'foo'])
self.assert_exec_call(expected)
def test_run_southbound(self):
expected = ('ovsdb-client %s %s --timeout 180 '
'\'["OVN_Southbound", "foo"]\'' % (
self.OvsdbClientTestCommand.COMMAND,
self.sb_connection))
self.OvsdbClientTestCommand.run(['OVN_Southbound', 'foo'])
self.assert_exec_call(expected)
def test_run_northbound_with_ssl(self):
private_key = 'north_pk'
certificate = 'north_cert'
ca_auth = 'north_ca_auth'
ovn_conf.cfg.CONF.set_default(
'ovn_nb_private_key',
private_key,
group='ovn')
ovn_conf.cfg.CONF.set_default(
'ovn_nb_certificate',
certificate,
group='ovn')
ovn_conf.cfg.CONF.set_default(
'ovn_nb_ca_cert',
ca_auth,
group='ovn')
expected = ('ovsdb-client %s %s --timeout 180 '
'-p %s '
'-c %s '
'-C %s '
'\'["OVN_Northbound", "foo"]\'' % (
self.OvsdbClientTestCommand.COMMAND,
self.nb_connection,
private_key,
certificate,
ca_auth))
self.OvsdbClientTestCommand.run(['OVN_Northbound', 'foo'])
self.assert_exec_call(expected)
def test_run_southbound_with_ssl(self):
private_key = 'north_pk'
certificate = 'north_cert'
ca_auth = 'north_ca_auth'
ovn_conf.cfg.CONF.set_default(
'ovn_sb_private_key',
private_key,
group='ovn')
ovn_conf.cfg.CONF.set_default(
'ovn_sb_certificate',
certificate,
group='ovn')
ovn_conf.cfg.CONF.set_default(
'ovn_sb_ca_cert',
ca_auth,
group='ovn')
expected = ('ovsdb-client %s %s --timeout 180 '
'-p %s '
'-c %s '
'-C %s '
'\'["OVN_Southbound", "foo"]\'' % (
self.OvsdbClientTestCommand.COMMAND,
self.sb_connection,
private_key,
certificate,
ca_auth))
self.OvsdbClientTestCommand.run(['OVN_Southbound', 'foo'])
self.assert_exec_call(expected)
def test_run_empty_list(self):
with testtools.ExpectedException(KeyError):
self.OvsdbClientTestCommand.run([])
def test_run_bad_schema(self):
with testtools.ExpectedException(KeyError):
self.OvsdbClientTestCommand.run(['foo'])

View File

@ -26,7 +26,6 @@ from ovs import poller
from ovs.stream import Stream
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import idlutils
import testtools
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import hash_ring_manager
@ -723,36 +722,3 @@ class TestChassisEvent(base.BaseTestCase):
# after it became a Gateway chassis
self._test_handle_ha_chassis_group_changes_create(
self.event.ROW_UPDATE)
class TestShortLivingOvsdbApi(base.BaseTestCase):
def setUp(self):
ovn_conf.register_opts()
super().setUp()
def test_context(self):
api_class = mock.Mock()
idl = mock.Mock()
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl) as api:
self.assertEqual(api_class.return_value, api)
api.ovsdb_connection.stop.assert_called_once_with()
def test_context_error(self):
api_class = mock.Mock()
idl = mock.Mock()
exc = RuntimeError()
try:
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl) as api:
self.assertEqual(api_class.return_value, api)
raise exc
except RuntimeError as re:
self.assertIs(exc, re)
api.ovsdb_connection.stop.assert_called_once_with()
def test_api_class_error(self):
api_class = mock.Mock(side_effect=RuntimeError())
idl = mock.Mock()
with testtools.ExpectedException(RuntimeError):
with ovsdb_monitor.short_living_ovsdb_api(api_class, idl):
# Make sure it never enter the api context
raise Exception("API class instantiated but it should not")

View File

@ -61,7 +61,6 @@ from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent
from neutron.plugins.ml2.drivers.ovn.mech_driver import mech_driver
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import impl_idl_ovn
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron.services.revisions import revision_plugin
from neutron.tests.unit.extensions import test_segment
@ -216,56 +215,6 @@ class TestOVNMechanismDriverBase(MechDriverSetupBase,
class TestOVNMechanismDriver(TestOVNMechanismDriverBase):
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_non_existing(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = None
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(1, m_ovsdb_api.get_port_group.call_count)
self.assertTrue(m_ovsdb_api.transaction.return_value.__enter__.called)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_existing(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = 'foo'
self.mech_driver._create_neutron_pg_drop()
self.assertEqual(1, m_ovsdb_api.get_port_group.call_count)
self.assertFalse(m_ovsdb_api.transaction.return_value.__enter__.called)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_created_meanwhile(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = None
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
idl = m_from_server.return_value
idl.neutron_pg_drop_event.wait.return_value = True
result = self.mech_driver._create_neutron_pg_drop()
idl.neutron_pg_drop_event.wait.assert_called_once()
# If something else creates the port group, just return
self.assertIsNone(result)
@mock.patch.object(ovsdb_monitor.OvnInitPGNbIdl, 'from_server')
@mock.patch.object(ovsdb_monitor, 'short_living_ovsdb_api')
def test__create_neutron_pg_drop_error(
self, m_ovsdb_api_con, m_from_server):
m_ovsdb_api = m_ovsdb_api_con.return_value.__enter__.return_value
m_ovsdb_api.get_port_group.return_value = None
m_ovsdb_api.transaction.return_value.__exit__.side_effect = (
RuntimeError())
idl = m_from_server.return_value
idl.neutron_pg_drop_event.wait.return_value = False
self.assertRaises(RuntimeError,
self.mech_driver._create_neutron_pg_drop)
idl.neutron_pg_drop_event.wait.assert_called_once()
def test__get_max_tunid_no_key_set(self):
self.mech_driver.nb_ovn.nb_global.options.get.return_value = None
self.assertIsNone(self.mech_driver._get_max_tunid())