From 41e21c7616aab54372c4290767c2d97035a0cfef Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Thu, 12 Dec 2013 16:19:39 +0400 Subject: [PATCH] Add engine related features * Workflow in scalable engine * find tasks to run * find workflow tasks * Added unit tests * Fix events test Change-Id: Ib3a97f976b101a68cbbde9d2117f5b2ca5eab5cd --- mistral/dsl.py | 28 +++++-- mistral/engine/scalable/workflow.py | 72 ++++++++++++++--- mistral/tests/resources/test_rest.yaml | 80 +++++++++---------- mistral/tests/unit/db/test_events.py | 3 +- .../tests/unit/engine/scalable/__init__.py | 0 .../unit/engine/scalable/test_workflow.py | 57 +++++++++++++ mistral/tests/unit/test_parser.py | 15 +++- 7 files changed, 195 insertions(+), 60 deletions(-) create mode 100644 mistral/tests/unit/engine/scalable/__init__.py create mode 100644 mistral/tests/unit/engine/scalable/test_workflow.py diff --git a/mistral/dsl.py b/mistral/dsl.py index ee74c26a..cf2a4252 100644 --- a/mistral/dsl.py +++ b/mistral/dsl.py @@ -31,8 +31,14 @@ class Parser(object): raise RuntimeError("Definition could not be parsed: %s\n" % exc.message) - def get_service(self): - return self.doc["Service"] + def get_services(self): + services = [] + for service_name in self.doc["Services"]: + services.append(self.doc["Services"][service_name]) + return services + + def get_service(self, service_name): + return self.doc["Services"][service_name] def get_events(self): events_from_doc = self.doc["Workflow"]["events"] @@ -46,12 +52,20 @@ class Parser(object): def get_tasks(self): return self.doc["Workflow"]["tasks"] - def get_action(self, action_name): - # TODO(rakhmerov): it needs to return action definition as a dict - pass + def get_action(self, task_action_name): + service_name = task_action_name.split(':')[0] + action_name = task_action_name.split(':')[1] + action = self.get_service(service_name)['actions'][action_name] + return action - def get_service_name(self): - return self.doc['Service']['name'] + def get_actions(self, service_name): + return self.get_service(service_name)['actions'] + + def get_service_names(self): + names = [] + for name in self.doc['Services']: + names.append(name) + return names def get_event_task_name(self, event_name): return self.doc["Workflow"]["events"][event_name]['tasks'] diff --git a/mistral/engine/scalable/workflow.py b/mistral/engine/scalable/workflow.py index 1591611e..b38d85a1 100644 --- a/mistral/engine/scalable/workflow.py +++ b/mistral/engine/scalable/workflow.py @@ -14,18 +14,72 @@ # See the License for the specific language governing permissions and # limitations under the License. +import networkx as nx +from networkx.algorithms import traversal + +from mistral.engine import states + def find_workflow_tasks(wb_dsl, target_task_name): - # TODO(rakhmerov): implement using networkX - return None - - -def find_tasks_to_start(tasks): - # TODO(rakhmerov): implement using networkX - # We need to analyse graph and see which tasks are ready to start + dsl_tasks = wb_dsl.get_tasks() + full_graph = nx.DiGraph() + for t in dsl_tasks: + full_graph.add_node(t) + _update_dependencies(dsl_tasks, full_graph) + graph = _get_subgraph(full_graph, target_task_name) + tasks = [] + for node in graph: + task = {'name': node} + task.update(dsl_tasks[node]) + tasks.append(task) return tasks +def find_tasks_to_start(tasks): + # We need to analyse graph and see which tasks are ready to start + return _get_resolved_tasks(tasks) + + def is_finished(tasks): - # TODO(rakhmerov): implement - return False + for task in tasks: + if not states.is_finished(task['state']): + return False + return True + + +def _get_subgraph(full_graph, task_name): + nodes_set = traversal.dfs_predecessors(full_graph.reverse(), + task_name).keys() + nodes_set.append(task_name) + return full_graph.subgraph(nodes_set) + + +def _get_dependency_tasks(tasks, task): + if 'dependsOn' not in tasks[task]: + return [] + deps = set() + for t in tasks: + for dep in tasks[task]['dependsOn']: + if dep == t: + deps.add(t) + return deps + + +def _update_dependencies(tasks, graph): + for task in tasks: + for dep in _get_dependency_tasks(tasks, task): + graph.add_edge(dep, task) + + +def _get_resolved_tasks(tasks): + resolved_tasks = [] + allows = [] + for t in tasks: + if t['state'] == states.SUCCESS: + allows += t['dependencies'] + allow_set = set(allows) + for t in tasks: + if len(allow_set - set(t['dependencies'])) == 0: + if t['state'] == states.IDLE: + resolved_tasks.append(t) + return resolved_tasks diff --git a/mistral/tests/resources/test_rest.yaml b/mistral/tests/resources/test_rest.yaml index ab5a7883..dee517d9 100644 --- a/mistral/tests/resources/test_rest.yaml +++ b/mistral/tests/resources/test_rest.yaml @@ -1,43 +1,43 @@ -Service: - name: MyRest - type: REST_API - parameters: - baseUrl: http://some_host - actions: - create-vm: - parameters: - url: /service/action/execute - method: GET - task-parameters: - flavor_id: - optional: false - image_id: - optional: false - backup-vm: - parameters: - url: url_for_backup - method: GET - task-parameters: - server_id: - optional: false - attach-volume: - parameters: - url: url_for_attach - method: GET - task-parameters: - size: - optional: false - mnt_path: - optional: false - format-volume: - parameters: - url: url_for_format - method: GET - task-parameters: - volume_id: - optional: false - server_id: - optional: false +Services: + MyRest: + type: REST_API + parameters: + baseUrl: http://some_host + actions: + create-vm: + parameters: + url: /service/action/execute + method: GET + task-parameters: + flavor_id: + optional: false + image_id: + optional: false + backup-vm: + parameters: + url: url_for_backup + method: GET + task-parameters: + server_id: + optional: false + attach-volume: + parameters: + url: url_for_attach + method: GET + task-parameters: + size: + optional: false + mnt_path: + optional: false + format-volume: + parameters: + url: url_for_format + method: GET + task-parameters: + volume_id: + optional: false + server_id: + optional: false Workflow: diff --git a/mistral/tests/unit/db/test_events.py b/mistral/tests/unit/db/test_events.py index cb9938af..0581d66a 100644 --- a/mistral/tests/unit/db/test_events.py +++ b/mistral/tests/unit/db/test_events.py @@ -24,7 +24,8 @@ SAMPLE_EVENT = { "id": "123", "name": "test_event", "pattern": "* *", - "next_execution_time": timeutils.utcnow() + "next_execution_time": timeutils.utcnow(), + 'workbook_name': 'wb_name' } diff --git a/mistral/tests/unit/engine/scalable/__init__.py b/mistral/tests/unit/engine/scalable/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/tests/unit/engine/scalable/test_workflow.py b/mistral/tests/unit/engine/scalable/test_workflow.py new file mode 100644 index 00000000..479f3ccb --- /dev/null +++ b/mistral/tests/unit/engine/scalable/test_workflow.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pkg_resources as pkg + +from mistral import dsl +from mistral import version +from mistral.tests.unit import base +from mistral.engine import states +from mistral.engine.scalable import workflow + +TASKS = [ + { + 'dependencies': [], + 'name': 'backup-vms', + 'state': states.IDLE + }, + { + 'dependencies': [], + 'name': 'create-vms', + 'state': states.RUNNING + }, + { + 'dependencies': ['create-vms'], + 'name': 'attach-volume', + 'state': states.IDLE + } +] + + +class WorkflowTest(base.DbTestCase): + def setUp(self): + super(WorkflowTest, self).setUp() + self.doc = open(pkg.resource_filename( + version.version_info.package, + "tests/resources/test_rest.yaml")).read() + self.parser = dsl.Parser(self.doc) + + def test_find_workflow_tasks(self): + tasks = workflow.find_workflow_tasks(self.parser, "attach-volumes") + self.assertEqual(tasks[1]['name'], 'create-vms') + + def test_tasks_to_start(self): + tasks_to_start = workflow.find_tasks_to_start(TASKS) + self.assertEqual(len(tasks_to_start), 2) diff --git a/mistral/tests/unit/test_parser.py b/mistral/tests/unit/test_parser.py index 4609bd55..2100da06 100644 --- a/mistral/tests/unit/test_parser.py +++ b/mistral/tests/unit/test_parser.py @@ -27,11 +27,14 @@ class DSLParserTest(unittest2.TestCase): "tests/resources/test_rest.yaml")).read() self.dsl = dsl.Parser(doc) - def test_service(self): - service = self.dsl.get_service() - self.assertEqual(service["name"], "MyRest") + def test_services(self): + service = self.dsl.get_service("MyRest") self.assertEqual(service["type"], "REST_API") self.assertIn("baseUrl", service["parameters"]) + services = self.dsl.get_services() + self.assertEqual(len(services), 1) + service_names = self.dsl.get_service_names() + self.assertEqual(service_names[0], "MyRest") def test_events(self): events = self.dsl.get_events() @@ -44,6 +47,12 @@ class DSLParserTest(unittest2.TestCase): self.assertEqual(tasks["backup-vms"]["action"], "Nova:backup-vm") + def test_actions(self): + action = self.dsl.get_action("MyRest:attach-volume") + self.assertIn("method", action["parameters"]) + actions = self.dsl.get_actions("MyRest") + self.assertIn("task-parameters", actions["attach-volume"]) + def test_broken_definition(self): broken_yaml = """ Workflow: