diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py new file mode 100644 index 00000000..8f08f90e --- /dev/null +++ b/mistral/engine/action_queue.py @@ -0,0 +1,82 @@ +# Copyright 2016 - Nokia Networks. +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools + +from mistral.engine.rpc_backend import rpc +from mistral import utils + + +_THREAD_LOCAL_NAME = "__action_queue_thread_local" + + +def _prepare(): + utils.set_thread_local(_THREAD_LOCAL_NAME, list()) + + +def _clear(): + utils.set_thread_local(_THREAD_LOCAL_NAME, None) + + +def _get_queue(): + queue = utils.get_thread_local(_THREAD_LOCAL_NAME) + + if queue is None: + raise RuntimeError( + 'Action queue is not initialized for the current thread.' + ' Most likely some transactional method is not decorated' + ' with action_queue.process()' + ) + + return queue + + +def _run_actions(): + for action_ex, action_def, target in _get_queue(): + rpc.get_executor_client().run_action( + action_ex.id, + action_def.action_class, + action_def.attributes or {}, + action_ex.input, + target, + safe_rerun=action_ex.runtime_context.get('safe_rerun', False) + ) + + +def process(func): + """Decorator that processes (runs) all actions in the action queue. + + Various engine methods may cause new actions to be scheduled. All + such methods must be decorated with this decorator. It makes sure + to run all the actions in the queue and clean up the queue. + """ + @functools.wraps(func) + def decorate(*args, **kw): + _prepare() + + try: + res = func(*args, **kw) + + _run_actions() + finally: + _clear() + + return res + + return decorate + + +def schedule(action_ex, action_def, target): + _get_queue().append((action_ex, action_def, target)) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 8de2a0a6..2a9526dc 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -19,13 +19,13 @@ from osprofiler import profiler import six from mistral.db.v2 import api as db_api +from mistral.engine import action_queue from mistral.engine.rpc_backend import rpc from mistral.engine import utils as e_utils from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc from mistral import expressions as expr from mistral.services import action_manager as a_m -from mistral.services import scheduler from mistral.services import security from mistral import utils from mistral.utils import wf_trace @@ -34,9 +34,6 @@ from mistral.workflow import states from mistral.workflow import utils as wf_utils -_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action' - - @six.add_metaclass(abc.ABCMeta) class Action(object): """Action. @@ -227,13 +224,7 @@ class PythonAction(Action): action_ex_id=action_ex_id ) - scheduler.schedule_call( - None, - _RUN_EXISTING_ACTION_PATH, - 0, - action_ex_id=self.action_ex.id, - target=target - ) + action_queue.schedule(self.action_ex, self.action_def, target) @profiler.trace('action-run') def run(self, input_dict, target, index=0, desc='', save=True, @@ -507,22 +498,6 @@ class WorkflowAction(Action): pass -def _run_existing_action(action_ex_id, target): - action_ex = db_api.get_action_execution(action_ex_id) - action_def = db_api.get_action_definition(action_ex.name) - - result = rpc.get_executor_client().run_action( - action_ex_id, - action_def.action_class, - action_def.attributes or {}, - action_ex.input, - target, - safe_rerun=action_ex.runtime_context.get('safe_rerun', False) - ) - - return result.to_dict() if result else None - - def resolve_action_definition(action_spec_name, wf_name=None, wf_spec_name=None): """Resolve action definition accounting for ad-hoc action namespacing. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 43d93fe7..b1600b5d 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -21,6 +21,7 @@ from mistral import coordination from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import action_handler +from mistral.engine import action_queue from mistral.engine import base from mistral.engine import workflow_handler as wf_handler from mistral import exceptions @@ -41,6 +42,7 @@ class DefaultEngine(base.Engine, coordination.Service): coordination.Service.__init__(self, 'engine_group') + @action_queue.process @u.log_exec(LOG) @profiler.trace('engine-start-workflow') def start_workflow(self, wf_identifier, wf_input, description='', @@ -55,6 +57,7 @@ class DefaultEngine(base.Engine, coordination.Service): return wf_ex.get_clone() + @action_queue.process @u.log_exec(LOG) def start_action(self, action_name, action_input, description=None, **params): @@ -106,6 +109,7 @@ class DefaultEngine(base.Engine, coordination.Service): return db_api.create_action_execution(values) + @action_queue.process @u.log_exec(LOG) @profiler.trace('engine-on-action-complete') def on_action_complete(self, action_ex_id, result, wf_action=False): @@ -128,6 +132,7 @@ class DefaultEngine(base.Engine, coordination.Service): return wf_ex.get_clone() + @action_queue.process @u.log_exec(LOG) def rerun_workflow(self, task_ex_id, reset=True, env=None): with db_api.transaction(): @@ -139,6 +144,7 @@ class DefaultEngine(base.Engine, coordination.Service): return wf_ex.get_clone() + @action_queue.process @u.log_exec(LOG) def resume_workflow(self, wf_ex_id, env=None): with db_api.transaction(): diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 326d4933..4c8835c3 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -14,6 +14,7 @@ # limitations under the License. from mistral.db.v2 import api as db_api +from mistral.engine import action_queue from mistral.engine import base from mistral import expressions from mistral.services import scheduler @@ -443,6 +444,7 @@ class ConcurrencyPolicy(base.TaskPolicy): task_ex.runtime_context = runtime_context +@action_queue.process def _continue_task(task_ex_id): from mistral.engine import task_handler @@ -450,6 +452,7 @@ def _continue_task(task_ex_id): task_handler.continue_task(db_api.get_task_execution(task_ex_id)) +@action_queue.process def _complete_task(task_ex_id, state, state_info): from mistral.engine import task_handler @@ -461,6 +464,7 @@ def _complete_task(task_ex_id, state, state_info): ) +@action_queue.process def _fail_task_if_incomplete(task_ex_id, timeout): from mistral.engine import task_handler diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index ff7db56c..a5d8d87f 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -21,6 +21,7 @@ import traceback as tb from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models +from mistral.engine import action_queue from mistral.engine import tasks from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc @@ -249,6 +250,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None, return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting) +@action_queue.process @profiler.trace('task-handler-refresh-task-state') def _refresh_task_state(task_ex_id): with db_api.transaction(): @@ -326,6 +328,7 @@ def _schedule_refresh_task_state(task_ex, delay=0): ) +@action_queue.process def _scheduled_on_action_complete(action_ex_id, wf_action): with db_api.transaction(): if wf_action: diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index aef3c557..03c35615 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -84,7 +84,7 @@ def get_thread_local(var_name): def set_thread_local(var_name, val): - if not val and has_thread_local(var_name): + if val is None and has_thread_local(var_name): gl_storage = _get_greenlet_local_storage() # Delete variable from greenlet local storage. @@ -95,7 +95,7 @@ def set_thread_local(var_name, val): if gl_storage and len(gl_storage) == 0: del _th_loc_storage.greenlet_locals[corolocal.get_ident()] - if val: + if val is not None: gl_storage = _get_greenlet_local_storage() if not gl_storage: gl_storage = _th_loc_storage.greenlet_locals[