Workaround OVN bug causing subports to be DOWN
Neutron should make a subport that is already attached to a trunk ACTIVE immediately. Unfortunately there seems to be an OVN bug causing an event triggering this to be lost, leaving the port in DOWN state forever. This is a disaster for Kuryr, because we can't proceed to wire the pods in such case. This commit attempts to workaround this by making Kuryr reattach the ports that are in DOWN state for more than 90 seconds after they're plugged. Change-Id: If9a3968d68dced588614cd5521d4a111e78d435f
This commit is contained in:
parent
1c2cd78966
commit
9d0b053c4f
|
@ -36,7 +36,7 @@ class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta):
|
||||||
self._callback = on_done
|
self._callback = on_done
|
||||||
self._vifs = {}
|
self._vifs = {}
|
||||||
|
|
||||||
def on_present(self, pod):
|
def on_present(self, pod, *args, **kwargs):
|
||||||
vifs = self._get_vifs(pod)
|
vifs = self._get_vifs(pod)
|
||||||
|
|
||||||
if self.should_callback(pod, vifs):
|
if self.should_callback(pod, vifs):
|
||||||
|
@ -107,7 +107,7 @@ class CallbackHandler(CNIHandlerBase):
|
||||||
def callback(self):
|
def callback(self):
|
||||||
self._callback(self._kuryrport, self._callback_vifs)
|
self._callback(self._kuryrport, self._callback_vifs)
|
||||||
|
|
||||||
def on_deleted(self, kuryrport):
|
def on_deleted(self, kuryrport, *args, **kwargs):
|
||||||
LOG.debug("Got kuryrport %s deletion event.",
|
LOG.debug("Got kuryrport %s deletion event.",
|
||||||
kuryrport['metadata']['name'])
|
kuryrport['metadata']['name'])
|
||||||
if self._del_callback:
|
if self._del_callback:
|
||||||
|
|
|
@ -388,7 +388,7 @@ class PodVIFDriver(DriverBase, metaclass=abc.ABCMeta):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
"""Updates VIF to become active.
|
"""Updates VIF to become active.
|
||||||
|
|
||||||
Implementing drivers should update the specified `vif` object's
|
Implementing drivers should update the specified `vif` object's
|
||||||
|
|
|
@ -64,7 +64,7 @@ class NestedDpdkPodVIFDriver(nested_vif.NestedPodVIFDriver):
|
||||||
vif.id, vm_id)
|
vif.id, vm_id)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
# NOTE(danil): new virtual interface was created in nova instance
|
# NOTE(danil): new virtual interface was created in nova instance
|
||||||
# during request_vif call, thus if it was not created successfully
|
# during request_vif call, thus if it was not created successfully
|
||||||
# an exception o_exc.SDKException would be throwed. During binding
|
# an exception o_exc.SDKException would be throwed. During binding
|
||||||
|
|
|
@ -86,7 +86,7 @@ class NestedMacvlanPodVIFDriver(nested_vif.NestedPodVIFDriver):
|
||||||
LOG.warning("Unable to release port %s as it no longer exists.",
|
LOG.warning("Unable to release port %s as it no longer exists.",
|
||||||
vif.id)
|
vif.id)
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
# NOTE(mchiappe): there is no way to get feedback on the actual
|
# NOTE(mchiappe): there is no way to get feedback on the actual
|
||||||
# interface creation or activation as no plugging can happen for this
|
# interface creation or activation as no plugging can happen for this
|
||||||
# interface type. However the status of the port is not relevant as
|
# interface type. However the status of the port is not relevant as
|
||||||
|
|
|
@ -33,6 +33,7 @@ LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
DEFAULT_MAX_RETRY_COUNT = 3
|
DEFAULT_MAX_RETRY_COUNT = 3
|
||||||
DEFAULT_RETRY_INTERVAL = 1
|
DEFAULT_RETRY_INTERVAL = 1
|
||||||
|
ACTIVE_TIMEOUT = 90
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
@ -118,6 +119,37 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver):
|
||||||
vifs.append(vif)
|
vifs.append(vif)
|
||||||
return vifs
|
return vifs
|
||||||
|
|
||||||
|
def activate_vif(self, vif, pod=None, retry_info=None):
|
||||||
|
try:
|
||||||
|
super().activate_vif(vif)
|
||||||
|
except k_exc.ResourceNotReady:
|
||||||
|
if retry_info and retry_info.get('elapsed', 0) > ACTIVE_TIMEOUT:
|
||||||
|
parent_port = self._get_parent_port(pod)
|
||||||
|
trunk_id = self._get_trunk_id(parent_port)
|
||||||
|
# NOTE(dulek): We don't need a lock to prevent VLAN ID from
|
||||||
|
# being taken over because the SegmentationDriver
|
||||||
|
# will keep it reserved in memory unless we
|
||||||
|
# release it. And we won't.
|
||||||
|
LOG.warning('Subport %s is in DOWN status for more than %d '
|
||||||
|
'seconds. This is a Neutron issue. Attempting to '
|
||||||
|
'reattach the subport to trunk %s using VLAN ID %s'
|
||||||
|
' to fix it.', vif.id, retry_info['elapsed'],
|
||||||
|
trunk_id, vif.vlan_id)
|
||||||
|
try:
|
||||||
|
self._remove_subport(trunk_id, vif.id)
|
||||||
|
except os_exc.NotFoundException:
|
||||||
|
# NOTE(dulek): This may happen when _add_subport() failed
|
||||||
|
# or Kuryr crashed between the calls. Let's
|
||||||
|
# try to fix it hoping that VLAN ID is still
|
||||||
|
# free.
|
||||||
|
LOG.warning('Subport %s was not attached to the trunk. '
|
||||||
|
'Trying to attach it anyway.', vif.id)
|
||||||
|
self._add_subport(trunk_id, vif.id,
|
||||||
|
requested_vlan_id=vif.vlan_id)
|
||||||
|
LOG.warning("Reattached subport %s, its state will be "
|
||||||
|
"rechecked when event will be retried.", vif.id)
|
||||||
|
raise
|
||||||
|
|
||||||
def release_vif(self, pod, vif, project_id=None, security_groups=None):
|
def release_vif(self, pod, vif, project_id=None, security_groups=None):
|
||||||
os_net = clients.get_network_client()
|
os_net = clients.get_network_client()
|
||||||
parent_port = self._get_parent_port(pod)
|
parent_port = self._get_parent_port(pod)
|
||||||
|
@ -182,7 +214,7 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver):
|
||||||
"with a Neutron vlan trunk")
|
"with a Neutron vlan trunk")
|
||||||
raise k_exc.K8sNodeTrunkPortFailure
|
raise k_exc.K8sNodeTrunkPortFailure
|
||||||
|
|
||||||
def _add_subport(self, trunk_id, subport):
|
def _add_subport(self, trunk_id, subport, requested_vlan_id=None):
|
||||||
"""Adds subport port to Neutron trunk
|
"""Adds subport port to Neutron trunk
|
||||||
|
|
||||||
This method gets vlanid allocated from kuryr segmentation driver.
|
This method gets vlanid allocated from kuryr segmentation driver.
|
||||||
|
@ -196,29 +228,34 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver):
|
||||||
os_net = clients.get_network_client()
|
os_net = clients.get_network_client()
|
||||||
retry_count = 1
|
retry_count = 1
|
||||||
while True:
|
while True:
|
||||||
try:
|
if requested_vlan_id:
|
||||||
vlan_id = self._get_vlan_id(trunk_id)
|
vlan_id = requested_vlan_id
|
||||||
except os_exc.SDKException:
|
else:
|
||||||
LOG.error("Getting VlanID for subport on "
|
try:
|
||||||
"trunk %s failed!!", trunk_id)
|
vlan_id = self._get_vlan_id(trunk_id)
|
||||||
raise
|
except os_exc.SDKException:
|
||||||
|
LOG.error("Getting VLAN ID for subport on "
|
||||||
|
"trunk %s failed!!", trunk_id)
|
||||||
|
raise
|
||||||
|
|
||||||
subport = [{'segmentation_id': vlan_id,
|
subport = [{'segmentation_id': vlan_id,
|
||||||
'port_id': subport,
|
'port_id': subport,
|
||||||
'segmentation_type': 'vlan'}]
|
'segmentation_type': 'vlan'}]
|
||||||
try:
|
try:
|
||||||
os_net.add_trunk_subports(trunk_id, subport)
|
os_net.add_trunk_subports(trunk_id, subport)
|
||||||
except os_exc.ConflictException:
|
except os_exc.ConflictException:
|
||||||
if retry_count < DEFAULT_MAX_RETRY_COUNT:
|
if (retry_count < DEFAULT_MAX_RETRY_COUNT and
|
||||||
LOG.error("vlanid already in use on trunk, "
|
not requested_vlan_id):
|
||||||
"%s. Retrying...", trunk_id)
|
LOG.error("VLAN ID already in use on trunk %s. "
|
||||||
|
"Retrying.", trunk_id)
|
||||||
retry_count += 1
|
retry_count += 1
|
||||||
sleep(DEFAULT_RETRY_INTERVAL)
|
sleep(DEFAULT_RETRY_INTERVAL)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
LOG.error(
|
LOG.error("Failed to add subport %s to trunk %s due to "
|
||||||
"MAX retry count reached. Failed to add subport")
|
"VLAN ID %d conflict.", subport, trunk_id,
|
||||||
|
vlan_id)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except os_exc.SDKException:
|
except os_exc.SDKException:
|
||||||
LOG.exception("Error happened during subport "
|
LOG.exception("Error happened during subport "
|
||||||
"addition to trunk %s", trunk_id)
|
"addition to trunk %s", trunk_id)
|
||||||
|
|
|
@ -93,7 +93,7 @@ class NeutronPodVIFDriver(base.PodVIFDriver):
|
||||||
def release_vif(self, pod, vif, project_id=None, security_groups=None):
|
def release_vif(self, pod, vif, project_id=None, security_groups=None):
|
||||||
clients.get_network_client().delete_port(vif.id)
|
clients.get_network_client().delete_port(vif.id)
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
if vif.active:
|
if vif.active:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ class SriovVIFDriver(neutron_vif.NeutronPodVIFDriver):
|
||||||
self._reduce_remaining_sriov_vfs(pod, physnet)
|
self._reduce_remaining_sriov_vfs(pod, physnet)
|
||||||
return vif
|
return vif
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
vif.active = True
|
vif.active = True
|
||||||
|
|
||||||
def _get_physnet_subnet_mapping(self):
|
def _get_physnet_subnet_mapping(self):
|
||||||
|
|
|
@ -118,8 +118,8 @@ class NoopVIFPool(base.VIFPoolDriver):
|
||||||
def release_vif(self, pod, vif, *argv):
|
def release_vif(self, pod, vif, *argv):
|
||||||
self._drv_vif.release_vif(pod, vif, *argv)
|
self._drv_vif.release_vif(pod, vif, *argv)
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
self._drv_vif.activate_vif(vif)
|
self._drv_vif.activate_vif(vif, **kwargs)
|
||||||
|
|
||||||
def update_vif_sgs(self, pod, sgs):
|
def update_vif_sgs(self, pod, sgs):
|
||||||
self._drv_vif.update_vif_sgs(pod, sgs)
|
self._drv_vif.update_vif_sgs(pod, sgs)
|
||||||
|
@ -168,8 +168,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
|
||||||
def set_vif_driver(self, driver):
|
def set_vif_driver(self, driver):
|
||||||
self._drv_vif = driver
|
self._drv_vif = driver
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
self._drv_vif.activate_vif(vif)
|
self._drv_vif.activate_vif(vif, **kwargs)
|
||||||
|
|
||||||
def update_vif_sgs(self, pod, sgs):
|
def update_vif_sgs(self, pod, sgs):
|
||||||
self._drv_vif.update_vif_sgs(pod, sgs)
|
self._drv_vif.update_vif_sgs(pod, sgs)
|
||||||
|
@ -1230,9 +1230,9 @@ class MultiVIFPool(base.VIFPoolDriver):
|
||||||
vif_drv_alias = self._get_vif_drv_alias(vif)
|
vif_drv_alias = self._get_vif_drv_alias(vif)
|
||||||
self._vif_drvs[vif_drv_alias].release_vif(pod, vif, *argv)
|
self._vif_drvs[vif_drv_alias].release_vif(pod, vif, *argv)
|
||||||
|
|
||||||
def activate_vif(self, vif):
|
def activate_vif(self, vif, **kwargs):
|
||||||
vif_drv_alias = self._get_vif_drv_alias(vif)
|
vif_drv_alias = self._get_vif_drv_alias(vif)
|
||||||
self._vif_drvs[vif_drv_alias].activate_vif(vif)
|
self._vif_drvs[vif_drv_alias].activate_vif(vif, **kwargs)
|
||||||
|
|
||||||
def update_vif_sgs(self, pod, sgs):
|
def update_vif_sgs(self, pod, sgs):
|
||||||
pod_vif_type = self._get_pod_vif_type(pod)
|
pod_vif_type = self._get_pod_vif_type(pod)
|
||||||
|
|
|
@ -50,7 +50,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler):
|
||||||
self._drv_svc_sg = (
|
self._drv_svc_sg = (
|
||||||
drivers.ServiceSecurityGroupsDriver.get_instance())
|
drivers.ServiceSecurityGroupsDriver.get_instance())
|
||||||
|
|
||||||
def on_present(self, kuryrnet_crd):
|
def on_present(self, kuryrnet_crd, *args, **kwargs):
|
||||||
ns_name = kuryrnet_crd['spec']['nsName']
|
ns_name = kuryrnet_crd['spec']['nsName']
|
||||||
project_id = kuryrnet_crd['spec']['projectId']
|
project_id = kuryrnet_crd['spec']['projectId']
|
||||||
kns_status = kuryrnet_crd.get('status', {})
|
kns_status = kuryrnet_crd.get('status', {})
|
||||||
|
@ -90,7 +90,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler):
|
||||||
status = {'nsLabels': kuryrnet_crd['spec']['nsLabels']}
|
status = {'nsLabels': kuryrnet_crd['spec']['nsLabels']}
|
||||||
self._patch_kuryrnetwork_crd(kuryrnet_crd, status, labels=True)
|
self._patch_kuryrnetwork_crd(kuryrnet_crd, status, labels=True)
|
||||||
|
|
||||||
def on_finalize(self, kuryrnet_crd):
|
def on_finalize(self, kuryrnet_crd, *args, **kwargs):
|
||||||
LOG.debug("Deleting kuryrnetwork CRD resources: %s", kuryrnet_crd)
|
LOG.debug("Deleting kuryrnetwork CRD resources: %s", kuryrnet_crd)
|
||||||
|
|
||||||
net_id = kuryrnet_crd.get('status', {}).get('netId')
|
net_id = kuryrnet_crd.get('status', {}).get('netId')
|
||||||
|
|
|
@ -41,7 +41,7 @@ class KuryrNetworkPopulationHandler(k8s_base.ResourceEventHandler):
|
||||||
self._drv_vif_pool.set_vif_driver()
|
self._drv_vif_pool.set_vif_driver()
|
||||||
self._drv_nodes_subnets = drivers.NodesSubnetsDriver.get_instance()
|
self._drv_nodes_subnets = drivers.NodesSubnetsDriver.get_instance()
|
||||||
|
|
||||||
def on_present(self, kuryrnet_crd):
|
def on_present(self, kuryrnet_crd, *args, **kwargs):
|
||||||
subnet_id = kuryrnet_crd.get('status', {}).get('subnetId')
|
subnet_id = kuryrnet_crd.get('status', {}).get('subnetId')
|
||||||
if not subnet_id:
|
if not subnet_id:
|
||||||
LOG.debug("No Subnet present for KuryrNetwork %s",
|
LOG.debug("No Subnet present for KuryrNetwork %s",
|
||||||
|
|
|
@ -87,7 +87,7 @@ class KuryrNetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def on_present(self, knp):
|
def on_present(self, knp, *args, **kwargs):
|
||||||
uniq_name = utils.get_res_unique_name(knp)
|
uniq_name = utils.get_res_unique_name(knp)
|
||||||
LOG.debug('on_present() for NP %s', uniq_name)
|
LOG.debug('on_present() for NP %s', uniq_name)
|
||||||
project_id = self._drv_project.get_project(knp)
|
project_id = self._drv_project.get_project(knp)
|
||||||
|
@ -234,7 +234,7 @@ class KuryrNetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
raise
|
raise
|
||||||
return net_crd['status']['netId']
|
return net_crd['status']['netId']
|
||||||
|
|
||||||
def on_finalize(self, knp):
|
def on_finalize(self, knp, *args, **kwargs):
|
||||||
LOG.debug("Finalizing KuryrNetworkPolicy %s", knp)
|
LOG.debug("Finalizing KuryrNetworkPolicy %s", knp)
|
||||||
project_id = self._drv_project.get_project(knp)
|
project_id = self._drv_project.get_project(knp)
|
||||||
pods_to_update = self._drv_policy.affected_pods(knp)
|
pods_to_update = self._drv_policy.affected_pods(knp)
|
||||||
|
|
|
@ -58,7 +58,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
|
||||||
.get_instance())
|
.get_instance())
|
||||||
self.k8s = clients.get_kubernetes_client()
|
self.k8s = clients.get_kubernetes_client()
|
||||||
|
|
||||||
def on_present(self, kuryrport_crd):
|
def on_present(self, kuryrport_crd, *args, **kwargs):
|
||||||
if not kuryrport_crd['status']['vifs']:
|
if not kuryrport_crd['status']['vifs']:
|
||||||
# Get vifs
|
# Get vifs
|
||||||
if not self.get_vifs(kuryrport_crd):
|
if not self.get_vifs(kuryrport_crd):
|
||||||
|
@ -66,6 +66,8 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
|
||||||
# get_vifs method.
|
# get_vifs method.
|
||||||
return
|
return
|
||||||
|
|
||||||
|
retry_info = kwargs.get('retry_info')
|
||||||
|
|
||||||
vifs = {ifname: {'default': data['default'],
|
vifs = {ifname: {'default': data['default'],
|
||||||
'vif': objects.base.VersionedObject
|
'vif': objects.base.VersionedObject
|
||||||
.obj_from_primitive(data['vif'])}
|
.obj_from_primitive(data['vif'])}
|
||||||
|
@ -75,6 +77,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
|
||||||
return
|
return
|
||||||
|
|
||||||
changed = False
|
changed = False
|
||||||
|
pod = self._get_pod(kuryrport_crd)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for ifname, data in vifs.items():
|
for ifname, data in vifs.items():
|
||||||
|
@ -86,22 +89,14 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
|
||||||
driver_utils.update_port_pci_info(pod_node, data['vif'])
|
driver_utils.update_port_pci_info(pod_node, data['vif'])
|
||||||
if not data['vif'].active:
|
if not data['vif'].active:
|
||||||
try:
|
try:
|
||||||
self._drv_vif_pool.activate_vif(data['vif'])
|
self._drv_vif_pool.activate_vif(data['vif'], pod=pod,
|
||||||
|
retry_info=retry_info)
|
||||||
changed = True
|
changed = True
|
||||||
except os_exc.ResourceNotFound:
|
except os_exc.ResourceNotFound:
|
||||||
LOG.debug("Port not found, possibly already deleted. "
|
LOG.debug("Port not found, possibly already deleted. "
|
||||||
"No need to activate it")
|
"No need to activate it")
|
||||||
finally:
|
finally:
|
||||||
if changed:
|
if changed:
|
||||||
try:
|
|
||||||
name = kuryrport_crd['metadata']['name']
|
|
||||||
namespace = kuryrport_crd['metadata']['namespace']
|
|
||||||
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
|
|
||||||
f"/{namespace}/pods/{name}")
|
|
||||||
except k_exc.K8sResourceNotFound as ex:
|
|
||||||
LOG.exception("Failed to get pod: %s", ex)
|
|
||||||
raise
|
|
||||||
|
|
||||||
project_id = self._drv_project.get_project(pod)
|
project_id = self._drv_project.get_project(pod)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -124,7 +119,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
|
||||||
self._update_services(services, crd_pod_selectors,
|
self._update_services(services, crd_pod_selectors,
|
||||||
project_id)
|
project_id)
|
||||||
|
|
||||||
def on_finalize(self, kuryrport_crd):
|
def on_finalize(self, kuryrport_crd, *args, **kwargs):
|
||||||
name = kuryrport_crd['metadata']['name']
|
name = kuryrport_crd['metadata']['name']
|
||||||
namespace = kuryrport_crd['metadata']['namespace']
|
namespace = kuryrport_crd['metadata']['namespace']
|
||||||
try:
|
try:
|
||||||
|
@ -282,3 +277,13 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
|
||||||
sgs = self._drv_svc_sg.get_security_groups(service,
|
sgs = self._drv_svc_sg.get_security_groups(service,
|
||||||
project_id)
|
project_id)
|
||||||
self._drv_lbaas.update_lbaas_sg(service, sgs)
|
self._drv_lbaas.update_lbaas_sg(service, sgs)
|
||||||
|
|
||||||
|
def _get_pod(self, kuryrport_crd):
|
||||||
|
try:
|
||||||
|
name = kuryrport_crd['metadata']['name']
|
||||||
|
namespace = kuryrport_crd['metadata']['namespace']
|
||||||
|
return self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
|
||||||
|
f"/{namespace}/pods/{name}")
|
||||||
|
except k_exc.K8sResourceNotFound as ex:
|
||||||
|
LOG.exception("Failed to get pod: %s", ex)
|
||||||
|
raise
|
||||||
|
|
|
@ -50,7 +50,7 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
|
||||||
if driver_utils.is_network_policy_enabled():
|
if driver_utils.is_network_policy_enabled():
|
||||||
driver_utils.bump_networkpolicies(svc['metadata']['namespace'])
|
driver_utils.bump_networkpolicies(svc['metadata']['namespace'])
|
||||||
|
|
||||||
def on_present(self, service):
|
def on_present(self, service, *args, **kwargs):
|
||||||
reason = self._should_ignore(service)
|
reason = self._should_ignore(service)
|
||||||
if reason:
|
if reason:
|
||||||
LOG.debug(reason, service['metadata']['name'])
|
LOG.debug(reason, service['metadata']['name'])
|
||||||
|
@ -111,7 +111,7 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
|
||||||
k8s = clients.get_kubernetes_client()
|
k8s = clients.get_kubernetes_client()
|
||||||
return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)
|
return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)
|
||||||
|
|
||||||
def on_finalize(self, service):
|
def on_finalize(self, service, *args, **kwargs):
|
||||||
k8s = clients.get_kubernetes_client()
|
k8s = clients.get_kubernetes_client()
|
||||||
|
|
||||||
svc_name = service['metadata']['name']
|
svc_name = service['metadata']['name']
|
||||||
|
@ -305,7 +305,7 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
||||||
self._lb_provider = (
|
self._lb_provider = (
|
||||||
config.CONF.kubernetes.endpoints_driver_octavia_provider)
|
config.CONF.kubernetes.endpoints_driver_octavia_provider)
|
||||||
|
|
||||||
def on_present(self, endpoints):
|
def on_present(self, endpoints, *args, **kwargs):
|
||||||
ep_name = endpoints['metadata']['name']
|
ep_name = endpoints['metadata']['name']
|
||||||
ep_namespace = endpoints['metadata']['namespace']
|
ep_namespace = endpoints['metadata']['namespace']
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
return utils.get_subnets_id_cidrs(
|
return utils.get_subnets_id_cidrs(
|
||||||
self._drv_nodes_subnets.get_nodes_subnets())
|
self._drv_nodes_subnets.get_nodes_subnets())
|
||||||
|
|
||||||
def on_present(self, loadbalancer_crd):
|
def on_present(self, loadbalancer_crd, *args, **kwargs):
|
||||||
if loadbalancer_crd.get('status', None) is None:
|
if loadbalancer_crd.get('status', None) is None:
|
||||||
|
|
||||||
kubernetes = clients.get_kubernetes_client()
|
kubernetes = clients.get_kubernetes_client()
|
||||||
|
@ -124,7 +124,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def on_finalize(self, loadbalancer_crd):
|
def on_finalize(self, loadbalancer_crd, *args, **kwargs):
|
||||||
LOG.debug("Deleting the loadbalancer CRD")
|
LOG.debug("Deleting the loadbalancer CRD")
|
||||||
|
|
||||||
if loadbalancer_crd['status'] != {}:
|
if loadbalancer_crd['status'] != {}:
|
||||||
|
|
|
@ -52,14 +52,14 @@ class MachineHandler(k8s_base.ResourceEventHandler):
|
||||||
# Had to be deleted in the meanwhile.
|
# Had to be deleted in the meanwhile.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_present(self, machine):
|
def on_present(self, machine, *args, **kwargs):
|
||||||
effect = self.node_subnets_driver.add_node(machine)
|
effect = self.node_subnets_driver.add_node(machine)
|
||||||
if effect:
|
if effect:
|
||||||
# If the change was meaningful we need to make sure all the NPs
|
# If the change was meaningful we need to make sure all the NPs
|
||||||
# are recalculated to get the new SG rules added.
|
# are recalculated to get the new SG rules added.
|
||||||
self._bump_nps()
|
self._bump_nps()
|
||||||
|
|
||||||
def on_deleted(self, machine):
|
def on_deleted(self, machine, *args, **kwargs):
|
||||||
effect = self.node_subnets_driver.delete_node(machine)
|
effect = self.node_subnets_driver.delete_node(machine)
|
||||||
if effect:
|
if effect:
|
||||||
# If the change was meaningful we need to make sure all the NPs
|
# If the change was meaningful we need to make sure all the NPs
|
||||||
|
|
|
@ -33,7 +33,7 @@ class NamespaceHandler(k8s_base.ResourceEventHandler):
|
||||||
super(NamespaceHandler, self).__init__()
|
super(NamespaceHandler, self).__init__()
|
||||||
self._drv_project = drivers.NamespaceProjectDriver.get_instance()
|
self._drv_project = drivers.NamespaceProjectDriver.get_instance()
|
||||||
|
|
||||||
def on_present(self, namespace):
|
def on_present(self, namespace, *args, **kwargs):
|
||||||
ns_labels = namespace['metadata'].get('labels', {})
|
ns_labels = namespace['metadata'].get('labels', {})
|
||||||
ns_name = namespace['metadata']['name']
|
ns_name = namespace['metadata']['name']
|
||||||
kns_crd = self._get_kns_crd(ns_name)
|
kns_crd = self._get_kns_crd(ns_name)
|
||||||
|
|
|
@ -47,7 +47,7 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
|
||||||
self._drv_vif_pool.set_vif_driver()
|
self._drv_vif_pool.set_vif_driver()
|
||||||
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
|
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
|
||||||
|
|
||||||
def on_present(self, pod):
|
def on_present(self, pod, *args, **kwargs):
|
||||||
if driver_utils.is_host_network(pod) or not self._has_vifs(pod):
|
if driver_utils.is_host_network(pod) or not self._has_vifs(pod):
|
||||||
# NOTE(ltomasbo): The event will be retried once the vif handler
|
# NOTE(ltomasbo): The event will be retried once the vif handler
|
||||||
# annotates the pod with the pod state.
|
# annotates the pod with the pod state.
|
||||||
|
|
|
@ -34,7 +34,7 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
self._drv_policy = drivers.NetworkPolicyDriver.get_instance()
|
self._drv_policy = drivers.NetworkPolicyDriver.get_instance()
|
||||||
self.k8s = clients.get_kubernetes_client()
|
self.k8s = clients.get_kubernetes_client()
|
||||||
|
|
||||||
def on_present(self, policy):
|
def on_present(self, policy, *args, **kwargs):
|
||||||
LOG.debug("Created or updated: %s", policy)
|
LOG.debug("Created or updated: %s", policy)
|
||||||
|
|
||||||
self._drv_policy.ensure_network_policy(policy)
|
self._drv_policy.ensure_network_policy(policy)
|
||||||
|
@ -42,7 +42,7 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
|
||||||
# Put finalizer in if it's not there already.
|
# Put finalizer in if it's not there already.
|
||||||
self.k8s.add_finalizer(policy, k_const.NETWORKPOLICY_FINALIZER)
|
self.k8s.add_finalizer(policy, k_const.NETWORKPOLICY_FINALIZER)
|
||||||
|
|
||||||
def on_finalize(self, policy):
|
def on_finalize(self, policy, *args, **kwargs):
|
||||||
LOG.debug("Finalizing policy %s", policy)
|
LOG.debug("Finalizing policy %s", policy)
|
||||||
if not self._drv_policy.release_network_policy(policy):
|
if not self._drv_policy.release_network_policy(policy):
|
||||||
# KNP was not found, so we need to finalize on our own.
|
# KNP was not found, so we need to finalize on our own.
|
||||||
|
|
|
@ -40,7 +40,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
OBJECT_KIND = constants.K8S_OBJ_POD
|
OBJECT_KIND = constants.K8S_OBJ_POD
|
||||||
OBJECT_WATCH_PATH = "%s/%s" % (constants.K8S_API_BASE, "pods")
|
OBJECT_WATCH_PATH = "%s/%s" % (constants.K8S_API_BASE, "pods")
|
||||||
|
|
||||||
def on_present(self, pod):
|
def on_present(self, pod, *args, **kwargs):
|
||||||
if (driver_utils.is_host_network(pod) or
|
if (driver_utils.is_host_network(pod) or
|
||||||
not self._is_pod_scheduled(pod)):
|
not self._is_pod_scheduled(pod)):
|
||||||
# REVISIT(ivc): consider an additional configurable check that
|
# REVISIT(ivc): consider an additional configurable check that
|
||||||
|
@ -90,7 +90,7 @@ class VIFHandler(k8s_base.ResourceEventHandler):
|
||||||
"KuryrPort CRD: %s", ex)
|
"KuryrPort CRD: %s", ex)
|
||||||
raise k_exc.ResourceNotReady(pod)
|
raise k_exc.ResourceNotReady(pod)
|
||||||
|
|
||||||
def on_finalize(self, pod):
|
def on_finalize(self, pod, *args, **kwargs):
|
||||||
k8s = clients.get_kubernetes_client()
|
k8s = clients.get_kubernetes_client()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -78,30 +78,30 @@ class ResourceEventHandler(dispatch.EventConsumer, health.HealthHandler):
|
||||||
obj = event.get('object')
|
obj = event.get('object')
|
||||||
if 'MODIFIED' == event_type:
|
if 'MODIFIED' == event_type:
|
||||||
if self._check_finalize(obj):
|
if self._check_finalize(obj):
|
||||||
self.on_finalize(obj)
|
self.on_finalize(obj, *args, **kwargs)
|
||||||
return
|
return
|
||||||
self.on_modified(obj)
|
self.on_modified(obj, *args, **kwargs)
|
||||||
self.on_present(obj)
|
self.on_present(obj, *args, **kwargs)
|
||||||
elif 'ADDED' == event_type:
|
elif 'ADDED' == event_type:
|
||||||
if self._check_finalize(obj):
|
if self._check_finalize(obj):
|
||||||
self.on_finalize(obj)
|
self.on_finalize(obj, *args, **kwargs)
|
||||||
return
|
return
|
||||||
self.on_added(obj)
|
self.on_added(obj, *args, **kwargs)
|
||||||
self.on_present(obj)
|
self.on_present(obj, *args, **kwargs)
|
||||||
elif 'DELETED' == event_type:
|
elif 'DELETED' == event_type:
|
||||||
self.on_deleted(obj)
|
self.on_deleted(obj, *args, **kwargs)
|
||||||
|
|
||||||
def on_added(self, obj):
|
def on_added(self, obj, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_present(self, obj):
|
def on_present(self, obj, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_modified(self, obj):
|
def on_modified(self, obj, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_deleted(self, obj):
|
def on_deleted(self, obj, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_finalize(self, obj):
|
def on_finalize(self, obj, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -54,6 +54,7 @@ class Retry(base.EventHandler):
|
||||||
self._k8s = clients.get_kubernetes_client()
|
self._k8s = clients.get_kubernetes_client()
|
||||||
|
|
||||||
def __call__(self, event, *args, **kwargs):
|
def __call__(self, event, *args, **kwargs):
|
||||||
|
start_time = time.time()
|
||||||
deadline = time.time() + self._timeout
|
deadline = time.time() + self._timeout
|
||||||
for attempt in itertools.count(1):
|
for attempt in itertools.count(1):
|
||||||
if event.get('type') in ['MODIFIED', 'ADDED']:
|
if event.get('type') in ['MODIFIED', 'ADDED']:
|
||||||
|
@ -77,7 +78,10 @@ class Retry(base.EventHandler):
|
||||||
"object. Continuing with handler "
|
"object. Continuing with handler "
|
||||||
"execution.")
|
"execution.")
|
||||||
try:
|
try:
|
||||||
self._handler(event, *args, **kwargs)
|
info = {
|
||||||
|
'elapsed': time.time() - start_time
|
||||||
|
}
|
||||||
|
self._handler(event, *args, retry_info=info, **kwargs)
|
||||||
break
|
break
|
||||||
except os_exc.ConflictException as ex:
|
except os_exc.ConflictException as ex:
|
||||||
if ex.details.startswith('Quota exceeded for resources'):
|
if ex.details.startswith('Quota exceeded for resources'):
|
||||||
|
|
|
@ -146,8 +146,10 @@ class TestKuryrPortHandler(test_base.TestCase):
|
||||||
|
|
||||||
k8s.get.assert_called_once_with(self._pod_uri)
|
k8s.get.assert_called_once_with(self._pod_uri)
|
||||||
|
|
||||||
activate_vif.assert_has_calls([mock.call(self._vif1),
|
activate_vif.assert_has_calls([mock.call(self._vif1, pod=self._pod,
|
||||||
mock.call(self._vif2)])
|
retry_info=mock.ANY),
|
||||||
|
mock.call(self._vif2, pod=self._pod,
|
||||||
|
retry_info=mock.ANY)])
|
||||||
update_crd.assert_called_once_with(self._kp, self._vifs)
|
update_crd.assert_called_once_with(self._kp, self._vifs)
|
||||||
|
|
||||||
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
|
||||||
|
@ -182,8 +184,10 @@ class TestKuryrPortHandler(test_base.TestCase):
|
||||||
|
|
||||||
kp.on_present(self._kp)
|
kp.on_present(self._kp)
|
||||||
|
|
||||||
activate_vif.assert_has_calls([mock.call(self._vif1),
|
activate_vif.assert_has_calls([mock.call(self._vif1, pod=mock.ANY,
|
||||||
mock.call(self._vif2)])
|
retry_info=mock.ANY),
|
||||||
|
mock.call(self._vif2, pod=mock.ANY,
|
||||||
|
retry_info=mock.ANY)])
|
||||||
|
|
||||||
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
|
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
|
||||||
'activate_vif')
|
'activate_vif')
|
||||||
|
@ -328,8 +332,10 @@ class TestKuryrPortHandler(test_base.TestCase):
|
||||||
|
|
||||||
k8s.get.assert_called_once_with(self._pod_uri)
|
k8s.get.assert_called_once_with(self._pod_uri)
|
||||||
|
|
||||||
activate_vif.assert_has_calls([mock.call(self._vif1),
|
activate_vif.assert_has_calls([mock.call(self._vif1, pod=self._pod,
|
||||||
mock.call(self._vif2)])
|
retry_info=mock.ANY),
|
||||||
|
mock.call(self._vif2, pod=self._pod,
|
||||||
|
retry_info=mock.ANY)])
|
||||||
update_crd.assert_called_once_with(self._kp, self._vifs)
|
update_crd.assert_called_once_with(self._kp, self._vifs)
|
||||||
create_sgr.assert_called_once_with(self._pod)
|
create_sgr.assert_called_once_with(self._pod)
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
|
|
||||||
retry(event)
|
retry(event)
|
||||||
|
|
||||||
m_handler.assert_called_once_with(event)
|
m_handler.assert_called_once_with(event, retry_info=mock.ANY)
|
||||||
m_sleep.assert_not_called()
|
m_sleep.assert_not_called()
|
||||||
|
|
||||||
@mock.patch('itertools.count')
|
@mock.patch('itertools.count')
|
||||||
|
@ -134,7 +134,8 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
|
|
||||||
retry(event)
|
retry(event)
|
||||||
|
|
||||||
m_handler.assert_has_calls([mock.call(event)] * attempts)
|
m_handler.assert_has_calls([mock.call(
|
||||||
|
event, retry_info=mock.ANY)] * attempts)
|
||||||
m_sleep.assert_has_calls([
|
m_sleep.assert_has_calls([
|
||||||
mock.call(deadline, i + 1, failures[i])
|
mock.call(deadline, i + 1, failures[i])
|
||||||
for i in range(len(failures))])
|
for i in range(len(failures))])
|
||||||
|
@ -155,7 +156,8 @@ class TestRetryHandler(test_base.TestCase):
|
||||||
|
|
||||||
self.assertRaises(_EX11, retry, event)
|
self.assertRaises(_EX11, retry, event)
|
||||||
|
|
||||||
m_handler.assert_has_calls([mock.call(event)] * attempts)
|
m_handler.assert_has_calls([mock.call(
|
||||||
|
event, retry_info=mock.ANY)] * attempts)
|
||||||
m_sleep.assert_has_calls([
|
m_sleep.assert_has_calls([
|
||||||
mock.call(deadline, i + 1, failures[i])
|
mock.call(deadline, i + 1, failures[i])
|
||||||
for i in range(len(failures))])
|
for i in range(len(failures))])
|
||||||
|
|
Loading…
Reference in New Issue