diff --git a/doc/source/index.rst b/doc/source/index.rst
index 1fc28c3a..84075223 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -17,7 +17,6 @@ Contents
    jobs
    inputs_and_outputs
    notifications
-   storage
    persistence
    exceptions
    utils
diff --git a/doc/source/inputs_and_outputs.rst b/doc/source/inputs_and_outputs.rst
index 2542fe71..ee00945f 100644
--- a/doc/source/inputs_and_outputs.rst
+++ b/doc/source/inputs_and_outputs.rst
@@ -63,7 +63,7 @@ Engine and Storage
 ------------------
 
 The storage layer is how an engine persists flow and task details. For more
-in-depth design details see :doc:`persistence` and :doc:`storage`.
+in-depth design details see :doc:`persistence`.
 
 Inputs
 ------
diff --git a/doc/source/jobs.rst b/doc/source/jobs.rst
index 248ce500..0bff17c9 100644
--- a/doc/source/jobs.rst
+++ b/doc/source/jobs.rst
@@ -85,8 +85,11 @@ Using Jobboards
 All engines are mere classes that implement the same interface, and of course
 it is possible to import them and create their instances just like with any
 class in Python. But the easier (and recommended) way for creating jobboards is by
-using the `fetch()` functionality. Using this function the typical creation of
-a jobboard (and an example posting of a job) might look like:
+using the :py:meth:`fetch() <taskflow.jobs.backends.fetch>` function, which uses
+entrypoints (internally using `stevedore`_) to fetch and configure your backend.
+
+Using this function the typical creation of a jobboard (and an example posting
+of a job) might look like:
 
 .. code-block:: python
 
@@ -153,11 +156,6 @@ might look like:
 
         time.sleep(coffee_break_time)
     ...
-
-.. automodule:: taskflow.jobs.backends
-.. automodule:: taskflow.persistence
-.. automodule:: taskflow.persistence.backends
-
 Jobboard Configuration
 ======================
 
@@ -198,6 +196,7 @@ Additional *configuration* parameters:
 
 Job Interface
 =============
 
+.. automodule:: taskflow.jobs.backends
 .. automodule:: taskflow.jobs.job
 
 Jobboard Interface
@@ -209,3 +208,4 @@ Jobboard Interface
 .. _zookeeper: http://zookeeper.apache.org/
 .. _kazoo: http://kazoo.readthedocs.org/
 .. _eventlet handler: https://pypi.python.org/pypi/kazoo-eventlet-handler/
+.. _stevedore: http://stevedore.readthedocs.org/
diff --git a/doc/source/persistence.rst b/doc/source/persistence.rst
index 6eccbba4..4981b83c 100644
--- a/doc/source/persistence.rst
+++ b/doc/source/persistence.rst
@@ -1,17 +1,194 @@
------------
+===========
 Persistence
------------
+===========
 
-Persistence objects
-~~~~~~~~~~~~~~~~~~~
+Overview
+========
 
-.. automodule:: taskflow.persistence.logbook
+In order to be able to receive inputs and create outputs from atoms (or other
+engine processes) in a fault-tolerant way, there is a need to be able to place
+what atoms output in some kind of location where it can be re-used by other
+atoms (or used for other purposes). To accommodate this type of usage taskflow
+provides an abstraction (provided by pluggable `stevedore`_ backends) that is
+similar in concept to a running program's *memory*.
 
+This abstraction serves the following *major* purposes:
 
-Persistence backends
-~~~~~~~~~~~~~~~~~~~~
+* Tracking of what was done (introspection).
+* Saving *memory* which allows for restarting from the last saved state,
+  a critical feature needed to restart and resume workflows (checkpointing).
+* Associating additional metadata with atoms while running (without having
+  those atoms need to save this data themselves). This makes it possible to
+  add on new metadata in the future without having to change the atoms
+  themselves. For example the following can be saved:
+
+  * Timing information (how long a task took to run).
+  * User information (who the task ran as).
+  * When an atom/workflow was run (and why).
+
+* Saving historical data (failures, successes, intermediary results...) to
+  allow retry atoms to decide if they should continue vs. stop.
+* *Something you create...*
+
+For more *general* information, please see the `wiki page`_.
+
+.. _stevedore: http://stevedore.readthedocs.org/
+.. _wiki page: https://wiki.openstack.org/wiki/TaskFlow/Persistence
+
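+As a small, hedged sketch of this abstraction in action, the following
+fetches the in-memory backend and saves an (illustrative) logbook into it --
+the names used are examples only, not a fixed contract:
+
+.. code-block:: python
+
+    import contextlib
+
+    from taskflow.persistence import backends
+    from taskflow.persistence import logbook
+
+    # Fetch the in-memory backend (nothing here touches reliable storage).
+    backend = backends.fetch(conf={'connection': 'memory'})
+    book = logbook.LogBook("my-book")
+    with contextlib.closing(backend.get_connection()) as conn:
+        # Make sure any backend structures/schemas exist, then save.
+        conn.upgrade()
+        conn.save_logbook(book)
+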
+How it is used
+==============
+
+On :doc:`engine <engines>` construction typically a backend (it can be optional)
+will be provided which satisfies the :py:class:`~taskflow.persistence.backends.base.Backend`
+abstraction. Along with providing a backend object a :py:class:`~taskflow.persistence.logbook.FlowDetail`
+object will also be created and provided (this object will contain the details about
+the flow to be run) to the engine constructor (or associated :py:meth:`load() <taskflow.engines.helpers.load>`
+helper functions). Typically a :py:class:`~taskflow.persistence.logbook.FlowDetail`
+object is created from a :py:class:`~taskflow.persistence.logbook.LogBook` object (the
+book object acts as a type of container for :py:class:`~taskflow.persistence.logbook.FlowDetail`
+and :py:class:`~taskflow.persistence.logbook.AtomDetail` objects).
+
+**Preparation**: Once an engine starts to run it will create a :py:class:`~taskflow.storage.Storage`
+object which will act as the engine's interface to the underlying backend storage
+objects (it provides helper functions that are commonly used by the engine,
+avoiding repeated code when interacting with the provided :py:class:`~taskflow.persistence.logbook.FlowDetail`
+and :py:class:`~taskflow.persistence.backends.base.Backend` objects). As an engine
+initializes it will extract (or create) :py:class:`~taskflow.persistence.logbook.AtomDetail`
+objects for each atom in the workflow the engine will be executing.
+
+**Execution**: When an engine begins to execute it will examine any previously existing
+:py:class:`~taskflow.persistence.logbook.AtomDetail` objects to see if they can be used
+for resuming; see the `big picture`_ for more details on this subject. Atoms which have
+not finished (or did not finish correctly in a previous run) will begin executing
+only after their dependent inputs are ready. This is done by analyzing the execution
+graph and looking at predecessor :py:class:`~taskflow.persistence.logbook.AtomDetail`
+outputs and states (which may have been persisted in a past run). This will result
+in either using their previous information or in running those predecessors and
+saving their output to the :py:class:`~taskflow.persistence.logbook.FlowDetail` and
+:py:class:`~taskflow.persistence.backends.base.Backend` objects. This execution, analysis
+and interaction with the storage objects continues (what is described here is
+a simplification of what really happens, which is quite a bit more complex)
+until the engine has finished running (at which point the engine will have
+succeeded or failed in its attempt to run the workflow).
+
+**Post-execution**: Typically when an engine is done running the logbook would
+be discarded (to avoid creating a stockpile of useless data) and the backend
+storage would be told to delete any contents for a given execution. For certain
+use-cases though it may be advantageous to retain logbooks and their contents.
+
+A few scenarios come to mind:
+
+* Post-runtime failure analysis and triage (saving what failed and why).
+* Metrics (saving timing information associated with each atom and using it
+  to perform offline performance analysis, which enables tuning tasks and/or
+  isolating and fixing slow tasks).
+* Data mining logbooks to find trends (in failures for example).
+* Saving logbooks for further forensic analysis.
+* Exporting logbooks to `hdfs`_ (or other NoSQL storage) and running some type
+  of map-reduce jobs on them.
+
+.. _hdfs: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html
+.. _big picture: https://wiki.openstack.org/wiki/TaskFlow/Patterns_and_Engines/Persistence#Big_Picture
+
+.. note::
+
+  It should be emphasized that the logbook is the authoritative, and,
+  preferably, the **only** (see :doc:`inputs and outputs <inputs_and_outputs>`)
+  source of run-time state information (breaking this principle makes it
+  hard/impossible to restart or resume in any type of automated fashion).
+  When an atom returns a result, it should be written directly to a logbook.
+  When an atom or flow state changes in any way, the logbook is the first to
+  know (see :doc:`notifications <notifications>` for how a user may also get
+  notified of those same state changes). The logbook, a backend and the
+  associated storage helper class are responsible for storing the actual data.
+  Together these components specify the persistence mechanism (how data is
+  saved and where -- memory, database, whatever...) and the persistence policy
+  (when data is saved -- every time it changes, at some particular moments
+  or simply never).
+
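+Wiring this together by hand might look like the following hedged sketch (the
+task, flow and logbook names are illustrative assumptions, not the only way
+to construct these objects):
+
+.. code-block:: python
+
+    import contextlib
+    import uuid
+
+    from taskflow import engines
+    from taskflow import task
+    from taskflow.patterns import linear_flow
+    from taskflow.persistence import backends
+    from taskflow.persistence import logbook
+
+    class PrinterTask(task.Task):
+        def execute(self):
+            print("hello")
+
+    flow = linear_flow.Flow("my-flow").add(PrinterTask("printer"))
+
+    # Create a backend, a logbook and a flow detail to record this run.
+    backend = backends.fetch(conf={'connection': 'memory'})
+    book = logbook.LogBook("my-book")
+    flow_detail = logbook.FlowDetail("my-flow", uuid=str(uuid.uuid4()))
+    book.add(flow_detail)
+    with contextlib.closing(backend.get_connection()) as conn:
+        conn.upgrade()
+        conn.save_logbook(book)
+
+    # The engine reads and updates the given flow detail via the backend as
+    # it runs, which is what makes later resumption possible.
+    engine = engines.load(flow, flow_detail=flow_detail, backend=backend)
+    engine.run()
+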
+Persistence Configuration
+=========================
+
+To select which persistence backend to use you should use the
+:py:meth:`fetch() <taskflow.persistence.backends.fetch>` function, which uses
+entrypoints (internally using `stevedore`_) to fetch and configure your
+backend. This is simpler than accessing the backend data types directly and
+provides a common function from which a backend can be fetched.
+
+Using this function to fetch a backend might look like:
+
+.. code-block:: python
+
+    from taskflow.persistence import backends
+
+    ...
+    persistence = backends.fetch(conf={
+        "connection": "mysql",
+        "user": ...,
+        "password": ...,
+    })
+    book = make_and_save_logbook(persistence)
+    ...
+
+As can be seen from above the ``conf`` parameter acts as a dictionary that
+is used to fetch and configure your backend. The restrictions on it are
+the following:
+
+* a dictionary (or dictionary-like type), holding the backend type with key
+  ``'connection'`` and possibly type-specific backend parameters as other
+  keys.
+
+Known backend types are listed below.
+
+**Connection**: ``'memory'``
+
+Retains all data in local memory (not persisted to reliable storage). Useful
+for scenarios where persistence is not required (and also in unit tests).
+
+**Connection**: ``'dir'`` or ``'file'``
+
+Retains all data in a directory & file based structure on local disk. Will be
+persisted **locally** in the case of system failure (allowing for resumption
+from the same local machine only). Useful for cases where a *more* reliable
+persistence is desired along with the simplicity of files and directories (a
+concept everyone is familiar with).
+
+**Connection**: ``'mysql'`` or ``'postgres'`` or ``'sqlite'``
+
+Retains all data in an `ACID`_-compliant database using the `sqlalchemy`_ library
+for schemas, connections, and database interaction functionality. Useful when
+you need a higher level of durability than offered by the previous solutions. When
+using these connection types it is possible to resume an engine from a peer machine
+(this does not apply when using sqlite).
+
+.. _sqlalchemy: http://www.sqlalchemy.org/docs/
+.. _ACID: https://en.wikipedia.org/wiki/ACID
+
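+As a hedged illustration of the database-backed types, a sqlite backend (the
+database path below is a made-up example) might be fetched and prepared like:
+
+.. code-block:: python
+
+    import contextlib
+
+    from taskflow.persistence import backends
+
+    # The URI scheme ('sqlite') selects the backend type; mysql/postgres
+    # would instead supply their usual connection details.
+    backend = backends.fetch(conf={
+        "connection": "sqlite:////var/lib/taskflow.db",
+    })
+    with contextlib.closing(backend.get_connection()) as conn:
+        # Create (or migrate) the database schema before first use.
+        conn.upgrade()
+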
+**Connection**: ``'zookeeper'``
+
+Retains all data in a `zookeeper`_ backend (zookeeper exposes operations on
+files and directories, similar to the above ``'dir'`` or ``'file'`` connection
+types). Internally the `kazoo`_ library is used to interact with zookeeper
+to perform reliable, distributed and atomic operations on the contents of a
+logbook represented as znodes. Since zookeeper is also distributed it is also
+able to resume an engine from a peer machine (having similar functionality
+as the database connection types listed previously).
+
+.. _zookeeper: http://zookeeper.apache.org
+.. _kazoo: http://kazoo.readthedocs.org/
+
+Persistence Backend Interfaces
+==============================
 
 .. automodule:: taskflow.persistence.backends
-   :undoc-members:
-
 .. automodule:: taskflow.persistence.backends.base
+.. automodule:: taskflow.persistence.logbook
+
+Hierarchy
+=========
+
+.. inheritance-diagram::
+    taskflow.persistence.backends.impl_memory
+    taskflow.persistence.backends.impl_zookeeper
+    taskflow.persistence.backends.impl_dir
+    taskflow.persistence.backends.impl_sqlalchemy
+    :parts: 1
diff --git a/doc/source/storage.rst b/doc/source/storage.rst
deleted file mode 100644
index 0e5fc407..00000000
--- a/doc/source/storage.rst
+++ /dev/null
@@ -1,5 +0,0 @@
--------
-Storage
--------
-
-.. automodule:: taskflow.storage
diff --git a/taskflow/jobs/backends/__init__.py b/taskflow/jobs/backends/__init__.py
index f9efc534..ad4dc060 100644
--- a/taskflow/jobs/backends/__init__.py
+++ b/taskflow/jobs/backends/__init__.py
@@ -29,6 +29,10 @@ LOG = logging.getLogger(__name__)
 
 
 def fetch(name, conf, namespace=BACKEND_NAMESPACE, **kwargs):
+    """Fetch a jobboard backend with the given configuration (and any
+    board-specific kwargs) in the given entrypoint namespace and create it
+    with the given name.
+    """
     # NOTE(harlowja): this allows simpler syntax.
     if isinstance(conf, six.string_types):
         conf = {'board': conf}
diff --git a/taskflow/persistence/backends/__init__.py b/taskflow/persistence/backends/__init__.py
index 3565bc82..5cf30243 100644
--- a/taskflow/persistence/backends/__init__.py
+++ b/taskflow/persistence/backends/__init__.py
@@ -32,7 +32,10 @@ SCHEME_REGEX = re.compile(r"^([A-Za-z]{1}[A-Za-z0-9+.-]*):")
 
 LOG = logging.getLogger(__name__)
 
 
-def fetch(conf, namespace=BACKEND_NAMESPACE):
+def fetch(conf, namespace=BACKEND_NAMESPACE, **kwargs):
+    """Fetches a given backend using the given configuration (and any
+    backend-specific kwargs) in the given entrypoint namespace.
+    """
     connection = conf['connection']
     match = SCHEME_REGEX.match(connection)
 
@@ -45,7 +48,8 @@
     try:
         mgr = driver.DriverManager(namespace, backend_name,
                                    invoke_on_load=True,
-                                   invoke_kwds={'conf': conf})
+                                   invoke_args=(conf,),
+                                   invoke_kwds=kwargs)
         return mgr.driver
     except RuntimeError as e:
         raise exc.NotFound("Could not find backend %s: %s" % (backend_name, e))
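
A hedged sketch of how the reworked ``fetch()`` above might be exercised
(``some_option`` is a hypothetical backend-specific keyword argument, shown
only to illustrate the new pass-through behavior; a real backend must
actually accept it):

.. code-block:: python

    from taskflow.persistence import backends

    # With invoke_args/invoke_kwds wired as above, fetch() now effectively
    # calls backend_cls(conf, **kwargs) instead of backend_cls(conf=conf),
    # so extra keyword arguments reach the backend constructor.
    backend = backends.fetch({'connection': 'memory'}, some_option=True)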