Add more info to Async handler logs

This commit makes sure every log Async handler produces includes
information about what K8s resource is processed by the thread in
question. Without this patch only UID is printed which isn't something
that helps a lot when debugging K8s e2e tests results or when
correlating with CNI logs.

Change-Id: I4183b17efd8ad37731e2fc3c3db1ad7b76f18534
This commit is contained in:
Michał Dulko 2022-05-24 16:43:46 +02:00
parent 8f61307fa6
commit b69e991a27
5 changed files with 54 additions and 27 deletions

View File

@ -68,4 +68,5 @@ class ControllerPipeline(h_dis.EventPipeline):
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,
h_k8s.object_uid))
h_k8s.object_uid,
h_k8s.object_info))

View File

@ -40,12 +40,13 @@ class Async(base.EventHandler):
handled serially and in the same order they arrived to `Async`.
"""
def __init__(self, handler, thread_group, group_by,
def __init__(self, handler, thread_group, group_by, info_func,
queue_depth=DEFAULT_QUEUE_DEPTH,
grace_period=DEFAULT_GRACE_PERIOD):
self._handler = handler
self._thread_group = thread_group
self._group_by = group_by
self._info_func = info_func
self._queue_depth = queue_depth
self._grace_period = grace_period
self._queues = {}
@ -62,12 +63,15 @@ class Async(base.EventHandler):
except KeyError:
queue = py_queue.Queue(self._queue_depth)
self._queues[group] = queue
thread = self._thread_group.add_thread(self._run, group, queue)
thread.link(self._done, group)
info = self._info_func(event)
thread = self._thread_group.add_thread(self._run, group, queue,
info)
thread.link(self._done, group, info)
queue.put((event, args, kwargs))
def _run(self, group, queue):
LOG.trace("Asynchronous handler started processing %s", group)
def _run(self, group, queue, info):
LOG.trace("Asynchronous handler started processing %s (%s)", group,
info)
for _ in itertools.count():
# NOTE(ivc): this is a mock-friendly replacement for 'while True'
# to allow more controlled environment for unit-tests (e.g. to
@ -104,14 +108,16 @@ class Async(base.EventHandler):
time.sleep(STALE_PERIOD)
self._handler(event, *args, **kwargs)
def _done(self, thread, group):
LOG.trace("Asynchronous handler stopped processing group %s", group)
def _done(self, thread, group, info):
LOG.trace("Asynchronous handler stopped processing group %s (%s)",
group, info)
queue = self._queues.pop(group)
if not queue.empty():
LOG.critical("Asynchronous handler terminated abnormally; "
"%(count)s events dropped for %(group)s",
{'count': queue.qsize(), 'group': group})
LOG.critical(
"Asynchronous handler thread terminated abnormally; %(count)s "
"events dropped for %(group)s (%(info)s)",
{'count': queue.qsize(), 'group': group, 'info': info})
if not self._queues:
LOG.trace("Asynchronous handler is idle")

View File

@ -31,6 +31,17 @@ def object_uid(event):
return None
def object_info(event):
try:
resource = event['object']
try:
return "%(kind)s %(namespace)s/%(name)s" % resource['metadata']
except KeyError:
return "%(kind)s: %(name)s" % resource['metadata']
except KeyError:
return None
class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
"""Base class for K8s event handlers.

View File

@ -58,4 +58,4 @@ class TestControllerPipeline(test_base.TestCase):
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(async_handler)
m_async_type.assert_called_with(dispatcher, thread_group,
h_k8s.object_uid)
h_k8s.object_uid, h_k8s.object_info)

View File

@ -28,7 +28,9 @@ class TestAsyncHandler(test_base.TestCase):
m_queue = mock.Mock()
m_handler = mock.Mock()
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by)
m_info = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by,
m_info)
async_handler._queues[group] = m_queue
async_handler(event)
@ -49,7 +51,8 @@ class TestAsyncHandler(test_base.TestCase):
m_tg = mock.Mock()
m_tg.add_thread.return_value = m_th
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, m_tg, m_group_by,
m_info = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, m_tg, m_group_by, m_info,
queue_depth=queue_depth)
async_handler(event)
@ -58,8 +61,8 @@ class TestAsyncHandler(test_base.TestCase):
m_queue_type.assert_called_once_with(queue_depth)
self.assertEqual({group: m_queue}, async_handler._queues)
m_tg.add_thread.assert_called_once_with(async_handler._run, group,
m_queue)
m_th.link.assert_called_once_with(async_handler._done, group)
m_queue, group)
m_th.link.assert_called_once_with(async_handler._done, group, group)
m_queue.put.assert_called_once_with((event, (), {}))
def test_call_injected(self):
@ -68,7 +71,9 @@ class TestAsyncHandler(test_base.TestCase):
m_queue = mock.Mock()
m_handler = mock.Mock()
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by)
m_info = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by,
m_info)
async_handler._queues[group] = m_queue
async_handler(event, injected=True)
@ -87,10 +92,10 @@ class TestAsyncHandler(test_base.TestCase):
m_handler = mock.Mock()
m_count.return_value = [1]
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
queue_depth=1)
mock.Mock(), queue_depth=1)
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
async_handler._run(group, m_queue, None)
m_handler.assert_called_once_with(event)
@ -104,10 +109,11 @@ class TestAsyncHandler(test_base.TestCase):
m_queue.get.side_effect = events + [queue.Empty()]
m_handler = mock.Mock()
m_count.return_value = list(range(5))
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
mock.Mock())
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
async_handler._run(group, m_queue, None)
m_handler.assert_has_calls([mock.call(event[0]) for event in events])
self.assertEqual(len(events), m_handler.call_count)
@ -122,20 +128,22 @@ class TestAsyncHandler(test_base.TestCase):
m_queue.get.side_effect = events + [queue.Empty()]
m_handler = mock.Mock()
m_count.return_value = list(range(5))
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
mock.Mock())
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
async_handler._run(group, m_queue, None)
m_handler.assert_called_once_with(mock.sentinel.event2)
def test_done(self):
group = mock.sentinel.group
m_queue = mock.Mock()
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock())
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock())
async_handler._queues[group] = m_queue
async_handler._done(mock.Mock(), group)
async_handler._done(mock.Mock(), group, None)
self.assertFalse(async_handler._queues)
@ -144,9 +152,10 @@ class TestAsyncHandler(test_base.TestCase):
group = mock.sentinel.group
m_queue = mock.Mock()
m_queue.empty.return_value = False
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock())
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock())
async_handler._queues[group] = m_queue
async_handler._done(mock.Mock(), group)
async_handler._done(mock.Mock(), group, None)
m_critical.assert_called_once()