From d80e1bff997df66d025ebb3d222ba6489a1207a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Wed, 29 Jul 2020 17:58:20 +0200 Subject: [PATCH] Support upgrading LBaaSState annotation to KLB CRD On upgrade from version using annotations on Endpoints and Services objects to save information about created Octavia resources, to a version where that information lives in KuryrLoadBalancer CRD we need to make sure that data is converted. Otherwise we can end up with doubled loadbalancers. This commit makes sure data is converted before we try processing any Service or Endpoints resource that has annotations. Change-Id: I01ee5cedc7af8bd02283d065cd9b6f4a94f79888 --- kuryr_kubernetes/controller/drivers/utils.py | 2 +- kuryr_kubernetes/controller/handlers/lbaas.py | 117 +++++++++++++++--- .../controller/handlers/loadbalancer.py | 4 +- kuryr_kubernetes/k8s_client.py | 12 +- kuryr_kubernetes/objects/lbaas.py | 15 +++ kuryr_kubernetes/utils.py | 12 ++ 6 files changed, 140 insertions(+), 22 deletions(-) diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py index 093b5e514..d765bc6d1 100644 --- a/kuryr_kubernetes/controller/drivers/utils.py +++ b/kuryr_kubernetes/controller/drivers/utils.py @@ -317,7 +317,7 @@ def get_kuryrnetworkpolicy_crds(namespace=None): LOG.debug("Returning KuryrNetworkPolicies %s", knps) except k_exc.K8sResourceNotFound: LOG.exception("KuryrNetworkPolicy CRD not found") - raise + return [] except k_exc.K8sClientException: LOG.exception("Kubernetes Client Exception") raise diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 5884a0ef6..00342e4b4 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -15,6 +15,7 @@ from kuryr.lib._i18n import _ from oslo_log import log as logging +from oslo_serialization import jsonutils from kuryr_kubernetes import clients from kuryr_kubernetes import config @@ -47,11 +48,9 @@ class ServiceHandler(k8s_base.ResourceEventHandler): self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() def on_present(self, service): - if self._should_ignore(service): - LOG.debug("Skipping Kubernetes service %s of an unsupported kind " - "or without a selector as Kubernetes does not create " - "an endpoint object for it.", - service['metadata']['name']) + reason = self._should_ignore(service) + if reason: + LOG.debug(reason, service['metadata']['name']) return k8s = clients.get_kubernetes_client() @@ -71,14 +70,26 @@ class ServiceHandler(k8s_base.ResourceEventHandler): spec = service['spec'] return spec.get('type') in SUPPORTED_SERVICE_TYPES + def _has_spec_annotation(self, service): + return (k_const.K8S_ANNOTATION_LBAAS_SPEC in + service['metadata'].get('annotations', {})) + def _get_service_ip(self, service): if self._is_supported_type(service): return service['spec'].get('clusterIP') return None def _should_ignore(self, service): - return (not(self._has_clusterip(service)) or - not(self._is_supported_type(service))) + if not self._has_clusterip(service): + return 'Skipping headless Service %s.' + elif not self._is_supported_type(service): + return 'Skipping service %s of unsupported type.' + elif self._has_spec_annotation(service): + return ('Skipping annotated service %s, waiting for it to be ' + 'converted to KuryrLoadBalancer object and annotation ' + 'removed.') + else: + return None def _patch_service_finalizer(self, service): k8s = clients.get_kubernetes_client() @@ -255,6 +266,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): config.CONF.kubernetes.endpoints_driver_octavia_provider) def on_present(self, endpoints): + if self._move_annotations_to_crd(endpoints): + return + k8s = clients.get_kubernetes_client() loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints) @@ -297,26 +311,27 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): for address in subset.get('addresses', []) if address.get('targetRef', {}).get('kind') == 'Pod') - def _create_crd_spec(self, endpoints): + def _create_crd_spec(self, endpoints, spec=None, status=None): endpoints_name = endpoints['metadata']['name'] namespace = endpoints['metadata']['namespace'] kubernetes = clients.get_kubernetes_client() subsets = endpoints.get('subsets', []) + if not status: + status = {} + if not spec: + spec = {'subsets': subsets} loadbalancer_crd = { 'apiVersion': 'openstack.org/v1', 'kind': 'KuryrLoadBalancer', 'metadata': { 'name': endpoints_name, - 'finalizers': [k_const.KURYRLB_FINALIZER] - }, - 'spec': { - 'subsets': subsets - }, - 'status': { - } - } + 'finalizers': [k_const.KURYRLB_FINALIZER], + }, + 'spec': spec, + 'status': status, + } try: kubernetes.post('{}/{}/kuryrloadbalancers'.format( @@ -360,3 +375,73 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): 'link': ep_link}) link_parts[-2] = 'services' return "/".join(link_parts) + + def _move_annotations_to_crd(self, endpoints): + """Support upgrade from annotations to KuryrLoadBalancer CRD.""" + try: + spec = (endpoints['metadata']['annotations'] + [k_const.K8S_ANNOTATION_LBAAS_SPEC]) + except KeyError: + spec = None + + try: + state = (endpoints['metadata']['annotations'] + [k_const.K8S_ANNOTATION_LBAAS_STATE]) + except KeyError: + state = None + + if not state and not spec: + # No annotations, return + return False + + if state or spec: + if state: + _dict = jsonutils.loads(state) + # This is strongly using the fact that annotation's o.vo + # and CRD has the same structure. + state = obj_lbaas.flatten_object(_dict) + + # Endpoints should always have the spec in the annotation + spec_dict = jsonutils.loads(spec) + spec = obj_lbaas.flatten_object(spec_dict) + + if state and state['service_pub_ip_info'] is None: + del state['service_pub_ip_info'] + for spec_port in spec['ports']: + if not spec_port.get('name'): + del spec_port['name'] + if not spec['lb_ip']: + del spec['lb_ip'] + + try: + self._create_crd_spec(endpoints, spec, state) + except k_exc.ResourceNotReady: + LOG.info('KuryrLoadBalancer CRD %s already exists.', + utils.get_res_unique_name(endpoints)) + except k_exc.K8sClientException: + raise k_exc.ResourceNotReady(endpoints) + + # In this step we only need to make sure all annotations are + # removed. It may happen that the Endpoints only had spec set, + # in which case we just remove it and let the normal flow handle + # creation of the LB. + k8s = clients.get_kubernetes_client() + service_link = utils.get_service_link(endpoints) + to_remove = [ + (endpoints['metadata']['selfLink'], + k_const.K8S_ANNOTATION_LBAAS_SPEC), + (service_link, + k_const.K8S_ANNOTATION_LBAAS_SPEC), + ] + if state: + to_remove.append((endpoints['metadata']['selfLink'], + k_const.K8S_ANNOTATION_LBAAS_STATE)) + + for path, name in to_remove: + try: + k8s.remove_annotations(path, name) + except k_exc.K8sClientException: + LOG.warning('Error removing %s annotation from %s', name, + path) + + return True diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 7ec3769ae..6b24516ca 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -85,7 +85,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): lb_ip = loadbalancer_crd['spec'].get('lb_ip') pub_info = loadbalancer_crd['status'].get( 'service_pub_ip_info') - if pub_info is None: + if pub_info is None and loadbalancer_crd['spec'].get('type'): service_pub_ip_info = ( self._drv_service_pub_ip.acquire_service_pub_ip_info( loadbalancer_crd['spec']['type'], @@ -495,7 +495,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): for l in loadbalancer_crd['status']['listeners']: if l['id'] != pool['listener_id']: continue - for port in loadbalancer_crd['spec'].get('ports'): + for port in loadbalancer_crd['spec'].get('ports', []): if l['port'] == port['port'] and l['protocol'] == port[ 'protocol']: return True diff --git a/kuryr_kubernetes/k8s_client.py b/kuryr_kubernetes/k8s_client.py index 3ea0a6cb0..b9f33ffa5 100644 --- a/kuryr_kubernetes/k8s_client.py +++ b/kuryr_kubernetes/k8s_client.py @@ -173,14 +173,20 @@ class K8sClient(object): self._raise_from_response(response) return response.json().get('status') + def _jsonpatch_escape(self, value): + value = value.replace('~', '~0') + value = value.replace('/', '~1') + return value + def remove_annotations(self, path, annotation_name): + LOG.debug("Remove annotations %(path)s: %(name)s", + {'path': path, 'name': annotation_name}) content_type = 'application/json-patch+json' url, header = self._get_url_and_header(path, content_type) + annotation_name = self._jsonpatch_escape(annotation_name) data = [{'op': 'remove', - 'path': '/metadata/annotations', - 'value': annotation_name}] - + 'path': f'/metadata/annotations/{annotation_name}'}] response = self.session.patch(url, data=jsonutils.dumps(data), headers=header, cert=self.cert, verify=self.verify_server) diff --git a/kuryr_kubernetes/objects/lbaas.py b/kuryr_kubernetes/objects/lbaas.py index 57f668191..51f87633d 100644 --- a/kuryr_kubernetes/objects/lbaas.py +++ b/kuryr_kubernetes/objects/lbaas.py @@ -147,3 +147,18 @@ class LBaaSServiceSpec(k_obj.KuryrK8sObjectBase): 'type': obj_fields.StringField(nullable=True, default=None), 'lb_ip': obj_fields.IPAddressField(nullable=True, default=None), } + + +def flatten_object(ovo_primitive): + if type(ovo_primitive) is dict: + d = {} + for k, v in ovo_primitive['versioned_object.data'].items(): + d[k] = flatten_object(v) + return d + elif type(ovo_primitive) is list: + ls = [] + for v in ovo_primitive: + ls.append(flatten_object(v)) + return ls + else: + return ovo_primitive diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index 52f1d3dd8..0c8328f44 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -379,6 +379,18 @@ def get_endpoints_link(service): return "/".join(link_parts) +def get_service_link(endpoints): + endpoints_link = endpoints['metadata']['selfLink'] + link_parts = endpoints_link.split('/') + + if link_parts[-2] != 'endpoints': + raise exceptions.IntegrityError( + f"Unsupported endpoints link: {endpoints_link}") + link_parts[-2] = 'services' + + return "/".join(link_parts) + + def has_port_changes(service, loadbalancer_crd): if not loadbalancer_crd: return False