Add events for Services

This commit implements adding Events for various things that may happen
when Kuryr handles a Service - either incidents or just informative
messages helpful to the user.

As to add an Event it is required to have a uid of the object and in
most of KuryrLoadBalancerHandler we don't have the Service
representation, this commit implements populating ownerReferences of the
KuryrLoadBalancer object with a reference to the Service. This has a
major side effect that KLB will be garbage collected if corresponding
service doesn't exist, which is probably a good thing (and we manage
that ourselves using finalizers anyway).

Another set of refactorings is to remove KLB creation from
EndpointsHandler in order to stop it fighting with ServiceHandler over
creation - EndpointsHandler cannot add ownerReference as it has no uid
of the Service.

Other refactorings related to the error messages are also included.

Change-Id: I36b4d62e6fc7ace00909e9b1aa7681f6d59ca455
This commit is contained in:
Michał Dulko 2021-12-03 18:41:16 +01:00
parent d32122a5d1
commit ca719a4009
10 changed files with 330 additions and 328 deletions

View File

@ -50,7 +50,7 @@ def get_loadbalancer_client():
return get_openstacksdk().load_balancer
def get_kubernetes_client():
def get_kubernetes_client() -> k8s_client.K8sClient:
return _clients[_KUBERNETES_CLIENT]

View File

@ -26,6 +26,7 @@ from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
CONF = config.CONF
SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer')
@ -45,6 +46,15 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
self._drv_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
self.k8s = clients.get_kubernetes_client()
self._lb_provider = None
if self._drv_lbaas.providers_supported():
self._lb_provider = 'amphora'
config_provider = CONF.kubernetes.endpoints_driver_octavia_provider
if config_provider != 'default':
self._lb_provider = config_provider
def _bump_network_policies(self, svc):
if driver_utils.is_network_policy_enabled():
@ -53,16 +63,21 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
def on_present(self, service, *args, **kwargs):
reason = self._should_ignore(service)
if reason:
LOG.debug(reason, service['metadata']['name'])
reason %= utils.get_res_unique_name(service)
LOG.debug(reason)
self.k8s.add_event(service, 'KuryrServiceSkipped', reason)
return
k8s = clients.get_kubernetes_client()
loadbalancer_crd = k8s.get_loadbalancer_crd(service)
loadbalancer_crd = self.k8s.get_loadbalancer_crd(service)
try:
if not self._patch_service_finalizer(service):
return
except k_exc.K8sClientException as ex:
LOG.exception("Failed to set service finalizer: %s", ex)
msg = (f'K8s API error when adding finalizer to Service '
f'{utils.get_res_unique_name(service)}')
LOG.exception(msg)
self.k8s.add_event(service, 'KuryrAddServiceFinalizerError',
f'{msg}: {ex}', 'Warning')
raise
if loadbalancer_crd is None:
@ -108,20 +123,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
return None
def _patch_service_finalizer(self, service):
k8s = clients.get_kubernetes_client()
return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)
return self.k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)
def on_finalize(self, service, *args, **kwargs):
k8s = clients.get_kubernetes_client()
klb_crd_path = utils.get_klb_crd_path(service)
# Bump all the NPs in the namespace to force SG rules
# recalculation.
self._bump_network_policies(service)
try:
k8s.delete(klb_crd_path)
self.k8s.delete(klb_crd_path)
except k_exc.K8sResourceNotFound:
k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)
self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)
def _has_clusterip(self, service):
# ignore headless service, clusterIP is None
@ -149,17 +161,25 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
svc_namespace = service['metadata']['namespace']
kubernetes = clients.get_kubernetes_client()
spec = self._build_kuryrloadbalancer_spec(service)
owner_reference = {
'apiVersion': service['apiVersion'],
'kind': service['kind'],
'name': service['metadata']['name'],
'uid': service['metadata']['uid'],
}
loadbalancer_crd = {
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
'metadata': {
'name': svc_name,
'finalizers': [k_const.KURYRLB_FINALIZER],
},
'ownerReferences': [owner_reference],
},
'spec': spec,
'status': {
}
}
'status': {},
}
try:
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
@ -169,8 +189,12 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
raise k_exc.ResourceNotReady(svc_name)
except k_exc.K8sNamespaceTerminating:
raise
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception("Exception when creating KuryrLoadBalancer CRD.")
self.k8s.add_event(
service, 'CreateKLBFailed',
'Error when creating KuryrLoadBalancer object: %s' % e,
'Warning')
raise
def _update_crd_spec(self, loadbalancer_crd, service):
@ -185,13 +209,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(svc_name)
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception('Error updating kuryrnet CRD %s', loadbalancer_crd)
self.k8s.add_event(
service, 'UpdateKLBFailed',
'Error when updating KuryrLoadBalancer object: %s' % e,
'Warning')
raise
def _get_data_timeout_annotation(self, service):
default_timeout_cli = config.CONF.octavia_defaults.timeout_client_data
default_timeout_mem = config.CONF.octavia_defaults.timeout_member_data
default_timeout_cli = CONF.octavia_defaults.timeout_client_data
default_timeout_mem = CONF.octavia_defaults.timeout_member_data
try:
annotations = service['metadata']['annotations']
except KeyError:
@ -220,13 +248,16 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
subnet_id = self._get_subnet_id(service, project_id, svc_ip)
spec_type = service['spec'].get('type')
spec = {
'ip': svc_ip,
'ports': ports,
'project_id': project_id,
'security_groups_ids': sg_ids,
'subnet_id': subnet_id,
'type': spec_type
}
'ip': svc_ip,
'ports': ports,
'project_id': project_id,
'security_groups_ids': sg_ids,
'subnet_id': subnet_id,
'type': spec_type,
}
if self._lb_provider:
spec['provider'] = self._lb_provider
if spec_lb_ip is not None:
spec['lb_ip'] = spec_lb_ip
@ -288,25 +319,13 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
def __init__(self):
super(EndpointsHandler, self).__init__()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
# Note(yboaron) LBaaS driver supports 'provider' parameter in
# Load Balancer creation flow.
# We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration.
self._lb_provider = None
if self._drv_lbaas.providers_supported():
self._lb_provider = 'amphora'
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
!= 'default'):
self._lb_provider = (
config.CONF.kubernetes.endpoints_driver_octavia_provider)
self.k8s = clients.get_kubernetes_client()
def on_present(self, endpoints, *args, **kwargs):
ep_name = endpoints['metadata']['name']
ep_namespace = endpoints['metadata']['namespace']
k8s = clients.get_kubernetes_client()
loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints)
loadbalancer_crd = self.k8s.get_loadbalancer_crd(endpoints)
if (not (self._has_pods(endpoints) or (loadbalancer_crd and
loadbalancer_crd.get('status')))
@ -318,15 +337,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
return
if loadbalancer_crd is None:
raise k_exc.KuryrLoadBalancerNotCreated(endpoints)
else:
try:
self._create_crd_spec(endpoints)
self._update_crd_spec(loadbalancer_crd, endpoints)
except k_exc.K8sNamespaceTerminating:
LOG.warning('Namespace %s is being terminated, ignoring '
'Endpoints %s in that namespace.',
ep_namespace, ep_name)
return
else:
self._update_crd_spec(loadbalancer_crd, endpoints)
def on_deleted(self, endpoints, *args, **kwargs):
self._remove_endpoints(endpoints)
@ -365,84 +383,52 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
return endpointslices
def _create_crd_spec(self, endpoints, spec=None, status=None):
endpoints_name = endpoints['metadata']['name']
namespace = endpoints['metadata']['namespace']
kubernetes = clients.get_kubernetes_client()
# TODO(maysams): Remove the convertion once we start handling
# Endpoint slices.
epslices = self._convert_subsets_to_endpointslice(endpoints)
if not status:
status = {}
if not spec:
spec = {'endpointSlices': epslices}
# NOTE(maysams): As the spec may already contain a
# ports field from the Service, a new endpointslice
# field is introduced to also hold ports from the
# Endpoints under the spec.
loadbalancer_crd = {
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
'metadata': {
'name': endpoints_name,
'finalizers': [k_const.KURYRLB_FINALIZER],
},
'spec': spec,
'status': status,
}
if self._lb_provider:
loadbalancer_crd['spec']['provider'] = self._lb_provider
def _add_event(self, endpoints, reason, message, type_=None):
"""_add_event adds an event for the corresponding Service."""
try:
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sNamespaceTerminating:
raise
service = self.k8s.get(utils.get_service_link(endpoints))
except k_exc.K8sClientException:
LOG.exception("Exception when creating KuryrLoadBalancer CRD.")
raise
LOG.debug('Error when fetching Service to add an event %s, '
'ignoring', utils.get_res_unique_name(endpoints))
return
kwargs = {'type_': type_} if type_ else {}
self.k8s.add_event(service, reason, message, **kwargs)
def _update_crd_spec(self, loadbalancer_crd, endpoints):
kubernetes = clients.get_kubernetes_client()
# TODO(maysams): Remove the convertion once we start handling
# Endpoint slices.
# TODO(maysams): Remove the conversion once we start handling
# EndpointSlices.
epslices = self._convert_subsets_to_endpointslice(endpoints)
spec = {'endpointSlices': epslices}
if self._lb_provider:
spec['provider'] = self._lb_provider
try:
kubernetes.patch_crd(
'spec',
utils.get_res_link(loadbalancer_crd),
spec)
self.k8s.patch_crd('spec', utils.get_res_link(loadbalancer_crd),
{'endpointSlices': epslices})
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception('Error updating KuryrLoadbalancer CRD %s',
loadbalancer_crd)
self._add_event(
endpoints, 'UpdateKLBFailed',
'Error when updating KuryrLoadBalancer object: %s' % e,
'Warning')
raise
return True
def _remove_endpoints(self, endpoints):
kubernetes = clients.get_kubernetes_client()
lb_name = endpoints['metadata']['name']
try:
kubernetes.patch_crd('spec',
utils.get_klb_crd_path(endpoints),
'endpointSlices',
action='remove')
self.k8s.patch_crd('spec', utils.get_klb_crd_path(endpoints),
'endpointSlices', action='remove')
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer CRD not found %s', lb_name)
except k_exc.K8sUnprocessableEntity:
LOG.warning('KuryrLoadBalancer %s modified, ignoring.', lb_name)
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception('Error updating KuryrLoadBalancer CRD %s', lb_name)
self._add_event(
endpoints, 'UpdateKLBFailed',
'Error when updating KuryrLoadBalancer object: %s' % e,
'Warning')
raise

View File

@ -53,26 +53,56 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance()
self.k8s = clients.get_kubernetes_client()
def _get_nodes_subnets(self):
return utils.get_subnets_id_cidrs(
self._drv_nodes_subnets.get_nodes_subnets())
def _add_event(self, klb, reason, message, type_=None):
"""_add_event adds an event for the corresponding Service."""
klb_meta = klb['metadata']
for ref in klb_meta.get('ownerReferences', []):
# "mock" a Service based on ownerReference to it.
if ref['kind'] == 'Service' and ref['name'] == klb_meta['name']:
service = {
'apiVersion': ref['apiVersion'],
'kind': ref['kind'],
'metadata': {
'name': ref['name'],
'uid': ref['uid'],
'namespace': klb_meta['namespace'], # ref shares ns
},
}
break
else:
# No reference, just fetch the service from the API.
try:
service = self.k8s.get(
f"{k_const.K8S_API_NAMESPACES}/{klb_meta['namespace']}"
f"/services/{klb_meta['name']}")
except k_exc.K8sClientException:
LOG.debug('Error when fetching Service to add an event %s, '
'ignoring', utils.get_res_unique_name(klb))
return
kwargs = {'type_': type_} if type_ else {}
self.k8s.add_event(service, reason, message, **kwargs)
def on_present(self, loadbalancer_crd, *args, **kwargs):
if loadbalancer_crd.get('status', None) is None:
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status',
utils.get_res_link(loadbalancer_crd),
{})
self.k8s.patch_crd('status',
utils.get_res_link(loadbalancer_crd), {})
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s',
utils.get_res_unique_name(loadbalancer_crd))
return
if self._should_ignore(loadbalancer_crd):
LOG.debug("Ignoring Kubernetes service %s",
loadbalancer_crd['metadata']['name'])
reason = self._should_ignore(loadbalancer_crd)
if reason:
reason %= utils.get_res_unique_name(loadbalancer_crd)
LOG.debug(reason)
self._add_event(loadbalancer_crd, 'KuryrServiceSkipped', reason)
return
crd_lb = loadbalancer_crd['status'].get('loadbalancer')
@ -83,15 +113,34 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS:
if (spec_lb_provider and
spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS):
self._add_event(loadbalancer_crd, 'KuryrUpdateProvider',
'Deleting Amphora load balancer to '
'recreate it with OVN provider')
self._ensure_release_lbaas(loadbalancer_crd)
# ovn to amphora downgrade
elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS:
if (not spec_lb_provider or
spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS):
self._add_event(loadbalancer_crd, 'KuryrUpdateProvider',
'Deleting OVN load balancer to '
'recreate it with Amphora provider')
self._ensure_release_lbaas(loadbalancer_crd)
if self._sync_lbaas_members(loadbalancer_crd):
if not crd_lb:
self._add_event(loadbalancer_crd, 'KuryrEnsureLB',
'Provisioning a load balancer')
try:
changed = self._sync_lbaas_members(loadbalancer_crd)
except Exception as e:
self._add_event(
loadbalancer_crd, 'KuryrEnsureLBError',
f'Error when provisioning load balancer: {e}', 'Warning')
raise
if changed:
self._add_event(loadbalancer_crd, 'KuryrEnsuredLB',
'Load balancer provisioned')
# Note(yboaron) For LoadBalancer services, we should allocate FIP,
# associate it to LB VIP and update K8S service status
lb_ip = loadbalancer_crd['spec'].get('lb_ip')
@ -106,6 +155,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
loadbalancer_crd['status']['loadbalancer'][
'port_id']))
if service_pub_ip_info:
self._add_event(
loadbalancer_crd, 'KuryrEnsureFIP',
'Associating floating IP to the load balancer')
self._drv_service_pub_ip.associate_pub_ip(
service_pub_ip_info, loadbalancer_crd['status'][
'loadbalancer']['port_id'])
@ -128,10 +180,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds):
LOG.debug("Reconciling the loadbalancer CRDs")
# get the loadbalancers id in the CRD status
crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get(
'loadbalancer', {}).get('id', {}), 'selflink':
utils.get_res_link(loadbalancer_crd)} for
loadbalancer_crd in loadbalancer_crds]
crd_loadbalancer_ids = [
{'id': loadbalancer_crd.get('status', {}).get(
'loadbalancer', {}).get('id', {}),
'selflink': utils.get_res_link(loadbalancer_crd),
'klb': loadbalancer_crd} for
loadbalancer_crd in loadbalancer_crds]
lbaas = clients.get_loadbalancer_client()
lbaas_spec = {}
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
@ -140,21 +194,24 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
for loadbalancer in loadbalancers]
# for each loadbalancer id in the CRD status, check if exists in
# OpenStack
crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in
crd_loadbalancer_ids if
crd_lb['id'] not in loadbalancers_id]
if not crds_to_reconcile_selflink:
crds_to_reconcile = [crd_lb for crd_lb in crd_loadbalancer_ids
if crd_lb['id'] not in loadbalancers_id]
if not crds_to_reconcile:
LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack")
return
LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r",
crds_to_reconcile_selflink)
self._reconcile_lbaas(crds_to_reconcile_selflink)
[klb['id'] for klb in crds_to_reconcile])
self._reconcile_lbaas(crds_to_reconcile)
def _reconcile_lbaas(self, crds_to_reconcile_selflink):
kubernetes = clients.get_kubernetes_client()
for selflink in crds_to_reconcile_selflink:
def _reconcile_lbaas(self, crds_to_reconcile):
for entry in crds_to_reconcile:
selflink = entry['selflink']
try:
kubernetes.patch_crd('status', selflink, {})
self._add_event(
entry['klb'], 'LoadBalancerRecreating',
'Load balancer for the Service seems to not exist anymore.'
' Recreating it.', 'Warning')
self.k8s.patch_crd('status', selflink, {})
except k_exc.K8sResourceNotFound:
LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s',
selflink)
@ -165,9 +222,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
continue
def _should_ignore(self, loadbalancer_crd):
return (not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')) or not
loadbalancer_crd['spec'].get('ip'))
if not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')):
return 'Skipping Service %s without Endpoints'
elif not loadbalancer_crd['spec'].get('ip'):
return 'Skipping Service %s without IP set yet'
return False
def _has_endpoints(self, loadbalancer_crd):
ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
@ -179,9 +239,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
LOG.debug("Deleting the loadbalancer CRD")
if loadbalancer_crd['status'] != {}:
# NOTE(ivc): deleting pool deletes its members
self._drv_lbaas.release_loadbalancer(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer'))
self._add_event(loadbalancer_crd, 'KuryrReleaseLB',
'Releasing the load balancer')
try:
# NOTE(ivc): deleting pool deletes its members
self._drv_lbaas.release_loadbalancer(
loadbalancer_crd['status'].get('loadbalancer'))
except Exception as e:
# FIXME(dulek): It seems like if loadbalancer will be stuck in
# PENDING_DELETE we'll just silently time out
# waiting for it to be deleted. Is that expected?
self._add_event(
loadbalancer_crd, 'KuryrReleaseLBError',
f'Error when releasing load balancer: {e}', 'Warning')
raise
try:
pub_info = loadbalancer_crd['status']['service_pub_ip_info']
@ -189,43 +260,53 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
pub_info = None
if pub_info:
self._add_event(
loadbalancer_crd, 'KuryrReleaseFIP',
'Dissociating floating IP from the load balancer')
self._drv_service_pub_ip.release_pub_ip(
loadbalancer_crd['status']['service_pub_ip_info'])
kubernetes = clients.get_kubernetes_client()
LOG.debug('Removing finalizer from KuryrLoadBalancer CRD %s',
loadbalancer_crd)
try:
kubernetes.remove_finalizer(loadbalancer_crd,
k_const.KURYRLB_FINALIZER)
except k_exc.K8sClientException:
LOG.exception('Error removing kuryrloadbalancer CRD finalizer '
'for %s', loadbalancer_crd)
self.k8s.remove_finalizer(loadbalancer_crd,
k_const.KURYRLB_FINALIZER)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when removing finalizer from '
f'KuryrLoadBalancer of Service '
f'{utils.get_res_unique_name(loadbalancer_crd)}')
LOG.exception(msg)
self._add_event(loadbalancer_crd, 'KuryrRemoveLBFinalizerError',
f'{msg}: {e}', 'Warning')
raise
namespace = loadbalancer_crd['metadata']['namespace']
name = loadbalancer_crd['metadata']['name']
try:
service = kubernetes.get(f"{k_const.K8S_API_NAMESPACES}"
f"/{namespace}/services/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.warning("Failed to get service: %s", ex)
service = self.k8s.get(f"{k_const.K8S_API_NAMESPACES}/{namespace}"
f"/services/{name}")
except k_exc.K8sResourceNotFound:
LOG.warning('Service %s not found. This is unexpected.',
utils.get_res_unique_name(loadbalancer_crd))
return
LOG.debug('Removing finalizer from service %s',
service["metadata"]["name"])
LOG.debug('Removing finalizer from Service %s',
utils.get_res_unique_name(service))
try:
kubernetes.remove_finalizer(service, k_const.SERVICE_FINALIZER)
except k_exc.K8sClientException:
LOG.exception('Error removing service finalizer '
'for %s', service["metadata"]["name"])
self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when removing finalizer from Service '
f'{utils.get_res_unique_name(service)}')
LOG.exception(msg)
self._add_event(
loadbalancer_crd, 'KuryrRemoveServiceFinalizerError',
f'{msg}: {e}', 'Warning')
raise
def _patch_status(self, loadbalancer_crd):
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status', utils.get_res_link(
loadbalancer_crd), loadbalancer_crd['status'])
self.k8s.patch_crd('status', utils.get_res_link(loadbalancer_crd),
loadbalancer_crd['status'])
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd)
return False
@ -233,16 +314,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
LOG.warning('KuryrLoadBalancer %s modified, retrying later.',
utils.get_res_unique_name(loadbalancer_crd))
return False
except k_exc.K8sClientException:
LOG.exception('Error updating KuryLoadbalancer CRD %s',
loadbalancer_crd)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when updating status of '
f'{utils.get_res_unique_name(loadbalancer_crd)} Service '
f'load balancer')
LOG.exception(msg)
self._add_event(loadbalancer_crd, 'KuryrUpdateLBStatusError',
f'{msg}: {e}', 'Warning')
raise
return True
def _sync_lbaas_members(self, loadbalancer_crd):
changed = False
if (self._remove_unused_members(loadbalancer_crd)):
if self._remove_unused_members(loadbalancer_crd):
changed = True
if self._sync_lbaas_pools(loadbalancer_crd):
@ -258,9 +343,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
lb = klb_crd['status'].get('loadbalancer')
svc_name = klb_crd['metadata']['name']
svc_namespace = klb_crd['metadata']['namespace']
k8s = clients.get_kubernetes_client()
try:
service = k8s.get(
service = self.k8s.get(
f'{k_const.K8S_API_NAMESPACES}/{svc_namespace}/'
f'services/{svc_name}')
except k_exc.K8sResourceNotFound:
@ -275,8 +359,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
lb['security_groups'] = lb_sgs
try:
k8s.patch_crd('status/loadbalancer', utils.get_res_link(klb_crd),
{'security_groups': lb_sgs})
self.k8s.patch_crd('status/loadbalancer',
utils.get_res_link(klb_crd),
{'security_groups': lb_sgs})
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer %s not found', svc_name)
return None
@ -284,8 +369,13 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
LOG.debug('KuryrLoadBalancer entity not processable '
'due to missing loadbalancer field.')
return None
except k_exc.K8sClientException:
LOG.exception('Error syncing KuryrLoadBalancer %s', svc_name)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when updating SGs status of '
f'{utils.get_res_unique_name(klb_crd)} Service load '
f'balancer')
LOG.exception(msg)
self._add_event(klb_crd, 'KuryrUpdateLBStatusError',
f'{msg}: {e}', 'Warning')
raise
return klb_crd
@ -359,8 +449,14 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
target_pod, target_ip, loadbalancer_crd)
if not member_subnet_id:
LOG.warning("Skipping member creation for %s",
target_ip)
msg = (
f'Unable to determine ID of the subnet of member '
f'{target_ip} for service '
f'{utils.get_res_unique_name(loadbalancer_crd)}. '
f'Skipping its creation')
self._add_event(loadbalancer_crd, 'KuryrSkipMember',
msg, 'Warning')
LOG.warning(msg)
continue
target_name, target_namespace = self._get_target_info(
@ -617,8 +713,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
for port_spec in lbaas_spec_ports:
protocol = port_spec['protocol']
port = port_spec['port']
name = "%s:%s" % (loadbalancer_crd['status']['loadbalancer'][
'name'], protocol)
listener = []
for l in loadbalancer_crd['status'].get('listeners', []):
@ -638,12 +732,23 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
'listeners', []) if l['port'] == port]
if listener and not self._drv_lbaas.double_listeners_supported():
LOG.warning("Skipping listener creation for %s as another one"
" already exists with port %s", name, port)
msg = (
f'Octavia does not support multiple listeners listening '
f'on the same port. Skipping creation of listener '
f'{protocol}:{port} because {listener["protocol"]}:'
f'{listener["port"]} already exists for Service '
f'{utils.get_res_unique_name(loadbalancer_crd)}')
self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg,
'Warning')
LOG.warning(msg)
continue
if protocol == "SCTP" and not self._drv_lbaas.sctp_supported():
LOG.warning("Skipping listener creation as provider does"
" not support %s protocol", protocol)
msg = (
f'Skipping listener {protocol}:{port} creation as Octavia '
f'does not support {protocol} protocol.')
self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg,
'Warning')
LOG.warning(msg)
continue
listener = self._drv_lbaas.ensure_listener(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer'),
@ -697,17 +802,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
ns = lb_crd['metadata']['namespace']
status_data = {"loadBalancer": {
"ingress": [{"ip": lb_ip_address.format()}]}}
k8s = clients.get_kubernetes_client()
try:
k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}"
f"/{ns}/services/{name}/status",
status_data)
self.k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}"
f"/{ns}/services/{name}/status",
status_data)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(name)
except k_exc.K8sClientException:
LOG.exception("Kubernetes Client Exception"
"when updating the svc status %s"
% name)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when updating external FIP data of Service '
f'{utils.get_res_unique_name(lb_crd)}')
LOG.exception(msg)
self._add_event(lb_crd, 'KuryrUpdateServiceStatusError',
f'{msg}: {e}', 'Warning')
raise
def _sync_lbaas_loadbalancer(self, loadbalancer_crd):
@ -754,29 +860,26 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
def _ensure_release_lbaas(self, loadbalancer_crd):
attempts = 0
deadline = 0
retry = True
timeout = config.CONF.kubernetes.watch_retry_timeout
while retry:
deadline = time.time() + timeout
while True:
try:
if attempts == 1:
deadline = time.time() + timeout
if (attempts > 0 and
utils.exponential_sleep(deadline, attempts) == 0):
LOG.error("Failed releasing lbaas '%s': deadline exceeded",
loadbalancer_crd['status']['loadbalancer'][
'name'])
if not utils.exponential_sleep(deadline, attempts):
msg = (f'Timed out waiting for deletion of load balancer '
f'{utils.get_res_unique_name(loadbalancer_crd)}')
self._add_event(
loadbalancer_crd, 'KuryrLBReleaseTimeout', msg,
'Warning')
LOG.error(msg)
return
self._drv_lbaas.release_loadbalancer(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer')
)
retry = False
loadbalancer_crd['status'].get('loadbalancer'))
break
except k_exc.ResourceNotReady:
LOG.debug("Attempt (%s) of loadbalancer release %s failed."
LOG.debug("Attempt %s to release LB %s failed."
" A retry will be triggered.", attempts,
loadbalancer_crd['status']['loadbalancer']['name'])
utils.get_res_unique_name(loadbalancer_crd))
attempts += 1
retry = True
loadbalancer_crd['status'] = {}
self._patch_status(loadbalancer_crd)

View File

@ -58,11 +58,13 @@ class ControllerPipeline(h_dis.EventPipeline):
def _wrap_consumer(self, consumer):
# TODO(ivc): tune retry interval/timeout
return h_log.LogExceptions(h_retry.Retry(
consumer, exceptions=(
exceptions.ResourceNotReady,
key_exc.connection.ConnectFailure,
requests_exc.ConnectionError)))
return h_log.LogExceptions(
h_retry.Retry(
consumer,
exceptions=(exceptions.ResourceNotReady,
key_exc.connection.ConnectFailure,
requests_exc.ConnectionError)),
ignore_exceptions=(exceptions.KuryrLoadBalancerNotCreated,))
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,

View File

@ -42,6 +42,13 @@ class ResourceNotReady(Exception):
super(ResourceNotReady, self).__init__("Resource not ready: %r" % msg)
class KuryrLoadBalancerNotCreated(Exception):
def __init__(self, res):
name = utils.get_res_unique_name(res)
super().__init__(
'KuryrLoadBalancer not created yet for the Service %s' % name)
class LoadBalancerNotReady(ResourceNotReady):
def __init__(self, loadbalancer_id, status):
super().__init__(

View File

@ -28,13 +28,16 @@ class LogExceptions(base.EventHandler):
instead.
"""
def __init__(self, handler, exceptions=Exception):
def __init__(self, handler, exceptions=Exception, ignore_exceptions=None):
self._handler = handler
self._exceptions = exceptions
self._ignore_exceptions = ignore_exceptions or ()
def __call__(self, event, *args, **kwargs):
try:
self._handler(event, *args, **kwargs)
except self._ignore_exceptions:
pass
except self._exceptions as ex:
# If exception comes from OpenStack SDK and contains
# 'request_id' then print this 'request_id' along the Exception.

View File

@ -97,7 +97,10 @@ class Retry(base.EventHandler):
.get_instance())
method = getattr(exporter, cls_map[type(exc).__name__])
method()
except exceptions.KuryrLoadBalancerNotCreated:
with excutils.save_and_reraise_exception() as ex:
if self._sleep(deadline, attempt, ex.value):
ex.reraise = False
except os_exc.ConflictException as ex:
if ex.details.startswith('Quota exceeded for resources'):
with excutils.save_and_reraise_exception() as ex:

View File

@ -27,23 +27,6 @@ _SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP')
class TestServiceHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServiceSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServiceProjectDriver.get_instance')
def test_init(self, m_get_drv_project, m_get_drv_subnets, m_get_drv_sg):
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_drv_sg.return_value = mock.sentinel.drv_sg
handler = h_lbaas.ServiceHandler()
self.assertEqual(mock.sentinel.drv_project, handler._drv_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_subnets)
self.assertEqual(mock.sentinel.drv_sg, handler._drv_sg)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_on_present(self, get_k8s_client):
svc_event = {
@ -114,6 +97,7 @@ class TestServiceHandler(test_base.TestCase):
m_handler.create_crd_spec.return_value = new_spec
m_handler._should_ignore.return_value = False
m_handler._drv_project = m_drv_project
m_handler.k8s = mock.Mock()
h_lbaas.ServiceHandler.on_present(m_handler, svc_event)
m_handler.create_crd_spec(svc_event)
@ -179,6 +163,7 @@ class TestServiceHandler(test_base.TestCase):
m_handler.create_crd_spec.return_value = old_spec
m_handler._should_ignore.return_value = False
m_handler._drv_project = m_drv_project
m_handler.k8s = mock.Mock()
h_lbaas.ServiceHandler.on_present(m_handler, svc_event)
m_handler.create_crd_spec(svc_event)
@ -418,51 +403,36 @@ class TestEndpointsHandler(test_base.TestCase):
h_lbaas.EndpointsHandler.on_deleted(m_handler, self._ep)
m_handler._remove_endpoints.assert_called_once_with(self._ep)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.utils.get_klb_crd_path')
def test__remove_endpoints(self, get_klb_crd_path, get_k8s_client):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.assert_called_once_with('spec',
get_klb_crd_path(self._ep),
'endpointSlices',
action='remove')
def test__remove_endpoints(self, get_klb_crd_path):
m_handler = mock.Mock()
h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep)
m_handler.k8s.patch_crd.assert_called_once_with(
'spec', get_klb_crd_path(self._ep), 'endpointSlices',
action='remove')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch.object(logging.getLogger(
'kuryr_kubernetes.controller.handlers.lbaas'),
'debug')
def test__remove_endpoints_not_found(self, get_k8s_client, log):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound(self._ep)
def test__remove_endpoints_not_found(self, log):
m_handler = mock.Mock()
m_handler.k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound('foo')
h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep)
log.assert_called_once()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test__remove_endpoints_client_exception(self, get_k8s_client):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.side_effect = k_exc.K8sClientException(self._ep)
def test__remove_endpoints_client_exception(self):
m_handler = mock.Mock()
m_handler.k8s.patch_crd.side_effect = k_exc.K8sClientException()
self.assertRaises(k_exc.K8sClientException,
h_lbaas.EndpointsHandler._remove_endpoints,
self, self._ep)
m_handler, self._ep)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch.object(logging.getLogger(
'kuryr_kubernetes.controller.handlers.lbaas'),
'warning')
def test__remove_endpoints_unprocessable_entity(self, get_k8s_client, log):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity(self._ep)
def test__remove_endpoints_unprocessable_entity(self, log):
m_handler = mock.Mock()
m_handler.k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity(
'bar')
h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep)
log.assert_called_once()

View File

@ -259,81 +259,6 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
@mock.patch('kuryr_kubernetes.utils.get_subnets_id_cidrs',
mock.Mock(return_value=[('id', 'cidr')]))
class TestKuryrLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.config.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServicePubIpDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.NodesSubnetsDriver.get_instance')
def test_init(self, m_get_drv_node_subnets, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets,
m_get_drv_service_pub_ip, m_cfg, m_get_svc_sg_drv,
m_get_svc_drv_project, m_get_cidr):
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets
handler = h_lb.KuryrLoadBalancerHandler()
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
self.assertEqual(mock.sentinel.drv_node_subnets,
handler._drv_nodes_subnets)
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.config.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServicePubIpDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.NodesSubnetsDriver.get_instance')
def test_init_provider_ovn(self, m_get_drv_node_subnets, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets,
m_get_drv_service_pub_ip, m_cfg,
m_get_svc_sg_drv, m_get_svc_drv_project,
m_get_cidr):
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets
handler = h_lb .KuryrLoadBalancerHandler()
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
self.assertEqual(mock.sentinel.drv_node_subnets,
handler._drv_nodes_subnets)
def test_on_present(self):
m_drv_service_pub_ip = mock.Mock()
m_drv_service_pub_ip.acquire_service_pub_ip_info.return_value = None
@ -670,14 +595,16 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
lbaas_spec = {}
lbaas.load_balancers.return_value = []
selflink = ['/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/test',
'/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/demo']
expected_selflink = ['/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/test',
'/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/demo']
h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation(
m_handler, loadbalancer_crds)
lbaas.load_balancers.assert_called_once_with(**lbaas_spec)
m_handler._reconcile_lbaas.assert_called_with(selflink)
selflink = [c['selflink'] for c
in m_handler._reconcile_lbaas.call_args[0][0]]
self.assertEqual(expected_selflink, selflink)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base'

View File

@ -37,7 +37,8 @@ class TestControllerPipeline(test_base.TestCase):
ret = pipeline._wrap_consumer(consumer)
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(retry_handler)
m_logging_type.assert_called_with(retry_handler,
ignore_exceptions=mock.ANY)
m_retry_type.assert_called_with(consumer, exceptions=mock.ANY)
@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')