diff --git a/doc/source/user/index.rst b/doc/source/user/index.rst index 546f68ae1..2b6ba0726 100644 --- a/doc/source/user/index.rst +++ b/doc/source/user/index.rst @@ -9,3 +9,4 @@ User Guide send_request_api authentication_tokens headers_queue_api_working + notification_delivery_policy diff --git a/doc/source/user/notification_delivery_policy.rst b/doc/source/user/notification_delivery_policy.rst new file mode 100644 index 000000000..6e90d82de --- /dev/null +++ b/doc/source/user/notification_delivery_policy.rst @@ -0,0 +1,68 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + +====================================== +The Notification Delivery Policy Guide +====================================== + +Zaqar supports a notification delivery policy for webhook subscriptions. It +takes effect when sending a notification from Zaqar to the subscriber fails. +This guide shows how to use this feature: + +Webhook +------- + +.. note:: + + You should make sure that the message notification is enabled. By default, + the ``message_pipeline`` config option in [storage] section should be set + like: message_pipeline = zaqar.notification.notifier + +1. Create the queue with _retry_policy metadata like this: + +.. code:: json + + { + '_retry_policy': { + 'retries_with_no_delay': <Integer value, optional>, + 'minimum_delay_retries': <Integer value, optional>, + 'minimum_delay': <Integer value, optional>, + 'maximum_delay': <Integer value, optional>, + 'maximum_delay_retries': <Integer value, optional>, + 'retry_backoff_function': <String value, optional>, + 'ignore_subscription_override': <Boolean value, optional>} + } + +- 'minimum_delay' and 'maximum_delay' mean the delay time in seconds.
+- 'retry_backoff_function' means the name of the retry backoff function. + There will be an enum in Zaqar that contains all valid values. + As a first step, Zaqar only supports one function: 'linear'. +- 'minimum_delay_retries' and 'maximum_delay_retries' mean the number of + retries with 'minimum_delay' or 'maximum_delay' delay time. + +If the value of _retry_policy is an empty dict, Zaqar will use the default +values for those keys: + +- retries_with_no_delay=3 +- minimum_delay_retries=3 +- minimum_delay=5 +- maximum_delay=30 +- maximum_delay_retries=3 +- retry_backoff_function=linear +- ignore_subscription_override=False + +2. Create a subscription with options like the queue's metadata above. If the + user doesn't set the options, Zaqar will use the retry policy in the queue's + metadata. If the user does set them, Zaqar will use the retry policy in the + options by default; if the user still wants to use the retry policy in the + queue's metadata, they can set ignore_subscription_override = True. diff --git a/releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml b/releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml new file mode 100644 index 000000000..37b6f5d64 --- /dev/null +++ b/releasenotes/notes/support-notification-delivery-policy-fbc94083b4e6b8d0.yaml @@ -0,0 +1,6 @@ +--- +features: + - Support notification delivery policy in webhook type. It takes effect when + sending a notification from Zaqar to the subscriber fails. + Users can define the retry policy in the options of a subscription or + in the metadata of a queue.
diff --git a/zaqar/common/consts.py b/zaqar/common/consts.py index 96cb1adb4..05e59e8b5 100644 --- a/zaqar/common/consts.py +++ b/zaqar/common/consts.py @@ -115,3 +115,19 @@ FLAVOR_OPS = ( 'flavor_update', 'flavor_delete', ) + +RETRY_OPS = ( + RETRIES_WITH_NO_DELAY, + MINIMUM_DELAY_RETRIES, + MINIMUM_DELAY, + MAXIMUM_DELAY, + MAXIMUM_DELA_RETRIES, + LINEAR_INTERVAL, +) = ( + 3, + 3, + 5, + 30, + 3, + 5, +) diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 485e91ec2..9e94f7ef7 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -45,6 +45,7 @@ class NotifierDriver(object): max_workers = kwargs.get('max_notifier_workers', 10) self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers) self.require_confirmation = kwargs.get('require_confirmation', False) + self.queue_controller = kwargs.get('queue_controller') def post(self, queue_name, messages, client_uuid, project=None): """Send messages to the subscribers.""" @@ -52,6 +53,9 @@ class NotifierDriver(object): if not isinstance(self.subscription_controller, pooling.SubscriptionController): marker = None + queue_metadata = self.queue_controller.get(queue_name, + project) + retry_policy = queue_metadata.get('_retry_policy', {}) while True: subscribers = self.subscription_controller.list( queue_name, project, marker=marker) @@ -70,7 +74,8 @@ class NotifierDriver(object): continue for msg in messages: msg['Message_Type'] = MessageType.Notification.name - self._execute(s_type, sub, messages) + self._execute(s_type, sub, messages, + retry_policy=retry_policy) marker = next(subscribers) if not marker: break @@ -137,7 +142,8 @@ class NotifierDriver(object): self._execute(s_type, subscription, [messages], conf) - def _execute(self, s_type, subscription, messages, conf=None): + def _execute(self, s_type, subscription, messages, conf=None, + retry_policy=None): if self.subscription_controller: data_driver = self.subscription_controller.driver conf = 
data_driver.conf @@ -147,4 +153,4 @@ class NotifierDriver(object): s_type, invoke_on_load=True) self.executor.submit(mgr.driver.execute, subscription, messages, - conf=conf) + conf=conf, queue_retry_policy=retry_policy) diff --git a/zaqar/notification/tasks/webhook.py b/zaqar/notification/tasks/webhook.py index 0fc9b2e0c..bc3d9356e 100644 --- a/zaqar/notification/tasks/webhook.py +++ b/zaqar/notification/tasks/webhook.py @@ -13,15 +13,84 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time + import json from oslo_log import log as logging import requests +from zaqar.common import consts + LOG = logging.getLogger(__name__) +def _Linear_function(minimum_delay, maximum_delay, times): + return range(minimum_delay, maximum_delay, times) + +RETRY_BACKOFF_FUNCTION_MAP = {'linear': _Linear_function} + + class WebhookTask(object): + def _post_request_success(self, subscriber, data, headers): + try: + response = requests.post(subscriber, data=data, headers=headers) + if response and (response.status_code in range(200, 500)): + return True + except Exception as e: + LOG.exception('post request got exception in retry: %s.', str(e)) + return False + + def _retry_post(self, sub_retry_policy, queue_retry_policy, subscriber, + data, headers): + retry_policy = None + if sub_retry_policy.get('ignore_subscription_override') or \ + queue_retry_policy.get('ignore_subscription_override'): + retry_policy = queue_retry_policy or {} + else: + retry_policy = sub_retry_policy or queue_retry_policy or {} + # Immediate Retry Phase + for retry_with_no_delay in range( + 0, retry_policy.get('retries_with_no_delay', + consts.RETRIES_WITH_NO_DELAY)): + LOG.debug('Retry with no delay, count: %s', retry_with_no_delay) + if self._post_request_success(subscriber, data, headers): + return + # Pre-Backoff Phase + for minimum_delay_retry in range( + 0, retry_policy.get('minimum_delay_retries', + consts.MINIMUM_DELAY_RETRIES)): + 
LOG.debug('Retry with minimum delay, count: %s', + minimum_delay_retry) + time.sleep(retry_policy.get('minimum_delay', consts.MINIMUM_DELAY)) + if self._post_request_success(subscriber, data, headers): + return + # Backoff Phase: Linear retry + # TODO(wanghao): Now we only support the linear function, we should + # support more in Queens. + retry_function = retry_policy.get('retry_backoff_function', 'linear') + backoff_function = RETRY_BACKOFF_FUNCTION_MAP[retry_function] + for i in backoff_function(retry_policy.get('minimum_delay', + consts.MINIMUM_DELAY), + retry_policy.get('maximum_delay', + consts.MAXIMUM_DELAY), + consts.LINEAR_INTERVAL): + LOG.debug('Retry with retry_backoff_function, sleep: %s seconds', + i) + time.sleep(i) + if self._post_request_success(subscriber, data, headers): + return + # Post-Backoff Phase + for maximum_delay_retries in range( + 0, retry_policy.get('maximum_delay_retries', + consts.MAXIMUM_DELA_RETRIES)): + LOG.debug('Retry with maximum delay, count: %s', + maximum_delay_retries) + time.sleep(retry_policy.get('maximum_delay', consts.MAXIMUM_DELAY)) + if self._post_request_success(subscriber, data, headers): + return + LOG.debug('Send request retries are all failed.') + def execute(self, subscription, messages, headers=None, **kwargs): if headers is None: headers = {'Content-Type': 'application/json'} @@ -37,11 +106,23 @@ class WebhookTask(object): data = data.replace('"$zaqar_message$"', json.dumps(msg)) else: data = json.dumps(msg) - requests.post(subscription['subscriber'], - data=data, - headers=headers) + response = requests.post(subscription['subscriber'], + data=data, + headers=headers) + if response and (response.status_code not in range(200, 500)): + LOG.info("Response is %s, begin to retry", + response.status_code) + self._retry_post( + subscription['options'].get('_retry_policy', {}), + kwargs.get('queue_retry_policy'), + subscription['subscriber'], + data, headers) except Exception as e: LOG.exception('webhook task got 
exception: %s.', str(e)) + self._retry_post(subscription['options'].get('_retry_policy', {}), + kwargs.get('queue_retry_policy'), + subscription['subscriber'], + data, headers) def register(self, subscriber, options, ttl, project_id, request_data): pass diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 4d3314d7f..b673e6b3c 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -155,7 +155,9 @@ class DataDriver(base.DataDriverBase): 'max_notifier_workers': self.conf.notification.max_notifier_workers, 'require_confirmation': - self.conf.notification.require_confirmation} + self.conf.notification.require_confirmation, + 'queue_controller': + self._storage.queue_controller} stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.append(self._storage.message_controller) return common.Pipeline(stages) diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py index 608c16f3f..95c71f81b 100644 --- a/zaqar/tests/unit/notification/test_notifier.py +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -70,9 +70,13 @@ class NotifierTest(testing.TestBase): 'options': {}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() @@ -114,9 +118,13 @@ class NotifierTest(testing.TestBase): 'options': {'post_data': json.dumps(post_data)}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + 
queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() @@ -155,9 +163,13 @@ class NotifierTest(testing.TestBase): return iter([subscription2, {}]) ctlr.list = mock_list - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) headers = {'Content-Type': 'application/json'} with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() @@ -192,7 +204,10 @@ class NotifierTest(testing.TestBase): 'from': 'zaqar@example.com'}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) called = set() msg = ('Content-Type: text/plain; charset="us-ascii"\n' 'MIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nto:' @@ -242,7 +257,10 @@ class NotifierTest(testing.TestBase): def test_post_no_subscriber(self): ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([[], {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) with mock.patch('requests.post') as mock_post: driver.post('fake_queue', 
self.messages, self.client_id, self.project) @@ -255,8 +273,12 @@ class NotifierTest(testing.TestBase): 'options': {}}] ctlr = mock.MagicMock() ctlr.list = mock.Mock(return_value=iter([subscription, {}])) - driver = notifier.NotifierDriver(subscription_controller=ctlr) + queue_ctlr = mock.MagicMock() + queue_ctlr.get = mock.Mock(return_value={}) + driver = notifier.NotifierDriver(subscription_controller=ctlr, + queue_controller=queue_ctlr) with mock.patch('requests.post') as mock_post: + mock_post.return_value = None driver.post('fake_queue', self.messages, self.client_id, self.project) driver.executor.shutdown() diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 8082b75f2..978709607 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -232,6 +232,39 @@ class Validator(object): path_list = self._decode_json_pointer(path) return op, path_list + def _validate_retry_policy(self, metadata): + retry_policy = metadata.get('_retry_policy') if metadata else None + if retry_policy and not isinstance(retry_policy, dict): + msg = _('retry_policy must be a dict.') + raise ValidationFailed(msg) + + if retry_policy: + valid_keys = ['retries_with_no_delay', 'minimum_delay_retries', + 'minimum_delay', 'maximum_delay', + 'maximum_delay_retries', 'retry_backoff_function', + 'ignore_subscription_override'] + for key in valid_keys: + retry_value = retry_policy.get(key) + if key == 'retry_backoff_function': + if retry_value and not isinstance(retry_value, str): + msg = _('retry_backoff_function must be a string.') + raise ValidationFailed(msg) + # TODO(wanghao): Now we only support linear function. + # This will be removed after we support more functions. 
+ if retry_value and retry_value != 'linear': + msg = _('retry_backoff_function only supports linear ' + 'now.') + raise ValidationFailed(msg) + elif key == 'ignore_subscription_override': + if retry_value and not isinstance(retry_value, bool): + msg = _('ignore_subscription_override must be a ' + 'boolean.') + raise ValidationFailed(msg) + else: + if retry_value and not isinstance(retry_value, int): + msg = _('Retry policy: %s must be a integer.') % key + raise ValidationFailed(msg) + def queue_patching(self, request, changes): washed_changes = [] content_types = { @@ -344,6 +377,8 @@ class Validator(object): raise ValidationFailed(msg, self._limits_conf.max_message_ttl, MIN_MESSAGE_TTL) + self._validate_retry_policy(queue_metadata) + def queue_purging(self, document): """Restrictions the resource types to be purged for a queue. @@ -559,6 +594,8 @@ class Validator(object): msg = _(u'Options must be a dict.') raise ValidationFailed(msg) + self._validate_retry_policy(options) + ttl = subscription.get('ttl') if ttl: if not isinstance(ttl, int):