diff --git a/kuryr_kubernetes/config.py b/kuryr_kubernetes/config.py
index 215aa96bf..1a510e9ee 100644
--- a/kuryr_kubernetes/config.py
+++ b/kuryr_kubernetes/config.py
@@ -142,6 +142,9 @@ k8s_opts = [
     cfg.BoolOpt('enable_manager',
                 help=_("Enable Manager to manage the pools."),
                 default=False),
+    cfg.IntOpt('watch_retry_timeout',
+               help=_('Time (in seconds) the watcher retries watching for.'),
+               default=60),
 ]

 neutron_defaults = [
diff --git a/kuryr_kubernetes/handlers/retry.py b/kuryr_kubernetes/handlers/retry.py
index a4671a14e..dedc3eae0 100644
--- a/kuryr_kubernetes/handlers/retry.py
+++ b/kuryr_kubernetes/handlers/retry.py
@@ -14,7 +14,6 @@
 # under the License.

 import itertools
-import random
 import time

 from oslo_log import log as logging
@@ -22,12 +21,10 @@ from oslo_utils import excutils

 from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.handlers import base
+from kuryr_kubernetes import utils

 LOG = logging.getLogger(__name__)

-DEFAULT_TIMEOUT = 180
-DEFAULT_INTERVAL = 3
-

 class Retry(base.EventHandler):
     """Retries handler on failure.
@@ -39,16 +36,13 @@ class Retry(base.EventHandler):
     `handler`, so the actual time spent within a single call to `Retry` may
     exceed the `timeout` depending on responsiveness of the `handler`.

-    `Retry` implements a variation of exponential backoff algorithm [1] and
-    ensures that there is a minimal time `interval` after the failed
-    `handler` is retried for the same `event` (expected backoff
-    E(c) = interval * 2 ** c / 2).
-
-    [1] https://en.wikipedia.org/wiki/Exponential_backoff
     """

     def __init__(self, handler, exceptions=Exception,
-                 timeout=DEFAULT_TIMEOUT, interval=DEFAULT_INTERVAL):
+                 timeout=utils.DEFAULT_TIMEOUT,
+                 interval=utils.DEFAULT_INTERVAL):
         self._handler = handler
         self._exceptions = exceptions
         self._timeout = timeout
@@ -73,28 +67,17 @@
             raise

     def _sleep(self, deadline, attempt, exception):
-        now = time.time()
-        seconds_left = deadline - now
-
-        if seconds_left <= 0:
+        LOG.debug("Handler %s failed (attempt %s; %s)",
+                  self._handler, attempt, exceptions.format_msg(exception))
+        interval = utils.exponential_sleep(deadline, attempt,
+                                           self._interval)
+        if not interval:
             LOG.debug("Handler %s failed (attempt %s; %s), "
                       "timeout exceeded (%s seconds)",
                       self._handler, attempt, exceptions.format_msg(exception),
                       self._timeout)
             return 0

-        interval = random.randint(1, 2 ** attempt - 1) * self._interval
-
-        if interval > seconds_left:
-            interval = seconds_left
-
-        if interval < self._interval:
-            interval = self._interval
-
-        LOG.debug("Handler %s failed (attempt %s; %s), "
-                  "retrying in %s seconds",
-                  self._handler, attempt, exceptions.format_msg(exception),
-                  interval)
-
-        time.sleep(interval)
+        LOG.debug("Resumed after %s seconds. Retry handler %s", interval,
+                  self._handler)
         return interval
diff --git a/kuryr_kubernetes/tests/unit/test_watcher.py b/kuryr_kubernetes/tests/unit/test_watcher.py
index dc1234aa8..ba35713af 100644
--- a/kuryr_kubernetes/tests/unit/test_watcher.py
+++ b/kuryr_kubernetes/tests/unit/test_watcher.py
@@ -210,8 +210,8 @@ class TestWatcher(test_base.TestCase):
         self.client.watch.side_effect = client_watch

     @staticmethod
-    def _test_watch_create_watcher(path, handler):
-        watcher_obj = watcher.Watcher(handler)
+    def _test_watch_create_watcher(path, handler, timeout=0):
+        watcher_obj = watcher.Watcher(handler, timeout=timeout)
         watcher_obj._running = True
         watcher_obj._resources.add(path)
         watcher_obj._idle[path] = True
@@ -232,6 +232,7 @@

         watcher_obj._watch(path)

+        self.assertEqual(0, watcher_obj._timeout)
         m_handler.assert_has_calls([mock.call(e) for e in events])

     def test_watch_stopped(self):
@@ -301,3 +302,23 @@

         self.client.watch.assert_called_once()
         self.assertFalse(watcher_obj._healthy)
+
+    def test_watch_retry(self):
+        path = '/test'
+        events = [{'e': i} for i in range(3)]
+        m_handler = mock.Mock()
+        watcher_obj = self._test_watch_create_watcher(path, m_handler, 10)
+
+        self.retry = True
+
+        def handler(event):
+            if self.retry:
+                self.retry = False
+                raise exceptions.ChunkedEncodingError("Connection Broken")
+
+        self.client.watch.side_effect = handler
+        self._test_watch_mock_events(watcher_obj, events)
+
+        watcher_obj._watch(path)
+
+        m_handler.assert_has_calls([mock.call(e) for e in events])
diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py
index 3e0a89182..68d3e367d 100644
--- a/kuryr_kubernetes/utils.py
+++ b/kuryr_kubernetes/utils.py
@@ -10,6 +10,9 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import random
+import time
+
 from oslo_config import cfg
 from oslo_serialization import jsonutils

@@ -21,6 +24,8 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
                               'neutron': ['neutron-vif'],
                               'nested': ['nested-vlan'],
                               }
+DEFAULT_TIMEOUT = 180
+DEFAULT_INTERVAL = 3


 def utf8_json_decoder(byte_data):
@@ -61,3 +66,35 @@ def get_pod_unique_name(pod):

 def check_suitable_multi_pool_driver_opt(pool_driver, pod_driver):
     return pod_driver in VALID_MULTI_POD_POOLS_OPTS.get(pool_driver, [])
+
+
+def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
+    """Sleep for an exponentially growing duration.
+
+    This implements a variation of the exponential backoff algorithm [1]
+    and ensures that there is a minimal time `interval` to sleep
+    (expected backoff E(c) = interval * 2 ** c / 2).
+
+    [1] https://en.wikipedia.org/wiki/Exponential_backoff
+
+    :param deadline: absolute time (as given by time.time()) to stop at
+    :param attempt: retry attempt count, starting at 1
+    :param interval: minimal time interval to sleep
+    :return: the actual time slept, or 0 if the deadline has passed
+    """
+    now = time.time()
+    seconds_left = deadline - now
+
+    if seconds_left <= 0:
+        return 0
+
+    to_sleep = random.randint(1, 2 ** attempt - 1) * interval
+
+    if to_sleep > seconds_left:
+        to_sleep = seconds_left
+
+    if to_sleep < interval:
+        to_sleep = interval
+
+    time.sleep(to_sleep)
+    return to_sleep
diff --git a/kuryr_kubernetes/watcher.py b/kuryr_kubernetes/watcher.py
index 2c11a52b1..d5314fa12 100644
--- a/kuryr_kubernetes/watcher.py
+++ b/kuryr_kubernetes/watcher.py
@@ -13,11 +13,16 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import time
+
 from kuryr_kubernetes import clients
 from kuryr_kubernetes.handlers import health
+from kuryr_kubernetes import utils
+from oslo_config import cfg
 from oslo_log import log as logging

 LOG = logging.getLogger(__name__)
+CONF = cfg.CONF


 class Watcher(health.HealthHandler):
@@ -50,7 +55,7 @@
                        graceful=False)` for asynchronous `Watcher`).
     """

-    def __init__(self, handler, thread_group=None):
+    def __init__(self, handler, thread_group=None, timeout=None):
         """Initializes a new Watcher instance.

         :param handler: a `callable` object to be invoked for each observed
@@ -74,6 +79,10 @@
         self._watching = {}
         self._idle = {}

+        if timeout is None:
+            timeout = CONF.kubernetes.watch_retry_timeout
+        self._timeout = timeout
+
     def add(self, path):
         """Adds ths K8s resource to the Watcher.

@@ -132,18 +141,46 @@
             if self._thread_group:
                 self._watching[path].stop()

-    def _watch(self, path):
+    def _graceful_watch_exit(self, path):
         try:
-            LOG.info("Started watching '%s'", path)
-            for event in self._client.watch(path):
-                self._idle[path] = False
-                self._handler(event)
-                self._idle[path] = True
-                if not (self._running and path in self._resources):
-                    return
-        except Exception:
-            self._healthy = False
-        finally:
             self._watching.pop(path)
             self._idle.pop(path)
             LOG.info("Stopped watching '%s'", path)
+        except KeyError:
+            LOG.error("Failed to exit watch gracefully")
+
+    def _watch(self, path):
+        attempts = 0
+        deadline = 0
+        while self._running and path in self._resources:
+            try:
+                retry = False
+                if attempts == 1:
+                    deadline = time.time() + self._timeout
+
+                if (attempts > 0 and
+                        utils.exponential_sleep(deadline, attempts) == 0):
+                    LOG.error("Failed watching '%s': deadline exceeded", path)
+                    self._healthy = False
+                    return
+
+                LOG.info("Started watching '%s'", path)
+                for event in self._client.watch(path):
+                    # NOTE(esevan): The watcher retries watching for up to
+                    # `self._timeout` seconds with exponential backoff, to
+                    # tolerate transient failures such as a temporary
+                    # disconnection from the k8s API server.
+                    attempts = 0
+                    self._idle[path] = False
+                    self._handler(event)
+                    self._idle[path] = True
+                    if not (self._running and path in self._resources):
+                        return
+            except Exception as e:
+                LOG.warning("Restarting(%s) watching '%s': %s",
+                            attempts, path, e)
+                attempts += 1
+                retry = True
+            finally:
+                if not retry:
+                    self._graceful_watch_exit(path)
diff --git a/releasenotes/notes/fault-tolerable-watcher-24c51dbccabf5f17.yaml b/releasenotes/notes/fault-tolerable-watcher-24c51dbccabf5f17.yaml
new file mode 100644
index 000000000..b349793d7
--- /dev/null
+++ b/releasenotes/notes/fault-tolerable-watcher-24c51dbccabf5f17.yaml
@@ -0,0 +1,21 @@
+---
+upgrade:
+  - |
+    A new option, 'watch_retry_timeout', has been added
+    for the kuryr-kubernetes watcher.
+    The following should be set in kuryr.conf::
+
+
+      [kubernetes]
+      # 'watch_retry_timeout' field is optional,
+      # default = 60 if not set.
+      watch_retry_timeout =
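A minimal sketch (illustrative only, not part of the patch) of how the new
`utils.exponential_sleep()` helper drives the watcher's retry loop, assuming
the defaults introduced above (DEFAULT_INTERVAL = 3, watch_retry_timeout = 60)::

    import time

    from kuryr_kubernetes import utils

    # Watcher._watch() computes the deadline once, on the first failure;
    # note it is an absolute timestamp, not a duration.
    deadline = time.time() + 60

    for attempt in range(1, 10):
        # attempt 1: randint(1, 1) * 3 -> 3 seconds
        # attempt 2: randint(1, 3) * 3 -> 3..9 seconds
        # attempt 3: randint(1, 7) * 3 -> 3..21 seconds
        # each value is clamped to [interval, seconds left until deadline]
        slept = utils.exponential_sleep(deadline, attempt)
        if not slept:
            # 0 means the deadline has passed: the watcher logs an error,
            # marks itself unhealthy and stops watching the path.
            break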
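How the new knob reaches the watcher (again a sketch, not part of the patch):
`timeout=None` falls back to `[kubernetes]/watch_retry_timeout`, while an
explicit value, as the unit tests pass, bypasses the config option. The
example assumes an already configured kubernetes client; `print` stands in
for a real event handler and the pods path is only an example::

    from kuryr_kubernetes import watcher

    # Explicit timeout: retry watching for at most 30 seconds per outage.
    w = watcher.Watcher(print, timeout=30)

    # Omitting timeout would read CONF.kubernetes.watch_retry_timeout
    # (60 seconds unless overridden in kuryr.conf) instead.
    w.add('/api/v1/namespaces/default/pods')
    w.start()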