Protect storage better against external concurrent access

Lock down the various state machine action handling
functions so that they are ensured correct (and consistent)
access to the storage layer when they are modifying and/or
reading it.

Change-Id: Ie893a44aa963ab515f19e77f9904f49c843cb4e5
Joshua Harlow
2016-06-14 17:13:37 -07:00
committed by ChangBo Guo(gcb)
parent b3b659f38b
commit 22f75755b7
2 changed files with 62 additions and 44 deletions
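
Editor's note: the pattern the diffs below apply is that the storage layer owns a
reader/writer lock, and each state machine action handler takes the write side of
that lock before mutating storage (and the read side when it only inspects it).
The following is a minimal sketch of that idea only; the class and function names
are illustrative, not taskflow's real classes, and it assumes a
fasteners.ReaderWriterLock-style lock whose read_lock()/write_lock() methods
return context managers.

    import fasteners


    class SketchStorage(object):
        """Illustrative stand-in for an engine's storage layer."""

        def __init__(self):
            self._lock = fasteners.ReaderWriterLock()
            self._flow_state = 'RUNNING'
            self._results = {}

        @property
        def lock(self):
            # Reader/writer lock guarding this storage instance.
            return self._lock

        def get_flow_state(self):
            return self._flow_state

        def save_result(self, atom_name, result):
            self._results[atom_name] = result


    def on_atom_done(storage, atom_name, result):
        # Mutating handler: hold the write lock for the whole step so another
        # thread sharing this storage cannot interleave with the update.
        with storage.lock.write_lock():
            storage.save_result(atom_name, result)


    def is_still_running(storage):
        # Read-only handler: the read lock suffices and lets readers overlap.
        with storage.lock.read_lock():
            return storage.get_flow_state() == 'RUNNING'
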


@@ -143,10 +143,11 @@ class MachineBuilder(object):
         get_atom_intention = self._storage.get_atom_intention

         def do_schedule(next_nodes):
-            return self._scheduler.schedule(
-                sorted(next_nodes,
-                       key=lambda node: getattr(node, 'priority', 0),
-                       reverse=True))
+            with self._storage.lock.write_lock():
+                return self._scheduler.schedule(
+                    sorted(next_nodes,
+                           key=lambda node: getattr(node, 'priority', 0),
+                           reverse=True))

         def iter_next_atoms(atom=None, apply_deciders=True):
             # Yields and filters and tweaks the next atoms to run...
@@ -164,9 +165,10 @@ class MachineBuilder(object):
             # to include any nodes that need to be executed (from a previous
             # attempt, which may be empty if never ran before) and any nodes
             # that are now ready to be ran.
-            memory.next_up.update(
-                iter_utils.unique_seen((self._completer.resume(),
-                                        iter_next_atoms())))
+            with self._storage.lock.write_lock():
+                memory.next_up.update(
+                    iter_utils.unique_seen((self._completer.resume(),
+                                            iter_next_atoms())))
             return SCHEDULE

         def game_over(old_state, new_state, event):
@@ -176,11 +178,12 @@ class MachineBuilder(object):
             # it is *always* called before the final state is entered.
             if memory.failures:
                 return FAILED
-            leftover_atoms = iter_utils.count(
-                # Avoid activating the deciders, since at this point
-                # the engine is finishing and there will be no more further
-                # work done anyway...
-                iter_next_atoms(apply_deciders=False))
+            with self._storage.lock.read_lock():
+                leftover_atoms = iter_utils.count(
+                    # Avoid activating the deciders, since at this point
+                    # the engine is finishing and there will be no more further
+                    # work done anyway...
+                    iter_next_atoms(apply_deciders=False))
             if leftover_atoms:
                 # Ok we didn't finish (either reverting or executing...) so
                 # that means we must of been stopped at some point...
@@ -199,21 +202,22 @@ class MachineBuilder(object):
             # if the user of this engine has requested the engine/storage
             # that holds this information to stop or suspend); handles failures
             # that occur during this process safely...
-            current_flow_state = self._storage.get_flow_state()
-            if current_flow_state == st.RUNNING and memory.next_up:
-                not_done, failures = do_schedule(memory.next_up)
-                if not_done:
-                    memory.not_done.update(not_done)
-                if failures:
-                    memory.failures.extend(failures)
-                memory.next_up.intersection_update(not_done)
-            elif current_flow_state == st.SUSPENDING and memory.not_done:
-                # Try to force anything not cancelled to now be cancelled
-                # so that the executor that gets it does not continue to
-                # try to work on it (if the future execution is still in
-                # its backlog, if it's already being executed, this will
-                # do nothing).
-                memory.cancel_futures()
+            with self._storage.lock.write_lock():
+                current_flow_state = self._storage.get_flow_state()
+                if current_flow_state == st.RUNNING and memory.next_up:
+                    not_done, failures = do_schedule(memory.next_up)
+                    if not_done:
+                        memory.not_done.update(not_done)
+                    if failures:
+                        memory.failures.extend(failures)
+                    memory.next_up.intersection_update(not_done)
+                elif current_flow_state == st.SUSPENDING and memory.not_done:
+                    # Try to force anything not cancelled to now be cancelled
+                    # so that the executor that gets it does not continue to
+                    # try to work on it (if the future execution is still in
+                    # its backlog, if it's already being executed, this will
+                    # do nothing).
+                    memory.cancel_futures()
             return WAIT

         def complete_an_atom(fut):
@@ -277,23 +281,25 @@ class MachineBuilder(object):
             # nodes to be scheduled in the future); handles failures that
             # occur during this process safely...
             next_up = set()
-            while memory.done:
-                fut = memory.done.pop()
-                # Force it to be completed so that we can ensure that
-                # before we iterate over any successors or predecessors
-                # that we know it has been completed and saved and so on...
-                completion_status = complete_an_atom(fut)
-                if (not memory.failures
-                        and completion_status != WAS_CANCELLED):
-                    atom = fut.atom
-                    try:
-                        more_work = set(iter_next_atoms(atom=atom))
-                    except Exception:
-                        memory.failures.append(failure.Failure())
-                        LOG.exception("Engine '%s' atom post-completion"
-                                      " next atom searching failed", atom)
-                    else:
-                        next_up.update(more_work)
+            with self._storage.lock.write_lock():
+                while memory.done:
+                    fut = memory.done.pop()
+                    # Force it to be completed so that we can ensure that
+                    # before we iterate over any successors or predecessors
+                    # that we know it has been completed and saved and so on...
+                    completion_status = complete_an_atom(fut)
+                    if (not memory.failures
+                            and completion_status != WAS_CANCELLED):
+                        atom = fut.atom
+                        try:
+                            more_work = set(iter_next_atoms(atom=atom))
+                        except Exception:
+                            memory.failures.append(failure.Failure())
+                            LOG.exception(
+                                "Engine '%s' atom post-completion"
+                                " next atom searching failed", atom)
+                        else:
+                            next_up.update(more_work)
             current_flow_state = self._storage.get_flow_state()
             if (current_flow_state == st.RUNNING
                     and next_up and not memory.failures):
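
Editor's note: the scheduling and completion handlers above hold the write lock
across the whole step rather than around individual storage calls, so the
flow-state check and the work derived from it see one consistent snapshot of
storage. A hedged sketch of that shape follows (hypothetical handler and memory
names; 'storage' is any object exposing the lock and get_flow_state() used
above, such as the SketchStorage from the earlier sketch).

    RUNNING = 'RUNNING'
    SUSPENDING = 'SUSPENDING'


    def schedule_step(storage, memory, do_schedule):
        # One write-locked critical section: read the flow state and act on it
        # before another thread can change it underneath us.
        with storage.lock.write_lock():
            state = storage.get_flow_state()
            if state == RUNNING and memory['next_up']:
                not_done, failures = do_schedule(memory['next_up'])
                memory['not_done'].update(not_done)
                memory['failures'].extend(failures)
                memory['next_up'].intersection_update(not_done)
            elif state == SUSPENDING and memory['not_done']:
                for fut in memory['not_done']:
                    fut.cancel()  # best effort; running futures ignore this
        return 'WAIT'
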


@@ -407,6 +407,18 @@ class Storage(object):
                 self._failures.setdefault(atom_name, {})
         return atom_ids

+    @property
+    def lock(self):
+        """Reader/writer lock used to ensure multi-thread safety.
+
+        This does **not** protect against the **same** storage objects being
+        used by multiple engines/users across multiple processes (or
+        different machines); certain backends handle that situation better
+        than others (for example by using sequence identifiers) and it is an
+        ongoing work in progress to make that better.
+        """
+        return self._lock
+
     def ensure_atom(self, atom):
         """Ensure there is an atomdetail for the **given** atom.