diff --git a/taskflow/atom.py b/taskflow/atom.py index 0a179342..3e8c39ad 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -28,7 +28,7 @@ LOG = logging.getLogger(__name__) def _save_as_to_mapping(save_as): - """Convert save_as to mapping name => index + """Convert save_as to mapping name => index. Result should follow storage convention for mappings. """ diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 58b27384..73dc8a76 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -180,7 +180,7 @@ class ActionEngine(base.EngineBase): self._root = self._graph_action_cls(self._analyzer, self.storage, self._task_action) - # NOTE(harlowja): Perform inital state manipulation and setup. + # NOTE(harlowja): Perform initial state manipulation and setup. # # TODO(harlowja): This doesn't seem like it should be in a compilation # function since compilation seems like it should not modify any @@ -190,12 +190,12 @@ class ActionEngine(base.EngineBase): class SingleThreadedActionEngine(ActionEngine): - """Engine that runs tasks in serial manner""" + """Engine that runs tasks in serial manner.""" _storage_cls = t_storage.Storage class MultiThreadedActionEngine(ActionEngine): - """Engine that runs tasks in parallel manner""" + """Engine that runs tasks in parallel manner.""" _storage_cls = t_storage.ThreadSafeStorage diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 66897a66..8d812a3c 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -25,7 +25,7 @@ from taskflow.utils import async_utils from taskflow.utils import misc from taskflow.utils import threading_utils -# Revert or execution events... +# Execution and reversion events. EXECUTED = 'executed' REVERTED = 'reverted' @@ -36,7 +36,7 @@ def _execute_task(task, arguments, progress_callback): result = task.execute(**arguments) except Exception: # NOTE(imelnikov): wrap current exception with Failure - # object and return it + # object and return it. result = misc.Failure() return (task, EXECUTED, result) @@ -50,7 +50,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): result = task.revert(**kwargs) except Exception: # NOTE(imelnikov): wrap current exception with Failure - # object and return it + # object and return it. result = misc.Failure() return (task, REVERTED, result) @@ -71,18 +71,18 @@ class TaskExecutorBase(object): @abc.abstractmethod def revert_task(self, task, arguments, result, failures, progress_callback=None): - """Schedules task reversion""" + """Schedules task reversion.""" @abc.abstractmethod def wait_for_any(self, fs, timeout=None): - """Wait for futures returned by this executor to complete""" + """Wait for futures returned by this executor to complete.""" def start(self): - """Prepare to execute tasks""" + """Prepare to execute tasks.""" pass def stop(self): - """Finalize task executor""" + """Finalize task executor.""" pass @@ -100,7 +100,7 @@ class SerialTaskExecutor(TaskExecutorBase): failures, progress_callback)) def wait_for_any(self, fs, timeout=None): - # NOTE(imelnikov): this executor returns only done futures + # NOTE(imelnikov): this executor returns only done futures. return fs, [] diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 080d7e0a..f84f6741 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -75,7 +75,7 @@ class FutureGraphAction(object): while not_done: # NOTE(imelnikov): if timeout occurs before any of futures # completes, done list will be empty and we'll just go - # for next iteration + # for next iteration. done, not_done = self._task_action.wait_for_any( not_done, _WAITING_TIMEOUT) @@ -97,7 +97,7 @@ class FutureGraphAction(object): else: # NOTE(imelnikov): engine stopped while there were # still some tasks to do, so we either failed - # or were suspended + # or were suspended. was_suspended = True misc.Failure.reraise_if_any(failures) diff --git a/taskflow/engines/action_engine/graph_analyzer.py b/taskflow/engines/action_engine/graph_analyzer.py index 78ca8596..447c1425 100644 --- a/taskflow/engines/action_engine/graph_analyzer.py +++ b/taskflow/engines/action_engine/graph_analyzer.py @@ -36,8 +36,8 @@ class GraphAnalyzer(object): return self._graph def browse_nodes_for_execute(self, node=None): - """Browse next nodes to execute for given node if - specified and for whole graph otherwise. + """Browse next nodes to execute for given node if specified and + for whole graph otherwise. """ if node: nodes = self._graph.successors(node) @@ -51,8 +51,8 @@ class GraphAnalyzer(object): return available_nodes def browse_nodes_for_revert(self, node=None): - """Browse next nodes to revert for given node if - specified and for whole graph otherwise. + """Browse next nodes to revert for given node if specified and + for whole graph otherwise. """ if node: nodes = self._graph.predecessors(node) @@ -66,7 +66,7 @@ class GraphAnalyzer(object): return available_nodes def _is_ready_for_execute(self, task): - """Checks if task is ready to be executed""" + """Checks if task is ready to be executed.""" state = self._storage.get_task_state(task.name) if not st.check_task_transition(state, st.RUNNING): @@ -81,7 +81,7 @@ class GraphAnalyzer(object): for state in six.itervalues(task_states)) def _is_ready_for_revert(self, task): - """Checks if task is ready to be reverted""" + """Checks if task is ready to be reverted.""" state = self._storage.get_task_state(task.name) if not st.check_task_transition(state, st.REVERTING): diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 6742a2d1..7aa130a0 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -53,7 +53,7 @@ class TaskAction(object): return True def _on_update_progress(self, task, event_data, progress, **kwargs): - """Should be called when task updates its progress""" + """Should be called when task updates its progress.""" try: self._storage.set_task_progress(task.name, progress, kwargs) except Exception: diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index a35d5f33..c907997f 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -26,7 +26,7 @@ from taskflow.utils import misc @six.add_metaclass(abc.ABCMeta) class EngineBase(object): - """Base for all engines implementations""" + """Base for all engines implementations.""" def __init__(self, flow, flow_detail, backend, conf): self._flow = flow @@ -49,7 +49,7 @@ class EngineBase(object): @abc.abstractproperty def _storage_cls(self): - """Storage class that will be used to generate storage objects""" + """Storage class that will be used to generate storage objects.""" @abc.abstractmethod def compile(self): diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index 95c2d9b0..687d64b1 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -31,7 +31,7 @@ ENGINES_NAMESPACE = 'taskflow.engines' def load(flow, store=None, flow_detail=None, book=None, engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): - """Load flow into engine + """Load flow into engine. This function creates and prepares engine to run the flow. All that is left is to run the engine with 'run()' method. @@ -59,7 +59,7 @@ def load(flow, store=None, flow_detail=None, book=None, if engine_conf is None: engine_conf = {'engine': 'default'} - # NOTE(imelnikov): this allows simpler syntax + # NOTE(imelnikov): this allows simpler syntax. if isinstance(engine_conf, six.string_types): engine_conf = {'engine': engine_conf} @@ -86,7 +86,7 @@ def load(flow, store=None, flow_detail=None, book=None, def run(flow, store=None, engine_conf=None, backend=None): - """Run the flow + """Run the flow. This function load the flow into engine (with 'load' function) and runs the engine. @@ -113,7 +113,7 @@ def run(flow, store=None, engine_conf=None, backend=None): def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, store=None, book=None, engine_conf=None, backend=None): - """Load flow from factory function into engine + """Load flow from factory function into engine. Gets flow factory function (or name of it) and creates flow with it. Then, flow is loaded into engine with load(), and factory @@ -158,9 +158,9 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, def flow_from_detail(flow_detail): - """Recreate flow previously loaded with load_form_factory + """Recreate flow previously loaded with load_form_factory. - Gets flow factory name from metadata, calls it to recreate the flow + Gets flow factory name from metadata, calls it to recreate the flow. :param flow_detail: FlowDetail that holds state of the flow to load """ @@ -183,7 +183,7 @@ def flow_from_detail(flow_detail): def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None): - """Reload flow previously loaded with load_form_factory + """Reload flow previously loaded with load_form_factory. Gets flow factory name from metadata, calls it to recreate the flow and loads flow into engine with load(). diff --git a/taskflow/examples/create_parallel_volume.py b/taskflow/examples/create_parallel_volume.py index c271af71..7ca9d735 100644 --- a/taskflow/examples/create_parallel_volume.py +++ b/taskflow/examples/create_parallel_volume.py @@ -100,7 +100,7 @@ class VolumeCreator(task.Task): print("Finished making volume %s" % (self._volume_id)) -# Assume there is no ordering dependency between volumes +# Assume there is no ordering dependency between volumes. flow = uf.Flow("volume-maker") for i in range(0, VOLUME_COUNT): flow.add(VolumeCreator(volume_id="vol-%s" % (i))) diff --git a/taskflow/examples/fake_billing.py b/taskflow/examples/fake_billing.py index 63e95769..d2f5e3f0 100644 --- a/taskflow/examples/fake_billing.py +++ b/taskflow/examples/fake_billing.py @@ -65,7 +65,7 @@ class UrlCaller(object): for i in range(0, len(data)): time.sleep(sleep_time) # As we send the data, each chunk we 'fake' send will progress - # the sending progress that much further to 100% + # the sending progress that much further to 100%. if status_cb: status_cb(float(i) / len(data)) diff --git a/taskflow/examples/persistence_example.py b/taskflow/examples/persistence_example.py index d852d33b..6ec51cb0 100644 --- a/taskflow/examples/persistence_example.py +++ b/taskflow/examples/persistence_example.py @@ -84,8 +84,8 @@ def make_flow(blowup=False): # Persist the flow and task state here, if the file/dir exists already blowup -# if not don't blowup, this allows a user to see both the modes and to -# see what is stored in each case. +# if not don't blowup, this allows a user to see both the modes and to see +# what is stored in each case. if example_utils.SQLALCHEMY_AVAILABLE: persist_path = os.path.join(tempfile.gettempdir(), "persisting.db") backend_uri = "sqlite:///%s" % (persist_path) diff --git a/taskflow/examples/resume_volume_create.py b/taskflow/examples/resume_volume_create.py index a8be9c69..8230f2b7 100644 --- a/taskflow/examples/resume_volume_create.py +++ b/taskflow/examples/resume_volume_create.py @@ -146,7 +146,7 @@ with example_utils.get_backend() as backend: else: flow_detail = find_flow_detail(backend, book_id, flow_id) - # Annnnd load and run. + # Load and run. engine_conf = { 'engine': 'serial', } diff --git a/taskflow/examples/simple_linear.py b/taskflow/examples/simple_linear.py index e478d40e..c9f6d11f 100644 --- a/taskflow/examples/simple_linear.py +++ b/taskflow/examples/simple_linear.py @@ -62,6 +62,6 @@ flow = lf.Flow('simple-linear').add( CallJoe() ) -# Now run that flow using the provided initial data (store below) +# Now run that flow using the provided initial data (store below). taskflow.engines.run(flow, store=dict(joe_number=444, jim_number=555)) diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index c15a9aba..cf1d8f7c 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -77,11 +77,11 @@ def wrap_all_failures(): class FirstException(Exception): - """Exception that first task raises""" + """Exception that first task raises.""" class SecondException(Exception): - """Exception that second task raises""" + """Exception that second task raises.""" class FirstTask(task.Task): diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 2e3f772e..b205e8ce 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -89,7 +89,7 @@ class EmptyFlow(TaskFlowException): class WrappedFailure(TaskFlowException): - """Wraps one or several failures + """Wraps one or several failures. When exception cannot be re-raised (for example, because the value and traceback is lost in serialization) or @@ -101,17 +101,17 @@ class WrappedFailure(TaskFlowException): self._causes = [] for cause in causes: if cause.check(type(self)) and cause.exception: - # NOTE(imelnikov): flatten wrapped failures + # NOTE(imelnikov): flatten wrapped failures. self._causes.extend(cause.exception) else: self._causes.append(cause) def __iter__(self): - """Iterate over failures that caused the exception""" + """Iterate over failures that caused the exception.""" return iter(self._causes) def __len__(self): - """Return number of wrapped failures""" + """Return number of wrapped failures.""" return len(self._causes) def check(self, *exc_classes): diff --git a/taskflow/flow.py b/taskflow/flow.py index fbe7fc56..18c77fa7 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -54,7 +54,7 @@ class Flow(object): @property def name(self): - """A non-unique name for this flow (human readable)""" + """A non-unique name for this flow (human readable).""" return self._name @abc.abstractmethod diff --git a/taskflow/jobs/job.py b/taskflow/jobs/job.py index 93715950..428f1763 100644 --- a/taskflow/jobs/job.py +++ b/taskflow/jobs/job.py @@ -71,16 +71,16 @@ class Job(object): @property def uuid(self): - """The uuid of this job""" + """The uuid of this job.""" return self._uuid @property def name(self): - """The non-uniquely identifying name of this job""" + """The non-uniquely identifying name of this job.""" return self._name def __iter__(self): - # Don't iterate while holding the lock + # Don't iterate while holding the lock. with self._lock: flows = list(self._flows) for f in flows: diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 78d8ba6a..59fecccb 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -35,7 +35,7 @@ class JobBoard(object): @property def name(self): - """The non-uniquely identifying name of this jobboard""" + """The non-uniquely identifying name of this jobboard.""" return self._name @abc.abstractmethod diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index fbb40ff6..54319a82 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -29,7 +29,7 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -# NOTE(harlowja): on these states will results be useable, all other states +# NOTE(harlowja): on these states will results be usable, all other states # do not produce results. FINISH_STATES = (states.FAILURE, states.SUCCESS) diff --git a/taskflow/listeners/printing.py b/taskflow/listeners/printing.py index 882f9d24..dff453bd 100644 --- a/taskflow/listeners/printing.py +++ b/taskflow/listeners/printing.py @@ -26,7 +26,7 @@ from taskflow.utils import misc class PrintingListener(base.LoggingBase): - """Writes the task and flow notifications messages to stdout or stderr""" + """Writes the task and flow notifications messages to stdout or stderr.""" def __init__(self, engine, task_listen_for=(misc.TransitionNotifier.ANY,), flow_listen_for=(misc.TransitionNotifier.ANY,), diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index 552bd72b..1375cbec 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -49,8 +49,7 @@ class TimingListener(base.ListenerBase): 'duration': float(timer.elapsed()), } try: - # Don't let storage failures throw exceptions in a listener - # method. + # Don't let storage failures throw exceptions in a listener method. self._engine.storage.update_task_metadata(task_name, meta_update) except excp.StorageError: LOG.exception("Failure to store duration update %s for task %s", diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 56c77b5b..ddd27fb9 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -27,7 +27,7 @@ from taskflow.utils import graph_utils class Flow(flow.Flow): - """Graph flow pattern + """Graph flow pattern. Contained *flows/tasks* will be executed according to their dependencies which will be resolved by using the *flows/tasks* provides and requires diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 7ef1f84c..8ce8070e 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -41,7 +41,7 @@ class Flow(flow.Flow): return self # NOTE(imelnikov): we add item to the end of flow, so it should - # not provide anything previous items of the flow require + # not provide anything previous items of the flow require. requires = self.requires provides = self.provides for item in items: diff --git a/taskflow/patterns/unordered_flow.py b/taskflow/patterns/unordered_flow.py index 77e93668..1274d367 100644 --- a/taskflow/patterns/unordered_flow.py +++ b/taskflow/patterns/unordered_flow.py @@ -43,7 +43,8 @@ class Flow(flow.Flow): if not items: return self - # NOTE(harlowja): check that items to be added are actually independent + # NOTE(harlowja): check that items to be added are actually + # independent. provides = self.provides old_requires = self.requires for item in items: diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index 635cb2b1..e3e4eb46 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -144,7 +144,7 @@ class Connection(base.Connection): pass def get_logbooks(self): - # NOTE(harlowja): don't hold the lock while iterating + # NOTE(harlowja): don't hold the lock while iterating. with self._lock: books = list(self._get_logbooks()) for b in books: @@ -406,7 +406,7 @@ class Connection(base.Connection): ### def _str_2_datetime(text): - """Converts an iso8601 string/text into a datetime object (or none)""" + """Converts an iso8601 string/text into a datetime object (or none).""" if text is None: return None if not isinstance(text, six.string_types): diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 6716fdd9..5208e7f7 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -152,8 +152,8 @@ class Connection(base.Connection): e_lb._updated_at = timeutils.utcnow() p_utils.logbook_merge(e_lb, book, deep_copy=True) - # Add anything in to the new logbook that isn't already - # in the existing logbook. + # Add anything in to the new logbook that isn't already in the existing + # logbook. for flow_detail in book: try: e_fd = self.backend.flow_details[flow_detail.uuid] @@ -178,6 +178,6 @@ class Connection(base.Connection): return list(self.backend.log_books.values()) def get_logbooks(self): - # NOTE(harlowja): don't hold the lock while iterating + # NOTE(harlowja): don't hold the lock while iterating. for lb in self._get_logbooks(): yield lb diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index fc2d061b..d03be3db 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -89,17 +89,17 @@ POSTGRES_CONN_ERRORS = ( 'could not connect to server', ) POSTGRES_GONE_WAY_AWAY_ERRORS = ( - # Server terminated while in progress (postgres errors are pretty weird) + # Server terminated while in progress (postgres errors are pretty weird). 'server closed the connection unexpectedly', 'terminating connection due to administrator command', ) -# These connection urls mean sqlite is being used as an in-memory DB +# These connection urls mean sqlite is being used as an in-memory DB. SQLITE_IN_MEMORY = ('sqlite://', 'sqlite:///', 'sqlite:///:memory:') def _in_any(reason, err_haystack): - """Checks if any elements of the haystack are in the given reason""" + """Checks if any elements of the haystack are in the given reason.""" for err in err_haystack: if reason.find(str(err)) != -1: return True @@ -137,7 +137,7 @@ def _set_mode_traditional(dbapi_con, con_record, connection_proxy): def _ping_listener(dbapi_conn, connection_rec, connection_proxy): """Ensures that MySQL connections checked out of the pool are alive. - Modified + borrowed from: http://bit.ly/14BYaW6 + Modified + borrowed from: http://bit.ly/14BYaW6. """ try: dbapi_conn.cursor().execute('select 1') @@ -218,7 +218,7 @@ class SQLAlchemyBackend(base.Backend): if 'sqlite' in e_url.drivername: engine_args["poolclass"] = sa_pool.NullPool - # Adjustments for in-memory sqlite usage + # Adjustments for in-memory sqlite usage. if sql_connection.lower().strip() in SQLITE_IN_MEMORY: engine_args["poolclass"] = sa_pool.StaticPool engine_args["connect_args"] = {'check_same_thread': False} diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py index d24b630d..abf1f994 100644 --- a/taskflow/persistence/backends/sqlalchemy/models.py +++ b/taskflow/persistence/backends/sqlalchemy/models.py @@ -71,7 +71,7 @@ class Failure(types.TypeDecorator): class ModelBase(TimestampMixin): - """Base model for all taskflow objects""" + """Base model for all taskflow objects.""" uuid = Column(String, default=uuidutils.generate_uuid, primary_key=True, nullable=False, unique=True) name = Column(String, nullable=True) @@ -79,7 +79,7 @@ class ModelBase(TimestampMixin): class LogBook(BASE, ModelBase): - """Represents a logbook for a set of flows""" + """Represents a logbook for a set of flows.""" __tablename__ = 'logbooks' # Relationships diff --git a/taskflow/states.py b/taskflow/states.py index 887e508c..74b47a04 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -145,13 +145,13 @@ _ALLOWED_TASK_TRANSITIONS = frozenset(( # NOTE(harlowja): allow the tasks to restart if in the same state # as a they were in before as a task may be 'killed' while in one of the # below states and it is permissible to let the task to re-enter that - # same state to try to finish + # same state to try to finish. (REVERTING, REVERTING), (RUNNING, RUNNING), # NOTE(harlowja): the task was 'killed' while in one of the starting/ending # states and it is permissible to let the task to start running or - # reverting again (if it really wants too) + # reverting again (if it really wants too). (REVERTING, RUNNING), (RUNNING, REVERTING), )) @@ -166,7 +166,7 @@ _IGNORED_TASK_TRANSITIONS = [ # # NOTE(harlowja): the above ALLOWED_TASK_TRANSITIONS does allow # transitions to certain equivalent states (but only for a few special -# cases) +# cases). _IGNORED_TASK_TRANSITIONS.extend( (a, a) for a in (PENDING, FAILURE, SUCCESS, REVERTED) ) diff --git a/taskflow/storage.py b/taskflow/storage.py index ffaa99c4..8e71ee6e 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -33,7 +33,7 @@ STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE) class Storage(object): - """Interface between engines and logbook + """Interface between engines and logbook. This class provides a simple interface to save tasks of a given flow and associated activity and results to persistence layer (logbook, @@ -50,7 +50,7 @@ class Storage(object): self._flowdetail = flow_detail # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in task name -> misc.Failure mapping + # so we cache failures here, in task name -> misc.Failure mapping. self._failures = {} self._reload_failures() @@ -76,7 +76,7 @@ class Storage(object): functor(conn, *args, **kwargs) def ensure_task(self, task_name, task_version=None, result_mapping=None): - """Ensure that there is taskdetail that correspond the task + """Ensure that there is taskdetail that correspond the task. If task does not exist, adds a record for it. Added task will have PENDING state. Sets result mapping for the task from result_mapping @@ -94,20 +94,20 @@ class Storage(object): return task_id def _add_task(self, uuid, task_name, task_version=None): - """Add the task to storage + """Add the task to storage. Task becomes known to storage by that name and uuid. Task state is set to PENDING. """ # TODO(imelnikov): check that task with same uuid or - # task name does not exist + # task name does not exist. td = logbook.TaskDetail(name=task_name, uuid=uuid) td.state = states.PENDING td.version = task_version self._flowdetail.add(td) def save_both(conn): - """Saves the flow and the task detail with the same connection""" + """Saves the flow and the task detail with the same connection.""" self._save_flow_detail(conn) self._save_task_detail(conn, task_detail=td) @@ -145,18 +145,18 @@ class Storage(object): task_detail.update(conn.update_task_details(task_detail)) def get_task_uuid(self, task_name): - """Get task uuid by given name""" + """Get task uuid by given name.""" td = self._taskdetail_by_name(task_name) return td.uuid def set_task_state(self, task_name, state): - """Set task state""" + """Set task state.""" td = self._taskdetail_by_name(task_name) td.state = state self._with_connection(self._save_task_detail, task_detail=td) def get_task_state(self, task_name): - """Get state of task with given name""" + """Get state of task with given name.""" return self._taskdetail_by_name(task_name).state def get_tasks_states(self, task_names): @@ -188,7 +188,7 @@ class Storage(object): if details is not None: # NOTE(imelnikov): as we can update progress without # updating details (e.g. automatically from engine) - # we save progress value with details, too + # we save progress value with details, too. if details: metadata_update['progress_details'] = { 'at_progress': progress, @@ -222,7 +222,7 @@ class Storage(object): return meta.get('progress_details') def _check_all_results_provided(self, task_name, data): - """Warn if task did not provide some of results + """Warn if task did not provide some of results. This may happen if task returns shorter tuple or list or dict without all needed keys. It may also happen if task returns @@ -240,7 +240,7 @@ class Storage(object): task_name, index, name) def save(self, task_name, data, state=states.SUCCESS): - """Put result for task with id 'uuid' to storage""" + """Put result for task with id 'uuid' to storage.""" td = self._taskdetail_by_name(task_name) td.state = state if state == states.FAILURE and isinstance(data, misc.Failure): @@ -266,13 +266,13 @@ class Storage(object): return fail def _reload_failures(self): - """Refresh failures cache""" + """Refresh failures cache.""" for td in self._flowdetail: if td.failure is not None: self._cache_failure(td.name, td.failure) def get(self, task_name): - """Get result for task with name 'task_name' to storage""" + """Get result for task with name 'task_name' to storage.""" td = self._taskdetail_by_name(task_name) if td.failure is not None: return self._cache_failure(td.name, td.failure) @@ -289,7 +289,7 @@ class Storage(object): return self._failures.copy() def has_failures(self): - """Returns True if there are failed tasks in the storage""" + """Returns True if there are failed tasks in the storage.""" return bool(self._failures) def _reset_task(self, td, state): @@ -304,7 +304,7 @@ class Storage(object): return True def reset(self, task_name, state=states.PENDING): - """Remove result for task with id 'uuid' from storage""" + """Remove result for task with id 'uuid' from storage.""" td = self._taskdetail_by_name(task_name) if self._reset_task(td, state): self._with_connection(self._save_task_detail, task_detail=td) @@ -326,7 +326,7 @@ class Storage(object): return result def inject(self, pairs): - """Add values into storage + """Add values into storage. This method should be used to put flow parameters (requirements that are not satisfied by any task in the flow) into storage. @@ -347,7 +347,7 @@ class Storage(object): dict((name, name) for name in names)) def _set_result_mapping(self, task_name, mapping): - """Set mapping for naming task results + """Set mapping for naming task results. The result saved with given name would be accessible by names defined in mapping. Mapping is a dict name => index. If index @@ -377,7 +377,7 @@ class Storage(object): name) def fetch(self, name): - """Fetch named task result""" + """Fetch named task result.""" try: indexes = self._reverse_mapping[name] except KeyError: @@ -392,7 +392,7 @@ class Storage(object): raise exceptions.NotFound("Unable to find result %r" % name) def fetch_all(self): - """Fetch all named task results known so far + """Fetch all named task results known so far. Should be used for debugging and testing purposes mostly. """ @@ -405,17 +405,17 @@ class Storage(object): return result def fetch_mapped_args(self, args_mapping): - """Fetch arguments for the task using arguments mapping""" + """Fetch arguments for the task using arguments mapping.""" return dict((key, self.fetch(name)) for key, name in six.iteritems(args_mapping)) def set_flow_state(self, state): - """Set flowdetails state and save it""" + """Set flowdetails state and save it.""" self._flowdetail.state = state self._with_connection(self._save_flow_detail) def get_flow_state(self): - """Set state from flowdetails""" + """Set state from flowdetails.""" state = self._flowdetail.state if state is None: state = states.PENDING diff --git a/taskflow/task.py b/taskflow/task.py index 8cde8ccb..45ceba51 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -48,16 +48,16 @@ class BaseTask(atom.Atom): def execute(self, *args, **kwargs): """Activate a given task which will perform some operation and return. - This method can be used to apply some given context and given set - of args and kwargs to accomplish some goal. Note that the result - that is returned needs to be serializable so that it can be passed - back into this task if reverting is triggered. + This method can be used to apply some given context and given set + of args and kwargs to accomplish some goal. Note that the result + that is returned needs to be serializable so that it can be passed + back into this task if reverting is triggered. """ def revert(self, *args, **kwargs): """Revert this task using the given context, result that the apply - provided as well as any information which may have caused - said reversion. + provided as well as any information which may have caused said + reversion. """ def update_progress(self, progress, **kwargs): @@ -144,7 +144,7 @@ class BaseTask(atom.Atom): class Task(BaseTask): - """Base class for user-defined tasks + """Base class for user-defined tasks. Adds following features to Task: - auto-generates name from type of self @@ -157,7 +157,7 @@ class Task(BaseTask): def __init__(self, name=None, provides=None, requires=None, auto_extract=True, rebind=None): - """Initialize task instance""" + """Initialize task instance.""" if provides is None: provides = self.default_provides super(Task, self).__init__(name, provides=provides) @@ -165,7 +165,7 @@ class Task(BaseTask): class FunctorTask(BaseTask): - """Adaptor to make a task from a callable + """Adaptor to make a task from a callable. Take any callable and make a task from it. """ diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py index 3fc0cc72..d5cdbf08 100644 --- a/taskflow/tests/test_examples.py +++ b/taskflow/tests/test_examples.py @@ -17,7 +17,7 @@ # under the License. -"""Run examples as unit tests +"""Run examples as unit tests. This module executes examples as unit tests, thus ensuring they at least can be executed with current taskflow. For examples with deterministic @@ -111,7 +111,7 @@ ExamplesTestCase.update() def make_output_files(): - """Generate output files for all examples""" + """Generate output files for all examples.""" for name in list_examples(): output = run_example(name) with open(expected_output_path(name), 'w') as f: diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index 319330b8..8ca0f8b1 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -33,7 +33,7 @@ class GraphFlowTest(test.TestCase): return taskflow.engines.load(flow, store={}) def _capture_states(self): - # TODO(harlowja): move function to shared helper + # TODO(harlowja): move function to shared helper. capture_where = collections.defaultdict(list) def do_capture(state, details): diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index fa048362..db814b96 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -190,7 +190,7 @@ class StorageTest(test.TestCase): s.inject({'foo': 'bar', 'spam': 'eggs'}) # NOTE(imelnikov): injecting is implemented as special task - # so resetting tasks may break it if implemented incorrectly + # so resetting tasks may break it if implemented incorrectly. s.reset_tasks() self.assertEqual(s.fetch('spam'), 'eggs') diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 5ead8733..05303245 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -121,7 +121,7 @@ class GetCallableNameTest(test.TestCase): def test_static_method(self): # NOTE(imelnikov): static method are just functions, class name - # is not recorded anywhere in them + # is not recorded anywhere in them. name = reflection.get_callable_name(Class.static_method) self.assertEqual(name, '.'.join((__name__, 'static_method'))) @@ -332,14 +332,14 @@ class AttrDictTest(test.TestCase): def test_invalid_create(self): attrs = { - # Python attributes can't start with a number + # Python attributes can't start with a number. '123_abc': 1, } self.assertRaises(AttributeError, misc.AttrDict, **attrs) def test_no_overwrite(self): attrs = { - # Python attributes can't start with a number + # Python attributes can't start with a number. 'update': 1, } self.assertRaises(AttributeError, misc.AttrDict, **attrs) @@ -438,7 +438,7 @@ class StopWatchUtilsTest(test.TestCase): watch = misc.StopWatch() watch.start() time.sleep(0.2) - # NOTE(harlowja): Allow for a slight variation by using 0.19 + # NOTE(harlowja): Allow for a slight variation by using 0.19. self.assertGreaterEqual(0.19, watch.elapsed()) def test_pause_resume(self): diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index 9a3d5fcd..59733c5c 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -74,7 +74,7 @@ def wait_for_any(fs, timeout=None): def make_completed_future(result): - """Make with completed with given result""" + """Make with completed with given result.""" future = futures.Future() future.set_result(result) return future diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py index 1fa2f1d3..61fc0825 100644 --- a/taskflow/utils/flow_utils.py +++ b/taskflow/utils/flow_utils.py @@ -99,12 +99,12 @@ def _flatten_task(task): def _flatten_graph(flow, flattened): graph = nx.DiGraph(name=_graph_name(flow)) subgraph_map = {} - # Flatten all nodes + # Flatten all nodes. for n in flow.graph.nodes_iter(): subgraph = _flatten(n, flattened) subgraph_map[n] = subgraph graph = gu.merge_graphs([graph, subgraph]) - # Reconnect all nodes to there corresponding subgraphs + # Reconnect all nodes to there corresponding subgraphs. for (u, v) in flow.graph.edges_iter(): # Retain and update the original edge attributes. u_v_attrs = gu.get_edge_attrs(flow.graph, u, v) diff --git a/taskflow/utils/graph_utils.py b/taskflow/utils/graph_utils.py index e0ce99e8..f0c2020c 100644 --- a/taskflow/utils/graph_utils.py +++ b/taskflow/utils/graph_utils.py @@ -51,14 +51,14 @@ def merge_graphs(graphs, allow_overlaps=False): def get_no_successors(graph): - """Returns an iterator for all nodes with no successors""" + """Returns an iterator for all nodes with no successors.""" for n in graph.nodes_iter(): if not len(graph.successors(n)): yield n def get_no_predecessors(graph): - """Returns an iterator for all nodes with no predecessors""" + """Returns an iterator for all nodes with no predecessors.""" for n in graph.nodes_iter(): if not len(graph.predecessors(n)): yield n @@ -96,5 +96,5 @@ def pformat(graph): def export_graph_to_dot(graph): - """Exports the graph to a dot format (requires pydot library)""" + """Exports the graph to a dot format (requires pydot library).""" return nx.to_pydot(graph).to_string() diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 72939610..da84d1a0 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -158,14 +158,16 @@ _ASCII_WORD_SYMBOLS = frozenset(string.ascii_letters + string.digits + '_') def is_valid_attribute_name(name, allow_self=False, allow_hidden=False): - """Validates that a string name is a valid/invalid python attribute name""" + """Validates that a string name is a valid/invalid python attribute + name. + """ return all(( isinstance(name, six.string_types), len(name) > 0, (allow_self or not name.lower().startswith('self')), (allow_hidden or not name.lower().startswith('_')), - # NOTE(imelnikov): keywords should be forbidden + # NOTE(imelnikov): keywords should be forbidden. not keyword.iskeyword(name), # See: http://docs.python.org/release/2.5.2/ref/grammar.txt @@ -184,7 +186,7 @@ class AttrDict(dict): def _is_valid_attribute_name(cls, name): if not is_valid_attribute_name(name): return False - # Make the name just be a simple string in latin-1 encoding in python3 + # Make the name just be a simple string in latin-1 encoding in python3. if name in cls.NO_ATTRS: return False return True @@ -263,7 +265,7 @@ def as_int(obj, quiet=False): # amount of other files it does not seem so useful to include that full # module just for this function. def ensure_tree(path): - """Create a directory (and any ancestor directories required) + """Create a directory (and any ancestor directories required). :param path: Directory to create """ @@ -361,7 +363,7 @@ class TransitionNotifier(object): self._listeners = collections.defaultdict(list) def __len__(self): - """Returns how many callbacks are registered""" + """Returns how many callbacks are registered.""" count = 0 for (_s, callbacks) in six.iteritems(self._listeners): @@ -421,12 +423,12 @@ class TransitionNotifier(object): def copy_exc_info(exc_info): - """Make copy of exception info tuple, as deep as possible""" + """Make copy of exception info tuple, as deep as possible.""" if exc_info is None: return None exc_type, exc_value, tb = exc_info # NOTE(imelnikov): there is no need to copy type, and - # we can't copy traceback + # we can't copy traceback. return (exc_type, copy.deepcopy(exc_value), tb) @@ -438,7 +440,7 @@ def are_equal_exc_info_tuples(ei1, ei2): # NOTE(imelnikov): we can't compare exceptions with '==' # because we want exc_info be equal to it's copy made with - # copy_exc_info above + # copy_exc_info above. if ei1[0] is not ei2[0]: return False if not all((type(ei1[1]) == type(ei2[1]), @@ -558,14 +560,14 @@ class Failure(object): raise exceptions.WrappedFailure(failures) def reraise(self): - """Re-raise captured exception""" + """Re-raise captured exception.""" if self._exc_info: six.reraise(*self._exc_info) else: raise exceptions.WrappedFailure([self]) def check(self, *exc_classes): - """Check if any of exc_classes caused the failure + """Check if any of exc_classes caused the failure. Arguments of this method can be exception types or type names (stings). If captured exception is instance of @@ -586,7 +588,7 @@ class Failure(object): self._exception_str) def __iter__(self): - """Iterate over exception type names""" + """Iterate over exception type names.""" for et in self._exc_type_names: yield et diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index 3c53564d..b9bd1940 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -86,7 +86,7 @@ def create_flow_detail(flow, book=None, backend=None, meta=None): with contextlib.closing(backend.get_connection()) as conn: conn.save_logbook(book) # Return the one from the saved logbook instead of the local one so - # that the freshest version is given back + # that the freshest version is given back. return book.find(flow_id) else: return flow_detail @@ -112,7 +112,7 @@ def task_details_merge(td_e, td_new, deep_copy=False): copy_fn = _copy_function(deep_copy) if td_e.state != td_new.state: - # NOTE(imelnikov): states are just strings, no need to copy + # NOTE(imelnikov): states are just strings, no need to copy. td_e.state = td_new.state if td_e.results != td_new.results: td_e.results = copy_fn(td_new.results) @@ -145,7 +145,7 @@ def flow_details_merge(fd_e, fd_new, deep_copy=False): if fd_e.meta != fd_new.meta: fd_e.meta = copy_fn(fd_new.meta) if fd_e.state != fd_new.state: - # NOTE(imelnikov): states are just strings, no need to copy + # NOTE(imelnikov): states are just strings, no need to copy. fd_e.state = fd_new.state return fd_e @@ -168,7 +168,7 @@ def logbook_merge(lb_e, lb_new, deep_copy=False): def failure_to_dict(failure): - """Convert misc.Failure object to JSON-serializable dict""" + """Convert misc.Failure object to JSON-serializable dict.""" if not failure: return None if not isinstance(failure, misc.Failure): @@ -185,8 +185,7 @@ def failure_to_dict(failure): def failure_from_dict(data): """Restore misc.Failure object from dict. - The dict should be similar to what failure_to_dict() function - produces. + The dict should be similar to what failure_to_dict() function produces. """ if not data: return None @@ -227,7 +226,7 @@ def _format_shared(obj, indent): def pformat_task_detail(task_detail, indent=0): - """Pretty formats a task detail""" + """Pretty formats a task detail.""" lines = ["%sTask: '%s'" % (" " * (indent), task_detail.name)] lines.extend(_format_shared(task_detail, indent=indent + 1)) lines.append("%s- version = %s" @@ -241,7 +240,7 @@ def pformat_task_detail(task_detail, indent=0): def pformat_flow_detail(flow_detail, indent=0): - """Pretty formats a flow detail""" + """Pretty formats a flow detail.""" lines = ["%sFlow: '%s'" % (" " * indent, flow_detail.name)] lines.extend(_format_shared(flow_detail, indent=indent + 1)) lines.extend(_format_meta(flow_detail.meta, indent=indent + 1)) @@ -251,7 +250,7 @@ def pformat_flow_detail(flow_detail, indent=0): def pformat(book, indent=0): - """Pretty formats a logbook""" + """Pretty formats a logbook.""" lines = ["%sLogbook: '%s'" % (" " * indent, book.name)] lines.extend(_format_shared(book, indent=indent + 1)) lines.extend(_format_meta(book.meta, indent=indent + 1)) diff --git a/taskflow/utils/reflection.py b/taskflow/utils/reflection.py index 0674628f..a7d200cc 100644 --- a/taskflow/utils/reflection.py +++ b/taskflow/utils/reflection.py @@ -47,7 +47,7 @@ def get_class_name(obj): def get_all_class_names(obj, up_to=object): - """Get class names of object parent classes + """Get class names of object parent classes. Iterate over all class names object is instance or subclass of, in order of method resolution (mro). If up_to parameter is provided, @@ -61,7 +61,7 @@ def get_all_class_names(obj, up_to=object): def get_callable_name(function): - """Generate a name from callable + """Generate a name from callable. Tries to do the best to guess fully qualified callable name. """ @@ -136,7 +136,7 @@ def _get_arg_spec(function): def get_callable_args(function, required_only=False): - """Get names of callable arguments + """Get names of callable arguments. Special arguments (like *args and **kwargs) are not included into output. @@ -154,6 +154,6 @@ def get_callable_args(function, required_only=False): def accepts_kwargs(function): - """Returns True if function accepts kwargs""" + """Returns True if function accepts kwargs.""" argspec, _bound = _get_arg_spec(function) return bool(argspec.keywords) diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index ee82af0a..b47fb701 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -40,7 +40,7 @@ def get_optimal_thread_count(): class ThreadSafeMeta(type): - """Metaclass that adds locking to all pubic methods of a class""" + """Metaclass that adds locking to all pubic methods of a class.""" def __new__(cls, name, bases, attrs): for attr_name, attr_value in six.iteritems(attrs): diff --git a/tox-tmpl.ini b/tox-tmpl.ini index ba3f175d..857fd269 100644 --- a/tox-tmpl.ini +++ b/tox-tmpl.ini @@ -38,7 +38,6 @@ commands = python setup.py testr --coverage --testr-args='{posargs}' commands = {posargs} [flake8] -ignore = H402 builtins = _ exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools diff --git a/tox.ini b/tox.ini index 63d5de57..3ca1cc0c 100644 --- a/tox.ini +++ b/tox.ini @@ -62,7 +62,6 @@ commands = python setup.py testr --coverage --testr-args='{posargs}' commands = {posargs} [flake8] -ignore = H402 builtins = _ exclude = .venv,.tox,dist,doc,./taskflow/openstack/common,*egg,.git,build,tools