Merge "Add more info to Async handler logs"

This commit is contained in:
Zuul 2022-07-22 17:06:16 +00:00 committed by Gerrit Code Review
commit cf2cf599d6
5 changed files with 54 additions and 27 deletions

View File

@ -68,4 +68,5 @@ class ControllerPipeline(h_dis.EventPipeline):
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,
h_k8s.object_uid))
h_k8s.object_uid,
h_k8s.object_info))

View File

@ -40,12 +40,13 @@ class Async(base.EventHandler):
handled serially and in the same order they arrived to `Async`.
"""
def __init__(self, handler, thread_group, group_by,
def __init__(self, handler, thread_group, group_by, info_func,
queue_depth=DEFAULT_QUEUE_DEPTH,
grace_period=DEFAULT_GRACE_PERIOD):
self._handler = handler
self._thread_group = thread_group
self._group_by = group_by
self._info_func = info_func
self._queue_depth = queue_depth
self._grace_period = grace_period
self._queues = {}
@ -62,12 +63,15 @@ class Async(base.EventHandler):
except KeyError:
queue = py_queue.Queue(self._queue_depth)
self._queues[group] = queue
thread = self._thread_group.add_thread(self._run, group, queue)
thread.link(self._done, group)
info = self._info_func(event)
thread = self._thread_group.add_thread(self._run, group, queue,
info)
thread.link(self._done, group, info)
queue.put((event, args, kwargs))
def _run(self, group, queue):
LOG.trace("Asynchronous handler started processing %s", group)
def _run(self, group, queue, info):
LOG.trace("Asynchronous handler started processing %s (%s)", group,
info)
for _ in itertools.count():
# NOTE(ivc): this is a mock-friendly replacement for 'while True'
# to allow more controlled environment for unit-tests (e.g. to
@ -104,14 +108,16 @@ class Async(base.EventHandler):
time.sleep(STALE_PERIOD)
self._handler(event, *args, **kwargs)
def _done(self, thread, group):
LOG.trace("Asynchronous handler stopped processing group %s", group)
def _done(self, thread, group, info):
LOG.trace("Asynchronous handler stopped processing group %s (%s)",
group, info)
queue = self._queues.pop(group)
if not queue.empty():
LOG.critical("Asynchronous handler terminated abnormally; "
"%(count)s events dropped for %(group)s",
{'count': queue.qsize(), 'group': group})
LOG.critical(
"Asynchronous handler thread terminated abnormally; %(count)s "
"events dropped for %(group)s (%(info)s)",
{'count': queue.qsize(), 'group': group, 'info': info})
if not self._queues:
LOG.trace("Asynchronous handler is idle")

View File

@ -31,6 +31,17 @@ def object_uid(event):
return None
def object_info(event):
try:
resource = event['object']
try:
return "%(kind)s %(namespace)s/%(name)s" % resource['metadata']
except KeyError:
return "%(kind)s: %(name)s" % resource['metadata']
except KeyError:
return None
class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
"""Base class for K8s event handlers.

View File

@ -58,4 +58,4 @@ class TestControllerPipeline(test_base.TestCase):
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(async_handler)
m_async_type.assert_called_with(dispatcher, thread_group,
h_k8s.object_uid)
h_k8s.object_uid, h_k8s.object_info)

View File

@ -28,7 +28,9 @@ class TestAsyncHandler(test_base.TestCase):
m_queue = mock.Mock()
m_handler = mock.Mock()
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by)
m_info = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by,
m_info)
async_handler._queues[group] = m_queue
async_handler(event)
@ -49,7 +51,8 @@ class TestAsyncHandler(test_base.TestCase):
m_tg = mock.Mock()
m_tg.add_thread.return_value = m_th
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, m_tg, m_group_by,
m_info = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, m_tg, m_group_by, m_info,
queue_depth=queue_depth)
async_handler(event)
@ -58,8 +61,8 @@ class TestAsyncHandler(test_base.TestCase):
m_queue_type.assert_called_once_with(queue_depth)
self.assertEqual({group: m_queue}, async_handler._queues)
m_tg.add_thread.assert_called_once_with(async_handler._run, group,
m_queue)
m_th.link.assert_called_once_with(async_handler._done, group)
m_queue, group)
m_th.link.assert_called_once_with(async_handler._done, group, group)
m_queue.put.assert_called_once_with((event, (), {}))
def test_call_injected(self):
@ -68,7 +71,9 @@ class TestAsyncHandler(test_base.TestCase):
m_queue = mock.Mock()
m_handler = mock.Mock()
m_group_by = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by)
m_info = mock.Mock(return_value=group)
async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by,
m_info)
async_handler._queues[group] = m_queue
async_handler(event, injected=True)
@ -87,10 +92,10 @@ class TestAsyncHandler(test_base.TestCase):
m_handler = mock.Mock()
m_count.return_value = [1]
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
queue_depth=1)
mock.Mock(), queue_depth=1)
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
async_handler._run(group, m_queue, None)
m_handler.assert_called_once_with(event)
@ -104,10 +109,11 @@ class TestAsyncHandler(test_base.TestCase):
m_queue.get.side_effect = events + [queue.Empty()]
m_handler = mock.Mock()
m_count.return_value = list(range(5))
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
mock.Mock())
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
async_handler._run(group, m_queue, None)
m_handler.assert_has_calls([mock.call(event[0]) for event in events])
self.assertEqual(len(events), m_handler.call_count)
@ -122,20 +128,22 @@ class TestAsyncHandler(test_base.TestCase):
m_queue.get.side_effect = events + [queue.Empty()]
m_handler = mock.Mock()
m_count.return_value = list(range(5))
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
mock.Mock())
with mock.patch('time.sleep'):
async_handler._run(group, m_queue)
async_handler._run(group, m_queue, None)
m_handler.assert_called_once_with(mock.sentinel.event2)
def test_done(self):
group = mock.sentinel.group
m_queue = mock.Mock()
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock())
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock())
async_handler._queues[group] = m_queue
async_handler._done(mock.Mock(), group)
async_handler._done(mock.Mock(), group, None)
self.assertFalse(async_handler._queues)
@ -144,9 +152,10 @@ class TestAsyncHandler(test_base.TestCase):
group = mock.sentinel.group
m_queue = mock.Mock()
m_queue.empty.return_value = False
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock())
async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock())
async_handler._queues[group] = m_queue
async_handler._done(mock.Mock(), group)
async_handler._done(mock.Mock(), group, None)
m_critical.assert_called_once()