diff --git a/mistral/engine/abstract_engine.py b/mistral/engine/abstract_engine.py index 250cf242..3fc5a4eb 100644 --- a/mistral/engine/abstract_engine.py +++ b/mistral/engine/abstract_engine.py @@ -22,14 +22,11 @@ from mistral import dsl from mistral import exceptions as exc from mistral.engine import states from mistral.engine import workflow +from mistral.engine import data_flow LOG = logging.getLogger(__name__) -# TODO(rakhmerov): Upcoming Data Flow changes: -# 1. Calculate "in_context" for all the tasks submitted for execution. -# 2. Transfer "in_context" along with task data over AMQP. - class AbstractEngine(object): @classmethod @@ -41,10 +38,10 @@ class AbstractEngine(object): def start_workflow_execution(cls, workbook_name, task_name, context): db_api.start_tx() - wb_dsl = cls._get_wb_dsl(workbook_name) - # Persist execution and tasks in DB. try: + wb_dsl = cls._get_wb_dsl(workbook_name) + execution = cls._create_execution(workbook_name, task_name, context) @@ -52,9 +49,14 @@ class AbstractEngine(object): tasks = cls._create_tasks( workflow.find_workflow_tasks(wb_dsl, task_name), wb_dsl, - workbook_name, execution['id'] + workbook_name, + execution['id'] ) + tasks_to_start = workflow.find_resolved_tasks(tasks) + + data_flow.prepare_tasks(tasks_to_start, context) + db_api.commit_tx() except Exception as e: raise exc.EngineException("Failed to create necessary DB objects:" @@ -62,10 +64,7 @@ class AbstractEngine(object): finally: db_api.end_tx() - # TODO(rakhmerov): This doesn't look correct anymore, we shouldn't - # start tasks which don't have dependencies but are reachable only - # via direct transitions. - cls._run_tasks(workflow.find_resolved_tasks(tasks)) + cls._run_tasks(tasks_to_start) return execution @@ -74,31 +73,40 @@ class AbstractEngine(object): task_id, state, result): db_api.start_tx() - wb_dsl = cls._get_wb_dsl(workbook_name) - #TODO(rakhmerov): validate state transition - - # Update task state. - task = db_api.task_update(workbook_name, execution_id, task_id, - {"state": state, "output": result}) - - execution = db_api.execution_get(workbook_name, execution_id) - - cls._create_next_tasks(task, wb_dsl, workbook_name, execution_id) - - # Determine what tasks need to be started. - tasks = db_api.tasks_get(workbook_name, execution_id) - # TODO(nmakhotkin) merge result into context - try: + wb_dsl = cls._get_wb_dsl(workbook_name) + #TODO(rakhmerov): validate state transition + + # Update task state. + task = db_api.task_update(workbook_name, execution_id, task_id, + {"state": state, "output": result}) + + execution = db_api.execution_get(workbook_name, execution_id) + + # Calculate task outbound context. + # TODO(rakhmerov): publish result into context selectively + outbound_context = \ + data_flow.merge_into_context(task['in_context'], result) + + cls._create_next_tasks(task, wb_dsl) + + # Determine what tasks need to be started. + tasks = db_api.tasks_get(workbook_name, execution_id) + new_exec_state = cls._determine_execution_state(execution, tasks) if execution['state'] != new_exec_state: - db_api.execution_update(workbook_name, execution_id, { - "state": new_exec_state - }) + execution = \ + db_api.execution_update(workbook_name, execution_id, { + "state": new_exec_state + }) LOG.info("Changed execution state: %s" % execution) + tasks_to_start = workflow.find_resolved_tasks(tasks) + + data_flow.prepare_tasks(tasks_to_start, outbound_context) + db_api.commit_tx() except Exception as e: raise exc.EngineException("Failed to create necessary DB objects:" @@ -109,8 +117,8 @@ class AbstractEngine(object): if states.is_stopped_or_finished(execution["state"]): return task - if tasks: - cls._run_tasks(workflow.find_resolved_tasks(tasks)) + if tasks_to_start: + cls._run_tasks(tasks_to_start) return task @@ -149,11 +157,11 @@ class AbstractEngine(object): }) @classmethod - def _create_next_tasks(cls, task, wb_dsl, workbook_name, execution_id): + def _create_next_tasks(cls, task, wb_dsl): dsl_tasks = workflow.find_tasks_after_completion(task, wb_dsl) - tasks = cls._create_tasks(dsl_tasks, wb_dsl, workbook_name, - execution_id) + tasks = cls._create_tasks(dsl_tasks, wb_dsl, task['workbook_name'], + task['execution_id']) return workflow.find_resolved_tasks(tasks) diff --git a/mistral/engine/actions/action_factory.py b/mistral/engine/actions/action_factory.py index b8bd384a..7ae03816 100644 --- a/mistral/engine/actions/action_factory.py +++ b/mistral/engine/actions/action_factory.py @@ -34,6 +34,7 @@ def create_action(task): def _get_mapping(): return { + action_types.ECHO: get_echo_action, action_types.REST_API: get_rest_action, action_types.MISTRAL_REST_API: get_mistral_rest_action, action_types.OSLO_RPC: get_amqp_action, @@ -48,6 +49,15 @@ def _find_action_result_helper(task, action): return {} +def get_echo_action(task): + action_type = a_h.get_action_type(task) + action_name = task['task_dsl']['action'].split(':')[1] + + output = task['service_dsl']['actions'][action_name].get('output', {}) + + return actions.EchoAction(action_type, action_name, output=output) + + def get_rest_action(task): action_type = a_h.get_action_type(task) action_name = task['task_dsl']['action'].split(':')[1] @@ -63,7 +73,7 @@ def get_rest_action(task): method = action_dsl['parameters'].get('method', "GET") # input_yaql = task.get('input') - # TODO(nmakhotkin) extract input from context within the YAQL expression + # TODO(nmakhotkin) extract input from context with the YAQL expression task_input = {} # expressions.evaluate(input_expr, ctx) task_data = {} diff --git a/mistral/engine/actions/action_helper.py b/mistral/engine/actions/action_helper.py index 18fe40cb..620f5f4b 100644 --- a/mistral/engine/actions/action_helper.py +++ b/mistral/engine/actions/action_helper.py @@ -15,9 +15,6 @@ # limitations under the License. from mistral.engine.actions import action_types as a_t -from mistral import exceptions as exc -from mistral.engine import states -from mistral.engine import expressions as expr def get_action_type(task): @@ -26,19 +23,3 @@ def get_action_type(task): def is_task_synchronous(task): return get_action_type(task) != a_t.MISTRAL_REST_API - - -def extract_state_result(action, action_result): - # All non-Mistral tasks are sync-auto because service doesn't know - # about Mistral and we need to receive the result immediately. - if action.type != a_t.MISTRAL_REST_API: - if action.result_helper.get('select'): - result = expr.evaluate(action.result_helper['select'], - action_result) - else: - result = action_result - # TODO(nmakhotkin) get state for other actions - state = states.get_state_by_http_status_code(action.status) - return state, result - raise exc.InvalidActionException("Error. Wrong type of action to " - "retrieve the result") diff --git a/mistral/engine/actions/action_types.py b/mistral/engine/actions/action_types.py index 16682b6f..fc466005 100644 --- a/mistral/engine/actions/action_types.py +++ b/mistral/engine/actions/action_types.py @@ -17,12 +17,13 @@ """Valid action types.""" +ECHO = 'ECHO' REST_API = 'REST_API' OSLO_RPC = 'OSLO_RPC' MISTRAL_REST_API = 'MISTRAL_REST_API' SEND_EMAIL = "SEND_EMAIL" -_ALL = [REST_API, OSLO_RPC, MISTRAL_REST_API, SEND_EMAIL] +_ALL = [ECHO, REST_API, OSLO_RPC, MISTRAL_REST_API, SEND_EMAIL] def is_valid(action_type): diff --git a/mistral/engine/actions/actions.py b/mistral/engine/actions/actions.py index 18f91d30..c0ea44f2 100644 --- a/mistral/engine/actions/actions.py +++ b/mistral/engine/actions/actions.py @@ -14,36 +14,61 @@ # See the License for the specific language governing permissions and # limitations under the License. +#TODO(dzimine):separate actions across different files/modules + +import abc +from email.mime.text import MIMEText +import smtplib + from amqplib import client_0_8 as amqp import requests -#TODO(dzimine):separate actions across different files/modules -import smtplib -from email.mime.text import MIMEText from mistral.openstack.common import log as logging +from mistral import exceptions as exc LOG = logging.getLogger(__name__) -class BaseAction(object): +class Action(object): status = None def __init__(self, action_type, action_name): self.type = action_type self.name = action_name - # Result_helper is a dict for retrieving result within YAQL expression - # and it belongs to action (for defining this attribute immediately - # at action creation). - self.result_helper = {} - + @abc.abstractmethod def run(self): + """Run action logic. + + :return: result of the action. Note that for asynchronous actions + it will always be None. + + In case if action failed this method must throw a ActionException + to indicate that. + """ pass -class RestAction(BaseAction): +class EchoAction(Action): + """Echo action. + This action just returns a configured value as a result without doing + anything else. The value of such action implementation is that it + can be used in development (for testing), demonstration and designing + of workflows themselves where echo action can play the role of temporary + stub. + """ + + def __init__(self, action_type, action_name, output): + super(EchoAction, self).__init__(action_type, action_name) + self.output = output + + def run(self): + return self.output + + +class RestAction(Action): def __init__(self, action_type, action_name, url, params={}, method="GET", headers={}, data={}): super(RestAction, self).__init__(action_type, action_name) @@ -57,20 +82,32 @@ class RestAction(BaseAction): LOG.info("Sending action HTTP request " "[method=%s, url=%s, params=%s, headers=%s]" % (self.method, self.url, self.params, self.headers)) - resp = requests.request(self.method, self.url, params=self.params, - headers=self.headers, data=self.data) + + try: + resp = requests.request(self.method, + self.url, + params=self.params, + headers=self.headers, + data=self.data) + except Exception as e: + raise exc.ActionException("Failed to send HTTP request: %s" % e) + LOG.info("Received HTTP response:\n%s\n%s" % (resp.status_code, resp.content)) + + # TODO(rakhmerov):Here we need to apply logic related with + # extracting a result as configured in DSL. + # Return rather json than text, but response can contain text also. self.status = resp.status_code try: return resp.json() except: - LOG.debug("HTTP response content is not json") + LOG.debug("HTTP response content is not json.") return resp.content -class OsloRPCAction(BaseAction): +class OsloRPCAction(Action): def __init__(self, action_type, action_name, host, userid, password, virtual_host, message, routing_key=None, port=5672, exchange=None, queue_name=None): @@ -116,7 +153,7 @@ class OsloRPCAction(BaseAction): self.status = None -class SendEmailAction(BaseAction): +class SendEmailAction(Action): def __init__(self, action_type, action_name, params, settings): super(SendEmailAction, self).__init__(action_type, action_name) #TODO(dzimine): validate parameters @@ -143,8 +180,10 @@ class SendEmailAction(BaseAction): message['Subject'] = self.subject message['From'] = self.sender message['To'] = self.to + try: s = smtplib.SMTP(self.smtp_server) + if self.password is not None: # Sequence to request TLS connection and log in (RFC-2487). s.ehlo() @@ -156,7 +195,5 @@ class SendEmailAction(BaseAction): to_addrs=self.to, msg=message.as_string()) except (smtplib.SMTPException, IOError) as e: - LOG.error("Error sending email message: %s" % e) - #NOTE(DZ): Raise Misral exception instead re-throwing SMTP? - # For now just logging the error here and re-thorw the original - raise + raise exc.ActionException("Failed to send an email message: %s" + % e) diff --git a/mistral/engine/data_flow.py b/mistral/engine/data_flow.py new file mode 100644 index 00000000..ad9b73f4 --- /dev/null +++ b/mistral/engine/data_flow.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.db import api as db_api +from mistral.engine import expressions as expr + +from mistral.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +def evaluate_task_input(task, context): + res = {} + + params = task['task_dsl'].get('input', {}) + + if not params: + return res + + for name, val in params.iteritems(): + if expr.is_expression(val): + res[name] = expr.evaluate(val, context) + else: + res[name] = val + + return res + + +def prepare_tasks(tasks, context): + for task in tasks: + # TODO(rakhmerov): Inbound context should be a merge of outbound + # contexts of task dependencies, if any. + task['in_context'] = context + task['input'] = evaluate_task_input(task, context) + + db_api.task_update(task['workbook_name'], + task['execution_id'], + task['id'], + {'in_context': task['in_context'], + 'input': task['input']}) + + +def merge_into_context(context, values): + if not context: + return None + + # TODO(rakhmerov): Take care of nested variables. + context.update(values) + + return context diff --git a/mistral/engine/expressions.py b/mistral/engine/expressions.py index 2aa19d4b..579de311 100644 --- a/mistral/engine/expressions.py +++ b/mistral/engine/expressions.py @@ -53,5 +53,10 @@ class YAQLEvaluator(Evaluator): _EVALUATOR = YAQLEvaluator() +def is_expression(s): + # TODO(rakhmerov): It should be generalized since it may not be YAQL. + return s and s.startswith('$.') + + def evaluate(expression, context): return _EVALUATOR.evaluate(expression, context) diff --git a/mistral/engine/local/engine.py b/mistral/engine/local/engine.py index cb563634..8f8c8b4c 100644 --- a/mistral/engine/local/engine.py +++ b/mistral/engine/local/engine.py @@ -35,26 +35,32 @@ class LocalEngine(abs_eng.AbstractEngine): @classmethod def _run_task(cls, task): action = a_f.create_action(task) + LOG.info("Task is started - %s" % task['name']) - db_api.task_update(task['workbook_name'], task['execution_id'], - task['id'], {'state': states.RUNNING}) + if a_h.is_task_synchronous(task): - # In case of sync execution we run task - # and change state right after that. - action_result = action.run() - state, result = a_h.extract_state_result(action, action_result) - # TODO(nmakhotkin) save the result in the context with key - # action.result_helper['store_as'] + try: + state, result = states.SUCCESS, action.run() + except exc.ActionException: + state, result = states.ERROR, None - if states.is_valid(state): - return cls.convey_task_result(task['workbook_name'], - task['execution_id'], - task['id'], state, result) - else: - raise exc.EngineException("Action has returned invalid " - "state: %s" % state) + cls.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], + state, result) + else: + try: + action.run() - return action.run() + db_api.task_update(task['workbook_name'], + task['execution_id'], + task['id'], + {'state': states.RUNNING}) + except exc.ActionException: + cls.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], + states.ERROR, None) def get_engine(): diff --git a/mistral/engine/scalable/executor/executor.py b/mistral/engine/scalable/executor/executor.py index 1a4fd620..6c0f1d8a 100644 --- a/mistral/engine/scalable/executor/executor.py +++ b/mistral/engine/scalable/executor/executor.py @@ -27,29 +27,36 @@ from mistral.engine.actions import action_helper as a_h LOG = logging.getLogger(__name__) -# TODO(rakhmerov): Upcoming Data Flow changes: -# 1. Receive "in_context" along with task data. -# 2. Apply task input expression to "in_context" and calculate "input". - def do_task_action(task): LOG.info("Starting task action [task_id=%s, action='%s', service='%s'" % (task['id'], task['task_dsl']['action'], task['service_dsl'])) - action = a_f.create_action(task) - if a_h.is_task_synchronous(task): - action_result = action.run() - state, result = a_h.extract_state_result(action, action_result) - # TODO(nmakhotkin) save the result in the context with key - # action.result_helper['store_as'] - if states.is_valid(state): - return engine.convey_task_result(task['workbook_name'], - task['execution_id'], - task['id'], state, result) - else: - raise exc.EngineException("Action has returned invalid " - "state: %s" % state) - action.run() + action = a_f.create_action(task) + + if a_h.is_task_synchronous(task): + try: + state, result = states.SUCCESS, action.run() + except exc.ActionException: + state, result = states.ERROR, None + + engine.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], + state, result) + else: + try: + action.run() + + db_api.task_update(task['workbook_name'], + task['execution_id'], + task['id'], + {'state': states.RUNNING}) + except exc.ActionException: + engine.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], + states.ERROR, None) def handle_task_error(task, exception): diff --git a/mistral/engine/workflow.py b/mistral/engine/workflow.py index 687d2fe6..02c89471 100644 --- a/mistral/engine/workflow.py +++ b/mistral/engine/workflow.py @@ -89,10 +89,13 @@ def find_tasks_after_completion(task, wb_dsl): found_tasks += _get_tasks_to_schedule(tasks_on_finish, wb_dsl) LOG.debug("Found tasks: %s" % found_tasks) + workflow_tasks = [] for t in found_tasks: workflow_tasks += find_workflow_tasks(wb_dsl, t['name']) + LOG.debug("Workflow tasks to schedule: %s" % workflow_tasks) + return workflow_tasks diff --git a/mistral/exceptions.py b/mistral/exceptions.py index b0333de8..0bc40b01 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -59,6 +59,15 @@ class DBDuplicateEntry(MistralException): self.message = message +class ActionException(MistralException): + code = "ACTION_ERROR" + + def __init__(self, message=None): + super(MistralException, self).__init__(message) + if message: + self.message = message + + class EngineException(MistralException): code = "ENGINE_ERROR" diff --git a/mistral/tests/api/base.py b/mistral/tests/api/base.py index 2660e503..b4853218 100644 --- a/mistral/tests/api/base.py +++ b/mistral/tests/api/base.py @@ -21,7 +21,7 @@ from webtest.app import AppError from oslo.config import cfg from mistral.openstack.common import importutils -from mistral.tests.unit import base as test_base +from mistral.tests import base # We need to make sure that all configuration properties are registered. importutils.import_module("mistral.config") @@ -34,7 +34,7 @@ cfg.CONF.register_opt(cfg.BoolOpt('auth_enable', default=False), group='pecan') __all__ = ['FunctionalTest'] -class FunctionalTest(test_base.DbTestCase): +class FunctionalTest(base.DbTestCase): """Used for functional tests where you need to test your literal application and its integration with the framework. """ diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 9c4074ca..66fb37fe 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -15,10 +15,25 @@ # limitations under the License. import unittest2 +import pkg_resources as pkg +import os +import tempfile + +from mistral import version +from mistral.db.sqlalchemy import api as db_api +from mistral.openstack.common.db.sqlalchemy import session + + +RESOURCES_PATH = 'tests/resources/' + + +def get_resource(resource_name): + return open(pkg.resource_filename( + version.version_info.package, + RESOURCES_PATH + resource_name)).read() class BaseTest(unittest2.TestCase): - def setUp(self): super(BaseTest, self).setUp() @@ -28,3 +43,19 @@ class BaseTest(unittest2.TestCase): super(BaseTest, self).tearDown() # TODO: add whatever is needed for all Mistral tests in here + + +class DbTestCase(BaseTest): + + def setUp(self): + self.db_fd, self.db_path = tempfile.mkstemp() + session.set_defaults('sqlite:///' + self.db_path, self.db_path) + db_api.setup_db() + + def tearDown(self): + db_api.drop_db() + os.close(self.db_fd) + os.unlink(self.db_path) + + def is_db_session_open(self): + return db_api._get_thread_local_session() is not None diff --git a/mistral/tests/resources/data_flow/two_dependent_tasks.yaml b/mistral/tests/resources/data_flow/two_dependent_tasks.yaml new file mode 100644 index 00000000..0592e49c --- /dev/null +++ b/mistral/tests/resources/data_flow/two_dependent_tasks.yaml @@ -0,0 +1,39 @@ +Services: + MyService: + type: ECHO + actions: + build_full_name: + output: + full_name: John Doe + build_greeting: + output: + greeting: Hello, John Doe! + + +Workflow: + # context = { + # 'person': { + # 'first_name': 'John', + # 'last_name': 'Doe', + # 'address': { + # 'street': '124352 Broadway Street', + # 'city': 'Gloomington', + # 'country': 'USA' + # } + # } + # } + + tasks: + build_full_name: + action: MyService:build_full_name + input: + first_name: $.person.first_name + last_name: $.person.last_name + output: full_name + + build_greeting: + requires: [build_full_name] + action: MyService:build_greeting + input: + full_name: $.full_name + output: greeting diff --git a/mistral/tests/unit/db/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/test_sqlalchemy_db_api.py index d23a1793..2a9b609a 100644 --- a/mistral/tests/unit/db/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/test_sqlalchemy_db_api.py @@ -17,7 +17,7 @@ from mistral.openstack.common import timeutils from mistral.db.sqlalchemy import api as db_api -from mistral.tests.unit import base as test_base +from mistral.tests import base as test_base EVENTS = [ diff --git a/mistral/tests/unit/engine/actions/test_action_factory.py b/mistral/tests/unit/engine/actions/test_action_factory.py index 9eb49563..3c0adabc 100644 --- a/mistral/tests/unit/engine/actions/test_action_factory.py +++ b/mistral/tests/unit/engine/actions/test_action_factory.py @@ -14,15 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import mock import unittest2 -from mistral.engine.actions import actions from mistral.engine.actions import action_factory -from mistral.engine.actions import action_helper from mistral.engine.actions import action_types -from mistral.engine import states SAMPLE_TASK = { @@ -126,31 +121,3 @@ class ActionFactoryTest(unittest2.TestCase): self.assertIn(email, action.to) self.assertEqual(task['service_dsl']['parameters']['smtp_server'], action.smtp_server) - - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value=SAMPLE_RESULT)) - def test_action_result_with_results(self): - task = copy.deepcopy(SAMPLE_TASK) - task['service_dsl'].update({'type': action_types.REST_API}) - create_vm = task['service_dsl']['actions']['create-vm'] - create_vm.update(SAMPLE_RESULT_HELPER) - action = action_factory.create_action(task) - action_result = action.run() - action.status = 200 - state, result = action_helper.extract_state_result(action, - action_result) - self.assertEqual(state, states.SUCCESS) - self.assertEqual(result, SAMPLE_RESULT['server']['id']) - - @mock.patch.object(actions.RestAction, "run", - mock.MagicMock(return_value=SAMPLE_RESULT)) - def test_action_result_without_results(self): - task = copy.deepcopy(SAMPLE_TASK) - task['service_dsl'].update({'type': action_types.REST_API}) - action = action_factory.create_action(task) - action_result = action.run() - action.status = 200 - state, result = action_helper.extract_state_result(action, - action_result) - self.assertEqual(state, states.SUCCESS) - self.assertEqual(result, SAMPLE_RESULT) diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/engine/actions/test_fake_action.py similarity index 50% rename from mistral/tests/unit/base.py rename to mistral/tests/unit/engine/actions/test_fake_action.py index e539c1ca..7a60fa7a 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/engine/actions/test_fake_action.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright 2013 - Mirantis, Inc. +# Copyright 2013 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,27 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +from mistral.engine.actions import actions +from mistral.engine.actions import action_types -import os -import tempfile - -import unittest2 - -from mistral.db.sqlalchemy import api as db_api -from mistral.openstack.common.db.sqlalchemy import session +from mistral.tests import base -class DbTestCase(unittest2.TestCase): +class FakeActionTest(base.BaseTest): - def setUp(self): - self.db_fd, self.db_path = tempfile.mkstemp() - session.set_defaults('sqlite:///' + self.db_path, self.db_path) - db_api.setup_db() + def test_send_email_real(self): + expected = "my output" - def tearDown(self): - db_api.drop_db() - os.close(self.db_fd) - os.unlink(self.db_path) + action = actions.EchoAction(action_types.ECHO, "test", output=expected) - def is_db_session_open(self): - return db_api._get_thread_local_session() is not None + self.assertEqual(action.run(), expected) diff --git a/mistral/tests/unit/engine/actions/test_send_email_action.py b/mistral/tests/unit/engine/actions/test_send_email_action.py index b9e492b4..9cf4e9e1 100644 --- a/mistral/tests/unit/engine/actions/test_send_email_action.py +++ b/mistral/tests/unit/engine/actions/test_send_email_action.py @@ -22,6 +22,7 @@ from email.parser import Parser from mistral.engine.actions import actions from mistral.engine.actions import action_types +from mistral import exceptions as exc ACTION_TYPE = action_types.SEND_EMAIL ACTION_NAME = "TEMPORARY" @@ -117,9 +118,7 @@ class SendEmailActionTest(unittest2.TestCase): ACTION_TYPE, ACTION_NAME, self.params, self.settings) try: action.run() - except IOError: + except exc.ActionException: pass else: self.assertFalse("Must throw exception") - - self.assertTrue(log.error.called) diff --git a/mistral/tests/unit/engine/local/test_engine.py b/mistral/tests/unit/engine/local/test_engine.py index 06075d4d..67956cc4 100644 --- a/mistral/tests/unit/engine/local/test_engine.py +++ b/mistral/tests/unit/engine/local/test_engine.py @@ -14,35 +14,26 @@ # limitations under the License. import mock -import pkg_resources as pkg from mistral.db import api as db_api from mistral.engine.actions import actions from mistral.engine.local import engine from mistral.engine import states -from mistral import version -from mistral.tests.unit import base as test_base +from mistral.tests import base ENGINE = engine.get_engine() -CFG_PREFIX = "tests/resources/" WB_NAME = "my_workbook" CONTEXT = None # TODO(rakhmerov): Use a meaningful value. #TODO(rakhmerov): add more tests for errors, execution stop etc. -def get_cfg(cfg_suffix): - return open(pkg.resource_filename( - version.version_info.package, - CFG_PREFIX + cfg_suffix)).read() - - -class TestLocalEngine(test_base.DbTestCase): +class TestLocalEngine(base.DbTestCase): @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.RUNNING})) @@ -63,7 +54,7 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.RUNNING})) @@ -105,7 +96,7 @@ class TestLocalEngine(test_base.DbTestCase): mock.MagicMock(return_value={'state': states.SUCCESS})) @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(states, "get_state_by_http_status_code", mock.MagicMock(return_value=states.SUCCESS)) @@ -121,7 +112,7 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.SUCCESS})) @@ -172,7 +163,7 @@ class TestLocalEngine(test_base.DbTestCase): @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value={'state': states.SUCCESS})) diff --git a/mistral/tests/unit/engine/scalable/test_engine.py b/mistral/tests/unit/engine/scalable/test_engine.py index 886b9372..ccbff4c7 100644 --- a/mistral/tests/unit/engine/scalable/test_engine.py +++ b/mistral/tests/unit/engine/scalable/test_engine.py @@ -14,19 +14,16 @@ # limitations under the License. import mock -import pkg_resources as pkg from mistral.db import api as db_api from mistral.engine.actions import actions from mistral.engine.scalable import engine from mistral.engine import states -from mistral import version -from mistral.tests.unit import base as test_base +from mistral.tests import base ENGINE = engine.get_engine() -CFG_PREFIX = "tests/resources/" WB_NAME = "my_workbook" CONTEXT = None # TODO(rakhmerov): Use a meaningful value. @@ -34,18 +31,12 @@ CONTEXT = None # TODO(rakhmerov): Use a meaningful value. #TODO(rakhmerov): add more tests for errors, execution stop etc. -def get_cfg(cfg_suffix): - return open(pkg.resource_filename( - version.version_info.package, - CFG_PREFIX + cfg_suffix)).read() - - -class TestScalableEngine(test_base.DbTestCase): +class TestScalableEngine(base.DbTestCase): @mock.patch.object(engine.ScalableEngine, "_notify_task_executors", mock.MagicMock(return_value="")) @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value="result")) @@ -68,7 +59,7 @@ class TestScalableEngine(test_base.DbTestCase): mock.MagicMock(return_value="")) @mock.patch.object(db_api, "workbook_get", mock.MagicMock(return_value={ - 'definition': get_cfg("test_rest.yaml") + 'definition': base.get_resource("test_rest.yaml") })) @mock.patch.object(actions.RestAction, "run", mock.MagicMock(return_value="result")) diff --git a/mistral/tests/unit/engine/scalable/test_executor.py b/mistral/tests/unit/engine/scalable/test_executor.py index e9221d01..0e4e7b27 100644 --- a/mistral/tests/unit/engine/scalable/test_executor.py +++ b/mistral/tests/unit/engine/scalable/test_executor.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mistral.tests.unit import base as test_base +from mistral.tests import base -class TestTaskExecutor(test_base.DbTestCase): +class TestTaskExecutor(base.DbTestCase): def setUp(self): super(TestTaskExecutor, self).setUp() self.wb_name = "my_workbook" diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py new file mode 100644 index 00000000..416872bd --- /dev/null +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.openstack.common import log as logging +from mistral.tests import base +from mistral.db import api as db_api +from mistral.engine.local import engine +from mistral.engine import states + +LOG = logging.getLogger(__name__) + +ENGINE = engine.get_engine()() + +CONTEXT = { + 'person': { + 'first_name': 'John', + 'last_name': 'Doe', + 'address': { + 'street': '124352 Broadway Street', + 'city': 'Gloomington', + 'country': 'USA' + } + } +} + + +def create_workbook(definition_path): + return db_api.workbook_create({ + 'name': 'my_workbook', + 'definition': base.get_resource(definition_path) + }) + + +class DataFlowTest(base.DbTestCase): + def test_two_dependent_tasks(self): + wb = create_workbook('data_flow/two_dependent_tasks.yaml') + + execution = ENGINE.start_workflow_execution(wb['name'], + 'build_greeting', + CONTEXT) + + # We have to reread execution to get its latest version. + execution = db_api.execution_get(execution['workbook_name'], + execution['id']) + + self.assertEqual(execution['state'], states.SUCCESS) + self.assertDictEqual(execution['context'], CONTEXT) + + tasks = db_api.tasks_get(wb['name'], execution['id']) + + self.assertEqual(2, len(tasks)) + + if tasks[0]['name'] == 'build_full_name': + build_full_name_task = tasks[0] + build_greeting_task = tasks[1] + else: + build_full_name_task = tasks[1] + build_greeting_task = tasks[0] + + self.assertEqual(build_full_name_task['name'], 'build_full_name') + self.assertEqual(build_greeting_task['name'], 'build_greeting') + + # Check the first task. + self.assertEqual(states.SUCCESS, build_full_name_task['state']) + self.assertDictEqual(CONTEXT, build_full_name_task['in_context']) + self.assertDictEqual({'first_name': 'John', 'last_name': 'Doe'}, + build_full_name_task['input']) + self.assertDictEqual({'full_name': 'John Doe'}, + build_full_name_task['output']) + + # Check the second task. + in_context = CONTEXT.copy() + in_context['full_name'] = 'John Doe' + + self.assertEqual(states.SUCCESS, build_greeting_task['state']) + self.assertDictEqual(in_context, build_greeting_task['in_context']) + self.assertDictEqual({'full_name': 'John Doe'}, + build_greeting_task['input']) + self.assertDictEqual({'greeting': 'Hello, John Doe!'}, + build_greeting_task['output']) + + # TODO(rakhmerov): add more checks diff --git a/mistral/tests/unit/engine/test_data_flow_module.py b/mistral/tests/unit/engine/test_data_flow_module.py new file mode 100644 index 00000000..a21ce531 --- /dev/null +++ b/mistral/tests/unit/engine/test_data_flow_module.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.engine import data_flow +from mistral.tests import base +from mistral.db import api as db_api + +from mistral.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +WB_NAME = 'my_workbook' +EXEC_ID = '1' + +CONTEXT = { + 'param1': 'val1', + 'param2': 'val2', + 'param3': { + 'param31': 'val31', + 'param32': 'val32' + } +} + +TASK = { + 'workbook_name': WB_NAME, + 'execution_id': EXEC_ID, + 'name': 'my_task', + 'task_dsl': { + 'input': { + 'p1': 'My string', + 'p2': '$.param3.param32' + } + } +} + + +class DataFlowTest(base.DbTestCase): + def test_prepare_task_input(self): + input = data_flow.evaluate_task_input(TASK, CONTEXT) + + self.assertEqual(len(input), 2) + self.assertEqual(input['p1'], 'My string') + self.assertEqual(input['p2'], 'val32') + + def test_prepare_tasks(self): + task = db_api.task_create(WB_NAME, EXEC_ID, TASK.copy()) + tasks = [task] + + data_flow.prepare_tasks(tasks, CONTEXT) + + db_task = db_api.task_get(WB_NAME, EXEC_ID, tasks[0]['id']) + + self.assertDictEqual(db_task['in_context'], CONTEXT) + self.assertDictEqual(db_task['input'], { + 'p1': 'My string', + 'p2': 'val32' + }) + + def test_merge_into_context(self): + ctx = data_flow.merge_into_context(CONTEXT.copy(), + {'new_key1': 'new_val1'}) + + self.assertEqual(ctx['new_key1'], 'new_val1') diff --git a/mistral/tests/unit/engine/test_workflow.py b/mistral/tests/unit/engine/test_workflow.py index a45676ee..4fb97d4e 100644 --- a/mistral/tests/unit/engine/test_workflow.py +++ b/mistral/tests/unit/engine/test_workflow.py @@ -18,7 +18,7 @@ import pkg_resources as pkg from mistral import dsl from mistral import version -from mistral.tests.unit import base +from mistral.tests import base from mistral.engine import states from mistral.engine import workflow diff --git a/mistral/tests/unit/events/test_events.py b/mistral/tests/unit/events/test_events.py index c03b8edd..20e5d52e 100644 --- a/mistral/tests/unit/events/test_events.py +++ b/mistral/tests/unit/events/test_events.py @@ -18,7 +18,7 @@ import pkg_resources as pkg from mistral.db import api as db_api from mistral import dsl -from mistral.tests.unit import base +from mistral.tests import base from mistral import version from mistral.services import scheduler diff --git a/mistral/tests/unit/test_scheduler.py b/mistral/tests/unit/test_scheduler.py index 8bf9039d..79e37264 100644 --- a/mistral/tests/unit/test_scheduler.py +++ b/mistral/tests/unit/test_scheduler.py @@ -20,7 +20,7 @@ from datetime import timedelta from mistral.openstack.common import timeutils from mistral.services import scheduler as s -from mistral.tests.unit import base as test_base +from mistral.tests import base as test_base SAMPLE_EVENT = {