Add sub-status logic to worker/task processing

The barbican.tasks.resources.BaseTask was modified to allow the
handle_processing() template method to return an optional status result
that contains optional sub-status and retry information that is then
used to determine if an entity (such as an Order) stays in the PENDING
state marked with sub-status data, or else terminates in the ERROR or
ACTIVE states.

Change-Id: I29e0e61e723f84a123b88b30db50746b6862b02d
This commit is contained in:
jfwood
2015-02-19 16:33:10 -06:00
parent f9a00d76ad
commit 87bb9141a2
8 changed files with 258 additions and 72 deletions

View File

@@ -35,6 +35,9 @@ from barbican.plugin.interface import secret_store
LOG = utils.getLogger(__name__)
BASE = declarative.declarative_base()
ERROR_REASON_LENGTH = 255
SUB_STATUS_LENGTH = 36
SUB_STATUS_MESSAGE_LENGTH = 255
# Allowed entity states
@@ -483,7 +486,7 @@ class Order(BASE, SoftDeleteMixIn, ModelBase):
index=True, nullable=False)
error_status_code = sa.Column(sa.String(16))
error_reason = sa.Column(sa.String(255))
error_reason = sa.Column(sa.String(ERROR_REASON_LENGTH))
meta = sa.Column(JsonBlob(), nullable=True)
@@ -491,8 +494,9 @@ class Order(BASE, SoftDeleteMixIn, ModelBase):
index=True, nullable=True)
container_id = sa.Column(sa.String(36), sa.ForeignKey('containers.id'),
index=True, nullable=True)
sub_status = sa.Column(sa.String(36), nullable=True)
sub_status_message = sa.Column(sa.String(255), nullable=True)
sub_status = sa.Column(sa.String(SUB_STATUS_LENGTH), nullable=True)
sub_status_message = sa.Column(sa.String(SUB_STATUS_MESSAGE_LENGTH),
nullable=True)
order_plugin_metadata = orm.relationship(
"OrderPluginMetadatum",
@@ -517,6 +521,20 @@ class Order(BASE, SoftDeleteMixIn, ModelBase):
self.sub_status_message = parsed_request.get(
'sub_status_message')
def set_error_reason_safely(self, error_reason_raw):
"""Ensure error reason does not raise database attribute exceptions."""
self.error_reason = error_reason_raw[:ERROR_REASON_LENGTH]
def set_sub_status_safely(self, sub_status_raw):
"""Ensure sub-status does not raise database attribute exceptions."""
self.sub_status = sub_status_raw[:SUB_STATUS_LENGTH]
def set_sub_status_message_safely(self, sub_status_message_raw):
"""Ensure status message doesn't raise database attrib. exceptions."""
self.sub_status_message = sub_status_message_raw[
:SUB_STATUS_MESSAGE_LENGTH
]
def _do_delete_children(self, session):
"""Sub-class hook: delete children relationships."""
for k, v in self.order_plugin_metadata.items():

View File

@@ -89,6 +89,10 @@ class NotificationTask(object):
except Exception:
# No need to log message here as task process method has
# already logged it
# TODO(john-wood-w) This really should be retried on a
# schedule and really only if the database is down, not
# for any exception otherwise tasks will be re-queued
# repeatedly. Revisit as part of the retry task work later.
if self.conf.keystone_notifications.allow_requeue:
return messaging.NotificationResult.REQUEUE
else:

View File

@@ -28,6 +28,7 @@ except ImportError:
from oslo_config import cfg
from barbican.common import utils
from barbican import i18n as u
from barbican.model import repositories
from barbican.openstack.common import service
from barbican import queue
@@ -46,13 +47,17 @@ def transactional(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
fn_name = getattr(fn, '__name__', '????')
if not queue.is_server_side():
fn(*args, **kwargs) # Non-server mode directly invokes tasks.
LOG.info(u._LI("Completed worker task: '%s'"), fn_name)
else:
# Manage session/transaction.
try:
fn(*args, **kwargs)
repositories.commit()
LOG.info(u._LI("Completed worker task: '%s'"), fn_name)
except Exception:
"""NOTE: Wrapped functions must process with care!
@@ -60,6 +65,10 @@ def transactional(fn):
including any updates made to entities such as setting error
codes and error messages.
"""
LOG.exception(
u._LE("Problem seen processing worker task: '%s'"),
fn_name
)
repositories.rollback()
finally:
repositories.clear()
@@ -113,25 +122,21 @@ class Tasks(object):
@transactional
def process_type_order(self, context, order_id, project_id):
"""Process TypeOrder."""
LOG.info('Processing TypeOrder: {0}'.format(order_id))
task = resources.BeginTypeOrder()
try:
task.process(order_id, project_id)
except Exception:
LOG.exception(">>>>> Task exception seen, details reported "
"on the Orders entity.")
LOG.info(
u._LI("Processing type order: order ID is '%s'"),
order_id
)
resources.BeginTypeOrder().process(order_id, project_id)
@monitored
@transactional
def update_order(self, context, order_id, project_id, updated_meta):
"""Update Order."""
LOG.info('Updating TypeOrder: {0}'.format(order_id))
task = resources.UpdateOrder()
try:
task.process(order_id, project_id, updated_meta)
except Exception:
LOG.exception(">>>>> Task exception seen, details reported "
"on the Orders entity.")
LOG.info(
u._LI("Processing update order: order ID is '%s'"),
order_id
)
resources.UpdateOrder().process(order_id, project_id, updated_meta)
class TaskServer(Tasks, service.Service):
@@ -160,11 +165,11 @@ class TaskServer(Tasks, service.Service):
endpoints=[self])
def start(self):
LOG.info("Starting the TaskServer")
LOG.info(u._LI("Starting the TaskServer"))
self._server.start()
super(TaskServer, self).start()
def stop(self):
LOG.info("Halting the TaskServer")
LOG.info(u._LI("Halting the TaskServer"))
super(TaskServer, self).stop()
self._server.stop()

View File

@@ -86,8 +86,11 @@ class KeystoneEventConsumer(resources.BaseTask):
}
)
def handle_success(self, project, project_id=None, resource_type=None,
operation_type=None):
def handle_success(self, project, result, project_id=None,
resource_type=None, operation_type=None):
# Note: The processing 'result' argument can be ignored as 'result'
# only pertains to long-running tasks. See the documentation for
# BaseTask for more details.
LOG.info(
u._LI(
'Successfully handled Keystone event, '

View File

@@ -32,6 +32,34 @@ from barbican.tasks import certificate_resources as cert
LOG = utils.getLogger(__name__)
RETRY_MSEC = 60 * 1000
class FollowOnProcessingStatusDTO(object):
"""Follow On Processing status data transfer object (DTO).
An object of this type is optionally returned by the
BaseTask.handle_processing() method defined below, and is used to guide
follow on processing and to provide status feedback to clients.
"""
def __init__(self, status=u._('Unknown'), status_message=u._('Unknown'),
retry_method=None, retry_msec=RETRY_MSEC):
"""Creates a new FollowOnProcessingStatusDTO.
:param status: Status for cert order
:param status_message: Message to explain status type.
:param retry_method: Method to retry
:param retry_msec: Number of milliseconds to wait for retry
"""
self.status = status
self.status_message = status_message
self.retry_method = retry_method
self.retry_msec = int(retry_msec)
def is_follow_on_needed(self):
return self.retry_method
@six.add_metaclass(abc.ABCMeta)
class BaseTask(object):
"""Base asynchronous task."""
@@ -68,7 +96,7 @@ class BaseTask(object):
# Process the target entity.
try:
self.handle_processing(entity, *args, **kwargs)
result = self.handle_processing(entity, *args, **kwargs)
except Exception as e_orig:
LOG.exception(u._LE("Could not perform processing for "
"task '%s'."), name)
@@ -87,7 +115,7 @@ class BaseTask(object):
# Handle successful conclusion of processing.
try:
self.handle_success(entity, *args, **kwargs)
self.handle_success(entity, result, *args, **kwargs)
except Exception as e:
LOG.exception(u._LE("Could not process after successfully "
"executing task '%s'."), name)
@@ -108,7 +136,9 @@ class BaseTask(object):
:param args: List of arguments passed in from the client.
:param kwargs: Dict of arguments passed in from the client.
:return: None
:return: None if no follow on processing is needed for this task,
otherwise a :class:`FollowOnProcessingStatusDTO` instance
with information on how to process this task into the future.
"""
@abc.abstractmethod
@@ -129,19 +159,73 @@ class BaseTask(object):
"""
@abc.abstractmethod
def handle_success(self, entity, *args, **kwargs):
def handle_success(self, entity, result, *args, **kwargs):
"""A hook method to post-process after successful entity processing.
This method could be used to mark entity as being active, or to
add information/references to the entity.
:param entity: Entity retrieved from _retrieve_entity() above.
:param result: A :class:`FollowOnProcessingStatusDTO` instance
representing processing result status, None implies
that no follow on processing is required.
:param args: List of arguments passed in from the client.
:param kwargs: Dict of arguments passed in from the client.
:return: None
"""
class _OrderTaskHelper(object):
"""Supports order-related BaseTask operations.
BaseTask sub-classes can delegate to an instance of this class to perform
common order-related operations.
"""
def __init__(self):
self.order_repo = rep.get_order_repository()
def retrieve_entity(self, order_id, external_project_id, *args, **kwargs):
"""Retrieve an order entity by its PK ID."""
return self.order_repo.get(
entity_id=order_id,
external_project_id=external_project_id)
def handle_error(self, order, status, message, exception,
*args, **kwargs):
"""Stamp the order entity as terminated due to an error."""
order.status = models.States.ERROR
order.error_status_code = status
order.set_error_reason_safely(message)
self.order_repo.save(order)
def handle_success(self, order, result, *args, **kwargs):
"""Handle if the order entity is terminated or else long running.
The 'result' argument (if present) indicates if a order should now be
terminated due to it being completed, or else should be held in the
PENDING state due to follow on workflow processing. If 'result' is not
provided, the order is presumed completed.
"""
is_follow_on_needed = False
sub_status = None
sub_status_message = None
if result:
is_follow_on_needed = result.is_follow_on_needed()
sub_status = result.status
sub_status_message = result.status_message
if not is_follow_on_needed:
order.status = models.States.ACTIVE
if sub_status:
order.set_sub_status_safely(sub_status)
if sub_status_message:
order.set_sub_status_message_safely(sub_status_message)
self.order_repo.save(order)
class BeginTypeOrder(BaseTask):
"""Handles beginning processing of a TypeOrder."""
@@ -150,35 +234,15 @@ class BeginTypeOrder(BaseTask):
def __init__(self):
LOG.debug('Creating BeginTypeOrder task processor')
self.order_repo = rep.get_order_repository()
self.project_repo = rep.get_project_repository()
self.helper = _OrderTaskHelper()
def retrieve_entity(self, order_id, external_project_id):
return self.order_repo.get(
entity_id=order_id,
external_project_id=external_project_id)
def retrieve_entity(self, *args, **kwargs):
return self.helper.retrieve_entity(*args, **kwargs)
def handle_processing(self, order, *args, **kwargs):
self.handle_order(order)
def handle_error(self, order, status, message, exception,
*args, **kwargs):
order.status = models.States.ERROR
order.error_status_code = status
order.error_reason = message
self.order_repo.save(order)
def handle_success(self, order, *args, **kwargs):
if models.OrderType.CERTIFICATE != order.type:
order.status = models.States.ACTIVE
else:
# TODO(alee-3): enable the code below when sub status is added
# if cert.ORDER_STATUS_CERT_GENERATED.id == order.sub_status:
# order.status = models.States.ACTIVE
order.status = models.States.ACTIVE
self.order_repo.save(order)
def handle_order(self, order):
"""Handle secret creation using meta info.
@@ -228,6 +292,15 @@ class BeginTypeOrder(BaseTask):
u._('Order type "{order_type}" not implemented.').format(
order_type=order_type))
def handle_error(self, order, status, message, exception,
*args, **kwargs):
self.helper.handle_error(
order, status, message, exception, *args, **kwargs)
def handle_success(self, order, result, *args, **kwargs):
self.helper.handle_success(
order, result, *args, **kwargs)
class UpdateOrder(BaseTask):
"""Handles updating an order."""
@@ -236,29 +309,15 @@ class UpdateOrder(BaseTask):
def __init__(self):
LOG.debug('Creating UpdateOrder task processor')
self.order_repo = rep.get_order_repository()
self.helper = _OrderTaskHelper()
def retrieve_entity(self, order_id, external_project_id, updated_meta):
return self.order_repo.get(
entity_id=order_id,
external_project_id=external_project_id)
def retrieve_entity(self, *args, **kwargs):
return self.helper.retrieve_entity(*args, **kwargs)
def handle_processing(self, order, order_id, keystone_id, updated_meta):
def handle_processing(
self, order, order_id, external_project_id, updated_meta):
self.handle_order(order, updated_meta)
def handle_error(self, order, status, message, exception,
*args, **kwargs):
order.status = models.States.ERROR
order.error_status_code = status
order.error_reason = message
LOG.exception(u._LE("An error has occurred updating the order."))
self.order_repo.save(order)
def handle_success(self, order, *args, **kwargs):
# TODO(chellygel): Handle sub-status on a pending order.
order.status = models.States.ACTIVE
self.order_repo.save(order)
def handle_order(self, order, updated_meta):
"""Handle Order Update
@@ -278,3 +337,12 @@ class UpdateOrder(BaseTask):
order_type=order_type))
LOG.debug("...done updating order.")
def handle_error(self, order, status, message, exception,
*args, **kwargs):
self.helper.handle_error(
order, status, message, exception, *args, **kwargs)
def handle_success(self, order, result, *args, **kwargs):
self.helper.handle_success(
order, result, *args, **kwargs)

View File

@@ -151,8 +151,8 @@ class WhenUsingBeginTypeOrderTask(utils.BaseTestCase):
@mock.patch('barbican.tasks.resources.BeginTypeOrder')
def test_process_order_catch_exception(self, mock_begin_order):
"""Test process_type_order() handles all exceptions."""
mock_begin_order.return_value.process.side_effect = Exception()
"""Test that BeginTypeOrder's process() handles all exceptions."""
mock_begin_order.return_value._process.side_effect = Exception()
self.tasks.process_type_order(None, self.order_id,
self.external_project_id)

View File

@@ -48,6 +48,7 @@ class BaseOrderTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
self.setup_project_repository_mock(self.project_repo)
self.order.status = models.States.PENDING
self.order.id = 'orderid1234'
self.order.project_id = self.project_id
self.order_repo = mock.MagicMock()
self.order_repo.get.return_value = self.order
@@ -85,6 +86,76 @@ class BaseOrderTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
self.container = models.Container()
class WhenUsingOrderTaskHelper(BaseOrderTestCase):
def setUp(self):
super(WhenUsingOrderTaskHelper, self).setUp()
self.result = resources.FollowOnProcessingStatusDTO()
self.helper = resources._OrderTaskHelper()
def test_should_retrieve_entity(self):
order_model = self.helper.retrieve_entity(
self.order.id, self.external_project_id)
self.assertEqual(self.order.id, order_model.id)
self.order_repo.get.assert_called_once_with(
entity_id=self.order.id,
external_project_id=self.external_project_id)
def test_should_handle_error(self):
self.helper.handle_error(self.order, 'status_code', 'reason',
ValueError())
self.assertEqual(models.States.ERROR, self.order.status)
self.assertEqual('status_code', self.order.error_status_code)
self.assertEqual('reason', self.order.error_reason)
self.order_repo.save.assert_called_once_with(self.order)
def test_should_handle_success_no_result(self):
self.helper.handle_success(self.order, None)
self.assertEqual(models.States.ACTIVE, self.order.status)
self.assertIsNone(self.order.sub_status)
self.assertIsNone(self.order.sub_status_message)
self.order_repo.save.assert_called_once_with(self.order)
def test_should_handle_success_result_no_follow_on_needed(self):
self.helper.handle_success(self.order, self.result)
self.assertEqual(models.States.ACTIVE, self.order.status)
self.assertEqual('Unknown', self.order.sub_status)
self.assertEqual('Unknown', self.order.sub_status_message)
self.order_repo.save.assert_called_once_with(self.order)
def test_should_handle_success_result_follow_on_needed(self):
self.result.retry_method = 'bogus_method_here'
self.result.status = 'status'
self.result.status_message = 'status_message'
self.helper.handle_success(self.order, self.result)
self.assertNotEqual(models.States.ACTIVE, self.order.status)
self.assertEqual('status', self.order.sub_status)
self.assertEqual('status_message', self.order.sub_status_message)
self.order_repo.save.assert_called_once_with(self.order)
def test_should_handle_success_result_large_statuses_clipped(self):
sub_status = 'z' * (models.SUB_STATUS_LENGTH + 1)
sub_status_message = 'z' * (models.SUB_STATUS_MESSAGE_LENGTH + 1)
self.result.status = sub_status
self.result.status_message = sub_status_message
self.helper.handle_success(self.order, self.result)
self.assertEqual(sub_status[:-1], self.order.sub_status)
self.assertEqual(
sub_status_message[:-1], self.order.sub_status_message)
self.order_repo.save.assert_called_once_with(self.order)
class WhenBeginningKeyTypeOrder(BaseOrderTestCase):
def setUp(self):
@@ -179,13 +250,30 @@ class WhenBeginningKeyTypeOrder(BaseOrderTestCase):
)
class WhenUpdatingKeyTypeOrder(BaseOrderTestCase):
class WhenUpdatingOrder(BaseOrderTestCase):
def setUp(self):
super(WhenUpdatingKeyTypeOrder, self).setUp()
super(WhenUpdatingOrder, self).setUp()
self.updated_meta = 'updated'
self.resource = resources.UpdateOrder()
@mock.patch(
'barbican.tasks.certificate_resources.modify_certificate_request')
def test_should_update_certificate_order(self, mock_modify_cert_request):
self.order.type = models.OrderType.CERTIFICATE
self.resource.process(
self.order.id, self.external_project_id, self.updated_meta)
self.assertEqual(self.order.status, models.States.ACTIVE)
mock_modify_cert_request.assert_called_once_with(
self.order,
self.updated_meta
)
@mock.patch(
'barbican.tasks.certificate_resources.modify_certificate_request')
def test_should_fail_during_processing(self, mock_mod_cert):

View File

@@ -83,7 +83,7 @@ class MockModelRepositoryMixin(object):
"""Class for setting up the repo factory mocks
This class has the purpose of setting up the mocks for the model repository
factory functions. This is because the are intended to be singletons, and
factory functions. This is because they are intended to be singletons, and
thus called inside the code-base, and not really passed around as
arguments. Thus, this kind of approach is needed.