Add envs for different sqlalchemy versions

Adjust tests to skip the sqlalchemy test if
sqlalchemy is not installed. Adjust examples
to fallback to a directory based backend if
the sqlalchemy does not load or is not available.

Include a updated tox.ini (generated from the
toxgen.py script) that includes the new venv
variations.

Change-Id: I7686f09901a9b65d7c81b4e037b5bffc24aa7ef7
This commit is contained in:
Joshua Harlow
2014-01-06 10:17:59 -08:00
parent d617864898
commit 3d437df749
17 changed files with 776 additions and 271 deletions

View File

@@ -1,7 +1,8 @@
TaskFlow TaskFlow
======== ========
A library to do [jobs, tasks, flows] in a HA manner using different backends to be used with OpenStack projects. A library to do [jobs, tasks, flows] in a HA manner using different backends to
be used with OpenStack projects.
* More information at http://wiki.openstack.org/wiki/TaskFlow * More information at http://wiki.openstack.org/wiki/TaskFlow
@@ -9,3 +10,16 @@ Join us
------- -------
- http://launchpad.net/taskflow - http://launchpad.net/taskflow
Help
----
### Tox.ini
To generate tox.ini, use the `toxgen.py` tool located in `tools/` and provide
that script as input the `tox-tmpl.ini` file to generate the final `tox.ini`
file.
For example:
$ ./tools/toxgen.py -i tox-tmpl.ini -o tox.ini

View File

@@ -4,9 +4,6 @@ anyjson>=0.3.3
iso8601>=0.1.8 iso8601>=0.1.8
# Python 2->3 compatibility library. # Python 2->3 compatibility library.
six>=1.4.1 six>=1.4.1
# Only needed if database backend used.
SQLAlchemy>=0.7.8,<=0.7.99
alembic>=0.4.1
# Very nice graph library # Very nice graph library
networkx>=1.8 networkx>=1.8
Babel>=1.3 Babel>=1.3
@@ -14,8 +11,3 @@ Babel>=1.3
stevedore>=0.12 stevedore>=0.12
# Backport for concurrent.futures which exists in 3.2+ # Backport for concurrent.futures which exists in 3.2+
futures>=2.1.3 futures>=2.1.3
# Only needed if the eventlet executor is used.
# eventlet>=0.13.0
# NOTE(harlowja): if you want to be able to use the graph_utils
# export_graph_to_dot function you will need to uncomment the following.
# pydot>=1.0

View File

@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import os
import shutil
import sys
import tempfile
from taskflow import exceptions
from taskflow.openstack.common.py3kcompat import urlutils
from taskflow.persistence import backends
LOG = logging.getLogger(__name__)
try:
import sqlalchemy as _sa # noqa
SQLALCHEMY_AVAILABLE = True
except ImportError:
SQLALCHEMY_AVAILABLE = False
def rm_path(persist_path):
if not os.path.exists(persist_path):
return
if os.path.isdir(persist_path):
rm_func = shutil.rmtree
elif os.path.isfile(persist_path):
rm_func = os.unlink
else:
raise ValueError("Unknown how to `rm` path: %s" % (persist_path))
try:
rm_func(persist_path)
except (IOError, OSError):
pass
def _make_conf(backend_uri):
parsed_url = urlutils.urlparse(backend_uri)
backend_type = parsed_url.scheme.lower()
if not backend_type:
raise ValueError("Unknown backend type for uri: %s" % (backend_type))
if backend_type in ('file', 'dir'):
conf = {
'path': parsed_url.path,
'connection': backend_uri,
}
else:
conf = {
'connection': backend_uri,
}
return conf
@contextlib.contextmanager
def get_backend(backend_uri=None):
tmp_dir = None
if not backend_uri:
if len(sys.argv) > 1:
backend_uri = str(sys.argv[1])
if not backend_uri:
tmp_dir = tempfile.mkdtemp()
backend_uri = "file:///%s" % tmp_dir
try:
backend = backends.fetch(_make_conf(backend_uri))
except exceptions.NotFound as e:
# Fallback to one that will work if the provided backend is not found.
if not tmp_dir:
tmp_dir = tempfile.mkdtemp()
backend_uri = "file:///%s" % tmp_dir
LOG.exception("Falling back to file backend using temporary"
" directory located at: %s", tmp_dir)
backend = backends.fetch(_make_conf(backend_uri))
else:
raise e
try:
# Ensure schema upgraded before we continue working.
with contextlib.closing(backend.get_connection()) as conn:
conn.upgrade()
yield backend
finally:
# Make sure to cleanup the temporary path if one was created for us.
if tmp_dir:
rm_path(tmp_dir)

View File

@@ -16,7 +16,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
import logging import logging
import os import os
import sys import sys
@@ -25,18 +24,20 @@ import traceback
logging.basicConfig(level=logging.ERROR) logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, os.pardir,
os.pardir)) os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
from taskflow import engines from taskflow import engines
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow import task from taskflow import task
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
import example_utils # noqa
# INTRO: In this example we create two tasks, one that will say hi and one # INTRO: In this example we create two tasks, one that will say hi and one
# that will say bye with optional capability to raise an error while # that will say bye with optional capability to raise an error while
@@ -49,6 +50,7 @@ from taskflow.utils import persistence_utils as p_utils
# as well as shows you what happens during reversion and what happens to # as well as shows you what happens during reversion and what happens to
# the database during both of these modes (failing or not failing). # the database during both of these modes (failing or not failing).
def print_wrapped(text): def print_wrapped(text):
print("-" * (len(text))) print("-" * (len(text)))
print(text) print(text)
@@ -81,48 +83,44 @@ def make_flow(blowup=False):
return flow return flow
# Persist the flow and task state here, if the file exists already blowup # Persist the flow and task state here, if the file/dir exists already blowup
# if not don't blowup, this allows a user to see both the modes and to # if not don't blowup, this allows a user to see both the modes and to
# see what is stored in each case. # see what is stored in each case.
persist_filename = os.path.join(tempfile.gettempdir(), "persisting.db") if example_utils.SQLALCHEMY_AVAILABLE:
if os.path.isfile(persist_filename): persist_path = os.path.join(tempfile.gettempdir(), "persisting.db")
backend_uri = "sqlite:///%s" % (persist_path)
else:
persist_path = os.path.join(tempfile.gettempdir(), "persisting")
backend_uri = "file:///%s" % (persist_path)
if os.path.exists(persist_path):
blowup = False blowup = False
else: else:
blowup = True blowup = True
# Ensure schema upgraded before we continue working. with example_utils.get_backend(backend_uri) as backend:
backend_config = { # Now we can run.
'connection': "sqlite:///%s" % (persist_filename), engine_config = {
} 'backend': backend,
with contextlib.closing(backends.fetch(backend_config)) as be: 'engine_conf': 'serial',
with contextlib.closing(be.get_connection()) as conn: 'book': logbook.LogBook("my-test"),
conn.upgrade() }
# Now we can run. # Make a flow that will blowup if the file doesn't exist previously, if it
engine_config = { # did exist, assume we won't blowup (and therefore this shows the undo
'backend': backend_config, # and redo that a flow will go through).
'engine_conf': 'serial', flow = make_flow(blowup=blowup)
'book': logbook.LogBook("my-test"), print_wrapped("Running")
}
# Make a flow that will blowup if the file doesn't exist previously, if it
# did exist, assume we won't blowup (and therefore this shows the undo
# and redo that a flow will go through).
flow = make_flow(blowup=blowup)
print_wrapped("Running")
try:
eng = engines.load(flow, **engine_config)
eng.run()
try: try:
os.unlink(persist_filename) eng = engines.load(flow, **engine_config)
except (OSError, IOError): eng.run()
pass if not blowup:
except Exception: example_utils.rm_path(persist_path)
# NOTE(harlowja): don't exit with non-zero status code, so that we can except Exception:
# print the book contents, as well as avoiding exiting also makes the # NOTE(harlowja): don't exit with non-zero status code, so that we can
# unit tests (which also runs these examples) pass. # print the book contents, as well as avoiding exiting also makes the
traceback.print_exc(file=sys.stdout) # unit tests (which also runs these examples) pass.
traceback.print_exc(file=sys.stdout)
print_wrapped("Book contents") print_wrapped("Book contents")
print(p_utils.pformat(engine_config['book'])) print(p_utils.pformat(engine_config['book']))

View File

@@ -22,17 +22,21 @@ import sys
logging.basicConfig(level=logging.ERROR) logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, os.pardir,
os.pardir)) os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
import taskflow.engines import taskflow.engines
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends
from taskflow import task from taskflow import task
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
import example_utils # noqa
# INTRO: In this example linear_flow is used to group three tasks, one which # INTRO: In this example linear_flow is used to group three tasks, one which
# will suspend the future work the engine may do. This suspend engine is then # will suspend the future work the engine may do. This suspend engine is then
# discarded and the workflow is reloaded from the persisted data and then the # discarded and the workflow is reloaded from the persisted data and then the
@@ -61,17 +65,6 @@ def print_task_states(flowdetail, msg):
print(" %s==%s: %s, result=%s" % item) print(" %s==%s: %s, result=%s" % item)
def get_backend():
try:
backend_uri = sys.argv[1]
except Exception:
backend_uri = 'sqlite://'
backend = backends.fetch({'connection': backend_uri})
backend.get_connection().upgrade()
return backend
def find_flow_detail(backend, lb_id, fd_id): def find_flow_detail(backend, lb_id, fd_id):
conn = backend.get_connection() conn = backend.get_connection()
lb = conn.get_logbook(lb_id) lb = conn.get_logbook(lb_id)
@@ -102,40 +95,38 @@ def flow_factory():
### INITIALIZE PERSISTENCE #################################### ### INITIALIZE PERSISTENCE ####################################
backend = get_backend() with example_utils.get_backend() as backend:
logbook = p_utils.temporary_log_book(backend) logbook = p_utils.temporary_log_book(backend)
### CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
### CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### flow = flow_factory()
flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
engine = taskflow.engines.load(flow, flow_detail=flowdetail,
backend=backend)
flow = flow_factory() print_task_states(flowdetail, "At the beginning, there is no state")
flowdetail = p_utils.create_flow_detail(flow, logbook, backend) print_wrapped("Running")
engine = taskflow.engines.load(flow, flow_detail=flowdetail, engine.run()
backend=backend) print_task_states(flowdetail, "After running")
print_task_states(flowdetail, "At the beginning, there is no state") ### RE-CREATE, RESUME, RUN ####################################
print_wrapped("Running")
engine.run()
print_task_states(flowdetail, "After running")
print_wrapped("Resuming and running again")
### RE-CREATE, RESUME, RUN #################################### # NOTE(harlowja): reload the flow detail from backend, this will allow us
# to resume the flow from its suspended state, but first we need to search
print_wrapped("Resuming and running again") # for the right flow details in the correct logbook where things are
# stored.
# NOTE(harlowja): reload the flow detail from backend, this will allow us to #
# resume the flow from its suspended state, but first we need to search for # We could avoid re-loading the engine and just do engine.run() again, but
# the right flow details in the correct logbook where things are stored. # this example shows how another process may unsuspend a given flow and
# # start it again for situations where this is useful to-do (say the process
# We could avoid re-loading the engine and just do engine.run() again, but this # running the above flow crashes).
# example shows how another process may unsuspend a given flow and start it flow2 = flow_factory()
# again for situations where this is useful to-do (say the process running flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid)
# the above flow crashes). engine2 = taskflow.engines.load(flow2,
flow2 = flow_factory() flow_detail=flowdetail2,
flowdetail2 = find_flow_detail(backend, logbook.uuid, backend=backend)
flowdetail.uuid) engine2.run()
engine2 = taskflow.engines.load(flow2, print_task_states(flowdetail2, "At the end")
flow_detail=flowdetail2,
backend=backend)
engine2.run()
print_task_states(flowdetail2, "At the end")

View File

@@ -21,6 +21,11 @@ import subprocess
import sys import sys
import tempfile import tempfile
self_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, self_dir)
import example_utils # noqa
# INTRO: In this example we create a common persistence database (sqlite based) # INTRO: In this example we create a common persistence database (sqlite based)
# and then we run a few set of processes which themselves use this persistence # and then we run a few set of processes which themselves use this persistence
# database, those processes 'crash' (in a simulated way) by exiting with a # database, those processes 'crash' (in a simulated way) by exiting with a
@@ -58,10 +63,15 @@ def _path_to(name):
def main(): def main():
backend_uri = None
tmp_path = None
try: try:
fd, db_path = tempfile.mkstemp(prefix='tf-resume-example') if example_utils.SQLALCHEMY_AVAILABLE:
os.close(fd) tmp_path = tempfile.mktemp(prefix='tf-resume-example')
backend_uri = 'sqlite:///%s' % db_path backend_uri = "sqlite:///%s" % (tmp_path)
else:
tmp_path = tempfile.mkdtemp(prefix='tf-resume-example')
backend_uri = 'file:///%s' % (tmp_path)
def run_example(name, add_env=None): def run_example(name, add_env=None):
_exec([sys.executable, _path_to(name), backend_uri], add_env) _exec([sys.executable, _path_to(name), backend_uri], add_env)
@@ -78,7 +88,8 @@ def main():
print('\nResuming all failed flows') print('\nResuming all failed flows')
run_example('resume_all.py') run_example('resume_all.py')
finally: finally:
os.unlink(db_path) if tmp_path:
example_utils.rm_path(tmp_path)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from taskflow.persistence import backends
def get_backend():
try:
backend_uri = sys.argv[1]
except Exception:
backend_uri = 'sqlite://'
backend = backends.fetch({'connection': backend_uri})
backend.get_connection().upgrade()
return backend

View File

@@ -25,16 +25,17 @@ logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__)) self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath( top_dir = os.path.abspath(
os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) os.path.join(self_dir, os.pardir, os.pardir, os.pardir))
example_dir = os.path.abspath(os.path.join(self_dir, os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir) sys.path.insert(0, example_dir)
import taskflow.engines import taskflow.engines
from taskflow import states from taskflow import states
import my_utils # noqa import example_utils # noqa
FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED) FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED)
@@ -48,12 +49,12 @@ def resume(flowdetail, backend):
def main(): def main():
backend = my_utils.get_backend() with example_utils.get_backend() as backend:
logbooks = list(backend.get_connection().get_logbooks()) logbooks = list(backend.get_connection().get_logbooks())
for lb in logbooks: for lb in logbooks:
for fd in lb: for fd in lb:
if fd.state not in FINISHED_STATES: if fd.state not in FINISHED_STATES:
resume(fd, backend) resume(fd, backend)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -25,19 +25,21 @@ logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__)) self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath( top_dir = os.path.abspath(
os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) os.path.join(self_dir, os.pardir, os.pardir, os.pardir))
example_dir = os.path.abspath(os.path.join(self_dir, os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir) sys.path.insert(0, self_dir)
sys.path.insert(0, example_dir)
import taskflow.engines import taskflow.engines
import example_utils # noqa
import my_flows # noqa import my_flows # noqa
import my_utils # noqa
backend = my_utils.get_backend() with example_utils.get_backend() as backend:
engine = taskflow.engines.load_from_factory(my_flows.flow_factory, engine = taskflow.engines.load_from_factory(my_flows.flow_factory,
backend=backend) backend=backend)
print('Running flow %s %s' % (engine.storage.flow_name, print('Running flow %s %s' % (engine.storage.flow_name,
engine.storage.flow_uuid)) engine.storage.flow_uuid))
engine.run() engine.run()

View File

@@ -26,10 +26,12 @@ import time
logging.basicConfig(level=logging.ERROR) logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, os.pardir,
os.pardir)) os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
from taskflow.patterns import graph_flow as gf from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
@@ -40,10 +42,10 @@ from taskflow import engines
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow import task from taskflow import task
from taskflow.persistence import backends
from taskflow.utils import eventlet_utils as e_utils from taskflow.utils import eventlet_utils as e_utils
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
import example_utils # noqa
# INTRO: This examples shows how a hierarchy of flows can be used to create a # INTRO: This examples shows how a hierarchy of flows can be used to create a
# vm in a reliable & resumable manner using taskflow + a miniature version of # vm in a reliable & resumable manner using taskflow + a miniature version of
@@ -67,17 +69,6 @@ def print_wrapped(text):
print("-" * (len(text))) print("-" * (len(text)))
def get_backend():
try:
backend_uri = sys.argv[1]
except Exception:
backend_uri = 'sqlite://'
backend = backends.fetch({'connection': backend_uri})
backend.get_connection().upgrade()
return backend
class PrintText(task.Task): class PrintText(task.Task):
"""Just inserts some text print outs in a workflow.""" """Just inserts some text print outs in a workflow."""
def __init__(self, print_what, no_slow=False): def __init__(self, print_what, no_slow=False):
@@ -243,50 +234,51 @@ def create_flow():
print_wrapped("Initializing") print_wrapped("Initializing")
# Setup the persistence & resumption layer. # Setup the persistence & resumption layer.
backend = get_backend() with example_utils.get_backend() as backend:
try: try:
book_id, flow_id = sys.argv[2].split("+", 1) book_id, flow_id = sys.argv[2].split("+", 1)
if not uuidutils.is_uuid_like(book_id): if not uuidutils.is_uuid_like(book_id):
book_id = None
if not uuidutils.is_uuid_like(flow_id):
flow_id = None
except (IndexError, ValueError):
book_id = None book_id = None
if not uuidutils.is_uuid_like(flow_id):
flow_id = None flow_id = None
except (IndexError, ValueError):
book_id = None
flow_id = None
# Set up how we want our engine to run, serial, parallel... # Set up how we want our engine to run, serial, parallel...
engine_conf = { engine_conf = {
'engine': 'parallel', 'engine': 'parallel',
} }
if e_utils.EVENTLET_AVAILABLE: if e_utils.EVENTLET_AVAILABLE:
engine_conf['executor'] = e_utils.GreenExecutor(5) engine_conf['executor'] = e_utils.GreenExecutor(5)
# Create/fetch a logbook that will track the workflows work. # Create/fetch a logbook that will track the workflows work.
book = None book = None
flow_detail = None flow_detail = None
if all([book_id, flow_id]): if all([book_id, flow_id]):
with contextlib.closing(backend.get_connection()) as conn: with contextlib.closing(backend.get_connection()) as conn:
try: try:
book = conn.get_logbook(book_id) book = conn.get_logbook(book_id)
flow_detail = book.find(flow_id) flow_detail = book.find(flow_id)
except exc.NotFound: except exc.NotFound:
pass pass
if book is None and flow_detail is None: if book is None and flow_detail is None:
book = p_utils.temporary_log_book(backend) book = p_utils.temporary_log_book(backend)
engine = engines.load_from_factory(create_flow, engine = engines.load_from_factory(create_flow,
backend=backend, book=book, backend=backend, book=book,
engine_conf=engine_conf) engine_conf=engine_conf)
print("!! Your tracking id is: '%s+%s'" % (book.uuid, print("!! Your tracking id is: '%s+%s'" % (book.uuid,
engine.storage.flow_uuid)) engine.storage.flow_uuid))
print("!! Please submit this on later runs for tracking purposes") print("!! Please submit this on later runs for tracking purposes")
else: else:
# Attempt to load from a previously potentially partially completed flow. # Attempt to load from a previously partially completed flow.
engine = engines.load_from_detail(flow_detail, engine = engines.load_from_detail(flow_detail,
backend=backend, engine_conf=engine_conf) backend=backend,
engine_conf=engine_conf)
# Make me my vm please! # Make me my vm please!
print_wrapped('Running') print_wrapped('Running')
engine.run() engine.run()
# How to use. # How to use.
# #

View File

@@ -26,10 +26,12 @@ import time
logging.basicConfig(level=logging.ERROR) logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir, os.pardir,
os.pardir)) os.pardir))
sys.path.insert(0, top_dir) sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
from taskflow.patterns import graph_flow as gf from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
@@ -37,9 +39,9 @@ from taskflow.patterns import linear_flow as lf
from taskflow import engines from taskflow import engines
from taskflow import task from taskflow import task
from taskflow.persistence import backends
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
import example_utils # noqa
# INTRO: This examples shows how a hierarchy of flows can be used to create a # INTRO: This examples shows how a hierarchy of flows can be used to create a
# pseudo-volume in a reliable & resumable manner using taskflow + a miniature # pseudo-volume in a reliable & resumable manner using taskflow + a miniature
@@ -69,17 +71,6 @@ def find_flow_detail(backend, book_id, flow_id):
return lb.find(flow_id) return lb.find(flow_id)
def get_backend():
try:
backend_uri = sys.argv[1]
except Exception:
backend_uri = 'sqlite://'
backend = backends.fetch({'connection': backend_uri})
backend.get_connection().upgrade()
return backend
class PrintText(task.Task): class PrintText(task.Task):
def __init__(self, print_what, no_slow=False): def __init__(self, print_what, no_slow=False):
content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8] content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
@@ -131,38 +122,39 @@ flow = lf.Flow("root").add(
PrintText("Finished volume create", no_slow=True)) PrintText("Finished volume create", no_slow=True))
# Setup the persistence & resumption layer. # Setup the persistence & resumption layer.
backend = get_backend() with example_utils.get_backend() as backend:
try: try:
book_id, flow_id = sys.argv[2].split("+", 1) book_id, flow_id = sys.argv[2].split("+", 1)
except (IndexError, ValueError): except (IndexError, ValueError):
book_id = None book_id = None
flow_id = None flow_id = None
if not all([book_id, flow_id]): if not all([book_id, flow_id]):
# If no 'tracking id' (think a fedex or ups tracking id) is provided then # If no 'tracking id' (think a fedex or ups tracking id) is provided
# we create one by creating a logbook (where flow details are stored) and # then we create one by creating a logbook (where flow details are
# creating a flow detail (where flow and task state is stored). The # stored) and creating a flow detail (where flow and task state is
# combination of these 2 objects unique ids (uuids) allows the users of # stored). The combination of these 2 objects unique ids (uuids) allows
# taskflow to reassociate the workflows that were potentially running (and # the users of taskflow to reassociate the workflows that were
# which may have partially completed) back with taskflow so that those # potentially running (and which may have partially completed) back
# workflows can be resumed (or reverted) after a process/thread/engine # with taskflow so that those workflows can be resumed (or reverted)
# has failed in someway. # after a process/thread/engine has failed in someway.
logbook = p_utils.temporary_log_book(backend) logbook = p_utils.temporary_log_book(backend)
flow_detail = p_utils.create_flow_detail(flow, logbook, backend) flow_detail = p_utils.create_flow_detail(flow, logbook, backend)
print("!! Your tracking id is: '%s+%s'" % (logbook.uuid, flow_detail.uuid)) print("!! Your tracking id is: '%s+%s'" % (logbook.uuid,
print("!! Please submit this on later runs for tracking purposes") flow_detail.uuid))
else: print("!! Please submit this on later runs for tracking purposes")
flow_detail = find_flow_detail(backend, book_id, flow_id) else:
flow_detail = find_flow_detail(backend, book_id, flow_id)
# Annnnd load and run. # Annnnd load and run.
engine_conf = { engine_conf = {
'engine': 'serial', 'engine': 'serial',
} }
engine = engines.load(flow, engine = engines.load(flow,
flow_detail=flow_detail, flow_detail=flow_detail,
backend=backend, backend=backend,
engine_conf=engine_conf) engine_conf=engine_conf)
engine.run() engine.run()
# How to use. # How to use.
# #

View File

@@ -34,6 +34,7 @@ import os
import re import re
import subprocess import subprocess
import sys import sys
import taskflow.test import taskflow.test
ROOT_DIR = os.path.abspath( ROOT_DIR = os.path.abspath(
@@ -48,9 +49,8 @@ def root_path(*args):
def run_example(name): def run_example(name):
path = root_path('taskflow', 'examples', '%s.py' % name) path = root_path('taskflow', 'examples', '%s.py' % name)
obj = subprocess.Popen( obj = subprocess.Popen([sys.executable, path],
[sys.executable, path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output = obj.communicate() output = obj.communicate()
if output[1]: if output[1]:
raise RuntimeError('Example wrote to stderr:\n%s' raise RuntimeError('Example wrote to stderr:\n%s'
@@ -59,15 +59,14 @@ def run_example(name):
def expected_output_path(name): def expected_output_path(name):
return root_path('taskflow', 'examples', return root_path('taskflow', 'examples', '%s.out.txt' % name)
'%s.out.txt' % name)
def list_examples(): def list_examples():
ext = '.py'
examples_dir = root_path('taskflow', 'examples') examples_dir = root_path('taskflow', 'examples')
for filename in os.listdir(examples_dir): for filename in os.listdir(examples_dir):
if filename.endswith(ext): name, ext = os.path.splitext(filename)
if ext == ".py" and 'utils' not in name.lower():
yield filename[:-len(ext)] yield filename[:-len(ext)]

View File

@@ -19,11 +19,19 @@
import os import os
import tempfile import tempfile
from taskflow.persistence.backends import impl_sqlalchemy import testtools
try:
from taskflow.persistence.backends import impl_sqlalchemy
SQLALCHEMY_AVAILABLE = True
except ImportError:
SQLALCHEMY_AVAILABLE = False
from taskflow import test from taskflow import test
from taskflow.tests.unit.persistence import base from taskflow.tests.unit.persistence import base
@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available')
class SqlPersistenceTest(test.TestCase, base.PersistenceTestMixin): class SqlPersistenceTest(test.TestCase, base.PersistenceTestMixin):
"""Inherits from the base test and sets up a sqlite temporary db.""" """Inherits from the base test and sets up a sqlite temporary db."""
def _get_connection(self): def _get_connection(self):

View File

@@ -1 +0,0 @@
eventlet>=0.13.0

211
tools/toxgen.py Executable file
View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python
# From: https://bitbucket.org/cdevienne/toxgen (pypi soon hopefully) and
# modified slightly to work in python 2.6 and set some values that are not
# being set.
#
# TODO(harlowja): remove me when toxgen is a pypi package.
"""
Produce a tox.ini file from a template config file.
The template config file is a standard tox.ini file with additional sections.
Theses sections will be combined to create new testenv: sections if they do
not exists yet.
"""
import collections
import itertools
import optparse
import os
import six
from six.moves import configparser
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
HEADER = '# DO NOT EDIT THIS FILE - it is machine generated from %(filename)s'
SKIP_VENVS = frozenset(['venv'])
parser = optparse.OptionParser(epilog=__doc__)
parser.add_option('-i', '--input', dest='input',
default='tox-tmpl.ini', metavar='FILE')
parser.add_option('-o', '--output', dest='output',
default='tox.ini', metavar='FILE')
class AxisItem(object):
def __init__(self, axis, name, config):
self.axis = axis
self.isdefault = name[-1] == '*'
self.name = name[:-1] if self.isdefault else name
self.load(config)
def load(self, config):
sectionname = 'axis:%s:%s' % (self.axis.name, self.name)
if config.has_section(sectionname):
self.options = dict(config.items(sectionname))
else:
self.options = dict()
for name, value in self.axis.defaults.items():
if name not in self.options:
self.options[name] = value
class Axis(object):
def __init__(self, name, config):
self.name = name
self.load(config)
def load(self, config):
self.items = dict()
values = config.get('axes', self.name).split(',')
if config.has_section('axis:%s' % self.name):
self.defaults = dict(
config.items('axis:%s' % self.name)
)
else:
self.defaults = {}
for value in values:
self.items[value.strip('*')] = AxisItem(self, value, config)
def format_list(contents, max_len=80, sep=","):
lines = []
for line in contents:
if not lines:
lines.append(line + ",")
else:
last_len = len(lines[-1])
if last_len + len(line) >= max_len:
lines.append(str(line) + sep)
else:
lines[-1] = lines[-1] + str(line) + sep
return "\n".join(lines).rstrip(",")
def render(incfg, filename, adjust_envlist=True):
test_envs = set()
for s in incfg.sections():
if s.startswith("testenv:"):
env = s[len("testenv:"):].strip()
if env in SKIP_VENVS or not env:
continue
test_envs.add(env)
test_envs = [s for s in test_envs if s]
try:
envlist = incfg.get("tox", 'envlist')
envlist = [e.strip() for e in envlist.split(",")]
envlist = set([e for e in envlist if e])
except (configparser.NoOptionError, configparser.NoSectionError):
envlist = set()
for e in test_envs:
if e not in envlist:
envlist.add(e)
if not incfg.has_section("tox"):
incfg.add_section("tox")
incfg.set("tox", "envlist",
format_list(list(sorted(envlist)), max_len=-1))
text = six.StringIO()
incfg.write(text)
contents = [
HEADER % {'filename': os.path.basename(filename)},
'',
# Remove how configparser uses tabs instead of spaces, madness...
text.getvalue().replace("\t", " " * 4),
]
return "\n".join(contents)
def compile_template(incfg):
axes = dict()
if incfg.has_section('axes'):
for axis in incfg.options('axes'):
axes[axis] = Axis(axis, incfg)
out = configparser.ConfigParser(dict_type=OrderedDict)
for section in incfg.sections():
if section == 'axes' or section.startswith('axis:'):
continue
out.add_section(section)
for name, value in incfg.items(section):
out.set(section, name, value)
items = [axis.items.keys() for axis in axes.values()]
for combination in itertools.product(*items):
options = {}
section_name = (
'testenv:' + '-'.join([item for item in combination if item])
)
section_alt_name = (
'testenv:' + '-'.join([
itemname
for axis, itemname in zip(axes.values(), combination)
if itemname and not axis.items[itemname].isdefault
])
)
if section_alt_name == section_name:
section_alt_name = None
axes_items = [
'%s:%s' % (axis, itemname)
for axis, itemname in zip(axes, combination)
]
for axis, itemname in zip(axes.values(), combination):
axis_options = axis.items[itemname].options
if 'constraints' in axis_options:
constraints = axis_options['constraints'].split('\n')
for c in constraints:
if c.startswith('!') and c[1:] in axes_items:
continue
for name, value in axis_options.items():
if name in options:
options[name] += value
else:
options[name] = value
constraints = options.pop('constraints', '').split('\n')
neg_constraints = [c[1:] for c in constraints if c and c[0] == '!']
if not set(neg_constraints).isdisjoint(axes_items):
continue
if not out.has_section(section_name):
out.add_section(section_name)
if (section_alt_name and not out.has_section(section_alt_name)):
out.add_section(section_alt_name)
for name, value in reversed(options.items()):
if not out.has_option(section_name, name):
out.set(section_name, name, value)
if section_alt_name and not out.has_option(section_alt_name, name):
out.set(section_alt_name, name, value)
return out
def main():
options, args = parser.parse_args()
tmpl = configparser.ConfigParser()
with open(options.input, 'rb') as fh:
tmpl.readfp(fh, filename=options.input)
with open(options.output, 'wb') as outfile:
text = render(compile_template(tmpl), options.input)
outfile.write(text)
if __name__ == '__main__':
main()

84
tox-tmpl.ini Normal file
View File

@@ -0,0 +1,84 @@
# NOTE(harlowja): this is a template, not a fully-generated tox.ini, use toxgen
# to translate this into a fully specified tox.ini file before using. Changes
# made to tox.ini will only be reflected if ran through the toxgen generator.
[tox]
minversion = 1.6
skipsdist = True
[testenv]
usedevelop = True
install_command = pip install {opts} {packages}
setenv = VIRTUAL_ENV={envdir}
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --slowest --testr-args='{posargs}'
[tox:jenkins]
downloadcache = ~/cache/pip
[testenv:pep8]
commands =
flake8 {posargs}
[testenv:pylint]
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
pylint==0.26.0
commands = pylint
[testenv:cover]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --coverage --testr-args='{posargs}'
[testenv:venv]
commands = {posargs}
[flake8]
ignore = H402
builtins = _
exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools
[axes]
python = py26,py27,py33
sqlalchemy = sa7,sa8,sa9,*
eventlet = ev,*
[axis:python:py26]
basepython = python2.6
deps = {[testenv]deps}
[axis:python:py27]
basepython = python2.7
deps = {[testenv]deps}
[axis:python:py33]
basepython = python3.3
deps = {[testenv]deps}
[axis:eventlet:ev]
deps =
eventlet>=0.13.0
constraints=
!python:py33
[axis:sqlalchemy:sa7]
deps =
SQLAlchemy<=0.7.99
alembic>=0.4.1
[axis:sqlalchemy:sa8]
deps =
SQLAlchemy>0.7.99
SQLAlchemy<=0.8.99
alembic>=0.4.1
[axis:sqlalchemy:sa9]
deps =
SQLAlchemy>0.8.99
SQLAlchemy<=0.9.99
alembic>=0.4.1

200
tox.ini
View File

@@ -1,47 +1,60 @@
# DO NOT EDIT THIS FILE - it is machine generated from tox-tmpl.ini
[tox] [tox]
minversion = 1.6 minversion = 1.6
skipsdist = True skipsdist = True
envlist = py26,py27,py33,pep8 envlist = cover,
pep8,
py26,
py26-ev,
py26-sa7,
py26-sa7-ev,
py26-sa8,
py26-sa8-ev,
py26-sa9,
py26-sa9-ev,
py27,
py27-ev,
py27-sa7,
py27-sa7-ev,
py27-sa8,
py27-sa8-ev,
py27-sa9,
py27-sa9-ev,
py33,
py33-sa7,
py33-sa8,
py33-sa9,
pylint
[testenv] [testenv]
usedevelop = True
install_command = pip install {opts} {packages} install_command = pip install {opts} {packages}
setenv = VIRTUAL_ENV={envdir}
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --slowest --testr-args='{posargs}' commands = python setup.py testr --slowest --testr-args='{posargs}'
setenv = VIRTUAL_ENV={envdir}
[testenv:py26] LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
deps = -r{toxinidir}/requirements.txt deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt -r{toxinidir}/test-requirements.txt
-r{toxinidir}/test-2.x-requirements.txt usedevelop = True
[testenv:py27] [testenv:pylint]
commands = pylint
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt pylint==0.26.0
-r{toxinidir}/test-2.x-requirements.txt
[testenv:cover]
commands = python setup.py testr --coverage --testr-args='{posargs}'
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[tox:jenkins] [tox:jenkins]
downloadcache = ~/cache/pip downloadcache = ~/cache/pip
[testenv:pep8] [testenv:pep8]
commands = commands =
flake8 {posargs} flake8 {posargs}
[testenv:pylint]
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
pylint==0.26.0
commands = pylint
[testenv:cover]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/test-2.x-requirements.txt
commands = python setup.py testr --coverage --testr-args='{posargs}'
[testenv:venv] [testenv:venv]
commands = {posargs} commands = {posargs}
@@ -50,3 +63,132 @@ commands = {posargs}
ignore = H402 ignore = H402
builtins = _ builtins = _
exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools
[testenv:py27]
basepython = python2.7
deps = {[testenv]deps}
[testenv:py27-ev]
basepython = python2.7
deps = {[testenv]deps}
eventlet>=0.13.0
[testenv:py27-sa9]
basepython = python2.7
deps = {[testenv]deps}
SQLAlchemy>0.8.99
SQLAlchemy<=0.9.99
alembic>=0.4.1
[testenv:py27-sa9-ev]
basepython = python2.7
deps = {[testenv]deps}
SQLAlchemy>0.8.99
SQLAlchemy<=0.9.99
alembic>=0.4.1
eventlet>=0.13.0
[testenv:py27-sa8]
basepython = python2.7
deps = {[testenv]deps}
SQLAlchemy>0.7.99
SQLAlchemy<=0.8.99
alembic>=0.4.1
[testenv:py27-sa8-ev]
basepython = python2.7
deps = {[testenv]deps}
SQLAlchemy>0.7.99
SQLAlchemy<=0.8.99
alembic>=0.4.1
eventlet>=0.13.0
[testenv:py27-sa7]
basepython = python2.7
deps = {[testenv]deps}
SQLAlchemy<=0.7.99
alembic>=0.4.1
[testenv:py27-sa7-ev]
basepython = python2.7
deps = {[testenv]deps}
SQLAlchemy<=0.7.99
alembic>=0.4.1
eventlet>=0.13.0
[testenv:py26]
basepython = python2.6
deps = {[testenv]deps}
[testenv:py26-ev]
basepython = python2.6
deps = {[testenv]deps}
eventlet>=0.13.0
[testenv:py26-sa9]
basepython = python2.6
deps = {[testenv]deps}
SQLAlchemy>0.8.99
SQLAlchemy<=0.9.99
alembic>=0.4.1
[testenv:py26-sa9-ev]
basepython = python2.6
deps = {[testenv]deps}
SQLAlchemy>0.8.99
SQLAlchemy<=0.9.99
alembic>=0.4.1
eventlet>=0.13.0
[testenv:py26-sa8]
basepython = python2.6
deps = {[testenv]deps}
SQLAlchemy>0.7.99
SQLAlchemy<=0.8.99
alembic>=0.4.1
[testenv:py26-sa8-ev]
basepython = python2.6
deps = {[testenv]deps}
SQLAlchemy>0.7.99
SQLAlchemy<=0.8.99
alembic>=0.4.1
eventlet>=0.13.0
[testenv:py26-sa7]
basepython = python2.6
deps = {[testenv]deps}
SQLAlchemy<=0.7.99
alembic>=0.4.1
[testenv:py26-sa7-ev]
basepython = python2.6
deps = {[testenv]deps}
SQLAlchemy<=0.7.99
alembic>=0.4.1
eventlet>=0.13.0
[testenv:py33]
basepython = python3.3
deps = {[testenv]deps}
[testenv:py33-sa9]
basepython = python3.3
deps = {[testenv]deps}
SQLAlchemy>0.8.99
SQLAlchemy<=0.9.99
alembic>=0.4.1
[testenv:py33-sa8]
basepython = python3.3
deps = {[testenv]deps}
SQLAlchemy>0.7.99
SQLAlchemy<=0.8.99
alembic>=0.4.1
[testenv:py33-sa7]
basepython = python3.3
deps = {[testenv]deps}
SQLAlchemy<=0.7.99
alembic>=0.4.1