Merge "Add events for Services"

Zuul 2021-12-15 21:28:45 +00:00 committed by Gerrit Code Review
commit b4699f697e
10 changed files with 330 additions and 328 deletions


@@ -50,7 +50,7 @@ def get_loadbalancer_client():
return get_openstacksdk().load_balancer
def get_kubernetes_client():
def get_kubernetes_client() -> k8s_client.K8sClient:
return _clients[_KUBERNETES_CLIENT]
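
The new return annotation is what lets the handlers below cache the client once as self.k8s and still get type checking for its methods, such as add_event(). A minimal usage sketch (the handler class is hypothetical):

from kuryr_kubernetes import clients

class SomeHandler:
    def __init__(self):
        # Cached once at init; type checkers now resolve this as
        # k8s_client.K8sClient instead of an opaque object.
        self.k8s = clients.get_kubernetes_client()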


@@ -26,6 +26,7 @@ from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
CONF = config.CONF
SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer')
@@ -45,6 +46,15 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
self._drv_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
self.k8s = clients.get_kubernetes_client()
self._lb_provider = None
if self._drv_lbaas.providers_supported():
self._lb_provider = 'amphora'
config_provider = CONF.kubernetes.endpoints_driver_octavia_provider
if config_provider != 'default':
self._lb_provider = config_provider
def _bump_network_policies(self, svc):
if driver_utils.is_network_policy_enabled():
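
The provider selection in __init__ above follows a simple precedence: no provider support means no provider is set; otherwise 'amphora' is assumed unless the endpoints_driver_octavia_provider option names something else. A standalone restatement (hypothetical helper, for illustration only):

def resolve_lb_provider(providers_supported, config_provider):
    provider = None
    if providers_supported:
        provider = 'amphora'
        # The config option only applies when providers are supported.
        if config_provider != 'default':
            provider = config_provider
    return provider

assert resolve_lb_provider(False, 'ovn') is None
assert resolve_lb_provider(True, 'default') == 'amphora'
assert resolve_lb_provider(True, 'ovn') == 'ovn'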
@@ -53,16 +63,21 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
def on_present(self, service, *args, **kwargs):
reason = self._should_ignore(service)
if reason:
LOG.debug(reason, service['metadata']['name'])
reason %= utils.get_res_unique_name(service)
LOG.debug(reason)
self.k8s.add_event(service, 'KuryrServiceSkipped', reason)
return
k8s = clients.get_kubernetes_client()
loadbalancer_crd = k8s.get_loadbalancer_crd(service)
loadbalancer_crd = self.k8s.get_loadbalancer_crd(service)
try:
if not self._patch_service_finalizer(service):
return
except k_exc.K8sClientException as ex:
LOG.exception("Failed to set service finalizer: %s", ex)
msg = (f'K8s API error when adding finalizer to Service '
f'{utils.get_res_unique_name(service)}')
LOG.exception(msg)
self.k8s.add_event(service, 'KuryrAddServiceFinalizerError',
f'{msg}: {ex}', 'Warning')
raise
if loadbalancer_crd is None:
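
The skip path above now records a Kubernetes Event besides the debug log. _should_ignore() returns a %-template that is filled with the Service's unique name (its use in log calls here suggests a namespace/name pair) before being emitted. A condensed sketch of that pattern:

from kuryr_kubernetes import utils

def skip_with_event(k8s, service, reason_template):
    # Fill the template returned by _should_ignore(), then log it and
    # attach it to the Service as an event (default type assumed Normal).
    reason = reason_template % utils.get_res_unique_name(service)
    k8s.add_event(service, 'KuryrServiceSkipped', reason)
    return reason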
@@ -108,20 +123,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
return None
def _patch_service_finalizer(self, service):
k8s = clients.get_kubernetes_client()
return k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)
return self.k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)
def on_finalize(self, service, *args, **kwargs):
k8s = clients.get_kubernetes_client()
klb_crd_path = utils.get_klb_crd_path(service)
# Bump all the NPs in the namespace to force SG rules
# recalculation.
self._bump_network_policies(service)
try:
k8s.delete(klb_crd_path)
self.k8s.delete(klb_crd_path)
except k_exc.K8sResourceNotFound:
k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)
self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)
def _has_clusterip(self, service):
# ignore headless service, clusterIP is None
@@ -149,17 +161,25 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
svc_namespace = service['metadata']['namespace']
kubernetes = clients.get_kubernetes_client()
spec = self._build_kuryrloadbalancer_spec(service)
owner_reference = {
'apiVersion': service['apiVersion'],
'kind': service['kind'],
'name': service['metadata']['name'],
'uid': service['metadata']['uid'],
}
loadbalancer_crd = {
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
'metadata': {
'name': svc_name,
'finalizers': [k_const.KURYRLB_FINALIZER],
},
'ownerReferences': [owner_reference],
},
'spec': spec,
'status': {
}
}
'status': {},
}
try:
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
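
The new ownerReferences entry lives under metadata and ties the CRD's lifecycle to the Service: with a matching UID, deleting the Service lets the Kubernetes garbage collector remove the KuryrLoadBalancer automatically. A sketch of the resulting metadata, with illustrative values:

from kuryr_kubernetes import constants as k_const

crd_metadata = {
    'name': 'demo',
    'finalizers': [k_const.KURYRLB_FINALIZER],
    # The UID must match the live Service object for GC to honor the link.
    'ownerReferences': [{'apiVersion': 'v1', 'kind': 'Service',
                         'name': 'demo', 'uid': '1234-abcd'}],
}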
@@ -169,8 +189,12 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
raise k_exc.ResourceNotReady(svc_name)
except k_exc.K8sNamespaceTerminating:
raise
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception("Exception when creating KuryrLoadBalancer CRD.")
self.k8s.add_event(
service, 'CreateKLBFailed',
'Error when creating KuryrLoadBalancer object: %s' % e,
'Warning')
raise
def _update_crd_spec(self, loadbalancer_crd, service):
@@ -185,13 +209,17 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(svc_name)
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception('Error updating kuryrnet CRD %s', loadbalancer_crd)
self.k8s.add_event(
service, 'UpdateKLBFailed',
'Error when updating KuryrLoadBalancer object: %s' % e,
'Warning')
raise
def _get_data_timeout_annotation(self, service):
default_timeout_cli = config.CONF.octavia_defaults.timeout_client_data
default_timeout_mem = config.CONF.octavia_defaults.timeout_member_data
default_timeout_cli = CONF.octavia_defaults.timeout_client_data
default_timeout_mem = CONF.octavia_defaults.timeout_member_data
try:
annotations = service['metadata']['annotations']
except KeyError:
@@ -220,13 +248,16 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
subnet_id = self._get_subnet_id(service, project_id, svc_ip)
spec_type = service['spec'].get('type')
spec = {
'ip': svc_ip,
'ports': ports,
'project_id': project_id,
'security_groups_ids': sg_ids,
'subnet_id': subnet_id,
'type': spec_type
}
'ip': svc_ip,
'ports': ports,
'project_id': project_id,
'security_groups_ids': sg_ids,
'subnet_id': subnet_id,
'type': spec_type,
}
if self._lb_provider:
spec['provider'] = self._lb_provider
if spec_lb_ip is not None:
spec['lb_ip'] = spec_lb_ip
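
For a plain ClusterIP Service the assembled spec would look roughly like the following (all values illustrative); 'provider' and 'lb_ip' are only added when applicable:

spec = {
    'ip': '10.0.0.10',
    'ports': [{'name': 'http', 'protocol': 'TCP', 'port': 80}],
    'project_id': 'a-project-uuid',
    'security_groups_ids': ['a-sg-uuid'],
    'subnet_id': 'a-subnet-uuid',
    'type': 'ClusterIP',
    'provider': 'ovn',  # only when self._lb_provider is set
}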
@@ -288,25 +319,13 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
def __init__(self):
super(EndpointsHandler, self).__init__()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
# Note(yboaron) LBaaS driver supports 'provider' parameter in
# Load Balancer creation flow.
# We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration.
self._lb_provider = None
if self._drv_lbaas.providers_supported():
self._lb_provider = 'amphora'
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
!= 'default'):
self._lb_provider = (
config.CONF.kubernetes.endpoints_driver_octavia_provider)
self.k8s = clients.get_kubernetes_client()
def on_present(self, endpoints, *args, **kwargs):
ep_name = endpoints['metadata']['name']
ep_namespace = endpoints['metadata']['namespace']
k8s = clients.get_kubernetes_client()
loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints)
loadbalancer_crd = self.k8s.get_loadbalancer_crd(endpoints)
if (not (self._has_pods(endpoints) or (loadbalancer_crd and
loadbalancer_crd.get('status')))
@@ -318,15 +337,14 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
return
if loadbalancer_crd is None:
raise k_exc.KuryrLoadBalancerNotCreated(endpoints)
else:
try:
self._create_crd_spec(endpoints)
self._update_crd_spec(loadbalancer_crd, endpoints)
except k_exc.K8sNamespaceTerminating:
LOG.warning('Namespace %s is being terminated, ignoring '
'Endpoints %s in that namespace.',
ep_namespace, ep_name)
return
else:
self._update_crd_spec(loadbalancer_crd, endpoints)
def on_deleted(self, endpoints, *args, **kwargs):
self._remove_endpoints(endpoints)
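
Note the inversion above: instead of creating the CRD from the Endpoints side, on_present() now raises when the CRD is missing and lets the Retry handler re-queue the event until ServiceHandler has created it (see the retry and pipeline hunks below). A condensed restatement:

from kuryr_kubernetes import exceptions as k_exc

def on_present_sketch(handler, endpoints, loadbalancer_crd):
    if loadbalancer_crd is None:
        # "Not yet" rather than an error: Retry brings the event back.
        raise k_exc.KuryrLoadBalancerNotCreated(endpoints)
    handler._update_crd_spec(loadbalancer_crd, endpoints)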
@@ -365,84 +383,52 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
return endpointslices
def _create_crd_spec(self, endpoints, spec=None, status=None):
endpoints_name = endpoints['metadata']['name']
namespace = endpoints['metadata']['namespace']
kubernetes = clients.get_kubernetes_client()
# TODO(maysams): Remove the convertion once we start handling
# Endpoint slices.
epslices = self._convert_subsets_to_endpointslice(endpoints)
if not status:
status = {}
if not spec:
spec = {'endpointSlices': epslices}
# NOTE(maysams): As the spec may already contain a
# ports field from the Service, a new endpointslice
# field is introduced to also hold ports from the
# Endpoints under the spec.
loadbalancer_crd = {
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
'metadata': {
'name': endpoints_name,
'finalizers': [k_const.KURYRLB_FINALIZER],
},
'spec': spec,
'status': status,
}
if self._lb_provider:
loadbalancer_crd['spec']['provider'] = self._lb_provider
def _add_event(self, endpoints, reason, message, type_=None):
"""_add_event adds an event for the corresponding Service."""
try:
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sNamespaceTerminating:
raise
service = self.k8s.get(utils.get_service_link(endpoints))
except k_exc.K8sClientException:
LOG.exception("Exception when creating KuryrLoadBalancer CRD.")
raise
LOG.debug('Error when fetching Service to add an event %s, '
'ignoring', utils.get_res_unique_name(endpoints))
return
kwargs = {'type_': type_} if type_ else {}
self.k8s.add_event(service, reason, message, **kwargs)
def _update_crd_spec(self, loadbalancer_crd, endpoints):
kubernetes = clients.get_kubernetes_client()
# TODO(maysams): Remove the convertion once we start handling
# Endpoint slices.
# TODO(maysams): Remove the conversion once we start handling
# EndpointSlices.
epslices = self._convert_subsets_to_endpointslice(endpoints)
spec = {'endpointSlices': epslices}
if self._lb_provider:
spec['provider'] = self._lb_provider
try:
kubernetes.patch_crd(
'spec',
utils.get_res_link(loadbalancer_crd),
spec)
self.k8s.patch_crd('spec', utils.get_res_link(loadbalancer_crd),
{'endpointSlices': epslices})
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception('Error updating KuryrLoadbalancer CRD %s',
loadbalancer_crd)
self._add_event(
endpoints, 'UpdateKLBFailed',
'Error when updating KuryrLoadBalancer object: %s' % e,
'Warning')
raise
return True
def _remove_endpoints(self, endpoints):
kubernetes = clients.get_kubernetes_client()
lb_name = endpoints['metadata']['name']
try:
kubernetes.patch_crd('spec',
utils.get_klb_crd_path(endpoints),
'endpointSlices',
action='remove')
self.k8s.patch_crd('spec', utils.get_klb_crd_path(endpoints),
'endpointSlices', action='remove')
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer CRD not found %s', lb_name)
except k_exc.K8sUnprocessableEntity:
LOG.warning('KuryrLoadBalancer %s modified, ignoring.', lb_name)
except k_exc.K8sClientException:
except k_exc.K8sClientException as e:
LOG.exception('Error updating KuryrLoadBalancer CRD %s', lb_name)
self._add_event(
endpoints, 'UpdateKLBFailed',
'Error when updating KuryrLoadBalancer object: %s' % e,
'Warning')
raise


@@ -53,26 +53,56 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
self._drv_nodes_subnets = drv_base.NodesSubnetsDriver.get_instance()
self.k8s = clients.get_kubernetes_client()
def _get_nodes_subnets(self):
return utils.get_subnets_id_cidrs(
self._drv_nodes_subnets.get_nodes_subnets())
def _add_event(self, klb, reason, message, type_=None):
"""_add_event adds an event for the corresponding Service."""
klb_meta = klb['metadata']
for ref in klb_meta.get('ownerReferences', []):
# "mock" a Service based on ownerReference to it.
if ref['kind'] == 'Service' and ref['name'] == klb_meta['name']:
service = {
'apiVersion': ref['apiVersion'],
'kind': ref['kind'],
'metadata': {
'name': ref['name'],
'uid': ref['uid'],
'namespace': klb_meta['namespace'], # ref shares ns
},
}
break
else:
# No reference, just fetch the service from the API.
try:
service = self.k8s.get(
f"{k_const.K8S_API_NAMESPACES}/{klb_meta['namespace']}"
f"/services/{klb_meta['name']}")
except k_exc.K8sClientException:
LOG.debug('Error when fetching Service to add an event %s, '
'ignoring', utils.get_res_unique_name(klb))
return
kwargs = {'type_': type_} if type_ else {}
self.k8s.add_event(service, reason, message, **kwargs)
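
The ownerReference branch above avoids an API round-trip: everything add_event() needs about the Service can be rebuilt from the reference, and the GET is only a fallback for KuryrLoadBalancers without one. Illustrative input and result:

klb = {'metadata': {
    'name': 'demo',
    'namespace': 'default',
    'ownerReferences': [{'apiVersion': 'v1', 'kind': 'Service',
                         'name': 'demo', 'uid': '1234-abcd'}],
}}
# _add_event() then "mocks" the Service from that data alone:
service = {'apiVersion': 'v1', 'kind': 'Service',
           'metadata': {'name': 'demo', 'uid': '1234-abcd',
                        'namespace': 'default'}}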
def on_present(self, loadbalancer_crd, *args, **kwargs):
if loadbalancer_crd.get('status', None) is None:
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status',
utils.get_res_link(loadbalancer_crd),
{})
self.k8s.patch_crd('status',
utils.get_res_link(loadbalancer_crd), {})
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s',
utils.get_res_unique_name(loadbalancer_crd))
return
if self._should_ignore(loadbalancer_crd):
LOG.debug("Ignoring Kubernetes service %s",
loadbalancer_crd['metadata']['name'])
reason = self._should_ignore(loadbalancer_crd)
if reason:
reason %= utils.get_res_unique_name(loadbalancer_crd)
LOG.debug(reason)
self._add_event(loadbalancer_crd, 'KuryrServiceSkipped', reason)
return
crd_lb = loadbalancer_crd['status'].get('loadbalancer')
@@ -83,15 +113,34 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if not lb_provider or lb_provider in OCTAVIA_DEFAULT_PROVIDERS:
if (spec_lb_provider and
spec_lb_provider not in OCTAVIA_DEFAULT_PROVIDERS):
self._add_event(loadbalancer_crd, 'KuryrUpdateProvider',
'Deleting Amphora load balancer to '
'recreate it with OVN provider')
self._ensure_release_lbaas(loadbalancer_crd)
# ovn to amphora downgrade
elif lb_provider and lb_provider not in OCTAVIA_DEFAULT_PROVIDERS:
if (not spec_lb_provider or
spec_lb_provider in OCTAVIA_DEFAULT_PROVIDERS):
self._add_event(loadbalancer_crd, 'KuryrUpdateProvider',
'Deleting OVN load balancer to '
'recreate it with Amphora provider')
self._ensure_release_lbaas(loadbalancer_crd)
if self._sync_lbaas_members(loadbalancer_crd):
if not crd_lb:
self._add_event(loadbalancer_crd, 'KuryrEnsureLB',
'Provisioning a load balancer')
try:
changed = self._sync_lbaas_members(loadbalancer_crd)
except Exception as e:
self._add_event(
loadbalancer_crd, 'KuryrEnsureLBError',
f'Error when provisioning load balancer: {e}', 'Warning')
raise
if changed:
self._add_event(loadbalancer_crd, 'KuryrEnsuredLB',
'Load balancer provisioned')
# Note(yboaron) For LoadBalancer services, we should allocate FIP,
# associate it to LB VIP and update K8S service status
lb_ip = loadbalancer_crd['spec'].get('lb_ip')
@@ -106,6 +155,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
loadbalancer_crd['status']['loadbalancer'][
'port_id']))
if service_pub_ip_info:
self._add_event(
loadbalancer_crd, 'KuryrEnsureFIP',
'Associating floating IP to the load balancer')
self._drv_service_pub_ip.associate_pub_ip(
service_pub_ip_info, loadbalancer_crd['status'][
'loadbalancer']['port_id'])
@@ -128,10 +180,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
def _trigger_loadbalancer_reconciliation(self, loadbalancer_crds):
LOG.debug("Reconciling the loadbalancer CRDs")
# get the loadbalancers id in the CRD status
crd_loadbalancer_ids = [{'id': loadbalancer_crd.get('status', {}).get(
'loadbalancer', {}).get('id', {}), 'selflink':
utils.get_res_link(loadbalancer_crd)} for
loadbalancer_crd in loadbalancer_crds]
crd_loadbalancer_ids = [
{'id': loadbalancer_crd.get('status', {}).get(
'loadbalancer', {}).get('id', {}),
'selflink': utils.get_res_link(loadbalancer_crd),
'klb': loadbalancer_crd} for
loadbalancer_crd in loadbalancer_crds]
lbaas = clients.get_loadbalancer_client()
lbaas_spec = {}
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
@@ -140,21 +194,24 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
for loadbalancer in loadbalancers]
# for each loadbalancer id in the CRD status, check if exists in
# OpenStack
crds_to_reconcile_selflink = [crd_lb['selflink'] for crd_lb in
crd_loadbalancer_ids if
crd_lb['id'] not in loadbalancers_id]
if not crds_to_reconcile_selflink:
crds_to_reconcile = [crd_lb for crd_lb in crd_loadbalancer_ids
if crd_lb['id'] not in loadbalancers_id]
if not crds_to_reconcile:
LOG.debug("KuryrLoadBalancer CRDs already in sync with OpenStack")
return
LOG.debug("Reconciling the following KuryrLoadBalancer CRDs: %r",
crds_to_reconcile_selflink)
self._reconcile_lbaas(crds_to_reconcile_selflink)
[klb['id'] for klb in crds_to_reconcile])
self._reconcile_lbaas(crds_to_reconcile)
def _reconcile_lbaas(self, crds_to_reconcile_selflink):
kubernetes = clients.get_kubernetes_client()
for selflink in crds_to_reconcile_selflink:
def _reconcile_lbaas(self, crds_to_reconcile):
for entry in crds_to_reconcile:
selflink = entry['selflink']
try:
kubernetes.patch_crd('status', selflink, {})
self._add_event(
entry['klb'], 'LoadBalancerRecreating',
'Load balancer for the Service seems to not exist anymore.'
' Recreating it.', 'Warning')
self.k8s.patch_crd('status', selflink, {})
except k_exc.K8sResourceNotFound:
LOG.debug('Unable to reconcile the KuryLoadBalancer CRD %s',
selflink)
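
Each entry handed to _reconcile_lbaas() now carries the full CRD besides the selflink, so the 'LoadBalancerRecreating' Warning can be attached to the right Service before the status wipe triggers recreation. Roughly, with illustrative values:

entry = {
    'id': 'an-octavia-lb-uuid',  # from status.loadbalancer.id
    'selflink': ('/apis/openstack.org/v1/namespaces/default'
                 '/kuryrloadbalancers/demo'),
    'klb': {'metadata': {'name': 'demo', 'namespace': 'default'}},
}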
@@ -165,9 +222,12 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
continue
def _should_ignore(self, loadbalancer_crd):
return (not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')) or not
loadbalancer_crd['spec'].get('ip'))
if not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')):
return 'Skipping Service %s without Endpoints'
elif not loadbalancer_crd['spec'].get('ip'):
return 'Skipping Service %s without IP set yet'
return False
def _has_endpoints(self, loadbalancer_crd):
ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
@@ -179,9 +239,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
LOG.debug("Deleting the loadbalancer CRD")
if loadbalancer_crd['status'] != {}:
# NOTE(ivc): deleting pool deletes its members
self._drv_lbaas.release_loadbalancer(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer'))
self._add_event(loadbalancer_crd, 'KuryrReleaseLB',
'Releasing the load balancer')
try:
# NOTE(ivc): deleting pool deletes its members
self._drv_lbaas.release_loadbalancer(
loadbalancer_crd['status'].get('loadbalancer'))
except Exception as e:
# FIXME(dulek): It seems like if loadbalancer will be stuck in
# PENDING_DELETE we'll just silently time out
# waiting for it to be deleted. Is that expected?
self._add_event(
loadbalancer_crd, 'KuryrReleaseLBError',
f'Error when releasing load balancer: {e}', 'Warning')
raise
try:
pub_info = loadbalancer_crd['status']['service_pub_ip_info']
@@ -189,43 +260,53 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
pub_info = None
if pub_info:
self._add_event(
loadbalancer_crd, 'KuryrReleaseFIP',
'Dissociating floating IP from the load balancer')
self._drv_service_pub_ip.release_pub_ip(
loadbalancer_crd['status']['service_pub_ip_info'])
kubernetes = clients.get_kubernetes_client()
LOG.debug('Removing finalizer from KuryrLoadBalancer CRD %s',
loadbalancer_crd)
try:
kubernetes.remove_finalizer(loadbalancer_crd,
k_const.KURYRLB_FINALIZER)
except k_exc.K8sClientException:
LOG.exception('Error removing kuryrloadbalancer CRD finalizer '
'for %s', loadbalancer_crd)
self.k8s.remove_finalizer(loadbalancer_crd,
k_const.KURYRLB_FINALIZER)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when removing finalizer from '
f'KuryrLoadBalancer of Service '
f'{utils.get_res_unique_name(loadbalancer_crd)}')
LOG.exception(msg)
self._add_event(loadbalancer_crd, 'KuryrRemoveLBFinalizerError',
f'{msg}: {e}', 'Warning')
raise
namespace = loadbalancer_crd['metadata']['namespace']
name = loadbalancer_crd['metadata']['name']
try:
service = kubernetes.get(f"{k_const.K8S_API_NAMESPACES}"
f"/{namespace}/services/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.warning("Failed to get service: %s", ex)
service = self.k8s.get(f"{k_const.K8S_API_NAMESPACES}/{namespace}"
f"/services/{name}")
except k_exc.K8sResourceNotFound:
LOG.warning('Service %s not found. This is unexpected.',
utils.get_res_unique_name(loadbalancer_crd))
return
LOG.debug('Removing finalizer from service %s',
service["metadata"]["name"])
LOG.debug('Removing finalizer from Service %s',
utils.get_res_unique_name(service))
try:
kubernetes.remove_finalizer(service, k_const.SERVICE_FINALIZER)
except k_exc.K8sClientException:
LOG.exception('Error removing service finalizer '
'for %s', service["metadata"]["name"])
self.k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when removing finalizer from Service '
f'{utils.get_res_unique_name(service)}')
LOG.exception(msg)
self._add_event(
loadbalancer_crd, 'KuryrRemoveServiceFinalizerError',
f'{msg}: {e}', 'Warning')
raise
def _patch_status(self, loadbalancer_crd):
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status', utils.get_res_link(
loadbalancer_crd), loadbalancer_crd['status'])
self.k8s.patch_crd('status', utils.get_res_link(loadbalancer_crd),
loadbalancer_crd['status'])
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd)
return False
@@ -233,16 +314,20 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
LOG.warning('KuryrLoadBalancer %s modified, retrying later.',
utils.get_res_unique_name(loadbalancer_crd))
return False
except k_exc.K8sClientException:
LOG.exception('Error updating KuryLoadbalancer CRD %s',
loadbalancer_crd)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when updating status of '
f'{utils.get_res_unique_name(loadbalancer_crd)} Service '
f'load balancer')
LOG.exception(msg)
self._add_event(loadbalancer_crd, 'KuryrUpdateLBStatusError',
f'{msg}: {e}', 'Warning')
raise
return True
def _sync_lbaas_members(self, loadbalancer_crd):
changed = False
if (self._remove_unused_members(loadbalancer_crd)):
if self._remove_unused_members(loadbalancer_crd):
changed = True
if self._sync_lbaas_pools(loadbalancer_crd):
@@ -258,9 +343,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
lb = klb_crd['status'].get('loadbalancer')
svc_name = klb_crd['metadata']['name']
svc_namespace = klb_crd['metadata']['namespace']
k8s = clients.get_kubernetes_client()
try:
service = k8s.get(
service = self.k8s.get(
f'{k_const.K8S_API_NAMESPACES}/{svc_namespace}/'
f'services/{svc_name}')
except k_exc.K8sResourceNotFound:
@@ -275,8 +359,9 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
lb['security_groups'] = lb_sgs
try:
k8s.patch_crd('status/loadbalancer', utils.get_res_link(klb_crd),
{'security_groups': lb_sgs})
self.k8s.patch_crd('status/loadbalancer',
utils.get_res_link(klb_crd),
{'security_groups': lb_sgs})
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer %s not found', svc_name)
return None
@@ -284,8 +369,13 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
LOG.debug('KuryrLoadBalancer entity not processable '
'due to missing loadbalancer field.')
return None
except k_exc.K8sClientException:
LOG.exception('Error syncing KuryrLoadBalancer %s', svc_name)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when updating SGs status of '
f'{utils.get_res_unique_name(klb_crd)} Service load '
f'balancer')
LOG.exception(msg)
self._add_event(klb_crd, 'KuryrUpdateLBStatusError',
f'{msg}: {e}', 'Warning')
raise
return klb_crd
@@ -359,8 +449,14 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
target_pod, target_ip, loadbalancer_crd)
if not member_subnet_id:
LOG.warning("Skipping member creation for %s",
target_ip)
msg = (
f'Unable to determine ID of the subnet of member '
f'{target_ip} for service '
f'{utils.get_res_unique_name(loadbalancer_crd)}. '
f'Skipping its creation')
self._add_event(loadbalancer_crd, 'KuryrSkipMember',
msg, 'Warning')
LOG.warning(msg)
continue
target_name, target_namespace = self._get_target_info(
@@ -617,8 +713,6 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
for port_spec in lbaas_spec_ports:
protocol = port_spec['protocol']
port = port_spec['port']
name = "%s:%s" % (loadbalancer_crd['status']['loadbalancer'][
'name'], protocol)
listener = []
for l in loadbalancer_crd['status'].get('listeners', []):
@@ -638,12 +732,23 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
'listeners', []) if l['port'] == port]
if listener and not self._drv_lbaas.double_listeners_supported():
LOG.warning("Skipping listener creation for %s as another one"
" already exists with port %s", name, port)
msg = (
f'Octavia does not support multiple listeners listening '
f'on the same port. Skipping creation of listener '
f'{protocol}:{port} because {listener["protocol"]}:'
f'{listener["port"]} already exists for Service '
f'{utils.get_res_unique_name(loadbalancer_crd)}')
self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg,
'Warning')
LOG.warning(msg)
continue
if protocol == "SCTP" and not self._drv_lbaas.sctp_supported():
LOG.warning("Skipping listener creation as provider does"
" not support %s protocol", protocol)
msg = (
f'Skipping listener {protocol}:{port} creation as Octavia '
f'does not support {protocol} protocol.')
self._add_event(loadbalancer_crd, 'KuryrSkipListener', msg,
'Warning')
LOG.warning(msg)
continue
listener = self._drv_lbaas.ensure_listener(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer'),
@@ -697,17 +802,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
ns = lb_crd['metadata']['namespace']
status_data = {"loadBalancer": {
"ingress": [{"ip": lb_ip_address.format()}]}}
k8s = clients.get_kubernetes_client()
try:
k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}"
f"/{ns}/services/{name}/status",
status_data)
self.k8s.patch("status", f"{k_const.K8S_API_NAMESPACES}"
f"/{ns}/services/{name}/status",
status_data)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(name)
except k_exc.K8sClientException:
LOG.exception("Kubernetes Client Exception"
"when updating the svc status %s"
% name)
except k_exc.K8sClientException as e:
msg = (f'K8s API error when updating external FIP data of Service '
f'{utils.get_res_unique_name(lb_crd)}')
LOG.exception(msg)
self._add_event(lb_crd, 'KuryrUpdateServiceStatusError',
f'{msg}: {e}', 'Warning')
raise
def _sync_lbaas_loadbalancer(self, loadbalancer_crd):
@@ -754,29 +860,26 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
def _ensure_release_lbaas(self, loadbalancer_crd):
attempts = 0
deadline = 0
retry = True
timeout = config.CONF.kubernetes.watch_retry_timeout
while retry:
deadline = time.time() + timeout
while True:
try:
if attempts == 1:
deadline = time.time() + timeout
if (attempts > 0 and
utils.exponential_sleep(deadline, attempts) == 0):
LOG.error("Failed releasing lbaas '%s': deadline exceeded",
loadbalancer_crd['status']['loadbalancer'][
'name'])
if not utils.exponential_sleep(deadline, attempts):
msg = (f'Timed out waiting for deletion of load balancer '
f'{utils.get_res_unique_name(loadbalancer_crd)}')
self._add_event(
loadbalancer_crd, 'KuryrLBReleaseTimeout', msg,
'Warning')
LOG.error(msg)
return
self._drv_lbaas.release_loadbalancer(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer')
)
retry = False
loadbalancer_crd['status'].get('loadbalancer'))
break
except k_exc.ResourceNotReady:
LOG.debug("Attempt (%s) of loadbalancer release %s failed."
LOG.debug("Attempt %s to release LB %s failed."
" A retry will be triggered.", attempts,
loadbalancer_crd['status']['loadbalancer']['name'])
utils.get_res_unique_name(loadbalancer_crd))
attempts += 1
retry = True
loadbalancer_crd['status'] = {}
self._patch_status(loadbalancer_crd)
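
The rewritten loop above leans on utils.exponential_sleep(), which backs off between attempts and (as its use here implies) returns a falsy value once the deadline has passed, replacing the old retry-flag bookkeeping. A hypothetical helper mirroring that flow:

import time

from kuryr_kubernetes import config, utils
from kuryr_kubernetes import exceptions as k_exc

def release_with_deadline(release, lb):
    deadline = time.time() + config.CONF.kubernetes.watch_retry_timeout
    attempts = 0
    while True:
        if not utils.exponential_sleep(deadline, attempts):
            return False  # timed out; caller emits KuryrLBReleaseTimeout
        try:
            release(lb)
            return True
        except k_exc.ResourceNotReady:
            attempts += 1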


@@ -58,11 +58,13 @@ class ControllerPipeline(h_dis.EventPipeline):
def _wrap_consumer(self, consumer):
# TODO(ivc): tune retry interval/timeout
return h_log.LogExceptions(h_retry.Retry(
consumer, exceptions=(
exceptions.ResourceNotReady,
key_exc.connection.ConnectFailure,
requests_exc.ConnectionError)))
return h_log.LogExceptions(
h_retry.Retry(
consumer,
exceptions=(exceptions.ResourceNotReady,
key_exc.connection.ConnectFailure,
requests_exc.ConnectionError)),
ignore_exceptions=(exceptions.KuryrLoadBalancerNotCreated,))
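
Order matters in this composition: Retry re-queues the event on the listed transient errors until its deadline and then re-raises, while the outer LogExceptions now drops KuryrLoadBalancerNotCreated silently instead of logging a traceback for a routine startup race. A minimal sketch of the same wrapping:

from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import logging as h_log
from kuryr_kubernetes.handlers import retry as h_retry

def wrap_consumer_sketch(consumer):
    return h_log.LogExceptions(
        h_retry.Retry(consumer,
                      exceptions=(exceptions.ResourceNotReady,)),
        ignore_exceptions=(exceptions.KuryrLoadBalancerNotCreated,))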
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,


@@ -42,6 +42,13 @@ class ResourceNotReady(Exception):
super(ResourceNotReady, self).__init__("Resource not ready: %r" % msg)
class KuryrLoadBalancerNotCreated(Exception):
def __init__(self, res):
name = utils.get_res_unique_name(res)
super().__init__(
'KuryrLoadBalancer not created yet for the Service %s' % name)
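
Assuming utils.get_res_unique_name() yields the namespace/name pair (as its use in log messages elsewhere in this change suggests), the new exception renders like this:

res = {'metadata': {'name': 'demo', 'namespace': 'default'}}
# str(KuryrLoadBalancerNotCreated(res)) ->
# 'KuryrLoadBalancer not created yet for the Service default/demo'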
class LoadBalancerNotReady(ResourceNotReady):
def __init__(self, loadbalancer_id, status):
super().__init__(


@@ -28,13 +28,16 @@ class LogExceptions(base.EventHandler):
instead.
"""
def __init__(self, handler, exceptions=Exception):
def __init__(self, handler, exceptions=Exception, ignore_exceptions=None):
self._handler = handler
self._exceptions = exceptions
self._ignore_exceptions = ignore_exceptions or ()
def __call__(self, event, *args, **kwargs):
try:
self._handler(event, *args, **kwargs)
except self._ignore_exceptions:
pass
except self._exceptions as ex:
# If exception comes from OpenStack SDK and contains
# 'request_id' then print this 'request_id' along the Exception.
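
Because the ignore_exceptions clause comes first, matching exceptions are dropped without any log line, while those matching the exceptions tuple are still logged and suppressed as before. A self-contained sketch:

from kuryr_kubernetes.handlers.logging import LogExceptions

def flaky(event):
    raise KeyError(event)

wrapped = LogExceptions(flaky, exceptions=(ValueError, KeyError),
                        ignore_exceptions=(KeyError,))
wrapped({'kind': 'Service'})  # the KeyError is swallowed silently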


@@ -97,7 +97,10 @@ class Retry(base.EventHandler):
.get_instance())
method = getattr(exporter, cls_map[type(exc).__name__])
method()
except exceptions.KuryrLoadBalancerNotCreated:
with excutils.save_and_reraise_exception() as ex:
if self._sleep(deadline, attempt, ex.value):
ex.reraise = False
except os_exc.ConflictException as ex:
if ex.details.startswith('Quota exceeded for resources'):
with excutils.save_and_reraise_exception() as ex:
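
The pattern above relies on oslo.utils: save_and_reraise_exception() re-raises the active exception when the context exits unless reraise is cleared, which Retry does after a successful backoff sleep. A generic, runnable sketch:

from oslo_utils import excutils

def call_within_retry_window(fn, can_sleep_more):
    try:
        fn()
    except Exception:
        with excutils.save_and_reraise_exception() as ctx:
            if can_sleep_more():
                ctx.reraise = False  # swallow; the caller retries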


@@ -27,23 +27,6 @@ _SUPPORTED_LISTENER_PROT = ('HTTP', 'HTTPS', 'TCP')
class TestServiceHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServiceSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServiceProjectDriver.get_instance')
def test_init(self, m_get_drv_project, m_get_drv_subnets, m_get_drv_sg):
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_drv_sg.return_value = mock.sentinel.drv_sg
handler = h_lbaas.ServiceHandler()
self.assertEqual(mock.sentinel.drv_project, handler._drv_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_subnets)
self.assertEqual(mock.sentinel.drv_sg, handler._drv_sg)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_on_present(self, get_k8s_client):
svc_event = {
@@ -114,6 +97,7 @@ class TestServiceHandler(test_base.TestCase):
m_handler.create_crd_spec.return_value = new_spec
m_handler._should_ignore.return_value = False
m_handler._drv_project = m_drv_project
m_handler.k8s = mock.Mock()
h_lbaas.ServiceHandler.on_present(m_handler, svc_event)
m_handler.create_crd_spec(svc_event)
@@ -179,6 +163,7 @@ class TestServiceHandler(test_base.TestCase):
m_handler.create_crd_spec.return_value = old_spec
m_handler._should_ignore.return_value = False
m_handler._drv_project = m_drv_project
m_handler.k8s = mock.Mock()
h_lbaas.ServiceHandler.on_present(m_handler, svc_event)
m_handler.create_crd_spec(svc_event)
@@ -418,51 +403,36 @@ class TestEndpointsHandler(test_base.TestCase):
h_lbaas.EndpointsHandler.on_deleted(m_handler, self._ep)
m_handler._remove_endpoints.assert_called_once_with(self._ep)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.utils.get_klb_crd_path')
def test__remove_endpoints(self, get_klb_crd_path, get_k8s_client):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.assert_called_once_with('spec',
get_klb_crd_path(self._ep),
'endpointSlices',
action='remove')
def test__remove_endpoints(self, get_klb_crd_path):
m_handler = mock.Mock()
h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep)
m_handler.k8s.patch_crd.assert_called_once_with(
'spec', get_klb_crd_path(self._ep), 'endpointSlices',
action='remove')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch.object(logging.getLogger(
'kuryr_kubernetes.controller.handlers.lbaas'),
'debug')
def test__remove_endpoints_not_found(self, get_k8s_client, log):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound(self._ep)
def test__remove_endpoints_not_found(self, log):
m_handler = mock.Mock()
m_handler.k8s.patch_crd.side_effect = k_exc.K8sResourceNotFound('foo')
h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep)
log.assert_called_once()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test__remove_endpoints_client_exception(self, get_k8s_client):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.side_effect = k_exc.K8sClientException(self._ep)
def test__remove_endpoints_client_exception(self):
m_handler = mock.Mock()
m_handler.k8s.patch_crd.side_effect = k_exc.K8sClientException()
self.assertRaises(k_exc.K8sClientException,
h_lbaas.EndpointsHandler._remove_endpoints,
self, self._ep)
m_handler, self._ep)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch.object(logging.getLogger(
'kuryr_kubernetes.controller.handlers.lbaas'),
'warning')
def test__remove_endpoints_unprocessable_entity(self, get_k8s_client, log):
k8s = mock.Mock()
get_k8s_client.return_value = k8s
h_lbaas.EndpointsHandler._remove_endpoints(self, self._ep)
k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity(self._ep)
def test__remove_endpoints_unprocessable_entity(self, log):
m_handler = mock.Mock()
m_handler.k8s.patch_crd.side_effect = k_exc.K8sUnprocessableEntity(
'bar')
h_lbaas.EndpointsHandler._remove_endpoints(m_handler, self._ep)
log.assert_called_once()
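
The reworked tests above call the unbound method with a plain Mock standing in for self, so the cached .k8s attribute is auto-created and no clients.get_kubernetes_client() patching is needed. The same pattern in isolation:

from unittest import mock

class Handler:  # stand-in for EndpointsHandler
    def _remove_endpoints(self, endpoints):
        self.k8s.patch_crd('spec', 'a-path', 'endpointSlices',
                           action='remove')

m_handler = mock.Mock()
Handler._remove_endpoints(m_handler, {})
m_handler.k8s.patch_crd.assert_called_once_with(
    'spec', 'a-path', 'endpointSlices', action='remove')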


@@ -259,81 +259,6 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.utils.get_subnets_id_cidrs',
mock.Mock(return_value=[('id', 'cidr')]))
class TestKuryrLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.config.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServicePubIpDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.NodesSubnetsDriver.get_instance')
def test_init(self, m_get_drv_node_subnets, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets,
m_get_drv_service_pub_ip, m_cfg, m_get_svc_sg_drv,
m_get_svc_drv_project, m_get_cidr):
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets
handler = h_lb.KuryrLoadBalancerHandler()
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
self.assertEqual(mock.sentinel.drv_node_subnets,
handler._drv_nodes_subnets)
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.config.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.ServicePubIpDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.NodesSubnetsDriver.get_instance')
def test_init_provider_ovn(self, m_get_drv_node_subnets, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets,
m_get_drv_service_pub_ip, m_cfg,
m_get_svc_sg_drv, m_get_svc_drv_project,
m_get_cidr):
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
m_get_drv_node_subnets.return_value = mock.sentinel.drv_node_subnets
handler = h_lb.KuryrLoadBalancerHandler()
self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
self.assertEqual(mock.sentinel.drv_node_subnets,
handler._drv_nodes_subnets)
def test_on_present(self):
m_drv_service_pub_ip = mock.Mock()
m_drv_service_pub_ip.acquire_service_pub_ip_info.return_value = None
@@ -670,14 +595,16 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
lbaas_spec = {}
lbaas.load_balancers.return_value = []
selflink = ['/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/test',
'/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/demo']
expected_selflink = ['/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/test',
'/apis/openstack.org/v1/namespaces/default/'
'kuryrloadbalancers/demo']
h_lb.KuryrLoadBalancerHandler._trigger_loadbalancer_reconciliation(
m_handler, loadbalancer_crds)
lbaas.load_balancers.assert_called_once_with(**lbaas_spec)
m_handler._reconcile_lbaas.assert_called_with(selflink)
selflink = [c['selflink'] for c
in m_handler._reconcile_lbaas.call_args[0][0]]
self.assertEqual(expected_selflink, selflink)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base'


@@ -37,7 +37,8 @@ class TestControllerPipeline(test_base.TestCase):
ret = pipeline._wrap_consumer(consumer)
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(retry_handler)
m_logging_type.assert_called_with(retry_handler,
ignore_exceptions=mock.ANY)
m_retry_type.assert_called_with(consumer, exceptions=mock.ANY)
@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')