Merge "Run actions without Scheduer"

This commit is contained in:
Jenkins 2016-11-03 14:26:39 +00:00 committed by Gerrit Code Review
commit 09b0f87a6a
6 changed files with 99 additions and 29 deletions

View File

@ -0,0 +1,82 @@
# Copyright 2016 - Nokia Networks.
# Copyright 2016 - Brocade Communications Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from mistral.engine.rpc_backend import rpc
from mistral import utils
_THREAD_LOCAL_NAME = "__action_queue_thread_local"
def _prepare():
utils.set_thread_local(_THREAD_LOCAL_NAME, list())
def _clear():
utils.set_thread_local(_THREAD_LOCAL_NAME, None)
def _get_queue():
queue = utils.get_thread_local(_THREAD_LOCAL_NAME)
if queue is None:
raise RuntimeError(
'Action queue is not initialized for the current thread.'
' Most likely some transactional method is not decorated'
' with action_queue.process()'
)
return queue
def _run_actions():
for action_ex, action_def, target in _get_queue():
rpc.get_executor_client().run_action(
action_ex.id,
action_def.action_class,
action_def.attributes or {},
action_ex.input,
target,
safe_rerun=action_ex.runtime_context.get('safe_rerun', False)
)
def process(func):
"""Decorator that processes (runs) all actions in the action queue.
Various engine methods may cause new actions to be scheduled. All
such methods must be decorated with this decorator. It makes sure
to run all the actions in the queue and clean up the queue.
"""
@functools.wraps(func)
def decorate(*args, **kw):
_prepare()
try:
res = func(*args, **kw)
_run_actions()
finally:
_clear()
return res
return decorate
def schedule(action_ex, action_def, target):
_get_queue().append((action_ex, action_def, target))

View File

@ -19,13 +19,13 @@ from osprofiler import profiler
import six import six
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import action_queue
from mistral.engine.rpc_backend import rpc from mistral.engine.rpc_backend import rpc
from mistral.engine import utils as e_utils from mistral.engine import utils as e_utils
from mistral.engine import workflow_handler as wf_handler from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc from mistral import exceptions as exc
from mistral import expressions as expr from mistral import expressions as expr
from mistral.services import action_manager as a_m from mistral.services import action_manager as a_m
from mistral.services import scheduler
from mistral.services import security from mistral.services import security
from mistral import utils from mistral import utils
from mistral.utils import wf_trace from mistral.utils import wf_trace
@ -34,9 +34,6 @@ from mistral.workflow import states
from mistral.workflow import utils as wf_utils from mistral.workflow import utils as wf_utils
_RUN_EXISTING_ACTION_PATH = 'mistral.engine.actions._run_existing_action'
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Action(object): class Action(object):
"""Action. """Action.
@ -227,13 +224,7 @@ class PythonAction(Action):
action_ex_id=action_ex_id action_ex_id=action_ex_id
) )
scheduler.schedule_call( action_queue.schedule(self.action_ex, self.action_def, target)
None,
_RUN_EXISTING_ACTION_PATH,
0,
action_ex_id=self.action_ex.id,
target=target
)
@profiler.trace('action-run') @profiler.trace('action-run')
def run(self, input_dict, target, index=0, desc='', save=True, def run(self, input_dict, target, index=0, desc='', save=True,
@ -507,22 +498,6 @@ class WorkflowAction(Action):
pass pass
def _run_existing_action(action_ex_id, target):
action_ex = db_api.get_action_execution(action_ex_id)
action_def = db_api.get_action_definition(action_ex.name)
result = rpc.get_executor_client().run_action(
action_ex_id,
action_def.action_class,
action_def.attributes or {},
action_ex.input,
target,
safe_rerun=action_ex.runtime_context.get('safe_rerun', False)
)
return result.to_dict() if result else None
def resolve_action_definition(action_spec_name, wf_name=None, def resolve_action_definition(action_spec_name, wf_name=None,
wf_spec_name=None): wf_spec_name=None):
"""Resolve action definition accounting for ad-hoc action namespacing. """Resolve action definition accounting for ad-hoc action namespacing.

View File

@ -21,6 +21,7 @@ from mistral import coordination
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db_models from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler from mistral.engine import action_handler
from mistral.engine import action_queue
from mistral.engine import base from mistral.engine import base
from mistral.engine import workflow_handler as wf_handler from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions from mistral import exceptions
@ -41,6 +42,7 @@ class DefaultEngine(base.Engine, coordination.Service):
coordination.Service.__init__(self, 'engine_group') coordination.Service.__init__(self, 'engine_group')
@action_queue.process
@u.log_exec(LOG) @u.log_exec(LOG)
@profiler.trace('engine-start-workflow') @profiler.trace('engine-start-workflow')
def start_workflow(self, wf_identifier, wf_input, description='', def start_workflow(self, wf_identifier, wf_input, description='',
@ -55,6 +57,7 @@ class DefaultEngine(base.Engine, coordination.Service):
return wf_ex.get_clone() return wf_ex.get_clone()
@action_queue.process
@u.log_exec(LOG) @u.log_exec(LOG)
def start_action(self, action_name, action_input, def start_action(self, action_name, action_input,
description=None, **params): description=None, **params):
@ -106,6 +109,7 @@ class DefaultEngine(base.Engine, coordination.Service):
return db_api.create_action_execution(values) return db_api.create_action_execution(values)
@action_queue.process
@u.log_exec(LOG) @u.log_exec(LOG)
@profiler.trace('engine-on-action-complete') @profiler.trace('engine-on-action-complete')
def on_action_complete(self, action_ex_id, result, wf_action=False): def on_action_complete(self, action_ex_id, result, wf_action=False):
@ -128,6 +132,7 @@ class DefaultEngine(base.Engine, coordination.Service):
return wf_ex.get_clone() return wf_ex.get_clone()
@action_queue.process
@u.log_exec(LOG) @u.log_exec(LOG)
def rerun_workflow(self, task_ex_id, reset=True, env=None): def rerun_workflow(self, task_ex_id, reset=True, env=None):
with db_api.transaction(): with db_api.transaction():
@ -139,6 +144,7 @@ class DefaultEngine(base.Engine, coordination.Service):
return wf_ex.get_clone() return wf_ex.get_clone()
@action_queue.process
@u.log_exec(LOG) @u.log_exec(LOG)
def resume_workflow(self, wf_ex_id, env=None): def resume_workflow(self, wf_ex_id, env=None):
with db_api.transaction(): with db_api.transaction():

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.engine import action_queue
from mistral.engine import base from mistral.engine import base
from mistral import expressions from mistral import expressions
from mistral.services import scheduler from mistral.services import scheduler
@ -443,6 +444,7 @@ class ConcurrencyPolicy(base.TaskPolicy):
task_ex.runtime_context = runtime_context task_ex.runtime_context = runtime_context
@action_queue.process
def _continue_task(task_ex_id): def _continue_task(task_ex_id):
from mistral.engine import task_handler from mistral.engine import task_handler
@ -450,6 +452,7 @@ def _continue_task(task_ex_id):
task_handler.continue_task(db_api.get_task_execution(task_ex_id)) task_handler.continue_task(db_api.get_task_execution(task_ex_id))
@action_queue.process
def _complete_task(task_ex_id, state, state_info): def _complete_task(task_ex_id, state, state_info):
from mistral.engine import task_handler from mistral.engine import task_handler
@ -461,6 +464,7 @@ def _complete_task(task_ex_id, state, state_info):
) )
@action_queue.process
def _fail_task_if_incomplete(task_ex_id, timeout): def _fail_task_if_incomplete(task_ex_id, timeout):
from mistral.engine import task_handler from mistral.engine import task_handler

View File

@ -21,6 +21,7 @@ import traceback as tb
from mistral.db.v2 import api as db_api from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models from mistral.db.v2.sqlalchemy import models
from mistral.engine import action_queue
from mistral.engine import tasks from mistral.engine import tasks
from mistral.engine import workflow_handler as wf_handler from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc from mistral import exceptions as exc
@ -249,6 +250,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting) return cls(wf_ex, wf_spec, task_spec, ctx, task_ex, unique_key, waiting)
@action_queue.process
@profiler.trace('task-handler-refresh-task-state') @profiler.trace('task-handler-refresh-task-state')
def _refresh_task_state(task_ex_id): def _refresh_task_state(task_ex_id):
with db_api.transaction(): with db_api.transaction():
@ -326,6 +328,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
) )
@action_queue.process
def _scheduled_on_action_complete(action_ex_id, wf_action): def _scheduled_on_action_complete(action_ex_id, wf_action):
with db_api.transaction(): with db_api.transaction():
if wf_action: if wf_action:

View File

@ -84,7 +84,7 @@ def get_thread_local(var_name):
def set_thread_local(var_name, val): def set_thread_local(var_name, val):
if not val and has_thread_local(var_name): if val is None and has_thread_local(var_name):
gl_storage = _get_greenlet_local_storage() gl_storage = _get_greenlet_local_storage()
# Delete variable from greenlet local storage. # Delete variable from greenlet local storage.
@ -95,7 +95,7 @@ def set_thread_local(var_name, val):
if gl_storage and len(gl_storage) == 0: if gl_storage and len(gl_storage) == 0:
del _th_loc_storage.greenlet_locals[corolocal.get_ident()] del _th_loc_storage.greenlet_locals[corolocal.get_ident()]
if val: if val is not None:
gl_storage = _get_greenlet_local_storage() gl_storage = _get_greenlet_local_storage()
if not gl_storage: if not gl_storage:
gl_storage = _th_loc_storage.greenlet_locals[ gl_storage = _th_loc_storage.greenlet_locals[