Set read timeout for any request in K8sClient

We're often contacting the K8s API through a loadbalancer (e.g. Octavia
LB in DevStack deployments, HAProxy in OpenShift) and we've often seen
they're able to drop connections silently, effectively leaving our
requests hanging forever. This got fixed in `K8sClient.watch` which
helped a lot, but we now seem to see it happening with other requests.
In order to make sure we won't block processing events for a resource
forever due to that, this commit adds read timeout to all the methods in
K8sClient.

Closes-Bug: 1897893
Change-Id: If1846ec78abc0840e7aba04565b220a1d20e5dc9
This commit is contained in:
Michał Dulko 2020-09-30 12:56:57 +02:00
parent ff0299d507
commit 44890a84d5
2 changed files with 34 additions and 40 deletions

View File

@ -78,6 +78,13 @@ class K8sClient(object):
else: else:
self.verify_server = ca_crt_file self.verify_server = ca_crt_file
self._rq_params = {
'cert': self.cert,
'verify': self.verify_server,
'timeout': (CONF.kubernetes.watch_connection_timeout,
CONF.kubernetes.watch_read_timeout),
}
def _raise_from_response(self, response): def _raise_from_response(self, response):
if response.status_code == requests.codes.not_found: if response.status_code == requests.codes.not_found:
raise exc.K8sResourceNotFound(response.text) raise exc.K8sResourceNotFound(response.text)
@ -106,9 +113,7 @@ class K8sClient(object):
header.update({'Authorization': 'Bearer %s' % self.token}) header.update({'Authorization': 'Bearer %s' % self.token})
if headers: if headers:
header.update(headers) header.update(headers)
response = self.session.get(url, cert=self.cert, response = self.session.get(url, headers=header, **self._rq_params)
verify=self.verify_server,
headers=header)
self._raise_from_response(response) self._raise_from_response(response)
result = response.json() if json else response.text result = response.json() if json else response.text
return result return result
@ -128,8 +133,7 @@ class K8sClient(object):
content_type = 'application/merge-patch+json' content_type = 'application/merge-patch+json'
url, header = self._get_url_and_header(path, content_type) url, header = self._get_url_and_header(path, content_type)
response = self.session.patch(url, json={field: data}, response = self.session.patch(url, json={field: data},
headers=header, cert=self.cert, headers=header, **self._rq_params)
verify=self.verify_server)
self._raise_from_response(response) self._raise_from_response(response)
return response.json().get('status') return response.json().get('status')
@ -155,8 +159,7 @@ class K8sClient(object):
'path': path, 'data': data}) 'path': path, 'data': data})
response = self.session.patch(url, data=jsonutils.dumps(data), response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert, headers=header, **self._rq_params)
verify=self.verify_server)
self._raise_from_response(response) self._raise_from_response(response)
return response.json().get('status') return response.json().get('status')
@ -171,8 +174,7 @@ class K8sClient(object):
'value': value}] 'value': value}]
response = self.session.patch(url, data=jsonutils.dumps(data), response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert, headers=header, **self._rq_params)
verify=self.verify_server)
self._raise_from_response(response) self._raise_from_response(response)
return response.json().get('status') return response.json().get('status')
@ -185,8 +187,7 @@ class K8sClient(object):
'path': '/metadata/annotations/{}'.format(annotation_name)}] 'path': '/metadata/annotations/{}'.format(annotation_name)}]
response = self.session.patch(url, data=jsonutils.dumps(data), response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert, headers=header, **self._rq_params)
verify=self.verify_server)
self._raise_from_response(response) self._raise_from_response(response)
return response.json().get('status') return response.json().get('status')
@ -205,8 +206,7 @@ class K8sClient(object):
data = [{'op': 'remove', data = [{'op': 'remove',
'path': f'/metadata/annotations/{annotation_name}'}] 'path': f'/metadata/annotations/{annotation_name}'}]
response = self.session.patch(url, data=jsonutils.dumps(data), response = self.session.patch(url, data=jsonutils.dumps(data),
headers=header, cert=self.cert, headers=header, **self._rq_params)
verify=self.verify_server)
if response.ok: if response.ok:
return response.json().get('status') return response.json().get('status')
raise exc.K8sClientException(response.text) raise exc.K8sClientException(response.text)
@ -218,8 +218,8 @@ class K8sClient(object):
if self.token: if self.token:
header.update({'Authorization': 'Bearer %s' % self.token}) header.update({'Authorization': 'Bearer %s' % self.token})
response = self.session.post(url, json=body, cert=self.cert, response = self.session.post(url, json=body, headers=header,
verify=self.verify_server, headers=header) **self._rq_params)
self._raise_from_response(response) self._raise_from_response(response)
return response.json() return response.json()
@ -230,9 +230,7 @@ class K8sClient(object):
if self.token: if self.token:
header.update({'Authorization': 'Bearer %s' % self.token}) header.update({'Authorization': 'Bearer %s' % self.token})
response = self.session.delete(url, cert=self.cert, response = self.session.delete(url, headers=header, **self._rq_params)
verify=self.verify_server,
headers=header)
self._raise_from_response(response) self._raise_from_response(response)
return response.json() return response.json()
@ -259,8 +257,7 @@ class K8sClient(object):
} }
response = self.session.patch(url, json=data, headers=headers, response = self.session.patch(url, json=data, headers=headers,
cert=self.cert, **self._rq_params)
verify=self.verify_server)
if response.ok: if response.ok:
return True return True
@ -301,8 +298,7 @@ class K8sClient(object):
} }
response = self.session.patch(url, json=data, headers=headers, response = self.session.patch(url, json=data, headers=headers,
cert=self.cert, **self._rq_params)
verify=self.verify_server)
if response.ok: if response.ok:
return True return True
@ -354,8 +350,7 @@ class K8sClient(object):
metadata['resourceVersion'] = resource_version metadata['resourceVersion'] = resource_version
data = jsonutils.dumps({"metadata": metadata}, sort_keys=True) data = jsonutils.dumps({"metadata": metadata}, sort_keys=True)
response = self.session.patch(url, data=data, response = self.session.patch(url, data=data,
headers=header, cert=self.cert, headers=header, **self._rq_params)
verify=self.verify_server)
if response.ok: if response.ok:
return response.json()['metadata'].get('annotations', {}) return response.json()['metadata'].get('annotations', {})
if response.status_code == requests.codes.conflict: if response.status_code == requests.codes.conflict:
@ -387,8 +382,6 @@ class K8sClient(object):
url = self._base_url + path url = self._base_url + path
resource_version = None resource_version = None
header = {} header = {}
timeouts = (CONF.kubernetes.watch_connection_timeout,
CONF.kubernetes.watch_read_timeout)
if self.token: if self.token:
header.update({'Authorization': 'Bearer %s' % self.token}) header.update({'Authorization': 'Bearer %s' % self.token})
@ -400,9 +393,8 @@ class K8sClient(object):
params['resourceVersion'] = resource_version params['resourceVersion'] = resource_version
with contextlib.closing( with contextlib.closing(
self.session.get( self.session.get(
url, params=params, stream=True, cert=self.cert, url, params=params, stream=True, headers=header,
verify=self.verify_server, headers=header, **self._rq_params)) as response:
timeout=timeouts)) as response:
if not response.ok: if not response.ok:
raise exc.K8sClientException(response.text) raise exc.K8sClientException(response.text)
attempt = 0 attempt = 0

View File

@ -106,7 +106,7 @@ class TestK8sClient(test_base.TestCase):
'Authorization': 'Bearer {}'.format(token_content)} 'Authorization': 'Bearer {}'.format(token_content)}
m_get.assert_called_once_with( m_get.assert_called_once_with(
self.base_url + path, cert=(None, None), headers=headers, self.base_url + path, cert=(None, None), headers=headers,
verify=False) verify=False, timeout=(30, 60))
finally: finally:
os.unlink(m_cfg.kubernetes.token_file) os.unlink(m_cfg.kubernetes.token_file)
@ -123,7 +123,7 @@ class TestK8sClient(test_base.TestCase):
self.assertEqual(ret, self.client.get(path)) self.assertEqual(ret, self.client.get(path))
m_get.assert_called_once_with( m_get.assert_called_once_with(
self.base_url + path, self.base_url + path,
cert=(None, None), headers={}, verify=False) cert=(None, None), headers={}, verify=False, timeout=(30, 60))
@mock.patch('requests.sessions.Session.get') @mock.patch('requests.sessions.Session.get')
def test_get_exception(self, m_get): def test_get_exception(self, m_get):
@ -155,7 +155,8 @@ class TestK8sClient(test_base.TestCase):
path, annotations, resource_version=resource_version)) path, annotations, resource_version=resource_version))
m_patch.assert_called_once_with(self.base_url + path, m_patch.assert_called_once_with(self.base_url + path,
data=data, headers=mock.ANY, data=data, headers=mock.ANY,
cert=(None, None), verify=False) cert=(None, None), verify=False,
timeout=(30, 60))
@mock.patch('itertools.count') @mock.patch('itertools.count')
@mock.patch('requests.sessions.Session.patch') @mock.patch('requests.sessions.Session.patch')
@ -203,7 +204,7 @@ class TestK8sClient(test_base.TestCase):
mock.call(self.base_url + path, mock.call(self.base_url + path,
data=conflicting_data, data=conflicting_data,
headers=mock.ANY, headers=mock.ANY,
cert=(None, None), verify=False)]) cert=(None, None), verify=False, timeout=(30, 60))])
@mock.patch('itertools.count') @mock.patch('itertools.count')
@mock.patch('requests.sessions.Session.patch') @mock.patch('requests.sessions.Session.patch')
@ -243,11 +244,11 @@ class TestK8sClient(test_base.TestCase):
mock.call(self.base_url + path, mock.call(self.base_url + path,
data=annotating_data, data=annotating_data,
headers=mock.ANY, headers=mock.ANY,
cert=(None, None), verify=False), cert=(None, None), verify=False, timeout=(30, 60)),
mock.call(self.base_url + path, mock.call(self.base_url + path,
data=resolution_data, data=resolution_data,
headers=mock.ANY, headers=mock.ANY,
cert=(None, None), verify=False)]) cert=(None, None), verify=False, timeout=(30, 60))])
@mock.patch('itertools.count') @mock.patch('itertools.count')
@mock.patch('requests.sessions.Session.patch') @mock.patch('requests.sessions.Session.patch')
@ -287,11 +288,11 @@ class TestK8sClient(test_base.TestCase):
mock.call(self.base_url + path, mock.call(self.base_url + path,
data=conflicting_data, data=conflicting_data,
headers=mock.ANY, headers=mock.ANY,
cert=(None, None), verify=False), cert=(None, None), verify=False, timeout=(30, 60)),
mock.call(self.base_url + path, mock.call(self.base_url + path,
data=good_data, data=good_data,
headers=mock.ANY, headers=mock.ANY,
cert=(None, None), verify=False)]) cert=(None, None), verify=False, timeout=(30, 60))])
@mock.patch('itertools.count') @mock.patch('itertools.count')
@mock.patch('requests.sessions.Session.patch') @mock.patch('requests.sessions.Session.patch')
@ -318,7 +319,8 @@ class TestK8sClient(test_base.TestCase):
m_patch.assert_called_once_with(self.base_url + path, m_patch.assert_called_once_with(self.base_url + path,
data=annotate_data, data=annotate_data,
headers=mock.ANY, headers=mock.ANY,
cert=(None, None), verify=False) cert=(None, None), verify=False,
timeout=(30, 60))
@mock.patch('requests.sessions.Session.get') @mock.patch('requests.sessions.Session.get')
def test_watch(self, m_get): def test_watch(self, m_get):
@ -395,7 +397,7 @@ class TestK8sClient(test_base.TestCase):
self.assertEqual(ret, self.client.post(path, body)) self.assertEqual(ret, self.client.post(path, body))
m_post.assert_called_once_with(self.base_url + path, json=body, m_post.assert_called_once_with(self.base_url + path, json=body,
headers=mock.ANY, cert=(None, None), headers=mock.ANY, cert=(None, None),
verify=False) verify=False, timeout=(30, 60))
@mock.patch('requests.sessions.Session.post') @mock.patch('requests.sessions.Session.post')
def test_post_exception(self, m_post): def test_post_exception(self, m_post):
@ -422,7 +424,7 @@ class TestK8sClient(test_base.TestCase):
self.assertEqual(ret, self.client.delete(path)) self.assertEqual(ret, self.client.delete(path))
m_delete.assert_called_once_with(self.base_url + path, m_delete.assert_called_once_with(self.base_url + path,
headers=mock.ANY, cert=(None, None), headers=mock.ANY, cert=(None, None),
verify=False) verify=False, timeout=(30, 60))
@mock.patch('requests.sessions.Session.delete') @mock.patch('requests.sessions.Session.delete')
def test_delete_exception(self, m_delete): def test_delete_exception(self, m_delete):