From ca719a4009384b8c86f9e0561d809f573f7d8dab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Fri, 3 Dec 2021 18:41:16 +0100 Subject: [PATCH] Add events for Services This commit implements adding Events for various things that may happen when Kuryr handles a Service - either incidents or just informative messages helpful to the user. As to add an Event it is required to have a uid of the object and in most of KuryrLoadBalancerHandler we don't have the Service representation, this commit implements populating ownerReferences of the KuryrLoadBalancer object with a reference to the Service. This has a major side effect that KLB will be garbage collected if corresponding service doesn't exist, which is probably a good thing (and we manage that ourselves using finalizers anyway). Another set of refactorings is to remove KLB creation from EndpointsHandler in order to stop it fighting with ServiceHandler over creation - EndpointsHandler cannot add ownerReference as it has no uid of the Service. Other refactorings related to the error messages are also included. Change-Id: I36b4d62e6fc7ace00909e9b1aa7681f6d59ca455 --- kuryr_kubernetes/clients.py | 2 +- kuryr_kubernetes/controller/handlers/lbaas.py | 184 ++++++------ .../controller/handlers/loadbalancer.py | 281 ++++++++++++------ .../controller/handlers/pipeline.py | 12 +- kuryr_kubernetes/exceptions.py | 7 + kuryr_kubernetes/handlers/logging.py | 5 +- kuryr_kubernetes/handlers/retry.py | 5 +- .../unit/controller/handlers/test_lbaas.py | 72 ++--- .../controller/handlers/test_loadbalancer.py | 87 +----- .../unit/controller/handlers/test_pipeline.py | 3 +- 10 files changed, 330 insertions(+), 328 deletions(-) diff --git a/kuryr_kubernetes/clients.py b/kuryr_kubernetes/clients.py index 893324890..cb5207193 100644 --- a/kuryr_kubernetes/clients.py +++ b/kuryr_kubernetes/clients.py @@ -50,7 +50,7 @@ def get_loadbalancer_client(): return get_openstacksdk().load_balancer -def get_kubernetes_client(): +def get_kubernetes_client() -> k8s_client.K8sClient: return _clients[_KUBERNETES_CLIENT] diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 543ccfbd1..046897785 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -26,6 +26,7 @@ from kuryr_kubernetes.handlers import k8s_base from kuryr_kubernetes import utils LOG = logging.getLogger(__name__) +CONF = config.CONF SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer') @@ -45,6 +46,15 @@ class ServiceHandler(k8s_base.ResourceEventHandler): self._drv_project = drv_base.ServiceProjectDriver.get_instance() self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() + self._drv_lbaas = drv_base.LBaaSDriver.get_instance() + self.k8s = clients.get_kubernetes_client() + + self._lb_provider = None + if self._drv_lbaas.providers_supported(): + self._lb_provider = 'amphora' + config_provider = CONF.kubernetes.endpoints_driver_octavia_provider + if config_provider != 'default': + self._lb_provider = config_provider def _bump_network_policies(self, svc): if driver_utils.is_network_policy_enabled(): @@ -53,16 +63,21 @@ class ServiceHandler(k8s_base.ResourceEventHandler): def on_present(self, service, *args, **kwargs): reason = self._should_ignore(service) if reason: - LOG.debug(reason, service['metadata']['name']) + reason %= utils.get_res_unique_name(service) + LOG.debug(reason) + self.k8s.add_event(service, 'KuryrServiceSkipped', reason) return - k8s = clients.get_kubernetes_client() - loadbalancer_crd = k8s.get_loadbalancer_crd(service) + loadbalancer_crd = self.k8s.get_loadbalancer_crd(service) try: if not self._patch_service_finalizer(service): return except k_exc.K8sClientException as ex: - LOG.exception("Failed to set service finalizer: %s", ex) + msg = (f'K8s API error when adding finalizer to Service ' + f'{utils.get_res_unique_name(service)}') + LOG.exception(msg) + self.k8s.add_event(service, 'KuryrAddServiceFinalizerError', + f'{msg}: {ex}', 'Warning') raise if loadbalancer_crd is None: @@ -108,20 +123,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler): return None def _patch_service_finalizer(self, service): - k8s = clients.get_kubernetes_client() - return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER) + return self.k8s.add_finalizer(service, k_const.SERVICE_FINALIZER) def on_finalize(self, service, *args, **kwargs): - k8s = clients.get_kubernetes_client() - klb_crd_path = utils.get_klb_crd_path(service) # Bump all the NPs in the namespace to force SG rules # recalculation. self._bump_network_policies(service) try: - k8s.delete(klb_crd_path) + self.k8s.delete(klb_crd_path) except k_exc.K8sResourceNotFound: - k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) + self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) def _has_clusterip(self, service): # ignore headless service, clusterIP is None @@ -149,17 +161,25 @@ class ServiceHandler(k8s_base.ResourceEventHandler): svc_namespace = service['metadata']['namespace'] kubernetes = clients.get_kubernetes_client() spec = self._build_kuryrloadbalancer_spec(service) + + owner_reference = { + 'apiVersion': service['apiVersion'], + 'kind': service['kind'], + 'name': service['metadata']['name'], + 'uid': service['metadata']['uid'], + } + loadbalancer_crd = { 'apiVersion': 'openstack.org/v1', 'kind': 'KuryrLoadBalancer', 'metadata': { 'name': svc_name, 'finalizers': [k_const.KURYRLB_FINALIZER], - }, + 'ownerReferences': [owner_reference], + }, 'spec': spec, - 'status': { - } - } + 'status': {}, + } try: kubernetes.post('{}/{}/kuryrloadbalancers'.format( @@ -169,8 +189,12 @@ class ServiceHandler(k8s_base.ResourceEventHandler): raise k_exc.ResourceNotReady(svc_name) except k_exc.K8sNamespaceTerminating: raise - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception("Exception when creating KuryrLoadBalancer CRD.") + self.k8s.add_event( + service, 'CreateKLBFailed', + 'Error when creating KuryrLoadBalancer object: %s' % e, + 'Warning') raise def _update_crd_spec(self, loadbalancer_crd, service): @@ -185,13 +209,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler): LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(svc_name) - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception('Error updating kuryrnet CRD %s', loadbalancer_crd) + self.k8s.add_event( + service, 'UpdateKLBFailed', + 'Error when updating KuryrLoadBalancer object: %s' % e, + 'Warning') raise def _get_data_timeout_annotation(self, service): - default_timeout_cli = config.CONF.octavia_defaults.timeout_client_data - default_timeout_mem = config.CONF.octavia_defaults.timeout_member_data + default_timeout_cli = CONF.octavia_defaults.timeout_client_data + default_timeout_mem = CONF.octavia_defaults.timeout_member_data try: annotations = service['metadata']['annotations'] except KeyError: @@ -220,13 +248,16 @@ class ServiceHandler(k8s_base.ResourceEventHandler): subnet_id = self._get_subnet_id(service, project_id, svc_ip) spec_type = service['spec'].get('type') spec = { - 'ip': svc_ip, - 'ports': ports, - 'project_id': project_id, - 'security_groups_ids': sg_ids, - 'subnet_id': subnet_id, - 'type': spec_type - } + 'ip': svc_ip, + 'ports': ports, + 'project_id': project_id, + 'security_groups_ids': sg_ids, + 'subnet_id': subnet_id, + 'type': spec_type, + } + + if self._lb_provider: + spec['provider'] = self._lb_provider if spec_lb_ip is not None: spec['lb_ip'] = spec_lb_ip @@ -288,25 +319,13 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): def __init__(self): super(EndpointsHandler, self).__init__() - self._drv_lbaas = drv_base.LBaaSDriver.get_instance() - # Note(yboaron) LBaaS driver supports 'provider' parameter in - # Load Balancer creation flow. - # We need to set the requested load balancer provider - # according to 'endpoints_driver_octavia_provider' configuration. - self._lb_provider = None - if self._drv_lbaas.providers_supported(): - self._lb_provider = 'amphora' - if (config.CONF.kubernetes.endpoints_driver_octavia_provider - != 'default'): - self._lb_provider = ( - config.CONF.kubernetes.endpoints_driver_octavia_provider) + self.k8s = clients.get_kubernetes_client() def on_present(self, endpoints, *args, **kwargs): ep_name = endpoints['metadata']['name'] ep_namespace = endpoints['metadata']['namespace'] - k8s = clients.get_kubernetes_client() - loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints) + loadbalancer_crd = self.k8s.get_loadbalancer_crd(endpoints) if (not (self._has_pods(endpoints) or (loadbalancer_crd and loadbalancer_crd.get('status'))) @@ -318,15 +337,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): return if loadbalancer_crd is None: + raise k_exc.KuryrLoadBalancerNotCreated(endpoints) + else: try: - self._create_crd_spec(endpoints) + self._update_crd_spec(loadbalancer_crd, endpoints) except k_exc.K8sNamespaceTerminating: LOG.warning('Namespace %s is being terminated, ignoring ' 'Endpoints %s in that namespace.', ep_namespace, ep_name) - return - else: - self._update_crd_spec(loadbalancer_crd, endpoints) def on_deleted(self, endpoints, *args, **kwargs): self._remove_endpoints(endpoints) @@ -365,84 +383,52 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): return endpointslices - def _create_crd_spec(self, endpoints, spec=None, status=None): - endpoints_name = endpoints['metadata']['name'] - namespace = endpoints['metadata']['namespace'] - kubernetes = clients.get_kubernetes_client() - - # TODO(maysams): Remove the convertion once we start handling - # Endpoint slices. - epslices = self._convert_subsets_to_endpointslice(endpoints) - if not status: - status = {} - if not spec: - spec = {'endpointSlices': epslices} - - # NOTE(maysams): As the spec may already contain a - # ports field from the Service, a new endpointslice - # field is introduced to also hold ports from the - # Endpoints under the spec. - loadbalancer_crd = { - 'apiVersion': 'openstack.org/v1', - 'kind': 'KuryrLoadBalancer', - 'metadata': { - 'name': endpoints_name, - 'finalizers': [k_const.KURYRLB_FINALIZER], - }, - 'spec': spec, - 'status': status, - } - - if self._lb_provider: - loadbalancer_crd['spec']['provider'] = self._lb_provider - + def _add_event(self, endpoints, reason, message, type_=None): + """_add_event adds an event for the corresponding Service.""" try: - kubernetes.post('{}/{}/kuryrloadbalancers'.format( - k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd) - except k_exc.K8sConflict: - raise k_exc.ResourceNotReady(loadbalancer_crd) - except k_exc.K8sNamespaceTerminating: - raise + service = self.k8s.get(utils.get_service_link(endpoints)) except k_exc.K8sClientException: - LOG.exception("Exception when creating KuryrLoadBalancer CRD.") - raise + LOG.debug('Error when fetching Service to add an event %s, ' + 'ignoring', utils.get_res_unique_name(endpoints)) + return + kwargs = {'type_': type_} if type_ else {} + self.k8s.add_event(service, reason, message, **kwargs) def _update_crd_spec(self, loadbalancer_crd, endpoints): - kubernetes = clients.get_kubernetes_client() - # TODO(maysams): Remove the convertion once we start handling - # Endpoint slices. + # TODO(maysams): Remove the conversion once we start handling + # EndpointSlices. epslices = self._convert_subsets_to_endpointslice(endpoints) - spec = {'endpointSlices': epslices} - if self._lb_provider: - spec['provider'] = self._lb_provider try: - kubernetes.patch_crd( - 'spec', - utils.get_res_link(loadbalancer_crd), - spec) + self.k8s.patch_crd('spec', utils.get_res_link(loadbalancer_crd), + {'endpointSlices': epslices}) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(loadbalancer_crd) - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception('Error updating KuryrLoadbalancer CRD %s', loadbalancer_crd) + self._add_event( + endpoints, 'UpdateKLBFailed', + 'Error when updating KuryrLoadBalancer object: %s' % e, + 'Warning') raise return True def _remove_endpoints(self, endpoints): - kubernetes = clients.get_kubernetes_client() lb_name = endpoints['metadata']['name'] try: - kubernetes.patch_crd('spec', - utils.get_klb_crd_path(endpoints), - 'endpointSlices', - action='remove') + self.k8s.patch_crd('spec', utils.get_klb_crd_path(endpoints), + 'endpointSlices', action='remove') except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer CRD not found %s', lb_name) except k_exc.K8sUnprocessableEntity: LOG.warning('KuryrLoadBalancer %s modified, ignoring.', lb_name) - except k_exc.K8sClientException: + except k_exc.K8sClientException as e: LOG.exception('Error updating KuryrLoadBalancer CRD %s', lb_name) + self._add_event( + endpoints, 'UpdateKLBFailed', + 'Error when updating KuryrLoadBalancer object: %s' % e, + 'Warning') raise diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index 5d955ab5b..1c52b422c 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -53,26 +53,56 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance() + self.k8s = clients.get_kubernetes_client() def _get_nodes_subnets(self): return utils.get_subnets_id_cidrs( self._drv_nodes_subnets.get_nodes_subnets()) + def _add_event(self, klb, reason, message, type_=None): + """_add_event adds an event for the corresponding Service.""" + klb_meta = klb['metadata'] + for ref in klb_meta.get('ownerReferences', []): + # "mock" a Service based on ownerReference to it. + if ref['kind'] == 'Service' and ref['name'] == klb_meta['name']: + service = { + 'apiVersion': ref['apiVersion'], + 'kind': ref['kind'], + 'metadata': { + 'name': ref['name'], + 'uid': ref['uid'], + 'namespace': klb_meta['namespace'], # ref shares ns + }, + } + break + else: + # No reference, just fetch the service from the API. + try: + service = self.k8s.get( + f"{k_const.K8S_API_NAMESPACES}/{klb_meta['namespace']}" + f"/services/{klb_meta['name']}") + except k_exc.K8sClientException: + LOG.debug('Error when fetching Service to add an event %s, ' + 'ignoring', utils.get_res_unique_name(klb)) + return + kwargs = {'type_': type_} if type_ else {} + self.k8s.add_event(service, reason, message, **kwargs) + def on_present(self, loadbalancer_crd, *args, **kwargs): if loadbalancer_crd.get('status', None) is None: - kubernetes = clients.get_kubernetes_client() try: - kubernetes.patch_crd('status', - utils.get_res_link(loadbalancer_crd), - {}) + self.k8s.patch_crd('status', + utils.get_res_link(loadbalancer_crd), {}) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadbalancer CRD not found %s', utils.get_res_unique_name(loadbalancer_crd)) return - if self._should_ignore(loadbalancer_crd): - LOG.debug("Ignoring Kubernetes service %s", - loadbalancer_crd['metadata']['name']) + reason = self._should_ignore(loadbalancer_crd) + if reason: + reason %= utils.get_res_unique_name(loadbalancer_crd) + LOG.debug(reason) + self._add_event(loadbalancer_crd, 'KuryrServiceSkipped', reason) return crd_lb = loadbalancer_crd['status'].get('loadbalancer') @@ -83,15 +113,34 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS: if (spec_lb_provider and spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS): + self._add_event(loadbalancer_crd, 'KuryrUpdateProvider', + 'Deleting Amphora load balancer to ' + 'recreate it with OVN provider') self._ensure_release_lbaas(loadbalancer_crd) # ovn to amphora downgrade elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS: if (not spec_lb_provider or spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS): + self._add_event(loadbalancer_crd, 'KuryrUpdateProvider', + 'Deleting OVN load balancer to ' + 'recreate it with Amphora provider') self._ensure_release_lbaas(loadbalancer_crd) - if self._sync_lbaas_members(loadbalancer_crd): + if not crd_lb: + self._add_event(loadbalancer_crd, 'KuryrEnsureLB', + 'Provisioning a load balancer') + try: + changed = self._sync_lbaas_members(loadbalancer_crd) + except Exception as e: + self._add_event( + loadbalancer_crd, 'KuryrEnsureLBError', + f'Error when provisioning load balancer: {e}', 'Warning') + raise + + if changed: + self._add_event(loadbalancer_crd, 'KuryrEnsuredLB', + 'Load balancer provisioned') # Note(yboaron) For LoadBalancer services, we should allocate FIP, # associate it to LB VIP and update K8S service status lb_ip = loadbalancer_crd['spec'].get('lb_ip') @@ -106,6 +155,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): loadbalancer_crd['status']['loadbalancer'][ 'port_id'])) if service_pub_ip_info: + self._add_event( + loadbalancer_crd, 'KuryrEnsureFIP', + 'Associating floating IP to the load balancer') self._drv_service_pub_ip.associate_pub_ip( service_pub_ip_info, loadbalancer_crd['status'][ 'loadbalancer']['port_id']) @@ -128,10 +180,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds): LOG.debug("Reconciling the loadbalancer CRDs") # get the loadbalancers id in the CRD status - crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get( - 'loadbalancer', {}).get('id', {}), 'selflink': - utils.get_res_link(loadbalancer_crd)} for - loadbalancer_crd in loadbalancer_crds] + crd_loadbalancer_ids = [ + {'id': loadbalancer_crd.get('status', {}).get( + 'loadbalancer', {}).get('id', {}), + 'selflink': utils.get_res_link(loadbalancer_crd), + 'klb': loadbalancer_crd} for + loadbalancer_crd in loadbalancer_crds] lbaas = clients.get_loadbalancer_client() lbaas_spec = {} self._drv_lbaas.add_tags('loadbalancer', lbaas_spec) @@ -140,21 +194,24 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): for loadbalancer in loadbalancers] # for each loadbalancer id in the CRD status, check if exists in # OpenStack - crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in - crd_loadbalancer_ids if - crd_lb['id'] not in loadbalancers_id] - if not crds_to_reconcile_selflink: + crds_to_reconcile = [crd_lb for crd_lb in crd_loadbalancer_ids + if crd_lb['id'] not in loadbalancers_id] + if not crds_to_reconcile: LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack") return LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r", - crds_to_reconcile_selflink) - self._reconcile_lbaas(crds_to_reconcile_selflink) + [klb['id'] for klb in crds_to_reconcile]) + self._reconcile_lbaas(crds_to_reconcile) - def _reconcile_lbaas(self, crds_to_reconcile_selflink): - kubernetes = clients.get_kubernetes_client() - for selflink in crds_to_reconcile_selflink: + def _reconcile_lbaas(self, crds_to_reconcile): + for entry in crds_to_reconcile: + selflink = entry['selflink'] try: - kubernetes.patch_crd('status', selflink, {}) + self._add_event( + entry['klb'], 'LoadBalancerRecreating', + 'Load balancer for the Service seems to not exist anymore.' + ' Recreating it.', 'Warning') + self.k8s.patch_crd('status', selflink, {}) except k_exc.K8sResourceNotFound: LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s', selflink) @@ -165,9 +222,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): continue def _should_ignore(self, loadbalancer_crd): - return (not(self._has_endpoints(loadbalancer_crd) or - loadbalancer_crd.get('status')) or not - loadbalancer_crd['spec'].get('ip')) + if not(self._has_endpoints(loadbalancer_crd) or + loadbalancer_crd.get('status')): + return 'Skipping Service %s without Endpoints' + elif not loadbalancer_crd['spec'].get('ip'): + return 'Skipping Service %s without IP set yet' + return False def _has_endpoints(self, loadbalancer_crd): ep_slices = loadbalancer_crd['spec'].get('endpointSlices', []) @@ -179,9 +239,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): LOG.debug("Deleting the loadbalancer CRD") if loadbalancer_crd['status'] != {}: - # NOTE(ivc): deleting pool deletes its members - self._drv_lbaas.release_loadbalancer( - loadbalancer=loadbalancer_crd['status'].get('loadbalancer')) + self._add_event(loadbalancer_crd, 'KuryrReleaseLB', + 'Releasing the load balancer') + try: + # NOTE(ivc): deleting pool deletes its members + self._drv_lbaas.release_loadbalancer( + loadbalancer_crd['status'].get('loadbalancer')) + except Exception as e: + # FIXME(dulek): It seems like if loadbalancer will be stuck in + # PENDING_DELETE we'll just silently time out + # waiting for it to be deleted. Is that expected? + self._add_event( + loadbalancer_crd, 'KuryrReleaseLBError', + f'Error when releasing load balancer: {e}', 'Warning') + raise try: pub_info = loadbalancer_crd['status']['service_pub_ip_info'] @@ -189,43 +260,53 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): pub_info = None if pub_info: + self._add_event( + loadbalancer_crd, 'KuryrReleaseFIP', + 'Dissociating floating IP from the load balancer') self._drv_service_pub_ip.release_pub_ip( loadbalancer_crd['status']['service_pub_ip_info']) - kubernetes = clients.get_kubernetes_client() LOG.debug('Removing finalizer from KuryrLoadBalancer CRD %s', loadbalancer_crd) try: - kubernetes.remove_finalizer(loadbalancer_crd, - k_const.KURYRLB_FINALIZER) - except k_exc.K8sClientException: - LOG.exception('Error removing kuryrloadbalancer CRD finalizer ' - 'for %s', loadbalancer_crd) + self.k8s.remove_finalizer(loadbalancer_crd, + k_const.KURYRLB_FINALIZER) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when removing finalizer from ' + f'KuryrLoadBalancer of Service ' + f'{utils.get_res_unique_name(loadbalancer_crd)}') + LOG.exception(msg) + self._add_event(loadbalancer_crd, 'KuryrRemoveLBFinalizerError', + f'{msg}: {e}', 'Warning') raise namespace = loadbalancer_crd['metadata']['namespace'] name = loadbalancer_crd['metadata']['name'] try: - service = kubernetes.get(f"{k_const.K8S_API_NAMESPACES}" - f"/{namespace}/services/{name}") - except k_exc.K8sResourceNotFound as ex: - LOG.warning("Failed to get service: %s", ex) + service = self.k8s.get(f"{k_const.K8S_API_NAMESPACES}/{namespace}" + f"/services/{name}") + except k_exc.K8sResourceNotFound: + LOG.warning('Service %s not found. This is unexpected.', + utils.get_res_unique_name(loadbalancer_crd)) return - LOG.debug('Removing finalizer from service %s', - service["metadata"]["name"]) + LOG.debug('Removing finalizer from Service %s', + utils.get_res_unique_name(service)) try: - kubernetes.remove_finalizer(service, k_const.SERVICE_FINALIZER) - except k_exc.K8sClientException: - LOG.exception('Error removing service finalizer ' - 'for %s', service["metadata"]["name"]) + self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when removing finalizer from Service ' + f'{utils.get_res_unique_name(service)}') + LOG.exception(msg) + self._add_event( + loadbalancer_crd, 'KuryrRemoveServiceFinalizerError', + f'{msg}: {e}', 'Warning') raise def _patch_status(self, loadbalancer_crd): - kubernetes = clients.get_kubernetes_client() try: - kubernetes.patch_crd('status', utils.get_res_link( - loadbalancer_crd), loadbalancer_crd['status']) + self.k8s.patch_crd('status', utils.get_res_link(loadbalancer_crd), + loadbalancer_crd['status']) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd) return False @@ -233,16 +314,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): LOG.warning('KuryrLoadBalancer %s modified, retrying later.', utils.get_res_unique_name(loadbalancer_crd)) return False - except k_exc.K8sClientException: - LOG.exception('Error updating KuryLoadbalancer CRD %s', - loadbalancer_crd) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when updating status of ' + f'{utils.get_res_unique_name(loadbalancer_crd)} Service ' + f'load balancer') + LOG.exception(msg) + self._add_event(loadbalancer_crd, 'KuryrUpdateLBStatusError', + f'{msg}: {e}', 'Warning') raise return True def _sync_lbaas_members(self, loadbalancer_crd): changed = False - if (self._remove_unused_members(loadbalancer_crd)): + if self._remove_unused_members(loadbalancer_crd): changed = True if self._sync_lbaas_pools(loadbalancer_crd): @@ -258,9 +343,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): lb = klb_crd['status'].get('loadbalancer') svc_name = klb_crd['metadata']['name'] svc_namespace = klb_crd['metadata']['namespace'] - k8s = clients.get_kubernetes_client() try: - service = k8s.get( + service = self.k8s.get( f'{k_const.K8S_API_NAMESPACES}/{svc_namespace}/' f'services/{svc_name}') except k_exc.K8sResourceNotFound: @@ -275,8 +359,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): lb['security_groups'] = lb_sgs try: - k8s.patch_crd('status/loadbalancer', utils.get_res_link(klb_crd), - {'security_groups': lb_sgs}) + self.k8s.patch_crd('status/loadbalancer', + utils.get_res_link(klb_crd), + {'security_groups': lb_sgs}) except k_exc.K8sResourceNotFound: LOG.debug('KuryrLoadBalancer %s not found', svc_name) return None @@ -284,8 +369,13 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): LOG.debug('KuryrLoadBalancer entity not processable ' 'due to missing loadbalancer field.') return None - except k_exc.K8sClientException: - LOG.exception('Error syncing KuryrLoadBalancer %s', svc_name) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when updating SGs status of ' + f'{utils.get_res_unique_name(klb_crd)} Service load ' + f'balancer') + LOG.exception(msg) + self._add_event(klb_crd, 'KuryrUpdateLBStatusError', + f'{msg}: {e}', 'Warning') raise return klb_crd @@ -359,8 +449,14 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): target_pod, target_ip, loadbalancer_crd) if not member_subnet_id: - LOG.warning("Skipping member creation for %s", - target_ip) + msg = ( + f'Unable to determine ID of the subnet of member ' + f'{target_ip} for service ' + f'{utils.get_res_unique_name(loadbalancer_crd)}. ' + f'Skipping its creation') + self._add_event(loadbalancer_crd, 'KuryrSkipMember', + msg, 'Warning') + LOG.warning(msg) continue target_name, target_namespace = self._get_target_info( @@ -617,8 +713,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): for port_spec in lbaas_spec_ports: protocol = port_spec['protocol'] port = port_spec['port'] - name = "%s:%s" % (loadbalancer_crd['status']['loadbalancer'][ - 'name'], protocol) listener = [] for l in loadbalancer_crd['status'].get('listeners', []): @@ -638,12 +732,23 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): 'listeners', []) if l['port'] == port] if listener and not self._drv_lbaas.double_listeners_supported(): - LOG.warning("Skipping listener creation for %s as another one" - " already exists with port %s", name, port) + msg = ( + f'Octavia does not support multiple listeners listening ' + f'on the same port. Skipping creation of listener ' + f'{protocol}:{port} because {listener["protocol"]}:' + f'{listener["port"]} already exists for Service ' + f'{utils.get_res_unique_name(loadbalancer_crd)}') + self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg, + 'Warning') + LOG.warning(msg) continue if protocol == "SCTP" and not self._drv_lbaas.sctp_supported(): - LOG.warning("Skipping listener creation as provider does" - " not support %s protocol", protocol) + msg = ( + f'Skipping listener {protocol}:{port} creation as Octavia ' + f'does not support {protocol} protocol.') + self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg, + 'Warning') + LOG.warning(msg) continue listener = self._drv_lbaas.ensure_listener( loadbalancer=loadbalancer_crd['status'].get('loadbalancer'), @@ -697,17 +802,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): ns = lb_crd['metadata']['namespace'] status_data = {"loadBalancer": { "ingress": [{"ip": lb_ip_address.format()}]}} - k8s = clients.get_kubernetes_client() try: - k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}" - f"/{ns}/services/{name}/status", - status_data) + self.k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}" + f"/{ns}/services/{name}/status", + status_data) except k_exc.K8sConflict: raise k_exc.ResourceNotReady(name) - except k_exc.K8sClientException: - LOG.exception("Kubernetes Client Exception" - "when updating the svc status %s" - % name) + except k_exc.K8sClientException as e: + msg = (f'K8s API error when updating external FIP data of Service ' + f'{utils.get_res_unique_name(lb_crd)}') + LOG.exception(msg) + self._add_event(lb_crd, 'KuryrUpdateServiceStatusError', + f'{msg}: {e}', 'Warning') raise def _sync_lbaas_loadbalancer(self, loadbalancer_crd): @@ -754,29 +860,26 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): def _ensure_release_lbaas(self, loadbalancer_crd): attempts = 0 - deadline = 0 - retry = True timeout = config.CONF.kubernetes.watch_retry_timeout - while retry: + deadline = time.time() + timeout + while True: try: - if attempts == 1: - deadline = time.time() + timeout - if (attempts > 0 and - utils.exponential_sleep(deadline, attempts) == 0): - LOG.error("Failed releasing lbaas '%s': deadline exceeded", - loadbalancer_crd['status']['loadbalancer'][ - 'name']) + if not utils.exponential_sleep(deadline, attempts): + msg = (f'Timed out waiting for deletion of load balancer ' + f'{utils.get_res_unique_name(loadbalancer_crd)}') + self._add_event( + loadbalancer_crd, 'KuryrLBReleaseTimeout', msg, + 'Warning') + LOG.error(msg) return self._drv_lbaas.release_loadbalancer( - loadbalancer=loadbalancer_crd['status'].get('loadbalancer') - ) - retry = False + loadbalancer_crd['status'].get('loadbalancer')) + break except k_exc.ResourceNotReady: - LOG.debug("Attempt (%s) of loadbalancer release %s failed." + LOG.debug("Attempt %s to release LB %s failed." " A retry will be triggered.", attempts, - loadbalancer_crd['status']['loadbalancer']['name']) + utils.get_res_unique_name(loadbalancer_crd)) attempts += 1 - retry = True loadbalancer_crd['status'] = {} self._patch_status(loadbalancer_crd) diff --git a/kuryr_kubernetes/controller/handlers/pipeline.py b/kuryr_kubernetes/controller/handlers/pipeline.py index 384be01f0..d82b3064f 100644 --- a/kuryr_kubernetes/controller/handlers/pipeline.py +++ b/kuryr_kubernetes/controller/handlers/pipeline.py @@ -58,11 +58,13 @@ class ControllerPipeline(h_dis.EventPipeline): def _wrap_consumer(self, consumer): # TODO(ivc): tune retry interval/timeout - return h_log.LogExceptions(h_retry.Retry( - consumer, exceptions=( - exceptions.ResourceNotReady, - key_exc.connection.ConnectFailure, - requests_exc.ConnectionError))) + return h_log.LogExceptions( + h_retry.Retry( + consumer, + exceptions=(exceptions.ResourceNotReady, + key_exc.connection.ConnectFailure, + requests_exc.ConnectionError)), + ignore_exceptions=(exceptions.KuryrLoadBalancerNotCreated,)) def _wrap_dispatcher(self, dispatcher): return h_log.LogExceptions(h_async.Async(dispatcher, self._tg, diff --git a/kuryr_kubernetes/exceptions.py b/kuryr_kubernetes/exceptions.py index 75b708e67..b215d169d 100644 --- a/kuryr_kubernetes/exceptions.py +++ b/kuryr_kubernetes/exceptions.py @@ -42,6 +42,13 @@ class ResourceNotReady(Exception): super(ResourceNotReady, self).__init__("Resource not ready: %r" % msg) +class KuryrLoadBalancerNotCreated(Exception): + def __init__(self, res): + name = utils.get_res_unique_name(res) + super().__init__( + 'KuryrLoadBalancer not created yet for the Service %s' % name) + + class LoadBalancerNotReady(ResourceNotReady): def __init__(self, loadbalancer_id, status): super().__init__( diff --git a/kuryr_kubernetes/handlers/logging.py b/kuryr_kubernetes/handlers/logging.py index 451069520..4c11c9524 100644 --- a/kuryr_kubernetes/handlers/logging.py +++ b/kuryr_kubernetes/handlers/logging.py @@ -28,13 +28,16 @@ class LogExceptions(base.EventHandler): instead. """ - def __init__(self, handler, exceptions=Exception): + def __init__(self, handler, exceptions=Exception, ignore_exceptions=None): self._handler = handler self._exceptions = exceptions + self._ignore_exceptions = ignore_exceptions or () def __call__(self, event, *args, **kwargs): try: self._handler(event, *args, **kwargs) + except self._ignore_exceptions: + pass except self._exceptions as ex: # If exception comes from OpenStack SDK and contains # 'request_id' then print this 'request_id' along the Exception. diff --git a/kuryr_kubernetes/handlers/retry.py b/kuryr_kubernetes/handlers/retry.py index 96fc821f0..1ce123a87 100644 --- a/kuryr_kubernetes/handlers/retry.py +++ b/kuryr_kubernetes/handlers/retry.py @@ -97,7 +97,10 @@ class Retry(base.EventHandler): .get_instance()) method = getattr(exporter, cls_map[type(exc).__name__]) method() - + except exceptions.KuryrLoadBalancerNotCreated: + with excutils.save_and_reraise_exception() as ex: + if self._sleep(deadline, attempt, ex.value): + ex.reraise = False except os_exc.ConflictException as ex: if ex.details.startswith('Quota exceeded for resources'): with excutils.save_and_reraise_exception() as ex: diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py index 59283b48e..c7252265e 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py @@ -27,23 +27,6 @@ _SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP') class TestServiceHandler(test_base.TestCase): - - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServiceSecurityGroupsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServiceSubnetsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServiceProjectDriver.get_instance') - def test_init(self, m_get_drv_project, m_get_drv_subnets, m_get_drv_sg): - m_get_drv_project.return_value = mock.sentinel.drv_project - m_get_drv_subnets.return_value = mock.sentinel.drv_subnets - m_get_drv_sg.return_value = mock.sentinel.drv_sg - handler = h_lbaas.ServiceHandler() - - self.assertEqual(mock.sentinel.drv_project, handler._drv_project) - self.assertEqual(mock.sentinel.drv_subnets, handler._drv_subnets) - self.assertEqual(mock.sentinel.drv_sg, handler._drv_sg) - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') def test_on_present(self, get_k8s_client): svc_event = { @@ -114,6 +97,7 @@ class TestServiceHandler(test_base.TestCase): m_handler.create_crd_spec.return_value = new_spec m_handler._should_ignore.return_value = False m_handler._drv_project = m_drv_project + m_handler.k8s = mock.Mock() h_lbaas.ServiceHandler.on_present(m_handler, svc_event) m_handler.create_crd_spec(svc_event) @@ -179,6 +163,7 @@ class TestServiceHandler(test_base.TestCase): m_handler.create_crd_spec.return_value = old_spec m_handler._should_ignore.return_value = False m_handler._drv_project = m_drv_project + m_handler.k8s = mock.Mock() h_lbaas.ServiceHandler.on_present(m_handler, svc_event) m_handler.create_crd_spec(svc_event) @@ -418,51 +403,36 @@ class TestEndpointsHandler(test_base.TestCase): h_lbaas.EndpointsHandler.on_deleted(m_handler, self._ep) m_handler._remove_endpoints.assert_called_once_with(self._ep) - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch('kuryr_kubernetes.utils.get_klb_crd_path') - def test__remove_endpoints(self, get_klb_crd_path, get_k8s_client): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - k8s.patch_crd.assert_called_once_with('spec', - get_klb_crd_path(self._ep), - 'endpointSlices', - action='remove') + def test__remove_endpoints(self, get_klb_crd_path): + m_handler = mock.Mock() + h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep) + m_handler.k8s.patch_crd.assert_called_once_with( + 'spec', get_klb_crd_path(self._ep), 'endpointSlices', + action='remove') - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch.object(logging.getLogger( 'kuryr_kubernetes.controller.handlers.lbaas'), 'debug') - def test__remove_endpoints_not_found(self, get_k8s_client, log): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - - k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound(self._ep) - + def test__remove_endpoints_not_found(self, log): + m_handler = mock.Mock() + m_handler.k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound('foo') + h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep) log.assert_called_once() - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') - def test__remove_endpoints_client_exception(self, get_k8s_client): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - - k8s.patch_crd.side_effect = k_exc.K8sClientException(self._ep) - + def test__remove_endpoints_client_exception(self): + m_handler = mock.Mock() + m_handler.k8s.patch_crd.side_effect = k_exc.K8sClientException() self.assertRaises(k_exc.K8sClientException, h_lbaas.EndpointsHandler._remove_endpoints, - self, self._ep) + m_handler, self._ep) - @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch.object(logging.getLogger( 'kuryr_kubernetes.controller.handlers.lbaas'), 'warning') - def test__remove_endpoints_unprocessable_entity(self, get_k8s_client, log): - k8s = mock.Mock() - get_k8s_client.return_value = k8s - h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep) - - k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity(self._ep) - + def test__remove_endpoints_unprocessable_entity(self, log): + m_handler = mock.Mock() + m_handler.k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity( + 'bar') + h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep) log.assert_called_once() diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py index 3df7303a4..ed0abb382 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py @@ -259,81 +259,6 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver): @mock.patch('kuryr_kubernetes.utils.get_subnets_id_cidrs', mock.Mock(return_value=[('id', 'cidr')])) class TestKuryrLoadBalancerHandler(test_base.TestCase): - @mock.patch('kuryr_kubernetes.utils.get_subnet_cidr') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceSecurityGroupsDriver.get_instance') - @mock.patch('kuryr_kubernetes.config.CONF') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServicePubIpDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodSubnetsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.LBaaSDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.NodesSubnetsDriver.get_instance') - def test_init(self, m_get_drv_node_subnets, m_get_drv_lbaas, - m_get_drv_project, m_get_drv_subnets, - m_get_drv_service_pub_ip, m_cfg, m_get_svc_sg_drv, - m_get_svc_drv_project, m_get_cidr): - m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas - m_get_drv_project.return_value = mock.sentinel.drv_project - m_get_drv_subnets.return_value = mock.sentinel.drv_subnets - m_get_cidr.return_value = '10.0.0.128/26' - m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip - m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project - m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets - handler = h_lb.KuryrLoadBalancerHandler() - - self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) - self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) - self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) - self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertEqual(mock.sentinel.drv_node_subnets, - handler._drv_nodes_subnets) - - @mock.patch('kuryr_kubernetes.utils.get_subnet_cidr') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base.' - 'ServiceSecurityGroupsDriver.get_instance') - @mock.patch('kuryr_kubernetes.config.CONF') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.ServicePubIpDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodSubnetsDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.PodProjectDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.LBaaSDriver.get_instance') - @mock.patch('kuryr_kubernetes.controller.drivers.base' - '.NodesSubnetsDriver.get_instance') - def test_init_provider_ovn(self, m_get_drv_node_subnets, m_get_drv_lbaas, - m_get_drv_project, m_get_drv_subnets, - m_get_drv_service_pub_ip, m_cfg, - m_get_svc_sg_drv, m_get_svc_drv_project, - m_get_cidr): - m_get_cidr.return_value = '10.0.0.128/26' - m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas - m_get_drv_project.return_value = mock.sentinel.drv_project - m_get_drv_subnets.return_value = mock.sentinel.drv_subnets - m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip - m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project - m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg - m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets - handler = h_lb .KuryrLoadBalancerHandler() - - self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) - self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) - self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) - self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) - self.assertEqual(mock.sentinel.drv_node_subnets, - handler._drv_nodes_subnets) - def test_on_present(self): m_drv_service_pub_ip = mock.Mock() m_drv_service_pub_ip.acquire_service_pub_ip_info.return_value = None @@ -670,14 +595,16 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase): lbaas_spec = {} lbaas.load_balancers.return_value = [] - selflink = ['/apis/openstack.org/v1/namespaces/default/' - 'kuryrloadbalancers/test', - '/apis/openstack.org/v1/namespaces/default/' - 'kuryrloadbalancers/demo'] + expected_selflink = ['/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/test', + '/apis/openstack.org/v1/namespaces/default/' + 'kuryrloadbalancers/demo'] h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation( m_handler, loadbalancer_crds) lbaas.load_balancers.assert_called_once_with(**lbaas_spec) - m_handler._reconcile_lbaas.assert_called_with(selflink) + selflink = [c['selflink'] for c + in m_handler._reconcile_lbaas.call_args[0][0]] + self.assertEqual(expected_selflink, selflink) @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @mock.patch('kuryr_kubernetes.controller.drivers.base' diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py index 8b603067a..4bc017ec9 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py @@ -37,7 +37,8 @@ class TestControllerPipeline(test_base.TestCase): ret = pipeline._wrap_consumer(consumer) self.assertEqual(logging_handler, ret) - m_logging_type.assert_called_with(retry_handler) + m_logging_type.assert_called_with(retry_handler, + ignore_exceptions=mock.ANY) m_retry_type.assert_called_with(consumer, exceptions=mock.ANY) @mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')