From 87bb9141a2a47333122f755367c556bd7651e017 Mon Sep 17 00:00:00 2001 From: jfwood Date: Thu, 19 Feb 2015 16:33:10 -0600 Subject: [PATCH] Add sub-status logic to worker/task processing The barbican.tasks.resources.BaseTask was modified to allow the handle_processing() template method to return an optional status result that contains optional sub-status and retry information that is then used to determine if an entity (such as an Order) stays in the PENDING state marked with sub-status data, or else terminates in the ERROR or ACTIVE states. Change-Id: I29e0e61e723f84a123b88b30db50746b6862b02d --- barbican/model/models.py | 24 +++- barbican/queue/keystone_listener.py | 4 + barbican/queue/server.py | 37 +++--- barbican/tasks/keystone_consumer.py | 7 +- barbican/tasks/resources.py | 160 ++++++++++++++++++------- barbican/tests/queue/test_server.py | 4 +- barbican/tests/tasks/test_resources.py | 92 +++++++++++++- barbican/tests/utils.py | 2 +- 8 files changed, 258 insertions(+), 72 deletions(-) diff --git a/barbican/model/models.py b/barbican/model/models.py index 8811b91d4..af5651df4 100644 --- a/barbican/model/models.py +++ b/barbican/model/models.py @@ -35,6 +35,9 @@ from barbican.plugin.interface import secret_store LOG = utils.getLogger(__name__) BASE = declarative.declarative_base() +ERROR_REASON_LENGTH = 255 +SUB_STATUS_LENGTH = 36 +SUB_STATUS_MESSAGE_LENGTH = 255 # Allowed entity states @@ -483,7 +486,7 @@ class Order(BASE, SoftDeleteMixIn, ModelBase): index=True, nullable=False) error_status_code = sa.Column(sa.String(16)) - error_reason = sa.Column(sa.String(255)) + error_reason = sa.Column(sa.String(ERROR_REASON_LENGTH)) meta = sa.Column(JsonBlob(), nullable=True) @@ -491,8 +494,9 @@ class Order(BASE, SoftDeleteMixIn, ModelBase): index=True, nullable=True) container_id = sa.Column(sa.String(36), sa.ForeignKey('containers.id'), index=True, nullable=True) - sub_status = sa.Column(sa.String(36), nullable=True) - sub_status_message = sa.Column(sa.String(255), nullable=True) + sub_status = sa.Column(sa.String(SUB_STATUS_LENGTH), nullable=True) + sub_status_message = sa.Column(sa.String(SUB_STATUS_MESSAGE_LENGTH), + nullable=True) order_plugin_metadata = orm.relationship( "OrderPluginMetadatum", @@ -517,6 +521,20 @@ class Order(BASE, SoftDeleteMixIn, ModelBase): self.sub_status_message = parsed_request.get( 'sub_status_message') + def set_error_reason_safely(self, error_reason_raw): + """Ensure error reason does not raise database attribute exceptions.""" + self.error_reason = error_reason_raw[:ERROR_REASON_LENGTH] + + def set_sub_status_safely(self, sub_status_raw): + """Ensure sub-status does not raise database attribute exceptions.""" + self.sub_status = sub_status_raw[:SUB_STATUS_LENGTH] + + def set_sub_status_message_safely(self, sub_status_message_raw): + """Ensure status message doesn't raise database attrib. exceptions.""" + self.sub_status_message = sub_status_message_raw[ + :SUB_STATUS_MESSAGE_LENGTH + ] + def _do_delete_children(self, session): """Sub-class hook: delete children relationships.""" for k, v in self.order_plugin_metadata.items(): diff --git a/barbican/queue/keystone_listener.py b/barbican/queue/keystone_listener.py index 337a6c2b7..731d47456 100644 --- a/barbican/queue/keystone_listener.py +++ b/barbican/queue/keystone_listener.py @@ -89,6 +89,10 @@ class NotificationTask(object): except Exception: # No need to log message here as task process method has # already logged it + # TODO(john-wood-w) This really should be retried on a + # schedule and really only if the database is down, not + # for any exception otherwise tasks will be re-queued + # repeatedly. Revisit as part of the retry task work later. if self.conf.keystone_notifications.allow_requeue: return messaging.NotificationResult.REQUEUE else: diff --git a/barbican/queue/server.py b/barbican/queue/server.py index 98781c3c2..07d1cf3c1 100644 --- a/barbican/queue/server.py +++ b/barbican/queue/server.py @@ -28,6 +28,7 @@ except ImportError: from oslo_config import cfg from barbican.common import utils +from barbican import i18n as u from barbican.model import repositories from barbican.openstack.common import service from barbican import queue @@ -46,13 +47,17 @@ def transactional(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): + fn_name = getattr(fn, '__name__', '????') + if not queue.is_server_side(): fn(*args, **kwargs) # Non-server mode directly invokes tasks. + LOG.info(u._LI("Completed worker task: '%s'"), fn_name) else: # Manage session/transaction. try: fn(*args, **kwargs) repositories.commit() + LOG.info(u._LI("Completed worker task: '%s'"), fn_name) except Exception: """NOTE: Wrapped functions must process with care! @@ -60,6 +65,10 @@ def transactional(fn): including any updates made to entities such as setting error codes and error messages. """ + LOG.exception( + u._LE("Problem seen processing worker task: '%s'"), + fn_name + ) repositories.rollback() finally: repositories.clear() @@ -113,25 +122,21 @@ class Tasks(object): @transactional def process_type_order(self, context, order_id, project_id): """Process TypeOrder.""" - LOG.info('Processing TypeOrder: {0}'.format(order_id)) - task = resources.BeginTypeOrder() - try: - task.process(order_id, project_id) - except Exception: - LOG.exception(">>>>> Task exception seen, details reported " - "on the Orders entity.") + LOG.info( + u._LI("Processing type order: order ID is '%s'"), + order_id + ) + resources.BeginTypeOrder().process(order_id, project_id) @monitored @transactional def update_order(self, context, order_id, project_id, updated_meta): """Update Order.""" - LOG.info('Updating TypeOrder: {0}'.format(order_id)) - task = resources.UpdateOrder() - try: - task.process(order_id, project_id, updated_meta) - except Exception: - LOG.exception(">>>>> Task exception seen, details reported " - "on the Orders entity.") + LOG.info( + u._LI("Processing update order: order ID is '%s'"), + order_id + ) + resources.UpdateOrder().process(order_id, project_id, updated_meta) class TaskServer(Tasks, service.Service): @@ -160,11 +165,11 @@ class TaskServer(Tasks, service.Service): endpoints=[self]) def start(self): - LOG.info("Starting the TaskServer") + LOG.info(u._LI("Starting the TaskServer")) self._server.start() super(TaskServer, self).start() def stop(self): - LOG.info("Halting the TaskServer") + LOG.info(u._LI("Halting the TaskServer")) super(TaskServer, self).stop() self._server.stop() diff --git a/barbican/tasks/keystone_consumer.py b/barbican/tasks/keystone_consumer.py index b13cc583a..7e7057f5d 100644 --- a/barbican/tasks/keystone_consumer.py +++ b/barbican/tasks/keystone_consumer.py @@ -86,8 +86,11 @@ class KeystoneEventConsumer(resources.BaseTask): } ) - def handle_success(self, project, project_id=None, resource_type=None, - operation_type=None): + def handle_success(self, project, result, project_id=None, + resource_type=None, operation_type=None): + # Note: The processing 'result' argument can be ignored as 'result' + # only pertains to long-running tasks. See the documentation for + # BaseTask for more details. LOG.info( u._LI( 'Successfully handled Keystone event, ' diff --git a/barbican/tasks/resources.py b/barbican/tasks/resources.py index c2b08c9d5..471e72d68 100644 --- a/barbican/tasks/resources.py +++ b/barbican/tasks/resources.py @@ -32,6 +32,34 @@ from barbican.tasks import certificate_resources as cert LOG = utils.getLogger(__name__) +RETRY_MSEC = 60 * 1000 + + +class FollowOnProcessingStatusDTO(object): + """Follow On Processing status data transfer object (DTO). + + An object of this type is optionally returned by the + BaseTask.handle_processing() method defined below, and is used to guide + follow on processing and to provide status feedback to clients. + """ + def __init__(self, status=u._('Unknown'), status_message=u._('Unknown'), + retry_method=None, retry_msec=RETRY_MSEC): + """Creates a new FollowOnProcessingStatusDTO. + + :param status: Status for cert order + :param status_message: Message to explain status type. + :param retry_method: Method to retry + :param retry_msec: Number of milliseconds to wait for retry + """ + self.status = status + self.status_message = status_message + self.retry_method = retry_method + self.retry_msec = int(retry_msec) + + def is_follow_on_needed(self): + return self.retry_method + + @six.add_metaclass(abc.ABCMeta) class BaseTask(object): """Base asynchronous task.""" @@ -68,7 +96,7 @@ class BaseTask(object): # Process the target entity. try: - self.handle_processing(entity, *args, **kwargs) + result = self.handle_processing(entity, *args, **kwargs) except Exception as e_orig: LOG.exception(u._LE("Could not perform processing for " "task '%s'."), name) @@ -87,7 +115,7 @@ class BaseTask(object): # Handle successful conclusion of processing. try: - self.handle_success(entity, *args, **kwargs) + self.handle_success(entity, result, *args, **kwargs) except Exception as e: LOG.exception(u._LE("Could not process after successfully " "executing task '%s'."), name) @@ -108,7 +136,9 @@ class BaseTask(object): :param args: List of arguments passed in from the client. :param kwargs: Dict of arguments passed in from the client. - :return: None + :return: None if no follow on processing is needed for this task, + otherwise a :class:`FollowOnProcessingStatusDTO` instance + with information on how to process this task into the future. """ @abc.abstractmethod @@ -129,19 +159,73 @@ class BaseTask(object): """ @abc.abstractmethod - def handle_success(self, entity, *args, **kwargs): + def handle_success(self, entity, result, *args, **kwargs): """A hook method to post-process after successful entity processing. This method could be used to mark entity as being active, or to add information/references to the entity. :param entity: Entity retrieved from _retrieve_entity() above. + :param result: A :class:`FollowOnProcessingStatusDTO` instance + representing processing result status, None implies + that no follow on processing is required. :param args: List of arguments passed in from the client. :param kwargs: Dict of arguments passed in from the client. :return: None """ +class _OrderTaskHelper(object): + """Supports order-related BaseTask operations. + + BaseTask sub-classes can delegate to an instance of this class to perform + common order-related operations. + """ + def __init__(self): + self.order_repo = rep.get_order_repository() + + def retrieve_entity(self, order_id, external_project_id, *args, **kwargs): + """Retrieve an order entity by its PK ID.""" + return self.order_repo.get( + entity_id=order_id, + external_project_id=external_project_id) + + def handle_error(self, order, status, message, exception, + *args, **kwargs): + """Stamp the order entity as terminated due to an error.""" + order.status = models.States.ERROR + order.error_status_code = status + order.set_error_reason_safely(message) + self.order_repo.save(order) + + def handle_success(self, order, result, *args, **kwargs): + """Handle if the order entity is terminated or else long running. + + The 'result' argument (if present) indicates if a order should now be + terminated due to it being completed, or else should be held in the + PENDING state due to follow on workflow processing. If 'result' is not + provided, the order is presumed completed. + """ + is_follow_on_needed = False + sub_status = None + sub_status_message = None + if result: + is_follow_on_needed = result.is_follow_on_needed() + sub_status = result.status + sub_status_message = result.status_message + + if not is_follow_on_needed: + order.status = models.States.ACTIVE + + if sub_status: + order.set_sub_status_safely(sub_status) + + if sub_status_message: + order.set_sub_status_message_safely(sub_status_message) + + self.order_repo.save(order) + + class BeginTypeOrder(BaseTask): """Handles beginning processing of a TypeOrder.""" @@ -150,35 +234,15 @@ class BeginTypeOrder(BaseTask): def __init__(self): LOG.debug('Creating BeginTypeOrder task processor') - self.order_repo = rep.get_order_repository() self.project_repo = rep.get_project_repository() + self.helper = _OrderTaskHelper() - def retrieve_entity(self, order_id, external_project_id): - return self.order_repo.get( - entity_id=order_id, - external_project_id=external_project_id) + def retrieve_entity(self, *args, **kwargs): + return self.helper.retrieve_entity(*args, **kwargs) def handle_processing(self, order, *args, **kwargs): self.handle_order(order) - def handle_error(self, order, status, message, exception, - *args, **kwargs): - order.status = models.States.ERROR - order.error_status_code = status - order.error_reason = message - self.order_repo.save(order) - - def handle_success(self, order, *args, **kwargs): - if models.OrderType.CERTIFICATE != order.type: - order.status = models.States.ACTIVE - else: - # TODO(alee-3): enable the code below when sub status is added - # if cert.ORDER_STATUS_CERT_GENERATED.id == order.sub_status: - # order.status = models.States.ACTIVE - order.status = models.States.ACTIVE - - self.order_repo.save(order) - def handle_order(self, order): """Handle secret creation using meta info. @@ -228,6 +292,15 @@ class BeginTypeOrder(BaseTask): u._('Order type "{order_type}" not implemented.').format( order_type=order_type)) + def handle_error(self, order, status, message, exception, + *args, **kwargs): + self.helper.handle_error( + order, status, message, exception, *args, **kwargs) + + def handle_success(self, order, result, *args, **kwargs): + self.helper.handle_success( + order, result, *args, **kwargs) + class UpdateOrder(BaseTask): """Handles updating an order.""" @@ -236,29 +309,15 @@ class UpdateOrder(BaseTask): def __init__(self): LOG.debug('Creating UpdateOrder task processor') - self.order_repo = rep.get_order_repository() + self.helper = _OrderTaskHelper() - def retrieve_entity(self, order_id, external_project_id, updated_meta): - return self.order_repo.get( - entity_id=order_id, - external_project_id=external_project_id) + def retrieve_entity(self, *args, **kwargs): + return self.helper.retrieve_entity(*args, **kwargs) - def handle_processing(self, order, order_id, keystone_id, updated_meta): + def handle_processing( + self, order, order_id, external_project_id, updated_meta): self.handle_order(order, updated_meta) - def handle_error(self, order, status, message, exception, - *args, **kwargs): - order.status = models.States.ERROR - order.error_status_code = status - order.error_reason = message - LOG.exception(u._LE("An error has occurred updating the order.")) - self.order_repo.save(order) - - def handle_success(self, order, *args, **kwargs): - # TODO(chellygel): Handle sub-status on a pending order. - order.status = models.States.ACTIVE - self.order_repo.save(order) - def handle_order(self, order, updated_meta): """Handle Order Update @@ -278,3 +337,12 @@ class UpdateOrder(BaseTask): order_type=order_type)) LOG.debug("...done updating order.") + + def handle_error(self, order, status, message, exception, + *args, **kwargs): + self.helper.handle_error( + order, status, message, exception, *args, **kwargs) + + def handle_success(self, order, result, *args, **kwargs): + self.helper.handle_success( + order, result, *args, **kwargs) diff --git a/barbican/tests/queue/test_server.py b/barbican/tests/queue/test_server.py index 5d4158162..8fe27cc01 100644 --- a/barbican/tests/queue/test_server.py +++ b/barbican/tests/queue/test_server.py @@ -151,8 +151,8 @@ class WhenUsingBeginTypeOrderTask(utils.BaseTestCase): @mock.patch('barbican.tasks.resources.BeginTypeOrder') def test_process_order_catch_exception(self, mock_begin_order): - """Test process_type_order() handles all exceptions.""" - mock_begin_order.return_value.process.side_effect = Exception() + """Test that BeginTypeOrder's process() handles all exceptions.""" + mock_begin_order.return_value._process.side_effect = Exception() self.tasks.process_type_order(None, self.order_id, self.external_project_id) diff --git a/barbican/tests/tasks/test_resources.py b/barbican/tests/tasks/test_resources.py index fcebfb801..8fe703497 100644 --- a/barbican/tests/tasks/test_resources.py +++ b/barbican/tests/tasks/test_resources.py @@ -48,6 +48,7 @@ class BaseOrderTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): self.setup_project_repository_mock(self.project_repo) self.order.status = models.States.PENDING + self.order.id = 'orderid1234' self.order.project_id = self.project_id self.order_repo = mock.MagicMock() self.order_repo.get.return_value = self.order @@ -85,6 +86,76 @@ class BaseOrderTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): self.container = models.Container() +class WhenUsingOrderTaskHelper(BaseOrderTestCase): + + def setUp(self): + super(WhenUsingOrderTaskHelper, self).setUp() + + self.result = resources.FollowOnProcessingStatusDTO() + + self.helper = resources._OrderTaskHelper() + + def test_should_retrieve_entity(self): + order_model = self.helper.retrieve_entity( + self.order.id, self.external_project_id) + + self.assertEqual(self.order.id, order_model.id) + self.order_repo.get.assert_called_once_with( + entity_id=self.order.id, + external_project_id=self.external_project_id) + + def test_should_handle_error(self): + self.helper.handle_error(self.order, 'status_code', 'reason', + ValueError()) + + self.assertEqual(models.States.ERROR, self.order.status) + self.assertEqual('status_code', self.order.error_status_code) + self.assertEqual('reason', self.order.error_reason) + self.order_repo.save.assert_called_once_with(self.order) + + def test_should_handle_success_no_result(self): + self.helper.handle_success(self.order, None) + + self.assertEqual(models.States.ACTIVE, self.order.status) + self.assertIsNone(self.order.sub_status) + self.assertIsNone(self.order.sub_status_message) + self.order_repo.save.assert_called_once_with(self.order) + + def test_should_handle_success_result_no_follow_on_needed(self): + self.helper.handle_success(self.order, self.result) + + self.assertEqual(models.States.ACTIVE, self.order.status) + self.assertEqual('Unknown', self.order.sub_status) + self.assertEqual('Unknown', self.order.sub_status_message) + self.order_repo.save.assert_called_once_with(self.order) + + def test_should_handle_success_result_follow_on_needed(self): + self.result.retry_method = 'bogus_method_here' + self.result.status = 'status' + self.result.status_message = 'status_message' + + self.helper.handle_success(self.order, self.result) + + self.assertNotEqual(models.States.ACTIVE, self.order.status) + self.assertEqual('status', self.order.sub_status) + self.assertEqual('status_message', self.order.sub_status_message) + self.order_repo.save.assert_called_once_with(self.order) + + def test_should_handle_success_result_large_statuses_clipped(self): + sub_status = 'z' * (models.SUB_STATUS_LENGTH + 1) + sub_status_message = 'z' * (models.SUB_STATUS_MESSAGE_LENGTH + 1) + + self.result.status = sub_status + self.result.status_message = sub_status_message + + self.helper.handle_success(self.order, self.result) + + self.assertEqual(sub_status[:-1], self.order.sub_status) + self.assertEqual( + sub_status_message[:-1], self.order.sub_status_message) + self.order_repo.save.assert_called_once_with(self.order) + + class WhenBeginningKeyTypeOrder(BaseOrderTestCase): def setUp(self): @@ -179,13 +250,30 @@ class WhenBeginningKeyTypeOrder(BaseOrderTestCase): ) -class WhenUpdatingKeyTypeOrder(BaseOrderTestCase): +class WhenUpdatingOrder(BaseOrderTestCase): def setUp(self): - super(WhenUpdatingKeyTypeOrder, self).setUp() + super(WhenUpdatingOrder, self).setUp() + + self.updated_meta = 'updated' self.resource = resources.UpdateOrder() + @mock.patch( + 'barbican.tasks.certificate_resources.modify_certificate_request') + def test_should_update_certificate_order(self, mock_modify_cert_request): + self.order.type = models.OrderType.CERTIFICATE + + self.resource.process( + self.order.id, self.external_project_id, self.updated_meta) + + self.assertEqual(self.order.status, models.States.ACTIVE) + + mock_modify_cert_request.assert_called_once_with( + self.order, + self.updated_meta + ) + @mock.patch( 'barbican.tasks.certificate_resources.modify_certificate_request') def test_should_fail_during_processing(self, mock_mod_cert): diff --git a/barbican/tests/utils.py b/barbican/tests/utils.py index d44dda738..f2d352985 100644 --- a/barbican/tests/utils.py +++ b/barbican/tests/utils.py @@ -83,7 +83,7 @@ class MockModelRepositoryMixin(object): """Class for setting up the repo factory mocks This class has the purpose of setting up the mocks for the model repository - factory functions. This is because the are intended to be singletons, and + factory functions. This is because they are intended to be singletons, and thus called inside the code-base, and not really passed around as arguments. Thus, this kind of approach is needed.