Merge "Allowing lock to be applied per task basis"

This commit is contained in:
Jenkins 2017-07-25 07:11:13 +00:00 committed by Gerrit Code Review
commit 88541fa2f5
20 changed files with 478 additions and 247 deletions
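At a high level, this change replaces the single-row opendaylight_maintenance lock with one row per periodic task, so independent tasks such as 'maintenance' and 'hostconfig' can each be locked and run on their own interval. A minimal sketch of the lock cycle the new db helpers are meant to support, based on the hunks below (session handling simplified):

# Sketch only: per-task lock cycle introduced by this change.
from neutron.db import api as neutron_db_api

from networking_odl.db import db

session = neutron_db_api.get_writer_session()
if db.lock_periodic_task(session, 'hostconfig'):
    try:
        pass  # run this task's registered phases here
    finally:
        db.update_periodic_task(session, task='hostconfig', operation=None)
        db.unlock_periodic_task(session, 'hostconfig')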

View File

@@ -155,10 +155,12 @@ def delete_pending_rows(session, operations_to_delete):
@db_api.retry_db_errors
def _update_maintenance_state(session, expected_state, state):
def _update_periodic_task_state(session, expected_state, state, task):
with session.begin():
row = session.query(models.OpenDaylightMaintenance).filter_by(
state=expected_state).with_for_update().one_or_none()
row = session.query(models.OpenDaylightPeriodicTask).filter_by(
state=expected_state,
task=task).with_for_update().one_or_none()
if row is None:
return False
@@ -166,18 +168,29 @@ def _update_maintenance_state(session, expected_state, state):
return True
def lock_maintenance(session):
return _update_maintenance_state(session, odl_const.PENDING,
odl_const.PROCESSING)
def was_periodic_task_executed_recently(session, task, interval):
now = session.execute(func.now()).scalar()
delta = datetime.timedelta(seconds=interval)
row = session.query(models.OpenDaylightPeriodicTask).filter(
models.OpenDaylightPeriodicTask.task == task,
(now - delta >= (models.OpenDaylightPeriodicTask.lock_updated))
).one_or_none()
return row is None
def unlock_maintenance(session):
return _update_maintenance_state(session, odl_const.PROCESSING,
odl_const.PENDING)
def lock_periodic_task(session, task):
return _update_periodic_task_state(session, odl_const.PENDING,
odl_const.PROCESSING, task)
def update_maintenance_operation(session, operation=None):
"""Update the current maintenance operation details.
def unlock_periodic_task(session, task):
return _update_periodic_task_state(session, odl_const.PROCESSING,
odl_const.PENDING, task)
def update_periodic_task(session, task, operation=None):
"""Update the current periodic task details.
The function assumes the lock is held, so it mustn't be run outside of a
locked context.
@@ -187,7 +200,8 @@ def update_maintenance_operation(session, operation=None):
op_text = operation.__name__
with session.begin():
row = session.query(models.OpenDaylightMaintenance).one_or_none()
row = session.query(models.OpenDaylightPeriodicTask).filter_by(
task=task).one()
row.processing_operation = op_text
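To make the recency check in was_periodic_task_executed_recently concrete, a small worked example of the comparison the query performs (timestamps invented for illustration):

# Illustration only: the query matches a row when lock_updated is at least
# `interval` seconds old; no matching row means the task ran recently.
import datetime

now = datetime.datetime(2017, 7, 25, 7, 0, 0)             # hypothetical DB clock value
lock_updated = datetime.datetime(2017, 7, 25, 6, 59, 40)  # last state transition
interval = 30

stale = now - datetime.timedelta(seconds=interval) >= lock_updated
print(not stale)  # True: only 20 seconds have elapsed, so this run is skipped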

View File

@@ -1 +1 @@
fa0c536252a5
eccd865b7d3a

View File

@@ -1 +1 @@
0472f56ff2fb
6f7dfb241354

View File

@@ -0,0 +1,32 @@
# Copyright 2017 NEC Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""drop opendaylight_maintenance table
Revision ID: eccd865b7d3a
Revises: fa0c536252a5
Create Date: 2017-05-24 03:00:40.194278
"""
# revision identifiers, used by Alembic.
revision = 'eccd865b7d3a'
down_revision = 'fa0c536252a5'
from alembic import op
def upgrade():
op.drop_table('opendaylight_maintenance')

View File

@@ -0,0 +1,50 @@
# Copyright 2017 NEC Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""create opendaylight_preiodic_task table
Revision ID: 6f7dfb241354
Revises: 0472f56ff2fb
Create Date: 2017-05-24 03:01:00.755796
"""
# revision identifiers, used by Alembic.
revision = '6f7dfb241354'
down_revision = '0472f56ff2fb'
from alembic import op
import sqlalchemy as sa
from networking_odl.common import constants as odl_const
def upgrade():
periodic_table = op.create_table(
'opendaylight_periodic_task',
sa.Column('state', sa.Enum(odl_const.PENDING, odl_const.PROCESSING,
name='state'),
nullable=False),
sa.Column('processing_operation', sa.String(70)),
sa.Column('task', sa.String(70), primary_key=True),
sa.Column('lock_updated', sa.TIMESTAMP, nullable=False,
server_default=sa.func.now(),
onupdate=sa.func.now())
)
op.bulk_insert(periodic_table,
[{'task': 'maintenance',
'state': odl_const.PENDING},
{'task': 'hostconfig',
'state': odl_const.PENDING}])

View File

@@ -64,12 +64,13 @@ class OpenDaylightJournal(model_base.BASEV2):
}
class OpenDaylightMaintenance(model_base.BASEV2, model_base.HasId):
__tablename__ = 'opendaylight_maintenance'
class OpenDaylightPeriodicTask(model_base.BASEV2):
__tablename__ = 'opendaylight_periodic_task'
state = sa.Column(sa.Enum(odl_const.PENDING, odl_const.PROCESSING),
nullable=False)
processing_operation = sa.Column(sa.String(70))
task = sa.Column(sa.String(70), primary_key=True)
lock_updated = sa.Column(sa.TIMESTAMP, nullable=False,
server_default=sa.func.now(),
onupdate=sa.func.now())
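Because lock_updated is declared with onupdate=sa.func.now(), every state transition made by lock_periodic_task and unlock_periodic_task also refreshes the timestamp that was_periodic_task_executed_recently compares against the interval. A rough sketch of such a transition follows; the row.state assignment is an assumption, since the body of _update_periodic_task_state is elided from the db.py hunk above:

# Sketch only: the UPDATE issued when the state flips also bumps lock_updated
# via onupdate=sa.func.now().
from networking_odl.db import models

def _sketch_update_state(session, expected_state, state, task):
    with session.begin():
        row = session.query(models.OpenDaylightPeriodicTask).filter_by(
            state=expected_state, task=task).with_for_update().one_or_none()
        if row is None:
            return False
        row.state = state  # assumed assignment; triggers the timestamp refresh
        return True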

View File

@@ -1,82 +0,0 @@
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron.db import api as neutron_db_api
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from networking_odl.db import db
LOG = logging.getLogger(__name__)
class MaintenanceThread(object):
def __init__(self):
self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
self.maintenance_interval = cfg.CONF.ml2_odl.maintenance_interval
self.maintenance_ops = []
def start(self):
self.timer.start(self.maintenance_interval, stop_on_exception=False)
def cleanup(self):
# this method is used for unit test to tear down
self.timer.stop()
try:
self.timer.wait()
except AttributeError:
# NOTE(yamahata): workaround
# some tests call this cleanup without calling start
pass
def _execute_op(self, operation, session):
op_details = operation.__name__
if operation.__doc__:
op_details += " (%s)" % operation.func_doc
try:
LOG.info("Starting maintenance operation %s.", op_details)
db.update_maintenance_operation(session, operation=operation)
operation(session=session)
LOG.info("Finished maintenance operation %s.", op_details)
except Exception:
LOG.exception("Failed during maintenance operation %s.",
op_details)
def execute_ops(self):
LOG.info("Starting journal maintenance run.")
session = neutron_db_api.get_writer_session()
if not db.lock_maintenance(session):
LOG.info("Maintenance already running, aborting.")
return
try:
for operation in self.maintenance_ops:
self._execute_op(operation, session)
finally:
db.update_maintenance_operation(session, operation=None)
db.unlock_maintenance(session)
LOG.info("Finished journal maintenance run.")
def register_operation(self, f):
"""Register a function to be run by the maintenance thread.
:param f: Function to call when the thread runs. The function will
receive a DB session to use for DB operations.
"""
self.maintenance_ops.append(f)

View File

@@ -0,0 +1,104 @@
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron.db import api as neutron_db_api
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from networking_odl.db import db
LOG = logging.getLogger(__name__)
class PeriodicTask(object):
def __init__(self, task, interval=None):
self.task = task
self.phases = []
if interval is None:
interval = cfg.CONF.ml2_odl.maintenance_interval
self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
self.interval = interval
def start(self):
self.timer.start(self.interval, stop_on_exception=False)
def cleanup(self):
# this method is used by unit tests to tear down
self.timer.stop()
try:
self.timer.wait()
except AttributeError:
# NOTE(yamahata): workaround
# some tests call this cleanup without calling start
pass
def _execute_op(self, operation, db_session):
op_details = operation.__name__
if operation.__doc__:
op_details += " (%s)" % operation.func_doc
try:
LOG.info("Starting %s phase of periodic task %s.",
op_details, self.task)
db.update_periodic_task(db_session, task=self.task,
operation=operation)
operation(session=db_session)
LOG.info("Finished %s phase of %s task.", op_details, self.task)
except Exception:
LOG.exception("Failed during periodic task operation %s.",
op_details)
def task_already_executed_recently(self):
db_session = neutron_db_api.get_reader_session()
return db.was_periodic_task_executed_recently(db_session, self.task,
self.interval)
def execute_ops(self):
LOG.info("Starting %s periodic task.", self.task)
# Locking ensures that the periodic task only runs once the
# specified interval has elapsed, so maintenance tasks are not
# executed back to back.
if self.task_already_executed_recently():
LOG.info("Periodic %s task executed after periodic interval "
"Skipping execution.", self.task)
return
db_session = neutron_db_api.get_writer_session()
if not db.lock_periodic_task(db_session, self.task):
LOG.info("Periodic %s task already running task", self.task)
return
try:
for phase in self.phases:
self._execute_op(phase, db_session)
finally:
db.update_periodic_task(db_session, task=self.task,
operation=None)
db.unlock_periodic_task(db_session, self.task)
LOG.info("%s task has been finished", self.task)
def register_operation(self, phase):
"""Register a function to be run by the periodic task.
:param phase: Function to call when the thread runs. The function will
receive a DB session to use for DB operations.
"""
self.phases.append(phase)
LOG.info("%s phase has been registered in %s task", phase, self.task)

View File

@@ -31,7 +31,7 @@ from networking_odl.common import postcommit
from networking_odl.journal import cleanup
from networking_odl.journal import full_sync
from networking_odl.journal import journal
from networking_odl.journal import maintenance
from networking_odl.journal import periodic_task
from networking_odl.journal import recovery
from networking_odl.ml2 import port_binding
from networking_odl.ml2 import port_status_update
@@ -68,28 +68,30 @@ class OpenDaylightMechanismDriver(api.MechanismDriver):
self.trunk_driver = trunk_driver.OpenDaylightTrunkDriverV2.create()
if odl_const.ODL_QOS in cfg.CONF.ml2.extension_drivers:
qos_driver.OpenDaylightQosDriver.create()
self._start_maintenance_thread()
self._start_periodic_task()
full_sync.register(nlib_const.CORE, L2_RESOURCES)
odl_features.init()
def get_workers(self):
return [port_status_update.OdlPortStatusUpdate()]
def _start_maintenance_thread(self):
# start the maintenance thread and register all the maintenance
def _start_periodic_task(self):
# start the periodic task and register all the phases
# operations :
# (1) JournalCleanup - Delete completed rows from journal
# (2) CleanupProcessing - Mark orphaned processing rows to pending
# (3) Full sync - Re-sync when detecting an ODL "cold reboot"
cleanup_obj = cleanup.JournalCleanup()
self._maintenance_thread = maintenance.MaintenanceThread()
self._maintenance_thread.register_operation(
interval = cfg.CONF.ml2_odl.restconf_poll_interval
self._periodic_task = periodic_task.PeriodicTask('maintenance',
interval)
self._periodic_task.register_operation(
cleanup_obj.delete_completed_rows)
self._maintenance_thread.register_operation(
self._periodic_task.register_operation(
cleanup_obj.cleanup_processing_rows)
self._maintenance_thread.register_operation(full_sync.full_sync)
self._maintenance_thread.register_operation(recovery.journal_recovery)
self._maintenance_thread.start()
self._periodic_task.register_operation(full_sync.full_sync)
self._periodic_task.register_operation(recovery.journal_recovery)
self._periodic_task.start()
@staticmethod
def _record_in_journal(context, object_type, operation, data=None):

View File

@@ -34,7 +34,7 @@ from networking_odl.common import client as odl_client
from networking_odl.common import odl_features
from networking_odl.common import utils
from networking_odl.common import websocket_client as odl_ws_client
from networking_odl.journal import maintenance as mt
from networking_odl.journal import periodic_task
from networking_odl.ml2 import port_binding
cfg.CONF.import_group('ml2_odl', 'networking_odl.common.config')
@@ -83,10 +83,9 @@ class PseudoAgentDBBindingController(port_binding.PortBindingController):
odl_url = utils.get_odl_url()
self._start_websocket(odl_url)
else:
# Start polling ODL restconf using maintenance thread.
# Start polling ODL restconf using periodic task.
# default: 30s (should be <= agent keep-alive poll interval)
self._start_maintenance_thread(
cfg.CONF.ml2_odl.restconf_poll_interval)
self._start_periodic_task(cfg.CONF.ml2_odl.restconf_poll_interval)
def _make_hostconf_uri(self, odl_url=None, path=''):
"""Make ODL hostconfigs URI with host/port extraced from ODL_URL."""
@@ -99,11 +98,11 @@ class PseudoAgentDBBindingController(port_binding.PortBindingController):
return urlparse.urlunparse((purl.scheme, purl.netloc,
path, '', '', ''))
def _start_maintenance_thread(self, poll_interval):
self._mainth = mt.MaintenanceThread()
self._mainth.maintenance_interval = poll_interval
self._mainth.register_operation(self._get_and_update_hostconfigs)
self._mainth.start()
def _start_periodic_task(self, poll_interval):
self._periodic = periodic_task.PeriodicTask('hostconfig',
poll_interval)
self._periodic.register_operation(self._get_and_update_hostconfigs)
self._periodic.start()
def _rest_get_hostconfigs(self):
try:

View File

@@ -20,6 +20,7 @@ import mock
from oslo_config import cfg
from networking_odl.common import odl_features
from networking_odl.journal import periodic_task
from neutron.tests import base
from networking_odl.journal import journal
@@ -84,3 +85,9 @@ class OpenDaylightJournalThreadFixture(fixtures.Fixture):
super(OpenDaylightJournalThreadFixture, self)._setUp()
mock.patch.object(journal.OpenDaylightJournalThread,
'start_odl_sync_thread').start()
class OpenDaylightPeriodicTaskFixture(fixtures.Fixture):
def _setUp(self):
super(OpenDaylightPeriodicTaskFixture, self)._setUp()
mock.patch.object(periodic_task.PeriodicTask, 'start').start()

View File

@@ -117,7 +117,7 @@ class _TestModelsMigrationsODL(test_migrations._TestModelsMigrations):
rendered_meta_def = diff_elem[0][6]
if (diff_elem[0][0] == 'modify_default' and
diff_elem[0][2] in ('opendaylightjournal',
'opendaylight_maintenance') and
'opendaylight_periodic_task') and
isinstance(meta_def, schema.DefaultClause) and
isinstance(meta_def.arg, sql.elements.TextClause) and
meta_def.reflected and

View File

@@ -20,7 +20,6 @@ from oslo_config import cfg
from networking_odl.common import client
from networking_odl.journal import journal
from networking_odl.journal import maintenance
from networking_odl.ml2 import mech_driver_v2
from networking_odl.tests import base
from networking_odl.tests.unit import test_base_db
@@ -30,15 +29,15 @@ class OpenDaylightConfigBase(test_plugin.Ml2PluginV2TestCase,
test_base_db.ODLBaseDbTestCase):
def setUp(self):
self.useFixture(base.OpenDaylightJournalThreadFixture())
self.mock_mt_thread = mock.patch.object(
maintenance.MaintenanceThread, 'start').start()
self.useFixture(base.OpenDaylightRestClientFixture())
super(OpenDaylightConfigBase, self).setUp()
cfg.CONF.set_override('mechanism_drivers',
['logger', 'opendaylight_v2'], 'ml2')
cfg.CONF.set_override('extension_drivers',
['port_security', 'qos'], 'ml2')
self.useFixture(base.OpenDaylightJournalThreadFixture())
self.thread = journal.OpenDaylightJournalThread()
self.useFixture(base.OpenDaylightJournalThreadFixture())
def run_journal_processing(self):
"""Cause the journal to process the first pending entry"""

View File

@@ -224,37 +224,98 @@ class DbTestCase(test_base_db.ODLBaseDbTestCase):
def test_update_row_state_to_completed(self):
self._test_update_row_state(odl_const.PROCESSING, odl_const.COMPLETED)
def _test_maintenance_lock_unlock(self, db_func, existing_state,
expected_state, expected_result):
row = models.OpenDaylightMaintenance(id='test',
state=existing_state)
def _test_periodic_task_lock_unlock(self, db_func, existing_state,
expected_state, expected_result,
task='test_task'):
row = models.OpenDaylightPeriodicTask(state=existing_state,
task=task)
self.db_session.add(row)
self.db_session.flush()
self.assertEqual(expected_result, db_func(self.db_session))
row = self.db_session.query(models.OpenDaylightMaintenance).one()
self.assertEqual(expected_result, db_func(self.db_session,
task))
row = self.db_session.query(models.OpenDaylightPeriodicTask).filter_by(
task=task).one()
self.assertEqual(expected_state, row['state'])
def test_lock_maintenance(self):
self._test_maintenance_lock_unlock(db.lock_maintenance,
odl_const.PENDING,
odl_const.PROCESSING,
True)
def test_lock_periodic_task(self):
self._test_periodic_task_lock_unlock(db.lock_periodic_task,
odl_const.PENDING,
odl_const.PROCESSING,
True)
def test_lock_maintenance_fails_when_processing(self):
self._test_maintenance_lock_unlock(db.lock_maintenance,
odl_const.PROCESSING,
odl_const.PROCESSING,
False)
def test_lock_periodic_task_fails_when_processing(self):
self._test_periodic_task_lock_unlock(db.lock_periodic_task,
odl_const.PROCESSING,
odl_const.PROCESSING,
False)
def test_unlock_maintenance(self):
self._test_maintenance_lock_unlock(db.unlock_maintenance,
odl_const.PROCESSING,
odl_const.PENDING,
True)
def test_unlock_periodic_task(self):
self._test_periodic_task_lock_unlock(db.unlock_periodic_task,
odl_const.PROCESSING,
odl_const.PENDING,
True)
def test_unlock_maintenance_fails_when_pending(self):
self._test_maintenance_lock_unlock(db.unlock_maintenance,
odl_const.PENDING,
odl_const.PENDING,
False)
def test_unlock_periodic_task_fails_when_pending(self):
self._test_periodic_task_lock_unlock(db.unlock_periodic_task,
odl_const.PENDING,
odl_const.PENDING,
False)
def test_multiple_row_tasks(self):
self._test_periodic_task_lock_unlock(db.unlock_periodic_task,
odl_const.PENDING,
odl_const.PENDING,
False)
def _add_tasks(self, tasks):
row = []
for count, task in enumerate(tasks):
row.append(models.OpenDaylightPeriodicTask(state=odl_const.PENDING,
task=task))
self.db_session.add(row[count])
self.db_session.flush()
rows = self.db_session.query(models.OpenDaylightPeriodicTask).all()
self.assertEqual(len(tasks), len(rows))
def _perform_ops_on_all_rows(self, tasks, to_lock):
if to_lock:
curr_state = odl_const.PENDING
exp_state = odl_const.PROCESSING
func = db.lock_periodic_task
else:
exp_state = odl_const.PENDING
curr_state = odl_const.PROCESSING
func = db.unlock_periodic_task
processed = []
for task in tasks:
row = self.db_session.query(
models.OpenDaylightPeriodicTask).filter_by(task=task).one()
self.assertEqual(row['state'], curr_state)
self.assertTrue(func(self.db_session, task))
rows = self.db_session.query(
models.OpenDaylightPeriodicTask).filter_by().all()
processed.append(task)
for row in rows:
if row['task'] in processed:
self.assertEqual(exp_state, row['state'])
else:
self.assertEqual(curr_state, row['state'])
self.assertFalse(func(self.db_session, tasks[-1]))
def test_multiple_row_tasks_lock_unlock(self):
task1 = 'test_random_task'
task2 = 'random_task_random'
task3 = 'task_test_random'
tasks = [task1, task2, task3]
self._add_tasks(tasks)
self._perform_ops_on_all_rows(tasks, to_lock=True)
self._perform_ops_on_all_rows(tasks, to_lock=False)

View File

@@ -1,91 +0,0 @@
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import threading
import mock
from networking_odl.common import constants as odl_const
from networking_odl.db import models
from networking_odl.journal import maintenance
from networking_odl.tests.unit import test_base_db
class MaintenanceThreadTestCase(test_base_db.ODLBaseDbTestCase):
def setUp(self):
super(MaintenanceThreadTestCase, self).setUp()
row = models.OpenDaylightMaintenance(state=odl_const.PENDING)
self.db_session.add(row)
self.db_session.flush()
self.thread = maintenance.MaintenanceThread()
self.thread.maintenance_interval = 0.01
self.addCleanup(self.thread.cleanup)
def test__execute_op_no_exception(self):
with mock.patch.object(maintenance, 'LOG') as mock_log:
operation = mock.MagicMock()
operation.__name__ = "test"
self.thread._execute_op(operation, self.db_session)
self.assertTrue(operation.called)
self.assertTrue(mock_log.info.called)
self.assertFalse(mock_log.exception.called)
def test__execute_op_with_exception(self):
with mock.patch.object(maintenance, 'LOG') as mock_log:
operation = mock.MagicMock(side_effect=Exception())
operation.__name__ = "test"
self.thread._execute_op(operation, self.db_session)
self.assertTrue(mock_log.exception.called)
def test_thread_works(self):
callback_event = threading.Event()
count = [0]
def callback_op(**kwargs):
count[0] += 1
# The following should be true on the second call, so we're making
# sure that the thread runs more than once.
if count[0] > 1:
callback_event.set()
self.thread.register_operation(callback_op)
self.thread.start()
# Make sure the callback event was called and not timed out
self.assertTrue(callback_event.wait(timeout=5))
def test_thread_continues_after_exception(self):
exception_event = threading.Event()
callback_event = threading.Event()
def exception_op(**kwargs):
if not exception_event.is_set():
exception_event.set()
raise Exception()
def callback_op(**kwargs):
callback_event.set()
for op in [exception_op, callback_op]:
self.thread.register_operation(op)
self.thread.start()
# Make sure the callback event was called and not timed out
self.assertTrue(callback_event.wait(timeout=5))

View File

@@ -0,0 +1,139 @@
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import threading
import time
from neutron.db import api as neutron_db_api
import mock
from networking_odl.common import constants as odl_const
from networking_odl.db import db
from networking_odl.db import models
from networking_odl.journal import periodic_task
from networking_odl.tests.unit import test_base_db
class PeriodicTaskThreadTestCase(test_base_db.ODLBaseDbTestCase):
def setUp(self):
super(PeriodicTaskThreadTestCase, self).setUp()
row = models.OpenDaylightPeriodicTask(task='test-maintenance',
state=odl_const.PENDING)
self.db_session.add(row)
self.db_session.flush()
self.thread = periodic_task.PeriodicTask('test-maintenance')
self.thread.interval = 0.01
self.addCleanup(self.thread.cleanup)
def test__execute_op_no_exception(self):
with mock.patch.object(periodic_task, 'LOG') as mock_log:
operation = mock.MagicMock()
operation.__name__ = "test"
self.thread.register_operation(operation)
db_session = neutron_db_api.get_reader_session()
self.thread._execute_op(operation, db_session)
operation.assert_called()
mock_log.info.assert_called()
mock_log.exception.assert_not_called()
def test__execute_op_with_exception(self):
with mock.patch.object(periodic_task, 'LOG') as mock_log:
operation = mock.MagicMock(side_effect=Exception())
operation.__name__ = "test"
db_session = neutron_db_api.get_reader_session()
self.thread._execute_op(operation, db_session)
mock_log.exception.assert_called()
def test_thread_works(self):
callback_event = threading.Event()
count = [0]
def callback_op(**kwargs):
count[0] += 1
# The following should be true on the second call, so we're making
# sure that the thread runs more than once.
if count[0] > 1:
callback_event.set()
self.thread.register_operation(callback_op)
self.thread.start()
# Make sure the callback event was called and not timed out
self.assertTrue(callback_event.wait(timeout=5))
def test_thread_continues_after_exception(self):
exception_event = threading.Event()
callback_event = threading.Event()
def exception_op(**kwargs):
if not exception_event.is_set():
exception_event.set()
raise Exception()
def callback_op(**kwargs):
callback_event.set()
for op in [exception_op, callback_op]:
self.thread.register_operation(op)
self.thread.start()
# Make sure the callback event was called and not timed out
self.assertTrue(callback_event.wait(timeout=5))
def test_multiple_thread_work(self):
self.thread1 = periodic_task.PeriodicTask('test-maintenance1')
callback_event = threading.Event()
callback_event1 = threading.Event()
self.thread1.interval = 0.01
self.addCleanup(self.thread1.cleanup)
def callback_op(**kwargs):
callback_event.set()
def callback_op1(**kwargs):
callback_event1.set()
self.thread.register_operation(callback_op)
self.thread.register_operation(callback_op1)
self.thread.start()
self.assertTrue(callback_event.wait(timeout=5))
self.thread1.start()
self.assertTrue(callback_event1.wait(timeout=5))
@mock.patch.object(db, "was_periodic_task_executed_recently")
def test_back_to_back_job(self, mock_status_method):
callback_event = threading.Event()
def callback_op(**kwargs):
callback_event.set()
self.thread.register_operation(callback_op)
msg = ("Periodic %s task executed after periodic "
"interval Skipping execution.")
with mock.patch.object(periodic_task.LOG, 'info') as mock_log_info:
mock_status_method.return_value = True
self.thread.start()
time.sleep(1)
mock_log_info.assert_called_with(msg, 'test-maintenance')
self.assertFalse(callback_event.wait(timeout=1))
mock_log_info.assert_called_with(msg, 'test-maintenance')
mock_status_method.return_value = False
self.assertTrue(callback_event.wait(timeout=2))

View File

@@ -34,7 +34,6 @@ from networking_odl.common import constants as odl_const
from networking_odl.common import filters
from networking_odl.db import db
from networking_odl.journal import journal
from networking_odl.journal import maintenance
from networking_odl.ml2 import mech_driver_v2
from networking_odl.tests import base as odl_base
from networking_odl.tests.unit import test_base_db
@@ -112,14 +111,13 @@ class OpenDaylightL3TestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
core_plugin = cfg.CONF.core_plugin
service_plugins = {'l3_plugin_name': 'odl-router_v2'}
self.useFixture(odl_base.OpenDaylightJournalThreadFixture())
self.mock_mt_thread = mock.patch.object(
maintenance.MaintenanceThread, 'start').start()
mock.patch.object(mech_driver_v2.OpenDaylightMechanismDriver,
'_record_in_journal').start()
mock.patch.object(mech_driver_v2.OpenDaylightMechanismDriver,
'sync_from_callback_precommit').start()
mock.patch.object(mech_driver_v2.OpenDaylightMechanismDriver,
'sync_from_callback_postcommit').start()
self.useFixture(odl_base.OpenDaylightPeriodicTaskFixture())
self.useFixture(odl_base.OpenDaylightFeaturesFixture())
super(OpenDaylightL3TestCase, self).setUp(
plugin=core_plugin, service_plugins=service_plugins)

View File

@@ -41,7 +41,6 @@ from networking_odl.common import utils
from networking_odl.db import db
from networking_odl.journal import cleanup
from networking_odl.journal import journal
from networking_odl.journal import maintenance
from networking_odl.ml2 import mech_driver_v2
from networking_odl.tests import base
from networking_odl.tests.unit import base_v2
@@ -59,11 +58,9 @@ SG_RULE_FAKE_ID = uuidutils.generate_uuid()
class OpenDayLightMechanismConfigTests(testlib_api.SqlTestCase):
def setUp(self):
super(OpenDayLightMechanismConfigTests, self).setUp()
self.useFixture(base.OpenDaylightFeaturesFixture())
self.useFixture(base.OpenDaylightJournalThreadFixture())
self.mock_mt_thread = mock.patch.object(
maintenance.MaintenanceThread, 'start').start()
super(OpenDayLightMechanismConfigTests, self).setUp()
cfg.CONF.set_override('mechanism_drivers',
['logger', 'opendaylight_v2'], 'ml2')
cfg.CONF.set_override('port_binding_controller',

View File

@@ -31,6 +31,7 @@ from neutron_lib.plugins.ml2 import api as ml2_api
from networking_odl.common import odl_features
from networking_odl.ml2 import pseudo_agentdb_binding
from networking_odl.tests import base
from networking_odl.tests.unit import test_base_db
from requests.exceptions import HTTPError
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
@@ -39,7 +40,7 @@ AGENTDB_BINARY = 'neutron-odlagent-portbinding'
L2_TYPE = "ODL L2"
class TestPseudoAgentDBBindingController(base.DietTestCase):
class TestPseudoAgentDBBindingController(test_base_db.ODLBaseDbTestCase):
"""Test class for AgentDBPortBinding."""
# test data hostconfig and hostconfig-dbget

View File

@@ -29,4 +29,4 @@ class ODLBaseDbTestCase(SqlTestCaseLight):
def _db_cleanup(self):
self.db_session.query(models.OpenDaylightJournal).delete()
self.db_session.query(models.OpenDaylightMaintenance).delete()
self.db_session.query(models.OpenDaylightPeriodicTask).delete()