# Copyright (c) 2016 Mirantis, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from kuryr.lib._i18n import _ from oslo_log import log as logging from oslo_serialization import jsonutils from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants as k_const from kuryr_kubernetes.controller.drivers import base as drv_base from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes.objects import lbaas as obj_lbaas from kuryr_kubernetes import utils LOG = logging.getLogger(__name__) SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer') class ServiceHandler(k8s_base.ResourceEventHandler): """ServiceHandler handles K8s Service events. ServiceHandler handles K8s Service events and updates related Endpoints with LBaaSServiceSpec when necessary. """ OBJECT_KIND = k_const.K8S_OBJ_SERVICE OBJECT_WATCH_PATH = "%s/%s" % (k_const.K8S_API_BASE, "services") def __init__(self): super(ServiceHandler, self).__init__() self._drv_project = drv_base.ServiceProjectDriver.get_instance() self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() def on_present(self, service): reason = self._should_ignore(service) if reason: LOG.debug(reason, service['metadata']['name']) return k8s = clients.get_kubernetes_client() loadbalancer_crd = k8s.get_loadbalancer_crd(service) try: if not self._patch_service_finalizer(service): return except k_exc.K8sClientException as ex: LOG.exception("Failed to set service finalizer: %s", ex) raise if loadbalancer_crd is None: try: self.create_crd_spec(service) except k_exc.K8sNamespaceTerminating: LOG.warning('Namespace %s is being terminated, ignoring ' 'Service %s in that namespace.', service['metadata']['namespace'], service['metadata']['name']) return elif self._has_lbaas_spec_changes(service, loadbalancer_crd): self._update_crd_spec(loadbalancer_crd, service) def _is_supported_type(self, service): spec = service['spec'] return spec.get('type') in SUPPORTED_SERVICE_TYPES def _has_spec_annotation(self, service): return (k_const.K8S_ANNOTATION_LBAAS_SPEC in service['metadata'].get('annotations', {})) def _get_service_ip(self, service): if self._is_supported_type(service): return service['spec'].get('clusterIP') return None def _should_ignore(self, service): if not self._has_clusterip(service): return 'Skipping headless Service %s.' if not self._is_supported_type(service): return 'Skipping service %s of unsupported type.' if self._has_spec_annotation(service): return ('Skipping annotated service %s, waiting for it to be ' 'converted to KuryrLoadBalancer object and annotation ' 'removed.') if utils.is_kubernetes_default_resource(service): # Avoid to handle default Kubernetes service as requires https. return 'Skipping default service %s.' return None def _patch_service_finalizer(self, service): k8s = clients.get_kubernetes_client() return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER) def on_finalize(self, service): k8s = clients.get_kubernetes_client() svc_name = service['metadata']['name'] svc_namespace = service['metadata']['namespace'] klb_crd_path = (f"{k_const.K8S_API_CRD_NAMESPACES}/" f"{svc_namespace}/kuryrloadbalancers/{svc_name}") try: k8s.delete(klb_crd_path) except k_exc.K8sResourceNotFound: k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) def _has_clusterip(self, service): # ignore headless service, clusterIP is None return service['spec'].get('clusterIP') != 'None' def _get_subnet_id(self, service, project_id, ip): subnets_mapping = self._drv_subnets.get_subnets(service, project_id) subnet_ids = { subnet_id for subnet_id, network in subnets_mapping.items() for subnet in network.subnets.objects if ip in subnet.cidr} if len(subnet_ids) != 1: raise k_exc.IntegrityError(_( "Found %(num)s subnets for service %(link)s IP %(ip)s") % { 'link': utils.get_res_link(service), 'ip': ip, 'num': len(subnet_ids)}) return subnet_ids.pop() def create_crd_spec(self, service): svc_name = service['metadata']['name'] svc_namespace = service['metadata']['namespace'] kubernetes = clients.get_kubernetes_client() spec = self._build_kuryrloadbalancer_spec(service) loadbalancer_crd = { 'apiVersion': 'openstack.org/v1', 'kind': 'KuryrLoadBalancer', 'metadata': { 'name': svc_name, 'finalizers': [k_const.KURYRLB_FINALIZER], }, 'spec': spec, 'status': { } } try: kubernetes.post('{}/{}/kuryrloadbalancers'.format( k_const.K8S_API_CRD_NAMESPACES, svc_namespace), loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(svc_name) except k_exc.K8sNamespaceTerminating: raise except k_exc.K8sClientException: LOG.exception("Exception when creating KuryrLoadBalancer CRD.") raise def _update_crd_spec(self, loadbalancer_crd, service): svc_name = service['metadata']['name'] kubernetes = clients.get_kubernetes_client() spec = self._build_kuryrloadbalancer_spec(service) LOG.debug('Patching KuryrLoadBalancer CRD %s', loadbalancer_crd) try: kubernetes.patch_crd('spec', utils.get_res_link(loadbalancer_crd), spec) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(svc_name) except k_exc.K8sClientException: LOG.exception('Error updating kuryrnet CRD %s', loadbalancer_crd) raise def _build_kuryrloadbalancer_spec(self, service): svc_ip = self._get_service_ip(service) spec_lb_ip = service['spec'].get('loadBalancerIP') ports = service['spec'].get('ports') for port in ports: if type(port['targetPort']) == int: port['targetPort'] = str(port['targetPort']) project_id = self._drv_project.get_project(service) sg_ids = self._drv_sg.get_security_groups(service, project_id) subnet_id = self._get_subnet_id(service, project_id, svc_ip) spec_type = service['spec'].get('type') spec = { 'ip': svc_ip, 'ports': ports, 'project_id': project_id, 'security_groups_ids': sg_ids, 'subnet_id': subnet_id, 'type': spec_type } if spec_lb_ip is not None: spec['lb_ip'] = spec_lb_ip return spec def _has_lbaas_spec_changes(self, service, loadbalancer_crd): return (self._has_ip_changes(service, loadbalancer_crd) or utils.has_port_changes(service, loadbalancer_crd)) def _has_ip_changes(self, service, loadbalancer_crd): link = utils.get_res_link(service) svc_ip = self._get_service_ip(service) if loadbalancer_crd['spec'].get('ip') is None: if svc_ip is None: return False return True elif str(loadbalancer_crd['spec'].get('ip')) != svc_ip: LOG.debug("LBaaS spec IP %(spec_ip)s != %(svc_ip)s for %(link)s" % {'spec_ip': loadbalancer_crd['spec']['ip'], 'svc_ip': svc_ip, 'link': link}) return True return False class EndpointsHandler(k8s_base.ResourceEventHandler): """EndpointsHandler handles K8s Endpoints events. EndpointsHandler handles K8s Endpoints events and tracks changes in LBaaSServiceSpec to update Neutron LBaaS accordingly and to reflect its' actual state in LBaaSState. """ OBJECT_KIND = k_const.K8S_OBJ_ENDPOINTS OBJECT_WATCH_PATH = "%s/%s" % (k_const.K8S_API_BASE, "endpoints") def __init__(self): super(EndpointsHandler, self).__init__() self._drv_lbaas = drv_base.LBaaSDriver.get_instance() # Note(yboaron) LBaaS driver supports 'provider' parameter in # Load Balancer creation flow. # We need to set the requested load balancer provider # according to 'endpoints_driver_octavia_provider' configuration. self._lb_provider = None if self._drv_lbaas.providers_supported(): self._lb_provider = 'amphora' if (config.CONF.kubernetes.endpoints_driver_octavia_provider != 'default'): self._lb_provider = ( config.CONF.kubernetes.endpoints_driver_octavia_provider) def on_present(self, endpoints): ep_name = endpoints['metadata']['name'] ep_namespace = endpoints['metadata']['namespace'] if self._move_annotations_to_crd(endpoints): return k8s = clients.get_kubernetes_client() loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints) if (not (self._has_pods(endpoints) or (loadbalancer_crd and loadbalancer_crd.get('status'))) or k_const.K8S_ANNOTATION_HEADLESS_SERVICE in endpoints['metadata'].get('labels', []) or utils.is_kubernetes_default_resource(endpoints)): LOG.debug("Ignoring Kubernetes endpoints %s", endpoints['metadata']['name']) return if loadbalancer_crd is None: try: self._create_crd_spec(endpoints) except k_exc.K8sNamespaceTerminating: LOG.warning('Namespace %s is being terminated, ignoring ' 'Endpoints %s in that namespace.', ep_namespace, ep_name) return else: self._update_crd_spec(loadbalancer_crd, endpoints) def _has_pods(self, endpoints): ep_subsets = endpoints.get('subsets', []) if not ep_subsets: return False return any(True for subset in ep_subsets if subset.get('addresses', [])) def _convert_subsets_to_endpointslice(self, endpoints_obj): endpointslices = [] endpoints = [] subsets = endpoints_obj.get('subsets', []) for subset in subsets: addresses = subset.get('addresses', []) ports = subset.get('ports', []) for address in addresses: ip = address.get('ip') targetRef = address.get('targetRef') endpoint = { 'addresses': [ip], 'conditions': { 'ready': True }, } if targetRef: endpoint['targetRef'] = targetRef endpoints.append(endpoint) endpointslices.append({ 'endpoints': endpoints, 'ports': ports, }) return endpointslices def _create_crd_spec(self, endpoints, spec=None, status=None): endpoints_name = endpoints['metadata']['name'] namespace = endpoints['metadata']['namespace'] kubernetes = clients.get_kubernetes_client() # TODO(maysams): Remove the convertion once we start handling # Endpoint slices. epslices = self._convert_subsets_to_endpointslice(endpoints) if not status: status = {} if not spec: spec = {'endpointSlices': epslices} # NOTE(maysams): As the spec may already contain a # ports field from the Service, a new endpointslice # field is introduced to also hold ports from the # Endpoints under the spec. loadbalancer_crd = { 'apiVersion': 'openstack.org/v1', 'kind': 'KuryrLoadBalancer', 'metadata': { 'name': endpoints_name, 'finalizers': [k_const.KURYRLB_FINALIZER], }, 'spec': spec, 'status': status, } if self._lb_provider: loadbalancer_crd['spec']['provider'] = self._lb_provider try: kubernetes.post('{}/{}/kuryrloadbalancers'.format( k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(loadbalancer_crd) except k_exc.K8sNamespaceTerminating: raise except k_exc.K8sClientException: LOG.exception("Exception when creating KuryrLoadBalancer CRD.") raise def _update_crd_spec(self, loadbalancer_crd, endpoints): kubernetes = clients.get_kubernetes_client() # TODO(maysams): Remove the convertion once we start handling # Endpoint slices. epslices = self._convert_subsets_to_endpointslice(endpoints) spec = {'endpointSlices': epslices} if self._lb_provider: spec['provider'] = self._lb_provider try: kubernetes.patch_crd( 'spec', utils.get_res_link(loadbalancer_crd), spec) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(loadbalancer_crd) except k_exc.K8sClientException: LOG.exception('Error updating KuryrLoadbalancer CRD %s', loadbalancer_crd) raise def _move_annotations_to_crd(self, endpoints): """Support upgrade from annotations to KuryrLoadBalancer CRD.""" try: spec = (endpoints['metadata']['annotations'] [k_const.K8S_ANNOTATION_LBAAS_SPEC]) except KeyError: spec = None try: state = (endpoints['metadata']['annotations'] [k_const.K8S_ANNOTATION_LBAAS_STATE]) except KeyError: state = None if not state and not spec: # No annotations, return return False if state or spec: if state: _dict = jsonutils.loads(state) # This is strongly using the fact that annotation's o.vo # and CRD has the same structure. state = obj_lbaas.flatten_object(_dict) # Endpoints should always have the spec in the annotation spec_dict = jsonutils.loads(spec) spec = obj_lbaas.flatten_object(spec_dict) if state and state['service_pub_ip_info'] is None: del state['service_pub_ip_info'] for spec_port in spec['ports']: if not spec_port.get('name'): del spec_port['name'] if not spec['lb_ip']: del spec['lb_ip'] try: self._create_crd_spec(endpoints, spec, state) except k_exc.ResourceNotReady: LOG.info('KuryrLoadBalancer CRD %s already exists.', utils.get_res_unique_name(endpoints)) except k_exc.K8sClientException: raise k_exc.ResourceNotReady(endpoints) # In this step we only need to make sure all annotations are # removed. It may happen that the Endpoints only had spec set, # in which case we just remove it and let the normal flow handle # creation of the LB. k8s = clients.get_kubernetes_client() service_link = utils.get_service_link(endpoints) to_remove = [ (utils.get_res_link(endpoints), k_const.K8S_ANNOTATION_LBAAS_SPEC), (service_link, k_const.K8S_ANNOTATION_LBAAS_SPEC), ] if state: to_remove.append((utils.get_res_link(endpoints), k_const.K8S_ANNOTATION_LBAAS_STATE)) for path, name in to_remove: try: k8s.remove_annotations(path, name) except k_exc.K8sClientException: LOG.warning('Error removing %s annotation from %s', name, path) return True