Browse Source

Periodically fetch full list of watched resources

Kuryr-Kubernetes relies on watching resources in K8s API using an HTTP
stream served by kube-apiserver. In such a distributed system this is
sometimes unstable and e.g. etcd issues can cause some events to be
omitted. To prevent controller from such situations this patch makes
sure that periodically a full list of resources is fetched and injected
as events into the handlers.

We should probably do the same for kuryr-daemon watcher, but that case
is less problematic as it'll be restarted in event of ADD requests
timing out.

Change-Id: I67874d086043071de072420df9ea5e86b3f2582e
changes/43/737843/10
Michał Dulko 1 year ago
parent
commit
9f722e6200
12 changed files with 104 additions and 32 deletions
  1. +8
    -0
      kuryr_kubernetes/config.py
  2. +18
    -12
      kuryr_kubernetes/handlers/asynchronous.py
  3. +1
    -1
      kuryr_kubernetes/handlers/base.py
  4. +4
    -4
      kuryr_kubernetes/handlers/dispatch.py
  5. +1
    -1
      kuryr_kubernetes/handlers/k8s_base.py
  6. +2
    -2
      kuryr_kubernetes/handlers/logging.py
  7. +2
    -2
      kuryr_kubernetes/handlers/retry.py
  8. +23
    -6
      kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
  9. +3
    -0
      kuryr_kubernetes/tests/unit/test_watcher.py
  10. +38
    -0
      kuryr_kubernetes/watcher.py
  11. +2
    -2
      lower-constraints.txt
  12. +2
    -2
      requirements.txt

+ 8
- 0
kuryr_kubernetes/config.py View File

@ -168,6 +168,14 @@ k8s_opts = [
'too high, Kuryr will take longer to reconnect when K8s '
'API stream was being silently broken.'),
default=60),
cfg.IntOpt('watch_reconcile_period',
help=_('Period (in seconds) between iterations of fetching '
'full list of watched K8s API resources and putting '
'them into the enabled handlers. Setting 0 disables the '
'periodic reconciling. The reconciliation is done to '
'prevent Kuryr from missing events due to K8s API or '
'etcd issues.'),
default=120),
cfg.ListOpt('enabled_handlers',
help=_("The comma-separated handlers that should be "
"registered for watching in the pipeline."),


+ 18
- 12
kuryr_kubernetes/handlers/asynchronous.py View File

@ -17,6 +17,7 @@ import itertools
import queue as py_queue
import time
from oslo_concurrency import lockutils
from oslo_log import log as logging
@ -49,16 +50,21 @@ class Async(base.EventHandler):
self._grace_period = grace_period
self._queues = {}
def __call__(self, event):
def __call__(self, event, *args, **kwargs):
group = self._group_by(event)
try:
queue = self._queues[group]
except KeyError:
queue = py_queue.Queue(self._queue_depth)
self._queues[group] = queue
thread = self._thread_group.add_thread(self._run, group, queue)
thread.link(self._done, group)
queue.put(event)
with lockutils.lock(group):
try:
queue = self._queues[group]
# NOTE(dulek): We don't want to risk injecting an outdated
# state if events for that resource are in queue.
if kwargs.get('injected', False):
return
except KeyError:
queue = py_queue.Queue(self._queue_depth)
self._queues[group] = queue
thread = self._thread_group.add_thread(self._run, group, queue)
thread.link(self._done, group)
queue.put((event, args, kwargs))
def _run(self, group, queue):
LOG.debug("Asynchronous handler started processing %s", group)
@ -67,7 +73,7 @@ class Async(base.EventHandler):
# to allow more controlled environment for unit-tests (e.g. to
# avoid tests getting stuck in infinite loops)
try:
event = queue.get(timeout=self._grace_period)
event, args, kwargs = queue.get(timeout=self._grace_period)
except py_queue.Empty:
break
# FIXME(ivc): temporary workaround to skip stale events
@ -93,10 +99,10 @@ class Async(base.EventHandler):
# also introduce cache for long operations
time.sleep(STALE_PERIOD)
while not queue.empty():
event = queue.get()
event, args, kwargs = queue.get()
if queue.empty():
time.sleep(STALE_PERIOD)
self._handler(event)
self._handler(event, *args, **kwargs)
def _done(self, thread, group):
LOG.debug("Asynchronous handler stopped processing group %s", group)


+ 1
- 1
kuryr_kubernetes/handlers/base.py View File

@ -20,7 +20,7 @@ class EventHandler(object, metaclass=abc.ABCMeta):
"""Base class for event handlers."""
@abc.abstractmethod
def __call__(self, event):
def __call__(self, event, *args, **kwargs):
"""Handle the event."""
raise NotImplementedError()


+ 4
- 4
kuryr_kubernetes/handlers/dispatch.py View File

@ -51,7 +51,7 @@ class Dispatcher(h_base.EventHandler):
handlers = key_group.setdefault(key, [])
handlers.append(handler)
def __call__(self, event):
def __call__(self, event, *args, **kwargs):
handlers = set()
for key_fn, key_group in self._registry.items():
@ -67,7 +67,7 @@ class Dispatcher(h_base.EventHandler):
obj_meta.get('uid'))
for handler in handlers:
handler(event)
handler(event, *args, **kwargs)
class EventConsumer(h_base.EventHandler, metaclass=abc.ABCMeta):
@ -113,8 +113,8 @@ class EventPipeline(h_base.EventHandler, metaclass=abc.ABCMeta):
for key_fn, key in consumer.consumes.items():
self._dispatcher.register(key_fn, key, handler)
def __call__(self, event):
self._handler(event)
def __call__(self, event, *args, **kwargs):
self._handler(event, *args, **kwargs)
@abc.abstractmethod
def _wrap_dispatcher(self, dispatcher):


+ 1
- 1
kuryr_kubernetes/handlers/k8s_base.py View File

@ -73,7 +73,7 @@ class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
return deletion_timestamp
def __call__(self, event):
def __call__(self, event, *args, **kwargs):
event_type = event.get('type')
obj = event.get('object')
if 'MODIFIED' == event_type:


+ 2
- 2
kuryr_kubernetes/handlers/logging.py View File

@ -32,8 +32,8 @@ class LogExceptions(base.EventHandler):
self._handler = handler
self._exceptions = exceptions
def __call__(self, event):
def __call__(self, event, *args, **kwargs):
try:
self._handler(event)
self._handler(event, *args, **kwargs)
except self._exceptions:
LOG.exception("Failed to handle event %s", event)

+ 2
- 2
kuryr_kubernetes/handlers/retry.py View File

@ -51,7 +51,7 @@ class Retry(base.EventHandler):
self._interval = interval
self._k8s = clients.get_kubernetes_client()
def __call__(self, event):
def __call__(self, event, *args, **kwargs):
deadline = time.time() + self._timeout
for attempt in itertools.count(1):
if event.get('type') in ['MODIFIED', 'ADDED']:
@ -75,7 +75,7 @@ class Retry(base.EventHandler):
"object. Continuing with handler "
"execution.")
try:
self._handler(event)
self._handler(event, *args, **kwargs)
break
except os_exc.ConflictException as ex:
if ex.details.startswith('Quota exceeded for resources'):


+ 23
- 6
kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py View File

@ -35,7 +35,7 @@ class TestAsyncHandler(test_base.TestCase):
m_handler.assert_not_called()
self.assertEqual({group: m_queue}, async_handler._queues)
m_queue.put.assert_called_once_with(event)
m_queue.put.assert_called_once_with((event, (), {}))
@mock.patch('queue.Queue')
def test_call_new(self, m_queue_type):
@ -60,7 +60,22 @@ class TestAsyncHandler(test_base.TestCase):
m_tg.add_thread.assert_called_once_with(async_handler._run, group,
m_queue)
m_th.link.assert_called_once_with(async_handler._done, group)
m_queue.put.assert_called_once_with(event)
m_queue.put.assert_called_once_with((event, (), {}))
def test_call_injected(self):
event = mock.sentinel.event
group = mock.sentinel.group
m_queue = mock.Mock()
m_handler = mock.Mock()
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by)
async_handler._queues[group] = m_queue
async_handler(event, injected=True)
m_handler.assert_not_called()
self.assertEqual({group: m_queue}, async_handler._queues)
m_queue.put.assert_not_called()
@mock.patch('itertools.count')
def test_run(self, m_count):
@ -68,7 +83,7 @@ class TestAsyncHandler(test_base.TestCase):
group = mock.sentinel.group
m_queue = mock.Mock()
m_queue.empty.return_value = True
m_queue.get.return_value = event
m_queue.get.return_value = (event, (), {})
m_handler = mock.Mock()
m_count.return_value = [1]
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
@ -81,7 +96,8 @@ class TestAsyncHandler(test_base.TestCase):
@mock.patch('itertools.count')
def test_run_empty(self, m_count):
events = [mock.sentinel.event1, mock.sentinel.event2]
events = [(x, (), {}) for x in (mock.sentinel.event1,
mock.sentinel.event2)]
group = mock.sentinel.group
m_queue = mock.Mock()
m_queue.empty.return_value = True
@ -93,12 +109,13 @@ class TestAsyncHandler(test_base.TestCase):
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
m_handler.assert_has_calls([mock.call(event) for event in events])
m_handler.assert_has_calls([mock.call(event[0]) for event in events])
self.assertEqual(len(events), m_handler.call_count)
@mock.patch('itertools.count')
def test_run_stale(self, m_count):
events = [mock.sentinel.event1, mock.sentinel.event2]
events = [(x, (), {}) for x in (mock.sentinel.event1,
mock.sentinel.event2)]
group = mock.sentinel.group
m_queue = mock.Mock()
m_queue.empty.side_effect = [False, True, True]


+ 3
- 0
kuryr_kubernetes/tests/unit/test_watcher.py View File

@ -179,13 +179,16 @@ class TestWatcher(test_base.TestCase):
path = '/test'
m_tg = mock.Mock()
m_th = mock.Mock()
m_tt = mock.Mock()
m_handler = mock.Mock()
watcher_obj = watcher.Watcher(m_handler, m_tg)
watcher_obj._idle[path] = True
watcher_obj._watching[path] = m_th
watcher_obj._timers[path] = m_tt
watcher_obj._stop_watch(path)
m_tt.stop.assert_called()
m_th.stop.assert_called()
def test_stop_watch_idle(self):


+ 38
- 0
kuryr_kubernetes/watcher.py View File

@ -17,6 +17,7 @@ import sys
import time
from kuryr_kubernetes import clients
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import health
from kuryr_kubernetes import utils
from oslo_config import cfg
@ -78,6 +79,7 @@ class Watcher(health.HealthHandler):
self._running = False
self._resources = set()
self._watching = {}
self._timers = {}
self._idle = {}
if timeout is None:
@ -131,11 +133,41 @@ class Watcher(health.HealthHandler):
for path in list(self._watching):
self._stop_watch(path)
def _reconcile(self, path):
LOG.debug(f'Getting {path} for reconciliation.')
try:
response = self._client.get(path)
resources = response['items']
except exceptions.K8sClientException:
LOG.exception(f'Error getting path when reconciling.')
return
for resource in resources:
kind = response['kind']
# Strip List from e.g. PodList. For some reason `.items` of a list
# returned from API doesn't have `kind` set.
if kind.endswith('List'):
kind = kind[:-4]
resource['kind'] = kind
event = {
'type': 'MODIFIED',
'object': resource,
}
self._handler(event, injected=True)
def _start_watch(self, path):
tg = self._thread_group
self._idle[path] = True
if tg:
self._watching[path] = tg.add_thread(self._watch, path)
period = CONF.kubernetes.watch_reconcile_period
if period > 0:
# Let's make sure handlers won't reconcile at the same time.
initial_delay = period + 5 * len(self._timers)
self._timers[path] = tg.add_timer_args(
period, self._reconcile, args=(path,),
initial_delay=initial_delay, stop_on_exception=False)
else:
self._watching[path] = None
self._watch(path)
@ -143,15 +175,21 @@ class Watcher(health.HealthHandler):
def _stop_watch(self, path):
if self._idle.get(path):
if self._thread_group and path in self._watching:
if CONF.kubernetes.watch_reconcile_period:
self._timers[path].stop()
self._watching[path].stop()
# NOTE(dulek): Thread gets killed immediately, so we need to
# take care of this ourselves.
if CONF.kubernetes.watch_reconcile_period:
self._timers.pop(path, None)
self._watching.pop(path, None)
self._idle.pop(path, None)
def _graceful_watch_exit(self, path):
try:
self._watching.pop(path, None)
if CONF.kubernetes.watch_reconcile_period:
self._timers.pop(path, None)
self._idle.pop(path, None)
LOG.info("Stopped watching '%s'", path)
except KeyError:


+ 2
- 2
lower-constraints.txt View File

@ -73,8 +73,8 @@ oslo.policy==1.34.0
oslo.privsep==1.28.0
oslo.reports==1.18.0
oslo.serialization==2.18.0
oslo.service==1.24.0
oslo.utils==3.33.0
oslo.service==1.40.2
oslo.utils==3.40.2
oslo.versionedobjects==1.32.0
oslotest==3.2.0
packaging==17.1


+ 2
- 2
requirements.txt View File

@ -15,8 +15,8 @@ oslo.config>=6.1.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
oslo.reports>=1.18.0 # Apache-2.0
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0
oslo.service>=1.40.2 # Apache-2.0
oslo.utils>=3.40.2 # Apache-2.0
os-vif>=1.12.0 # Apache-2.0
PrettyTable<0.8,>=0.7.2 # BSD
pyroute2>=0.5.7;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2)


Loading…
Cancel
Save