[OVN] OVN DB schema update aware maintenance tasks

This patch introduces a new mechanism to allow rerunning maintenance
tasks upon an OVN database schema change to avoid a service restart.

As an example, the "migrate_to_port_groups" maintenance task will run
again when the database schema is updated. In case of a migration from
an OVN version without port groups support to a version that supports
it, the OVN driver will migrate the code automatically without the need
of a service restart.

Closes-Bug: #1864641
Change-Id: I520a3de105b4c6924908e099a3b8d60c3280f499
Signed-off-by: Lucas Alvares Gomes <lucasagomes@gmail.com>
This commit is contained in:
Lucas Alvares Gomes 2020-02-25 15:14:54 +00:00
parent 1f79ce8736
commit 9e416224b0
3 changed files with 126 additions and 1 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import inspect
import threading
@ -24,6 +25,7 @@ from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from ovsdbapp.backend.ovs_idl import event as row_event
from neutron.common.ovn import constants as ovn_const
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
@ -68,7 +70,60 @@ class MaintenanceThread(object):
self._worker = self._thread = None
class DBInconsistenciesPeriodics(object):
def rerun_on_schema_updates(func):
"""Tasks decorated with this will rerun upon database version updates."""
func._rerun_on_schema_updates = True
return func
class OVNNBDBReconnectionEvent(row_event.RowEvent):
"""Event listening to reconnections from OVN Northbound DB."""
def __init__(self, driver, version):
self.driver = driver
self.version = version
table = 'Connection'
events = (self.ROW_CREATE,)
super(OVNNBDBReconnectionEvent, self).__init__(events, table, None)
self.event_name = self.__class__.__name__
def run(self, event, row, old):
curr_version = self.driver.get_ovn_nbdb_version()
if self.version != curr_version:
self.driver.nbdb_schema_updated_hook()
self.version = curr_version
class SchemaAwarePeriodicsBase(object):
def __init__(self, ovn_client):
self._nb_idl = ovn_client._nb_idl
self._set_schema_aware_periodics()
self._nb_idl.idl.notify_handler.watch_event(OVNNBDBReconnectionEvent(
self, self.get_ovn_nbdb_version()))
def get_ovn_nbdb_version(self):
return self._nb_idl.idl._db.version
def _set_schema_aware_periodics(self):
self._schema_aware_periodics = []
for name, member in inspect.getmembers(self):
if not inspect.ismethod(member):
continue
schema_upt = getattr(member, '_rerun_on_schema_updates', None)
if schema_upt and periodics.is_periodic(member):
LOG.debug('Schema aware periodic task found: '
'%(owner)s.%(member)s',
{'owner': self.__class__.__name__, 'member': name})
self._schema_aware_periodics.append(member)
@abc.abstractmethod
def nbdb_schema_updated_hook(self):
"""Hook invoked upon OVN NB schema is updated."""
class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
def __init__(self, ovn_client):
self._ovn_client = ovn_client
@ -79,6 +134,7 @@ class DBInconsistenciesPeriodics(object):
self._idl = self._nb_idl.idl
self._idl.set_lock('ovn_db_inconsistencies_periodics')
self._sync_timer = timeutils.StopWatch()
super(DBInconsistenciesPeriodics, self).__init__(ovn_client)
self._resources_func_map = {
ovn_const.TYPE_NETWORKS: {
@ -140,6 +196,21 @@ class DBInconsistenciesPeriodics(object):
def has_lock(self):
return not self._idl.is_lock_contended
def nbdb_schema_updated_hook(self):
if not self.has_lock:
return
for func in self._schema_aware_periodics:
LOG.debug('OVN Northbound DB schema version was updated,'
'invoking "%s"', func.__name__)
try:
func()
except periodics.NeverAgain:
pass
except Exception:
LOG.exception(
'Unknown error while executing "%s"', func.__name__)
def _fix_create_update(self, context, row):
res_map = self._resources_func_map[row.resource_type]
try:
@ -207,6 +278,7 @@ class DBInconsistenciesPeriodics(object):
# is held by some other neutron-server instance in the cloud, we'll attempt
# to perform the migration every 10 seconds until completed.
@periodics.periodic(spacing=10, run_immediately=True)
@rerun_on_schema_updates
def migrate_to_port_groups(self):
"""Perform the migration from Address Sets to Port Groups. """
# TODO(dalvarez): Remove this in U cycle when we're sure that all

View File

@ -268,6 +268,11 @@ class OvnDbNotifyHandler(event.RowEventHandler):
class BaseOvnIdl(connection.OvsdbIdl):
def __init__(self, remote, schema):
self.notify_handler = event.RowEventHandler()
super(BaseOvnIdl, self).__init__(remote, schema)
@classmethod
def from_server(cls, connection_string, schema_name):
_check_and_set_ssl_files(schema_name)
@ -275,6 +280,9 @@ class BaseOvnIdl(connection.OvsdbIdl):
helper.register_all()
return cls(connection_string, helper)
def notify(self, event, row, updates=None):
self.notify_handler.notify(event, row, updates)
class BaseOvnSbIdl(connection.OvsdbIdl):
@classmethod

View File

@ -30,6 +30,51 @@ from neutron.tests.unit.plugins.ml2 import test_security_group as test_sg
from neutron.tests.unit import testlib_api
class TestSchemaAwarePeriodicsBase(testlib_api.SqlTestCaseLight):
def test__set_schema_aware_periodics(self):
class TestClass(maintenance.SchemaAwarePeriodicsBase):
@periodics.periodic(spacing=1)
@maintenance.rerun_on_schema_updates
def test_method_0(self):
pass
@periodics.periodic(spacing=1)
def test_method_1(self):
pass
@periodics.periodic(spacing=1)
@maintenance.rerun_on_schema_updates
def test_method_2(self):
pass
obj = TestClass(mock.Mock())
# Assert that test_method_0 and test_method_2 are schema
# aware periodics
self.assertEqual([obj.test_method_0, obj.test_method_2],
obj._schema_aware_periodics)
@mock.patch.object(maintenance.SchemaAwarePeriodicsBase,
'get_ovn_nbdb_version')
def test_nbdb_schema_updated_hook(self, mock_get_ver):
initial_ver = '1.0.0'
obj = mock.Mock()
obj.get_ovn_nbdb_version.side_effect = (initial_ver, '1.1.0')
obj_evt = maintenance.OVNNBDBReconnectionEvent(obj, initial_ver)
# First run() will be called with the initial version (see
# side_effect), so the hook should not be invoked since the
# versions didn't change
obj_evt.run('update', mock.Mock(), mock.Mock())
self.assertFalse(obj.nbdb_schema_updated_hook.called)
# Second run() will be called with a different version, the
# hook should now be invoked
obj_evt.run('update', mock.Mock(), mock.Mock())
self.assertTrue(obj.nbdb_schema_updated_hook.called)
@mock.patch.object(maintenance.DBInconsistenciesPeriodics,
'has_lock', mock.PropertyMock(return_value=True))
class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,