diff --git a/README.md b/README.md index d89ee725..7200e70b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,8 @@ TaskFlow ======== -A library to do [jobs, tasks, flows] in a HA manner using different backends to be used with OpenStack projects. +A library to do [jobs, tasks, flows] in a HA manner using different backends to +be used with OpenStack projects. * More information at http://wiki.openstack.org/wiki/TaskFlow @@ -9,3 +10,16 @@ Join us ------- - http://launchpad.net/taskflow + +Help +---- + +### Tox.ini + +To generate tox.ini, use the `toxgen.py` tool located in `tools/` and provide +that script as input the `tox-tmpl.ini` file to generate the final `tox.ini` +file. + +For example: + + $ ./tools/toxgen.py -i tox-tmpl.ini -o tox.ini diff --git a/requirements.txt b/requirements.txt index 490643eb..644b7075 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,6 @@ anyjson>=0.3.3 iso8601>=0.1.8 # Python 2->3 compatibility library. six>=1.4.1 -# Only needed if database backend used. -SQLAlchemy>=0.7.8,<=0.7.99 -alembic>=0.4.1 # Very nice graph library networkx>=1.8 Babel>=1.3 @@ -14,8 +11,3 @@ Babel>=1.3 stevedore>=0.12 # Backport for concurrent.futures which exists in 3.2+ futures>=2.1.3 -# Only needed if the eventlet executor is used. -# eventlet>=0.13.0 -# NOTE(harlowja): if you want to be able to use the graph_utils -# export_graph_to_dot function you will need to uncomment the following. -# pydot>=1.0 diff --git a/taskflow/examples/example_utils.py b/taskflow/examples/example_utils.py new file mode 100644 index 00000000..d3069db6 --- /dev/null +++ b/taskflow/examples/example_utils.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import logging +import os +import shutil +import sys +import tempfile + +from taskflow import exceptions +from taskflow.openstack.common.py3kcompat import urlutils +from taskflow.persistence import backends + +LOG = logging.getLogger(__name__) + +try: + import sqlalchemy as _sa # noqa + SQLALCHEMY_AVAILABLE = True +except ImportError: + SQLALCHEMY_AVAILABLE = False + + +def rm_path(persist_path): + if not os.path.exists(persist_path): + return + if os.path.isdir(persist_path): + rm_func = shutil.rmtree + elif os.path.isfile(persist_path): + rm_func = os.unlink + else: + raise ValueError("Unknown how to `rm` path: %s" % (persist_path)) + try: + rm_func(persist_path) + except (IOError, OSError): + pass + + +def _make_conf(backend_uri): + parsed_url = urlutils.urlparse(backend_uri) + backend_type = parsed_url.scheme.lower() + if not backend_type: + raise ValueError("Unknown backend type for uri: %s" % (backend_type)) + if backend_type in ('file', 'dir'): + conf = { + 'path': parsed_url.path, + 'connection': backend_uri, + } + else: + conf = { + 'connection': backend_uri, + } + return conf + + +@contextlib.contextmanager +def get_backend(backend_uri=None): + tmp_dir = None + if not backend_uri: + if len(sys.argv) > 1: + backend_uri = str(sys.argv[1]) + if not backend_uri: + tmp_dir = tempfile.mkdtemp() + backend_uri = "file:///%s" % tmp_dir + try: + backend = backends.fetch(_make_conf(backend_uri)) + except exceptions.NotFound as e: + # Fallback to one that will work if the provided backend is not found. + if not tmp_dir: + tmp_dir = tempfile.mkdtemp() + backend_uri = "file:///%s" % tmp_dir + LOG.exception("Falling back to file backend using temporary" + " directory located at: %s", tmp_dir) + backend = backends.fetch(_make_conf(backend_uri)) + else: + raise e + try: + # Ensure schema upgraded before we continue working. + with contextlib.closing(backend.get_connection()) as conn: + conn.upgrade() + yield backend + finally: + # Make sure to cleanup the temporary path if one was created for us. + if tmp_dir: + rm_path(tmp_dir) diff --git a/taskflow/examples/persistence_example.py b/taskflow/examples/persistence_example.py index 54b84244..d852d33b 100644 --- a/taskflow/examples/persistence_example.py +++ b/taskflow/examples/persistence_example.py @@ -16,7 +16,6 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib import logging import os import sys @@ -25,18 +24,20 @@ import traceback logging.basicConfig(level=logging.ERROR) +self_dir = os.path.abspath(os.path.dirname(__file__)) top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence import backends from taskflow.persistence import logbook from taskflow import task from taskflow.utils import persistence_utils as p_utils +import example_utils # noqa # INTRO: In this example we create two tasks, one that will say hi and one # that will say bye with optional capability to raise an error while @@ -49,6 +50,7 @@ from taskflow.utils import persistence_utils as p_utils # as well as shows you what happens during reversion and what happens to # the database during both of these modes (failing or not failing). + def print_wrapped(text): print("-" * (len(text))) print(text) @@ -81,48 +83,44 @@ def make_flow(blowup=False): return flow -# Persist the flow and task state here, if the file exists already blowup +# Persist the flow and task state here, if the file/dir exists already blowup # if not don't blowup, this allows a user to see both the modes and to # see what is stored in each case. -persist_filename = os.path.join(tempfile.gettempdir(), "persisting.db") -if os.path.isfile(persist_filename): +if example_utils.SQLALCHEMY_AVAILABLE: + persist_path = os.path.join(tempfile.gettempdir(), "persisting.db") + backend_uri = "sqlite:///%s" % (persist_path) +else: + persist_path = os.path.join(tempfile.gettempdir(), "persisting") + backend_uri = "file:///%s" % (persist_path) + +if os.path.exists(persist_path): blowup = False else: blowup = True -# Ensure schema upgraded before we continue working. -backend_config = { - 'connection': "sqlite:///%s" % (persist_filename), -} -with contextlib.closing(backends.fetch(backend_config)) as be: - with contextlib.closing(be.get_connection()) as conn: - conn.upgrade() +with example_utils.get_backend(backend_uri) as backend: + # Now we can run. + engine_config = { + 'backend': backend, + 'engine_conf': 'serial', + 'book': logbook.LogBook("my-test"), + } -# Now we can run. -engine_config = { - 'backend': backend_config, - 'engine_conf': 'serial', - 'book': logbook.LogBook("my-test"), -} - -# Make a flow that will blowup if the file doesn't exist previously, if it -# did exist, assume we won't blowup (and therefore this shows the undo -# and redo that a flow will go through). -flow = make_flow(blowup=blowup) -print_wrapped("Running") - -try: - eng = engines.load(flow, **engine_config) - eng.run() + # Make a flow that will blowup if the file doesn't exist previously, if it + # did exist, assume we won't blowup (and therefore this shows the undo + # and redo that a flow will go through). + flow = make_flow(blowup=blowup) + print_wrapped("Running") try: - os.unlink(persist_filename) - except (OSError, IOError): - pass -except Exception: - # NOTE(harlowja): don't exit with non-zero status code, so that we can - # print the book contents, as well as avoiding exiting also makes the - # unit tests (which also runs these examples) pass. - traceback.print_exc(file=sys.stdout) + eng = engines.load(flow, **engine_config) + eng.run() + if not blowup: + example_utils.rm_path(persist_path) + except Exception: + # NOTE(harlowja): don't exit with non-zero status code, so that we can + # print the book contents, as well as avoiding exiting also makes the + # unit tests (which also runs these examples) pass. + traceback.print_exc(file=sys.stdout) -print_wrapped("Book contents") -print(p_utils.pformat(engine_config['book'])) + print_wrapped("Book contents") + print(p_utils.pformat(engine_config['book'])) diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py index 5e40a174..dece074c 100644 --- a/taskflow/examples/resume_from_backend.py +++ b/taskflow/examples/resume_from_backend.py @@ -22,17 +22,21 @@ import sys logging.basicConfig(level=logging.ERROR) +self_dir = os.path.abspath(os.path.dirname(__file__)) top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) import taskflow.engines + from taskflow.patterns import linear_flow as lf -from taskflow.persistence import backends from taskflow import task from taskflow.utils import persistence_utils as p_utils +import example_utils # noqa + # INTRO: In this example linear_flow is used to group three tasks, one which # will suspend the future work the engine may do. This suspend engine is then # discarded and the workflow is reloaded from the persisted data and then the @@ -61,17 +65,6 @@ def print_task_states(flowdetail, msg): print(" %s==%s: %s, result=%s" % item) -def get_backend(): - try: - backend_uri = sys.argv[1] - except Exception: - backend_uri = 'sqlite://' - - backend = backends.fetch({'connection': backend_uri}) - backend.get_connection().upgrade() - return backend - - def find_flow_detail(backend, lb_id, fd_id): conn = backend.get_connection() lb = conn.get_logbook(lb_id) @@ -102,40 +95,38 @@ def flow_factory(): ### INITIALIZE PERSISTENCE #################################### -backend = get_backend() -logbook = p_utils.temporary_log_book(backend) +with example_utils.get_backend() as backend: + logbook = p_utils.temporary_log_book(backend) + ### CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### -### CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### + flow = flow_factory() + flowdetail = p_utils.create_flow_detail(flow, logbook, backend) + engine = taskflow.engines.load(flow, flow_detail=flowdetail, + backend=backend) -flow = flow_factory() -flowdetail = p_utils.create_flow_detail(flow, logbook, backend) -engine = taskflow.engines.load(flow, flow_detail=flowdetail, - backend=backend) + print_task_states(flowdetail, "At the beginning, there is no state") + print_wrapped("Running") + engine.run() + print_task_states(flowdetail, "After running") -print_task_states(flowdetail, "At the beginning, there is no state") -print_wrapped("Running") -engine.run() -print_task_states(flowdetail, "After running") + ### RE-CREATE, RESUME, RUN #################################### + print_wrapped("Resuming and running again") -### RE-CREATE, RESUME, RUN #################################### - -print_wrapped("Resuming and running again") - -# NOTE(harlowja): reload the flow detail from backend, this will allow us to -# resume the flow from its suspended state, but first we need to search for -# the right flow details in the correct logbook where things are stored. -# -# We could avoid re-loading the engine and just do engine.run() again, but this -# example shows how another process may unsuspend a given flow and start it -# again for situations where this is useful to-do (say the process running -# the above flow crashes). -flow2 = flow_factory() -flowdetail2 = find_flow_detail(backend, logbook.uuid, - flowdetail.uuid) -engine2 = taskflow.engines.load(flow2, - flow_detail=flowdetail2, - backend=backend) -engine2.run() -print_task_states(flowdetail2, "At the end") + # NOTE(harlowja): reload the flow detail from backend, this will allow us + # to resume the flow from its suspended state, but first we need to search + # for the right flow details in the correct logbook where things are + # stored. + # + # We could avoid re-loading the engine and just do engine.run() again, but + # this example shows how another process may unsuspend a given flow and + # start it again for situations where this is useful to-do (say the process + # running the above flow crashes). + flow2 = flow_factory() + flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid) + engine2 = taskflow.engines.load(flow2, + flow_detail=flowdetail2, + backend=backend) + engine2.run() + print_task_states(flowdetail2, "At the end") diff --git a/taskflow/examples/resume_many_flows.py b/taskflow/examples/resume_many_flows.py index e3f08bca..44cad4a2 100644 --- a/taskflow/examples/resume_many_flows.py +++ b/taskflow/examples/resume_many_flows.py @@ -21,6 +21,11 @@ import subprocess import sys import tempfile +self_dir = os.path.abspath(os.path.dirname(__file__)) +sys.path.insert(0, self_dir) + +import example_utils # noqa + # INTRO: In this example we create a common persistence database (sqlite based) # and then we run a few set of processes which themselves use this persistence # database, those processes 'crash' (in a simulated way) by exiting with a @@ -58,10 +63,15 @@ def _path_to(name): def main(): + backend_uri = None + tmp_path = None try: - fd, db_path = tempfile.mkstemp(prefix='tf-resume-example') - os.close(fd) - backend_uri = 'sqlite:///%s' % db_path + if example_utils.SQLALCHEMY_AVAILABLE: + tmp_path = tempfile.mktemp(prefix='tf-resume-example') + backend_uri = "sqlite:///%s" % (tmp_path) + else: + tmp_path = tempfile.mkdtemp(prefix='tf-resume-example') + backend_uri = 'file:///%s' % (tmp_path) def run_example(name, add_env=None): _exec([sys.executable, _path_to(name), backend_uri], add_env) @@ -78,7 +88,8 @@ def main(): print('\nResuming all failed flows') run_example('resume_all.py') finally: - os.unlink(db_path) + if tmp_path: + example_utils.rm_path(tmp_path) if __name__ == '__main__': main() diff --git a/taskflow/examples/resume_many_flows/my_utils.py b/taskflow/examples/resume_many_flows/my_utils.py deleted file mode 100644 index dd1fbd5e..00000000 --- a/taskflow/examples/resume_many_flows/my_utils.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import sys - -from taskflow.persistence import backends - - -def get_backend(): - try: - backend_uri = sys.argv[1] - except Exception: - backend_uri = 'sqlite://' - backend = backends.fetch({'connection': backend_uri}) - backend.get_connection().upgrade() - return backend diff --git a/taskflow/examples/resume_many_flows/resume_all.py b/taskflow/examples/resume_many_flows/resume_all.py index 94e3b48a..5cedcaf4 100644 --- a/taskflow/examples/resume_many_flows/resume_all.py +++ b/taskflow/examples/resume_many_flows/resume_all.py @@ -25,16 +25,17 @@ logging.basicConfig(level=logging.ERROR) self_dir = os.path.abspath(os.path.dirname(__file__)) top_dir = os.path.abspath( os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) +example_dir = os.path.abspath(os.path.join(self_dir, os.pardir)) sys.path.insert(0, top_dir) -sys.path.insert(0, self_dir) +sys.path.insert(0, example_dir) import taskflow.engines from taskflow import states -import my_utils # noqa +import example_utils # noqa FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED) @@ -48,12 +49,12 @@ def resume(flowdetail, backend): def main(): - backend = my_utils.get_backend() - logbooks = list(backend.get_connection().get_logbooks()) - for lb in logbooks: - for fd in lb: - if fd.state not in FINISHED_STATES: - resume(fd, backend) + with example_utils.get_backend() as backend: + logbooks = list(backend.get_connection().get_logbooks()) + for lb in logbooks: + for fd in lb: + if fd.state not in FINISHED_STATES: + resume(fd, backend) if __name__ == '__main__': diff --git a/taskflow/examples/resume_many_flows/run_flow.py b/taskflow/examples/resume_many_flows/run_flow.py index e3409e13..cbd2c2ec 100644 --- a/taskflow/examples/resume_many_flows/run_flow.py +++ b/taskflow/examples/resume_many_flows/run_flow.py @@ -25,19 +25,21 @@ logging.basicConfig(level=logging.ERROR) self_dir = os.path.abspath(os.path.dirname(__file__)) top_dir = os.path.abspath( os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) +example_dir = os.path.abspath(os.path.join(self_dir, os.pardir)) sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) +sys.path.insert(0, example_dir) import taskflow.engines +import example_utils # noqa import my_flows # noqa -import my_utils # noqa -backend = my_utils.get_backend() -engine = taskflow.engines.load_from_factory(my_flows.flow_factory, - backend=backend) -print('Running flow %s %s' % (engine.storage.flow_name, - engine.storage.flow_uuid)) -engine.run() +with example_utils.get_backend() as backend: + engine = taskflow.engines.load_from_factory(my_flows.flow_factory, + backend=backend) + print('Running flow %s %s' % (engine.storage.flow_name, + engine.storage.flow_uuid)) + engine.run() diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py index e4ecc157..7499c70a 100644 --- a/taskflow/examples/resume_vm_boot.py +++ b/taskflow/examples/resume_vm_boot.py @@ -26,10 +26,12 @@ import time logging.basicConfig(level=logging.ERROR) +self_dir = os.path.abspath(os.path.dirname(__file__)) top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf @@ -40,10 +42,10 @@ from taskflow import engines from taskflow import exceptions as exc from taskflow import task -from taskflow.persistence import backends from taskflow.utils import eventlet_utils as e_utils from taskflow.utils import persistence_utils as p_utils +import example_utils # noqa # INTRO: This examples shows how a hierarchy of flows can be used to create a # vm in a reliable & resumable manner using taskflow + a miniature version of @@ -67,17 +69,6 @@ def print_wrapped(text): print("-" * (len(text))) -def get_backend(): - try: - backend_uri = sys.argv[1] - except Exception: - backend_uri = 'sqlite://' - - backend = backends.fetch({'connection': backend_uri}) - backend.get_connection().upgrade() - return backend - - class PrintText(task.Task): """Just inserts some text print outs in a workflow.""" def __init__(self, print_what, no_slow=False): @@ -243,50 +234,51 @@ def create_flow(): print_wrapped("Initializing") # Setup the persistence & resumption layer. -backend = get_backend() -try: - book_id, flow_id = sys.argv[2].split("+", 1) - if not uuidutils.is_uuid_like(book_id): +with example_utils.get_backend() as backend: + try: + book_id, flow_id = sys.argv[2].split("+", 1) + if not uuidutils.is_uuid_like(book_id): + book_id = None + if not uuidutils.is_uuid_like(flow_id): + flow_id = None + except (IndexError, ValueError): book_id = None - if not uuidutils.is_uuid_like(flow_id): flow_id = None -except (IndexError, ValueError): - book_id = None - flow_id = None -# Set up how we want our engine to run, serial, parallel... -engine_conf = { - 'engine': 'parallel', -} -if e_utils.EVENTLET_AVAILABLE: - engine_conf['executor'] = e_utils.GreenExecutor(5) + # Set up how we want our engine to run, serial, parallel... + engine_conf = { + 'engine': 'parallel', + } + if e_utils.EVENTLET_AVAILABLE: + engine_conf['executor'] = e_utils.GreenExecutor(5) -# Create/fetch a logbook that will track the workflows work. -book = None -flow_detail = None -if all([book_id, flow_id]): - with contextlib.closing(backend.get_connection()) as conn: - try: - book = conn.get_logbook(book_id) - flow_detail = book.find(flow_id) - except exc.NotFound: - pass -if book is None and flow_detail is None: - book = p_utils.temporary_log_book(backend) - engine = engines.load_from_factory(create_flow, - backend=backend, book=book, - engine_conf=engine_conf) - print("!! Your tracking id is: '%s+%s'" % (book.uuid, - engine.storage.flow_uuid)) - print("!! Please submit this on later runs for tracking purposes") -else: - # Attempt to load from a previously potentially partially completed flow. - engine = engines.load_from_detail(flow_detail, - backend=backend, engine_conf=engine_conf) + # Create/fetch a logbook that will track the workflows work. + book = None + flow_detail = None + if all([book_id, flow_id]): + with contextlib.closing(backend.get_connection()) as conn: + try: + book = conn.get_logbook(book_id) + flow_detail = book.find(flow_id) + except exc.NotFound: + pass + if book is None and flow_detail is None: + book = p_utils.temporary_log_book(backend) + engine = engines.load_from_factory(create_flow, + backend=backend, book=book, + engine_conf=engine_conf) + print("!! Your tracking id is: '%s+%s'" % (book.uuid, + engine.storage.flow_uuid)) + print("!! Please submit this on later runs for tracking purposes") + else: + # Attempt to load from a previously partially completed flow. + engine = engines.load_from_detail(flow_detail, + backend=backend, + engine_conf=engine_conf) -# Make me my vm please! -print_wrapped('Running') -engine.run() + # Make me my vm please! + print_wrapped('Running') + engine.run() # How to use. # diff --git a/taskflow/examples/resume_volume_create.py b/taskflow/examples/resume_volume_create.py index 35edbac6..a8be9c69 100644 --- a/taskflow/examples/resume_volume_create.py +++ b/taskflow/examples/resume_volume_create.py @@ -26,10 +26,12 @@ import time logging.basicConfig(level=logging.ERROR) +self_dir = os.path.abspath(os.path.dirname(__file__)) top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf @@ -37,9 +39,9 @@ from taskflow.patterns import linear_flow as lf from taskflow import engines from taskflow import task -from taskflow.persistence import backends from taskflow.utils import persistence_utils as p_utils +import example_utils # noqa # INTRO: This examples shows how a hierarchy of flows can be used to create a # pseudo-volume in a reliable & resumable manner using taskflow + a miniature @@ -69,17 +71,6 @@ def find_flow_detail(backend, book_id, flow_id): return lb.find(flow_id) -def get_backend(): - try: - backend_uri = sys.argv[1] - except Exception: - backend_uri = 'sqlite://' - - backend = backends.fetch({'connection': backend_uri}) - backend.get_connection().upgrade() - return backend - - class PrintText(task.Task): def __init__(self, print_what, no_slow=False): content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8] @@ -131,38 +122,39 @@ flow = lf.Flow("root").add( PrintText("Finished volume create", no_slow=True)) # Setup the persistence & resumption layer. -backend = get_backend() -try: - book_id, flow_id = sys.argv[2].split("+", 1) -except (IndexError, ValueError): - book_id = None - flow_id = None +with example_utils.get_backend() as backend: + try: + book_id, flow_id = sys.argv[2].split("+", 1) + except (IndexError, ValueError): + book_id = None + flow_id = None -if not all([book_id, flow_id]): - # If no 'tracking id' (think a fedex or ups tracking id) is provided then - # we create one by creating a logbook (where flow details are stored) and - # creating a flow detail (where flow and task state is stored). The - # combination of these 2 objects unique ids (uuids) allows the users of - # taskflow to reassociate the workflows that were potentially running (and - # which may have partially completed) back with taskflow so that those - # workflows can be resumed (or reverted) after a process/thread/engine - # has failed in someway. - logbook = p_utils.temporary_log_book(backend) - flow_detail = p_utils.create_flow_detail(flow, logbook, backend) - print("!! Your tracking id is: '%s+%s'" % (logbook.uuid, flow_detail.uuid)) - print("!! Please submit this on later runs for tracking purposes") -else: - flow_detail = find_flow_detail(backend, book_id, flow_id) + if not all([book_id, flow_id]): + # If no 'tracking id' (think a fedex or ups tracking id) is provided + # then we create one by creating a logbook (where flow details are + # stored) and creating a flow detail (where flow and task state is + # stored). The combination of these 2 objects unique ids (uuids) allows + # the users of taskflow to reassociate the workflows that were + # potentially running (and which may have partially completed) back + # with taskflow so that those workflows can be resumed (or reverted) + # after a process/thread/engine has failed in someway. + logbook = p_utils.temporary_log_book(backend) + flow_detail = p_utils.create_flow_detail(flow, logbook, backend) + print("!! Your tracking id is: '%s+%s'" % (logbook.uuid, + flow_detail.uuid)) + print("!! Please submit this on later runs for tracking purposes") + else: + flow_detail = find_flow_detail(backend, book_id, flow_id) -# Annnnd load and run. -engine_conf = { - 'engine': 'serial', -} -engine = engines.load(flow, - flow_detail=flow_detail, - backend=backend, - engine_conf=engine_conf) -engine.run() + # Annnnd load and run. + engine_conf = { + 'engine': 'serial', + } + engine = engines.load(flow, + flow_detail=flow_detail, + backend=backend, + engine_conf=engine_conf) + engine.run() # How to use. # diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py index dae1fdb2..f526b94c 100644 --- a/taskflow/tests/test_examples.py +++ b/taskflow/tests/test_examples.py @@ -49,9 +49,8 @@ def root_path(*args): def run_example(name): path = root_path('taskflow', 'examples', '%s.py' % name) - obj = subprocess.Popen( - [sys.executable, path], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + obj = subprocess.Popen([sys.executable, path], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = obj.communicate() if output[1]: raise RuntimeError('Example wrote to stderr:\n%s' @@ -60,15 +59,14 @@ def run_example(name): def expected_output_path(name): - return root_path('taskflow', 'examples', - '%s.out.txt' % name) + return root_path('taskflow', 'examples', '%s.out.txt' % name) def list_examples(): - ext = '.py' examples_dir = root_path('taskflow', 'examples') for filename in os.listdir(examples_dir): - if filename.endswith(ext): + name, ext = os.path.splitext(filename) + if ext == ".py" and 'utils' not in name.lower(): yield filename[:-len(ext)] diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py index ba60f5a9..bc3f104b 100644 --- a/taskflow/tests/unit/persistence/test_sql_persistence.py +++ b/taskflow/tests/unit/persistence/test_sql_persistence.py @@ -19,11 +19,19 @@ import os import tempfile -from taskflow.persistence.backends import impl_sqlalchemy +import testtools + +try: + from taskflow.persistence.backends import impl_sqlalchemy + SQLALCHEMY_AVAILABLE = True +except ImportError: + SQLALCHEMY_AVAILABLE = False + from taskflow import test from taskflow.tests.unit.persistence import base +@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available') class SqlPersistenceTest(test.TestCase, base.PersistenceTestMixin): """Inherits from the base test and sets up a sqlite temporary db.""" def _get_connection(self): diff --git a/test-2.x-requirements.txt b/test-2.x-requirements.txt deleted file mode 100644 index a6a628e6..00000000 --- a/test-2.x-requirements.txt +++ /dev/null @@ -1 +0,0 @@ -eventlet>=0.13.0 diff --git a/tools/toxgen.py b/tools/toxgen.py new file mode 100755 index 00000000..111ed9b1 --- /dev/null +++ b/tools/toxgen.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python + +# From: https://bitbucket.org/cdevienne/toxgen (pypi soon hopefully) and +# modified slightly to work in python 2.6 and set some values that are not +# being set. +# +# TODO(harlowja): remove me when toxgen is a pypi package. + +""" +Produce a tox.ini file from a template config file. + +The template config file is a standard tox.ini file with additional sections. +Theses sections will be combined to create new testenv: sections if they do +not exists yet. +""" + +import collections +import itertools +import optparse +import os + +import six + +from six.moves import configparser + +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict + + +HEADER = '# DO NOT EDIT THIS FILE - it is machine generated from %(filename)s' +SKIP_VENVS = frozenset(['venv']) + +parser = optparse.OptionParser(epilog=__doc__) +parser.add_option('-i', '--input', dest='input', + default='tox-tmpl.ini', metavar='FILE') +parser.add_option('-o', '--output', dest='output', + default='tox.ini', metavar='FILE') + + +class AxisItem(object): + def __init__(self, axis, name, config): + self.axis = axis + self.isdefault = name[-1] == '*' + self.name = name[:-1] if self.isdefault else name + self.load(config) + + def load(self, config): + sectionname = 'axis:%s:%s' % (self.axis.name, self.name) + if config.has_section(sectionname): + self.options = dict(config.items(sectionname)) + else: + self.options = dict() + + for name, value in self.axis.defaults.items(): + if name not in self.options: + self.options[name] = value + + +class Axis(object): + def __init__(self, name, config): + self.name = name + self.load(config) + + def load(self, config): + self.items = dict() + values = config.get('axes', self.name).split(',') + if config.has_section('axis:%s' % self.name): + self.defaults = dict( + config.items('axis:%s' % self.name) + ) + else: + self.defaults = {} + for value in values: + self.items[value.strip('*')] = AxisItem(self, value, config) + + +def format_list(contents, max_len=80, sep=","): + lines = [] + for line in contents: + if not lines: + lines.append(line + ",") + else: + last_len = len(lines[-1]) + if last_len + len(line) >= max_len: + lines.append(str(line) + sep) + else: + lines[-1] = lines[-1] + str(line) + sep + return "\n".join(lines).rstrip(",") + + +def render(incfg, filename, adjust_envlist=True): + test_envs = set() + for s in incfg.sections(): + if s.startswith("testenv:"): + env = s[len("testenv:"):].strip() + if env in SKIP_VENVS or not env: + continue + test_envs.add(env) + test_envs = [s for s in test_envs if s] + + try: + envlist = incfg.get("tox", 'envlist') + envlist = [e.strip() for e in envlist.split(",")] + envlist = set([e for e in envlist if e]) + except (configparser.NoOptionError, configparser.NoSectionError): + envlist = set() + for e in test_envs: + if e not in envlist: + envlist.add(e) + + if not incfg.has_section("tox"): + incfg.add_section("tox") + incfg.set("tox", "envlist", + format_list(list(sorted(envlist)), max_len=-1)) + + text = six.StringIO() + incfg.write(text) + contents = [ + HEADER % {'filename': os.path.basename(filename)}, + '', + # Remove how configparser uses tabs instead of spaces, madness... + text.getvalue().replace("\t", " " * 4), + ] + return "\n".join(contents) + + +def compile_template(incfg): + axes = dict() + + if incfg.has_section('axes'): + for axis in incfg.options('axes'): + axes[axis] = Axis(axis, incfg) + + out = configparser.ConfigParser(dict_type=OrderedDict) + for section in incfg.sections(): + if section == 'axes' or section.startswith('axis:'): + continue + out.add_section(section) + for name, value in incfg.items(section): + out.set(section, name, value) + + items = [axis.items.keys() for axis in axes.values()] + for combination in itertools.product(*items): + options = {} + + section_name = ( + 'testenv:' + '-'.join([item for item in combination if item]) + ) + section_alt_name = ( + 'testenv:' + '-'.join([ + itemname + for axis, itemname in zip(axes.values(), combination) + if itemname and not axis.items[itemname].isdefault + ]) + ) + if section_alt_name == section_name: + section_alt_name = None + + axes_items = [ + '%s:%s' % (axis, itemname) + for axis, itemname in zip(axes, combination) + ] + + for axis, itemname in zip(axes.values(), combination): + axis_options = axis.items[itemname].options + if 'constraints' in axis_options: + constraints = axis_options['constraints'].split('\n') + for c in constraints: + if c.startswith('!') and c[1:] in axes_items: + continue + for name, value in axis_options.items(): + if name in options: + options[name] += value + else: + options[name] = value + + constraints = options.pop('constraints', '').split('\n') + neg_constraints = [c[1:] for c in constraints if c and c[0] == '!'] + if not set(neg_constraints).isdisjoint(axes_items): + continue + + if not out.has_section(section_name): + out.add_section(section_name) + + if (section_alt_name and not out.has_section(section_alt_name)): + out.add_section(section_alt_name) + + for name, value in reversed(options.items()): + if not out.has_option(section_name, name): + out.set(section_name, name, value) + if section_alt_name and not out.has_option(section_alt_name, name): + out.set(section_alt_name, name, value) + + return out + + +def main(): + options, args = parser.parse_args() + tmpl = configparser.ConfigParser() + with open(options.input, 'rb') as fh: + tmpl.readfp(fh, filename=options.input) + with open(options.output, 'wb') as outfile: + text = render(compile_template(tmpl), options.input) + outfile.write(text) + + +if __name__ == '__main__': + main() + diff --git a/tox-tmpl.ini b/tox-tmpl.ini new file mode 100644 index 00000000..b9d55e59 --- /dev/null +++ b/tox-tmpl.ini @@ -0,0 +1,84 @@ +# NOTE(harlowja): this is a template, not a fully-generated tox.ini, use toxgen +# to translate this into a fully specified tox.ini file before using. Changes +# made to tox.ini will only be reflected if ran through the toxgen generator. + +[tox] +minversion = 1.6 +skipsdist = True + +[testenv] +usedevelop = True +install_command = pip install {opts} {packages} +setenv = VIRTUAL_ENV={envdir} + LANG=en_US.UTF-8 + LANGUAGE=en_US:en + LC_ALL=C +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt +commands = python setup.py testr --slowest --testr-args='{posargs}' + +[tox:jenkins] +downloadcache = ~/cache/pip + +[testenv:pep8] +commands = + flake8 {posargs} + +[testenv:pylint] +setenv = VIRTUAL_ENV={envdir} +deps = -r{toxinidir}/requirements.txt + pylint==0.26.0 +commands = pylint + +[testenv:cover] +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt +commands = python setup.py testr --coverage --testr-args='{posargs}' + +[testenv:venv] +commands = {posargs} + +[flake8] +ignore = H402 +builtins = _ +exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools + +[axes] +python = py26,py27,py33 +sqlalchemy = sa7,sa8,sa9,* +eventlet = ev,* + +[axis:python:py26] +basepython = python2.6 +deps = {[testenv]deps} + +[axis:python:py27] +basepython = python2.7 +deps = {[testenv]deps} + +[axis:python:py33] +basepython = python3.3 +deps = {[testenv]deps} + +[axis:eventlet:ev] +deps = + eventlet>=0.13.0 +constraints= + !python:py33 + +[axis:sqlalchemy:sa7] +deps = + SQLAlchemy<=0.7.99 + alembic>=0.4.1 + +[axis:sqlalchemy:sa8] +deps = + SQLAlchemy>0.7.99 + SQLAlchemy<=0.8.99 + alembic>=0.4.1 + +[axis:sqlalchemy:sa9] +deps = + SQLAlchemy>0.8.99 + SQLAlchemy<=0.9.99 + alembic>=0.4.1 diff --git a/tox.ini b/tox.ini index 05d03ef5..57d83f5d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,47 +1,60 @@ +# DO NOT EDIT THIS FILE - it is machine generated from tox-tmpl.ini + [tox] minversion = 1.6 skipsdist = True -envlist = py26,py27,py33,pep8 +envlist = cover, + pep8, + py26, + py26-ev, + py26-sa7, + py26-sa7-ev, + py26-sa8, + py26-sa8-ev, + py26-sa9, + py26-sa9-ev, + py27, + py27-ev, + py27-sa7, + py27-sa7-ev, + py27-sa8, + py27-sa8-ev, + py27-sa9, + py27-sa9-ev, + py33, + py33-sa7, + py33-sa8, + py33-sa9, + pylint [testenv] -usedevelop = True install_command = pip install {opts} {packages} -setenv = VIRTUAL_ENV={envdir} - LANG=en_US.UTF-8 - LANGUAGE=en_US:en - LC_ALL=C -deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt commands = python setup.py testr --slowest --testr-args='{posargs}' - -[testenv:py26] +setenv = VIRTUAL_ENV={envdir} + LANG=en_US.UTF-8 + LANGUAGE=en_US:en + LC_ALL=C deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt - -r{toxinidir}/test-2.x-requirements.txt + -r{toxinidir}/test-requirements.txt +usedevelop = True -[testenv:py27] +[testenv:pylint] +commands = pylint +setenv = VIRTUAL_ENV={envdir} deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt - -r{toxinidir}/test-2.x-requirements.txt + pylint==0.26.0 + +[testenv:cover] +commands = python setup.py testr --coverage --testr-args='{posargs}' +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt [tox:jenkins] downloadcache = ~/cache/pip [testenv:pep8] -commands = - flake8 {posargs} - -[testenv:pylint] -setenv = VIRTUAL_ENV={envdir} -deps = -r{toxinidir}/requirements.txt - pylint==0.26.0 -commands = pylint - -[testenv:cover] -deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt - -r{toxinidir}/test-2.x-requirements.txt -commands = python setup.py testr --coverage --testr-args='{posargs}' +commands = + flake8 {posargs} [testenv:venv] commands = {posargs} @@ -50,3 +63,132 @@ commands = {posargs} ignore = H402 builtins = _ exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools + +[testenv:py27] +basepython = python2.7 +deps = {[testenv]deps} + +[testenv:py27-ev] +basepython = python2.7 +deps = {[testenv]deps} + eventlet>=0.13.0 + +[testenv:py27-sa9] +basepython = python2.7 +deps = {[testenv]deps} + SQLAlchemy>0.8.99 + SQLAlchemy<=0.9.99 + alembic>=0.4.1 + +[testenv:py27-sa9-ev] +basepython = python2.7 +deps = {[testenv]deps} + SQLAlchemy>0.8.99 + SQLAlchemy<=0.9.99 + alembic>=0.4.1 + eventlet>=0.13.0 + +[testenv:py27-sa8] +basepython = python2.7 +deps = {[testenv]deps} + SQLAlchemy>0.7.99 + SQLAlchemy<=0.8.99 + alembic>=0.4.1 + +[testenv:py27-sa8-ev] +basepython = python2.7 +deps = {[testenv]deps} + SQLAlchemy>0.7.99 + SQLAlchemy<=0.8.99 + alembic>=0.4.1 + eventlet>=0.13.0 + +[testenv:py27-sa7] +basepython = python2.7 +deps = {[testenv]deps} + SQLAlchemy<=0.7.99 + alembic>=0.4.1 + +[testenv:py27-sa7-ev] +basepython = python2.7 +deps = {[testenv]deps} + SQLAlchemy<=0.7.99 + alembic>=0.4.1 + eventlet>=0.13.0 + +[testenv:py26] +basepython = python2.6 +deps = {[testenv]deps} + +[testenv:py26-ev] +basepython = python2.6 +deps = {[testenv]deps} + eventlet>=0.13.0 + +[testenv:py26-sa9] +basepython = python2.6 +deps = {[testenv]deps} + SQLAlchemy>0.8.99 + SQLAlchemy<=0.9.99 + alembic>=0.4.1 + +[testenv:py26-sa9-ev] +basepython = python2.6 +deps = {[testenv]deps} + SQLAlchemy>0.8.99 + SQLAlchemy<=0.9.99 + alembic>=0.4.1 + eventlet>=0.13.0 + +[testenv:py26-sa8] +basepython = python2.6 +deps = {[testenv]deps} + SQLAlchemy>0.7.99 + SQLAlchemy<=0.8.99 + alembic>=0.4.1 + +[testenv:py26-sa8-ev] +basepython = python2.6 +deps = {[testenv]deps} + SQLAlchemy>0.7.99 + SQLAlchemy<=0.8.99 + alembic>=0.4.1 + eventlet>=0.13.0 + +[testenv:py26-sa7] +basepython = python2.6 +deps = {[testenv]deps} + SQLAlchemy<=0.7.99 + alembic>=0.4.1 + +[testenv:py26-sa7-ev] +basepython = python2.6 +deps = {[testenv]deps} + SQLAlchemy<=0.7.99 + alembic>=0.4.1 + eventlet>=0.13.0 + +[testenv:py33] +basepython = python3.3 +deps = {[testenv]deps} + +[testenv:py33-sa9] +basepython = python3.3 +deps = {[testenv]deps} + SQLAlchemy>0.8.99 + SQLAlchemy<=0.9.99 + alembic>=0.4.1 + +[testenv:py33-sa8] +basepython = python3.3 +deps = {[testenv]deps} + SQLAlchemy>0.7.99 + SQLAlchemy<=0.8.99 + alembic>=0.4.1 + +[testenv:py33-sa7] +basepython = python3.3 +deps = {[testenv]deps} + SQLAlchemy<=0.7.99 + alembic>=0.4.1 +