From a433db2da6a37676320d3868488161880a126812 Mon Sep 17 00:00:00 2001 From: jfwood Date: Wed, 18 Mar 2015 15:53:05 -0500 Subject: [PATCH] Allow business logic and plugins to retry tasks Add a new common.py module to the barbican/tasks application to define a concept for retry tasks that is common between the the barbican.queue.server.py logic (that executes RPC Task methods from the queue) and the lower level business logic and plugin eventually invoked by these top-level RPC Task methods. A follow on CR will begin to populate the retry database table with task retry entries, for the retry scheduler period process to schedule later. Change-Id: I2214b2ead65ea78b3baf6cb2463711d2dec22493 Implements: blueprint add-worker-retry-update-support --- barbican/queue/server.py | 72 +++++++++++- barbican/tasks/common.py | 88 +++++++++++++++ barbican/tasks/resources.py | 40 ++----- barbican/tests/api/controllers/test_cas.py | 2 +- .../tests/api/controllers/test_containers.py | 2 +- ...st_repositories_certificate_authorities.py | 2 +- barbican/tests/queue/test_server.py | 104 ++++++++++++++++++ barbican/tests/tasks/test_common.py | 45 ++++++++ barbican/tests/tasks/test_resources.py | 9 +- 9 files changed, 325 insertions(+), 39 deletions(-) create mode 100644 barbican/tasks/common.py create mode 100644 barbican/tests/tasks/test_common.py diff --git a/barbican/queue/server.py b/barbican/queue/server.py index 07d1cf3c1..0b0666385 100644 --- a/barbican/queue/server.py +++ b/barbican/queue/server.py @@ -32,6 +32,7 @@ from barbican import i18n as u from barbican.model import repositories from barbican.openstack.common import service from barbican import queue +from barbican.tasks import common from barbican.tasks import resources if newrelic_loaded: @@ -42,6 +43,29 @@ LOG = utils.getLogger(__name__) CONF = cfg.CONF +# Maps the common/shared RetryTasks (returned from lower-level business logic +# and plugin processing) to top-level RPC tasks in the Tasks class below. +MAP_RETRY_TASKS = { + common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK: 'check_certificate_status' +} + + +def retryable(fn): + """Provides retry/scheduling support to tasks.""" + + @functools.wraps(fn) + def wrapper(method_self, *args, **kwargs): + result = fn(method_self, *args, **kwargs) + retry_rpc_method = schedule_retry_tasks( + fn, result, *args, **kwargs) + if retry_rpc_method: + LOG.info( + u._LI("Scheduled RPC method for retry: '%s'"), + retry_rpc_method) + + return wrapper + + def transactional(fn): """Provides request-scoped database transaction support to tasks.""" @@ -50,7 +74,8 @@ def transactional(fn): fn_name = getattr(fn, '__name__', '????') if not queue.is_server_side(): - fn(*args, **kwargs) # Non-server mode directly invokes tasks. + # Non-server mode directly invokes tasks. + fn(*args, **kwargs) LOG.info(u._LI("Completed worker task: '%s'"), fn_name) else: # Manage session/transaction. @@ -106,6 +131,41 @@ def monitored(fn): # pragma: no cover return fn +def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs): + """Schedules a task for retry. + + :param invoked_task: The RPC method that was just invoked. + :param retry_result: A :class:`FollowOnProcessingStatusDTO` if follow-on + processing (such as retrying this or another task) is + required, otherwise None indicates no such follow-on + processing is required. + :param args: List of arguments passed in to the just-invoked task. + :param kwargs: Dict of arguments passed in to the just-invoked task. + :return: Returns the RPC task method scheduled for a retry, None if no RPC + task was scheduled. + """ + + retry_rpc_method = None + + if not retry_result: + pass + + elif common.RetryTasks.INVOKE_SAME_TASK == retry_result.retry_task: + if invoked_task: + retry_rpc_method = getattr( + invoked_task, '__name__', None) + + else: + retry_rpc_method = MAP_RETRY_TASKS.get(retry_result.retry_task) + + if retry_rpc_method: + # TODO(john-wood-w) Add retry task to the retry table here. + LOG.debug( + 'Scheduling RPC method for retry: {0}'.format(retry_rpc_method)) + + return retry_rpc_method + + class Tasks(object): """Tasks that can be invoked asynchronously in Barbican. @@ -113,6 +173,9 @@ class Tasks(object): called directly from the client side for non-asynchronous standalone single-node operation. + If a new method is added that can be retried, please also add its method + name to MAP_RETRY_TASKS above. + The TaskServer class below extends this class to implement a worker-side server utilizing Oslo messaging's RPC server. This RPC server can invoke methods on itself, which include the methods in this class. @@ -120,23 +183,26 @@ class Tasks(object): @monitored @transactional + @retryable def process_type_order(self, context, order_id, project_id): """Process TypeOrder.""" LOG.info( u._LI("Processing type order: order ID is '%s'"), order_id ) - resources.BeginTypeOrder().process(order_id, project_id) + return resources.BeginTypeOrder().process(order_id, project_id) @monitored @transactional + @retryable def update_order(self, context, order_id, project_id, updated_meta): """Update Order.""" LOG.info( u._LI("Processing update order: order ID is '%s'"), order_id ) - resources.UpdateOrder().process(order_id, project_id, updated_meta) + return resources.UpdateOrder().process( + order_id, project_id, updated_meta) class TaskServer(Tasks, service.Service): diff --git a/barbican/tasks/common.py b/barbican/tasks/common.py new file mode 100644 index 000000000..4c60fcc3f --- /dev/null +++ b/barbican/tasks/common.py @@ -0,0 +1,88 @@ +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tasking related information that is shared/common across modules. +""" +from barbican import i18n as u + + +RETRY_MSEC_DEFAULT = 60 * 1000 + + +class RetryTasks(object): + """Defines tasks that can be retried/scheduled. + + RPC tasks invoked from the queue are handled via methods on + barbican.queue.server.Tasks. These calls in turn delegate to the + 'process()' method of BaseTask sub-classes. These calls in turn delegate + to business logic and plugins via modules in this barbican.tasks package. + This class defines a common mechanism for the business logic and plugins + to indicate what RPC tasks need to be retried in a way that the Tasks + class can interpret as high level RPC tasks to enqueue later. + + In particular the following generic options are available: + + INVOKE_SAME_TASK - Invoke this same task later + + NO_ACTION_REQUIRED - To retry/scheduling actions are required + + + The following task/context-specific actions are available: + + INVOKE_CERT_STATUS_CHECK_TASK - Check certificate status later + + """ + + INVOKE_SAME_TASK = "Invoke Same Task Again Later" + NO_ACTION_REQUIRED = "No Retry/Schedule Actions Are Needed" + INVOKE_CERT_STATUS_CHECK_TASK = "Check Certificate Status Later" + + +class FollowOnProcessingStatusDTO(object): + """Follow On Processing status data transfer object (DTO). + + An object of this type is optionally returned by the + BaseTask.handle_processing() method defined below, and is used to guide + follow on processing and to provide status feedback to clients. + """ + def __init__( + self, + status=u._('Unknown'), + status_message=u._('Unknown'), + retry_task=RetryTasks.NO_ACTION_REQUIRED, + retry_msec=RETRY_MSEC_DEFAULT + ): + """Creates a new FollowOnProcessingStatusDTO. + + :param status: Status for cert order + :param status_message: Message to explain status type. + :param retry_msec: Number of milliseconds to wait for retry + :param retry_task: Task to retry, one of :class:`RetryTasks` + """ + self.status = status + self.status_message = status_message + self.retry_task = retry_task + + if not retry_msec: + self.retry_msec = 0 + else: + self.retry_msec = max(int(retry_msec), 0) + + def is_follow_on_needed(self): + if self.retry_task: + return RetryTasks.NO_ACTION_REQUIRED != self.retry_task + else: + return False diff --git a/barbican/tasks/resources.py b/barbican/tasks/resources.py index 471e72d68..0e333ddba 100644 --- a/barbican/tasks/resources.py +++ b/barbican/tasks/resources.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2014 Rackspace, Inc. +# Copyright (c) 2013-2015 Rackspace, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,34 +32,6 @@ from barbican.tasks import certificate_resources as cert LOG = utils.getLogger(__name__) -RETRY_MSEC = 60 * 1000 - - -class FollowOnProcessingStatusDTO(object): - """Follow On Processing status data transfer object (DTO). - - An object of this type is optionally returned by the - BaseTask.handle_processing() method defined below, and is used to guide - follow on processing and to provide status feedback to clients. - """ - def __init__(self, status=u._('Unknown'), status_message=u._('Unknown'), - retry_method=None, retry_msec=RETRY_MSEC): - """Creates a new FollowOnProcessingStatusDTO. - - :param status: Status for cert order - :param status_message: Message to explain status type. - :param retry_method: Method to retry - :param retry_msec: Number of milliseconds to wait for retry - """ - self.status = status - self.status_message = status_message - self.retry_method = retry_method - self.retry_msec = int(retry_msec) - - def is_follow_on_needed(self): - return self.retry_method - - @six.add_metaclass(abc.ABCMeta) class BaseTask(object): """Base asynchronous task.""" @@ -81,9 +53,13 @@ class BaseTask(object): :param args: List of arguments passed in from the client. :param kwargs: Dict of arguments passed in from the client. - :return: None + :return: Returns :class:`FollowOnProcessingStatusDTO` if follow-on + processing (such as retrying this or another task) is + required, otherwise a None return indicates that no + follow-on processing is required. """ name = self.get_name() + result = None # Retrieve the target entity (such as an models.Order instance). try: @@ -121,6 +97,8 @@ class BaseTask(object): "executing task '%s'."), name) raise e + return result + @abc.abstractmethod def retrieve_entity(self, *args, **kwargs): """A hook method to retrieve an entity for processing. @@ -216,6 +194,8 @@ class _OrderTaskHelper(object): if not is_follow_on_needed: order.status = models.States.ACTIVE + else: + order.status = models.States.PENDING if sub_status: order.set_sub_status_safely(sub_status) diff --git a/barbican/tests/api/controllers/test_cas.py b/barbican/tests/api/controllers/test_cas.py index c9e2eb2d3..1be7914b6 100644 --- a/barbican/tests/api/controllers/test_cas.py +++ b/barbican/tests/api/controllers/test_cas.py @@ -349,4 +349,4 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase): return '/cas?limit={0}&offset={1}'.format( limit, offset) else: - return '/cas' \ No newline at end of file + return '/cas' diff --git a/barbican/tests/api/controllers/test_containers.py b/barbican/tests/api/controllers/test_containers.py index 2b4fb5ea5..e394f7e2d 100644 --- a/barbican/tests/api/controllers/test_containers.py +++ b/barbican/tests/api/controllers/test_containers.py @@ -23,7 +23,7 @@ from barbican.tests import utils containers_repo = repositories.get_container_repository() -class SuccessfulContainerCreateMixin(): +class SuccessfulContainerCreateMixin(object): def _assert_successful_container_create(self, resp, container_uuid): self.assertEqual(201, resp.status_int) # this will raise if the container uuid is not proper diff --git a/barbican/tests/model/repositories/test_repositories_certificate_authorities.py b/barbican/tests/model/repositories/test_repositories_certificate_authorities.py index 1041a9d2a..18f696113 100644 --- a/barbican/tests/model/repositories/test_repositories_certificate_authorities.py +++ b/barbican/tests/model/repositories/test_repositories_certificate_authorities.py @@ -418,4 +418,4 @@ class WhenTestingPreferredCARepo(database_utils.RepositoryTestCase): ca = self._add_ca(self.parsed_ca, session) session.commit() - self.ca_repo.update_entity(ca, self.parsed_modified_ca, session) \ No newline at end of file + self.ca_repo.update_entity(ca, self.parsed_modified_ca, session) diff --git a/barbican/tests/queue/test_server.py b/barbican/tests/queue/test_server.py index 8fe27cc01..b3da68a89 100644 --- a/barbican/tests/queue/test_server.py +++ b/barbican/tests/queue/test_server.py @@ -16,6 +16,7 @@ import mock from barbican import queue from barbican.queue import server +from barbican.tasks import common from barbican.tests import utils @@ -100,6 +101,109 @@ class WhenUsingTransactionalDecorator(utils.BaseTestCase): self.assertEqual(self.clear_mock.call_count, 1) +class WhenUsingRetryableDecorator(utils.BaseTestCase): + """Test using the 'retryable' decorator in server.py.""" + + def setUp(self): + super(WhenUsingRetryableDecorator, self).setUp() + + self.schedule_retry_tasks_patcher = mock.patch( + 'barbican.queue.server.schedule_retry_tasks' + ) + self.schedule_retry_tasks_mock = ( + self.schedule_retry_tasks_patcher.start() + ) + + self.args = ('foo', 'bar') + self.kwargs = {'k_foo': 1, 'k_bar': 2} + + # Class/decorator under test. + class TestClass(object): + my_args = None + my_kwargs = None + is_exception_needed = False + result = common.FollowOnProcessingStatusDTO() + + @server.retryable + def test_method(self, *args, **kwargs): + if self.is_exception_needed: + raise ValueError() + self.my_args = args + self.my_kwargs = kwargs + return self.result + self.test_object = TestClass() + self.test_method = TestClass.test_method + + def tearDown(self): + super(WhenUsingRetryableDecorator, self).tearDown() + self.schedule_retry_tasks_patcher.stop() + + def test_should_successfully_schedule_a_task_for_retry(self): + self.test_object.test_method(*self.args, **self.kwargs) + + self.assertEqual(self.args, self.test_object.my_args) + self.assertEqual(self.kwargs, self.test_object.my_kwargs) + + self.assertEqual(self.schedule_retry_tasks_mock.call_count, 1) + self.schedule_retry_tasks_mock.assert_called_with( + mock.ANY, + self.test_object.result, + *self.args, + **self.kwargs) + + def test_retry_should_not_be_scheduled_if_exception_is_raised(self): + self.test_object.is_exception_needed = True + + self.assertRaises( + ValueError, + self.test_object.test_method, + self.args, + self.kwargs, + ) + + self.assertEqual(self.schedule_retry_tasks_mock.call_count, 0) + + +class WhenCallingScheduleRetryTasks(utils.BaseTestCase): + """Test calling schedule_retry_tasks() in server.py.""" + + def setUp(self): + super(WhenCallingScheduleRetryTasks, self).setUp() + + self.result = common.FollowOnProcessingStatusDTO() + + def test_should_not_schedule_task_due_to_no_result(self): + retry_rpc_method = server.schedule_retry_tasks(None, None) + + self.assertIsNone(retry_rpc_method) + + def test_should_not_schedule_task_due_to_no_action_required_result(self): + self.result.retry_task = common.RetryTasks.NO_ACTION_REQUIRED + + retry_rpc_method = server.schedule_retry_tasks(None, self.result) + + self.assertIsNone(retry_rpc_method) + + def test_should_schedule_invoking_task_for_retry(self): + self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK + + retry_rpc_method = server.schedule_retry_tasks( + self.test_should_schedule_invoking_task_for_retry, self.result) + + self.assertEqual( + 'test_should_schedule_invoking_task_for_retry', retry_rpc_method) + + def test_should_schedule_certificate_status_task_for_retry(self): + self.result.retry_task = ( + common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK + ) + + retry_rpc_method = server.schedule_retry_tasks(None, self.result) + + self.assertEqual( + 'check_certificate_status', retry_rpc_method) + + class WhenUsingBeginTypeOrderTask(utils.BaseTestCase): """Test using the Tasks class for 'type order' task.""" diff --git a/barbican/tests/tasks/test_common.py b/barbican/tests/tasks/test_common.py new file mode 100644 index 000000000..243736bb3 --- /dev/null +++ b/barbican/tests/tasks/test_common.py @@ -0,0 +1,45 @@ +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from barbican import i18n as u +from barbican.tasks import common +from barbican.tests import utils + + +class WhenUsingFollowOnProcessingStatusDTO(utils.BaseTestCase): + """Test using the :class:`WhenUsingFollowOnProcessingStatusDTO` class.""" + + def setUp(self): + super(WhenUsingFollowOnProcessingStatusDTO, self).setUp() + + self.target = common.FollowOnProcessingStatusDTO() + + def test_should_have_expected_defaults(self): + self.assertEqual( + common.RetryTasks.NO_ACTION_REQUIRED, self.target.retry_task) + self.assertEqual(u._('Unknown'), self.target.status) + self.assertEqual(u._('Unknown'), self.target.status_message) + self.assertEqual(common.RETRY_MSEC_DEFAULT, self.target.retry_msec) + self.assertEqual(False, self.target.is_follow_on_needed()) + + def test_should_indicate_no_follow_on_with_no_retry_task(self): + self.target.retry_task = None + + self.assertEqual(False, self.target.is_follow_on_needed()) + + def test_should_indicate_follow_on_when_retry_task_provided(self): + self.target.retry_task = common.RetryTasks.INVOKE_SAME_TASK + + self.assertEqual(True, self.target.is_follow_on_needed()) diff --git a/barbican/tests/tasks/test_resources.py b/barbican/tests/tasks/test_resources.py index 8fe703497..adf29e7d3 100644 --- a/barbican/tests/tasks/test_resources.py +++ b/barbican/tests/tasks/test_resources.py @@ -1,3 +1,5 @@ +# Copyright (c) 2015 Rackspace, Inc. +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,6 +18,7 @@ import mock from barbican import i18n as u from barbican.model import models from barbican.openstack.common import timeutils +from barbican.tasks import common from barbican.tasks import resources from barbican.tests import utils @@ -91,7 +94,7 @@ class WhenUsingOrderTaskHelper(BaseOrderTestCase): def setUp(self): super(WhenUsingOrderTaskHelper, self).setUp() - self.result = resources.FollowOnProcessingStatusDTO() + self.result = common.FollowOnProcessingStatusDTO() self.helper = resources._OrderTaskHelper() @@ -130,13 +133,13 @@ class WhenUsingOrderTaskHelper(BaseOrderTestCase): self.order_repo.save.assert_called_once_with(self.order) def test_should_handle_success_result_follow_on_needed(self): - self.result.retry_method = 'bogus_method_here' + self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK self.result.status = 'status' self.result.status_message = 'status_message' self.helper.handle_success(self.order, self.result) - self.assertNotEqual(models.States.ACTIVE, self.order.status) + self.assertEqual(models.States.PENDING, self.order.status) self.assertEqual('status', self.order.sub_status) self.assertEqual('status_message', self.order.sub_status_message) self.order_repo.save.assert_called_once_with(self.order)