Write task retry info to database from server.py

Continue the server.py work to handle retrying tasks, by writing the
retry information to the OrderTaskRetry database table. The next CR
will close the loop, with the periodic retry scheduler process reading
from this same table and then sending RPC tasks to the queue to retry.

Change-Id: Ibff2bc6924729aced68a40fa7329f279b6d528c7
This commit is contained in:
jfwood 2015-03-20 15:19:10 -05:00
parent 813c0d8558
commit edb3c6d7b1
7 changed files with 344 additions and 59 deletions

View File

@ -634,7 +634,7 @@ class OrderBarbicanMetadatum(BASE, SoftDeleteMixIn, ModelBase):
'value': self.value}
class OrderRetryTask(BASE, SoftDeleteMixIn):
class OrderRetryTask(BASE, SoftDeleteMixIn, ModelBase):
__tablename__ = "order_retry_tasks"
__table_args__ = {"mysql_engine": "InnoDB"}
@ -648,13 +648,10 @@ class OrderRetryTask(BASE, SoftDeleteMixIn):
)
retry_task = sa.Column(sa.Text, nullable=False)
retry_at = sa.Column(sa.DateTime, default=None, nullable=False)
retry_args = sa.Column(sa.Text, nullable=False)
retry_kwargs = sa.Column(sa.Text, nullable=False)
retry_args = sa.Column(JsonBlob(), nullable=False)
retry_kwargs = sa.Column(JsonBlob(), nullable=False)
retry_count = sa.Column(sa.Integer, nullable=False, default=0)
def get_retry_params(self):
return json.loads(self.retry_args), json.loads(self.retry_kwargs)
class Container(BASE, SoftDeleteMixIn, ModelBase):
"""Represents a Container for Secrets in the datastore.

View File

@ -56,6 +56,7 @@ _KEK_DATUM_REPOSITORY = None
_ORDER_PLUGIN_META_REPOSITORY = None
_ORDER_BARBICAN_META_REPOSITORY = None
_ORDER_REPOSITORY = None
_ORDER_RETRY_TASK_REPOSITORY = None
_PREFERRED_CA_REPOSITORY = None
_PROJECT_REPOSITORY = None
_PROJECT_CA_REPOSITORY = None
@ -538,6 +539,10 @@ class ProjectRepo(BaseRepo):
"""Sub-class hook: build a retrieve query."""
return session.query(models.Project).filter_by(id=entity_id)
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
def find_by_external_project_id(self, external_project_id,
suppress_exception=False, session=None):
session = self.get_session(session)
@ -1008,6 +1013,71 @@ class OrderBarbicanMetadatumRepo(BaseRepo):
pass
class OrderRetryTaskRepo(BaseRepo):
"""Repository for the OrderRetryTask entity."""
def get_by_create_date(
self, only_at_or_before_this_date=None,
offset_arg=None, limit_arg=None,
suppress_exception=False,
session=None):
"""Returns a list of order retry task entities
The list is ordered by the date they were created at and paged
based on the offset and limit fields.
:param only_at_or_before_this_date: If specified, only entities at or
before this date are returned.
:param offset_arg: The entity number where the query result should
start.
:param limit_arg: The maximum amount of entities in the result set.
:param suppress_exception: Whether NoResultFound exceptions should be
suppressed.
:param session: SQLAlchemy session object.
:returns: Tuple consisting of (list_of_entities, offset, limit, total).
"""
offset, limit = clean_paging_values(offset_arg, limit_arg)
session = self.get_session(session)
query = session.query(models.OrderRetryTask)
query = query.order_by(models.OrderRetryTask.created_at)
query = query.filter_by(deleted=False)
if only_at_or_before_this_date:
query = query.filter(
models.OrderRetryTask.retry_at <= only_at_or_before_this_date)
start = offset
end = offset + limit
LOG.debug('Retrieving from %s to %s', start, end)
total = query.count()
entities = query[start:end]
LOG.debug('Number entities retrieved: %s out of %s',
len(entities), total
)
if total <= 0 and not suppress_exception:
_raise_no_entities_found(self._do_entity_name())
return entities, offset, limit, total
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "OrderRetryTask"
def _do_build_get_query(self, entity_id, external_project_id, session):
"""Sub-class hook: build a retrieve query."""
query = session.query(models.OrderRetryTask)
query = query.filter_by(id=entity_id, deleted=False)
return query
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
class ContainerRepo(BaseRepo):
"""Repository for the Container entity."""
@ -1591,6 +1661,12 @@ def get_order_repository():
return _get_repository(_ORDER_REPOSITORY, OrderRepo)
def get_order_retry_tasks_repository():
"""Returns a singleton OrderRetryTask repository instance."""
global _ORDER_RETRY_TASK_REPOSITORY
return _get_repository(_ORDER_RETRY_TASK_REPOSITORY, OrderRetryTaskRepo)
def get_preferred_ca_repository():
"""Returns a singleton Secret repository instance."""
global _PREFERRED_CA_REPOSITORY

View File

@ -16,6 +16,7 @@
"""
Server-side (i.e. worker side) classes and logic.
"""
import datetime
import functools
try:
@ -29,6 +30,7 @@ from oslo_config import cfg
from barbican.common import utils
from barbican import i18n as u
from barbican.model import models
from barbican.model import repositories
from barbican.openstack.common import service
from barbican import queue
@ -50,13 +52,13 @@ MAP_RETRY_TASKS = {
}
def retryable(fn):
"""Provides retry/scheduling support to tasks."""
def retryable_order(fn):
"""Provides retry/scheduling support to Order-related tasks."""
@functools.wraps(fn)
def wrapper(method_self, *args, **kwargs):
result = fn(method_self, *args, **kwargs)
retry_rpc_method = schedule_retry_tasks(
retry_rpc_method = schedule_order_retry_tasks(
fn, result, *args, **kwargs)
if retry_rpc_method:
LOG.info(
@ -131,14 +133,16 @@ def monitored(fn): # pragma: no cover
return fn
def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs):
"""Schedules a task for retry.
def schedule_order_retry_tasks(
invoked_task, retry_result, order_id, *args, **kwargs):
"""Schedules an Order-related task for retry.
:param invoked_task: The RPC method that was just invoked.
:param retry_result: A :class:`FollowOnProcessingStatusDTO` if follow-on
processing (such as retrying this or another task) is
required, otherwise None indicates no such follow-on
processing is required.
:param order_id: ID of the Order entity the task to retry is for.
:param args: List of arguments passed in to the just-invoked task.
:param kwargs: Dict of arguments passed in to the just-invoked task.
:return: Returns the RPC task method scheduled for a retry, None if no RPC
@ -147,7 +151,7 @@ def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs):
retry_rpc_method = None
if not retry_result:
if not retry_result or not order_id:
pass
elif common.RetryTasks.INVOKE_SAME_TASK == retry_result.retry_task:
@ -159,10 +163,23 @@ def schedule_retry_tasks(invoked_task, retry_result, *args, **kwargs):
retry_rpc_method = MAP_RETRY_TASKS.get(retry_result.retry_task)
if retry_rpc_method:
# TODO(john-wood-w) Add retry task to the retry table here.
LOG.debug(
'Scheduling RPC method for retry: {0}'.format(retry_rpc_method))
date_to_retry_at = datetime.datetime.now() + datetime.timedelta(
milliseconds=retry_result.retry_msec)
retry_model = models.OrderRetryTask()
retry_model.order_id = order_id
retry_model.retry_task = retry_rpc_method
retry_model.retry_at = date_to_retry_at
retry_model.retry_args = args
retry_model.retry_kwargs = kwargs
retry_model.retry_count = 0
retry_repo = repositories.get_order_retry_tasks_repository()
retry_repo.create_from(retry_model)
return retry_rpc_method
@ -183,7 +200,7 @@ class Tasks(object):
@monitored
@transactional
@retryable
@retryable_order
def process_type_order(self, context, order_id, project_id):
"""Process TypeOrder."""
LOG.info(
@ -194,7 +211,7 @@ class Tasks(object):
@monitored
@transactional
@retryable
@retryable_order
def update_order(self, context, order_id, project_id, updated_meta):
"""Update Order."""
LOG.info(

View File

@ -19,6 +19,7 @@ break the DevStack functional test discovery process.
"""
import oslotest.base as oslotest
from barbican.model import models
from barbican.model import repositories
@ -40,6 +41,26 @@ def in_memory_cleanup():
repositories.clear()
def get_session():
return repositories.get_session()
def create_project(external_id="my keystone id", session=None):
project = models.Project()
project.external_id = external_id
project_repo = repositories.get_project_repository()
project_repo.create_from(project, session=session)
return project
def create_order(project, session=None):
order = models.Order()
order.project_id = project.id
order_repo = repositories.get_order_repository()
order_repo.create_from(order, session=session)
return order
class RepositoryTestCase(oslotest.BaseTestCase):
"""Base test case class for in-memory database unit tests.

View File

@ -0,0 +1,132 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import time
from oslo_config import cfg
from barbican.common import exception
from barbican.model import models
from barbican.model import repositories
from barbican.tests import database_utils
class WhenTestingOrderRetryTaskRepository(database_utils.RepositoryTestCase):
def setUp(self):
super(WhenTestingOrderRetryTaskRepository, self).setUp()
self.date_time_now = datetime.datetime.now()
self.test_args = ['test', 'args']
self.test_kwargs = {'test': 1, 'kwargs': 2}
self.repo = repositories.OrderRetryTaskRepo()
self.order_repo = repositories.OrderRepo()
def test_get_order_retry_task(self):
session = self.repo.get_session()
order_retry_task = self._create_retry_task(session)
order_retry_task_from_get = self.repo.get(
order_retry_task.id,
session=session,
)
self.assertEqual(order_retry_task.id, order_retry_task_from_get.id)
self.assertEqual(
self.date_time_now, order_retry_task_from_get.retry_at)
self.assertEqual(u'retry-task', order_retry_task_from_get.retry_task)
self.assertEqual(self.test_args, order_retry_task_from_get.retry_args)
self.assertEqual(self.test_kwargs,
order_retry_task_from_get.retry_kwargs)
def test_get_order_retry_task_filtered_by_retry_time(self):
session = self.repo.get_session()
future_seconds = 3
date_time_future = (
self.date_time_now + datetime.timedelta(seconds=future_seconds)
)
order_retry_task = self._create_retry_task(
session, retry_at=date_time_future)
# A retrieve by the current time should return no entries, as the only
# retry record is set into the future.
entities, offset, limit, total = self.repo.get_by_create_date(
only_at_or_before_this_date=self.date_time_now,
session=session,
suppress_exception=True
)
self.assertEqual(0, total)
self.assertEqual([], entities)
# Wait until the future time is the current time.
time.sleep(2 * future_seconds)
# Now, a retrieve by the current time should return our entry.
entities, offset, limit, total = self.repo.get_by_create_date(
only_at_or_before_this_date=datetime.datetime.now(),
session=session,
suppress_exception=True
)
self.assertEqual(1, total)
# Verify that retry task record is what we put in originally.
order_retry_task_from_get = entities[0]
self.assertEqual(order_retry_task.id, order_retry_task_from_get.id)
self.assertEqual(date_time_future, order_retry_task_from_get.retry_at)
self.assertEqual(u'retry-task', order_retry_task_from_get.retry_task)
self.assertEqual(self.test_args, order_retry_task_from_get.retry_args)
self.assertEqual(self.test_kwargs,
order_retry_task_from_get.retry_kwargs)
def test_should_raise_no_result_found_no_exception(self):
session = self.repo.get_session()
entities, offset, limit, total = self.repo.get_by_create_date(
session=session,
suppress_exception=True)
self.assertEqual([], entities)
self.assertEqual(0, offset)
self.assertEqual(cfg.CONF.default_limit_paging, limit)
self.assertEqual(0, total)
def test_should_raise_no_result_found_with_exceptions(self):
session = self.repo.get_session()
self.assertRaises(
exception.NotFound,
self.repo.get_by_create_date,
session=session,
suppress_exception=False)
def _create_retry_task(self, session, retry_at=None):
project = database_utils.create_project(session=session)
order = database_utils.create_order(project, session=session)
order_retry_task = models.OrderRetryTask()
order_retry_task.order_id = order.id
order_retry_task.retry_task = u'retry-task'
order_retry_task.retry_at = retry_at or self.date_time_now
order_retry_task.retry_args = self.test_args
order_retry_task.retry_kwargs = self.test_kwargs
self.repo.create_from(order_retry_task, session=session)
session.commit()
return order_retry_task

View File

@ -16,7 +16,6 @@
import datetime
from barbican.model import models
from barbican.openstack.common import jsonutils as json
from barbican.plugin.interface import secret_store
from barbican.tests import utils
@ -201,35 +200,23 @@ class WhenCreatingOrderRetryTask(utils.BaseTestCase):
'sub_status_message': 'Waiting for instructions...'
})
at = datetime.datetime.utcnow()
order_retry_task = models.OrderRetryTask(
order_id=order.id,
retry_task="foobar",
retry_at=at,
retry_args=json.dumps(["one", "two"]),
retry_kwargs=json.dumps({"three": "four"}),
)
order_retry_task = models.OrderRetryTask()
order_retry_task.order_id = order.id
order_retry_task.retry_task = "foobar"
order_retry_task.retry_at = at
order_retry_task.retry_args = ["one", "two"]
order_retry_task.retry_kwargs = {"three": "four"}
self.assertEqual(order_retry_task.order_id, order.id)
self.assertEqual(order_retry_task.retry_task, "foobar")
self.assertEqual(order_retry_task.retry_at, at)
self.assertEqual(
["one", "two"],
order_retry_task.retry_args,
json.dumps(["one", "two"]),
)
self.assertEqual(
{"three": "four"},
order_retry_task.retry_kwargs,
json.dumps({"three": "four"}),
)
def test_get_retry_params(self):
order_retry_task = models.OrderRetryTask(
retry_args=json.dumps(["one", "two"]),
retry_kwargs=json.dumps({"three": "four"}),
)
self.assertEqual(
order_retry_task.get_retry_params(),
(["one", "two"], {"three": "four"}),
)

View File

@ -12,11 +12,15 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from barbican.model import repositories
from barbican import queue
from barbican.queue import server
from barbican.tasks import common
from barbican.tests import database_utils
from barbican.tests import utils
@ -101,33 +105,36 @@ class WhenUsingTransactionalDecorator(utils.BaseTestCase):
self.assertEqual(self.clear_mock.call_count, 1)
class WhenUsingRetryableDecorator(utils.BaseTestCase):
"""Test using the 'retryable' decorator in server.py."""
class WhenUsingRetryableOrderDecorator(utils.BaseTestCase):
"""Test using the 'retryable_order' decorator in server.py."""
def setUp(self):
super(WhenUsingRetryableDecorator, self).setUp()
super(WhenUsingRetryableOrderDecorator, self).setUp()
self.schedule_retry_tasks_patcher = mock.patch(
'barbican.queue.server.schedule_retry_tasks'
'barbican.queue.server.schedule_order_retry_tasks'
)
self.schedule_retry_tasks_mock = (
self.schedule_retry_tasks_patcher.start()
)
self.order_id = 'order-id'
self.args = ('foo', 'bar')
self.kwargs = {'k_foo': 1, 'k_bar': 2}
# Class/decorator under test.
class TestClass(object):
self.order_id = None
my_args = None
my_kwargs = None
is_exception_needed = False
result = common.FollowOnProcessingStatusDTO()
@server.retryable
def test_method(self, *args, **kwargs):
@server.retryable_order
def test_method(self, order_id, *args, **kwargs):
if self.is_exception_needed:
raise ValueError()
self.order_id = order_id
self.my_args = args
self.my_kwargs = kwargs
return self.result
@ -135,12 +142,13 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase):
self.test_method = TestClass.test_method
def tearDown(self):
super(WhenUsingRetryableDecorator, self).tearDown()
super(WhenUsingRetryableOrderDecorator, self).tearDown()
self.schedule_retry_tasks_patcher.stop()
def test_should_successfully_schedule_a_task_for_retry(self):
self.test_object.test_method(*self.args, **self.kwargs)
self.test_object.test_method(self.order_id, *self.args, **self.kwargs)
self.assertEqual(self.order_id, self.test_object.order_id)
self.assertEqual(self.args, self.test_object.my_args)
self.assertEqual(self.kwargs, self.test_object.my_kwargs)
@ -148,6 +156,7 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase):
self.schedule_retry_tasks_mock.assert_called_with(
mock.ANY,
self.test_object.result,
self.order_id,
*self.args,
**self.kwargs)
@ -157,6 +166,7 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase):
self.assertRaises(
ValueError,
self.test_object.test_method,
self.order_id,
self.args,
self.kwargs,
)
@ -164,44 +174,92 @@ class WhenUsingRetryableDecorator(utils.BaseTestCase):
self.assertEqual(self.schedule_retry_tasks_mock.call_count, 0)
class WhenCallingScheduleRetryTasks(utils.BaseTestCase):
"""Test calling schedule_retry_tasks() in server.py."""
class WhenCallingScheduleOrderRetryTasks(database_utils.RepositoryTestCase):
"""Test calling schedule_order_retry_tasks() in server.py."""
def setUp(self):
super(WhenCallingScheduleRetryTasks, self).setUp()
super(WhenCallingScheduleOrderRetryTasks, self).setUp()
self.project = database_utils.create_project()
self.order = database_utils.create_order(self.project)
database_utils.get_session().commit()
self.repo = repositories.OrderRetryTaskRepo()
self.result = common.FollowOnProcessingStatusDTO()
self.args = ['args-foo', 'args-bar']
self.kwargs = {'foo': 1, 'bar': 2}
self.date_to_retry_at = datetime.datetime.now() + datetime.timedelta(
milliseconds=self.result.retry_msec)
def test_should_not_schedule_task_due_to_no_result(self):
retry_rpc_method = server.schedule_retry_tasks(None, None)
retry_rpc_method = server.schedule_order_retry_tasks(None, None, None)
self.assertIsNone(retry_rpc_method)
def test_should_not_schedule_task_due_to_no_action_required_result(self):
self.result.retry_task = common.RetryTasks.NO_ACTION_REQUIRED
retry_rpc_method = server.schedule_retry_tasks(None, self.result)
retry_rpc_method = server.schedule_order_retry_tasks(
None, self.result, None)
self.assertIsNone(retry_rpc_method)
def test_should_schedule_invoking_task_for_retry(self):
self.result.retry_task = common.RetryTasks.INVOKE_SAME_TASK
retry_rpc_method = server.schedule_retry_tasks(
self.test_should_schedule_invoking_task_for_retry, self.result)
# Schedule this test method as the passed-in 'retry' function.
retry_rpc_method = server.schedule_order_retry_tasks(
self.test_should_schedule_invoking_task_for_retry,
self.result,
self.order.id,
*self.args,
**self.kwargs)
database_utils.get_session().commit() # Flush to the database.
self.assertEqual(
'test_should_schedule_invoking_task_for_retry', retry_rpc_method)
self._verify_retry_task_entity(
'test_should_schedule_invoking_task_for_retry')
def test_should_schedule_certificate_status_task_for_retry(self):
self.result.retry_task = (
common.RetryTasks.INVOKE_CERT_STATUS_CHECK_TASK
)
retry_rpc_method = server.schedule_retry_tasks(None, self.result)
# Schedule this test method as the passed-in 'retry' function.
retry_rpc_method = server.schedule_order_retry_tasks(
None, # Should be ignored for non-self retries.
self.result,
self.order.id,
*self.args,
**self.kwargs)
database_utils.get_session().commit() # Flush to the database.
self.assertEqual(
'check_certificate_status', retry_rpc_method)
self._verify_retry_task_entity(
'check_certificate_status')
def _verify_retry_task_entity(self, retry_task):
# Retrieve the task retry entity created above and verify it.
entities, offset, limit, total = self.repo.get_by_create_date()
self.assertEqual(1, total)
retry_model = entities[0]
self.assertEqual(retry_task, retry_model.retry_task)
self.assertEqual(self.args, retry_model.retry_args)
self.assertEqual(self.kwargs, retry_model.retry_kwargs)
self.assertEqual(0, retry_model.retry_count)
# Compare retry_at times.
# Note that the expected retry_at time is computed at setUp() time, but
# the retry_at time on the task retry entity/model is computed and set
# a few milliseconds after this setUp() time, hence they will vary by a
# small amount of time.
delta = retry_model.retry_at - self.date_to_retry_at
delta_seconds = delta.seconds
self.assertEqual(True, delta_seconds <= 2)
class WhenUsingBeginTypeOrderTask(utils.BaseTestCase):
@ -233,9 +291,8 @@ class WhenUsingBeginTypeOrderTask(utils.BaseTestCase):
def test_should_process_order(self, mock_begin_order):
mock_begin_order.return_value.process.return_value = None
self.tasks.process_type_order(context=None,
order_id=self.order_id,
project_id=self.external_project_id)
self.tasks.process_type_order(
None, self.order_id, self.external_project_id)
mock_begin_order.return_value.process.assert_called_with(
self.order_id, self.external_project_id)
@ -245,10 +302,8 @@ class WhenUsingBeginTypeOrderTask(utils.BaseTestCase):
mock_update_order.return_value.process.return_value = None
updated_meta = {}
self.tasks.update_order(context=None,
order_id=self.order_id,
project_id=self.external_project_id,
updated_meta=updated_meta)
self.tasks.update_order(
None, self.order_id, self.external_project_id, updated_meta)
mock_update_order.return_value.process.assert_called_with(
self.order_id, self.external_project_id, updated_meta
)