Skip stale K8s events
When a new K8s resource (e.g. a pod) is created, a series of events is triggered. The current flow is as follows:
1. Events A and B happen in quick succession; neither of them contains Kuryr-related annotations.
2. Kuryr handles event A, creates the necessary Neutron resources and adds an annotation, which triggers another event, C.
3. Kuryr then handles event B, but since the annotations added in the previous step are not yet present in event B, Kuryr is not aware of the already created Neutron resources.
This patch introduces a workaround that skips events arriving in short 'bursts', so handlers only receive the latest event. A more robust solution will be added in a later patch. Change-Id: I794b34a7b9518b8212ff14e61e2bde61f44546f9
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
|
||||
import itertools
|
||||
from six.moves import queue as six_queue
|
||||
import time
|
||||
|
||||
from kuryr.lib._i18n import _LC
|
||||
from oslo_log import log as logging
|
||||
@@ -26,7 +27,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_QUEUE_DEPTH = 100
|
||||
DEFAULT_GRACE_PERIOD = 5
|
||||
INF = float("inf")
|
||||
STALE_PERIOD = 0.5
|
||||
|
||||
|
||||
class Async(base.EventHandler):
|
||||
@@ -70,6 +71,32 @@ class Async(base.EventHandler):
|
||||
event = queue.get(timeout=self._grace_period)
|
||||
except six_queue.Empty:
|
||||
break
|
||||
# FIXME(ivc): temporary workaround to skip stale events
|
||||
# If K8s updates resource while the handler is processing it,
|
||||
# when the handler finishes its work it can fail to update an
|
||||
# annotation due to the 'resourceVersion' conflict. K8sClient
|
||||
# was updated to allow *new* annotations to be set ignoring
|
||||
# 'resourceVersion', but it leads to another problem as the
|
||||
# Handler will receive old events (i.e. before the annotation is set)
|
||||
# and will start processing the event 'from scratch'.
|
||||
# It has a negative effect on handlers' performance (VIFHandler
|
||||
# creates ports only to later delete them and LBaaS handler also
|
||||
# produces some excess requests to Neutron, although with lesser
|
||||
# impact).
|
||||
# Possible solutions (can be combined):
|
||||
# - use K8s ThirdPartyResources to store data/annotations instead
|
||||
# of native K8s resources (assuming Kuryr-K8s will own those
|
||||
# resources and no one else would update them)
|
||||
# - use the resulting 'resourceVersion' received from K8sClient's
|
||||
# 'annotate' to provide feedback to Async to skip all events
|
||||
# until that version
|
||||
# - stick to the 'get-or-create' behaviour in handlers and
|
||||
# also introduce cache for long operations
|
||||
time.sleep(STALE_PERIOD)
|
||||
while not queue.empty():
|
||||
event = queue.get()
|
||||
if queue.empty():
|
||||
time.sleep(STALE_PERIOD)
|
||||
self._handler(event)
|
||||
|
||||
def _done(self, thread, group):
|
||||
|
@@ -67,13 +67,14 @@ class TestAsyncHandler(test_base.TestCase):
|
||||
event = mock.sentinel.event
|
||||
group = mock.sentinel.group
|
||||
m_queue = mock.Mock()
|
||||
m_queue.empty.return_value = True
|
||||
m_queue.get.return_value = event
|
||||
m_handler = mock.Mock()
|
||||
m_count.return_value = [1]
|
||||
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
|
||||
queue_depth=1)
|
||||
|
||||
with mock.patch('time.time', return_value=0):
|
||||
with mock.patch('time.sleep'):
|
||||
async_handler._run(group, m_queue)
|
||||
|
||||
m_handler.assert_called_once_with(event)
|
||||
@@ -83,14 +84,33 @@ class TestAsyncHandler(test_base.TestCase):
|
||||
events = [mock.sentinel.event1, mock.sentinel.event2]
|
||||
group = mock.sentinel.group
|
||||
m_queue = mock.Mock()
|
||||
m_queue.empty.return_value = True
|
||||
m_queue.get.side_effect = events + [six_queue.Empty()]
|
||||
m_handler = mock.Mock()
|
||||
m_count.return_value = list(range(5))
|
||||
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
|
||||
|
||||
async_handler._run(group, m_queue)
|
||||
with mock.patch('time.sleep'):
|
||||
async_handler._run(group, m_queue)
|
||||
|
||||
m_handler.assert_has_calls([mock.call(event) for event in events])
|
||||
self.assertEqual(len(events), m_handler.call_count)
|
||||
|
||||
@mock.patch('itertools.count')
|
||||
def test_run_stale(self, m_count):
|
||||
events = [mock.sentinel.event1, mock.sentinel.event2]
|
||||
group = mock.sentinel.group
|
||||
m_queue = mock.Mock()
|
||||
m_queue.empty.side_effect = [False, True, True]
|
||||
m_queue.get.side_effect = events + [six_queue.Empty()]
|
||||
m_handler = mock.Mock()
|
||||
m_count.return_value = list(range(5))
|
||||
async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())
|
||||
|
||||
with mock.patch('time.sleep'):
|
||||
async_handler._run(group, m_queue)
|
||||
|
||||
m_handler.assert_called_once_with(mock.sentinel.event2)
|
||||
|
||||
def test_done(self):
|
||||
group = mock.sentinel.group
|
||||
|
Reference in New Issue
Block a user