Add support to Endpoints without targetRef

The targetRef field is optional for the Endpoints object,
and when service without selectors are created that field
is more likely to not be specified. This commit ensures
Kuryr properly wires Endpoints without targetRef.

Change-Id: Ib43e88aafd9e0907556a0e740990a6acbd173fb0
This commit is contained in:
Maysa Macedo 2020-11-03 17:08:45 +01:00
parent 2fbdba9004
commit 27876a586f
5 changed files with 204 additions and 72 deletions

View File

@ -90,14 +90,16 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
def _should_ignore(self, service): def _should_ignore(self, service):
if not self._has_clusterip(service): if not self._has_clusterip(service):
return 'Skipping headless Service %s.' return 'Skipping headless Service %s.'
elif not self._is_supported_type(service): if not self._is_supported_type(service):
return 'Skipping service %s of unsupported type.' return 'Skipping service %s of unsupported type.'
elif self._has_spec_annotation(service): if self._has_spec_annotation(service):
return ('Skipping annotated service %s, waiting for it to be ' return ('Skipping annotated service %s, waiting for it to be '
'converted to KuryrLoadBalancer object and annotation ' 'converted to KuryrLoadBalancer object and annotation '
'removed.') 'removed.')
else: if utils.is_kubernetes_default_resource(service):
return None # Avoid to handle default Kubernetes service as requires https.
return 'Skipping default service %s.'
return None
def _patch_service_finalizer(self, service): def _patch_service_finalizer(self, service):
k8s = clients.get_kubernetes_client() k8s = clients.get_kubernetes_client()
@ -257,6 +259,8 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
config.CONF.kubernetes.endpoints_driver_octavia_provider) config.CONF.kubernetes.endpoints_driver_octavia_provider)
def on_present(self, endpoints): def on_present(self, endpoints):
ep_name = endpoints['metadata']['name']
ep_namespace = endpoints['metadata']['namespace']
if self._move_annotations_to_crd(endpoints): if self._move_annotations_to_crd(endpoints):
return return
@ -266,7 +270,8 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
if (not (self._has_pods(endpoints) or (loadbalancer_crd and if (not (self._has_pods(endpoints) or (loadbalancer_crd and
loadbalancer_crd.get('status'))) loadbalancer_crd.get('status')))
or k_const.K8S_ANNOTATION_HEADLESS_SERVICE or k_const.K8S_ANNOTATION_HEADLESS_SERVICE
in endpoints['metadata'].get('labels', [])): in endpoints['metadata'].get('labels', []) or
utils.is_kubernetes_default_resource(endpoints)):
LOG.debug("Ignoring Kubernetes endpoints %s", LOG.debug("Ignoring Kubernetes endpoints %s",
endpoints['metadata']['name']) endpoints['metadata']['name'])
return return
@ -277,8 +282,7 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
except k_exc.K8sNamespaceTerminating: except k_exc.K8sNamespaceTerminating:
LOG.warning('Namespace %s is being terminated, ignoring ' LOG.warning('Namespace %s is being terminated, ignoring '
'Endpoints %s in that namespace.', 'Endpoints %s in that namespace.',
endpoints['metadata']['namespace'], ep_namespace, ep_name)
endpoints['metadata']['name'])
return return
else: else:
self._update_crd_spec(loadbalancer_crd, endpoints) self._update_crd_spec(loadbalancer_crd, endpoints)
@ -289,8 +293,7 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
return False return False
return any(True return any(True
for subset in ep_subsets for subset in ep_subsets
for address in subset.get('addresses', []) if subset.get('addresses', []))
if address.get('targetRef', {}).get('kind') == 'Pod')
def _convert_subsets_to_endpointslice(self, endpoints_obj): def _convert_subsets_to_endpointslice(self, endpoints_obj):
endpointslices = [] endpointslices = []
@ -307,8 +310,9 @@ class EndpointsHandler(k8s_base.ResourceEventHandler):
'conditions': { 'conditions': {
'ready': True 'ready': True
}, },
'targetRef': targetRef
} }
if targetRef:
endpoint['targetRef'] = targetRef
endpoints.append(endpoint) endpoints.append(endpoint)
endpointslices.append({ endpointslices.append({
'endpoints': endpoints, 'endpoints': endpoints,

View File

@ -26,6 +26,7 @@ from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = config.CONF
OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora'] OCTAVIA_DEFAULT_PROVIDERS = ['octavia', 'amphora']
@ -49,10 +50,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance() self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance()
self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance() self._drv_svc_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance() self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
# Note(yboaron) LBaaS driver supports 'provider' parameter in self._nodes_subnet = utils.get_subnet_cidr(
# Load Balancer creation flow. CONF.pod_vif_nested.worker_nodes_subnet)
# We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration.
def on_present(self, loadbalancer_crd): def on_present(self, loadbalancer_crd):
if self._should_ignore(loadbalancer_crd): if self._should_ignore(loadbalancer_crd):
@ -111,17 +110,15 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
raise raise
def _should_ignore(self, loadbalancer_crd): def _should_ignore(self, loadbalancer_crd):
return not(self._has_pods(loadbalancer_crd) or return (not(self._has_endpoints(loadbalancer_crd) or
loadbalancer_crd.get('status')) loadbalancer_crd.get('status')) or not
loadbalancer_crd['spec'].get('ip'))
def _has_pods(self, loadbalancer_crd): def _has_endpoints(self, loadbalancer_crd):
ep_slices = loadbalancer_crd['spec'].get('endpointSlices', []) ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
if not ep_slices: if not ep_slices:
return False return False
return any(True return True
for ep_slice in ep_slices
for endpoint in ep_slice.get('endpoints', [])
if endpoint['targetRef'].get('kind', []) == 'Pod')
def on_finalize(self, loadbalancer_crd): def on_finalize(self, loadbalancer_crd):
LOG.debug("Deleting the loadbalancer CRD") LOG.debug("Deleting the loadbalancer CRD")
@ -178,7 +175,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if self._sync_lbaas_pools(loadbalancer_crd): if self._sync_lbaas_pools(loadbalancer_crd):
changed = True changed = True
if (self._has_pods(loadbalancer_crd) and if (self._has_endpoints(loadbalancer_crd) and
self._add_new_members(loadbalancer_crd)): self._add_new_members(loadbalancer_crd)):
changed = True changed = True
@ -258,9 +255,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
for endpoint in ep_slice.get('endpoints', []): for endpoint in ep_slice.get('endpoints', []):
try: try:
target_ip = endpoint['addresses'][0] target_ip = endpoint['addresses'][0]
target_ref = endpoint['targetRef'] target_ref = endpoint.get('targetRef')
if target_ref['kind'] != k_const.K8S_OBJ_POD: target_namespace = None
continue if target_ref:
target_namespace = target_ref['namespace']
# Avoid to point to a Pod on hostNetwork
# that isn't the one to be added as Member.
if not target_ref and utils.is_ip_on_subnet(
self._nodes_subnet, target_ip):
target_pod = {}
else:
target_pod = utils.get_pod_by_ip(
target_ip, target_namespace)
except KeyError: except KeyError:
continue continue
if not pool_by_tgt_name: if not pool_by_tgt_name:
@ -276,25 +282,18 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
if (target_ip, target_port, pool['id']) in current_targets: if (target_ip, target_port, pool['id']) in current_targets:
continue continue
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as member_subnet_id = self._get_subnet_by_octavia_mode(
# octavia does not require it target_pod, target_ip, loadbalancer_crd)
if (config.CONF.octavia_defaults.member_mode ==
k_const.OCTAVIA_L2_MEMBER_MODE): if not member_subnet_id:
try: LOG.warning("Skipping member creation for %s",
member_subnet_id = self._get_pod_subnet(target_ref, target_ip)
target_ip) continue
except k_exc.K8sResourceNotFound:
LOG.debug("Member namespace has been deleted. No " target_name, target_namespace = self._get_target_info(
"need to add the members as it is " target_ref, loadbalancer_crd)
"going to be deleted")
continue
else:
# We use the service subnet id so that the connectivity
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
member_subnet_id = loadbalancer_crd['status'][
'loadbalancer']['subnet_id']
first_member_of_the_pool = True first_member_of_the_pool = True
for member in loadbalancer_crd['status'].get( for member in loadbalancer_crd['status'].get(
'members', []): 'members', []):
@ -313,8 +312,8 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
subnet_id=member_subnet_id, subnet_id=member_subnet_id,
ip=target_ip, ip=target_ip,
port=target_port, port=target_port,
target_ref_namespace=target_ref['namespace'], target_ref_namespace=target_namespace,
target_ref_name=target_ref['name'], target_ref_name=target_name,
listener_port=listener_port) listener_port=listener_port)
if not member: if not member:
continue continue
@ -341,11 +340,35 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
changed = True changed = True
return changed return changed
def _get_pod_subnet(self, target_ref, ip): def _get_target_info(self, target_ref, loadbalancer_crd):
# REVISIT(ivc): consider using true pod object instead if target_ref:
pod = {'kind': target_ref['kind'], target_namespace = target_ref['namespace']
'metadata': {'name': target_ref['name'], target_name = target_ref['name']
'namespace': target_ref['namespace']}} else:
target_namespace = loadbalancer_crd['metadata']['namespace']
target_name = loadbalancer_crd['metadata']['name']
return target_name, target_namespace
def _get_subnet_by_octavia_mode(self, target_pod, target_ip, lb_crd):
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
subnet_id = None
if (CONF.octavia_defaults.member_mode ==
k_const.OCTAVIA_L2_MEMBER_MODE):
if target_pod:
subnet_id = self._get_pod_subnet(
target_pod, target_ip)
elif utils.is_ip_on_subnet(self._nodes_subnet, target_ip):
subnet_id = CONF.pod_vif_nested.worker_nodes_subnet
else:
# We use the service subnet id so that the connectivity
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
subnet_id = lb_crd['status']['loadbalancer']['subnet_id']
return subnet_id
def _get_pod_subnet(self, pod, ip):
project_id = self._drv_pod_project.get_project(pod) project_id = self._drv_pod_project.get_project(pod)
subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id) subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id)
subnet_ids = [subnet_id for subnet_id, network in subnets_map.items() subnet_ids = [subnet_id for subnet_id, network in subnets_map.items()
@ -371,6 +394,7 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
return None return None
def _remove_unused_members(self, loadbalancer_crd): def _remove_unused_members(self, loadbalancer_crd):
lb_crd_name = loadbalancer_crd['metadata']['name']
spec_ports = {} spec_ports = {}
pools = loadbalancer_crd['status'].get('pools', []) pools = loadbalancer_crd['status'].get('pools', [])
for pool in pools: for pool in pools:
@ -380,26 +404,24 @@ class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
port['name'] = None port['name'] = None
spec_ports[port['name']] = pool['id'] spec_ports[port['name']] = pool['id']
ep_slices = loadbalancer_crd['spec'].get('endpointSlices') ep_slices = loadbalancer_crd['spec'].get('endpointSlices', [])
# NOTE(maysams): As we don't support dual-stack, we assume current_targets = [utils.get_current_endpoints_target(
# only one address is possible on the addresses field. ep, p, spec_ports, lb_crd_name)
current_targets = [(ep['addresses'][0],
ep.get('targetRef', {}).get('name', ''),
p['port'], spec_ports.get(p.get('name')))
for ep_slice in ep_slices for ep_slice in ep_slices
for ep in ep_slice['endpoints'] for ep in ep_slice['endpoints']
for p in ep_slice['ports'] for p in ep_slice['ports']
if p.get('name') in spec_ports] if p.get('name') in spec_ports]
removed_ids = set()
removed_ids = set()
for member in loadbalancer_crd['status'].get('members', []): for member in loadbalancer_crd['status'].get('members', []):
member_name = member.get('name', '')
try: try:
member_name = member['name']
# NOTE: The member name is compose of: # NOTE: The member name is compose of:
# NAMESPACE_NAME/POD_NAME:PROTOCOL_PORT # NAMESPACE_NAME/POD_NAME:PROTOCOL_PORT
pod_name = member_name.split('/')[1].split(':')[0] pod_name = member_name.split('/')[1].split(':')[0]
except AttributeError: except AttributeError:
pod_name = "" pod_name = ""
if ((str(member['ip']), pod_name, member['port'], member[ if ((str(member['ip']), pod_name, member['port'], member[
'pool_id']) in current_targets): 'pool_id']) in current_targets):
continue continue

View File

@ -192,6 +192,7 @@ class FakeLBaaSDriver(drv_base.LBaaSDriver):
class TestKuryrLoadBalancerHandler(test_base.TestCase): class TestKuryrLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance') 'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -207,10 +208,11 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance') '.LBaaSDriver.get_instance')
def test_init(self, m_get_drv_lbaas, m_get_drv_project, def test_init(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets, m_get_drv_service_pub_ip, m_cfg, m_get_drv_subnets, m_get_drv_service_pub_ip, m_cfg,
m_get_svc_sg_drv, m_get_svc_drv_project): m_get_svc_sg_drv, m_get_svc_drv_project, m_get_cidr):
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip m_get_drv_service_pub_ip.return_value = mock.sentinel.drv_lb_ip
m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project m_get_svc_drv_project.return_value = mock.sentinel.drv_svc_project
m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg m_get_svc_sg_drv.return_value = mock.sentinel.drv_sg
@ -221,6 +223,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)
self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip) self.assertEqual(mock.sentinel.drv_lb_ip, handler._drv_service_pub_ip)
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceProjectDriver.get_instance') 'ServiceProjectDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -236,8 +239,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance') '.LBaaSDriver.get_instance')
def test_init_provider_ovn(self, m_get_drv_lbaas, m_get_drv_project, def test_init_provider_ovn(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets, m_get_drv_service_pub_ip, m_get_drv_subnets, m_get_drv_service_pub_ip,
m_cfg, m_cfg, m_get_svc_sg_drv, m_get_svc_drv_project,
m_get_svc_sg_drv, m_get_svc_drv_project): m_get_cidr):
m_get_cidr.return_value = '10.0.0.128/26'
m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
m_get_drv_project.return_value = mock.sentinel.drv_project m_get_drv_project.return_value = mock.sentinel.drv_project
m_get_drv_subnets.return_value = mock.sentinel.drv_subnets m_get_drv_subnets.return_value = mock.sentinel.drv_subnets
@ -343,32 +347,32 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
def test_should_ignore(self): def test_should_ignore(self):
m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler) m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler)
m_handler._has_pods.return_value = True
loadbalancer_crd = get_lb_crd() loadbalancer_crd = get_lb_crd()
loadbalancer_crd['status'] = {} loadbalancer_crd['status'] = {}
m_handler._has_endpoints.return_value = True
ret = h_lb.KuryrLoadBalancerHandler._should_ignore( ret = h_lb.KuryrLoadBalancerHandler._should_ignore(
m_handler, loadbalancer_crd) m_handler, loadbalancer_crd)
self.assertEqual(False, ret) self.assertEqual(False, ret)
m_handler._has_pods.assert_called_once_with(loadbalancer_crd) m_handler._has_endpoints.assert_called_once_with(loadbalancer_crd)
def test_should_ignore_member_scale_to_0(self): def test_should_ignore_member_scale_to_0(self):
m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler) m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler)
m_handler._has_pods.return_value = False m_handler._has_endpoints.return_value = False
loadbalancer_crd = get_lb_crd() loadbalancer_crd = get_lb_crd()
ret = h_lb.KuryrLoadBalancerHandler._should_ignore( ret = h_lb.KuryrLoadBalancerHandler._should_ignore(
m_handler, loadbalancer_crd) m_handler, loadbalancer_crd)
self.assertEqual(False, ret) self.assertEqual(False, ret)
m_handler._has_pods.assert_called_once_with(loadbalancer_crd) m_handler._has_endpoints.assert_called_once_with(loadbalancer_crd)
def test_has_pods(self): def test_has_endpoints(self):
crd = get_lb_crd() crd = get_lb_crd()
m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler) m_handler = mock.Mock(spec=h_lb.KuryrLoadBalancerHandler)
ret = h_lb.KuryrLoadBalancerHandler._has_pods(m_handler, crd) ret = h_lb.KuryrLoadBalancerHandler._has_endpoints(m_handler, crd)
self.assertEqual(True, ret) self.assertEqual(True, ret)
@ -422,6 +426,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
for member in crd['status']['members']) for member in crd['status']['members'])
return observed_targets return observed_targets
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance') 'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -435,8 +440,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance') '.LBaaSDriver.get_instance')
def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project, def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets, m_k8s, m_svc_project_drv, m_get_drv_subnets, m_k8s, m_svc_project_drv,
m_svc_sg_drv): m_svc_sg_drv, m_get_cidr):
# REVISIT(ivc): test methods separately and verify ensure/release # REVISIT(ivc): test methods separately and verify ensure/release
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4()) project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4())
expected_ip = '1.2.3.4' expected_ip = '1.2.3.4'
@ -453,6 +459,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(sorted(expected_targets.items()), observed_targets) self.assertEqual(sorted(expected_targets.items()), observed_targets)
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip'])) self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance') 'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -466,8 +473,10 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance') '.LBaaSDriver.get_instance')
def test_sync_lbaas_members_udp(self, m_get_drv_lbaas, def test_sync_lbaas_members_udp(self, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets, m_get_drv_project, m_get_drv_subnets,
m_k8s, m_svc_project_drv, m_svc_sg_drv): m_k8s, m_svc_project_drv, m_svc_sg_drv,
m_get_cidr):
# REVISIT(ivc): test methods separately and verify ensure/release # REVISIT(ivc): test methods separately and verify ensure/release
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4()) project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4())
expected_ip = "1.2.3.4" expected_ip = "1.2.3.4"
@ -485,6 +494,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(sorted(expected_targets.items()), observed_targets) self.assertEqual(sorted(expected_targets.items()), observed_targets)
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip'])) self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance') 'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -498,8 +508,9 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
'.LBaaSDriver.get_instance') '.LBaaSDriver.get_instance')
def test_sync_lbaas_members_svc_listener_port_edit( def test_sync_lbaas_members_svc_listener_port_edit(
self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets, self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets,
m_k8s, m_svc_project_drv, m_svc_sg_drv): m_k8s, m_svc_project_drv, m_svc_sg_drv, m_get_cidr):
# REVISIT(ivc): test methods separately and verify ensure/release # REVISIT(ivc): test methods separately and verify ensure/release
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4()) project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4())
expected_ip = '1.2.3.4' expected_ip = '1.2.3.4'
@ -523,6 +534,7 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip'])) self.assertEqual(expected_ip, str(crd['status']['loadbalancer']['ip']))
@mock.patch('kuryr_kubernetes.utils.get_subnet_cidr')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance') 'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.' @mock.patch('kuryr_kubernetes.controller.drivers.base.'
@ -537,7 +549,8 @@ class TestKuryrLoadBalancerHandler(test_base.TestCase):
def test_add_new_members_udp(self, m_get_drv_lbaas, def test_add_new_members_udp(self, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets, m_get_drv_project, m_get_drv_subnets,
m_k8s, m_svc_project_drv, m_k8s, m_svc_project_drv,
m_svc_sg_drv): m_svc_sg_drv, m_get_cidr):
m_get_cidr.return_value = '10.0.0.128/26'
project_id = str(uuid.uuid4()) project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4()) subnet_id = str(uuid.uuid4())
crd = get_lb_crd() crd = get_lb_crd()

View File

@ -351,3 +351,27 @@ class TestUtils(test_base.TestCase):
self.assertRaises(os_exc.ResourceNotFound, utils.get_subnet_cidr, self.assertRaises(os_exc.ResourceNotFound, utils.get_subnet_cidr,
subnet_id) subnet_id)
os_net.get_subnet.assert_called_once_with(subnet_id) os_net.get_subnet.assert_called_once_with(subnet_id)
def test_get_current_endpoints_target_with_target_ref(self):
ep = {'addresses': ['10.0.2.107'], 'conditions': {'ready': True},
'targetRef': {'kind': 'Pod', 'name': 'test-868d9cbd68-xq2fl',
'namespace': 'test2'}}
port = {'port': 8080, 'protocol': 'TCP'}
spec_ports = {None: '31d59e41-05db-4a39-8aca-6a9a572c83cd'}
ep_name = 'test'
target = utils.get_current_endpoints_target(
ep, port, spec_ports, ep_name)
self.assertEqual(
target, ('10.0.2.107', 'test-868d9cbd68-xq2fl', 8080,
'31d59e41-05db-4a39-8aca-6a9a572c83cd'))
def test_get_current_endpoints_target_without_target_ref(self):
ep = {'addresses': ['10.0.1.208'], 'conditions': {'ready': True}}
port = {'port': 8080, 'protocol': 'TCP'}
spec_ports = {None: '4472fab1-f01c-46a7-b197-5cba4f2d7135'}
ep_name = 'test'
target = utils.get_current_endpoints_target(
ep, port, spec_ports, ep_name)
self.assertEqual(
target, ('10.0.1.208', 'test', 8080,
'4472fab1-f01c-46a7-b197-5cba4f2d7135'))

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import ipaddress
import random import random
import socket import socket
import time import time
@ -441,3 +442,71 @@ def clean_lb_crd_status(loadbalancer_name):
LOG.exception('Error updating KuryrLoadbalancer CRD %s', LOG.exception('Error updating KuryrLoadbalancer CRD %s',
name) name)
raise raise
def is_kubernetes_default_resource(obj):
"""Check if Object is a resource associated to the API
Verifies if the Object is on the default namespace
and has the name kubernetes. Those name and namespace
are given to Kubernetes Service and Endpoints for the API.
:param obj: Kubernetes object dict
:returns: True if is default resource for the API, false
otherwise.
"""
return (obj['metadata']['name'] == 'kubernetes' and
obj['metadata']['namespace'] == 'default')
def get_pod_by_ip(pod_ip, namespace=None):
k8s = clients.get_kubernetes_client()
pod = {}
try:
if namespace:
pods = k8s.get(f'{constants.K8S_API_BASE}/namespaces/{namespace}/'
f'pods?fieldSelector=status.phase=Running,'
f'status.podIP={pod_ip}')
else:
pods = k8s.get(f'{constants.K8S_API_BASE}/'
f'pods?fieldSelector=status.phase=Running,'
f'status.podIP={pod_ip}')
except exceptions.K8sClientException:
LOG.exception('Error retrieving Pod with IP %s', pod_ip)
raise
if pods.get('items'):
# Only one Pod should have the IP
return pods['items'][0]
return pod
def get_current_endpoints_target(ep, port, spec_ports, ep_name):
"""Retrieve details about one specific Endpoint target
Defines the details about the Endpoint target, such as the
target address, name, port value and the Pool ID. In case,
the Endpoints has no targetRef defined, the name of the
target will be the same as the Endpoint.
:param ep: Endpoint on the Endpoints object
:param port: Endpoint port
:param spec_ports: dict of port name associated to pool ID
:param ep_name: Name of the Endpoints object
:returns: Tuple with target address, target name, port number
and pool ID.
"""
target_ref = ep.get('targetRef', {})
pod_name = ep_name
# NOTE(maysams): As we don't support dual-stack, we assume
# only one address is possible on the addresses field.
address = ep['addresses'][0]
if target_ref:
pod_name = target_ref.get('name', '')
return (address, pod_name, port['port'],
spec_ports.get(port.get('name')))
def is_ip_on_subnet(nodes_subnet, target_ip):
return (nodes_subnet and
(ipaddress.ip_address(target_ip) in
ipaddress.ip_network(nodes_subnet)))