diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py deleted file mode 100644 index dc661553..00000000 --- a/taskflow/tests/unit/test_graph_flow.py +++ /dev/null @@ -1,221 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections - -from taskflow import decorators -from taskflow import exceptions as excp -from taskflow.patterns import graph_flow as gw -from taskflow import states -# from taskflow import test -from taskflow.tests import utils - - -# FIXME(imelnikov): threaded flow is broken, so we temporarily skip -# the tests by replacing parent class with object -class GraphFlowTest(object): - def test_reverting_flow(self): - flo = gw.Flow("test-flow") - reverted = [] - - def run1_revert(context, result, cause): # pylint: disable=W0613 - reverted.append('run1') - self.assertEquals(states.REVERTING, cause.flow.state) - self.assertEquals(result, {'a': 1}) - - @decorators.task(revert=run1_revert, provides=['a']) - def run1(context): # pylint: disable=W0613 - return { - 'a': 1, - } - - @decorators.task(provides=['c']) - def run2(context, a): # pylint: disable=W0613,C0103 - raise Exception('Dead') - - flo.add(run1) - flo.add(run2) - - self.assertEquals(states.PENDING, flo.state) - self.assertRaises(Exception, flo.run, {}) - self.assertEquals(states.FAILURE, flo.state) - self.assertEquals(['run1'], reverted) - - def test_no_requires_provider(self): - flo = gw.Flow("test-flow") - flo.add(utils.ProvidesRequiresTask('test1', - provides=['a', 'b'], - requires=['c', 'd'])) - self.assertEquals(states.PENDING, flo.state) - self.assertRaises(excp.InvalidStateException, flo.run, {}) - self.assertEquals(states.FAILURE, flo.state) - - def test_looping_flow(self): - flo = gw.Flow("test-flow") - flo.add(utils.ProvidesRequiresTask('test1', - provides=['a', 'b'], - requires=['c', 'd', 'e'])) - flo.add(utils.ProvidesRequiresTask('test2', - provides=['c', 'd', 'e'], - requires=['a', 'b'])) - ctx = collections.defaultdict(list) - self.assertEquals(states.PENDING, flo.state) - self.assertRaises(excp.InvalidStateException, flo.run, ctx) - self.assertEquals(states.FAILURE, flo.state) - - def test_complicated_inputs_outputs(self): - flo = gw.Flow("test-flow") - flo.add(utils.ProvidesRequiresTask('test1', - provides=['a', 'b'], - requires=['c', 'd', 'e'])) - flo.add(utils.ProvidesRequiresTask('test2', - provides=['c', 'd', 'e'], - requires=[])) - flo.add(utils.ProvidesRequiresTask('test3', - provides=['c', 'd'], - requires=[])) - flo.add(utils.ProvidesRequiresTask('test4', - provides=['z'], - requires=['a', 'b', 'c', 'd', 'e'])) - flo.add(utils.ProvidesRequiresTask('test5', - provides=['y'], - requires=['z'])) - flo.add(utils.ProvidesRequiresTask('test6', - provides=[], - requires=['y'])) - - self.assertEquals(states.PENDING, flo.state) - ctx = collections.defaultdict(list) - flo.run(ctx) - self.assertEquals(states.SUCCESS, flo.state) - run_order = ctx[utils.ORDER_KEY] - - # Order isn't deterministic so that's why we sort it - self.assertEquals(['test2', 'test3'], sorted(run_order[0:2])) - - # This order is deterministic - self.assertEquals(['test1', 'test4', 'test5', 'test6'], run_order[2:]) - - def test_connect_requirement_failure(self): - - @decorators.task(provides=['a']) - def run1(context): # pylint: disable=W0613 - return { - 'a': 1, - } - - @decorators.task - def run2(context, b, c, d): # pylint: disable=W0613,C0103 - return None - - flo = gw.Flow("test-flow") - flo.add(run1) - flo.add(run2) - - self.assertRaises(excp.InvalidStateException, flo.run, {}) - - def test_manual_dependencies(self): - flo = gw.Flow("test-flow") - run_order = [] - - @decorators.task - def run1(context): # pylint: disable=W0613,C0103 - run_order.append('ran1') - - @decorators.task - def run2(context): # pylint: disable=W0613,C0103 - run_order.append('ran2') - - @decorators.task - def run3(context): # pylint: disable=W0613,C0103 - run_order.append('ran3') - - (uuid1, uuid2, uuid3) = flo.add_many([run1, run2, run3]) - flo.add_dependency(uuid3, uuid2) - flo.add_dependency(uuid2, uuid1) - self.assertRaises(ValueError, flo.add_dependency, uuid2, uuid2) - self.assertRaises(ValueError, flo.add_dependency, - uuid2 + "blah", uuid3) - - flo.run({}) - self.assertEquals(['ran3', 'ran2', 'ran1'], run_order) - - def test_manual_providing_dependencies(self): - flo = gw.Flow("test-flow") - - @decorators.task(provides=['a']) - def run1(context): - return { - 'a': 2, - } - - @decorators.task - def run2(context, a): - pass - - uuid1 = flo.add(run1) - uuid2 = flo.add(run2, infer=False) - self.assertRaises(excp.MissingDependencies, - flo.run, {}) - flo.reset() - flo.add_dependency(uuid1, uuid2) - flo.run({}) - - def test_happy_flow(self): - flo = gw.Flow("test-flow") - - run_order = [] - f_args = {} - - @decorators.task(provides=['a']) - def run1(context): # pylint: disable=W0613,C0103 - run_order.append('ran1') - return { - 'a': 1, - } - - @decorators.task(provides=['c']) - def run2(context, a): # pylint: disable=W0613,C0103 - run_order.append('ran2') - return { - 'c': 3, - } - - @decorators.task(provides=['b']) - def run3(context, a): # pylint: disable=W0613,C0103 - run_order.append('ran3') - return { - 'b': 2, - } - - @decorators.task - def run4(context, b, c): # pylint: disable=W0613,C0103 - run_order.append('ran4') - f_args['b'] = b - f_args['c'] = c - - flo.add(run1) - flo.add(run2) - flo.add(run3) - flo.add(run4) - - flo.run({}) - self.assertEquals(['ran1', 'ran2', 'ran3', 'ran4'], sorted(run_order)) - self.assertEquals('ran1', run_order[0]) - self.assertEquals('ran4', run_order[-1]) - self.assertEquals({'b': 2, 'c': 3}, f_args) diff --git a/taskflow/tests/unit/test_threaded_flow.py b/taskflow/tests/unit/test_threaded_flow.py deleted file mode 100644 index 2c02613d..00000000 --- a/taskflow/tests/unit/test_threaded_flow.py +++ /dev/null @@ -1,391 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -import time - -from taskflow import decorators -from taskflow import exceptions as excp -from taskflow import states - -# from taskflow.patterns import threaded_flow as tf -from taskflow.patterns import graph_flow as tf # make flake8 happy -# from taskflow import test -from taskflow.tests import utils - - -def _find_idx(what, search_where): - for i, j in enumerate(search_where): - if i == what: - return j - return -1 - - -# FIXME(imelnikov): threaded flow is broken, so we temporarily skip -# the tests by replacing parent class with object -class ThreadedFlowTest(object): - def _make_tracking_flow(self, name): - notify_lock = threading.RLock() - flo = tf.Flow(name) - notifications = [] - - def save_notify(state, details): - runner = details.get('runner') - if not runner: - return - with notify_lock: - notifications.append((runner.uuid, state, dict(details))) - - flo.task_notifier.register('*', save_notify) - return (flo, notifications) - - def _make_watched_flow(self, name): - history_lock = threading.RLock() - flo = tf.Flow(name) - history = {} - - def save_state(state, details): - runner = details.get('runner') - if not runner: - return - with history_lock: - old_state = details.get('old_state') - old_states = history.get(runner.uuid, []) - if not old_states: - old_states.append(old_state) - old_states.append(state) - history[runner.uuid] = old_states - - flo.task_notifier.register('*', save_state) - return (flo, history) - - def test_somewhat_complicated(self): - """Tests a somewhat complicated dependency graph. - - X--Y--C--D - | | - A--B-- --G-- - | | |--Z(end) - E--F-- --H-- - """ - (flo, notifications) = self._make_tracking_flow("sanity-test") - - # X--Y - x = flo.add(utils.ProvidesRequiresTask("X", - provides=['x'], - requires=[])) - y = flo.add(utils.ProvidesRequiresTask("Y", - provides=['y'], - requires=['x'])) - - # A--B - a = flo.add(utils.ProvidesRequiresTask("A", - provides=['a'], - requires=[])) - b = flo.add(utils.ProvidesRequiresTask("B", - provides=['b'], - requires=['a'])) - - # E--F - e = flo.add(utils.ProvidesRequiresTask("E", - provides=['e'], - requires=[])) - f = flo.add(utils.ProvidesRequiresTask("F", - provides=['f'], - requires=['e'])) - - # C--D - c = flo.add(utils.ProvidesRequiresTask("C", - provides=['c'], - requires=['f', 'b', 'y'])) - d = flo.add(utils.ProvidesRequiresTask("D", - provides=['d'], - requires=['c'])) - - # G - g = flo.add(utils.ProvidesRequiresTask("G", - provides=['g'], - requires=['d'])) - - # H - h = flo.add(utils.ProvidesRequiresTask("H", - provides=['h'], - requires=['d'])) - - # Z - z = flo.add(utils.ProvidesRequiresTask("Z", - provides=['z'], - requires=['g', 'h'])) - - all_uuids = [z, h, g, d, c, f, e, b, a, y, x] - self.assertEquals(states.PENDING, flo.state) - flo.run({}) - self.assertEquals(states.SUCCESS, flo.state) - - # Analyze the notifications to determine that the correct ordering - # occurred - - # Discard states we aren't really interested in. - c_notifications = [] - uuids_ran = set() - for (uuid, state, details) in notifications: - if state not in [states.RUNNING, states.SUCCESS, states.FAILURE]: - continue - uuids_ran.add(uuid) - c_notifications.append((uuid, state, details)) - notifications = c_notifications - self.assertEquals(len(all_uuids), len(uuids_ran)) - - # Select out the run order - just_ran_uuids = [] - for (uuid, state, details) in notifications: - if state not in [states.RUNNING]: - continue - just_ran_uuids.append(uuid) - - def ran_before(ran_uuid, before_what): - before_idx = just_ran_uuids.index(ran_uuid) - other_idxs = [just_ran_uuids.index(u) for u in before_what] - was_before = True - for idx in other_idxs: - if idx < before_idx: - was_before = False - return was_before - - def ran_after(ran_uuid, after_what): - after_idx = just_ran_uuids.index(ran_uuid) - other_idxs = [just_ran_uuids.index(u) for u in after_what] - was_after = True - for idx in other_idxs: - if idx > after_idx: - was_after = False - return was_after - - # X, A, E should always run before the others - self.assertTrue(ran_before(x, [c, d, g, h, z])) - self.assertTrue(ran_before(a, [c, d, g, h, z])) - self.assertTrue(ran_before(e, [c, d, g, h, z])) - - # Y, B, F should always run before C - self.assertTrue(ran_before(y, [c])) - self.assertTrue(ran_before(b, [c])) - self.assertTrue(ran_before(f, [c])) - - # C runs before D - self.assertTrue(ran_before(c, [d])) - - # G and H are before Z - self.assertTrue(ran_before(g, [z])) - self.assertTrue(ran_before(h, [z])) - - # C, D runs after X, Y, B, E, F - self.assertTrue(ran_after(c, [x, y, b, e, f])) - self.assertTrue(ran_after(d, [x, y, b, c, e, f])) - - # Z is last - all_uuids_no_z = list(all_uuids) - all_uuids_no_z.remove(z) - self.assertTrue(ran_after(z, all_uuids_no_z)) - - def test_empty_cancel(self): - (flo, history) = self._make_watched_flow("sanity-test") - self.assertEquals(states.PENDING, flo.state) - flo.cancel() - self.assertEquals(states.CANCELLED, flo.state) - - def test_self_loop_flo(self): - (flo, history) = self._make_watched_flow("sanity-test") - flo.add(utils.ProvidesRequiresTask("do-that", - provides=['c'], - requires=['c'])) - self.assertRaises(excp.InvalidStateException, flo.run, {}) - - def test_circular_flo(self): - (flo, history) = self._make_watched_flow("sanity-test") - flo.add(utils.ProvidesRequiresTask("do-that", - provides=['c'], - requires=['a'])) - flo.add(utils.ProvidesRequiresTask("do-this", - provides=['a'], - requires=['c'])) - self.assertRaises(excp.InvalidStateException, flo.run, {}) - - def test_no_input_flo(self): - (flo, history) = self._make_watched_flow("sanity-test") - flo.add(utils.ProvidesRequiresTask("do-that", - provides=['c'], - requires=['a'])) - flo.add(utils.ProvidesRequiresTask("do-this", - provides=['b'], - requires=['c'])) - self.assertRaises(excp.InvalidStateException, flo.run, {}) - - def test_simple_resume(self): - (flo, history) = self._make_watched_flow("sanity-test") - f_uuid = flo.add(utils.ProvidesRequiresTask("do-this", - provides=['a'], - requires=[])) - flo.add(utils.ProvidesRequiresTask("do-that", - provides=['c'], - requires=['a'])) - - def resume_it(flow, ordering): - ran_already = [] - not_ran = [] - for r in ordering: - if r.uuid == f_uuid: - ran_already.append((r, { - 'result': 'b', - 'states': [states.SUCCESS], - })) - else: - not_ran.append(r) - return (ran_already, not_ran) - - flo.resumer = resume_it - flo.run({}) - self.assertEquals('b', flo.results[f_uuid]) - self.assertEquals(states.SUCCESS, flo.state) - - def test_active_cancel(self): - (flo, history) = self._make_watched_flow("sanity-test") - flo.add(utils.ProvidesRequiresTask("do-this", - provides=['a'], - requires=[])) - flo.add(utils.ProvidesRequiresTask("do-that", - provides=['c'], - requires=['a'])) - - @decorators.task(provides=['d'], requires=['c']) - def cancel_it(context, c): - am_cancelled = flo.cancel() - return am_cancelled - - uuid = flo.add(cancel_it) - flo.add(utils.ProvidesRequiresTask("do-the-other", - provides=['e'], - requires=['d'])) - - flo.run({}) - self.assertIn(uuid, flo.results) - self.assertEquals(states.INCOMPLETE, flo.state) - self.assertEquals(1, flo.results[uuid]) - - def test_sanity_run(self): - (flo, history) = self._make_watched_flow("sanity-test") - flo.add(utils.ProvidesRequiresTask("do-this", - provides=['a'], - requires=[])) - flo.add(utils.ProvidesRequiresTask("do-that", - provides=['c'], - requires=['a'])) - flo.add(utils.ProvidesRequiresTask("do-other", - provides=['d'], - requires=[])) - flo.add(utils.ProvidesRequiresTask("do-thing", - provides=['e'], - requires=['d'])) - self.assertEquals(states.PENDING, flo.state) - context = {} - flo.run(context) - self.assertEquals(states.SUCCESS, flo.state) - self.assertTrue(len(context) > 0) - # Even when running in parallel this will be the required order since - # 'do-that' depends on 'do-this' finishing first. - expected_order = ['do-this', 'do-that'] - this_that = [t for t in context[utils.ORDER_KEY] - if t in expected_order] - self.assertEquals(expected_order, this_that) - expected_order = ['do-other', 'do-thing'] - this_that = [t for t in context[utils.ORDER_KEY] - if t in expected_order] - self.assertEquals(expected_order, this_that) - - def test_single_failure(self): - - def reverter(context, result, cause): - context['reverted'] = True - - @decorators.task(revert=reverter) - def fail_quick(context): - raise IOError("Broken") - - (flo, history) = self._make_watched_flow('test-single-fail') - f_uuid = flo.add(fail_quick) - context = {} - self.assertRaises(IOError, flo.run, context) - self.assertEquals(states.FAILURE, flo.state) - self.assertEquals(states.REVERTED, history[f_uuid][-1]) - self.assertTrue(context.get('reverted')) - - def test_failure_cancel_successors(self): - (flo, history) = self._make_watched_flow("failure-cancel") - - @decorators.task(provides=['b', 'c']) - def fail_quick(context): - raise IOError("Broken") - - @decorators.task - def after_fail(context, b): - pass - - @decorators.task - def after_fail2(context, c): - pass - - fq, af, af2 = flo.add_many([fail_quick, after_fail, after_fail2]) - self.assertEquals(states.PENDING, flo.state) - - context = {} - self.assertRaises(IOError, flo.run, context) - self.assertEquals(states.FAILURE, flo.state) - self.assertEquals(states.REVERTED, history[fq][-1]) - self.assertEquals(states.CANCELLED, history[af][-1]) - self.assertEquals(states.CANCELLED, history[af2][-1]) - - def test_live_timeout(self): - - @decorators.task(provides=['a']) - def task_long(context): - time.sleep(1) - return { - 'a': 2, - } - - @decorators.task(provides=['b']) - def wait_short(context, a): - pass - - @decorators.task - def wait_ok_long(context, a): - pass - - @decorators.task - def wait_after_short(context, b): - pass - - (flo, history) = self._make_watched_flow('test-live') - flo.add(task_long) - ws_uuid = flo.add(wait_short, timeout=0.1) - flo.add(wait_ok_long) - was_uuid = flo.add(wait_after_short) - - flo.run({}) - self.assertEquals(states.INCOMPLETE, flo.state) - self.assertEquals(states.TIMED_OUT, history[ws_uuid][-1]) - self.assertEquals(states.CANCELLED, history[was_uuid][-1])