diff --git a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py index 777ec3d5..5235cd86 100644 --- a/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py +++ b/karbor/services/operationengine/engine/triggers/timetrigger/time_trigger.py @@ -21,7 +21,7 @@ import six from stevedore import driver as import_driver from karbor import exception -from karbor.i18n import _, _LE +from karbor.i18n import _, _LE, _LW from karbor.services.operationengine.engine import triggers time_trigger_opts = [ @@ -212,31 +212,44 @@ class TimeTrigger(triggers.BaseTrigger): int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0): return expect_run_time + # The self._executor.execute_operation may have I/O operation. + # If it is, this green thread will be switched out during looping + # operation_ids. In order to avoid changing self._operation_ids + # during the green thread is switched out, copy self._operation_ids + # as the iterative object. + operation_ids = self._operation_ids.copy() + sent_ops = set() window = trigger_property.get("window") end_time = expect_run_time + timedelta(seconds=window) - if end_time <= entry_time: - LOG.error(_LE("Can not trigger operations to run. " - "Because it is out of end time of window. " - "expect run time=%(expect)s, end time=%(end)s, " - "entry time=%(now)s"), - {'expect': expect_run_time, 'end': end_time, - 'now': entry_time}) - else: - # The self._executor.execute_operation may have I/O operation. - # If it is, this green thread will be switched out during looping - # operation_ids. In order to avoid changing self._operation_ids - # during the green thread is switched out, copy self._operation_ids - # as the iterative object. - operation_ids = self._operation_ids.copy() - for operation_id in operation_ids: - try: - self._executor.execute_operation( - operation_id, entry_time, expect_run_time, window) - except Exception: - LOG.exception(_LE("Submit operation to executor " - "failed, id=%s"), - operation_id) - pass + + for operation_id in operation_ids: + if operation_id not in self._operation_ids: + # Maybe, when traversing this operation_id, it has been + # removed by self.unregister_operation + LOG.warn(_LW("Execuete operation %s which is not exist, " + "ignore it"), operation_id) + continue + + now = datetime.utcnow() + if now >= end_time: + LOG.error(_LE("Can not trigger operations to run. " + "Because it is out of window time. " + "now=%(now)s, end time=%(end_time)s, " + "expect run time=%(expect)s, " + "wating operations=%(ops)s"), + {'now': now, 'end_time': end_time, + 'expect': expect_run_time, + 'ops': operation_ids - sent_ops}) + break + + try: + self._executor.execute_operation( + operation_id, now, expect_run_time, window) + except Exception: + LOG.exception(_LE("Submit operation to executor " + "failed, operation id=%s"), operation_id) + + sent_ops.add(operation_id) next_time = self._compute_next_run_time( expect_run_time, trigger_property['end_time'], timer) diff --git a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py index 235eb388..1c3db892 100644 --- a/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py +++ b/karbor/tests/unit/operationengine/engine/triggers/timetrigger/test_time_trigger.py @@ -165,6 +165,24 @@ class TimeTriggerTestCase(base.TestCase): trigger.unregister_operation(operation_id) self.assertNotIn(operation_id, trigger._operation_ids) + def test_unregister_operation_when_scheduling(self): + trigger = self._generate_trigger() + + for op_id in ['1', '2', '3']: + trigger.register_operation(op_id) + self.assertTrue(op_id in trigger._operation_ids) + eventlet.sleep(0.5) + + for op_id in ['2', '3']: + trigger.unregister_operation(op_id) + self.assertTrue(op_id not in trigger._operation_ids) + eventlet.sleep(0.6) + + self.assertTrue(trigger._executor._ops['1'] >= 1) + + self.assertTrue(('2' not in trigger._executor._ops) or ( + '3' not in trigger._executor._ops)) + def test_update_trigger_property(self): trigger = self._generate_trigger()