diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 59071b6b..721f6437 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -171,7 +171,6 @@ class ActionEngine(base.Engine): self._compilation = None self._compiler = compiler.PatternCompiler(flow) self._lock = threading.RLock() - self._state_lock = threading.RLock() self._storage_ensured = False self._validated = False # Retries are not *currently* executed out of the engines process @@ -362,18 +361,15 @@ class ActionEngine(base.Engine): return compilation def _change_state(self, state): - with self._state_lock: - old_state = self.storage.get_flow_state() - if not states.check_flow_transition(old_state, state): - return - self.storage.set_flow_state(state) - details = { - 'engine': self, - 'flow_name': self.storage.flow_name, - 'flow_uuid': self.storage.flow_uuid, - 'old_state': old_state, - } - self.notifier.notify(state, details) + moved, old_state = self.storage.change_flow_state(state) + if moved: + details = { + 'engine': self, + 'flow_name': self.storage.flow_name, + 'flow_uuid': self.storage.flow_uuid, + 'old_state': old_state, + } + self.notifier.notify(state, details) def _ensure_storage(self): """Ensure all contained atoms exist in the storage unit.""" diff --git a/taskflow/storage.py b/taskflow/storage.py index dde1f2cd..9cd069fe 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -1117,6 +1117,21 @@ class Storage(object): clone.meta.update(update_with) self._with_connection(self._save_flow_detail, source, clone) + @fasteners.write_locked + def change_flow_state(self, state): + """Transition flow from old state to new state. + + Returns ``(True, old_state)`` if transition was performed, + or ``(False, old_state)`` if it was ignored, or raises a + :py:class:`~taskflow.exceptions.InvalidState` exception if transition + is invalid. + """ + old_state = self.get_flow_state() + if not states.check_flow_transition(old_state, state): + return (False, old_state) + self.set_flow_state(state) + return (True, old_state) + @fasteners.read_locked def get_flow_state(self): """Get state from flow details."""