Watcher restarts watching resources on failure

The kuryr-kubernetes watcher watches k8s resources and triggers the
registered pipeline.

This patch restarts watching when the watch thread has failed.

Change-Id: I27a719a326dc37f97c46b88d0c171d0f12ded605
Closes-Bug: 1739776
Related-Bug: 1705429
Signed-off-by: Eunsoo Park <esevan.park@gmail.com>
(cherry picked from commit 58e6b1914c)
Eunsoo Park 2018-02-22 16:12:34 +09:00 committed by Luis Tomas Bolivar
parent acb62b0205
commit 138c25338b
6 changed files with 144 additions and 41 deletions


@@ -137,6 +137,9 @@ k8s_opts = [
     cfg.BoolOpt('enable_manager',
                 help=_("Enable Manager to manage the pools."),
                 default=False),
+    cfg.IntOpt('watch_retry_timeout',
+               help=_('Time (in seconds) the watcher retries watching for.'),
+               default=60),
 ]

 neutron_defaults = [
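For context, a minimal sketch (not part of the patch) of how an option registered this way is read back through oslo.config; the group name and default mirror the hunk above, everything else is illustrative:

from oslo_config import cfg

CONF = cfg.CONF
CONF.register_opts(
    [cfg.IntOpt('watch_retry_timeout',
                help='Time (in seconds) the watcher retries watching for.',
                default=60)],
    group='kubernetes')

CONF([])  # parse an empty argv so defaults (or kuryr.conf overrides) apply
print(CONF.kubernetes.watch_retry_timeout)  # -> 60 unless overridden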


@@ -14,7 +14,6 @@
 # under the License.

 import itertools
-import random
 import time

 from oslo_log import log as logging
@@ -22,12 +21,10 @@ from oslo_utils import excutils

 from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.handlers import base
+from kuryr_kubernetes import utils

 LOG = logging.getLogger(__name__)

-DEFAULT_TIMEOUT = 180
-DEFAULT_INTERVAL = 3
-

 class Retry(base.EventHandler):
     """Retries handler on failure.
@@ -39,16 +36,13 @@ class Retry(base.EventHandler):
     `handler`, so the actual time spent within a single call to `Retry` may
     exceed the `timeout` depending on responsiveness of the `handler`.

-    `Retry` implements a variation of exponential backoff algorithm [1] and
-    ensures that there is a minimal time `interval` after the failed
     `handler` is retried for the same `event` (expected backoff E(c) =
     interval * 2 ** c / 2).
-
-    [1] https://en.wikipedia.org/wiki/Exponential_backoff
     """

     def __init__(self, handler, exceptions=Exception,
-                 timeout=DEFAULT_TIMEOUT, interval=DEFAULT_INTERVAL):
+                 timeout=utils.DEFAULT_TIMEOUT,
+                 interval=utils.DEFAULT_INTERVAL):
         self._handler = handler
         self._exceptions = exceptions
         self._timeout = timeout
@@ -73,28 +67,17 @@
                     raise

     def _sleep(self, deadline, attempt, exception):
-        now = time.time()
-        seconds_left = deadline - now
-
-        if seconds_left <= 0:
+        LOG.debug("Handler %s failed (attempt %s; %s)",
+                  self._handler, attempt, exceptions.format_msg(exception))
+        interval = utils.exponential_sleep(deadline, attempt,
+                                           self._interval)
+        if not interval:
             LOG.debug("Handler %s failed (attempt %s; %s), "
                       "timeout exceeded (%s seconds)",
                       self._handler, attempt, exceptions.format_msg(exception),
                       self._timeout)
             return 0

-        interval = random.randint(1, 2 ** attempt - 1) * self._interval
-
-        if interval > seconds_left:
-            interval = seconds_left
-
-        if interval < self._interval:
-            interval = self._interval
-
-        LOG.debug("Handler %s failed (attempt %s; %s), "
-                  "retrying in %s seconds",
-                  self._handler, attempt, exceptions.format_msg(exception),
-                  interval)
-
-        time.sleep(interval)
+        LOG.debug("Resumed after %s seconds. Retry handler %s", interval,
+                  self._handler)
         return interval
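The refactored `_sleep` above delegates the actual waiting to `utils.exponential_sleep` and only interprets its return value: 0 means the deadline has passed and the event is given up, anything else is the number of seconds slept before the next retry. Below is a minimal, self-contained sketch of that contract using a simulated clock rather than the real helper (the real one draws randint(1, 2 ** attempt - 1) * interval; the sketch uses the mean, E(c) = interval * 2 ** c / 2):

def simulated_backoff(timeout=10, interval=3):
    # Simulated clock instead of time.time()/time.sleep(); mirrors only the
    # control flow: back off, clamp to the time left, give up at the deadline.
    now = 0.0
    deadline = now + timeout
    attempt = 1
    while True:
        seconds_left = deadline - now
        if seconds_left <= 0:
            print("attempt %d: 0 returned -> deadline exceeded, give up" % attempt)
            return
        sleep_for = min(seconds_left, max(interval, interval * 2 ** attempt / 2))
        print("attempt %d: back off %.1f s" % (attempt, sleep_for))
        now += sleep_for
        attempt += 1

simulated_backoff()
# attempt 1: back off 3.0 s
# attempt 2: back off 6.0 s
# attempt 3: back off 1.0 s   (clamped to the remaining time)
# attempt 4: 0 returned -> deadline exceeded, give up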


@@ -210,8 +210,8 @@ class TestWatcher(test_base.TestCase):
         self.client.watch.side_effect = client_watch

     @staticmethod
-    def _test_watch_create_watcher(path, handler):
-        watcher_obj = watcher.Watcher(handler)
+    def _test_watch_create_watcher(path, handler, timeout=0):
+        watcher_obj = watcher.Watcher(handler, timeout=timeout)
         watcher_obj._running = True
         watcher_obj._resources.add(path)
         watcher_obj._idle[path] = True
@@ -232,6 +232,7 @@
         watcher_obj._watch(path)

+        self.assertEqual(0, watcher_obj._timeout)
         m_handler.assert_has_calls([mock.call(e) for e in events])

     def test_watch_stopped(self):
@@ -301,3 +302,23 @@
         self.client.watch.assert_called_once()
         self.assertFalse(watcher_obj._healthy)
+
+    def test_watch_retry(self):
+        path = '/test'
+        events = [{'e': i} for i in range(3)]
+        m_handler = mock.Mock()
+        watcher_obj = self._test_watch_create_watcher(path, m_handler, 10)
+        self.retry = True
+
+        def handler(event):
+            if self.retry:
+                self.retry = False
+                raise exceptions.ChunkedEncodingError("Connection Broken")
+
+        self.client.watch.side_effect = handler
+        self._test_watch_mock_events(watcher_obj, events)
+
+        watcher_obj._watch(path)
+
+        m_handler.assert_has_calls([mock.call(e) for e in events])


@@ -10,11 +10,17 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import random
+import time
+
 from oslo_config import cfg
 from oslo_serialization import jsonutils

 CONF = cfg.CONF

+DEFAULT_TIMEOUT = 180
+DEFAULT_INTERVAL = 3
+

 def utf8_json_decoder(byte_data):
     """Deserializes the bytes into UTF-8 encoded JSON.
@@ -39,3 +45,35 @@ def convert_netns(netns):
         return netns.replace('/proc', CONF.cni_daemon.netns_proc_dir)
     else:
         return netns
+
+
+def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
+    """Sleep for exponential duration.
+
+    This implements a variation of exponential backoff algorithm [1] and
+    ensures that there is a minimal time `interval` to sleep.
+    (expected backoff E(c) = interval * 2 ** c / 2).
+
+    [1] https://en.wikipedia.org/wiki/Exponential_backoff
+
+    :param deadline: sleep timeout duration in seconds.
+    :param attempt: attempt count of sleep function.
+    :param interval: minimal time interval to sleep
+    :return: the actual time that we've slept
+    """
+    now = time.time()
+    seconds_left = deadline - now
+
+    if seconds_left <= 0:
+        return 0
+
+    interval = random.randint(1, 2 ** attempt - 1) * DEFAULT_INTERVAL
+
+    if interval > seconds_left:
+        interval = seconds_left
+
+    if interval < DEFAULT_INTERVAL:
+        interval = DEFAULT_INTERVAL
+
+    time.sleep(interval)
+    return interval
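Worked example with the defaults above (DEFAULT_INTERVAL = 3 s): attempt 1 draws randint(1, 1) * 3 = 3 s, attempt 2 draws from {3, 6, 9} s, attempt 3 from {3, 6, ..., 21} s, always clamped to at least 3 s and at most the time left before the deadline. The mean of randint(1, 2 ** c - 1) is 2 ** (c - 1), which is where the expected backoff E(c) = interval * 2 ** c / 2 in the docstring comes from. Note that, as written in the hunk above, the body multiplies and clamps with DEFAULT_INTERVAL rather than the `interval` argument, so a caller-supplied interval does not change the sampled durations.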


@@ -13,11 +13,16 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import time
+
 from kuryr_kubernetes import clients
 from kuryr_kubernetes.handlers import health
+from kuryr_kubernetes import utils
+from oslo_config import cfg
 from oslo_log import log as logging

 LOG = logging.getLogger(__name__)
+CONF = cfg.CONF


 class Watcher(health.HealthHandler):
@@ -50,7 +55,7 @@ class Watcher(health.HealthHandler):
     graceful=False)` for asynchronous `Watcher`).
     """

-    def __init__(self, handler, thread_group=None):
+    def __init__(self, handler, thread_group=None, timeout=None):
         """Initializes a new Watcher instance.

         :param handler: a `callable` object to be invoked for each observed
@@ -74,6 +79,10 @@
         self._watching = {}
         self._idle = {}

+        if timeout is None:
+            timeout = CONF.kubernetes.watch_retry_timeout
+        self._timeout = timeout
+
     def add(self, path):
         """Adds ths K8s resource to the Watcher.
@@ -132,18 +141,46 @@
             if self._thread_group:
                 self._watching[path].stop()

-    def _watch(self, path):
+    def _graceful_watch_exit(self, path):
         try:
-            LOG.info("Started watching '%s'", path)
-            for event in self._client.watch(path):
-                self._idle[path] = False
-                self._handler(event)
-                self._idle[path] = True
-                if not (self._running and path in self._resources):
-                    return
-        except Exception:
-            self._healthy = False
-        finally:
             self._watching.pop(path)
             self._idle.pop(path)
             LOG.info("Stopped watching '%s'", path)
+        except KeyError:
+            LOG.error("Failed to exit watch gracefully")
+
+    def _watch(self, path):
+        attempts = 0
+        deadline = 0
+        while self._running and path in self._resources:
+            try:
+                retry = False
+                if attempts == 1:
+                    deadline = time.time() + self._timeout
+                if (attempts > 0 and
+                        utils.exponential_sleep(deadline, attempts) == 0):
+                    LOG.error("Failed watching '%s': deadline exceeded", path)
+                    self._healthy = False
+                    return
+                LOG.info("Started watching '%s'", path)
+                for event in self._client.watch(path):
+                    # NOTE(esevan): Watcher retries watching for
+                    # `self._timeout` duration with exponential backoff
+                    # algorithm to tolerate against temporal exception such as
+                    # temporal disconnection to the k8s api server.
+                    attempts = 0
+                    self._idle[path] = False
+                    self._handler(event)
+                    self._idle[path] = True
+                    if not (self._running and path in self._resources):
+                        return
+            except Exception as e:
+                LOG.warning("Restarting(%s) watching '%s': %s",
+                            attempts, path, e)
+                attempts += 1
+                retry = True
+            finally:
+                if not retry:
+                    self._graceful_watch_exit(path)
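To make the new behaviour concrete, a hedged usage sketch (not taken from the patch): it assumes the Kubernetes client has already been initialised elsewhere, and the handler and watch path are purely illustrative.

from kuryr_kubernetes import watcher


def handle_event(event):
    # Invoked for every event observed on the watched path.
    print("got event:", event.get('type'))


# timeout=30 overrides [kubernetes]/watch_retry_timeout for this instance;
# omit it (or pass None) to fall back to the config option.
w = watcher.Watcher(handle_event, timeout=30)
w.add('/api/v1/namespaces/default/pods')  # illustrative watch path
w.start()  # without a thread_group this blocks; on errors the watch is
           # restarted with exponential backoff until the deadline passes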


@@ -0,0 +1,21 @@
+---
+upgrade:
+  - |
+    A new option 'watch_retry_timeout' has been added for the
+    kuryr-kubernetes watcher. It can be set in kuryr.conf::
+
+      [kubernetes]
+      # 'watch_retry_timeout' is optional; default = 60 if not set.
+      watch_retry_timeout = <time in seconds>
+fixes:
+  - |
+    In production the K8s API server can go down temporarily and be
+    restored shortly afterwards. Since kuryr-kubernetes watches K8s
+    resources through a connection to the K8s API server, the watcher
+    failed to watch those resources whenever the API server was down.
+    To fix this, the watcher now retries connecting to the K8s API
+    server for a configurable duration when an exception is raised.