This adds a executor backed job dispatching base class and has the existing blocking executor use it by running jobs and dispatching jobs into a sync executor. It also allows for dispatching jobs into a thread executor, or other executor via a new '_executor_factory' method that can generate executors (it can be overriden in the non-blocking executor to provide your own executors instances). This does alter the behavior in that now that jobs are dispatched into an executor we no longer can immediatly know if a job was dispatched and raised an exception or whether it will raise an exception in the future, so we now alter the 'local_dispatched' to just be a boolean that is used to determine if any dispatches happened (failure or not). Change-Id: I485770e8f4c85d3833892a453c9fb5168d8f0407
2.7 KiB
Conductors
Overview
Conductors provide a mechanism that unifies the various concepts under a single easy to use (as plug-and-play as we can make it) construct.
They are responsible for the following:
- Interacting with
jobboards <jobs>
(examining and claimingjobs <jobs>
). - Creating
engines <engines>
from the claimed jobs (usingfactories <resumption factories>
to reconstruct the contained tasks and flows to be executed). - Dispatching the engine using the provided
persistence <persistence>
layer and engine configuration. - Completing or abandoning the claimed
job <jobs>
(depending on dispatching and execution outcome). - Rinse and repeat.
Note
They are inspired by and have similar responsibilities as railroad conductors or musical conductors.
Considerations
Some usage considerations should be used when using a conductor to make sure it's used in a safe and reliable manner. Eventually we hope to make these non-issues but for now they are worth mentioning.
Endless cycling
What: Jobs that fail (due to some type of internal error) on one conductor will be abandoned by that conductor and then another conductor may experience those same errors and abandon it (and repeat). This will create a job abandonment cycle that will continue for as long as the job exists in an claimable state.
Example:
Alleviate by:
- Forcefully delete jobs that have been failing continuously after a
given number of conductor attempts. This can be either done manually or
automatically via scripts (or other associated monitoring) or via the
jobboards :py
~taskflow.jobs.base.JobBoard.trash
method. - Resolve the internal error's cause (storage backend failure, other...).
Interfaces
taskflow.conductors.base
taskflow.conductors.backends
taskflow.conductors.backends.impl_executor
Implementations
Blocking
taskflow.conductors.backends.impl_blocking
Non-blocking
taskflow.conductors.backends.impl_nonblocking
Hierarchy
taskflow.conductors.base taskflow.conductors.backends.impl_blocking taskflow.conductors.backends.impl_nonblocking taskflow.conductors.backends.impl_executor