Files
kuryr-kubernetes/kuryr_kubernetes/tests/unit/test_k8s_client.py
Michał Dulko 51150ed2cd Timeout connections when watching K8s API
Turns out that without `timeout` parameter, requests library is unable
to see connection interruption if those were "silent" (i.e. firewall
blocked the connection for a period long enough for K8s API to drop the
connection). This can be fixed by adding the `timeout` parameter to the
`requests.get()` invocation in `watch()` and retrying on receiving
`requests.ReadTimeout` exception.

This commit does so by setting the connection and read timeouts to 30
and 60 respectively and implementing the retry logic using last seen
resourceVersion to restart the watching connection.

Change-Id: I19df95aaae404b5a936c469085b1f80f9eb99e2e
Closes-Bug: 1842689
2019-09-06 09:51:33 +02:00

431 lines
17 KiB
Python

# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import mock
import os
import tempfile
from oslo_serialization import jsonutils
import requests
from kuryr_kubernetes import exceptions as exc
from kuryr_kubernetes import k8s_client
from kuryr_kubernetes.tests import base as test_base
class TestK8sClient(test_base.TestCase):
def setUp(self):
super(TestK8sClient, self).setUp()
self.base_url = 'http://127.0.0.1:12345'
self.client = k8s_client.K8sClient(self.base_url)
default_cert = (None, None)
default_token = None
self.assertEqual(default_cert, self.client.cert)
self.assertEqual(False, self.client.verify_server)
self.assertEqual(default_token, self.client.token)
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.config.CONF')
def test_https_client_init(self, m_cfg, m_exist):
m_cfg.kubernetes.ssl_client_crt_file = 'dummy_crt_file_path'
m_cfg.kubernetes.ssl_client_key_file = 'dummy_key_file_path'
m_cfg.kubernetes.ssl_ca_crt_file = 'dummy_ca_file_path'
m_cfg.kubernetes.token_file = None
m_cfg.kubernetes.ssl_verify_server_crt = True
m_exist.return_value = True
test_client = k8s_client.K8sClient(self.base_url)
cert = ('dummy_crt_file_path', 'dummy_key_file_path')
self.assertEqual(cert, test_client.cert)
self.assertEqual('dummy_ca_file_path', test_client.verify_server)
@mock.patch('kuryr_kubernetes.config.CONF')
def test_https_client_init_invalid_client_crt_path(self, m_cfg):
m_cfg.kubernetes.ssl_client_crt_file = 'dummy_crt_file_path'
m_cfg.kubernetes.ssl_client_key_file = 'dummy_key_file_path'
m_cfg.kubernetes.token_file = None
self.assertRaises(RuntimeError, k8s_client.K8sClient, self.base_url)
@mock.patch('os.path.exists')
@mock.patch('kuryr_kubernetes.config.CONF')
def test_https_client_init_invalid_ca_path(self, m_cfg, m_exist):
m_cfg.kubernetes.ssl_client_crt_file = 'dummy_crt_file_path'
m_cfg.kubernetes.ssl_client_key_file = 'dummy_key_file_path'
m_cfg.kubernetes.ssl_ca_crt_file = None
m_cfg.kubernetes.ssl_verify_server_crt = True
m_cfg.kubernetes.token_file = None
m_exist.return_value = True
self.assertRaises(RuntimeError, k8s_client.K8sClient, self.base_url)
@mock.patch('requests.get')
@mock.patch('kuryr_kubernetes.config.CONF')
def test_bearer_token(self, m_cfg, m_get):
token_content = (
"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3Nl"
"cnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc"
"3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bn"
"Qvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLWh4M3QxIiwia3ViZXJuZXRlcy5"
"pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQi"
"LCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51a"
"WQiOiIxYTkyM2ZmNi00MDkyLTExZTctOTMwYi1mYTE2M2VkY2ViMDUiLCJzdWIiOi"
"JzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.lzcPef"
"DQ-uzF5cD-5pLwTKpRvtvvxKB4LX8TLymrPLMTth8WGr1vT6jteJPmLiDZM2C5dZI"
"iFJpOw4LL1XLullik-ls-CmnTWq97NvlW1cZolC0mNyRz6JcL7gkH8WfUSjLA7x80"
"ORalanUxtl9-ghMGKCtKIACAgvr5gGT4iznGYQQRx_hKURs4O6Js5vhwNM6UuOKeW"
"GDDAlhgHMG0u59z3bhiBLl6jbQktZsu8c3diXniQb3sYqYQcGKUm1IQFujyA_ByDb"
"5GUtCv1BOPL_-IjYtvdJD8ZzQ_UnPFoYQklpDyJLB7_7qCGcfVEQbnSCh907NdKo4"
"w_8Wkn2y-Tg")
token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
try:
m_cfg.kubernetes.token_file = token_file.name
token_file.write(token_content)
token_file.close()
m_cfg.kubernetes.ssl_verify_server_crt = False
path = '/test'
client = k8s_client.K8sClient(self.base_url)
client.get(path)
headers = {
'Authorization': 'Bearer {}'.format(token_content)}
m_get.assert_called_once_with(
self.base_url + path, cert=(None, None), headers=headers,
verify=False)
finally:
os.unlink(m_cfg.kubernetes.token_file)
@mock.patch('requests.get')
def test_get(self, m_get):
path = '/test'
ret = {'test': 'value'}
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.json.return_value = ret
m_get.return_value = m_resp
self.assertEqual(ret, self.client.get(path))
m_get.assert_called_once_with(
self.base_url + path,
cert=(None, None), headers={}, verify=False)
@mock.patch('requests.get')
def test_get_exception(self, m_get):
path = '/test'
m_resp = mock.MagicMock()
m_resp.ok = False
m_get.return_value = m_resp
self.assertRaises(exc.K8sClientException, self.client.get, path)
@mock.patch('itertools.count')
@mock.patch('requests.patch')
def test_annotate(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
resource_version = "123"
ret = {'metadata': {'annotations': annotations,
"resourceVersion": resource_version}}
data = jsonutils.dumps(ret, sort_keys=True)
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.json.return_value = ret
m_patch.return_value = m_resp
self.assertEqual(annotations, self.client.annotate(
path, annotations, resource_version=resource_version))
m_patch.assert_called_once_with(self.base_url + path,
data=data, headers=mock.ANY,
cert=(None, None), verify=False)
@mock.patch('itertools.count')
@mock.patch('requests.patch')
def test_annotate_exception(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
m_resp = mock.MagicMock()
m_resp.ok = False
m_patch.return_value = m_resp
self.assertRaises(exc.K8sClientException, self.client.annotate,
path, {})
@mock.patch('itertools.count')
@mock.patch('requests.patch')
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
resource_version = "123"
new_resource_version = "456"
conflicting_obj = {'metadata': {
'annotations': annotations,
'resourceVersion': resource_version}}
good_obj = {'metadata': {
'annotations': annotations,
'resourceVersion': new_resource_version}}
conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
m_resp_conflict = mock.MagicMock()
m_resp_conflict.ok = False
m_resp_conflict.status_code = requests.codes.conflict
m_resp_good = mock.MagicMock()
m_resp_good.ok = True
m_resp_good.json.return_value = conflicting_obj
m_patch.side_effect = [m_resp_conflict, m_resp_good]
with mock.patch.object(self.client, 'get') as m_get:
m_get.return_value = good_obj
self.assertEqual(annotations, self.client.annotate(
path, annotations, resource_version=resource_version))
m_patch.assert_has_calls([
mock.call(self.base_url + path,
data=conflicting_data,
headers=mock.ANY,
cert=(None, None), verify=False)])
@mock.patch('itertools.count')
@mock.patch('requests.patch')
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
annotating_resource_version = '123'
annotating_obj = {'metadata': {
'annotations': annotations,
'resourceVersion': annotating_resource_version}}
annotating_data = jsonutils.dumps(annotating_obj, sort_keys=True)
new_resource_version = '456'
new_obj = {'metadata': {
'resourceVersion': new_resource_version}}
resolution_obj = annotating_obj.copy()
resolution_obj['metadata']['resourceVersion'] = new_resource_version
resolution_data = jsonutils.dumps(resolution_obj, sort_keys=True)
m_resp_conflict = mock.MagicMock()
m_resp_conflict.ok = False
m_resp_conflict.status_code = requests.codes.conflict
m_resp_good = mock.MagicMock()
m_resp_good.ok = True
m_resp_good.json.return_value = resolution_obj
m_patch.side_effect = (m_resp_conflict, m_resp_good)
with mock.patch.object(self.client, 'get') as m_get:
m_get.return_value = new_obj
self.assertEqual(annotations, self.client.annotate(
path, annotations,
resource_version=annotating_resource_version))
m_patch.assert_has_calls([
mock.call(self.base_url + path,
data=annotating_data,
headers=mock.ANY,
cert=(None, None), verify=False),
mock.call(self.base_url + path,
data=resolution_data,
headers=mock.ANY,
cert=(None, None), verify=False)])
@mock.patch('itertools.count')
@mock.patch('requests.patch')
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
resource_version = "123"
new_resource_version = "456"
conflicting_obj = {'metadata': {
'annotations': annotations,
'resourceVersion': resource_version}}
actual_obj = {'metadata': {
'annotations': {'a1': 'v2'},
'resourceVersion': new_resource_version}}
good_obj = {'metadata': {
'annotations': annotations,
'resourceVersion': new_resource_version}}
conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
good_data = jsonutils.dumps(good_obj, sort_keys=True)
m_resp_conflict = mock.MagicMock()
m_resp_conflict.ok = False
m_resp_conflict.status_code = requests.codes.conflict
m_patch.return_value = m_resp_conflict
m_resp_good = mock.MagicMock()
m_resp_good.ok = True
m_resp_good.json.return_value = conflicting_obj
m_patch.side_effect = [m_resp_conflict, m_resp_good]
with mock.patch.object(self.client, 'get') as m_get:
m_get.return_value = actual_obj
self.assertEqual(annotations, self.client.annotate(
path, annotations,
resource_version=resource_version))
m_patch.assert_has_calls([
mock.call(self.base_url + path,
data=conflicting_data,
headers=mock.ANY,
cert=(None, None), verify=False),
mock.call(self.base_url + path,
data=good_data,
headers=mock.ANY,
cert=(None, None), verify=False)])
@mock.patch('itertools.count')
@mock.patch('requests.patch')
def test_annotate_resource_not_found(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
resource_version = "123"
annotate_obj = {'metadata': {
'annotations': annotations,
'resourceVersion': resource_version}}
annotate_data = jsonutils.dumps(annotate_obj, sort_keys=True)
m_resp_not_found = mock.MagicMock()
m_resp_not_found.ok = False
m_resp_not_found.status_code = requests.codes.not_found
m_patch.return_value = m_resp_not_found
self.assertRaises(exc.K8sResourceNotFound,
self.client.annotate,
path,
annotations,
resource_version=resource_version)
m_patch.assert_called_once_with(self.base_url + path,
data=annotate_data,
headers=mock.ANY,
cert=(None, None), verify=False)
@mock.patch('requests.get')
def test_watch(self, m_get):
path = '/test'
data = [{'obj': 'obj%s' % i} for i in range(3)]
lines = [jsonutils.dump_as_bytes(i) for i in data]
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.iter_lines.return_value = lines
m_get.return_value = m_resp
cycles = 3
self.assertEqual(
data * cycles,
list(itertools.islice(self.client.watch(path),
len(data) * cycles)))
self.assertEqual(cycles, m_get.call_count)
self.assertEqual(cycles, m_resp.close.call_count)
m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
params={'watch': 'true'}, cert=(None, None),
verify=False, timeout=(30, 60))
@mock.patch('requests.get')
def test_watch_restart(self, m_get):
path = '/test'
data = [{'object': {'metadata': {'name': 'obj%s' % i,
'resourceVersion': i}}}
for i in range(3)]
lines = [jsonutils.dump_as_bytes(i) for i in data]
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.iter_lines.side_effect = [lines, requests.ReadTimeout, lines]
m_get.return_value = m_resp
self.assertEqual(data * 2,
list(itertools.islice(self.client.watch(path),
len(data) * 2)))
self.assertEqual(3, m_get.call_count)
self.assertEqual(3, m_resp.close.call_count)
m_get.assert_any_call(
self.base_url + path, headers={}, stream=True,
params={"watch": "true"}, cert=(None, None), verify=False,
timeout=(30, 60))
m_get.assert_any_call(
self.base_url + path, headers={}, stream=True,
params={"watch": "true", "resourceVersion": 2}, cert=(None, None),
verify=False, timeout=(30, 60))
@mock.patch('requests.get')
def test_watch_exception(self, m_get):
path = '/test'
m_resp = mock.MagicMock()
m_resp.ok = False
m_get.return_value = m_resp
self.assertRaises(exc.K8sClientException, next,
self.client.watch(path))
@mock.patch('requests.post')
def test_post(self, m_post):
path = '/test'
body = {'test': 'body'}
ret = {'test': 'value'}
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.json.return_value = ret
m_post.return_value = m_resp
self.assertEqual(ret, self.client.post(path, body))
m_post.assert_called_once_with(self.base_url + path, json=body,
headers=mock.ANY, cert=(None, None),
verify=False)
@mock.patch('requests.post')
def test_post_exception(self, m_post):
path = '/test'
body = {'test': 'body'}
m_resp = mock.MagicMock()
m_resp.ok = False
m_post.return_value = m_resp
self.assertRaises(exc.K8sClientException,
self.client.post, path, body)
@mock.patch('requests.delete')
def test_delete(self, m_delete):
path = '/test'
ret = {'test': 'value'}
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.json.return_value = ret
m_delete.return_value = m_resp
self.assertEqual(ret, self.client.delete(path))
m_delete.assert_called_once_with(self.base_url + path,
headers=mock.ANY, cert=(None, None),
verify=False)
@mock.patch('requests.delete')
def test_delete_exception(self, m_delete):
path = '/test'
m_resp = mock.MagicMock()
m_resp.ok = False
m_delete.return_value = m_resp
self.assertRaises(exc.K8sClientException,
self.client.delete, path)