diff --git a/kuryr_kubernetes/k8s_client.py b/kuryr_kubernetes/k8s_client.py index 5fe5ea65d..a17d5b8a8 100644 --- a/kuryr_kubernetes/k8s_client.py +++ b/kuryr_kubernetes/k8s_client.py @@ -78,6 +78,13 @@ class K8sClient(object): else: self.verify_server = ca_crt_file + self._rq_params = { + 'cert': self.cert, + 'verify': self.verify_server, + 'timeout': (CONF.kubernetes.watch_connection_timeout, + CONF.kubernetes.watch_read_timeout), + } + def _raise_from_response(self, response): if response.status_code == requests.codes.not_found: raise exc.K8sResourceNotFound(response.text) @@ -106,9 +113,7 @@ class K8sClient(object): header.update({'Authorization': 'Bearer %s' % self.token}) if headers: header.update(headers) - response = self.session.get(url, cert=self.cert, - verify=self.verify_server, - headers=header) + response = self.session.get(url, headers=header, **self._rq_params) self._raise_from_response(response) result = response.json() if json else response.text return result @@ -128,8 +133,7 @@ class K8sClient(object): content_type = 'application/merge-patch+json' url, header = self._get_url_and_header(path, content_type) response = self.session.patch(url, json={field: data}, - headers=header, cert=self.cert, - verify=self.verify_server) + headers=header, **self._rq_params) self._raise_from_response(response) return response.json().get('status') @@ -155,8 +159,7 @@ class K8sClient(object): 'path': path, 'data': data}) response = self.session.patch(url, data=jsonutils.dumps(data), - headers=header, cert=self.cert, - verify=self.verify_server) + headers=header, **self._rq_params) self._raise_from_response(response) return response.json().get('status') @@ -171,8 +174,7 @@ class K8sClient(object): 'value': value}] response = self.session.patch(url, data=jsonutils.dumps(data), - headers=header, cert=self.cert, - verify=self.verify_server) + headers=header, **self._rq_params) self._raise_from_response(response) return response.json().get('status') @@ -185,8 +187,7 @@ class K8sClient(object): 'path': '/metadata/annotations/{}'.format(annotation_name)}] response = self.session.patch(url, data=jsonutils.dumps(data), - headers=header, cert=self.cert, - verify=self.verify_server) + headers=header, **self._rq_params) self._raise_from_response(response) return response.json().get('status') @@ -205,8 +206,7 @@ class K8sClient(object): data = [{'op': 'remove', 'path': f'/metadata/annotations/{annotation_name}'}] response = self.session.patch(url, data=jsonutils.dumps(data), - headers=header, cert=self.cert, - verify=self.verify_server) + headers=header, **self._rq_params) if response.ok: return response.json().get('status') raise exc.K8sClientException(response.text) @@ -218,8 +218,8 @@ class K8sClient(object): if self.token: header.update({'Authorization': 'Bearer %s' % self.token}) - response = self.session.post(url, json=body, cert=self.cert, - verify=self.verify_server, headers=header) + response = self.session.post(url, json=body, headers=header, + **self._rq_params) self._raise_from_response(response) return response.json() @@ -230,9 +230,7 @@ class K8sClient(object): if self.token: header.update({'Authorization': 'Bearer %s' % self.token}) - response = self.session.delete(url, cert=self.cert, - verify=self.verify_server, - headers=header) + response = self.session.delete(url, headers=header, **self._rq_params) self._raise_from_response(response) return response.json() @@ -259,8 +257,7 @@ class K8sClient(object): } response = self.session.patch(url, json=data, headers=headers, - cert=self.cert, - verify=self.verify_server) + **self._rq_params) if response.ok: return True @@ -301,8 +298,7 @@ class K8sClient(object): } response = self.session.patch(url, json=data, headers=headers, - cert=self.cert, - verify=self.verify_server) + **self._rq_params) if response.ok: return True @@ -354,8 +350,7 @@ class K8sClient(object): metadata['resourceVersion'] = resource_version data = jsonutils.dumps({"metadata": metadata}, sort_keys=True) response = self.session.patch(url, data=data, - headers=header, cert=self.cert, - verify=self.verify_server) + headers=header, **self._rq_params) if response.ok: return response.json()['metadata'].get('annotations', {}) if response.status_code == requests.codes.conflict: @@ -387,8 +382,6 @@ class K8sClient(object): url = self._base_url + path resource_version = None header = {} - timeouts = (CONF.kubernetes.watch_connection_timeout, - CONF.kubernetes.watch_read_timeout) if self.token: header.update({'Authorization': 'Bearer %s' % self.token}) @@ -400,9 +393,8 @@ class K8sClient(object): params['resourceVersion'] = resource_version with contextlib.closing( self.session.get( - url, params=params, stream=True, cert=self.cert, - verify=self.verify_server, headers=header, - timeout=timeouts)) as response: + url, params=params, stream=True, headers=header, + **self._rq_params)) as response: if not response.ok: raise exc.K8sClientException(response.text) attempt = 0 diff --git a/kuryr_kubernetes/tests/unit/test_k8s_client.py b/kuryr_kubernetes/tests/unit/test_k8s_client.py index 3ef1d7c86..273f79243 100644 --- a/kuryr_kubernetes/tests/unit/test_k8s_client.py +++ b/kuryr_kubernetes/tests/unit/test_k8s_client.py @@ -106,7 +106,7 @@ class TestK8sClient(test_base.TestCase): 'Authorization': 'Bearer {}'.format(token_content)} m_get.assert_called_once_with( self.base_url + path, cert=(None, None), headers=headers, - verify=False) + verify=False, timeout=(30, 60)) finally: os.unlink(m_cfg.kubernetes.token_file) @@ -123,7 +123,7 @@ class TestK8sClient(test_base.TestCase): self.assertEqual(ret, self.client.get(path)) m_get.assert_called_once_with( self.base_url + path, - cert=(None, None), headers={}, verify=False) + cert=(None, None), headers={}, verify=False, timeout=(30, 60)) @mock.patch('requests.sessions.Session.get') def test_get_exception(self, m_get): @@ -155,7 +155,8 @@ class TestK8sClient(test_base.TestCase): path, annotations, resource_version=resource_version)) m_patch.assert_called_once_with(self.base_url + path, data=data, headers=mock.ANY, - cert=(None, None), verify=False) + cert=(None, None), verify=False, + timeout=(30, 60)) @mock.patch('itertools.count') @mock.patch('requests.sessions.Session.patch') @@ -203,7 +204,7 @@ class TestK8sClient(test_base.TestCase): mock.call(self.base_url + path, data=conflicting_data, headers=mock.ANY, - cert=(None, None), verify=False)]) + cert=(None, None), verify=False, timeout=(30, 60))]) @mock.patch('itertools.count') @mock.patch('requests.sessions.Session.patch') @@ -243,11 +244,11 @@ class TestK8sClient(test_base.TestCase): mock.call(self.base_url + path, data=annotating_data, headers=mock.ANY, - cert=(None, None), verify=False), + cert=(None, None), verify=False, timeout=(30, 60)), mock.call(self.base_url + path, data=resolution_data, headers=mock.ANY, - cert=(None, None), verify=False)]) + cert=(None, None), verify=False, timeout=(30, 60))]) @mock.patch('itertools.count') @mock.patch('requests.sessions.Session.patch') @@ -287,11 +288,11 @@ class TestK8sClient(test_base.TestCase): mock.call(self.base_url + path, data=conflicting_data, headers=mock.ANY, - cert=(None, None), verify=False), + cert=(None, None), verify=False, timeout=(30, 60)), mock.call(self.base_url + path, data=good_data, headers=mock.ANY, - cert=(None, None), verify=False)]) + cert=(None, None), verify=False, timeout=(30, 60))]) @mock.patch('itertools.count') @mock.patch('requests.sessions.Session.patch') @@ -318,7 +319,8 @@ class TestK8sClient(test_base.TestCase): m_patch.assert_called_once_with(self.base_url + path, data=annotate_data, headers=mock.ANY, - cert=(None, None), verify=False) + cert=(None, None), verify=False, + timeout=(30, 60)) @mock.patch('requests.sessions.Session.get') def test_watch(self, m_get): @@ -395,7 +397,7 @@ class TestK8sClient(test_base.TestCase): self.assertEqual(ret, self.client.post(path, body)) m_post.assert_called_once_with(self.base_url + path, json=body, headers=mock.ANY, cert=(None, None), - verify=False) + verify=False, timeout=(30, 60)) @mock.patch('requests.sessions.Session.post') def test_post_exception(self, m_post): @@ -422,7 +424,7 @@ class TestK8sClient(test_base.TestCase): self.assertEqual(ret, self.client.delete(path)) m_delete.assert_called_once_with(self.base_url + path, headers=mock.ANY, cert=(None, None), - verify=False) + verify=False, timeout=(30, 60)) @mock.patch('requests.sessions.Session.delete') def test_delete_exception(self, m_delete):