From 900bdbe3d9aa80b3e44d18f637711629cb456e3d Mon Sep 17 00:00:00 2001 From: wanghao Date: Tue, 27 Jun 2017 09:33:40 +0800 Subject: [PATCH] Notification Delivery Policy This patch introduces the delivery retry policy into Zaqar. It will work when the notification sent from Zaqar to the subscriber failed. User can define the retry policy in the options of subscription or metadata of queue. Change-Id: I1a74c2d5b69fb82826c303468099db34b3e41b5b Implements: bp notification-delivery-policy --- doc/source/user/index.rst | 1 + .../user/notification_delivery_policy.rst | 68 +++++++++++++++ ...tion-delivery-policy-fbc94083b4e6b8d0.yaml | 6 ++ zaqar/common/consts.py | 16 ++++ zaqar/notification/notifier.py | 12 ++- zaqar/notification/tasks/webhook.py | 87 ++++++++++++++++++- zaqar/storage/pipeline.py | 4 +- .../tests/unit/notification/test_notifier.py | 34 ++++++-- zaqar/transport/validation.py | 37 ++++++++ 9 files changed, 252 insertions(+), 13 deletions(-) create mode 100644 doc/source/user/notification_delivery_policy.rst create mode 100644 releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml diff --git a/doc/source/user/index.rst b/doc/source/user/index.rst index 546f68ae1..2b6ba0726 100644 --- a/doc/source/user/index.rst +++ b/doc/source/user/index.rst @@ -9,3 +9,4 @@ User Guide send_request_api authentication_tokens headers_queue_api_working + notification_delivery_policy diff --git a/doc/source/user/notification_delivery_policy.rst b/doc/source/user/notification_delivery_policy.rst new file mode 100644 index 000000000..6e90d82de --- /dev/null +++ b/doc/source/user/notification_delivery_policy.rst @@ -0,0 +1,68 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + +====================================== +The Notification Delivery Policy Guide +====================================== + +Support notification delivery policy in webhook type. It will work when +the notification is sent from Zaqar to the subscriber failed. +This guide shows how to use this feature: + +Webhook +------- + +.. note:: + + You should make sure that the message notification is enabled. By default, + the ``message_pipeline`` config option in [storage] section should be set + like: message_pipeline = zaqar.notification.notifier + +1. Create the queue with _retry_policy metadata like this: + +.. code:: json + + { + '_retry_policy': { + 'retries_with_no_delay': , + 'minimum_delay_retries': , + 'minimum_delay': , + 'maximum_delay': , + 'maximum_delay_retries': , + 'retry_backoff_function': , + 'ignore_subscription_override': } + } + +- 'minimum_delay' and 'maximum_delay' mean delay time in seconds. +- 'retry_backoff_function' mean name of retry backoff function. + There will be a enum in Zaqar that contain all valid values. + At first step, Zaqar only supports one function: 'linear'. +- 'minimum_delay_retries' and 'maximum_delay_retries' mean the number of + retries with 'minimum_delay' or 'maximum_delay' delay time. + +If value of retry_policy is empty dict, that Zaqar will use default +value to those keys: + +- retries_with_no_delay=3 +- minimum_delay_retries=3 +- minimum_delay=5 +- maximum_delay=30 +- maximum_delay_retries=3 +- retry_backoff_function=linear +- ignore_subscription_override=False + +2. Create a subscription with options like queue's metadata below. If user + don't set the options, Zaqar will use the retry policy in queue's metadata. + If user do it, Zaqar will use the retry policy in options by default, if + user still want to use retry policy in queue's metadata, then can set the + ignore_subscription_override = True. diff --git a/releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml b/releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml new file mode 100644 index 000000000..37b6f5d64 --- /dev/null +++ b/releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml @@ -0,0 +1,6 @@ +--- +features: + - Support notificaiton delivery policy in webhook type. It will work when + the notification is sent from Zaqar to the subscriber failed. + User can define the retry policy in the options of subscription or + metadata of queue. diff --git a/zaqar/common/consts.py b/zaqar/common/consts.py index 96cb1adb4..05e59e8b5 100644 --- a/zaqar/common/consts.py +++ b/zaqar/common/consts.py @@ -115,3 +115,19 @@ FLAVOR_OPS = ( 'flavor_update', 'flavor_delete', ) + +RETRY_OPS = ( + RETRIES_WITH_NO_DELAY, + MINIMUM_DELAY_RETRIES, + MINIMUM_DELAY, + MAXIMUM_DELAY, + MAXIMUM_DELA_RETRIES, + LINEAR_INTERVAL, +) = ( + 3, + 3, + 5, + 30, + 3, + 5, +) diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 485e91ec2..9e94f7ef7 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -45,6 +45,7 @@ class NotifierDriver(object): max_workers = kwargs.get('max_notifier_workers', 10) self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers) self.require_confirmation = kwargs.get('require_confirmation', False) + self.queue_controller = kwargs.get('queue_controller') def post(self, queue_name, messages, client_uuid, project=None): """Send messages to the subscribers.""" @@ -52,6 +53,9 @@ class NotifierDriver(object): if not isinstance(self.subscription_controller, pooling.SubscriptionController): marker = None + queue_metadata = self.queue_controller.get(queue_name, + project) + retry_policy = queue_metadata.get('_retry_policy', {}) while True: subscribers = self.subscription_controller.list( queue_name, project, marker=marker) @@ -70,7 +74,8 @@ class NotifierDriver(object): continue for msg in messages: msg['Message_Type'] = MessageType.Notification.name - self._execute(s_type, sub, messages) + self._execute(s_type, sub, messages, + retry_policy=retry_policy) marker = next(subscribers) if not marker: break @@ -137,7 +142,8 @@ class NotifierDriver(object): self._execute(s_type, subscription, [messages], conf) - def _execute(self, s_type, subscription, messages, conf=None): + def _execute(self, s_type, subscription, messages, conf=None, + retry_policy=None): if self.subscription_controller: data_driver = self.subscription_controller.driver conf = data_driver.conf @@ -147,4 +153,4 @@ class NotifierDriver(object): s_type, invoke_on_load=True) self.executor.submit(mgr.driver.execute, subscription, messages, - conf=conf) + conf=conf, queue_retry_policy=retry_policy) diff --git a/zaqar/notification/tasks/webhook.py b/zaqar/notification/tasks/webhook.py index 0fc9b2e0c..bc3d9356e 100644 --- a/zaqar/notification/tasks/webhook.py +++ b/zaqar/notification/tasks/webhook.py @@ -13,15 +13,84 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time + import json from oslo_log import log as logging import requests +from zaqar.common import consts + LOG = logging.getLogger(__name__) +def _Linear_function(minimum_delay, maximum_delay, times): + return range(minimum_delay, maximum_delay, times) + +RETRY_BACKOFF_FUNCTION_MAP = {'linear': _Linear_function} + + class WebhookTask(object): + def _post_request_success(self, subscriber, data, headers): + try: + response = requests.post(subscriber, data=data, headers=headers) + if response and (response.status_code in range(200, 500)): + return True + except Exception as e: + LOG.exception('post request got exception in retry: %s.', str(e)) + return False + + def _retry_post(self, sub_retry_policy, queue_retry_policy, subscriber, + data, headers): + retry_policy = None + if sub_retry_policy.get('ignore_subscription_override') or \ + queue_retry_policy.get('ignore_subscription_override'): + retry_policy = queue_retry_policy or {} + else: + retry_policy = sub_retry_policy or queue_retry_policy or {} + # Immediate Retry Phase + for retry_with_no_delay in range( + 0, retry_policy.get('retries_with_no_delay', + consts.RETRIES_WITH_NO_DELAY)): + LOG.debug('Retry with no delay, count: %s', retry_with_no_delay) + if self._post_request_success(subscriber, data, headers): + return + # Pre-Backoff Phase + for minimum_delay_retry in range( + 0, retry_policy.get('minimum_delay_retries', + consts.MINIMUM_DELAY_RETRIES)): + LOG.debug('Retry with minimum delay, count: %s', + minimum_delay_retry) + time.sleep(retry_policy.get('minimum_delay', consts.MINIMUM_DELAY)) + if self._post_request_success(subscriber, data, headers): + return + # Backoff Phase: Linear retry + # TODO(wanghao): Now we only support the linear function, we should + # support more in Queens. + retry_function = retry_policy.get('retry_backoff_function', 'linear') + backoff_function = RETRY_BACKOFF_FUNCTION_MAP[retry_function] + for i in backoff_function(retry_policy.get('minimum_delay', + consts.MINIMUM_DELAY), + retry_policy.get('maximum_delay', + consts.MAXIMUM_DELAY), + consts.LINEAR_INTERVAL): + LOG.debug('Retry with retry_backoff_function, sleep: %s seconds', + i) + time.sleep(i) + if self._post_request_success(subscriber, data, headers): + return + # Post-Backoff Phase + for maximum_delay_retries in range( + 0, retry_policy.get('maximum_delay_retries', + consts.MAXIMUM_DELA_RETRIES)): + LOG.debug('Retry with maximum delay, count: %s', + maximum_delay_retries) + time.sleep(retry_policy.get('maximum_delay', consts.MAXIMUM_DELAY)) + if self._post_request_success(subscriber, data, headers): + return + LOG.debug('Send request retries are all failed.') + def execute(self, subscription, messages, headers=None, **kwargs): if headers is None: headers = {'Content-Type': 'application/json'} @@ -37,11 +106,23 @@ class WebhookTask(object): data = data.replace('"$zaqar_message$"', json.dumps(msg)) else: data = json.dumps(msg) - requests.post(subscription['subscriber'], - data=data, - headers=headers) + response = requests.post(subscription['subscriber'], + data=data, + headers=headers) + if response and (response.status_code not in range(200, 500)): + LOG.info("Response is %s, begin to retry", + response.status_code) + self._retry_post( + subscription['options'].get('_retry_policy', {}), + kwargs.get('queue_retry_policy'), + subscription['subscriber'], + data, headers) except Exception as e: LOG.exception('webhook task got exception: %s.', str(e)) + self._retry_post(subscription['options'].get('_retry_policy', {}), + kwargs.get('queue_retry_policy'), + subscription['subscriber'], + data, headers) def register(self, subscriber, options, ttl, project_id, request_data): pass diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 4d3314d7f..b673e6b3c 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -155,7 +155,9 @@ class DataDriver(base.DataDriverBase): 'max_notifier_workers': self.conf.notification.max_notifier_workers, 'require_confirmation': - self.conf.notification.require_confirmation} + self.conf.notification.require_confirmation, + 'queue_controller': + self._storage.queue_controller} stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.append(self._storage.message_controller) return common.Pipeline(stages) diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py index 608c16f3f..95c71f81b 100644 --- a/zaqar/tests/unit/notification/test_notifier.py +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -70,9 +70,13 @@ class NotifierTest(testing.TestBase): 'options': {}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() @@ -114,9 +118,13 @@ class NotifierTest(testing.TestBase): 'options': {'post_data': json.dumps(post_data)}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() @@ -155,9 +163,13 @@ class NotifierTest(testing.TestBase): return iter([subscription2, {}]) ctlr.list = mock_list - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() @@ -192,7 +204,10 @@ class NotifierTest(testing.TestBase): 'from': 'zaqar@example.com'}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) called = set() msg = ('Content-Type: text/plain; charset="us-ascii"\n' 'MIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nto:' @@ -242,7 +257,10 @@ class NotifierTest(testing.TestBase): def test_post_no_subscriber(self): ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([[], {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) with mock.patch('requests.post') as mock_post: driver.post('fake_queue', self.messages, self.client_id, self.project) @@ -255,8 +273,12 @@ class NotifierTest(testing.TestBase): 'options': {}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 8082b75f2..978709607 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -232,6 +232,39 @@ class Validator(object): path_list = self._decode_json_pointer(path) return op, path_list + def _validate_retry_policy(self, metadata): + retry_policy = metadata.get('_retry_policy') if metadata else None + if retry_policy and not isinstance(retry_policy, dict): + msg = _('retry_policy must be a dict.') + raise ValidationFailed(msg) + + if retry_policy: + valid_keys = ['retries_with_no_delay', 'minimum_delay_retries', + 'minimum_delay', 'maximum_delay', + 'maximum_delay_retries', 'retry_backoff_function', + 'ignore_subscription_override'] + for key in valid_keys: + retry_value = retry_policy.get(key) + if key == 'retry_backoff_function': + if retry_value and not isinstance(retry_value, str): + msg = _('retry_backoff_function must be a string.') + raise ValidationFailed(msg) + # TODO(wanghao): Now we only support linear function. + # This will be removed after we support more functions. + if retry_value and retry_value != 'linear': + msg = _('retry_backoff_function only supports linear ' + 'now.') + raise ValidationFailed(msg) + elif key == 'ignore_subscription_override': + if retry_value and not isinstance(retry_value, bool): + msg = _('ignore_subscription_override must be a ' + 'boolean.') + raise ValidationFailed(msg) + else: + if retry_value and not isinstance(retry_value, int): + msg = _('Retry policy: %s must be a integer.') % key + raise ValidationFailed(msg) + def queue_patching(self, request, changes): washed_changes = [] content_types = { @@ -344,6 +377,8 @@ class Validator(object): raise ValidationFailed(msg, self._limits_conf.max_message_ttl, MIN_MESSAGE_TTL) + self._validate_retry_policy(queue_metadata) + def queue_purging(self, document): """Restrictions the resource types to be purged for a queue. @@ -559,6 +594,8 @@ class Validator(object): msg = _(u'Options must be a dict.') raise ValidationFailed(msg) + self._validate_retry_policy(options) + ttl = subscription.get('ttl') if ttl: if not isinstance(ttl, int):