diff --git a/kuryr_kubernetes/clients.py b/kuryr_kubernetes/clients.py index 893324890..cb5207193 100644 --- a/kuryr_kubernetes/clients.py +++ b/kuryr_kubernetes/clients.py @@ -50,7 +50,7 @@ def get_loadbalancer_client(): return get_openstacksdk().load_balancer -def get_kubernetes_client(): +def get_kubernetes_client() -> k8s_client.K8sClient: return _clients[_KUBERNETES_CLIENT] diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 543ccfbd1..046897785 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -26,6 +26,7 @@ from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes import utils LOG = logging.getLogger(__name__) +CONF = config.CONF SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer') @@ -45,6 +46,15 @@ class ServiceHandler(k8s_base.ResourceEventHandler): self._drv_project = drv_base.ServiceProjectDriver.get_instance() self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() + self._drv_lbaas = drv_base.LBaaSDriver.get_instance() + self.k8s = clients.get_kubernetes_client() + + self._lb_provider = None + if self._drv_lbaas.providers_supported(): + self._lb_provider = 'amphora' + config_provider = CONF.kubernetes.endpoints_driver_octavia_provider + if config_provider != 'default': + self._lb_provider = config_provider def _bump_network_policies(self, svc): if driver_utils.is_network_policy_enabled(): @@ -53,16 +63,21 @@ class ServiceHandler(k8s_base.ResourceEventHandler): def on_present(self, service, *args, **kwargs): reason = self._should_ignore(service) if reason: - LOG.debug(reason, service['metadata']['name']) + reason %= utils.get_res_unique_name(service) + LOG.debug(reason) + self.k8s.add_event(service, 'KuryrServiceSkipped', reason) return - k8s = clients.get_kubernetes_client() - loadbalancer_crd = k8s.get_loadbalancer_crd(service) + loadbalancer_crd = self.k8s.get_loadbalancer_crd(service) try: if not self._patch_service_finalizer(service): return except k_exc.K8sClientException as ex: - LOG.exception("Failed to set service finalizer: %s", ex) + msg = (f'K8s API error when adding finalizer to Service ' + f'{utils.get_res_unique_name(service)}') + LOG.exception(msg) + self.k8s.add_event(service, 'KuryrAddServiceFinalizerError', + f'{msg}: {ex}', 'Warning') raise if loadbalancer_crd is None: @@ -108,20 +123,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler): return None def _patch_service_finalizer(self, service): - k8s = clients.get_kubernetes_client() - return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER) + return self.k8s.add_finalizer(service, k_const.SERVICE_FINALIZER) def on_finalize(self, service, *args, **kwargs): - k8s = clients.get_kubernetes_client() - klb_crd_path = utils.get_klb_crd_path(service) # Bump all the NPs in the namespace to force SG rules # recalculation. self._bump_network_policies(service) try: - k8s.delete(klb_crd_path) + self.k8s.delete(klb_crd_path) except k_exc.K8sResourceNotFound: - k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) + self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) def _has_clusterip(self, service): # ignore headless service, clusterIP is None @@ -149,17 +161,25 @@ class ServiceHandler(k8s_base.ResourceEventHandler): svc_namespace = service['metadata']['namespace'] kubernetes = clients.get_kubernetes_client() spec = self._build_kuryrloadbalancer_spec(service) + + owner_reference = { + 'apiVersion': service['apiVersion'], + 'kind': service['kind'], + 'name': service['metadata']['name'], + 'uid': service['metadata']['uid'], + } + loadbalancer_crd = { 'apiVersion': 'openstack.org/v1', 'kind': 'KuryrLoadBalancer', 'metadata': { 'name': svc_name, 'finalizers': [k_const.KURYRLB_FINALIZER], - }, + 'ownerReferences': [owner_reference], + }, 'spec': spec, - 'status': { - } - } + 'status': {}, + } try: kubernetes.post('{}/{}/kuryrloadbalancers'.format( @@ -169,8 +189,12 @@ class ServiceHandler(k8s_base.ResourceEventHandler): raise k_exc.ResourceNotReady(svc_name) except k_exc.K8sNamespaceTerminating: raise - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception("Exception when creating KuryrLoadBalancer CRD.") + self.k8s.add_event( + service, 'CreateKLBFailed', + 'Error when creating KuryrLoadBalancer object: %s' % e, + 'Warning') raise def _update_crd_spec(self, loadbalancer_crd, service): @@ -185,13 +209,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler): LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(svc_name) - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception('Error updating kuryrnet CRD %s', loadbalancer_crd) + self.k8s.add_event( + service, 'UpdateKLBFailed', + 'Error when updating KuryrLoadBalancer object: %s' % e, + 'Warning') raise def _get_data_timeout_annotation(self, service): - default_timeout_cli = config.CONF.octavia_defaults.timeout_client_data - default_timeout_mem = config.CONF.octavia_defaults.timeout_member_data + default_timeout_cli = CONF.octavia_defaults.timeout_client_data + default_timeout_mem = CONF.octavia_defaults.timeout_member_data try: annotations = service['metadata']['annotations'] except KeyError: @@ -220,13 +248,16 @@ class ServiceHandler(k8s_base.ResourceEventHandler): subnet_id = self._get_subnet_id(service, project_id, svc_ip) spec_type = service['spec'].get('type') spec = { - 'ip': svc_ip, - 'ports': ports, - 'project_id': project_id, - 'security_groups_ids': sg_ids, - 'subnet_id': subnet_id, - 'type': spec_type - } + 'ip': svc_ip, + 'ports': ports, + 'project_id': project_id, + 'security_groups_ids': sg_ids, + 'subnet_id': subnet_id, + 'type': spec_type, + } + + if self._lb_provider: + spec['provider'] = self._lb_provider if spec_lb_ip is not None: spec['lb_ip'] = spec_lb_ip @@ -288,25 +319,13 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): def __init__(self): super(EndpointsHandler, self).__init__() - self._drv_lbaas = drv_base.LBaaSDriver.get_instance() - # Note(yboaron) LBaaS driver supports 'provider' parameter in - # Load Balancer creation flow. - # We need to set the requested load balancer provider - # according to 'endpoints_driver_octavia_provider' configuration. - self._lb_provider = None - if self._drv_lbaas.providers_supported(): - self._lb_provider = 'amphora' - if (config.CONF.kubernetes.endpoints_driver_octavia_provider - != 'default'): - self._lb_provider = ( - config.CONF.kubernetes.endpoints_driver_octavia_provider) + self.k8s = clients.get_kubernetes_client() def on_present(self, endpoints, *args, **kwargs): ep_name = endpoints['metadata']['name'] ep_namespace = endpoints['metadata']['namespace'] - k8s = clients.get_kubernetes_client() - loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints) + loadbalancer_crd = self.k8s.get_loadbalancer_crd(endpoints) if (not (self._has_pods(endpoints) or (loadbalancer_crd and loadbalancer_crd.get('status'))) @@ -318,15 +337,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): return if loadbalancer_crd is None: + raise k_exc.KuryrLoadBalancerNotCreated(endpoints) + else: try: - self._create_crd_spec(endpoints) + self._update_crd_spec(loadbalancer_crd, endpoints) except k_exc.K8sNamespaceTerminating: LOG.warning('Namespace %s is being terminated, ignoring ' 'Endpoints %s in that namespace.', ep_namespace, ep_name) - return - else: - self._update_crd_spec(loadbalancer_crd, endpoints) def on_deleted(self, endpoints, *args, **kwargs): self._remove_endpoints(endpoints) @@ -365,84 +383,52 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): return endpointslices - def _create_crd_spec(self, endpoints, spec=None, status=None): - endpoints_name = endpoints['metadata']['name'] - namespace = endpoints['metadata']['namespace'] - kubernetes = clients.get_kubernetes_client() - - # TODO(maysams): Remove the convertion once we start handling - # Endpoint slices. - epslices = self._convert_subsets_to_endpointslice(endpoints) - if not status: - status = {} - if not spec: - spec = {'endpointSlices': epslices} - - # NOTE(maysams): As the spec may already contain a - # ports field from the Service, a new endpointslice - # field is introduced to also hold ports from the - # Endpoints under the spec. - loadbalancer_crd = { - 'apiVersion': 'openstack.org/v1', - 'kind': 'KuryrLoadBalancer', - 'metadata': { - 'name': endpoints_name, - 'finalizers': [k_const.KURYRLB_FINALIZER], - }, - 'spec': spec, - 'status': status, - } - - if self._lb_provider: - loadbalancer_crd['spec']['provider'] = self._lb_provider - + def _add_event(self, endpoints, reason, message, type_=None): + """_add_event adds an event for the corresponding Service.""" try: - kubernetes.post('{}/{}/kuryrloadbalancers'.format( - k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd) - except k_exc.K8sConflict: - raise k_exc.ResourceNotReady(loadbalancer_crd) - except k_exc.K8sNamespaceTerminating: - raise + service = self.k8s.get(utils.get_service_link(endpoints)) except k_exc.K8sClientException: - LOG.exception("Exception when creating KuryrLoadBalancer CRD.") - raise + LOG.debug('Error when fetching Service to add an event %s, ' + 'ignoring', utils.get_res_unique_name(endpoints)) + return + kwargs = {'type_': type_} if type_ else {} + self.k8s.add_event(service, reason, message, **kwargs) def _update_crd_spec(self, loadbalancer_crd, endpoints): - kubernetes = clients.get_kubernetes_client() - # TODO(maysams): Remove the convertion once we start handling - # Endpoint slices. + # TODO(maysams): Remove the conversion once we start handling + # EndpointSlices. epslices = self._convert_subsets_to_endpointslice(endpoints) - spec = {'endpointSlices': epslices} - if self._lb_provider: - spec['provider'] = self._lb_provider try: - kubernetes.patch_crd( - 'spec', - utils.get_res_link(loadbalancer_crd), - spec) + self.k8s.patch_crd('spec', utils.get_res_link(loadbalancer_crd), + {'endpointSlices': epslices}) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(loadbalancer_crd) - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception('Error updating KuryrLoadbalancer CRD %s', loadbalancer_crd) + self._add_event( + endpoints, 'UpdateKLBFailed', + 'Error when updating KuryrLoadBalancer object: %s' % e, + 'Warning') raise return True def _remove_endpoints(self, endpoints): - kubernetes = clients.get_kubernetes_client() lb_name = endpoints['metadata']['name'] try: - kubernetes.patch_crd('spec', - utils.get_klb_crd_path(endpoints), - 'endpointSlices', - action='remove') + self.k8s.patch_crd('spec', utils.get_klb_crd_path(endpoints), + 'endpointSlices', action='remove') except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer CRD not found %s', lb_name) except k_exc.K8sUnprocessableEntity: LOG.warning('KuryrLoadBalancer %s modified, ignoring.', lb_name) - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception('Error updating KuryrLoadBalancer CRD %s', lb_name) + self._add_event( + endpoints, 'UpdateKLBFailed', + 'Error when updating KuryrLoadBalancer object: %s' % e, + 'Warning') raise diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 5d955ab5b..1c52b422c 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -53,26 +53,56 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance() + self.k8s = clients.get_kubernetes_client() def _get_nodes_subnets(self): return utils.get_subnets_id_cidrs( self._drv_nodes_subnets.get_nodes_subnets()) + def _add_event(self, klb, reason, message, type_=None): + """_add_event adds an event for the corresponding Service.""" + klb_meta = klb['metadata'] + for ref in klb_meta.get('ownerReferences', []): + # "mock" a Service based on ownerReference to it. + if ref['kind'] == 'Service' and ref['name'] == klb_meta['name']: + service = { + 'apiVersion': ref['apiVersion'], + 'kind': ref['kind'], + 'metadata': { + 'name': ref['name'], + 'uid': ref['uid'], + 'namespace': klb_meta['namespace'], # ref shares ns + }, + } + break + else: + # No reference, just fetch the service from the API. + try: + service = self.k8s.get( + f"{k_const.K8S_API_NAMESPACES}/{klb_meta['namespace']}" + f"/services/{klb_meta['name']}") + except k_exc.K8sClientException: + LOG.debug('Error when fetching Service to add an event %s, ' + 'ignoring', utils.get_res_unique_name(klb)) + return + kwargs = {'type_': type_} if type_ else {} + self.k8s.add_event(service, reason, message, **kwargs) + def on_present(self, loadbalancer_crd, *args, **kwargs): if loadbalancer_crd.get('status', None) is None: - kubernetes = clients.get_kubernetes_client() try: - kubernetes.patch_crd('status', - utils.get_res_link(loadbalancer_crd), - {}) + self.k8s.patch_crd('status', + utils.get_res_link(loadbalancer_crd), {}) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadbalancer CRD not found %s', utils.get_res_unique_name(loadbalancer_crd)) return - if self._should_ignore(loadbalancer_crd): - LOG.debug("Ignoring Kubernetes service %s", - loadbalancer_crd['metadata']['name']) + reason = self._should_ignore(loadbalancer_crd) + if reason: + reason %= utils.get_res_unique_name(loadbalancer_crd) + LOG.debug(reason) + self._add_event(loadbalancer_crd, 'KuryrServiceSkipped', reason) return crd_lb = loadbalancer_crd['status'].get('loadbalancer') @@ -83,15 +113,34 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS: if (spec_lb_provider and spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS): + self._add_event(loadbalancer_crd, 'KuryrUpdateProvider', + 'Deleting Amphora load balancer to ' + 'recreate it with OVN provider') self._ensure_release_lbaas(loadbalancer_crd) # ovn to amphora downgrade elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS: if (not spec_lb_provider or spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS): + self._add_event(loadbalancer_crd, 'KuryrUpdateProvider', + 'Deleting OVN load balancer to ' + 'recreate it with Amphora provider') self._ensure_release_lbaas(loadbalancer_crd) - if self._sync_lbaas_members(loadbalancer_crd): + if not crd_lb: + self._add_event(loadbalancer_crd, 'KuryrEnsureLB', + 'Provisioning a load balancer') + try: + changed = self._sync_lbaas_members(loadbalancer_crd) + except Exception as e: + self._add_event( + loadbalancer_crd, 'KuryrEnsureLBError', + f'Error when provisioning load balancer: {e}', 'Warning') + raise + + if changed: + self._add_event(loadbalancer_crd, 'KuryrEnsuredLB', + 'Load balancer provisioned') # Note(yboaron) For LoadBalancer services, we should allocate FIP, # associate it to LB VIP and update K8S service status lb_ip = loadbalancer_crd['spec'].get('lb_ip') @@ -106,6 +155,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): loadbalancer_crd['status']['loadbalancer'][ 'port_id'])) if service_pub_ip_info: + self._add_event( + loadbalancer_crd, 'KuryrEnsureFIP', + 'Associating floating IP to the load balancer') self._drv_service_pub_ip.associate_pub_ip( service_pub_ip_info, loadbalancer_crd['status'][ 'loadbalancer']['port_id']) @@ -128,10 +180,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds): LOG.debug("Reconciling the loadbalancer CRDs") # get the loadbalancers id in the CRD status - crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get( - 'loadbalancer', {}).get('id', {}), 'selflink': - utils.get_res_link(loadbalancer_crd)} for - loadbalancer_crd in loadbalancer_crds] + crd_loadbalancer_ids = [ + {'id': loadbalancer_crd.get('status', {}).get( + 'loadbalancer', {}).get('id', {}), + 'selflink': utils.get_res_link(loadbalancer_crd), + 'klb': loadbalancer_crd} for + loadbalancer_crd in loadbalancer_crds] lbaas = clients.get_loadbalancer_client() lbaas_spec = {} self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) @@ -140,21 +194,24 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): for loadbalancer in loadbalancers] # for each loadbalancer id in the CRD status, check if exists in # OpenStack - crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in - crd_loadbalancer_ids if - crd_lb['id'] not in loadbalancers_id] - if not crds_to_reconcile_selflink: + crds_to_reconcile = [crd_lb for crd_lb in crd_loadbalancer_ids + if crd_lb['id'] not in loadbalancers_id] + if not crds_to_reconcile: LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack") return LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r", - crds_to_reconcile_selflink) - self._reconcile_lbaas(crds_to_reconcile_selflink) + [klb['id'] for klb in crds_to_reconcile]) + self._reconcile_lbaas(crds_to_reconcile) - def _reconcile_lbaas(self, crds_to_reconcile_selflink): - kubernetes = clients.get_kubernetes_client() - for selflink in crds_to_reconcile_selflink: + def _reconcile_lbaas(self, crds_to_reconcile): + for entry in crds_to_reconcile: + selflink = entry['selflink'] try: - kubernetes.patch_crd('status', selflink, {}) + self._add_event( + entry['klb'], 'LoadBalancerRecreating', + 'Load balancer for the Service seems to not exist anymore.' + ' Recreating it.', 'Warning') + self.k8s.patch_crd('status', selflink, {}) except k_exc.K8sResourceNotFound: LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s', selflink) @@ -165,9 +222,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): continue def _should_ignore(self, loadbalancer_crd): - return (not(self._has_endpoints(loadbalancer_crd) or - loadbalancer_crd.get('status')) or not - loadbalancer_crd['spec'].get('ip')) + if not(self._has_endpoints(loadbalancer_crd) or + loadbalancer_crd.get('status')): + return 'Skipping Service %s without Endpoints' + elif not loadbalancer_crd['spec'].get('ip'): + return 'Skipping Service %s without IP set yet' + return False def _has_endpoints(self, loadbalancer_crd): ep_slices = loadbalancer_crd['spec'].get('endpointSlices', []) @@ -179,9 +239,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): LOG.debug("Deleting the loadbalancer CRD") if loadbalancer_crd['status'] != {}: - # NOTE(ivc): deleting pool deletes its members - self._drv_lbaas.release_loadbalancer( - loadbalancer=loadbalancer_crd['status'].get('loadbalancer')) + self._add_event(loadbalancer_crd, 'KuryrReleaseLB', + 'Releasing the load balancer') + try: + # NOTE(ivc): deleting pool deletes its members + self._drv_lbaas.release_loadbalancer( + loadbalancer_crd['status'].get('loadbalancer')) + except Exception as e: + # FIXME(dulek): It seems like if loadbalancer will be stuck in + # PENDING_DELETE we'll just silently time out + # waiting for it to be deleted. Is that expected? + self._add_event( + loadbalancer_crd, 'KuryrReleaseLBError', + f'Error when releasing load balancer: {e}', 'Warning') + raise try: pub_info = loadbalancer_crd['status']['service_pub_ip_info'] @@ -189,43 +260,53 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): pub_info = None if pub_info: + self._add_event( + loadbalancer_crd, 'KuryrReleaseFIP', + 'Dissociating floating IP from the load balancer') self._drv_service_pub_ip.release_pub_ip( loadbalancer_crd['status']['service_pub_ip_info']) - kubernetes = clients.get_kubernetes_client() LOG.debug('Removing finalizer from KuryrLoadBalancer CRD %s', loadbalancer_crd) try: - kubernetes.remove_finalizer(loadbalancer_crd, - k_const.KURYRLB_FINALIZER) - except k_exc.K8sClientException: - LOG.exception('Error removing kuryrloadbalancer CRD finalizer ' - 'for %s', loadbalancer_crd) + self.k8s.remove_finalizer(loadbalancer_crd, + k_const.KURYRLB_FINALIZER) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when removing finalizer from ' + f'KuryrLoadBalancer of Service ' + f'{utils.get_res_unique_name(loadbalancer_crd)}') + LOG.exception(msg) + self._add_event(loadbalancer_crd, 'KuryrRemoveLBFinalizerError', + f'{msg}: {e}', 'Warning') raise namespace = loadbalancer_crd['metadata']['namespace'] name = loadbalancer_crd['metadata']['name'] try: - service = kubernetes.get(f"{k_const.K8S_API_NAMESPACES}" - f"/{namespace}/services/{name}") - except k_exc.K8sResourceNotFound as ex: - LOG.warning("Failed to get service: %s", ex) + service = self.k8s.get(f"{k_const.K8S_API_NAMESPACES}/{namespace}" + f"/services/{name}") + except k_exc.K8sResourceNotFound: + LOG.warning('Service %s not found. This is unexpected.', + utils.get_res_unique_name(loadbalancer_crd)) return - LOG.debug('Removing finalizer from service %s', - service["metadata"]["name"]) + LOG.debug('Removing finalizer from Service %s', + utils.get_res_unique_name(service)) try: - kubernetes.remove_finalizer(service, k_const.SERVICE_FINALIZER) - except k_exc.K8sClientException: - LOG.exception('Error removing service finalizer ' - 'for %s', service["metadata"]["name"]) + self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when removing finalizer from Service ' + f'{utils.get_res_unique_name(service)}') + LOG.exception(msg) + self._add_event( + loadbalancer_crd, 'KuryrRemoveServiceFinalizerError', + f'{msg}: {e}', 'Warning') raise def _patch_status(self, loadbalancer_crd): - kubernetes = clients.get_kubernetes_client() try: - kubernetes.patch_crd('status', utils.get_res_link( - loadbalancer_crd), loadbalancer_crd['status']) + self.k8s.patch_crd('status', utils.get_res_link(loadbalancer_crd), + loadbalancer_crd['status']) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd) return False @@ -233,16 +314,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): LOG.warning('KuryrLoadBalancer %s modified, retrying later.', utils.get_res_unique_name(loadbalancer_crd)) return False - except k_exc.K8sClientException: - LOG.exception('Error updating KuryLoadbalancer CRD %s', - loadbalancer_crd) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when updating status of ' + f'{utils.get_res_unique_name(loadbalancer_crd)} Service ' + f'load balancer') + LOG.exception(msg) + self._add_event(loadbalancer_crd, 'KuryrUpdateLBStatusError', + f'{msg}: {e}', 'Warning') raise return True def _sync_lbaas_members(self, loadbalancer_crd): changed = False - if (self._remove_unused_members(loadbalancer_crd)): + if self._remove_unused_members(loadbalancer_crd): changed = True if self._sync_lbaas_pools(loadbalancer_crd): @@ -258,9 +343,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): lb = klb_crd['status'].get('loadbalancer') svc_name = klb_crd['metadata']['name'] svc_namespace = klb_crd['metadata']['namespace'] - k8s = clients.get_kubernetes_client() try: - service = k8s.get( + service = self.k8s.get( f'{k_const.K8S_API_NAMESPACES}/{svc_namespace}/' f'services/{svc_name}') except k_exc.K8sResourceNotFound: @@ -275,8 +359,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): lb['security_groups'] = lb_sgs try: - k8s.patch_crd('status/loadbalancer', utils.get_res_link(klb_crd), - {'security_groups': lb_sgs}) + self.k8s.patch_crd('status/loadbalancer', + utils.get_res_link(klb_crd), + {'security_groups': lb_sgs}) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer %s not found', svc_name) return None @@ -284,8 +369,13 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): LOG.debug('KuryrLoadBalancer entity not processable ' 'due to missing loadbalancer field.') return None - except k_exc.K8sClientException: - LOG.exception('Error syncing KuryrLoadBalancer %s', svc_name) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when updating SGs status of ' + f'{utils.get_res_unique_name(klb_crd)} Service load ' + f'balancer') + LOG.exception(msg) + self._add_event(klb_crd, 'KuryrUpdateLBStatusError', + f'{msg}: {e}', 'Warning') raise return klb_crd @@ -359,8 +449,14 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): target_pod, target_ip, loadbalancer_crd) if not member_subnet_id: - LOG.warning("Skipping member creation for %s", - target_ip) + msg = ( + f'Unable to determine ID of the subnet of member ' + f'{target_ip} for service ' + f'{utils.get_res_unique_name(loadbalancer_crd)}. ' + f'Skipping its creation') + self._add_event(loadbalancer_crd, 'KuryrSkipMember', + msg, 'Warning') + LOG.warning(msg) continue target_name, target_namespace = self._get_target_info( @@ -617,8 +713,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): for port_spec in lbaas_spec_ports: protocol = port_spec['protocol'] port = port_spec['port'] - name = "%s:%s" % (loadbalancer_crd['status']['loadbalancer'][ - 'name'], protocol) listener = [] for l in loadbalancer_crd['status'].get('listeners', []): @@ -638,12 +732,23 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): 'listeners', []) if l['port'] == port] if listener and not self._drv_lbaas.double_listeners_supported(): - LOG.warning("Skipping listener creation for %s as another one" - " already exists with port %s", name, port) + msg = ( + f'Octavia does not support multiple listeners listening ' + f'on the same port. Skipping creation of listener ' + f'{protocol}:{port} because {listener["protocol"]}:' + f'{listener["port"]} already exists for Service ' + f'{utils.get_res_unique_name(loadbalancer_crd)}') + self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg, + 'Warning') + LOG.warning(msg) continue if protocol == "SCTP" and not self._drv_lbaas.sctp_supported(): - LOG.warning("Skipping listener creation as provider does" - " not support %s protocol", protocol) + msg = ( + f'Skipping listener {protocol}:{port} creation as Octavia ' + f'does not support {protocol} protocol.') + self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg, + 'Warning') + LOG.warning(msg) continue listener = self._drv_lbaas.ensure_listener( loadbalancer=loadbalancer_crd['status'].get('loadbalancer'), @@ -697,17 +802,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): ns = lb_crd['metadata']['namespace'] status_data = {"loadBalancer": { "ingress": [{"ip": lb_ip_address.format()}]}} - k8s = clients.get_kubernetes_client() try: - k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}" - f"/{ns}/services/{name}/status", - status_data) + self.k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}" + f"/{ns}/services/{name}/status", + status_data) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(name) - except k_exc.K8sClientException: - LOG.exception("Kubernetes Client Exception" - "when updating the svc status %s" - % name) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when updating external FIP data of Service ' + f'{utils.get_res_unique_name(lb_crd)}') + LOG.exception(msg) + self._add_event(lb_crd, 'KuryrUpdateServiceStatusError', + f'{msg}: {e}', 'Warning') raise def _sync_lbaas_loadbalancer(self, loadbalancer_crd): @@ -754,29 +860,26 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): def _ensure_release_lbaas(self, loadbalancer_crd): attempts = 0 - deadline = 0 - retry = True timeout = config.CONF.kubernetes.watch_retry_timeout - while retry: + deadline = time.time() + timeout + while True: try: - if attempts == 1: - deadline = time.time() + timeout - if (attempts > 0 and - utils.exponential_sleep(deadline, attempts) == 0): - LOG.error("Failed releasing lbaas '%s': deadline exceeded", - loadbalancer_crd['status']['loadbalancer'][ - 'name']) + if not utils.exponential_sleep(deadline, attempts): + msg = (f'Timed out waiting for deletion of load balancer ' + f'{utils.get_res_unique_name(loadbalancer_crd)}') + self._add_event( + loadbalancer_crd, 'KuryrLBReleaseTimeout', msg, + 'Warning') + LOG.error(msg) return self._drv_lbaas.release_loadbalancer( - loadbalancer=loadbalancer_crd['status'].get('loadbalancer') - ) - retry = False + loadbalancer_crd['status'].get('loadbalancer')) + break except k_exc.ResourceNotReady: - LOG.debug("Attempt (%s) of loadbalancer release %s failed." + LOG.debug("Attempt %s to release LB %s failed." " A retry will be triggered.", attempts, - loadbalancer_crd['status']['loadbalancer']['name']) + utils.get_res_unique_name(loadbalancer_crd)) attempts += 1 - retry = True loadbalancer_crd['status'] = {} self._patch_status(loadbalancer_crd) diff --git a/kuryr_kubernetes/controller/handlers/pipeline.py b/kuryr_kubernetes/controller/handlers/pipeline.py index 384be01f0..d82b3064f 100644 --- a/kuryr_kubernetes/controller/handlers/pipeline.py +++ b/kuryr_kubernetes/controller/handlers/pipeline.py @@ -58,11 +58,13 @@ class ControllerPipeline(h_dis.EventPipeline): def _wrap_consumer(self, consumer): # TODO(ivc): tune retry interval/timeout - return h_log.LogExceptions(h_retry.Retry( - consumer, exceptions=( - exceptions.ResourceNotReady, - key_exc.connection.ConnectFailure, - requests_exc.ConnectionError))) + return h_log.LogExceptions( + h_retry.Retry( + consumer, + exceptions=(exceptions.ResourceNotReady, + key_exc.connection.ConnectFailure, + requests_exc.ConnectionError)), + ignore_exceptions=(exceptions.KuryrLoadBalancerNotCreated,)) def _wrap_dispatcher(self, dispatcher): return h_log.LogExceptions(h_async.Async(dispatcher, self._tg, diff --git a/kuryr_kubernetes/exceptions.py b/kuryr_kubernetes/exceptions.py index 75b708e67..b215d169d 100644 --- a/kuryr_kubernetes/exceptions.py +++ b/kuryr_kubernetes/exceptions.py @@ -42,6 +42,13 @@ class ResourceNotReady(Exception): super(ResourceNotReady, self).__init__("Resource not ready: %r" % msg) +class KuryrLoadBalancerNotCreated(Exception): + def __init__(self, res): + name = utils.get_res_unique_name(res) + super().__init__( + 'KuryrLoadBalancer not created yet for the Service %s' % name) + + class LoadBalancerNotReady(ResourceNotReady): def __init__(self, loadbalancer_id, status): super().__init__( diff --git a/kuryr_kubernetes/handlers/logging.py b/kuryr_kubernetes/handlers/logging.py index 451069520..4c11c9524 100644 --- a/kuryr_kubernetes/handlers/logging.py +++ b/kuryr_kubernetes/handlers/logging.py @@ -28,13 +28,16 @@ class LogExceptions(base.EventHandler): instead. """ - def __init__(self, handler, exceptions=Exception): + def __init__(self, handler, exceptions=Exception, ignore_exceptions=None): self._handler = handler self._exceptions = exceptions + self._ignore_exceptions = ignore_exceptions or () def __call__(self, event, *args, **kwargs): try: self._handler(event, *args, **kwargs) + except self._ignore_exceptions: + pass except self._exceptions as ex: # If exception comes from OpenStack SDK and contains # 'request_id' then print this 'request_id' along the Exception. diff --git a/kuryr_kubernetes/handlers/retry.py b/kuryr_kubernetes/handlers/retry.py index 96fc821f0..1ce123a87 100644 --- a/kuryr_kubernetes/handlers/retry.py +++ b/kuryr_kubernetes/handlers/retry.py @@ -97,7 +97,10 @@ class Retry(base.EventHandler): .get_instance()) method = getattr(exporter, cls_map[type(exc).__name__]) method() - + except exceptions.KuryrLoadBalancerNotCreated: + with excutils.save_and_reraise_exception() as ex: + if self._sleep(deadline, attempt, ex.value): + ex.reraise = False except os_exc.ConflictException as ex: if ex.details.startswith('Quota exceeded for resources'): with excutils.save_and_reraise_exception() as ex: diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py index 59283b48e..c7252265e 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py @@ -27,23 +27,6 @@ _SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP') class TestServiceHandler(test_base.TestCase): - - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServiceSecurityGroupsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServiceSubnetsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServiceProjectDriver.get_instance') - def test_init(self, m_get_drv_project, m_get_drv_subnets, m_get_drv_sg): - m_get_drv_project.return_value = mock.sentinel.drv_project - m_get_drv_subnets.return_value = mock.sentinel.drv_subnets - m_get_drv_sg.return_value = mock.sentinel.drv_sg - handler = h_lbaas.ServiceHandler() - - self.assertEqual(mock.sentinel.drv_project, handler._drv_project) - self.assertEqual(mock.sentinel.drv_subnets, handler._drv_subnets) - self.assertEqual(mock.sentinel.drv_sg, handler._drv_sg) - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') def test_on_present(self, get_k8s_client): svc_event = { @@ -114,6 +97,7 @@ class TestServiceHandler(test_base.TestCase): m_handler.create_crd_spec.return_value = new_spec m_handler._should_ignore.return_value = False m_handler._drv_project = m_drv_project + m_handler.k8s = mock.Mock() h_lbaas.ServiceHandler.on_present(m_handler, svc_event) m_handler.create_crd_spec(svc_event) @@ -179,6 +163,7 @@ class TestServiceHandler(test_base.TestCase): m_handler.create_crd_spec.return_value = old_spec m_handler._should_ignore.return_value = False m_handler._drv_project = m_drv_project + m_handler.k8s = mock.Mock() h_lbaas.ServiceHandler.on_present(m_handler, svc_event) m_handler.create_crd_spec(svc_event) @@ -418,51 +403,36 @@ class TestEndpointsHandler(test_base.TestCase): h_lbaas.EndpointsHandler.on_deleted(m_handler, self._ep) m_handler._remove_endpoints.assert_called_once_with(self._ep) - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch('kuryr_kubernetes.utils.get_klb_crd_path') - def test__remove_endpoints(self, get_klb_crd_path, get_k8s_client): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - k8s.patch_crd.assert_called_once_with('spec', - get_klb_crd_path(self._ep), - 'endpointSlices', - action='remove') + def test__remove_endpoints(self, get_klb_crd_path): + m_handler = mock.Mock() + h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep) + m_handler.k8s.patch_crd.assert_called_once_with( + 'spec', get_klb_crd_path(self._ep), 'endpointSlices', + action='remove') - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch.object(logging.getLogger( 'kuryr_kubernetes.controller.handlers.lbaas'), 'debug') - def test__remove_endpoints_not_found(self, get_k8s_client, log): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - - k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound(self._ep) - + def test__remove_endpoints_not_found(self, log): + m_handler = mock.Mock() + m_handler.k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound('foo') + h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep) log.assert_called_once() - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') - def test__remove_endpoints_client_exception(self, get_k8s_client): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - - k8s.patch_crd.side_effect = k_exc.K8sClientException(self._ep) - + def test__remove_endpoints_client_exception(self): + m_handler = mock.Mock() + m_handler.k8s.patch_crd.side_effect = k_exc.K8sClientException() self.assertRaises(k_exc.K8sClientException, h_lbaas.EndpointsHandler._remove_endpoints, - self, self._ep) + m_handler, self._ep) - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch.object(logging.getLogger( 'kuryr_kubernetes.controller.handlers.lbaas'), 'warning') - def test__remove_endpoints_unprocessable_entity(self, get_k8s_client, log): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - - k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity(self._ep) - + def test__remove_endpoints_unprocessable_entity(self, log): + m_handler = mock.Mock() + m_handler.k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity( + 'bar') + h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep) log.assert_called_once() diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py index 3df7303a4..ed0abb382 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py @@ -259,81 +259,6 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver): @mock.patch('kuryr_kubernetes.utils.get_subnets_id_cidrs', mock.Mock(return_value=[('id', 'cidr')])) class TestKuryrLoadBalancerHandler(test_base.TestCase): - @mock.patch('kuryr_kubernetes.utils.get_subnet_cidr') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceSecurityGroupsDriver.get_instance') - @mock.patch('kuryr_kubernetes.config.CONF') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServicePubIpDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodSubnetsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.LBaaSDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.NodesSubnetsDriver.get_instance') - def test_init(self, m_get_drv_node_subnets, m_get_drv_lbaas, - m_get_drv_project, m_get_drv_subnets, - m_get_drv_service_pub_ip, m_cfg, m_get_svc_sg_drv, - m_get_svc_drv_project, m_get_cidr): - m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas - m_get_drv_project.return_value = mock.sentinel.drv_project - m_get_drv_subnets.return_value = mock.sentinel.drv_subnets - m_get_cidr.return_value = '10.0.0.128/26' - m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip - m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project - m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets - handler = h_lb.KuryrLoadBalancerHandler() - - self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) - self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) - self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) - self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertEqual(mock.sentinel.drv_node_subnets, - handler._drv_nodes_subnets) - - @mock.patch('kuryr_kubernetes.utils.get_subnet_cidr') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceSecurityGroupsDriver.get_instance') - @mock.patch('kuryr_kubernetes.config.CONF') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServicePubIpDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodSubnetsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.LBaaSDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.NodesSubnetsDriver.get_instance') - def test_init_provider_ovn(self, m_get_drv_node_subnets, m_get_drv_lbaas, - m_get_drv_project, m_get_drv_subnets, - m_get_drv_service_pub_ip, m_cfg, - m_get_svc_sg_drv, m_get_svc_drv_project, - m_get_cidr): - m_get_cidr.return_value = '10.0.0.128/26' - m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas - m_get_drv_project.return_value = mock.sentinel.drv_project - m_get_drv_subnets.return_value = mock.sentinel.drv_subnets - m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip - m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project - m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets - handler = h_lb .KuryrLoadBalancerHandler() - - self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) - self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) - self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) - self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertEqual(mock.sentinel.drv_node_subnets, - handler._drv_nodes_subnets) - def test_on_present(self): m_drv_service_pub_ip = mock.Mock() m_drv_service_pub_ip.acquire_service_pub_ip_info.return_value = None @@ -670,14 +595,16 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): lbaas_spec = {} lbaas.load_balancers.return_value = [] - selflink = ['/apis/openstack.org/v1/namespaces/default/' - 'kuryrloadbalancers/test', - '/apis/openstack.org/v1/namespaces/default/' - 'kuryrloadbalancers/demo'] + expected_selflink = ['/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/test', + '/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/demo'] h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation( m_handler, loadbalancer_crds) lbaas.load_balancers.assert_called_once_with(**lbaas_spec) - m_handler._reconcile_lbaas.assert_called_with(selflink) + selflink = [c['selflink'] for c + in m_handler._reconcile_lbaas.call_args[0][0]] + self.assertEqual(expected_selflink, selflink) @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch('kuryr_kubernetes.controller.drivers.base' diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py index 8b603067a..4bc017ec9 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py @@ -37,7 +37,8 @@ class TestControllerPipeline(test_base.TestCase): ret = pipeline._wrap_consumer(consumer) self.assertEqual(logging_handler, ret) - m_logging_type.assert_called_with(retry_handler) + m_logging_type.assert_called_with(retry_handler, + ignore_exceptions=mock.ANY) m_retry_type.assert_called_with(consumer, exceptions=mock.ANY) @mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')