Merge "Convert KuryrLoadBalancer subsets CRD to EndpointSlice"
This commit is contained in:
commit
93aafb1261
|
@ -30,6 +30,53 @@ spec:
|
|||
spec:
|
||||
type: object
|
||||
properties:
|
||||
endpointSlices:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
endpoints:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
addresses:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
conditions:
|
||||
type: object
|
||||
properties:
|
||||
ready:
|
||||
type: boolean
|
||||
hostname:
|
||||
type: string
|
||||
targetRef:
|
||||
type: object
|
||||
properties:
|
||||
kind:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
namespace:
|
||||
type: string
|
||||
resourceVersion:
|
||||
type: string
|
||||
uid:
|
||||
type: string
|
||||
topology:
|
||||
type: object
|
||||
ports:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
port:
|
||||
type: integer
|
||||
protocol:
|
||||
type: string
|
||||
ip:
|
||||
type: string
|
||||
lb_ip:
|
||||
|
@ -61,48 +108,6 @@ spec:
|
|||
type: string
|
||||
type:
|
||||
type: string
|
||||
subsets:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
addresses:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
hostname:
|
||||
type: string
|
||||
ip:
|
||||
type: string
|
||||
nodeName:
|
||||
type: string
|
||||
targetRef:
|
||||
type: object
|
||||
properties:
|
||||
apiVersion:
|
||||
type: string
|
||||
kind:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
namespace:
|
||||
type: string
|
||||
resourceVersion:
|
||||
type: string
|
||||
uid:
|
||||
type: string
|
||||
ports:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
port:
|
||||
type: integer
|
||||
protocol:
|
||||
type: string
|
||||
status:
|
||||
type: object
|
||||
properties:
|
||||
|
|
|
@ -281,29 +281,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
return
|
||||
|
||||
if loadbalancer_crd is None:
|
||||
loadbalancer_crd = self._create_crd_spec(endpoints)
|
||||
self._create_crd_spec(endpoints)
|
||||
else:
|
||||
loadbalancer_crd = self._update_crd_spec(loadbalancer_crd,
|
||||
endpoints)
|
||||
|
||||
def _has_lbaas_spec_changes(self, endpoints, loadbalancer_crd):
|
||||
return (self._has_ip_changes(endpoints, loadbalancer_crd) or
|
||||
utils.has_port_changes(endpoints, loadbalancer_crd))
|
||||
|
||||
def _has_ip_changes(self, endpoints, loadbalancer_crd):
|
||||
link = endpoints['metadata']['selfLink']
|
||||
endpoint_ip = endpoints['subsets']['addresses'].get('ip')
|
||||
endpoint_crd_ip = loadbalancer_crd['spec'].get('ip')
|
||||
|
||||
if endpoint_crd_ip != endpoint_ip:
|
||||
LOG.debug("LBaaS spec IP %(endpoint_crd_ip)s !="
|
||||
" %(endpoint_ip)s for %(link)s"
|
||||
% {'endpoint_crd_ip': endpoint_crd_ip,
|
||||
'endpoint_ip': endpoint_ip,
|
||||
'link': link})
|
||||
return True
|
||||
|
||||
return False
|
||||
self._update_crd_spec(loadbalancer_crd, endpoints)
|
||||
|
||||
def _has_pods(self, endpoints):
|
||||
ep_subsets = endpoints.get('subsets', [])
|
||||
|
@ -314,17 +294,48 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
for address in subset.get('addresses', [])
|
||||
if address.get('targetRef', {}).get('kind') == 'Pod')
|
||||
|
||||
def _convert_subsets_to_endpointslice(self, endpoints_obj):
|
||||
endpointslices = []
|
||||
endpoints = []
|
||||
subsets = endpoints_obj.get('subsets', [])
|
||||
for subset in subsets:
|
||||
addresses = subset.get('addresses', [])
|
||||
ports = subset.get('ports', [])
|
||||
for address in addresses:
|
||||
ip = address.get('ip')
|
||||
targetRef = address.get('targetRef')
|
||||
endpoint = {
|
||||
'addresses': [ip],
|
||||
'conditions': {
|
||||
'ready': True
|
||||
},
|
||||
'targetRef': targetRef
|
||||
}
|
||||
endpoints.append(endpoint)
|
||||
endpointslices.append({
|
||||
'endpoints': endpoints,
|
||||
'ports': ports,
|
||||
})
|
||||
|
||||
return endpointslices
|
||||
|
||||
def _create_crd_spec(self, endpoints, spec=None, status=None):
|
||||
endpoints_name = endpoints['metadata']['name']
|
||||
namespace = endpoints['metadata']['namespace']
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
|
||||
subsets = endpoints.get('subsets', [])
|
||||
# TODO(maysams): Remove the convertion once we start handling
|
||||
# Endpoint slices.
|
||||
epslices = self._convert_subsets_to_endpointslice(endpoints)
|
||||
if not status:
|
||||
status = {}
|
||||
if not spec:
|
||||
spec = {'subsets': subsets}
|
||||
spec = {'endpointSlices': epslices}
|
||||
|
||||
# NOTE(maysams): As the spec may already contain a
|
||||
# ports field from the Service, a new endpointslice
|
||||
# field is introduced to also hold ports from the
|
||||
# Endpoints under the spec.
|
||||
loadbalancer_crd = {
|
||||
'apiVersion': 'openstack.org/v1',
|
||||
'kind': 'KuryrLoadBalancer',
|
||||
|
@ -346,17 +357,17 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
"kuryrloadbalancer CRD. %s" %
|
||||
k_exc.K8sClientException)
|
||||
raise
|
||||
return loadbalancer_crd
|
||||
|
||||
def _update_crd_spec(self, loadbalancer_crd, endpoints):
|
||||
kubernetes = clients.get_kubernetes_client()
|
||||
subsets = endpoints.get('subsets')
|
||||
lbaas_update_crd = {
|
||||
'subsets': subsets
|
||||
}
|
||||
# TODO(maysams): Remove the convertion once we start handling
|
||||
# Endpoint slices.
|
||||
epslices = self._convert_subsets_to_endpointslice(endpoints)
|
||||
try:
|
||||
kubernetes.patch_crd('spec', loadbalancer_crd['metadata'][
|
||||
'selfLink'], lbaas_update_crd)
|
||||
kubernetes.patch_crd(
|
||||
'spec',
|
||||
loadbalancer_crd['metadata']['selfLink'],
|
||||
{'endpointSlices': epslices})
|
||||
except k_exc.K8sResourceNotFound:
|
||||
LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
|
||||
except k_exc.K8sConflict:
|
||||
|
@ -366,19 +377,6 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
|
|||
loadbalancer_crd)
|
||||
raise
|
||||
|
||||
return loadbalancer_crd
|
||||
|
||||
def _get_service_link(self, endpoints):
|
||||
ep_link = endpoints['metadata']['selfLink']
|
||||
link_parts = ep_link.split('/')
|
||||
|
||||
if link_parts[-2] != 'endpoints':
|
||||
raise k_exc.IntegrityError(_(
|
||||
"Unsupported endpoints link: %(link)s") % {
|
||||
'link': ep_link})
|
||||
link_parts[-2] = 'services'
|
||||
return "/".join(link_parts)
|
||||
|
||||
def _move_annotations_to_crd(self, endpoints):
|
||||
"""Support upgrade from annotations to KuryrLoadBalancer CRD."""
|
||||
try:
|
||||
|
|
|
@ -117,13 +117,13 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
return not(self._has_pods(loadbalancer_crd))
|
||||
|
||||
def _has_pods(self, loadbalancer_crd):
|
||||
ep_subsets = loadbalancer_crd['spec'].get('subsets', [])
|
||||
if not ep_subsets:
|
||||
ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
|
||||
if not ep_slices:
|
||||
return False
|
||||
return any(True
|
||||
for subset in ep_subsets
|
||||
for address in subset.get('addresses', [])
|
||||
if address['targetRef'].get('kind', []) == 'Pod')
|
||||
for ep_slice in ep_slices
|
||||
for endpoint in ep_slice.get('endpoints', [])
|
||||
if endpoint['targetRef'].get('kind', []) == 'Pod')
|
||||
|
||||
def on_finalize(self, loadbalancer_crd):
|
||||
LOG.debug("Deleting the loadbalancer CRD")
|
||||
|
@ -263,25 +263,25 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
except KeyError:
|
||||
continue
|
||||
|
||||
current_targets = {(str(m['ip']), m['port'], m['pool_id'])
|
||||
current_targets = [(str(m['ip']), m['port'], m['pool_id'])
|
||||
for m in loadbalancer_crd['status'].get(
|
||||
'members', [])}
|
||||
'members', [])]
|
||||
|
||||
for subset in loadbalancer_crd['spec']['subsets']:
|
||||
subset_ports = subset.get('ports', [])
|
||||
for subset_address in subset.get('addresses', []):
|
||||
for ep_slice in loadbalancer_crd['spec']['endpointSlices']:
|
||||
ep_slices_ports = ep_slice.get('ports', [])
|
||||
for endpoint in ep_slice.get('endpoints', []):
|
||||
try:
|
||||
target_ip = subset_address['ip']
|
||||
target_ref = subset_address['targetRef']
|
||||
target_ip = endpoint['addresses'][0]
|
||||
target_ref = endpoint['targetRef']
|
||||
if target_ref['kind'] != k_const.K8S_OBJ_POD:
|
||||
continue
|
||||
except KeyError:
|
||||
continue
|
||||
if not pool_by_tgt_name:
|
||||
continue
|
||||
for subset_port in subset_ports:
|
||||
target_port = subset_port['port']
|
||||
port_name = subset_port.get('name')
|
||||
for ep_slice_port in ep_slices_ports:
|
||||
target_port = ep_slice_port['port']
|
||||
port_name = ep_slice_port.get('name')
|
||||
try:
|
||||
pool = pool_by_tgt_name[port_name]
|
||||
except KeyError:
|
||||
|
@ -392,14 +392,16 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
|
|||
port['name'] = None
|
||||
spec_ports[port['name']] = pool['id']
|
||||
|
||||
subsets = loadbalancer_crd['spec'].get('subsets')
|
||||
current_targets = {(a['ip'], a.get('targetRef', {}).get('name', ''),
|
||||
p['port'], spec_ports.get(p.get('name')))
|
||||
for s in subsets
|
||||
for a in s['addresses']
|
||||
for p in s['ports']
|
||||
if p.get('name') in spec_ports}
|
||||
|
||||
ep_slices = loadbalancer_crd['spec'].get('endpointSlices')
|
||||
# NOTE(maysams): As we don't support dual-stack, we assume
|
||||
# only one address is possible on the addresses field.
|
||||
current_targets = [(ep['addresses'][0],
|
||||
ep.get('targetRef', {}).get('name', ''),
|
||||
p['port'], spec_ports.get(p.get('name')))
|
||||
for ep_slice in ep_slices
|
||||
for ep in ep_slice['endpoints']
|
||||
for p in ep_slice['ports']
|
||||
if p.get('name') in spec_ports]
|
||||
removed_ids = set()
|
||||
|
||||
for member in loadbalancer_crd['status'].get('members', []):
|
||||
|
|
|
@ -57,12 +57,11 @@ def get_lb_crd():
|
|||
"31d7b8c2-75f1-4125-9565-8c15c5cf046c"
|
||||
],
|
||||
"subnet_id": "123456789120",
|
||||
"subsets": [
|
||||
"endpointSlices": [
|
||||
{
|
||||
"addresses": [
|
||||
"endpoints": [
|
||||
{
|
||||
"ip": "1.1.1.1",
|
||||
"nodeName": "sarka-devstack",
|
||||
"addresses": ["1.1.1.1"],
|
||||
"targetRef": {
|
||||
"kind": "Pod",
|
||||
"name": "test-f87976f9c-thjbk",
|
||||
|
|
Loading…
Reference in New Issue