Retry handler
This patch adds the Retry handler that can be used as part of the event handling pipeline to retry failed handlers.

Change-Id: Ia86790de8efa6a3ef5b677a70ffbd2d8201f9d95
Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
parent
a3790b1555
commit
5f6c9a574e
@ -15,7 +15,7 @@
|
||||
|
||||
import sys
|
||||
|
||||
from kuryr.lib._i18n import _LI
|
||||
from kuryr.lib._i18n import _LI, _LE
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import service
|
||||
|
||||
@ -24,6 +24,7 @@ from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants
|
||||
from kuryr_kubernetes.handlers import dispatch as h_dis
|
||||
from kuryr_kubernetes.handlers import k8s_base as h_k8s
|
||||
from kuryr_kubernetes.handlers import retry as h_retry
|
||||
from kuryr_kubernetes import watcher
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -38,6 +39,15 @@ class KuryrK8sService(service.Service):
|
||||
class DummyHandler(h_k8s.ResourceEventHandler):
|
||||
OBJECT_KIND = constants.K8S_OBJ_NAMESPACE
|
||||
|
||||
def __init__(self):
|
||||
self.event_seq = 0
|
||||
|
||||
def __call__(self, event):
|
||||
self.event_seq += 1
|
||||
if self.event_seq % 4:
|
||||
raise Exception(_LE("Dummy exception %s") % self.event_seq)
|
||||
super(DummyHandler, self).__call__(event)
|
||||
|
||||
def on_added(self, event):
|
||||
LOG.debug("added: %s",
|
||||
event['object']['metadata']['selfLink'])
|
||||
@ -54,7 +64,12 @@ class KuryrK8sService(service.Service):
|
||||
LOG.debug("present: %s",
|
||||
event['object']['metadata']['selfLink'])
|
||||
|
||||
pipeline = h_dis.EventPipeline()
|
||||
class DummyPipeline(h_dis.EventPipeline):
    """Event pipeline that wraps every consumer in a `Retry` handler."""

    def _wrap_consumer(self, consumer):
        """Decorate `consumer` with `Retry`, then apply the parent wrapping.

        :param consumer: handler being registered with the pipeline
        :returns: whatever the parent `_wrap_consumer` returns for the
                  `Retry`-decorated consumer
        """
        retry = h_retry.Retry(consumer)
        return super(DummyPipeline, self)._wrap_consumer(retry)
|
||||
|
||||
pipeline = DummyPipeline()
|
||||
pipeline.register(DummyHandler())
|
||||
self.watcher = watcher.Watcher(pipeline, self.tg)
|
||||
self.watcher.add(constants.K8S_API_NAMESPACES)
|
||||
|
@ -16,3 +16,7 @@
|
||||
|
||||
class K8sClientException(Exception):
    """Exception type declared for the K8s client.

    NOTE(review): presumably raised on Kubernetes API client failures; the
    raising code is not visible here -- confirm against callers.
    """
|
||||
|
||||
|
||||
def format_msg(exception):
    """Return a short ``"<ClassName>: <message>"`` description.

    :param exception: exception instance to describe
    :returns: string made of the exception's class name and its string
              representation, separated by ``": "``
    """
    return "%s: %s" % (exception.__class__.__name__, exception)
|
||||
|
93
kuryr_kubernetes/handlers/retry.py
Normal file
93
kuryr_kubernetes/handlers/retry.py
Normal file
@ -0,0 +1,93 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import random
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from kuryr_kubernetes import exceptions
|
||||
from kuryr_kubernetes.handlers import base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Default total time budget for retrying a single event, in seconds.
DEFAULT_TIMEOUT = 180
# Default base back-off interval between attempts, in seconds.
DEFAULT_INTERVAL = 3
|
||||
|
||||
|
||||
class Retry(base.EventHandler):
    """Retries handler on failure.

    `Retry` can be used to decorate another `handler` to be retried whenever
    it raises any of the specified `exceptions`. If the `handler` does not
    succeed within the time limit specified by `timeout`, `Retry` will
    raise the exception raised by `handler`. `Retry` does not interrupt the
    `handler`, so the actual time spent within a single call to `Retry` may
    exceed the `timeout` depending on responsiveness of the `handler`.

    `Retry` implements a variation of exponential backoff algorithm [1] and
    ensures that at least the minimal time `interval` passes before the
    failed `handler` is retried for the same `event` (expected backoff
    E(c) = interval * 2 ** c / 2).

    [1] https://en.wikipedia.org/wiki/Exponential_backoff
    """

    def __init__(self, handler, exceptions=Exception,
                 timeout=DEFAULT_TIMEOUT, interval=DEFAULT_INTERVAL):
        """Store the decorated handler and the retry policy.

        :param handler: callable invoked with the event on every attempt
        :param exceptions: exception class (or tuple of classes) that
                           triggers a retry; anything else propagates
        :param timeout: time budget for all attempts, in seconds
        :param interval: base (and minimal) back-off interval, in seconds
        """
        self._handler = handler
        self._exceptions = exceptions
        self._timeout = timeout
        self._interval = interval

    def __call__(self, event):
        """Call the handler with `event`, retrying until it succeeds.

        :param event: event passed unchanged to the decorated handler
        :raises: the handler's last exception once the deadline has passed,
                 or immediately for exceptions not listed in `exceptions`
        """
        deadline = time.time() + self._timeout
        for attempt in itertools.count(1):
            try:
                self._handler(event)
                break
            except self._exceptions:
                # Re-raise by default; suppress only when `_sleep` reports
                # that it actually waited (truthy return), i.e. there was
                # still time left for another attempt.
                with excutils.save_and_reraise_exception() as ex:
                    if self._sleep(deadline, attempt, ex.value):
                        ex.reraise = False

    def _sleep(self, deadline, attempt, exception):
        """Wait before the next attempt, unless the deadline has passed.

        :param deadline: absolute `time.time()` value ending the retries
        :param attempt: 1-based number of the attempt that just failed
        :param exception: exception raised by that attempt (logged only)
        :returns: number of seconds slept, or 0 if the deadline has passed
                  and no sleep was performed
        """
        now = time.time()
        seconds_left = deadline - now

        if seconds_left <= 0:
            LOG.debug("Handler %s failed (attempt %s; %s), "
                      "timeout exceeded (%s seconds)",
                      self._handler, attempt, exceptions.format_msg(exception),
                      self._timeout)
            return 0

        # Randomized exponential back-off: 1 .. 2**attempt - 1 intervals.
        interval = random.randint(1, 2 ** attempt - 1) * self._interval

        # Do not plan to sleep past the deadline ...
        if interval > seconds_left:
            interval = seconds_left

        # ... but never sleep less than the base interval, even if that
        # overshoots the deadline.
        if interval < self._interval:
            interval = self._interval

        LOG.debug("Handler %s failed (attempt %s; %s), "
                  "retrying in %s seconds",
                  self._handler, attempt, exceptions.format_msg(exception),
                  interval)

        time.sleep(interval)
        return interval
|
150
kuryr_kubernetes/tests/unit/handlers/test_retry.py
Normal file
150
kuryr_kubernetes/tests/unit/handlers/test_retry.py
Normal file
@ -0,0 +1,150 @@
|
||||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import time
|
||||
|
||||
from kuryr_kubernetes.handlers import retry as h_retry
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
|
||||
|
||||
class _EX1(Exception):
    """Test-only exception used as a retryable failure."""
|
||||
|
||||
|
||||
class _EX11(_EX1):
    """Test-only subclass of `_EX1`."""
|
||||
|
||||
|
||||
class _EX2(Exception):
    """Test-only exception that does not derive from `_EX1`."""
|
||||
|
||||
|
||||
class TestRetryHandler(test_base.TestCase):
    """Unit tests for `h_retry.Retry`."""

    def setUp(self):
        super(TestRetryHandler, self).setUp()

        # Freeze time.time() so deadlines computed by Retry are predictable.
        self.now = time.time()
        f_time = self.useFixture(fixtures.MockPatch('time.time'))
        f_time.mock.return_value = self.now

    @mock.patch('random.randint')
    @mock.patch('time.sleep')
    def test_should_not_sleep(self, m_sleep, m_randint):
        # Deadline already in the past: no back-off is computed or slept.
        deadline = self.now - 1
        retry = h_retry.Retry(mock.Mock())

        ret = retry._sleep(deadline, 1, _EX1())

        self.assertFalse(ret)
        m_sleep.assert_not_called()
        m_randint.assert_not_called()

    def _test_should_sleep(self, seconds_left, slept):
        """Check `_sleep` sleeps `slept` seconds with `seconds_left` left."""
        attempt = 5
        timeout = 20
        interval = 3
        randint = 2
        deadline = self.now + seconds_left
        retry = h_retry.Retry(mock.Mock(), timeout=timeout, interval=interval)

        with mock.patch('random.randint') as m_randint, \
                mock.patch('time.sleep') as m_sleep:
            m_randint.return_value = randint

            ret = retry._sleep(deadline, attempt, _EX2())

            self.assertEqual(slept, ret)
            m_randint.assert_called_once_with(1, 2 ** attempt - 1)
            m_sleep.assert_called_once_with(slept)

    def test_should_sleep(self):
        # randint * interval = 6 fits within the 7 seconds left.
        self._test_should_sleep(7, 6)

    def test_should_sleep_last(self):
        # Back-off of 6 is cut down to the 5 seconds left.
        self._test_should_sleep(5, 5)

    def test_should_sleep_last_capped(self):
        # Only 2 seconds left, but the sleep never drops below the interval.
        self._test_should_sleep(2, 3)

    @mock.patch('itertools.count')
    @mock.patch.object(h_retry.Retry, '_sleep')
    def test_call(self, m_sleep, m_count):
        m_handler = mock.Mock()
        m_count.return_value = list(range(1, 5))
        retry = h_retry.Retry(m_handler)

        retry(mock.sentinel.event)

        m_handler.assert_called_once_with(mock.sentinel.event)
        m_sleep.assert_not_called()

    @mock.patch('itertools.count')
    @mock.patch.object(h_retry.Retry, '_sleep')
    def test_call_retry(self, m_sleep, m_count):
        attempts = 3
        timeout = 10
        deadline = self.now + timeout
        failures = [_EX1()] * (attempts - 1)
        event = mock.sentinel.event
        m_handler = mock.Mock()
        m_handler.side_effect = failures + [None]
        m_sleep.return_value = 1
        m_count.return_value = list(range(1, 5))
        retry = h_retry.Retry(m_handler, timeout=timeout, exceptions=_EX1)

        retry(event)

        m_handler.assert_has_calls([mock.call(event)] * attempts)
        m_sleep.assert_has_calls([
            mock.call(deadline, attempt, failure)
            for attempt, failure in enumerate(failures, 1)])

    @mock.patch('itertools.count')
    @mock.patch.object(h_retry.Retry, '_sleep')
    def test_call_retry_raises(self, m_sleep, m_count):
        attempts = 3
        timeout = 10
        deadline = self.now + timeout
        failures = [_EX1(), _EX1(), _EX11()]
        event = mock.sentinel.event
        m_handler = mock.Mock()
        m_handler.side_effect = failures
        # The last `_sleep` reports 0 (deadline passed), so Retry re-raises.
        m_sleep.side_effect = [1] * (attempts - 1) + [0]
        m_count.return_value = list(range(1, 5))
        retry = h_retry.Retry(m_handler, timeout=timeout, exceptions=_EX1)

        self.assertRaises(_EX11, retry, event)

        m_handler.assert_has_calls([mock.call(event)] * attempts)
        m_sleep.assert_has_calls([
            mock.call(deadline, attempt, failure)
            for attempt, failure in enumerate(failures, 1)])

    @mock.patch('itertools.count')
    @mock.patch.object(h_retry.Retry, '_sleep')
    def test_call_raises_no_retry(self, m_sleep, m_count):
        event = mock.sentinel.event
        m_handler = mock.Mock()
        # _EX1 is not among the retryable exceptions, so it propagates.
        m_handler.side_effect = _EX1()
        m_count.return_value = list(range(1, 5))
        retry = h_retry.Retry(m_handler, exceptions=(_EX11, _EX2))

        self.assertRaises(_EX1, retry, event)

        m_handler.assert_called_once_with(event)
        m_sleep.assert_not_called()
|
Loading…
x
Reference in New Issue
Block a user