Merge "Have the storage class provide a 'change_flow_state' method"
This commit is contained in:
@@ -171,7 +171,6 @@ class ActionEngine(base.Engine):
|
|||||||
self._compilation = None
|
self._compilation = None
|
||||||
self._compiler = compiler.PatternCompiler(flow)
|
self._compiler = compiler.PatternCompiler(flow)
|
||||||
self._lock = threading.RLock()
|
self._lock = threading.RLock()
|
||||||
self._state_lock = threading.RLock()
|
|
||||||
self._storage_ensured = False
|
self._storage_ensured = False
|
||||||
self._validated = False
|
self._validated = False
|
||||||
# Retries are not *currently* executed out of the engines process
|
# Retries are not *currently* executed out of the engines process
|
||||||
@@ -362,18 +361,15 @@ class ActionEngine(base.Engine):
|
|||||||
return compilation
|
return compilation
|
||||||
|
|
||||||
def _change_state(self, state):
|
def _change_state(self, state):
|
||||||
with self._state_lock:
|
moved, old_state = self.storage.change_flow_state(state)
|
||||||
old_state = self.storage.get_flow_state()
|
if moved:
|
||||||
if not states.check_flow_transition(old_state, state):
|
details = {
|
||||||
return
|
'engine': self,
|
||||||
self.storage.set_flow_state(state)
|
'flow_name': self.storage.flow_name,
|
||||||
details = {
|
'flow_uuid': self.storage.flow_uuid,
|
||||||
'engine': self,
|
'old_state': old_state,
|
||||||
'flow_name': self.storage.flow_name,
|
}
|
||||||
'flow_uuid': self.storage.flow_uuid,
|
self.notifier.notify(state, details)
|
||||||
'old_state': old_state,
|
|
||||||
}
|
|
||||||
self.notifier.notify(state, details)
|
|
||||||
|
|
||||||
def _ensure_storage(self):
|
def _ensure_storage(self):
|
||||||
"""Ensure all contained atoms exist in the storage unit."""
|
"""Ensure all contained atoms exist in the storage unit."""
|
||||||
|
|||||||
@@ -1117,6 +1117,21 @@ class Storage(object):
|
|||||||
clone.meta.update(update_with)
|
clone.meta.update(update_with)
|
||||||
self._with_connection(self._save_flow_detail, source, clone)
|
self._with_connection(self._save_flow_detail, source, clone)
|
||||||
|
|
||||||
|
@fasteners.write_locked
|
||||||
|
def change_flow_state(self, state):
|
||||||
|
"""Transition flow from old state to new state.
|
||||||
|
|
||||||
|
Returns ``(True, old_state)`` if transition was performed,
|
||||||
|
or ``(False, old_state)`` if it was ignored, or raises a
|
||||||
|
:py:class:`~taskflow.exceptions.InvalidState` exception if transition
|
||||||
|
is invalid.
|
||||||
|
"""
|
||||||
|
old_state = self.get_flow_state()
|
||||||
|
if not states.check_flow_transition(old_state, state):
|
||||||
|
return (False, old_state)
|
||||||
|
self.set_flow_state(state)
|
||||||
|
return (True, old_state)
|
||||||
|
|
||||||
@fasteners.read_locked
|
@fasteners.read_locked
|
||||||
def get_flow_state(self):
|
def get_flow_state(self):
|
||||||
"""Get state from flow details."""
|
"""Get state from flow details."""
|
||||||
|
|||||||
Reference in New Issue
Block a user