Towards non-locking model: make 'with-items' work w/o locks
* Fixed 'with-items' to work in non-locking mode. 'concurrency' task property has been temporarily dropped since its current implementation does not fit into transactional model. It needs to be re-implemented based on atomic DB reads/writes of a number of currently running actions which is now impossible to do with Json field 'runtime_context', most likely a new DB field is needed for this. TODO: * Fix 'concurrency' * Fix reverse workflows to work in non-locking mode * Remove locks Partially implements: blueprint mistral-non-locking-tx-model Change-Id: I74bb252533ba4742eb3c7bde73e62ed61ed244bd
This commit is contained in:
parent
9f236248c1
commit
297fe921e1
@ -49,7 +49,7 @@ def on_action_complete(action_ex, result):
|
||||
return
|
||||
|
||||
if task_ex:
|
||||
task_handler.on_action_complete(action_ex)
|
||||
task_handler.schedule_on_action_complete(action_ex)
|
||||
|
||||
|
||||
def _build_action(action_ex):
|
||||
|
@ -20,6 +20,7 @@ from osprofiler import profiler
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import tasks
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions as exc
|
||||
@ -38,6 +39,10 @@ _CHECK_TASK_START_ALLOWED_PATH = (
|
||||
'mistral.engine.task_handler._check_task_start_allowed'
|
||||
)
|
||||
|
||||
_SCHEDULED_ON_ACTION_COMPLETE_PATH = (
|
||||
'mistral.engine.task_handler._scheduled_on_action_complete'
|
||||
)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-run-task')
|
||||
def run_task(wf_cmd):
|
||||
@ -74,8 +79,8 @@ def run_task(wf_cmd):
|
||||
wf_handler.schedule_on_task_complete(task.task_ex)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-on-task-complete')
|
||||
def on_action_complete(action_ex):
|
||||
@profiler.trace('task-handler-on-action-complete')
|
||||
def _on_action_complete(action_ex):
|
||||
"""Handles action completion event.
|
||||
|
||||
:param action_ex: Action execution.
|
||||
@ -298,3 +303,47 @@ def _schedule_check_task_start_allowed(task_ex, delay=0):
|
||||
unique_key=key,
|
||||
task_ex_id=task_ex.id
|
||||
)
|
||||
|
||||
|
||||
def _scheduled_on_action_complete(action_ex_id, wf_action):
|
||||
with db_api.transaction():
|
||||
if wf_action:
|
||||
action_ex = db_api.get_workflow_execution(action_ex_id)
|
||||
else:
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
|
||||
_on_action_complete(action_ex)
|
||||
|
||||
|
||||
def schedule_on_action_complete(action_ex, delay=0):
|
||||
"""Schedules task completion check.
|
||||
|
||||
This method provides transactional decoupling of action completion from
|
||||
task completion check. It's needed in non-locking model in order to
|
||||
avoid 'phantom read' phenomena when reading state of multiple actions
|
||||
to see if a task is completed. Just starting a separate transaction
|
||||
without using scheduler is not safe due to concurrency window that we'll
|
||||
have in this case (time between transactions) whereas scheduler is a
|
||||
special component that is designed to be resistant to failures.
|
||||
|
||||
:param action_ex: Action execution.
|
||||
:param delay: Minimum amount of time before task completion check
|
||||
should be made.
|
||||
"""
|
||||
|
||||
# Optimization to avoid opening a new transaction if it's not needed.
|
||||
if not action_ex.task_execution.spec.get('with-items'):
|
||||
_on_action_complete(action_ex)
|
||||
|
||||
return
|
||||
|
||||
key = 'th_on_a_c-%s' % action_ex.task_execution_id
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_SCHEDULED_ON_ACTION_COMPLETE_PATH,
|
||||
delay,
|
||||
unique_key=key,
|
||||
action_ex_id=action_ex.id,
|
||||
wf_action=isinstance(action_ex, models.WorkflowExecution)
|
||||
)
|
||||
|
@ -239,7 +239,7 @@ class RegularTask(Task):
|
||||
Takes care of processing regular tasks with one action.
|
||||
"""
|
||||
|
||||
@profiler.trace('task-on-action-complete')
|
||||
@profiler.trace('regular-task-on-action-complete')
|
||||
def on_action_complete(self, action_ex):
|
||||
state = action_ex.state
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
@ -379,18 +379,21 @@ class RegularTask(Task):
|
||||
return actions.PythonAction(action_def, task_ex=self.task_ex)
|
||||
|
||||
|
||||
# TODO(rakhmerov): Concurrency support is currently dropped since it doesn't
|
||||
# fit into non-locking transactional model. It needs to be restored later on.
|
||||
# A possible solution should be able to read and write a number of currently
|
||||
# running actions atomically which is now impossible w/o locks with JSON
|
||||
# field "runtime_context".
|
||||
class WithItemsTask(RegularTask):
|
||||
"""With-items task.
|
||||
|
||||
Takes care of processing "with-items" tasks.
|
||||
"""
|
||||
|
||||
@profiler.trace('task-on-action-complete')
|
||||
@profiler.trace('with-items-task-on-action-complete')
|
||||
def on_action_complete(self, action_ex):
|
||||
assert self.task_ex
|
||||
|
||||
state = action_ex.state
|
||||
|
||||
# TODO(rakhmerov): Here we can define more informative messages
|
||||
# cases when action is successful and when it's not. For example,
|
||||
# in state_info we can specify the cause action.
|
||||
|
@ -28,7 +28,7 @@ from mistral.workflow import states
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler.on_task_complete'
|
||||
_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler._on_task_complete'
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-start-workflow')
|
||||
@ -75,7 +75,7 @@ def cancel_workflow(wf_ex, msg=None):
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-on-task-complete')
|
||||
def on_task_complete(task_ex_id):
|
||||
def _on_task_complete(task_ex_id):
|
||||
# Note: This method can only be called via scheduler.
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
|
@ -15,6 +15,7 @@
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
import testtools
|
||||
|
||||
from mistral.actions import std_actions
|
||||
from mistral.db.v2 import api as db_api
|
||||
@ -578,6 +579,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(1, len(task_2_action_exs))
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
@mock.patch.object(
|
||||
std_actions.EchoAction,
|
||||
'run',
|
||||
@ -1028,7 +1030,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
|
||||
self.assertEqual(states.ERROR, task_1_ex.state)
|
||||
self.await_task_error(task_1_ex.id)
|
||||
|
||||
self.assertIsNotNone(task_1_ex.state_info)
|
||||
|
||||
task_1_action_exs = db_api.get_action_executions(
|
||||
@ -1038,9 +1041,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
self.assertEqual(3, len(task_1_action_exs))
|
||||
|
||||
# Resume workflow and re-run failed task. Re-run #1 with no reset.
|
||||
self.engine.rerun_workflow(task_1_ex.id, reset=False)
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
wf_ex = self.engine.rerun_workflow(task_1_ex.id, reset=False)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertIsNone(wf_ex.state_info)
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
import copy
|
||||
from oslo_config import cfg
|
||||
import testtools
|
||||
|
||||
from mistral.actions import base as action_base
|
||||
from mistral.db.v2 import api as db_api
|
||||
@ -602,6 +603,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertIn(task1_ex.published['result'], ['Guy'])
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_1(self):
|
||||
wf_with_concurrency_1 = """---
|
||||
version: "2.0"
|
||||
@ -681,6 +683,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_yaql(self):
|
||||
wf_with_concurrency_yaql = """---
|
||||
version: "2.0"
|
||||
@ -728,6 +731,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertIn('Ivan', result)
|
||||
self.assertIn('Mistral', result)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_yaql_wrong_type(self):
|
||||
wf_with_concurrency_yaql = """---
|
||||
version: "2.0"
|
||||
@ -760,6 +764,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
)
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_2(self):
|
||||
wf_with_concurrency_2 = """---
|
||||
version: "2.0"
|
||||
@ -848,6 +853,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual(states.SUCCESS, task_ex.state)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_2_fail(self):
|
||||
wf_with_concurrency_2_fail = """---
|
||||
version: "2.0"
|
||||
@ -888,6 +894,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
self.assertEqual('With-items failed', result)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_3(self):
|
||||
wf_with_concurrency_3 = """---
|
||||
version: "2.0"
|
||||
@ -964,6 +971,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertIn('Ivan', result)
|
||||
self.assertIn('Mistral', result)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_concurrency_gt_list_length(self):
|
||||
wf_definition = """---
|
||||
version: "2.0"
|
||||
@ -1046,6 +1054,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertEqual(9, len(task1_ex.executions))
|
||||
self._assert_multiple_items(task1_ex.executions, 3, accepted=True)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_retry_policy_concurrency(self):
|
||||
wf_text = """---
|
||||
version: "2.0"
|
||||
@ -1186,6 +1195,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
self.assertIn(3, result_task2)
|
||||
self.assertIn(4, result_task2)
|
||||
|
||||
@testtools.skip('Restore concurrency support.')
|
||||
def test_with_items_subflow_concurrency_gt_list_length(self):
|
||||
wb_text = """---
|
||||
version: "2.0"
|
||||
|
@ -54,4 +54,7 @@ class WithItemsTest(base.BaseTest):
|
||||
# Then call get_indices and expect [2, 3, 4].
|
||||
indices = with_items.get_indices_for_loop(task_ex)
|
||||
|
||||
self.assertListEqual([2, 3, 4], indices)
|
||||
# TODO(rakhmerov): Restore concurrency support.
|
||||
# With disabled 'concurrency' support we expect indices 2,3,4,5
|
||||
# because overall count is 6 and two indices were already processed.
|
||||
self.assertListEqual([2, 3, 4, 5], indices)
|
||||
|
@ -107,7 +107,10 @@ def _get_unaccepted_executions(task_ex):
|
||||
|
||||
|
||||
def get_indices_for_loop(task_ex):
|
||||
capacity = _get_context(task_ex)[_CAPACITY]
|
||||
# TODO(rakhmerov): For now we assume that capacity is unlimited.
|
||||
# TODO(rakhmerov): We need to re-implement 'concurrency' completely.
|
||||
# capacity = _get_context(task_ex)[_CAPACITY]
|
||||
capacity = get_concurrency(task_ex)
|
||||
count = get_count(task_ex)
|
||||
|
||||
accepted = _get_with_item_indices(_get_accepted_executions(task_ex))
|
||||
|
Loading…
Reference in New Issue
Block a user