Merge "Fix 'with-items' task completion condition"
This commit is contained in:
@@ -429,20 +429,24 @@ class WithItemsTask(RegularTask):
|
||||
def on_action_complete(self, action_ex):
|
||||
assert self.task_ex
|
||||
|
||||
if not self._get_concurrency():
|
||||
self._on_action_complete()
|
||||
if (not self._get_concurrency() and
|
||||
not self.task_spec.get_policies().get_retry()):
|
||||
self._on_action_complete(action_ex)
|
||||
else:
|
||||
# If we need to control 'concurrency' we need to do atomic
|
||||
# reads/writes to task runtime context. Locking prevents us
|
||||
# from modifying runtime context simultaneously by multiple
|
||||
# transactions.
|
||||
with db_api.named_lock('with-items-%s' % self.task_ex.id):
|
||||
self._on_action_complete()
|
||||
self._on_action_complete(action_ex)
|
||||
|
||||
def _on_action_complete(self, action_ex):
|
||||
if self.is_completed():
|
||||
return
|
||||
|
||||
def _on_action_complete(self):
|
||||
self._increase_capacity()
|
||||
|
||||
if self.is_completed():
|
||||
if self.is_with_items_completed():
|
||||
state = self._get_final_state()
|
||||
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
@@ -464,6 +468,11 @@ class WithItemsTask(RegularTask):
|
||||
if self._has_more_iterations() and self._get_concurrency():
|
||||
self._schedule_actions()
|
||||
|
||||
def _reset_with_items_capacity(self):
|
||||
ctx = self._get_with_items_context()
|
||||
|
||||
ctx[self._CAPACITY] = self._get_concurrency()
|
||||
|
||||
def _schedule_actions(self):
|
||||
with_items_values = self._get_with_items_values()
|
||||
|
||||
@@ -580,7 +589,7 @@ class WithItemsTask(RegularTask):
|
||||
def _get_concurrency(self):
|
||||
return self.task_ex.runtime_context.get(self._CONCURRENCY)
|
||||
|
||||
def is_completed(self):
|
||||
def is_with_items_completed(self):
|
||||
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
|
||||
|
||||
if list(filter(find_cancelled, self.task_ex.executions)):
|
||||
@@ -589,11 +598,23 @@ class WithItemsTask(RegularTask):
|
||||
execs = list(filter(lambda t: t.accepted, self.task_ex.executions))
|
||||
count = self._get_with_items_count() or 1
|
||||
|
||||
return count == len(execs)
|
||||
# We need to make sure that method on_action_complete() has been
|
||||
# called for every action. Just looking at number of actions and
|
||||
# their 'accepted' flag is not enough because action gets accepted
|
||||
# before on_action_complete() is called for it. This call is
|
||||
# mandatory in order to do all needed processing from task
|
||||
# perspective. So we can simply check if capacity is fully reset
|
||||
# to its initial state.
|
||||
full_capacity = (
|
||||
not self._get_concurrency() or
|
||||
self._get_with_items_capacity() == self._get_concurrency()
|
||||
)
|
||||
|
||||
return count == len(execs) and full_capacity
|
||||
|
||||
def _get_final_state(self):
|
||||
find_error = lambda x: x.accepted and x.state == states.ERROR
|
||||
find_cancelled = lambda x: x.accepted and x.state == states.CANCELLED
|
||||
find_error = lambda x: x.accepted and x.state == states.ERROR
|
||||
|
||||
if list(filter(find_cancelled, self.task_ex.executions)):
|
||||
return states.CANCELLED
|
||||
@@ -662,6 +683,7 @@ class WithItemsTask(RegularTask):
|
||||
self.task_ex.runtime_context.update({self._WITH_ITEMS: ctx})
|
||||
|
||||
def _decrease_capacity(self, count):
|
||||
# TODO(rakhmerov): create generic method for accessing context values
|
||||
ctx = self._get_with_items_context()
|
||||
|
||||
capacity = ctx[self._CAPACITY]
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
import copy
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
import testtools
|
||||
|
||||
from mistral.actions import base as action_base
|
||||
from mistral.actions import std_actions
|
||||
@@ -1098,8 +1097,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertEqual(12, len(task1_executions))
|
||||
self._assert_multiple_items(task1_executions, 3, accepted=True)
|
||||
|
||||
@testtools.skip('Repair with-items concurrency')
|
||||
def test_with_items_retry_policy_concurrency(self):
|
||||
def test_with_items_concurrency_retry_policy(self):
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
|
||||
@@ -1139,7 +1137,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
task1_execs = task1_ex.executions
|
||||
|
||||
self.assertEqual(12, len(task1_execs))
|
||||
self.assertEqual(16, len(task1_execs))
|
||||
self._assert_multiple_items(task1_execs, 4, accepted=True)
|
||||
|
||||
def test_with_items_env(self):
|
||||
|
||||
Reference in New Issue
Block a user