Implements: blueprint mistral-std-repeat-action

In this implementation there is only a provision to repeat a specific action.
- Features like configurable iterations, delay, break-on are implemented.
- All final data is published to the context and there is no per iteration
  persistence.
- The iteration context which is specific to the scope is saved in the
  exec_flow_context property of a task.

Change-Id: I1e9ff7847f0046e11d3b32eec65f664c2b09d3b1
This commit is contained in:
Manas Kelshikar 2014-03-12 17:33:52 -07:00
parent 6424670a58
commit 025343c050
11 changed files with 301 additions and 19 deletions

View File

@ -100,3 +100,6 @@ class Task(mb.MistralBase):
in_context = sa.Column(st.JsonDictType())
input = sa.Column(st.JsonDictType())
output = sa.Column(st.JsonDictType())
# Execution context
exec_flow_context = sa.Column(st.JsonDictType())

View File

@ -15,6 +15,7 @@
# limitations under the License.
import abc
import eventlet
from mistral.openstack.common import log as logging
from mistral.db import api as db_api
@ -23,6 +24,7 @@ from mistral import exceptions as exc
from mistral.engine import states
from mistral.engine import workflow
from mistral.engine import data_flow
from mistral.engine import repeater
LOG = logging.getLogger(__name__)
@ -58,6 +60,7 @@ class AbstractEngine(object):
db_api.commit_tx()
except Exception as e:
LOG.exception("Failed to create necessary DB objects.")
raise exc.EngineException("Failed to create necessary DB objects:"
" %s" % e)
finally:
@ -80,14 +83,11 @@ class AbstractEngine(object):
task_output = data_flow.get_task_output(task, result)
# Update task state.
task = db_api.task_update(workbook_name, execution_id, task_id,
{"state": state, "output": task_output})
task, outbound_context = cls._update_task(workbook, task, state,
task_output)
execution = db_api.execution_get(workbook_name, execution_id)
# Calculate task outbound context.
outbound_context = data_flow.get_outbound_context(task)
cls._create_next_tasks(task, workbook)
# Determine what tasks need to be started.
@ -110,6 +110,7 @@ class AbstractEngine(object):
db_api.commit_tx()
except Exception as e:
LOG.exception("Failed to create necessary DB objects.")
raise exc.EngineException("Failed to create necessary DB objects:"
" %s" % e)
finally:
@ -118,6 +119,9 @@ class AbstractEngine(object):
if states.is_stopped_or_finished(execution["state"]):
return task
if task['state'] == states.DELAYED:
cls._schedule_run(workbook, task, outbound_context)
if tasks_to_start:
cls._run_tasks(tasks_to_start)
@ -163,26 +167,26 @@ class AbstractEngine(object):
db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'],
task['execution_id'])
return workflow.find_resolved_tasks(db_tasks)
@classmethod
def _create_tasks(cls, task_list, workbook, workbook_name, execution_id):
tasks = []
# create tasks of all the top level tasks.
for task in task_list:
state, exec_flow_context = repeater.get_task_runtime(task)
service_spec = workbook.services.get(task.get_action_service())
db_task = db_api.task_create(workbook_name, execution_id, {
"name": task.name,
"requires": task.requires,
"task_spec": task.to_dict(),
"service_spec": workbook.services.get(
task.get_action_service()).to_dict(),
"state": states.IDLE,
"tags": task.get_property("tags", None)
"service_spec": {} if not service_spec else
service_spec.to_dict(),
"state": state,
"tags": task.get_property("tags", None),
"exec_flow_context": exec_flow_context
})
tasks.append(db_task)
return tasks
@classmethod
@ -203,3 +207,70 @@ class AbstractEngine(object):
@classmethod
def _add_token_to_context(cls, context, workbook):
return data_flow.add_token_to_context(context, workbook)
@classmethod
def _update_task(cls, workbook, task, state, task_output):
"""
Update the task with the runtime information. The outbound_context
for this task is also calculated.
:return: task, outbound_context. task is the updated task and
computed outbound context.
"""
workbook_name = task['workbook_name']
execution_id = task['execution_id']
task_spec = workbook.tasks.get(task["name"])
exec_flow_context = task["exec_flow_context"]
# compute the outbound_context, state and exec_flow_context
outbound_context = data_flow.get_outbound_context(task, task_output)
state, exec_flow_context = repeater.get_task_runtime(task_spec, state,
outbound_context,
exec_flow_context)
# update the task
update_values = {"state": state,
"output": task_output,
"exec_flow_context": exec_flow_context}
task = db_api.task_update(workbook_name, execution_id, task["id"],
update_values)
return task, outbound_context
@classmethod
def _schedule_run(cls, workbook, task, outbound_context):
"""
Schedules task to run after the delay defined in the task
specification. If no delay is specified this method is a no-op.
"""
def run_delayed_task():
"""
Runs the delayed task. Performs all the steps required to setup
a task to run which are not already done. This is mostly code
copied over from convey_task_result.
"""
db_api.start_tx()
try:
workbook_name = task['workbook_name']
execution_id = task['execution_id']
execution = db_api.execution_get(workbook_name, execution_id)
# change state from DELAYED to IDLE to unblock processing
db_task = db_api.task_update(workbook_name,
execution_id,
task['id'],
{"state": states.IDLE})
task_to_start = [db_task]
data_flow.prepare_tasks(task_to_start, outbound_context)
db_api.commit_tx()
finally:
db_api.end_tx()
if not states.is_stopped_or_finished(execution["state"]):
cls._run_tasks(task_to_start)
task_spec = workbook.tasks.get(task['name'])
retries, break_on, delay_sec = task_spec.get_repeat_task_parameters()
if delay_sec > 0:
# run the task after the specified delay
eventlet.spawn_after(delay_sec, run_delayed_task)
else:
LOG.warn("No delay specified for task(id=%s) name=%s. Not "
"scheduling for execution." % (task['id'], task['name']))

View File

@ -74,12 +74,13 @@ def get_task_output(task, result):
return output
def get_outbound_context(task):
def get_outbound_context(task, output=None):
in_context = task.get('in_context')
out_context = in_context.copy() if in_context else {}
output = task.get('output')
if not output:
output = task.get('output')
if output:
out_context.update(output)
@ -105,6 +106,8 @@ def _modify_item(item, context):
def apply_context(data, context):
if not context:
return data
if isinstance(data, dict):
for key in data:
data[key] = _modify_item(data[key], context)

View File

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.engine import expressions
from mistral.engine import states
def get_task_runtime(task_spec, state=states.IDLE, outbound_context=None,
exec_flow_context=None):
"""
Computes the state and exec_flow_context runtime properties for a task
based on the supplied properties. This method takes the repeat nature of a
task into consideration.
:param task_spec: specification of the task
:param state: suggested next state
:param outbound_context: outbound_context to be used for computation
:param exec_flow_context: current flow context
:return: state, exec_flow_context tuple. Sample scenarios are,
1. required iteration = 5, current iteration = 0, state = SUCCESS
Move to next iteration therefore state = IDLE/DELAYED, iteration_no = 1.
2. required iteration = 5, current iteration = 2, state = ERROR Stop task.
state = ERROR, iteration_no = 2
3. required iteration = 5, current iteration = 4, state = SUCCESS
Iterations complete therefore state = SUCCESS, iteration_no = 4.
"""
if states.is_stopped_or_unsuccessful_finish(state) or not \
task_spec.is_repeater_task():
return state, exec_flow_context
if exec_flow_context is None:
exec_flow_context = {}
if outbound_context is None:
outbound_context = {}
iteration_no = -1
if "iteration_no" in exec_flow_context:
iteration_no = exec_flow_context["iteration_no"]
iterations, break_on, delay = task_spec.get_repeat_task_parameters()
iterations_incomplete = iteration_no + 1 < iterations
break_early = expressions.evaluate(break_on, outbound_context) if \
break_on and outbound_context else False
if iterations_incomplete and break_early:
state = states.SUCCESS
elif iterations_incomplete:
state = states.DELAYED if delay > 0 else states.IDLE
iteration_no += 1
elif not iterations_incomplete and state == states.IDLE:
# this is the case where the iterations are complete but task is still
# IDLE which implies SUCCESS. Can happen if repeat is specified but
# 0 iterations are requested.
state = states.SUCCESS
exec_flow_context["iteration_no"] = iteration_no
return state, exec_flow_context

View File

@ -116,11 +116,14 @@ class Executor(object):
db_task['state'] != states.IDLE:
return
self._do_task_action(db_task)
# update the state to running before performing action. The
# do_task_action assigns state to the task which is the appropriate
# value to preserve.
db_api.task_update(task['workbook_name'],
task['execution_id'],
task['id'],
{'state': states.RUNNING})
self._do_task_action(db_task)
except Exception as exc:
LOG.exception(exc)
self._handle_task_error(task, exc)

View File

@ -21,8 +21,9 @@ RUNNING = 'RUNNING'
SUCCESS = 'SUCCESS'
ERROR = 'ERROR'
STOPPED = 'STOPPED'
DELAYED = 'DELAYED'
_ALL = [IDLE, RUNNING, SUCCESS, ERROR, STOPPED]
_ALL = [IDLE, RUNNING, SUCCESS, ERROR, STOPPED, DELAYED]
def is_valid(state):
@ -37,6 +38,10 @@ def is_stopped_or_finished(state):
return state == STOPPED or is_finished(state)
def is_stopped_or_unsuccessful_finish(state):
return state == STOPPED or state == ERROR
def get_state_by_http_status_code(status_code):
if not status_code or status_code >= 400:
return ERROR

View File

@ -0,0 +1,17 @@
Services:
MyService:
type: ECHO
actions:
repeatable-action:
output:
greeting: Cheers!
Workflow:
tasks:
repeater_task:
repeat:
iterations: 0
delay: 0
action: MyService:repeatable-action
publish:
greet_msg: greeting

View File

@ -0,0 +1,26 @@
Services:
MyService:
type: ECHO
actions:
repeatable-action:
output:
greeting: Cheers!
Workflow:
tasks:
repeater_task:
repeat:
iterations: 5
delay: 0
action: MyService:repeatable-action
publish:
greet_msg: greeting
repeater_task_break_early:
repeat:
iterations: 5
delay: 0
break-on: $.greet_msg != null
action: MyService:repeatable-action
publish:
greet_msg: greeting

View File

@ -231,7 +231,8 @@ TASKS = [
'updated_at': None,
'in_context': None,
'input': None,
'output': None
'output': None,
'exec_flow_context': None
},
{
'id': '2',
@ -248,7 +249,8 @@ TASKS = [
'updated_at': None,
'in_context': {'image_id': '123123'},
'input': {'image_id': '123123'},
'output': {'vm_id': '343123'}
'output': {'vm_id': '343123'},
'exec_flow_context': None
},
]

View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db import api as db_api
from mistral.tests import base
from mistral.engine.local import engine
from oslo.config import cfg
from mistral.openstack.common import importutils
# We need to make sure that all configuration properties are registered.
importutils.import_module("mistral.config")
cfg.CONF.pecan.auth_enable = False
ENGINE = engine.get_engine()
def create_workbook(workbook_name, definition_path):
return db_api.workbook_create({
'name': workbook_name,
'definition': base.get_resource(definition_path)
})
class RepeatTaskTest(base.DbTestCase):
def test_simple_repeat_task(self):
wb = create_workbook('wb_1', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(wb['name'],
'repeater_task', None)
tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_single_item(tasks, name='repeater_task')
self._assert_single_item(tasks, exec_flow_context={"iteration_no": 4})
def test_no_repeat_task(self):
wb = create_workbook('wb_2', 'repeat_task/no_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(wb['name'],
'repeater_task', None)
tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_single_item(tasks, name='repeater_task')
self._assert_single_item(tasks, exec_flow_context={
"iteration_no": -1})
def test_break_early_repeat_task(self):
wb = create_workbook('wb_3', 'repeat_task/single_repeat_task.yaml')
execution = ENGINE.start_workflow_execution(
wb['name'], 'repeater_task_break_early', None)
tasks = db_api.tasks_get(wb['name'], execution['id'])
self._assert_single_item(tasks, name='repeater_task_break_early')
self._assert_single_item(tasks, exec_flow_context={"iteration_no": 0})

View File

@ -71,6 +71,23 @@ class TaskSpec(base.BaseSpec):
def get_action_name(self):
return self.action.split(':')[1]
def is_repeater_task(self):
return self.get_property("repeat") is not None
def get_repeat_task_parameters(self):
iterations = 0
break_on = None
delay = 0
repeat = self.get_property("repeat")
if repeat:
if "iterations" in repeat:
iterations = repeat["iterations"]
if "break-on" in repeat:
break_on = repeat["break-on"]
if "delay" in repeat:
delay = repeat["delay"]
return iterations, break_on, delay
class TaskSpecList(base.BaseSpecList):
item_class = TaskSpec