Merge "Remove **most** usage of taskflow.utils in examples"

This commit is contained in:
Jenkins
2015-07-21 21:36:34 +00:00
committed by Gerrit Code Review
12 changed files with 159 additions and 134 deletions

View File

@@ -29,9 +29,7 @@ sys.path.insert(0, self_dir)
from taskflow import engines
from taskflow.patterns import linear_flow as lf
-from taskflow.persistence import backends
from taskflow import task
-from taskflow.utils import persistence_utils as pu
# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using an in-memory backend and pre/post run we dump out the contents
@@ -43,22 +41,18 @@ class PrintTask(task.Task):
    def execute(self):
        print("Running '%s'" % self.name)
-backend = backends.fetch({
-    'connection': 'memory://',
-})
-book, flow_detail = pu.temporary_flow_detail(backend=backend)
# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
    f.add(PrintTask(alpha))
-e = engines.load(f, flow_detail=flow_detail,
-                 book=book, backend=backend)
+e = engines.load(f)
e.compile()
e.prepare()
+# After prepare the storage layer + backend can now be accessed safely...
+backend = e.storage.backend
print("----------")
print("Before run")
print("----------")

View File

@@ -31,7 +31,6 @@ from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task
-from taskflow.utils import eventlet_utils
# INTRO: This is the de facto hello world equivalent for taskflow; it shows how
@@ -82,25 +81,34 @@ song.add(PrinterTask("conductor@begin",
show_name=False, inject={'output': "*dong*"}))
# Run in parallel using eventlet green threads...
-if eventlet_utils.EVENTLET_AVAILABLE:
-    with futurist.GreenThreadPoolExecutor() as executor:
+try:
+    executor = futurist.GreenThreadPoolExecutor()
+except RuntimeError:
+    # No eventlet currently active, skip running with it...
+    pass
+else:
+    print("-- Running in parallel using eventlet --")
+    with executor:
        e = engines.load(song, executor=executor, engine='parallel')
        e.run()
# Run in parallel using real threads...
with futurist.ThreadPoolExecutor(max_workers=1) as executor:
print("-- Running in parallel using threads --")
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run in parallel using external processes...
with futurist.ProcessPoolExecutor(max_workers=1) as executor:
print("-- Running in parallel using processes --")
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run serially (aka, if the workflow could have been run in parallel, it will
# not be when run in this mode)...
+print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
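The hunk above replaces an availability check with EAFP: futurist raises RuntimeError when a green executor is requested but eventlet is not usable, so the example now just tries it and moves on. A condensed sketch of that selection logic (the trivial flow below is an illustrative stand-in, not from the original file):

import futurist

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class NoopTask(task.Task):
    def execute(self):
        pass


flow = lf.Flow('demo').add(NoopTask('noop'))
try:
    # Needs eventlet; raises RuntimeError when it is unavailable.
    executor = futurist.GreenThreadPoolExecutor()
except RuntimeError:
    # Fall back to real OS threads instead of green threads.
    executor = futurist.ThreadPoolExecutor(max_workers=1)
with executor:
    engines.load(flow, executor=executor, engine='parallel').run()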

View File

@@ -33,7 +33,6 @@ from six.moves import range as compat_range
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
-from taskflow.utils import eventlet_utils
# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
@@ -97,9 +96,10 @@ def main():
    f = make_flow(tbl)
    # Now run it (using the specified executor)...
-    if eventlet_utils.EVENTLET_AVAILABLE:
+    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
-    else:
+    except RuntimeError:
+        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
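The hunk ends inside this try:, so the rest of the block is not shown. A plausible completion (an assumption about code outside the hunk, not part of the diff) would guard the run so the chosen executor is always cleaned up:

    try:
        e = engines.load(f, engine='parallel', executor=executor)
        e.run()
    finally:
        # Assumed cleanup; ensures the executor is always shut down.
        executor.shutdown()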

View File

@@ -33,7 +33,6 @@ from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa
@@ -110,4 +109,4 @@ with eu.get_backend(backend_uri) as backend:
traceback.print_exc(file=sys.stdout)
eu.print_wrapped("Book contents")
-print(p_utils.pformat(book))
+print(book.pformat())
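The formatting helper becomes a method on the model itself, so a logbook can now print its own structure with no persistence_utils import. A tiny sketch (the output shape is taken from the doctest added later in this commit; the uuid and timestamps will vary):

from taskflow.persistence import models

book = models.LogBook("example")
print(book.pformat())
# LogBook: 'example'
#  - uuid = ...
#  - created_at = ...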

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
import logging
import os
import sys
@@ -27,10 +28,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
+from oslo_utils import uuidutils
import taskflow.engines
from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import persistence_utils as p_utils
import example_utils as eu  # noqa
@@ -99,19 +102,25 @@ def flow_factory():
# INITIALIZE PERSISTENCE ####################################
with eu.get_backend() as backend:
-    logbook = p_utils.temporary_log_book(backend)
+    # Create a place where the persistence information will be stored.
+    book = models.LogBook("example")
+    flow_detail = models.FlowDetail("resume from backend example",
+                                    uuid=uuidutils.generate_uuid())
+    book.add(flow_detail)
+    with contextlib.closing(backend.get_connection()) as conn:
+        conn.save_logbook(book)
# CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
    flow = flow_factory()
-    flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
-    engine = taskflow.engines.load(flow, flow_detail=flowdetail,
-                                   backend=backend)
+    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
+                                   book=book, backend=backend)
-    print_task_states(flowdetail, "At the beginning, there is no state")
+    print_task_states(flow_detail, "At the beginning, there is no state")
    eu.print_wrapped("Running")
    engine.run()
-    print_task_states(flowdetail, "After running")
+    print_task_states(flow_detail, "After running")
# RE-CREATE, RESUME, RUN ####################################
@@ -127,9 +136,9 @@ with eu.get_backend() as backend:
    # start it again for situations where this is useful to do (say the process
    # running the above flow crashes).
    flow2 = flow_factory()
-    flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid)
+    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
    engine2 = taskflow.engines.load(flow2,
-                                    flow_detail=flowdetail2,
-                                    backend=backend)
+                                    flow_detail=flow_detail_2,
+                                    backend=backend, book=book)
    engine2.run()
-    print_task_states(flowdetail2, "At the end")
+    print_task_states(flow_detail_2, "At the end")
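This file shows the replacement idiom most completely: build the logbook and flow detail by hand, persist them, and hand both to the engine so a later process can find and resume the same flow detail. A self-contained sketch against an in-memory backend (the NoopTask and the names used here are illustrative, not from the example):

import contextlib

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends
from taskflow.persistence import models
from taskflow import task


class NoopTask(task.Task):
    def execute(self):
        pass


backend = backends.fetch({'connection': 'memory://'})
with contextlib.closing(backend.get_connection()) as conn:
    conn.upgrade()  # make sure the backend structures exist

book = models.LogBook("example")
flow_detail = models.FlowDetail("example", uuid=uuidutils.generate_uuid())
book.add(flow_detail)
with contextlib.closing(backend.get_connection()) as conn:
    conn.save_logbook(book)

engine = engines.load(lf.Flow("root").add(NoopTask("noop")),
                      flow_detail=flow_detail, book=book, backend=backend)
engine.run()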

View File

@@ -38,9 +38,8 @@ from taskflow import engines
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import eventlet_utils
-from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa
@@ -226,6 +225,8 @@ eu.print_wrapped("Initializing")
# Setup the persistence & resumption layer.
with eu.get_backend() as backend:
# Try to find a previously passed in tracking id...
try:
book_id, flow_id = sys.argv[2].split("+", 1)
if not uuidutils.is_uuid_like(book_id):
@@ -237,14 +238,17 @@ with eu.get_backend() as backend:
        flow_id = None
    # Set up how we want our engine to run, serial, parallel...
-    executor = None
-    if eventlet_utils.EVENTLET_AVAILABLE:
-        executor = futurist.GreenThreadPoolExecutor(5)
+    try:
+        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
+    except RuntimeError:
+        # No eventlet installed, just let the default be used instead.
+        executor = None
    # Create/fetch a logbook that will track the workflow's work.
    book = None
    flow_detail = None
    if all([book_id, flow_id]):
+        # Try to find in a prior logbook and flow detail...
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(book_id)
@@ -252,7 +256,9 @@ with eu.get_backend() as backend:
            except exc.NotFound:
                pass
    if book is None and flow_detail is None:
-        book = p_utils.temporary_log_book(backend)
+        book = models.LogBook("vm-boot")
+        with contextlib.closing(backend.get_connection()) as conn:
+            conn.save_logbook(book)
    engine = engines.load_from_factory(create_flow,
                                       backend=backend, book=book,
                                       engine='parallel',
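The load_from_factory() call is cut off by the hunk boundary. A hedged sketch of how such a factory-based load typically looks, reusing the backend, book and executor set up above (the stub factory and BootStub task below are illustrative; the real example assembles the full vm-boot workflow):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class BootStub(task.Task):
    def execute(self):
        print("Pretending to boot a vm...")


def create_flow():
    # Stand-in factory; the real example builds the vm-boot flow here.
    return lf.Flow("vm-boot-stub").add(BootStub("boot"))


engine = engines.load_from_factory(create_flow,
                                   backend=backend, book=book,
                                   engine='parallel', executor=executor)
engine.run()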

View File

@@ -31,11 +31,13 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
+from oslo_utils import uuidutils
from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import persistence_utils as p_utils
import example_utils # noqa
@@ -134,9 +136,12 @@ with example_utils.get_backend() as backend:
    # potentially running (and which may have partially completed) back
    # with taskflow so that those workflows can be resumed (or reverted)
    # after a process/thread/engine has failed in some way.
-    logbook = p_utils.temporary_log_book(backend)
-    flow_detail = p_utils.create_flow_detail(flow, logbook, backend)
-    print("!! Your tracking id is: '%s+%s'" % (logbook.uuid,
+    book = models.LogBook('resume-volume-create')
+    flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
+    book.add(flow_detail)
+    with contextlib.closing(backend.get_connection()) as conn:
+        conn.save_logbook(book)
+    print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                flow_detail.uuid))
print("!! Please submit this on later runs for tracking purposes")
else:
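The tracking id these examples print is simply the two uuids joined with '+', which later runs split back apart (as the vm-boot hunk above does with sys.argv[2].split("+", 1)). A tiny sketch, reusing the book and flow_detail from the hunk:

tracking_id = "%s+%s" % (book.uuid, flow_detail.uuid)
print("!! Your tracking id is: '%s'" % tracking_id)
# A later invocation reverses it to locate the same records...
book_id, flow_id = tracking_id.split("+", 1)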

View File

@@ -32,9 +32,7 @@ sys.path.insert(0, self_dir)
from taskflow import engines
from taskflow.patterns import linear_flow as lf
-from taskflow.persistence import backends as persistence_backends
from taskflow import task
-from taskflow.utils import persistence_utils
# INTRO: This example shows how to run a set of engines at the same time, each
@@ -73,12 +71,9 @@ flows = []
for i in range(0, flow_count):
    f = make_alphabet_flow(i + 1)
    flows.append(make_alphabet_flow(i + 1))
-be = persistence_backends.fetch(conf={'connection': 'memory'})
-book = persistence_utils.temporary_log_book(be)
engine_iters = []
for f in flows:
-    fd = persistence_utils.create_flow_detail(f, book, be)
-    e = engines.load(f, flow_detail=fd, backend=be, book=book)
+    e = engines.load(f)
    e.compile()
    e.storage.inject({'A': 'A'})
    e.prepare()
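The engine_iters list hints at what this example does next: it holds one run_iter() generator per engine and steps them in turn so several engines make progress together. The hunk does not show that loop, so here is one plausible round-robin sketch over generators like these (the engines_list name is hypothetical, standing in for the engines prepared in the loop above):

# Hypothetical: 'engines_list' holds the engines prepared above.
engine_iters = [e.run_iter() for e in engines_list]
while engine_iters:
    for it in list(engine_iters):
        try:
            # Advance this engine by one state transition...
            next(it)
        except StopIteration:
            # This engine has finished its flow.
            engine_iters.remove(it)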

View File

@@ -29,9 +29,7 @@ sys.path.insert(0, self_dir)
from taskflow import engines
from taskflow.patterns import linear_flow as lf
-from taskflow.persistence import backends as persistence_backends
from taskflow import task
-from taskflow.utils import persistence_utils
# INTRO: These examples show how to run an engine using the engine iteration
# capability; in between iterations other activities occur (in this case a
@@ -48,10 +46,7 @@ f = lf.Flow("counter")
for i in range(0, 10):
    f.add(EchoNameTask("echo_%s" % (i + 1)))
-be = persistence_backends.fetch(conf={'connection': 'memory'})
-book = persistence_utils.temporary_log_book(be)
-fd = persistence_utils.create_flow_detail(f, book, be)
-e = engines.load(f, flow_detail=fd, backend=be, book=book)
+e = engines.load(f)
e.compile()
e.prepare()
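After prepare() this example iterates the engine rather than calling run(). A minimal sketch of that idiom, assuming taskflow's run_iter() generator (the EchoNameTask body is a guess at the obvious implementation):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class EchoNameTask(task.Task):
    def execute(self):
        print(self.name)


f = lf.Flow("counter")
for i in range(0, 10):
    f.add(EchoNameTask("echo_%s" % (i + 1)))

e = engines.load(f)
e.compile()
e.prepare()
for state in e.run_iter():
    # Other activities can happen here, in between engine iterations...
    print("Engine state: %s" % state)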

View File

@@ -27,9 +27,7 @@ sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.patterns import graph_flow as gf
-from taskflow.persistence import backends
from taskflow import task
-from taskflow.utils import persistence_utils as pu
class DummyTask(task.Task):
@@ -42,18 +40,15 @@ def allow(history):
return False
# Declare our work to be done...
r = gf.Flow("root")
r_a = DummyTask('r-a')
r_b = DummyTask('r-b')
r.add(r_a, r_b)
r.link(r_a, r_b, decider=allow)
-backend = backends.fetch({
-    'connection': 'memory://',
-})
-book, flow_detail = pu.temporary_flow_detail(backend=backend)
-e = engines.load(r, flow_detail=flow_detail, book=book, backend=backend)
+# Setup and run the engine layer.
+e = engines.load(r)
e.compile()
e.prepare()
e.run()
@@ -62,6 +57,7 @@ e.run()
print("---------")
print("After run")
print("---------")
+backend = e.storage.backend
entries = [os.path.join(backend.memory.root_path, child)
           for child in backend.memory.ls(backend.memory.root_path)]
while entries:
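The traversal that follows is cut off by the hunk, but the two context lines show the shape: backend.memory is a faked in-memory filesystem exposing root_path and ls(). A hedged sketch of a full walk in that style:

import os

# Assumes 'e' has been prepared so e.storage.backend is available...
backend = e.storage.backend
entries = [os.path.join(backend.memory.root_path, child)
           for child in backend.memory.ls(backend.memory.root_path)]
while entries:
    path = entries.pop()
    print(path)
    entries.extend(os.path.join(path, child)
                   for child in backend.memory.ls(path))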

View File

@@ -17,6 +17,7 @@
import abc
import copy
+import os
from oslo_utils import timeutils
from oslo_utils import uuidutils
@@ -26,12 +27,42 @@ from taskflow import exceptions as exc
from taskflow import logging
from taskflow import states
from taskflow.types import failure as ft
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
+# Internal helpers...
+
+
+def _format_meta(metadata, indent):
+    """Format the common metadata dictionary in the same manner."""
+    if not metadata:
+        return []
+    lines = [
+        '%s- metadata:' % (" " * indent),
+    ]
+    for (k, v) in metadata.items():
+        # Progress for now is a special snowflake and will be formatted
+        # in percent format.
+        if k == 'progress' and isinstance(v, misc.NUMERIC_TYPES):
+            v = "%0.2f%%" % (v * 100.0)
+        lines.append("%s+ %s = %s" % (" " * (indent + 2), k, v))
+    return lines
+
+
+def _format_shared(obj, indent):
+    """Format the common shared attributes in the same manner."""
+    if obj is None:
+        return []
+    lines = []
+    for attr_name in ("uuid", "state"):
+        if not hasattr(obj, attr_name):
+            continue
+        lines.append("%s- %s = %s" % (" " * indent, attr_name,
+                                      getattr(obj, attr_name)))
+    return lines


def _is_all_none(arg, *args):
    if arg is not None:
        return False
@@ -104,6 +135,33 @@ class LogBook(object):
        self.updated_at = None
        self.meta = {}

+    def pformat(self, indent=0, linesep=os.linesep):
+        """Pretty formats this logbook into a string.
+
+        >>> from taskflow.persistence import models
+        >>> tmp = models.LogBook("example")
+        >>> print(tmp.pformat())
+        LogBook: 'example'
+         - uuid = ...
+         - created_at = ...
+        """
+        cls_name = self.__class__.__name__
+        lines = ["%s%s: '%s'" % (" " * indent, cls_name, self.name)]
+        lines.extend(_format_shared(self, indent=indent + 1))
+        lines.extend(_format_meta(self.meta, indent=indent + 1))
+        if self.created_at is not None:
+            lines.append("%s- created_at = %s"
+                         % (" " * (indent + 1),
+                            timeutils.isotime(self.created_at)))
+        if self.updated_at is not None:
+            lines.append("%s- updated_at = %s"
+                         % (" " * (indent + 1),
+                            timeutils.isotime(self.updated_at)))
+        for flow_detail in self:
+            lines.append(flow_detail.pformat(indent=indent + 1,
+                                             linesep=linesep))
+        return linesep.join(lines)

    def add(self, fd):
        """Adds a new flow detail into this logbook.
@@ -275,6 +333,27 @@ class FlowDetail(object):
        self.meta = fd.meta
        return self

+    def pformat(self, indent=0, linesep=os.linesep):
+        """Pretty formats this flow detail into a string.
+
+        >>> from oslo_utils import uuidutils
+        >>> from taskflow.persistence import models
+        >>> flow_detail = models.FlowDetail("example",
+        ...                                 uuid=uuidutils.generate_uuid())
+        >>> print(flow_detail.pformat())
+        FlowDetail: 'example'
+         - uuid = ...
+         - state = ...
+        """
+        cls_name = self.__class__.__name__
+        lines = ["%s%s: '%s'" % (" " * indent, cls_name, self.name)]
+        lines.extend(_format_shared(self, indent=indent + 1))
+        lines.extend(_format_meta(self.meta, indent=indent + 1))
+        for atom_detail in self:
+            lines.append(atom_detail.pformat(indent=indent + 1,
+                                             linesep=linesep))
+        return linesep.join(lines)

    def merge(self, fd, deep_copy=False):
        """Merges the current object state with the given one's state.
@@ -611,6 +690,20 @@ class AtomDetail(object):
    def copy(self):
        """Copies this atom detail."""

+    def pformat(self, indent=0, linesep=os.linesep):
+        """Pretty formats this atom detail into a string."""
+        cls_name = self.__class__.__name__
+        lines = ["%s%s: '%s'" % (" " * (indent), cls_name, self.name)]
+        lines.extend(_format_shared(self, indent=indent + 1))
+        lines.append("%s- version = %s"
+                     % (" " * (indent + 1), misc.get_version_string(self)))
+        lines.append("%s- results = %s"
+                     % (" " * (indent + 1), self.results))
+        lines.append("%s- failure = %s" % (" " * (indent + 1),
+                                           bool(self.failure)))
+        lines.extend(_format_meta(self.meta, indent=indent + 1))
+        return linesep.join(lines)
class TaskDetail(AtomDetail):
"""A task detail (an atom detail typically associated with a |tt| atom).

View File

@@ -15,14 +15,11 @@
# under the License.
import contextlib
-import os
-from oslo_utils import timeutils
from oslo_utils import uuidutils
from taskflow import logging
from taskflow.persistence import models
-from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -97,75 +94,3 @@ def create_flow_detail(flow, book=None, backend=None, meta=None):
        return book.find(flow_id)
    else:
        return flow_detail
-def _format_meta(metadata, indent):
-    """Format the common metadata dictionary in the same manner."""
-    if not metadata:
-        return []
-    lines = [
-        '%s- metadata:' % (" " * indent),
-    ]
-    for (k, v) in metadata.items():
-        # Progress for now is a special snowflake and will be formatted
-        # in percent format.
-        if k == 'progress' and isinstance(v, misc.NUMERIC_TYPES):
-            v = "%0.2f%%" % (v * 100.0)
-        lines.append("%s+ %s = %s" % (" " * (indent + 2), k, v))
-    return lines
-
-
-def _format_shared(obj, indent):
-    """Format the common shared attributes in the same manner."""
-    if obj is None:
-        return []
-    lines = []
-    for attr_name in ("uuid", "state"):
-        if not hasattr(obj, attr_name):
-            continue
-        lines.append("%s- %s = %s" % (" " * indent, attr_name,
-                                      getattr(obj, attr_name)))
-    return lines
-
-
-def pformat_atom_detail(atom_detail, indent=0):
-    """Pretty formats an atom detail."""
-    detail_type = models.atom_detail_type(atom_detail)
-    lines = ["%s%s: '%s'" % (" " * (indent), detail_type, atom_detail.name)]
-    lines.extend(_format_shared(atom_detail, indent=indent + 1))
-    lines.append("%s- version = %s"
-                 % (" " * (indent + 1), misc.get_version_string(atom_detail)))
-    lines.append("%s- results = %s"
-                 % (" " * (indent + 1), atom_detail.results))
-    lines.append("%s- failure = %s" % (" " * (indent + 1),
-                                       bool(atom_detail.failure)))
-    lines.extend(_format_meta(atom_detail.meta, indent=indent + 1))
-    return os.linesep.join(lines)
-
-
-def pformat_flow_detail(flow_detail, indent=0):
-    """Pretty formats a flow detail."""
-    lines = ["%sFlow: '%s'" % (" " * indent, flow_detail.name)]
-    lines.extend(_format_shared(flow_detail, indent=indent + 1))
-    lines.extend(_format_meta(flow_detail.meta, indent=indent + 1))
-    for task_detail in flow_detail:
-        lines.append(pformat_atom_detail(task_detail, indent=indent + 1))
-    return os.linesep.join(lines)
-
-
-def pformat(book, indent=0):
-    """Pretty formats a logbook."""
-    lines = ["%sLogbook: '%s'" % (" " * indent, book.name)]
-    lines.extend(_format_shared(book, indent=indent + 1))
-    lines.extend(_format_meta(book.meta, indent=indent + 1))
-    if book.created_at is not None:
-        lines.append("%s- created_at = %s"
-                     % (" " * (indent + 1),
-                        timeutils.isotime(book.created_at)))
-    if book.updated_at is not None:
-        lines.append("%s- updated_at = %s"
-                     % (" " * (indent + 1),
-                        timeutils.isotime(book.updated_at)))
-    for flow_detail in book:
-        lines.append(pformat_flow_detail(flow_detail, indent=indent + 1))
-    return os.linesep.join(lines)
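For anyone updating code that imported these removed helpers, the mapping onto the new model methods is one-to-one; a sketch of the before/after calls:

# Before (taskflow.utils.persistence_utils)...
# print(persistence_utils.pformat(book))
# print(persistence_utils.pformat_flow_detail(flow_detail))
# print(persistence_utils.pformat_atom_detail(atom_detail))

# After (methods on the taskflow.persistence.models objects)...
print(book.pformat())
print(flow_detail.pformat())
print(atom_detail.pformat())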