Merge "Use batch 'get_atoms_states' where we can"
This commit is contained in:
@@ -135,7 +135,7 @@ class Analyzer(object):
|
||||
if atom is None:
|
||||
return iter_utils.unique_seen(self.browse_atoms_for_execute(),
|
||||
self.browse_atoms_for_revert())
|
||||
state = self.get_state(atom)
|
||||
state = self._storage.get_atom_state(atom.name)
|
||||
intention = self._storage.get_atom_intention(atom.name)
|
||||
if state == st.SUCCESS:
|
||||
if intention == st.REVERT:
|
||||
@@ -195,7 +195,7 @@ class Analyzer(object):
|
||||
def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
|
||||
connected_fetcher, connected_checker,
|
||||
decider_fetcher):
|
||||
state = self.get_state(atom)
|
||||
state = self._storage.get_atom_state(atom.name)
|
||||
ok_to_transition = self._runtime.check_atom_transition(atom, state,
|
||||
transition_to)
|
||||
if not ok_to_transition:
|
||||
@@ -261,8 +261,15 @@ class Analyzer(object):
|
||||
|
||||
If no state is provided it will yield back all retry atoms.
|
||||
"""
|
||||
for atom in self.iterate_nodes((co.RETRY,)):
|
||||
if not state or self.get_state(atom) == state:
|
||||
if state:
|
||||
atoms = list(self.iterate_nodes((co.RETRY,)))
|
||||
atom_states = self._storage.get_atoms_states(atom.name
|
||||
for atom in atoms)
|
||||
for atom in atoms:
|
||||
if atom_states[atom.name][0] == state:
|
||||
yield atom
|
||||
else:
|
||||
for atom in self.iterate_nodes((co.RETRY,)):
|
||||
yield atom
|
||||
|
||||
def iterate_nodes(self, allowed_kinds):
|
||||
@@ -277,14 +284,13 @@ class Analyzer(object):
|
||||
|
||||
def is_success(self):
|
||||
"""Checks if all atoms in the execution graph are in 'happy' state."""
|
||||
for atom in self.iterate_nodes(co.ATOMS):
|
||||
atom_state = self.get_state(atom)
|
||||
atoms = list(self.iterate_nodes(co.ATOMS))
|
||||
atom_states = self._storage.get_atoms_states(atom.name
|
||||
for atom in atoms)
|
||||
for atom in atoms:
|
||||
atom_state = atom_states[atom.name][0]
|
||||
if atom_state == st.IGNORE:
|
||||
continue
|
||||
if atom_state != st.SUCCESS:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_state(self, atom):
|
||||
"""Gets the state of a given atom (from the backend storage unit)."""
|
||||
return self._storage.get_atom_state(atom.name)
|
||||
|
@@ -139,14 +139,21 @@ class Completer(object):
|
||||
atoms that were previously not finished (due to a RUNNING or REVERTING
|
||||
attempt not previously finishing).
|
||||
"""
|
||||
for atom in self._analyzer.iterate_nodes(co.ATOMS):
|
||||
if self._analyzer.get_state(atom) == st.FAILURE:
|
||||
atoms = list(self._analyzer.iterate_nodes(co.ATOMS))
|
||||
atom_states = self._storage.get_atoms_states(atom.name
|
||||
for atom in atoms)
|
||||
for atom in atoms:
|
||||
atom_state = atom_states[atom.name][0]
|
||||
if atom_state == st.FAILURE:
|
||||
self._process_atom_failure(atom, self._storage.get(atom.name))
|
||||
for retry in self._analyzer.iterate_retries(st.RETRYING):
|
||||
self._runtime.retry_subflow(retry)
|
||||
for atom, state, intention in self._runtime.retry_subflow(retry):
|
||||
if state:
|
||||
atom_states[atom.name] = (state, intention)
|
||||
unfinished_atoms = set()
|
||||
for atom in self._analyzer.iterate_nodes(co.ATOMS):
|
||||
if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING):
|
||||
for atom in atoms:
|
||||
atom_state = atom_states[atom.name][0]
|
||||
if atom_state in (st.RUNNING, st.REVERTING):
|
||||
unfinished_atoms.add(atom)
|
||||
return unfinished_atoms
|
||||
|
||||
|
@@ -249,5 +249,6 @@ class Runtime(object):
|
||||
subgraph (its successors) to the ``PENDING`` state with an ``EXECUTE``
|
||||
intention.
|
||||
"""
|
||||
self.storage.set_atom_intention(retry.name, st.EXECUTE)
|
||||
self.reset_subgraph(retry)
|
||||
tweaked = self.reset_atoms([retry], state=None, intention=st.EXECUTE)
|
||||
tweaked.extend(self.reset_subgraph(retry))
|
||||
return tweaked
|
||||
|
Reference in New Issue
Block a user