Use resourceVersion when re-registering expired watch

This is related to change https://bugs.launchpad.net/starlingx/+bug/1914408

When we re-register a watch on the 410/expired event from the kubernetes
watch API, it resends the last event from the watch. This causes the
cert-mon watcher to process the event as new.

From the Watch source code, the suggestion is to query the current
kubernetes 'resourceVersion' attribute for the resource under watch, and
then apply that when re-registering the watch.

Reference:
https://github.com/kubernetes-client/python-base/blob/
b4d3aad42dc23e7a6c0e5c032691f8dc385a786c/watch/watch.py#L119:

> Note that watching an API resource can expire. The method tries to
> resume automatically once from the last result, but if that last result
> is too old as well, an `ApiException` exception will be thrown with
> ``code`` 410. In that case you have to recover yourself, probably
> by listing the API resource to obtain the latest state and then
> watching from that state on by setting ``resource_version`` to
> one returned from listing.

Local testing shows that this approach avoids the replaying of last
watch events.

Related-Bug: 1914408
Story: 2008960
Task: 42791

Signed-off-by: Kyle MacLeod <kyle.macleod@windriver.com>
Change-Id: I7e6370807feccaad8bc6a4ebebd3252b407d68ff
This commit is contained in:
Kyle MacLeod 2021-07-13 16:14:28 -04:00
parent 4b4d4630f4
commit 34549da01a

View File

@ -206,6 +206,7 @@ class CertWatcher(object):
self.listeners = []
self.namespace = None
self.context = MonitorContext()
self.last_resource_version = None
def register_listener(self, listener):
return self.listeners.append(listener)
@ -216,18 +217,62 @@ class CertWatcher(object):
c.verify_ssl = True
Configuration.set_default(c)
ccApi = client.CoreV1Api()
w = watch.Watch()
kube_watch = watch.Watch()
LOG.info('Monitor secrets in %s' % self.namespace)
for item in w.stream(ccApi.list_namespaced_secret, namespace=self.namespace):
LOG.debug('Received new event: %s' % (item))
kwargs = {'namespace': self.namespace}
if self.last_resource_version is not None:
# Include resource version in call to watch. Ensures we start watch
# from same point of expiry. Reference:
# https://github.com/kubernetes-client/python-base/blob/
# b4d3aad42dc23e7a6c0e5c032691f8dc385a786c/watch/watch.py#L119
kwargs['resource_version'] = self.last_resource_version
LOG.info('Monitor secrets in %s using resource version: %s'
% (self.namespace, self.last_resource_version))
for item in kube_watch.stream(ccApi.list_namespaced_secret, **kwargs):
LOG.debug('Received new event: %s, %s'
% (type(item.get('object')), item))
event_type = item.get('type')
if not event_type or event_type == 'ERROR':
# we received an unknown or error event on the watch, instead of trying to
# identify the exact error, we simply start from scratch, which should be
# always safe to do
LOG.info('Received unknown or error event on watch, restarting it')
w.stop()
if not event_type:
LOG.error(
'Received unexpected event on watch, restarting it: %s'
% item)
self.last_resource_version = None
kube_watch.stop()
return
if event_type == 'ERROR':
# Watch will be restarted, hopefully with the retrieved last
# resourceVersion
if 'raw_object' in item and item['raw_object'].get('code') == 410:
# Expired watch. Retrieve current resource_version for use
# in watch re-registration
try:
response = ccApi.list_namespaced_secret(self.namespace)
self.last_resource_version = \
response.metadata.resource_version
LOG.debug(
("Setting last_resource_version: %s "
"for namespace: %s")
% (self.last_resource_version, self.namespace))
except Exception as e:
self.last_resource_version = None
LOG.error(
"Failed to retrieve resource_version, namespace: %s"
% self.namespace)
LOG.exception(e)
LOG.info(
("Received expired event on watch '%s', "
"restarting with resource_version: %s")
% (self.namespace, self.last_resource_version))
else:
# Unexpected error. Restart without resourceVersion.
self.last_resource_version = None
LOG.error(
("Received unexpected type=ERROR event on watch, "
"restarting it: %s") % item)
kube_watch.stop()
return
event_data = CertUpdateEventData(item)