Merge "Add support to Endpoints without targetRef"

This commit is contained in:
Zuul 2020-11-25 11:05:05 +00:00 committed by Gerrit Code Review
commit 8ceb35d56f
5 changed files with 204 additions and 72 deletions

View File

@ -90,14 +90,16 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
def _should_ignore(self, service):
if not self._has_clusterip(service):
return 'Skipping headless Service %s.'
elif not self._is_supported_type(service):
if not self._is_supported_type(service):
return 'Skipping service %s of unsupported type.'
elif self._has_spec_annotation(service):
if self._has_spec_annotation(service):
return ('Skipping annotated service %s, waiting for it to be '
'converted to KuryrLoadBalancer object and annotation '
'removed.')
else:
return None
if utils.is_kubernetes_default_resource(service):
# Avoid to handle default Kubernetes service as requires https.
return 'Skipping default service %s.'
return None
def _patch_service_finalizer(self, service):
k8s = clients.get_kubernetes_client()
@ -257,6 +259,8 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
config.CONF.kubernetes.endpoints_driver_octavia_provider)
def on_present(self, endpoints):
ep_name = endpoints['metadata']['name']
ep_namespace = endpoints['metadata']['namespace']
if self._move_annotations_to_crd(endpoints):
return
@ -266,7 +270,8 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
if (not (self._has_pods(endpoints) or (loadbalancer_crd and
loadbalancer_crd.get('status')))
or k_const.K8S_ANNOTATION_HEADLESS_SERVICE
in endpoints['metadata'].get('labels', [])):
in endpoints['metadata'].get('labels', []) or
utils.is_kubernetes_default_resource(endpoints)):
LOG.debug("Ignoring Kubernetes endpoints %s",
endpoints['metadata']['name'])
return
@ -277,8 +282,7 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
except k_exc.K8sNamespaceTerminating:
LOG.warning('Namespace %s is being terminated, ignoring '
'Endpoints %s in that namespace.',
endpoints['metadata']['namespace'],
endpoints['metadata']['name'])
ep_namespace, ep_name)
return
else:
self._update_crd_spec(loadbalancer_crd, endpoints)
@ -289,8 +293,7 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
return False
return any(True
for subset in ep_subsets
for address in subset.get('addresses', [])
if address.get('targetRef', {}).get('kind') == 'Pod')
if subset.get('addresses', []))
def _convert_subsets_to_endpointslice(self, endpoints_obj):
endpointslices = []
@ -307,8 +310,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
'conditions': {
'ready': True
},
'targetRef': targetRef
}
if targetRef:
endpoint['targetRef'] = targetRef
endpoints.append(endpoint)
endpointslices.append({
'endpoints': endpoints,

View File

@ -26,6 +26,7 @@ from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
CONF = config.CONF
OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
@ -49,10 +50,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance()
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
# Note(yboaron) LBaaS driver supports 'provider' parameter in
# Load Balancer creation flow.
# We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration.
self._nodes_subnet = utils.get_subnet_cidr(
CONF.pod_vif_nested.worker_nodes_subnet)
def on_present(self, loadbalancer_crd):
if self._should_ignore(loadbalancer_crd):
@ -111,17 +110,15 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
raise
def _should_ignore(self, loadbalancer_crd):
return not(self._has_pods(loadbalancer_crd) or
loadbalancer_crd.get('status'))
return (not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')) or not
loadbalancer_crd['spec'].get('ip'))
def _has_pods(self, loadbalancer_crd):
def _has_endpoints(self, loadbalancer_crd):
ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
if not ep_slices:
return False
return any(True
for ep_slice in ep_slices
for endpoint in ep_slice.get('endpoints', [])
if endpoint['targetRef'].get('kind', []) == 'Pod')
return True
def on_finalize(self, loadbalancer_crd):
LOG.debug("Deleting the loadbalancer CRD")
@ -178,7 +175,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if self._sync_lbaas_pools(loadbalancer_crd):
changed = True
if (self._has_pods(loadbalancer_crd) and
if (self._has_endpoints(loadbalancer_crd) and
self._add_new_members(loadbalancer_crd)):
changed = True
@ -258,9 +255,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
for endpoint in ep_slice.get('endpoints', []):
try:
target_ip = endpoint['addresses'][0]
target_ref = endpoint['targetRef']
if target_ref['kind'] != k_const.K8S_OBJ_POD:
continue
target_ref = endpoint.get('targetRef')
target_namespace = None
if target_ref:
target_namespace = target_ref['namespace']
# Avoid to point to a Pod on hostNetwork
# that isn't the one to be added as Member.
if not target_ref and utils.is_ip_on_subnet(
self._nodes_subnet, target_ip):
target_pod = {}
else:
target_pod = utils.get_pod_by_ip(
target_ip, target_namespace)
except KeyError:
continue
if not pool_by_tgt_name:
@ -276,25 +282,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if (target_ip, target_port, pool['id']) in current_targets:
continue
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
if (config.CONF.octavia_defaults.member_mode ==
k_const.OCTAVIA_L2_MEMBER_MODE):
try:
member_subnet_id = self._get_pod_subnet(target_ref,
target_ip)
except k_exc.K8sResourceNotFound:
LOG.debug("Member namespace has been deleted. No "
"need to add the members as it is "
"going to be deleted")
continue
else:
# We use the service subnet id so that the connectivity
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
member_subnet_id = loadbalancer_crd['status'][
'loadbalancer']['subnet_id']
member_subnet_id = self._get_subnet_by_octavia_mode(
target_pod, target_ip, loadbalancer_crd)
if not member_subnet_id:
LOG.warning("Skipping member creation for %s",
target_ip)
continue
target_name, target_namespace = self._get_target_info(
target_ref, loadbalancer_crd)
first_member_of_the_pool = True
for member in loadbalancer_crd['status'].get(
'members', []):
@ -313,8 +312,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
subnet_id=member_subnet_id,
ip=target_ip,
port=target_port,
target_ref_namespace=target_ref['namespace'],
target_ref_name=target_ref['name'],
target_ref_namespace=target_namespace,
target_ref_name=target_name,
listener_port=listener_port)
if not member:
continue
@ -341,11 +340,35 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
changed = True
return changed
def _get_pod_subnet(self, target_ref, ip):
# REVISIT(ivc): consider using true pod object instead
pod = {'kind': target_ref['kind'],
'metadata': {'name': target_ref['name'],
'namespace': target_ref['namespace']}}
def _get_target_info(self, target_ref, loadbalancer_crd):
if target_ref:
target_namespace = target_ref['namespace']
target_name = target_ref['name']
else:
target_namespace = loadbalancer_crd['metadata']['namespace']
target_name = loadbalancer_crd['metadata']['name']
return target_name, target_namespace
def _get_subnet_by_octavia_mode(self, target_pod, target_ip, lb_crd):
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
subnet_id = None
if (CONF.octavia_defaults.member_mode ==
k_const.OCTAVIA_L2_MEMBER_MODE):
if target_pod:
subnet_id = self._get_pod_subnet(
target_pod, target_ip)
elif utils.is_ip_on_subnet(self._nodes_subnet, target_ip):
subnet_id = CONF.pod_vif_nested.worker_nodes_subnet
else:
# We use the service subnet id so that the connectivity
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
subnet_id = lb_crd['status']['loadbalancer']['subnet_id']
return subnet_id
def _get_pod_subnet(self, pod, ip):
project_id = self._drv_pod_project.get_project(pod)
subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id)
subnet_ids = [subnet_id for subnet_id, network in subnets_map.items()
@ -371,6 +394,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
return None
def _remove_unused_members(self, loadbalancer_crd):
lb_crd_name = loadbalancer_crd['metadata']['name']
spec_ports = {}
pools = loadbalancer_crd['status'].get('pools', [])
for pool in pools:
@ -380,26 +404,24 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
port['name'] = None
spec_ports[port['name']] = pool['id']
ep_slices = loadbalancer_crd['spec'].get('endpointSlices')
# NOTE(maysams): As we don't support dual-stack, we assume
# only one address is possible on the addresses field.
current_targets = [(ep['addresses'][0],
ep.get('targetRef', {}).get('name', ''),
p['port'], spec_ports.get(p.get('name')))
ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
current_targets = [utils.get_current_endpoints_target(
ep, p, spec_ports, lb_crd_name)
for ep_slice in ep_slices
for ep in ep_slice['endpoints']
for p in ep_slice['ports']
if p.get('name') in spec_ports]
removed_ids = set()
removed_ids = set()
for member in loadbalancer_crd['status'].get('members', []):
member_name = member.get('name', '')
try:
member_name = member['name']
# NOTE: The member name is compose of:
# NAMESPACE_NAME/POD_NAME:PROTOCOL_PORT
pod_name = member_name.split('/')[1].split(':')[0]
except AttributeError:
pod_name = ""
if ((str(member['ip']), pod_name, member['port'], member[
'pool_id']) in current_targets):
continue

View File

@ -192,6 +192,7 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
class TestKuryrLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -207,10 +208,11 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance')
def test_init(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets, m_get_drv_service_pub_ip, m_cfg,
m_get_svc_sg_drv, m_get_svc_drv_project):
m_get_svc_sg_drv, m_get_svc_drv_project, m_get_cidr):
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
@ -221,6 +223,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -236,8 +239,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance')
def test_init_provider_ovn(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets, m_get_drv_service_pub_ip,
m_cfg,
m_get_svc_sg_drv, m_get_svc_drv_project):
m_cfg, m_get_svc_sg_drv, m_get_svc_drv_project,
m_get_cidr):
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
@ -343,32 +347,32 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
def test_should_ignore(self):
m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler)
m_handler._has_pods.return_value = True
loadbalancer_crd = get_lb_crd()
loadbalancer_crd['status'] = {}
m_handler._has_endpoints.return_value = True
ret = h_lb.KuryrLoadBalancerHandler._should_ignore(
m_handler, loadbalancer_crd)
self.assertEqual(False, ret)
m_handler._has_pods.assert_called_once_with(loadbalancer_crd)
m_handler._has_endpoints.assert_called_once_with(loadbalancer_crd)
def test_should_ignore_member_scale_to_0(self):
m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler)
m_handler._has_pods.return_value = False
m_handler._has_endpoints.return_value = False
loadbalancer_crd = get_lb_crd()
ret = h_lb.KuryrLoadBalancerHandler._should_ignore(
m_handler, loadbalancer_crd)
self.assertEqual(False, ret)
m_handler._has_pods.assert_called_once_with(loadbalancer_crd)
m_handler._has_endpoints.assert_called_once_with(loadbalancer_crd)
def test_has_pods(self):
def test_has_endpoints(self):
crd = get_lb_crd()
m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler)
ret = h_lb.KuryrLoadBalancerHandler._has_pods(m_handler, crd)
ret = h_lb.KuryrLoadBalancerHandler._has_endpoints(m_handler, crd)
self.assertEqual(True, ret)
@ -422,6 +426,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
for member in crd['status']['members'])
return observed_targets
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -435,8 +440,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance')
def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets, m_k8s, m_svc_project_drv,
m_svc_sg_drv):
m_svc_sg_drv, m_get_cidr):
# REVISIT(ivc): test methods separately and verify ensure/release
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
expected_ip = '1.2.3.4'
@ -453,6 +459,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(sorted(expected_targets.items()), observed_targets)
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -466,8 +473,10 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance')
def test_sync_lbaas_members_udp(self, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets,
m_k8s, m_svc_project_drv, m_svc_sg_drv):
m_k8s, m_svc_project_drv, m_svc_sg_drv,
m_get_cidr):
# REVISIT(ivc): test methods separately and verify ensure/release
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
expected_ip = "1.2.3.4"
@ -485,6 +494,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(sorted(expected_targets.items()), observed_targets)
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -498,8 +508,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance')
def test_sync_lbaas_members_svc_listener_port_edit(
self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets,
m_k8s, m_svc_project_drv, m_svc_sg_drv):
m_k8s, m_svc_project_drv, m_svc_sg_drv, m_get_cidr):
# REVISIT(ivc): test methods separately and verify ensure/release
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
expected_ip = '1.2.3.4'
@ -523,6 +534,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -537,7 +549,8 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
def test_add_new_members_udp(self, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets,
m_k8s, m_svc_project_drv,
m_svc_sg_drv):
m_svc_sg_drv, m_get_cidr):
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
crd = get_lb_crd()

View File

@ -351,3 +351,27 @@ class TestUtils(test_base.TestCase):
self.assertRaises(os_exc.ResourceNotFound, utils.get_subnet_cidr,
subnet_id)
os_net.get_subnet.assert_called_once_with(subnet_id)
def test_get_current_endpoints_target_with_target_ref(self):
ep = {'addresses': ['10.0.2.107'], 'conditions': {'ready': True},
'targetRef': {'kind': 'Pod', 'name': 'test-868d9cbd68-xq2fl',
'namespace': 'test2'}}
port = {'port': 8080, 'protocol': 'TCP'}
spec_ports = {None: '31d59e41-05db-4a39-8aca-6a9a572c83cd'}
ep_name = 'test'
target = utils.get_current_endpoints_target(
ep, port, spec_ports, ep_name)
self.assertEqual(
target, ('10.0.2.107', 'test-868d9cbd68-xq2fl', 8080,
'31d59e41-05db-4a39-8aca-6a9a572c83cd'))
def test_get_current_endpoints_target_without_target_ref(self):
ep = {'addresses': ['10.0.1.208'], 'conditions': {'ready': True}}
port = {'port': 8080, 'protocol': 'TCP'}
spec_ports = {None: '4472fab1-f01c-46a7-b197-5cba4f2d7135'}
ep_name = 'test'
target = utils.get_current_endpoints_target(
ep, port, spec_ports, ep_name)
self.assertEqual(
target, ('10.0.1.208', 'test', 8080,
'4472fab1-f01c-46a7-b197-5cba4f2d7135'))

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import random
import socket
import time
@ -441,3 +442,71 @@ def clean_lb_crd_status(loadbalancer_name):
LOG.exception('Error updating KuryrLoadbalancer CRD %s',
name)
raise
def is_kubernetes_default_resource(obj):
"""Check if Object is a resource associated to the API
Verifies if the Object is on the default namespace
and has the name kubernetes. Those name and namespace
are given to Kubernetes Service and Endpoints for the API.
:param obj: Kubernetes object dict
:returns: True if is default resource for the API, false
otherwise.
"""
return (obj['metadata']['name'] == 'kubernetes' and
obj['metadata']['namespace'] == 'default')
def get_pod_by_ip(pod_ip, namespace=None):
k8s = clients.get_kubernetes_client()
pod = {}
try:
if namespace:
pods = k8s.get(f'{constants.K8S_API_BASE}/namespaces/{namespace}/'
f'pods?fieldSelector=status.phase=Running,'
f'status.podIP={pod_ip}')
else:
pods = k8s.get(f'{constants.K8S_API_BASE}/'
f'pods?fieldSelector=status.phase=Running,'
f'status.podIP={pod_ip}')
except exceptions.K8sClientException:
LOG.exception('Error retrieving Pod with IP %s', pod_ip)
raise
if pods.get('items'):
# Only one Pod should have the IP
return pods['items'][0]
return pod
def get_current_endpoints_target(ep, port, spec_ports, ep_name):
"""Retrieve details about one specific Endpoint target
Defines the details about the Endpoint target, such as the
target address, name, port value and the Pool ID. In case,
the Endpoints has no targetRef defined, the name of the
target will be the same as the Endpoint.
:param ep: Endpoint on the Endpoints object
:param port: Endpoint port
:param spec_ports: dict of port name associated to pool ID
:param ep_name: Name of the Endpoints object
:returns: Tuple with target address, target name, port number
and pool ID.
"""
target_ref = ep.get('targetRef', {})
pod_name = ep_name
# NOTE(maysams): As we don't support dual-stack, we assume
# only one address is possible on the addresses field.
address = ep['addresses'][0]
if target_ref:
pod_name = target_ref.get('name', '')
return (address, pod_name, port['port'],
spec_ports.get(port.get('name')))
def is_ip_on_subnet(nodes_subnet, target_ip):
return (nodes_subnet and
(ipaddress.ip_address(target_ip) in
ipaddress.ip_network(nodes_subnet)))