diff --git a/barbican/model/models.py b/barbican/model/models.py index af5651df4..8f5cc1f86 100644 --- a/barbican/model/models.py +++ b/barbican/model/models.py @@ -634,7 +634,7 @@ class OrderBarbicanMetadatum(BASE, SoftDeleteMixIn, ModelBase): 'value': self.value} -class OrderRetryTask(BASE, SoftDeleteMixIn): +class OrderRetryTask(BASE, SoftDeleteMixIn, ModelBase): __tablename__ = "order_retry_tasks" __table_args__ = {"mysql_engine": "InnoDB"} @@ -648,13 +648,10 @@ class OrderRetryTask(BASE, SoftDeleteMixIn): ) retry_task = sa.Column(sa.Text, nullable=False) retry_at = sa.Column(sa.DateTime, default=None, nullable=False) - retry_args = sa.Column(sa.Text, nullable=False) - retry_kwargs = sa.Column(sa.Text, nullable=False) + retry_args = sa.Column(JsonBlob(), nullable=False) + retry_kwargs = sa.Column(JsonBlob(), nullable=False) retry_count = sa.Column(sa.Integer, nullable=False, default=0) - def get_retry_params(self): - return json.loads(self.retry_args), json.loads(self.retry_kwargs) - class Container(BASE, SoftDeleteMixIn, ModelBase): """Represents a Container for Secrets in the datastore. diff --git a/barbican/model/repositories.py b/barbican/model/repositories.py index aa143ac06..123d048b1 100755 --- a/barbican/model/repositories.py +++ b/barbican/model/repositories.py @@ -56,6 +56,7 @@ _KEK_DATUM_REPOSITORY = None _ORDER_PLUGIN_META_REPOSITORY = None _ORDER_BARBICAN_META_REPOSITORY = None _ORDER_REPOSITORY = None +_ORDER_RETRY_TASK_REPOSITORY = None _PREFERRED_CA_REPOSITORY = None _PROJECT_REPOSITORY = None _PROJECT_CA_REPOSITORY = None @@ -538,6 +539,10 @@ class ProjectRepo(BaseRepo): """Sub-class hook: build a retrieve query.""" return session.query(models.Project).filter_by(id=entity_id) + def _do_validate(self, values): + """Sub-class hook: validate values.""" + pass + def find_by_external_project_id(self, external_project_id, suppress_exception=False, session=None): session = self.get_session(session) @@ -1008,6 +1013,71 @@ class OrderBarbicanMetadatumRepo(BaseRepo): pass +class OrderRetryTaskRepo(BaseRepo): + """Repository for the OrderRetryTask entity.""" + + def get_by_create_date( + self, only_at_or_before_this_date=None, + offset_arg=None, limit_arg=None, + suppress_exception=False, + session=None): + """Returns a list of order retry task entities + + The list is ordered by the date they were created at and paged + based on the offset and limit fields. + + :param only_at_or_before_this_date: If specified, only entities at or + before this date are returned. + :param offset_arg: The entity number where the query result should + start. + :param limit_arg: The maximum amount of entities in the result set. + :param suppress_exception: Whether NoResultFound exceptions should be + suppressed. + :param session: SQLAlchemy session object. + + :returns: Tuple consisting of (list_of_entities, offset, limit, total). + """ + + offset, limit = clean_paging_values(offset_arg, limit_arg) + + session = self.get_session(session) + + query = session.query(models.OrderRetryTask) + query = query.order_by(models.OrderRetryTask.created_at) + query = query.filter_by(deleted=False) + if only_at_or_before_this_date: + query = query.filter( + models.OrderRetryTask.retry_at <= only_at_or_before_this_date) + + start = offset + end = offset + limit + LOG.debug('Retrieving from %s to %s', start, end) + total = query.count() + entities = query[start:end] + LOG.debug('Number entities retrieved: %s out of %s', + len(entities), total + ) + + if total <= 0 and not suppress_exception: + _raise_no_entities_found(self._do_entity_name()) + + return entities, offset, limit, total + + def _do_entity_name(self): + """Sub-class hook: return entity name, such as for debugging.""" + return "OrderRetryTask" + + def _do_build_get_query(self, entity_id, external_project_id, session): + """Sub-class hook: build a retrieve query.""" + query = session.query(models.OrderRetryTask) + query = query.filter_by(id=entity_id, deleted=False) + return query + + def _do_validate(self, values): + """Sub-class hook: validate values.""" + pass + + class ContainerRepo(BaseRepo): """Repository for the Container entity.""" @@ -1591,6 +1661,12 @@ def get_order_repository(): return _get_repository(_ORDER_REPOSITORY, OrderRepo) +def get_order_retry_tasks_repository(): + """Returns a singleton OrderRetryTask repository instance.""" + global _ORDER_RETRY_TASK_REPOSITORY + return _get_repository(_ORDER_RETRY_TASK_REPOSITORY, OrderRetryTaskRepo) + + def get_preferred_ca_repository(): """Returns a singleton Secret repository instance.""" global _PREFERRED_CA_REPOSITORY diff --git a/barbican/queue/server.py b/barbican/queue/server.py index 0b0666385..1ff958631 100644 --- a/barbican/queue/server.py +++ b/barbican/queue/server.py @@ -16,6 +16,7 @@ """ Server-side (i.e. worker side) classes and logic. """ +import datetime import functools try: @@ -29,6 +30,7 @@ from oslo_config import cfg from barbican.common import utils from barbican import i18n as u +from barbican.model import models from barbican.model import repositories from barbican.openstack.common import service from barbican import queue @@ -50,13 +52,13 @@ MAP_RETRY_TASKS = { } -def retryable(fn): - """Provides retry/scheduling support to tasks.""" +def retryable_order(fn): + """Provides retry/scheduling support to Order-related tasks.""" @functools.wraps(fn) def wrapper(method_self, *args, **kwargs): result = fn(method_self, *args, **kwargs) - retry_rpc_method = schedule_retry_tasks( + retry_rpc_method = schedule_order_retry_tasks( fn, result, *args, **kwargs) if retry_rpc_method: LOG.info( @@ -131,14 +133,16 @@ def monitored(fn): # pragma: no cover return fn -def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs): - """Schedules a task for retry. +def schedule_order_retry_tasks( + invoked_task, retry_result, order_id, *args, **kwargs): + """Schedules an Order-related task for retry. :param invoked_task: The RPC method that was just invoked. :param retry_result: A :class:`FollowOnProcessingStatusDTO` if follow-on processing (such as retrying this or another task) is required, otherwise None indicates no such follow-on processing is required. + :param order_id: ID of the Order entity the task to retry is for. :param args: List of arguments passed in to the just-invoked task. :param kwargs: Dict of arguments passed in to the just-invoked task. :return: Returns the RPC task method scheduled for a retry, None if no RPC @@ -147,7 +151,7 @@ def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs): retry_rpc_method = None - if not retry_result: + if not retry_result or not order_id: pass elif common.RetryTasks.INVOKE_SAME_TASK == retry_result.retry_task: @@ -159,10 +163,23 @@ def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs): retry_rpc_method = MAP_RETRY_TASKS.get(retry_result.retry_task) if retry_rpc_method: - # TODO(john-wood-w) Add retry task to the retry table here. LOG.debug( 'Scheduling RPC method for retry: {0}'.format(retry_rpc_method)) + date_to_retry_at = datetime.datetime.now() + datetime.timedelta( + milliseconds=retry_result.retry_msec) + + retry_model = models.OrderRetryTask() + retry_model.order_id = order_id + retry_model.retry_task = retry_rpc_method + retry_model.retry_at = date_to_retry_at + retry_model.retry_args = args + retry_model.retry_kwargs = kwargs + retry_model.retry_count = 0 + + retry_repo = repositories.get_order_retry_tasks_repository() + retry_repo.create_from(retry_model) + return retry_rpc_method @@ -183,7 +200,7 @@ class Tasks(object): @monitored @transactional - @retryable + @retryable_order def process_type_order(self, context, order_id, project_id): """Process TypeOrder.""" LOG.info( @@ -194,7 +211,7 @@ class Tasks(object): @monitored @transactional - @retryable + @retryable_order def update_order(self, context, order_id, project_id, updated_meta): """Update Order.""" LOG.info( diff --git a/barbican/tests/database_utils.py b/barbican/tests/database_utils.py index fb4ba801d..4122d2e8d 100644 --- a/barbican/tests/database_utils.py +++ b/barbican/tests/database_utils.py @@ -19,6 +19,7 @@ break the DevStack functional test discovery process. """ import oslotest.base as oslotest +from barbican.model import models from barbican.model import repositories @@ -40,6 +41,26 @@ def in_memory_cleanup(): repositories.clear() +def get_session(): + return repositories.get_session() + + +def create_project(external_id="my keystone id", session=None): + project = models.Project() + project.external_id = external_id + project_repo = repositories.get_project_repository() + project_repo.create_from(project, session=session) + return project + + +def create_order(project, session=None): + order = models.Order() + order.project_id = project.id + order_repo = repositories.get_order_repository() + order_repo.create_from(order, session=session) + return order + + class RepositoryTestCase(oslotest.BaseTestCase): """Base test case class for in-memory database unit tests. diff --git a/barbican/tests/model/repositories/test_repositories_order_retry_tasks.py b/barbican/tests/model/repositories/test_repositories_order_retry_tasks.py new file mode 100644 index 000000000..c7473c64b --- /dev/null +++ b/barbican/tests/model/repositories/test_repositories_order_retry_tasks.py @@ -0,0 +1,132 @@ +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import time + +from oslo_config import cfg + +from barbican.common import exception +from barbican.model import models +from barbican.model import repositories +from barbican.tests import database_utils + + +class WhenTestingOrderRetryTaskRepository(database_utils.RepositoryTestCase): + + def setUp(self): + super(WhenTestingOrderRetryTaskRepository, self).setUp() + + self.date_time_now = datetime.datetime.now() + self.test_args = ['test', 'args'] + self.test_kwargs = {'test': 1, 'kwargs': 2} + + self.repo = repositories.OrderRetryTaskRepo() + self.order_repo = repositories.OrderRepo() + + def test_get_order_retry_task(self): + session = self.repo.get_session() + + order_retry_task = self._create_retry_task(session) + + order_retry_task_from_get = self.repo.get( + order_retry_task.id, + session=session, + ) + + self.assertEqual(order_retry_task.id, order_retry_task_from_get.id) + self.assertEqual( + self.date_time_now, order_retry_task_from_get.retry_at) + self.assertEqual(u'retry-task', order_retry_task_from_get.retry_task) + self.assertEqual(self.test_args, order_retry_task_from_get.retry_args) + self.assertEqual(self.test_kwargs, + order_retry_task_from_get.retry_kwargs) + + def test_get_order_retry_task_filtered_by_retry_time(self): + session = self.repo.get_session() + + future_seconds = 3 + date_time_future = ( + self.date_time_now + datetime.timedelta(seconds=future_seconds) + ) + + order_retry_task = self._create_retry_task( + session, retry_at=date_time_future) + + # A retrieve by the current time should return no entries, as the only + # retry record is set into the future. + entities, offset, limit, total = self.repo.get_by_create_date( + only_at_or_before_this_date=self.date_time_now, + session=session, + suppress_exception=True + ) + self.assertEqual(0, total) + self.assertEqual([], entities) + + # Wait until the future time is the current time. + time.sleep(2 * future_seconds) + + # Now, a retrieve by the current time should return our entry. + entities, offset, limit, total = self.repo.get_by_create_date( + only_at_or_before_this_date=datetime.datetime.now(), + session=session, + suppress_exception=True + ) + self.assertEqual(1, total) + + # Verify that retry task record is what we put in originally. + order_retry_task_from_get = entities[0] + self.assertEqual(order_retry_task.id, order_retry_task_from_get.id) + self.assertEqual(date_time_future, order_retry_task_from_get.retry_at) + self.assertEqual(u'retry-task', order_retry_task_from_get.retry_task) + self.assertEqual(self.test_args, order_retry_task_from_get.retry_args) + self.assertEqual(self.test_kwargs, + order_retry_task_from_get.retry_kwargs) + + def test_should_raise_no_result_found_no_exception(self): + session = self.repo.get_session() + + entities, offset, limit, total = self.repo.get_by_create_date( + session=session, + suppress_exception=True) + + self.assertEqual([], entities) + self.assertEqual(0, offset) + self.assertEqual(cfg.CONF.default_limit_paging, limit) + self.assertEqual(0, total) + + def test_should_raise_no_result_found_with_exceptions(self): + session = self.repo.get_session() + + self.assertRaises( + exception.NotFound, + self.repo.get_by_create_date, + session=session, + suppress_exception=False) + + def _create_retry_task(self, session, retry_at=None): + project = database_utils.create_project(session=session) + order = database_utils.create_order(project, session=session) + + order_retry_task = models.OrderRetryTask() + order_retry_task.order_id = order.id + order_retry_task.retry_task = u'retry-task' + order_retry_task.retry_at = retry_at or self.date_time_now + order_retry_task.retry_args = self.test_args + order_retry_task.retry_kwargs = self.test_kwargs + self.repo.create_from(order_retry_task, session=session) + + session.commit() + + return order_retry_task diff --git a/barbican/tests/model/test_models.py b/barbican/tests/model/test_models.py index ab2addb6f..fdfb22931 100644 --- a/barbican/tests/model/test_models.py +++ b/barbican/tests/model/test_models.py @@ -16,7 +16,6 @@ import datetime from barbican.model import models -from barbican.openstack.common import jsonutils as json from barbican.plugin.interface import secret_store from barbican.tests import utils @@ -201,35 +200,23 @@ class WhenCreatingOrderRetryTask(utils.BaseTestCase): 'sub_status_message': 'Waiting for instructions...' }) at = datetime.datetime.utcnow() - order_retry_task = models.OrderRetryTask( - order_id=order.id, - retry_task="foobar", - retry_at=at, - retry_args=json.dumps(["one", "two"]), - retry_kwargs=json.dumps({"three": "four"}), - ) + order_retry_task = models.OrderRetryTask() + order_retry_task.order_id = order.id + order_retry_task.retry_task = "foobar" + order_retry_task.retry_at = at + order_retry_task.retry_args = ["one", "two"] + order_retry_task.retry_kwargs = {"three": "four"} self.assertEqual(order_retry_task.order_id, order.id) self.assertEqual(order_retry_task.retry_task, "foobar") self.assertEqual(order_retry_task.retry_at, at) self.assertEqual( + ["one", "two"], order_retry_task.retry_args, - json.dumps(["one", "two"]), ) self.assertEqual( + {"three": "four"}, order_retry_task.retry_kwargs, - json.dumps({"three": "four"}), - ) - - def test_get_retry_params(self): - order_retry_task = models.OrderRetryTask( - retry_args=json.dumps(["one", "two"]), - retry_kwargs=json.dumps({"three": "four"}), - ) - - self.assertEqual( - order_retry_task.get_retry_params(), - (["one", "two"], {"three": "four"}), ) diff --git a/barbican/tests/queue/test_server.py b/barbican/tests/queue/test_server.py index b3da68a89..488648e82 100644 --- a/barbican/tests/queue/test_server.py +++ b/barbican/tests/queue/test_server.py @@ -12,11 +12,15 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime + import mock +from barbican.model import repositories from barbican import queue from barbican.queue import server from barbican.tasks import common +from barbican.tests import database_utils from barbican.tests import utils @@ -101,33 +105,36 @@ class WhenUsingTransactionalDecorator(utils.BaseTestCase): self.assertEqual(self.clear_mock.call_count, 1) -class WhenUsingRetryableDecorator(utils.BaseTestCase): - """Test using the 'retryable' decorator in server.py.""" +class WhenUsingRetryableOrderDecorator(utils.BaseTestCase): + """Test using the 'retryable_order' decorator in server.py.""" def setUp(self): - super(WhenUsingRetryableDecorator, self).setUp() + super(WhenUsingRetryableOrderDecorator, self).setUp() self.schedule_retry_tasks_patcher = mock.patch( - 'barbican.queue.server.schedule_retry_tasks' + 'barbican.queue.server.schedule_order_retry_tasks' ) self.schedule_retry_tasks_mock = ( self.schedule_retry_tasks_patcher.start() ) + self.order_id = 'order-id' self.args = ('foo', 'bar') self.kwargs = {'k_foo': 1, 'k_bar': 2} # Class/decorator under test. class TestClass(object): + self.order_id = None my_args = None my_kwargs = None is_exception_needed = False result = common.FollowOnProcessingStatusDTO() - @server.retryable - def test_method(self, *args, **kwargs): + @server.retryable_order + def test_method(self, order_id, *args, **kwargs): if self.is_exception_needed: raise ValueError() + self.order_id = order_id self.my_args = args self.my_kwargs = kwargs return self.result @@ -135,12 +142,13 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase): self.test_method = TestClass.test_method def tearDown(self): - super(WhenUsingRetryableDecorator, self).tearDown() + super(WhenUsingRetryableOrderDecorator, self).tearDown() self.schedule_retry_tasks_patcher.stop() def test_should_successfully_schedule_a_task_for_retry(self): - self.test_object.test_method(*self.args, **self.kwargs) + self.test_object.test_method(self.order_id, *self.args, **self.kwargs) + self.assertEqual(self.order_id, self.test_object.order_id) self.assertEqual(self.args, self.test_object.my_args) self.assertEqual(self.kwargs, self.test_object.my_kwargs) @@ -148,6 +156,7 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase): self.schedule_retry_tasks_mock.assert_called_with( mock.ANY, self.test_object.result, + self.order_id, *self.args, **self.kwargs) @@ -157,6 +166,7 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase): self.assertRaises( ValueError, self.test_object.test_method, + self.order_id, self.args, self.kwargs, ) @@ -164,44 +174,92 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase): self.assertEqual(self.schedule_retry_tasks_mock.call_count, 0) -class WhenCallingScheduleRetryTasks(utils.BaseTestCase): - """Test calling schedule_retry_tasks() in server.py.""" +class WhenCallingScheduleOrderRetryTasks(database_utils.RepositoryTestCase): + """Test calling schedule_order_retry_tasks() in server.py.""" def setUp(self): - super(WhenCallingScheduleRetryTasks, self).setUp() + super(WhenCallingScheduleOrderRetryTasks, self).setUp() + + self.project = database_utils.create_project() + self.order = database_utils.create_order(self.project) + database_utils.get_session().commit() + + self.repo = repositories.OrderRetryTaskRepo() self.result = common.FollowOnProcessingStatusDTO() + self.args = ['args-foo', 'args-bar'] + self.kwargs = {'foo': 1, 'bar': 2} + self.date_to_retry_at = datetime.datetime.now() + datetime.timedelta( + milliseconds=self.result.retry_msec) + def test_should_not_schedule_task_due_to_no_result(self): - retry_rpc_method = server.schedule_retry_tasks(None, None) + retry_rpc_method = server.schedule_order_retry_tasks(None, None, None) self.assertIsNone(retry_rpc_method) def test_should_not_schedule_task_due_to_no_action_required_result(self): self.result.retry_task = common.RetryTasks.NO_ACTION_REQUIRED - retry_rpc_method = server.schedule_retry_tasks(None, self.result) + retry_rpc_method = server.schedule_order_retry_tasks( + None, self.result, None) self.assertIsNone(retry_rpc_method) def test_should_schedule_invoking_task_for_retry(self): self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK - retry_rpc_method = server.schedule_retry_tasks( - self.test_should_schedule_invoking_task_for_retry, self.result) + # Schedule this test method as the passed-in 'retry' function. + retry_rpc_method = server.schedule_order_retry_tasks( + self.test_should_schedule_invoking_task_for_retry, + self.result, + self.order.id, + *self.args, + **self.kwargs) + database_utils.get_session().commit() # Flush to the database. self.assertEqual( 'test_should_schedule_invoking_task_for_retry', retry_rpc_method) + self._verify_retry_task_entity( + 'test_should_schedule_invoking_task_for_retry') def test_should_schedule_certificate_status_task_for_retry(self): self.result.retry_task = ( common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK ) - retry_rpc_method = server.schedule_retry_tasks(None, self.result) + # Schedule this test method as the passed-in 'retry' function. + retry_rpc_method = server.schedule_order_retry_tasks( + None, # Should be ignored for non-self retries. + self.result, + self.order.id, + *self.args, + **self.kwargs) + database_utils.get_session().commit() # Flush to the database. self.assertEqual( 'check_certificate_status', retry_rpc_method) + self._verify_retry_task_entity( + 'check_certificate_status') + + def _verify_retry_task_entity(self, retry_task): + # Retrieve the task retry entity created above and verify it. + entities, offset, limit, total = self.repo.get_by_create_date() + self.assertEqual(1, total) + retry_model = entities[0] + self.assertEqual(retry_task, retry_model.retry_task) + self.assertEqual(self.args, retry_model.retry_args) + self.assertEqual(self.kwargs, retry_model.retry_kwargs) + self.assertEqual(0, retry_model.retry_count) + + # Compare retry_at times. + # Note that the expected retry_at time is computed at setUp() time, but + # the retry_at time on the task retry entity/model is computed and set + # a few milliseconds after this setUp() time, hence they will vary by a + # small amount of time. + delta = retry_model.retry_at - self.date_to_retry_at + delta_seconds = delta.seconds + self.assertEqual(True, delta_seconds <= 2) class WhenUsingBeginTypeOrderTask(utils.BaseTestCase): @@ -233,9 +291,8 @@ class WhenUsingBeginTypeOrderTask(utils.BaseTestCase): def test_should_process_order(self, mock_begin_order): mock_begin_order.return_value.process.return_value = None - self.tasks.process_type_order(context=None, - order_id=self.order_id, - project_id=self.external_project_id) + self.tasks.process_type_order( + None, self.order_id, self.external_project_id) mock_begin_order.return_value.process.assert_called_with( self.order_id, self.external_project_id) @@ -245,10 +302,8 @@ class WhenUsingBeginTypeOrderTask(utils.BaseTestCase): mock_update_order.return_value.process.return_value = None updated_meta = {} - self.tasks.update_order(context=None, - order_id=self.order_id, - project_id=self.external_project_id, - updated_meta=updated_meta) + self.tasks.update_order( + None, self.order_id, self.external_project_id, updated_meta) mock_update_order.return_value.process.assert_called_with( self.order_id, self.external_project_id, updated_meta )