diff --git a/karbor/db/api.py b/karbor/db/api.py index 873e2d4e..e9d487f1 100644 --- a/karbor/db/api.py +++ b/karbor/db/api.py @@ -192,6 +192,25 @@ def trigger_get_all_by_filters_sort(context, filters, limit=None, ################### +def trigger_execution_create(context, trigger_id, time): + return IMPL.trigger_execution_create(context, trigger_id, time) + + +def trigger_execution_get_next(context): + return IMPL.trigger_execution_get_next(context) + + +def trigger_execution_delete(context, id, trigger_id): + return IMPL.trigger_execution_delete(context, id, trigger_id) + + +def trigger_execution_update(context, id, current_time, new_time): + return IMPL.trigger_execution_update(context, id, current_time, new_time) + + +################### + + def scheduled_operation_get(context, id, columns_to_join=[]): """Get a scheduled operation by its id. diff --git a/karbor/db/sqlalchemy/api.py b/karbor/db/sqlalchemy/api.py index cbaa74fa..e59cf8a8 100644 --- a/karbor/db/sqlalchemy/api.py +++ b/karbor/db/sqlalchemy/api.py @@ -445,6 +445,109 @@ def trigger_get_all_by_filters_sort(context, filters, limit=None, marker=None, ################### +def _trigger_execution_list_query(context, session, **kwargs): + return model_query(context, models.TriggerExecution, session=session) + + +def _trigger_execution_list_process_filters(query, filters): + exact_match_filter_names = ['id', 'trigger_id', 'execution_time'] + query = _list_common_process_exact_filter(models.Trigger, query, filters, + exact_match_filter_names) + return query + + +def _trigger_execution_get(context, id, session=None): + result = model_query(context, models.TriggerExecution, + session=session).filter_by(id=id) + result = result.first() + + if not result: + raise exception.TriggerNotFound(id=id) + + return result + + +def trigger_execution_update(context, id, old_time, new_time): + session = get_session() + try: + with session.begin(): + result = model_query( + context, models.TriggerExecution, session=session + ).filter_by( + id=id, execution_time=old_time + ).update({"execution_time": new_time}) + except Exception as e: + LOG.warning("Unable to update trigger execution (%(execution)s): " + "%(exc)s", + {"execution": id, "exc": e}) + return False + else: + LOG.debug("Updated trigger execution (%(execution)s) from %(old_time)s" + " to %(new_time)s", + {"execution": id, "old_time": old_time, "new_time": new_time} + ) + return result == 1 + + +def trigger_execution_create(context, trigger_id, time): + trigger_ex_ref = models.TriggerExecution() + trigger_ex_ref.update({ + 'id': uuidutils.generate_uuid(), + 'trigger_id': trigger_id, + 'execution_time': time, + }) + trigger_ex_ref.save(get_session()) + return trigger_ex_ref + + +def trigger_execution_delete(context, id, trigger_id): + filters = {} + if id: + filters['id'] = id + if trigger_id: + filters['trigger_id'] = trigger_id + + session = get_session() + try: + with session.begin(): + deleted = model_query( + context, models.TriggerExecution, session=session + ).filter_by(**filters).delete() + except Exception as e: + LOG.warning("Unable to delete trigger (%(trigger)s) execution " + "(%(execution)s): %(exc)s", + {"trigger": trigger_id, "execution": id, "exc": e}) + return False + else: + LOG.debug("Deleted trigger (%(trigger)s) execution (%(execution)s)", + {"trigger": trigger_id, "execution": id}) + return deleted == 1 + + +def trigger_execution_get_next(context): + session = get_session() + try: + with session.begin(): + query = _generate_paginate_query( + context, session, + marker=None, + limit=1, + sort_keys=('execution_time', ), + sort_dirs=('asc', ), + filters=None, + paginate_type=models.TriggerExecution, + ) + result = query.first() + except Exception as e: + LOG.warning("Unable to get next trigger execution %s", e) + return None + else: + return result + + +################### + + def scheduled_operation_get(context, id, columns_to_join=[]): return _scheduled_operation_get(context, id, columns_to_join=columns_to_join) @@ -1484,6 +1587,9 @@ PAGINATION_HELPERS = { _restore_get), models.Trigger: (_trigger_list_query, _trigger_list_process_filters, _trigger_get), + models.TriggerExecution: (_trigger_execution_list_query, + _trigger_execution_list_process_filters, + _trigger_execution_get), models.ScheduledOperation: (_scheduled_operation_list_query, _scheduled_operation_list_process_filters, _scheduled_operation_get), diff --git a/karbor/db/sqlalchemy/migrate_repo/versions/001_karbor_init.py b/karbor/db/sqlalchemy/migrate_repo/versions/001_karbor_init.py index f9774af1..1501efa3 100644 --- a/karbor/db/sqlalchemy/migrate_repo/versions/001_karbor_init.py +++ b/karbor/db/sqlalchemy/migrate_repo/versions/001_karbor_init.py @@ -121,6 +121,19 @@ def define_tables(meta): mysql_engine='InnoDB' ) + trigger_executions = Table( + 'trigger_executions', meta, + Column('created_at', DateTime), + Column('updated_at', DateTime), + Column('deleted_at', DateTime), + Column('deleted', Boolean, nullable=False), + Column('id', String(length=36), primary_key=True, nullable=False), + Column('trigger_id', String(length=36), unique=True, nullable=False, + index=True), + Column('execution_time', DateTime, nullable=False, index=True), + mysql_engine='InnoDB' + ) + scheduled_operations = Table( 'scheduled_operations', meta, Column('created_at', DateTime), @@ -206,6 +219,7 @@ def define_tables(meta): restores, operation_logs, triggers, + trigger_executions, scheduled_operations, scheduled_operation_states, scheduled_operation_logs, diff --git a/karbor/db/sqlalchemy/models.py b/karbor/db/sqlalchemy/models.py index 66efb30f..05d4588d 100644 --- a/karbor/db/sqlalchemy/models.py +++ b/karbor/db/sqlalchemy/models.py @@ -74,6 +74,16 @@ class Trigger(BASE, KarborBase): properties = Column(Text, nullable=False) +class TriggerExecution(BASE, KarborBase): + """Represents a future trigger execition""" + + __tablename__ = 'trigger_executions' + + id = Column(String(36), primary_key=True, nullable=False) + trigger_id = Column(String(36), unique=True, nullable=False, index=True) + execution_time = Column(DateTime, nullable=False, index=True) + + class ScheduledOperation(BASE, KarborBase): """Represents a scheduled operation.""" @@ -235,6 +245,7 @@ def register_models(): Plan, Resource, Trigger, + TriggerExecution, ScheduledOperation, ScheduledOperationState, ScheduledOperationLog, diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py index 81218e01..5729fc61 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py @@ -12,14 +12,15 @@ from datetime import datetime from datetime import timedelta -import eventlet -import functools from oslo_config import cfg from oslo_log import log as logging +from oslo_service import loopingcall from oslo_utils import timeutils import six from stevedore import driver as import_driver +from karbor import context as karbor_context +from karbor import db from karbor import exception from karbor.i18n import _ from karbor.services.operationengine.engine import triggers @@ -41,7 +42,11 @@ time_trigger_opts = [ cfg.StrOpt('time_format', default='calendar', choices=['crontab', 'calendar'], - help='The type of time format which is used to compute time') + help='The type of time format which is used to compute time'), + cfg.IntOpt('trigger_poll_interval', + default=15, + help='Interval, in seconds, in which Karbor will poll for ' + 'trigger events'), ] CONF = cfg.CONF @@ -49,67 +54,10 @@ CONF.register_opts(time_trigger_opts) LOG = logging.getLogger(__name__) -class TriggerOperationGreenThread(object): - def __init__(self, first_run_time, function): - super(TriggerOperationGreenThread, self).__init__() - self._is_sleeping = True - self._pre_run_time = None - self._running = False - self._thread = None - - self._function = function - - self._start(first_run_time) - - def kill(self): - self._running = False - if self._is_sleeping: - self._thread.kill() - - @property - def running(self): - return self._running - - @property - def pre_run_time(self): - return self._pre_run_time - - def _start(self, first_run_time): - self._running = True - - now = datetime.utcnow() - initial_delay = 0 if first_run_time <= now else ( - int(timeutils.delta_seconds(now, first_run_time))) - - self._thread = eventlet.spawn_after( - initial_delay, self._run, first_run_time) - self._thread.link(self._on_done) - - def _on_done(self, gt, *args, **kwargs): - self._is_sleeping = True - self._pre_run_time = None - self._running = False - self._thread = None - - def _run(self, expect_run_time): - while self._running: - self._is_sleeping = False - self._pre_run_time = expect_run_time - - expect_run_time = self._function(expect_run_time) - if expect_run_time is None or not self._running: - break - - self._is_sleeping = True - - now = datetime.utcnow() - idle_time = 0 if expect_run_time <= now else int( - timeutils.delta_seconds(now, expect_run_time)) - eventlet.sleep(idle_time) - - class TimeTrigger(triggers.BaseTrigger): TRIGGER_TYPE = "time" + _loopingcall = None + _triggers = {} def __init__(self, trigger_id, trigger_property, executor): super(TimeTrigger, self).__init__( @@ -118,30 +66,136 @@ class TimeTrigger(triggers.BaseTrigger): self._trigger_property = self.check_trigger_definition( trigger_property) - self._greenthread = None + timer = self._get_timer(self._trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), self._trigger_property['end_time'], timer) + LOG.debug("first_run_time: %s", first_run_time) + + self._trigger_execution_new(self._id, first_run_time) + + if not self.__class__._loopingcall: + self.__class__._loopingcall = loopingcall.FixedIntervalLoopingCall( + self._loop) + self.__class__._loopingcall.start( + interval=CONF.trigger_poll_interval, + stop_on_exception=False, + ) + + self._register() + + def _register(self): + self.__class__._triggers[self._id] = self + + def _unregister(self): + del self.__class__._triggers[self._id] + + @classmethod + def _loop(cls): + while True: + now = datetime.utcnow() + exec_to_handle = cls._trigger_execution_get_next() + if not exec_to_handle: + LOG.debug("No next trigger executions") + break + + trigger_id = exec_to_handle.trigger_id + execution_time = exec_to_handle.execution_time + trigger = cls._triggers.get(trigger_id) + if not trigger: + LOG.warning("Unable to find trigger %s", trigger_id) + res = cls._trigger_execution_delete( + execution_id=exec_to_handle.id) + continue + + if now < execution_time: + LOG.debug("Time trigger not yet due") + break + + trigger_property = trigger._trigger_property + timer = cls._get_timer(trigger_property) + window = trigger_property.get("window") + end_time_to_run = execution_time + timedelta( + seconds=window) + + if now > end_time_to_run: + LOG.debug("Time trigger (%s) out of window",) + execute = False + else: + LOG.debug("Time trigger (%s) is due", trigger_id) + execute = True + + next_exec_time = cls._compute_next_run_time( + now, + trigger_property['end_time'], + timer, + ) + res = False + if not next_exec_time: + LOG.debug("No more planned executions for trigger (%s)", + trigger_id) + res = cls._trigger_execution_delete( + execution_id=exec_to_handle.id) + else: + LOG.debug("Rescheduling (%s) from %s to %s", + trigger_id, + execution_time, + next_exec_time) + res = cls._trigger_execution_update( + exec_to_handle.id, + execution_time, + next_exec_time, + ) + + if not res: + LOG.info("Trigger probably handled by another node") + continue + + if execute: + cls._trigger_operations(trigger_id, execution_time, window) + + @classmethod + def _trigger_execution_new(cls, trigger_id, time): + # Find the first time. + # We don't known when using this trigger first time. + ctxt = karbor_context.get_admin_context() + try: + db.trigger_execution_create(ctxt, trigger_id, time) + return True + except Exception: + return False + + @classmethod + def _trigger_execution_update(cls, id, current_time, next_time): + ctxt = karbor_context.get_admin_context() + return db.trigger_execution_update(ctxt, id, current_time, next_time) + + @classmethod + def _trigger_execution_delete(cls, execution_id=None, trigger_id=None): + if execution_id is None and trigger_id is None: + raise exception.InvalidParameterValue('supply at least one id') + + ctxt = karbor_context.get_admin_context() + num_deleted = db.trigger_execution_delete(ctxt, execution_id, + trigger_id) + return num_deleted > 0 + + @classmethod + def _trigger_execution_get_next(cls): + ctxt = karbor_context.get_admin_context() + return db.trigger_execution_get_next(ctxt) def shutdown(self): - self._kill_greenthread() + self._unregister() def register_operation(self, operation_id, **kwargs): if operation_id in self._operation_ids: msg = (_("The operation_id(%s) is exist") % operation_id) raise exception.ScheduledOperationExist(msg) - if self._greenthread and not self._greenthread.running: - raise exception.TriggerIsInvalid(trigger_id=self._id) - self._operation_ids.add(operation_id) - if self._greenthread is None: - self._start_greenthread() def unregister_operation(self, operation_id, **kwargs): - if operation_id not in self._operation_ids: - return - - self._operation_ids.remove(operation_id) - if 0 == len(self._operation_ids): - self._kill_greenthread() + self._operation_ids.discard(operation_id) def update_trigger_property(self, trigger_property): valid_trigger_property = self.check_trigger_definition( @@ -150,82 +204,38 @@ class TimeTrigger(triggers.BaseTrigger): if valid_trigger_property == self._trigger_property: return - timer, first_run_time = self._get_timer_and_first_run_time( - valid_trigger_property) + timer = self._get_timer(valid_trigger_property) + first_run_time = self._compute_next_run_time( + datetime.utcnow(), valid_trigger_property['end_time'], timer) + if not first_run_time: msg = (_("The new trigger property is invalid, " "Can not find the first run time")) raise exception.InvalidInput(msg) - if self._greenthread is not None: - pre_run_time = self._greenthread.pre_run_time - if pre_run_time: - end_time = pre_run_time + timedelta( - seconds=self._trigger_property['window']) - if first_run_time <= end_time: - msg = (_("The new trigger property is invalid, " - "First run time%(t1)s must be after %(t2)s") % - {'t1': first_run_time, 't2': end_time}) - raise exception.InvalidInput(msg) - self._trigger_property = valid_trigger_property + self._trigger_execution_delete(trigger_id=self._id) + self._trigger_execution_new(self._id, first_run_time) - if len(self._operation_ids) > 0: - # Restart greenthread to take the change of trigger property - # effect immediately - self._kill_greenthread() - self._create_green_thread(first_run_time, timer) + @classmethod + def _trigger_operations(cls, trigger_id, expect_run_time, window): + """Trigger operations once""" - def _kill_greenthread(self): - if self._greenthread: - self._greenthread.kill() - self._greenthread = None - - def _start_greenthread(self): - # Find the first time. - # We don't known when using this trigger first time. - timer, first_run_time = self._get_timer_and_first_run_time( - self._trigger_property) - if not first_run_time: - raise exception.TriggerIsInvalid(trigger_id=self._id) - - self._create_green_thread(first_run_time, timer) - - def _create_green_thread(self, first_run_time, timer): - func = functools.partial( - self._trigger_operations, - trigger_property=self._trigger_property.copy(), - timer=timer) - - self._greenthread = TriggerOperationGreenThread( - first_run_time, func) - - def _trigger_operations(self, expect_run_time, trigger_property, timer): - """Trigger operations once - - returns: wait time for next run - """ - - # Just for robustness, actually expect_run_time always <= now - # but, if the scheduling of eventlet is not accurate, then we - # can do some adjustments. - entry_time = datetime.utcnow() - if entry_time < expect_run_time and ( - int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0): - return expect_run_time - - # The self._executor.execute_operation may have I/O operation. + # The executor execute_operation may have I/O operation. # If it is, this green thread will be switched out during looping # operation_ids. In order to avoid changing self._operation_ids # during the green thread is switched out, copy self._operation_ids # as the iterative object. - operation_ids = self._operation_ids.copy() + trigger = cls._triggers.get(trigger_id) + if not trigger: + LOG.warning("Can't find trigger: %s" % trigger_id) + return + operations_ids = trigger._operation_ids.copy() sent_ops = set() - window = trigger_property.get("window") end_time = expect_run_time + timedelta(seconds=window) - for operation_id in operation_ids: - if operation_id not in self._operation_ids: + for operation_id in operations_ids: + if operation_id not in trigger._operation_ids: # Maybe, when traversing this operation_id, it has been # removed by self.unregister_operation LOG.warning("Execute operation %s which is not exist, " @@ -236,15 +246,13 @@ class TimeTrigger(triggers.BaseTrigger): if now >= end_time: LOG.error("Can not trigger operations to run. Because it is " "out of window time. now=%(now)s, " - "end time=%(end_time)s, expect run time=%(expect)s," - " wating operations=%(ops)s", + "end time=%(end_time)s, waiting operations=%(ops)s", {'now': now, 'end_time': end_time, - 'expect': expect_run_time, - 'ops': operation_ids - sent_ops}) + 'ops': operations_ids - sent_ops}) break try: - self._executor.execute_operation( + trigger._executor.execute_operation( operation_id, now, expect_run_time, window) except Exception: LOG.exception("Submit operation to executor failed, operation" @@ -252,18 +260,6 @@ class TimeTrigger(triggers.BaseTrigger): sent_ops.add(operation_id) - next_time = self._compute_next_run_time( - expect_run_time, trigger_property['end_time'], timer) - now = datetime.utcnow() - if next_time and next_time <= now: - LOG.error("Next run time:%(next_time)s <= now:%(now)s. Maybe the " - "entry time=%(entry)s is too late, even exceeds the end" - " time of window=%(end)s, or it was blocked where " - "sending the operation to executor.", - {'next_time': next_time, 'now': now, - 'entry': entry_time, 'end': end_time}) - return next_time - @classmethod def check_trigger_definition(cls, trigger_definition): """Check trigger definition @@ -350,14 +346,11 @@ class TimeTrigger(triggers.BaseTrigger): CONF.time_format).driver @classmethod - def _get_timer_and_first_run_time(cls, trigger_property): + def _get_timer(cls, trigger_property): tf_cls = cls._get_time_format_class() timer = tf_cls(trigger_property['start_time'], trigger_property['pattern']) - first_run_time = cls._compute_next_run_time( - datetime.utcnow(), trigger_property['end_time'], timer) - - return timer, first_run_time + return timer @classmethod def check_configuration(cls): diff --git a/karbor/services/operationengine/manager.py b/karbor/services/operationengine/manager.py index c1b5029e..949cf904 100644 --- a/karbor/services/operationengine/manager.py +++ b/karbor/services/operationengine/manager.py @@ -221,15 +221,19 @@ class OperationEngineManager(manager.Manager): @messaging.expected_exceptions(exception.InvalidInput) def create_trigger(self, context, trigger): + LOG.debug('Creating trigger (id: "%s" type: "%s")', + trigger.id, trigger.type) self.trigger_manager.add_trigger(trigger.id, trigger.type, trigger.properties) @messaging.expected_exceptions(exception.TriggerNotFound, exception.DeleteTriggerNotAllowed) def delete_trigger(self, context, trigger_id): + LOG.debug('Deleting trigger (id: "%s")', trigger_id) self.trigger_manager.remove_trigger(trigger_id) @messaging.expected_exceptions(exception.TriggerNotFound, exception.InvalidInput) def update_trigger(self, context, trigger): + LOG.debug('Updating trigger (id: "%s")', trigger.id) self.trigger_manager.update_trigger(trigger.id, trigger.properties) diff --git a/karbor/services/operationengine/rpcapi.py b/karbor/services/operationengine/rpcapi.py index 42bb21bb..fce29fae 100644 --- a/karbor/services/operationengine/rpcapi.py +++ b/karbor/services/operationengine/rpcapi.py @@ -64,10 +64,13 @@ class OperationEngineAPI(object): trigger_id=trigger_id) def create_trigger(self, ctxt, trigger): - return self._client.call(ctxt, 'create_trigger', trigger=trigger) + self._client.prepare(fanout=True).cast(ctxt, 'create_trigger', + trigger=trigger) def delete_trigger(self, ctxt, trigger_id): - return self._client.call(ctxt, 'delete_trigger', trigger_id=trigger_id) + self._client.prepare(fanout=True).cast(ctxt, 'delete_trigger', + trigger_id=trigger_id) def update_trigger(self, ctxt, trigger): - return self._client.call(ctxt, 'update_trigger', trigger=trigger) + self._client.prepare(fanout=True).cast(ctxt, 'update_trigger', + trigger=trigger) diff --git a/karbor/tests/fullstack/test_scheduled_operations.py b/karbor/tests/fullstack/test_scheduled_operations.py index 59f4dff9..55e446a1 100644 --- a/karbor/tests/fullstack/test_scheduled_operations.py +++ b/karbor/tests/fullstack/test_scheduled_operations.py @@ -109,12 +109,14 @@ class ScheduledOperationsTest(karbor_base.KarborBaseTest): def test_scheduled_operations_create_and_scheduled(self): freq = 2 + eventlet_grace = 20 pattern = "BEGIN:VEVENT\nRRULE:FREQ=MINUTELY;INTERVAL=5;\nEND:VEVENT" cur_property = {'pattern': pattern, 'format': 'calendar'} operation = self.store(self._create_for_volume(cur_property)) start_time = datetime.now().replace(microsecond=0) sleep_time = self._wait_timestamp(pattern, start_time, freq) + sleep_time += eventlet_grace self.assertNotEqual(0, sleep_time) eventlet.sleep(sleep_time) diff --git a/karbor/tests/fullstack/test_triggers.py b/karbor/tests/fullstack/test_triggers.py index da8c0bdc..5d4393e1 100644 --- a/karbor/tests/fullstack/test_triggers.py +++ b/karbor/tests/fullstack/test_triggers.py @@ -11,6 +11,7 @@ # under the License. +from datetime import datetime from karbor.tests.fullstack import karbor_base from karbor.tests.fullstack import karbor_objects as objects @@ -40,6 +41,29 @@ class TriggersTest(karbor_base.KarborBaseTest): trigger = self.karbor_client.triggers.get(trigger.id) self.assertEqual(trigger_name, trigger.name) + def test_triggers_update(self): + trigger_name = "FullStack Trigger Test Update" + pattern1 = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT" + pattern2 = "BEGIN:VEVENT\nRRULE:FREQ=DAILY;INTERVAL=1;\nEND:VEVENT" + trigger = self.store(objects.Trigger()) + trigger.create('time', {'pattern': pattern1, 'format': 'calendar'}, + name=trigger_name) + properties = { + 'properties': { + 'pattern': pattern2, + 'format': 'calendar', + 'start_time': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), + } + } + + self.karbor_client.triggers.update( + trigger.id, + properties, + ) + + trigger = self.karbor_client.triggers.get(trigger.id) + self.assertEqual(trigger.properties['pattern'], pattern2) + def test_triggers_delete(self): pattern = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT" trigger = objects.Trigger() diff --git a/karbor/tests/unit/conf_fixture.py b/karbor/tests/unit/conf_fixture.py index 64aa65ad..a34df0e6 100644 --- a/karbor/tests/unit/conf_fixture.py +++ b/karbor/tests/unit/conf_fixture.py @@ -43,3 +43,4 @@ def set_defaults(conf): conf.set_default('username', 'karbor', group='trustee') conf.set_default('password', 'password', group='trustee') conf.set_default('user_domain_id', 'default', group='trustee') + conf.set_default('trigger_poll_interval', 1) diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py index 92d6fc1f..6ed63f1f 100644 --- a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py @@ -10,18 +10,27 @@ # License for the specific language governing permissions and limitations # under the License. +from collections import namedtuple from datetime import datetime from datetime import timedelta import eventlet +import functools +import heapq import mock from oslo_config import cfg +from oslo_utils import uuidutils +from karbor import context as karbor_context from karbor import exception -from karbor.services.operationengine.engine.triggers.timetrigger.time_trigger \ - import TimeTrigger +from karbor.services.operationengine.engine.triggers.timetrigger \ + import time_trigger as tt from karbor.tests import base +TriggerExecution = namedtuple('TriggerExecution', + ['execution_time', 'id', 'trigger_id']) + + class FakeTimeFormat(object): def __init__(self, start_time, pattern): super(FakeTimeFormat, self).__init__() @@ -49,30 +58,78 @@ class FakeExecutor(object): self._ops[operation_id] += 1 eventlet.sleep(0.5) - def clear(self): - self._ops.clear() + +class FakeTimeTrigger(object): + @classmethod + def get_time_format(cls, *args, **kwargs): + return FakeTimeFormat + + +class FakeDb(object): + def __init__(self): + self._db = [] + + def trigger_execution_get_next(self, context): + if len(self._db) == 0: + return None + return self._db[0] + + def trigger_execution_create(self, context, trigger_id, time): + element = TriggerExecution(time, uuidutils.generate_uuid(), trigger_id) + heapq.heappush(self._db, element) + + def trigger_execution_update(self, context, id, current_time, new_time): + for idx, element in enumerate(self._db): + if element.id == id: + if element.execution_time != current_time: + return False + self._db[idx] = TriggerExecution(new_time, element.id, + element.trigger_id) + break + heapq.heapify(self._db) + return True + + def trigger_execution_delete(self, context, id, trigger_id): + removed_ids = [] + for idx, element in enumerate(self._db): + if (id and element.id == id) or (trigger_id and + element.trigger_id == trigger_id): + removed_ids.append(idx) + + for idx in reversed(removed_ids): + self._db.pop(idx) + heapq.heapify(self._db) + return len(removed_ids) + + +def time_trigger_test(func): + @functools.wraps(func) + @mock.patch.object(tt, 'db', FakeDb()) + @mock.patch.object(karbor_context, 'get_admin_context', lambda: None) + @mock.patch.object(tt.TimeTrigger, '_get_time_format_class', + FakeTimeTrigger.get_time_format) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper class TimeTriggerTestCase(base.TestCase): + _tid = 0 + _default_executor = FakeExecutor() def setUp(self): super(TimeTriggerTestCase, self).setUp() - self._set_configuration() - mock_obj = mock.Mock() - mock_obj.return_value = FakeTimeFormat - TimeTrigger._get_time_format_class = mock_obj - - self._default_executor = FakeExecutor() - def test_check_configuration(self): self._set_configuration(10, 20, 30) self.assertRaisesRegex(exception.InvalidInput, "Configurations of time trigger are invalid", - TimeTrigger.check_configuration) + tt.TimeTrigger.check_configuration) self._set_configuration() + @time_trigger_test def test_check_trigger_property_start_time(self): trigger_property = { "pattern": "", @@ -81,22 +138,23 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The trigger\'s start time is unknown", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) trigger_property['start_time'] = 'abc' self.assertRaisesRegex(exception.InvalidInput, "The format of trigger .* is not correct", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) trigger_property['start_time'] = 123 self.assertRaisesRegex(exception.InvalidInput, "The trigger .* is not an instance of string", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) @mock.patch.object(FakeTimeFormat, 'get_min_interval') + @time_trigger_test def test_check_trigger_property_interval(self, get_min_interval): get_min_interval.return_value = 0 @@ -106,9 +164,10 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The interval of two adjacent time points .*", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) + @time_trigger_test def test_check_trigger_property_window(self): trigger_property = { "window": "abc", @@ -117,15 +176,16 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The trigger window.* is not integer", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) trigger_property['window'] = 1000 self.assertRaisesRegex(exception.InvalidInput, "The trigger windows .* must be between .*", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) + @time_trigger_test def test_check_trigger_property_end_time(self): trigger_property = { "window": 15, @@ -135,27 +195,24 @@ class TimeTriggerTestCase(base.TestCase): self.assertRaisesRegex(exception.InvalidInput, "The format of trigger .* is not correct", - TimeTrigger.check_trigger_definition, + tt.TimeTrigger.check_trigger_definition, trigger_property) + @time_trigger_test def test_register_operation(self): trigger = self._generate_trigger() operation_id = "1" trigger.register_operation(operation_id) - eventlet.sleep(0.3) + eventlet.sleep(2) - self.assertGreaterEqual(trigger._executor._ops[operation_id], 1) + self.assertGreaterEqual(self._default_executor._ops[operation_id], 1) self.assertRaisesRegex(exception.ScheduledOperationExist, "The operation_id.* is exist", trigger.register_operation, operation_id) - eventlet.sleep(0.3) - self.assertRaises(exception.TriggerIsInvalid, - trigger.register_operation, - "2") - + @time_trigger_test def test_unregister_operation(self): trigger = self._generate_trigger() operation_id = "2" @@ -164,26 +221,9 @@ class TimeTriggerTestCase(base.TestCase): self.assertIn(operation_id, trigger._operation_ids) trigger.unregister_operation(operation_id) - self.assertNotIn(operation_id, trigger._operation_ids) - - def test_unregister_operation_when_scheduling(self): - trigger = self._generate_trigger() - - for op_id in ['1', '2', '3']: - trigger.register_operation(op_id) - self.assertIn(op_id, trigger._operation_ids) - eventlet.sleep(0.5) - - for op_id in ['2', '3']: - trigger.unregister_operation(op_id) - self.assertNotIn(op_id, trigger._operation_ids) - eventlet.sleep(0.6) - - self.assertGreaterEqual(trigger._executor._ops['1'], 1) - - self.assertTrue(('2' not in trigger._executor._ops) or ( - '3' not in trigger._executor._ops)) + self.assertNotIn(trigger._id, trigger._operation_ids) + @time_trigger_test def test_update_trigger_property(self): trigger = self._generate_trigger() @@ -199,18 +239,10 @@ class TimeTriggerTestCase(base.TestCase): trigger.update_trigger_property, trigger_property) - trigger.register_operation('1') - eventlet.sleep(0.2) - trigger_property['end_time'] = ( - datetime.utcnow() + timedelta(seconds=1)) - self.assertRaisesRegex(exception.InvalidInput, - ".*First run time.* must be after.*", - trigger.update_trigger_property, - trigger_property) - + @time_trigger_test def test_update_trigger_property_success(self): trigger = self._generate_trigger() - trigger.register_operation('1') + trigger.register_operation('7') eventlet.sleep(0.2) trigger_property = { @@ -221,12 +253,8 @@ class TimeTriggerTestCase(base.TestCase): } with mock.patch.object(FakeTimeFormat, 'compute_next_time') as c: c.return_value = datetime.utcnow() + timedelta(seconds=20) - old_id = id(trigger._greenthread) - trigger.update_trigger_property(trigger_property) - self.assertNotEqual(old_id, id(trigger._greenthread)) - def _generate_trigger(self, end_time=None): if not end_time: end_time = datetime.utcnow() + timedelta(seconds=1) @@ -238,11 +266,15 @@ class TimeTriggerTestCase(base.TestCase): "end_time": end_time } - self._default_executor.clear() - return TimeTrigger("123", trigger_property, self._default_executor) + return tt.TimeTrigger( + uuidutils.generate_uuid(), + trigger_property, + self._default_executor, + ) def _set_configuration(self, min_window=15, - max_window=30, min_interval=60): + max_window=30, min_interval=60, poll_interval=1): self.override_config('min_interval', min_interval) self.override_config('min_window_time', min_window) self.override_config('max_window_time', max_window) + self.override_config('trigger_poll_interval', poll_interval)