From a8d9ae76fe5492e87525d5cddeeb33f6b81f2096 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 13 Aug 2015 20:11:40 -0700 Subject: [PATCH] Have the storage class provide a 'change_flow_state' method Instead of needing the engine method to lock, fetch, check and then update the flow state it is more appropriate if the storage class does all these actions itself (therefore hiding this change from the engine code, making the engine code that much smaller). Change-Id: I2a289c2bcabe76728fa8eb26265ce168abf81b7c --- taskflow/engines/action_engine/engine.py | 22 +++++++++------------- taskflow/storage.py | 15 +++++++++++++++ 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index cc6b1ac4..1b0a484b 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -95,7 +95,6 @@ class ActionEngine(base.Engine): self._compilation = None self._compiler = compiler.PatternCompiler(flow) self._lock = threading.RLock() - self._state_lock = threading.RLock() self._storage_ensured = False # Retries are not *currently* executed out of the engines process # or thread (this could change in the future if we desire it to). @@ -223,18 +222,15 @@ class ActionEngine(base.Engine): failure.Failure.reraise_if_any(it) def _change_state(self, state): - with self._state_lock: - old_state = self.storage.get_flow_state() - if not states.check_flow_transition(old_state, state): - return - self.storage.set_flow_state(state) - details = { - 'engine': self, - 'flow_name': self.storage.flow_name, - 'flow_uuid': self.storage.flow_uuid, - 'old_state': old_state, - } - self.notifier.notify(state, details) + moved, old_state = self.storage.change_flow_state(state) + if moved: + details = { + 'engine': self, + 'flow_name': self.storage.flow_name, + 'flow_uuid': self.storage.flow_uuid, + 'old_state': old_state, + } + self.notifier.notify(state, details) def _ensure_storage(self): """Ensure all contained atoms exist in the storage unit.""" diff --git a/taskflow/storage.py b/taskflow/storage.py index cab68f6d..943d695a 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -1020,6 +1020,21 @@ class Storage(object): clone.meta.update(update_with) self._with_connection(self._save_flow_detail, source, clone) + @fasteners.write_locked + def change_flow_state(self, state): + """Transition flow from old state to new state. + + Returns ``(True, old_state)`` if transition was performed, + or ``(False, old_state)`` if it was ignored, or raises a + :py:class:`~taskflow.exceptions.InvalidState` exception if transition + is invalid. + """ + old_state = self.get_flow_state() + if not states.check_flow_transition(old_state, state): + return (False, old_state) + self.set_flow_state(state) + return (True, old_state) + @fasteners.read_locked def get_flow_state(self): """Get state from flow details."""