Merge "Don't send canceled operation to executor"

This commit is contained in:
Jenkins 2016-12-12 02:41:59 +00:00 committed by Gerrit Code Review
commit 42156ef892
2 changed files with 55 additions and 24 deletions

View File

@ -21,7 +21,7 @@ import six
from stevedore import driver as import_driver
from karbor import exception
from karbor.i18n import _, _LE
from karbor.i18n import _, _LE, _LW
from karbor.services.operationengine.engine import triggers
time_trigger_opts = [
@ -212,31 +212,44 @@ class TimeTrigger(triggers.BaseTrigger):
int(timeutils.delta_seconds(entry_time, expect_run_time)) > 0):
return expect_run_time
# The self._executor.execute_operation may have I/O operation.
# If it is, this green thread will be switched out during looping
# operation_ids. In order to avoid changing self._operation_ids
# during the green thread is switched out, copy self._operation_ids
# as the iterative object.
operation_ids = self._operation_ids.copy()
sent_ops = set()
window = trigger_property.get("window")
end_time = expect_run_time + timedelta(seconds=window)
if end_time <= entry_time:
LOG.error(_LE("Can not trigger operations to run. "
"Because it is out of end time of window. "
"expect run time=%(expect)s, end time=%(end)s, "
"entry time=%(now)s"),
{'expect': expect_run_time, 'end': end_time,
'now': entry_time})
else:
# The self._executor.execute_operation may have I/O operation.
# If it is, this green thread will be switched out during looping
# operation_ids. In order to avoid changing self._operation_ids
# during the green thread is switched out, copy self._operation_ids
# as the iterative object.
operation_ids = self._operation_ids.copy()
for operation_id in operation_ids:
try:
self._executor.execute_operation(
operation_id, entry_time, expect_run_time, window)
except Exception:
LOG.exception(_LE("Submit operation to executor "
"failed, id=%s"),
operation_id)
pass
for operation_id in operation_ids:
if operation_id not in self._operation_ids:
# Maybe, when traversing this operation_id, it has been
# removed by self.unregister_operation
LOG.warn(_LW("Execuete operation %s which is not exist, "
"ignore it"), operation_id)
continue
now = datetime.utcnow()
if now >= end_time:
LOG.error(_LE("Can not trigger operations to run. "
"Because it is out of window time. "
"now=%(now)s, end time=%(end_time)s, "
"expect run time=%(expect)s, "
"wating operations=%(ops)s"),
{'now': now, 'end_time': end_time,
'expect': expect_run_time,
'ops': operation_ids - sent_ops})
break
try:
self._executor.execute_operation(
operation_id, now, expect_run_time, window)
except Exception:
LOG.exception(_LE("Submit operation to executor "
"failed, operation id=%s"), operation_id)
sent_ops.add(operation_id)
next_time = self._compute_next_run_time(
expect_run_time, trigger_property['end_time'], timer)

View File

@ -165,6 +165,24 @@ class TimeTriggerTestCase(base.TestCase):
trigger.unregister_operation(operation_id)
self.assertNotIn(operation_id, trigger._operation_ids)
def test_unregister_operation_when_scheduling(self):
trigger = self._generate_trigger()
for op_id in ['1', '2', '3']:
trigger.register_operation(op_id)
self.assertTrue(op_id in trigger._operation_ids)
eventlet.sleep(0.5)
for op_id in ['2', '3']:
trigger.unregister_operation(op_id)
self.assertTrue(op_id not in trigger._operation_ids)
eventlet.sleep(0.6)
self.assertTrue(trigger._executor._ops['1'] >= 1)
self.assertTrue(('2' not in trigger._executor._ops) or (
'3' not in trigger._executor._ops))
def test_update_trigger_property(self):
trigger = self._generate_trigger()