Merge "Add logging around metadata, ignore tallying + history"
This commit is contained in:
@@ -22,8 +22,11 @@ import six
|
||||
from taskflow import deciders
|
||||
from taskflow.engines.action_engine import compiler
|
||||
from taskflow.engines.action_engine import traversal
|
||||
from taskflow import logging
|
||||
from taskflow import states
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Decider(object):
|
||||
@@ -119,31 +122,46 @@ class IgnoreDecider(Decider):
|
||||
self._edge_deciders = edge_deciders
|
||||
|
||||
def tally(self, runtime):
|
||||
if not self._edge_deciders:
|
||||
return []
|
||||
# Gather all atoms (the ones that were not ignored) results so that
|
||||
# those results can be used
|
||||
# by the decider(s) that are making a decision as to pass or
|
||||
# not pass...
|
||||
states_intentions = runtime.storage.get_atoms_states(
|
||||
ed.from_node.name for ed in self._edge_deciders
|
||||
if ed.kind in compiler.ATOMS)
|
||||
voters = {
|
||||
'run_it': [],
|
||||
'do_not_run_it': [],
|
||||
'ignored': [],
|
||||
}
|
||||
history = {}
|
||||
for atom_name in six.iterkeys(states_intentions):
|
||||
atom_state, _atom_intention = states_intentions[atom_name]
|
||||
if atom_state != states.IGNORE:
|
||||
history[atom_name] = runtime.storage.get(atom_name)
|
||||
nay_voters = []
|
||||
for ed in self._edge_deciders:
|
||||
if (ed.kind in compiler.ATOMS and
|
||||
# It was an ignored atom (not included in history and the
|
||||
# only way that is possible is via above loop skipping
|
||||
# it...)
|
||||
ed.from_node.name not in history):
|
||||
continue
|
||||
if not ed.decider(history=history):
|
||||
nay_voters.append(ed)
|
||||
return nay_voters
|
||||
if self._edge_deciders:
|
||||
# Gather all atoms (the ones that were not ignored) results so
|
||||
# that those results can be used by the decider(s) that are
|
||||
# making a decision as to pass or not pass...
|
||||
states_intentions = runtime.storage.get_atoms_states(
|
||||
ed.from_node.name for ed in self._edge_deciders
|
||||
if ed.kind in compiler.ATOMS)
|
||||
for atom_name in six.iterkeys(states_intentions):
|
||||
atom_state, _atom_intention = states_intentions[atom_name]
|
||||
if atom_state != states.IGNORE:
|
||||
history[atom_name] = runtime.storage.get(atom_name)
|
||||
for ed in self._edge_deciders:
|
||||
if (ed.kind in compiler.ATOMS and
|
||||
# It was an ignored atom (not included in history and
|
||||
# the only way that is possible is via above loop
|
||||
# skipping it...)
|
||||
ed.from_node.name not in history):
|
||||
voters['ignored'].append(ed)
|
||||
continue
|
||||
if not ed.decider(history=history):
|
||||
voters['do_not_run_it'].append(ed)
|
||||
else:
|
||||
voters['run_it'].append(ed)
|
||||
if LOG.isEnabledFor(logging.TRACE):
|
||||
LOG.trace("Out of %s deciders there were %s 'do no run it'"
|
||||
" voters, %s 'do run it' voters and %s 'ignored'"
|
||||
" voters for transition to atom '%s' given history %s",
|
||||
sum(len(eds) for eds in six.itervalues(voters)),
|
||||
list(ed.from_node.name
|
||||
for ed in voters['do_not_run_it']),
|
||||
list(ed.from_node.name for ed in voters['run_it']),
|
||||
list(ed.from_node.name for ed in voters['ignored']),
|
||||
self._atom.name, history)
|
||||
return voters['do_not_run_it']
|
||||
|
||||
def affect(self, runtime, nay_voters):
|
||||
# If there were many 'nay' edge deciders that were targeted
|
||||
|
@@ -30,6 +30,7 @@ from taskflow.engines.action_engine import scheduler as sched
|
||||
from taskflow.engines.action_engine import scopes as sc
|
||||
from taskflow.engines.action_engine import traversal as tr
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import logging
|
||||
from taskflow import states as st
|
||||
from taskflow.utils import misc
|
||||
|
||||
@@ -39,6 +40,8 @@ from taskflow.flow import (LINK_DECIDER, LINK_DECIDER_DEPTH) # noqa
|
||||
_EdgeDecider = collections.namedtuple('_EdgeDecider',
|
||||
'from_node,kind,decider,depth')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Runtime(object):
|
||||
"""A aggregate of runtime objects, properties, ... used during execution.
|
||||
@@ -139,6 +142,8 @@ class Runtime(object):
|
||||
metadata['scheduler'] = scheduler
|
||||
metadata['edge_deciders'] = tuple(deciders_it)
|
||||
metadata['action'] = action
|
||||
LOG.trace("Compiled %s metadata for node %s (%s)",
|
||||
metadata, node.name, node_kind)
|
||||
self._atom_cache[node.name] = metadata
|
||||
# TODO(harlowja): optimize the different decider depths to avoid
|
||||
# repeated full successor searching; this can be done by searching
|
||||
|
Reference in New Issue
Block a user