Merge "Add some basic/initial engine statistics"
@@ -17,6 +17,7 @@
 import weakref

 from automaton import machines
+from oslo_utils import timeutils

 from taskflow import logging
 from taskflow import states as st
@@ -42,6 +43,11 @@ SUCCESS = 'success'
 REVERTED = 'reverted'
 START = 'start'

+# For these states we will gather how long (in seconds) the
+# state was in-progress (cumulatively if the state is entered multiple
+# times)
+TIMED_STATES = (st.ANALYZING, st.RESUMING, st.SCHEDULING, st.WAITING)
+
 LOG = logging.getLogger(__name__)

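As an aside, the cumulative timing described in the comment above boils down to restarting a per-state stopwatch on entry and stopping/accumulating it on exit. A minimal standalone sketch of that pattern with oslo_utils (purely illustrative; the fake_states/timings names below are not taskflow's):

# Illustrative only: cumulative per-state timing with oslo_utils stopwatches.
import time

from oslo_utils import timeutils

fake_states = ('SCHEDULING', 'WAITING')
timings = dict((s, 0.0) for s in fake_states)
watches = dict((s, timeutils.StopWatch()) for s in fake_states)

for _ in range(2):  # pretend each state is entered twice
    for state in fake_states:
        watches[state].restart()  # on enter: (re)start that state's watch
        time.sleep(0.01)          # stand-in for whatever work the state does
        watches[state].stop()     # on exit: stop it...
        timings[state] += watches[state].elapsed()  # ...and accumulate

print(timings)  # total (cumulative) seconds spent in each fake state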
@@ -100,8 +106,20 @@ class MachineBuilder(object):
         self._storage = runtime.storage
         self._waiter = waiter

-    def build(self, timeout=None):
+    def build(self, statistics, timeout=None, gather_statistics=True):
         """Builds a state-machine (that is used during running)."""
+        if gather_statistics:
+            watches = {}
+            state_statistics = {}
+            statistics['seconds_per_state'] = state_statistics
+            watches = {}
+            for timed_state in TIMED_STATES:
+                state_statistics[timed_state.lower()] = 0.0
+                watches[timed_state] = timeutils.StopWatch()
+            statistics['discarded_failures'] = 0
+            statistics['awaiting'] = 0
+            statistics['completed'] = 0
+            statistics['incomplete'] = 0

         memory = MachineMemory()
         if timeout is None:
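For orientation, with gathering enabled the dictionary handed into build() above starts out in roughly the following shape (a sketch derived directly from the hunk; only the initial zero values are shown):

# Approximate initial layout of the statistics dictionary; the
# 'seconds_per_state' keys mirror TIMED_STATES, lowercased.
statistics = {
    'seconds_per_state': {
        'analyzing': 0.0,
        'resuming': 0.0,
        'scheduling': 0.0,
        'waiting': 0.0,
    },
    'discarded_failures': 0,
    'awaiting': 0,
    'completed': 0,
    'incomplete': 0,
}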
@@ -208,6 +226,10 @@ class MachineBuilder(object):
                                       " units request during completion of"
                                       " atom '%s' (intention is to %s)",
                                       result, outcome, atom, intention)
+                        if gather_statistics:
+                            statistics['discarded_failures'] += 1
+                if gather_statistics:
+                    statistics['completed'] += 1
             except Exception:
                 memory.failures.append(failure.Failure())
                 LOG.exception("Engine '%s' atom post-completion failed", atom)
@@ -254,31 +276,39 @@ class MachineBuilder(object):
             return FINISH

         def on_exit(old_state, event):
-            LOG.debug("Exiting old state '%s' in response to event '%s'",
+            LOG.trace("Exiting old state '%s' in response to event '%s'",
                       old_state, event)
+            if gather_statistics:
+                if old_state in watches:
+                    w = watches[old_state]
+                    w.stop()
+                    state_statistics[old_state.lower()] += w.elapsed()
+                if old_state in (st.SCHEDULING, st.WAITING):
+                    statistics['incomplete'] = len(memory.not_done)
+                if old_state in (st.ANALYZING, st.SCHEDULING):
+                    statistics['awaiting'] = len(memory.next_up)

         def on_enter(new_state, event):
-            LOG.debug("Entering new state '%s' in response to event '%s'",
+            LOG.trace("Entering new state '%s' in response to event '%s'",
                       new_state, event)
+            if gather_statistics and new_state in watches:
+                watches[new_state].restart()

-        # NOTE(harlowja): when ran in trace mode it is quite useful
-        # to track the various state transitions as they happen...
-        watchers = {}
-        if LOG.isEnabledFor(logging.TRACE):
-            watchers['on_exit'] = on_exit
-            watchers['on_enter'] = on_enter
-
+        state_kwargs = {
+            'on_exit': on_exit,
+            'on_enter': on_enter,
+        }
         m = machines.FiniteMachine()
-        m.add_state(GAME_OVER, **watchers)
-        m.add_state(UNDEFINED, **watchers)
-        m.add_state(st.ANALYZING, **watchers)
-        m.add_state(st.RESUMING, **watchers)
-        m.add_state(st.REVERTED, terminal=True, **watchers)
-        m.add_state(st.SCHEDULING, **watchers)
-        m.add_state(st.SUCCESS, terminal=True, **watchers)
-        m.add_state(st.SUSPENDED, terminal=True, **watchers)
-        m.add_state(st.WAITING, **watchers)
-        m.add_state(st.FAILURE, terminal=True, **watchers)
+        m.add_state(GAME_OVER, **state_kwargs)
+        m.add_state(UNDEFINED, **state_kwargs)
+        m.add_state(st.ANALYZING, **state_kwargs)
+        m.add_state(st.RESUMING, **state_kwargs)
+        m.add_state(st.REVERTED, terminal=True, **state_kwargs)
+        m.add_state(st.SCHEDULING, **state_kwargs)
+        m.add_state(st.SUCCESS, terminal=True, **state_kwargs)
+        m.add_state(st.SUSPENDED, terminal=True, **state_kwargs)
+        m.add_state(st.WAITING, **state_kwargs)
+        m.add_state(st.FAILURE, terminal=True, **state_kwargs)
         m.default_start_state = UNDEFINED

         m.add_transition(GAME_OVER, st.REVERTED, REVERTED)
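Since the statistics bookkeeping lives in on_exit/on_enter, those callbacks are now always attached (via state_kwargs) instead of only when TRACE logging is enabled, as the old watchers block did. For readers unfamiliar with how automaton drives such callbacks, here is a small standalone sketch; the 'up'/'down' states and 'flip' event are made-up names, not taskflow's:

# Illustrative automaton usage only; not taskflow code.
from automaton import machines
from automaton import runners


def on_enter(new_state, event):
    print("entering '%s' in response to '%s'" % (new_state, event))


def on_exit(old_state, event):
    print("exiting '%s' in response to '%s'" % (old_state, event))


m = machines.FiniteMachine()
m.add_state('up', on_enter=on_enter, on_exit=on_exit)
m.add_state('down', terminal=True, on_enter=on_enter, on_exit=on_exit)
m.add_transition('up', 'down', 'flip')
m.default_start_state = 'up'

r = runners.FiniteRunner(m)
for transition in r.run_iter('flip'):
    print(transition)  # (prior state, new state) pairs as the machine moves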
@@ -25,6 +25,7 @@ import fasteners
 import networkx as nx
 from oslo_utils import excutils
 from oslo_utils import strutils
+from oslo_utils import timeutils
 import six

 from taskflow.engines.action_engine import builder
@@ -178,6 +179,9 @@ class ActionEngine(base.Engine):
         self._retry_executor = executor.SerialRetryExecutor()
         self._inject_transient = strutils.bool_from_string(
             self._options.get('inject_transient', True))
+        self._gather_statistics = strutils.bool_from_string(
+            self._options.get('gather_statistics', True))
+        self._statistics = {}

     @_pre_check(check_compiled=True,
                 # NOTE(harlowja): We can alter the state of the
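Because the option is read with a default of True, statistics gathering can presumably be switched off per engine through the usual options mechanism. A hedged sketch, assuming the common taskflow pattern where extra keyword arguments to engines.load() become engine options (the Noop task and 'demo' flow below are purely illustrative):

# Illustrative sketch: loading an engine with statistics gathering disabled.
from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task


class Noop(task.Task):
    def execute(self):
        pass


flow = linear_flow.Flow('demo').add(Noop('a'), Noop('b'))
engine = engines.load(flow, engine='serial', gather_statistics=False)
engine.run()
print(engine.statistics)  # expected to stay (mostly) empty when disabled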
@@ -189,6 +193,10 @@ class ActionEngine(base.Engine):
     def suspend(self):
         self._change_state(states.SUSPENDING)

+    @property
+    def statistics(self):
+        return self._statistics
+
     @property
     def compilation(self):
         """The compilation result.
@@ -270,9 +278,17 @@ class ActionEngine(base.Engine):
             maxlen=max(1, self.MAX_MACHINE_STATES_RETAINED))
         with _start_stop(self._task_executor, self._retry_executor):
             self._change_state(states.RUNNING)
+            if self._gather_statistics:
+                self._statistics.clear()
+                w = timeutils.StopWatch()
+                w.start()
+            else:
+                w = None
             try:
                 closed = False
-                machine, memory = self._runtime.builder.build(timeout=timeout)
+                machine, memory = self._runtime.builder.build(
+                    self._statistics, timeout=timeout,
+                    gather_statistics=self._gather_statistics)
                 r = runners.FiniteRunner(machine)
                 for transition in r.run_iter(builder.START):
                     last_transitions.append(transition)
@@ -315,6 +331,10 @@ class ActionEngine(base.Engine):
                     six.itervalues(failures),
                     six.itervalues(more_failures))
                 failure.Failure.reraise_if_any(fails)
+            finally:
+                if w is not None:
+                    w.stop()
+                    self._statistics['active_for'] = w.elapsed()

     @staticmethod
     def _check_compilation(compilation):
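Taken together with the gathering done in the builder, a completed run is expected to leave a populated statistics dictionary behind, including the 'active_for' wall-clock duration recorded in the finally block above. A hedged usage sketch (reusing the illustrative Noop/'demo' flow idea; the key names are the ones visible in this change):

# Illustrative sketch: reading the gathered statistics after a run.
from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task


class Noop(task.Task):
    def execute(self):
        pass


flow = linear_flow.Flow('demo').add(Noop('a'))
engine = engines.load(flow, engine='serial')  # gathering defaults to on
engine.run()
stats = engine.statistics
print(stats.get('active_for'))  # seconds the run loop was active
print(stats.get('completed'))   # atoms completed during the run
print(stats.get('seconds_per_state'))  # cumulative per-state timings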
@@ -80,6 +80,22 @@ class Engine(object):
     def storage(self):
         """The storage unit for this engine."""

+    @abc.abstractproperty
+    def statistics(self):
+        """A dictionary of runtime statistics this engine has gathered.
+
+        This dictionary will be empty when the engine has never been
+        run. When it is running or has run previously it should (but
+        may not) have useful and/or informational keys and values
+        while running is underway and/or has completed.
+
+        .. warning:: The keys in this dictionary **should** be somewhat
+                     stable (not changing), but their existence **may**
+                     change between major releases as new statistics are
+                     gathered or removed, so before accessing keys ensure
+                     that they actually exist and handle when they do not.
+        """
+
     @abc.abstractmethod
     def compile(self):
         """Compiles the contained flow into an internal representation.
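In line with that warning, callers are better off probing for keys rather than assuming them. A tiny hedged sketch of the defensive pattern (the helper name below is made up):

# Illustrative helper: read a per-state timing without assuming the keys exist.
def seconds_spent_scheduling(engine):
    stats = engine.statistics
    per_state = stats.get('seconds_per_state', {})
    return per_state.get('scheduling', 0.0)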
@@ -88,6 +88,6 @@ try:
     e.prepare()
     print("Running...")
     e.run()
-    print("Done...")
+    print("Done: %s" % e.statistics)
 except exceptions.NotImplementedError as e:
     print(e)
@@ -91,6 +91,8 @@ else:
     with executor:
         e = engines.load(song, executor=executor, engine='parallel')
         e.run()
+        print("-- Statistics gathered --")
+        print(e.statistics)


 # Run in parallel using real threads...
@@ -98,6 +100,8 @@ with futurist.ThreadPoolExecutor(max_workers=1) as executor:
     print("-- Running in parallel using threads --")
     e = engines.load(song, executor=executor, engine='parallel')
     e.run()
+    print("-- Statistics gathered --")
+    print(e.statistics)


 # Run in parallel using external processes...
@@ -105,6 +109,8 @@ with futurist.ProcessPoolExecutor(max_workers=1) as executor:
     print("-- Running in parallel using processes --")
     e = engines.load(song, executor=executor, engine='parallel')
     e.run()
+    print("-- Statistics gathered --")
+    print(e.statistics)


 # Run serially (aka, if the workflow could have been run in parallel, it will
@@ -112,3 +118,5 @@ with futurist.ProcessPoolExecutor(max_workers=1) as executor:
     print("-- Running serially --")
     e = engines.load(song, engine='serial')
     e.run()
+    print("-- Statistics gathered --")
+    print(e.statistics)
@@ -56,7 +56,7 @@ class BuildersTest(test.TestCase):

     def _make_machine(self, flow, initial_state=None):
         runtime = self._make_runtime(flow, initial_state=initial_state)
-        machine, memory = runtime.builder.build()
+        machine, memory = runtime.builder.build({})
         machine_runner = runners.FiniteRunner(machine)
         return (runtime, machine, memory, machine_runner)
