From 97fd6660c6842df9c18617f1efc7832f89476bf6 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 27 Jan 2016 15:40:46 -0800 Subject: [PATCH] Add some basic/initial engine statistics It can be quite nice to expose a basic set of metrics about the internals of an engine, including the time in each state, and how long the engine is active for and likely more in the future. To start add a engine statistics property and gather some basic timing data and place this data into this property for access (and/or introspection) by users. Part of blueprint gather-engine-statistics Change-Id: Ibc3c78755bd8ae779b52fc4772519f243a521576 --- taskflow/engines/action_engine/builder.py | 70 +++++++++++++------ taskflow/engines/action_engine/engine.py | 22 +++++- taskflow/engines/base.py | 16 +++++ taskflow/examples/alphabet_soup.py | 2 +- taskflow/examples/hello_world.py | 8 +++ .../tests/unit/action_engine/test_builder.py | 2 +- 6 files changed, 97 insertions(+), 23 deletions(-) diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 5b7c93492..75ab3591c 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -17,6 +17,7 @@ import weakref from automaton import machines +from oslo_utils import timeutils from taskflow import logging from taskflow import states as st @@ -42,6 +43,11 @@ SUCCESS = 'success' REVERTED = 'reverted' START = 'start' +# For these states we will gather how long (in seconds) the +# state was in-progress (cumulatively if the state is entered multiple +# times) +TIMED_STATES = (st.ANALYZING, st.RESUMING, st.SCHEDULING, st.WAITING) + LOG = logging.getLogger(__name__) @@ -100,8 +106,20 @@ class MachineBuilder(object): self._storage = runtime.storage self._waiter = waiter - def build(self, timeout=None): + def build(self, statistics, timeout=None, gather_statistics=True): """Builds a state-machine (that is used during running).""" + if gather_statistics: + watches = {} + state_statistics = {} + statistics['seconds_per_state'] = state_statistics + watches = {} + for timed_state in TIMED_STATES: + state_statistics[timed_state.lower()] = 0.0 + watches[timed_state] = timeutils.StopWatch() + statistics['discarded_failures'] = 0 + statistics['awaiting'] = 0 + statistics['completed'] = 0 + statistics['incomplete'] = 0 memory = MachineMemory() if timeout is None: @@ -208,6 +226,10 @@ class MachineBuilder(object): " units request during completion of" " atom '%s' (intention is to %s)", result, outcome, atom, intention) + if gather_statistics: + statistics['discarded_failures'] += 1 + if gather_statistics: + statistics['completed'] += 1 except Exception: memory.failures.append(failure.Failure()) LOG.exception("Engine '%s' atom post-completion failed", atom) @@ -254,31 +276,39 @@ class MachineBuilder(object): return FINISH def on_exit(old_state, event): - LOG.debug("Exiting old state '%s' in response to event '%s'", + LOG.trace("Exiting old state '%s' in response to event '%s'", old_state, event) + if gather_statistics: + if old_state in watches: + w = watches[old_state] + w.stop() + state_statistics[old_state.lower()] += w.elapsed() + if old_state in (st.SCHEDULING, st.WAITING): + statistics['incomplete'] = len(memory.not_done) + if old_state in (st.ANALYZING, st.SCHEDULING): + statistics['awaiting'] = len(memory.next_up) def on_enter(new_state, event): - LOG.debug("Entering new state '%s' in response to event '%s'", + LOG.trace("Entering new state '%s' in response to event '%s'", new_state, event) + if gather_statistics and new_state in watches: + watches[new_state].restart() - # NOTE(harlowja): when ran in trace mode it is quite useful - # to track the various state transitions as they happen... - watchers = {} - if LOG.isEnabledFor(logging.TRACE): - watchers['on_exit'] = on_exit - watchers['on_enter'] = on_enter - + state_kwargs = { + 'on_exit': on_exit, + 'on_enter': on_enter, + } m = machines.FiniteMachine() - m.add_state(GAME_OVER, **watchers) - m.add_state(UNDEFINED, **watchers) - m.add_state(st.ANALYZING, **watchers) - m.add_state(st.RESUMING, **watchers) - m.add_state(st.REVERTED, terminal=True, **watchers) - m.add_state(st.SCHEDULING, **watchers) - m.add_state(st.SUCCESS, terminal=True, **watchers) - m.add_state(st.SUSPENDED, terminal=True, **watchers) - m.add_state(st.WAITING, **watchers) - m.add_state(st.FAILURE, terminal=True, **watchers) + m.add_state(GAME_OVER, **state_kwargs) + m.add_state(UNDEFINED, **state_kwargs) + m.add_state(st.ANALYZING, **state_kwargs) + m.add_state(st.RESUMING, **state_kwargs) + m.add_state(st.REVERTED, terminal=True, **state_kwargs) + m.add_state(st.SCHEDULING, **state_kwargs) + m.add_state(st.SUCCESS, terminal=True, **state_kwargs) + m.add_state(st.SUSPENDED, terminal=True, **state_kwargs) + m.add_state(st.WAITING, **state_kwargs) + m.add_state(st.FAILURE, terminal=True, **state_kwargs) m.default_start_state = UNDEFINED m.add_transition(GAME_OVER, st.REVERTED, REVERTED) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 8973c1c6d..4a5dbfdbb 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -25,6 +25,7 @@ import fasteners import networkx as nx from oslo_utils import excutils from oslo_utils import strutils +from oslo_utils import timeutils import six from taskflow.engines.action_engine import builder @@ -169,6 +170,9 @@ class ActionEngine(base.Engine): self._retry_executor = executor.SerialRetryExecutor() self._inject_transient = strutils.bool_from_string( self._options.get('inject_transient', True)) + self._gather_statistics = strutils.bool_from_string( + self._options.get('gather_statistics', True)) + self._statistics = {} @_pre_check(check_compiled=True, # NOTE(harlowja): We can alter the state of the @@ -180,6 +184,10 @@ class ActionEngine(base.Engine): def suspend(self): self._change_state(states.SUSPENDING) + @property + def statistics(self): + return self._statistics + @property def compilation(self): """The compilation result. @@ -261,9 +269,17 @@ class ActionEngine(base.Engine): maxlen=max(1, self.MAX_MACHINE_STATES_RETAINED)) with _start_stop(self._task_executor, self._retry_executor): self._change_state(states.RUNNING) + if self._gather_statistics: + self._statistics.clear() + w = timeutils.StopWatch() + w.start() + else: + w = None try: closed = False - machine, memory = self._runtime.builder.build(timeout=timeout) + machine, memory = self._runtime.builder.build( + self._statistics, timeout=timeout, + gather_statistics=self._gather_statistics) r = runners.FiniteRunner(machine) for transition in r.run_iter(builder.START): last_transitions.append(transition) @@ -306,6 +322,10 @@ class ActionEngine(base.Engine): six.itervalues(failures), six.itervalues(more_failures)) failure.Failure.reraise_if_any(fails) + finally: + if w is not None: + w.stop() + self._statistics['active_for'] = w.elapsed() @staticmethod def _check_compilation(compilation): diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index c479b6fb6..ac7290ce9 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -80,6 +80,22 @@ class Engine(object): def storage(self): """The storage unit for this engine.""" + @abc.abstractproperty + def statistics(self): + """A dictionary of runtime statistics this engine has gathered. + + This dictionary will be empty when the engine has never been + ran. When it is running or has ran previously it should have (but + may not) have useful and/or informational keys and values when + running is underway and/or completed. + + .. warning:: The keys in this dictionary **should** be some what + stable (not changing), but there existence **may** + change between major releases as new statistics are + gathered or removed so before accessing keys ensure that + they actually exist and handle when they do not. + """ + @abc.abstractmethod def compile(self): """Compiles the contained flow into a internal representation. diff --git a/taskflow/examples/alphabet_soup.py b/taskflow/examples/alphabet_soup.py index eb199f8ef..e70a63975 100644 --- a/taskflow/examples/alphabet_soup.py +++ b/taskflow/examples/alphabet_soup.py @@ -88,6 +88,6 @@ try: e.prepare() print("Running...") e.run() - print("Done...") + print("Done: %s" % e.statistics) except exceptions.NotImplementedError as e: print(e) diff --git a/taskflow/examples/hello_world.py b/taskflow/examples/hello_world.py index 2ec1c9535..73fe5aa39 100644 --- a/taskflow/examples/hello_world.py +++ b/taskflow/examples/hello_world.py @@ -91,6 +91,8 @@ else: with executor: e = engines.load(song, executor=executor, engine='parallel') e.run() + print("-- Statistics gathered --") + print(e.statistics) # Run in parallel using real threads... @@ -98,6 +100,8 @@ with futurist.ThreadPoolExecutor(max_workers=1) as executor: print("-- Running in parallel using threads --") e = engines.load(song, executor=executor, engine='parallel') e.run() + print("-- Statistics gathered --") + print(e.statistics) # Run in parallel using external processes... @@ -105,6 +109,8 @@ with futurist.ProcessPoolExecutor(max_workers=1) as executor: print("-- Running in parallel using processes --") e = engines.load(song, executor=executor, engine='parallel') e.run() + print("-- Statistics gathered --") + print(e.statistics) # Run serially (aka, if the workflow could have been ran in parallel, it will @@ -112,3 +118,5 @@ with futurist.ProcessPoolExecutor(max_workers=1) as executor: print("-- Running serially --") e = engines.load(song, engine='serial') e.run() +print("-- Statistics gathered --") +print(e.statistics) diff --git a/taskflow/tests/unit/action_engine/test_builder.py b/taskflow/tests/unit/action_engine/test_builder.py index 08877f8e1..3950f85dc 100644 --- a/taskflow/tests/unit/action_engine/test_builder.py +++ b/taskflow/tests/unit/action_engine/test_builder.py @@ -56,7 +56,7 @@ class BuildersTest(test.TestCase): def _make_machine(self, flow, initial_state=None): runtime = self._make_runtime(flow, initial_state=initial_state) - machine, memory = runtime.builder.build() + machine, memory = runtime.builder.build({}) machine_runner = runners.FiniteRunner(machine) return (runtime, machine, memory, machine_runner)