Merge "Remove rescheduler from snapshot service, use timers and threads." into stable/pike

This commit is contained in:
Jenkins 2017-08-20 13:19:39 +00:00 committed by Gerrit Code Review
commit b63126338e
4 changed files with 27 additions and 442 deletions

View File

@ -36,7 +36,7 @@ class DriverBase(object):
pass
def callback_on_fault(self, exception):
LOG.exception('Exception: {0}'.format(exception))
pass
@staticmethod
def _get_end_message(entity_type):

View File

@ -1,205 +0,0 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import sched
import time
class ReScheduler(object):
"""Rescheduler
The ReScheduler is a decorating class to Python's sched package Scheduler.
Allows scheduling tasks for a repeated number of times while providing
a mechanism to reschedule differently in case of a task's failure.
"""
def __init__(self):
self.scheduler = sched.scheduler(time.time, time.sleep)
def run(self):
"""Starts the scheduler
:return: None
"""
self.scheduler.run()
def reset(self):
"""Removes all scheduled tasks from scheduler
:return: None
"""
for event in self.scheduler.queue:
self._cancel(event=event)
def schedule(self,
func,
args=(),
initial_delay=0,
standard_interval=None,
fault_interval=None,
standard_priority=2,
fault_priority=1,
times=-1,
ttl=None,
fault_callback=None,
fault_callback_kwargs=None):
"""Schedule a new task
:param func: function to run
:type func: function
:param args: arguments for the function, must come as a tuple
:type args: tuple
:param initial_delay: initial delay before first schedule
:type initial_delay: float
:param standard_interval: interval between rescheduling in case of
success
:type standard_interval: float
:param fault_interval: interval between rescheduling in case of failure
:type fault_interval: float
:param standard_priority: standard priority for scheduled task
:type standard_priority: int
:param fault_priority: priority for rescheduled task on failure
:type fault_priority: int
:param times: times to reschedule a successful task
:type times: int
:param fault_callback: callback function in case of failure,
must accept 'exception' as a function keyword
:type fault_callback: function
:param fault_callback_kwargs: callback function in case of failure
keyword arguments, must come as a dictionary
:type fault_callback_kwargs: dict
:return: None
"""
if fault_callback_kwargs is None:
fault_callback_kwargs = {}
if times == 0:
return None
if not func:
raise ValueError('Invalid func value')
if initial_delay < 0:
raise ValueError('Initial delay is less than zero')
if standard_interval is None and time != 1:
raise ValueError('Standard interval is None')
if standard_interval and standard_interval <= 0:
raise ValueError('Standard interval is less than or equal to zero')
if fault_interval is None:
raise ValueError('Fault interval is None')
if fault_interval <= 0:
raise ValueError('Fault interval is less than or equal to zero')
task = self._Task(
self.scheduler,
func,
args,
initial_delay,
standard_interval,
fault_interval,
standard_priority,
fault_priority,
times,
ttl,
fault_callback,
fault_callback_kwargs)
task.reschedule()
def _cancel(self, event):
self.scheduler.cancel(event=event)
class _Task(object):
def __init__(self,
scheduler,
func,
args,
initial_delay,
standard_interval,
fault_interval,
standard_priority=2,
fault_priority=1,
times=-1,
ttl=None,
fault_callback=None,
fault_callback_kwargs=None):
if fault_callback_kwargs is None:
fault_callback_kwargs = {}
self.scheduler = scheduler
self.func = func
self.args = args
self.next_schedule = initial_delay
self.standard_interval = standard_interval
self.fault_interval = fault_interval
self.standard_priority = standard_priority
self.fault_priority = fault_priority
self.times = times
self.fault_callback = fault_callback
self.fault_callback_kwargs = fault_callback_kwargs
self.ttl = ttl
self.next_priority = self.standard_priority
self.first_run_time = None
@property
def exhausted(self):
if self.first_run_time and self.ttl:
time_diff = (datetime.now() - self.first_run_time).seconds + \
self.next_schedule
return self.ttl <= time_diff or self.times == 0
return self.times == 0
def run(self):
if self.first_run_time is None:
self.first_run_time = datetime.now()
try:
self.func(*self.args)
self._decrease_count()
self.next_schedule = self.standard_interval
self.next_priority = self.standard_priority
except AssertionError:
raise
except Exception as e:
self.next_schedule = self.fault_interval
self.next_priority = self.fault_priority
if self.fault_callback:
self.fault_callback_kwargs['exception'] = e
self.fault_callback(**self.fault_callback_kwargs)
def reschedule(self):
if self.exhausted:
return None
self.scheduler.enter(delay=self.next_schedule,
priority=self.next_priority,
action=self.loop,
argument=())
def loop(self):
self.run()
self.reschedule()
def _decrease_count(self):
self.times = max(self.times - 1, -1)

View File

@ -11,11 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log
from oslo_service import service as os_service
from vitrage.common.constants import DatasourceAction
from vitrage.datasources.rescheduler import ReScheduler
LOG = log.getLogger(__name__)
@ -43,35 +43,37 @@ class SnapshotsService(DatasourceService):
init_ttl = self.conf.consistency.initialization_max_retries * \
self.conf.consistency.initialization_interval
snap_scheduler = ReScheduler()
for ds_driver in self.registered_datasources.values():
callback = self.entities_to_queue(
ds_driver,
DatasourceAction.INIT_SNAPSHOT,
fault_interval,
init_ttl)
self.tg.add_thread(callback)
snap_scheduler.schedule(
func=self.entities_to_queue(ds_driver,
DatasourceAction.INIT_SNAPSHOT),
standard_interval=standard_interval,
fault_interval=fault_interval,
times=1,
ttl=init_ttl,
fault_callback=ds_driver.callback_on_fault)
snap_scheduler.schedule(
func=self.entities_to_queue(ds_driver,
DatasourceAction.SNAPSHOT),
initial_delay=standard_interval,
standard_interval=standard_interval,
fault_interval=fault_interval,
fault_callback=ds_driver.callback_on_fault)
self.tg.add_thread(snap_scheduler.run)
callback = self.entities_to_queue(
ds_driver,
DatasourceAction.SNAPSHOT,
fault_interval,
standard_interval)
self.tg.add_timer(standard_interval, callback, standard_interval)
LOG.info('Vitrage datasources Snapshot Service - Started!')
def entities_to_queue(self, driver, datasource_action):
def entities_to_queue(self, driver, action, fault_interval, timeout):
def _entities_to_queue():
for entity in driver.get_all(datasource_action):
self.send_to_queue(entity)
endtime = time.time() + timeout
while time.time() < endtime:
try:
LOG.info('Driver %s - %s', type(driver).__name__, action)
items = driver.get_all(action)
for entity in items:
self.send_to_queue(entity)
break
except Exception as e:
LOG.exception('Driver Exception: {0}'.format(e))
time.sleep(fault_interval)
driver.callback_on_fault(e)
return _entities_to_queue
def stop(self, graceful=False):

View File

@ -1,212 +0,0 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from vitrage.datasources.rescheduler import ReScheduler
from vitrage.tests import base
RESCHEDULING_TIMES = 2
STANDARD_INTERVAL = 0.8
FAULT_INTERVAL = 0.2
TTL = 3
FAULT_NUM = 2
TIME_DIFF_MARGIN_PRECENT = 1.05
class ReschedulerTester(base.BaseTest):
class DummyException(Exception):
def __init__(self, message=''):
self.message = message
def rescheduled_func(self, item, assert_timing=False):
self.counter += 1
self.list.append(item)
if assert_timing:
if self.last_failure:
current_failure = datetime.now()
time_diff = (current_failure - self.last_failure).seconds
self.last_failure = current_failure
self.assertLess(time_diff,
STANDARD_INTERVAL * TIME_DIFF_MARGIN_PRECENT)
else:
self.last_failure = datetime.now()
def rescheduled_func_with_exception(self, item):
self.counter += 1
self.list.append(item)
if self.counter > RESCHEDULING_TIMES // 2 \
and self.fault_counter < FAULT_NUM:
raise self.DummyException('ReScheduler Test Dummy Exception')
def rescheduled_func_callback(self, exception, item, assert_timing=False):
self.fault_counter += 1
self.fault_list.append(item)
if assert_timing:
if self.last_failure:
current_failure = datetime.now()
time_diff = (current_failure - self.last_failure).seconds
self.last_failure = current_failure
self.assertLess(time_diff,
FAULT_INTERVAL * TIME_DIFF_MARGIN_PRECENT)
else:
self.last_failure = datetime.now()
@classmethod
def reset_test_params(cls):
cls.counter = 0
cls.list = []
cls.last_success = None
cls.fault_counter = 0
cls.fault_list = []
cls.last_failure = None
@classmethod
def setUpClass(cls):
cls.rescheduler = ReScheduler()
cls.counter = 0
cls.last_success = None
cls.fault_counter = 0
cls.last_failure = None
cls.list = []
cls.fault_list = []
def test_schedule(self):
# Test setup
self.reset_test_params()
TASKS_NUM = 6
# Test action
for i in range(TASKS_NUM):
self.rescheduler.schedule(func=self.rescheduled_func, args=(i,),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
times=RESCHEDULING_TIMES)
# Test assertions
self.assertEqual(TASKS_NUM, len(self.rescheduler.scheduler.queue))
self.reset_test_params()
def test_reset(self):
# Test setup
self.reset_test_params()
TASKS_NUM = 6
for i in range(TASKS_NUM):
self.rescheduler.schedule(func=self.rescheduled_func, args=(i,),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
times=RESCHEDULING_TIMES)
# Test action
self.rescheduler.reset()
# Test assertions
self.assertEqual(0, len(self.rescheduler.scheduler.queue))
self.reset_test_params()
def test_rescheduling(self):
# Test setup
self.reset_test_params()
TASKS_NUM = 2
VALIDATE_LIST = [0, 1] * RESCHEDULING_TIMES
for i in range(TASKS_NUM):
self.rescheduler.schedule(func=self.rescheduled_func, args=(i,),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
times=RESCHEDULING_TIMES)
# Test action
self.rescheduler.run()
# Test assertions
self.assertEqual(self.counter, TASKS_NUM * RESCHEDULING_TIMES)
self.assert_list_equal(self.list, VALIDATE_LIST)
self.reset_test_params()
def test_rescheduling_timing(self):
# Test setup
self.reset_test_params()
TASKS_NUM = 6
for i in range(TASKS_NUM):
self.rescheduler.schedule(
func=self.rescheduled_func,
args=(i, True),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
times=RESCHEDULING_TIMES)
# Test action
# Test assertions
self.rescheduler.run()
self.reset_test_params()
def test_rescheduling_with_fault_callbacks(self):
# Test setup
self.reset_test_params()
VALIDATE_LIST = [0] * (RESCHEDULING_TIMES + FAULT_NUM)
VALIDATE_FAULT_LIST = ['f'] * FAULT_NUM
self.rescheduler.schedule(
func=self.rescheduled_func_with_exception,
args=(0,),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
fault_callback=self.rescheduled_func_callback,
fault_callback_kwargs={'item': 'f'},
times=RESCHEDULING_TIMES)
# Test action
self.rescheduler.run()
# Test assertions
self.assertEqual(self.counter, RESCHEDULING_TIMES + FAULT_NUM)
self.assert_list_equal(self.list, VALIDATE_LIST)
self.assertEqual(self.fault_counter, FAULT_NUM)
self.assertEqual(self.fault_list, VALIDATE_FAULT_LIST)
self.reset_test_params()
def test_rescheduling_with_fault_callbacks_timing(self):
# Test setup
self.reset_test_params()
TASKS_NUM = 6
for i in range(TASKS_NUM):
self.rescheduler.schedule(
func=self.rescheduled_func_with_exception,
args=(i,),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
fault_callback=self.rescheduled_func_callback,
fault_callback_kwargs={'item': self.fault_counter,
'assert_timing': True},
times=RESCHEDULING_TIMES)
# Test action
# Test assertions
self.rescheduler.run()
self.reset_test_params()
def test_rescheduling_with_ttl(self):
# Test setup
self.reset_test_params()
start = datetime.now()
self.rescheduler.schedule(
func=self.rescheduled_func_with_exception,
args=(0,),
initial_delay=0,
standard_interval=STANDARD_INTERVAL,
fault_interval=FAULT_INTERVAL,
fault_callback=self.rescheduled_func_callback,
fault_callback_kwargs={'item': self.fault_counter},
ttl=TTL)
# Test action
self.rescheduler.run()
# Test assertions
self.assertLess((datetime.now() - start).seconds,
TTL * TIME_DIFF_MARGIN_PRECENT)