diff --git a/setup.cfg b/setup.cfg index 378ff7412..4e485a5e0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,6 +36,11 @@ taskflow.persistence = postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend sqlite = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend +taskflow.engines = + default = taskflow.engines.action_engine.engine:SingleThreadedActionEngine + serial = taskflow.engines.action_engine.engine:SingleThreadedActionEngine + parallel = taskflow.engines.action_engine.engine:MultiThreadedActionEngine + [nosetests] cover-erase = true verbosity = 2 diff --git a/taskflow/engines/__init__.py b/taskflow/engines/__init__.py index 830dd2e7c..bdec4e4cc 100644 --- a/taskflow/engines/__init__.py +++ b/taskflow/engines/__init__.py @@ -15,3 +15,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + + +# promote helpers to this module namespace +from taskflow.engines.helpers import load # noqa +from taskflow.engines.helpers import run # noqa diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 562e82d37..fdd01c063 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -16,13 +16,13 @@ # License for the specific language governing permissions and limitations # under the License. -import multiprocessing import threading from concurrent import futures from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import task_action +from taskflow.engines import base from taskflow import decorators from taskflow import exceptions as exc @@ -31,25 +31,24 @@ from taskflow import storage as t_storage from taskflow.utils import flow_utils from taskflow.utils import misc -from taskflow.utils import persistence_utils as p_utils +from taskflow.utils import threading_utils -class ActionEngine(object): +class ActionEngine(base.EngineBase): """Generic action-based engine. Converts the flow to recursive structure of actions. """ _graph_action = None - def __init__(self, flow, storage): + def __init__(self, flow, flow_detail, backend, conf): + super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) self._failures = [] self._root = None - self._flow = flow self._lock = threading.RLock() self._state_lock = threading.RLock() self.notifier = misc.TransitionNotifier() self.task_notifier = misc.TransitionNotifier() - self.storage = storage def _revert(self, current_failure): self._change_state(states.REVERTING) @@ -145,62 +144,37 @@ class ActionEngine(object): class SingleThreadedActionEngine(ActionEngine): # This one attempts to run in a serial manner. _graph_action = graph_action.SequentialGraphAction - - def __init__(self, flow, flow_detail=None, book=None, backend=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, - book=book, - backend=backend) - ActionEngine.__init__(self, flow, - storage=t_storage.Storage(flow_detail, backend)) + _storage_cls = t_storage.Storage class MultiThreadedActionEngine(ActionEngine): # This one attempts to run in a parallel manner. _graph_action = graph_action.ParallelGraphAction + _storage_cls = t_storage.ThreadSafeStorage - def __init__(self, flow, flow_detail=None, book=None, backend=None, - executor=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, - book=book, - backend=backend) - ActionEngine.__init__(self, flow, - storage=t_storage.ThreadSafeStorage(flow_detail, - backend)) - if executor is not None: - self._executor = executor - self._owns_executor = False - self._thread_count = -1 - else: - self._executor = None - self._owns_executor = True - # TODO(harlowja): allow this to be configurable?? - try: - self._thread_count = multiprocessing.cpu_count() + 1 - except NotImplementedError: - # NOTE(harlowja): apparently may raise so in this case we will - # just setup two threads since its hard to know what else we - # should do in this situation. - self._thread_count = 2 + def __init__(self, flow, flow_detail, backend, conf): + super(MultiThreadedActionEngine, self).__init__( + flow, flow_detail, backend, conf) + self._executor = conf.get('executor', None) @decorators.locked def run(self): - if self._owns_executor: - if self._executor is not None: - # The previous shutdown failed, something is very wrong. - raise exc.InvalidStateException("The previous shutdown() of" - " the executor powering this" - " engine failed. Something is" - " very very wrong.") - self._executor = futures.ThreadPoolExecutor(self._thread_count) + if self._executor is None: + self._executor = futures.ThreadPoolExecutor( + threading_utils.get_optimal_thread_count()) + owns_executor = True + else: + owns_executor = False + try: ActionEngine.run(self) finally: # Don't forget to shutdown the executor!! - if self._owns_executor and self._executor is not None: - self._executor.shutdown(wait=True) - self._executor = None + if owns_executor: + try: + self._executor.shutdown(wait=True) + finally: + self._executor = None @property def executor(self): diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py new file mode 100644 index 000000000..1debd771f --- /dev/null +++ b/taskflow/engines/base.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import abc + + +class EngineBase(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, flow, flow_detail, backend, conf): + self._flow = flow + self.storage = self._storage_cls(flow_detail, backend) + + @abc.abstractproperty + def _storage_cls(self): + """Storage class""" + + @abc.abstractmethod + def compile(self): + """Check the flow and convert it to internal representation""" + + @abc.abstractmethod + def run(self): + """Run the flow""" diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py new file mode 100644 index 000000000..6ef6dc984 --- /dev/null +++ b/taskflow/engines/helpers.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six +import stevedore.driver + +from taskflow.persistence import backends as p_backends +from taskflow.utils import persistence_utils as p_utils + + +# NOTE(imelnikov): this is the entrypoint namespace, not the module namespace. +ENGINES_NAMESPACE = 'taskflow.engines' + + +def load(flow, store=None, flow_detail=None, book=None, + engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): + """Load flow into engine + + This function creates and prepares engine to run the + flow. All that is left is to run the engine with 'run()' method. + + Which engine to load is specified in 'engine_conf' parameter. It + can be a string that names engine type or a dictionary which holds + engine type (with 'engine' key) and additional engine-specific + configuration (for example, executor for multithreaded engine). + + Which storage backend to use is defined by backend parameter. It + can be backend itself, or a dictionary that is passed to + taskflow.persistence.backends.fetch to obtain backend. + + :param flow: flow to load + :param store: dict -- data to put to storage to satisfy flow requirements + :param flow_detail: FlowDetail that holds state of the flow + :param book: LogBook to create flow detail in if flow_detail is None + :param engine_conf: engine type and configuration configuration + :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) + :returns: engine + """ + + if engine_conf is None: + engine_conf = {'engine': 'default'} + + # NOTE(imelnikov): this allows simpler syntax + if isinstance(engine_conf, six.string_types): + engine_conf = {'engine': engine_conf} + + if isinstance(backend, dict): + backend = p_backends.fetch(backend) + + if flow_detail is None: + if book is None: + _lb, flow_detail = p_utils.temporary_flow_detail(backend) + else: + flow_detail = p_utils.create_flow_detail(flow, book=book, + backend=backend) + + mgr = stevedore.driver.DriverManager( + namespace, engine_conf['engine'], + invoke_on_load=True, + invoke_kwds={ + 'conf': engine_conf.copy(), + 'flow': flow, + 'flow_detail': flow_detail, + 'backend': backend + }) + engine = mgr.driver + if store: + engine.storage.inject(store) + return engine + + +def run(flow, store=None, engine_conf=None, backend=None): + """Run the flow + + This function load the flow into engine (with 'load' function) + and runs the engine. + + Which engine to load is specified in 'engine_conf' parameter. It + can be a string that names engine type or a dictionary which holds + engine type (with 'engine' key) and additional engine-specific + configuration (for example, executor for multithreaded engine). + + Which storage backend to use is defined by backend parameter. It + can be backend itself, or a dictionary that is passed to + taskflow.persistence.backends.fetch to obtain backend. + + :param flow: flow to load + :param store: dict -- data to put to storage to satisfy flow requirements + :param engine_conf: engine type and configuration configuration + :param backend: storage backend to use or configuration + :returns: dictionary of all named task results (see Storage.fetch_all) + """ + engine = load(flow, store=store, engine_conf=engine_conf, backend=backend) + engine.run() + return engine.storage.fetch_all() diff --git a/taskflow/examples/calculate_in_parallel.py b/taskflow/examples/calculate_in_parallel.py index 7513e3471..7b8f8a387 100644 --- a/taskflow/examples/calculate_in_parallel.py +++ b/taskflow/examples/calculate_in_parallel.py @@ -4,11 +4,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines -from taskflow.engines.action_engine import engine as eng from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import task @@ -20,7 +22,6 @@ from taskflow import task class Provider(task.Task): - def __init__(self, name, *args, **kwargs): super(Provider, self).__init__(name=name, **kwargs) self._provide = args @@ -30,11 +31,6 @@ class Provider(task.Task): class Adder(task.Task): - - def __init__(self, name, provides, rebind): - super(Adder, self).__init__(name=name, provides=provides, - rebind=rebind) - def execute(self, x, y): return x + y @@ -52,7 +48,6 @@ flow = lf.Flow('root').add( # r = z1+z2 = 18 Adder(name="sum-1", provides='r', rebind=['z1', 'z2'])) -engine = eng.MultiThreadedActionEngine(flow) -engine.run() -print engine.storage.fetch_all() +result = taskflow.engines.run(flow, engine_conf='parallel') +print result diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py index ca946f6e1..d5cb24c14 100644 --- a/taskflow/examples/calculate_linear.py +++ b/taskflow/examples/calculate_linear.py @@ -4,11 +4,12 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow import task @@ -62,7 +63,5 @@ flow = lf.Flow('root').add( Multiplier("multi", 3, provides='r', rebind={'z': 'a'}) ) -engine = eng.SingleThreadedActionEngine(flow) -engine.run() - -print engine.storage.fetch_all() +results = taskflow.engines.run(flow) +print results diff --git a/taskflow/examples/complex_graph.py b/taskflow/examples/complex_graph.py index fb31ab299..9dc9126d3 100644 --- a/taskflow/examples/complex_graph.py +++ b/taskflow/examples/complex_graph.py @@ -7,11 +7,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng + +import taskflow.engines from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow import task @@ -54,7 +56,6 @@ def trash(**kwargs): def startup(**kwargs): - pass # TODO(harlowja): try triggering reversion here! # raise ValueError("Car not verified") return True @@ -95,11 +96,7 @@ flow = lf.Flow("make-auto").add( 'windows_installed', 'wheels_installed'])) -engine = eng.SingleThreadedActionEngine(flow) -engine.notifier.register('*', flow_watch) -engine.task_notifier.register('*', task_watch) - -engine.storage.inject({'spec': { +spec = { "frame": 'steel', "engine": 'honda', "doors": '2', @@ -108,28 +105,25 @@ engine.storage.inject({'spec': { "doors_installed": True, "windows_installed": True, "wheels_installed": True, -}}) +} -print "Build a car" -engine.run() -engine = eng.SingleThreadedActionEngine(flow) +engine = taskflow.engines.load(flow, store={'spec': spec.copy()}) engine.notifier.register('*', flow_watch) engine.task_notifier.register('*', task_watch) -engine.storage.inject({'spec': { - "frame": 'steel', - "engine": 'honda', - "doors": '5', - "wheels": '4', - "engine_installed": True, - "doors_installed": True, - "windows_installed": True, - "wheels_installed": True, -}}) +print("Build a car") +engine.run() + + +spec['doors'] = 5 + +engine = taskflow.engines.load(flow, store={'spec': spec.copy()}) +engine.notifier.register('*', flow_watch) +engine.task_notifier.register('*', task_watch) try: - print "Build a wrong car that doesn't match specification" + print("Build a wrong car that doesn't match specification") engine.run() except Exception as e: - print e + print("Flow failed: %s" % e) diff --git a/taskflow/examples/fake_boot_vm.py b/taskflow/examples/fake_boot_vm.py index e14e63a6f..eba733417 100644 --- a/taskflow/examples/fake_boot_vm.py +++ b/taskflow/examples/fake_boot_vm.py @@ -8,11 +8,13 @@ import uuid logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng + +import taskflow.engines from taskflow.patterns import graph_flow as gf from taskflow import task @@ -141,14 +143,6 @@ flow = gf.Flow("Boot-Fake-Vm").add( ScheduleVM(), BootVM()) -engine = eng.SingleThreadedActionEngine(flow) - -# Get notified of the state changes the flow is going through. -engine.notifier.register('*', flow_notify) - -# Get notified of the state changes the flows tasks/runners are going through. -engine.task_notifier.register('*', task_notify) - # Simulates what nova/glance/keystone... calls a context context = { 'user_id': 'xyz', @@ -157,7 +151,15 @@ context = { } context = Context(**context) -engine.storage.inject({'context': context}) +# Load the flow +engine = taskflow.engines.load(flow, store={'context': context}) + +# Get notified of the state changes the flow is going through. +engine.notifier.register('*', flow_notify) + +# Get notified of the state changes the flows tasks/runners are going through. +engine.task_notifier.register('*', task_notify) + print '-' * 7 print 'Running' diff --git a/taskflow/examples/graph_flow.py b/taskflow/examples/graph_flow.py index 5c2b68f41..56041b266 100644 --- a/taskflow/examples/graph_flow.py +++ b/taskflow/examples/graph_flow.py @@ -4,11 +4,12 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow import task @@ -46,30 +47,20 @@ flow = gf.Flow('root').add( # x7 = x6+x6 = 82 Adder("add7", provides='x7', rebind=['x6', 'x6'])) -single_threaded_engine = eng.SingleThreadedActionEngine(flow) -single_threaded_engine.storage.inject({ +store = { "y1": 1, "y2": 3, "y3": 5, "y4": 7, "y5": 9, -}) +} -single_threaded_engine.run() +result = taskflow.engines.run( + flow, engine_conf='serial', store=store) -print ("Single threaded engine result %s" % - single_threaded_engine.storage.fetch_all()) +print("Single threaded engine result %s" % result) -multi_threaded_engine = eng.MultiThreadedActionEngine(flow) -multi_threaded_engine.storage.inject({ - "y1": 1, - "y2": 3, - "y3": 5, - "y4": 7, - "y5": 9, -}) +result = taskflow.engines.run( + flow, engine_conf='parallel', store=store) -multi_threaded_engine.run() - -print ("Multi threaded engine result %s" % - multi_threaded_engine.storage.fetch_all()) +print("Multi threaded engine result %s" % result) diff --git a/taskflow/examples/reverting_linear.py b/taskflow/examples/reverting_linear.py index b16209ab3..b1f45a2c3 100644 --- a/taskflow/examples/reverting_linear.py +++ b/taskflow/examples/reverting_linear.py @@ -4,11 +4,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines -from taskflow.engines.action_engine import engine as eng from taskflow.patterns import linear_flow as lf from taskflow import task @@ -43,15 +45,10 @@ flow = lf.Flow('simple-linear').add( CallJoe(), CallSuzzie() ) -engine = eng.SingleThreadedActionEngine(flow) - -engine.storage.inject({ - "joe_number": 444, - "jim_number": 555, - "suzzie_number": 666 -}) try: - engine.run() + taskflow.engines.run(flow, store=dict(joe_number=444, + jim_number=555, + suzzie_number=666)) except Exception as e: print "Flow failed: %r" % e diff --git a/taskflow/examples/simple_linear.py b/taskflow/examples/simple_linear.py index bde8c6477..fb2a7c12b 100644 --- a/taskflow/examples/simple_linear.py +++ b/taskflow/examples/simple_linear.py @@ -4,11 +4,12 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow import task @@ -30,16 +31,11 @@ class CallJoe(task.Task): def execute(self, joe_number, *args, **kwargs): print("Calling joe %s." % joe_number) + flow = lf.Flow('simple-linear').add( CallJim(), CallJoe() ) -engine = eng.SingleThreadedActionEngine(flow) - -engine.storage.inject({ - "joe_number": 444, - "jim_number": 555, -}) - -engine.run() +taskflow.engines.run(flow, store=dict(joe_number=444, + jim_number=555)) diff --git a/taskflow/examples/simple_linear_listening.py b/taskflow/examples/simple_linear_listening.py index 090b4d7f6..0fbe261e0 100644 --- a/taskflow/examples/simple_linear_listening.py +++ b/taskflow/examples/simple_linear_listening.py @@ -4,11 +4,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines -from taskflow.engines.action_engine import engine as eng from taskflow.patterns import linear_flow as lf from taskflow import task @@ -35,13 +37,13 @@ flow = lf.Flow("Call-them") flow.add(task.FunctorTask(execute=call_jim)) flow.add(task.FunctorTask(execute=call_joe)) -engine = eng.SingleThreadedActionEngine(flow) +engine = taskflow.engines.load(flow, store={ + 'context': { + "joe_number": 444, + "jim_number": 555, + } +}) + engine.notifier.register('*', flow_watch) engine.task_notifier.register('*', task_watch) - -context = { - "joe_number": 444, - "jim_number": 555, -} -engine.storage.inject({'context': context}) engine.run() diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 900df5f00..6f2e053ff 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -26,6 +26,8 @@ from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf +import taskflow.engines + from taskflow.engines.action_engine import engine as eng from taskflow import exceptions as exc from taskflow.persistence.backends import impl_memory @@ -139,14 +141,12 @@ class EngineTestBase(object): super(EngineTestBase, self).setUp() self.values = [] self.backend = impl_memory.MemoryBackend(conf={}) - self.book = p_utils.temporary_log_book(self.backend) def tearDown(self): super(EngineTestBase, self).tearDown() with contextlib.closing(self.backend) as be: with contextlib.closing(be.get_connection()) as conn: conn.clear_all() - self.book = None def _make_engine(self, flow, flow_detail=None): raise NotImplementedError() @@ -650,11 +650,18 @@ class SingleThreadedEngineTest(EngineTaskTest, SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, self.book, - self.backend) - return eng.SingleThreadedActionEngine(flow, backend=self.backend, - flow_detail=flow_detail) + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine_conf='serial', + backend=self.backend) + + def test_correct_load(self): + engine = self._make_engine(TestTask) + self.assertIsInstance(engine, eng.SingleThreadedActionEngine) + + def test_singlethreaded_is_the_default(self): + engine = taskflow.engines.load(TestTask) + self.assertIsInstance(engine, eng.SingleThreadedActionEngine) class MultiThreadedEngineTest(EngineTaskTest, @@ -664,19 +671,26 @@ class MultiThreadedEngineTest(EngineTaskTest, SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, self.book, - self.backend) - return eng.MultiThreadedActionEngine(flow, backend=self.backend, - flow_detail=flow_detail, - executor=executor) + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) - def test_using_common_pool(self): + def test_correct_load(self): + engine = self._make_engine(TestTask) + self.assertIsInstance(engine, eng.MultiThreadedActionEngine) + self.assertIs(engine.executor, None) + + def test_using_common_executor(self): flow = TestTask(self.values, name='task1') executor = futures.ThreadPoolExecutor(2) - e1 = self._make_engine(flow, executor=executor) - e2 = self._make_engine(flow, executor=executor) - self.assertIs(e1.executor, e2.executor) + try: + e1 = self._make_engine(flow, executor=executor) + e2 = self._make_engine(flow, executor=executor) + self.assertIs(e1.executor, e2.executor) + finally: + executor.shutdown(wait=True) def test_parallel_revert_specific(self): flow = uf.Flow('p-r-r').add( diff --git a/taskflow/tests/unit/test_functor_task.py b/taskflow/tests/unit/test_functor_task.py index aa02f960e..7e2f28f1e 100644 --- a/taskflow/tests/unit/test_functor_task.py +++ b/taskflow/tests/unit/test_functor_task.py @@ -16,18 +16,13 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow.engines.action_engine import engine as eng +import taskflow.engines + from taskflow.patterns import linear_flow from taskflow import task as base from taskflow import test -def _make_engine(flow): - e = eng.SingleThreadedActionEngine(flow) - e.compile() - return e - - def add(a, b): return a + b @@ -69,5 +64,5 @@ class FunctorTaskTest(test.TestCase): t(bof.run_fail) ) with self.assertRaisesRegexp(RuntimeError, '^Woot'): - _make_engine(flow).run() + taskflow.engines.run(flow) self.assertEquals(values, ['one', 'fail', 'revert one']) diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index e83a21292..74e29d391 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -18,7 +18,7 @@ import collections -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow import exceptions as exc from taskflow.patterns import linear_flow as lw from taskflow import states @@ -30,10 +30,7 @@ from taskflow.tests import utils class LinearFlowTest(test.TestCase): def _make_engine(self, flow): - e = eng.SingleThreadedActionEngine(flow) - e.storage.inject([('context', {})]) - e.compile() - return e + return taskflow.engines.load(flow, store={'context': {}}) def test_result_access(self): diff --git a/taskflow/tests/unit/test_progress.py b/taskflow/tests/unit/test_progress.py index 56d479d88..2b97f4e08 100644 --- a/taskflow/tests/unit/test_progress.py +++ b/taskflow/tests/unit/test_progress.py @@ -21,7 +21,7 @@ import contextlib from taskflow import task from taskflow import test -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow.persistence.backends import impl_memory from taskflow.utils import persistence_utils as p_utils @@ -41,8 +41,10 @@ class ProgressTask(task.Task): class TestProgress(test.TestCase): - def _make_engine(self, flow): - e = eng.SingleThreadedActionEngine(flow) + def _make_engine(self, flow, flow_detail=None, backend=None): + e = taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=backend) e.compile() return e @@ -91,9 +93,7 @@ class TestProgress(test.TestCase): flo = lf.Flow("test") flo.add(ProgressTask("test", 3)) b, fd = p_utils.temporary_flow_detail(be) - e = eng.SingleThreadedActionEngine(flo, - book=b, flow_detail=fd, - backend=be) + e = self._make_engine(flo, flow_detail=fd, backend=be) e.run() t_uuid = e.storage.get_uuid_by_name("test") end_progress = e.storage.get_task_progress(t_uuid) @@ -113,9 +113,7 @@ class TestProgress(test.TestCase): flo = lf.Flow("test") flo.add(t) b, fd = p_utils.temporary_flow_detail(be) - e = eng.SingleThreadedActionEngine(flo, - book=b, flow_detail=fd, - backend=be) + e = self._make_engine(flo, flow_detail=fd, backend=be) e.run() t_uuid = e.storage.get_uuid_by_name("test") diff --git a/taskflow/tests/unit/test_unordered_flow.py b/taskflow/tests/unit/test_unordered_flow.py index 352c3628f..f3b6b081f 100644 --- a/taskflow/tests/unit/test_unordered_flow.py +++ b/taskflow/tests/unit/test_unordered_flow.py @@ -16,7 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow.engines.action_engine import engine as eng +import taskflow.engines + from taskflow.patterns import unordered_flow as uf from taskflow import task from taskflow import test @@ -26,10 +27,7 @@ from taskflow.tests import utils class UnorderedFlowTest(test.TestCase): def _make_engine(self, flow): - e = eng.SingleThreadedActionEngine(flow) - e.storage.inject([('context', {})]) - e.compile() - return e + return taskflow.engines.load(flow, store={'context': {}}) def test_result_access(self): diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index fa6f5f991..8f85050ef 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -26,19 +26,20 @@ from taskflow.persistence import logbook LOG = logging.getLogger(__name__) -def temporary_log_book(backend): +def temporary_log_book(backend=None): """Creates a temporary logbook for temporary usage in the given backend. Mainly useful for tests and other use cases where a temporary logbook is needed for a short-period of time. """ book = logbook.LogBook('tmp') - with contextlib.closing(backend.get_connection()) as conn: - conn.save_logbook(book) - return book + if backend is not None: + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) + return book -def temporary_flow_detail(backend): +def temporary_flow_detail(backend=None): """Creates a temporary flow detail and logbook for temporary usage in the given backend. @@ -47,12 +48,13 @@ def temporary_flow_detail(backend): """ flow_id = uuidutils.generate_uuid() book = temporary_log_book(backend) - with contextlib.closing(backend.get_connection()) as conn: - book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) - conn.save_logbook(book) - # Return the one from the saved logbook instead of the local one so - # that the freshest version is given back. - return (book, book.find(flow_id)) + book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) + if backend is not None: + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) + # Return the one from the saved logbook instead of the local one so + # that the freshest version is given back. + return book, book.find(flow_id) def create_flow_detail(flow, book=None, backend=None): diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index b8e40f276..38a093482 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -17,6 +17,7 @@ # under the License. import logging +import multiprocessing import threading import time import types @@ -44,6 +45,17 @@ def await(check_functor, timeout=None): return True +def get_optimal_thread_count(): + """Try to guess optimal thread count for current system.""" + try: + return multiprocessing.cpu_count() + 1 + except NotImplementedError: + # NOTE(harlowja): apparently may raise so in this case we will + # just setup two threads since its hard to know what else we + # should do in this situation. + return 2 + + class MultiLock(object): """A class which can attempt to obtain many locks at once and release said locks when exiting.