Support upgrading LBaaSState annotation to KLB CRD
On upgrade from a version that used annotations on Endpoints and Services objects to save information about created Octavia resources, to a version where that information lives in the KuryrLoadBalancer CRD, we need to make sure the data is converted. Otherwise we can end up with duplicated load balancers. This commit makes sure the data is converted before we try processing any Service or Endpoints resource that has the annotations. Change-Id: I01ee5cedc7af8bd02283d065cd9b6f4a94f79888
This commit is contained in:
parent
ddb5895311
commit
d80e1bff99
@ -317,7 +317,7 @@ def get_kuryrnetworkpolicy_crds(namespace=None):
|
||||
LOG.debug("Returning KuryrNetworkPolicies %s", knps)
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.exception("KuryrNetworkPolicy CRD not found")
|
||||
raise
|
||||
return []
|
||||
except k_exc.K8sClientException:
|
||||
LOG.exception("Kubernetes Client Exception")
|
||||
raise
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
from kuryr.lib._i18n import _
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes import config
|
||||
@ -47,11 +48,9 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
|
||||
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
|
||||
|
||||
def on_present(self, service):
|
||||
if self._should_ignore(service):
|
||||
LOG.debug("Skipping Kubernetes service %s of an unsupported kind "
|
||||
"or without a selector as Kubernetes does not create "
|
||||
"an endpoint object for it.",
|
||||
service['metadata']['name'])
|
||||
reason = self._should_ignore(service)
|
||||
if reason:
|
||||
LOG.debug(reason, service['metadata']['name'])
|
||||
return
|
||||
|
||||
k8s = clients.get_kubernetes_client()
|
||||
@ -71,14 +70,26 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
|
||||
spec = service['spec']
|
||||
return spec.get('type') in SUPPORTED_SERVICE_TYPES
|
||||
|
||||
def _has_spec_annotation(self, service):
    """Tell whether the Service still carries the legacy LBaaS spec annotation."""
    annotations = service['metadata'].get('annotations', {})
    return k_const.K8S_ANNOTATION_LBAAS_SPEC in annotations
|
||||
|
||||
def _get_service_ip(self, service):
    """Return the Service clusterIP, or None when its type is unsupported."""
    if not self._is_supported_type(service):
        return None
    return service['spec'].get('clusterIP')
|
||||
|
||||
def _should_ignore(self, service):
    """Decide whether this Service event should be skipped.

    Returns a log format string (expecting the service name as its
    single ``%s`` argument) describing why the Service is ignored,
    or None when the Service should be processed. Annotated Services
    are skipped so the upgrade path can first convert the annotations
    into a KuryrLoadBalancer object.
    """
    # NOTE: the stale pre-refactor boolean return that preceded this
    # chain made the code below unreachable; it has been removed.
    if not self._has_clusterip(service):
        return 'Skipping headless Service %s.'
    elif not self._is_supported_type(service):
        return 'Skipping service %s of unsupported type.'
    elif self._has_spec_annotation(service):
        return ('Skipping annotated service %s, waiting for it to be '
                'converted to KuryrLoadBalancer object and annotation '
                'removed.')
    else:
        return None
|
||||
|
||||
def _patch_service_finalizer(self, service):
|
||||
k8s = clients.get_kubernetes_client()
|
||||
@ -255,6 +266,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
||||
config.CONF.kubernetes.endpoints_driver_octavia_provider)
|
||||
|
||||
def on_present(self, endpoints):
|
||||
if self._move_annotations_to_crd(endpoints):
|
||||
return
|
||||
|
||||
k8s = clients.get_kubernetes_client()
|
||||
loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints)
|
||||
|
||||
@ -297,26 +311,27 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
||||
for address in subset.get('addresses', [])
|
||||
if address.get('targetRef', {}).get('kind') == 'Pod')
|
||||
|
||||
def _create_crd_spec(self, endpoints):
|
||||
def _create_crd_spec(self, endpoints, spec=None, status=None):
|
||||
endpoints_name = endpoints['metadata']['name']
|
||||
namespace = endpoints['metadata']['namespace']
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
|
||||
subsets = endpoints.get('subsets', [])
|
||||
if not status:
|
||||
status = {}
|
||||
if not spec:
|
||||
spec = {'subsets': subsets}
|
||||
|
||||
loadbalancer_crd = {
|
||||
'apiVersion': 'openstack.org/v1',
|
||||
'kind': 'KuryrLoadBalancer',
|
||||
'metadata': {
|
||||
'name': endpoints_name,
|
||||
'finalizers': [k_const.KURYRLB_FINALIZER]
|
||||
},
|
||||
'spec': {
|
||||
'subsets': subsets
|
||||
},
|
||||
'status': {
|
||||
}
|
||||
}
|
||||
'finalizers': [k_const.KURYRLB_FINALIZER],
|
||||
},
|
||||
'spec': spec,
|
||||
'status': status,
|
||||
}
|
||||
|
||||
try:
|
||||
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
|
||||
@ -360,3 +375,73 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
||||
'link': ep_link})
|
||||
link_parts[-2] = 'services'
|
||||
return "/".join(link_parts)
|
||||
|
||||
def _move_annotations_to_crd(self, endpoints):
    """Support upgrade from annotations to KuryrLoadBalancer CRD.

    Older versions saved information about created Octavia resources
    in annotations on the Endpoints and Services objects. This
    converts those annotations into a KuryrLoadBalancer CRD and then
    removes them, so the resource is not processed twice (which could
    end up creating duplicated load balancers).

    :param endpoints: dict representing the K8s Endpoints object
    :return: True when annotations were found and converted (the
             caller should stop processing this event and wait for
             the CRD events), False when there was nothing to convert.
    :raises k_exc.ResourceNotReady: when creating the CRD failed with
            a client error, to get the event retried.
    """
    annotations = endpoints.get('metadata', {}).get('annotations', {})
    spec = annotations.get(k_const.K8S_ANNOTATION_LBAAS_SPEC)
    state = annotations.get(k_const.K8S_ANNOTATION_LBAAS_STATE)

    if not state and not spec:
        # No annotations, nothing to convert.
        return False

    # NOTE: the original `if state or spec:` guard here was always
    # true after the early return above, so it has been removed.
    if state:
        _dict = jsonutils.loads(state)
        # This is strongly using the fact that annotation's o.vo
        # and CRD has the same structure.
        state = obj_lbaas.flatten_object(_dict)

    # Endpoints should always have the spec in the annotation
    spec_dict = jsonutils.loads(spec)
    spec = obj_lbaas.flatten_object(spec_dict)

    # Drop fields the CRD schema does not accept as null/empty.
    if state and state['service_pub_ip_info'] is None:
        del state['service_pub_ip_info']
    for spec_port in spec['ports']:
        if not spec_port.get('name'):
            del spec_port['name']
    if not spec['lb_ip']:
        del spec['lb_ip']

    try:
        self._create_crd_spec(endpoints, spec, state)
    except k_exc.ResourceNotReady:
        LOG.info('KuryrLoadBalancer CRD %s already exists.',
                 utils.get_res_unique_name(endpoints))
    except k_exc.K8sClientException:
        raise k_exc.ResourceNotReady(endpoints)

    # In this step we only need to make sure all annotations are
    # removed. It may happen that the Endpoints only had spec set,
    # in which case we just remove it and let the normal flow handle
    # creation of the LB.
    k8s = clients.get_kubernetes_client()
    service_link = utils.get_service_link(endpoints)
    to_remove = [
        (endpoints['metadata']['selfLink'],
         k_const.K8S_ANNOTATION_LBAAS_SPEC),
        (service_link,
         k_const.K8S_ANNOTATION_LBAAS_SPEC),
    ]
    if state:
        to_remove.append((endpoints['metadata']['selfLink'],
                          k_const.K8S_ANNOTATION_LBAAS_STATE))

    # Best-effort removal: a failure here must not fail the event.
    for path, name in to_remove:
        try:
            k8s.remove_annotations(path, name)
        except k_exc.K8sClientException:
            LOG.warning('Error removing %s annotation from %s', name,
                        path)

    return True
|
||||
|
@ -85,7 +85,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
lb_ip = loadbalancer_crd['spec'].get('lb_ip')
|
||||
pub_info = loadbalancer_crd['status'].get(
|
||||
'service_pub_ip_info')
|
||||
if pub_info is None:
|
||||
if pub_info is None and loadbalancer_crd['spec'].get('type'):
|
||||
service_pub_ip_info = (
|
||||
self._drv_service_pub_ip.acquire_service_pub_ip_info(
|
||||
loadbalancer_crd['spec']['type'],
|
||||
@ -495,7 +495,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||
for l in loadbalancer_crd['status']['listeners']:
|
||||
if l['id'] != pool['listener_id']:
|
||||
continue
|
||||
for port in loadbalancer_crd['spec'].get('ports'):
|
||||
for port in loadbalancer_crd['spec'].get('ports', []):
|
||||
if l['port'] == port['port'] and l['protocol'] == port[
|
||||
'protocol']:
|
||||
return True
|
||||
|
@ -173,14 +173,20 @@ class K8sClient(object):
|
||||
self._raise_from_response(response)
|
||||
return response.json().get('status')
|
||||
|
||||
def _jsonpatch_escape(self, value):
|
||||
value = value.replace('~', '~0')
|
||||
value = value.replace('/', '~1')
|
||||
return value
|
||||
|
||||
def remove_annotations(self, path, annotation_name):
|
||||
LOG.debug("Remove annotations %(path)s: %(name)s",
|
||||
{'path': path, 'name': annotation_name})
|
||||
content_type = 'application/json-patch+json'
|
||||
url, header = self._get_url_and_header(path, content_type)
|
||||
annotation_name = self._jsonpatch_escape(annotation_name)
|
||||
|
||||
data = [{'op': 'remove',
|
||||
'path': '/metadata/annotations',
|
||||
'value': annotation_name}]
|
||||
|
||||
'path': f'/metadata/annotations/{annotation_name}'}]
|
||||
response = self.session.patch(url, data=jsonutils.dumps(data),
|
||||
headers=header, cert=self.cert,
|
||||
verify=self.verify_server)
|
||||
|
@ -147,3 +147,18 @@ class LBaaSServiceSpec(k_obj.KuryrK8sObjectBase):
|
||||
'type': obj_fields.StringField(nullable=True, default=None),
|
||||
'lb_ip': obj_fields.IPAddressField(nullable=True, default=None),
|
||||
}
|
||||
|
||||
|
||||
def flatten_object(ovo_primitive):
    """Recursively unwrap an o.vo primitive into plain dicts/lists.

    Oslo versioned object primitives carry their fields under a
    'versioned_object.data' key. This strips those wrappers at every
    level so the result matches the structure expected by the
    KuryrLoadBalancer CRD (the annotation o.vo and the CRD share the
    same field layout).

    :param ovo_primitive: an o.vo primitive dict, a list of them, or a
                          plain scalar value.
    :return: the same data with every o.vo wrapper removed.
    """
    # isinstance instead of `type(...) is ...` also accepts dict/list
    # subclasses, which behave identically here.
    if isinstance(ovo_primitive, dict):
        # Every dict at this level is an o.vo primitive wrapper.
        return {k: flatten_object(v)
                for k, v in ovo_primitive['versioned_object.data'].items()}
    if isinstance(ovo_primitive, list):
        return [flatten_object(v) for v in ovo_primitive]
    return ovo_primitive
|
||||
|
@ -379,6 +379,18 @@ def get_endpoints_link(service):
|
||||
return "/".join(link_parts)
|
||||
|
||||
|
||||
def get_service_link(endpoints):
    """Derive the Service selfLink from an Endpoints object's selfLink.

    The two resources share their path except for the collection
    segment ('endpoints' vs 'services').

    :raises exceptions.IntegrityError: when the Endpoints selfLink does
            not have 'endpoints' as its collection segment.
    """
    endpoints_link = endpoints['metadata']['selfLink']
    link_parts = endpoints_link.split('/')

    if link_parts[-2] != 'endpoints':
        raise exceptions.IntegrityError(
            f"Unsupported endpoints link: {endpoints_link}")

    return '/'.join(link_parts[:-2] + ['services'] + link_parts[-1:])
|
||||
|
||||
|
||||
def has_port_changes(service, loadbalancer_crd):
|
||||
if not loadbalancer_crd:
|
||||
return False
|
||||
|
Loading…
Reference in New Issue
Block a user