Update engine class names to better reflect there usage

Rename the single threaded engine to be the serial engine which
better matches its entrypoint, do the same for the multithreaded
engine (renaming it to the parallel engine).

Change-Id: I6174b4f1936858c13eeee416bfa3836cf20a1350
This commit is contained in:
Joshua Harlow 2014-10-07 10:56:30 -07:00
parent 1caaecc5d6
commit 8d143187ea
6 changed files with 25 additions and 28 deletions

View File

@ -137,14 +137,14 @@ For example, this is how you can use
>>> with printing.PrintingListener(eng): >>> with printing.PrintingListener(eng):
... eng.run() ... eng.run()
... ...
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'RUNNING' taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved flow 'cat-dog' (...) into state 'RUNNING'
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'RUNNING' taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'CatTalk' (...) into state 'RUNNING'
meow meow
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False) taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'CatTalk' (...) into state 'SUCCESS' with result 'cat' (failure=False)
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'RUNNING' taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'DogTalk' (...) into state 'RUNNING'
woof woof
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False) taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved task 'DogTalk' (...) into state 'SUCCESS' with result 'dog' (failure=False)
taskflow.engines.action_engine.engine.SingleThreadedActionEngine: ... has moved flow 'cat-dog' (...) into state 'SUCCESS' taskflow.engines.action_engine.engine.SerialActionEngine: ... has moved flow 'cat-dog' (...) into state 'SUCCESS'
Basic listener Basic listener
-------------- --------------

View File

@ -49,9 +49,9 @@ taskflow.persistence =
zookeeper = taskflow.persistence.backends.impl_zookeeper:ZkBackend zookeeper = taskflow.persistence.backends.impl_zookeeper:ZkBackend
taskflow.engines = taskflow.engines =
default = taskflow.engines.action_engine.engine:SingleThreadedActionEngine default = taskflow.engines.action_engine.engine:SerialActionEngine
serial = taskflow.engines.action_engine.engine:SingleThreadedActionEngine serial = taskflow.engines.action_engine.engine:SerialActionEngine
parallel = taskflow.engines.action_engine.engine:MultiThreadedActionEngine parallel = taskflow.engines.action_engine.engine:ParallelActionEngine
worker-based = taskflow.engines.worker_based.engine:WorkerBasedActionEngine worker-based = taskflow.engines.worker_based.engine:WorkerBasedActionEngine
[nosetests] [nosetests]

View File

@ -225,12 +225,12 @@ class ActionEngine(base.EngineBase):
self._compiled = True self._compiled = True
class SingleThreadedActionEngine(ActionEngine): class SerialActionEngine(ActionEngine):
"""Engine that runs tasks in serial manner.""" """Engine that runs tasks in serial manner."""
_storage_factory = atom_storage.SingleThreadedStorage _storage_factory = atom_storage.SingleThreadedStorage
class MultiThreadedActionEngine(ActionEngine): class ParallelActionEngine(ActionEngine):
"""Engine that runs tasks in parallel manner.""" """Engine that runs tasks in parallel manner."""
_storage_factory = atom_storage.MultiThreadedStorage _storage_factory = atom_storage.MultiThreadedStorage
@ -240,7 +240,7 @@ class MultiThreadedActionEngine(ActionEngine):
def __init__(self, flow, flow_detail, backend, conf, def __init__(self, flow, flow_detail, backend, conf,
executor=None, max_workers=None): executor=None, max_workers=None):
super(MultiThreadedActionEngine, self).__init__( super(ParallelActionEngine, self).__init__(flow, flow_detail,
flow, flow_detail, backend, conf) backend, conf)
self._executor = executor self._executor = executor
self._max_workers = max_workers self._max_workers = max_workers

View File

@ -30,9 +30,9 @@ sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir) sys.path.insert(0, self_dir)
from taskflow.engines.action_engine import engine from taskflow import engines
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory from taskflow.persistence import backends as persistence_backends
from taskflow import task from taskflow import task
from taskflow.utils import persistence_utils from taskflow.utils import persistence_utils
@ -73,18 +73,15 @@ flows = []
for i in range(0, flow_count): for i in range(0, flow_count):
f = make_alphabet_flow(i + 1) f = make_alphabet_flow(i + 1)
flows.append(make_alphabet_flow(i + 1)) flows.append(make_alphabet_flow(i + 1))
be = impl_memory.MemoryBackend({}) be = persistence_backends.fetch(conf={'connection': 'memory'})
book = persistence_utils.temporary_log_book(be) book = persistence_utils.temporary_log_book(be)
engines = [] engine_iters = []
for f in flows: for f in flows:
fd = persistence_utils.create_flow_detail(f, book, be) fd = persistence_utils.create_flow_detail(f, book, be)
e = engine.SingleThreadedActionEngine(f, fd, be, {}) e = engines.load(f, flow_detail=fd, backend=be, book=book)
e.compile() e.compile()
e.storage.inject({'A': 'A'}) e.storage.inject({'A': 'A'})
e.prepare() e.prepare()
engines.append(e)
engine_iters = []
for e in engines:
engine_iters.append(e.run_iter()) engine_iters.append(e.run_iter())
while engine_iters: while engine_iters:
for it in list(engine_iters): for it in list(engine_iters):

View File

@ -27,9 +27,9 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir) sys.path.insert(0, self_dir)
from taskflow.engines.action_engine import engine from taskflow import engines
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory from taskflow.persistence import backends as persistence_backends
from taskflow import task from taskflow import task
from taskflow.utils import persistence_utils from taskflow.utils import persistence_utils
@ -48,10 +48,10 @@ f = lf.Flow("counter")
for i in range(0, 10): for i in range(0, 10):
f.add(EchoNameTask("echo_%s" % (i + 1))) f.add(EchoNameTask("echo_%s" % (i + 1)))
be = impl_memory.MemoryBackend() be = persistence_backends.fetch(conf={'connection': 'memory'})
book = persistence_utils.temporary_log_book(be) book = persistence_utils.temporary_log_book(be)
fd = persistence_utils.create_flow_detail(f, book, be) fd = persistence_utils.create_flow_detail(f, book, be)
e = engine.SingleThreadedActionEngine(f, fd, be, {}) e = engines.load(f, flow_detail=fd, backend=be, book=book)
e.compile() e.compile()
e.prepare() e.prepare()

View File

@ -555,11 +555,11 @@ class SingleThreadedEngineTest(EngineTaskTest,
def test_correct_load(self): def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns) engine = self._make_engine(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.SingleThreadedActionEngine) self.assertIsInstance(engine, eng.SerialActionEngine)
def test_singlethreaded_is_the_default(self): def test_singlethreaded_is_the_default(self):
engine = taskflow.engines.load(utils.TaskNoRequiresNoReturns) engine = taskflow.engines.load(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.SingleThreadedActionEngine) self.assertIsInstance(engine, eng.SerialActionEngine)
class MultiThreadedEngineTest(EngineTaskTest, class MultiThreadedEngineTest(EngineTaskTest,
@ -578,7 +578,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_correct_load(self): def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns) engine = self._make_engine(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.MultiThreadedActionEngine) self.assertIsInstance(engine, eng.ParallelActionEngine)
self.assertIs(engine._executor, None) self.assertIs(engine._executor, None)
def test_using_common_executor(self): def test_using_common_executor(self):