Merge "Watcher restarts watching resources in failure"
commit d16cb7d8a5
@@ -146,6 +146,9 @@ k8s_opts = [
     cfg.BoolOpt('enable_manager',
                 help=_("Enable Manager to manage the pools."),
                 default=False),
+    cfg.IntOpt('watch_retry_timeout',
+               help=_('Time (in seconds) the watcher retries watching for.'),
+               default=60),
 ]
 
 neutron_defaults = [
@@ -14,7 +14,6 @@
 # under the License.
 
 import itertools
-import random
 import time
 
 from oslo_log import log as logging
@@ -22,12 +21,10 @@ from oslo_utils import excutils
 
 from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.handlers import base
+from kuryr_kubernetes import utils
 
 LOG = logging.getLogger(__name__)
 
-DEFAULT_TIMEOUT = 180
-DEFAULT_INTERVAL = 3
-
 
 class Retry(base.EventHandler):
     """Retries handler on failure.
@@ -39,16 +36,13 @@ class Retry(base.EventHandler):
     `handler`, so the actual time spent within a single call to `Retry` may
     exceed the `timeout` depending on responsiveness of the `handler`.
 
-    `Retry` implements a variation of exponential backoff algorithm [1] and
-    ensures that there is a minimal time `interval` after the failed
-    `handler` is retried for the same `event` (expected backoff E(c) =
-    interval * 2 ** c / 2).
 
     [1] https://en.wikipedia.org/wiki/Exponential_backoff
     """
 
     def __init__(self, handler, exceptions=Exception,
-                 timeout=DEFAULT_TIMEOUT, interval=DEFAULT_INTERVAL):
+                 timeout=utils.DEFAULT_TIMEOUT,
+                 interval=utils.DEFAULT_INTERVAL):
         self._handler = handler
         self._exceptions = exceptions
         self._timeout = timeout
@@ -73,28 +67,17 @@ class Retry(base.EventHandler):
                 raise
 
     def _sleep(self, deadline, attempt, exception):
-        now = time.time()
-        seconds_left = deadline - now
-
-        if seconds_left <= 0:
+        LOG.debug("Handler %s failed (attempt %s; %s)",
+                  self._handler, attempt, exceptions.format_msg(exception))
+        interval = utils.exponential_sleep(deadline, attempt,
+                                           self._interval)
+        if not interval:
             LOG.debug("Handler %s failed (attempt %s; %s), "
                       "timeout exceeded (%s seconds)",
                       self._handler, attempt, exceptions.format_msg(exception),
                       self._timeout)
             return 0
 
-        interval = random.randint(1, 2 ** attempt - 1) * self._interval
-
-        if interval > seconds_left:
-            interval = seconds_left
-
-        if interval < self._interval:
-            interval = self._interval
-
-        LOG.debug("Handler %s failed (attempt %s; %s), "
-                  "retrying in %s seconds",
-                  self._handler, attempt, exceptions.format_msg(exception),
-                  interval)
-
-        time.sleep(interval)
+        LOG.debug("Resumed after %s seconds. Retry handler %s", interval,
+                  self._handler)
         return interval
@@ -210,8 +210,8 @@ class TestWatcher(test_base.TestCase):
         self.client.watch.side_effect = client_watch
 
     @staticmethod
-    def _test_watch_create_watcher(path, handler):
-        watcher_obj = watcher.Watcher(handler)
+    def _test_watch_create_watcher(path, handler, timeout=0):
+        watcher_obj = watcher.Watcher(handler, timeout=timeout)
         watcher_obj._running = True
         watcher_obj._resources.add(path)
         watcher_obj._idle[path] = True
@@ -232,6 +232,7 @@ class TestWatcher(test_base.TestCase):
 
         watcher_obj._watch(path)
 
+        self.assertEqual(0, watcher_obj._timeout)
         m_handler.assert_has_calls([mock.call(e) for e in events])
 
     def test_watch_stopped(self):
@@ -301,3 +302,23 @@ class TestWatcher(test_base.TestCase):
 
         self.client.watch.assert_called_once()
         self.assertFalse(watcher_obj._healthy)
+
+    def test_watch_retry(self):
+        path = '/test'
+        events = [{'e': i} for i in range(3)]
+        m_handler = mock.Mock()
+        watcher_obj = self._test_watch_create_watcher(path, m_handler, 10)
+
+        self.retry = True
+
+        def handler(event):
+            if self.retry:
+                self.retry = False
+                raise exceptions.ChunkedEncodingError("Connection Broken")
+
+        self.client.watch.side_effect = handler
+        self._test_watch_mock_events(watcher_obj, events)
+
+        watcher_obj._watch(path)
+
+        m_handler.assert_has_calls([mock.call(e) for e in events])
@@ -10,6 +10,9 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import random
+import time
+
 from oslo_config import cfg
 from oslo_serialization import jsonutils
 
@@ -21,6 +24,8 @@ VALID_MULTI_POD_POOLS_OPTS = {'noop': ['neutron-vif',
                               'neutron': ['neutron-vif'],
                               'nested': ['nested-vlan'],
                               }
+DEFAULT_TIMEOUT = 180
+DEFAULT_INTERVAL = 3
 
 
 def utf8_json_decoder(byte_data):
@@ -61,3 +66,35 @@ def get_pod_unique_name(pod):
 
 def check_suitable_multi_pool_driver_opt(pool_driver, pod_driver):
     return pod_driver in VALID_MULTI_POD_POOLS_OPTS.get(pool_driver, [])
+
+
+def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
+    """Sleep for exponential duration.
+
+    This implements a variation of exponential backoff algorithm [1] and
+    ensures that there is a minimal time `interval` to sleep.
+    (expected backoff E(c) = interval * 2 ** c / 2).
+
+    [1] https://en.wikipedia.org/wiki/Exponential_backoff
+
+    :param deadline: sleep timeout duration in seconds.
+    :param attempt: attempt count of sleep function.
+    :param interval: minimal time interval to sleep
+    :return: the actual time that we've slept
+    """
+    now = time.time()
+    seconds_left = deadline - now
+
+    if seconds_left <= 0:
+        return 0
+
+    interval = random.randint(1, 2 ** attempt - 1) * DEFAULT_INTERVAL
+
+    if interval > seconds_left:
+        interval = seconds_left
+
+    if interval < DEFAULT_INTERVAL:
+        interval = DEFAULT_INTERVAL
+
+    time.sleep(interval)
+    return interval
@@ -13,11 +13,16 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import time
+
 from kuryr_kubernetes import clients
 from kuryr_kubernetes.handlers import health
+from kuryr_kubernetes import utils
+from oslo_config import cfg
 from oslo_log import log as logging
 
 LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
 
 
 class Watcher(health.HealthHandler):
@@ -50,7 +55,7 @@ class Watcher(health.HealthHandler):
     graceful=False)` for asynchronous `Watcher`).
     """
 
-    def __init__(self, handler, thread_group=None):
+    def __init__(self, handler, thread_group=None, timeout=None):
         """Initializes a new Watcher instance.
 
         :param handler: a `callable` object to be invoked for each observed
@@ -74,6 +79,10 @@ class Watcher(health.HealthHandler):
         self._watching = {}
         self._idle = {}
 
+        if timeout is None:
+            timeout = CONF.kubernetes.watch_retry_timeout
+        self._timeout = timeout
+
     def add(self, path):
         """Adds ths K8s resource to the Watcher.
 
@@ -132,18 +141,46 @@ class Watcher(health.HealthHandler):
         if self._thread_group:
             self._watching[path].stop()
 
-    def _watch(self, path):
+    def _graceful_watch_exit(self, path):
         try:
-            LOG.info("Started watching '%s'", path)
-            for event in self._client.watch(path):
-                self._idle[path] = False
-                self._handler(event)
-                self._idle[path] = True
-                if not (self._running and path in self._resources):
-                    return
-        except Exception:
-            self._healthy = False
-        finally:
             self._watching.pop(path)
             self._idle.pop(path)
             LOG.info("Stopped watching '%s'", path)
+        except KeyError:
+            LOG.error("Failed to exit watch gracefully")
+
+    def _watch(self, path):
+        attempts = 0
+        deadline = 0
+        while self._running and path in self._resources:
+            try:
+                retry = False
+                if attempts == 1:
+                    deadline = time.time() + self._timeout
+
+                if (attempts > 0 and
+                        utils.exponential_sleep(deadline, attempts) == 0):
+                    LOG.error("Failed watching '%s': deadline exceeded", path)
+                    self._healthy = False
+                    return
+
+                LOG.info("Started watching '%s'", path)
+                for event in self._client.watch(path):
+                    # NOTE(esevan): Watcher retries watching for
+                    # `self._timeout` duration with exponential backoff
+                    # algorithm to tolerate against temporal exception such as
+                    # temporal disconnection to the k8s api server.
+                    attempts = 0
+                    self._idle[path] = False
+                    self._handler(event)
+                    self._idle[path] = True
+                    if not (self._running and path in self._resources):
+                        return
+            except Exception as e:
+                LOG.warning("Restarting(%s) watching '%s': %s",
+                            attempts, path, e)
+                attempts += 1
+                retry = True
+            finally:
+                if not retry:
+                    self._graceful_watch_exit(path)
@@ -0,0 +1,21 @@
+---
+upgrade:
+  - |
+    For the kuryr kubernetes watcher,
+    a new option 'watch_retry_timeout' has been added.
+    The following should be modified at kuryr.conf::
+
+
+      [kubernetes]
+      # 'watch_retry_timeout' field is optional,
+      # default = 60 if not set.
+      watch_retry_timeout = <Time in seconds>
+
+fixes:
+  - |
+    K8s api server is often temporarily down and restored soon in production
+    environment. Since kuryr-kubernetes watches k8s resources by connecting
+    k8s api server, watcher fails to watch the resources if k8s api server is
+    down.
+    In order to fix it, we made watcher retry connecting to k8s api server for
+    specific time duration when an exception is raised.
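
As a rough illustration of the behaviour described in this note (not part of the change itself), the retry policy boils down to the standalone Python sketch below: exponential_sleep mirrors the helper added to kuryr_kubernetes/utils.py above, while watch_once and watch_with_retry are placeholder names rather than kuryr-kubernetes APIs:

    import random
    import time

    DEFAULT_INTERVAL = 3

    def exponential_sleep(deadline, attempt, interval=DEFAULT_INTERVAL):
        # Mirror of the utils helper: sleep roughly interval * 2 ** attempt / 2
        # seconds (never less than the minimal interval), or return 0 once
        # `deadline` has already passed so the caller can give up.
        seconds_left = deadline - time.time()
        if seconds_left <= 0:
            return 0
        interval = random.randint(1, 2 ** attempt - 1) * interval
        interval = max(DEFAULT_INTERVAL, min(interval, seconds_left))
        time.sleep(interval)
        return interval

    def watch_with_retry(watch_once, timeout=60):
        # `watch_once` stands in for one attempt to watch the k8s API; it may
        # raise while the API server is temporarily down. Retrying stops once
        # `timeout` seconds (the watch_retry_timeout option) have been spent
        # sleeping between attempts.
        attempts = 0
        deadline = 0
        while True:
            if attempts == 1:
                deadline = time.time() + timeout
            if attempts > 0 and exponential_sleep(deadline, attempts) == 0:
                raise RuntimeError("watch retry deadline exceeded")
            try:
                return watch_once()
            except Exception:
                attempts += 1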