Add interconnection state scheduler worker

Add a worker running, at a fixed interval, a task to check interconnection
resources state in database :
- left in xxxxING state since several seconds,
- or in PRE_xxxx state.

Signed-off-by: Thomas Morin <thomas.morin@orange.com>
Submitted on behalf of a third-party: Orange

Change-Id: Ia00d28652be80fe0d70e695872bcee8ded4ad0f8
This commit is contained in:
ythomas1 2019-02-26 17:31:30 +01:00
parent 20ee49f077
commit 9a5f6d16dd
7 changed files with 215 additions and 0 deletions

View File

@ -3,3 +3,4 @@ output_file = etc/neutron-interconnection.conf.sample
wrap_width = 79
namespace = neutron-interconnection.remote_keystone_auth
namespace = neutron-interconnection.state_scheduler

View File

@ -20,3 +20,9 @@ def list_remote_keystone_auth_opts():
return [
('remote_keystone_auth', inter_config.remote_keystone_auth_opts),
]
def list_state_scheduler_opts():
return [
('state_scheduler', inter_config.state_scheduler_opts),
]

View File

@ -24,6 +24,14 @@ remote_keystone_auth_opts = [
help=_('Remote Keystone authentication password')),
cfg.StrOpt('project',
help=_('Remote Keystone authentication project')),
cfg.IntOpt('check_state_interval', default=10,
help=_('Check state interval in seconds.')),
]
state_scheduler_opts = [
cfg.IntOpt('check_state_interval', default=10,
help=_('Check state interval in seconds.')),
]
cfg.CONF.register_opts(remote_keystone_auth_opts, "remote_keystone_auth")
cfg.CONF.register_opts(state_scheduler_opts, "state_scheduler")

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
INTERCONNECTION = 'interconnection'
TO_VALIDATE = 'TO_VALIDATE'
PRE_VALIDATE = 'PRE_VALIDATE'

View File

@ -0,0 +1,102 @@
# Copyright (c) 2018 Orange.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_config import cfg
from oslo_log import log as logging
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import context as n_context
from neutron_lib.objects import utils as obj_utils
from neutron import worker as n_worker
from neutron_interconnection.objects import interconnection as inter_objs
from neutron_interconnection.services.common import constants as inter_consts
from neutron_interconnection.services import lifecycle_fsm
# Delta time in seconds from which an interconnection resource has been left
# in in xxxxING (negative value)
CHECK_STATE_DELTA = -600
LOG = logging.getLogger(__name__)
@registry.has_registry_receivers
class StateSchedulerWorker(n_worker.PeriodicWorker):
"""Starts a periodic task to check interconnection resources state.
This task will look in database if an interconnection resource:
- has been left in xxxxING state since several seconds,
- is in PRE_xxxx state.
Can also be triggered by AFTER_CREATE and AFTER_DELETE database events.
"""
def __init__(self, drivers):
super(StateSchedulerWorker, self).__init__(
self._check_state,
cfg.CONF.state_scheduler.check_state_interval,
0)
self.drivers = drivers
def _check_state(self):
admin_context = n_context.get_admin_context()
state_event = 'retry'
# Check if an interconnection is in a xxxxING state since
# several secondes
time_before = datetime.timedelta(seconds=CHECK_STATE_DELTA)
interconnection = inter_objs.Interconnection.get_objects(
admin_context, validate_filters=False,
count=1, state=obj_utils.StringEnds('ING'),
changed_since=(datetime.datetime.strftime(
datetime.datetime.utcnow() + time_before,
'%Y-%m-%dT%H:%M:%S') + 'Z'
)
)
if not interconnection:
# Check if an interconnection is in a PRE_xxxx state
interconnection = inter_objs.Interconnection.get_objects(
admin_context, validate_filters=False,
count=1, state=obj_utils.StringStarts('PRE')
)
state_event = 'lock'
if interconnection:
LOG.debug("Processing interconnection: %s", interconnection[0])
fsm = lifecycle_fsm.LifecycleFSM(self.drivers, interconnection[0])
fsm.process_event(state_event)
@registry.receives(inter_consts.INTERCONNECTION, [events.AFTER_CREATE])
def process_interconnection_create(self, resource, event, trigger,
payload):
interconnection_obj = payload.current_interconnection
fsm = lifecycle_fsm.LifecycleFSM(self.drivers, interconnection_obj)
if fsm.is_actionable_event('lock'):
fsm.process_event('lock')
@registry.receives(inter_consts.INTERCONNECTION, [events.AFTER_DELETE])
def process_interconnection_delete(self, resource, event, trigger,
payload):
interconnection_obj = payload.original_interconnection
fsm = lifecycle_fsm.LifecycleFSM(self.drivers, interconnection_obj)
if fsm.is_actionable_event('deleted'):
fsm.process_event('deleted')

View File

@ -0,0 +1,95 @@
# Copyright (c) 2018 Orange.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.common import utils
from neutron.tests import base
from neutron_interconnection.objects import interconnection as inter_objs
from neutron_interconnection.services import state_scheduler_worker
class StateSchedulerWorkerTestCase(base.BaseTestCase):
def setUp(self):
super(StateSchedulerWorkerTestCase, self).setUp()
cfg.CONF.set_override(
"check_state_interval", 1, group="state_scheduler"
)
self.fsm_mock = mock.Mock()
self.fsm_mock.process_event = mock.Mock()
mock.patch(
'neutron_interconnection.services.lifecycle_fsm.LifecycleFSM',
return_value=self.fsm_mock
).start()
@mock.patch.object(inter_objs.Interconnection, 'get_objects',
side_effect=[None, None,
["fake-interconnection"],
None, ["fake-interconnection"],
None, None])
def test_state_scheduler_worker_lifecycle(self, get_objects_mock):
drivers = mock.Mock()
worker = state_scheduler_worker.StateSchedulerWorker(
drivers)
self.addCleanup(worker.stop)
worker.start()
# Any interconnection in xxxxING and PRE_xxxx states
utils.wait_until_true(
lambda: get_objects_mock.call_count == 2,
timeout=5,
exception=RuntimeError("get_objects_mock not called"))
self.fsm_mock.process_event.assert_not_called()
get_objects_mock.reset_mock()
self.fsm_mock.reset_mock()
# An interconnection in xxxxING state
utils.wait_until_true(
lambda: get_objects_mock.called,
timeout=5,
exception=RuntimeError("get_objects_mock not called"))
self.fsm_mock.process_event.assert_called_once_with("retry")
get_objects_mock.reset_mock()
self.fsm_mock.reset_mock()
# An interconnection in PRE_xxxx state
utils.wait_until_true(
lambda: get_objects_mock.called,
timeout=5,
exception=RuntimeError("get_objects_mock not called"))
self.fsm_mock.process_event.assert_called_once_with("lock")
get_objects_mock.reset_mock()
self.fsm_mock.reset_mock()
worker.stop()
worker.wait()
self.assertFalse(get_objects_mock.called)
worker.reset()
utils.wait_until_true(
lambda: get_objects_mock.call_count == 2,
timeout=5,
exception=RuntimeError("get_objects_mock not called"))
self.fsm_mock.process_event.assert_not_called()

View File

@ -32,6 +32,7 @@ neutron.db.alembic_migrations=
neutron-interconnection = neutron_interconnection.db.migration:alembic_migrations
oslo.config.opts =
neutron-interconnection.remote_keystone_auth = neutron_interconnection.opts:list_remote_keystone_auth_opts
neutron-interconnection.state_scheduler = neutron_interconnection.opts:list_state_scheduler_opts
[compile_catalog]
directory = neutron_interconnection/locale