From 522ea984891b4cf1cac304bebb0d4c1f38a6b0ea Mon Sep 17 00:00:00 2001
From: Joshua Harlow
Date: Thu, 17 Dec 2015 22:38:31 -0800
Subject: [PATCH] Move all internal blather usage/calls to trace usage/calls

Change-Id: I415a81d3b6b15b17a9a91cc2a0681c159172a4e1
---
 taskflow/engines/action_engine/builder.py    | 10 ++---
 taskflow/engines/action_engine/compiler.py   | 16 +++----
 taskflow/engines/action_engine/completer.py  |  8 ++--
 taskflow/engines/action_engine/engine.py     | 12 ++---
 taskflow/engines/action_engine/executor.py   | 28 ++++++------
 taskflow/engines/action_engine/scopes.py     |  8 ++--
 taskflow/engines/worker_based/server.py      |  8 ++--
 taskflow/logging.py                          |  7 ++-
 .../persistence/backends/impl_sqlalchemy.py  |  2 +-
 taskflow/storage.py                          | 44 +++++++++----------
 10 files changed, 73 insertions(+), 70 deletions(-)

diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 4ef658a3..67b333df 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -156,9 +156,9 @@ class MachineBuilder(object):
             if leftover_atoms:
                 # Ok we didn't finish (either reverting or executing...) so
                 # that means we must of been stopped at some point...
-                LOG.blather("Suspension determined to have been reacted to"
-                            " since (at least) %s atoms have been left in an"
-                            " unfinished state", leftover_atoms)
+                LOG.trace("Suspension determined to have been reacted to"
+                          " since (at least) %s atoms have been left in an"
+                          " unfinished state", leftover_atoms)
                 return SUSPENDED
             elif self._analyzer.is_success():
                 return SUCCESS
@@ -246,10 +246,10 @@ class MachineBuilder(object):
             LOG.debug("Entering new state '%s' in response to event '%s'",
                       new_state, event)
 
-        # NOTE(harlowja): when ran in blather mode it is quite useful
+        # NOTE(harlowja): when ran in trace mode it is quite useful
         # to track the various state transitions as they happen...
         watchers = {}
-        if LOG.isEnabledFor(logging.BLATHER):
+        if LOG.isEnabledFor(logging.TRACE):
             watchers['on_exit'] = on_exit
             watchers['on_enter'] = on_enter
 
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index e27b1f8f..83044e09 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -296,23 +296,23 @@ class PatternCompiler(object):
                              " and/or recursive compiling is not"
                              " supported" % (item, type(item)))
         self._history.add(item)
-        if LOG.isEnabledFor(logging.BLATHER):
-            LOG.blather("%sCompiling '%s'", " " * self._level, item)
+        if LOG.isEnabledFor(logging.TRACE):
+            LOG.trace("%sCompiling '%s'", " " * self._level, item)
         self._level += 1
 
     def _post_item_compile(self, item, graph, node):
         """Called after a item is compiled; doing post-compilation actions."""
         self._level -= 1
-        if LOG.isEnabledFor(logging.BLATHER):
+        if LOG.isEnabledFor(logging.TRACE):
             prefix = ' ' * self._level
-            LOG.blather("%sDecomposed '%s' into:", prefix, item)
+            LOG.trace("%sDecomposed '%s' into:", prefix, item)
             prefix = ' ' * (self._level + 1)
-            LOG.blather("%sGraph:", prefix)
+            LOG.trace("%sGraph:", prefix)
             for line in graph.pformat().splitlines():
-                LOG.blather("%s %s", prefix, line)
-            LOG.blather("%sHierarchy:", prefix)
+                LOG.trace("%s %s", prefix, line)
+            LOG.trace("%sHierarchy:", prefix)
             for line in node.pformat().splitlines():
-                LOG.blather("%s %s", prefix, line)
+                LOG.trace("%s %s", prefix, line)
 
     def _pre_compile(self):
         """Called before the compilation of the root starts."""
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index c1813aa6..140785ad 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -221,11 +221,11 @@ class Completer(object):
             LOG.debug("Applying resolver '%s' to resolve failure '%s'"
                       " of atom '%s'", resolver, failure, atom)
             tweaked = resolver.apply()
-            # Only show the tweaked node list when blather is on, otherwise
+            # Only show the tweaked node list when trace is on, otherwise
             # just show the amount/count of nodes tweaks...
-            if LOG.isEnabledFor(logging.BLATHER):
-                LOG.blather("Modified/tweaked %s nodes while applying"
-                            " resolver '%s'", tweaked, resolver)
+            if LOG.isEnabledFor(logging.TRACE):
+                LOG.trace("Modified/tweaked %s nodes while applying"
+                          " resolver '%s'", tweaked, resolver)
             else:
                 LOG.debug("Modified/tweaked %s nodes while applying"
                           " resolver '%s'", len(tweaked), resolver)
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 1a32b2e8..c65da342 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -329,13 +329,13 @@ class ActionEngine(base.Engine):
         # flow/task provided or storage provided, if there are still missing
         # dependencies then this flow will fail at runtime (which we can avoid
         # by failing at validation time).
-        if LOG.isEnabledFor(logging.BLATHER):
+        if LOG.isEnabledFor(logging.TRACE):
             execution_graph = self._compilation.execution_graph
-            LOG.blather("Validating scoping and argument visibility for"
-                        " execution graph with %s nodes and %s edges with"
-                        " density %0.3f", execution_graph.number_of_nodes(),
-                        execution_graph.number_of_edges(),
-                        nx.density(execution_graph))
+            LOG.trace("Validating scoping and argument visibility for"
+                      " execution graph with %s nodes and %s edges with"
+                      " density %0.3f", execution_graph.number_of_nodes(),
+                      execution_graph.number_of_edges(),
+                      nx.density(execution_graph))
         missing = set()
         # Attempt to retain a chain of what was missing (so that the final
         # raised exception for the flow has the nodes that had missing
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index 2d0d292d..1f7ec0b7 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -193,9 +193,9 @@ class _WaitWorkItem(object):
             watch = timeutils.StopWatch()
             watch.start()
             self._barrier.wait()
-            LOG.blather("Waited %s seconds until task '%s' %s emitted"
-                        " notifications were depleted", watch.elapsed(),
-                        self._task, sent_events)
+            LOG.trace("Waited %s seconds until task '%s' %s emitted"
+                      " notifications were depleted", watch.elapsed(),
+                      self._task, sent_events)
 
     def __call__(self):
         args = self._args
@@ -270,11 +270,11 @@ class _Dispatcher(object):
         else:
             # Just incase set the barrier to unblock any worker...
             target.barrier.set()
-            if LOG.isEnabledFor(logging.BLATHER):
-                LOG.blather("Dispatched %s messages %s to target '%s' during"
-                            " the lifetime of its existence in the dispatcher",
-                            sum(six.itervalues(target.dispatched)),
-                            dict(target.dispatched), target)
+            if LOG.isEnabledFor(logging.TRACE):
+                LOG.trace("Dispatched %s messages %s to target '%s' during"
+                          " the lifetime of its existence in the dispatcher",
+                          sum(six.itervalues(target.dispatched)),
+                          dict(target.dispatched), target)
 
     def reset(self):
         self._stop_when_empty = False
@@ -289,12 +289,12 @@ class _Dispatcher(object):
         self._dead.set()
 
     def _dispatch(self, message):
-        if LOG.isEnabledFor(logging.BLATHER):
-            LOG.blather("Dispatching message %s (it took %s seconds"
-                        " for it to arrive for processing after being"
-                        " sent)", message,
-                        timeutils.delta_seconds(message['sent_on'],
-                                                timeutils.utcnow()))
+        if LOG.isEnabledFor(logging.TRACE):
+            LOG.trace("Dispatching message %s (it took %s seconds"
+                      " for it to arrive for processing after being"
+                      " sent)", message,
+                      timeutils.delta_seconds(message['sent_on'],
+                                              timeutils.utcnow()))
         try:
             kind = message['kind']
             sender = message['sender']
diff --git a/taskflow/engines/action_engine/scopes.py b/taskflow/engines/action_engine/scopes.py
index 4dbfa76f..4558ddda 100644
--- a/taskflow/engines/action_engine/scopes.py
+++ b/taskflow/engines/action_engine/scopes.py
@@ -125,11 +125,11 @@ class ScopeWalker(object):
                 if not predecessors:
                     break
             self._level_cache[lvl] = (visible, removals)
-            if LOG.isEnabledFor(logging.BLATHER):
+            if LOG.isEnabledFor(logging.TRACE):
                 visible_names = [a.name for a in visible]
-                LOG.blather("Scope visible to '%s' (limited by parent '%s'"
-                            " index < %s) is: %s", self._atom,
-                            parent.item.name, last_idx, visible_names)
+                LOG.trace("Scope visible to '%s' (limited by parent '%s'"
+                          " index < %s) is: %s", self._atom,
+                          parent.item.name, last_idx, visible_names)
             if self._names_only:
                 yield [a.name for a in visible]
             else:
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index f1bd0fcd..4c3b32bc 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -67,10 +67,10 @@ class Server(object):
         func_name = reflection.get_callable_name(func)
 
         def _on_run(watch, content, message):
-            LOG.blather("It took %s seconds to get around to running"
-                        " function/method '%s' with"
-                        " message '%s'", watch.elapsed(), func_name,
-                        ku.DelayedPretty(message))
+            LOG.trace("It took %s seconds to get around to running"
+                      " function/method '%s' with"
+                      " message '%s'", watch.elapsed(), func_name,
+                      ku.DelayedPretty(message))
             return func(content, message)
 
         def _on_receive(content, message):
diff --git a/taskflow/logging.py b/taskflow/logging.py
index 3fa5ad93..9682e2b6 100644
--- a/taskflow/logging.py
+++ b/taskflow/logging.py
@@ -18,6 +18,8 @@ from __future__ import absolute_import
 
 import logging
 
+from debtcollector import moves
+
 _BASE = __name__.split(".", 1)[0]
 
 # Add a BLATHER/TRACE level, this matches the multiprocessing
@@ -40,8 +42,9 @@ WARN = logging.WARN
 WARNING = logging.WARNING
 
 
-class _BlatherLoggerAdapter(logging.LoggerAdapter):
+class _TraceLoggerAdapter(logging.LoggerAdapter):
 
+    @moves.moved_method("trace", version="1.26.0", removal_version="?")
     def blather(self, msg, *args, **kwargs):
         """Delegate a blather call to the underlying logger."""
         self.log(BLATHER, msg, *args, **kwargs)
@@ -59,4 +62,4 @@ def getLogger(name=_BASE, extra=None):
     logger = logging.getLogger(name)
     if not logger.handlers:
         logger.addHandler(logging.NullHandler())
-    return _BlatherLoggerAdapter(logger, extra=extra)
+    return _TraceLoggerAdapter(logger, extra=extra)
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index f6cf7f04..e223e02e 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -300,7 +300,7 @@ class SQLAlchemyBackend(base.Backend):
         log_statements = conf.pop('log_statements', False)
         if _as_bool(log_statements):
             log_statements_level = conf.pop("log_statements_level",
-                                            logging.BLATHER)
+                                            logging.TRACE)
             sa.event.listen(engine, "before_cursor_execute",
                             functools.partial(_log_statements,
                                               log_statements_level))
diff --git a/taskflow/storage.py b/taskflow/storage.py
index f6769369..f8a38d4a 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -850,11 +850,11 @@ class Storage(object):
         ]
         missing = set(six.iterkeys(args_mapping))
         for (bound_name, name) in six.iteritems(args_mapping):
-            if LOG.isEnabledFor(logging.BLATHER):
-                LOG.blather("Looking for %r <= %r for atom named: %s",
-                            bound_name, name, atom_name)
+            if LOG.isEnabledFor(logging.TRACE):
+                LOG.trace("Looking for %r <= %r for atom named: %s",
+                          bound_name, name, atom_name)
             if bound_name in optional_args:
-                LOG.blather("Argument %r is optional, skipping", bound_name)
+                LOG.trace("Argument %r is optional, skipping", bound_name)
                 missing.discard(bound_name)
                 continue
             maybe_providers = 0
@@ -866,9 +866,9 @@ class Storage(object):
             providers = _locate_providers(name, scope_walker=scope_walker)
             maybe_providers += len(providers)
             if maybe_providers:
-                LOG.blather("Atom %s will have %s potential providers"
-                            " of %r <= %r", atom_name, maybe_providers,
-                            bound_name, name)
+                LOG.trace("Atom %s will have %s potential providers"
+                          " of %r <= %r", atom_name, maybe_providers,
+                          bound_name, name)
                 missing.discard(bound_name)
         return missing
 
@@ -962,32 +962,32 @@ class Storage(object):
             return {}
         mapped_args = {}
         for (bound_name, name) in six.iteritems(args_mapping):
-            if LOG.isEnabledFor(logging.BLATHER):
+            if LOG.isEnabledFor(logging.TRACE):
                 if atom_name:
-                    LOG.blather("Looking for %r <= %r for atom named: %s",
-                                bound_name, name, atom_name)
+                    LOG.trace("Looking for %r <= %r for atom named: %s",
+                              bound_name, name, atom_name)
                 else:
-                    LOG.blather("Looking for %r <= %r", bound_name, name)
+                    LOG.trace("Looking for %r <= %r", bound_name, name)
             try:
                 source_index, value = _extract_first_from(name,
                                                           injected_sources)
                 mapped_args[bound_name] = value
-                if LOG.isEnabledFor(logging.BLATHER):
+                if LOG.isEnabledFor(logging.TRACE):
                     if source_index == 0:
-                        LOG.blather("Matched %r <= %r to %r (from injected"
-                                    " atom-specific transient"
-                                    " values)", bound_name, name, value)
+                        LOG.trace("Matched %r <= %r to %r (from injected"
+                                  " atom-specific transient"
+                                  " values)", bound_name, name, value)
                     else:
-                        LOG.blather("Matched %r <= %r to %r (from injected"
-                                    " atom-specific persistent"
-                                    " values)", bound_name, name, value)
+                        LOG.trace("Matched %r <= %r to %r (from injected"
+                                  " atom-specific persistent"
+                                  " values)", bound_name, name, value)
             except KeyError:
                 try:
                     possible_providers = self._reverse_mapping[name]
                 except KeyError:
                     if bound_name in optional_args:
-                        LOG.blather("Argument %r is optional, skipping",
-                                    bound_name)
+                        LOG.trace("Argument %r is optional, skipping",
+                                  bound_name)
                         continue
                     raise exceptions.NotFound("Name %r is not mapped as a"
                                               " produced output by any"
@@ -1003,8 +1003,8 @@ class Storage(object):
                         % (bound_name, name, len(possible_providers)))
                 provider, value = _item_from_first_of(providers, name)
                 mapped_args[bound_name] = value
-                LOG.blather("Matched %r <= %r to %r (from %s)",
-                            bound_name, name, value, provider)
+                LOG.trace("Matched %r <= %r to %r (from %s)",
+                          bound_name, name, value, provider)
         return mapped_args
 
     @fasteners.write_locked
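
The guarded trace pattern this patch standardizes on can be re-created outside of taskflow with a short standalone sketch: a custom TRACE level registered with the stdlib logging module, a LoggerAdapter exposing a trace() helper (mirroring _TraceLoggerAdapter above), and the isEnabledFor() guard that avoids building expensive log arguments when tracing is disabled. This is a minimal sketch, not taskflow's implementation: the numeric level 5 (below DEBUG's 10, matching the multiprocessing convention referenced in taskflow/logging.py) and the "demo" logger name are illustrative assumptions.

# Standalone sketch of the guarded TRACE-level pattern used in this patch.
# Assumption: the level value 5 and the "demo" logger name are illustrative
# only; taskflow/logging.py defines the real level and adapter.
from __future__ import absolute_import

import logging

TRACE = 5
logging.addLevelName(TRACE, "TRACE")


class TraceLoggerAdapter(logging.LoggerAdapter):
    """Adapter exposing a trace() helper, similar to _TraceLoggerAdapter."""

    def trace(self, msg, *args, **kwargs):
        """Delegate a trace call to the underlying logger."""
        self.log(TRACE, msg, *args, **kwargs)


def getLogger(name, extra=None):
    logger = logging.getLogger(name)
    if not logger.handlers:
        # Library loggers get a NullHandler so importing code stays quiet.
        logger.addHandler(logging.NullHandler())
    return TraceLoggerAdapter(logger, extra=extra)


if __name__ == "__main__":
    logging.basicConfig(level=TRACE)
    LOG = getLogger("demo")
    # The guard used throughout the diff: only build (potentially expensive)
    # log arguments when TRACE output is actually enabled.
    if LOG.isEnabledFor(TRACE):
        LOG.trace("Dispatching message %s", {"kind": "RESPONSE"})

Note that the patch keeps blather() as a deprecated alias via debtcollector's moves.moved_method decorator, so existing callers keep working while being pointed at trace(); the sketch above omits that compatibility shim.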