From 34549da01aa224615db1293a2d82042f4cccee3b Mon Sep 17 00:00:00 2001 From: Kyle MacLeod Date: Tue, 13 Jul 2021 16:14:28 -0400 Subject: [PATCH] Use resourceVersion when re-registering expired watch This is related to change https://bugs.launchpad.net/starlingx/+bug/1914408 When we re-register a watch on the 410/expired event from the kubernetes watch API, it resends the last event from the watch. This causes the cert-mon watcher to process the event as new. From the Watch source code, the suggestion is to query the current kubernetes 'resourceVersion' attribute for the resource under watch, and then apply that when re-registering the watch. Reference: https://github.com/kubernetes-client/python-base/blob/ b4d3aad42dc23e7a6c0e5c032691f8dc385a786c/watch/watch.py#L119: > Note that watching an API resource can expire. The method tries to > resume automatically once from the last result, but if that last result > is too old as well, an `ApiException` exception will be thrown with > ``code`` 410. In that case you have to recover yourself, probably > by listing the API resource to obtain the latest state and then > watching from that state on by setting ``resource_version`` to > one returned from listing. Local testing shows that this approach avoids the replaying of last watch events. Related-Bug: 1914408 Story: 2008960 Task: 42791 Signed-off-by: Kyle MacLeod Change-Id: I7e6370807feccaad8bc6a4ebebd3252b407d68ff --- .../sysinv/sysinv/sysinv/cert_mon/watcher.py | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py b/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py index 42271469ef..ebe7bfcec4 100644 --- a/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py +++ b/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py @@ -206,6 +206,7 @@ class CertWatcher(object): self.listeners = [] self.namespace = None self.context = MonitorContext() + self.last_resource_version = None def register_listener(self, listener): return self.listeners.append(listener) @@ -216,18 +217,62 @@ class CertWatcher(object): c.verify_ssl = True Configuration.set_default(c) ccApi = client.CoreV1Api() - w = watch.Watch() + kube_watch = watch.Watch() - LOG.info('Monitor secrets in %s' % self.namespace) - for item in w.stream(ccApi.list_namespaced_secret, namespace=self.namespace): - LOG.debug('Received new event: %s' % (item)) + kwargs = {'namespace': self.namespace} + if self.last_resource_version is not None: + # Include resource version in call to watch. Ensures we start watch + # from same point of expiry. Reference: + # https://github.com/kubernetes-client/python-base/blob/ + # b4d3aad42dc23e7a6c0e5c032691f8dc385a786c/watch/watch.py#L119 + kwargs['resource_version'] = self.last_resource_version + + LOG.info('Monitor secrets in %s using resource version: %s' + % (self.namespace, self.last_resource_version)) + + for item in kube_watch.stream(ccApi.list_namespaced_secret, **kwargs): + LOG.debug('Received new event: %s, %s' + % (type(item.get('object')), item)) event_type = item.get('type') - if not event_type or event_type == 'ERROR': - # we received an unknown or error event on the watch, instead of trying to - # identify the exact error, we simply start from scratch, which should be - # always safe to do - LOG.info('Received unknown or error event on watch, restarting it') - w.stop() + if not event_type: + LOG.error( + 'Received unexpected event on watch, restarting it: %s' + % item) + self.last_resource_version = None + kube_watch.stop() + return + if event_type == 'ERROR': + # Watch will be restarted, hopefully with the retrieved last + # resourceVersion + if 'raw_object' in item and item['raw_object'].get('code') == 410: + # Expired watch. Retrieve current resource_version for use + # in watch re-registration + try: + response = ccApi.list_namespaced_secret(self.namespace) + self.last_resource_version = \ + response.metadata.resource_version + LOG.debug( + ("Setting last_resource_version: %s " + "for namespace: %s") + % (self.last_resource_version, self.namespace)) + except Exception as e: + self.last_resource_version = None + LOG.error( + "Failed to retrieve resource_version, namespace: %s" + % self.namespace) + LOG.exception(e) + + LOG.info( + ("Received expired event on watch '%s', " + "restarting with resource_version: %s") + % (self.namespace, self.last_resource_version)) + else: + # Unexpected error. Restart without resourceVersion. + self.last_resource_version = None + LOG.error( + ("Received unexpected type=ERROR event on watch, " + "restarting it: %s") % item) + kube_watch.stop() return event_data = CertUpdateEventData(item)