Watcher restarts watching resources on failure

The kuryr-kubernetes watcher watches k8s resources and triggers the
registered handler pipeline.

This patch restarts watching when the watch thread has failed.

Change-Id: I27a719a326dc37f97c46b88d0c171d0f12ded605
Closes-Bug: 1739776
Related-Bug: 1705429
Signed-off-by: Eunsoo Park <esevan.park@gmail.com>
Eunsoo Park
2018-02-22 16:12:34 +09:00
parent e7b76bf2a5
commit 58e6b1914c
6 changed files with 143 additions and 41 deletions

View File

@@ -142,6 +142,9 @@ k8s_opts = [
cfg.BoolOpt('enable_manager',
help=_("Enable Manager to manage the pools."),
default=False),
cfg.IntOpt('watch_retry_timeout',
help=_('Time (in seconds) the watcher retries watching for.'),
default=60),
]
neutron_defaults = [

View File

@@ -14,7 +14,6 @@
# under the License.
import itertools
import random
import time
from oslo_log import log as logging
@@ -22,12 +21,10 @@ from oslo_utils import excutils
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 180
DEFAULT_INTERVAL = 3
class Retry(base.EventHandler):
"""Retries handler on failure.
@@ -39,16 +36,13 @@ class Retry(base.EventHandler):
`handler`, so the actual time spent within a single call to `Retry` may
exceed the `timeout` depending on responsiveness of the `handler`.
`Retry` implements a variation of exponential backoff algorithm [1] and
ensures that there is a minimal time `interval` after the failed
`handler` is retried for the same `event` (expected backoff E(c) =
interval * 2 ** c / 2).
[1] https://en.wikipedia.org/wiki/Exponential_backoff
"""
def __init__(self, handler, exceptions=Exception,
timeout=DEFAULT_TIMEOUT, interval=DEFAULT_INTERVAL):
timeout=utils.DEFAULT_TIMEOUT,
interval=utils.DEFAULT_INTERVAL):
self._handler = handler
self._exceptions = exceptions
self._timeout = timeout
@@ -73,28 +67,17 @@ class Retry(base.EventHandler):
raise
def _sleep(self, deadline, attempt, exception):
now = time.time()
seconds_left = deadline - now
if seconds_left <= 0:
LOG.debug("Handler %s failed (attempt %s; %s)",
self._handler, attempt, exceptions.format_msg(exception))
interval = utils.exponential_sleep(deadline, attempt,
self._interval)
if not interval:
LOG.debug("Handler %s failed (attempt %s; %s), "
"timeout exceeded (%s seconds)",
self._handler, attempt, exceptions.format_msg(exception),
self._timeout)
return 0
interval = random.randint(1, 2 ** attempt - 1) * self._interval
if interval > seconds_left:
interval = seconds_left
if interval < self._interval:
interval = self._interval
LOG.debug("Handler %s failed (attempt %s; %s), "
"retrying in %s seconds",
self._handler, attempt, exceptions.format_msg(exception),
interval)
time.sleep(interval)
LOG.debug("Resumed after %s seconds. Retry handler %s", interval,
self._handler)
return interval

View File

@@ -210,8 +210,8 @@ class TestWatcher(test_base.TestCase):
self.client.watch.side_effect = client_watch
@staticmethod
def _test_watch_create_watcher(path, handler):
watcher_obj = watcher.Watcher(handler)
def _test_watch_create_watcher(path, handler, timeout=0):
watcher_obj = watcher.Watcher(handler, timeout=timeout)
watcher_obj._running = True
watcher_obj._resources.add(path)
watcher_obj._idle[path] = True
@@ -232,6 +232,7 @@ class TestWatcher(test_base.TestCase):
watcher_obj._watch(path)
self.assertEqual(0, watcher_obj._timeout)
m_handler.assert_has_calls([mock.call(e) for e in events])
def test_watch_stopped(self):
@@ -301,3 +302,23 @@ class TestWatcher(test_base.TestCase):
self.client.watch.assert_called_once()
self.assertFalse(watcher_obj._healthy)
def test_watch_retry(self):
path = '/test'
events = [{'e': i} for i in range(3)]
m_handler = mock.Mock()
watcher_obj = self._test_watch_create_watcher(path, m_handler, 10)
self.retry = True
def handler(event):
if self.retry:
self.retry = False
raise exceptions.ChunkedEncodingError("Connection Broken")
self.client.watch.side_effect = handler
self._test_watch_mock_events(watcher_obj, events)
watcher_obj._watch(path)
m_handler.assert_has_calls([mock.call(e) for e in events])

View File

@@ -10,6 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
import time
from oslo_config import cfg
from oslo_serialization import jsonutils
@@ -21,6 +24,8 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
'neutron': ['neutron-vif'],
'nested': ['nested-vlan'],
}
DEFAULT_TIMEOUT = 180
DEFAULT_INTERVAL = 3
def utf8_json_decoder(byte_data):
@@ -61,3 +66,35 @@ def get_pod_unique_name(pod):
def check_suitable_multi_pool_driver_opt(pool_driver, pod_driver):
return pod_driver in VALID_MULTI_POD_POOLS_OPTS.get(pool_driver, [])
def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
"""Sleep for exponential duration.
This implements a variation of exponential backoff algorithm [1] and
ensures that there is a minimal time `interval` to sleep.
(expected backoff E(c) = interval * 2 ** c / 2).
[1] https://en.wikipedia.org/wiki/Exponential_backoff
:param deadline: absolute deadline (as returned by time.time()) beyond
which no sleeping is done and 0 is returned.
:param attempt: attempt count of sleep function.
:param interval: minimal time interval to sleep
:return: the actual time that we've slept
"""
now = time.time()
seconds_left = deadline - now
if seconds_left <= 0:
return 0
to_sleep = random.randint(1, 2 ** attempt - 1) * interval
if to_sleep > seconds_left:
to_sleep = seconds_left
if to_sleep < interval:
to_sleep = interval
time.sleep(to_sleep)
return to_sleep
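
As a quick sanity check of the backoff noted in the Retry docstring above (expected backoff E(c) = interval * 2 ** c / 2), the following standalone sketch is illustrative only and not part of this commit; the helper names are made up for the example::

    import random

    INTERVAL = 3  # seconds, same value as DEFAULT_INTERVAL above

    def expected_backoff(attempt, interval=INTERVAL):
        # Mean of random.randint(1, 2 ** attempt - 1) * interval,
        # which works out to interval * 2 ** attempt / 2.
        return interval * 2 ** attempt / 2

    def sampled_backoff(attempt, interval=INTERVAL):
        # Mirrors the duration exponential_sleep picks before clamping it
        # to the remaining deadline and to the minimal interval.
        return random.randint(1, 2 ** attempt - 1) * interval

    for attempt in range(1, 5):
        print(attempt, expected_backoff(attempt), sampled_backoff(attempt))

With the default 3-second interval the expected waits are roughly 3, 6, 12 and 24 seconds for attempts 1 through 4, so the default 60-second watch_retry_timeout allows only a handful of retries.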

View File

@@ -13,11 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from kuryr_kubernetes import clients
from kuryr_kubernetes.handlers import health
from kuryr_kubernetes import utils
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Watcher(health.HealthHandler):
@@ -50,7 +55,7 @@ class Watcher(health.HealthHandler):
graceful=False)` for asynchronous `Watcher`).
"""
def __init__(self, handler, thread_group=None):
def __init__(self, handler, thread_group=None, timeout=None):
"""Initializes a new Watcher instance.
:param handler: a `callable` object to be invoked for each observed
@@ -74,6 +79,10 @@ class Watcher(health.HealthHandler):
self._watching = {}
self._idle = {}
if timeout is None:
timeout = CONF.kubernetes.watch_retry_timeout
self._timeout = timeout
def add(self, path):
"""Adds ths K8s resource to the Watcher.
@@ -132,18 +141,46 @@ class Watcher(health.HealthHandler):
if self._thread_group:
self._watching[path].stop()
def _watch(self, path):
def _graceful_watch_exit(self, path):
try:
LOG.info("Started watching '%s'", path)
for event in self._client.watch(path):
self._idle[path] = False
self._handler(event)
self._idle[path] = True
if not (self._running and path in self._resources):
return
except Exception:
self._healthy = False
finally:
self._watching.pop(path)
self._idle.pop(path)
LOG.info("Stopped watching '%s'", path)
except KeyError:
LOG.error("Failed to exit watch gracefully")
def _watch(self, path):
attempts = 0
deadline = 0
while self._running and path in self._resources:
try:
retry = False
if attempts == 1:
deadline = time.time() + self._timeout
if (attempts > 0 and
utils.exponential_sleep(deadline, attempts) == 0):
LOG.error("Failed watching '%s': deadline exceeded", path)
self._healthy = False
return
LOG.info("Started watching '%s'", path)
for event in self._client.watch(path):
# NOTE(esevan): The watcher retries watching for up to
# `self._timeout` seconds, using an exponential backoff
# algorithm, to tolerate transient failures such as a
# temporary disconnection from the k8s API server.
attempts = 0
self._idle[path] = False
self._handler(event)
self._idle[path] = True
if not (self._running and path in self._resources):
return
except Exception as e:
LOG.warning("Restarting(%s) watching '%s': %s",
attempts, path, e)
attempts += 1
retry = True
finally:
if not retry:
self._graceful_watch_exit(path)
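
For context, a hedged usage sketch (not part of the diff) of a Watcher constructed with an explicit retry timeout, similar to the unit test above; the watch path and the 30-second value are illustrative, and the snippet assumes the kuryr configuration and Kubernetes client are already set up::

    from kuryr_kubernetes import watcher

    def handler(event):
        # Placeholder handler; in kuryr-kubernetes this would be the
        # registered handler pipeline mentioned in the commit message.
        print(event)

    # timeout overrides [kubernetes]/watch_retry_timeout (default 60 s).
    w = watcher.Watcher(handler, timeout=30)
    w.add('/api/v1/pods')  # illustrative watch path

    # Without a thread group, start() processes events in the calling
    # thread; transient watch failures are now retried with exponential
    # backoff until the retry timeout is exceeded.
    w.start()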

View File

@@ -0,0 +1,21 @@
---
upgrade:
- |
For the kuryr-kubernetes watcher,
a new option 'watch_retry_timeout' has been added.
The following can be set in kuryr.conf::
[kubernetes]
# 'watch_retry_timeout' is optional;
# default = 60 if not set.
watch_retry_timeout = <Time in seconds>
fixes:
- |
In production environments the K8s API server can go down temporarily and
be restored shortly afterwards. Since kuryr-kubernetes watches K8s
resources through a connection to the K8s API server, the watcher fails to
watch those resources while the API server is down.
To fix this, the watcher now retries connecting to the K8s API server for
a configurable time duration when an exception is raised.