diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 06663071..16835013 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -143,10 +143,11 @@ class MachineBuilder(object):
         get_atom_intention = self._storage.get_atom_intention
 
         def do_schedule(next_nodes):
-            return self._scheduler.schedule(
-                sorted(next_nodes,
-                       key=lambda node: getattr(node, 'priority', 0),
-                       reverse=True))
+            with self._storage.lock.write_lock():
+                return self._scheduler.schedule(
+                    sorted(next_nodes,
+                           key=lambda node: getattr(node, 'priority', 0),
+                           reverse=True))
 
         def iter_next_atoms(atom=None, apply_deciders=True):
             # Yields and filters and tweaks the next atoms to run...
@@ -164,9 +165,10 @@
             # to include any nodes that need to be executed (from a previous
             # attempt, which may be empty if never ran before) and any nodes
             # that are now ready to be ran.
-            memory.next_up.update(
-                iter_utils.unique_seen((self._completer.resume(),
-                                        iter_next_atoms())))
+            with self._storage.lock.write_lock():
+                memory.next_up.update(
+                    iter_utils.unique_seen((self._completer.resume(),
+                                            iter_next_atoms())))
             return SCHEDULE
 
         def game_over(old_state, new_state, event):
@@ -176,11 +178,12 @@
             # it is *always* called before the final state is entered.
             if memory.failures:
                 return FAILED
-            leftover_atoms = iter_utils.count(
-                # Avoid activating the deciders, since at this point
-                # the engine is finishing and there will be no more further
-                # work done anyway...
-                iter_next_atoms(apply_deciders=False))
+            with self._storage.lock.read_lock():
+                leftover_atoms = iter_utils.count(
+                    # Avoid activating the deciders, since at this point
+                    # the engine is finishing and there will be no more further
+                    # work done anyway...
+                    iter_next_atoms(apply_deciders=False))
             if leftover_atoms:
                 # Ok we didn't finish (either reverting or executing...) so
                 # that means we must of been stopped at some point...
@@ -199,21 +202,22 @@
             # if the user of this engine has requested the engine/storage
             # that holds this information to stop or suspend); handles failures
             # that occur during this process safely...
-            current_flow_state = self._storage.get_flow_state()
-            if current_flow_state == st.RUNNING and memory.next_up:
-                not_done, failures = do_schedule(memory.next_up)
-                if not_done:
-                    memory.not_done.update(not_done)
-                if failures:
-                    memory.failures.extend(failures)
-                memory.next_up.intersection_update(not_done)
-            elif current_flow_state == st.SUSPENDING and memory.not_done:
-                # Try to force anything not cancelled to now be cancelled
-                # so that the executor that gets it does not continue to
-                # try to work on it (if the future execution is still in
-                # its backlog, if it's already being executed, this will
-                # do nothing).
-                memory.cancel_futures()
+            with self._storage.lock.write_lock():
+                current_flow_state = self._storage.get_flow_state()
+                if current_flow_state == st.RUNNING and memory.next_up:
+                    not_done, failures = do_schedule(memory.next_up)
+                    if not_done:
+                        memory.not_done.update(not_done)
+                    if failures:
+                        memory.failures.extend(failures)
+                    memory.next_up.intersection_update(not_done)
+                elif current_flow_state == st.SUSPENDING and memory.not_done:
+                    # Try to force anything not cancelled to now be cancelled
+                    # so that the executor that gets it does not continue to
+                    # try to work on it (if the future execution is still in
+                    # its backlog, if it's already being executed, this will
+                    # do nothing).
+                    memory.cancel_futures()
             return WAIT
 
         def complete_an_atom(fut):
@@ -277,23 +281,25 @@
             # nodes to be scheduled in the future); handles failures that
             # occur during this process safely...
             next_up = set()
-            while memory.done:
-                fut = memory.done.pop()
-                # Force it to be completed so that we can ensure that
-                # before we iterate over any successors or predecessors
-                # that we know it has been completed and saved and so on...
-                completion_status = complete_an_atom(fut)
-                if (not memory.failures
-                        and completion_status != WAS_CANCELLED):
-                    atom = fut.atom
-                    try:
-                        more_work = set(iter_next_atoms(atom=atom))
-                    except Exception:
-                        memory.failures.append(failure.Failure())
-                        LOG.exception("Engine '%s' atom post-completion"
-                                      " next atom searching failed", atom)
-                    else:
-                        next_up.update(more_work)
+            with self._storage.lock.write_lock():
+                while memory.done:
+                    fut = memory.done.pop()
+                    # Force it to be completed so that we can ensure that
+                    # before we iterate over any successors or predecessors
+                    # that we know it has been completed and saved and so on...
+                    completion_status = complete_an_atom(fut)
+                    if (not memory.failures
+                            and completion_status != WAS_CANCELLED):
+                        atom = fut.atom
+                        try:
+                            more_work = set(iter_next_atoms(atom=atom))
+                        except Exception:
+                            memory.failures.append(failure.Failure())
+                            LOG.exception(
+                                "Engine '%s' atom post-completion"
+                                " next atom searching failed", atom)
+                        else:
+                            next_up.update(more_work)
             current_flow_state = self._storage.get_flow_state()
             if (current_flow_state == st.RUNNING and next_up and
                     not memory.failures):
diff --git a/taskflow/storage.py b/taskflow/storage.py
index be52774d..76d059b6 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -407,6 +407,18 @@ class Storage(object):
             self._failures.setdefault(atom_name, {})
         return atom_ids
 
+    @property
+    def lock(self):
+        """Reader/writer lock used to ensure multi-thread safety.
+
+        This does **not** protect against the **same** storage objects being
+        used by multiple engines/users across multiple processes (or
+        different machines); certain backends handle that situation better
+        than others (for example by using sequence identifiers), and it's
+        an ongoing work in progress to make that better.
+        """
+        return self._lock
+
     def ensure_atom(self, atom):
         """Ensure there is an atomdetail for the **given** atom.
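
Note: the builder.py changes above depend on Storage.lock exposing read_lock()/write_lock()
context managers. The snippet below is a minimal, self-contained sketch (not part of this
change) of how callers are expected to use that property. It assumes the underlying _lock
attribute is a fasteners.ReaderWriterLock, whose construction is not shown in this diff;
FakeStorage, check_state and suspend are purely illustrative names.

    import threading

    import fasteners


    class FakeStorage(object):
        """Hypothetical stand-in for taskflow.storage.Storage."""

        def __init__(self):
            # Assumption: the real Storage keeps a reader/writer lock in
            # self._lock; only the ``lock`` property is shown in the diff.
            self._lock = fasteners.ReaderWriterLock()
            self._flow_state = 'RUNNING'

        @property
        def lock(self):
            return self._lock

        def get_flow_state(self):
            return self._flow_state

        def set_flow_state(self, state):
            self._flow_state = state


    storage = FakeStorage()


    def check_state():
        # Readers may share the lock, mirroring the read_lock() added to
        # game_over() above.
        with storage.lock.read_lock():
            print('observed flow state:', storage.get_flow_state())


    def suspend():
        # A writer excludes readers and other writers, mirroring the
        # write_lock() wrapping added around scheduling and completion.
        with storage.lock.write_lock():
            storage.set_flow_state('SUSPENDING')


    threads = [threading.Thread(target=check_state) for _ in range(3)]
    threads.append(threading.Thread(target=suspend))
    for t in threads:
        t.start()
    for t in threads:
        t.join()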