diff --git a/kuryr_kubernetes/config.py b/kuryr_kubernetes/config.py
index f3f0e9926..2c778dee7 100644
--- a/kuryr_kubernetes/config.py
+++ b/kuryr_kubernetes/config.py
@@ -168,6 +168,14 @@ k8s_opts = [
                       'too high, Kuryr will take longer to reconnect when K8s '
                       'API stream was being silently broken.'),
                default=60),
+    cfg.IntOpt('watch_reconcile_period',
+               help=_('Period (in seconds) between iterations of fetching '
+                      'full list of watched K8s API resources and putting '
+                      'them into the enabled handlers. Setting 0 disables the '
+                      'periodic reconciling. The reconciliation is done to '
+                      'prevent Kuryr from missing events due to K8s API or '
+                      'etcd issues.'),
+               default=120),
     cfg.ListOpt('enabled_handlers',
                 help=_("The comma-separated handlers that should be "
                        "registered for watching in the pipeline."),
diff --git a/kuryr_kubernetes/handlers/asynchronous.py b/kuryr_kubernetes/handlers/asynchronous.py
index 9f20a2e9e..afb7bc02b 100755
--- a/kuryr_kubernetes/handlers/asynchronous.py
+++ b/kuryr_kubernetes/handlers/asynchronous.py
@@ -17,6 +17,7 @@ import itertools
 import queue as py_queue
 import time
 
+from oslo_concurrency import lockutils
 from oslo_log import log as logging
 
@@ -49,16 +50,21 @@ class Async(base.EventHandler):
         self._grace_period = grace_period
         self._queues = {}
 
-    def __call__(self, event):
+    def __call__(self, event, *args, **kwargs):
         group = self._group_by(event)
-        try:
-            queue = self._queues[group]
-        except KeyError:
-            queue = py_queue.Queue(self._queue_depth)
-            self._queues[group] = queue
-            thread = self._thread_group.add_thread(self._run, group, queue)
-            thread.link(self._done, group)
-        queue.put(event)
+        with lockutils.lock(group):
+            try:
+                queue = self._queues[group]
+                # NOTE(dulek): We don't want to risk injecting an outdated
+                # state if events for that resource are in queue.
+                if kwargs.get('injected', False):
+                    return
+            except KeyError:
+                queue = py_queue.Queue(self._queue_depth)
+                self._queues[group] = queue
+                thread = self._thread_group.add_thread(self._run, group, queue)
+                thread.link(self._done, group)
+            queue.put((event, args, kwargs))
 
     def _run(self, group, queue):
         LOG.debug("Asynchronous handler started processing %s", group)
@@ -67,7 +73,7 @@
             # to allow more controlled environment for unit-tests (e.g. to
             # avoid tests getting stuck in infinite loops)
             try:
-                event = queue.get(timeout=self._grace_period)
+                event, args, kwargs = queue.get(timeout=self._grace_period)
             except py_queue.Empty:
                 break
             # FIXME(ivc): temporary workaround to skip stale events
@@ -93,10 +99,10 @@
             # also introduce cache for long operations
             time.sleep(STALE_PERIOD)
             while not queue.empty():
-                event = queue.get()
+                event, args, kwargs = queue.get()
                 if queue.empty():
                     time.sleep(STALE_PERIOD)
-            self._handler(event)
+            self._handler(event, *args, **kwargs)
 
     def _done(self, thread, group):
         LOG.debug("Asynchronous handler stopped processing group %s", group)
diff --git a/kuryr_kubernetes/handlers/base.py b/kuryr_kubernetes/handlers/base.py
index 62c23f9e4..438625d47 100644
--- a/kuryr_kubernetes/handlers/base.py
+++ b/kuryr_kubernetes/handlers/base.py
@@ -20,7 +20,7 @@ class EventHandler(object, metaclass=abc.ABCMeta):
     """Base class for event handlers."""
 
     @abc.abstractmethod
-    def __call__(self, event):
+    def __call__(self, event, *args, **kwargs):
         """Handle the event."""
         raise NotImplementedError()
 
diff --git a/kuryr_kubernetes/handlers/dispatch.py b/kuryr_kubernetes/handlers/dispatch.py
index 4bf023740..901df4e88 100644
--- a/kuryr_kubernetes/handlers/dispatch.py
+++ b/kuryr_kubernetes/handlers/dispatch.py
@@ -51,7 +51,7 @@ class Dispatcher(h_base.EventHandler):
         handlers = key_group.setdefault(key, [])
         handlers.append(handler)
 
-    def __call__(self, event):
+    def __call__(self, event, *args, **kwargs):
         handlers = set()
 
         for key_fn, key_group in self._registry.items():
@@ -67,7 +67,7 @@
                       obj_meta.get('uid'))
 
         for handler in handlers:
-            handler(event)
+            handler(event, *args, **kwargs)
 
 
 class EventConsumer(h_base.EventHandler, metaclass=abc.ABCMeta):
@@ -113,8 +113,8 @@ class EventPipeline(h_base.EventHandler, metaclass=abc.ABCMeta):
         for key_fn, key in consumer.consumes.items():
             self._dispatcher.register(key_fn, key, handler)
 
-    def __call__(self, event):
-        self._handler(event)
+    def __call__(self, event, *args, **kwargs):
+        self._handler(event, *args, **kwargs)
 
     @abc.abstractmethod
     def _wrap_dispatcher(self, dispatcher):
diff --git a/kuryr_kubernetes/handlers/k8s_base.py b/kuryr_kubernetes/handlers/k8s_base.py
index 22c372a46..4a1a8ed45 100755
--- a/kuryr_kubernetes/handlers/k8s_base.py
+++ b/kuryr_kubernetes/handlers/k8s_base.py
@@ -73,7 +73,7 @@ class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
 
         return deletion_timestamp
 
-    def __call__(self, event):
+    def __call__(self, event, *args, **kwargs):
         event_type = event.get('type')
         obj = event.get('object')
         if 'MODIFIED' == event_type:
diff --git a/kuryr_kubernetes/handlers/logging.py b/kuryr_kubernetes/handlers/logging.py
index f4259bab0..1036a84a3 100644
--- a/kuryr_kubernetes/handlers/logging.py
+++ b/kuryr_kubernetes/handlers/logging.py
@@ -32,8 +32,8 @@ class LogExceptions(base.EventHandler):
         self._handler = handler
         self._exceptions = exceptions
 
-    def __call__(self, event):
+    def __call__(self, event, *args, **kwargs):
         try:
-            self._handler(event)
+            self._handler(event, *args, **kwargs)
         except self._exceptions:
             LOG.exception("Failed to handle event %s", event)
diff --git a/kuryr_kubernetes/handlers/retry.py b/kuryr_kubernetes/handlers/retry.py
index e67bf1146..76f15db37 100644
--- a/kuryr_kubernetes/handlers/retry.py
+++ b/kuryr_kubernetes/handlers/retry.py
@@ -51,7 +51,7 @@ class Retry(base.EventHandler):
         self._interval = interval
         self._k8s = clients.get_kubernetes_client()
 
-    def __call__(self, event):
+    def __call__(self, event, *args, **kwargs):
         deadline = time.time() + self._timeout
         for attempt in itertools.count(1):
             if event.get('type') in ['MODIFIED', 'ADDED']:
@@ -75,7 +75,7 @@
                               "object. Continuing with handler "
                               "execution.")
             try:
-                self._handler(event)
+                self._handler(event, *args, **kwargs)
                 break
             except os_exc.ConflictException as ex:
                 if ex.details.startswith('Quota exceeded for resources'):
diff --git a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
index bb0114beb..1e4368948 100644
--- a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
+++ b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
@@ -35,7 +35,7 @@ class TestAsyncHandler(test_base.TestCase):
 
         m_handler.assert_not_called()
         self.assertEqual({group: m_queue}, async_handler._queues)
-        m_queue.put.assert_called_once_with(event)
+        m_queue.put.assert_called_once_with((event, (), {}))
 
     @mock.patch('queue.Queue')
     def test_call_new(self, m_queue_type):
@@ -60,7 +60,22 @@
         m_tg.add_thread.assert_called_once_with(async_handler._run, group,
                                                 m_queue)
         m_th.link.assert_called_once_with(async_handler._done, group)
-        m_queue.put.assert_called_once_with(event)
+        m_queue.put.assert_called_once_with((event, (), {}))
+
+    def test_call_injected(self):
+        event = mock.sentinel.event
+        group = mock.sentinel.group
+        m_queue = mock.Mock()
+        m_handler = mock.Mock()
+        m_group_by = mock.Mock(return_value=group)
+        async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by)
+        async_handler._queues[group] = m_queue
+
+        async_handler(event, injected=True)
+
+        m_handler.assert_not_called()
+        self.assertEqual({group: m_queue}, async_handler._queues)
+        m_queue.put.assert_not_called()
 
     @mock.patch('itertools.count')
     def test_run(self, m_count):
@@ -68,7 +83,7 @@
         event = mock.sentinel.event
         group = mock.sentinel.group
         m_queue = mock.Mock()
         m_queue.empty.return_value = True
-        m_queue.get.return_value = event
+        m_queue.get.return_value = (event, (), {})
         m_handler = mock.Mock()
         m_count.return_value = [1]
         async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
@@ -81,7 +96,8 @@
     @mock.patch('itertools.count')
     def test_run_empty(self, m_count):
-        events = [mock.sentinel.event1, mock.sentinel.event2]
+        events = [(x, (), {}) for x in (mock.sentinel.event1,
+                                        mock.sentinel.event2)]
         group = mock.sentinel.group
         m_queue = mock.Mock()
         m_queue.empty.return_value = True
@@ -93,12 +109,13 @@
         with mock.patch('time.sleep'):
             async_handler._run(group, m_queue)
 
-        m_handler.assert_has_calls([mock.call(event) for event in events])
+        m_handler.assert_has_calls([mock.call(event[0]) for event in events])
         self.assertEqual(len(events), m_handler.call_count)
 
     @mock.patch('itertools.count')
     def test_run_stale(self, m_count):
-        events = [mock.sentinel.event1, mock.sentinel.event2]
+        events = [(x, (), {}) for x in (mock.sentinel.event1,
+                                        mock.sentinel.event2)]
         group = mock.sentinel.group
         m_queue = mock.Mock()
         m_queue.empty.side_effect = [False, True, True]
diff --git a/kuryr_kubernetes/tests/unit/test_watcher.py b/kuryr_kubernetes/tests/unit/test_watcher.py
index 29960eeec..a209320a2 100644
--- a/kuryr_kubernetes/tests/unit/test_watcher.py
+++ b/kuryr_kubernetes/tests/unit/test_watcher.py
@@ -179,13 +179,16 @@ class TestWatcher(test_base.TestCase):
         path = '/test'
         m_tg = mock.Mock()
         m_th = mock.Mock()
+        m_tt = mock.Mock()
         m_handler = mock.Mock()
         watcher_obj = watcher.Watcher(m_handler, m_tg)
         watcher_obj._idle[path] = True
         watcher_obj._watching[path] = m_th
+        watcher_obj._timers[path] = m_tt
 
         watcher_obj._stop_watch(path)
 
+        m_tt.stop.assert_called()
         m_th.stop.assert_called()
 
     def test_stop_watch_idle(self):
diff --git a/kuryr_kubernetes/watcher.py b/kuryr_kubernetes/watcher.py
index 775a93a05..ce54abe96 100644
--- a/kuryr_kubernetes/watcher.py
+++ b/kuryr_kubernetes/watcher.py
@@ -17,6 +17,7 @@ import sys
 import time
 
 from kuryr_kubernetes import clients
+from kuryr_kubernetes import exceptions
 from kuryr_kubernetes.handlers import health
 from kuryr_kubernetes import utils
 from oslo_config import cfg
@@ -78,6 +79,7 @@
         self._running = False
         self._resources = set()
         self._watching = {}
+        self._timers = {}
         self._idle = {}
 
         if timeout is None:
@@ -131,11 +133,41 @@
         for path in list(self._watching):
             self._stop_watch(path)
 
+    def _reconcile(self, path):
+        LOG.debug(f'Getting {path} for reconciliation.')
+        try:
+            response = self._client.get(path)
+            resources = response['items']
+        except exceptions.K8sClientException:
+            LOG.exception(f'Error getting path when reconciling.')
+            return
+
+        for resource in resources:
+            kind = response['kind']
+            # Strip List from e.g. PodList. For some reason `.items` of a list
+            # returned from API doesn't have `kind` set.
+            if kind.endswith('List'):
+                kind = kind[:-4]
+            resource['kind'] = kind
+
+            event = {
+                'type': 'MODIFIED',
+                'object': resource,
+            }
+            self._handler(event, injected=True)
+
     def _start_watch(self, path):
         tg = self._thread_group
         self._idle[path] = True
         if tg:
             self._watching[path] = tg.add_thread(self._watch, path)
+            period = CONF.kubernetes.watch_reconcile_period
+            if period > 0:
+                # Let's make sure handlers won't reconcile at the same time.
+                initial_delay = period + 5 * len(self._timers)
+                self._timers[path] = tg.add_timer_args(
+                    period, self._reconcile, args=(path,),
+                    initial_delay=initial_delay, stop_on_exception=False)
         else:
             self._watching[path] = None
             self._watch(path)
@@ -143,15 +175,21 @@
     def _stop_watch(self, path):
         if self._idle.get(path):
             if self._thread_group and path in self._watching:
+                if CONF.kubernetes.watch_reconcile_period:
+                    self._timers[path].stop()
                 self._watching[path].stop()
                 # NOTE(dulek): Thread gets killed immediately, so we need to
                 # take care of this ourselves.
+                if CONF.kubernetes.watch_reconcile_period:
+                    self._timers.pop(path, None)
                 self._watching.pop(path, None)
                 self._idle.pop(path, None)
 
     def _graceful_watch_exit(self, path):
         try:
             self._watching.pop(path, None)
+            if CONF.kubernetes.watch_reconcile_period:
+                self._timers.pop(path, None)
             self._idle.pop(path, None)
             LOG.info("Stopped watching '%s'", path)
         except KeyError:
diff --git a/lower-constraints.txt b/lower-constraints.txt
index 1b3b75205..670e3f3ce 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -73,8 +73,8 @@ oslo.policy==1.34.0
 oslo.privsep==1.28.0
 oslo.reports==1.18.0
 oslo.serialization==2.18.0
-oslo.service==1.24.0
-oslo.utils==3.33.0
+oslo.service==1.40.2
+oslo.utils==3.40.2
 oslo.versionedobjects==1.32.0
 oslotest==3.2.0
 packaging==17.1
diff --git a/requirements.txt b/requirements.txt
index 33d32e20c..8df466d09 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,8 +15,8 @@ oslo.config>=6.1.0 # Apache-2.0
 oslo.log>=3.36.0 # Apache-2.0
 oslo.reports>=1.18.0 # Apache-2.0
 oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
-oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
-oslo.utils>=3.33.0 # Apache-2.0
+oslo.service>=1.40.2 # Apache-2.0
+oslo.utils>=3.40.2 # Apache-2.0
 os-vif>=1.12.0 # Apache-2.0
 PrettyTable<0.8,>=0.7.2 # BSD
 pyroute2>=0.5.7;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2)
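Note on the new calling convention (not part of the patch above): every handler's __call__ now tolerates extra positional and keyword arguments, and Watcher._reconcile() re-injects the freshly listed resources as MODIFIED events with injected=True, which Async drops whenever real events for the same resource are already queued. The period is driven by the new watch_reconcile_period option in the [kubernetes] section (default 120 seconds; 0 disables reconciliation). The snippet below is a minimal, hypothetical sketch of that convention only; LoggingOnlyHandler is a made-up name and does not exist in the tree.

    # Hypothetical illustration of the handler signature introduced by this
    # patch; not kuryr-kubernetes code.
    class LoggingOnlyHandler(object):

        def __call__(self, event, *args, **kwargs):
            # Watcher._reconcile() passes injected=True; events coming from
            # the regular K8s watch stream carry no extra kwargs.
            injected = kwargs.get('injected', False)
            obj = event.get('object', {})
            print('%s event for %s (%s)'
                  % (event.get('type'), obj.get('kind'),
                     'reconciliation' if injected else 'watch'))


    handler = LoggingOnlyHandler()
    # Event as delivered by the K8s watch stream.
    handler({'type': 'ADDED', 'object': {'kind': 'Pod'}})
    # Event as injected by the periodic reconciliation.
    handler({'type': 'MODIFIED', 'object': {'kind': 'Pod'}}, injected=True)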