kuryr-kubernetes/kuryr_kubernetes/k8s_client.py
Michał Dulko f5e8e9c3b0 Add urllib3's SSLError to expected Watcher exc
Seems like new requests lib is somehow raising
urllib3.exceptions.SSLError more often. This commit adds it to the
exceptions expected by the Watcher and silenced with a retry.

Closes-Bug: 1906498
Change-Id: I214c08c197483c00a5caecc47ef117cdd07a7652
2020-12-02 11:17:48 +01:00

413 lines
17 KiB
Python

# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import functools
import itertools
import os
import ssl
import time
from urllib import parse
import urllib3
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from requests import adapters
from kuryr.lib._i18n import _
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes import exceptions as exc
from kuryr_kubernetes import utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
class K8sClient(object):
# REVISIT(ivc): replace with python-k8sclient if it could be extended
# with 'WATCH' support
def __init__(self, base_url):
self._base_url = base_url
cert_file = config.CONF.kubernetes.ssl_client_crt_file
key_file = config.CONF.kubernetes.ssl_client_key_file
ca_crt_file = config.CONF.kubernetes.ssl_ca_crt_file
self.verify_server = config.CONF.kubernetes.ssl_verify_server_crt
token_file = config.CONF.kubernetes.token_file
self.token = None
self.cert = (None, None)
# Setting higher numbers regarding connection pools as we're running
# with max of 1000 green threads.
self.session = requests.Session()
prefix = '%s://' % parse.urlparse(base_url).scheme
self.session.mount(prefix, adapters.HTTPAdapter(pool_maxsize=1000))
if token_file:
if os.path.exists(token_file):
with open(token_file, 'r') as f:
self.token = f.readline().rstrip('\n')
else:
raise RuntimeError(
_("Unable to find token_file : %s") % token_file)
else:
if cert_file and not os.path.exists(cert_file):
raise RuntimeError(
_("Unable to find ssl cert_file : %s") % cert_file)
if key_file and not os.path.exists(key_file):
raise RuntimeError(
_("Unable to find ssl key_file : %s") % key_file)
self.cert = (cert_file, key_file)
if self.verify_server:
if not ca_crt_file:
raise RuntimeError(
_("ssl_ca_crt_file cannot be None"))
elif not os.path.exists(ca_crt_file):
raise RuntimeError(
_("Unable to find ca cert_file : %s") % ca_crt_file)
else:
self.verify_server = ca_crt_file
# Let's setup defaults for our Session.
self.session.cert = self.cert
self.session.verify = self.verify_server
if self.token:
self.session.headers['Authorization'] = f'Bearer {self.token}'
# NOTE(dulek): Seems like this is the only way to set is globally.
self.session.request = functools.partial(
self.session.request, timeout=(
CONF.kubernetes.watch_connection_timeout,
CONF.kubernetes.watch_read_timeout))
def _raise_from_response(self, response):
if response.status_code == requests.codes.not_found:
raise exc.K8sResourceNotFound(response.text)
if response.status_code == requests.codes.conflict:
raise exc.K8sConflict(response.text)
if response.status_code == requests.codes.forbidden:
if 'because it is being terminated' in response.json()['message']:
raise exc.K8sNamespaceTerminating(response.text)
raise exc.K8sForbidden(response.text)
if response.status_code == requests.codes.unprocessable_entity:
# NOTE(gryf): on k8s API code 422 is also Forbidden, but specified
# to FieldValueForbidden. Perhaps there are other usages for
# throwing unprocessable entity errors in different cases.
if ('FieldValueForbidden' in response.text and
'Forbidden' in response.json()['message']):
raise exc.K8sFieldValueForbidden(response.text)
raise exc.K8sUnprocessableEntity(response.text)
if not response.ok:
raise exc.K8sClientException(response.text)
def get(self, path, json=True, headers=None):
LOG.debug("Get %(path)s", {'path': path})
url = self._base_url + path
response = self.session.get(url, headers=headers)
self._raise_from_response(response)
result = response.json() if json else response.text
return result
def _get_url_and_header(self, path, content_type):
url = self._base_url + path
header = {'Content-Type': content_type,
'Accept': 'application/json'}
return url, header
def patch(self, field, path, data):
LOG.debug("Patch %(path)s: %(data)s", {'path': path, 'data': data})
content_type = 'application/merge-patch+json'
url, header = self._get_url_and_header(path, content_type)
response = self.session.patch(url, json={field: data}, headers=header)
self._raise_from_response(response)
return response.json().get('status')
def patch_crd(self, field, path, data, action='replace'):
content_type = 'application/json-patch+json'
url, header = self._get_url_and_header(path, content_type)
if action == 'remove':
data = [{'op': action,
'path': f'/{field}/{data}'}]
else:
if data:
data = [{'op': action,
'path': f'/{field}/{crd_field}',
'value': value}
for crd_field, value in data.items()]
else:
data = [{'op': action,
'path': f'/{field}',
'value': data}]
LOG.debug("Patch %(path)s: %(data)s", {
'path': path, 'data': data})
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header)
self._raise_from_response(response)
return response.json().get('status')
def patch_node_annotations(self, node, annotation_name, value):
content_type = 'application/json-patch+json'
path = '{}/nodes/{}/'.format(constants.K8S_API_BASE, node)
value = jsonutils.dumps(value)
url, header = self._get_url_and_header(path, content_type)
data = [{'op': 'add',
'path': '/metadata/annotations/{}'.format(annotation_name),
'value': value}]
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header)
self._raise_from_response(response)
return response.json().get('status')
def remove_node_annotations(self, node, annotation_name):
content_type = 'application/json-patch+json'
path = '{}/nodes/{}/'.format(constants.K8S_API_BASE, node)
url, header = self._get_url_and_header(path, content_type)
data = [{'op': 'remove',
'path': '/metadata/annotations/{}'.format(annotation_name)}]
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header)
self._raise_from_response(response)
return response.json().get('status')
def _jsonpatch_escape(self, value):
value = value.replace('~', '~0')
value = value.replace('/', '~1')
return value
def remove_annotations(self, path, annotation_name):
LOG.debug("Remove annotations %(path)s: %(name)s",
{'path': path, 'name': annotation_name})
content_type = 'application/json-patch+json'
url, header = self._get_url_and_header(path, content_type)
annotation_name = self._jsonpatch_escape(annotation_name)
data = [{'op': 'remove',
'path': f'/metadata/annotations/{annotation_name}'}]
response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header)
if response.ok:
return response.json().get('status')
raise exc.K8sClientException(response.text)
def post(self, path, body):
LOG.debug("Post %(path)s: %(body)s", {'path': path, 'body': body})
url = self._base_url + path
header = {'Content-Type': 'application/json'}
response = self.session.post(url, json=body, headers=header)
self._raise_from_response(response)
return response.json()
def delete(self, path):
LOG.debug("Delete %(path)s", {'path': path})
url = self._base_url + path
header = {'Content-Type': 'application/json'}
response = self.session.delete(url, headers=header)
self._raise_from_response(response)
return response.json()
# TODO(dulek): add_finalizer() and remove_finalizer() have some code
# duplication, but I don't see a nice way to avoid it.
def add_finalizer(self, obj, finalizer):
if finalizer in obj['metadata'].get('finalizers', []):
return True
path = obj['metadata']['selfLink']
LOG.debug(f"Add finalizer {finalizer} to {path}")
url, headers = self._get_url_and_header(
path, 'application/merge-patch+json')
for i in range(3): # Let's make sure it's not infinite loop
finalizers = obj['metadata'].get('finalizers', []).copy()
finalizers.append(finalizer)
data = {
'metadata': {
'finalizers': finalizers,
'resourceVersion': obj['metadata']['resourceVersion'],
},
}
response = self.session.patch(url, json=data, headers=headers)
if response.ok:
return True
try:
self._raise_from_response(response)
except (exc.K8sFieldValueForbidden, exc.K8sResourceNotFound):
# Object is being deleting or gone. Return.
return False
except exc.K8sConflict:
try:
obj = self.get(path)
except exc.K8sResourceNotFound:
# Object got removed before finalizer was set
return False
if finalizer in obj['metadata'].get('finalizers', []):
# Finalizer is there, return.
return True
# If after 3 iterations there's still conflict, just raise.
self._raise_from_response(response)
def remove_finalizer(self, obj, finalizer):
path = obj['metadata']['selfLink']
LOG.debug(f"Remove finalizer {finalizer} from {path}")
url, headers = self._get_url_and_header(
path, 'application/merge-patch+json')
for i in range(3): # Let's make sure it's not infinite loop
finalizers = obj['metadata'].get('finalizers', []).copy()
try:
finalizers.remove(finalizer)
except ValueError:
# Finalizer is not there, return.
return True
data = {
'metadata': {
'finalizers': finalizers,
'resourceVersion': obj['metadata']['resourceVersion'],
},
}
response = self.session.patch(url, json=data, headers=headers)
if response.ok:
return True
try:
try:
self._raise_from_response(response)
except exc.K8sConflict:
obj = self.get(path)
except (exc.K8sFieldValueForbidden, exc.K8sResourceNotFound):
# Object is being deleted or gone already, stop.
return False
# If after 3 iterations there's still conflict, just raise.
self._raise_from_response(response)
def get_loadbalancer_crd(self, obj):
name = obj['metadata']['name']
namespace = obj['metadata']['namespace']
try:
crd = self.get('{}/{}/kuryrloadbalancers/{}'.format(
constants.K8S_API_CRD_NAMESPACES, namespace,
name))
except exc.K8sResourceNotFound:
return None
except exc.K8sClientException:
LOG.exception("Kubernetes Client Exception.")
raise
return crd
def annotate(self, path, annotations, resource_version=None):
"""Pushes a resource annotation to the K8s API resource
The annotate operation is made with a PATCH HTTP request of kind:
application/merge-patch+json as described in:
https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#patch-operations # noqa
"""
LOG.debug("Annotate %(path)s: %(names)s", {
'path': path, 'names': list(annotations)})
content_type = 'application/merge-patch+json'
url, header = self._get_url_and_header(path, content_type)
while itertools.count(1):
metadata = {"annotations": annotations}
if resource_version:
metadata['resourceVersion'] = resource_version
data = jsonutils.dumps({"metadata": metadata}, sort_keys=True)
response = self.session.patch(url, data=data, headers=header)
if response.ok:
return response.json()['metadata'].get('annotations', {})
if response.status_code == requests.codes.conflict:
resource = self.get(path)
new_version = resource['metadata']['resourceVersion']
retrieved_annotations = resource['metadata'].get(
'annotations', {})
for k, v in annotations.items():
if v != retrieved_annotations.get(k):
break
else:
LOG.debug("Annotations for %(path)s already present: "
"%(names)s", {'path': path,
'names': retrieved_annotations})
return retrieved_annotations
# Retry patching with updated resourceVersion
resource_version = new_version
continue
LOG.error("Exception response, headers: %(headers)s, "
"content: %(content)s, text: %(text)s"
% {'headers': response.headers,
'content': response.content, 'text': response.text})
self._raise_from_response(response)
def watch(self, path):
url = self._base_url + path
resource_version = None
attempt = 0
while True:
try:
params = {'watch': 'true'}
if resource_version:
params['resourceVersion'] = resource_version
with contextlib.closing(
self.session.get(
url, params=params, stream=True)) as response:
if not response.ok:
raise exc.K8sClientException(response.text)
attempt = 0
for line in response.iter_lines():
line = line.decode('utf-8').strip()
if line:
line_dict = jsonutils.loads(line)
yield line_dict
# Saving the resourceVersion in case of a restart.
# At this point it's safely passed to handler.
m = line_dict.get('object', {}).get('metadata', {})
resource_version = m.get('resourceVersion', None)
except (requests.ReadTimeout, requests.ConnectionError,
ssl.SSLError, requests.exceptions.ChunkedEncodingError,
urllib3.exceptions.SSLError):
t = utils.exponential_backoff(attempt)
log = LOG.debug
if attempt > 0:
# Only make it a warning if it's happening again, no need
# to inform about all the read timeouts.
log = LOG.warning
log('Connection error when watching %s. Retrying in %ds with '
'resourceVersion=%s', path, t,
params.get('resourceVersion'))
time.sleep(t)
attempt += 1