diff --git a/kuryr_kubernetes/cni/handlers.py b/kuryr_kubernetes/cni/handlers.py index a4b889fb9..9abad6904 100644 --- a/kuryr_kubernetes/cni/handlers.py +++ b/kuryr_kubernetes/cni/handlers.py @@ -36,7 +36,7 @@ class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta): self._callback = on_done self._vifs = {} - def on_present(self, pod): + def on_present(self, pod, *args, **kwargs): vifs = self._get_vifs(pod) if self.should_callback(pod, vifs): @@ -107,7 +107,7 @@ class CallbackHandler(CNIHandlerBase): def callback(self): self._callback(self._kuryrport, self._callback_vifs) - def on_deleted(self, kuryrport): + def on_deleted(self, kuryrport, *args, **kwargs): LOG.debug("Got kuryrport %s deletion event.", kuryrport['metadata']['name']) if self._del_callback: diff --git a/kuryr_kubernetes/controller/drivers/base.py b/kuryr_kubernetes/controller/drivers/base.py index c83f44529..3ed0070bc 100644 --- a/kuryr_kubernetes/controller/drivers/base.py +++ b/kuryr_kubernetes/controller/drivers/base.py @@ -388,7 +388,7 @@ class PodVIFDriver(DriverBase, metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def activate_vif(self, vif): + def activate_vif(self, vif, **kwargs): """Updates VIF to become active. Implementing drivers should update the specified `vif` object's diff --git a/kuryr_kubernetes/controller/drivers/nested_dpdk_vif.py b/kuryr_kubernetes/controller/drivers/nested_dpdk_vif.py index 2d977d6df..50cdec1aa 100644 --- a/kuryr_kubernetes/controller/drivers/nested_dpdk_vif.py +++ b/kuryr_kubernetes/controller/drivers/nested_dpdk_vif.py @@ -64,7 +64,7 @@ class NestedDpdkPodVIFDriver(nested_vif.NestedPodVIFDriver): vif.id, vm_id) raise - def activate_vif(self, vif): + def activate_vif(self, vif, **kwargs): # NOTE(danil): new virtual interface was created in nova instance # during request_vif call, thus if it was not created successfully # an exception o_exc.SDKException would be throwed. During binding diff --git a/kuryr_kubernetes/controller/drivers/nested_macvlan_vif.py b/kuryr_kubernetes/controller/drivers/nested_macvlan_vif.py index ac28ae56d..12eb24618 100755 --- a/kuryr_kubernetes/controller/drivers/nested_macvlan_vif.py +++ b/kuryr_kubernetes/controller/drivers/nested_macvlan_vif.py @@ -86,7 +86,7 @@ class NestedMacvlanPodVIFDriver(nested_vif.NestedPodVIFDriver): LOG.warning("Unable to release port %s as it no longer exists.", vif.id) - def activate_vif(self, vif): + def activate_vif(self, vif, **kwargs): # NOTE(mchiappe): there is no way to get feedback on the actual # interface creation or activation as no plugging can happen for this # interface type. However the status of the port is not relevant as diff --git a/kuryr_kubernetes/controller/drivers/nested_vlan_vif.py b/kuryr_kubernetes/controller/drivers/nested_vlan_vif.py index 3eb7b98a9..72a9938da 100644 --- a/kuryr_kubernetes/controller/drivers/nested_vlan_vif.py +++ b/kuryr_kubernetes/controller/drivers/nested_vlan_vif.py @@ -33,6 +33,7 @@ LOG = logging.getLogger(__name__) DEFAULT_MAX_RETRY_COUNT = 3 DEFAULT_RETRY_INTERVAL = 1 +ACTIVE_TIMEOUT = 90 CONF = cfg.CONF @@ -118,6 +119,37 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver): vifs.append(vif) return vifs + def activate_vif(self, vif, pod=None, retry_info=None): + try: + super().activate_vif(vif) + except k_exc.ResourceNotReady: + if retry_info and retry_info.get('elapsed', 0) > ACTIVE_TIMEOUT: + parent_port = self._get_parent_port(pod) + trunk_id = self._get_trunk_id(parent_port) + # NOTE(dulek): We don't need a lock to prevent VLAN ID from + # being taken over because the SegmentationDriver + # will keep it reserved in memory unless we + # release it. And we won't. + LOG.warning('Subport %s is in DOWN status for more than %d ' + 'seconds. This is a Neutron issue. Attempting to ' + 'reattach the subport to trunk %s using VLAN ID %s' + ' to fix it.', vif.id, retry_info['elapsed'], + trunk_id, vif.vlan_id) + try: + self._remove_subport(trunk_id, vif.id) + except os_exc.NotFoundException: + # NOTE(dulek): This may happen when _add_subport() failed + # or Kuryr crashed between the calls. Let's + # try to fix it hoping that VLAN ID is still + # free. + LOG.warning('Subport %s was not attached to the trunk. ' + 'Trying to attach it anyway.', vif.id) + self._add_subport(trunk_id, vif.id, + requested_vlan_id=vif.vlan_id) + LOG.warning("Reattached subport %s, its state will be " + "rechecked when event will be retried.", vif.id) + raise + def release_vif(self, pod, vif, project_id=None, security_groups=None): os_net = clients.get_network_client() parent_port = self._get_parent_port(pod) @@ -182,7 +214,7 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver): "with a Neutron vlan trunk") raise k_exc.K8sNodeTrunkPortFailure - def _add_subport(self, trunk_id, subport): + def _add_subport(self, trunk_id, subport, requested_vlan_id=None): """Adds subport port to Neutron trunk This method gets vlanid allocated from kuryr segmentation driver. @@ -196,29 +228,34 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver): os_net = clients.get_network_client() retry_count = 1 while True: - try: - vlan_id = self._get_vlan_id(trunk_id) - except os_exc.SDKException: - LOG.error("Getting VlanID for subport on " - "trunk %s failed!!", trunk_id) - raise + if requested_vlan_id: + vlan_id = requested_vlan_id + else: + try: + vlan_id = self._get_vlan_id(trunk_id) + except os_exc.SDKException: + LOG.error("Getting VLAN ID for subport on " + "trunk %s failed!!", trunk_id) + raise + subport = [{'segmentation_id': vlan_id, 'port_id': subport, 'segmentation_type': 'vlan'}] try: os_net.add_trunk_subports(trunk_id, subport) except os_exc.ConflictException: - if retry_count < DEFAULT_MAX_RETRY_COUNT: - LOG.error("vlanid already in use on trunk, " - "%s. Retrying...", trunk_id) + if (retry_count < DEFAULT_MAX_RETRY_COUNT and + not requested_vlan_id): + LOG.error("VLAN ID already in use on trunk %s. " + "Retrying.", trunk_id) retry_count += 1 sleep(DEFAULT_RETRY_INTERVAL) continue else: - LOG.error( - "MAX retry count reached. Failed to add subport") + LOG.error("Failed to add subport %s to trunk %s due to " + "VLAN ID %d conflict.", subport, trunk_id, + vlan_id) raise - except os_exc.SDKException: LOG.exception("Error happened during subport " "addition to trunk %s", trunk_id) diff --git a/kuryr_kubernetes/controller/drivers/neutron_vif.py b/kuryr_kubernetes/controller/drivers/neutron_vif.py index eef5d7a91..6701d493e 100644 --- a/kuryr_kubernetes/controller/drivers/neutron_vif.py +++ b/kuryr_kubernetes/controller/drivers/neutron_vif.py @@ -93,7 +93,7 @@ class NeutronPodVIFDriver(base.PodVIFDriver): def release_vif(self, pod, vif, project_id=None, security_groups=None): clients.get_network_client().delete_port(vif.id) - def activate_vif(self, vif): + def activate_vif(self, vif, **kwargs): if vif.active: return diff --git a/kuryr_kubernetes/controller/drivers/sriov.py b/kuryr_kubernetes/controller/drivers/sriov.py index da334ca79..c59f606ab 100644 --- a/kuryr_kubernetes/controller/drivers/sriov.py +++ b/kuryr_kubernetes/controller/drivers/sriov.py @@ -73,7 +73,7 @@ class SriovVIFDriver(neutron_vif.NeutronPodVIFDriver): self._reduce_remaining_sriov_vfs(pod, physnet) return vif - def activate_vif(self, vif): + def activate_vif(self, vif, **kwargs): vif.active = True def _get_physnet_subnet_mapping(self): diff --git a/kuryr_kubernetes/controller/drivers/vif_pool.py b/kuryr_kubernetes/controller/drivers/vif_pool.py index 0522f200f..c6390a014 100644 --- a/kuryr_kubernetes/controller/drivers/vif_pool.py +++ b/kuryr_kubernetes/controller/drivers/vif_pool.py @@ -118,8 +118,8 @@ class NoopVIFPool(base.VIFPoolDriver): def release_vif(self, pod, vif, *argv): self._drv_vif.release_vif(pod, vif, *argv) - def activate_vif(self, vif): - self._drv_vif.activate_vif(vif) + def activate_vif(self, vif, **kwargs): + self._drv_vif.activate_vif(vif, **kwargs) def update_vif_sgs(self, pod, sgs): self._drv_vif.update_vif_sgs(pod, sgs) @@ -168,8 +168,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta): def set_vif_driver(self, driver): self._drv_vif = driver - def activate_vif(self, vif): - self._drv_vif.activate_vif(vif) + def activate_vif(self, vif, **kwargs): + self._drv_vif.activate_vif(vif, **kwargs) def update_vif_sgs(self, pod, sgs): self._drv_vif.update_vif_sgs(pod, sgs) @@ -1230,9 +1230,9 @@ class MultiVIFPool(base.VIFPoolDriver): vif_drv_alias = self._get_vif_drv_alias(vif) self._vif_drvs[vif_drv_alias].release_vif(pod, vif, *argv) - def activate_vif(self, vif): + def activate_vif(self, vif, **kwargs): vif_drv_alias = self._get_vif_drv_alias(vif) - self._vif_drvs[vif_drv_alias].activate_vif(vif) + self._vif_drvs[vif_drv_alias].activate_vif(vif, **kwargs) def update_vif_sgs(self, pod, sgs): pod_vif_type = self._get_pod_vif_type(pod) diff --git a/kuryr_kubernetes/controller/handlers/kuryrnetwork.py b/kuryr_kubernetes/controller/handlers/kuryrnetwork.py index 4e1b07e5f..d23823529 100644 --- a/kuryr_kubernetes/controller/handlers/kuryrnetwork.py +++ b/kuryr_kubernetes/controller/handlers/kuryrnetwork.py @@ -50,7 +50,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler): self._drv_svc_sg = ( drivers.ServiceSecurityGroupsDriver.get_instance()) - def on_present(self, kuryrnet_crd): + def on_present(self, kuryrnet_crd, *args, **kwargs): ns_name = kuryrnet_crd['spec']['nsName'] project_id = kuryrnet_crd['spec']['projectId'] kns_status = kuryrnet_crd.get('status', {}) @@ -90,7 +90,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler): status = {'nsLabels': kuryrnet_crd['spec']['nsLabels']} self._patch_kuryrnetwork_crd(kuryrnet_crd, status, labels=True) - def on_finalize(self, kuryrnet_crd): + def on_finalize(self, kuryrnet_crd, *args, **kwargs): LOG.debug("Deleting kuryrnetwork CRD resources: %s", kuryrnet_crd) net_id = kuryrnet_crd.get('status', {}).get('netId') diff --git a/kuryr_kubernetes/controller/handlers/kuryrnetwork_population.py b/kuryr_kubernetes/controller/handlers/kuryrnetwork_population.py index 9673daf4d..09a9c9cd2 100644 --- a/kuryr_kubernetes/controller/handlers/kuryrnetwork_population.py +++ b/kuryr_kubernetes/controller/handlers/kuryrnetwork_population.py @@ -41,7 +41,7 @@ class KuryrNetworkPopulationHandler(k8s_base.ResourceEventHandler): self._drv_vif_pool.set_vif_driver() self._drv_nodes_subnets = drivers.NodesSubnetsDriver.get_instance() - def on_present(self, kuryrnet_crd): + def on_present(self, kuryrnet_crd, *args, **kwargs): subnet_id = kuryrnet_crd.get('status', {}).get('subnetId') if not subnet_id: LOG.debug("No Subnet present for KuryrNetwork %s", diff --git a/kuryr_kubernetes/controller/handlers/kuryrnetworkpolicy.py b/kuryr_kubernetes/controller/handlers/kuryrnetworkpolicy.py index 19d876091..91748ce85 100644 --- a/kuryr_kubernetes/controller/handlers/kuryrnetworkpolicy.py +++ b/kuryr_kubernetes/controller/handlers/kuryrnetworkpolicy.py @@ -87,7 +87,7 @@ class KuryrNetworkPolicyHandler(k8s_base.ResourceEventHandler): return False - def on_present(self, knp): + def on_present(self, knp, *args, **kwargs): uniq_name = utils.get_res_unique_name(knp) LOG.debug('on_present() for NP %s', uniq_name) project_id = self._drv_project.get_project(knp) @@ -234,7 +234,7 @@ class KuryrNetworkPolicyHandler(k8s_base.ResourceEventHandler): raise return net_crd['status']['netId'] - def on_finalize(self, knp): + def on_finalize(self, knp, *args, **kwargs): LOG.debug("Finalizing KuryrNetworkPolicy %s", knp) project_id = self._drv_project.get_project(knp) pods_to_update = self._drv_policy.affected_pods(knp) diff --git a/kuryr_kubernetes/controller/handlers/kuryrport.py b/kuryr_kubernetes/controller/handlers/kuryrport.py index 478924f53..c00fca76f 100644 --- a/kuryr_kubernetes/controller/handlers/kuryrport.py +++ b/kuryr_kubernetes/controller/handlers/kuryrport.py @@ -58,7 +58,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler): .get_instance()) self.k8s = clients.get_kubernetes_client() - def on_present(self, kuryrport_crd): + def on_present(self, kuryrport_crd, *args, **kwargs): if not kuryrport_crd['status']['vifs']: # Get vifs if not self.get_vifs(kuryrport_crd): @@ -66,6 +66,8 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler): # get_vifs method. return + retry_info = kwargs.get('retry_info') + vifs = {ifname: {'default': data['default'], 'vif': objects.base.VersionedObject .obj_from_primitive(data['vif'])} @@ -75,6 +77,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler): return changed = False + pod = self._get_pod(kuryrport_crd) try: for ifname, data in vifs.items(): @@ -86,22 +89,14 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler): driver_utils.update_port_pci_info(pod_node, data['vif']) if not data['vif'].active: try: - self._drv_vif_pool.activate_vif(data['vif']) + self._drv_vif_pool.activate_vif(data['vif'], pod=pod, + retry_info=retry_info) changed = True except os_exc.ResourceNotFound: LOG.debug("Port not found, possibly already deleted. " "No need to activate it") finally: if changed: - try: - name = kuryrport_crd['metadata']['name'] - namespace = kuryrport_crd['metadata']['namespace'] - pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}" - f"/{namespace}/pods/{name}") - except k_exc.K8sResourceNotFound as ex: - LOG.exception("Failed to get pod: %s", ex) - raise - project_id = self._drv_project.get_project(pod) try: @@ -124,7 +119,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler): self._update_services(services, crd_pod_selectors, project_id) - def on_finalize(self, kuryrport_crd): + def on_finalize(self, kuryrport_crd, *args, **kwargs): name = kuryrport_crd['metadata']['name'] namespace = kuryrport_crd['metadata']['namespace'] try: @@ -282,3 +277,13 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler): sgs = self._drv_svc_sg.get_security_groups(service, project_id) self._drv_lbaas.update_lbaas_sg(service, sgs) + + def _get_pod(self, kuryrport_crd): + try: + name = kuryrport_crd['metadata']['name'] + namespace = kuryrport_crd['metadata']['namespace'] + return self.k8s.get(f"{constants.K8S_API_NAMESPACES}" + f"/{namespace}/pods/{name}") + except k_exc.K8sResourceNotFound as ex: + LOG.exception("Failed to get pod: %s", ex) + raise diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 8907d7780..775904163 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -50,7 +50,7 @@ class ServiceHandler(k8s_base.ResourceEventHandler): if driver_utils.is_network_policy_enabled(): driver_utils.bump_networkpolicies(svc['metadata']['namespace']) - def on_present(self, service): + def on_present(self, service, *args, **kwargs): reason = self._should_ignore(service) if reason: LOG.debug(reason, service['metadata']['name']) @@ -111,7 +111,7 @@ class ServiceHandler(k8s_base.ResourceEventHandler): k8s = clients.get_kubernetes_client() return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER) - def on_finalize(self, service): + def on_finalize(self, service, *args, **kwargs): k8s = clients.get_kubernetes_client() svc_name = service['metadata']['name'] @@ -305,7 +305,7 @@ class EndpointsHandler(k8s_base.ResourceEventHandler): self._lb_provider = ( config.CONF.kubernetes.endpoints_driver_octavia_provider) - def on_present(self, endpoints): + def on_present(self, endpoints, *args, **kwargs): ep_name = endpoints['metadata']['name'] ep_namespace = endpoints['metadata']['namespace'] diff --git a/kuryr_kubernetes/controller/handlers/loadbalancer.py b/kuryr_kubernetes/controller/handlers/loadbalancer.py index ecdc833fb..10fc8dfbe 100644 --- a/kuryr_kubernetes/controller/handlers/loadbalancer.py +++ b/kuryr_kubernetes/controller/handlers/loadbalancer.py @@ -56,7 +56,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): return utils.get_subnets_id_cidrs( self._drv_nodes_subnets.get_nodes_subnets()) - def on_present(self, loadbalancer_crd): + def on_present(self, loadbalancer_crd, *args, **kwargs): if loadbalancer_crd.get('status', None) is None: kubernetes = clients.get_kubernetes_client() @@ -124,7 +124,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler): return False return True - def on_finalize(self, loadbalancer_crd): + def on_finalize(self, loadbalancer_crd, *args, **kwargs): LOG.debug("Deleting the loadbalancer CRD") if loadbalancer_crd['status'] != {}: diff --git a/kuryr_kubernetes/controller/handlers/machine.py b/kuryr_kubernetes/controller/handlers/machine.py index fe6b7519a..1c4ffa9d1 100644 --- a/kuryr_kubernetes/controller/handlers/machine.py +++ b/kuryr_kubernetes/controller/handlers/machine.py @@ -52,14 +52,14 @@ class MachineHandler(k8s_base.ResourceEventHandler): # Had to be deleted in the meanwhile. pass - def on_present(self, machine): + def on_present(self, machine, *args, **kwargs): effect = self.node_subnets_driver.add_node(machine) if effect: # If the change was meaningful we need to make sure all the NPs # are recalculated to get the new SG rules added. self._bump_nps() - def on_deleted(self, machine): + def on_deleted(self, machine, *args, **kwargs): effect = self.node_subnets_driver.delete_node(machine) if effect: # If the change was meaningful we need to make sure all the NPs diff --git a/kuryr_kubernetes/controller/handlers/namespace.py b/kuryr_kubernetes/controller/handlers/namespace.py index 61aa2ba36..ad18a1169 100644 --- a/kuryr_kubernetes/controller/handlers/namespace.py +++ b/kuryr_kubernetes/controller/handlers/namespace.py @@ -33,7 +33,7 @@ class NamespaceHandler(k8s_base.ResourceEventHandler): super(NamespaceHandler, self).__init__() self._drv_project = drivers.NamespaceProjectDriver.get_instance() - def on_present(self, namespace): + def on_present(self, namespace, *args, **kwargs): ns_labels = namespace['metadata'].get('labels', {}) ns_name = namespace['metadata']['name'] kns_crd = self._get_kns_crd(ns_name) diff --git a/kuryr_kubernetes/controller/handlers/pod_label.py b/kuryr_kubernetes/controller/handlers/pod_label.py index 13cbc55cb..78460c9d2 100644 --- a/kuryr_kubernetes/controller/handlers/pod_label.py +++ b/kuryr_kubernetes/controller/handlers/pod_label.py @@ -47,7 +47,7 @@ class PodLabelHandler(k8s_base.ResourceEventHandler): self._drv_vif_pool.set_vif_driver() self._drv_lbaas = drivers.LBaaSDriver.get_instance() - def on_present(self, pod): + def on_present(self, pod, *args, **kwargs): if driver_utils.is_host_network(pod) or not self._has_vifs(pod): # NOTE(ltomasbo): The event will be retried once the vif handler # annotates the pod with the pod state. diff --git a/kuryr_kubernetes/controller/handlers/policy.py b/kuryr_kubernetes/controller/handlers/policy.py index 237b1c9fb..d26ab3ff2 100644 --- a/kuryr_kubernetes/controller/handlers/policy.py +++ b/kuryr_kubernetes/controller/handlers/policy.py @@ -34,7 +34,7 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler): self._drv_policy = drivers.NetworkPolicyDriver.get_instance() self.k8s = clients.get_kubernetes_client() - def on_present(self, policy): + def on_present(self, policy, *args, **kwargs): LOG.debug("Created or updated: %s", policy) self._drv_policy.ensure_network_policy(policy) @@ -42,7 +42,7 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler): # Put finalizer in if it's not there already. self.k8s.add_finalizer(policy, k_const.NETWORKPOLICY_FINALIZER) - def on_finalize(self, policy): + def on_finalize(self, policy, *args, **kwargs): LOG.debug("Finalizing policy %s", policy) if not self._drv_policy.release_network_policy(policy): # KNP was not found, so we need to finalize on our own. diff --git a/kuryr_kubernetes/controller/handlers/vif.py b/kuryr_kubernetes/controller/handlers/vif.py index 049b91092..6c0611573 100644 --- a/kuryr_kubernetes/controller/handlers/vif.py +++ b/kuryr_kubernetes/controller/handlers/vif.py @@ -40,7 +40,7 @@ class VIFHandler(k8s_base.ResourceEventHandler): OBJECT_KIND = constants.K8S_OBJ_POD OBJECT_WATCH_PATH = "%s/%s" % (constants.K8S_API_BASE, "pods") - def on_present(self, pod): + def on_present(self, pod, *args, **kwargs): if (driver_utils.is_host_network(pod) or not self._is_pod_scheduled(pod)): # REVISIT(ivc): consider an additional configurable check that @@ -90,7 +90,7 @@ class VIFHandler(k8s_base.ResourceEventHandler): "KuryrPort CRD: %s", ex) raise k_exc.ResourceNotReady(pod) - def on_finalize(self, pod): + def on_finalize(self, pod, *args, **kwargs): k8s = clients.get_kubernetes_client() try: diff --git a/kuryr_kubernetes/handlers/k8s_base.py b/kuryr_kubernetes/handlers/k8s_base.py index 4a1a8ed45..8f37c180e 100755 --- a/kuryr_kubernetes/handlers/k8s_base.py +++ b/kuryr_kubernetes/handlers/k8s_base.py @@ -78,30 +78,30 @@ class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler): obj = event.get('object') if 'MODIFIED' == event_type: if self._check_finalize(obj): - self.on_finalize(obj) + self.on_finalize(obj, *args, **kwargs) return - self.on_modified(obj) - self.on_present(obj) + self.on_modified(obj, *args, **kwargs) + self.on_present(obj, *args, **kwargs) elif 'ADDED' == event_type: if self._check_finalize(obj): - self.on_finalize(obj) + self.on_finalize(obj, *args, **kwargs) return - self.on_added(obj) - self.on_present(obj) + self.on_added(obj, *args, **kwargs) + self.on_present(obj, *args, **kwargs) elif 'DELETED' == event_type: - self.on_deleted(obj) + self.on_deleted(obj, *args, **kwargs) - def on_added(self, obj): + def on_added(self, obj, *args, **kwargs): pass - def on_present(self, obj): + def on_present(self, obj, *args, **kwargs): pass - def on_modified(self, obj): + def on_modified(self, obj, *args, **kwargs): pass - def on_deleted(self, obj): + def on_deleted(self, obj, *args, **kwargs): pass - def on_finalize(self, obj): + def on_finalize(self, obj, *args, **kwargs): pass diff --git a/kuryr_kubernetes/handlers/retry.py b/kuryr_kubernetes/handlers/retry.py index 4e1fcda1c..6421cca80 100644 --- a/kuryr_kubernetes/handlers/retry.py +++ b/kuryr_kubernetes/handlers/retry.py @@ -54,6 +54,7 @@ class Retry(base.EventHandler): self._k8s = clients.get_kubernetes_client() def __call__(self, event, *args, **kwargs): + start_time = time.time() deadline = time.time() + self._timeout for attempt in itertools.count(1): if event.get('type') in ['MODIFIED', 'ADDED']: @@ -77,7 +78,10 @@ class Retry(base.EventHandler): "object. Continuing with handler " "execution.") try: - self._handler(event, *args, **kwargs) + info = { + 'elapsed': time.time() - start_time + } + self._handler(event, *args, retry_info=info, **kwargs) break except os_exc.ConflictException as ex: if ex.details.startswith('Quota exceeded for resources'): diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_kuryrport.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_kuryrport.py index 612743e0e..f7b3b5b01 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_kuryrport.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_kuryrport.py @@ -146,8 +146,10 @@ class TestKuryrPortHandler(test_base.TestCase): k8s.get.assert_called_once_with(self._pod_uri) - activate_vif.assert_has_calls([mock.call(self._vif1), - mock.call(self._vif2)]) + activate_vif.assert_has_calls([mock.call(self._vif1, pod=self._pod, + retry_info=mock.ANY), + mock.call(self._vif2, pod=self._pod, + retry_info=mock.ANY)]) update_crd.assert_called_once_with(self._kp, self._vifs) @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') @@ -182,8 +184,10 @@ class TestKuryrPortHandler(test_base.TestCase): kp.on_present(self._kp) - activate_vif.assert_has_calls([mock.call(self._vif1), - mock.call(self._vif2)]) + activate_vif.assert_has_calls([mock.call(self._vif1, pod=mock.ANY, + retry_info=mock.ANY), + mock.call(self._vif2, pod=mock.ANY, + retry_info=mock.ANY)]) @mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.' 'activate_vif') @@ -328,8 +332,10 @@ class TestKuryrPortHandler(test_base.TestCase): k8s.get.assert_called_once_with(self._pod_uri) - activate_vif.assert_has_calls([mock.call(self._vif1), - mock.call(self._vif2)]) + activate_vif.assert_has_calls([mock.call(self._vif1, pod=self._pod, + retry_info=mock.ANY), + mock.call(self._vif2, pod=self._pod, + retry_info=mock.ANY)]) update_crd.assert_called_once_with(self._kp, self._vifs) create_sgr.assert_called_once_with(self._pod) diff --git a/kuryr_kubernetes/tests/unit/handlers/test_retry.py b/kuryr_kubernetes/tests/unit/handlers/test_retry.py index b6ab03253..2f3837e6d 100644 --- a/kuryr_kubernetes/tests/unit/handlers/test_retry.py +++ b/kuryr_kubernetes/tests/unit/handlers/test_retry.py @@ -95,7 +95,7 @@ class TestRetryHandler(test_base.TestCase): retry(event) - m_handler.assert_called_once_with(event) + m_handler.assert_called_once_with(event, retry_info=mock.ANY) m_sleep.assert_not_called() @mock.patch('itertools.count') @@ -134,7 +134,8 @@ class TestRetryHandler(test_base.TestCase): retry(event) - m_handler.assert_has_calls([mock.call(event)] * attempts) + m_handler.assert_has_calls([mock.call( + event, retry_info=mock.ANY)] * attempts) m_sleep.assert_has_calls([ mock.call(deadline, i + 1, failures[i]) for i in range(len(failures))]) @@ -155,7 +156,8 @@ class TestRetryHandler(test_base.TestCase): self.assertRaises(_EX11, retry, event) - m_handler.assert_has_calls([mock.call(event)] * attempts) + m_handler.assert_has_calls([mock.call( + event, retry_info=mock.ANY)] * attempts) m_sleep.assert_has_calls([ mock.call(deadline, i + 1, failures[i]) for i in range(len(failures))])