Resolve 'resourceVersion' conflicts
It is possible for K8s resource to be updated while it is being processed by Kuryr handler. As a result K8sClient will fail to update annotations after the handler finishes its work due to the 'resourceVersion' conflict. This patch updates K8sClient to allow annotation updates with conflicting 'resourceVersion' if the updated annotations do not conflict with the current state (i.e. annotations are either new or have the same value as in the current state of the object). Change-Id: I6db14169bf8d6a8114a991bca9c220e15fccdce4 Co-Authored-By: Antoni Segura Puimedon <antonisp@celebdor.com> Closes-Bug: #1662448
This commit is contained in:
parent
794ec706c5
commit
24b8cc53c2
|
@ -14,12 +14,16 @@
|
|||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
|
||||
from kuryr_kubernetes import exceptions as exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class K8sClient(object):
|
||||
# REVISIT(ivc): replace with python-k8sclient if it could be extended
|
||||
|
@ -29,6 +33,7 @@ class K8sClient(object):
|
|||
self._base_url = base_url
|
||||
|
||||
def get(self, path):
|
||||
LOG.debug("Get %(path)s", {'path': path})
|
||||
url = self._base_url + path
|
||||
response = requests.get(url)
|
||||
if not response.ok:
|
||||
|
@ -36,20 +41,46 @@ class K8sClient(object):
|
|||
return response.json()
|
||||
|
||||
def annotate(self, path, annotations, resource_version=None):
|
||||
"""Pushes a resource annotation to the K8s API resource
|
||||
|
||||
The annotate operation is made with a PATCH HTTP request of kind:
|
||||
application/merge-patch+json as described in:
|
||||
|
||||
https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#patch-operations # noqa
|
||||
"""
|
||||
LOG.debug("Annotate %(path)s: %(names)s", {
|
||||
'path': path, 'names': list(annotations)})
|
||||
url = self._base_url + path
|
||||
data = jsonutils.dumps({
|
||||
"metadata": {
|
||||
"annotations": annotations,
|
||||
"resourceVersion": resource_version,
|
||||
}
|
||||
}, sort_keys=True)
|
||||
response = requests.patch(url, data=data, headers={
|
||||
'Content-Type': 'application/merge-patch+json',
|
||||
'Accept': 'application/json',
|
||||
})
|
||||
if not response.ok:
|
||||
while itertools.count(1):
|
||||
data = jsonutils.dumps({
|
||||
"metadata": {
|
||||
"annotations": annotations,
|
||||
"resourceVersion": resource_version,
|
||||
}
|
||||
}, sort_keys=True)
|
||||
response = requests.patch(url, data=data, headers={
|
||||
'Content-Type': 'application/merge-patch+json',
|
||||
'Accept': 'application/json',
|
||||
})
|
||||
if response.ok:
|
||||
return response.json()['metadata']['annotations']
|
||||
if response.status_code == requests.codes.conflict:
|
||||
resource = self.get(path)
|
||||
new_version = resource['metadata']['resourceVersion']
|
||||
retrieved_annotations = resource['metadata'].get(
|
||||
'annotations', {})
|
||||
|
||||
for k, v in annotations.items():
|
||||
if v != retrieved_annotations.get(k, v):
|
||||
break
|
||||
else:
|
||||
# No conflicting annotations found. Retry patching
|
||||
resource_version = new_version
|
||||
continue
|
||||
LOG.debug("Annotations for %(path)s already present: "
|
||||
"%(names)s", {'path': path,
|
||||
'names': retrieved_annotations})
|
||||
raise exc.K8sClientException(response.text)
|
||||
return response.json()['metadata']['annotations']
|
||||
|
||||
def watch(self, path):
|
||||
params = {'watch': 'true'}
|
||||
|
|
|
@ -17,6 +17,7 @@ import itertools
|
|||
import mock
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
|
||||
from kuryr_kubernetes import exceptions as exc
|
||||
from kuryr_kubernetes import k8s_client
|
||||
|
@ -52,8 +53,10 @@ class TestK8sClient(test_base.TestCase):
|
|||
|
||||
self.assertRaises(exc.K8sClientException, self.client.get, path)
|
||||
|
||||
@mock.patch('itertools.count')
|
||||
@mock.patch('requests.patch')
|
||||
def test_annotate(self, m_patch):
|
||||
def test_annotate(self, m_patch, m_count):
|
||||
m_count.return_value = list(range(1, 5))
|
||||
path = '/test'
|
||||
annotations = {'a1': 'v1', 'a2': 'v2'}
|
||||
resource_version = "123"
|
||||
|
@ -71,8 +74,10 @@ class TestK8sClient(test_base.TestCase):
|
|||
m_patch.assert_called_once_with(self.base_url + path,
|
||||
data=data, headers=mock.ANY)
|
||||
|
||||
@mock.patch('itertools.count')
|
||||
@mock.patch('requests.patch')
|
||||
def test_annotate_exception(self, m_patch):
|
||||
def test_annotate_exception(self, m_patch, m_count):
|
||||
m_count.return_value = list(range(1, 5))
|
||||
path = '/test'
|
||||
|
||||
m_resp = mock.MagicMock()
|
||||
|
@ -82,6 +87,117 @@ class TestK8sClient(test_base.TestCase):
|
|||
self.assertRaises(exc.K8sClientException, self.client.annotate,
|
||||
path, {})
|
||||
|
||||
@mock.patch('itertools.count')
|
||||
@mock.patch('requests.patch')
|
||||
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
|
||||
m_count.return_value = list(range(1, 5))
|
||||
path = '/test'
|
||||
annotations = {'a1': 'v1', 'a2': 'v2'}
|
||||
resource_version = "123"
|
||||
new_resource_version = "456"
|
||||
conflicting_obj = {'metadata': {
|
||||
'annotations': annotations,
|
||||
'resourceVersion': resource_version}}
|
||||
good_obj = {'metadata': {
|
||||
'annotations': annotations,
|
||||
'resourceVersion': new_resource_version}}
|
||||
conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
|
||||
good_data = jsonutils.dumps(good_obj, sort_keys=True)
|
||||
|
||||
m_resp_conflict = mock.MagicMock()
|
||||
m_resp_conflict.ok = False
|
||||
m_resp_conflict.status_code = requests.codes.conflict
|
||||
m_resp_good = mock.MagicMock()
|
||||
m_resp_good.ok = True
|
||||
m_resp_good.json.return_value = conflicting_obj
|
||||
m_patch.side_effect = [m_resp_conflict, m_resp_good]
|
||||
|
||||
with mock.patch.object(self.client, 'get') as m_get:
|
||||
m_get.return_value = good_obj
|
||||
self.assertEqual(annotations, self.client.annotate(
|
||||
path, annotations, resource_version=resource_version))
|
||||
|
||||
m_patch.assert_has_calls([
|
||||
mock.call(self.base_url + path,
|
||||
data=conflicting_data,
|
||||
headers=mock.ANY),
|
||||
mock.call(self.base_url + path,
|
||||
data=good_data,
|
||||
headers=mock.ANY)])
|
||||
|
||||
@mock.patch('itertools.count')
|
||||
@mock.patch('requests.patch')
|
||||
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
|
||||
m_count.return_value = list(range(1, 5))
|
||||
path = '/test'
|
||||
annotations = {'a1': 'v1', 'a2': 'v2'}
|
||||
annotating_resource_version = '123'
|
||||
annotating_obj = {'metadata': {
|
||||
'annotations': annotations,
|
||||
'resourceVersion': annotating_resource_version}}
|
||||
annotating_data = jsonutils.dumps(annotating_obj, sort_keys=True)
|
||||
|
||||
new_resource_version = '456'
|
||||
new_obj = {'metadata': {
|
||||
'resourceVersion': new_resource_version}}
|
||||
|
||||
resolution_obj = annotating_obj.copy()
|
||||
resolution_obj['metadata']['resourceVersion'] = new_resource_version
|
||||
resolution_data = jsonutils.dumps(resolution_obj, sort_keys=True)
|
||||
|
||||
m_resp_conflict = mock.MagicMock()
|
||||
m_resp_conflict.ok = False
|
||||
m_resp_conflict.status_code = requests.codes.conflict
|
||||
m_resp_good = mock.MagicMock()
|
||||
m_resp_good.ok = True
|
||||
m_resp_good.json.return_value = resolution_obj
|
||||
m_patch.side_effect = (m_resp_conflict, m_resp_good)
|
||||
|
||||
with mock.patch.object(self.client, 'get') as m_get:
|
||||
m_get.return_value = new_obj
|
||||
self.assertEqual(annotations, self.client.annotate(
|
||||
path, annotations,
|
||||
resource_version=annotating_resource_version))
|
||||
|
||||
m_patch.assert_has_calls([
|
||||
mock.call(self.base_url + path,
|
||||
data=annotating_data,
|
||||
headers=mock.ANY),
|
||||
mock.call(self.base_url + path,
|
||||
data=resolution_data,
|
||||
headers=mock.ANY)])
|
||||
|
||||
@mock.patch('itertools.count')
|
||||
@mock.patch('requests.patch')
|
||||
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
|
||||
m_count.return_value = list(range(1, 5))
|
||||
path = '/test'
|
||||
annotations = {'a1': 'v1', 'a2': 'v2'}
|
||||
resource_version = "123"
|
||||
new_resource_version = "456"
|
||||
conflicting_obj = {'metadata': {
|
||||
'annotations': annotations,
|
||||
'resourceVersion': resource_version}}
|
||||
actual_obj = {'metadata': {
|
||||
'annotations': {'a1': 'v2'},
|
||||
'resourceVersion': new_resource_version}}
|
||||
conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
|
||||
|
||||
m_resp_conflict = mock.MagicMock()
|
||||
m_resp_conflict.ok = False
|
||||
m_resp_conflict.status_code = requests.codes.conflict
|
||||
m_patch.return_value = m_resp_conflict
|
||||
|
||||
with mock.patch.object(self.client, 'get') as m_get:
|
||||
m_get.return_value = actual_obj
|
||||
self.assertRaises(exc.K8sClientException,
|
||||
self.client.annotate,
|
||||
path, annotations,
|
||||
resource_version=resource_version)
|
||||
m_patch.assert_called_once_with(self.base_url + path,
|
||||
data=conflicting_data,
|
||||
headers=mock.ANY)
|
||||
|
||||
@mock.patch('requests.get')
|
||||
def test_watch(self, m_get):
|
||||
path = '/test'
|
||||
|
|
Loading…
Reference in New Issue