diff --git a/barbican/queue/server.py b/barbican/queue/server.py index 07d1cf3c1..0b0666385 100644 --- a/barbican/queue/server.py +++ b/barbican/queue/server.py @@ -32,6 +32,7 @@ from barbican import i18n as u from barbican.model import repositories from barbican.openstack.common import service from barbican import queue +from barbican.tasks import common from barbican.tasks import resources if newrelic_loaded: @@ -42,6 +43,29 @@ LOG = utils.getLogger(__name__) CONF = cfg.CONF +# Maps the common/shared RetryTasks (returned from lower-level business logic +# and plugin processing) to top-level RPC tasks in the Tasks class below. +MAP_RETRY_TASKS = { + common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK: 'check_certificate_status' +} + + +def retryable(fn): + """Provides retry/scheduling support to tasks.""" + + @functools.wraps(fn) + def wrapper(method_self, *args, **kwargs): + result = fn(method_self, *args, **kwargs) + retry_rpc_method = schedule_retry_tasks( + fn, result, *args, **kwargs) + if retry_rpc_method: + LOG.info( + u._LI("Scheduled RPC method for retry: '%s'"), + retry_rpc_method) + + return wrapper + + def transactional(fn): """Provides request-scoped database transaction support to tasks.""" @@ -50,7 +74,8 @@ def transactional(fn): fn_name = getattr(fn, '__name__', '????') if not queue.is_server_side(): - fn(*args, **kwargs) # Non-server mode directly invokes tasks. + # Non-server mode directly invokes tasks. + fn(*args, **kwargs) LOG.info(u._LI("Completed worker task: '%s'"), fn_name) else: # Manage session/transaction. @@ -106,6 +131,41 @@ def monitored(fn): # pragma: no cover return fn +def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs): + """Schedules a task for retry. + + :param invoked_task: The RPC method that was just invoked. + :param retry_result: A :class:`FollowOnProcessingStatusDTO` if follow-on + processing (such as retrying this or another task) is + required, otherwise None indicates no such follow-on + processing is required. + :param args: List of arguments passed in to the just-invoked task. + :param kwargs: Dict of arguments passed in to the just-invoked task. + :return: Returns the RPC task method scheduled for a retry, None if no RPC + task was scheduled. + """ + + retry_rpc_method = None + + if not retry_result: + pass + + elif common.RetryTasks.INVOKE_SAME_TASK == retry_result.retry_task: + if invoked_task: + retry_rpc_method = getattr( + invoked_task, '__name__', None) + + else: + retry_rpc_method = MAP_RETRY_TASKS.get(retry_result.retry_task) + + if retry_rpc_method: + # TODO(john-wood-w) Add retry task to the retry table here. + LOG.debug( + 'Scheduling RPC method for retry: {0}'.format(retry_rpc_method)) + + return retry_rpc_method + + class Tasks(object): """Tasks that can be invoked asynchronously in Barbican. @@ -113,6 +173,9 @@ class Tasks(object): called directly from the client side for non-asynchronous standalone single-node operation. + If a new method is added that can be retried, please also add its method + name to MAP_RETRY_TASKS above. + The TaskServer class below extends this class to implement a worker-side server utilizing Oslo messaging's RPC server. This RPC server can invoke methods on itself, which include the methods in this class. @@ -120,23 +183,26 @@ class Tasks(object): @monitored @transactional + @retryable def process_type_order(self, context, order_id, project_id): """Process TypeOrder.""" LOG.info( u._LI("Processing type order: order ID is '%s'"), order_id ) - resources.BeginTypeOrder().process(order_id, project_id) + return resources.BeginTypeOrder().process(order_id, project_id) @monitored @transactional + @retryable def update_order(self, context, order_id, project_id, updated_meta): """Update Order.""" LOG.info( u._LI("Processing update order: order ID is '%s'"), order_id ) - resources.UpdateOrder().process(order_id, project_id, updated_meta) + return resources.UpdateOrder().process( + order_id, project_id, updated_meta) class TaskServer(Tasks, service.Service): diff --git a/barbican/tasks/common.py b/barbican/tasks/common.py new file mode 100644 index 000000000..4c60fcc3f --- /dev/null +++ b/barbican/tasks/common.py @@ -0,0 +1,88 @@ +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tasking related information that is shared/common across modules. +""" +from barbican import i18n as u + + +RETRY_MSEC_DEFAULT = 60 * 1000 + + +class RetryTasks(object): + """Defines tasks that can be retried/scheduled. + + RPC tasks invoked from the queue are handled via methods on + barbican.queue.server.Tasks. These calls in turn delegate to the + 'process()' method of BaseTask sub-classes. These calls in turn delegate + to business logic and plugins via modules in this barbican.tasks package. + This class defines a common mechanism for the business logic and plugins + to indicate what RPC tasks need to be retried in a way that the Tasks + class can interpret as high level RPC tasks to enqueue later. + + In particular the following generic options are available: + + INVOKE_SAME_TASK - Invoke this same task later + + NO_ACTION_REQUIRED - To retry/scheduling actions are required + + + The following task/context-specific actions are available: + + INVOKE_CERT_STATUS_CHECK_TASK - Check certificate status later + + """ + + INVOKE_SAME_TASK = "Invoke Same Task Again Later" + NO_ACTION_REQUIRED = "No Retry/Schedule Actions Are Needed" + INVOKE_CERT_STATUS_CHECK_TASK = "Check Certificate Status Later" + + +class FollowOnProcessingStatusDTO(object): + """Follow On Processing status data transfer object (DTO). + + An object of this type is optionally returned by the + BaseTask.handle_processing() method defined below, and is used to guide + follow on processing and to provide status feedback to clients. + """ + def __init__( + self, + status=u._('Unknown'), + status_message=u._('Unknown'), + retry_task=RetryTasks.NO_ACTION_REQUIRED, + retry_msec=RETRY_MSEC_DEFAULT + ): + """Creates a new FollowOnProcessingStatusDTO. + + :param status: Status for cert order + :param status_message: Message to explain status type. + :param retry_msec: Number of milliseconds to wait for retry + :param retry_task: Task to retry, one of :class:`RetryTasks` + """ + self.status = status + self.status_message = status_message + self.retry_task = retry_task + + if not retry_msec: + self.retry_msec = 0 + else: + self.retry_msec = max(int(retry_msec), 0) + + def is_follow_on_needed(self): + if self.retry_task: + return RetryTasks.NO_ACTION_REQUIRED != self.retry_task + else: + return False diff --git a/barbican/tasks/resources.py b/barbican/tasks/resources.py index 471e72d68..0e333ddba 100644 --- a/barbican/tasks/resources.py +++ b/barbican/tasks/resources.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2014 Rackspace, Inc. +# Copyright (c) 2013-2015 Rackspace, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,34 +32,6 @@ from barbican.tasks import certificate_resources as cert LOG = utils.getLogger(__name__) -RETRY_MSEC = 60 * 1000 - - -class FollowOnProcessingStatusDTO(object): - """Follow On Processing status data transfer object (DTO). - - An object of this type is optionally returned by the - BaseTask.handle_processing() method defined below, and is used to guide - follow on processing and to provide status feedback to clients. - """ - def __init__(self, status=u._('Unknown'), status_message=u._('Unknown'), - retry_method=None, retry_msec=RETRY_MSEC): - """Creates a new FollowOnProcessingStatusDTO. - - :param status: Status for cert order - :param status_message: Message to explain status type. - :param retry_method: Method to retry - :param retry_msec: Number of milliseconds to wait for retry - """ - self.status = status - self.status_message = status_message - self.retry_method = retry_method - self.retry_msec = int(retry_msec) - - def is_follow_on_needed(self): - return self.retry_method - - @six.add_metaclass(abc.ABCMeta) class BaseTask(object): """Base asynchronous task.""" @@ -81,9 +53,13 @@ class BaseTask(object): :param args: List of arguments passed in from the client. :param kwargs: Dict of arguments passed in from the client. - :return: None + :return: Returns :class:`FollowOnProcessingStatusDTO` if follow-on + processing (such as retrying this or another task) is + required, otherwise a None return indicates that no + follow-on processing is required. """ name = self.get_name() + result = None # Retrieve the target entity (such as an models.Order instance). try: @@ -121,6 +97,8 @@ class BaseTask(object): "executing task '%s'."), name) raise e + return result + @abc.abstractmethod def retrieve_entity(self, *args, **kwargs): """A hook method to retrieve an entity for processing. @@ -216,6 +194,8 @@ class _OrderTaskHelper(object): if not is_follow_on_needed: order.status = models.States.ACTIVE + else: + order.status = models.States.PENDING if sub_status: order.set_sub_status_safely(sub_status) diff --git a/barbican/tests/api/controllers/test_cas.py b/barbican/tests/api/controllers/test_cas.py index c9e2eb2d3..1be7914b6 100644 --- a/barbican/tests/api/controllers/test_cas.py +++ b/barbican/tests/api/controllers/test_cas.py @@ -349,4 +349,4 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase): return '/cas?limit={0}&offset={1}'.format( limit, offset) else: - return '/cas' \ No newline at end of file + return '/cas' diff --git a/barbican/tests/api/controllers/test_containers.py b/barbican/tests/api/controllers/test_containers.py index 2b4fb5ea5..e394f7e2d 100644 --- a/barbican/tests/api/controllers/test_containers.py +++ b/barbican/tests/api/controllers/test_containers.py @@ -23,7 +23,7 @@ from barbican.tests import utils containers_repo = repositories.get_container_repository() -class SuccessfulContainerCreateMixin(): +class SuccessfulContainerCreateMixin(object): def _assert_successful_container_create(self, resp, container_uuid): self.assertEqual(201, resp.status_int) # this will raise if the container uuid is not proper diff --git a/barbican/tests/model/repositories/test_repositories_certificate_authorities.py b/barbican/tests/model/repositories/test_repositories_certificate_authorities.py index 1041a9d2a..18f696113 100644 --- a/barbican/tests/model/repositories/test_repositories_certificate_authorities.py +++ b/barbican/tests/model/repositories/test_repositories_certificate_authorities.py @@ -418,4 +418,4 @@ class WhenTestingPreferredCARepo(database_utils.RepositoryTestCase): ca = self._add_ca(self.parsed_ca, session) session.commit() - self.ca_repo.update_entity(ca, self.parsed_modified_ca, session) \ No newline at end of file + self.ca_repo.update_entity(ca, self.parsed_modified_ca, session) diff --git a/barbican/tests/queue/test_server.py b/barbican/tests/queue/test_server.py index 8fe27cc01..b3da68a89 100644 --- a/barbican/tests/queue/test_server.py +++ b/barbican/tests/queue/test_server.py @@ -16,6 +16,7 @@ import mock from barbican import queue from barbican.queue import server +from barbican.tasks import common from barbican.tests import utils @@ -100,6 +101,109 @@ class WhenUsingTransactionalDecorator(utils.BaseTestCase): self.assertEqual(self.clear_mock.call_count, 1) +class WhenUsingRetryableDecorator(utils.BaseTestCase): + """Test using the 'retryable' decorator in server.py.""" + + def setUp(self): + super(WhenUsingRetryableDecorator, self).setUp() + + self.schedule_retry_tasks_patcher = mock.patch( + 'barbican.queue.server.schedule_retry_tasks' + ) + self.schedule_retry_tasks_mock = ( + self.schedule_retry_tasks_patcher.start() + ) + + self.args = ('foo', 'bar') + self.kwargs = {'k_foo': 1, 'k_bar': 2} + + # Class/decorator under test. + class TestClass(object): + my_args = None + my_kwargs = None + is_exception_needed = False + result = common.FollowOnProcessingStatusDTO() + + @server.retryable + def test_method(self, *args, **kwargs): + if self.is_exception_needed: + raise ValueError() + self.my_args = args + self.my_kwargs = kwargs + return self.result + self.test_object = TestClass() + self.test_method = TestClass.test_method + + def tearDown(self): + super(WhenUsingRetryableDecorator, self).tearDown() + self.schedule_retry_tasks_patcher.stop() + + def test_should_successfully_schedule_a_task_for_retry(self): + self.test_object.test_method(*self.args, **self.kwargs) + + self.assertEqual(self.args, self.test_object.my_args) + self.assertEqual(self.kwargs, self.test_object.my_kwargs) + + self.assertEqual(self.schedule_retry_tasks_mock.call_count, 1) + self.schedule_retry_tasks_mock.assert_called_with( + mock.ANY, + self.test_object.result, + *self.args, + **self.kwargs) + + def test_retry_should_not_be_scheduled_if_exception_is_raised(self): + self.test_object.is_exception_needed = True + + self.assertRaises( + ValueError, + self.test_object.test_method, + self.args, + self.kwargs, + ) + + self.assertEqual(self.schedule_retry_tasks_mock.call_count, 0) + + +class WhenCallingScheduleRetryTasks(utils.BaseTestCase): + """Test calling schedule_retry_tasks() in server.py.""" + + def setUp(self): + super(WhenCallingScheduleRetryTasks, self).setUp() + + self.result = common.FollowOnProcessingStatusDTO() + + def test_should_not_schedule_task_due_to_no_result(self): + retry_rpc_method = server.schedule_retry_tasks(None, None) + + self.assertIsNone(retry_rpc_method) + + def test_should_not_schedule_task_due_to_no_action_required_result(self): + self.result.retry_task = common.RetryTasks.NO_ACTION_REQUIRED + + retry_rpc_method = server.schedule_retry_tasks(None, self.result) + + self.assertIsNone(retry_rpc_method) + + def test_should_schedule_invoking_task_for_retry(self): + self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK + + retry_rpc_method = server.schedule_retry_tasks( + self.test_should_schedule_invoking_task_for_retry, self.result) + + self.assertEqual( + 'test_should_schedule_invoking_task_for_retry', retry_rpc_method) + + def test_should_schedule_certificate_status_task_for_retry(self): + self.result.retry_task = ( + common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK + ) + + retry_rpc_method = server.schedule_retry_tasks(None, self.result) + + self.assertEqual( + 'check_certificate_status', retry_rpc_method) + + class WhenUsingBeginTypeOrderTask(utils.BaseTestCase): """Test using the Tasks class for 'type order' task.""" diff --git a/barbican/tests/tasks/test_common.py b/barbican/tests/tasks/test_common.py new file mode 100644 index 000000000..243736bb3 --- /dev/null +++ b/barbican/tests/tasks/test_common.py @@ -0,0 +1,45 @@ +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from barbican import i18n as u +from barbican.tasks import common +from barbican.tests import utils + + +class WhenUsingFollowOnProcessingStatusDTO(utils.BaseTestCase): + """Test using the :class:`WhenUsingFollowOnProcessingStatusDTO` class.""" + + def setUp(self): + super(WhenUsingFollowOnProcessingStatusDTO, self).setUp() + + self.target = common.FollowOnProcessingStatusDTO() + + def test_should_have_expected_defaults(self): + self.assertEqual( + common.RetryTasks.NO_ACTION_REQUIRED, self.target.retry_task) + self.assertEqual(u._('Unknown'), self.target.status) + self.assertEqual(u._('Unknown'), self.target.status_message) + self.assertEqual(common.RETRY_MSEC_DEFAULT, self.target.retry_msec) + self.assertEqual(False, self.target.is_follow_on_needed()) + + def test_should_indicate_no_follow_on_with_no_retry_task(self): + self.target.retry_task = None + + self.assertEqual(False, self.target.is_follow_on_needed()) + + def test_should_indicate_follow_on_when_retry_task_provided(self): + self.target.retry_task = common.RetryTasks.INVOKE_SAME_TASK + + self.assertEqual(True, self.target.is_follow_on_needed()) diff --git a/barbican/tests/tasks/test_resources.py b/barbican/tests/tasks/test_resources.py index 8fe703497..adf29e7d3 100644 --- a/barbican/tests/tasks/test_resources.py +++ b/barbican/tests/tasks/test_resources.py @@ -1,3 +1,5 @@ +# Copyright (c) 2015 Rackspace, Inc. +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,6 +18,7 @@ import mock from barbican import i18n as u from barbican.model import models from barbican.openstack.common import timeutils +from barbican.tasks import common from barbican.tasks import resources from barbican.tests import utils @@ -91,7 +94,7 @@ class WhenUsingOrderTaskHelper(BaseOrderTestCase): def setUp(self): super(WhenUsingOrderTaskHelper, self).setUp() - self.result = resources.FollowOnProcessingStatusDTO() + self.result = common.FollowOnProcessingStatusDTO() self.helper = resources._OrderTaskHelper() @@ -130,13 +133,13 @@ class WhenUsingOrderTaskHelper(BaseOrderTestCase): self.order_repo.save.assert_called_once_with(self.order) def test_should_handle_success_result_follow_on_needed(self): - self.result.retry_method = 'bogus_method_here' + self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK self.result.status = 'status' self.result.status_message = 'status_message' self.helper.handle_success(self.order, self.result) - self.assertNotEqual(models.States.ACTIVE, self.order.status) + self.assertEqual(models.States.PENDING, self.order.status) self.assertEqual('status', self.order.sub_status) self.assertEqual('status_message', self.order.sub_status_message) self.order_repo.save.assert_called_once_with(self.order)