diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 5b7c93492..75ab3591c 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -17,6 +17,7 @@
 import weakref
 
 from automaton import machines
+from oslo_utils import timeutils
 
 from taskflow import logging
 from taskflow import states as st
@@ -42,6 +43,11 @@ SUCCESS = 'success'
 REVERTED = 'reverted'
 START = 'start'
 
+# For these states we will gather how long (in seconds) the
+# state was in-progress (cumulatively if the state is entered multiple
+# times)
+TIMED_STATES = (st.ANALYZING, st.RESUMING, st.SCHEDULING, st.WAITING)
+
 LOG = logging.getLogger(__name__)
 
 
@@ -100,8 +106,20 @@ class MachineBuilder(object):
         self._storage = runtime.storage
         self._waiter = waiter
 
-    def build(self, timeout=None):
+    def build(self, statistics, timeout=None, gather_statistics=True):
         """Builds a state-machine (that is used during running)."""
+        if gather_statistics:
+            watches = {}
+            state_statistics = {}
+            statistics['seconds_per_state'] = state_statistics
+            watches = {}
+            for timed_state in TIMED_STATES:
+                state_statistics[timed_state.lower()] = 0.0
+                watches[timed_state] = timeutils.StopWatch()
+            statistics['discarded_failures'] = 0
+            statistics['awaiting'] = 0
+            statistics['completed'] = 0
+            statistics['incomplete'] = 0
 
         memory = MachineMemory()
         if timeout is None:
@@ -208,6 +226,10 @@
                                       " units request during completion of"
                                       " atom '%s' (intention is to %s)",
                                       result, outcome, atom, intention)
+                        if gather_statistics:
+                            statistics['discarded_failures'] += 1
+                if gather_statistics:
+                    statistics['completed'] += 1
             except Exception:
                 memory.failures.append(failure.Failure())
                 LOG.exception("Engine '%s' atom post-completion failed", atom)
@@ -254,31 +276,39 @@
             return FINISH
 
         def on_exit(old_state, event):
-            LOG.debug("Exiting old state '%s' in response to event '%s'",
+            LOG.trace("Exiting old state '%s' in response to event '%s'",
                       old_state, event)
+            if gather_statistics:
+                if old_state in watches:
+                    w = watches[old_state]
+                    w.stop()
+                    state_statistics[old_state.lower()] += w.elapsed()
+                if old_state in (st.SCHEDULING, st.WAITING):
+                    statistics['incomplete'] = len(memory.not_done)
+                if old_state in (st.ANALYZING, st.SCHEDULING):
+                    statistics['awaiting'] = len(memory.next_up)
 
         def on_enter(new_state, event):
-            LOG.debug("Entering new state '%s' in response to event '%s'",
+            LOG.trace("Entering new state '%s' in response to event '%s'",
                       new_state, event)
+            if gather_statistics and new_state in watches:
+                watches[new_state].restart()
 
-        # NOTE(harlowja): when ran in trace mode it is quite useful
-        # to track the various state transitions as they happen...
-        watchers = {}
-        if LOG.isEnabledFor(logging.TRACE):
-            watchers['on_exit'] = on_exit
-            watchers['on_enter'] = on_enter
-
+        state_kwargs = {
+            'on_exit': on_exit,
+            'on_enter': on_enter,
+        }
         m = machines.FiniteMachine()
-        m.add_state(GAME_OVER, **watchers)
-        m.add_state(UNDEFINED, **watchers)
-        m.add_state(st.ANALYZING, **watchers)
-        m.add_state(st.RESUMING, **watchers)
-        m.add_state(st.REVERTED, terminal=True, **watchers)
-        m.add_state(st.SCHEDULING, **watchers)
-        m.add_state(st.SUCCESS, terminal=True, **watchers)
-        m.add_state(st.SUSPENDED, terminal=True, **watchers)
-        m.add_state(st.WAITING, **watchers)
-        m.add_state(st.FAILURE, terminal=True, **watchers)
+        m.add_state(GAME_OVER, **state_kwargs)
+        m.add_state(UNDEFINED, **state_kwargs)
+        m.add_state(st.ANALYZING, **state_kwargs)
+        m.add_state(st.RESUMING, **state_kwargs)
+        m.add_state(st.REVERTED, terminal=True, **state_kwargs)
+        m.add_state(st.SCHEDULING, **state_kwargs)
+        m.add_state(st.SUCCESS, terminal=True, **state_kwargs)
+        m.add_state(st.SUSPENDED, terminal=True, **state_kwargs)
+        m.add_state(st.WAITING, **state_kwargs)
+        m.add_state(st.FAILURE, terminal=True, **state_kwargs)
         m.default_start_state = UNDEFINED
 
         m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 8973c1c6d..4a5dbfdbb 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -25,6 +25,7 @@ import fasteners
 import networkx as nx
 from oslo_utils import excutils
 from oslo_utils import strutils
+from oslo_utils import timeutils
 import six
 
 from taskflow.engines.action_engine import builder
@@ -169,6 +170,9 @@ class ActionEngine(base.Engine):
         self._retry_executor = executor.SerialRetryExecutor()
         self._inject_transient = strutils.bool_from_string(
             self._options.get('inject_transient', True))
+        self._gather_statistics = strutils.bool_from_string(
+            self._options.get('gather_statistics', True))
+        self._statistics = {}
 
     @_pre_check(check_compiled=True,
                 # NOTE(harlowja): We can alter the state of the
@@ -180,6 +184,10 @@
     def suspend(self):
         self._change_state(states.SUSPENDING)
 
+    @property
+    def statistics(self):
+        return self._statistics
+
     @property
     def compilation(self):
         """The compilation result.
@@ -261,9 +269,17 @@ class ActionEngine(base.Engine):
             maxlen=max(1, self.MAX_MACHINE_STATES_RETAINED))
         with _start_stop(self._task_executor, self._retry_executor):
             self._change_state(states.RUNNING)
+            if self._gather_statistics:
+                self._statistics.clear()
+                w = timeutils.StopWatch()
+                w.start()
+            else:
+                w = None
             try:
                 closed = False
-                machine, memory = self._runtime.builder.build(timeout=timeout)
+                machine, memory = self._runtime.builder.build(
+                    self._statistics, timeout=timeout,
+                    gather_statistics=self._gather_statistics)
                 r = runners.FiniteRunner(machine)
                 for transition in r.run_iter(builder.START):
                     last_transitions.append(transition)
@@ -306,6 +322,10 @@
                         six.itervalues(failures),
                         six.itervalues(more_failures))
                     failure.Failure.reraise_if_any(fails)
+            finally:
+                if w is not None:
+                    w.stop()
+                    self._statistics['active_for'] = w.elapsed()
 
     @staticmethod
     def _check_compilation(compilation):
diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py
index c479b6fb6..ac7290ce9 100644
--- a/taskflow/engines/base.py
+++ b/taskflow/engines/base.py
@@ -80,6 +80,22 @@
     def storage(self):
         """The storage unit for this engine."""
 
+    @abc.abstractproperty
+    def statistics(self):
+        """A dictionary of runtime statistics this engine has gathered.
+
+        This dictionary will be empty when the engine has never been
+        run. When it is running or has run previously, it should (but
+        may not) have useful and/or informational keys and values
+        describing what is underway and/or what has completed.
+
+        .. warning:: The keys in this dictionary **should** be somewhat
+                     stable (not changing), but their existence **may**
+                     change between major releases as new statistics are
+                     gathered or removed, so before accessing keys ensure
+                     that they actually exist and handle when they do not.
+        """
+
     @abc.abstractmethod
     def compile(self):
         """Compiles the contained flow into a internal representation.
diff --git a/taskflow/examples/alphabet_soup.py b/taskflow/examples/alphabet_soup.py
index eb199f8ef..e70a63975 100644
--- a/taskflow/examples/alphabet_soup.py
+++ b/taskflow/examples/alphabet_soup.py
@@ -88,6 +88,6 @@ try:
     e.prepare()
     print("Running...")
     e.run()
-    print("Done...")
+    print("Done: %s" % e.statistics)
 except exceptions.NotImplementedError as e:
     print(e)
diff --git a/taskflow/examples/hello_world.py b/taskflow/examples/hello_world.py
index 2ec1c9535..73fe5aa39 100644
--- a/taskflow/examples/hello_world.py
+++ b/taskflow/examples/hello_world.py
@@ -91,6 +91,8 @@ else:
     with executor:
         e = engines.load(song, executor=executor, engine='parallel')
        e.run()
+        print("-- Statistics gathered --")
+        print(e.statistics)
 
 
 # Run in parallel using real threads...
@@ -98,6 +100,8 @@ with futurist.ThreadPoolExecutor(max_workers=1) as executor:
     print("-- Running in parallel using threads --")
     e = engines.load(song, executor=executor, engine='parallel')
     e.run()
+    print("-- Statistics gathered --")
+    print(e.statistics)
 
 
 # Run in parallel using external processes...
@@ -105,6 +109,8 @@ with futurist.ProcessPoolExecutor(max_workers=1) as executor:
     print("-- Running in parallel using processes --")
     e = engines.load(song, executor=executor, engine='parallel')
     e.run()
+    print("-- Statistics gathered --")
+    print(e.statistics)
 
 
 # Run serially (aka, if the workflow could have been ran in parallel, it will
@@ -112,3 +118,5 @@ with futurist.ProcessPoolExecutor(max_workers=1) as executor:
 print("-- Running serially --")
 e = engines.load(song, engine='serial')
 e.run()
+print("-- Statistics gathered --")
+print(e.statistics)
diff --git a/taskflow/tests/unit/action_engine/test_builder.py b/taskflow/tests/unit/action_engine/test_builder.py
index 08877f8e1..3950f85dc 100644
--- a/taskflow/tests/unit/action_engine/test_builder.py
+++ b/taskflow/tests/unit/action_engine/test_builder.py
@@ -56,7 +56,7 @@ class BuildersTest(test.TestCase):
 
     def _make_machine(self, flow, initial_state=None):
         runtime = self._make_runtime(flow, initial_state=initial_state)
-        machine, memory = runtime.builder.build()
+        machine, memory = runtime.builder.build({})
         machine_runner = runners.FiniteRunner(machine)
         return (runtime, machine, memory, machine_runner)
 
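Reviewer note: a minimal usage sketch of the API added above. The Noop task and the
'demo' flow are illustrative only, and passing gather_statistics as a load() keyword
assumes that extra keyword arguments are forwarded into the engine options dictionary
(the same dictionary ActionEngine.__init__ reads 'inject_transient' from). Per the
base.Engine.statistics docstring, keys are accessed defensively since they may change
between major releases:

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow

    class Noop(task.Task):
        def execute(self):
            pass

    flow = linear_flow.Flow('demo').add(Noop())

    # 'gather_statistics' defaults to True; shown here only to make the
    # option visible (see the ActionEngine.__init__ hunk above).
    e = engines.load(flow, engine='serial', gather_statistics=True)
    e.run()

    # Keys may change between releases, so check before using them.
    print(e.statistics.get('active_for', 'unknown'))
    print(e.statistics.get('seconds_per_state', {}))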
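The per-state timing added to builder.py relies on oslo_utils.timeutils.StopWatch
being restarted on each state entry and stopped on each exit, with elapsed() read
after every stop so repeated visits to the same state accumulate. A standalone
sketch of that accumulation pattern (not taskflow code, state name is illustrative):

    import time

    from oslo_utils import timeutils

    seconds_per_state = {'scheduling': 0.0}
    watch = timeutils.StopWatch()

    for _ in range(2):           # the state is entered twice
        watch.restart()          # what on_enter() does
        time.sleep(0.01)         # time spent while in the state
        watch.stop()             # what on_exit() does
        seconds_per_state['scheduling'] += watch.elapsed()

    # Roughly 0.02 seconds accumulated across both visits.
    print(seconds_per_state)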