Merge "Use same code to reset flow and parts of it"
This commit is contained in:
@@ -82,13 +82,16 @@ class ActionEngine(base.EngineBase):
|
||||
@lock_utils.locked
|
||||
def run(self):
|
||||
"""Runs the flow in the engine to completion."""
|
||||
if self.storage.get_flow_state() == states.REVERTED:
|
||||
self._reset()
|
||||
self.compile()
|
||||
external_provides = set(self.storage.fetch_all().keys())
|
||||
missing = self._flow.requires - external_provides
|
||||
if missing:
|
||||
raise exc.MissingDependencies(self._flow, sorted(missing))
|
||||
|
||||
if self.storage.get_flow_state() == states.REVERTED:
|
||||
self._root.reset_all()
|
||||
self._change_state(states.PENDING)
|
||||
|
||||
self._task_executor.start()
|
||||
try:
|
||||
self._run()
|
||||
@@ -129,15 +132,6 @@ class ActionEngine(base.EngineBase):
|
||||
old_state=old_state)
|
||||
self.notifier.notify(state, details)
|
||||
|
||||
def _reset(self):
|
||||
for name, uuid in self.storage.reset_tasks():
|
||||
details = dict(engine=self,
|
||||
task_name=name,
|
||||
task_uuid=uuid,
|
||||
result=None)
|
||||
self.task_notifier.notify(states.PENDING, details)
|
||||
self._change_state(states.PENDING)
|
||||
|
||||
def _ensure_storage_for(self, execution_graph):
|
||||
# NOTE(harlowja): signal to the tasks that exist that we are about to
|
||||
# resume, if they have a previous state, they will now transition to
|
||||
|
||||
@@ -169,9 +169,17 @@ class FutureGraphAction(object):
|
||||
next_nodes.add(node)
|
||||
return next_nodes
|
||||
|
||||
def reset_all(self):
|
||||
self._retry_subflow(None)
|
||||
|
||||
def _retry_subflow(self, retry):
|
||||
self._storage.set_atom_intention(retry.name, st.EXECUTE)
|
||||
for node in self._analyzer.iterate_subgraph(retry):
|
||||
if retry is not None:
|
||||
self._storage.set_atom_intention(retry.name, st.EXECUTE)
|
||||
nodes_iter = self._analyzer.iterate_subgraph(retry)
|
||||
else:
|
||||
nodes_iter = self._analyzer.iterate_all_nodes()
|
||||
|
||||
for node in nodes_iter:
|
||||
if isinstance(node, task.BaseTask):
|
||||
self._task_action.change_state(node, st.PENDING, progress=0.0)
|
||||
else:
|
||||
|
||||
@@ -378,24 +378,6 @@ class Storage(object):
|
||||
if self._reset_task(td, state):
|
||||
self._with_connection(self._save_task_detail, td)
|
||||
|
||||
def reset_tasks(self):
|
||||
"""Reset all tasks to PENDING state, removing results.
|
||||
|
||||
Returns list of (name, uuid) tuples for all tasks that were reset.
|
||||
"""
|
||||
reset_results = []
|
||||
|
||||
def do_reset_all(connection):
|
||||
for td in self._flowdetail:
|
||||
if self._reset_task(td, states.PENDING):
|
||||
self._save_task_detail(connection, td)
|
||||
reset_results.append((td.name, td.uuid))
|
||||
|
||||
with self._lock.write_lock():
|
||||
self._with_connection(do_reset_all)
|
||||
|
||||
return reset_results
|
||||
|
||||
def inject(self, pairs):
|
||||
"""Add values into storage.
|
||||
|
||||
|
||||
@@ -186,34 +186,6 @@ class StorageTestMixin(object):
|
||||
s.ensure_task('my task')
|
||||
self.assertEqual(s.reset('my task'), None)
|
||||
|
||||
def test_reset_tasks(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.save('my task', 5)
|
||||
s.ensure_task('my other task')
|
||||
s.save('my other task', 7)
|
||||
|
||||
s.reset_tasks()
|
||||
|
||||
self.assertEqual(s.get_task_state('my task'), states.PENDING)
|
||||
self.assertRaises(exceptions.NotFound, s.get, 'my task')
|
||||
self.assertEqual(s.get_task_state('my other task'), states.PENDING)
|
||||
self.assertRaises(exceptions.NotFound, s.get, 'my other task')
|
||||
|
||||
def test_reset_tasks_does_not_breaks_inject(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar', 'spam': 'eggs'})
|
||||
|
||||
# NOTE(imelnikov): injecting is implemented as special task
|
||||
# so resetting tasks may break it if implemented incorrectly.
|
||||
s.reset_tasks()
|
||||
|
||||
self.assertEqual(s.fetch('spam'), 'eggs')
|
||||
self.assertEqual(s.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_fetch_by_name(self):
|
||||
s = self._get_storage()
|
||||
name = 'my result'
|
||||
@@ -351,32 +323,6 @@ class StorageTestMixin(object):
|
||||
# Only one task should have been made, no more.
|
||||
self.assertEqual(1, len(s._flowdetail))
|
||||
|
||||
def test_many_thread_one_reset(self):
|
||||
s = self._get_storage(threaded=True)
|
||||
s.ensure_task('a')
|
||||
s.set_task_state('a', states.SUSPENDED)
|
||||
s.ensure_task('b')
|
||||
s.set_task_state('b', states.SUSPENDED)
|
||||
|
||||
results = []
|
||||
result_lock = threading.Lock()
|
||||
|
||||
def reset_all():
|
||||
r = s.reset_tasks()
|
||||
with result_lock:
|
||||
results.append(r)
|
||||
|
||||
threads = []
|
||||
for i in range(0, self.thread_count):
|
||||
threads.append(threading.Thread(target=reset_all))
|
||||
|
||||
self._run_many_threads(threads)
|
||||
|
||||
# Only one thread should have actually reset (not anymore)
|
||||
results = [r for r in results if len(r)]
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertEqual(['a', 'b'], sorted([a[0] for a in results[0]]))
|
||||
|
||||
def test_many_thread_inject(self):
|
||||
s = self._get_storage(threaded=True)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user