diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 755f7b13..e147e902 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -137,14 +137,14 @@ For example, this is how you can use >>> with printing.PrintingListener(eng): ... eng.run() ... - taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'RUNNING' - taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'RUNNING' + taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved flow 'cat-dog' (...) into state 'RUNNING' + taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'CatTalk' (...) into state 'RUNNING' meow - taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False) - taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'RUNNING' + taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False) + taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'DogTalk' (...) into state 'RUNNING' woof - taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False) - taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'SUCCESS' + taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False) + taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved flow 'cat-dog' (...) into state 'SUCCESS' Basic listener -------------- diff --git a/setup.cfg b/setup.cfg index 0c396204..dadce717 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,9 +49,9 @@ taskflow.persistence = zookeeper = taskflow.persistence.backends.impl_zookeeper:ZkBackend taskflow.engines = - default = taskflow.engines.action_engine.engine:SingleThreadedActionEngine - serial = taskflow.engines.action_engine.engine:SingleThreadedActionEngine - parallel = taskflow.engines.action_engine.engine:MultiThreadedActionEngine + default = taskflow.engines.action_engine.engine:SerialActionEngine + serial = taskflow.engines.action_engine.engine:SerialActionEngine + parallel = taskflow.engines.action_engine.engine:ParallelActionEngine worker-based = taskflow.engines.worker_based.engine:WorkerBasedActionEngine [nosetests] diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 9bf62429..bbd89c9d 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -225,12 +225,12 @@ class ActionEngine(base.EngineBase): self._compiled = True -class SingleThreadedActionEngine(ActionEngine): +class SerialActionEngine(ActionEngine): """Engine that runs tasks in serial manner.""" _storage_factory = atom_storage.SingleThreadedStorage -class MultiThreadedActionEngine(ActionEngine): +class ParallelActionEngine(ActionEngine): """Engine that runs tasks in parallel manner.""" _storage_factory = atom_storage.MultiThreadedStorage @@ -240,7 +240,7 @@ class MultiThreadedActionEngine(ActionEngine): def __init__(self, flow, flow_detail, backend, conf, executor=None, max_workers=None): - super(MultiThreadedActionEngine, self).__init__( - flow, flow_detail, backend, conf) + super(ParallelActionEngine, self).__init__(flow, flow_detail, + backend, conf) self._executor = executor self._max_workers = max_workers diff --git a/taskflow/examples/run_by_iter.py b/taskflow/examples/run_by_iter.py index 0a7761b7..4b7b98cc 100644 --- a/taskflow/examples/run_by_iter.py +++ b/taskflow/examples/run_by_iter.py @@ -30,9 +30,9 @@ sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) -from taskflow.engines.action_engine import engine +from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence.backends import impl_memory +from taskflow.persistence import backends as persistence_backends from taskflow import task from taskflow.utils import persistence_utils @@ -73,18 +73,15 @@ flows = [] for i in range(0, flow_count): f = make_alphabet_flow(i + 1) flows.append(make_alphabet_flow(i + 1)) -be = impl_memory.MemoryBackend({}) +be = persistence_backends.fetch(conf={'connection': 'memory'}) book = persistence_utils.temporary_log_book(be) -engines = [] +engine_iters = [] for f in flows: fd = persistence_utils.create_flow_detail(f, book, be) - e = engine.SingleThreadedActionEngine(f, fd, be, {}) + e = engines.load(f, flow_detail=fd, backend=be, book=book) e.compile() e.storage.inject({'A': 'A'}) e.prepare() - engines.append(e) -engine_iters = [] -for e in engines: engine_iters.append(e.run_iter()) while engine_iters: for it in list(engine_iters): diff --git a/taskflow/examples/run_by_iter_enumerate.py b/taskflow/examples/run_by_iter_enumerate.py index 66b1859f..d954d6aa 100644 --- a/taskflow/examples/run_by_iter_enumerate.py +++ b/taskflow/examples/run_by_iter_enumerate.py @@ -27,9 +27,9 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) -from taskflow.engines.action_engine import engine +from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence.backends import impl_memory +from taskflow.persistence import backends as persistence_backends from taskflow import task from taskflow.utils import persistence_utils @@ -48,10 +48,10 @@ f = lf.Flow("counter") for i in range(0, 10): f.add(EchoNameTask("echo_%s" % (i + 1))) -be = impl_memory.MemoryBackend() +be = persistence_backends.fetch(conf={'connection': 'memory'}) book = persistence_utils.temporary_log_book(be) fd = persistence_utils.create_flow_detail(f, book, be) -e = engine.SingleThreadedActionEngine(f, fd, be, {}) +e = engines.load(f, flow_detail=fd, backend=be, book=book) e.compile() e.prepare() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index d2fb0d43..243635fb 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -555,11 +555,11 @@ class SingleThreadedEngineTest(EngineTaskTest, def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) - self.assertIsInstance(engine, eng.SingleThreadedActionEngine) + self.assertIsInstance(engine, eng.SerialActionEngine) def test_singlethreaded_is_the_default(self): engine = taskflow.engines.load(utils.TaskNoRequiresNoReturns) - self.assertIsInstance(engine, eng.SingleThreadedActionEngine) + self.assertIsInstance(engine, eng.SerialActionEngine) class MultiThreadedEngineTest(EngineTaskTest, @@ -578,7 +578,7 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) - self.assertIsInstance(engine, eng.MultiThreadedActionEngine) + self.assertIsInstance(engine, eng.ParallelActionEngine) self.assertIs(engine._executor, None) def test_using_common_executor(self):