diff --git a/taskflow/blocks/__init__.py b/taskflow/blocks/__init__.py new file mode 100644 index 00000000..8f523a6a --- /dev/null +++ b/taskflow/blocks/__init__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Blocks define *structure* + +There are two categories of blocks: +- patterns, which provide convenient way to express basic flow + structure, like linear flow or parallel flow +- terminals, which run task or needed for housekeeping + +""" + +# Import most used blocks into this module namespace: +from taskflow.blocks.patterns import LinearFlow # noqa +from taskflow.blocks.patterns import ParallelFlow # noqa +from taskflow.blocks.task import Task # noqa diff --git a/taskflow/blocks/base.py b/taskflow/blocks/base.py new file mode 100644 index 00000000..1620546c --- /dev/null +++ b/taskflow/blocks/base.py @@ -0,0 +1,25 @@ + +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class Block(object): + """Basic flow structure unit + + From blocks the flow definition is build. + """ diff --git a/taskflow/blocks/patterns.py b/taskflow/blocks/patterns.py new file mode 100644 index 00000000..1bbfcc58 --- /dev/null +++ b/taskflow/blocks/patterns.py @@ -0,0 +1,57 @@ + +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from taskflow.blocks import base + + +class Pattern(base.Block): + """Base class for patterns that can contain nested blocks + + Patterns put child blocks into *structure*. + """ + + def __init__(self): + super(Pattern, self).__init__() + self._children = [] + + @property + def children(self): + return self._children + + def add(self, *children): + self._children.extend(children) + return self + + +class LinearFlow(Pattern): + """Linear (sequential) pattern + + Children of this pattern should be executed one after another, + in order. Every child implicitly depends on all the children + before it. + """ + + +class ParallelFlow(Pattern): + """Parallel (unordered) pattern + + Children of this pattern are independent, and thus can be + executed in any order or in parallel. + """ diff --git a/taskflow/blocks/task.py b/taskflow/blocks/task.py new file mode 100644 index 00000000..bb8ebb57 --- /dev/null +++ b/taskflow/blocks/task.py @@ -0,0 +1,47 @@ + +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Terminal blocks that actually run code +""" + +from taskflow.blocks import base +from taskflow.openstack.common import uuidutils + + +class Task(base.Block): + """A block that wraps a single task + + The task should be executed, and produced results should be saved. + """ + + def __init__(self, task, uuid=None): + super(Task, self).__init__() + self._task = task + if uuid is None: + self._id = uuidutils.generate_uuid() + else: + self._id = str(uuid) + + @property + def task(self): + return self._task + + @property + def uuid(self): + return self._id diff --git a/taskflow/engines/__init__.py b/taskflow/engines/__init__.py new file mode 100644 index 00000000..830dd2e7 --- /dev/null +++ b/taskflow/engines/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/engines/action_engine/__init__.py b/taskflow/engines/action_engine/__init__.py new file mode 100644 index 00000000..830dd2e7 --- /dev/null +++ b/taskflow/engines/action_engine/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/engines/action_engine/base_action.py b/taskflow/engines/action_engine/base_action.py new file mode 100644 index 00000000..97f1f7e4 --- /dev/null +++ b/taskflow/engines/action_engine/base_action.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import abc + + +class Action(object): + """Basic action class + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def execute(self, engine): + """Run the action""" + + @abc.abstractmethod + def revert(self, engine): + """Undo all side effects of execute method""" diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py new file mode 100644 index 00000000..e3886427 --- /dev/null +++ b/taskflow/engines/action_engine/engine.py @@ -0,0 +1,58 @@ + +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from taskflow.engines.action_engine import seq_action +from taskflow.engines.action_engine import task_action + +from taskflow import blocks +from taskflow import states + + +class ActionEngine(object): + """Generic action-based engine + + Converts the flow to recursive structure of actions. + """ + + def __init__(self, flow, action_map): + self._action_map = action_map + self._root = self._to_action(flow) + + def _to_action(self, pattern): + try: + factory = self._action_map[type(pattern)] + except KeyError: + raise ValueError('Action of unknown type: %s (type %s)' + % (pattern, type(pattern))) + return factory(pattern, self._to_action) + + def run(self): + status = self._root.execute(self) + if status == states.FAILURE: + self._root.revert(self) + + +class SingleThreadedActionEngine(ActionEngine): + def __init__(self, flow): + ActionEngine.__init__(self, flow, { + blocks.Task: task_action.TaskAction, + blocks.LinearFlow: seq_action.SequentialAction, + blocks.ParallelFlow: seq_action.SequentialAction + }) diff --git a/taskflow/engines/action_engine/seq_action.py b/taskflow/engines/action_engine/seq_action.py new file mode 100644 index 00000000..b68a96bd --- /dev/null +++ b/taskflow/engines/action_engine/seq_action.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow.engines.action_engine import base_action as base +from taskflow import states + + +class SequentialAction(base.Action): + + def __init__(self, pattern, to_action): + self._history = [] + self._actions = [to_action(pat) for pat in pattern.children] + + def execute(self, engine): + state = states.SUCCESS + for action in self._actions: + #TODO(imelnikov): save history to storage + self._history.append(action) + state = action.execute(engine) + if state != states.SUCCESS: + break + return state + + def revert(self, engine): + while self._history: + action = self._history[-1] + action.revert(engine) + self._history.pop() diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py new file mode 100644 index 00000000..efb2fd73 --- /dev/null +++ b/taskflow/engines/action_engine/task_action.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow.engines.action_engine import base_action as base +from taskflow import states + +import sys + + +class TaskAction(base.Action): + + def __init__(self, block, _to_action): + self._task = block.task + if isinstance(self._task, type): + self._task = self._task() + self.state = states.PENDING + + def execute(self, engine): + # TODO(imelnikov): notifications + self.state = states.RUNNING + try: + # TODO(imelnikov): pass only necessary args to task + self._task.execute() + except Exception: + # TODO(imelnikov): save exception information + print sys.exc_info() + self.state = states.FAILURE + else: + self.state = states.SUCCESS + return self.state + + def revert(self, engine): + if self.state == states.PENDING: # pragma: no cover + # NOTE(imelnikov): in all the other states, the task + # execution was at least attempted, so we should give + # task a chance for cleanup + return + try: + self._task.revert() + except Exception: + self.state = states.FAILURE + raise + else: + self.state = states.PENDING diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py index e6699a2c..56ce2657 100644 --- a/taskflow/patterns/distributed_flow.py +++ b/taskflow/patterns/distributed_flow.py @@ -20,7 +20,6 @@ import celery import logging - LOG = logging.getLogger(__name__) diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py new file mode 100644 index 00000000..ef104399 --- /dev/null +++ b/taskflow/tests/unit/test_action_engine.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import blocks +from taskflow import task +from taskflow import test + +from taskflow.engines.action_engine import engine as eng + + +class TestTask(task.Task): + + def __init__(self, values, name): + super(TestTask, self).__init__(name) + self.values = values + + def execute(self, **kwargs): + self.values.append(self.name) + return 5 + + def revert(self, **kwargs): + self.values.append(self.name + ' reverted') + + +class FailingTask(TestTask): + def execute(self, **kwargs): + raise RuntimeError('Woot!') + + +class NeverRunningTask(task.Task): + def execute(self, **kwargs): + assert False, 'This method should not be called' + + def revert(self, **kwargs): + assert False, 'This method should not be called' + + +class NastyTask(task.Task): + def execute(self, **kwargs): + pass + + def revert(self, **kwargs): + raise RuntimeError('Gotcha!') + + +class EngineTestBase(object): + def setUp(self): + super(EngineTestBase, self).setUp() + self.values = [] + + def _make_engine(self, _flow): + raise NotImplementedError() + + +class EngineTaskTest(EngineTestBase): + + def test_run_task_as_flow(self): + flow = blocks.Task(TestTask(self.values, name='task1')) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1']) + + def test_invalid_block_raises(self): + value = 'i am string, not block, sorry' + flow = blocks.LinearFlow().add(value) + with self.assertRaises(ValueError) as err: + self._make_engine(flow) + self.assertIn(value, str(err.exception)) + + +class EngineLinearFlowTest(EngineTestBase): + + def test_sequential_flow_one_task(self): + flow = blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, name='task1')) + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1']) + + def test_sequential_flow_two_tasks(self): + flow = blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, name='task1')), + blocks.Task(TestTask(self.values, name='task2')) + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1', 'task2']) + + def test_sequential_flow_nested_blocks(self): + flow = blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, 'task1')), + blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, 'task2')) + ) + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['task1', 'task2']) + + def test_revert_exception_is_reraised(self): + flow = blocks.LinearFlow().add( + blocks.Task(NastyTask), + blocks.Task(FailingTask(self.values, 'fail')) + ) + engine = self._make_engine(flow) + with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): + engine.run() + + def test_revert_not_run_task_is_not_reverted(self): + flow = blocks.LinearFlow().add( + blocks.Task(FailingTask(self.values, 'fail')), + blocks.Task(NeverRunningTask) + ) + self._make_engine(flow).run() + self.assertEquals(self.values, ['fail reverted']) + + def test_correctly_reverts_children(self): + flow = blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, 'task1')), + blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, 'task2')), + blocks.Task(FailingTask(self.values, 'fail')) + ) + ) + engine = self._make_engine(flow) + engine.run() + self.assertEquals(self.values, ['task1', 'task2', + 'fail reverted', + 'task2 reverted', 'task1 reverted']) + + +class SingleThreadedEngineTest(EngineTaskTest, + EngineLinearFlowTest, + test.TestCase): + def _make_engine(self, flow): + return eng.SingleThreadedActionEngine(flow) diff --git a/tox.ini b/tox.ini index ada78a29..ee1dfb58 100644 --- a/tox.ini +++ b/tox.ini @@ -39,6 +39,6 @@ setenv = NOSE_WITH_COVERAGE=1 commands = {posargs} [flake8] -ignore = H402,H302 +ignore = H402 builtins = _ -exclude = .venv,.tox,dist,doc,*openstack/common*,*egg,.git,build,tools \ No newline at end of file +exclude = .venv,.tox,dist,doc,*openstack/common*,*egg,.git,build,tools