# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import itertools
import os
import ssl
import time
from urllib import parse

from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from requests import adapters

from kuryr.lib._i18n import _
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes import exceptions as exc
from kuryr_kubernetes import utils

CONF = config.CONF
LOG = logging.getLogger(__name__)


class K8sClient(object):
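    """Client for the Kubernetes REST API used by kuryr-kubernetes.

    Illustrative usage (the endpoint and pod path below are examples,
    not values defined by this module):

        k8s = K8sClient('https://127.0.0.1:6443')
        pod = k8s.get('/api/v1/namespaces/default/pods/mypod')
    """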
    # REVISIT(ivc): replace with python-k8sclient if it could be extended
    # with 'WATCH' support
    def __init__(self, base_url):
        self._base_url = base_url
        cert_file = config.CONF.kubernetes.ssl_client_crt_file
        key_file = config.CONF.kubernetes.ssl_client_key_file
        ca_crt_file = config.CONF.kubernetes.ssl_ca_crt_file
        self.verify_server = config.CONF.kubernetes.ssl_verify_server_crt
        token_file = config.CONF.kubernetes.token_file
        self.token = None
        self.cert = (None, None)
        # Use larger connection pools, as we run with up to 1000 green
        # threads.
        self.session = requests.Session()
        prefix = '%s://' % parse.urlparse(base_url).scheme
        self.session.mount(prefix, adapters.HTTPAdapter(pool_maxsize=1000))
        if token_file:
            if os.path.exists(token_file):
                with open(token_file, 'r') as f:
                    self.token = f.readline().rstrip('\n')
            else:
                raise RuntimeError(
                    _("Unable to find token_file : %s") % token_file)
        else:
            if cert_file and not os.path.exists(cert_file):
                raise RuntimeError(
                    _("Unable to find ssl cert_file : %s") % cert_file)
            if key_file and not os.path.exists(key_file):
                raise RuntimeError(
                    _("Unable to find ssl key_file : %s") % key_file)
            self.cert = (cert_file, key_file)
        if self.verify_server:
            if not ca_crt_file:
                raise RuntimeError(
                    _("ssl_ca_crt_file cannot be None"))
            elif not os.path.exists(ca_crt_file):
                raise RuntimeError(
                    _("Unable to find ca cert_file : %s") % ca_crt_file)
            else:
                self.verify_server = ca_crt_file

    def _raise_from_response(self, response):
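        """Raise a kuryr exception matching the response's error status."""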
        if response.status_code == requests.codes.not_found:
            raise exc.K8sResourceNotFound(response.text)
        if response.status_code == requests.codes.conflict:
            raise exc.K8sConflict(response.text)
        if response.status_code == requests.codes.forbidden:
            if 'because it is being terminated' in response.json()['message']:
                raise exc.K8sNamespaceTerminating(response.text)
            raise exc.K8sForbidden(response.text)
        if response.status_code == requests.codes.unprocessable_entity:
            raise exc.K8sUnprocessableEntity(response.text)
        if not response.ok:
            raise exc.K8sClientException(response.text)

    def get(self, path, json=True, headers=None):
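        """GET `path`; return parsed JSON, or raw text when json=False."""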
        LOG.debug("Get %(path)s", {'path': path})
        url = self._base_url + path
        header = {}
        if self.token:
            header.update({'Authorization': 'Bearer %s' % self.token})
        if headers:
            header.update(headers)
        response = self.session.get(url, cert=self.cert,
                                    verify=self.verify_server,
                                    headers=header)
        self._raise_from_response(response)
        result = response.json() if json else response.text
        return result

    def _get_url_and_header(self, path, content_type):
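        """Return the full URL and common headers for a request to `path`."""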
        url = self._base_url + path
        header = {'Content-Type': content_type,
                  'Accept': 'application/json'}
        if self.token:
            header.update({'Authorization': 'Bearer %s' % self.token})
        return url, header

    def patch(self, field, path, data):
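        """Merge-patch `field` of the resource at `path` with `data`."""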
        LOG.debug("Patch %(path)s: %(data)s", {
            'path': path, 'data': data})
        content_type = 'application/merge-patch+json'
        url, header = self._get_url_and_header(path, content_type)
        response = self.session.patch(url, json={field: data},
                                      headers=header, cert=self.cert,
                                      verify=self.verify_server)
        self._raise_from_response(response)
        return response.json().get('status')

    def patch_crd(self, field, path, data, action='replace'):
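        """Apply a JSON Patch to `field` of the resource at `path`.

        `action` is the JSON Patch operation to use, e.g. 'replace' or
        'remove'.
        """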
        content_type = 'application/json-patch+json'
        url, header = self._get_url_and_header(path, content_type)
        if action == 'remove':
            data = [{'op': action,
                     'path': f'/{field}/{data}'}]
        else:
            if data:
                data = [{'op': action,
                         'path': f'/{field}/{crd_field}',
                         'value': value}
                        for crd_field, value in data.items()]
            else:
                data = [{'op': action,
                         'path': f'/{field}',
                         'value': data}]
        LOG.debug("Patch %(path)s: %(data)s", {
            'path': path, 'data': data})
        response = self.session.patch(url, data=jsonutils.dumps(data),
                                      headers=header, cert=self.cert,
                                      verify=self.verify_server)
        self._raise_from_response(response)
        return response.json().get('status')

    def patch_node_annotations(self, node, annotation_name, value):
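        """Add or replace a node annotation via a JSON Patch 'add' op."""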
        content_type = 'application/json-patch+json'
        path = '{}/nodes/{}/'.format(constants.K8S_API_BASE, node)
        value = jsonutils.dumps(value)
        url, header = self._get_url_and_header(path, content_type)
        data = [{'op': 'add',
                 'path': '/metadata/annotations/{}'.format(annotation_name),
                 'value': value}]
        response = self.session.patch(url, data=jsonutils.dumps(data),
                                      headers=header, cert=self.cert,
                                      verify=self.verify_server)
        self._raise_from_response(response)
        return response.json().get('status')

    def remove_node_annotations(self, node, annotation_name):
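        """Remove a node annotation via a JSON Patch 'remove' op."""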
        content_type = 'application/json-patch+json'
        path = '{}/nodes/{}/'.format(constants.K8S_API_BASE, node)
        url, header = self._get_url_and_header(path, content_type)
        data = [{'op': 'remove',
                 'path': '/metadata/annotations/{}'.format(annotation_name)}]
        response = self.session.patch(url, data=jsonutils.dumps(data),
                                      headers=header, cert=self.cert,
                                      verify=self.verify_server)
        self._raise_from_response(response)
        return response.json().get('status')

    def _jsonpatch_escape(self, value):
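        """Escape '~' and '/' in a JSON pointer token (RFC 6901)."""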
        value = value.replace('~', '~0')
        value = value.replace('/', '~1')
        return value

    def remove_annotations(self, path, annotation_name):
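        """Remove a single annotation from the resource at `path`."""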
        LOG.debug("Remove annotations %(path)s: %(name)s",
                  {'path': path, 'name': annotation_name})
        content_type = 'application/json-patch+json'
        url, header = self._get_url_and_header(path, content_type)
        annotation_name = self._jsonpatch_escape(annotation_name)
        data = [{'op': 'remove',
                 'path': f'/metadata/annotations/{annotation_name}'}]
        response = self.session.patch(url, data=jsonutils.dumps(data),
                                      headers=header, cert=self.cert,
                                      verify=self.verify_server)
        if response.ok:
            return response.json().get('status')
        raise exc.K8sClientException(response.text)

    def post(self, path, body):
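        """POST `body` as JSON to `path` and return the parsed response."""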
        LOG.debug("Post %(path)s: %(body)s", {'path': path, 'body': body})
        url = self._base_url + path
        header = {'Content-Type': 'application/json'}
        if self.token:
            header.update({'Authorization': 'Bearer %s' % self.token})
        response = self.session.post(url, json=body, cert=self.cert,
                                     verify=self.verify_server, headers=header)
        self._raise_from_response(response)
        return response.json()

    def delete(self, path):
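        """DELETE the resource at `path` and return the parsed response."""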
        LOG.debug("Delete %(path)s", {'path': path})
        url = self._base_url + path
        header = {'Content-Type': 'application/json'}
        if self.token:
            header.update({'Authorization': 'Bearer %s' % self.token})
        response = self.session.delete(url, cert=self.cert,
                                       verify=self.verify_server,
                                       headers=header)
        self._raise_from_response(response)
        return response.json()

    # TODO(dulek): add_finalizer() and remove_finalizer() have some code
    # duplication, but I don't see a nice way to avoid it.
    def add_finalizer(self, obj, finalizer):
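        """Add `finalizer` to the object, retrying on conflict.

        Returns True once the finalizer is present, False if the object is
        already being deleted or is gone.
        """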
        if finalizer in obj['metadata'].get('finalizers', []):
            return True

        path = obj['metadata']['selfLink']
        LOG.debug(f"Add finalizer {finalizer} to {path}")

        url, headers = self._get_url_and_header(
            path, 'application/merge-patch+json')

        for i in range(3):  # Let's make sure it's not an infinite loop
            finalizers = obj['metadata'].get('finalizers', []).copy()
            finalizers.append(finalizer)

            data = {
                'metadata': {
                    'finalizers': finalizers,
                    'resourceVersion': obj['metadata']['resourceVersion'],
                },
            }

            response = self.session.patch(url, json=data, headers=headers,
                                          cert=self.cert,
                                          verify=self.verify_server)
            if response.ok:
                return True

            try:
                self._raise_from_response(response)
            except (exc.K8sForbidden, exc.K8sResourceNotFound):
                # Object is being deleted or already gone. Return.
                return False
            except exc.K8sConflict:
                obj = self.get(path)
                if finalizer in obj['metadata'].get('finalizers', []):
                    # Finalizer is there, return.
                    return True

        # If after 3 iterations there's still a conflict, just raise.
        self._raise_from_response(response)

    def remove_finalizer(self, obj, finalizer):
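        """Remove `finalizer` from the object, retrying on conflict.

        Returns True once the finalizer is no longer present, False if the
        object itself no longer exists.
        """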
        path = obj['metadata']['selfLink']
        LOG.debug(f"Remove finalizer {finalizer} from {path}")

        url, headers = self._get_url_and_header(
            path, 'application/merge-patch+json')

        for i in range(3):  # Let's make sure it's not an infinite loop
            finalizers = obj['metadata'].get('finalizers', []).copy()
            try:
                finalizers.remove(finalizer)
            except ValueError:
                # Finalizer is not there, return.
                return True

            data = {
                'metadata': {
                    'finalizers': finalizers,
                    'resourceVersion': obj['metadata']['resourceVersion'],
                },
            }

            response = self.session.patch(url, json=data, headers=headers,
                                          cert=self.cert,
                                          verify=self.verify_server)
            if response.ok:
                return True

            try:
                try:
                    self._raise_from_response(response)
                except exc.K8sConflict:
                    obj = self.get(path)
            except exc.K8sResourceNotFound:
                # Object is gone already, stop.
                return False

        # If after 3 iterations there's still a conflict, just raise.
        self._raise_from_response(response)

    def get_loadbalancer_crd(self, obj):
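        """Return the KuryrLoadBalancer CRD for `obj` or None if absent."""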
        name = obj['metadata']['name']
        namespace = obj['metadata']['namespace']
        try:
            crd = self.get('{}/{}/kuryrloadbalancers/{}'.format(
                constants.K8S_API_CRD_NAMESPACES, namespace,
                name))
        except exc.K8sResourceNotFound:
            return None
        except exc.K8sClientException:
            LOG.exception("Kubernetes Client Exception.")
            raise
        return crd
    def annotate(self, path, annotations, resource_version=None):
        """Pushes a resource annotation to the K8s API resource.

        The annotate operation is made with a PATCH HTTP request of kind:
        application/merge-patch+json as described in:
        https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#patch-operations # noqa
        """
        LOG.debug("Annotate %(path)s: %(names)s", {
            'path': path, 'names': list(annotations)})

        content_type = 'application/merge-patch+json'
        url, header = self._get_url_and_header(path, content_type)

        while itertools.count(1):
            metadata = {"annotations": annotations}
            if resource_version:
                metadata['resourceVersion'] = resource_version
            data = jsonutils.dumps({"metadata": metadata}, sort_keys=True)
            response = self.session.patch(url, data=data,
                                          headers=header, cert=self.cert,
                                          verify=self.verify_server)
            if response.ok:
                return response.json()['metadata'].get('annotations', {})
            if response.status_code == requests.codes.conflict:
                resource = self.get(path)
                new_version = resource['metadata']['resourceVersion']
                retrieved_annotations = resource['metadata'].get(
                    'annotations', {})
                for k, v in annotations.items():
                    if v != retrieved_annotations.get(k):
                        break
                else:
                    LOG.debug("Annotations for %(path)s already present: "
                              "%(names)s", {'path': path,
                                            'names': retrieved_annotations})
                    return retrieved_annotations
                # Retry patching with updated resourceVersion
                resource_version = new_version
                continue

            LOG.error("Exception response, headers: %(headers)s, "
                      "content: %(content)s, text: %(text)s",
                      {'headers': response.headers,
                       'content': response.content, 'text': response.text})

            self._raise_from_response(response)

    def watch(self, path):
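        """Watch `path` and yield decoded API events as dictionaries.

        On connection errors the watch is re-established with an exponential
        backoff, resuming from the last seen resourceVersion.
        """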
        url = self._base_url + path
        resource_version = None
        header = {}
        timeouts = (CONF.kubernetes.watch_connection_timeout,
                    CONF.kubernetes.watch_read_timeout)
        if self.token:
            header.update({'Authorization': 'Bearer %s' % self.token})

        attempt = 0
        while True:
            try:
                params = {'watch': 'true'}
                if resource_version:
                    params['resourceVersion'] = resource_version
                with contextlib.closing(
                        self.session.get(
                            url, params=params, stream=True, cert=self.cert,
                            verify=self.verify_server, headers=header,
                            timeout=timeouts)) as response:
                    if not response.ok:
                        raise exc.K8sClientException(response.text)
                    attempt = 0
                    for line in response.iter_lines():
                        line = line.decode('utf-8').strip()
                        if line:
                            line_dict = jsonutils.loads(line)
                            yield line_dict
                            # Saving the resourceVersion in case of a restart.
                            # At this point it's safely passed to the handler.
                            m = line_dict.get('object', {}).get('metadata', {})
                            resource_version = m.get('resourceVersion', None)
            except (requests.ReadTimeout, requests.ConnectionError,
                    ssl.SSLError, requests.exceptions.ChunkedEncodingError):
                t = utils.exponential_backoff(attempt)
                log = LOG.debug
                if attempt > 0:
                    # Only make it a warning if it's happening again, no need
                    # to inform about all the read timeouts.
                    log = LOG.warning
                log('Connection error when watching %s. Retrying in %ds with '
                    'resourceVersion=%s', path, t,
                    params.get('resourceVersion'))
                time.sleep(t)
                attempt += 1