From b69e991a273a0a7d9eb6cbce7fbc40395bc05de1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Tue, 24 May 2022 16:43:46 +0200 Subject: [PATCH] Add more info to Async handler logs This commit makes sure every log Async handler produces includes information about what K8s resource is processed by the thread in question. Without this patch only UID is printed which isn't something that helps a lot when debugging K8s e2e tests results or when correlating with CNI logs. Change-Id: I4183b17efd8ad37731e2fc3c3db1ad7b76f18534 --- .../controller/handlers/pipeline.py | 3 +- kuryr_kubernetes/handlers/asynchronous.py | 26 ++++++++----- kuryr_kubernetes/handlers/k8s_base.py | 11 ++++++ .../unit/controller/handlers/test_pipeline.py | 2 +- .../tests/unit/handlers/test_asynchronous.py | 39 ++++++++++++------- 5 files changed, 54 insertions(+), 27 deletions(-) diff --git a/kuryr_kubernetes/controller/handlers/pipeline.py b/kuryr_kubernetes/controller/handlers/pipeline.py index d82b3064f..e9cebf9b0 100644 --- a/kuryr_kubernetes/controller/handlers/pipeline.py +++ b/kuryr_kubernetes/controller/handlers/pipeline.py @@ -68,4 +68,5 @@ class ControllerPipeline(h_dis.EventPipeline): def _wrap_dispatcher(self, dispatcher): return h_log.LogExceptions(h_async.Async(dispatcher, self._tg, - h_k8s.object_uid)) + h_k8s.object_uid, + h_k8s.object_info)) diff --git a/kuryr_kubernetes/handlers/asynchronous.py b/kuryr_kubernetes/handlers/asynchronous.py index edb8f0c4a..6204974b2 100755 --- a/kuryr_kubernetes/handlers/asynchronous.py +++ b/kuryr_kubernetes/handlers/asynchronous.py @@ -40,12 +40,13 @@ class Async(base.EventHandler): handled serially and in the same order they arrived to `Async`. """ - def __init__(self, handler, thread_group, group_by, + def __init__(self, handler, thread_group, group_by, info_func, queue_depth=DEFAULT_QUEUE_DEPTH, grace_period=DEFAULT_GRACE_PERIOD): self._handler = handler self._thread_group = thread_group self._group_by = group_by + self._info_func = info_func self._queue_depth = queue_depth self._grace_period = grace_period self._queues = {} @@ -62,12 +63,15 @@ class Async(base.EventHandler): except KeyError: queue = py_queue.Queue(self._queue_depth) self._queues[group] = queue - thread = self._thread_group.add_thread(self._run, group, queue) - thread.link(self._done, group) + info = self._info_func(event) + thread = self._thread_group.add_thread(self._run, group, queue, + info) + thread.link(self._done, group, info) queue.put((event, args, kwargs)) - def _run(self, group, queue): - LOG.trace("Asynchronous handler started processing %s", group) + def _run(self, group, queue, info): + LOG.trace("Asynchronous handler started processing %s (%s)", group, + info) for _ in itertools.count(): # NOTE(ivc): this is a mock-friendly replacement for 'while True' # to allow more controlled environment for unit-tests (e.g. to @@ -104,14 +108,16 @@ class Async(base.EventHandler): time.sleep(STALE_PERIOD) self._handler(event, *args, **kwargs) - def _done(self, thread, group): - LOG.trace("Asynchronous handler stopped processing group %s", group) + def _done(self, thread, group, info): + LOG.trace("Asynchronous handler stopped processing group %s (%s)", + group, info) queue = self._queues.pop(group) if not queue.empty(): - LOG.critical("Asynchronous handler terminated abnormally; " - "%(count)s events dropped for %(group)s", - {'count': queue.qsize(), 'group': group}) + LOG.critical( + "Asynchronous handler thread terminated abnormally; %(count)s " + "events dropped for %(group)s (%(info)s)", + {'count': queue.qsize(), 'group': group, 'info': info}) if not self._queues: LOG.trace("Asynchronous handler is idle") diff --git a/kuryr_kubernetes/handlers/k8s_base.py b/kuryr_kubernetes/handlers/k8s_base.py index 7b291d887..78e479c2a 100755 --- a/kuryr_kubernetes/handlers/k8s_base.py +++ b/kuryr_kubernetes/handlers/k8s_base.py @@ -31,6 +31,17 @@ def object_uid(event): return None +def object_info(event): + try: + resource = event['object'] + try: + return "%(kind)s %(namespace)s/%(name)s" % resource['metadata'] + except KeyError: + return "%(kind)s: %(name)s" % resource['metadata'] + except KeyError: + return None + + class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler): """Base class for K8s event handlers. diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py index 4bc017ec9..b41d78691 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py @@ -58,4 +58,4 @@ class TestControllerPipeline(test_base.TestCase): self.assertEqual(logging_handler, ret) m_logging_type.assert_called_with(async_handler) m_async_type.assert_called_with(dispatcher, thread_group, - h_k8s.object_uid) + h_k8s.object_uid, h_k8s.object_info) diff --git a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py index 1e4368948..cd99cbaa9 100644 --- a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py +++ b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py @@ -28,7 +28,9 @@ class TestAsyncHandler(test_base.TestCase): m_queue = mock.Mock() m_handler = mock.Mock() m_group_by = mock.Mock(return_value=group) - async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by) + m_info = mock.Mock(return_value=group) + async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by, + m_info) async_handler._queues[group] = m_queue async_handler(event) @@ -49,7 +51,8 @@ class TestAsyncHandler(test_base.TestCase): m_tg = mock.Mock() m_tg.add_thread.return_value = m_th m_group_by = mock.Mock(return_value=group) - async_handler = h_async.Async(m_handler, m_tg, m_group_by, + m_info = mock.Mock(return_value=group) + async_handler = h_async.Async(m_handler, m_tg, m_group_by, m_info, queue_depth=queue_depth) async_handler(event) @@ -58,8 +61,8 @@ class TestAsyncHandler(test_base.TestCase): m_queue_type.assert_called_once_with(queue_depth) self.assertEqual({group: m_queue}, async_handler._queues) m_tg.add_thread.assert_called_once_with(async_handler._run, group, - m_queue) - m_th.link.assert_called_once_with(async_handler._done, group) + m_queue, group) + m_th.link.assert_called_once_with(async_handler._done, group, group) m_queue.put.assert_called_once_with((event, (), {})) def test_call_injected(self): @@ -68,7 +71,9 @@ class TestAsyncHandler(test_base.TestCase): m_queue = mock.Mock() m_handler = mock.Mock() m_group_by = mock.Mock(return_value=group) - async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by) + m_info = mock.Mock(return_value=group) + async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by, + m_info) async_handler._queues[group] = m_queue async_handler(event, injected=True) @@ -87,10 +92,10 @@ class TestAsyncHandler(test_base.TestCase): m_handler = mock.Mock() m_count.return_value = [1] async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(), - queue_depth=1) + mock.Mock(), queue_depth=1) with mock.patch('time.sleep'): - async_handler._run(group, m_queue) + async_handler._run(group, m_queue, None) m_handler.assert_called_once_with(event) @@ -104,10 +109,11 @@ class TestAsyncHandler(test_base.TestCase): m_queue.get.side_effect = events + [queue.Empty()] m_handler = mock.Mock() m_count.return_value = list(range(5)) - async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock()) + async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(), + mock.Mock()) with mock.patch('time.sleep'): - async_handler._run(group, m_queue) + async_handler._run(group, m_queue, None) m_handler.assert_has_calls([mock.call(event[0]) for event in events]) self.assertEqual(len(events), m_handler.call_count) @@ -122,20 +128,22 @@ class TestAsyncHandler(test_base.TestCase): m_queue.get.side_effect = events + [queue.Empty()] m_handler = mock.Mock() m_count.return_value = list(range(5)) - async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock()) + async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(), + mock.Mock()) with mock.patch('time.sleep'): - async_handler._run(group, m_queue) + async_handler._run(group, m_queue, None) m_handler.assert_called_once_with(mock.sentinel.event2) def test_done(self): group = mock.sentinel.group m_queue = mock.Mock() - async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock()) + async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(), + mock.Mock()) async_handler._queues[group] = m_queue - async_handler._done(mock.Mock(), group) + async_handler._done(mock.Mock(), group, None) self.assertFalse(async_handler._queues) @@ -144,9 +152,10 @@ class TestAsyncHandler(test_base.TestCase): group = mock.sentinel.group m_queue = mock.Mock() m_queue.empty.return_value = False - async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock()) + async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(), + mock.Mock()) async_handler._queues[group] = m_queue - async_handler._done(mock.Mock(), group) + async_handler._done(mock.Mock(), group, None) m_critical.assert_called_once()