Refactoring engine using abstraction of command

* Refactored engine completely
* Rearranged utility classes to remove circular dependencies
* Fixed policies
* Fixed workflow handlers to generate commands
* Fixed all unit tests
* Other minor changes

Change-Id: Ie0c4d8222ff701e50876a8fe4660e297fc4bc4af
This commit is contained in:
Renat Akhmerov 2014-09-13 22:35:53 +07:00
parent 36176a5842
commit e458708b73
20 changed files with 510 additions and 379 deletions

View File

@ -26,8 +26,8 @@ from mistral.engine1 import rpc
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils
from mistral.workflow import base as wf_base
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -117,9 +117,9 @@ class TasksController(rest.RestController):
raise exc.InvalidResultException()
if task.state == states.ERROR:
raw_result = wf_base.TaskResult(None, result)
raw_result = wf_utils.TaskResult(None, result)
else:
raw_result = wf_base.TaskResult(result)
raw_result = wf_utils.TaskResult(result)
engine = rpc.get_engine_client()

View File

@ -44,6 +44,8 @@ from mistral import context as ctx
from mistral.db.v2 import api as db_api
from mistral import engine
from mistral.engine import executor
from mistral.engine1 import default_engine as def_eng
from mistral.engine1 import default_executor as def_executor
from mistral.engine1 import rpc
from mistral.openstack.common import log as logging
from mistral.services import scheduler
@ -60,10 +62,11 @@ def launch_executor(transport):
# Since engine and executor are tightly coupled, use the engine
# configuration to decide which executor to get.
endpoints = [
executor.get_executor(cfg.CONF.engine.engine, transport),
rpc.get_executor_server()
]
executor_v1 = executor.get_executor(cfg.CONF.engine.engine, transport)
executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client())
endpoints = [executor_v1, rpc.ExecutorServer(executor_v2)]
server = messaging.get_rpc_server(
transport,
@ -83,10 +86,13 @@ def launch_engine(transport):
server=cfg.CONF.engine.host
)
endpoints = [
engine.get_engine(cfg.CONF.engine.engine, transport),
rpc.get_engine_server()
]
engine_v1 = engine.get_engine(cfg.CONF.engine.engine, transport)
engine_v2 = def_eng.DefaultEngine(
rpc.get_engine_client(),
rpc.get_executor_client()
)
endpoints = [engine_v1, rpc.EngineServer(engine_v2)]
# Setup scheduler in engine.
db_api.setup_db()

204
mistral/engine1/commands.py Normal file
View File

@ -0,0 +1,204 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from mistral.db.v2 import api as db_api
from mistral.engine1 import policies
from mistral.engine1 import rpc
from mistral.engine1 import utils
from mistral import expressions as expr
from mistral.openstack.common import log as logging
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class EngineCommand(object):
"""Engine command interface."""
@abc.abstractmethod
def run(self, exec_db, wf_handler):
"""Runs the command.
:param exec_db: Workflow execution DB object.
:param wf_handler: Workflow handler currently being used.
:return False if engine should stop further command processing,
True otherwise.
"""
raise NotImplementedError
class RunTask(EngineCommand):
def __init__(self, task_spec, task_db=None):
self.task_spec = task_spec
self.task_db = task_db
def run(self, exec_db, wf_handler):
LOG.debug('Running workflow task: %s' % self.task_spec)
self._prepare_task(exec_db, wf_handler)
self._before_task_start()
self._run_task()
def _prepare_task(self, exec_db, wf_handler):
if self.task_db:
return
self.task_db = self._create_db_task(exec_db)
# Evaluate Data Flow properties ('parameters', 'in_context').
data_flow.prepare_db_task(
self.task_db,
self.task_spec,
wf_handler.get_upstream_tasks(self.task_spec),
exec_db
)
def _before_task_start(self):
for p in policies.build_policies(self.task_spec.get_policies()):
p.before_task_start(self.task_db, self.task_spec)
def _create_db_task(self, exec_db):
return db_api.create_task({
'execution_id': exec_db.id,
'name': self.task_spec.get_name(),
'state': states.RUNNING,
'spec': self.task_spec.to_dict(),
'parameters': None,
'in_context': None,
'output': None,
'runtime_context': None
})
def _run_task(self):
# Policies could possibly change task state.
if self.task_db.state != states.RUNNING:
return
if self.task_spec.get_action_name():
self._run_action()
elif self.task_spec.get_workflow_name():
self._run_workflow()
def _run_action(self):
exec_db = self.task_db.execution
wf_spec = spec_parser.get_workflow_spec(exec_db.wf_spec)
action_spec_name = self.task_spec.get_action_name()
action_db = utils.resolve_action(
exec_db.wf_name,
wf_spec.get_name(),
action_spec_name
)
action_params = self.task_db.parameters or {}
if action_db.spec:
# Ad-hoc action.
action_spec = spec_parser.get_action_spec(action_db.spec)
base_name = action_spec.get_base()
action_db = utils.resolve_action(
exec_db.wf_name,
wf_spec.get_name(),
base_name
)
base_params = action_spec.get_base_parameters()
if base_params:
action_params = expr.evaluate_recursively(
base_params,
action_params
)
else:
action_params = {}
rpc.get_executor_client().run_action(
self.task_db.id,
action_db.action_class,
action_db.attributes or {},
action_params
)
def _run_workflow(self):
parent_exec_db = self.task_db.execution
parent_wf_spec = spec_parser.get_workflow_spec(parent_exec_db.wf_spec)
wf_spec_name = self.task_spec.get_workflow_name()
wf_db = utils.resolve_workflow(
parent_exec_db.wf_name,
parent_wf_spec.get_name(),
wf_spec_name
)
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
wf_input = self.task_db.parameters
start_params = {'parent_task_id': self.task_db.id}
for k, v in wf_input.items():
if k not in wf_spec.get_parameters():
start_params[k] = v
del wf_input[k]
rpc.get_engine_client().start_workflow(
wf_db.name,
wf_input,
**start_params
)
class FailWorkflow(EngineCommand):
def run(self, exec_db, wf_handler):
exec_db.state = states.ERROR
return False
class SucceedWorkflow(EngineCommand):
def run(self, exec_db, wf_handler):
exec_db.state = states.SUCCESS
return False
class PauseWorkflow(EngineCommand):
def run(self, exec_db, wf_handler):
wf_handler.pause_workflow()
return False
class RollbackWorkflow(EngineCommand):
def run(self, exec_db, wf_handler):
pass
CMD_MAP = {
'run_task': RunTask,
'fail': FailWorkflow,
'succeed': SucceedWorkflow,
'pause': PauseWorkflow,
'rollback': PauseWorkflow
}

View File

@ -17,14 +17,14 @@ from oslo.config import cfg
from mistral.db.v2 import api as db_api
from mistral.engine1 import base
from mistral.engine1 import commands
from mistral.engine1 import policies
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.engine1 import utils
from mistral.openstack.common import log as logging
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
from mistral.workflow import workflow_handler_factory as wfh_factory
@ -60,11 +60,10 @@ class DefaultEngine(base.Engine):
wf_handler = wfh_factory.create_workflow_handler(exec_db, wf_spec)
# Calculate tasks to process next.
task_specs = wf_handler.start_workflow(**params)
# Calculate commands to process next.
commands = wf_handler.start_workflow(**params)
if task_specs:
self._process_task_specs(task_specs, exec_db, wf_handler)
self._run_commands(commands, exec_db, wf_handler)
return exec_db
@ -73,7 +72,7 @@ class DefaultEngine(base.Engine):
task_db = db_api.get_task(task_id)
exec_db = db_api.get_execution(task_db.execution_id)
raw_result = self._transform_result(exec_db, task_db, raw_result)
raw_result = utils.transform_result(exec_db, task_db, raw_result)
self._after_task_complete(
task_db,
@ -86,11 +85,10 @@ class DefaultEngine(base.Engine):
wf_handler = wfh_factory.create_workflow_handler(exec_db)
# Calculate tasks to process next.
task_specs = wf_handler.on_task_result(task_db, raw_result)
# Calculate commands to process next.
commands = wf_handler.on_task_result(task_db, raw_result)
if task_specs:
self._process_task_specs(task_specs, exec_db, wf_handler)
self._run_commands(commands, exec_db, wf_handler)
self._check_subworkflow_completion(exec_db)
@ -112,11 +110,10 @@ class DefaultEngine(base.Engine):
wf_handler = wfh_factory.create_workflow_handler(exec_db)
# Calculate tasks to process next.
task_specs = wf_handler.resume_workflow()
# Calculate commands to process next.
commands = wf_handler.resume_workflow()
if task_specs:
self._process_task_specs(task_specs, exec_db, wf_handler)
self._run_commands(commands, exec_db, wf_handler)
return exec_db
@ -124,32 +121,13 @@ class DefaultEngine(base.Engine):
# TODO(rakhmerov): Implement.
raise NotImplementedError
def _process_task_specs(self, task_specs, exec_db, wf_handler):
LOG.debug('Processing workflow tasks: %s' % task_specs)
@staticmethod
def _run_commands(commands, exec_db, wf_handler):
if not commands:
return
# DB tasks & Data Flow properties
db_tasks = self._prepare_db_tasks(task_specs, exec_db, wf_handler)
# Running actions/workflows.
self._run_tasks(db_tasks, task_specs)
def _prepare_db_tasks(self, task_specs, exec_db, wf_handler):
wf_spec = spec_parser.get_workflow_spec(exec_db.wf_spec)
new_db_tasks = self._create_db_tasks(exec_db, task_specs)
# Evaluate Data Flow properties ('parameters', 'in_context').
for t_db in new_db_tasks:
task_spec = wf_spec.get_tasks()[t_db.name]
data_flow.prepare_db_task(
t_db,
task_spec,
wf_handler.get_upstream_tasks(task_spec),
exec_db
)
return new_db_tasks
for cmd in commands:
cmd.run(exec_db, wf_handler)
@staticmethod
def _create_db_execution(wf_db, wf_spec, wf_input, params):
@ -169,180 +147,21 @@ class DefaultEngine(base.Engine):
return exec_db
@staticmethod
def _create_db_tasks(exec_db, task_specs):
new_db_tasks = []
for task_spec in task_specs:
t = db_api.create_task({
'execution_id': exec_db.id,
'name': task_spec.get_name(),
'state': states.RUNNING,
'spec': task_spec.to_dict(),
'parameters': None,
'in_context': None,
'output': None,
'runtime_context': None
})
new_db_tasks.append(t)
return new_db_tasks
@staticmethod
def _before_task_start(task_db, task_spec):
for p in policies.build_policies(task_spec.get_policies()):
p.before_task_start(task_db, task_spec)
@staticmethod
def _after_task_complete(task_db, task_spec, raw_result):
for p in policies.build_policies(task_spec.get_policies()):
p.after_task_complete(task_db, task_spec, raw_result)
def _run_tasks(self, db_tasks, task_specs):
for t_db, t_spec in zip(db_tasks, task_specs):
self._before_task_start(t_db, t_spec)
# Policies could possibly change task state.
if t_db.state == states.RUNNING:
self._run_task(t_db, t_spec)
def _run_task(self, t_db, t_spec):
if t_spec.get_action_name():
self._run_action(t_db, t_spec)
elif t_spec.get_workflow_name():
self._run_workflow(t_db, t_spec)
def run_task(self, task_id):
with db_api.transaction():
task_db = db_api.update_task(task_id, {'state': states.RUNNING})
task_spec = spec_parser.get_task_spec(task_db.spec)
self._run_task(task_db, task_spec)
exec_db = task_db.execution
@staticmethod
def _resolve_action(wf_name, wf_spec_name, action_spec_name):
action_db = None
wf_handler = wfh_factory.create_workflow_handler(exec_db)
if wf_name != wf_spec_name:
# If workflow belongs to a workbook then check
# action within the same workbook (to be able to
# use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find an action in DB.
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action(action_full_name)
if not action_db:
action_db = db_api.load_action(action_spec_name)
if not action_db:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" % action_spec_name
)
return action_db
def _run_action(self, task_db, task_spec):
exec_db = task_db.execution
wf_spec = spec_parser.get_workflow_spec(exec_db.wf_spec)
action_spec_name = task_spec.get_action_name()
action_db = self._resolve_action(
exec_db.wf_name,
wf_spec.get_name(),
action_spec_name
)
action_params = task_db.parameters or {}
if action_db.spec:
# Ad-hoc action.
action_spec = spec_parser.get_action_spec(action_db.spec)
base_name = action_spec.get_base()
action_db = self._resolve_action(
exec_db.wf_name,
wf_spec.get_name(),
base_name
)
base_params = action_spec.get_base_parameters()
if base_params:
action_params = expr.evaluate_recursively(
base_params,
action_params
)
else:
action_params = {}
self._executor_client.run_action(
task_db.id,
action_db.action_class,
action_db.attributes or {},
action_params
)
@staticmethod
def _resolve_workflow(parent_wf_name, parent_wf_spec_name, wf_spec_name):
wf_db = None
if parent_wf_name != parent_wf_spec_name:
# If parent workflow belongs to a workbook then
# check child workflow within the same workbook
# (to be able to use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find a workflow in DB.
wb_name = parent_wf_name.rstrip(parent_wf_spec_name)[:-1]
wf_full_name = "%s.%s" % (wb_name, wf_spec_name)
wf_db = db_api.load_workflow(wf_full_name)
if not wf_db:
wf_db = db_api.load_workflow(wf_spec_name)
if not wf_db:
raise exc.WorkflowException(
"Failed to find workflow [name=%s]" % wf_spec_name
)
return wf_db
def _run_workflow(self, task_db, task_spec):
parent_exec_db = task_db.execution
parent_wf_spec = spec_parser.get_workflow_spec(parent_exec_db.wf_spec)
wf_spec_name = task_spec.get_workflow_name()
wf_db = self._resolve_workflow(
parent_exec_db.wf_name,
parent_wf_spec.get_name(),
wf_spec_name
)
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
wf_input = task_db.parameters
start_params = {'parent_task_id': task_db.id}
for k, v in wf_input.items():
if k not in wf_spec.get_parameters():
start_params[k] = v
del wf_input[k]
self._engine_client.start_workflow(
wf_db.name,
wf_input,
**start_params
)
commands.RunTask(task_spec, task_db).run(exec_db, wf_handler)
def _check_subworkflow_completion(self, exec_db):
if not exec_db.parent_task_id:
@ -351,53 +170,12 @@ class DefaultEngine(base.Engine):
if exec_db.state == states.SUCCESS:
self._engine_client.on_task_result(
exec_db.parent_task_id,
wf_base.TaskResult(data=exec_db.output)
wf_utils.TaskResult(data=exec_db.output)
)
elif exec_db.state == states.ERROR:
err_msg = 'Failed subworkflow [execution_id=%s]' % exec_db.id
self._engine_client.on_task_result(
exec_db.parent_task_id,
wf_base.TaskResult(error=err_msg)
wf_utils.TaskResult(error=err_msg)
)
def _transform_result(self, exec_db, task_db, raw_result):
if raw_result.is_error():
return raw_result
action_spec_name =\
spec_parser.get_task_spec(task_db.spec).get_action_name()
wf_spec_name = \
spec_parser.get_workflow_spec(exec_db.wf_spec).get_name()
if action_spec_name:
return self._transform_action_result(
exec_db.wf_name,
wf_spec_name,
action_spec_name,
raw_result
)
return raw_result
def _transform_action_result(self, wf_name, wf_spec_name, action_spec_name,
raw_result):
action_db = self._resolve_action(
wf_name,
wf_spec_name,
action_spec_name
)
if not action_db.spec:
return raw_result
transformer = spec_parser.get_action_spec(action_db.spec).get_output()
if transformer is None:
return raw_result
return wf_base.TaskResult(
data=expr.evaluate_recursively(transformer, raw_result.data),
error=raw_result.error
)

View File

@ -20,7 +20,7 @@ from mistral.actions import action_factory as a_f
from mistral.engine1 import base
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.workflow import base as wf_base
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -50,7 +50,7 @@ class DefaultExecutor(base.Executor):
if action.is_sync():
self._engine_client.on_task_result(
task_id,
wf_base.TaskResult(data=result)
wf_utils.TaskResult(data=result)
)
except exc.ActionException as e:
LOG.exception(
@ -61,5 +61,5 @@ class DefaultExecutor(base.Executor):
self._engine_client.on_task_result(
task_id,
wf_base.TaskResult(error=str(e))
wf_utils.TaskResult(error=str(e))
)

View File

@ -87,9 +87,29 @@ class WaitBeforePolicy(base.TaskPolicy):
self.delay = delay
def before_task_start(self, task_db, task_spec):
context_key = 'wait_before_policy'
runtime_context = _ensure_context_has_key(
task_db.runtime_context,
context_key
)
task_db.runtime_context = runtime_context
policy_context = runtime_context[context_key]
if policy_context.get('skip'):
# Unset state 'DELAYED'.
task_db.state = states.RUNNING
return
policy_context.update({'skip': True})
_log_task_delay(task_db.name, task_db.state, self.delay)
task_db.state = states.DELAYED
scheduler.schedule_call(
_ENGINE_CLIENT_PATH,
'run_task',
@ -106,7 +126,10 @@ class WaitAfterPolicy(base.TaskPolicy):
context_key = 'wait_after_policy'
runtime_context = _ensure_context_has_key(
task_db.runtime_context, context_key)
task_db.runtime_context,
context_key
)
task_db.runtime_context = runtime_context
policy_context = runtime_context[context_key]
@ -115,6 +138,7 @@ class WaitAfterPolicy(base.TaskPolicy):
# Unset state 'DELAYED'.
task_db.state = \
states.ERROR if raw_result.is_error() else states.SUCCESS
return
policy_context.update({'skip': True})
@ -125,8 +149,9 @@ class WaitAfterPolicy(base.TaskPolicy):
task_db.state = states.DELAYED
serializers = {
'raw_result': 'mistral.utils.serializer.TaskResultSerializer'
'raw_result': 'mistral.workflow.utils.TaskResultSerializer'
}
scheduler.schedule_call(
_ENGINE_CLIENT_PATH,
'on_task_result',
@ -156,8 +181,12 @@ class RetryPolicy(base.TaskPolicy):
context_key = 'retry_task_policy'
runtime_context = _ensure_context_has_key(
task_db.runtime_context, context_key)
task_db.runtime_context,
context_key
)
task_db.runtime_context = runtime_context
state = states.ERROR if raw_result.is_error() else states.SUCCESS
if state != states.ERROR:
@ -167,14 +196,17 @@ class RetryPolicy(base.TaskPolicy):
policy_context = runtime_context[context_key]
retry_no = 0
if "retry_no" in policy_context:
retry_no = policy_context["retry_no"]
del policy_context["retry_no"]
retries_remain = retry_no + 1 < self.count
break_early = (expressions.evaluate(
self.break_on, outbound_context) if
self.break_on and outbound_context else False)
break_early = (
expressions.evaluate(self.break_on, outbound_context)
if self.break_on and outbound_context else False
)
if not retries_remain or break_early:
return

View File

@ -17,20 +17,15 @@ from oslo import messaging
from mistral import context as auth_ctx
from mistral.engine1 import base
from mistral.engine1 import default_engine as def_eng
from mistral.engine1 import default_executor as def_executor
from mistral.openstack.common import log as logging
from mistral.workflow import base as wf_base
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
_TRANSPORT = None
_ENGINE_SERVER = None
_ENGINE_CLIENT = None
_EXECUTOR_SERVER = None
_EXECUTOR_CLIENT = None
@ -43,18 +38,6 @@ def get_transport():
return _TRANSPORT
def get_engine_server():
global _ENGINE_SERVER
if not _ENGINE_SERVER:
# TODO(rakhmerov): It should be configurable.
_ENGINE_SERVER = EngineServer(
def_eng.DefaultEngine(get_engine_client(), get_executor_client())
)
return _ENGINE_SERVER
def get_engine_client():
global _ENGINE_CLIENT
@ -64,18 +47,6 @@ def get_engine_client():
return _ENGINE_CLIENT
def get_executor_server():
global _EXECUTOR_SERVER
if not _EXECUTOR_SERVER:
# TODO(rakhmerov): It should be configurable.
_EXECUTOR_SERVER = ExecutorServer(
def_executor.DefaultExecutor(get_engine_client())
)
return _EXECUTOR_SERVER
def get_executor_client():
global _EXECUTOR_CLIENT
@ -118,7 +89,7 @@ class EngineServer(object):
:return: Task.
"""
task_result = wf_base.TaskResult(result_data, result_error)
task_result = wf_utils.TaskResult(result_data, result_error)
LOG.info(
"Received RPC request 'on_task_result'[rpc_ctx=%s,"

117
mistral/engine1/utils.py Normal file
View File

@ -0,0 +1,117 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.openstack.common import log as logging
from mistral.workbook import parser as spec_parser
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
def resolve_action(wf_name, wf_spec_name, action_spec_name):
action_db = None
if wf_name != wf_spec_name:
# If workflow belongs to a workbook then check
# action within the same workbook (to be able to
# use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find an action in DB.
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action(action_full_name)
if not action_db:
action_db = db_api.load_action(action_spec_name)
if not action_db:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" % action_spec_name
)
return action_db
def resolve_workflow(parent_wf_name, parent_wf_spec_name, wf_spec_name):
wf_db = None
if parent_wf_name != parent_wf_spec_name:
# If parent workflow belongs to a workbook then
# check child workflow within the same workbook
# (to be able to use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find a workflow in DB.
wb_name = parent_wf_name.rstrip(parent_wf_spec_name)[:-1]
wf_full_name = "%s.%s" % (wb_name, wf_spec_name)
wf_db = db_api.load_workflow(wf_full_name)
if not wf_db:
wf_db = db_api.load_workflow(wf_spec_name)
if not wf_db:
raise exc.WorkflowException(
"Failed to find workflow [name=%s]" % wf_spec_name
)
return wf_db
def transform_result(exec_db, task_db, raw_result):
if raw_result.is_error():
return raw_result
action_spec_name =\
spec_parser.get_task_spec(task_db.spec).get_action_name()
wf_spec_name = \
spec_parser.get_workflow_spec(exec_db.wf_spec).get_name()
if action_spec_name:
return transform_action_result(
exec_db.wf_name,
wf_spec_name,
action_spec_name,
raw_result
)
return raw_result
def transform_action_result(wf_name, wf_spec_name, action_spec_name,
raw_result):
action_db = resolve_action(
wf_name,
wf_spec_name,
action_spec_name
)
if not action_db.spec:
return raw_result
transformer = spec_parser.get_action_spec(action_db.spec).get_output()
if transformer is None:
return raw_result
return wf_utils.TaskResult(
data=expr.evaluate_recursively(transformer, raw_result.data),
error=raw_result.error
)

View File

@ -24,8 +24,8 @@ from mistral.db.v2.sqlalchemy import models
from mistral.engine1 import rpc
from mistral import exceptions as exc
from mistral.tests.unit.api import base
from mistral.workflow import base as wf
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
# TODO(everyone): later we need additional tests verifying all the errors etc.
@ -67,13 +67,13 @@ UPDATED_TASK_DB = copy.copy(TASK_DB)
UPDATED_TASK_DB['state'] = 'SUCCESS'
UPDATED_TASK = copy.copy(TASK)
UPDATED_TASK['state'] = 'SUCCESS'
UPDATED_TASK_RES = wf.TaskResult(json.loads(UPDATED_TASK['result']))
UPDATED_TASK_RES = wf_utils.TaskResult(json.loads(UPDATED_TASK['result']))
ERROR_TASK_DB = copy.copy(TASK_DB)
ERROR_TASK_DB['state'] = 'ERROR'
ERROR_TASK = copy.copy(TASK)
ERROR_TASK['state'] = 'ERROR'
ERROR_TASK_RES = wf.TaskResult(None, json.loads(ERROR_TASK['result']))
ERROR_TASK_RES = wf_utils.TaskResult(None, json.loads(ERROR_TASK['result']))
BROKEN_TASK = copy.copy(TASK)
BROKEN_TASK['result'] = 'string not escaped'

View File

@ -91,8 +91,10 @@ class EngineTestCase(base.DbTestCase):
self.engine_client = rpc.EngineClient(transport)
self.executor_client = rpc.ExecutorClient(transport)
self.engine = def_eng.DefaultEngine(self.engine_client,
self.executor_client)
self.engine = def_eng.DefaultEngine(
self.engine_client,
self.executor_client
)
self.executor = def_exec.DefaultExecutor(self.engine_client)
LOG.info("Starting engine and executor threads...")

View File

@ -22,8 +22,8 @@ from mistral.engine1 import default_engine as d_eng
from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service
from mistral.tests import base
from mistral.workflow import base as wf_base
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -139,7 +139,7 @@ class DefaultEngineTest(base.DbTestCase):
# Finish 'task1'.
task1_db = self.engine.on_task_result(
exec_db.tasks[0].id,
wf_base.TaskResult(data='Hey')
wf_utils.TaskResult(data='Hey')
)
self.assertIsInstance(task1_db, models.Task)
@ -172,7 +172,7 @@ class DefaultEngineTest(base.DbTestCase):
# Finish 'task2'.
task2_db = self.engine.on_task_result(
task2_db.id,
wf_base.TaskResult(data='Hi')
wf_utils.TaskResult(data='Hi')
)
exec_db = db_api.get_execution(exec_db.id)

View File

@ -95,7 +95,7 @@ workflows:
tasks:
task1:
action: std.http url="http://some_non-existient_host"
action: std.http url="http://some_non-existing_host"
policies:
retry:
count: 2
@ -154,8 +154,12 @@ class PoliciesTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
self.assertEqual(states.DELAYED, task_db.state)
self.assertIsNone(task_db.runtime_context)
self.assertDictEqual(
{'wait_before_policy': {'skip': True}},
task_db.runtime_context
)
self._await(
lambda: self.is_execution_success(exec_db.id),
@ -175,6 +179,7 @@ class PoliciesTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
self.assertEqual(states.RUNNING, task_db.state)
self.assertIsNone(task_db.runtime_context)
@ -200,6 +205,7 @@ class PoliciesTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
self.assertEqual(states.RUNNING, task_db.state)
self.assertIsNone(task_db.runtime_context)
@ -213,10 +219,13 @@ class PoliciesTest(base.EngineTestCase):
self._await(
lambda: self.is_execution_error(exec_db.id),
)
exec_db = db_api.get_execution(exec_db.id)
task_db = exec_db.tasks[0]
self.assertEqual(
1, task_db.runtime_context["retry_task_policy"]["retry_no"])
1,
task_db.runtime_context["retry_task_policy"]["retry_no"]
)
self.assertIsNotNone(exec_db)
self.assertEqual(states.ERROR, exec_db.state)

View File

@ -19,7 +19,7 @@ import mock
from mistral.db.v2 import api as db_api
from mistral.services import scheduler
from mistral.tests import base
from mistral.workflow import base as wf_base
from mistral.workflow import utils as wf_utils
def factory_method():
@ -100,7 +100,7 @@ class SchedulerServiceTest(base.DbTestCase):
'test_scheduler.factory_method')
target_method = 'run_something'
task_result = wf_base.TaskResult('data', 'error')
task_result = wf_utils.TaskResult('data', 'error')
method_args = {
'name': 'task',
@ -108,7 +108,7 @@ class SchedulerServiceTest(base.DbTestCase):
'result': task_result}
serializers_map = {
'result': 'mistral.utils.serializer.TaskResultSerializer'
'result': 'mistral.workflow.utils.TaskResultSerializer'
}
delay = 0.5
@ -132,7 +132,7 @@ class SchedulerServiceTest(base.DbTestCase):
result = factory().run_something.call_args[1].get('result')
self.assertIsInstance(result, wf_base.TaskResult)
self.assertIsInstance(result, wf_utils.TaskResult)
self.assertEqual('data', result.data)
self.assertEqual('error', result.error)

View File

@ -18,9 +18,9 @@ from mistral.db.v2.sqlalchemy import models
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import direct_workflow as d_wf
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -83,10 +83,10 @@ class DirectWorkflowHandlerTest(base.BaseTest):
return task_db
def test_start_workflow(self):
task_specs = self.handler.start_workflow()
commands = self.handler.start_workflow()
self.assertEqual(1, len(task_specs))
self.assertEqual('task1', task_specs[0].get_name())
self.assertEqual(1, len(commands))
self.assertEqual('task1', commands[0].task_spec.get_name())
self.assertEqual(states.RUNNING, self.exec_db.state)
def test_on_task_result(self):
@ -95,13 +95,13 @@ class DirectWorkflowHandlerTest(base.BaseTest):
task1_db = self._create_db_task('1-1-1-1', 'task1', states.RUNNING)
# Emulate finishing 'task1'.
task_specs = self.handler.on_task_result(
commands = self.handler.on_task_result(
task1_db,
wf_base.TaskResult(data='Hey')
wf_utils.TaskResult(data='Hey')
)
self.assertEqual(1, len(task_specs))
self.assertEqual('task2', task_specs[0].get_name())
self.assertEqual(1, len(commands))
self.assertEqual('task2', commands[0].task_spec.get_name())
self.assertEqual(states.RUNNING, self.exec_db.state)
self.assertEqual(states.SUCCESS, task1_db.state)
@ -109,12 +109,12 @@ class DirectWorkflowHandlerTest(base.BaseTest):
# Emulate finishing 'task2'.
task2_db = self._create_db_task('1-1-1-2', 'task2', states.RUNNING)
task_specs = self.handler.on_task_result(
commands = self.handler.on_task_result(
task2_db,
wf_base.TaskResult(data='Hi')
wf_utils.TaskResult(data='Hi')
)
self.assertEqual(0, len(task_specs))
self.assertEqual(0, len(commands))
self.assertEqual(states.SUCCESS, self.exec_db.state)
self.assertEqual(states.SUCCESS, task1_db.state)

View File

@ -17,9 +17,9 @@ from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import reverse_workflow as r_wf
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@ -74,17 +74,17 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
return task_db
def test_start_workflow_task2(self):
task_specs = self.handler.start_workflow(task_name='task2')
commands = self.handler.start_workflow(task_name='task2')
self.assertEqual(1, len(task_specs))
self.assertEqual('task1', task_specs[0].get_name())
self.assertEqual(1, len(commands))
self.assertEqual('task1', commands[0].task_spec.get_name())
self.assertEqual(states.RUNNING, self.exec_db.state)
def test_start_workflow_task1(self):
task_specs = self.handler.start_workflow(task_name='task1')
commands = self.handler.start_workflow(task_name='task1')
self.assertEqual(1, len(task_specs))
self.assertEqual('task1', task_specs[0].get_name())
self.assertEqual(1, len(commands))
self.assertEqual('task1', commands[0].task_spec.get_name())
self.assertEqual(states.RUNNING, self.exec_db.state)
def test_start_workflow_without_task(self):
@ -97,13 +97,13 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
task1_db = self._create_db_task('1-1-1-1', 'task1', states.RUNNING)
# Emulate finishing 'task1'.
task_specs = self.handler.on_task_result(
commands = self.handler.on_task_result(
task1_db,
wf_base.TaskResult(data='Hey')
wf_utils.TaskResult(data='Hey')
)
self.assertEqual(1, len(task_specs))
self.assertEqual('task2', task_specs[0].get_name())
self.assertEqual(1, len(commands))
self.assertEqual('task2', commands[0].task_spec.get_name())
self.assertEqual(states.RUNNING, self.exec_db.state)
self.assertEqual(states.SUCCESS, task1_db.state)
@ -113,7 +113,7 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
task_specs = self.handler.on_task_result(
task2_db,
wf_base.TaskResult(data='Hi!')
wf_utils.TaskResult(data='Hi!')
)
self.assertEqual(0, len(task_specs))

View File

@ -14,8 +14,6 @@
import abc
from mistral.workflow import base
class Serializer(object):
@abc.abstractmethod
@ -25,17 +23,3 @@ class Serializer(object):
@abc.abstractmethod
def deserialize(self, entity):
pass
class TaskResultSerializer(Serializer):
def serialize(self, entity):
return {
'data': entity.data,
'error': entity.error
}
def deserialize(self, entity):
return base.TaskResult(
entity['data'],
entity['error']
)

View File

@ -13,6 +13,8 @@
# limitations under the License.
import abc
from mistral.engine1 import commands
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral import utils
@ -48,7 +50,7 @@ class WorkflowHandler(object):
according to this workflow type rules and identifies a list of
tasks that can be scheduled for execution.
:param params: Additional parameters specific to workflow type.
:return: List of tasks that can be scheduled for execution.
:return: List of engine commands that needs to be performed.
"""
raise NotImplementedError
@ -59,8 +61,8 @@ class WorkflowHandler(object):
identifies tasks that can be scheduled for execution.
:param task_db: Task that the result corresponds to.
:param raw_result: Raw task result that comes from action/workflow
(before publisher). Instance of mistral.workflow.base.TaskResult
:return List of tasks that can be scheduled for execution.
(before publisher). Instance of mistral.workflow.utils.TaskResult
:return List of engine commands that needs to be performed.
"""
task_db.state = \
states.ERROR if raw_result.is_error() else states.SUCCESS
@ -96,7 +98,7 @@ class WorkflowHandler(object):
task_out_ctx
)
return task_specs
return [commands.RunTask(t_s) for t_s in task_specs]
@abc.abstractmethod
def _find_next_tasks(self, task_db):
@ -124,7 +126,7 @@ class WorkflowHandler(object):
def resume_workflow(self):
"""Resumes workflow this handler is associated with.
:return: Tasks available to run.
:return: List of engine commands that needs to be performed..
"""
self._set_execution_state(states.RUNNING)
@ -156,27 +158,6 @@ class WorkflowHandler(object):
if t_db.state == states.RUNNING]
class TaskResult(object):
"""Explicit data structure containing a result of task execution."""
def __init__(self, data=None, error=None):
self.data = data
self.error = error
def __repr__(self):
return 'TaskResult [data=%s, error=%s]' % \
(repr(self.data), repr(self.error))
def is_error(self):
return self.error is not None
def is_success(self):
return not self.is_error()
def __eq__(self, other):
return self.data == other.data and self.error == other.error
class FlowControl(object):
"""Flow control structure.

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine1 import commands
from mistral import expressions as expr
from mistral.openstack.common import log as logging
from mistral.workflow import base
@ -37,7 +38,8 @@ class DirectWorkflowHandler(base.WorkflowHandler):
def start_workflow(self, **params):
self._set_execution_state(states.RUNNING)
return [self.wf_spec.get_start_task()]
# TODO(rakhmerov): Explicit start task will be removed.
return [commands.RunTask(self.wf_spec.get_start_task())]
def get_upstream_tasks(self, task_spec):
# TODO(rakhmerov): For direct workflow it's pretty hard to do

View File

@ -15,6 +15,7 @@
import networkx as nx
from networkx.algorithms import traversal
from mistral.engine1 import commands
from mistral import exceptions as exc
from mistral.workflow import base
from mistral.workflow import states
@ -50,7 +51,7 @@ class ReverseWorkflowHandler(base.WorkflowHandler):
if len(task_specs) > 0:
self._set_execution_state(states.RUNNING)
return task_specs
return [commands.RunTask(t_s) for t_s in task_specs]
def get_upstream_tasks(self, task_spec):
return [self.wf_spec.get_tasks()[t_name]

44
mistral/workflow/utils.py Normal file
View File

@ -0,0 +1,44 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.utils import serializer
class TaskResult(object):
"""Explicit data structure containing a result of task execution."""
def __init__(self, data=None, error=None):
self.data = data
self.error = error
def __repr__(self):
return 'TaskResult [data=%s, error=%s]' % \
(repr(self.data), repr(self.error))
def is_error(self):
return self.error is not None
def is_success(self):
return not self.is_error()
def __eq__(self, other):
return self.data == other.data and self.error == other.error
class TaskResultSerializer(serializer.Serializer):
def serialize(self, entity):
return {'data': entity.data, 'error': entity.error}
def deserialize(self, entity):
return TaskResult(entity['data'], entity['error'])