diff --git a/kuryr_kubernetes/k8s_client.py b/kuryr_kubernetes/k8s_client.py index 0d1955544..1ab82c938 100644 --- a/kuryr_kubernetes/k8s_client.py +++ b/kuryr_kubernetes/k8s_client.py @@ -14,12 +14,16 @@ # under the License. import contextlib +import itertools +from oslo_log import log as logging from oslo_serialization import jsonutils import requests from kuryr_kubernetes import exceptions as exc +LOG = logging.getLogger(__name__) + class K8sClient(object): # REVISIT(ivc): replace with python-k8sclient if it could be extended @@ -29,6 +33,7 @@ class K8sClient(object): self._base_url = base_url def get(self, path): + LOG.debug("Get %(path)s", {'path': path}) url = self._base_url + path response = requests.get(url) if not response.ok: @@ -36,20 +41,46 @@ class K8sClient(object): return response.json() def annotate(self, path, annotations, resource_version=None): + """Pushes a resource annotation to the K8s API resource + + The annotate operation is made with a PATCH HTTP request of kind: + application/merge-patch+json as described in: + + https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#patch-operations # noqa + """ + LOG.debug("Annotate %(path)s: %(names)s", { + 'path': path, 'names': list(annotations)}) url = self._base_url + path - data = jsonutils.dumps({ - "metadata": { - "annotations": annotations, - "resourceVersion": resource_version, - } - }, sort_keys=True) - response = requests.patch(url, data=data, headers={ - 'Content-Type': 'application/merge-patch+json', - 'Accept': 'application/json', - }) - if not response.ok: + while itertools.count(1): + data = jsonutils.dumps({ + "metadata": { + "annotations": annotations, + "resourceVersion": resource_version, + } + }, sort_keys=True) + response = requests.patch(url, data=data, headers={ + 'Content-Type': 'application/merge-patch+json', + 'Accept': 'application/json', + }) + if response.ok: + return response.json()['metadata']['annotations'] + if response.status_code == requests.codes.conflict: + resource = self.get(path) + new_version = resource['metadata']['resourceVersion'] + retrieved_annotations = resource['metadata'].get( + 'annotations', {}) + + for k, v in annotations.items(): + if v != retrieved_annotations.get(k, v): + break + else: + # No conflicting annotations found. Retry patching + resource_version = new_version + continue + LOG.debug("Annotations for %(path)s already present: " + "%(names)s", {'path': path, + 'names': retrieved_annotations}) raise exc.K8sClientException(response.text) - return response.json()['metadata']['annotations'] def watch(self, path): params = {'watch': 'true'} diff --git a/kuryr_kubernetes/tests/unit/test_k8s_client.py b/kuryr_kubernetes/tests/unit/test_k8s_client.py index c7fc5e2bc..e42c042d7 100644 --- a/kuryr_kubernetes/tests/unit/test_k8s_client.py +++ b/kuryr_kubernetes/tests/unit/test_k8s_client.py @@ -17,6 +17,7 @@ import itertools import mock from oslo_serialization import jsonutils +import requests from kuryr_kubernetes import exceptions as exc from kuryr_kubernetes import k8s_client @@ -52,8 +53,10 @@ class TestK8sClient(test_base.TestCase): self.assertRaises(exc.K8sClientException, self.client.get, path) + @mock.patch('itertools.count') @mock.patch('requests.patch') - def test_annotate(self, m_patch): + def test_annotate(self, m_patch, m_count): + m_count.return_value = list(range(1, 5)) path = '/test' annotations = {'a1': 'v1', 'a2': 'v2'} resource_version = "123" @@ -71,8 +74,10 @@ class TestK8sClient(test_base.TestCase): m_patch.assert_called_once_with(self.base_url + path, data=data, headers=mock.ANY) + @mock.patch('itertools.count') @mock.patch('requests.patch') - def test_annotate_exception(self, m_patch): + def test_annotate_exception(self, m_patch, m_count): + m_count.return_value = list(range(1, 5)) path = '/test' m_resp = mock.MagicMock() @@ -82,6 +87,117 @@ class TestK8sClient(test_base.TestCase): self.assertRaises(exc.K8sClientException, self.client.annotate, path, {}) + @mock.patch('itertools.count') + @mock.patch('requests.patch') + def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count): + m_count.return_value = list(range(1, 5)) + path = '/test' + annotations = {'a1': 'v1', 'a2': 'v2'} + resource_version = "123" + new_resource_version = "456" + conflicting_obj = {'metadata': { + 'annotations': annotations, + 'resourceVersion': resource_version}} + good_obj = {'metadata': { + 'annotations': annotations, + 'resourceVersion': new_resource_version}} + conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True) + good_data = jsonutils.dumps(good_obj, sort_keys=True) + + m_resp_conflict = mock.MagicMock() + m_resp_conflict.ok = False + m_resp_conflict.status_code = requests.codes.conflict + m_resp_good = mock.MagicMock() + m_resp_good.ok = True + m_resp_good.json.return_value = conflicting_obj + m_patch.side_effect = [m_resp_conflict, m_resp_good] + + with mock.patch.object(self.client, 'get') as m_get: + m_get.return_value = good_obj + self.assertEqual(annotations, self.client.annotate( + path, annotations, resource_version=resource_version)) + + m_patch.assert_has_calls([ + mock.call(self.base_url + path, + data=conflicting_data, + headers=mock.ANY), + mock.call(self.base_url + path, + data=good_data, + headers=mock.ANY)]) + + @mock.patch('itertools.count') + @mock.patch('requests.patch') + def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count): + m_count.return_value = list(range(1, 5)) + path = '/test' + annotations = {'a1': 'v1', 'a2': 'v2'} + annotating_resource_version = '123' + annotating_obj = {'metadata': { + 'annotations': annotations, + 'resourceVersion': annotating_resource_version}} + annotating_data = jsonutils.dumps(annotating_obj, sort_keys=True) + + new_resource_version = '456' + new_obj = {'metadata': { + 'resourceVersion': new_resource_version}} + + resolution_obj = annotating_obj.copy() + resolution_obj['metadata']['resourceVersion'] = new_resource_version + resolution_data = jsonutils.dumps(resolution_obj, sort_keys=True) + + m_resp_conflict = mock.MagicMock() + m_resp_conflict.ok = False + m_resp_conflict.status_code = requests.codes.conflict + m_resp_good = mock.MagicMock() + m_resp_good.ok = True + m_resp_good.json.return_value = resolution_obj + m_patch.side_effect = (m_resp_conflict, m_resp_good) + + with mock.patch.object(self.client, 'get') as m_get: + m_get.return_value = new_obj + self.assertEqual(annotations, self.client.annotate( + path, annotations, + resource_version=annotating_resource_version)) + + m_patch.assert_has_calls([ + mock.call(self.base_url + path, + data=annotating_data, + headers=mock.ANY), + mock.call(self.base_url + path, + data=resolution_data, + headers=mock.ANY)]) + + @mock.patch('itertools.count') + @mock.patch('requests.patch') + def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count): + m_count.return_value = list(range(1, 5)) + path = '/test' + annotations = {'a1': 'v1', 'a2': 'v2'} + resource_version = "123" + new_resource_version = "456" + conflicting_obj = {'metadata': { + 'annotations': annotations, + 'resourceVersion': resource_version}} + actual_obj = {'metadata': { + 'annotations': {'a1': 'v2'}, + 'resourceVersion': new_resource_version}} + conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True) + + m_resp_conflict = mock.MagicMock() + m_resp_conflict.ok = False + m_resp_conflict.status_code = requests.codes.conflict + m_patch.return_value = m_resp_conflict + + with mock.patch.object(self.client, 'get') as m_get: + m_get.return_value = actual_obj + self.assertRaises(exc.K8sClientException, + self.client.annotate, + path, annotations, + resource_version=resource_version) + m_patch.assert_called_once_with(self.base_url + path, + data=conflicting_data, + headers=mock.ANY) + @mock.patch('requests.get') def test_watch(self, m_get): path = '/test'