Allow business logic and plugins to retry tasks

Add a new common.py module to the barbican/tasks application to define
a concept for retry tasks that is common between the the
barbican.queue.server.py logic (that executes RPC Task methods from the
queue) and the lower level business logic and plugin eventually invoked
by these top-level RPC Task methods. A follow on CR will begin to
populate the retry database table with task retry entries, for the
retry scheduler period process to schedule later.

Change-Id: I2214b2ead65ea78b3baf6cb2463711d2dec22493
Implements: blueprint add-worker-retry-update-support
This commit is contained in:
jfwood 2015-03-18 15:53:05 -05:00
parent b2fbda189f
commit a433db2da6
9 changed files with 325 additions and 39 deletions

View File

@ -32,6 +32,7 @@ from barbican import i18n as u
from barbican.model import repositories
from barbican.openstack.common import service
from barbican import queue
from barbican.tasks import common
from barbican.tasks import resources
if newrelic_loaded:
@ -42,6 +43,29 @@ LOG = utils.getLogger(__name__)
CONF = cfg.CONF
# Maps the common/shared RetryTasks (returned from lower-level business logic
# and plugin processing) to top-level RPC tasks in the Tasks class below.
MAP_RETRY_TASKS = {
common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK: 'check_certificate_status'
}
def retryable(fn):
"""Provides retry/scheduling support to tasks."""
@functools.wraps(fn)
def wrapper(method_self, *args, **kwargs):
result = fn(method_self, *args, **kwargs)
retry_rpc_method = schedule_retry_tasks(
fn, result, *args, **kwargs)
if retry_rpc_method:
LOG.info(
u._LI("Scheduled RPC method for retry: '%s'"),
retry_rpc_method)
return wrapper
def transactional(fn):
"""Provides request-scoped database transaction support to tasks."""
@ -50,7 +74,8 @@ def transactional(fn):
fn_name = getattr(fn, '__name__', '????')
if not queue.is_server_side():
fn(*args, **kwargs) # Non-server mode directly invokes tasks.
# Non-server mode directly invokes tasks.
fn(*args, **kwargs)
LOG.info(u._LI("Completed worker task: '%s'"), fn_name)
else:
# Manage session/transaction.
@ -106,6 +131,41 @@ def monitored(fn): # pragma: no cover
return fn
def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs):
"""Schedules a task for retry.
:param invoked_task: The RPC method that was just invoked.
:param retry_result: A :class:`FollowOnProcessingStatusDTO` if follow-on
processing (such as retrying this or another task) is
required, otherwise None indicates no such follow-on
processing is required.
:param args: List of arguments passed in to the just-invoked task.
:param kwargs: Dict of arguments passed in to the just-invoked task.
:return: Returns the RPC task method scheduled for a retry, None if no RPC
task was scheduled.
"""
retry_rpc_method = None
if not retry_result:
pass
elif common.RetryTasks.INVOKE_SAME_TASK == retry_result.retry_task:
if invoked_task:
retry_rpc_method = getattr(
invoked_task, '__name__', None)
else:
retry_rpc_method = MAP_RETRY_TASKS.get(retry_result.retry_task)
if retry_rpc_method:
# TODO(john-wood-w) Add retry task to the retry table here.
LOG.debug(
'Scheduling RPC method for retry: {0}'.format(retry_rpc_method))
return retry_rpc_method
class Tasks(object):
"""Tasks that can be invoked asynchronously in Barbican.
@ -113,6 +173,9 @@ class Tasks(object):
called directly from the client side for non-asynchronous standalone
single-node operation.
If a new method is added that can be retried, please also add its method
name to MAP_RETRY_TASKS above.
The TaskServer class below extends this class to implement a worker-side
server utilizing Oslo messaging's RPC server. This RPC server can invoke
methods on itself, which include the methods in this class.
@ -120,23 +183,26 @@ class Tasks(object):
@monitored
@transactional
@retryable
def process_type_order(self, context, order_id, project_id):
"""Process TypeOrder."""
LOG.info(
u._LI("Processing type order: order ID is '%s'"),
order_id
)
resources.BeginTypeOrder().process(order_id, project_id)
return resources.BeginTypeOrder().process(order_id, project_id)
@monitored
@transactional
@retryable
def update_order(self, context, order_id, project_id, updated_meta):
"""Update Order."""
LOG.info(
u._LI("Processing update order: order ID is '%s'"),
order_id
)
resources.UpdateOrder().process(order_id, project_id, updated_meta)
return resources.UpdateOrder().process(
order_id, project_id, updated_meta)
class TaskServer(Tasks, service.Service):

88
barbican/tasks/common.py Normal file
View File

@ -0,0 +1,88 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tasking related information that is shared/common across modules.
"""
from barbican import i18n as u
RETRY_MSEC_DEFAULT = 60 * 1000
class RetryTasks(object):
"""Defines tasks that can be retried/scheduled.
RPC tasks invoked from the queue are handled via methods on
barbican.queue.server.Tasks. These calls in turn delegate to the
'process()' method of BaseTask sub-classes. These calls in turn delegate
to business logic and plugins via modules in this barbican.tasks package.
This class defines a common mechanism for the business logic and plugins
to indicate what RPC tasks need to be retried in a way that the Tasks
class can interpret as high level RPC tasks to enqueue later.
In particular the following generic options are available:
INVOKE_SAME_TASK - Invoke this same task later
NO_ACTION_REQUIRED - To retry/scheduling actions are required
The following task/context-specific actions are available:
INVOKE_CERT_STATUS_CHECK_TASK - Check certificate status later
"""
INVOKE_SAME_TASK = "Invoke Same Task Again Later"
NO_ACTION_REQUIRED = "No Retry/Schedule Actions Are Needed"
INVOKE_CERT_STATUS_CHECK_TASK = "Check Certificate Status Later"
class FollowOnProcessingStatusDTO(object):
"""Follow On Processing status data transfer object (DTO).
An object of this type is optionally returned by the
BaseTask.handle_processing() method defined below, and is used to guide
follow on processing and to provide status feedback to clients.
"""
def __init__(
self,
status=u._('Unknown'),
status_message=u._('Unknown'),
retry_task=RetryTasks.NO_ACTION_REQUIRED,
retry_msec=RETRY_MSEC_DEFAULT
):
"""Creates a new FollowOnProcessingStatusDTO.
:param status: Status for cert order
:param status_message: Message to explain status type.
:param retry_msec: Number of milliseconds to wait for retry
:param retry_task: Task to retry, one of :class:`RetryTasks`
"""
self.status = status
self.status_message = status_message
self.retry_task = retry_task
if not retry_msec:
self.retry_msec = 0
else:
self.retry_msec = max(int(retry_msec), 0)
def is_follow_on_needed(self):
if self.retry_task:
return RetryTasks.NO_ACTION_REQUIRED != self.retry_task
else:
return False

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013-2014 Rackspace, Inc.
# Copyright (c) 2013-2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -32,34 +32,6 @@ from barbican.tasks import certificate_resources as cert
LOG = utils.getLogger(__name__)
RETRY_MSEC = 60 * 1000
class FollowOnProcessingStatusDTO(object):
"""Follow On Processing status data transfer object (DTO).
An object of this type is optionally returned by the
BaseTask.handle_processing() method defined below, and is used to guide
follow on processing and to provide status feedback to clients.
"""
def __init__(self, status=u._('Unknown'), status_message=u._('Unknown'),
retry_method=None, retry_msec=RETRY_MSEC):
"""Creates a new FollowOnProcessingStatusDTO.
:param status: Status for cert order
:param status_message: Message to explain status type.
:param retry_method: Method to retry
:param retry_msec: Number of milliseconds to wait for retry
"""
self.status = status
self.status_message = status_message
self.retry_method = retry_method
self.retry_msec = int(retry_msec)
def is_follow_on_needed(self):
return self.retry_method
@six.add_metaclass(abc.ABCMeta)
class BaseTask(object):
"""Base asynchronous task."""
@ -81,9 +53,13 @@ class BaseTask(object):
:param args: List of arguments passed in from the client.
:param kwargs: Dict of arguments passed in from the client.
:return: None
:return: Returns :class:`FollowOnProcessingStatusDTO` if follow-on
processing (such as retrying this or another task) is
required, otherwise a None return indicates that no
follow-on processing is required.
"""
name = self.get_name()
result = None
# Retrieve the target entity (such as an models.Order instance).
try:
@ -121,6 +97,8 @@ class BaseTask(object):
"executing task '%s'."), name)
raise e
return result
@abc.abstractmethod
def retrieve_entity(self, *args, **kwargs):
"""A hook method to retrieve an entity for processing.
@ -216,6 +194,8 @@ class _OrderTaskHelper(object):
if not is_follow_on_needed:
order.status = models.States.ACTIVE
else:
order.status = models.States.PENDING
if sub_status:
order.set_sub_status_safely(sub_status)

View File

@ -349,4 +349,4 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
return '/cas?limit={0}&offset={1}'.format(
limit, offset)
else:
return '/cas'
return '/cas'

View File

@ -23,7 +23,7 @@ from barbican.tests import utils
containers_repo = repositories.get_container_repository()
class SuccessfulContainerCreateMixin():
class SuccessfulContainerCreateMixin(object):
def _assert_successful_container_create(self, resp, container_uuid):
self.assertEqual(201, resp.status_int)
# this will raise if the container uuid is not proper

View File

@ -418,4 +418,4 @@ class WhenTestingPreferredCARepo(database_utils.RepositoryTestCase):
ca = self._add_ca(self.parsed_ca, session)
session.commit()
self.ca_repo.update_entity(ca, self.parsed_modified_ca, session)
self.ca_repo.update_entity(ca, self.parsed_modified_ca, session)

View File

@ -16,6 +16,7 @@ import mock
from barbican import queue
from barbican.queue import server
from barbican.tasks import common
from barbican.tests import utils
@ -100,6 +101,109 @@ class WhenUsingTransactionalDecorator(utils.BaseTestCase):
self.assertEqual(self.clear_mock.call_count, 1)
class WhenUsingRetryableDecorator(utils.BaseTestCase):
"""Test using the 'retryable' decorator in server.py."""
def setUp(self):
super(WhenUsingRetryableDecorator, self).setUp()
self.schedule_retry_tasks_patcher = mock.patch(
'barbican.queue.server.schedule_retry_tasks'
)
self.schedule_retry_tasks_mock = (
self.schedule_retry_tasks_patcher.start()
)
self.args = ('foo', 'bar')
self.kwargs = {'k_foo': 1, 'k_bar': 2}
# Class/decorator under test.
class TestClass(object):
my_args = None
my_kwargs = None
is_exception_needed = False
result = common.FollowOnProcessingStatusDTO()
@server.retryable
def test_method(self, *args, **kwargs):
if self.is_exception_needed:
raise ValueError()
self.my_args = args
self.my_kwargs = kwargs
return self.result
self.test_object = TestClass()
self.test_method = TestClass.test_method
def tearDown(self):
super(WhenUsingRetryableDecorator, self).tearDown()
self.schedule_retry_tasks_patcher.stop()
def test_should_successfully_schedule_a_task_for_retry(self):
self.test_object.test_method(*self.args, **self.kwargs)
self.assertEqual(self.args, self.test_object.my_args)
self.assertEqual(self.kwargs, self.test_object.my_kwargs)
self.assertEqual(self.schedule_retry_tasks_mock.call_count, 1)
self.schedule_retry_tasks_mock.assert_called_with(
mock.ANY,
self.test_object.result,
*self.args,
**self.kwargs)
def test_retry_should_not_be_scheduled_if_exception_is_raised(self):
self.test_object.is_exception_needed = True
self.assertRaises(
ValueError,
self.test_object.test_method,
self.args,
self.kwargs,
)
self.assertEqual(self.schedule_retry_tasks_mock.call_count, 0)
class WhenCallingScheduleRetryTasks(utils.BaseTestCase):
"""Test calling schedule_retry_tasks() in server.py."""
def setUp(self):
super(WhenCallingScheduleRetryTasks, self).setUp()
self.result = common.FollowOnProcessingStatusDTO()
def test_should_not_schedule_task_due_to_no_result(self):
retry_rpc_method = server.schedule_retry_tasks(None, None)
self.assertIsNone(retry_rpc_method)
def test_should_not_schedule_task_due_to_no_action_required_result(self):
self.result.retry_task = common.RetryTasks.NO_ACTION_REQUIRED
retry_rpc_method = server.schedule_retry_tasks(None, self.result)
self.assertIsNone(retry_rpc_method)
def test_should_schedule_invoking_task_for_retry(self):
self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK
retry_rpc_method = server.schedule_retry_tasks(
self.test_should_schedule_invoking_task_for_retry, self.result)
self.assertEqual(
'test_should_schedule_invoking_task_for_retry', retry_rpc_method)
def test_should_schedule_certificate_status_task_for_retry(self):
self.result.retry_task = (
common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK
)
retry_rpc_method = server.schedule_retry_tasks(None, self.result)
self.assertEqual(
'check_certificate_status', retry_rpc_method)
class WhenUsingBeginTypeOrderTask(utils.BaseTestCase):
"""Test using the Tasks class for 'type order' task."""

View File

@ -0,0 +1,45 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from barbican import i18n as u
from barbican.tasks import common
from barbican.tests import utils
class WhenUsingFollowOnProcessingStatusDTO(utils.BaseTestCase):
"""Test using the :class:`WhenUsingFollowOnProcessingStatusDTO` class."""
def setUp(self):
super(WhenUsingFollowOnProcessingStatusDTO, self).setUp()
self.target = common.FollowOnProcessingStatusDTO()
def test_should_have_expected_defaults(self):
self.assertEqual(
common.RetryTasks.NO_ACTION_REQUIRED, self.target.retry_task)
self.assertEqual(u._('Unknown'), self.target.status)
self.assertEqual(u._('Unknown'), self.target.status_message)
self.assertEqual(common.RETRY_MSEC_DEFAULT, self.target.retry_msec)
self.assertEqual(False, self.target.is_follow_on_needed())
def test_should_indicate_no_follow_on_with_no_retry_task(self):
self.target.retry_task = None
self.assertEqual(False, self.target.is_follow_on_needed())
def test_should_indicate_follow_on_when_retry_task_provided(self):
self.target.retry_task = common.RetryTasks.INVOKE_SAME_TASK
self.assertEqual(True, self.target.is_follow_on_needed())

View File

@ -1,3 +1,5 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -16,6 +18,7 @@ import mock
from barbican import i18n as u
from barbican.model import models
from barbican.openstack.common import timeutils
from barbican.tasks import common
from barbican.tasks import resources
from barbican.tests import utils
@ -91,7 +94,7 @@ class WhenUsingOrderTaskHelper(BaseOrderTestCase):
def setUp(self):
super(WhenUsingOrderTaskHelper, self).setUp()
self.result = resources.FollowOnProcessingStatusDTO()
self.result = common.FollowOnProcessingStatusDTO()
self.helper = resources._OrderTaskHelper()
@ -130,13 +133,13 @@ class WhenUsingOrderTaskHelper(BaseOrderTestCase):
self.order_repo.save.assert_called_once_with(self.order)
def test_should_handle_success_result_follow_on_needed(self):
self.result.retry_method = 'bogus_method_here'
self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK
self.result.status = 'status'
self.result.status_message = 'status_message'
self.helper.handle_success(self.order, self.result)
self.assertNotEqual(models.States.ACTIVE, self.order.status)
self.assertEqual(models.States.PENDING, self.order.status)
self.assertEqual('status', self.order.sub_status)
self.assertEqual('status_message', self.order.sub_status_message)
self.order_repo.save.assert_called_once_with(self.order)