diff --git a/kuryr_kubernetes/controller/handlers/pipeline.py b/kuryr_kubernetes/controller/handlers/pipeline.py index d82b3064f..e9cebf9b0 100644 --- a/kuryr_kubernetes/controller/handlers/pipeline.py +++ b/kuryr_kubernetes/controller/handlers/pipeline.py @@ -68,4 +68,5 @@ class ControllerPipeline(h_dis.EventPipeline): def _wrap_dispatcher(self, dispatcher): return h_log.LogExceptions(h_async.Async(dispatcher, self._tg, - h_k8s.object_uid)) + h_k8s.object_uid, + h_k8s.object_info)) diff --git a/kuryr_kubernetes/handlers/asynchronous.py b/kuryr_kubernetes/handlers/asynchronous.py index edb8f0c4a..6204974b2 100755 --- a/kuryr_kubernetes/handlers/asynchronous.py +++ b/kuryr_kubernetes/handlers/asynchronous.py @@ -40,12 +40,13 @@ class Async(base.EventHandler): handled serially and in the same order they arrived to `Async`. """ - def __init__(self, handler, thread_group, group_by, + def __init__(self, handler, thread_group, group_by, info_func, queue_depth=DEFAULT_QUEUE_DEPTH, grace_period=DEFAULT_GRACE_PERIOD): self._handler = handler self._thread_group = thread_group self._group_by = group_by + self._info_func = info_func self._queue_depth = queue_depth self._grace_period = grace_period self._queues = {} @@ -62,12 +63,15 @@ class Async(base.EventHandler): except KeyError: queue = py_queue.Queue(self._queue_depth) self._queues[group] = queue - thread = self._thread_group.add_thread(self._run, group, queue) - thread.link(self._done, group) + info = self._info_func(event) + thread = self._thread_group.add_thread(self._run, group, queue, + info) + thread.link(self._done, group, info) queue.put((event, args, kwargs)) - def _run(self, group, queue): - LOG.trace("Asynchronous handler started processing %s", group) + def _run(self, group, queue, info): + LOG.trace("Asynchronous handler started processing %s (%s)", group, + info) for _ in itertools.count(): # NOTE(ivc): this is a mock-friendly replacement for 'while True' # to allow more controlled environment for unit-tests (e.g. to @@ -104,14 +108,16 @@ class Async(base.EventHandler): time.sleep(STALE_PERIOD) self._handler(event, *args, **kwargs) - def _done(self, thread, group): - LOG.trace("Asynchronous handler stopped processing group %s", group) + def _done(self, thread, group, info): + LOG.trace("Asynchronous handler stopped processing group %s (%s)", + group, info) queue = self._queues.pop(group) if not queue.empty(): - LOG.critical("Asynchronous handler terminated abnormally; " - "%(count)s events dropped for %(group)s", - {'count': queue.qsize(), 'group': group}) + LOG.critical( + "Asynchronous handler thread terminated abnormally; %(count)s " + "events dropped for %(group)s (%(info)s)", + {'count': queue.qsize(), 'group': group, 'info': info}) if not self._queues: LOG.trace("Asynchronous handler is idle") diff --git a/kuryr_kubernetes/handlers/k8s_base.py b/kuryr_kubernetes/handlers/k8s_base.py index 7b291d887..78e479c2a 100755 --- a/kuryr_kubernetes/handlers/k8s_base.py +++ b/kuryr_kubernetes/handlers/k8s_base.py @@ -31,6 +31,17 @@ def object_uid(event): return None +def object_info(event): + try: + resource = event['object'] + try: + return "%(kind)s %(namespace)s/%(name)s" % resource['metadata'] + except KeyError: + return "%(kind)s: %(name)s" % resource['metadata'] + except KeyError: + return None + + class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler): """Base class for K8s event handlers. diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py index 4bc017ec9..b41d78691 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py @@ -58,4 +58,4 @@ class TestControllerPipeline(test_base.TestCase): self.assertEqual(logging_handler, ret) m_logging_type.assert_called_with(async_handler) m_async_type.assert_called_with(dispatcher, thread_group, - h_k8s.object_uid) + h_k8s.object_uid, h_k8s.object_info) diff --git a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py index 1e4368948..cd99cbaa9 100644 --- a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py +++ b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py @@ -28,7 +28,9 @@ class TestAsyncHandler(test_base.TestCase): m_queue = mock.Mock() m_handler = mock.Mock() m_group_by = mock.Mock(return_value=group) - async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by) + m_info = mock.Mock(return_value=group) + async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by, + m_info) async_handler._queues[group] = m_queue async_handler(event) @@ -49,7 +51,8 @@ class TestAsyncHandler(test_base.TestCase): m_tg = mock.Mock() m_tg.add_thread.return_value = m_th m_group_by = mock.Mock(return_value=group) - async_handler = h_async.Async(m_handler, m_tg, m_group_by, + m_info = mock.Mock(return_value=group) + async_handler = h_async.Async(m_handler, m_tg, m_group_by, m_info, queue_depth=queue_depth) async_handler(event) @@ -58,8 +61,8 @@ class TestAsyncHandler(test_base.TestCase): m_queue_type.assert_called_once_with(queue_depth) self.assertEqual({group: m_queue}, async_handler._queues) m_tg.add_thread.assert_called_once_with(async_handler._run, group, - m_queue) - m_th.link.assert_called_once_with(async_handler._done, group) + m_queue, group) + m_th.link.assert_called_once_with(async_handler._done, group, group) m_queue.put.assert_called_once_with((event, (), {})) def test_call_injected(self): @@ -68,7 +71,9 @@ class TestAsyncHandler(test_base.TestCase): m_queue = mock.Mock() m_handler = mock.Mock() m_group_by = mock.Mock(return_value=group) - async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by) + m_info = mock.Mock(return_value=group) + async_handler = h_async.Async(m_handler, mock.Mock(), m_group_by, + m_info) async_handler._queues[group] = m_queue async_handler(event, injected=True) @@ -87,10 +92,10 @@ class TestAsyncHandler(test_base.TestCase): m_handler = mock.Mock() m_count.return_value = [1] async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(), - queue_depth=1) + mock.Mock(), queue_depth=1) with mock.patch('time.sleep'): - async_handler._run(group, m_queue) + async_handler._run(group, m_queue, None) m_handler.assert_called_once_with(event) @@ -104,10 +109,11 @@ class TestAsyncHandler(test_base.TestCase): m_queue.get.side_effect = events + [queue.Empty()] m_handler = mock.Mock() m_count.return_value = list(range(5)) - async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock()) + async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(), + mock.Mock()) with mock.patch('time.sleep'): - async_handler._run(group, m_queue) + async_handler._run(group, m_queue, None) m_handler.assert_has_calls([mock.call(event[0]) for event in events]) self.assertEqual(len(events), m_handler.call_count) @@ -122,20 +128,22 @@ class TestAsyncHandler(test_base.TestCase): m_queue.get.side_effect = events + [queue.Empty()] m_handler = mock.Mock() m_count.return_value = list(range(5)) - async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock()) + async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(), + mock.Mock()) with mock.patch('time.sleep'): - async_handler._run(group, m_queue) + async_handler._run(group, m_queue, None) m_handler.assert_called_once_with(mock.sentinel.event2) def test_done(self): group = mock.sentinel.group m_queue = mock.Mock() - async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock()) + async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(), + mock.Mock()) async_handler._queues[group] = m_queue - async_handler._done(mock.Mock(), group) + async_handler._done(mock.Mock(), group, None) self.assertFalse(async_handler._queues) @@ -144,9 +152,10 @@ class TestAsyncHandler(test_base.TestCase): group = mock.sentinel.group m_queue = mock.Mock() m_queue.empty.return_value = False - async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock()) + async_handler = h_async.Async(mock.Mock(), mock.Mock(), mock.Mock(), + mock.Mock()) async_handler._queues[group] = m_queue - async_handler._done(mock.Mock(), group) + async_handler._done(mock.Mock(), group, None) m_critical.assert_called_once()