diff --git a/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py b/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py index c05c527f32..901900f818 100644 --- a/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py +++ b/sysinv/sysinv/sysinv/sysinv/cert_mon/watcher.py @@ -206,6 +206,7 @@ class CertWatcher(object): self.listeners = [] self.namespace = None self.context = MonitorContext() + self.last_resource_version = None def register_listener(self, listener): return self.listeners.append(listener) @@ -216,18 +217,62 @@ class CertWatcher(object): c.verify_ssl = True Configuration.set_default(c) ccApi = client.CoreV1Api() - w = watch.Watch() + kube_watch = watch.Watch() - LOG.info('Monitor secrets in %s' % self.namespace) - for item in w.stream(ccApi.list_namespaced_secret, namespace=self.namespace): - LOG.debug('Received new event: %s' % (item)) + kwargs = {'namespace': self.namespace} + if self.last_resource_version is not None: + # Include resource version in call to watch. Ensures we start watch + # from same point of expiry. Reference: + # https://github.com/kubernetes-client/python-base/blob/ + # b4d3aad42dc23e7a6c0e5c032691f8dc385a786c/watch/watch.py#L119 + kwargs['resource_version'] = self.last_resource_version + + LOG.info('Monitor secrets in %s using resource version: %s' + % (self.namespace, self.last_resource_version)) + + for item in kube_watch.stream(ccApi.list_namespaced_secret, **kwargs): + LOG.debug('Received new event: %s, %s' + % (type(item.get('object')), item)) event_type = item.get('type') - if not event_type or event_type == 'ERROR': - # we received an unknown or error event on the watch, instead of trying to - # identify the exact error, we simply start from scratch, which should be - # always safe to do - LOG.info('Received unknown or error event on watch, restarting it') - w.stop() + if not event_type: + LOG.error( + 'Received unexpected event on watch, restarting it: %s' + % item) + self.last_resource_version = None + kube_watch.stop() + return + if event_type == 'ERROR': + # Watch will be restarted, hopefully with the retrieved last + # resourceVersion + if 'raw_object' in item and item['raw_object'].get('code') == 410: + # Expired watch. Retrieve current resource_version for use + # in watch re-registration + try: + response = ccApi.list_namespaced_secret(self.namespace) + self.last_resource_version = \ + response.metadata.resource_version + LOG.debug( + ("Setting last_resource_version: %s " + "for namespace: %s") + % (self.last_resource_version, self.namespace)) + except Exception as e: + self.last_resource_version = None + LOG.error( + "Failed to retrieve resource_version, namespace: %s" + % self.namespace) + LOG.exception(e) + + LOG.info( + ("Received expired event on watch '%s', " + "restarting with resource_version: %s") + % (self.namespace, self.last_resource_version)) + else: + # Unexpected error. Restart without resourceVersion. + self.last_resource_version = None + LOG.error( + ("Received unexpected type=ERROR event on watch, " + "restarting it: %s") % item) + kube_watch.stop() return event_data = CertUpdateEventData(item)