Browse Source

Base mechanism for full sync

This patch introduce a base class containing for all drivers. This
class is responsible for returning all resources of a specific
types.

Change-Id: I6e855422ce9da70c333013985b7d25a2a63bb658
Partial-bug: #1713697
tags/12.0.0.0b3
Rajiv Kumar 3 years ago
committed by Rajiv Kumar
parent
commit
24675835ee
6 changed files with 303 additions and 0 deletions
  1. +15
    -0
      doc/source/contributor/drivers_architecture.rst
  2. +56
    -0
      networking_odl/common/exceptions.py
  3. +73
    -0
      networking_odl/journal/base_driver.py
  4. +14
    -0
      networking_odl/journal/full_sync.py
  5. +89
    -0
      networking_odl/tests/unit/journal/test_base_driver.py
  6. +56
    -0
      networking_odl/tests/unit/journal/test_full_sync.py

+ 15
- 0
doc/source/contributor/drivers_architecture.rst View File

@@ -55,6 +55,21 @@ The journal entry is recorded in the pre-commit phase (whenever applicable) so
that in case of a commit failure the journal entry gets aborted along with the
original operation, and there's nothing extra needed.

The *get_resources_for_full_sync* method is defined in the ResourceBaseDriver
class, it fetches all the resources needed for full sync, based on resource
type. To override the default behaviour of *get_resources_for_full_sync*
define it in driver class, For example

#. L2 gateway driver needs to provide customized method for filtering of
fetched gateway connection information from database. Neutron
defines *l2_gateway_id* for a l2 gateway connection but ODL expects
*gateway_id*, these kind of pre or post processing can be done in this
method.
#. For lbaas driver, as per default resource fetching mechanism, it looks for
*get_member* instead the lbaas plugin defines *get_pool_member*, by
overriding the *get_resources* method, it is possible to solve this
inconsistency.

Journal Entry Lifecycle
-----------------------



+ 56
- 0
networking_odl/common/exceptions.py View File

@@ -0,0 +1,56 @@
# Copyright (c) 2017 NEC Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib import exceptions

from neutron._i18n import _


class NetworkingODLException(exceptions.NeutronException):
"""Base Networking-ODL exception."""
pass


class FullSyncError(NetworkingODLException):
"""Base exception for Full Sync"""
pass


class UnsupportedResourceType(NetworkingODLException):
"""An exception for unsupported resource for full sync and recovery"""
pass


class PluginMethodNotFound(NetworkingODLException, AttributeError):
"""An exception indicating plugin method was not found.

Specialization of AttributeError and NetworkingODLException indicating
requested plugin method could not be found.

:param method: Name of the method being accessed.
:param plugin: Plugin name expected to have required method.
"""
message = _("%(method)s not found in %(plugin)s")


class ResourceNotRegistered(FullSyncError):
"""An exception indicating resource is not registered for maintenance task.

Specialization of FullSync error indicating resource is not registered
for maintenance tasks full sync and recovery.

:param resource_type: Resource type not registered for maintenance task.
"""
message = _("%(resource_type)s resource is not registered for maintenance")

+ 73
- 0
networking_odl/journal/base_driver.py View File

@@ -0,0 +1,73 @@
# Copyright (c) 2017 NEC Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib.plugins import directory
from oslo_log import log as logging

from networking_odl.common import exceptions

LOG = logging.getLogger(__name__)

ALL_RESOURCES = {}


def get_driver(resource_type):
try:
return ALL_RESOURCES[resource_type]
except KeyError:
raise exceptions.ResourceNotRegistered(resource_type=resource_type)


class ResourceBaseDriver(object):
"""Base class for all the drivers to support full sync

ResourceBaseDriver class acts as base class for all the drivers and
provides default behaviour for full sync functionality.

A driver has to provide class or object attribute RESOURCES, specifying
resources it manages. RESOURCES must be a dictionary, keys of the
dictionary should be resource type and value should be method suffix
or plural used for the resources.

A driver has to provide plugin type for itself, as class or object
attribute. Its value should be the same, as used by neutron to
register plugin for the resources it manages.
"""

RESOURCES = {}
plugin_type = None

def __init__(self, *args, **kwargs):
super(ResourceBaseDriver, self).__init__(*args, **kwargs)
for resource in self.RESOURCES:
ALL_RESOURCES[resource] = self

def get_resources_for_full_sync(self, context, resource_type):
"""Provide all resources of type resource_type """
if resource_type not in self.RESOURCES:
raise exceptions.UnsupportedResourceType

method_name = 'get_%s' % self.RESOURCES[resource_type]
try:
resource_getter = getattr(self.plugin, method_name)
except AttributeError:
raise exceptions.PluginMethodNotFound(plugin=self.plugin_type,
method=method_name)

return resource_getter(context)

@property
def plugin(self):
return directory.get_plugin(self.plugin_type)

+ 14
- 0
networking_odl/journal/full_sync.py View File

@@ -23,6 +23,7 @@ from neutron.db import api as db_api
from networking_odl.common import client
from networking_odl.common import constants as odl_const
from networking_odl.db import db
from networking_odl.journal import base_driver
from networking_odl.journal import journal

# Define which pending operation types should be deleted
@@ -147,3 +148,16 @@ def _sync_resources(context, object_type, handler):
for resource in resources:
journal.record(context, object_type, resource['id'],
odl_const.ODL_CREATE, resource)


@db_api.retry_if_session_inactive()
# TODO(rajivk): Change name from sync_resource to _sync_resources
# once, we are completely moved to new sync mechanism to plug new syncing
# mechanism.
def sync_resources(context, resource_type):
driver = base_driver.get_driver(resource_type)
resources = driver.get_resources_for_full_sync(context, resource_type)
with db_api.autonested_transaction(context.session):
for resource in resources:
journal.record(context, resource_type, resource['id'],
odl_const.ODL_CREATE, resource)

+ 89
- 0
networking_odl/tests/unit/journal/test_base_driver.py View File

@@ -0,0 +1,89 @@
# Copyright (c) 2017 NEC Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib.plugins import directory

from networking_odl.common import exceptions
from networking_odl.journal import base_driver
from networking_odl.tests.unit import test_base_db

TEST_PLUGIN = 'test_plugin'
TEST_RESOURCE1 = 'test_resource1'
TEST_RESOURCE2 = 'test_resource2'
TEST_RESOURCE1_SUFFIX = 'test_resource1s'
TEST_RESOURCE2_SUFFIX = 'test_resource2s'
INVALID_RESOURCE = 'invalid_resource'
INVALID_PLUGIN = 'invalid_plugin'


class TestPlugin(object):
def get_test_resource1s(self, context):
return [{'id': 'test_id1'}, {'id': 'test_id2'}]

def get_test_resource2s(self, context):
return [{'id': 'test_id3'}, {'id': 'test_id4'}]


class TestDriver(base_driver.ResourceBaseDriver):
RESOURCES = {
TEST_RESOURCE1: TEST_RESOURCE1_SUFFIX,
TEST_RESOURCE2: TEST_RESOURCE2_SUFFIX
}
plugin_type = TEST_PLUGIN

def __init__(self):
super(TestDriver, self).__init__()


class BaseDriverTestCase(test_base_db.ODLBaseDbTestCase):
def setUp(self):
super(BaseDriverTestCase, self).setUp()
self.test_driver = TestDriver()
self.plugin = TestPlugin()
directory.add_plugin(TEST_PLUGIN, self.plugin)
self.addCleanup(directory.add_plugin, TEST_PLUGIN, None)

def test_get_resource_driver(self):
for resource, resource_suffix in self.test_driver.RESOURCES.items():
driver = base_driver.get_driver(resource)
self.assertEqual(driver, self.test_driver)
self.assertEqual(driver.plugin_type, TEST_PLUGIN)
self.assertEqual(self.test_driver.RESOURCES.get(resource),
resource_suffix)

def non_existing_plugin_cleanup(self):
self.test_driver.plugin_type = TEST_PLUGIN

def test_non_existing_plugin(self):
self.test_driver.plugin_type = INVALID_PLUGIN
self.addCleanup(self.non_existing_plugin_cleanup)
self.assertIsNone(self.test_driver.plugin)

def test_get_non_existing_resource_driver(self):
self.assertRaises(exceptions.ResourceNotRegistered,
base_driver.get_driver, INVALID_RESOURCE)

def test_get_resources_for_full_sync(self):
received_resources = self.test_driver.get_resources_for_full_sync(
self.db_context,
TEST_RESOURCE1)
resources = self.plugin.get_test_resource1s(self.db_context)
for resource in resources:
self.assertIn(resource, received_resources)

def test_get_non_existing_resources_for_full_sync(self):
self.assertRaises(exceptions.UnsupportedResourceType,
self.test_driver.get_resources_for_full_sync,
self.db_context, INVALID_RESOURCE)

+ 56
- 0
networking_odl/tests/unit/journal/test_full_sync.py View File

@@ -27,7 +27,9 @@ from neutron_lib.plugins import directory

from networking_odl.bgpvpn import odl_v2 as bgpvpn_driver
from networking_odl.common import constants as odl_const
from networking_odl.common import exceptions
from networking_odl.db import db
from networking_odl.journal import base_driver
from networking_odl.journal import full_sync
from networking_odl.journal import journal
from networking_odl.l2gateway import driver_v2 as l2gw_driver
@@ -38,6 +40,7 @@ from networking_odl.qos import qos_driver_v2 as qos_driver
from networking_odl.sfc.flowclassifier import sfc_flowclassifier_v2
from networking_odl.sfc import sfc_driver_v2 as sfc_driver
from networking_odl.tests import base
from networking_odl.tests.unit.journal import test_base_driver
from networking_odl.tests.unit import test_base_db
from networking_odl.trunk import trunk_driver_v2 as trunk_driver

@@ -416,3 +419,56 @@ class FullSyncTestCase(test_base_db.ODLBaseDbTestCase):
def test_full_sync_retries_exceptions(self):
with mock.patch.object(full_sync, '_full_sync_needed') as m:
self._test_retry_exceptions(full_sync.full_sync, m, True)

def test_object_not_registered(self):
self.assertRaises(exceptions.ResourceNotRegistered,
full_sync.sync_resources,
self.db_context,
'test-object-type')
self.assertEqual([], db.get_all_db_rows(self.db_session))

def _register_resources(self):
test_base_driver.TestDriver()
self.addCleanup(base_driver.ALL_RESOURCES.clear)

def add_plugin(self, plugin_type, plugin):
directory.add_plugin(plugin_type, plugin)

def test_plugin_not_registered(self):
self._register_resources()
# NOTE(rajivk): workaround, as we don't have delete method for plugin
plugin = directory.get_plugin(test_base_driver.TEST_PLUGIN)
directory.add_plugin(test_base_driver.TEST_PLUGIN, None)
self.addCleanup(self.add_plugin, test_base_driver.TEST_PLUGIN, plugin)
self.assertRaises(exceptions.PluginMethodNotFound,
full_sync.sync_resources,
self.db_context,
test_base_driver.TEST_RESOURCE1)
self.assertEqual([], db.get_all_db_rows(self.db_session))

def test_sync_resources(self):
self._register_resources()
plugin = test_base_driver.TestPlugin()
self.add_plugin(test_base_driver.TEST_PLUGIN, plugin)
resources = plugin.get_test_resource1s(self.db_context)
full_sync.sync_resources(self.db_context,
test_base_driver.TEST_RESOURCE1)
entries = [entry.data for entry in db.get_all_db_rows(self.db_session)]
for resource in resources:
self.assertIn(resource, entries)
self.assertEqual(len(resources), len(entries))

@mock.patch.object(base_driver.ResourceBaseDriver,
'get_resources_for_full_sync')
def test_get_resources_failed(self, mock_get_resources):
self._register_resources()
mock_get_resources.side_effect = exceptions.UnsupportedResourceType()
resource_name = test_base_driver.TEST_RESOURCE1
self.assertRaises(exceptions.UnsupportedResourceType,
full_sync.sync_resources, self.db_context,
resource_name)

mock_get_resources.assert_called_once_with(self.db_context,
resource_name)

self.assertEqual([], db.get_all_db_rows(self.db_session))

Loading…
Cancel
Save