From 58407b6f941b8736908f9e1c5405192c25825d41 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Thu, 20 Feb 2014 14:44:48 +0700 Subject: [PATCH] Working on Data Flow (step 1) * Refactored API layer so that we can work with 'context' as with a json object in underlying layers (DB, engine, etc.) rather than a string * Added "context" parameter in all required places * Added necessary Data Flow related properties to DB models * Refactored and fixed a series of tests * Minor formatting changes TODO: * Calculation of task incoming context * Abstract interface for expression evaluator * Data Flow related tests Partially implements blueprint: mistral-dataflow Change-Id: Ie7f94d79265e9861f7ad15c76ff6d788ec62b683 --- mistral/api/controllers/v1/execution.py | 31 ++- mistral/db/sqlalchemy/models.py | 6 + mistral/engine/abstract_engine.py | 50 +++-- mistral/engine/engine.py | 5 +- mistral/engine/scalable/executor/executor.py | 4 + .../api/v1/controllers/test_executions.py | 54 +++-- .../tests/unit/db/test_sqlalchemy_db_api.py | 210 +++++++++--------- .../tests/unit/engine/local/test_engine.py | 40 +++- .../tests/unit/engine/scalable/test_engine.py | 10 +- mistral/utils/yaql_utils.py | 3 +- tools/lintstack.py | 8 +- 11 files changed, 258 insertions(+), 163 deletions(-) diff --git a/mistral/api/controllers/v1/execution.py b/mistral/api/controllers/v1/execution.py index 9677a57d7..2d90eb6fd 100644 --- a/mistral/api/controllers/v1/execution.py +++ b/mistral/api/controllers/v1/execution.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + from pecan import rest from pecan import abort from wsme import types as wtypes @@ -37,9 +39,29 @@ class Execution(resource.Resource): task = wtypes.text state = wtypes.text # Context is a JSON object but since WSME doesn't support arbitrary - # dictionaries we have to use text type. + # dictionaries we have to use text type convert to json and back manually. context = wtypes.text + def to_dict(self): + d = super(Execution, self).to_dict() + + if d.get('context'): + d['context'] = json.loads(d['context']) + + return d + + @classmethod + def from_dict(cls, d): + e = cls() + + for key, val in d.items(): + if hasattr(e, key): + if key == 'context' and val: + val = json.dumps(val) + setattr(e, key, val) + + return e + class Executions(resource.Resource): """A collection of Execution resources.""" @@ -80,8 +102,13 @@ class ExecutionsController(rest.RestController): LOG.debug("Create execution [workbook_name=%s, execution=%s]" % (workbook_name, execution)) try: + context = None + if execution.context: + context = json.loads(execution.context) + values = engine.start_workflow_execution(execution.workbook_name, - execution.task) + execution.task, + context) except ex.MistralException as e: #TODO(nmakhotkin) we should use thing such a decorator here abort(400, e.message) diff --git a/mistral/db/sqlalchemy/models.py b/mistral/db/sqlalchemy/models.py index e17335b49..48ac3ffd5 100644 --- a/mistral/db/sqlalchemy/models.py +++ b/mistral/db/sqlalchemy/models.py @@ -58,6 +58,7 @@ class WorkflowExecution(mb.MistralBase): workbook_name = sa.Column(sa.String(80)) task = sa.Column(sa.String(80)) state = sa.Column(sa.String(20)) + context = sa.Column(st.JsonDictType()) class Workbook(mb.MistralBase): @@ -94,3 +95,8 @@ class Task(mb.MistralBase): service_dsl = sa.Column(st.JsonDictType()) state = sa.Column(sa.String(20)) tags = sa.Column(st.JsonListType()) + + # Data Flow properties. + in_context = sa.Column(st.JsonDictType()) + input = sa.Column(st.JsonDictType()) + output = sa.Column(st.JsonDictType()) diff --git a/mistral/engine/abstract_engine.py b/mistral/engine/abstract_engine.py index e0966a136..250cf242e 100644 --- a/mistral/engine/abstract_engine.py +++ b/mistral/engine/abstract_engine.py @@ -26,6 +26,10 @@ from mistral.engine import workflow LOG = logging.getLogger(__name__) +# TODO(rakhmerov): Upcoming Data Flow changes: +# 1. Calculate "in_context" for all the tasks submitted for execution. +# 2. Transfer "in_context" along with task data over AMQP. + class AbstractEngine(object): @classmethod @@ -34,17 +38,22 @@ class AbstractEngine(object): pass @classmethod - def start_workflow_execution(cls, workbook_name, task_name): - wb_dsl = cls._get_wb_dsl(workbook_name) - dsl_tasks = workflow.find_workflow_tasks(wb_dsl, task_name) + def start_workflow_execution(cls, workbook_name, task_name, context): db_api.start_tx() + wb_dsl = cls._get_wb_dsl(workbook_name) + # Persist execution and tasks in DB. try: - execution = cls._create_execution(workbook_name, task_name) + execution = cls._create_execution(workbook_name, + task_name, + context) - tasks = cls._create_tasks(dsl_tasks, wb_dsl, - workbook_name, execution['id']) + tasks = cls._create_tasks( + workflow.find_workflow_tasks(wb_dsl, task_name), + wb_dsl, + workbook_name, execution['id'] + ) db_api.commit_tx() except Exception as e: @@ -53,6 +62,9 @@ class AbstractEngine(object): finally: db_api.end_tx() + # TODO(rakhmerov): This doesn't look correct anymore, we shouldn't + # start tasks which don't have dependencies but are reachable only + # via direct transitions. cls._run_tasks(workflow.find_resolved_tasks(tasks)) return execution @@ -61,17 +73,17 @@ class AbstractEngine(object): def convey_task_result(cls, workbook_name, execution_id, task_id, state, result): db_api.start_tx() + wb_dsl = cls._get_wb_dsl(workbook_name) #TODO(rakhmerov): validate state transition # Update task state. task = db_api.task_update(workbook_name, execution_id, task_id, - {"state": state, "result": result}) + {"state": state, "output": result}) + execution = db_api.execution_get(workbook_name, execution_id) - cls._create_next_tasks(task, - wb_dsl, - workbook_name, - execution_id) + + cls._create_next_tasks(task, wb_dsl, workbook_name, execution_id) # Determine what tasks need to be started. tasks = db_api.tasks_get(workbook_name, execution_id) @@ -128,24 +140,27 @@ class AbstractEngine(object): return task["state"] @classmethod - def _create_execution(cls, workbook_name, task_name): + def _create_execution(cls, workbook_name, task_name, context): return db_api.execution_create(workbook_name, { "workbook_name": workbook_name, "task": task_name, - "state": states.RUNNING + "state": states.RUNNING, + "context": context }) @classmethod - def _create_next_tasks(cls, task, wb_dsl, - workbook_name, execution_id): + def _create_next_tasks(cls, task, wb_dsl, workbook_name, execution_id): dsl_tasks = workflow.find_tasks_after_completion(task, wb_dsl) - tasks = cls._create_tasks(dsl_tasks, wb_dsl, - workbook_name, execution_id) + + tasks = cls._create_tasks(dsl_tasks, wb_dsl, workbook_name, + execution_id) + return workflow.find_resolved_tasks(tasks) @classmethod def _create_tasks(cls, dsl_tasks, wb_dsl, workbook_name, execution_id): tasks = [] + for dsl_task in dsl_tasks: task = db_api.task_create(workbook_name, execution_id, { "name": dsl_task["name"], @@ -157,6 +172,7 @@ class AbstractEngine(object): }) tasks.append(task) + return tasks @classmethod diff --git a/mistral/engine/engine.py b/mistral/engine/engine.py index a63acc713..d356f8007 100644 --- a/mistral/engine/engine.py +++ b/mistral/engine/engine.py @@ -31,15 +31,16 @@ finally: pass -def start_workflow_execution(workbook_name, task_name): +def start_workflow_execution(workbook_name, task_name, context=None): """Starts a workflow execution based on the specified workbook name and target task. :param workbook_name: Workbook name :param task_name: Target task name + :param context: Execution context which defines a workflow input :return: Workflow execution. """ - return IMPL.start_workflow_execution(workbook_name, task_name) + return IMPL.start_workflow_execution(workbook_name, task_name, context) def stop_workflow_execution(workbook_name, execution_id): diff --git a/mistral/engine/scalable/executor/executor.py b/mistral/engine/scalable/executor/executor.py index facf60199..369be02f7 100644 --- a/mistral/engine/scalable/executor/executor.py +++ b/mistral/engine/scalable/executor/executor.py @@ -27,6 +27,10 @@ from mistral.engine.actions import action_helper as a_h LOG = logging.getLogger(__name__) +# TODO(rakhmerov): Upcoming Data Flow changes: +# 1. Receive "in_context" along with task data. +# 2. Apply task input expression to "in_context" and calculate "input". + def do_task_action(task): LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" % diff --git a/mistral/tests/api/v1/controllers/test_executions.py b/mistral/tests/api/v1/controllers/test_executions.py index fe1d6d589..85fdee824 100644 --- a/mistral/tests/api/v1/controllers/test_executions.py +++ b/mistral/tests/api/v1/controllers/test_executions.py @@ -15,6 +15,7 @@ # limitations under the License. import mock +import json from mistral import exceptions as ex from webtest.app import AppError @@ -24,20 +25,19 @@ from mistral.engine import engine # TODO: later we need additional tests verifying all the errors etc. + EXECS = [ { - 'id': "123", + 'id': '123', 'workbook_name': 'my_workbook', 'task': 'my_task', 'state': 'RUNNING', - 'context': """ - { - "person": { - "first_name": "John", - "last_name": "Doe" - } + 'context': { + "person": { + "first_name": "John", + "last_name": "Doe" } - """ + } } ] @@ -45,38 +45,48 @@ UPDATED_EXEC = EXECS[0].copy() UPDATED_EXEC['state'] = 'STOPPED' +def canonize(json_dict): + if json_dict.get('context'): + json_dict['context'] = json.loads(json_dict['context']) + + return json_dict + + class TestExecutionsController(base.FunctionalTest): - @mock.patch.object(db_api, "execution_get", + @mock.patch.object(db_api, 'execution_get', mock.MagicMock(return_value=EXECS[0])) def test_get(self): resp = self.app.get('/v1/workbooks/my_workbook/executions/123') self.assertEqual(resp.status_int, 200) - self.assertDictEqual(EXECS[0], resp.json) + self.assertDictEqual(EXECS[0], canonize(resp.json)) - @mock.patch.object(db_api, "execution_get", + @mock.patch.object(db_api, 'execution_get', mock.MagicMock(return_value=None)) def test_get_empty(self): self.assertNotFound('/v1/workbooks/my_workbook/executions/123') - @mock.patch.object(db_api, "execution_update", + @mock.patch.object(db_api, 'execution_update', mock.MagicMock(return_value=UPDATED_EXEC)) def test_put(self): resp = self.app.put_json('/v1/workbooks/my_workbook/executions/123', dict(state='STOPPED')) self.assertEqual(resp.status_int, 200) - self.assertDictEqual(UPDATED_EXEC, resp.json) + self.assertDictEqual(UPDATED_EXEC, canonize(resp.json)) - @mock.patch.object(engine, "start_workflow_execution", + @mock.patch.object(engine, 'start_workflow_execution', mock.MagicMock(return_value=EXECS[0])) def test_post(self): - resp = self.app.post_json('/v1/workbooks/my_workbook/executions', - EXECS[0]) - self.assertEqual(resp.status_int, 201) - self.assertDictEqual(EXECS[0], resp.json) + new_exec = EXECS[0].copy() + new_exec['context'] = json.dumps(new_exec['context']) - @mock.patch.object(engine, "start_workflow_execution", + resp = self.app.post_json('/v1/workbooks/my_workbook/executions', + new_exec) + self.assertEqual(resp.status_int, 201) + self.assertDictEqual(EXECS[0], canonize(resp.json)) + + @mock.patch.object(engine, 'start_workflow_execution', mock.MagicMock(side_effect=ex.MistralException)) def test_post_throws_exception(self): with self.assertRaises(AppError) as context: @@ -84,14 +94,14 @@ class TestExecutionsController(base.FunctionalTest): EXECS[0]) self.assertIn('Bad response: 400', context.exception.message) - @mock.patch.object(db_api, "execution_delete", + @mock.patch.object(db_api, 'execution_delete', mock.MagicMock(return_value=None)) def test_delete(self): resp = self.app.delete('/v1/workbooks/my_workbook/executions/123') self.assertEqual(resp.status_int, 204) - @mock.patch.object(db_api, "executions_get", + @mock.patch.object(db_api, 'executions_get', mock.MagicMock(return_value=EXECS)) def test_get_all(self): resp = self.app.get('/v1/workbooks/my_workbook/executions') @@ -99,4 +109,4 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertEqual(len(resp.json), 1) - self.assertDictEqual(EXECS[0], resp.json['executions'][0]) + self.assertDictEqual(EXECS[0], canonize(resp.json['executions'][0])) diff --git a/mistral/tests/unit/db/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/test_sqlalchemy_db_api.py index 3838a1a87..d23a1793d 100644 --- a/mistral/tests/unit/db/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/test_sqlalchemy_db_api.py @@ -22,20 +22,20 @@ from mistral.tests.unit import base as test_base EVENTS = [ { - "id": u'1', - "name": u'test_event1', - 'workbook_name': u'wb_name', - "pattern": u'* *', - "next_execution_time": timeutils.utcnow(), - "updated_at": None + 'id': '1', + 'name': 'test_event1', + 'workbook_name': 'wb_name', + 'pattern': '* *', + 'next_execution_time': timeutils.utcnow(), + 'updated_at': None }, { - "id": u'2', - "name": u'test_event2', - 'workbook_name': u'wb_name', - "pattern": u'* * *', - "next_execution_time": timeutils.utcnow(), - "updated_at": None + 'id': '2', + 'name': 'test_event2', + 'workbook_name': 'wb_name', + 'pattern': '* * *', + 'next_execution_time': timeutils.utcnow(), + 'updated_at': None } ] @@ -45,7 +45,7 @@ class EventTest(test_base.DbTestCase): created = db_api.event_create(EVENTS[0]) self.assertIsInstance(created, dict) - fetched = db_api.event_get(created["id"]) + fetched = db_api.event_get(created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -53,11 +53,11 @@ class EventTest(test_base.DbTestCase): created = db_api.event_create(EVENTS[0]) self.assertIsInstance(created, dict) - updated = db_api.event_update(created["id"], {"pattern": "0 * *"}) + updated = db_api.event_update(created['id'], {'pattern': '0 * *'}) self.assertIsInstance(updated, dict) - self.assertEqual(u'0 * *', updated["pattern"]) + self.assertEqual('0 * *', updated['pattern']) - fetched = db_api.event_get(created["id"]) + fetched = db_api.event_get(created['id']) self.assertDictEqual(updated, fetched) def test_event_list(self): @@ -73,26 +73,26 @@ class EventTest(test_base.DbTestCase): WORKBOOKS = [ { - "id": u'1', - "name": u'my_workbook1', - 'description': u'my description', - "definition": u'empty', - "tags": [u'mc'], - "scope": u'public', - "updated_at": None, - "project_id": '123', - "trust_id": '1234' + 'id': '1', + 'name': 'my_workbook1', + 'description': 'my description', + 'definition': 'empty', + 'tags': ['mc'], + 'scope': 'public', + 'updated_at': None, + 'project_id': '123', + 'trust_id': '1234' }, { - "id": u'2', - "name": u'my_workbook2', - 'description': u'my description', - "definition": u'empty', - "tags": [u'mc'], - "scope": u'public', - "updated_at": None, - "project_id": '1233', - "trust_id": '12345' + 'id': '2', + 'name': 'my_workbook2', + 'description': 'my description', + 'definition': 'empty', + 'tags': ['mc'], + 'scope': 'public', + 'updated_at': None, + 'project_id': '1233', + 'trust_id': '12345' }, ] @@ -102,7 +102,7 @@ class WorkbookTest(test_base.DbTestCase): created = db_api.workbook_create(WORKBOOKS[0]) self.assertIsInstance(created, dict) - fetched = db_api.workbook_get(created["name"]) + fetched = db_api.workbook_get(created['name']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -110,12 +110,12 @@ class WorkbookTest(test_base.DbTestCase): created = db_api.workbook_create(WORKBOOKS[0]) self.assertIsInstance(created, dict) - updated = db_api.workbook_update(created["name"], - {"description": "my new desc"}) + updated = db_api.workbook_update(created['name'], + {'description': 'my new desc'}) self.assertIsInstance(updated, dict) - self.assertEqual(u'my new desc', updated["description"]) + self.assertEqual('my new desc', updated['description']) - fetched = db_api.workbook_get(created["name"]) + fetched = db_api.workbook_get(created['name']) self.assertDictEqual(updated, fetched) def test_workbook_list(self): @@ -132,28 +132,30 @@ class WorkbookTest(test_base.DbTestCase): created = db_api.workbook_create(WORKBOOKS[0]) self.assertIsInstance(created, dict) - fetched = db_api.workbook_get(created["name"]) + fetched = db_api.workbook_get(created['name']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) db_api.workbook_delete(created['name']) - self.assertIsNone(db_api.workbook_get(created["name"])) + self.assertIsNone(db_api.workbook_get(created['name'])) EXECUTIONS = [ { - "id": u'1', - "workbook_name": u'my_workbook', - 'task': u'my_task1', - "state": u'IDLE', - "updated_at": None + 'id': '1', + 'workbook_name': 'my_workbook', + 'task': 'my_task1', + 'state': 'IDLE', + 'updated_at': None, + 'context': None }, { - "id": u'2', - "workbook_name": u'my_workbook', - 'task': u'my_task2', - "state": u'RUNNING', - "updated_at": None + 'id': '2', + 'workbook_name': 'my_workbook', + 'task': 'my_task2', + 'state': 'RUNNING', + 'updated_at': None, + 'context': {'image_id': '123123'} } ] @@ -165,7 +167,7 @@ class ExecutionTest(test_base.DbTestCase): self.assertIsInstance(created, dict) fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'], - created["id"]) + created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -175,13 +177,13 @@ class ExecutionTest(test_base.DbTestCase): self.assertIsInstance(created, dict) updated = db_api.execution_update(EXECUTIONS[0]['workbook_name'], - created["id"], - {"task": "task10"}) + created['id'], + {'task': 'task10'}) self.assertIsInstance(updated, dict) - self.assertEqual(u'task10', updated["task"]) + self.assertEqual('task10', updated['task']) fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'], - created["id"]) + created['id']) self.assertDictEqual(updated, fetched) def test_execution_list(self): @@ -203,44 +205,50 @@ class ExecutionTest(test_base.DbTestCase): self.assertIsInstance(created, dict) fetched = db_api.execution_get(EXECUTIONS[0]['workbook_name'], - created["id"]) + created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) db_api.execution_delete(EXECUTIONS[0]['workbook_name'], created['id']) self.assertIsNone(db_api.execution_get(EXECUTIONS[0]['workbook_name'], - created["id"])) + created['id'])) TASKS = [ { - "id": u'1', - "workbook_name": u'my_workbook', - "execution_id": u'1', - 'name': u'my_task1', - 'description': u'my description', - 'requires': {u'my_task2': u'', u'my_task3': u''}, - "task_dsl": None, - "service_dsl": None, - "action": {u'name': u'Nova:create-vm'}, - "state": u'IDLE', - "tags": [u'deployment'], - "updated_at": None + 'id': '1', + 'workbook_name': 'my_workbook', + 'execution_id': '1', + 'name': 'my_task1', + 'description': 'my description', + 'requires': {'my_task2': '', 'my_task3': ''}, + 'task_dsl': None, + 'service_dsl': None, + 'action': {'name': 'Nova:create-vm'}, + 'state': 'IDLE', + 'tags': ['deployment'], + 'updated_at': None, + 'in_context': None, + 'input': None, + 'output': None }, { - "id": u'2', - "workbook_name": u'my_workbook', - "execution_id": u'1', - 'name': u'my_task2', - 'description': u'my description', - 'requires': {u'my_task4': u'', u'my_task5': u''}, - "task_dsl": None, - "service_dsl": None, - "action": {u'name': u'Cinder:create-volume'}, - "state": u'IDLE', - "tags": [u'deployment'], - "updated_at": None + 'id': '2', + 'workbook_name': 'my_workbook', + 'execution_id': '1', + 'name': 'my_task2', + 'description': 'my description', + 'requires': {'my_task4': '', 'my_task5': ''}, + 'task_dsl': None, + 'service_dsl': None, + 'action': {'name': 'Cinder:create-volume'}, + 'state': 'IDLE', + 'tags': ['deployment'], + 'updated_at': None, + 'in_context': {'image_id': '123123'}, + 'input': {'image_id': '123123'}, + 'output': {'vm_id': '343123'} }, ] @@ -254,7 +262,7 @@ class TaskTest(test_base.DbTestCase): fetched = db_api.task_get(TASKS[0]['workbook_name'], TASKS[0]['execution_id'], - created["id"]) + created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -266,14 +274,14 @@ class TaskTest(test_base.DbTestCase): updated = db_api.task_update(TASKS[0]['workbook_name'], TASKS[0]['execution_id'], - created["id"], - {"description": "my new desc"}) + created['id'], + {'description': 'my new desc'}) self.assertIsInstance(updated, dict) - self.assertEqual(u'my new desc', updated["description"]) + self.assertEqual('my new desc', updated['description']) fetched = db_api.task_get(TASKS[0]['workbook_name'], TASKS[0]['execution_id'], - created["id"]) + created['id']) self.assertDictEqual(updated, fetched) def test_task_list(self): @@ -299,7 +307,7 @@ class TaskTest(test_base.DbTestCase): fetched = db_api.task_get(TASKS[0]['workbook_name'], TASKS[0]['execution_id'], - created["id"]) + created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -308,7 +316,7 @@ class TaskTest(test_base.DbTestCase): created['id']) self.assertIsNone(db_api.task_get(TASKS[0]['workbook_name'], TASKS[0]['execution_id'], - created["id"])) + created['id'])) class TXTest(test_base.DbTestCase): @@ -319,7 +327,7 @@ class TXTest(test_base.DbTestCase): created = db_api.event_create(EVENTS[0]) self.assertIsInstance(created, dict) - fetched = db_api.event_get(created["id"]) + fetched = db_api.event_get(created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -331,7 +339,7 @@ class TXTest(test_base.DbTestCase): self.assertFalse(self.is_db_session_open()) - fetched = db_api.event_get(created["id"]) + fetched = db_api.event_get(created['id']) self.assertIsNone(fetched) self.assertFalse(self.is_db_session_open()) @@ -343,7 +351,7 @@ class TXTest(test_base.DbTestCase): created = db_api.event_create(EVENTS[0]) self.assertIsInstance(created, dict) - fetched = db_api.event_get(created["id"]) + fetched = db_api.event_get(created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -355,7 +363,7 @@ class TXTest(test_base.DbTestCase): self.assertFalse(self.is_db_session_open()) - fetched = db_api.event_get(created["id"]) + fetched = db_api.event_get(created['id']) self.assertIsInstance(fetched, dict) self.assertDictEqual(created, fetched) @@ -368,14 +376,14 @@ class TXTest(test_base.DbTestCase): created_event = db_api.event_create(EVENTS[0]) self.assertIsInstance(created_event, dict) - fetched_event = db_api.event_get(created_event["id"]) + fetched_event = db_api.event_get(created_event['id']) self.assertIsInstance(fetched_event, dict) self.assertDictEqual(created_event, fetched_event) created_workbook = db_api.workbook_create(WORKBOOKS[0]) self.assertIsInstance(created_workbook, dict) - fetched_workbook = db_api.workbook_get(created_workbook["name"]) + fetched_workbook = db_api.workbook_get(created_workbook['name']) self.assertIsInstance(fetched_workbook, dict) self.assertDictEqual(created_workbook, fetched_workbook) @@ -387,10 +395,10 @@ class TXTest(test_base.DbTestCase): self.assertFalse(self.is_db_session_open()) - fetched_event = db_api.event_get(created_event["id"]) + fetched_event = db_api.event_get(created_event['id']) self.assertIsNone(fetched_event) - fetched_workbook = db_api.workbook_get(created_workbook["name"]) + fetched_workbook = db_api.workbook_get(created_workbook['name']) self.assertIsNone(fetched_workbook) self.assertFalse(self.is_db_session_open()) @@ -402,14 +410,14 @@ class TXTest(test_base.DbTestCase): created_event = db_api.event_create(EVENTS[0]) self.assertIsInstance(created_event, dict) - fetched_event = db_api.event_get(created_event["id"]) + fetched_event = db_api.event_get(created_event['id']) self.assertIsInstance(fetched_event, dict) self.assertDictEqual(created_event, fetched_event) created_workbook = db_api.workbook_create(WORKBOOKS[0]) self.assertIsInstance(created_workbook, dict) - fetched_workbook = db_api.workbook_get(created_workbook["name"]) + fetched_workbook = db_api.workbook_get(created_workbook['name']) self.assertIsInstance(fetched_workbook, dict) self.assertDictEqual(created_workbook, fetched_workbook) @@ -421,11 +429,11 @@ class TXTest(test_base.DbTestCase): self.assertFalse(self.is_db_session_open()) - fetched_event = db_api.event_get(created_event["id"]) + fetched_event = db_api.event_get(created_event['id']) self.assertIsInstance(fetched_event, dict) self.assertDictEqual(created_event, fetched_event) - fetched_workbook = db_api.workbook_get(created_workbook["name"]) + fetched_workbook = db_api.workbook_get(created_workbook['name']) self.assertIsInstance(fetched_workbook, dict) self.assertDictEqual(created_workbook, fetched_workbook) diff --git a/mistral/tests/unit/engine/local/test_engine.py b/mistral/tests/unit/engine/local/test_engine.py index d9ed67f23..06075d4d7 100644 --- a/mistral/tests/unit/engine/local/test_engine.py +++ b/mistral/tests/unit/engine/local/test_engine.py @@ -28,6 +28,7 @@ ENGINE = engine.get_engine() CFG_PREFIX = "tests/resources/" WB_NAME = "my_workbook" +CONTEXT = None # TODO(rakhmerov): Use a meaningful value. #TODO(rakhmerov): add more tests for errors, execution stop etc. @@ -46,8 +47,8 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.RUNNING})) def test_engine_one_task(self): - execution = ENGINE.start_workflow_execution(WB_NAME, - "create-vms") + execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms", + CONTEXT) task = db_api.tasks_get(WB_NAME, execution['id'])[0] @@ -67,8 +68,8 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.RUNNING})) def test_engine_multiple_tasks(self): - execution = ENGINE.start_workflow_execution(WB_NAME, - "backup-vms") + execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms", + CONTEXT) tasks = db_api.tasks_get(WB_NAME, execution['id']) @@ -109,9 +110,12 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(states, "get_state_by_http_status_code", mock.MagicMock(return_value=states.SUCCESS)) def test_engine_sync_task(self): - execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova") + execution = ENGINE.start_workflow_execution(WB_NAME, "create-vm-nova", + CONTEXT) + task = db_api.tasks_get(WB_NAME, execution['id'])[0] execution = db_api.execution_get(WB_NAME, execution['id']) + self.assertEqual(execution['state'], states.SUCCESS) self.assertEqual(task['state'], states.SUCCESS) @@ -122,22 +126,28 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.SUCCESS})) def test_engine_tasks_on_success_finish(self): - execution = ENGINE.start_workflow_execution(WB_NAME, - "test_subsequent") + execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent", + CONTEXT) tasks = db_api.tasks_get(WB_NAME, execution['id']) + self.assertEqual(len(tasks), 1) + execution = db_api.execution_get(WB_NAME, execution['id']) ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[0]['id'], states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) + self.assertEqual(len(tasks), 4) + attach_volumes = [t for t in tasks if t['name'] == 'attach-volumes'][0] + self.assertIn(attach_volumes, tasks) self.assertEqual(tasks[0]['state'], states.SUCCESS) self.assertEqual(tasks[1]['state'], states.IDLE) self.assertEqual(tasks[2]['state'], states.RUNNING) + ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[2]['id'], states.SUCCESS, None) @@ -146,14 +156,17 @@ class TestLocalEngine(test_base.DbTestCase): states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) + self.assertEqual(tasks[2]['state'], states.SUCCESS) self.assertEqual(tasks[1]['state'], states.RUNNING) + ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[1]['id'], states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) execution = db_api.execution_get(WB_NAME, execution['id']) + self.assertEqual(tasks[1]['state'], states.SUCCESS) self.assertEqual(execution['state'], states.SUCCESS) @@ -164,21 +177,27 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.SUCCESS})) def test_engine_tasks_on_error_finish(self): - execution = ENGINE.start_workflow_execution(WB_NAME, - "test_subsequent") + execution = ENGINE.start_workflow_execution(WB_NAME, "test_subsequent", + CONTEXT) + tasks = db_api.tasks_get(WB_NAME, execution['id']) execution = db_api.execution_get(WB_NAME, execution['id']) + ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[0]['id'], states.ERROR, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) + self.assertEqual(len(tasks), 4) + backup_vms = [t for t in tasks if t['name'] == 'backup-vms'][0] + self.assertIn(backup_vms, tasks) self.assertEqual(tasks[0]['state'], states.ERROR) self.assertEqual(tasks[1]['state'], states.IDLE) self.assertEqual(tasks[2]['state'], states.RUNNING) + ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[2]['id'], states.SUCCESS, None) @@ -187,13 +206,16 @@ class TestLocalEngine(test_base.DbTestCase): states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) + self.assertEqual(tasks[2]['state'], states.SUCCESS) self.assertEqual(tasks[1]['state'], states.RUNNING) + ENGINE.convey_task_result(WB_NAME, execution['id'], tasks[1]['id'], states.SUCCESS, None) tasks = db_api.tasks_get(WB_NAME, execution['id']) execution = db_api.execution_get(WB_NAME, execution['id']) + self.assertEqual(tasks[1]['state'], states.SUCCESS) self.assertEqual(execution['state'], states.SUCCESS) diff --git a/mistral/tests/unit/engine/scalable/test_engine.py b/mistral/tests/unit/engine/scalable/test_engine.py index 8fba41ce3..886b93729 100644 --- a/mistral/tests/unit/engine/scalable/test_engine.py +++ b/mistral/tests/unit/engine/scalable/test_engine.py @@ -28,6 +28,8 @@ ENGINE = engine.get_engine() CFG_PREFIX = "tests/resources/" WB_NAME = "my_workbook" +CONTEXT = None # TODO(rakhmerov): Use a meaningful value. + #TODO(rakhmerov): add more tests for errors, execution stop etc. @@ -48,8 +50,8 @@ class TestScalableEngine(test_base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value="result")) def test_engine_one_task(self): - execution = ENGINE.start_workflow_execution(WB_NAME, - "create-vms") + execution = ENGINE.start_workflow_execution(WB_NAME, "create-vms", + CONTEXT) task = db_api.tasks_get(WB_NAME, execution['id'])[0] @@ -71,8 +73,8 @@ class TestScalableEngine(test_base.DbTestCase): @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value="result")) def test_engine_multiple_tasks(self): - execution = ENGINE.start_workflow_execution(WB_NAME, - "backup-vms") + execution = ENGINE.start_workflow_execution(WB_NAME, "backup-vms", + CONTEXT) tasks = db_api.tasks_get(WB_NAME, execution['id']) diff --git a/mistral/utils/yaql_utils.py b/mistral/utils/yaql_utils.py index 78f4159df..a19ad9e03 100644 --- a/mistral/utils/yaql_utils.py +++ b/mistral/utils/yaql_utils.py @@ -18,5 +18,4 @@ import yaql def evaluate(expression_str, data): - expression = yaql.parse(expression_str) - return expression.evaluate(data) + return yaql.parse(expression_str).evaluate(data) diff --git a/tools/lintstack.py b/tools/lintstack.py index 8b70c0db4..dbf9e8d3c 100755 --- a/tools/lintstack.py +++ b/tools/lintstack.py @@ -31,11 +31,11 @@ from pylint.reporters import text # Note(maoy): E1103 is error code related to partial type inference ignore_codes = ["E1103"] # Note(maoy): the error message is the pattern of E0202. It should be ignored -# for savanna.tests modules -ignore_messages = ["An attribute affected in savanna.tests"] +# for mistral.tests modules +ignore_messages = ["An attribute affected in mistral.tests"] # We ignore all errors in openstack.common because it should be checked # elsewhere. -ignore_modules = ["savanna/openstack/common/"] +ignore_modules = ["mistral/openstack/common/"] KNOWN_PYLINT_EXCEPTIONS_FILE = "tools/pylint_exceptions" @@ -130,7 +130,7 @@ class ErrorKeys(object): def run_pylint(): buff = StringIO.StringIO() reporter = text.ParseableTextReporter(output=buff) - args = ["--include-ids=y", "-E", "savanna"] + args = ["--include-ids=y", "-E", "mistral"] lint.Run(args, reporter=reporter, exit=False) val = buff.getvalue() buff.close()