Working on Data Flow (step 1)

* Refactored the API layer so that underlying layers (DB, engine, etc.)
 work with 'context' as a JSON object rather than a string (a minimal
 sketch of this boundary conversion follows the list below)
* Added "context" parameter in all required places
* Added necessary Data Flow related properties to DB models
* Refactored and fixed a series of tests
* Minor formatting changes
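
A minimal sketch of that API-boundary conversion (illustrative only: the
class below is a plain-object stand-in for the WSME-backed REST resource
shown in the diff, trimmed to the attributes the diff touches):

    import json


    class Execution(object):
        """Plain-object stand-in for the WSME-backed REST resource.

        WSME cannot express arbitrary dictionaries, so 'context' travels as
        JSON text over the API and is converted to/from a dict right at the
        boundary; the DB and engine layers only ever see the dict.
        """

        workbook_name = None
        task = None
        state = None
        context = None  # JSON text at the API layer, dict below it.

        def to_dict(self):
            # Deserialize the wire form for the DB/engine layers.
            d = {k: getattr(self, k)
                 for k in ('workbook_name', 'task', 'state', 'context')}

            if d.get('context'):
                d['context'] = json.loads(d['context'])

            return d

        @classmethod
        def from_dict(cls, d):
            # Serialize the dict coming back from DB/engine into text.
            e = cls()

            for key, val in d.items():
                if hasattr(e, key):
                    if key == 'context' and val:
                        val = json.dumps(val)

                    setattr(e, key, val)

            return e


    if __name__ == '__main__':
        e = Execution.from_dict({'task': 'create-vms',
                                 'context': {'image_id': '123'}})

        assert e.context == '{"image_id": "123"}'             # text on the wire
        assert e.to_dict()['context'] == {'image_id': '123'}  # dict underneath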

TODO:
* Calculation of task incoming context
* Abstract interface for expression evaluator (a possible shape is
 sketched below)
* Data Flow related tests
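
One possible shape for that evaluator interface (not part of this change;
the class names are hypothetical, and the YAQL-backed implementation simply
wraps the one-line evaluate() helper visible near the end of this diff):

    import abc

    import yaql


    class Evaluator(object):
        """Abstract expression evaluator (Python 2 style, as in the codebase)."""

        __metaclass__ = abc.ABCMeta

        @abc.abstractmethod
        def evaluate(self, expression_str, data):
            """Evaluate an expression string against a context dictionary."""


    class YAQLEvaluator(Evaluator):
        def evaluate(self, expression_str, data):
            # Same call chain as the refactored module-level helper.
            return yaql.parse(expression_str).evaluate(data)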

Partially implements blueprint: mistral-dataflow

Change-Id: Ie7f94d79265e9861f7ad15c76ff6d788ec62b683
Renat Akhmerov 2014-02-20 14:44:48 +07:00
parent 8a73da20fd
commit 58407b6f94
11 changed files with 258 additions and 163 deletions

View File

@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import json
+
 from pecan import rest
 from pecan import abort
 from wsme import types as wtypes
@@ -37,9 +39,29 @@ class Execution(resource.Resource):
     task = wtypes.text
     state = wtypes.text
     # Context is a JSON object but since WSME doesn't support arbitrary
-    # dictionaries we have to use text type.
+    # dictionaries we have to use text type convert to json and back manually.
     context = wtypes.text

+    def to_dict(self):
+        d = super(Execution, self).to_dict()
+
+        if d.get('context'):
+            d['context'] = json.loads(d['context'])
+
+        return d
+
+    @classmethod
+    def from_dict(cls, d):
+        e = cls()
+
+        for key, val in d.items():
+            if hasattr(e, key):
+                if key == 'context' and val:
+                    val = json.dumps(val)
+
+                setattr(e, key, val)
+
+        return e
+

 class Executions(resource.Resource):
     """A collection of Execution resources."""
@@ -80,8 +102,13 @@ class ExecutionsController(rest.RestController):
         LOG.debug("Create execution [workbook_name=%s, execution=%s]" %
                   (workbook_name, execution))
         try:
+            context = None
+            if execution.context:
+                context = json.loads(execution.context)
+
             values = engine.start_workflow_execution(execution.workbook_name,
-                                                     execution.task)
+                                                     execution.task,
+                                                     context)
         except ex.MistralException as e:
             #TODO(nmakhotkin) we should use thing such a decorator here
             abort(400, e.message)

View File

@@ -58,6 +58,7 @@ class WorkflowExecution(mb.MistralBase):
     workbook_name = sa.Column(sa.String(80))
     task = sa.Column(sa.String(80))
     state = sa.Column(sa.String(20))
+    context = sa.Column(st.JsonDictType())


 class Workbook(mb.MistralBase):
@@ -94,3 +95,8 @@ class Task(mb.MistralBase):
     service_dsl = sa.Column(st.JsonDictType())
     state = sa.Column(sa.String(20))
     tags = sa.Column(st.JsonListType())
+
+    # Data Flow properties.
+    in_context = sa.Column(st.JsonDictType())
+    input = sa.Column(st.JsonDictType())
+    output = sa.Column(st.JsonDictType())

View File

@@ -26,6 +26,10 @@ from mistral.engine import workflow
 LOG = logging.getLogger(__name__)

+# TODO(rakhmerov): Upcoming Data Flow changes:
+#  1. Calculate "in_context" for all the tasks submitted for execution.
+#  2. Transfer "in_context" along with task data over AMQP.
+

 class AbstractEngine(object):
     @classmethod
@@ -34,17 +38,22 @@ class AbstractEngine(object):
         pass

     @classmethod
-    def start_workflow_execution(cls, workbook_name, task_name):
-        wb_dsl = cls._get_wb_dsl(workbook_name)
-        dsl_tasks = workflow.find_workflow_tasks(wb_dsl, task_name)
-
+    def start_workflow_execution(cls, workbook_name, task_name, context):
         db_api.start_tx()

+        wb_dsl = cls._get_wb_dsl(workbook_name)
+
         # Persist execution and tasks in DB.
         try:
-            execution = cls._create_execution(workbook_name, task_name)
-            tasks = cls._create_tasks(dsl_tasks, wb_dsl,
-                                      workbook_name, execution['id'])
+            execution = cls._create_execution(workbook_name,
+                                              task_name,
+                                              context)
+
+            tasks = cls._create_tasks(
+                workflow.find_workflow_tasks(wb_dsl, task_name),
+                wb_dsl,
+                workbook_name, execution['id']
+            )

             db_api.commit_tx()
         except Exception as e:
@@ -53,6 +62,9 @@ class AbstractEngine(object):
         finally:
             db_api.end_tx()

+        # TODO(rakhmerov): This doesn't look correct anymore, we shouldn't
+        # start tasks which don't have dependencies but are reachable only
+        # via direct transitions.
         cls._run_tasks(workflow.find_resolved_tasks(tasks))

         return execution
@@ -61,17 +73,17 @@
     def convey_task_result(cls, workbook_name, execution_id,
                            task_id, state, result):
         db_api.start_tx()

         wb_dsl = cls._get_wb_dsl(workbook_name)
         #TODO(rakhmerov): validate state transition

         # Update task state.
         task = db_api.task_update(workbook_name, execution_id, task_id,
-                                  {"state": state, "result": result})
+                                  {"state": state, "output": result})

         execution = db_api.execution_get(workbook_name, execution_id)

-        cls._create_next_tasks(task,
-                               wb_dsl,
-                               workbook_name,
-                               execution_id)
+        cls._create_next_tasks(task, wb_dsl, workbook_name, execution_id)

         # Determine what tasks need to be started.
         tasks = db_api.tasks_get(workbook_name, execution_id)
@@ -128,24 +140,27 @@ class AbstractEngine(object):
         return task["state"]

     @classmethod
-    def _create_execution(cls, workbook_name, task_name):
+    def _create_execution(cls, workbook_name, task_name, context):
         return db_api.execution_create(workbook_name, {
             "workbook_name": workbook_name,
             "task": task_name,
-            "state": states.RUNNING
+            "state": states.RUNNING,
+            "context": context
         })

     @classmethod
-    def _create_next_tasks(cls, task, wb_dsl,
-                           workbook_name, execution_id):
+    def _create_next_tasks(cls, task, wb_dsl, workbook_name, execution_id):
         dsl_tasks = workflow.find_tasks_after_completion(task, wb_dsl)

-        tasks = cls._create_tasks(dsl_tasks, wb_dsl,
-                                  workbook_name, execution_id)
+        tasks = cls._create_tasks(dsl_tasks, wb_dsl, workbook_name,
+                                  execution_id)

         return workflow.find_resolved_tasks(tasks)

     @classmethod
     def _create_tasks(cls, dsl_tasks, wb_dsl, workbook_name, execution_id):
         tasks = []
         for dsl_task in dsl_tasks:
             task = db_api.task_create(workbook_name, execution_id, {
                 "name": dsl_task["name"],
@@ -157,6 +172,7 @@ class AbstractEngine(object):
             })

             tasks.append(task)
+
         return tasks

     @classmethod

View File

@@ -31,15 +31,16 @@ finally:
     pass


-def start_workflow_execution(workbook_name, task_name):
+def start_workflow_execution(workbook_name, task_name, context=None):
     """Starts a workflow execution based on the specified workbook name
    and target task.

    :param workbook_name: Workbook name
    :param task_name: Target task name
+   :param context: Execution context which defines a workflow input
    :return: Workflow execution.
    """
-    return IMPL.start_workflow_execution(workbook_name, task_name)
+    return IMPL.start_workflow_execution(workbook_name, task_name, context)


def stop_workflow_execution(workbook_name, execution_id):

View File

@@ -27,6 +27,10 @@ from mistral.engine.actions import action_helper as a_h
 LOG = logging.getLogger(__name__)

+# TODO(rakhmerov): Upcoming Data Flow changes:
+#  1. Receive "in_context" along with task data.
+#  2. Apply task input expression to "in_context" and calculate "input".
+

 def do_task_action(task):
     LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" %

View File

@@ -15,6 +15,7 @@
 # limitations under the License.

 import mock
+import json

 from mistral import exceptions as ex
 from webtest.app import AppError
@@ -24,20 +25,19 @@ from mistral.engine import engine
 # TODO: later we need additional tests verifying all the errors etc.

 EXECS = [
     {
-        'id': "123",
+        'id': '123',
         'workbook_name': 'my_workbook',
         'task': 'my_task',
         'state': 'RUNNING',
-        'context': """
-            {
-                "person": {
-                    "first_name": "John",
-                    "last_name": "Doe"
-                }
-            }
-        """
+        'context': {
+            "person": {
+                "first_name": "John",
+                "last_name": "Doe"
+            }
+        }
     }
 ]
@@ -45,38 +45,48 @@ UPDATED_EXEC = EXECS[0].copy()
 UPDATED_EXEC['state'] = 'STOPPED'


+def canonize(json_dict):
+    if json_dict.get('context'):
+        json_dict['context'] = json.loads(json_dict['context'])
+
+    return json_dict
+
+
 class TestExecutionsController(base.FunctionalTest):
-    @mock.patch.object(db_api, "execution_get",
+    @mock.patch.object(db_api, 'execution_get',
                        mock.MagicMock(return_value=EXECS[0]))
     def test_get(self):
         resp = self.app.get('/v1/workbooks/my_workbook/executions/123')

         self.assertEqual(resp.status_int, 200)
-        self.assertDictEqual(EXECS[0], resp.json)
+        self.assertDictEqual(EXECS[0], canonize(resp.json))

-    @mock.patch.object(db_api, "execution_get",
+    @mock.patch.object(db_api, 'execution_get',
                        mock.MagicMock(return_value=None))
     def test_get_empty(self):
         self.assertNotFound('/v1/workbooks/my_workbook/executions/123')

-    @mock.patch.object(db_api, "execution_update",
+    @mock.patch.object(db_api, 'execution_update',
                        mock.MagicMock(return_value=UPDATED_EXEC))
     def test_put(self):
         resp = self.app.put_json('/v1/workbooks/my_workbook/executions/123',
                                  dict(state='STOPPED'))

         self.assertEqual(resp.status_int, 200)
-        self.assertDictEqual(UPDATED_EXEC, resp.json)
+        self.assertDictEqual(UPDATED_EXEC, canonize(resp.json))

-    @mock.patch.object(engine, "start_workflow_execution",
+    @mock.patch.object(engine, 'start_workflow_execution',
                        mock.MagicMock(return_value=EXECS[0]))
     def test_post(self):
-        resp = self.app.post_json('/v1/workbooks/my_workbook/executions',
-                                  EXECS[0])
-        self.assertEqual(resp.status_int, 201)
-        self.assertDictEqual(EXECS[0], resp.json)
+        new_exec = EXECS[0].copy()
+        new_exec['context'] = json.dumps(new_exec['context'])
+
+        resp = self.app.post_json('/v1/workbooks/my_workbook/executions',
+                                  new_exec)
+
+        self.assertEqual(resp.status_int, 201)
+        self.assertDictEqual(EXECS[0], canonize(resp.json))

-    @mock.patch.object(engine, "start_workflow_execution",
+    @mock.patch.object(engine, 'start_workflow_execution',
                        mock.MagicMock(side_effect=ex.MistralException))
     def test_post_throws_exception(self):
         with self.assertRaises(AppError) as context:
@@ -84,14 +94,14 @@ class TestExecutionsController(base.FunctionalTest):
                                EXECS[0])
         self.assertIn('Bad response: 400', context.exception.message)

-    @mock.patch.object(db_api, "execution_delete",
+    @mock.patch.object(db_api, 'execution_delete',
                        mock.MagicMock(return_value=None))
     def test_delete(self):
         resp = self.app.delete('/v1/workbooks/my_workbook/executions/123')

         self.assertEqual(resp.status_int, 204)

-    @mock.patch.object(db_api, "executions_get",
+    @mock.patch.object(db_api, 'executions_get',
                        mock.MagicMock(return_value=EXECS))
     def test_get_all(self):
         resp = self.app.get('/v1/workbooks/my_workbook/executions')
@@ -99,4 +109,4 @@ class TestExecutionsController(base.FunctionalTest):

         self.assertEqual(resp.status_int, 200)
         self.assertEqual(len(resp.json), 1)
-        self.assertDictEqual(EXECS[0], resp.json['executions'][0])
+        self.assertDictEqual(EXECS[0], canonize(resp.json['executions'][0]))

View File

@@ -22,20 +22,20 @@ from mistral.tests.unit import base as test_base
 EVENTS = [
     {
-        "id": u'1',
-        "name": u'test_event1',
-        'workbook_name': u'wb_name',
-        "pattern": u'* *',
-        "next_execution_time": timeutils.utcnow(),
-        "updated_at": None
+        'id': '1',
+        'name': 'test_event1',
+        'workbook_name': 'wb_name',
+        'pattern': '* *',
+        'next_execution_time': timeutils.utcnow(),
+        'updated_at': None
     },
     {
-        "id": u'2',
-        "name": u'test_event2',
-        'workbook_name': u'wb_name',
-        "pattern": u'* * *',
-        "next_execution_time": timeutils.utcnow(),
-        "updated_at": None
+        'id': '2',
+        'name': 'test_event2',
+        'workbook_name': 'wb_name',
+        'pattern': '* * *',
+        'next_execution_time': timeutils.utcnow(),
+        'updated_at': None
     }
 ]
@@ -45,7 +45,7 @@ class EventTest(test_base.DbTestCase):
         created = db_api.event_create(EVENTS[0])
         self.assertIsInstance(created, dict)

-        fetched = db_api.event_get(created["id"])
+        fetched = db_api.event_get(created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -53,11 +53,11 @@
         created = db_api.event_create(EVENTS[0])
         self.assertIsInstance(created, dict)

-        updated = db_api.event_update(created["id"], {"pattern": "0 * *"})
+        updated = db_api.event_update(created['id'], {'pattern': '0 * *'})
         self.assertIsInstance(updated, dict)
-        self.assertEqual(u'0 * *', updated["pattern"])
+        self.assertEqual('0 * *', updated['pattern'])

-        fetched = db_api.event_get(created["id"])
+        fetched = db_api.event_get(created['id'])
         self.assertDictEqual(updated, fetched)

     def test_event_list(self):
@@ -73,26 +73,26 @@
 WORKBOOKS = [
     {
-        "id": u'1',
-        "name": u'my_workbook1',
-        'description': u'my description',
-        "definition": u'empty',
-        "tags": [u'mc'],
-        "scope": u'public',
-        "updated_at": None,
-        "project_id": '123',
-        "trust_id": '1234'
+        'id': '1',
+        'name': 'my_workbook1',
+        'description': 'my description',
+        'definition': 'empty',
+        'tags': ['mc'],
+        'scope': 'public',
+        'updated_at': None,
+        'project_id': '123',
+        'trust_id': '1234'
     },
     {
-        "id": u'2',
-        "name": u'my_workbook2',
-        'description': u'my description',
-        "definition": u'empty',
-        "tags": [u'mc'],
-        "scope": u'public',
-        "updated_at": None,
-        "project_id": '1233',
-        "trust_id": '12345'
+        'id': '2',
+        'name': 'my_workbook2',
+        'description': 'my description',
+        'definition': 'empty',
+        'tags': ['mc'],
+        'scope': 'public',
+        'updated_at': None,
+        'project_id': '1233',
+        'trust_id': '12345'
     },
 ]
@@ -102,7 +102,7 @@ class WorkbookTest(test_base.DbTestCase):
         created = db_api.workbook_create(WORKBOOKS[0])
         self.assertIsInstance(created, dict)

-        fetched = db_api.workbook_get(created["name"])
+        fetched = db_api.workbook_get(created['name'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -110,12 +110,12 @@
         created = db_api.workbook_create(WORKBOOKS[0])
         self.assertIsInstance(created, dict)

-        updated = db_api.workbook_update(created["name"],
-                                         {"description": "my new desc"})
+        updated = db_api.workbook_update(created['name'],
+                                         {'description': 'my new desc'})
         self.assertIsInstance(updated, dict)
-        self.assertEqual(u'my new desc', updated["description"])
+        self.assertEqual('my new desc', updated['description'])

-        fetched = db_api.workbook_get(created["name"])
+        fetched = db_api.workbook_get(created['name'])
         self.assertDictEqual(updated, fetched)

     def test_workbook_list(self):
@@ -132,28 +132,30 @@
         created = db_api.workbook_create(WORKBOOKS[0])
         self.assertIsInstance(created, dict)

-        fetched = db_api.workbook_get(created["name"])
+        fetched = db_api.workbook_get(created['name'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)

         db_api.workbook_delete(created['name'])
-        self.assertIsNone(db_api.workbook_get(created["name"]))
+        self.assertIsNone(db_api.workbook_get(created['name']))


 EXECUTIONS = [
     {
-        "id": u'1',
-        "workbook_name": u'my_workbook',
-        'task': u'my_task1',
-        "state": u'IDLE',
-        "updated_at": None
+        'id': '1',
+        'workbook_name': 'my_workbook',
+        'task': 'my_task1',
+        'state': 'IDLE',
+        'updated_at': None,
+        'context': None
     },
     {
-        "id": u'2',
-        "workbook_name": u'my_workbook',
-        'task': u'my_task2',
-        "state": u'RUNNING',
-        "updated_at": None
+        'id': '2',
+        'workbook_name': 'my_workbook',
+        'task': 'my_task2',
+        'state': 'RUNNING',
+        'updated_at': None,
+        'context': {'image_id': '123123'}
     }
 ]
@@ -165,7 +167,7 @@ class ExecutionTest(test_base.DbTestCase):
         self.assertIsInstance(created, dict)

         fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'],
-                                       created["id"])
+                                       created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -175,13 +177,13 @@
         self.assertIsInstance(created, dict)

         updated = db_api.execution_update(EXECUTIONS[0]['workbook_name'],
-                                          created["id"],
-                                          {"task": "task10"})
+                                          created['id'],
+                                          {'task': 'task10'})
         self.assertIsInstance(updated, dict)
-        self.assertEqual(u'task10', updated["task"])
+        self.assertEqual('task10', updated['task'])

         fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'],
-                                       created["id"])
+                                       created['id'])
         self.assertDictEqual(updated, fetched)

     def test_execution_list(self):
@@ -203,44 +205,50 @@
         self.assertIsInstance(created, dict)

         fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'],
-                                       created["id"])
+                                       created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)

         db_api.execution_delete(EXECUTIONS[0]['workbook_name'],
                                 created['id'])
         self.assertIsNone(db_api.execution_get(EXECUTIONS[0]['workbook_name'],
-                                               created["id"]))
+                                               created['id']))


 TASKS = [
     {
-        "id": u'1',
-        "workbook_name": u'my_workbook',
-        "execution_id": u'1',
-        'name': u'my_task1',
-        'description': u'my description',
-        'requires': {u'my_task2': u'', u'my_task3': u''},
-        "task_dsl": None,
-        "service_dsl": None,
-        "action": {u'name': u'Nova:create-vm'},
-        "state": u'IDLE',
-        "tags": [u'deployment'],
-        "updated_at": None
+        'id': '1',
+        'workbook_name': 'my_workbook',
+        'execution_id': '1',
+        'name': 'my_task1',
+        'description': 'my description',
+        'requires': {'my_task2': '', 'my_task3': ''},
+        'task_dsl': None,
+        'service_dsl': None,
+        'action': {'name': 'Nova:create-vm'},
+        'state': 'IDLE',
+        'tags': ['deployment'],
+        'updated_at': None,
+        'in_context': None,
+        'input': None,
+        'output': None
     },
     {
-        "id": u'2',
-        "workbook_name": u'my_workbook',
-        "execution_id": u'1',
-        'name': u'my_task2',
-        'description': u'my description',
-        'requires': {u'my_task4': u'', u'my_task5': u''},
-        "task_dsl": None,
-        "service_dsl": None,
-        "action": {u'name': u'Cinder:create-volume'},
-        "state": u'IDLE',
-        "tags": [u'deployment'],
-        "updated_at": None
+        'id': '2',
+        'workbook_name': 'my_workbook',
+        'execution_id': '1',
+        'name': 'my_task2',
+        'description': 'my description',
+        'requires': {'my_task4': '', 'my_task5': ''},
+        'task_dsl': None,
+        'service_dsl': None,
+        'action': {'name': 'Cinder:create-volume'},
+        'state': 'IDLE',
+        'tags': ['deployment'],
+        'updated_at': None,
+        'in_context': {'image_id': '123123'},
+        'input': {'image_id': '123123'},
+        'output': {'vm_id': '343123'}
     },
 ]
@@ -254,7 +262,7 @@ class TaskTest(test_base.DbTestCase):
         fetched = db_api.task_get(TASKS[0]['workbook_name'],
                                   TASKS[0]['execution_id'],
-                                  created["id"])
+                                  created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -266,14 +274,14 @@
         updated = db_api.task_update(TASKS[0]['workbook_name'],
                                      TASKS[0]['execution_id'],
-                                     created["id"],
-                                     {"description": "my new desc"})
+                                     created['id'],
+                                     {'description': 'my new desc'})
         self.assertIsInstance(updated, dict)
-        self.assertEqual(u'my new desc', updated["description"])
+        self.assertEqual('my new desc', updated['description'])

         fetched = db_api.task_get(TASKS[0]['workbook_name'],
                                   TASKS[0]['execution_id'],
-                                  created["id"])
+                                  created['id'])
         self.assertDictEqual(updated, fetched)

     def test_task_list(self):
@@ -299,7 +307,7 @@
         fetched = db_api.task_get(TASKS[0]['workbook_name'],
                                   TASKS[0]['execution_id'],
-                                  created["id"])
+                                  created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -308,7 +316,7 @@
                                 created['id'])
         self.assertIsNone(db_api.task_get(TASKS[0]['workbook_name'],
                                           TASKS[0]['execution_id'],
-                                          created["id"]))
+                                          created['id']))


 class TXTest(test_base.DbTestCase):
@@ -319,7 +327,7 @@ class TXTest(test_base.DbTestCase):
         created = db_api.event_create(EVENTS[0])
         self.assertIsInstance(created, dict)

-        fetched = db_api.event_get(created["id"])
+        fetched = db_api.event_get(created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -331,7 +339,7 @@
         self.assertFalse(self.is_db_session_open())

-        fetched = db_api.event_get(created["id"])
+        fetched = db_api.event_get(created['id'])
         self.assertIsNone(fetched)

         self.assertFalse(self.is_db_session_open())
@@ -343,7 +351,7 @@
         created = db_api.event_create(EVENTS[0])
         self.assertIsInstance(created, dict)

-        fetched = db_api.event_get(created["id"])
+        fetched = db_api.event_get(created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -355,7 +363,7 @@
         self.assertFalse(self.is_db_session_open())

-        fetched = db_api.event_get(created["id"])
+        fetched = db_api.event_get(created['id'])
         self.assertIsInstance(fetched, dict)
         self.assertDictEqual(created, fetched)
@@ -368,14 +376,14 @@
         created_event = db_api.event_create(EVENTS[0])
         self.assertIsInstance(created_event, dict)

-        fetched_event = db_api.event_get(created_event["id"])
+        fetched_event = db_api.event_get(created_event['id'])
         self.assertIsInstance(fetched_event, dict)
         self.assertDictEqual(created_event, fetched_event)

         created_workbook = db_api.workbook_create(WORKBOOKS[0])
         self.assertIsInstance(created_workbook, dict)

-        fetched_workbook = db_api.workbook_get(created_workbook["name"])
+        fetched_workbook = db_api.workbook_get(created_workbook['name'])
         self.assertIsInstance(fetched_workbook, dict)
         self.assertDictEqual(created_workbook, fetched_workbook)
@@ -387,10 +395,10 @@
         self.assertFalse(self.is_db_session_open())

-        fetched_event = db_api.event_get(created_event["id"])
+        fetched_event = db_api.event_get(created_event['id'])
         self.assertIsNone(fetched_event)

-        fetched_workbook = db_api.workbook_get(created_workbook["name"])
+        fetched_workbook = db_api.workbook_get(created_workbook['name'])
         self.assertIsNone(fetched_workbook)

         self.assertFalse(self.is_db_session_open())
@@ -402,14 +410,14 @@
         created_event = db_api.event_create(EVENTS[0])
         self.assertIsInstance(created_event, dict)

-        fetched_event = db_api.event_get(created_event["id"])
+        fetched_event = db_api.event_get(created_event['id'])
         self.assertIsInstance(fetched_event, dict)
         self.assertDictEqual(created_event, fetched_event)

         created_workbook = db_api.workbook_create(WORKBOOKS[0])
         self.assertIsInstance(created_workbook, dict)

-        fetched_workbook = db_api.workbook_get(created_workbook["name"])
+        fetched_workbook = db_api.workbook_get(created_workbook['name'])
         self.assertIsInstance(fetched_workbook, dict)
         self.assertDictEqual(created_workbook, fetched_workbook)
@@ -421,11 +429,11 @@
         self.assertFalse(self.is_db_session_open())

-        fetched_event = db_api.event_get(created_event["id"])
+        fetched_event = db_api.event_get(created_event['id'])
         self.assertIsInstance(fetched_event, dict)
         self.assertDictEqual(created_event, fetched_event)

-        fetched_workbook = db_api.workbook_get(created_workbook["name"])
+        fetched_workbook = db_api.workbook_get(created_workbook['name'])
         self.assertIsInstance(fetched_workbook, dict)
         self.assertDictEqual(created_workbook, fetched_workbook)

View File

@@ -28,6 +28,7 @@ ENGINE = engine.get_engine()
 CFG_PREFIX = "tests/resources/"
 WB_NAME = "my_workbook"
+CONTEXT = None # TODO(rakhmerov): Use a meaningful value.

 #TODO(rakhmerov): add more tests for errors, execution stop etc.
@@ -46,8 +47,8 @@ class TestLocalEngine(test_base.DbTestCase):
     @mock.patch.object(actions.RestAction, "run",
                        mock.MagicMock(return_value={'state': states.RUNNING}))
     def test_engine_one_task(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME,
-                                                    "create-vms")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms",
+                                                    CONTEXT)

         task = db_api.tasks_get(WB_NAME, execution['id'])[0]
@@ -67,8 +68,8 @@ class TestLocalEngine(test_base.DbTestCase):
     @mock.patch.object(actions.RestAction, "run",
                        mock.MagicMock(return_value={'state': states.RUNNING}))
     def test_engine_multiple_tasks(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME,
-                                                    "backup-vms")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms",
+                                                    CONTEXT)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])
@@ -109,9 +110,12 @@ class TestLocalEngine(test_base.DbTestCase):
     @mock.patch.object(states, "get_state_by_http_status_code",
                        mock.MagicMock(return_value=states.SUCCESS))
     def test_engine_sync_task(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova",
+                                                    CONTEXT)
+
         task = db_api.tasks_get(WB_NAME, execution['id'])[0]
         execution = db_api.execution_get(WB_NAME, execution['id'])

         self.assertEqual(execution['state'], states.SUCCESS)
         self.assertEqual(task['state'], states.SUCCESS)
@@ -122,22 +126,28 @@ class TestLocalEngine(test_base.DbTestCase):
     @mock.patch.object(actions.RestAction, "run",
                        mock.MagicMock(return_value={'state': states.SUCCESS}))
     def test_engine_tasks_on_success_finish(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME,
-                                                    "test_subsequent")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent",
+                                                    CONTEXT)
         tasks = db_api.tasks_get(WB_NAME, execution['id'])

         self.assertEqual(len(tasks), 1)

         execution = db_api.execution_get(WB_NAME, execution['id'])

         ENGINE.convey_task_result(WB_NAME, execution['id'],
                                   tasks[0]['id'],
                                   states.SUCCESS, None)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])

         self.assertEqual(len(tasks), 4)

         attach_volumes = [t for t in tasks if t['name'] == 'attach-volumes'][0]

         self.assertIn(attach_volumes, tasks)
         self.assertEqual(tasks[0]['state'], states.SUCCESS)
         self.assertEqual(tasks[1]['state'], states.IDLE)
         self.assertEqual(tasks[2]['state'], states.RUNNING)

         ENGINE.convey_task_result(WB_NAME, execution['id'],
                                   tasks[2]['id'],
                                   states.SUCCESS, None)
@@ -146,14 +156,17 @@
                                   states.SUCCESS, None)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])

         self.assertEqual(tasks[2]['state'], states.SUCCESS)
         self.assertEqual(tasks[1]['state'], states.RUNNING)

         ENGINE.convey_task_result(WB_NAME, execution['id'],
                                   tasks[1]['id'],
                                   states.SUCCESS, None)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])
         execution = db_api.execution_get(WB_NAME, execution['id'])

         self.assertEqual(tasks[1]['state'], states.SUCCESS)
         self.assertEqual(execution['state'], states.SUCCESS)
@@ -164,21 +177,27 @@
     @mock.patch.object(actions.RestAction, "run",
                        mock.MagicMock(return_value={'state': states.SUCCESS}))
     def test_engine_tasks_on_error_finish(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME,
-                                                    "test_subsequent")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent",
+                                                    CONTEXT)
         tasks = db_api.tasks_get(WB_NAME, execution['id'])
         execution = db_api.execution_get(WB_NAME, execution['id'])

         ENGINE.convey_task_result(WB_NAME, execution['id'],
                                   tasks[0]['id'],
                                   states.ERROR, None)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])

         self.assertEqual(len(tasks), 4)

         backup_vms = [t for t in tasks if t['name'] == 'backup-vms'][0]

         self.assertIn(backup_vms, tasks)
         self.assertEqual(tasks[0]['state'], states.ERROR)
         self.assertEqual(tasks[1]['state'], states.IDLE)
         self.assertEqual(tasks[2]['state'], states.RUNNING)

         ENGINE.convey_task_result(WB_NAME, execution['id'],
                                   tasks[2]['id'],
                                   states.SUCCESS, None)
@@ -187,13 +206,16 @@
                                   states.SUCCESS, None)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])

         self.assertEqual(tasks[2]['state'], states.SUCCESS)
         self.assertEqual(tasks[1]['state'], states.RUNNING)

         ENGINE.convey_task_result(WB_NAME, execution['id'],
                                   tasks[1]['id'],
                                   states.SUCCESS, None)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])
         execution = db_api.execution_get(WB_NAME, execution['id'])

         self.assertEqual(tasks[1]['state'], states.SUCCESS)
         self.assertEqual(execution['state'], states.SUCCESS)

View File

@@ -28,6 +28,8 @@ ENGINE = engine.get_engine()
 CFG_PREFIX = "tests/resources/"
 WB_NAME = "my_workbook"
+
+CONTEXT = None # TODO(rakhmerov): Use a meaningful value.

 #TODO(rakhmerov): add more tests for errors, execution stop etc.
@@ -48,8 +50,8 @@ class TestScalableEngine(test_base.DbTestCase):
     @mock.patch.object(actions.RestAction, "run",
                        mock.MagicMock(return_value="result"))
     def test_engine_one_task(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME,
-                                                    "create-vms")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms",
+                                                    CONTEXT)

         task = db_api.tasks_get(WB_NAME, execution['id'])[0]
@@ -71,8 +73,8 @@ class TestScalableEngine(test_base.DbTestCase):
     @mock.patch.object(actions.RestAction, "run",
                        mock.MagicMock(return_value="result"))
     def test_engine_multiple_tasks(self):
-        execution = ENGINE.start_workflow_execution(WB_NAME,
-                                                    "backup-vms")
+        execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms",
+                                                    CONTEXT)

         tasks = db_api.tasks_get(WB_NAME, execution['id'])

View File

@@ -18,5 +18,4 @@ import yaql


 def evaluate(expression_str, data):
-    expression = yaql.parse(expression_str)
-    return expression.evaluate(data)
+    return yaql.parse(expression_str).evaluate(data)

View File

@@ -31,11 +31,11 @@ from pylint.reporters import text
 # Note(maoy): E1103 is error code related to partial type inference
 ignore_codes = ["E1103"]
 # Note(maoy): the error message is the pattern of E0202. It should be ignored
-# for savanna.tests modules
-ignore_messages = ["An attribute affected in savanna.tests"]
+# for mistral.tests modules
+ignore_messages = ["An attribute affected in mistral.tests"]
 # We ignore all errors in openstack.common because it should be checked
 # elsewhere.
-ignore_modules = ["savanna/openstack/common/"]
+ignore_modules = ["mistral/openstack/common/"]

KNOWN_PYLINT_EXCEPTIONS_FILE = "tools/pylint_exceptions"
@@ -130,7 +130,7 @@ class ErrorKeys(object):
def run_pylint():
    buff = StringIO.StringIO()
    reporter = text.ParseableTextReporter(output=buff)
-    args = ["--include-ids=y", "-E", "savanna"]
+    args = ["--include-ids=y", "-E", "mistral"]
    lint.Run(args, reporter=reporter, exit=False)
    val = buff.getvalue()
    buff.close()