Add option to start subworkflow via RPC.

In case of several engine instances, starting subworkflows not in the
same instance with parent task, could significantly smooth load between
engine instances. Furthermore, starting workflow is not always
a lightweight operation and we should try to make it as more 'atomic'
as possible.

Change-Id: I895bee811496f920b075880a6c438c53f7ecb2ca
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2019-04-16 22:57:26 +03:00
parent 8e94e87259
commit 243c09d505
5 changed files with 44 additions and 12 deletions

View File

@ -197,6 +197,16 @@ engine_opts = [
default=60,
help=_('A number of seconds that indicates how long action '
'definitions should be stored in the local cache.')
),
cfg.BoolOpt(
'start_subworkflows_via_rpc',
default=False,
help=(
'Enables starting subworkflows via RPC. Use "False" to start '
'subworkflow within the same engine instance. Use "True" '
'to start subworkflow via RPC to improve load balancing '
'in case of several engine instances.'
)
)
]

View File

@ -28,6 +28,7 @@ from mistral import exceptions as exc
from mistral.executors import base as exe
from mistral import expressions as expr
from mistral.lang import parser as spec_parser
from mistral.rpc import clients as rpc
from mistral.services import action_manager as a_m
from mistral.services import security
from mistral import utils
@ -611,14 +612,28 @@ class WorkflowAction(Action):
wf_params[k] = v
del input_dict[k]
wf_handler.start_workflow(
wf_def.id,
wf_def.namespace,
None,
input_dict,
"sub-workflow execution",
wf_params
)
if cfg.CONF.engine.start_subworkflows_via_rpc:
def _start_subworkflow():
rpc.get_engine_client().start_workflow(
wf_def.id,
wf_def.namespace,
None,
input_dict,
"sub-workflow execution",
async_=True,
**wf_params
)
post_tx_queue.register_operation(_start_subworkflow)
else:
wf_handler.start_workflow(
wf_def.id,
wf_def.namespace,
None,
input_dict,
"sub-workflow execution",
wf_params
)
@profiler.trace('workflow-action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,

View File

@ -29,7 +29,7 @@ class Engine(object):
@abc.abstractmethod
def start_workflow(self, wf_identifier, wf_namespace='', wf_ex_id=None,
wf_input=None, description='', **params):
wf_input=None, description='', async_=False, **params):
"""Starts the specified workflow.
:param wf_identifier: Workflow ID or name. Workflow ID is recommended,
@ -39,6 +39,8 @@ class Engine(object):
:param wf_ex_id: Workflow execution id. If passed, it will be set
in the new execution object.
:param description: Execution description.
:param async_: If True, start workflow in asynchronous mode
(w/o waiting for completion).
:param params: Additional workflow type specific parameters.
:return: Workflow execution object.
"""

View File

@ -45,7 +45,7 @@ class DefaultEngine(base.Engine):
@post_tx_queue.run
@profiler.trace('engine-start-workflow', hide_args=True)
def start_workflow(self, wf_identifier, wf_namespace='', wf_ex_id=None,
wf_input=None, description='', **params):
wf_input=None, description='', async_=False, **params):
if wf_namespace:
params['namespace'] = wf_namespace

View File

@ -120,7 +120,7 @@ class EngineClient(eng.Engine):
@base.wrap_messaging_exception
def start_workflow(self, wf_identifier, wf_namespace='', wf_ex_id=None,
wf_input=None, description='', **params):
wf_input=None, description='', async_=False, **params):
"""Starts workflow sending a request to engine over RPC.
:param wf_identifier: Workflow identifier.
@ -129,10 +129,15 @@ class EngineClient(eng.Engine):
:param wf_ex_id: Workflow execution id. If passed, it will be set
in the new execution object.
:param description: Execution description.
:param async_: If True, start workflow in asynchronous mode
(w/o waiting for completion).
:param params: Additional workflow type specific parameters.
:return: Workflow execution.
"""
return self._client.sync_call(
call = self._client.async_call if async_ else self._client.sync_call
return call(
auth_ctx.ctx(),
'start_workflow',
wf_identifier=wf_identifier,