diff --git a/kuryr_kubernetes/handlers/asynchronous.py b/kuryr_kubernetes/handlers/asynchronous.py
index 8be29840f..a4bb435ad 100644
--- a/kuryr_kubernetes/handlers/asynchronous.py
+++ b/kuryr_kubernetes/handlers/asynchronous.py
@@ -15,6 +15,7 @@ import itertools
 
 from six.moves import queue as six_queue
+import time
 
 from kuryr.lib._i18n import _LC
 from oslo_log import log as logging
@@ -26,7 +27,7 @@ LOG = logging.getLogger(__name__)
 
 DEFAULT_QUEUE_DEPTH = 100
 DEFAULT_GRACE_PERIOD = 5
-INF = float("inf")
+STALE_PERIOD = 0.5
 
 
 class Async(base.EventHandler):
@@ -70,6 +71,32 @@ class Async(base.EventHandler):
                 event = queue.get(timeout=self._grace_period)
             except six_queue.Empty:
                 break
+            # FIXME(ivc): temporary workaround to skip stale events
+            # If K8s updates resource while the handler is processing it,
+            # when the handler finishes its work it can fail to update an
+            # annotation due to the 'resourceVersion' conflict. K8sClient
+            # was updated to allow *new* annotations to be set ignoring
+            # 'resourceVersion', but it leads to another problem as the
+            # Handler will receive old events (i.e. before annotation is set)
+            # and will start processing the event 'from scratch'.
+            # It has negative effect on handlers' performance (VIFHandler
+            # creates ports only to later delete them and LBaaS handler also
+            # produces some excess requests to Neutron, although with lesser
+            # impact).
+            # Possible solutions (can be combined):
+            # - use K8s ThirdPartyResources to store data/annotations instead
+            #   of native K8s resources (assuming Kuryr-K8s will own those
+            #   resources and no one else would update them)
+            # - use the resulting 'resourceVersion' received from K8sClient's
+            #   'annotate' to provide feedback to Async to skip all events
+            #   until that version
+            # - stick to the 'get-or-create' behaviour in handlers and
+            #   also introduce cache for long operations
+            time.sleep(STALE_PERIOD)
+            while not queue.empty():
+                event = queue.get()
+                if queue.empty():
+                    time.sleep(STALE_PERIOD)
             self._handler(event)
 
     def _done(self, thread, group):
diff --git a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
index 60dd4061b..8095e3f8e 100644
--- a/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
+++ b/kuryr_kubernetes/tests/unit/handlers/test_asynchronous.py
@@ -67,13 +67,14 @@ class TestAsyncHandler(test_base.TestCase):
         event = mock.sentinel.event
         group = mock.sentinel.group
         m_queue = mock.Mock()
+        m_queue.empty.return_value = True
         m_queue.get.return_value = event
         m_handler = mock.Mock()
         m_count.return_value = [1]
         async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
                                       queue_depth=1)
 
-        with mock.patch('time.time', return_value=0):
+        with mock.patch('time.sleep'):
             async_handler._run(group, m_queue)
 
         m_handler.assert_called_once_with(event)
@@ -83,14 +84,33 @@ class TestAsyncHandler(test_base.TestCase):
         events = [mock.sentinel.event1, mock.sentinel.event2]
         group = mock.sentinel.group
         m_queue = mock.Mock()
+        m_queue.empty.return_value = True
         m_queue.get.side_effect = events + [six_queue.Empty()]
         m_handler = mock.Mock()
         m_count.return_value = list(range(5))
         async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
 
-        async_handler._run(group, m_queue)
+        with mock.patch('time.sleep'):
+            async_handler._run(group, m_queue)
         m_handler.assert_has_calls([mock.call(event) for
                                     event in events])
+        self.assertEqual(len(events), m_handler.call_count)
+
+    @mock.patch('itertools.count')
+    def test_run_stale(self, m_count):
+        events = [mock.sentinel.event1, mock.sentinel.event2]
+        group = mock.sentinel.group
+        m_queue = mock.Mock()
+        m_queue.empty.side_effect = [False, True, True]
+        m_queue.get.side_effect = events + [six_queue.Empty()]
+        m_handler = mock.Mock()
+        m_count.return_value = list(range(5))
+        async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
+
+        with mock.patch('time.sleep'):
+            async_handler._run(group, m_queue)
+
+        m_handler.assert_called_once_with(mock.sentinel.event2)
 
     def test_done(self):
         group = mock.sentinel.group