diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py index f006e310..4f9e0599 100644 --- a/taskflow/backends/memory.py +++ b/taskflow/backends/memory.py @@ -22,10 +22,9 @@ import logging import threading import weakref -import taskflow.job - from taskflow import catalog from taskflow import exceptions as exc +from taskflow import job as t_job from taskflow import jobboard from taskflow import logbook from taskflow import states @@ -46,7 +45,7 @@ def check_not_closed(meth): return check -class MemoryClaimer(taskflow.job.Claimer): +class MemoryClaimer(t_job.Claimer): def claim(self, job, owner): job.owner = owner diff --git a/taskflow/db/sqlalchemy/models.py b/taskflow/db/sqlalchemy/models.py index 35572fab..2e2a08af 100644 --- a/taskflow/db/sqlalchemy/models.py +++ b/taskflow/db/sqlalchemy/models.py @@ -102,7 +102,8 @@ class TaskFlowBase(object): def iteritems(self): """Make the model object behave like a dict - Includes attributes from joins.""" + Includes attributes from joins. + """ local = dict(self) joined = dict([k, v] for k, v in self.__dict__.iteritems() if not k[0] == '_') diff --git a/taskflow/decorators.py b/taskflow/decorators.py index 60135981..ea99d139 100644 --- a/taskflow/decorators.py +++ b/taskflow/decorators.py @@ -86,7 +86,8 @@ def _take_arg(a): def wraps(fn): """This will not be needed in python 3.2 or greater which already has this - built-in to its functools.wraps method.""" + built-in to its functools.wraps method. + """ def wrapper(f): f = functools.wraps(fn)(f) @@ -108,7 +109,8 @@ def locked(f): def task(*args, **kwargs): """Decorates a given function and ensures that all needed attributes of - that function are set so that the function can be used as a task.""" + that function are set so that the function can be used as a task. + """ def decorator(f): w_f = extract(f) @@ -160,7 +162,8 @@ def task(*args, **kwargs): def _versionize(major, minor=None): """A decorator that marks the wrapped function with a major & minor version - number.""" + number. + """ if minor is None: minor = 0 @@ -180,7 +183,8 @@ def _versionize(major, minor=None): def _optional(*args, **kwargs): """Attaches a set of items that the decorated function would like as input - to the functions underlying dictionary.""" + to the functions underlying dictionary. + """ def decorator(f): w_f = extract(f) @@ -209,7 +213,8 @@ def _optional(*args, **kwargs): def _requires(*args, **kwargs): """Attaches a set of items that the decorated function requires as input - to the functions underlying dictionary.""" + to the functions underlying dictionary. + """ def decorator(f): w_f = extract(f) @@ -243,7 +248,8 @@ def _requires(*args, **kwargs): def _provides(*args, **kwargs): """Attaches a set of items that the decorated function provides as output - to the functions underlying dictionary.""" + to the functions underlying dictionary. + """ def decorator(f): w_f = extract(f) diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 3f39527b..5b7d65dc 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -41,7 +41,8 @@ class ClosedException(TaskFlowException): class InvalidStateException(TaskFlowException): """Raised when a task/job/workflow is in an invalid state when an - operation is attempting to apply to said task/job/workflow.""" + operation is attempting to apply to said task/job/workflow. + """ pass diff --git a/taskflow/graph_utils.py b/taskflow/graph_utils.py index 18e7f617..b56559bd 100644 --- a/taskflow/graph_utils.py +++ b/taskflow/graph_utils.py @@ -26,7 +26,8 @@ LOG = logging.getLogger(__name__) def connect(graph, infer_key='infer', auto_reason='auto', discard_func=None): """Connects a graphs runners to other runners in the graph which provide - outputs for each runners requirements.""" + outputs for each runners requirements. + """ if len(graph) == 0: return diff --git a/taskflow/job.py b/taskflow/job.py index 1c0ffce0..409833c2 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -30,7 +30,8 @@ LOG = logging.getLogger(__name__) class Claimer(object): """A base class for objects that can attempt to claim a given - job, so that said job can be worked on.""" + job, so that said job can be worked on. + """ __metaclass__ = abc.ABCMeta @@ -38,21 +39,24 @@ class Claimer(object): def claim(self, job, owner): """This method will attempt to claim said job and must either succeed at this or throw an exception signaling the job can not - be claimed.""" + be claimed. + """ raise NotImplementedError() @abc.abstractmethod def unclaim(self, job, owner): """This method will attempt to unclaim said job and must either succeed at this or throw an exception signaling the job can not - be unclaimed.""" + be unclaimed. + """ raise NotImplementedError() class Job(object): """A job is connection to some set of work to be done by some agent. Basic information is provided about said work to be able to attempt to - fullfill said work.""" + fullfill said work. + """ __metaclass__ = abc.ABCMeta @@ -103,7 +107,8 @@ class Job(object): This must be done in a way that likely uses some type of locking or ownership transfer so that only a single entity gets this job to work on. This will avoid multi-job ownership, which can lead to - inconsistent state.""" + inconsistent state. + """ if self.state != states.UNCLAIMED: raise exc.UnclaimableJobException("Unable to claim job when job is" " in state %s" % (self.state)) @@ -136,7 +141,8 @@ class Job(object): def await(self, timeout=None): """Awaits until either the job fails or succeeds or the provided - timeout is reached.""" + timeout is reached. + """ def check_functor(): if self.state not in (states.FAILURE, states.SUCCESS): @@ -148,5 +154,6 @@ class Job(object): @property def uuid(self): """Returns a tracking *unique* identifier that can be used to identify - this job among other jobs.""" + this job among other jobs. + """ return "j-%s" % (self._id) diff --git a/taskflow/jobboard.py b/taskflow/jobboard.py index 05604200..e4d5c0c1 100644 --- a/taskflow/jobboard.py +++ b/taskflow/jobboard.py @@ -44,27 +44,31 @@ class JobBoard(object): def _notify_posted(self, job): """When a job is received, by whichever mechanism the underlying implementation provides, the job should be given to said listeners - for them to know that a job has arrived.""" + for them to know that a job has arrived. + """ for f in self._listeners: f(job, POSTED) def _notify_erased(self, job): """When a job is erased, by whichever mechanism the underlying implementation provides, the job should be given to said listeners - for them to know that a job has been erased.""" + for them to know that a job has been erased. + """ for f in self._listeners: f(job, ERASED) @abc.abstractmethod def posted_after(self, date_posted=None): """Gets the jobs posted after (or equal to) the given datetime object - (or all jobs if none).""" + (or all jobs if none). + """ raise NotImplementedError() @abc.abstractmethod def posted_before(self, date_posted=None): """Gets the jobs posted before the given datetime object - (or all jobs if none).""" + (or all jobs if none). + """ raise NotImplementedError() @abc.abstractmethod @@ -84,7 +88,8 @@ class JobBoard(object): def unsubscribe(self, listener): """Removes a given listener from notifications about job - updates/postings.""" + updates/postings. + """ if listener in self._listeners: self._listeners.remove(listener) diff --git a/taskflow/logbook.py b/taskflow/logbook.py index 037a3306..dd4844da 100644 --- a/taskflow/logbook.py +++ b/taskflow/logbook.py @@ -48,13 +48,15 @@ class FlowDetail(object): def __iter__(self): """Iterates over all task details. - The order will be in the same order that they were added.""" + The order will be in the same order that they were added. + """ raise NotImplementedError() @abc.abstractmethod def __contains__(self, task_name): """Determines if any task details with the given name exists in this - flow details.""" + flow details. + """ raise NotImplementedError() @abc.abstractmethod @@ -65,7 +67,8 @@ class FlowDetail(object): @abc.abstractmethod def add_task(self, task_name, metadata=None): """Atomically creates a new task detail entry to this flows details and - returns it for further use.""" + returns it for further use. + """ raise NotImplementedError() @abc.abstractmethod @@ -98,27 +101,31 @@ class LogBook(object): @abc.abstractmethod def __getitem__(self, flow_name): """Fetches the given flow details object for the given flow - name or raises an exception if that flow name does not exist.""" + name or raises an exception if that flow name does not exist. + """ raise NotImplementedError() @abc.abstractmethod def __contains__(self, flow_name): """Determines if a flow details object with the given flow name - exists in this logbook.""" + exists in this logbook. + """ raise NotImplementedError() @abc.abstractmethod def __delitem__(self, flow_name): """Removes the given flow details object that matches the provided flow name or raises an exception if that flow name does not - exist.""" + exist. + """ raise NotImplementedError() @abc.abstractmethod def __iter__(self): """Iterates over all the contained flow details. - The order will be in the same order that they were added.""" + The order will be in the same order that they were added. + """ raise NotImplementedError() @abc.abstractmethod diff --git a/taskflow/patterns/base.py b/taskflow/patterns/base.py index a4756e0d..cfa45a30 100644 --- a/taskflow/patterns/base.py +++ b/taskflow/patterns/base.py @@ -28,9 +28,28 @@ from taskflow import utils class Flow(object): - """The base abstract class of all flow implementations.""" + """The base abstract class of all flow implementations. + + It provides a set of parents to flows that have a concept of parent flows + as well as a state and state utility functions to the deriving classes. It + also provides a name and an identifier (uuid or other) to the flow so that + it can be uniquely identifed among many flows. + + Flows are expected to provide (if desired) the following methods: + - add + - add_many + - interrupt + - reset + - rollback + - run + - soft_reset + """ + __metaclass__ = abc.ABCMeta + # Common states that certain actions can be performed in. If the flow + # is not in these sets of states then it is likely that the flow operation + # can not succeed. RESETTABLE_STATES = set([ states.INTERRUPTED, states.SUCCESS, @@ -78,10 +97,12 @@ class Flow(object): @property def name(self): + """A non-unique name for this flow (human readable)""" return self._name @property def uuid(self): + """Uniquely identifies this flow""" return "f-%s" % (self._id) @property @@ -117,7 +138,8 @@ class Flow(object): """Adds a given task to this flow. Returns the uuid that is associated with the task for later operations - before and after it is ran.""" + before and after it is ran. + """ raise NotImplementedError() @decorators.locked @@ -156,7 +178,10 @@ class Flow(object): @decorators.locked def reset(self): """Fully resets the internal state of this flow, allowing for the flow - to be ran again. *Listeners are also reset*""" + to be ran again. + + Note: Listeners are also reset. + """ if self.state not in self.RESETTABLE_STATES: raise exc.InvalidStateException(("Can not reset when" " in state %s") % (self.state)) @@ -167,7 +192,8 @@ class Flow(object): @decorators.locked def soft_reset(self): """Partially resets the internal state of this flow, allowing for the - flow to be ran again from an interrupted state *only*""" + flow to be ran again from an interrupted state only. + """ if self.state not in self.SOFT_RESETTABLE_STATES: raise exc.InvalidStateException(("Can not soft reset when" " in state %s") % (self.state)) @@ -183,5 +209,6 @@ class Flow(object): @decorators.locked def rollback(self, context, cause): """Performs rollback of this workflow and any attached parent workflows - if present.""" + if present. + """ pass diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py index 90b3f25d..5b4268b6 100644 --- a/taskflow/patterns/distributed_flow.py +++ b/taskflow/patterns/distributed_flow.py @@ -27,8 +27,14 @@ LOG = logging.getLogger(__name__) class Flow(object): - """A linear chain of independent tasks that can be applied as one unit or - rolled back as one unit.""" + """A flow that can paralleize task running by using celery. + + This flow backs running tasks (and associated dependencies) by using celery + as the runtime framework to accomplish execution (and status reporting) of + said tasks that compose the flow. It allows for parallel execution where + possible (data/task dependency dependent) without having to worry about how + this is accomplished in celery. + """ def __init__(self, name, parents=None): self.name = name diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 7e051db0..320c3477 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -35,7 +35,8 @@ LOG = logging.getLogger(__name__) class Flow(linear_flow.Flow): """A extension of the linear flow which will run the associated tasks in a linear topological ordering (and reverse using the same linear - topological order)""" + topological order). + """ def __init__(self, name, parents=None, uuid=None): super(Flow, self).__init__(name, parents, uuid) @@ -67,7 +68,8 @@ class Flow(linear_flow.Flow): @decorators.locked def add_dependency(self, provider_uuid, requirer_uuid): """Connects provider to requirer where provider will now be required - to run before requirer does.""" + to run before requirer does. + """ if provider_uuid == requirer_uuid: raise ValueError("Unable to link %s to itself" % provider_uuid) provider = self._find_uuid(provider_uuid) @@ -116,7 +118,8 @@ class Flow(linear_flow.Flow): def _connect(self): """Connects the nodes & edges of the graph together by examining who the requirements of each node and finding another node that will - create said dependency.""" + create said dependency. + """ if len(self._graph) == 0: return [] if self._connected: diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 96ef74e9..c4d6084d 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -34,11 +34,12 @@ LOG = logging.getLogger(__name__) class Flow(base.Flow): """"A linear chain of tasks that can be applied in order as one unit and - rolled back as one unit using the reverse order that the tasks have - been applied in. + rolled back as one unit using the reverse order that the tasks have + been applied in. - Note(harlowja): Each task in the chain must have requirements - which are satisfied by the previous task/s in the chain.""" + Note(harlowja): Each task in the chain must have requirements + which are satisfied by the previous task/s in the chain. + """ def __init__(self, name, parents=None, uuid=None): super(Flow, self).__init__(name, parents, uuid) diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py index a3f78599..687e1667 100644 --- a/taskflow/patterns/resumption/logbook.py +++ b/taskflow/patterns/resumption/logbook.py @@ -35,7 +35,8 @@ class Resumption(object): def _task_listener(state, details): """Store the result of the task under the given flow in the log - book so that it can be retrieved later.""" + book so that it can be retrieved later. + """ runner = details['runner'] flow = details['flow'] LOG.debug("Recording %s of %s has finished state %s", @@ -67,7 +68,8 @@ class Resumption(object): def _workflow_listener(state, details): """Ensure that when we receive an event from said workflow that we - make sure a logbook entry exists for that flow.""" + make sure a logbook entry exists for that flow. + """ flow = details['flow'] old_state = details['old_state'] LOG.debug("%s has transitioned from %s to %s", flow, old_state, @@ -112,7 +114,8 @@ class Resumption(object): def resume(self, flow, ordering): """Splits the initial ordering into two segments, the first which has already completed (or errored) and the second which has not - completed or errored.""" + completed or errored. + """ flow_id = flow.uuid if flow_id not in self._logbook: diff --git a/taskflow/task.py b/taskflow/task.py index ecf2d69d..f2851bf5 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -54,11 +54,13 @@ class Task(object): This method can be used to apply some given context and given set of args and kwargs to accomplish some goal. Note that the result that is returned needs to be serializable so that it can be passed - back into this task if reverting is triggered.""" + back into this task if reverting is triggered. + """ raise NotImplementedError() def revert(self, context, result, cause): """Revert this task using the given context, result that the apply provided as well as any information which may have caused - said reversion.""" + said reversion. + """ pass diff --git a/taskflow/utils.py b/taskflow/utils.py index 37162974..4275c9e6 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -142,7 +142,8 @@ def await(check_functor, timeout=None): class LastFedIter(object): """An iterator which yields back the first item and then yields back - results from the provided iterator.""" + results from the provided iterator. + """ def __init__(self, first, rest_itr): self.first = first @@ -156,7 +157,8 @@ class LastFedIter(object): class FlowFailure(object): """When a task failure occurs the following object will be given to revert - and can be used to interrogate what caused the failure.""" + and can be used to interrogate what caused the failure. + """ def __init__(self, runner, flow, exception): self.runner = runner @@ -167,7 +169,8 @@ class FlowFailure(object): class RollbackTask(object): """A helper task that on being called will call the underlying callable - tasks revert method (if said method exists)""" + tasks revert method (if said method exists). + """ def __init__(self, context, task, result): self.task = task @@ -261,7 +264,8 @@ class Runner(object): class TransitionNotifier(object): """A utility helper class that can be used to subscribe to notifications of events occuring as well as allow a entity to post said - notifications to subscribers.""" + notifications to subscribers. + """ RESERVED_KEYS = ('details',) ANY = '*' @@ -319,7 +323,8 @@ class RollbackAccumulator(object): """A utility class that can help in organizing 'undo' like code so that said code be rolled back on failure (automatically or manually) by activating rollback callables that were inserted during said codes - progression.""" + progression. + """ def __init__(self): self._rollbacks = [] @@ -387,7 +392,8 @@ class ReaderWriterLock(object): """Acquire a read lock. Several threads can hold this typeof lock. - It is exclusive with write locks.""" + It is exclusive with write locks. + """ self.monitor.acquire() while self.rwlock < 0 or self.writers_waiting: @@ -399,7 +405,8 @@ class ReaderWriterLock(object): """Acquire a write lock. Only one thread can hold this lock, and only when no read locks - are also held.""" + are also held. + """ self.monitor.acquire() while self.rwlock != 0: diff --git a/tox.ini b/tox.ini index 4499f71f..ada78a29 100644 --- a/tox.ini +++ b/tox.ini @@ -39,6 +39,6 @@ setenv = NOSE_WITH_COVERAGE=1 commands = {posargs} [flake8] -ignore = H402,H403 +ignore = H402,H302 builtins = _ exclude = .venv,.tox,dist,doc,*openstack/common*,*egg,.git,build,tools \ No newline at end of file