Add retry action to execute retries

Change-Id: I093a49f88d9eecea5eb88fcecad9f7825cd7cb33
This commit is contained in:
Anastasia Karpinska
2014-01-21 13:14:48 +02:00
parent 54f459f6f2
commit f15707eb0b
5 changed files with 208 additions and 13 deletions

View File

@@ -19,11 +19,13 @@ import threading
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import graph_analyzer
from taskflow.engines.action_engine import retry_action
from taskflow.engines.action_engine import task_action
from taskflow.engines import base
from taskflow import exceptions as exc
from taskflow.openstack.common import excutils
from taskflow import retry
from taskflow import states
from taskflow import storage as t_storage
@@ -50,6 +52,7 @@ class ActionEngine(base.EngineBase):
_graph_analyzer_cls = graph_analyzer.GraphAnalyzer
_task_action_cls = task_action.TaskAction
_task_executor_cls = executor.SerialTaskExecutor
_retry_action_cls = retry_action.RetryAction
def __init__(self, flow, flow_detail, backend, conf):
super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
@@ -60,6 +63,7 @@ class ActionEngine(base.EngineBase):
self._state_lock = threading.RLock()
self._task_executor = None
self._task_action = None
self._retry_action = None
def _revert(self, current_failure=None):
self._change_state(states.REVERTING)
@@ -150,24 +154,27 @@ class ActionEngine(base.EngineBase):
self.task_notifier.notify(states.PENDING, details)
self._change_state(states.PENDING)
def _ensure_storage_for(self, task_graph):
def _ensure_storage_for(self, execution_graph):
# NOTE(harlowja): signal to the tasks that exist that we are about to
# resume, if they have a previous state, they will now transition to
# a resuming state (and then to suspended).
self._change_state(states.RESUMING) # does nothing in PENDING state
for task in task_graph.nodes_iter():
task_version = misc.get_version_string(task)
self.storage.ensure_task(task.name, task_version, task.save_as)
for node in execution_graph.nodes_iter():
version = misc.get_version_string(node)
if isinstance(node, retry.Retry):
self.storage.ensure_retry(node.name, version, node.save_as)
else:
self.storage.ensure_task(node.name, version, node.save_as)
self._change_state(states.SUSPENDED) # does nothing in PENDING state
@lock_utils.locked
def compile(self):
if self._compiled:
return
task_graph = flow_utils.flatten(self._flow)
if task_graph.number_of_nodes() == 0:
execution_graph = flow_utils.flatten(self._flow)
if execution_graph.number_of_nodes() == 0:
raise exc.EmptyFlow("Flow %s is empty." % self._flow.name)
self._analyzer = self._graph_analyzer_cls(task_graph,
self._analyzer = self._graph_analyzer_cls(execution_graph,
self.storage)
if self._task_executor is None:
self._task_executor = self._task_executor_cls()
@@ -175,15 +182,19 @@ class ActionEngine(base.EngineBase):
self._task_action = self._task_action_cls(self.storage,
self._task_executor,
self.task_notifier)
if self._retry_action is None:
self._retry_action = self._retry_action_cls(self.storage,
self.task_notifier)
self._root = self._graph_action_cls(self._analyzer,
self.storage,
self._task_action)
self._task_action,
self._retry_action)
# NOTE(harlowja): Perform initial state manipulation and setup.
#
# TODO(harlowja): This doesn't seem like it should be in a compilation
# function since compilation seems like it should not modify any
# external state.
self._ensure_storage_for(task_graph)
self._ensure_storage_for(execution_graph)
self._compiled = True

View File

@@ -15,6 +15,7 @@
# under the License.
from taskflow import states as st
from taskflow import task
from taskflow.utils import misc
@@ -29,10 +30,11 @@ class FutureGraphAction(object):
in parallel, this enables parallel flow run and reversion.
"""
def __init__(self, analyzer, storage, task_action):
def __init__(self, analyzer, storage, task_action, retry_action):
self._analyzer = analyzer
self._storage = storage
self._task_action = task_action
self._retry_action = retry_action
def is_running(self):
return self._storage.get_flow_state() == st.RUNNING
@@ -45,6 +47,7 @@ class FutureGraphAction(object):
self.is_running,
self._task_action.schedule_execution,
self._task_action.complete_execution,
self._retry_action.execute,
self._analyzer.browse_nodes_for_execute)
return st.SUSPENDED if was_suspended else st.SUCCESS
@@ -53,14 +56,21 @@ class FutureGraphAction(object):
self.is_reverting,
self._task_action.schedule_reversion,
self._task_action.complete_reversion,
self._retry_action.revert,
self._analyzer.browse_nodes_for_revert)
return st.SUSPENDED if was_suspended else st.REVERTED
def _run(self, running, schedule_node, complete_node, get_next_nodes):
def _run(self, running, schedule_task, complete_task,
complete_retry, get_next_nodes):
def schedule(nodes, not_done):
for node in nodes:
future = schedule_node(node)
if isinstance(node, task.BaseTask):
future = schedule_task(node)
else:
# Retry controller is always executed immediately in the
# main thread and it should not be scheduled.
future = complete_retry(node)
if future is not None:
not_done.append(future)
else:
@@ -82,7 +92,8 @@ class FutureGraphAction(object):
# NOTE(harlowja): event will be used in the future for smart
# reversion (ignoring it for now).
node, _event, result = future.result()
complete_node(node, result)
if isinstance(node, task.BaseTask):
complete_task(node, result)
if isinstance(result, misc.Failure):
failures.append(result)
else:

View File

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from taskflow import states
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
class RetryAction(object):
def __init__(self, storage, notifier):
self._storage = storage
self._notifier = notifier
def _get_retry_args(self, retry):
kwargs = self._storage.fetch_mapped_args(retry.rebind)
kwargs['history'] = self._storage.get_retry_history(retry.name)
return kwargs
def _change_state(self, retry, state, result=None):
old_state = self._storage.get_task_state(retry.name)
if not states.check_task_transition(old_state, state):
return False
if state in SAVE_RESULT_STATES:
self._storage.save(retry.name, result, state)
elif state == states.REVERTED:
self.storage.cleanup_retry_history(retry.name, state)
else:
self._storage.set_task_state(retry.name, state)
retry_uuid = self._storage.get_task_uuid(retry.name)
details = dict(retry_name=retry.name,
retry_uuid=retry_uuid,
result=result)
self._notifier.notify(state, details)
return True
def execute(self, retry):
if not self._change_state(retry, states.RUNNING):
return
kwargs = self._get_retry_args(retry)
try:
result = retry.execute(**kwargs)
except Exception:
result = misc.Failure()
self._change_state(retry, states.FAILURE, result=result)
else:
self._change_state(retry, states.SUCCESS, result=result)
def revert(self, retry):
if not self._change_state(retry, states.REVERTING):
return
kwargs = self._get_retry_args(retry)
kwargs['flow_failures'] = self._storage.get_failures()
try:
retry.revert(**kwargs)
except Exception:
self._change_state(retry, states.FAILURE)
else:
self._change_state(retry, states.REVERTED)

View File

@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
import taskflow.engines
from taskflow import test
from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu
class RetryTest(utils.EngineTestBase):
def test_run_empty_linear_flow(self):
flow = lf.Flow('flow-1', utils.OneReturnRetry(provides='x'))
engine = self._make_engine(flow)
engine.run()
self.assertEqual(engine.storage.fetch_all(), {'x': 1})
def test_run_empty_unordered_flow(self):
flow = uf.Flow('flow-1', utils.OneReturnRetry(provides='x'))
engine = self._make_engine(flow)
engine.run()
self.assertEqual(engine.storage.fetch_all(), {'x': 1})
def test_run_empty_graph_flow(self):
flow = gf.Flow('flow-1', utils.OneReturnRetry(provides='x'))
engine = self._make_engine(flow)
engine.run()
self.assertEqual(engine.storage.fetch_all(), {'x': 1})
class SingleThreadedEngineTest(RetryTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine_conf='serial',
backend=self.backend)
class MultiThreadedEngineTest(RetryTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
class ParallelEngineWithEventletTest(RetryTest,
test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
executor = eu.GreenExecutor()
engine_conf = dict(engine='parallel',
executor=executor)
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine_conf=engine_conf,
backend=self.backend)

View File

@@ -20,6 +20,7 @@ import six
from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow import retry
from taskflow import task
from taskflow.utils import misc
@@ -269,3 +270,12 @@ class FailureMatcher(object):
def __eq__(self, other):
return self._failure.matches(other)
class OneReturnRetry(retry.AlwaysRevert):
def execute(self, **kwargs):
return 1
def revert(self, **kwargs):
pass