From cde0dee14e0e93018f5d9a5a32a98623ec29bbc4 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Fri, 13 Sep 2013 12:22:27 +0400 Subject: [PATCH] Simpler API to load flows into engines Previously to run a flow client code had to put together the flow, an engine, logbook, flowdetail, and storage backend. This commit adds two helper functions, run() and load(), so that simplest usecase now looks like taskflow.engines.run(flow) Client code may also provide configuration for storage and engine if needed, but if not needed it just works with defaults. Engines are loaded via stevedore, as drivers in 'taskflow.engines' backend. Now three entry points are defined in that namespace: - 'default', for SingleThreadedActionEngine, used by default; - 'serial', as another synonym for SingleThreadedActionEngine; - 'parallel', for MultiThreadedActionEngine. Closes-bug: #1224726 Change-Id: I7f4cb5c8ff7f5f12831ddd0952c202d2fd8cd6ef --- setup.cfg | 5 + taskflow/engines/__init__.py | 5 + taskflow/engines/action_engine/engine.py | 72 ++++-------- taskflow/engines/base.py | 40 +++++++ taskflow/engines/helpers.py | 112 +++++++++++++++++++ taskflow/examples/calculate_in_parallel.py | 21 ++-- taskflow/examples/calculate_linear.py | 15 ++- taskflow/examples/complex_graph.py | 46 ++++---- taskflow/examples/fake_boot_vm.py | 28 ++--- taskflow/examples/graph_flow.py | 35 +++--- taskflow/examples/reverting_linear.py | 21 ++-- taskflow/examples/simple_linear.py | 20 ++-- taskflow/examples/simple_linear_listening.py | 24 ++-- taskflow/tests/unit/test_action_engine.py | 48 +++++--- taskflow/tests/unit/test_functor_task.py | 11 +- taskflow/tests/unit/test_linear_flow.py | 7 +- taskflow/tests/unit/test_progress.py | 16 ++- taskflow/tests/unit/test_unordered_flow.py | 8 +- taskflow/utils/persistence_utils.py | 24 ++-- taskflow/utils/threading_utils.py | 12 ++ 20 files changed, 349 insertions(+), 221 deletions(-) create mode 100644 taskflow/engines/base.py create mode 100644 taskflow/engines/helpers.py diff --git a/setup.cfg b/setup.cfg index 378ff7412..4e485a5e0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,6 +36,11 @@ taskflow.persistence = postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend sqlite = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend +taskflow.engines = + default = taskflow.engines.action_engine.engine:SingleThreadedActionEngine + serial = taskflow.engines.action_engine.engine:SingleThreadedActionEngine + parallel = taskflow.engines.action_engine.engine:MultiThreadedActionEngine + [nosetests] cover-erase = true verbosity = 2 diff --git a/taskflow/engines/__init__.py b/taskflow/engines/__init__.py index 830dd2e7c..bdec4e4cc 100644 --- a/taskflow/engines/__init__.py +++ b/taskflow/engines/__init__.py @@ -15,3 +15,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + + +# promote helpers to this module namespace +from taskflow.engines.helpers import load # noqa +from taskflow.engines.helpers import run # noqa diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 562e82d37..fdd01c063 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -16,13 +16,13 @@ # License for the specific language governing permissions and limitations # under the License. -import multiprocessing import threading from concurrent import futures from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import task_action +from taskflow.engines import base from taskflow import decorators from taskflow import exceptions as exc @@ -31,25 +31,24 @@ from taskflow import storage as t_storage from taskflow.utils import flow_utils from taskflow.utils import misc -from taskflow.utils import persistence_utils as p_utils +from taskflow.utils import threading_utils -class ActionEngine(object): +class ActionEngine(base.EngineBase): """Generic action-based engine. Converts the flow to recursive structure of actions. """ _graph_action = None - def __init__(self, flow, storage): + def __init__(self, flow, flow_detail, backend, conf): + super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) self._failures = [] self._root = None - self._flow = flow self._lock = threading.RLock() self._state_lock = threading.RLock() self.notifier = misc.TransitionNotifier() self.task_notifier = misc.TransitionNotifier() - self.storage = storage def _revert(self, current_failure): self._change_state(states.REVERTING) @@ -145,62 +144,37 @@ class ActionEngine(object): class SingleThreadedActionEngine(ActionEngine): # This one attempts to run in a serial manner. _graph_action = graph_action.SequentialGraphAction - - def __init__(self, flow, flow_detail=None, book=None, backend=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, - book=book, - backend=backend) - ActionEngine.__init__(self, flow, - storage=t_storage.Storage(flow_detail, backend)) + _storage_cls = t_storage.Storage class MultiThreadedActionEngine(ActionEngine): # This one attempts to run in a parallel manner. _graph_action = graph_action.ParallelGraphAction + _storage_cls = t_storage.ThreadSafeStorage - def __init__(self, flow, flow_detail=None, book=None, backend=None, - executor=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, - book=book, - backend=backend) - ActionEngine.__init__(self, flow, - storage=t_storage.ThreadSafeStorage(flow_detail, - backend)) - if executor is not None: - self._executor = executor - self._owns_executor = False - self._thread_count = -1 - else: - self._executor = None - self._owns_executor = True - # TODO(harlowja): allow this to be configurable?? - try: - self._thread_count = multiprocessing.cpu_count() + 1 - except NotImplementedError: - # NOTE(harlowja): apparently may raise so in this case we will - # just setup two threads since its hard to know what else we - # should do in this situation. - self._thread_count = 2 + def __init__(self, flow, flow_detail, backend, conf): + super(MultiThreadedActionEngine, self).__init__( + flow, flow_detail, backend, conf) + self._executor = conf.get('executor', None) @decorators.locked def run(self): - if self._owns_executor: - if self._executor is not None: - # The previous shutdown failed, something is very wrong. - raise exc.InvalidStateException("The previous shutdown() of" - " the executor powering this" - " engine failed. Something is" - " very very wrong.") - self._executor = futures.ThreadPoolExecutor(self._thread_count) + if self._executor is None: + self._executor = futures.ThreadPoolExecutor( + threading_utils.get_optimal_thread_count()) + owns_executor = True + else: + owns_executor = False + try: ActionEngine.run(self) finally: # Don't forget to shutdown the executor!! - if self._owns_executor and self._executor is not None: - self._executor.shutdown(wait=True) - self._executor = None + if owns_executor: + try: + self._executor.shutdown(wait=True) + finally: + self._executor = None @property def executor(self): diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py new file mode 100644 index 000000000..1debd771f --- /dev/null +++ b/taskflow/engines/base.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import abc + + +class EngineBase(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, flow, flow_detail, backend, conf): + self._flow = flow + self.storage = self._storage_cls(flow_detail, backend) + + @abc.abstractproperty + def _storage_cls(self): + """Storage class""" + + @abc.abstractmethod + def compile(self): + """Check the flow and convert it to internal representation""" + + @abc.abstractmethod + def run(self): + """Run the flow""" diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py new file mode 100644 index 000000000..6ef6dc984 --- /dev/null +++ b/taskflow/engines/helpers.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six +import stevedore.driver + +from taskflow.persistence import backends as p_backends +from taskflow.utils import persistence_utils as p_utils + + +# NOTE(imelnikov): this is the entrypoint namespace, not the module namespace. +ENGINES_NAMESPACE = 'taskflow.engines' + + +def load(flow, store=None, flow_detail=None, book=None, + engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): + """Load flow into engine + + This function creates and prepares engine to run the + flow. All that is left is to run the engine with 'run()' method. + + Which engine to load is specified in 'engine_conf' parameter. It + can be a string that names engine type or a dictionary which holds + engine type (with 'engine' key) and additional engine-specific + configuration (for example, executor for multithreaded engine). + + Which storage backend to use is defined by backend parameter. It + can be backend itself, or a dictionary that is passed to + taskflow.persistence.backends.fetch to obtain backend. + + :param flow: flow to load + :param store: dict -- data to put to storage to satisfy flow requirements + :param flow_detail: FlowDetail that holds state of the flow + :param book: LogBook to create flow detail in if flow_detail is None + :param engine_conf: engine type and configuration configuration + :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) + :returns: engine + """ + + if engine_conf is None: + engine_conf = {'engine': 'default'} + + # NOTE(imelnikov): this allows simpler syntax + if isinstance(engine_conf, six.string_types): + engine_conf = {'engine': engine_conf} + + if isinstance(backend, dict): + backend = p_backends.fetch(backend) + + if flow_detail is None: + if book is None: + _lb, flow_detail = p_utils.temporary_flow_detail(backend) + else: + flow_detail = p_utils.create_flow_detail(flow, book=book, + backend=backend) + + mgr = stevedore.driver.DriverManager( + namespace, engine_conf['engine'], + invoke_on_load=True, + invoke_kwds={ + 'conf': engine_conf.copy(), + 'flow': flow, + 'flow_detail': flow_detail, + 'backend': backend + }) + engine = mgr.driver + if store: + engine.storage.inject(store) + return engine + + +def run(flow, store=None, engine_conf=None, backend=None): + """Run the flow + + This function load the flow into engine (with 'load' function) + and runs the engine. + + Which engine to load is specified in 'engine_conf' parameter. It + can be a string that names engine type or a dictionary which holds + engine type (with 'engine' key) and additional engine-specific + configuration (for example, executor for multithreaded engine). + + Which storage backend to use is defined by backend parameter. It + can be backend itself, or a dictionary that is passed to + taskflow.persistence.backends.fetch to obtain backend. + + :param flow: flow to load + :param store: dict -- data to put to storage to satisfy flow requirements + :param engine_conf: engine type and configuration configuration + :param backend: storage backend to use or configuration + :returns: dictionary of all named task results (see Storage.fetch_all) + """ + engine = load(flow, store=store, engine_conf=engine_conf, backend=backend) + engine.run() + return engine.storage.fetch_all() diff --git a/taskflow/examples/calculate_in_parallel.py b/taskflow/examples/calculate_in_parallel.py index 7513e3471..7b8f8a387 100644 --- a/taskflow/examples/calculate_in_parallel.py +++ b/taskflow/examples/calculate_in_parallel.py @@ -4,11 +4,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines -from taskflow.engines.action_engine import engine as eng from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import task @@ -20,7 +22,6 @@ from taskflow import task class Provider(task.Task): - def __init__(self, name, *args, **kwargs): super(Provider, self).__init__(name=name, **kwargs) self._provide = args @@ -30,11 +31,6 @@ class Provider(task.Task): class Adder(task.Task): - - def __init__(self, name, provides, rebind): - super(Adder, self).__init__(name=name, provides=provides, - rebind=rebind) - def execute(self, x, y): return x + y @@ -52,7 +48,6 @@ flow = lf.Flow('root').add( # r = z1+z2 = 18 Adder(name="sum-1", provides='r', rebind=['z1', 'z2'])) -engine = eng.MultiThreadedActionEngine(flow) -engine.run() -print engine.storage.fetch_all() +result = taskflow.engines.run(flow, engine_conf='parallel') +print result diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py index ca946f6e1..d5cb24c14 100644 --- a/taskflow/examples/calculate_linear.py +++ b/taskflow/examples/calculate_linear.py @@ -4,11 +4,12 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow import task @@ -62,7 +63,5 @@ flow = lf.Flow('root').add( Multiplier("multi", 3, provides='r', rebind={'z': 'a'}) ) -engine = eng.SingleThreadedActionEngine(flow) -engine.run() - -print engine.storage.fetch_all() +results = taskflow.engines.run(flow) +print results diff --git a/taskflow/examples/complex_graph.py b/taskflow/examples/complex_graph.py index fb31ab299..9dc9126d3 100644 --- a/taskflow/examples/complex_graph.py +++ b/taskflow/examples/complex_graph.py @@ -7,11 +7,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng + +import taskflow.engines from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow import task @@ -54,7 +56,6 @@ def trash(**kwargs): def startup(**kwargs): - pass # TODO(harlowja): try triggering reversion here! # raise ValueError("Car not verified") return True @@ -95,11 +96,7 @@ flow = lf.Flow("make-auto").add( 'windows_installed', 'wheels_installed'])) -engine = eng.SingleThreadedActionEngine(flow) -engine.notifier.register('*', flow_watch) -engine.task_notifier.register('*', task_watch) - -engine.storage.inject({'spec': { +spec = { "frame": 'steel', "engine": 'honda', "doors": '2', @@ -108,28 +105,25 @@ engine.storage.inject({'spec': { "doors_installed": True, "windows_installed": True, "wheels_installed": True, -}}) +} -print "Build a car" -engine.run() -engine = eng.SingleThreadedActionEngine(flow) +engine = taskflow.engines.load(flow, store={'spec': spec.copy()}) engine.notifier.register('*', flow_watch) engine.task_notifier.register('*', task_watch) -engine.storage.inject({'spec': { - "frame": 'steel', - "engine": 'honda', - "doors": '5', - "wheels": '4', - "engine_installed": True, - "doors_installed": True, - "windows_installed": True, - "wheels_installed": True, -}}) +print("Build a car") +engine.run() + + +spec['doors'] = 5 + +engine = taskflow.engines.load(flow, store={'spec': spec.copy()}) +engine.notifier.register('*', flow_watch) +engine.task_notifier.register('*', task_watch) try: - print "Build a wrong car that doesn't match specification" + print("Build a wrong car that doesn't match specification") engine.run() except Exception as e: - print e + print("Flow failed: %s" % e) diff --git a/taskflow/examples/fake_boot_vm.py b/taskflow/examples/fake_boot_vm.py index e14e63a6f..eba733417 100644 --- a/taskflow/examples/fake_boot_vm.py +++ b/taskflow/examples/fake_boot_vm.py @@ -8,11 +8,13 @@ import uuid logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng + +import taskflow.engines from taskflow.patterns import graph_flow as gf from taskflow import task @@ -141,14 +143,6 @@ flow = gf.Flow("Boot-Fake-Vm").add( ScheduleVM(), BootVM()) -engine = eng.SingleThreadedActionEngine(flow) - -# Get notified of the state changes the flow is going through. -engine.notifier.register('*', flow_notify) - -# Get notified of the state changes the flows tasks/runners are going through. -engine.task_notifier.register('*', task_notify) - # Simulates what nova/glance/keystone... calls a context context = { 'user_id': 'xyz', @@ -157,7 +151,15 @@ context = { } context = Context(**context) -engine.storage.inject({'context': context}) +# Load the flow +engine = taskflow.engines.load(flow, store={'context': context}) + +# Get notified of the state changes the flow is going through. +engine.notifier.register('*', flow_notify) + +# Get notified of the state changes the flows tasks/runners are going through. +engine.task_notifier.register('*', task_notify) + print '-' * 7 print 'Running' diff --git a/taskflow/examples/graph_flow.py b/taskflow/examples/graph_flow.py index 5c2b68f41..56041b266 100644 --- a/taskflow/examples/graph_flow.py +++ b/taskflow/examples/graph_flow.py @@ -4,11 +4,12 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow import task @@ -46,30 +47,20 @@ flow = gf.Flow('root').add( # x7 = x6+x6 = 82 Adder("add7", provides='x7', rebind=['x6', 'x6'])) -single_threaded_engine = eng.SingleThreadedActionEngine(flow) -single_threaded_engine.storage.inject({ +store = { "y1": 1, "y2": 3, "y3": 5, "y4": 7, "y5": 9, -}) +} -single_threaded_engine.run() +result = taskflow.engines.run( + flow, engine_conf='serial', store=store) -print ("Single threaded engine result %s" % - single_threaded_engine.storage.fetch_all()) +print("Single threaded engine result %s" % result) -multi_threaded_engine = eng.MultiThreadedActionEngine(flow) -multi_threaded_engine.storage.inject({ - "y1": 1, - "y2": 3, - "y3": 5, - "y4": 7, - "y5": 9, -}) +result = taskflow.engines.run( + flow, engine_conf='parallel', store=store) -multi_threaded_engine.run() - -print ("Multi threaded engine result %s" % - multi_threaded_engine.storage.fetch_all()) +print("Multi threaded engine result %s" % result) diff --git a/taskflow/examples/reverting_linear.py b/taskflow/examples/reverting_linear.py index b16209ab3..b1f45a2c3 100644 --- a/taskflow/examples/reverting_linear.py +++ b/taskflow/examples/reverting_linear.py @@ -4,11 +4,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines -from taskflow.engines.action_engine import engine as eng from taskflow.patterns import linear_flow as lf from taskflow import task @@ -43,15 +45,10 @@ flow = lf.Flow('simple-linear').add( CallJoe(), CallSuzzie() ) -engine = eng.SingleThreadedActionEngine(flow) - -engine.storage.inject({ - "joe_number": 444, - "jim_number": 555, - "suzzie_number": 666 -}) try: - engine.run() + taskflow.engines.run(flow, store=dict(joe_number=444, + jim_number=555, + suzzie_number=666)) except Exception as e: print "Flow failed: %r" % e diff --git a/taskflow/examples/simple_linear.py b/taskflow/examples/simple_linear.py index bde8c6477..fb2a7c12b 100644 --- a/taskflow/examples/simple_linear.py +++ b/taskflow/examples/simple_linear.py @@ -4,11 +4,12 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow import task @@ -30,16 +31,11 @@ class CallJoe(task.Task): def execute(self, joe_number, *args, **kwargs): print("Calling joe %s." % joe_number) + flow = lf.Flow('simple-linear').add( CallJim(), CallJoe() ) -engine = eng.SingleThreadedActionEngine(flow) - -engine.storage.inject({ - "joe_number": 444, - "jim_number": 555, -}) - -engine.run() +taskflow.engines.run(flow, store=dict(joe_number=444, + jim_number=555)) diff --git a/taskflow/examples/simple_linear_listening.py b/taskflow/examples/simple_linear_listening.py index 090b4d7f6..0fbe261e0 100644 --- a/taskflow/examples/simple_linear_listening.py +++ b/taskflow/examples/simple_linear_listening.py @@ -4,11 +4,13 @@ import sys logging.basicConfig(level=logging.ERROR) -my_dir_path = os.path.dirname(os.path.abspath(__file__)) -sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), - os.pardir)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines -from taskflow.engines.action_engine import engine as eng from taskflow.patterns import linear_flow as lf from taskflow import task @@ -35,13 +37,13 @@ flow = lf.Flow("Call-them") flow.add(task.FunctorTask(execute=call_jim)) flow.add(task.FunctorTask(execute=call_joe)) -engine = eng.SingleThreadedActionEngine(flow) +engine = taskflow.engines.load(flow, store={ + 'context': { + "joe_number": 444, + "jim_number": 555, + } +}) + engine.notifier.register('*', flow_watch) engine.task_notifier.register('*', task_watch) - -context = { - "joe_number": 444, - "jim_number": 555, -} -engine.storage.inject({'context': context}) engine.run() diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 900df5f00..6f2e053ff 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -26,6 +26,8 @@ from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf +import taskflow.engines + from taskflow.engines.action_engine import engine as eng from taskflow import exceptions as exc from taskflow.persistence.backends import impl_memory @@ -139,14 +141,12 @@ class EngineTestBase(object): super(EngineTestBase, self).setUp() self.values = [] self.backend = impl_memory.MemoryBackend(conf={}) - self.book = p_utils.temporary_log_book(self.backend) def tearDown(self): super(EngineTestBase, self).tearDown() with contextlib.closing(self.backend) as be: with contextlib.closing(be.get_connection()) as conn: conn.clear_all() - self.book = None def _make_engine(self, flow, flow_detail=None): raise NotImplementedError() @@ -650,11 +650,18 @@ class SingleThreadedEngineTest(EngineTaskTest, SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, self.book, - self.backend) - return eng.SingleThreadedActionEngine(flow, backend=self.backend, - flow_detail=flow_detail) + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine_conf='serial', + backend=self.backend) + + def test_correct_load(self): + engine = self._make_engine(TestTask) + self.assertIsInstance(engine, eng.SingleThreadedActionEngine) + + def test_singlethreaded_is_the_default(self): + engine = taskflow.engines.load(TestTask) + self.assertIsInstance(engine, eng.SingleThreadedActionEngine) class MultiThreadedEngineTest(EngineTaskTest, @@ -664,19 +671,26 @@ class MultiThreadedEngineTest(EngineTaskTest, SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - if flow_detail is None: - flow_detail = p_utils.create_flow_detail(flow, self.book, - self.backend) - return eng.MultiThreadedActionEngine(flow, backend=self.backend, - flow_detail=flow_detail, - executor=executor) + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) - def test_using_common_pool(self): + def test_correct_load(self): + engine = self._make_engine(TestTask) + self.assertIsInstance(engine, eng.MultiThreadedActionEngine) + self.assertIs(engine.executor, None) + + def test_using_common_executor(self): flow = TestTask(self.values, name='task1') executor = futures.ThreadPoolExecutor(2) - e1 = self._make_engine(flow, executor=executor) - e2 = self._make_engine(flow, executor=executor) - self.assertIs(e1.executor, e2.executor) + try: + e1 = self._make_engine(flow, executor=executor) + e2 = self._make_engine(flow, executor=executor) + self.assertIs(e1.executor, e2.executor) + finally: + executor.shutdown(wait=True) def test_parallel_revert_specific(self): flow = uf.Flow('p-r-r').add( diff --git a/taskflow/tests/unit/test_functor_task.py b/taskflow/tests/unit/test_functor_task.py index aa02f960e..7e2f28f1e 100644 --- a/taskflow/tests/unit/test_functor_task.py +++ b/taskflow/tests/unit/test_functor_task.py @@ -16,18 +16,13 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow.engines.action_engine import engine as eng +import taskflow.engines + from taskflow.patterns import linear_flow from taskflow import task as base from taskflow import test -def _make_engine(flow): - e = eng.SingleThreadedActionEngine(flow) - e.compile() - return e - - def add(a, b): return a + b @@ -69,5 +64,5 @@ class FunctorTaskTest(test.TestCase): t(bof.run_fail) ) with self.assertRaisesRegexp(RuntimeError, '^Woot'): - _make_engine(flow).run() + taskflow.engines.run(flow) self.assertEquals(values, ['one', 'fail', 'revert one']) diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index e83a21292..74e29d391 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -18,7 +18,7 @@ import collections -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow import exceptions as exc from taskflow.patterns import linear_flow as lw from taskflow import states @@ -30,10 +30,7 @@ from taskflow.tests import utils class LinearFlowTest(test.TestCase): def _make_engine(self, flow): - e = eng.SingleThreadedActionEngine(flow) - e.storage.inject([('context', {})]) - e.compile() - return e + return taskflow.engines.load(flow, store={'context': {}}) def test_result_access(self): diff --git a/taskflow/tests/unit/test_progress.py b/taskflow/tests/unit/test_progress.py index 56d479d88..2b97f4e08 100644 --- a/taskflow/tests/unit/test_progress.py +++ b/taskflow/tests/unit/test_progress.py @@ -21,7 +21,7 @@ import contextlib from taskflow import task from taskflow import test -from taskflow.engines.action_engine import engine as eng +import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow.persistence.backends import impl_memory from taskflow.utils import persistence_utils as p_utils @@ -41,8 +41,10 @@ class ProgressTask(task.Task): class TestProgress(test.TestCase): - def _make_engine(self, flow): - e = eng.SingleThreadedActionEngine(flow) + def _make_engine(self, flow, flow_detail=None, backend=None): + e = taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=backend) e.compile() return e @@ -91,9 +93,7 @@ class TestProgress(test.TestCase): flo = lf.Flow("test") flo.add(ProgressTask("test", 3)) b, fd = p_utils.temporary_flow_detail(be) - e = eng.SingleThreadedActionEngine(flo, - book=b, flow_detail=fd, - backend=be) + e = self._make_engine(flo, flow_detail=fd, backend=be) e.run() t_uuid = e.storage.get_uuid_by_name("test") end_progress = e.storage.get_task_progress(t_uuid) @@ -113,9 +113,7 @@ class TestProgress(test.TestCase): flo = lf.Flow("test") flo.add(t) b, fd = p_utils.temporary_flow_detail(be) - e = eng.SingleThreadedActionEngine(flo, - book=b, flow_detail=fd, - backend=be) + e = self._make_engine(flo, flow_detail=fd, backend=be) e.run() t_uuid = e.storage.get_uuid_by_name("test") diff --git a/taskflow/tests/unit/test_unordered_flow.py b/taskflow/tests/unit/test_unordered_flow.py index 352c3628f..f3b6b081f 100644 --- a/taskflow/tests/unit/test_unordered_flow.py +++ b/taskflow/tests/unit/test_unordered_flow.py @@ -16,7 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow.engines.action_engine import engine as eng +import taskflow.engines + from taskflow.patterns import unordered_flow as uf from taskflow import task from taskflow import test @@ -26,10 +27,7 @@ from taskflow.tests import utils class UnorderedFlowTest(test.TestCase): def _make_engine(self, flow): - e = eng.SingleThreadedActionEngine(flow) - e.storage.inject([('context', {})]) - e.compile() - return e + return taskflow.engines.load(flow, store={'context': {}}) def test_result_access(self): diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index fa6f5f991..8f85050ef 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -26,19 +26,20 @@ from taskflow.persistence import logbook LOG = logging.getLogger(__name__) -def temporary_log_book(backend): +def temporary_log_book(backend=None): """Creates a temporary logbook for temporary usage in the given backend. Mainly useful for tests and other use cases where a temporary logbook is needed for a short-period of time. """ book = logbook.LogBook('tmp') - with contextlib.closing(backend.get_connection()) as conn: - conn.save_logbook(book) - return book + if backend is not None: + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) + return book -def temporary_flow_detail(backend): +def temporary_flow_detail(backend=None): """Creates a temporary flow detail and logbook for temporary usage in the given backend. @@ -47,12 +48,13 @@ def temporary_flow_detail(backend): """ flow_id = uuidutils.generate_uuid() book = temporary_log_book(backend) - with contextlib.closing(backend.get_connection()) as conn: - book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) - conn.save_logbook(book) - # Return the one from the saved logbook instead of the local one so - # that the freshest version is given back. - return (book, book.find(flow_id)) + book.add(logbook.FlowDetail(name='tmp-flow-detail', uuid=flow_id)) + if backend is not None: + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) + # Return the one from the saved logbook instead of the local one so + # that the freshest version is given back. + return book, book.find(flow_id) def create_flow_detail(flow, book=None, backend=None): diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index b8e40f276..38a093482 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -17,6 +17,7 @@ # under the License. import logging +import multiprocessing import threading import time import types @@ -44,6 +45,17 @@ def await(check_functor, timeout=None): return True +def get_optimal_thread_count(): + """Try to guess optimal thread count for current system.""" + try: + return multiprocessing.cpu_count() + 1 + except NotImplementedError: + # NOTE(harlowja): apparently may raise so in this case we will + # just setup two threads since its hard to know what else we + # should do in this situation. + return 2 + + class MultiLock(object): """A class which can attempt to obtain many locks at once and release said locks when exiting.