Browse Source

Optimize inconsistency detection (Part 2)

This patch is introducing the maintenance mechanism for the database
consistency work.

This new mechanism is based on the futurist library [0].

The ``check_for_inconsistencies`` task will compare the
"ovn_revision_numbers" table with Neutron's "standardattributes" table
and check if there's any difference between the revision numbers. If
some inconsistency is found, the resource will be sync'd with the
latest version in neutron. An OVSDB lock is used for this task as well
to guarantee that only one check is running in the entire cluster.

The maintenance mechanism runs on its own OVN worker [1] so the OVSDB
lock doesn't conflict with the OVSDB monitor one.

Currently in this patch, only the ``network`` resource is being
addressed to keep it small and easy to review. New resources will be
added in subsequent patches.

[0] https://github.com/openstack/futurist/
[1]
https://docs.openstack.org/networking-ovn/latest/contributor/design/ovn_worker.html

Change-Id: I61757e70460b83b93429ddf4f7792af7cc2d7ab5
changes/33/518033/15
Lucas Alvares Gomes 5 years ago
parent
commit
cea1801540
  1. 164
      networking_ovn/common/maintenance.py
  2. 55
      networking_ovn/db/maintenance.py
  3. 10
      networking_ovn/ml2/mech_driver.py
  4. 89
      networking_ovn/tests/unit/common/test_maintenance.py
  5. 63
      networking_ovn/tests/unit/db/test_maintenance.py
  6. 8
      releasenotes/notes/maintenance-thread-ee65c1ad317204c7.yaml
  7. 1
      requirements.txt

164
networking_ovn/common/maintenance.py

@ -0,0 +1,164 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import threading
from futurist import periodics
from neutron.common import config as n_conf
from neutron_lib import context as n_context
from neutron_lib import worker
from oslo_log import log
from networking_ovn.common import constants as ovn_const
from networking_ovn.common import utils
from networking_ovn.db import maintenance as db_maint
from networking_ovn.db import revision as db_rev
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
# Spacing, in seconds, between two runs of the periodic database
# consistency check.
DB_CONSISTENCY_CHECK_INTERVAL = 300 # 5 minutes
class MaintenanceWorker(worker.BaseWorker):
    """Worker process that hosts the maintenance periodic tasks.

    Every lifecycle method simply defers to ``worker.BaseWorker``.
    """

    @staticmethod
    def reset():
        """Reset the service configuration via ``n_conf.reset_service()``."""
        n_conf.reset_service()

    def start(self):
        """Start the worker."""
        # NOTE(twilson) The super class will trigger the post_fork_initialize
        # in the driver, which starts the connection/IDL notify loop which
        # keeps the process from exiting
        super(MaintenanceWorker, self).start()

    def wait(self):
        """Wait for service to complete."""
        super(MaintenanceWorker, self).wait()

    def stop(self):
        """Stop service."""
        super(MaintenanceWorker, self).stop()
class MaintenanceThread(object):
    """Run futurist periodic tasks on a background daemon thread.

    Callables are collected with :meth:`add_periodics` and handed to a
    ``periodics.PeriodicWorker`` which runs on its own daemon thread once
    :meth:`start` is called.
    """

    def __init__(self):
        # (callable, args, kwargs) tuples handed to the PeriodicWorker.
        self._callables = []
        # Both stay None for as long as the thread is not running.
        self._thread = None
        self._worker = None

    def add_periodics(self, obj):
        """Register every periodic member found on ``obj``.

        :param obj: Object whose members are inspected; each member for
                    which ``periodics.is_periodic()`` is true is queued
                    to be run (with no arguments) by the worker.
        """
        for name, member in inspect.getmembers(obj):
            if periodics.is_periodic(member):
                LOG.debug('Periodic task found: %(owner)s.%(member)s',
                          {'owner': obj.__class__.__name__, 'member': name})
                self._callables.append((member, (), {}))

    def start(self):
        """Start the periodic worker thread; no-op if already started."""
        if self._thread is None:
            self._worker = periodics.PeriodicWorker(self._callables)
            self._thread = threading.Thread(target=self._worker.start)
            self._thread.daemon = True
            self._thread.start()

    def stop(self):
        """Stop the periodic worker and join its thread.

        Safe to call when the thread was never started or was already
        stopped: in that case there is nothing to tear down and the call
        returns immediately instead of failing on a ``None`` worker.
        """
        if self._thread is None:
            return
        self._worker.stop()
        self._worker.wait()
        self._thread.join()
        # Reset the state so start() can be called again.
        self._worker = self._thread = None
class DBInconsistenciesPeriodics(object):
    """Periodic tasks that repair drift between the Neutron and OVN DBs.

    Rows flagged by ``networking_ovn.db.maintenance`` are replayed against
    OVN through the given OVN client. Only ``network`` resources are
    handled; rows of any other resource type are skipped.
    """

    def __init__(self, ovn_client):
        """Keep the client and IDL handles and request the OVSDB lock.

        :param ovn_client: OVN client used to create, update and delete
                           resources in OVN.
        """
        self._ovn_client = ovn_client
        # FIXME(lucasagomes): We should not be accessing private
        # attributes like that, perhaps we should extend the OVNClient
        # class and create an interface for the locks ?
        nb_idl = ovn_client._nb_idl
        self._nb_idl = nb_idl
        self._idl = nb_idl.idl
        self._idl.set_lock('ovn_db_inconsistencies_periodics')

    @property
    def has_lock(self):
        """True unless the IDL reports the lock as contended."""
        contended = self._idl.is_lock_contended
        return not contended

    def _fix_create_update_network(self, row):
        """Bring a network in OVN in line with its Neutron counterpart.

        :param row: Revision row of the inconsistent network; only its
                    ``resource_uuid`` is used.
        """
        net_id = row.resource_uuid
        # Get the latest version of the resource in Neutron DB
        context = n_context.get_admin_context()
        neutron_net = self._ovn_client._plugin.get_network(context, net_id)
        lswitch = self._nb_idl.get_lswitch(utils.ovn_name(net_id))

        if not lswitch:
            # Nothing in the OVN DB yet, so create the resource there.
            self._ovn_client.create_network(neutron_net)
            return

        external_ids = getattr(lswitch, 'external_ids', {})
        ovn_revision = int(
            external_ids.get(ovn_const.OVN_REV_NUM_EXT_ID_KEY, -1))
        if ovn_revision == neutron_net['revision_number']:
            # Both databases already agree; only the revision stored in
            # the cache table is behind, so bump it.
            db_rev.bump_revision(neutron_net, ovn_const.TYPE_NETWORKS)
        else:
            # The resource exists in the OVN DB at another revision
            # number than in the Neutron DB: update it.
            self._ovn_client.update_network(neutron_net)

    def _fix_delete_network(self, row):
        """Finish the deletion of a network already removed from Neutron.

        :param row: Revision row of the deleted network; only its
                    ``resource_uuid`` is used.
        """
        net_id = row.resource_uuid
        if self._nb_idl.get_lswitch(utils.ovn_name(net_id)):
            self._ovn_client.delete_network(net_id)
        else:
            # Already gone from OVN, only the revision row is left over.
            db_rev.delete_revision(net_id)

    def _fix_rows(self, rows, fix_network, failure_msg):
        """Apply ``fix_network`` to every network row in ``rows``.

        A failure on one row is logged with ``failure_msg`` and does not
        prevent the remaining rows from being processed.
        """
        for row in rows:
            try:
                if row.resource_type == ovn_const.TYPE_NETWORKS:
                    fix_network(row)
            except Exception:
                LOG.exception(failure_msg,
                              {'res_uuid': row.resource_uuid,
                               'res_type': row.resource_type})

    @periodics.periodic(spacing=DB_CONSISTENCY_CHECK_INTERVAL,
                        run_immediately=True)
    def check_for_inconsistencies(self):
        """Look for inconsistent revision rows and try to fix each one."""
        # Only the worker holding a valid lock within OVSDB will run
        # this periodic
        if not self.has_lock:
            return

        outdated_rows = db_maint.get_inconsistent_resources()
        orphaned_rows = db_maint.get_deleted_resources()
        if not (outdated_rows or orphaned_rows):
            return

        LOG.warning('Inconsistencies found in the database!')

        # Fix the create/update resources inconsistencies
        self._fix_rows(
            outdated_rows, self._fix_create_update_network,
            'Failed to fix resource %(res_uuid)s '
            '(type: %(res_type)s)')

        # Fix the deleted resources inconsistencies
        self._fix_rows(
            orphaned_rows, self._fix_delete_network,
            'Failed to fix deleted resource %(res_uuid)s '
            '(type: %(res_type)s)')

55
networking_ovn/db/maintenance.py

@ -0,0 +1,55 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import standard_attr
from neutron_lib.db import api as db_api
from networking_ovn.db import models
def get_inconsistent_resources():
    """Return the revision rows that disagree with Neutron.

    :returns: A list of ``OVNRevisionNumbers`` rows whose
              ``revision_number`` differs from the one of the
              ``StandardAttribute`` row they are joined to.
    """
    ovn_rev = models.OVNRevisionNumbers
    std_attr = standard_attr.StandardAttribute

    session = db_api.get_reader_session()
    with session.begin():
        query = session.query(ovn_rev).join(
            std_attr, ovn_rev.standard_attr_id == std_attr.id)
        query = query.filter(
            ovn_rev.revision_number != std_attr.revision_number)
        return query.all()
def get_deleted_resources():
    """Return the revision rows of resources not yet deleted in OVN.

    When a resource is deleted in Neutron, the ``standard_attr_id``
    foreign key of its row in the ovn_revision_numbers table is set to
    NULL. The row itself is only removed after the resource has been
    successfully deleted in OVN, so rows that still carry a NULL
    ``standard_attr_id`` identify deletions that failed on the OVN side
    and are left for the maintenance thread to fix.

    :returns: A list of ``OVNRevisionNumbers`` rows whose
              ``standard_attr_id`` is NULL.
    """
    reader = db_api.get_reader_session()
    with reader.begin():
        orphaned = reader.query(models.OVNRevisionNumbers).filter_by(
            standard_attr_id=None)
        return orphaned.all()

10
networking_ovn/ml2/mech_driver.py

@ -36,6 +36,7 @@ from networking_ovn._i18n import _
from networking_ovn.common import acl as ovn_acl
from networking_ovn.common import config
from networking_ovn.common import constants as ovn_const
from networking_ovn.common import maintenance
from networking_ovn.common import ovn_client
from networking_ovn.common import utils
from networking_ovn.db import revision as db_rev
@ -93,6 +94,7 @@ class OVNMechanismDriver(api.MechanismDriver):
self._sb_ovn = None
self._plugin_property = None
self._ovn_client_inst = None
self._maintenance_thread = None
self.sg_enabled = ovn_acl.is_sg_enabled()
self._post_fork_event = threading.Event()
if cfg.CONF.SECURITYGROUP.firewall_driver:
@ -187,6 +189,12 @@ class OVNMechanismDriver(api.MechanismDriver):
)
self.sb_synchronizer.sync()
if trigger.im_class == maintenance.MaintenanceWorker:
self._maintenance_thread = maintenance.MaintenanceThread()
self._maintenance_thread.add_periodics(
maintenance.DBInconsistenciesPeriodics(self._ovn_client))
self._maintenance_thread.start()
def _create_security_group(self, resource, event, trigger,
security_group, **kwargs):
self._ovn_client.create_security_group(security_group)
@ -612,7 +620,7 @@ class OVNMechanismDriver(api.MechanismDriver):
workers, can return a sequence of worker instances.
"""
# See doc/source/design/ovn_worker.rst for more details.
return [ovsdb_monitor.OvnWorker()]
return [ovsdb_monitor.OvnWorker(), maintenance.MaintenanceWorker()]
def _update_subport_host_if_needed(self, port_id):
parent_port = self._ovn_client.get_parent_port(port_id)

89
networking_ovn/tests/unit/common/test_maintenance.py

@ -0,0 +1,89 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.tests.unit.plugins.ml2 import test_plugin
from neutron_lib.db import api as db_api
from networking_ovn.common import constants
from networking_ovn.common import maintenance
from networking_ovn.db import maintenance as db_maint
from networking_ovn.db import revision as db_rev
from networking_ovn.tests.unit.db import base as db_base
@mock.patch.object(maintenance.DBInconsistenciesPeriodics,
                   'has_lock', lambda _: True)
class TestDBInconsistenciesPeriodics(db_base.DBTestCase,
                                     test_plugin.Ml2PluginV2TestCase):
    """Unit tests for the periodic DB consistency tasks.

    ``has_lock`` is patched for every test so the periodic never bails
    out for lack of the OVSDB lock.
    """

    def setUp(self):
        super(TestDBInconsistenciesPeriodics, self).setUp()
        created = self._make_network(
            self.fmt, name='net1', admin_state_up=True)
        self.net = created['network']
        self.fake_ovn_client = mock.Mock()
        self.periodic = maintenance.DBInconsistenciesPeriodics(
            self.fake_ovn_client)
        self.session = db_api.get_writer_session()

    def test_check_for_inconsistencies(self):
        periodics_cls = maintenance.DBInconsistenciesPeriodics
        network_row = mock.Mock(resource_type=constants.TYPE_NETWORKS)
        with mock.patch.object(
                db_maint, 'get_inconsistent_resources') as get_rows:
            with mock.patch.object(
                    periodics_cls, '_fix_create_update_network') as fix_net:
                get_rows.return_value = [network_row]
                self.periodic.check_for_inconsistencies()
                fix_net.assert_called_once_with(network_row)

    def _test_fix_create_update_network(self, ovn_rev, neutron_rev):
        # A negative OVN revision stands for "not present in OVN at all".
        missing_in_ovn = ovn_rev < 0
        self.net['revision_number'] = neutron_rev

        # Seed the revision table and check the row starts at the
        # revision number this scenario expects.
        db_rev.create_initial_revision(
            self.net['id'], constants.TYPE_NETWORKS, self.session,
            revision_number=ovn_rev)
        row = self.get_revision_row(self.net['id'])
        self.assertEqual(ovn_rev, row.revision_number)

        if missing_in_ovn:
            lswitch = None
        else:
            lswitch = mock.Mock(external_ids={
                constants.OVN_REV_NUM_EXT_ID_KEY: ovn_rev})
        client = self.fake_ovn_client
        client._nb_idl.get_lswitch.return_value = lswitch
        client._plugin.get_network.return_value = self.net

        self.periodic._fix_create_update_network(row)

        # A network missing from OVN has to be created from the latest
        # Neutron version of the object; one that already exists only
        # needs to be updated to match it.
        if missing_in_ovn:
            expected_call = client.create_network
        else:
            expected_call = client.update_network
        expected_call.assert_called_once_with(self.net)

    def test_fix_network_create(self):
        self._test_fix_create_update_network(ovn_rev=-1, neutron_rev=2)

    def test_fix_network_update(self):
        self._test_fix_create_update_network(ovn_rev=5, neutron_rev=7)

63
networking_ovn/tests/unit/db/test_maintenance.py

@ -0,0 +1,63 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.services.revisions import revision_plugin
from neutron.tests.unit.plugins.ml2 import test_plugin
from neutron_lib.db import api as db_api
from networking_ovn.common import constants
from networking_ovn.db import maintenance as db_maint
from networking_ovn.db import revision as db_rev
from networking_ovn.tests.unit.db import base as db_base
class TestMaintenance(db_base.DBTestCase, test_plugin.Ml2PluginV2TestCase):
    """Unit tests for the queries in ``networking_ovn.db.maintenance``."""

    def setUp(self):
        super(TestMaintenance, self).setUp()
        created = self._make_network(
            self.fmt, name='net1', admin_state_up=True)
        self.net = created['network']
        self.session = db_api.get_writer_session()
        revision_plugin.RevisionPlugin()

    def _create_revision(self, revision_number):
        """Create the revision row of the test network."""
        db_rev.create_initial_revision(
            self.net['id'], constants.TYPE_NETWORKS, self.session,
            revision_number=revision_number)

    def test_get_inconsistent_resources(self):
        # Set the initial revision to -1 to force it to be inconsistent
        self._create_revision(-1)
        rows = db_maint.get_inconsistent_resources()
        self.assertEqual(1, len(rows))
        self.assertEqual(self.net['id'], rows[0].resource_uuid)

    def test_get_inconsistent_resources_consistent(self):
        # 0 is the initial revision_number of recently created
        # resources, so nothing should be reported as inconsistent
        self._create_revision(0)
        rows = db_maint.get_inconsistent_resources()
        self.assertEqual([], rows)

    def test_get_deleted_resources(self):
        self._create_revision(0)
        self._delete('networks', self.net['id'])
        rows = db_maint.get_deleted_resources()
        self.assertEqual(1, len(rows))
        self.assertEqual(self.net['id'], rows[0].resource_uuid)
        self.assertIsNone(rows[0].standard_attr_id)

8
releasenotes/notes/maintenance-thread-ee65c1ad317204c7.yaml

@ -0,0 +1,8 @@
---
features:
- |
Added a new mechanism that periodically detects and fixes
inconsistencies between resources in the Neutron and OVN databases.
upgrade:
- |
Adds a new dependency on the Oslo Futurist library.

1
requirements.txt

@ -2,6 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
futurist>=1.2.0 # Apache-2.0
netaddr>=0.7.18 # BSD
neutron-lib>=1.11.0 # Apache-2.0
oslo.config>=5.1.0 # Apache-2.0

Loading…
Cancel
Save