Pass the new ActionContext to mistral-lib
Partial-Bug: #1718353 Depends-On: I6057d0ce3fe4ae23468be8fb06cb85dc5f467f6b Change-Id: Ife653558bfcda794e7f37086832f70b0ad7c28a4
This commit is contained in:
parent
4820523e7c
commit
dd4a4bd440
@ -15,6 +15,7 @@
|
||||
|
||||
import base64
|
||||
|
||||
from mistral_lib.actions import context as lib_ctx
|
||||
from oslo_config import cfg
|
||||
from oslo_context import context as oslo_context
|
||||
import oslo_messaging as messaging
|
||||
@ -254,3 +255,27 @@ class ContextHook(hooks.PecanHook):
|
||||
|
||||
def after(self, state):
|
||||
set_ctx(None)
|
||||
|
||||
|
||||
def create_action_context(execution_ctx):
|
||||
|
||||
context = ctx()
|
||||
|
||||
security_ctx = lib_ctx.SecurityContext(
|
||||
auth_cacert=context.auth_cacert,
|
||||
auth_token=context.auth_token,
|
||||
auth_uri=context.auth_uri,
|
||||
expires_at=context.expires_at,
|
||||
insecure=context.insecure,
|
||||
is_target=context.is_target,
|
||||
is_trust_scoped=context.is_trust_scoped,
|
||||
project_id=context.project_id,
|
||||
redelivered=context.redelivered,
|
||||
region_name=context.region_name,
|
||||
service_catalog=context.service_catalog,
|
||||
trust_id=context.trust_id,
|
||||
)
|
||||
|
||||
ex_ctx = lib_ctx.ExecutionContext(**execution_ctx)
|
||||
|
||||
return lib_ctx.ActionContext(security_ctx, ex_ctx)
|
||||
|
@ -57,7 +57,7 @@ def _process_queue(queue):
|
||||
|
||||
for operation, args in queue:
|
||||
if operation == _RUN_ACTION:
|
||||
action_ex, action_def, target = args
|
||||
action_ex, action_def, target, execution_context = args
|
||||
|
||||
executor.run_action(
|
||||
action_ex.id,
|
||||
@ -65,7 +65,8 @@ def _process_queue(queue):
|
||||
action_def.attributes or {},
|
||||
action_ex.input,
|
||||
action_ex.runtime_context.get('safe_rerun', False),
|
||||
target=target
|
||||
execution_context,
|
||||
target=target,
|
||||
)
|
||||
elif operation == _ON_ACTION_COMPLETE:
|
||||
action_ex_id, result, wf_action = args
|
||||
@ -119,8 +120,9 @@ def process(func):
|
||||
return decorate
|
||||
|
||||
|
||||
def schedule_run_action(action_ex, action_def, target):
|
||||
_get_queue().append((_RUN_ACTION, (action_ex, action_def, target)))
|
||||
def schedule_run_action(action_ex, action_def, target, execution_context):
|
||||
args = (action_ex, action_def, target, execution_context)
|
||||
_get_queue().append((_RUN_ACTION, args))
|
||||
|
||||
|
||||
def schedule_on_action_complete(action_ex_id, result, wf_action=False):
|
||||
|
@ -248,10 +248,13 @@ class PythonAction(Action):
|
||||
action_ex_id=action_ex_id
|
||||
)
|
||||
|
||||
execution_context = self._prepare_execution_context()
|
||||
|
||||
action_queue.schedule_run_action(
|
||||
self.action_ex,
|
||||
self.action_def,
|
||||
target
|
||||
target,
|
||||
execution_context,
|
||||
)
|
||||
|
||||
@profiler.trace('action-run', hide_args=True)
|
||||
@ -280,12 +283,15 @@ class PythonAction(Action):
|
||||
|
||||
executor = exe.get_executor(cfg.CONF.executor.type)
|
||||
|
||||
execution_context = self._prepare_execution_context()
|
||||
|
||||
result = executor.run_action(
|
||||
self.action_ex.id if self.action_ex else None,
|
||||
self.action_def.action_class,
|
||||
self.action_def.attributes or {},
|
||||
input_dict,
|
||||
safe_rerun=safe_rerun,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
target=target,
|
||||
async_=False
|
||||
)
|
||||
@ -317,6 +323,23 @@ class PythonAction(Action):
|
||||
self.action_def.action_class
|
||||
)
|
||||
|
||||
def _prepare_execution_context(self):
|
||||
|
||||
exc_ctx = {}
|
||||
|
||||
if self.task_ex:
|
||||
wf_ex = self.task_ex.workflow_execution
|
||||
exc_ctx['workflow_execution_id'] = wf_ex.id
|
||||
exc_ctx['task_id'] = self.task_ex.id
|
||||
exc_ctx['workflow_name'] = wf_ex.name
|
||||
|
||||
if self.action_ex:
|
||||
exc_ctx['action_execution_id'] = self.action_ex.id
|
||||
callback_url = '/v2/action_executions/%s' % self.action_ex.id
|
||||
exc_ctx['callback_url'] = callback_url
|
||||
|
||||
return exc_ctx
|
||||
|
||||
def _prepare_input(self, input_dict):
|
||||
"""Template method to do manipulations with input parameters.
|
||||
|
||||
|
@ -83,7 +83,7 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
if not save:
|
||||
# Action execution is not created but we need to return similar
|
||||
# object to a client anyway.
|
||||
# object to the client anyway.
|
||||
return db_models.ActionExecution(
|
||||
name=action_name,
|
||||
description=description,
|
||||
|
@ -49,7 +49,7 @@ class Executor(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, redelivered=False,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
"""Runs action.
|
||||
|
||||
@ -59,6 +59,8 @@ class Executor(object):
|
||||
will be set to.
|
||||
:param params: Action parameters.
|
||||
:param safe_rerun: Tells if given action can be safely rerun.
|
||||
:param execution_context: A dict of values providing information about
|
||||
the current execution.
|
||||
:param redelivered: Tells if given action was run before on another
|
||||
executor.
|
||||
:param target: Target (group of action executors).
|
||||
|
@ -13,11 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral_lib import actions as mistral_lib
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
||||
from mistral_lib import actions as mistral_lib
|
||||
|
||||
from mistral.actions import action_factory as a_f
|
||||
from mistral import context
|
||||
from mistral import exceptions as exc
|
||||
@ -35,7 +34,7 @@ class DefaultExecutor(base.Executor):
|
||||
|
||||
@profiler.trace('default-executor-run-action', hide_args=True)
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, redelivered=False,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
"""Runs action.
|
||||
|
||||
@ -45,6 +44,8 @@ class DefaultExecutor(base.Executor):
|
||||
will be set to.
|
||||
:param params: Action parameters.
|
||||
:param safe_rerun: Tells if given action can be safely rerun.
|
||||
:param execution_context: A dict of values providing information about
|
||||
the current execution.
|
||||
:param redelivered: Tells if given action was run before on another
|
||||
executor.
|
||||
:param target: Target (group of action executors).
|
||||
@ -103,10 +104,10 @@ class DefaultExecutor(base.Executor):
|
||||
try:
|
||||
|
||||
# NOTE(d0ugal): If the action is a subclass of mistral-lib we know
|
||||
# that it expects to be passed the context. We should deprecate
|
||||
# the builtin action class in Mistral.
|
||||
# that it expects to be passed the context.
|
||||
if isinstance(action, mistral_lib.Action):
|
||||
result = action.run(context.ctx())
|
||||
action_ctx = context.create_action_context(execution_context)
|
||||
result = action.run(action_ctx)
|
||||
else:
|
||||
result = action.run()
|
||||
|
||||
|
@ -60,7 +60,7 @@ class ExecutorServer(service_base.MistralService):
|
||||
self._rpc_server.stop(graceful)
|
||||
|
||||
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
|
||||
action_cls_attrs, params, safe_rerun):
|
||||
action_cls_attrs, params, safe_rerun, execution_context):
|
||||
"""Receives calls over RPC to run action on executor.
|
||||
|
||||
:param rpc_ctx: RPC request context dictionary.
|
||||
@ -89,6 +89,7 @@ class ExecutorServer(service_base.MistralService):
|
||||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered
|
||||
)
|
||||
|
||||
|
@ -299,7 +299,7 @@ class ExecutorClient(exe.Executor):
|
||||
|
||||
@profiler.trace('executor-client-run-action')
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, redelivered=False,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
"""Sends a request to run action to executor.
|
||||
|
||||
@ -322,7 +322,8 @@ class ExecutorClient(exe.Executor):
|
||||
'action_cls_str': action_cls_str,
|
||||
'action_cls_attrs': action_cls_attrs,
|
||||
'params': params,
|
||||
'safe_rerun': safe_rerun
|
||||
'safe_rerun': safe_rerun,
|
||||
'execution_context': execution_context,
|
||||
}
|
||||
|
||||
rpc_client_method = (self._client.async_call
|
||||
|
@ -79,7 +79,8 @@ workflows:
|
||||
|
||||
|
||||
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, target=None, async_=True):
|
||||
params, safe_rerun, execution_context, target=None,
|
||||
async_=True):
|
||||
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = d_exe.DefaultExecutor()
|
||||
@ -89,7 +90,8 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun
|
||||
safe_rerun,
|
||||
execution_context=execution_context
|
||||
)
|
||||
|
||||
|
||||
@ -171,12 +173,21 @@ class EnvironmentTest(base.EngineTestCase):
|
||||
for t_ex in wf1_task_execs:
|
||||
a_ex = t_ex.action_executions[0]
|
||||
|
||||
callback_url = '/v2/action_executions/%s' % a_ex.id
|
||||
|
||||
r_exe.RemoteExecutor.run_action.assert_any_call(
|
||||
a_ex.id,
|
||||
'mistral.actions.std_actions.EchoAction',
|
||||
{},
|
||||
a_ex.input,
|
||||
False,
|
||||
{
|
||||
'task_id': t_ex.id,
|
||||
'callback_url': callback_url,
|
||||
'workflow_execution_id': wf1_ex.id,
|
||||
'workflow_name': wf1_ex.name,
|
||||
'action_execution_id': a_ex.id,
|
||||
},
|
||||
target=TARGET
|
||||
)
|
||||
|
||||
|
@ -25,7 +25,8 @@ from mistral.workflow import states
|
||||
|
||||
|
||||
def _run_at_target(action_ex_id, action_class_str, attributes,
|
||||
action_params, safe_rerun, target=None, async_=True):
|
||||
action_params, safe_rerun, execution_context, target=None,
|
||||
async_=True):
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = d_exe.DefaultExecutor()
|
||||
|
||||
@ -35,6 +36,7 @@ def _run_at_target(action_ex_id, action_class_str, attributes,
|
||||
attributes,
|
||||
action_params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered=True
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user