'requires' should take a string or list

* 'requires' can be a task name, or a list of task names. No
use case for a dictionary.
* added convenience methods to get requires.
* minor fix with TaskSpec: get_dict... returns {} not None.

Change-Id: I1c9ef4ab41679e10d8aab38dfae68f169608bd22
This commit is contained in:
Dmitri Zimine 2014-06-19 16:34:37 -07:00
parent afe28f743f
commit 2b15c8d0b5
8 changed files with 33 additions and 32 deletions

View File

@ -87,7 +87,7 @@ class Task(mb.MistralBase):
id = _id_column() id = _id_column()
name = sa.Column(sa.String(80)) name = sa.Column(sa.String(80))
requires = sa.Column(st.JsonDictType()) requires = sa.Column(st.JsonListType())
workbook_name = sa.Column(sa.String(80)) workbook_name = sa.Column(sa.String(80))
execution_id = sa.Column(sa.String(36)) execution_id = sa.Column(sa.String(36))
description = sa.Column(sa.String(200)) description = sa.Column(sa.String(200))

View File

@ -303,7 +303,7 @@ class Engine(object):
db_task = db_api.task_create(workbook_name, execution_id, { db_task = db_api.task_create(workbook_name, execution_id, {
"name": task.name, "name": task.name,
"requires": task.requires, "requires": task.get_requires(),
"task_spec": task.to_dict(), "task_spec": task.to_dict(),
"action_spec": {} if not action_spec "action_spec": {} if not action_spec
else action_spec.to_dict(), else action_spec.to_dict(),

View File

@ -42,7 +42,22 @@ def find_workflow_tasks(workbook, task_name):
def find_resolved_tasks(tasks): def find_resolved_tasks(tasks):
# We need to analyse graph and see which tasks are ready to start # We need to analyse graph and see which tasks are ready to start
return _get_resolved_tasks(tasks) resolved_tasks = []
delayed_tasks = []
allows = []
for t in tasks:
if t['state'] == states.SUCCESS:
allows += [t['name']]
allow_set = set(allows)
for t in tasks:
deps = t.get('requires', [])
if len(set(deps) - allow_set) == 0:
# all required tasks, if any, are SUCCESS
if t['state'] == states.IDLE:
resolved_tasks.append(t)
elif t['state'] == states.DELAYED:
delayed_tasks.append(t)
return resolved_tasks, delayed_tasks
def _get_checked_tasks(target_tasks): def _get_checked_tasks(target_tasks):
@ -134,21 +149,3 @@ def _update_dependencies(tasks, graph):
for task in tasks: for task in tasks:
for dep in _get_dependency_tasks(tasks, task): for dep in _get_dependency_tasks(tasks, task):
graph.add_edge(dep, task) graph.add_edge(dep, task)
def _get_resolved_tasks(tasks):
resolved_tasks = []
delayed_tasks = []
allows = []
for t in tasks:
if t['state'] == states.SUCCESS:
allows += [t['name']]
allow_set = set(allows)
for t in tasks:
deps = t.get('requires', {}).keys()
if len(set(deps) - allow_set) == 0:
if t['state'] == states.IDLE:
resolved_tasks.append(t)
elif t['state'] == states.DELAYED:
delayed_tasks.append(t)
return resolved_tasks, delayed_tasks

View File

@ -45,7 +45,7 @@ Workflow:
flavor_id: 42 flavor_id: 42
attach-volumes: attach-volumes:
requires: attach-volumes requires: create-vms
action: MyRest.attach-volume action: MyRest.attach-volume
parameters: parameters:
size: 1234 size: 1234
@ -58,7 +58,8 @@ Workflow:
server_id: 123 server_id: 123
backup-vms: backup-vms:
requires: [create-vms] requires:
- create-vms
action: MyRest.backup-vm action: MyRest.backup-vm
parameters: parameters:
server_id: 123 server_id: 123

View File

@ -232,7 +232,7 @@ TASKS = [
'execution_id': '1', 'execution_id': '1',
'name': 'my_task1', 'name': 'my_task1',
'description': 'my description', 'description': 'my description',
'requires': {'my_task2': '', 'my_task3': ''}, 'requires': ['my_task2', 'my_task3'],
'task_spec': None, 'task_spec': None,
'action_spec': None, 'action_spec': None,
'action': {'name': 'Nova:create-vm'}, 'action': {'name': 'Nova:create-vm'},
@ -250,7 +250,7 @@ TASKS = [
'execution_id': '1', 'execution_id': '1',
'name': 'my_task2', 'name': 'my_task2',
'description': 'my description', 'description': 'my description',
'requires': {'my_task4': '', 'my_task5': ''}, 'requires': ['my_task4', 'my_task5'],
'task_spec': None, 'task_spec': None,
'action_spec': None, 'action_spec': None,
'action': {'name': 'Cinder:create-volume'}, 'action': {'name': 'Cinder:create-volume'},

View File

@ -79,7 +79,7 @@ SAMPLE_TASK = {
'task_spec': { 'task_spec': {
'action': 'MyRest.my-action', 'action': 'MyRest.my-action',
'name': TASK_NAME}, 'name': TASK_NAME},
'requires': {}, 'requires': [],
'state': states.IDLE} 'state': states.IDLE}
SAMPLE_CONTEXT = { SAMPLE_CONTEXT = {

View File

@ -31,7 +31,7 @@ TASKS = [
'state': states.SUCCESS 'state': states.SUCCESS
}, },
{ {
'requires': {'create-vms': ''}, 'requires': ['create-vms'],
'name': 'attach-volume', 'name': 'attach-volume',
'state': states.IDLE 'state': states.IDLE
} }

View File

@ -42,11 +42,11 @@ class TaskSpec(base.BaseSpec):
elif isinstance(req, dict): elif isinstance(req, dict):
task['requires'] = req task['requires'] = req
def _get_on_state(self, key): def _get_as_dict(self, key):
tasks = self.get_property(key) tasks = self.get_property(key)
if not tasks: if not tasks:
return None return {}
if isinstance(tasks, dict): if isinstance(tasks, dict):
return tasks return tasks
@ -61,14 +61,17 @@ class TaskSpec(base.BaseSpec):
def get_property(self, property_name, default=None): def get_property(self, property_name, default=None):
return self._data.get(property_name, default) return self._data.get(property_name, default)
def get_requires(self):
return self._get_as_dict('requires').keys()
def get_on_error(self): def get_on_error(self):
return self._get_on_state("on-error") return self._get_as_dict("on-error")
def get_on_success(self): def get_on_success(self):
return self._get_on_state("on-success") return self._get_as_dict("on-success")
def get_on_finish(self): def get_on_finish(self):
return self._get_on_state("on-finish") return self._get_as_dict("on-finish")
def get_action_namespace(self): def get_action_namespace(self):
return self.action.split('.')[0] return self.action.split('.')[0]