diff --git a/mistral/db/sqlalchemy/models.py b/mistral/db/sqlalchemy/models.py index 48ac3ffd5..49b681439 100644 --- a/mistral/db/sqlalchemy/models.py +++ b/mistral/db/sqlalchemy/models.py @@ -91,8 +91,8 @@ class Task(mb.MistralBase): workbook_name = sa.Column(sa.String(80)) execution_id = sa.Column(sa.String(36)) description = sa.Column(sa.String()) - task_dsl = sa.Column(st.JsonDictType()) - service_dsl = sa.Column(st.JsonDictType()) + task_spec = sa.Column(st.JsonDictType()) + service_spec = sa.Column(st.JsonDictType()) state = sa.Column(sa.String(20)) tags = sa.Column(st.JsonListType()) diff --git a/mistral/dsl.py b/mistral/dsl.py deleted file mode 100644 index 421df1354..000000000 --- a/mistral/dsl.py +++ /dev/null @@ -1,116 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2013 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import yaml -from yaml import error - - -class Parser(object): - """Mistral DSL parser. - - Loads a workbook definition in YAML format as described in Mistral DSL - specification and provides various methods to access DSL entities like - tasks and actions in a form of dictionary. - """ - def __init__(self, workbook_definition): - try: - self.doc = yaml.safe_load(workbook_definition) - except error.YAMLError as exc: - raise RuntimeError("Definition could not be parsed: %s\n" % exc) - - def get_services(self): - services = [] - for service_name in self.doc.get("Services", []): - services.append(self.doc["Services"][service_name]) - return services - - def get_service(self, service_name): - return self.doc["Services"].get(service_name, {}) - - def get_events(self): - events_from_doc = self.doc["Workflow"].get("events", None) - if not events_from_doc: - return [] - events = [] - for name in events_from_doc: - event_dict = {'name': name} - event_dict.update(events_from_doc[name]) - events.append(event_dict) - return events - - def get_tasks(self): - tasks = self.doc.get("Workflow", {}).get("tasks", {}) - - for _, task_dsl in tasks.iteritems(): - task_dsl["service_name"] = task_dsl["action"].split(':')[0] - req = task_dsl.get("requires") - if req and isinstance(req, list): - task_dsl["requires"] = dict(zip(req, ['']*len(req))) - - return tasks - - def get_task(self, task_name): - task = self.get_tasks().get(task_name, {}) - if task: - task['name'] = task_name - return task - - def get_task_dsl_property(self, task_name, property_name): - task_dsl = self.get_task(task_name) - return task_dsl.get(property_name) - - def get_task_on_error(self, task_name): - task = self.get_task_dsl_property(task_name, "on-error") - if task: - return task if isinstance(task, dict) else {task: ''} - return None - - def get_task_on_success(self, task_name): - task = self.get_task_dsl_property(task_name, "on-success") - if task: - return task if isinstance(task, dict) else {task: ''} - return None - - def get_task_on_finish(self, task_name): - task = self.get_task_dsl_property(task_name, "on-finish") - if task: - return task if isinstance(task, dict) else {task: ''} - return None - - def get_task_input(self, task_name): - return self.get_task_dsl_property(task_name, "input") - - def get_action(self, task_action_name): - if task_action_name.find(":") == -1: - return {} - service_name = task_action_name.split(':')[0] - action_name = task_action_name.split(':')[1] - action = self.get_service(service_name)['actions'][action_name] - return action - - def get_actions(self, service_name): - service = self.get_service(service_name) - return service.get('actions', []) - - def get_service_names(self): - names = [] - for name in self.doc['Services']: - names.append(name) - return names - - def get_event_task_name(self, event_name): - event = self.doc["Workflow"]["events"].get(event_name) - return event.get('tasks') if event else "" diff --git a/mistral/dsl_parser.py b/mistral/dsl_parser.py new file mode 100644 index 000000000..b7408feb9 --- /dev/null +++ b/mistral/dsl_parser.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import yaml +from yaml import error +from mistral.workbook import workbook + + +def parse(workbook_definition): + """Loads a workbook definition in YAML format as dictionary object.""" + try: + return yaml.safe_load(workbook_definition) + except error.YAMLError as exc: + raise RuntimeError("Definition could not be parsed: %s\n" % exc) + + +def get_workbook(workbook_definition): + return workbook.WorkbookSpec(parse(workbook_definition)) diff --git a/mistral/engine/abstract_engine.py b/mistral/engine/abstract_engine.py index 3746b9101..9e4c48078 100644 --- a/mistral/engine/abstract_engine.py +++ b/mistral/engine/abstract_engine.py @@ -18,7 +18,7 @@ import abc from mistral.openstack.common import log as logging from mistral.db import api as db_api -from mistral import dsl +from mistral import dsl_parser as parser from mistral import exceptions as exc from mistral.engine import states from mistral.engine import workflow @@ -38,19 +38,17 @@ class AbstractEngine(object): def start_workflow_execution(cls, workbook_name, task_name, context): db_api.start_tx() + workbook = cls._get_workbook(workbook_name) # Persist execution and tasks in DB. try: - wb_dsl = cls._get_wb_dsl(workbook_name) - execution = cls._create_execution(workbook_name, task_name, context) tasks = cls._create_tasks( - workflow.find_workflow_tasks(wb_dsl, task_name), - wb_dsl, - workbook_name, - execution['id'] + workflow.find_workflow_tasks(workbook, task_name), + workbook, + workbook_name, execution['id'] ) tasks_to_start = workflow.find_resolved_tasks(tasks) @@ -73,10 +71,9 @@ class AbstractEngine(object): task_id, state, result): db_api.start_tx() + workbook = cls._get_workbook(workbook_name) try: - wb_dsl = cls._get_wb_dsl(workbook_name) #TODO(rakhmerov): validate state transition - task = db_api.task_get(workbook_name, execution_id, task_id) task_output = data_flow.get_task_output(task, result) @@ -90,7 +87,7 @@ class AbstractEngine(object): # Calculate task outbound context. outbound_context = data_flow.get_outbound_context(task) - cls._create_next_tasks(task, wb_dsl) + cls._create_next_tasks(task, workbook) # Determine what tasks need to be started. tasks = db_api.tasks_get(workbook_name, execution_id) @@ -159,38 +156,37 @@ class AbstractEngine(object): }) @classmethod - def _create_next_tasks(cls, task, wb_dsl): - dsl_tasks = workflow.find_tasks_after_completion(task, wb_dsl) + def _create_next_tasks(cls, task, workbook): + tasks = workflow.find_tasks_after_completion(task, workbook) - tasks = cls._create_tasks(dsl_tasks, wb_dsl, task['workbook_name'], - task['execution_id']) + db_tasks = cls._create_tasks(tasks, workbook, task['workbook_name'], + task['execution_id']) - return workflow.find_resolved_tasks(tasks) + return workflow.find_resolved_tasks(db_tasks) @classmethod - def _create_tasks(cls, dsl_tasks, wb_dsl, workbook_name, execution_id): + def _create_tasks(cls, task_list, workbook, workbook_name, execution_id): tasks = [] - for dsl_task in dsl_tasks: - task = db_api.task_create(workbook_name, execution_id, { - "name": dsl_task["name"], - "requires": dsl_task.get("requires", {}), - "task_dsl": dsl_task, - "service_dsl": wb_dsl.get_service(dsl_task["service_name"]), + for task in task_list: + db_task = db_api.task_create(workbook_name, execution_id, { + "name": task.name, + "requires": task.requires, + "task_spec": task.to_dict(), + "service_spec": workbook.services.get( + task.get_action_service()).to_dict(), "state": states.IDLE, - "tags": dsl_task.get("tags", None) + "tags": task.get_property("tags", None) }) - tasks.append(task) + tasks.append(db_task) return tasks @classmethod - def _get_wb_dsl(cls, workbook_name): + def _get_workbook(cls, workbook_name): wb = db_api.workbook_get(workbook_name) - wb_dsl = dsl.Parser(wb["definition"]) - - return wb_dsl + return parser.get_workbook(wb["definition"]) @classmethod def _determine_execution_state(cls, execution, tasks): diff --git a/mistral/engine/actions/action_factory.py b/mistral/engine/actions/action_factory.py index 7ae038167..1435a4145 100644 --- a/mistral/engine/actions/action_factory.py +++ b/mistral/engine/actions/action_factory.py @@ -18,17 +18,21 @@ from mistral.engine.actions import actions from mistral.engine.actions import action_types from mistral.engine.actions import action_helper as a_h import mistral.exceptions as exc +from mistral.workbook import services +from mistral.workbook import tasks -def create_action(task): - action_type = a_h.get_action_type(task) +def create_action(db_task): + action_type = a_h.get_action_type(db_task) + task = tasks.TaskSpec(db_task['task_spec']) + service = services.ServiceSpec(db_task['service_spec']) if not action_types.is_valid(action_type): raise exc.InvalidActionException("Action type is not supported: %s" % action_type) - action = _get_mapping()[action_type](task) - action.result_helper = _find_action_result_helper(task, action) + action = _get_mapping()[action_type](db_task, task, service) + action.result_helper = _find_action_result_helper(db_task, action) return action @@ -44,33 +48,33 @@ def _get_mapping(): def _find_action_result_helper(task, action): try: - return task['service_dsl']['actions'][action.name].get('output', {}) + return task['service_spec']['actions'][action.name].get('output', {}) except (KeyError, AttributeError): return {} -def get_echo_action(task): - action_type = a_h.get_action_type(task) - action_name = task['task_dsl']['action'].split(':')[1] +def get_echo_action(db_task, task, service): + action_type = service.type + action_name = task.get_action_name() - output = task['service_dsl']['actions'][action_name].get('output', {}) + output = service.actions.get(action_name).output return actions.EchoAction(action_type, action_name, output=output) -def get_rest_action(task): - action_type = a_h.get_action_type(task) - action_name = task['task_dsl']['action'].split(':')[1] - action_dsl = task['service_dsl']['actions'][action_name] - task_params = task['task_dsl'].get('parameters', {}) - url = task['service_dsl']['parameters']['baseUrl'] +\ - action_dsl['parameters']['url'] +def get_rest_action(db_task, task, service): + action_type = service.type + action_name = task.get_action_name() + action = service.actions.get(action_name) + task_params = task.parameters + url = service.parameters['baseUrl'] +\ + action.parameters['url'] headers = {} - headers.update(task['task_dsl'].get('headers', {})) - headers.update(action_dsl.get('headers', {})) + headers.update(task.parameters.get('headers', {})) + headers.update(action.parameters.get('headers', {})) - method = action_dsl['parameters'].get('method', "GET") + method = action.parameters.get('method', "GET") # input_yaql = task.get('input') # TODO(nmakhotkin) extract input from context with the YAQL expression @@ -87,25 +91,26 @@ def get_rest_action(task): headers=headers, data=task_data) -def get_mistral_rest_action(task): +def get_mistral_rest_action(db_task, task, service): mistral_headers = { - 'Mistral-Workbook-Name': task['workbook_name'], - 'Mistral-Execution-Id': task['execution_id'], - 'Mistral-Task-Id': task['id'], + 'Mistral-Workbook-Name': db_task['workbook_name'], + 'Mistral-Execution-Id': db_task['execution_id'], + 'Mistral-Task-Id': db_task['id'], } - action = get_rest_action(task) + action = get_rest_action(db_task, task, service) action.headers.update(mistral_headers) return action -def get_amqp_action(task): - action_type = a_h.get_action_type(task) - action_name = task['task_dsl']['action'].split(':')[1] - action_params = task['service_dsl']['actions'][action_name]['parameters'] - task_params = task['task_dsl'].get('parameters', {}) - service_parameters = task['service_dsl'].get('parameters', {}) +def get_amqp_action(db_task, task, service): + action_type = service.type + action_name = task.get_action_name() + action = service.actions.get(action_name) + action_params = action.parameters + task_params = task.parameters + service_parameters = service.parameters host = service_parameters['host'] port = service_parameters.get('port') @@ -117,18 +122,18 @@ def get_amqp_action(task): exchange = action_params.get('exchange') queue_name = action_params['queue_name'] - return actions.OsloRPCAction(action_type, host, userid, password, - virtual_host, message, routing_key, port, - exchange, queue_name) + return actions.OsloRPCAction(action_type, action_name, host, userid, + password, virtual_host, message, routing_key, + port, exchange, queue_name) -def get_send_email_action(task): +def get_send_email_action(db_task, task, service): #TODO(dzimine): Refactor action_type and action_name settings # for all actions - action_type = a_h.get_action_type(task) - action_name = task['task_dsl']['action'].split(':')[1] - task_params = task['task_dsl'].get('parameters', {}) - service_params = task['service_dsl'].get('parameters', {}) + action_type = service.type + action_name = task.get_action_name() + task_params = task.parameters + service_params = service.parameters return actions.SendEmailAction(action_type, action_name, task_params, service_params) diff --git a/mistral/engine/actions/action_helper.py b/mistral/engine/actions/action_helper.py index 620f5f4b0..66a23de11 100644 --- a/mistral/engine/actions/action_helper.py +++ b/mistral/engine/actions/action_helper.py @@ -18,7 +18,7 @@ from mistral.engine.actions import action_types as a_t def get_action_type(task): - return task['service_dsl']['type'] + return task['service_spec']['type'] def is_task_synchronous(task): diff --git a/mistral/engine/data_flow.py b/mistral/engine/data_flow.py index 5bd1d66a9..4ebda5299 100644 --- a/mistral/engine/data_flow.py +++ b/mistral/engine/data_flow.py @@ -25,7 +25,7 @@ LOG = logging.getLogger(__name__) def evaluate_task_input(task, context): res = {} - params = task['task_dsl'].get('input', {}) + params = task['task_spec'].get('input', {}) if not params: return res @@ -54,7 +54,7 @@ def prepare_tasks(tasks, context): def get_task_output(task, result): - vars_to_publish = task['task_dsl'].get('publish') + vars_to_publish = task['task_spec'].get('publish') output = {} diff --git a/mistral/engine/scalable/executor/server.py b/mistral/engine/scalable/executor/server.py index 2d3af13da..47e687fcb 100644 --- a/mistral/engine/scalable/executor/server.py +++ b/mistral/engine/scalable/executor/server.py @@ -36,8 +36,8 @@ class Executor(object): """ LOG.info("Starting task action [task_id=%s, " "action='%s', service='%s'" % - (task['id'], task['task_dsl']['action'], - task['service_dsl'])) + (task['id'], task['task_spec']['action'], + task['service_spec'])) action = a_f.create_action(task) diff --git a/mistral/engine/workflow.py b/mistral/engine/workflow.py index 02c89471c..406800c1e 100644 --- a/mistral/engine/workflow.py +++ b/mistral/engine/workflow.py @@ -24,20 +24,18 @@ from mistral.openstack.common import log as logging LOG = logging.getLogger(__name__) -def find_workflow_tasks(wb_dsl, task_name): - dsl_tasks = wb_dsl.get_tasks() +def find_workflow_tasks(workbook, task_name): + wb_tasks = workbook.tasks full_graph = nx.DiGraph() - for t in dsl_tasks: + for t in wb_tasks: full_graph.add_node(t) - _update_dependencies(dsl_tasks, full_graph) + _update_dependencies(wb_tasks, full_graph) graph = _get_subgraph(full_graph, task_name) tasks = [] for node in graph: - task = {'name': node} - task.update(dsl_tasks[node]) - tasks.append(task) + tasks.append(wb_tasks[node]) return tasks @@ -55,45 +53,44 @@ def _get_checked_tasks(target_tasks): return checked_tasks -def _get_tasks_to_schedule(target_tasks, wb_dsl): +def _get_tasks_to_schedule(target_tasks, workbook): tasks_to_schedule = _get_checked_tasks(target_tasks) - return [wb_dsl.get_task(t_name) for t_name in tasks_to_schedule] + return [workbook.tasks.get(t_name) for t_name in tasks_to_schedule] -def find_tasks_after_completion(task, wb_dsl): +def find_tasks_after_completion(task, workbook): """Determine tasks which should be scheduled after completing given task. Expression 'on_finish' is not mutually exclusive to 'on_success' and 'on_error'. :param task: Task object - :param wb_dsl: DSL Parser - :return: list of DSL tasks. + :param workbook: Workbook Entity + :return: list of task dictionaries. """ state = task['state'] found_tasks = [] LOG.debug("Recieved task %s: %s" % (task['name'], state)) if state == states.ERROR: - tasks_on_error = wb_dsl.get_task_on_error(task['name']) + tasks_on_error = workbook.tasks.get(task['name']).get_on_error() if tasks_on_error: - found_tasks = _get_tasks_to_schedule(tasks_on_error, wb_dsl) + found_tasks = _get_tasks_to_schedule(tasks_on_error, workbook) elif state == states.SUCCESS: - tasks_on_success = wb_dsl.get_task_on_success(task['name']) + tasks_on_success = workbook.tasks.get(task['name']).get_on_success() if tasks_on_success: - found_tasks = _get_tasks_to_schedule(tasks_on_success, wb_dsl) + found_tasks = _get_tasks_to_schedule(tasks_on_success, workbook) if states.is_finished(state): - tasks_on_finish = wb_dsl.get_task_on_finish(task['name']) + tasks_on_finish = workbook.tasks.get(task['name']).get_on_finish() if tasks_on_finish: - found_tasks += _get_tasks_to_schedule(tasks_on_finish, wb_dsl) + found_tasks += _get_tasks_to_schedule(tasks_on_finish, workbook) LOG.debug("Found tasks: %s" % found_tasks) workflow_tasks = [] for t in found_tasks: - workflow_tasks += find_workflow_tasks(wb_dsl, t['name']) - + workflow_tasks += find_workflow_tasks(workbook, t.name) LOG.debug("Workflow tasks to schedule: %s" % workflow_tasks) return workflow_tasks @@ -120,12 +117,12 @@ def _get_subgraph(full_graph, task_name): def _get_dependency_tasks(tasks, task): - if 'requires' not in tasks[task]: + if len(tasks[task].requires) < 1: return [] deps = set() for t in tasks: - for dep in tasks[task]['requires']: + for dep in tasks[task].requires: if dep == t: deps.add(t) diff --git a/mistral/exceptions.py b/mistral/exceptions.py index 0bc40b010..7461931b0 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -85,3 +85,13 @@ class ApplicationContextNotFoundException(MistralException): super(ApplicationContextNotFoundException, self).__init__(message) if message: self.message = message + + +class InvalidModelException(MistralException): + message = "Wrong entity definition" + code = "INVALID_MODEL_EXCEPTION" + + def __init__(self, message=None): + super(InvalidModelException, self).__init__(message) + if message: + self.message = message diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 7d773b2f9..b45331174 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -21,7 +21,7 @@ from mistral.openstack.common import log from mistral.openstack.common import periodic_task from mistral.openstack.common import threadgroup from mistral import context -from mistral import dsl +from mistral import dsl_parser as parser from mistral.services import scheduler as sched from mistral.services import trusts @@ -38,7 +38,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): context.set_ctx(trusts.create_context(wb)) try: - task = dsl.Parser( + task = parser.get_workbook( wb['definition']).get_event_task_name(event['name']) engine.start_workflow_execution(wb['name'], task) finally: diff --git a/mistral/services/scheduler.py b/mistral/services/scheduler.py index 9628ecf6f..470e4ccb0 100644 --- a/mistral/services/scheduler.py +++ b/mistral/services/scheduler.py @@ -18,7 +18,7 @@ from croniter import croniter from datetime import datetime from datetime import timedelta from mistral.db import api as db_api -from mistral import dsl +from mistral import dsl_parser as parser def get_next_events(): @@ -50,30 +50,30 @@ def create_event(name, pattern, workbook_name, start_time=None): }) -def create_associated_events(workbook): - if not workbook['definition']: +def create_associated_events(db_workbook): + if not db_workbook['definition']: return - parser = dsl.Parser(workbook['definition']) - dsl_events = parser.get_events() + workbook = parser.get_workbook(db_workbook['definition']) + triggers = workbook.get_triggers() # Prepare all events data in advance to make db transaction shorter. - events = [] + db_triggers = [] - for e in dsl_events: + for e in triggers: pattern = e['parameters']['cron-pattern'] next_time = _get_next_execution_time(pattern, datetime.now()) - events.append({ + db_triggers.append({ "name": e['name'], "pattern": pattern, "next_execution_time": next_time, - "workbook_name": workbook['name'] + "workbook_name": db_workbook['name'] }) db_api.start_tx() try: - for e in events: + for e in db_triggers: db_api.event_create(e) db_api.commit_tx() diff --git a/mistral/tests/api/v1/controllers/test_workbook_definition.py b/mistral/tests/api/v1/controllers/test_workbook_definition.py index 850cd390e..6eea7b1e6 100644 --- a/mistral/tests/api/v1/controllers/test_workbook_definition.py +++ b/mistral/tests/api/v1/controllers/test_workbook_definition.py @@ -24,13 +24,23 @@ from mistral.db import api as db_api DEFINITION = "my definition" NEW_DEFINITION = """ +Services: + Service: + type: + actions: + action: + parameters: Workflow: - events: - create-vms: - type: periodic - tasks: create-vms - parameters: - cron-pattern: "* * * * *" + tasks: + task1: + parameters: + action: Service:action +triggers: + create-vms: + type: periodic + tasks: create-vms + parameters: + cron-pattern: "* * * * *" """ diff --git a/mistral/tests/resources/test_amqp.yaml b/mistral/tests/resources/test_amqp.yaml index 7c6d3f5de..583687fae 100644 --- a/mistral/tests/resources/test_amqp.yaml +++ b/mistral/tests/resources/test_amqp.yaml @@ -29,9 +29,9 @@ Workflow: routing_key: my_key message: MyMessage - events: - backup-vm: - type: periodic - tasks: send-messages - parameters: - cron-pattern: 1 0 * * * \ No newline at end of file +triggers: + backup-vm: + type: periodic + tasks: send-messages + parameters: + cron-pattern: 1 0 * * * \ No newline at end of file diff --git a/mistral/tests/resources/test_rest.yaml b/mistral/tests/resources/test_rest.yaml index b485179c7..c8b093242 100644 --- a/mistral/tests/resources/test_rest.yaml +++ b/mistral/tests/resources/test_rest.yaml @@ -93,9 +93,9 @@ Workflow: on-finish: create-vms - events: - create-vms: - type: periodic - tasks: create-vms - parameters: - cron-pattern: "* * * * *" +triggers: + create-vms: + type: periodic + tasks: create-vms + parameters: + cron-pattern: "* * * * *" diff --git a/mistral/tests/unit/db/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/test_sqlalchemy_db_api.py index 2a9b609a5..760965ce1 100644 --- a/mistral/tests/unit/db/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/test_sqlalchemy_db_api.py @@ -223,8 +223,8 @@ TASKS = [ 'name': 'my_task1', 'description': 'my description', 'requires': {'my_task2': '', 'my_task3': ''}, - 'task_dsl': None, - 'service_dsl': None, + 'task_spec': None, + 'service_spec': None, 'action': {'name': 'Nova:create-vm'}, 'state': 'IDLE', 'tags': ['deployment'], @@ -240,8 +240,8 @@ TASKS = [ 'name': 'my_task2', 'description': 'my description', 'requires': {'my_task4': '', 'my_task5': ''}, - 'task_dsl': None, - 'service_dsl': None, + 'task_spec': None, + 'service_spec': None, 'action': {'name': 'Cinder:create-volume'}, 'state': 'IDLE', 'tags': ['deployment'], diff --git a/mistral/tests/unit/engine/actions/test_action_factory.py b/mistral/tests/unit/engine/actions/test_action_factory.py index 3c0adabcc..3674f3baf 100644 --- a/mistral/tests/unit/engine/actions/test_action_factory.py +++ b/mistral/tests/unit/engine/actions/test_action_factory.py @@ -21,16 +21,17 @@ from mistral.engine.actions import action_types SAMPLE_TASK = { - 'task_dsl': { + 'task_spec': { 'action': 'MyRest:create-vm', 'parameters': { 'a': 'b' }, 'headers': { 'Cookie': 'abc' - } + }, + 'name': 'create-vms' }, - 'service_dsl': { + 'service_spec': { 'parameters': { 'baseUrl': 'http://some_host' }, @@ -40,11 +41,14 @@ SAMPLE_TASK = { 'url': '/task1' } } - } + }, + 'type': 'REST_API', + 'name': 'MyRest' }, 'workbook_name': 'wb', 'execution_id': '1234', - 'id': '123' + 'id': '123', + 'name': 'create-vms' } SAMPLE_SEND_EMAIL_TASK = { @@ -55,7 +59,7 @@ SAMPLE_SEND_EMAIL_TASK = { 'id': '800f52c4-1ba9-45ac-ba81-c4d2a7863738', 'execution_id': '645f042f-09cb-43ca-bee7-94f592409a7d', 'state': 'IDLE', - 'service_dsl': { + 'service_spec': { 'type': "SEND_EMAIL", 'parameters': { 'smtp_server': "localhost:25", @@ -64,10 +68,11 @@ SAMPLE_SEND_EMAIL_TASK = { # password: None }, 'actions': { - 'send_email': '' - } + 'send_email': {} + }, + 'name': 'send_email' }, - 'task_dsl': { + 'task_spec': { 'name': 'backup_user_data', 'parameters': { 'to': ["dz@example.com, deg@example.com", "xyz@example.com"], @@ -96,7 +101,7 @@ SAMPLE_RESULT = { class ActionFactoryTest(unittest2.TestCase): def test_get_mistral_rest(self): task = dict(SAMPLE_TASK) - task['service_dsl'].update({'type': action_types.MISTRAL_REST_API}) + task['service_spec'].update({'type': action_types.MISTRAL_REST_API}) action = action_factory.create_action(task) self.assertIn("Mistral-Workbook-Name", action.headers) @@ -104,7 +109,7 @@ class ActionFactoryTest(unittest2.TestCase): def test_get_rest(self): task = dict(SAMPLE_TASK) - task['service_dsl'].update({'type': action_types.REST_API}) + task['service_spec'].update({'type': action_types.REST_API}) action = action_factory.create_action(task) self.assertNotIn("Mistral-Workbook-Name", action.headers) @@ -117,7 +122,7 @@ class ActionFactoryTest(unittest2.TestCase): #NOTE(dzimine): Implement parameter validation in action, # and this will be the only validation we need. # Smoke-test one from task and one from service - for email in task['task_dsl']['parameters']['to']: + for email in task['task_spec']['parameters']['to']: self.assertIn(email, action.to) - self.assertEqual(task['service_dsl']['parameters']['smtp_server'], + self.assertEqual(task['service_spec']['parameters']['smtp_server'], action.smtp_server) diff --git a/mistral/tests/unit/engine/scalable/test_executor.py b/mistral/tests/unit/engine/scalable/test_executor.py index ef5456170..eb625c67e 100644 --- a/mistral/tests/unit/engine/scalable/test_executor.py +++ b/mistral/tests/unit/engine/scalable/test_executor.py @@ -39,7 +39,7 @@ SAMPLE_WORKBOOK = { 'id': str(uuid.uuid4()), 'name': WORKBOOK_NAME, 'description': 'my description', - 'definition': '{}', + 'definition': base.get_resource("test_rest.yaml"), 'tags': [], 'scope': 'public', 'updated_at': None, @@ -59,7 +59,7 @@ SAMPLE_EXECUTION = { SAMPLE_TASK = { 'name': TASK_NAME, 'workbook_name': WORKBOOK_NAME, - 'service_dsl': { + 'service_spec': { 'type': action_types.REST_API, 'parameters': { 'baseUrl': 'http://localhost:8989/v1/'}, @@ -67,8 +67,10 @@ SAMPLE_TASK = { 'my-action': { 'parameters': { 'url': 'workbooks', - 'method': 'GET'}}}}, - 'task_dsl': { + 'method': 'GET'}}}, + 'name': 'MyService' + }, + 'task_spec': { 'action': 'MyRest:my-action', 'service_name': 'MyRest', 'name': TASK_NAME}, @@ -109,6 +111,7 @@ class TestExecutor(base.DbTestCase): def setUp(self): # Initialize configuration for the ExecutorClient. + super(TestExecutor, self).setUp() if not 'executor' in cfg.CONF: cfg_grp = cfg.OptGroup(name='executor', title='Executor options') opts = [cfg.StrOpt('host', default='0.0.0.0'), @@ -124,8 +127,6 @@ class TestExecutor(base.DbTestCase): endpoints, executor='eventlet') self.server.start() - super(TestExecutor, self).setUp() - def tearDown(self): # Stop the Executor. if self.server: diff --git a/mistral/tests/unit/engine/test_data_flow_module.py b/mistral/tests/unit/engine/test_data_flow_module.py index ca8ef1134..8e4e3880e 100644 --- a/mistral/tests/unit/engine/test_data_flow_module.py +++ b/mistral/tests/unit/engine/test_data_flow_module.py @@ -38,7 +38,7 @@ TASK = { 'workbook_name': WB_NAME, 'execution_id': EXEC_ID, 'name': 'my_task', - 'task_dsl': { + 'task_spec': { 'input': { 'p1': 'My string', 'p2': '$.param3.param32' diff --git a/mistral/tests/unit/engine/test_workflow.py b/mistral/tests/unit/engine/test_workflow.py index 4fb97d4ef..efe60e1f8 100644 --- a/mistral/tests/unit/engine/test_workflow.py +++ b/mistral/tests/unit/engine/test_workflow.py @@ -16,9 +16,9 @@ import pkg_resources as pkg -from mistral import dsl from mistral import version from mistral.tests import base +from mistral import dsl_parser as parser from mistral.engine import states from mistral.engine import workflow @@ -47,11 +47,11 @@ class WorkflowTest(base.DbTestCase): self.doc = open(pkg.resource_filename( version.version_info.package, "tests/resources/test_rest.yaml")).read() - self.parser = dsl.Parser(self.doc) + self.parser = parser.get_workbook(self.doc) def test_find_workflow_tasks(self): tasks = workflow.find_workflow_tasks(self.parser, "attach-volumes") - self.assertEqual(tasks[1]['name'], 'create-vms') + self.assertEqual(tasks[1].name, 'create-vms') def test_tasks_to_start(self): tasks_to_start = workflow.find_resolved_tasks(TASKS) diff --git a/mistral/tests/unit/events/test_events.py b/mistral/tests/unit/events/test_events.py index 20e5d52e1..19ccb6663 100644 --- a/mistral/tests/unit/events/test_events.py +++ b/mistral/tests/unit/events/test_events.py @@ -17,7 +17,6 @@ import pkg_resources as pkg from mistral.db import api as db_api -from mistral import dsl from mistral.tests import base from mistral import version from mistral.services import scheduler @@ -29,7 +28,6 @@ class EventsTest(base.DbTestCase): self.doc = open(pkg.resource_filename( version.version_info.package, "tests/resources/test_rest.yaml")).read() - self.dsl = dsl.Parser(self.doc) def test_create_associated_events(self): workbook = { diff --git a/mistral/tests/unit/test_parser.py b/mistral/tests/unit/test_parser.py deleted file mode 100644 index 85a324851..000000000 --- a/mistral/tests/unit/test_parser.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2013 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pkg_resources as pkg -import unittest2 - -from mistral import dsl -from mistral import version - - -class DSLParserTest(unittest2.TestCase): - def setUp(self): - doc = open(pkg.resource_filename( - version.version_info.package, - "tests/resources/test_rest.yaml")).read() - self.dsl = dsl.Parser(doc) - - def test_services(self): - service = self.dsl.get_service("MyRest") - self.assertEqual(service["type"], "MISTRAL_REST_API") - self.assertIn("baseUrl", service["parameters"]) - services = self.dsl.get_services() - self.assertEqual(len(services), 2) - service_names = self.dsl.get_service_names() - self.assertEqual(service_names[0], "MyRest") - - def test_events(self): - events = self.dsl.get_events() - self.assertIn("create-vms", events[0]['name']) - - event_task_name = self.dsl.get_event_task_name("create-vms") - self.assertEqual(event_task_name, "create-vms") - event_task_name = self.dsl.get_event_task_name("not-valid") - self.assertEqual(event_task_name, "") - - def test_tasks(self): - tasks = self.dsl.get_tasks() - self.assertIn("create-vms", tasks) - self.assertIn("parameters", tasks["create-vms"]) - self.assertEqual(tasks["backup-vms"]["action"], - "MyRest:backup-vm") - attach_parameters = self.dsl.get_task_dsl_property("attach-volumes", - "parameters") - self.assertIn("size", attach_parameters) - self.assertIn("mnt_path", attach_parameters) - task = self.dsl.get_task("not-valid-name") - self.assertEqual(task, {}) - - def test_task_property(self): - on_success = self.dsl.get_task_on_success("test_subsequent") - self.assertEqual(on_success, {"attach-volumes": ''}) - on_error = self.dsl.get_task_on_error("test_subsequent") - self.assertEqual(on_error, {"backup-vms": "$.status != 'OK'"}) - - def test_actions(self): - action = self.dsl.get_action("MyRest:attach-volume") - self.assertIn("method", action["parameters"]) - actions = self.dsl.get_actions("MyRest") - self.assertIn("task-parameters", actions["attach-volume"]) - - def test_broken_definition(self): - broken_yaml = """ - Workflow: - [tasks: - create-vms/: - - """ - self.assertRaises(RuntimeError, dsl.Parser, broken_yaml) diff --git a/mistral/tests/unit/workbook/__init__.py b/mistral/tests/unit/workbook/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/workbook/test_workbook.py b/mistral/tests/unit/workbook/test_workbook.py new file mode 100644 index 000000000..e2ffb9234 --- /dev/null +++ b/mistral/tests/unit/workbook/test_workbook.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pkg_resources as pkg +import unittest2 + +from mistral.engine.actions import action_types as a_t +from mistral import dsl_parser as parser +from mistral import version + + +class DSLModelTest(unittest2.TestCase): + def setUp(self): + self.doc = open(pkg.resource_filename( + version.version_info.package, + "tests/resources/test_rest.yaml")).read() + + def test_load_dsl(self): + self.workbook = parser.get_workbook(self.doc) + self.assertEqual(self.workbook.workflow.tasks.items, + self.workbook.tasks.items) + self.assertEqual(self.workbook.tasks.get("create-vms").name, + "create-vms") + self.assertEqual(self.workbook.services.get("MyRest").type, + "MISTRAL_REST_API") + + def test_tasks(self): + self.workbook = parser.get_workbook(self.doc) + self.assertEqual(len(self.workbook.tasks), 6) + attach_volumes = self.workbook.tasks.get("attach-volumes") + self.assertEqual(attach_volumes.get_action_service(), "MyRest") + t_parameters = {"image_id": 1234, "flavor_id": 2} + create_vm_nova = self.workbook.tasks.get("create-vm-nova") + self.assertEqual(create_vm_nova.parameters, t_parameters) + attach_requires = {"create-vms": ''} + self.assertEqual(attach_volumes.requires, attach_requires) + subsequent = self.workbook.tasks.get("test_subsequent") + subseq_success = subsequent.get_on_success() + subseq_error = subsequent.get_on_error() + subseq_finish = subsequent.get_on_finish() + self.assertEqual(subseq_success, {"attach-volumes": ''}) + self.assertEqual(subseq_error, {"backup-vms": "$.status != 'OK'"}) + self.assertEqual(subseq_finish, {"create-vms": ''}) + + def test_actions(self): + self.workbook = parser.get_workbook(self.doc) + actions = self.workbook.services.get("MyRest").actions + self.assertEqual(len(actions), 4) + create_vm = actions.get("create-vm") + self.assertIn('method', create_vm.parameters) + + def test_services(self): + self.workbook = parser.get_workbook(self.doc) + services = self.workbook.services + self.assertEqual(len(services), 2) + nova_service = services.get("Nova") + self.assertEqual(nova_service.type, a_t.REST_API) + self.assertIn("baseUrl", nova_service.parameters) + + def test_triggers(self): + self.workbook = parser.get_workbook(self.doc) + triggers = self.workbook.get_triggers() + self.assertEqual(len(triggers), 1) diff --git a/mistral/workbook/__init__.py b/mistral/workbook/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/workbook/actions.py b/mistral/workbook/actions.py new file mode 100644 index 000000000..c26bd92fc --- /dev/null +++ b/mistral/workbook/actions.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workbook import base + + +class ActionSpec(base.BaseSpec): + _required_keys = ['name'] + + def __init__(self, action): + super(ActionSpec, self).__init__(action) + if self.validate(): + self.name = action['name'] + self.parameters = action.get('parameters', {}) + self.input = action.get('input', {}) + self.output = action.get('output', {}) + self.task_parameters = action.get('task-parameters', {}) + + +class ActionSpecList(base.BaseSpecList): + item_class = ActionSpec diff --git a/mistral/workbook/base.py b/mistral/workbook/base.py new file mode 100644 index 000000000..ac8f5ae9d --- /dev/null +++ b/mistral/workbook/base.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + +from mistral import exceptions + + +class BaseSpec(object): + _required_keys = [] + + def __init__(self, data): + self._data = data + + def validate(self): + if not all(k in self._data for k in self._required_keys): + message = ("Wrong model definition for: %s. It should contain" + " required keys: %s" % (self.__class__.__name__, + self._required_keys)) + raise exceptions.InvalidModelException(message) + return True + + def to_dict(self): + return self._data + + +class BaseSpecList(object): + item_class = None + + def __init__(self, data): + self.items = collections.OrderedDict() + for k, v in data.items(): + item = data[k] + item.update({'name': k}) + self.items.update({k: self.item_class(item)}) + for name in self: + self.get(name).validate() + + def __iter__(self): + return iter(self.items) + + def __getitem__(self, name): + return self.items.get(name) + + def __len__(self): + return len(self.items) + + def get(self, name): + return self.__getitem__(name) diff --git a/mistral/workbook/services.py b/mistral/workbook/services.py new file mode 100644 index 000000000..f2125860f --- /dev/null +++ b/mistral/workbook/services.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workbook import actions +from mistral.workbook import base + + +class ServiceSpec(base.BaseSpec): + _required_keys = ['name', 'type', 'actions'] + + def __init__(self, service): + super(ServiceSpec, self).__init__(service) + if self.validate(): + self.type = service['type'] + self.name = service['name'] + self.parameters = service.get('parameters', {}) + self.actions = actions.ActionSpecList(service['actions']) + + +class ServiceSpecList(base.BaseSpecList): + item_class = ServiceSpec diff --git a/mistral/workbook/tasks.py b/mistral/workbook/tasks.py new file mode 100644 index 000000000..e471a954a --- /dev/null +++ b/mistral/workbook/tasks.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from mistral.workbook import base + + +class TaskSpec(base.BaseSpec): + _required_keys = ['name', 'action'] + + def __init__(self, task): + super(TaskSpec, self).__init__(task) + self._prepare(task) + if self.validate(): + self.requires = task['requires'] + self.action = task['action'] + self.name = task['name'] + self.parameters = task.get('parameters', {}) + + def _prepare(self, task): + if task: + req = task.get("requires", {}) + if req and isinstance(req, list): + task["requires"] = dict(zip(req, ['']*len(req))) + elif isinstance(req, dict): + task['requires'] = req + + def get_property(self, property_name, default=None): + return self._data.get(property_name, default) + + def get_on_error(self): + task = self.get_property("on-error") + if task: + return task if isinstance(task, dict) else {task: ''} + return None + + def get_on_success(self): + task = self.get_property("on-success") + if task: + return task if isinstance(task, dict) else {task: ''} + return None + + def get_on_finish(self): + task = self.get_property("on-finish") + if task: + return task if isinstance(task, dict) else {task: ''} + return None + + def get_action_service(self): + return self.action.split(':')[0] + + def get_action_name(self): + return self.action.split(':')[1] + + +class TaskSpecList(base.BaseSpecList): + item_class = TaskSpec diff --git a/mistral/workbook/workbook.py b/mistral/workbook/workbook.py new file mode 100644 index 000000000..b1f67bf99 --- /dev/null +++ b/mistral/workbook/workbook.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workbook import base +from mistral.workbook import services +from mistral.workbook import workflow + + +class WorkbookSpec(base.BaseSpec): + _required_keys = ['Services', 'Workflow'] + + def __init__(self, doc): + super(WorkbookSpec, self).__init__(doc) + if self.validate(): + self.services = services.ServiceSpecList(self._data['Services']) + self.workflow = workflow.WorkflowSpec(self._data['Workflow']) + self.tasks = self.workflow.tasks + + def get_triggers(self): + triggers_from_data = self._data.get("triggers", None) + if not triggers_from_data: + return [] + triggers = [] + for name in triggers_from_data: + trigger_dict = {'name': name} + trigger_dict.update(triggers_from_data[name]) + triggers.append(trigger_dict) + return triggers + + def get_action(self, task_action_name): + if task_action_name.find(":") == -1: + return {} + service_name = task_action_name.split(':')[0] + action_name = task_action_name.split(':')[1] + action = self.services.get(service_name).actions.get(action_name) + return action + + def get_actions(self, service_name): + return self.services.get(service_name).actions + + def get_event_task_name(self, event_name): + event = self._data["Workflow"]["events"].get(event_name) + return event.get('tasks') if event else "" diff --git a/mistral/workbook/workflow.py b/mistral/workbook/workflow.py new file mode 100644 index 000000000..f670fd5c3 --- /dev/null +++ b/mistral/workbook/workflow.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workbook import base +from mistral.workbook import tasks + + +class WorkflowSpec(base.BaseSpec): + _required_keys = ['tasks'] + + def __init__(self, workflow): + super(WorkflowSpec, self).__init__(workflow) + if self.validate(): + self.tasks = tasks.TaskSpecList(workflow['tasks'])