Merge "Use resourceVersion when re-registering expired watch"
This commit is contained in:
commit
ea64d0ae3a
|
@ -206,6 +206,7 @@ class CertWatcher(object):
|
|||
self.listeners = []
|
||||
self.namespace = None
|
||||
self.context = MonitorContext()
|
||||
self.last_resource_version = None
|
||||
|
||||
def register_listener(self, listener):
|
||||
return self.listeners.append(listener)
|
||||
|
@ -216,18 +217,62 @@ class CertWatcher(object):
|
|||
c.verify_ssl = True
|
||||
Configuration.set_default(c)
|
||||
ccApi = client.CoreV1Api()
|
||||
w = watch.Watch()
|
||||
kube_watch = watch.Watch()
|
||||
|
||||
LOG.info('Monitor secrets in %s' % self.namespace)
|
||||
for item in w.stream(ccApi.list_namespaced_secret, namespace=self.namespace):
|
||||
LOG.debug('Received new event: %s' % (item))
|
||||
kwargs = {'namespace': self.namespace}
|
||||
if self.last_resource_version is not None:
|
||||
# Include resource version in call to watch. Ensures we start watch
|
||||
# from same point of expiry. Reference:
|
||||
# https://github.com/kubernetes-client/python-base/blob/
|
||||
# b4d3aad42dc23e7a6c0e5c032691f8dc385a786c/watch/watch.py#L119
|
||||
kwargs['resource_version'] = self.last_resource_version
|
||||
|
||||
LOG.info('Monitor secrets in %s using resource version: %s'
|
||||
% (self.namespace, self.last_resource_version))
|
||||
|
||||
for item in kube_watch.stream(ccApi.list_namespaced_secret, **kwargs):
|
||||
LOG.debug('Received new event: %s, %s'
|
||||
% (type(item.get('object')), item))
|
||||
event_type = item.get('type')
|
||||
if not event_type or event_type == 'ERROR':
|
||||
# we received an unknown or error event on the watch, instead of trying to
|
||||
# identify the exact error, we simply start from scratch, which should be
|
||||
# always safe to do
|
||||
LOG.info('Received unknown or error event on watch, restarting it')
|
||||
w.stop()
|
||||
if not event_type:
|
||||
LOG.error(
|
||||
'Received unexpected event on watch, restarting it: %s'
|
||||
% item)
|
||||
self.last_resource_version = None
|
||||
kube_watch.stop()
|
||||
return
|
||||
if event_type == 'ERROR':
|
||||
# Watch will be restarted, hopefully with the retrieved last
|
||||
# resourceVersion
|
||||
if 'raw_object' in item and item['raw_object'].get('code') == 410:
|
||||
# Expired watch. Retrieve current resource_version for use
|
||||
# in watch re-registration
|
||||
try:
|
||||
response = ccApi.list_namespaced_secret(self.namespace)
|
||||
self.last_resource_version = \
|
||||
response.metadata.resource_version
|
||||
LOG.debug(
|
||||
("Setting last_resource_version: %s "
|
||||
"for namespace: %s")
|
||||
% (self.last_resource_version, self.namespace))
|
||||
except Exception as e:
|
||||
self.last_resource_version = None
|
||||
LOG.error(
|
||||
"Failed to retrieve resource_version, namespace: %s"
|
||||
% self.namespace)
|
||||
LOG.exception(e)
|
||||
|
||||
LOG.info(
|
||||
("Received expired event on watch '%s', "
|
||||
"restarting with resource_version: %s")
|
||||
% (self.namespace, self.last_resource_version))
|
||||
else:
|
||||
# Unexpected error. Restart without resourceVersion.
|
||||
self.last_resource_version = None
|
||||
LOG.error(
|
||||
("Received unexpected type=ERROR event on watch, "
|
||||
"restarting it: %s") % item)
|
||||
kube_watch.stop()
|
||||
return
|
||||
|
||||
event_data = CertUpdateEventData(item)
|
||||
|
|
Loading…
Reference in New Issue