Have the storage class provide a 'change_flow_state' method

Instead of needing the engine method to lock, fetch, check
and then update the flow state it is more appropriate if the
storage class does all these actions itself (therefore hiding
this change from the engine code, making the engine code
that much smaller).

Change-Id: I2a289c2bcabe76728fa8eb26265ce168abf81b7c
This commit is contained in:
Joshua Harlow
2015-08-13 20:11:40 -07:00
parent 42837b0dfa
commit a8d9ae76fe
2 changed files with 24 additions and 13 deletions

View File

@@ -95,7 +95,6 @@ class ActionEngine(base.Engine):
self._compilation = None
self._compiler = compiler.PatternCompiler(flow)
self._lock = threading.RLock()
self._state_lock = threading.RLock()
self._storage_ensured = False
# Retries are not *currently* executed out of the engines process
# or thread (this could change in the future if we desire it to).
@@ -223,18 +222,15 @@ class ActionEngine(base.Engine):
failure.Failure.reraise_if_any(it)
def _change_state(self, state):
with self._state_lock:
old_state = self.storage.get_flow_state()
if not states.check_flow_transition(old_state, state):
return
self.storage.set_flow_state(state)
details = {
'engine': self,
'flow_name': self.storage.flow_name,
'flow_uuid': self.storage.flow_uuid,
'old_state': old_state,
}
self.notifier.notify(state, details)
moved, old_state = self.storage.change_flow_state(state)
if moved:
details = {
'engine': self,
'flow_name': self.storage.flow_name,
'flow_uuid': self.storage.flow_uuid,
'old_state': old_state,
}
self.notifier.notify(state, details)
def _ensure_storage(self):
"""Ensure all contained atoms exist in the storage unit."""

View File

@@ -1020,6 +1020,21 @@ class Storage(object):
clone.meta.update(update_with)
self._with_connection(self._save_flow_detail, source, clone)
@fasteners.write_locked
def change_flow_state(self, state):
"""Transition flow from old state to new state.
Returns ``(True, old_state)`` if transition was performed,
or ``(False, old_state)`` if it was ignored, or raises a
:py:class:`~taskflow.exceptions.InvalidState` exception if transition
is invalid.
"""
old_state = self.get_flow_state()
if not states.check_flow_transition(old_state, state):
return (False, old_state)
self.set_flow_state(state)
return (True, old_state)
@fasteners.read_locked
def get_flow_state(self):
"""Get state from flow details."""