Merge "Update loadbalancer CRD with service spec and rely on CRD"

changes/92/743792/3
Zuul (3 days ago), committed by Gerrit Code Review
parent commit 3eeb67d84e
18 changed files with 2539 additions and 1833 deletions
  1. +3 -3      .zuul.d/octavia.yaml
  2. +2 -2      .zuul.d/sdn.yaml
  3. +1 -0      devstack/plugin.sh
  4. +1 -1      devstack/settings
  5. +42 -13    kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml
  6. +2 -0      kuryr_kubernetes/constants.py
  7. +17 -15    kuryr_kubernetes/controller/drivers/lb_public_ip.py
  8. +120 -108  kuryr_kubernetes/controller/drivers/lbaasv2.py
  9. +205 -583  kuryr_kubernetes/controller/handlers/lbaas.py
  10. +810 -0   kuryr_kubernetes/controller/handlers/loadbalancer.py
  11. +15 -0    kuryr_kubernetes/k8s_client.py
  12. +58 -38   kuryr_kubernetes/tests/unit/controller/drivers/test_lb_public_ip.py
  13. +377 -257 kuryr_kubernetes/tests/unit/controller/drivers/test_lbaasv2.py
  14. +228 -763 kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py
  15. +525 -0   kuryr_kubernetes/tests/unit/controller/handlers/test_loadbalancer.py
  16. +110 -31  kuryr_kubernetes/tests/unit/test_utils.py
  17. +20 -17   kuryr_kubernetes/utils.py
  18. +3 -2     setup.cfg

+3 -3  .zuul.d/octavia.yaml

@@ -99,7 +99,7 @@
vars:
devstack_localrc:
DOCKER_CGROUP_DRIVER: "systemd"
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_ENABLED_HANDLERS: vif,endpoints,service,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport,kuryrloadbalancer
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace
devstack_services:
@@ -120,7 +120,7 @@
vars:
devstack_localrc:
KURYR_SUBNET_DRIVER: namespace
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_ENABLED_HANDLERS: vif,endpoints,service,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport,kuryrloadbalancer
KURYR_SG_DRIVER: policy
KURYR_USE_PORT_POOLS: true
KURYR_POD_VIF_DRIVER: neutron-vif
@@ -134,7 +134,7 @@
parent: kuryr-kubernetes-tempest-containerized
vars:
devstack_localrc:
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_ENABLED_HANDLERS: vif,endpoints,service,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport,kuryrloadbalancer
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace



+2 -2  .zuul.d/sdn.yaml

@@ -98,7 +98,7 @@
KURYR_LB_ALGORITHM: SOURCE_IP_PORT
KURYR_SUBNET_DRIVER: namespace
KURYR_SG_DRIVER: policy
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_ENABLED_HANDLERS: vif,endpoints,service,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport,kuryrloadbalancer
voting: false

- job:
@@ -144,7 +144,7 @@
KURYR_ENFORCE_SG_RULES: false
KURYR_LB_ALGORITHM: SOURCE_IP_PORT
KURYR_HYPERKUBE_VERSION: v1.16.0
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_ENABLED_HANDLERS: vif,endpoints,service,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport,kuryrloadbalancer
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace
KURYR_K8S_CONTAINERIZED_DEPLOYMENT: true


+1 -0  devstack/plugin.sh

@@ -974,6 +974,7 @@ function update_tempest_conf_file {
iniset $TEMPEST_CONFIG kuryr_kubernetes validate_crd True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrnetworks True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrports True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrloadbalancers True
}

source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes


+1 -1  devstack/settings

@@ -43,7 +43,7 @@ KURYR_K8S_API_LB_PORT=${KURYR_K8S_API_LB_PORT:-443}
KURYR_PORT_DEBUG=${KURYR_PORT_DEBUG:-True}
KURYR_SUBNET_DRIVER=${KURYR_SUBNET_DRIVER:-default}
KURYR_SG_DRIVER=${KURYR_SG_DRIVER:-default}
KURYR_ENABLED_HANDLERS=${KURYR_ENABLED_HANDLERS:-vif,lb,lbaasspec,kuryrport}
KURYR_ENABLED_HANDLERS=${KURYR_ENABLED_HANDLERS:-vif,endpoints,service,kuryrloadbalancer,kuryrport}

# OpenShift
OPENSHIFT_BINARY_VERSION=${OPENSHIFT_BINARY_VERSION:-v3.11.0}


+42 -13  kubernetes_crds/kuryr_crds/kuryrloadbalancer.yaml

@@ -29,13 +29,6 @@ spec:
properties:
spec:
type: object
required:
- ip
- ports
- project_id
- security_groups_ids
- subnet_id
- type
properties:
ip:
type: string
@@ -46,7 +39,6 @@ spec:
items:
type: object
required:
- name
- port
- protocol
- targetPort
@@ -69,13 +61,50 @@ spec:
type: string
type:
type: string
subsets:
type: array
items:
type: object
properties:
addresses:
type: array
items:
type: object
properties:
hostname:
type: string
ip:
type: string
nodeName:
type: string
targetRef:
type: object
properties:
apiVersion:
type: string
kind:
type: string
name:
type: string
namespace:
type: string
resourceVersion:
type: string
uid:
type: string
ports:
type: array
items:
type: object
properties:
name:
type: string
port:
type: integer
protocol:
type: string
status:
type: object
required:
- listeners
- loadbalancer
- members
- pools
properties:
listeners:
type: array
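
Reviewer note: for reference, here is a hedged sketch of a KuryrLoadBalancer resource that the updated schema is intended to validate, written as the Python dict the handlers in this change build. Every concrete value (names, UUIDs, addresses) is made up for illustration.

# Illustrative only: field names follow the schema above and the handler code
# later in this change; all values are hypothetical.
kuryr_loadbalancer = {
    'apiVersion': 'openstack.org/v1',
    'kind': 'KuryrLoadBalancer',
    'metadata': {'name': 'demo-svc', 'namespace': 'default'},
    'spec': {
        # Filled by the Service handler from the K8s Service spec.
        'ip': '10.0.0.42',
        'ports': [{'name': 'http', 'port': 80,
                   'protocol': 'TCP', 'targetPort': '8080'}],
        'project_id': 'project-uuid',
        'security_groups_ids': ['sg-uuid'],
        'subnet_id': 'subnet-uuid',
        'type': 'ClusterIP',
        # Filled by the Endpoints handler; mirrors K8s Endpoints subsets.
        'subsets': [{
            'addresses': [{'ip': '10.0.1.5', 'nodeName': 'worker-0',
                           'targetRef': {'kind': 'Pod', 'name': 'demo-pod',
                                         'namespace': 'default'}}],
            'ports': [{'name': 'http', 'port': 8080, 'protocol': 'TCP'}],
        }],
    },
    # Populated later by the KuryrLoadBalancer handler with the Octavia
    # loadbalancer, listeners, pools and members.
    'status': {},
}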


+2 -0  kuryr_kubernetes/constants.py

@@ -66,6 +66,8 @@ K8S_ANNOTATION_NEUTRON_PORT = 'neutron_id'

POD_FINALIZER = KURYR_FQDN + '/pod-finalizer'
KURYRNETWORK_FINALIZER = 'kuryrnetwork.finalizers.kuryr.openstack.org'
KURYRLB_FINALIZER = 'kuryr.openstack.org/kuryrloadbalancer-finalizers'
SERVICE_FINALIZER = 'kuryr.openstack.org/service-finalizer'

KURYRPORT_FINALIZER = KURYR_FQDN + '/kuryrport-finalizer'
KURYRPORT_LABEL = KURYR_FQDN + '/nodeName'
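
Reviewer note: the two new finalizer constants form a handshake between the Service handler and the new KuryrLoadBalancer handler. Below is a minimal sketch of how they are applied, using the Kubernetes client helpers that appear elsewhere in this change; the resource dicts are hypothetical stand-ins.

from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const

# Hypothetical objects standing in for the real Service and KuryrLoadBalancer
# resources fetched from the API.
service = {'metadata': {'name': 'demo-svc', 'namespace': 'default'}}
klb_crd = {'metadata': {'name': 'demo-svc', 'namespace': 'default'}}

k8s = clients.get_kubernetes_client()

# ServiceHandler.on_present(): block Service deletion until cleanup is done.
k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)

# KuryrLoadBalancerHandler.on_finalize(): once the Octavia resources are
# released, unblock deletion of the CRD and of the owning Service.
k8s.remove_finalizer(klb_crd, k_const.KURYRLB_FINALIZER)
k8s.remove_finalizer(service, k_const.SERVICE_FINALIZER)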


+17 -15  kuryr_kubernetes/controller/drivers/lb_public_ip.py

@@ -15,7 +15,6 @@
from kuryr_kubernetes import config
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes.controller.drivers import public_ip
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from oslo_config import cfg
from oslo_log import log as logging

@@ -50,10 +49,11 @@ class FloatingIpServicePubIPDriver(base.ServicePubIpDriver):
res_id = self._drv_pub_ip.is_ip_available(user_specified_ip,
port_id_to_be_associated)
if res_id:
service_pub_ip_info = (obj_lbaas.LBaaSPubIp(
ip_id=res_id,
ip_addr=str(user_specified_ip),
alloc_method='user'))
service_pub_ip_info = {
'ip_id': res_id,
'ip_addr': str(user_specified_ip),
'alloc_method': 'user'
}

return service_pub_ip_info
else:
@@ -78,32 +78,34 @@ class FloatingIpServicePubIPDriver(base.ServicePubIpDriver):
LOG.exception("Failed to allocate public IP - net_id:%s",
public_network_id)
return None
service_pub_ip_info = obj_lbaas.LBaaSPubIp(ip_id=res_id,
ip_addr=alloc_ip_addr,
alloc_method='pool')
service_pub_ip_info = {
'ip_id': res_id,
'ip_addr': alloc_ip_addr,
'alloc_method': 'pool'
}

return service_pub_ip_info

def release_pub_ip(self, service_pub_ip_info):
if not service_pub_ip_info:
return True
if service_pub_ip_info.alloc_method == 'pool':
retcode = self._drv_pub_ip.free_ip(service_pub_ip_info.ip_id)
if service_pub_ip_info['alloc_method'] == 'pool':
retcode = self._drv_pub_ip.free_ip(service_pub_ip_info['ip_id'])
if not retcode:
LOG.error("Failed to delete public_ip_id =%s !",
service_pub_ip_info.ip_id)
service_pub_ip_info['ip_id'])
return False
return True

def associate_pub_ip(self, service_pub_ip_info, vip_port_id):
if (not service_pub_ip_info or
not vip_port_id or
not service_pub_ip_info.ip_id):
not service_pub_ip_info['ip_id']):
return
self._drv_pub_ip.associate(
service_pub_ip_info.ip_id, vip_port_id)
service_pub_ip_info['ip_id'], vip_port_id)

def disassociate_pub_ip(self, service_pub_ip_info):
if not service_pub_ip_info or not service_pub_ip_info.ip_id:
if not service_pub_ip_info or not service_pub_ip_info['ip_id']:
return
self._drv_pub_ip.disassociate(service_pub_ip_info.ip_id)
self._drv_pub_ip.disassociate(service_pub_ip_info['ip_id'])
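
Reviewer note: the driver now passes a plain dict where it previously built an obj_lbaas.LBaaSPubIp object, so the public-IP information can be stored directly in the CRD status. A hedged sketch of the dict shape, with keys taken from the diff above and purely illustrative values:

# Illustrative values; only the keys ('ip_id', 'ip_addr', 'alloc_method') and
# the 'user'/'pool' alloc_method values come from the change itself.
service_pub_ip_info = {
    'ip_id': 'floating-ip-uuid',   # Neutron floating IP id
    'ip_addr': '172.24.4.10',      # allocated address
    'alloc_method': 'pool',        # 'user' when the Service requested the IP
}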

+120 -108  kuryr_kubernetes/controller/drivers/lbaasv2.py

@@ -27,7 +27,6 @@ from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes import utils

CONF = cfg.CONF
@@ -112,7 +111,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
return "%s/%s" % (namespace, svc_name)

def get_loadbalancer_pool_name(self, loadbalancer, namespace, svc_name):
return "%s/%s/%s" % (loadbalancer.name, namespace, svc_name)
return "%s/%s/%s" % (loadbalancer['name'], namespace, svc_name)

def add_tags(self, resource, req):
if CONF.neutron_defaults.resource_tags:
@@ -126,9 +125,14 @@ class LBaaSv2Driver(base.LBaaSDriver):
def ensure_loadbalancer(self, name, project_id, subnet_id, ip,
security_groups_ids=None, service_type=None,
provider=None):
request = obj_lbaas.LBaaSLoadBalancer(
name=name, project_id=project_id, subnet_id=subnet_id, ip=ip,
security_groups=security_groups_ids, provider=provider)
request = {
'name': name,
'project_id': project_id,
'subnet_id': subnet_id,
'ip': ip,
'security_groups': security_groups_ids,
'provider': provider
}
response = self._ensure(self._create_loadbalancer,
self._find_loadbalancer, request)
if not response:
@@ -146,9 +150,8 @@ class LBaaSv2Driver(base.LBaaSDriver):
loadbalancer,
loadbalancer,
lbaas.delete_load_balancer,
loadbalancer.id,
loadbalancer['id'],
cascade=True)

self._wait_for_deletion(loadbalancer, _ACTIVATION_TIMEOUT)

def _create_listeners_acls(self, loadbalancer, port, target_port,
@@ -160,7 +163,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
if new_sgs:
sgs = new_sgs
else:
sgs = loadbalancer.security_groups
sgs = loadbalancer['security_groups']

# Check if Network Policy allows listener on the pods
for sg in sgs:
@@ -210,7 +213,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
if vip_port:
lb_sg = vip_port.security_group_ids[0]
else:
LOG.debug("Skipping sg update for lb %s", loadbalancer.name)
LOG.debug("Skipping sg update for lb %s", loadbalancer['name'])
return

# NOTE (maysams) It might happen that the update of LBaaS SG
@@ -225,14 +228,14 @@ class LBaaSv2Driver(base.LBaaSDriver):
return

lbaas_sg_rules = os_net.security_group_rules(
security_group_id=lb_sg, project_id=loadbalancer.project_id)
security_group_id=lb_sg, project_id=loadbalancer['project_id'])
all_pod_rules = []
add_default_rules = False

if new_sgs:
sgs = new_sgs
else:
sgs = loadbalancer.security_groups
sgs = loadbalancer['security_groups']

sg_rule_ethertype = k_const.IPv4
if utils.get_service_subnet_version() == k_const.IP_VERSION_6:
@@ -325,12 +328,14 @@ class LBaaSv2Driver(base.LBaaSDriver):

def ensure_listener(self, loadbalancer, protocol, port,
service_type='ClusterIP'):
name = "%s:%s:%s" % (loadbalancer.name, protocol, port)
listener = obj_lbaas.LBaaSListener(name=name,
project_id=loadbalancer.project_id,
loadbalancer_id=loadbalancer.id,
protocol=protocol,
port=port)
name = "%s:%s:%s" % (loadbalancer['name'], protocol, port)
listener = {
'name': name,
'project_id': loadbalancer['project_id'],
'loadbalancer_id': loadbalancer['id'],
'protocol': protocol,
'port': port
}
try:
result = self._ensure_provisioned(
loadbalancer, listener, self._create_listener,
@@ -348,7 +353,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
os_net = clients.get_network_client()
vip_port = self._get_vip_port(loadbalancer)
os_net.update_port(vip_port.id, security_groups=[])
loadbalancer.security_groups = []
loadbalancer['security_groups'] = []

return result

@@ -357,7 +362,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
lbaas = clients.get_loadbalancer_client()
self._release(loadbalancer, listener,
lbaas.delete_listener,
listener.id)
listener['id'])

# NOTE(maysams): since lbs created with ovn-octavia provider
# does not have a sg in place, only need to delete sg rules
@@ -367,19 +372,22 @@ class LBaaSv2Driver(base.LBaaSDriver):
sg_id = self._get_vip_port(loadbalancer).security_group_ids[0]
if sg_id:
rules = os_net.security_group_rules(security_group_id=sg_id,
description=listener.name)
description=listener[
'name'])
try:
os_net.delete_security_group_rule(next(rules).id)
except StopIteration:
LOG.warning('Cannot find SG rule for %s (%s) listener.',
listener.id, listener.name)
listener['id'], listener['name'])

def ensure_pool(self, loadbalancer, listener):
pool = obj_lbaas.LBaaSPool(name=listener.name,
project_id=loadbalancer.project_id,
loadbalancer_id=loadbalancer.id,
listener_id=listener.id,
protocol=listener.protocol)
pool = {
'name': listener['name'],
'project_id': loadbalancer['project_id'],
'loadbalancer_id': loadbalancer['id'],
'listener_id': listener['id'],
'protocol': listener['protocol']
}
return self._ensure_provisioned(loadbalancer, pool,
self._create_pool,
self._find_pool)
@@ -388,30 +396,34 @@ class LBaaSv2Driver(base.LBaaSDriver):
svc_name, protocol):
name = self.get_loadbalancer_pool_name(loadbalancer,
namespace, svc_name)
pool = obj_lbaas.LBaaSPool(name=name,
project_id=loadbalancer.project_id,
loadbalancer_id=loadbalancer.id,
listener_id=None,
protocol=protocol)
pool = {
'name': name,
'project_id': loadbalancer['project_id'],
'loadbalancer_id': loadbalancer['id'],
'listener_id': None,
'protocol': protocol
}
return self._ensure_provisioned(loadbalancer, pool,
self._create_pool,
self._find_pool_by_name)

def release_pool(self, loadbalancer, pool):
lbaas = clients.get_loadbalancer_client()
self._release(loadbalancer, pool, lbaas.delete_pool, pool.id)
self._release(loadbalancer, pool, lbaas.delete_pool, pool['id'])

def ensure_member(self, loadbalancer, pool,
subnet_id, ip, port, target_ref_namespace,
target_ref_name, listener_port=None):
name = ("%s/%s" % (target_ref_namespace, target_ref_name))
name += ":%s" % port
member = obj_lbaas.LBaaSMember(name=name,
project_id=loadbalancer.project_id,
pool_id=pool.id,
subnet_id=subnet_id,
ip=ip,
port=port)
member = {
'name': name,
'project_id': loadbalancer['project_id'],
'pool_id': pool['id'],
'subnet_id': subnet_id,
'ip': ip,
'port': port
}
result = self._ensure_provisioned(loadbalancer, member,
self._create_member,
self._find_member)
@@ -421,9 +433,9 @@ class LBaaSv2Driver(base.LBaaSDriver):
CONF.kubernetes.service_security_groups_driver == 'policy')
if (network_policy and CONF.octavia_defaults.enforce_sg_rules and
listener_port):
protocol = pool.protocol
sg_rule_name = pool.name
listener_id = pool.listener_id
protocol = pool['protocol']
sg_rule_name = pool['name']
listener_id = pool['listener_id']
self._apply_members_security_groups(loadbalancer, listener_port,
port, protocol, sg_rule_name,
listener_id)
@@ -431,14 +443,14 @@ class LBaaSv2Driver(base.LBaaSDriver):

def release_member(self, loadbalancer, member):
lbaas = clients.get_loadbalancer_client()
self._release(loadbalancer, member, lbaas.delete_member, member.id,
member.pool_id)
self._release(loadbalancer, member, lbaas.delete_member, member['id'],
member['pool_id'])

def _get_vip_port(self, loadbalancer):
os_net = clients.get_network_client()
try:
fixed_ips = ['subnet_id=%s' % str(loadbalancer.subnet_id),
'ip_address=%s' % str(loadbalancer.ip)]
fixed_ips = ['subnet_id=%s' % str(loadbalancer['subnet_id']),
'ip_address=%s' % str(loadbalancer['ip'])]
ports = os_net.ports(fixed_ips=fixed_ips)
except os_exc.SDKException:
LOG.error("Port with fixed ips %s not found!", fixed_ips)
@@ -451,43 +463,43 @@ class LBaaSv2Driver(base.LBaaSDriver):

def _create_loadbalancer(self, loadbalancer):
request = {
'name': loadbalancer.name,
'project_id': loadbalancer.project_id,
'vip_address': str(loadbalancer.ip),
'vip_subnet_id': loadbalancer.subnet_id,
'name': loadbalancer['name'],
'project_id': loadbalancer['project_id'],
'vip_address': str(loadbalancer['ip']),
'vip_subnet_id': loadbalancer['subnet_id'],
}

if loadbalancer.provider is not None:
request['provider'] = loadbalancer.provider
if loadbalancer['provider'] is not None:
request['provider'] = loadbalancer['provider']

self.add_tags('loadbalancer', request)

lbaas = clients.get_loadbalancer_client()
response = lbaas.create_load_balancer(**request)
loadbalancer.id = response.id
loadbalancer.port_id = self._get_vip_port(loadbalancer).id
if (loadbalancer.provider is not None and
loadbalancer.provider != response.provider):
loadbalancer['id'] = response.id
loadbalancer['port_id'] = self._get_vip_port(loadbalancer).id
if (loadbalancer['provider'] is not None and
loadbalancer['provider'] != response.provider):
LOG.error("Request provider(%s) != Response provider(%s)",
loadbalancer.provider, response.provider)
loadbalancer['provider'], response.provider)
return None
loadbalancer.provider = response.provider
loadbalancer['provider'] = response.provider
return loadbalancer

def _find_loadbalancer(self, loadbalancer):
lbaas = clients.get_loadbalancer_client()
response = lbaas.load_balancers(
name=loadbalancer.name,
project_id=loadbalancer.project_id,
vip_address=str(loadbalancer.ip),
vip_subnet_id=loadbalancer.subnet_id,
provider=loadbalancer.provider)
name=loadbalancer['name'],
project_id=loadbalancer['project_id'],
vip_address=str(loadbalancer['ip']),
vip_subnet_id=loadbalancer['subnet_id'],
provider=loadbalancer['provider'])

try:
os_lb = next(response) # openstacksdk returns a generator
loadbalancer.id = os_lb.id
loadbalancer.port_id = self._get_vip_port(loadbalancer).id
loadbalancer.provider = os_lb.provider
loadbalancer['id'] = os_lb.id
loadbalancer['port_id'] = self._get_vip_port(loadbalancer).id
loadbalancer['provider'] = os_lb.provider
if os_lb.provisioning_status == 'ERROR':
self.release_loadbalancer(loadbalancer)
return None
@@ -498,16 +510,16 @@ class LBaaSv2Driver(base.LBaaSDriver):

def _create_listener(self, listener):
request = {
'name': listener.name,
'project_id': listener.project_id,
'loadbalancer_id': listener.loadbalancer_id,
'protocol': listener.protocol,
'protocol_port': listener.port,
'name': listener['name'],
'project_id': listener['project_id'],
'loadbalancer_id': listener['loadbalancer_id'],
'protocol': listener['protocol'],
'protocol_port': listener['port'],
}
self.add_tags('listener', request)
lbaas = clients.get_loadbalancer_client()
response = lbaas.create_listener(**request)
listener.id = response.id
listener['id'] = response.id
return listener

def _update_listener_acls(self, loadbalancer, listener_id, allowed_cidrs):
@@ -538,15 +550,15 @@ class LBaaSv2Driver(base.LBaaSDriver):
def _find_listener(self, listener, loadbalancer):
lbaas = clients.get_loadbalancer_client()
response = lbaas.listeners(
name=listener.name,
project_id=listener.project_id,
load_balancer_id=listener.loadbalancer_id,
protocol=listener.protocol,
protocol_port=listener.port)
name=listener['name'],
project_id=listener['project_id'],
load_balancer_id=listener['loadbalancer_id'],
protocol=listener['protocol'],
protocol_port=listener['port'])

try:
os_listener = next(response)
listener.id = os_listener.id
listener['id'] = os_listener.id
if os_listener.provisioning_status == 'ERROR':
LOG.debug("Releasing listener %s", os_listener.id)
self.release_listener(loadbalancer, listener)
@@ -560,34 +572,34 @@ class LBaaSv2Driver(base.LBaaSDriver):
# TODO(ivc): make lb_algorithm configurable
lb_algorithm = CONF.octavia_defaults.lb_algorithm
request = {
'name': pool.name,
'project_id': pool.project_id,
'listener_id': pool.listener_id,
'loadbalancer_id': pool.loadbalancer_id,
'protocol': pool.protocol,
'name': pool['name'],
'project_id': pool['project_id'],
'listener_id': pool['listener_id'],
'loadbalancer_id': pool['loadbalancer_id'],
'protocol': pool['protocol'],
'lb_algorithm': lb_algorithm,
}
self.add_tags('pool', request)
lbaas = clients.get_loadbalancer_client()
response = lbaas.create_pool(**request)
pool.id = response.id
pool['id'] = response.id
return pool

def _find_pool(self, pool, loadbalancer, by_listener=True):
lbaas = clients.get_loadbalancer_client()
response = lbaas.pools(
name=pool.name,
project_id=pool.project_id,
loadbalancer_id=pool.loadbalancer_id,
protocol=pool.protocol)
name=pool['name'],
project_id=pool['project_id'],
loadbalancer_id=pool['loadbalancer_id'],
protocol=pool['protocol'])
# TODO(scavnic) check response
try:
if by_listener:
pools = [p for p in response if pool.listener_id
pools = [p for p in response if pool['listener_id']
in {listener['id'] for listener in p.listeners}]
else:
pools = [p for p in response if pool.name == p.name]
pool.id = pools[0].id
pool['id'] = pools[0].id
if pools[0].provisioning_status == 'ERROR':
LOG.debug("Releasing pool %s", pool.id)
self.release_pool(loadbalancer, pool)
@@ -601,31 +613,31 @@ class LBaaSv2Driver(base.LBaaSDriver):

def _create_member(self, member):
request = {
'name': member.name,
'project_id': member.project_id,
'subnet_id': member.subnet_id,
'address': str(member.ip),
'protocol_port': member.port,
'name': member['name'],
'project_id': member['project_id'],
'subnet_id': member['subnet_id'],
'address': str(member['ip']),
'protocol_port': member['port'],
}
self.add_tags('member', request)
lbaas = clients.get_loadbalancer_client()
response = lbaas.create_member(member.pool_id, **request)
member.id = response.id
response = lbaas.create_member(member['pool_id'], **request)
member['id'] = response.id
return member

def _find_member(self, member, loadbalancer):
lbaas = clients.get_loadbalancer_client()
response = lbaas.members(
member.pool_id,
name=member.name,
project_id=member.project_id,
subnet_id=member.subnet_id,
address=member.ip,
protocol_port=member.port)
member['pool_id'],
name=member['name'],
project_id=member['project_id'],
subnet_id=member['subnet_id'],
address=member['ip'],
protocol_port=member['port'])

try:
os_members = next(response)
member.id = os_members.id
member['id'] = os_members.id
if os_members.provisioning_status == 'ERROR':
LOG.debug("Releasing Member %s", os_members.id)
self.release_member(loadbalancer, member)
@@ -683,7 +695,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
lbaas = clients.get_loadbalancer_client()

for remaining in self._provisioning_timer(timeout, interval):
response = lbaas.get_load_balancer(loadbalancer.id)
response = lbaas.get_load_balancer(loadbalancer['id'])
status = response.provisioning_status
if status == 'ACTIVE':
LOG.debug("Provisioning complete for %(lb)s", {
@@ -691,7 +703,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
return
elif status == 'ERROR':
LOG.debug("Releasing loadbalancer %s with error status",
loadbalancer.id)
loadbalancer['id'])
self.release_loadbalancer(loadbalancer)
break
else:
@@ -708,7 +720,7 @@ class LBaaSv2Driver(base.LBaaSDriver):

for remaining in self._provisioning_timer(timeout, interval):
try:
lbaas.get_load_balancer(loadbalancer.id)
lbaas.get_load_balancer(loadbalancer['id'])
except os_exc.NotFoundException:
return

@@ -753,7 +765,7 @@ class LBaaSv2Driver(base.LBaaSDriver):

utils.set_lbaas_state(endpoint, lbaas)

lsnr_ids = {(listener.protocol, listener.port): listener.id
lsnr_ids = {(listener['protocol'], listener['port']): listener['id']
for listener in lbaas.listeners}

for port in svc_ports:
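
Reviewer note: throughout lbaasv2.py the obj_lbaas.* versioned objects are replaced by plain dicts with the same fields, which is what allows driver results to be written straight into the KuryrLoadBalancer status. A hedged sketch of the shapes the driver now works with; the keys are the ones used in the diff, the values are invented:

# Illustrative only: shapes of the dicts that replace LBaaSLoadBalancer,
# LBaaSListener, LBaaSPool and LBaaSMember. All ids/UUIDs are hypothetical.
loadbalancer = {
    'name': 'default/demo-svc', 'project_id': 'project-uuid',
    'subnet_id': 'subnet-uuid', 'ip': '10.0.0.42',
    'security_groups': ['sg-uuid'], 'provider': None,
    # 'id', 'port_id' and the effective 'provider' are set later by
    # _create_loadbalancer()/_find_loadbalancer().
}
listener = {
    'name': 'default/demo-svc:TCP:80', 'project_id': 'project-uuid',
    'loadbalancer_id': 'lb-uuid', 'protocol': 'TCP', 'port': 80,
}
pool = {
    'name': listener['name'], 'project_id': 'project-uuid',
    'loadbalancer_id': 'lb-uuid', 'listener_id': 'listener-uuid',
    'protocol': 'TCP',
}
member = {
    'name': 'default/demo-pod:8080', 'project_id': 'project-uuid',
    'pool_id': 'pool-uuid', 'subnet_id': 'subnet-uuid',
    'ip': '10.0.1.5', 'port': 8080,
}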


+205 -583  kuryr_kubernetes/controller/handlers/lbaas.py

@@ -13,18 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.

import eventlet
import time

from kuryr.lib._i18n import _
from openstack import exceptions as os_exc
from oslo_log import log as logging

from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes.objects import lbaas as obj_lbaas
@@ -35,10 +30,10 @@ LOG = logging.getLogger(__name__)
SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer')


class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
"""LBaaSSpecHandler handles K8s Service events.
class ServiceHandler(k8s_base.ResourceEventHandler):
"""ServiceHandler handles K8s Service events.

LBaaSSpecHandler handles K8s Service events and updates related Endpoints
ServiceHandler handles K8s Service events and updates related Endpoints
with LBaaSServiceSpec when necessary.
"""

@@ -46,14 +41,12 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
OBJECT_WATCH_PATH = "%s/%s" % (k_const.K8S_API_BASE, "services")

def __init__(self):
super(LBaaSSpecHandler, self).__init__()
super(ServiceHandler, self).__init__()
self._drv_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()

def on_present(self, service):
lbaas_spec = utils.get_lbaas_spec(service)

if self._should_ignore(service):
LOG.debug("Skipping Kubernetes service %s of an unsupported kind "
"or without a selector as Kubernetes does not create "
@@ -61,9 +54,18 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
service['metadata']['name'])
return

if self._has_lbaas_spec_changes(service, lbaas_spec):
lbaas_spec = self._generate_lbaas_spec(service)
utils.set_lbaas_spec(service, lbaas_spec)
k8s = clients.get_kubernetes_client()
loadbalancer_crd = k8s.get_loadbalancer_crd(service)
try:
self._patch_service_finalizer(service)
except k_exc.K8sClientException as ex:
LOG.exception("Failed to set service finalizer: %s", ex)
raise

if loadbalancer_crd is None:
loadbalancer_crd = self.create_crd_spec(service)
elif self._has_lbaas_spec_changes(service, loadbalancer_crd):
loadbalancer_crd = self._update_crd_spec(loadbalancer_crd, service)

def _is_supported_type(self, service):
spec = service['spec']
@@ -75,12 +77,22 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
return None

def _should_ignore(self, service):
return (not(self._has_selector(service)) or
not(self._has_clusterip(service)) or
return (not(self._has_clusterip(service)) or
not(self._is_supported_type(service)))

def _has_selector(self, service):
return service['spec'].get('selector')
def _patch_service_finalizer(self, service):
k8s = clients.get_kubernetes_client()
k8s.add_finalizer(service, k_const.SERVICE_FINALIZER)

def on_finalize(self, service):
k8s = clients.get_kubernetes_client()

svc_name = service['metadata']['name']
svc_namespace = service['metadata']['namespace']

klb_crd_path = (f"{k_const.K8S_API_CRD_NAMESPACES}/"
f"{svc_namespace}/kuryrloadbalancers/{svc_name}")
k8s.delete(klb_crd_path)

def _has_clusterip(self, service):
# ignore headless service, clusterIP is None
@@ -97,45 +109,113 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
if len(subnet_ids) != 1:
raise k_exc.IntegrityError(_(
"Found %(num)s subnets for service %(link)s IP %(ip)s") % {
'link': service['metadata']['selfLink'],
'ip': ip,
'num': len(subnet_ids)})
'link': service['metadata']['selfLink'],
'ip': ip,
'num': len(subnet_ids)})

return subnet_ids.pop()

def _generate_lbaas_spec(self, service):
def create_crd_spec(self, service):
svc_name = service['metadata']['name']
svc_namespace = service['metadata']['namespace']
kubernetes = clients.get_kubernetes_client()
svc_ip = self._get_service_ip(service)
spec_lb_ip = service['spec'].get('loadBalancerIP')
ports = service['spec'].get('ports')
for port in ports:
if type(port['targetPort']) == int:
port['targetPort'] = str(port['targetPort'])
project_id = self._drv_project.get_project(service)
ip = self._get_service_ip(service)
subnet_id = self._get_subnet_id(service, project_id, ip)
ports = self._generate_lbaas_port_specs(service)
sg_ids = self._drv_sg.get_security_groups(service, project_id)
subnet_id = self._get_subnet_id(service, project_id, svc_ip)
spec_type = service['spec'].get('type')
spec_lb_ip = service['spec'].get('loadBalancerIP')
loadbalancer_crd = {
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
'metadata': {
'name': svc_name,
'finalizers': [k_const.KURYRLB_FINALIZER],
},
'spec': {
'ip': svc_ip,
'ports': ports,
'project_id': project_id,
'security_groups_ids': sg_ids,
'subnet_id': subnet_id,
'type': spec_type
},
'status': {
}
}

if spec_lb_ip is not None:
loadbalancer_crd['spec']['lb_ip'] = spec_lb_ip

try:
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
k_const.K8S_API_CRD_NAMESPACES, svc_namespace),
loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(svc_name)
except k_exc.K8sClientException:
LOG.exception("Kubernetes Client Exception creating "
"kuryrloadbalancer CRD. %s"
% k_exc.K8sClientException)
raise
return loadbalancer_crd

return obj_lbaas.LBaaSServiceSpec(ip=ip,
project_id=project_id,
subnet_id=subnet_id,
ports=ports,
security_groups_ids=sg_ids,
type=spec_type,
lb_ip=spec_lb_ip)
def _update_crd_spec(self, loadbalancer_crd, service):
svc_ip = self._get_service_ip(service)
ports = service['spec'].get('ports')
for port in ports:
if type(port['targetPort']) == int:
port['targetPort'] = str(port['targetPort'])
project_id = self._drv_project.get_project(service)
sg_ids = self._drv_sg.get_security_groups(service, project_id)
subnet_id = self._get_subnet_id(service, project_id, svc_ip)
spec_type = service['spec'].get('type')
kubernetes = clients.get_kubernetes_client()

patch = {
'spec': {
'ip': svc_ip,
'ports': ports,
'project_id': project_id,
'security_groups_ids': sg_ids,
'subnet_id': subnet_id,
'type': spec_type
}
}

LOG.debug('Patching KuryrLoadBalancer CRD %s', loadbalancer_crd)
try:
kubernetes.patch_crd('spec', loadbalancer_crd['metadata'][
'selfLink'], patch['spec'])
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadBalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sClientException:
LOG.exception('Error updating kuryrnet CRD %s', loadbalancer_crd)
raise
return loadbalancer_crd

def _has_lbaas_spec_changes(self, service, lbaas_spec):
return (self._has_ip_changes(service, lbaas_spec) or
utils.has_port_changes(service, lbaas_spec))
def _has_lbaas_spec_changes(self, service, loadbalancer_crd):
return (self._has_ip_changes(service, loadbalancer_crd) or
utils.has_port_changes(service, loadbalancer_crd))

def _has_ip_changes(self, service, lbaas_spec):
def _has_ip_changes(self, service, loadbalancer_crd):
link = service['metadata']['selfLink']
svc_ip = self._get_service_ip(service)

if not lbaas_spec:
if svc_ip:
LOG.debug("LBaaS spec is missing for %(link)s"
% {'link': link})
return True
elif str(lbaas_spec.ip) != svc_ip:
if loadbalancer_crd['spec'].get('ip') is None:
if svc_ip is None:
return False
return True
elif str(loadbalancer_crd['spec'].get('ip')) != svc_ip:
LOG.debug("LBaaS spec IP %(spec_ip)s != %(svc_ip)s for %(link)s"
% {'spec_ip': lbaas_spec.ip,
% {'spec_ip': loadbalancer_crd['spec']['ip'],
'svc_ip': svc_ip,
'link': link})
return True
@@ -147,10 +227,10 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
for port in utils.get_service_ports(service)]


class LoadBalancerHandler(k8s_base.ResourceEventHandler):
"""LoadBalancerHandler handles K8s Endpoints events.
class EndpointsHandler(k8s_base.ResourceEventHandler):
"""EndpointsHandler handles K8s Endpoints events.

LoadBalancerHandler handles K8s Endpoints events and tracks changes in
EndpointsHandler handles K8s Endpoints events and tracks changes in
LBaaSServiceSpec to update Neutron LBaaS accordingly and to reflect its'
actual state in LBaaSState.
"""
@@ -159,13 +239,11 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
OBJECT_WATCH_PATH = "%s/%s" % (k_const.K8S_API_BASE, "endpoints")

def __init__(self):
super(LoadBalancerHandler, self).__init__()
super(EndpointsHandler, self).__init__()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()
self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance()
self._drv_project = drv_base.ServiceProjectDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
# Note(yboaron) LBaaS driver supports 'provider' parameter in
# Load Balancer creation flow.
# We need to set the requested load balancer provider
@@ -175,104 +253,40 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
!= 'default'):
self._lb_provider = (
config.CONF.kubernetes.endpoints_driver_octavia_provider)
eventlet.spawn(self._cleanup_leftover_lbaas)

def on_present(self, endpoints):
lbaas_spec = utils.get_lbaas_spec(endpoints)
if self._should_ignore(endpoints, lbaas_spec):
k8s = clients.get_kubernetes_client()
loadbalancer_crd = k8s.get_loadbalancer_crd(endpoints)

if not self._has_pods(endpoints):
LOG.debug("Ignoring Kubernetes endpoints %s",
endpoints['metadata']['name'])
return

lbaas_state = utils.get_lbaas_state(endpoints)
if not lbaas_state:
lbaas_state = obj_lbaas.LBaaSState()
elif (lbaas_state.loadbalancer and self._lb_provider and
self._lb_provider != lbaas_state.loadbalancer.provider):
LOG.info("LoadBalancer associated to the service does not match "
"the current provider: %s", lbaas_state.loadbalancer.id)
lb_client = clients.get_loadbalancer_client()
try:
lb_client.get_load_balancer(lbaas_state.loadbalancer.id)
except os_exc.NotFoundException:
# NOTE(ltomasbo): If the loadbalancer is gone, remove the
# annotations to ensure it is reprocessed
lbaas_state.loadbalancer = None
lbaas_state.pools = []
lbaas_state.listeners = []
lbaas_state.members = []
utils.set_lbaas_state(endpoints, lbaas_state)
return
if loadbalancer_crd is None:
loadbalancer_crd = self._create_crd_spec(endpoints)
else:
loadbalancer_crd = self._update_crd_spec(loadbalancer_crd,
endpoints)

def _has_lbaas_spec_changes(self, endpoints, loadbalancer_crd):
return (self._has_ip_changes(endpoints, loadbalancer_crd) or
utils.has_port_changes(endpoints, loadbalancer_crd))

def _has_ip_changes(self, endpoints, loadbalancer_crd):
link = endpoints['metadata']['selfLink']
endpoint_ip = endpoints['subsets']['addresses'].get('ip')
endpoint_crd_ip = loadbalancer_crd['spec'].get('ip')

if endpoint_crd_ip != endpoint_ip:
LOG.debug("LBaaS spec IP %(endpoint_crd_ip)s !="
" %(endpoint_ip)s for %(link)s"
% {'endpoint_crd_ip': endpoint_crd_ip,
'endpoint_ip': endpoint_ip,
'link': link})
return True

if self._sync_lbaas_members(endpoints, lbaas_state, lbaas_spec):
# Note(yboaron) For LoadBalancer services, we should allocate FIP,
# associate it to LB VIP and update K8S service status
if lbaas_state.service_pub_ip_info is None:
service_pub_ip_info = (
self._drv_service_pub_ip.acquire_service_pub_ip_info(
lbaas_spec.type,
lbaas_spec.lb_ip,
lbaas_spec.project_id,
lbaas_state.loadbalancer.port_id))
if service_pub_ip_info:
self._drv_service_pub_ip.associate_pub_ip(
service_pub_ip_info, lbaas_state.loadbalancer.port_id)
lbaas_state.service_pub_ip_info = service_pub_ip_info
self._update_lb_status(
endpoints,
lbaas_state.service_pub_ip_info.ip_addr)
# REVISIT(ivc): since _sync_lbaas_members is responsible for
# creating all lbaas components (i.e. load balancer, listeners,
# pools, members), it is currently possible for it to fail (due
# to invalid Kuryr/K8s/Neutron configuration, e.g. Members' IPs
# not belonging to configured Neutron subnet or Service IP being
# in use by gateway or VMs) leaving some Neutron entities without
# properly updating annotation. Some sort of failsafe mechanism is
# required to deal with such situations (e.g. cleanup, or skip
# failing items, or validate configuration) to prevent annotation
# being out of sync with the actual Neutron state.
try:
utils.set_lbaas_state(endpoints, lbaas_state)
except k_exc.K8sResourceNotFound:
# Note(yboaron) It's impossible to store neutron resources
# in K8S object since object was deleted. In that case
# we should rollback all neutron resources.
LOG.debug("LoadBalancerHandler failed to store Openstack "
"resources in K8S object (not found)")
self.on_deleted(endpoints, lbaas_state)

def on_deleted(self, endpoints, lbaas_state=None):
if lbaas_state is None:
lbaas_state = utils.get_lbaas_state(endpoints)
if not lbaas_state:
return
# NOTE(ivc): deleting pool deletes its members
self._drv_lbaas.release_loadbalancer(
loadbalancer=lbaas_state.loadbalancer)
if lbaas_state.service_pub_ip_info:
self._drv_service_pub_ip.release_pub_ip(
lbaas_state.service_pub_ip_info)

def _should_ignore(self, endpoints, lbaas_spec):
# NOTE(ltomasbo): we must wait until service handler has annotated the
# endpoints to process them. Thus, if annotations are not updated to
# match the endpoints information, we should skip the event
return not(lbaas_spec and
self._has_pods(endpoints) and
self._svc_handler_annotations_updated(endpoints,
lbaas_spec))

def _svc_handler_annotations_updated(self, endpoints, lbaas_spec):
svc_link = self._get_service_link(endpoints)
k8s = clients.get_kubernetes_client()
service = k8s.get(svc_link)
if utils.has_port_changes(service, lbaas_spec):
# NOTE(ltomasbo): Ensuring lbaas_spec annotated on the endpoints
# is in sync with the service status, i.e., upon a service
# modification it will ensure endpoint modifications are not
# handled until the service handler has performed its annotations
return False
return True
return False

def _has_pods(self, endpoints):
ep_subsets = endpoints.get('subsets', [])
@@ -283,327 +297,58 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
for address in subset.get('addresses', [])
if address.get('targetRef', {}).get('kind') == 'Pod')

def _sync_lbaas_members(self, endpoints, lbaas_state, lbaas_spec):
changed = False

if (self._has_pods(endpoints) and
self._remove_unused_members(endpoints, lbaas_state,
lbaas_spec)):
changed = True

if self._sync_lbaas_pools(endpoints, lbaas_state, lbaas_spec):
changed = True

if (self._has_pods(endpoints) and
self._add_new_members(endpoints, lbaas_state, lbaas_spec)):
changed = True

return changed

def _sync_lbaas_sgs(self, endpoints, lbaas_state):
svc_link = self._get_service_link(endpoints)
k8s = clients.get_kubernetes_client()
service = k8s.get(svc_link)

lb = lbaas_state.loadbalancer
# NOTE(maysams) It's possible that while the service annotation
# is added the backend pods on that service are not yet created
# resulting in no security groups retrieved for the service.
# Let's retrieve again to ensure is updated.
project_id = self._drv_project.get_project(service)
lb_sgs = self._drv_sg.get_security_groups(service, project_id)
lb.security_groups = lb_sgs

def _add_new_members(self, endpoints, lbaas_state, lbaas_spec):
changed = False

if config.CONF.octavia_defaults.enforce_sg_rules:
try:
self._sync_lbaas_sgs(endpoints, lbaas_state)
except k_exc.K8sResourceNotFound:
LOG.debug("The svc has been deleted while processing"
" the endpoints update. No need to add new"
" members.")

lsnr_by_id = {listener.id: listener
for listener in lbaas_state.listeners}
pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
lsnr_by_id[p.listener_id].port): p
for p in lbaas_state.pools}

# NOTE(yboaron): Since LBaaSv2 doesn't support UDP load balancing,
# the LBaaS driver will return 'None' in case of UDP port
# listener creation.
# we should consider the case in which
# 'pool_by_lsnr_port[p.protocol, p.port]' is missing
pool_by_tgt_name = {}
for p in lbaas_spec.ports:
try:
pool_by_tgt_name[p.name] = pool_by_lsnr_port[p.protocol,
p.port]
except KeyError:
continue
current_targets = {(str(m.ip), m.port, m.pool_id)
for m in lbaas_state.members}

for subset in endpoints.get('subsets', []):
subset_ports = subset.get('ports', [])
for subset_address in subset.get('addresses', []):
try:
target_ip = subset_address['ip']
target_ref = subset_address['targetRef']
if target_ref['kind'] != k_const.K8S_OBJ_POD:
continue
except KeyError:
continue
if not pool_by_tgt_name:
continue
for subset_port in subset_ports:
target_port = subset_port['port']
port_name = subset_port.get('name')
try:
pool = pool_by_tgt_name[port_name]
except KeyError:
LOG.debug("No pool found for port: %r", port_name)
continue

if (target_ip, target_port, pool.id) in current_targets:
continue
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
if (config.CONF.octavia_defaults.member_mode ==
k_const.OCTAVIA_L2_MEMBER_MODE):
try:
member_subnet_id = self._get_pod_subnet(target_ref,
target_ip)
except k_exc.K8sResourceNotFound:
LOG.debug("Member namespace has been deleted. No "
"need to add the members as it is "
"going to be deleted")
continue
else:
# We use the service subnet id so that the connectivity
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
member_subnet_id = lbaas_state.loadbalancer.subnet_id
first_member_of_the_pool = True
for member in lbaas_state.members:
if pool.id == member.pool_id:
first_member_of_the_pool = False
break
if first_member_of_the_pool:
listener_port = lsnr_by_id[pool.listener_id].port
else:
listener_port = None

member = self._drv_lbaas.ensure_member(
loadbalancer=lbaas_state.loadbalancer,
pool=pool,
subnet_id=member_subnet_id,
ip=target_ip,
port=target_port,
target_ref_namespace=target_ref['namespace'],
target_ref_name=target_ref['name'],
listener_port=listener_port)
lbaas_state.members.append(member)
changed = True

return changed

def _get_pod_subnet(self, target_ref, ip):
# REVISIT(ivc): consider using true pod object instead
pod = {'kind': target_ref['kind'],
'metadata': {'name': target_ref['name'],
'namespace': target_ref['namespace']}}
project_id = self._drv_pod_project.get_project(pod)
subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id)
subnet_ids = [subnet_id for subnet_id, network in subnets_map.items()
for subnet in network.subnets.objects
if ip in subnet.cidr]
if subnet_ids:
return subnet_ids[0]
else:
# NOTE(ltomasbo): We are assuming that if ip is not on the
# pod subnet is because the member is using hostnetworking. In
# this worker_nodes_subnet will be used
return config.CONF.pod_vif_nested.worker_nodes_subnet

def _get_port_in_pool(self, pool, lbaas_state, lbaas_spec):
for listener in lbaas_state.listeners:
if listener.id != pool.listener_id:
continue
for port in lbaas_spec.ports:
if (listener.port == port.port and
listener.protocol == port.protocol):
return port
return None
def _create_crd_spec(self, endpoints):
endpoints_name = endpoints['metadata']['name']
namespace = endpoints['metadata']['namespace']
kubernetes = clients.get_kubernetes_client()

subsets = endpoints.get('subsets', [])

loadbalancer_crd = {
'apiVersion': 'openstack.org/v1',
'kind': 'KuryrLoadBalancer',
'metadata': {
'name': endpoints_name,
'finalizers': [k_const.KURYRLB_FINALIZER]
},
'spec': {
'subsets': subsets
},
'status': {
}
}

def _remove_unused_members(self, endpoints, lbaas_state, lbaas_spec):
spec_ports = {}
for pool in lbaas_state.pools:
port = self._get_port_in_pool(pool, lbaas_state, lbaas_spec)
if port:
spec_ports[port.name] = pool.id

current_targets = {(a['ip'], a.get('targetRef', {}).get('name', ''),
p['port'], spec_ports.get(p.get('name')))
for s in endpoints['subsets']
for a in s['addresses']
for p in s['ports']
if p.get('name') in spec_ports}

removed_ids = set()
for member in lbaas_state.members:
try:
member_name = member.name
# NOTE: The member name is compose of:
# NAMESPACE_NAME/POD_NAME:PROTOCOL_PORT
pod_name = member_name.split('/')[1].split(':')[0]
except AttributeError:
pod_name = ""
if ((str(member.ip), pod_name, member.port, member.pool_id) in
current_targets):
continue
self._drv_lbaas.release_member(lbaas_state.loadbalancer,
member)
removed_ids.add(member.id)

if removed_ids:
lbaas_state.members = [m for m in lbaas_state.members
if m.id not in removed_ids]
return bool(removed_ids)

def _sync_lbaas_pools(self, endpoints, lbaas_state, lbaas_spec):
changed = False

if self._remove_unused_pools(lbaas_state, lbaas_spec):
changed = True

if self._sync_lbaas_listeners(endpoints, lbaas_state, lbaas_spec):
changed = True

if self._add_new_pools(lbaas_state, lbaas_spec):
changed = True

return changed

def _add_new_pools(self, lbaas_state, lbaas_spec):
changed = False

current_listeners_ids = {pool.listener_id
for pool in lbaas_state.pools}
for listener in lbaas_state.listeners:
if listener.id in current_listeners_ids:
continue
pool = self._drv_lbaas.ensure_pool(lbaas_state.loadbalancer,
listener)
lbaas_state.pools.append(pool)
changed = True

return changed

def _is_pool_in_spec(self, pool, lbaas_state, lbaas_spec):
# NOTE(yboaron): in order to check if a specific pool is in lbaas_spec
# we should:
# 1. get the listener that pool is attached to
# 2. check if listener's attributes appear in lbaas_spec.
for listener in lbaas_state.listeners:
if listener.id != pool.listener_id:
continue
for port in lbaas_spec.ports:
if (listener.port == port.port and
listener.protocol == port.protocol):
return True
return False

def _remove_unused_pools(self, lbaas_state, lbaas_spec):
removed_ids = set()
for pool in lbaas_state.pools:
if self._is_pool_in_spec(pool, lbaas_state, lbaas_spec):
continue
self._drv_lbaas.release_pool(lbaas_state.loadbalancer,
pool)
removed_ids.add(pool.id)
if removed_ids:
lbaas_state.pools = [p for p in lbaas_state.pools
if p.id not in removed_ids]
lbaas_state.members = [m for m in lbaas_state.members
if m.pool_id not in removed_ids]
return bool(removed_ids)

def _sync_lbaas_listeners(self, endpoints, lbaas_state, lbaas_spec):
changed = False

if self._remove_unused_listeners(endpoints, lbaas_state, lbaas_spec):
changed = True

if self._sync_lbaas_loadbalancer(endpoints, lbaas_state, lbaas_spec):
changed = True

if self._add_new_listeners(endpoints, lbaas_spec, lbaas_state):
changed = True

return changed

def _add_new_listeners(self, endpoints, lbaas_spec, lbaas_state):
changed = False
lbaas_spec_ports = sorted(lbaas_spec.ports, key=lambda x: x.protocol)
for port_spec in lbaas_spec_ports:
protocol = port_spec.protocol
port = port_spec.port
name = "%s:%s" % (lbaas_state.loadbalancer.name, protocol)
listener = [listener for listener in lbaas_state.listeners
if listener.port == port and
listener.protocol == protocol]
if listener:
continue
# FIXME (maysams): Due to a bug in Octavia, which does
# not allows listeners with same port but different
# protocols to co-exist, we need to skip the creation of
# listeners that have the same port as an existing one.
listener = [listener for listener in lbaas_state.listeners if
listener.port == port]
if listener and not self._drv_lbaas.double_listeners_supported():
LOG.warning("Skipping listener creation for %s as another one"
" already exists with port %s", name, port)
continue
listener = self._drv_lbaas.ensure_listener(
loadbalancer=lbaas_state.loadbalancer,
protocol=protocol,
port=port,
service_type=lbaas_spec.type)
if listener is not None:
lbaas_state.listeners.append(listener)
changed = True
return changed

def _remove_unused_listeners(self, endpoints, lbaas_state, lbaas_spec):
current_listeners = {p.listener_id for p in lbaas_state.pools}

removed_ids = set()
for listener in lbaas_state.listeners:
if listener.id in current_listeners:
continue
self._drv_lbaas.release_listener(lbaas_state.loadbalancer,
listener)
removed_ids.add(listener.id)
if removed_ids:
lbaas_state.listeners = [
listener for listener in lbaas_state.listeners
if listener.id not in removed_ids]
return bool(removed_ids)

def _update_lb_status(self, endpoints, lb_ip_address):
status_data = {"loadBalancer": {
"ingress": [{"ip": lb_ip_address.format()}]}}
k8s = clients.get_kubernetes_client()
svc_status_link = self._get_service_link(endpoints) + '/status'
try:
k8s.patch("status", svc_status_link, status_data)
kubernetes.post('{}/{}/kuryrloadbalancers'.format(
k_const.K8S_API_CRD_NAMESPACES, namespace), loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sClientException:
# REVISIT(ivc): only raise ResourceNotReady for NotFound
raise k_exc.ResourceNotReady(svc_status_link)
LOG.exception("Kubernetes Client Exception creating "
"kuryrloadbalancer CRD. %s" %
k_exc.K8sClientException)
raise
return loadbalancer_crd

def _update_crd_spec(self, loadbalancer_crd, endpoints):
kubernetes = clients.get_kubernetes_client()
subsets = endpoints.get('subsets')
lbaas_update_crd = {
'subsets': subsets
}
try:
kubernetes.patch_crd('spec', loadbalancer_crd['metadata'][
'selfLink'], lbaas_update_crd)
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s', loadbalancer_crd)
except k_exc.K8sConflict:
raise k_exc.ResourceNotReady(loadbalancer_crd)
except k_exc.K8sClientException:
LOG.exception('Error updating KuryrLoadbalancer CRD %s',
loadbalancer_crd)
raise

return loadbalancer_crd

def _get_service_link(self, endpoints):
ep_link = endpoints['metadata']['selfLink']
@@ -612,129 +357,6 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
if link_parts[-2] != 'endpoints':
raise k_exc.IntegrityError(_(
"Unsupported endpoints link: %(link)s") % {
'link': ep_link})
'link': ep_link})
link_parts[-2] = 'services'
return "/".join(link_parts)

def _sync_lbaas_loadbalancer(self, endpoints, lbaas_state, lbaas_spec):
changed = False
lb = lbaas_state.loadbalancer

if lb and lb.ip != lbaas_spec.ip:
# if loadbalancerIP was associated to lbaas VIP, disassociate it.
if lbaas_state.service_pub_ip_info:
self._drv_service_pub_ip.disassociate_pub_ip(
lbaas_state.service_pub_ip_info)

self._drv_lbaas.release_loadbalancer(
loadbalancer=lb)
lb = None
lbaas_state.pools = []
lbaas_state.listeners = []
lbaas_state.members = []
changed = True

if not lb:
if lbaas_spec.ip:
lb_name = self._drv_lbaas.get_service_loadbalancer_name(
endpoints['metadata']['namespace'],
endpoints['metadata']['name'])
lb = self._drv_lbaas.ensure_loadbalancer(
name=lb_name,
project_id=lbaas_spec.project_id,
subnet_id=lbaas_spec.subnet_id,
ip=lbaas_spec.ip,
security_groups_ids=lbaas_spec.security_groups_ids,
service_type=lbaas_spec.type,
provider=self._lb_provider)
changed = True
elif lbaas_state.service_pub_ip_info:
self._drv_service_pub_ip.release_pub_ip(
lbaas_state.service_pub_ip_info)
lbaas_state.service_pub_ip_info = None
changed = True

lbaas_state.loadbalancer = lb
return changed

def _cleanup_leftover_lbaas(self):
lbaas_client = clients.get_loadbalancer_client()
services = []
try:
services = driver_utils.get_services().get('items')
except k_exc.K8sClientException:
LOG.debug("Skipping cleanup of leftover lbaas. "
"Error retriving Kubernetes services")
return
services_cluster_ip = {service['spec']['clusterIP']: service
for service in services
if service['spec'].get('clusterIP')}

services_without_selector = set(
service['spec']['clusterIP'] for service in services
if (service['spec'].get('clusterIP') and
not service['spec'].get('selector')))
lbaas_spec = {}
self._drv_lbaas.add_tags('loadbalancer', lbaas_spec)
loadbalancers = lbaas_client.load_balancers(**lbaas_spec)
for loadbalancer in loadbalancers:
if loadbalancer.vip_address not in services_cluster_ip.keys():
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
eventlet.spawn(self._ensure_release_lbaas, lb_obj)
else:
# check if the provider is the right one
if (loadbalancer.vip_address not in services_without_selector
and self._lb_provider
and self._lb_provider != loadbalancer.provider):
LOG.debug("Removing loadbalancer with old provider: %s",
loadbalancer)
lb_obj = obj_lbaas.LBaaSLoadBalancer(**loadbalancer)
eventlet.spawn(
self._ensure_release_lbaas,
lb_obj,
services_cluster_ip[loadbalancer.vip_address])
# NOTE(ltomasbo): give some extra time in between lbs
# recreation actions
time.sleep(1)

def _ensure_release_lbaas(self, lb_obj, svc=None):
attempts = 0
deadline = 0
retry = True
timeout = config.CONF.kubernetes.watch_retry_timeout
while retry:
try:
if attempts == 1:
deadline = time.time() + timeout
if (attempts > 0 and
utils.exponential_sleep(deadline, attempts) == 0):
LOG.error("Failed releasing lbaas '%s': deadline exceeded",
lb_obj.name)
return
self._drv_lbaas.release_loadbalancer(lb_obj)
retry = False
except k_exc.ResourceNotReady:
LOG.debug("Attempt (%s) of loadbalancer release %s failed."
" A retry will be triggered.", attempts,
lb_obj.name)
attempts += 1
retry = True
if svc:
endpoints_link = utils.get_endpoints_link(svc)
k8s = clients.get_kubernetes_client()
try:
endpoints = k8s.get(endpoints_link)
except k_exc.K8sResourceNotFound:
LOG.debug("Endpoint not Found.")
return

lbaas = utils.get_lbaas_state(endpoints)
if lbaas:
lbaas.loadbalancer = None
lbaas.pools = []
lbaas.listeners = []
lbaas.members = []
# NOTE(ltomasbo): give some extra time to ensure the Load
# Balancer VIP is also released
time.sleep(1)
utils.set_lbaas_state(endpoints, lbaas)
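
Reviewer note: in summary, the Service handler now persists the former LBaaSServiceSpec as a KuryrLoadBalancer CRD and the Endpoints handler only keeps spec.subsets in sync, leaving all Octavia work to the new KuryrLoadBalancer handler. A condensed, hedged sketch of the two API calls involved; the resource body is abbreviated and the namespace/selfLink values are hypothetical:

from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const

k8s = clients.get_kubernetes_client()

# ServiceHandler.create_crd_spec(): post the CRD carrying the Service spec.
k8s.post('{}/{}/kuryrloadbalancers'.format(k_const.K8S_API_CRD_NAMESPACES,
                                           'default'),
         {'apiVersion': 'openstack.org/v1',
          'kind': 'KuryrLoadBalancer',
          'metadata': {'name': 'demo-svc',
                       'finalizers': [k_const.KURYRLB_FINALIZER]},
          'spec': {'ip': '10.0.0.42', 'ports': [],
                   'project_id': 'project-uuid',
                   'security_groups_ids': [], 'subnet_id': 'subnet-uuid',
                   'type': 'ClusterIP'},
          'status': {}})

# EndpointsHandler._update_crd_spec(): patch only the Endpoints subsets.
klb_self_link = ('/apis/openstack.org/v1/namespaces/default/'
                 'kuryrloadbalancers/demo-svc')  # hypothetical selfLink
k8s.patch_crd('spec', klb_self_link, {'subsets': []})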

+810 -0  kuryr_kubernetes/controller/handlers/loadbalancer.py

@@ -0,0 +1,810 @@
# Copyright (c) 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import eventlet
import time

from oslo_log import log as logging

from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes import utils

LOG = logging.getLogger(__name__)

SUPPORTED_SERVICE_TYPES = ('ClusterIP', 'LoadBalancer')


class KuryrLoadBalancerHandler(k8s_base.ResourceEventHandler):
"""LoadBalancerStatusHandler handles K8s Endpoints events.

LBStatusHandler handles K8s Endpoints events and tracks changes in
LBaaSServiceSpec to update Neutron LBaaS accordingly and to reflect its'
actual state in LBaaSState.
"""

OBJECT_KIND = k_const.K8S_OBJ_KURYRLOADBALANCER
OBJECT_WATCH_PATH = k_const.K8S_API_CRD_KURYRLOADBALANCERS

def __init__(self):
super(KuryrLoadBalancerHandler, self).__init__()
self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()
self._drv_service_pub_ip = drv_base.ServicePubIpDriver.get_instance()
# Note(yboaron) LBaaS driver supports 'provider' parameter in
# Load Balancer creation flow.
# We need to set the requested load balancer provider
# according to 'endpoints_driver_octavia_provider' configuration.
self._lb_provider = None
if (config.CONF.kubernetes.endpoints_driver_octavia_provider
!= 'default'):
self._lb_provider = (
config.CONF.kubernetes.endpoints_driver_octavia_provider)
eventlet.spawn(self._cleanup_leftover_lbaas)

def on_present(self, loadbalancer_crd):
if self._should_ignore(loadbalancer_crd):
LOG.debug("Ignoring Kubernetes service %s",
loadbalancer_crd['metadata']['name'])
return

try:
name = loadbalancer_crd['metadata']['name']
namespace = loadbalancer_crd['metadata']['namespace']
self._get_loadbalancer_crd(name, namespace)
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s',
loadbalancer_crd)
except KeyError:
LOG.debug('KuryrLoadbalancer CRD not found')
raise k_exc.ResourceNotReady(loadbalancer_crd)

if self._sync_lbaas_members(loadbalancer_crd):
# Note(yboaron) For LoadBalancer services, we should allocate FIP,
# associate it to LB VIP and update K8S service status
# if loadbalancer_crd['status'].get('service_pub_ip_info') is None:
lb_ip = loadbalancer_crd['spec'].get('lb_ip')
pub_info = loadbalancer_crd['status'].get(
'service_pub_ip_info')
if pub_info is None:
service_pub_ip_info = (
self._drv_service_pub_ip.acquire_service_pub_ip_info(
loadbalancer_crd['spec']['type'],
lb_ip,
loadbalancer_crd['spec']['project_id'],
loadbalancer_crd['status']['loadbalancer'][
'port_id']))
if service_pub_ip_info:
self._drv_service_pub_ip.associate_pub_ip(
service_pub_ip_info, loadbalancer_crd['status'][
'loadbalancer']['port_id'])
loadbalancer_crd['status'][
'service_pub_ip_info'] = service_pub_ip_info
self._update_lb_status(loadbalancer_crd)
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status', loadbalancer_crd[
'metadata']['selfLink'], loadbalancer_crd[
'status'])
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s',
loadbalancer_crd)
except k_exc.K8sClientException:
LOG.exception('Error updating KuryLoadbalancer CRD %s',
loadbalancer_crd)
raise

def _should_ignore(self, loadbalancer_crd):
return not(self._has_pods(loadbalancer_crd))

def _has_pods(self, loadbalancer_crd):
ep_subsets = loadbalancer_crd['spec'].get('subsets', [])
if not ep_subsets:
return False
        return any(address.get('targetRef', {}).get('kind') == 'Pod'
                   for subset in ep_subsets
                   for address in subset.get('addresses', []))

def on_finalize(self, loadbalancer_crd):
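        """Release the Octavia and Neutron resources of a deleted CRD.

        Releases the load balancer (which also removes its pools and
        members), releases any public IP and removes the finalizers from
        both the CRD and the corresponding Kubernetes Service.
        """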
LOG.debug("Deleting the loadbalancer CRD")

if not loadbalancer_crd:
LOG.warning("Load Balancer CRD not present")
return

        if loadbalancer_crd.get('status'):
# NOTE(ivc): deleting pool deletes its members
self._drv_lbaas.release_loadbalancer(
loadbalancer=loadbalancer_crd['status'].get('loadbalancer'))

        pub_info = loadbalancer_crd.get('status', {}).get(
            'service_pub_ip_info')

if pub_info:
self._drv_service_pub_ip.release_pub_ip(
loadbalancer_crd['status']['service_pub_ip_info'])

kubernetes = clients.get_kubernetes_client()
LOG.debug('Removing finalizer from KuryrLoadBalancer CRD %s',
loadbalancer_crd)
try:
kubernetes.remove_finalizer(loadbalancer_crd,
k_const.KURYRLB_FINALIZER)
except k_exc.K8sClientException:
            LOG.exception('Error removing KuryrLoadBalancer CRD finalizer '
                          'for %s', loadbalancer_crd)
raise

namespace = loadbalancer_crd['metadata']['namespace']
name = loadbalancer_crd['metadata']['name']
try:
service = kubernetes.get(f"{k_const.K8S_API_NAMESPACES}"
f"/{namespace}/services/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get service: %s", ex)
raise

LOG.debug('Removing finalizer from service %s',
service["metadata"]["name"])
try:
kubernetes.remove_finalizer(service, k_const.SERVICE_FINALIZER)
except k_exc.K8sClientException:
            LOG.exception('Error removing service finalizer '
                          'for %s', service["metadata"]["name"])
raise

def _get_loadbalancer_crd(self, loadbalancer_crd_name, namespace):
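        """Return the KuryrLoadBalancer CRD or None if it does not exist."""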
k8s = clients.get_kubernetes_client()
try:
loadbalancer_crd = k8s.get('{}/{}/kuryrloadbalancers/{}'.format(
k_const.K8S_API_CRD_NAMESPACES, namespace,
loadbalancer_crd_name))
except k_exc.K8sResourceNotFound:
return None
except k_exc.K8sClientException:
LOG.exception("Kubernetes Client Exception.")
raise
return loadbalancer_crd

def _sync_lbaas_members(self, loadbalancer_crd):
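        """Reconcile load balancer members with the endpoints in the spec.

        Returns True if any of the load balancer resources changed.
        """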
changed = False

if (self._has_pods(loadbalancer_crd) and
self._remove_unused_members(loadbalancer_crd)):
changed = True

if self._sync_lbaas_pools(loadbalancer_crd):
changed = True

if (self._has_pods(loadbalancer_crd) and
self._add_new_members(loadbalancer_crd)):
changed = True

return changed

def _sync_lbaas_sgs(self, loadbalancer_crd):
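        """Refresh the load balancer security groups from the CRD spec."""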
# NOTE (maysams) Need to retrieve the LBaaS Spec again due to
# the possibility of it being updated after the LBaaS creation
# process has started.
lbaas_spec = loadbalancer_crd.get('spec')

lb = loadbalancer_crd['status'].get('loadbalancer')
if not lb:
return

default_sgs = config.CONF.neutron_defaults.pod_security_groups
        # NOTE(maysams) As the Service and Endpoints data is merged into
        # the spec in two separate k8s calls, it's possible that the
        # Endpoints part got filled in while the Service part hasn't yet,
        # e.g. due to controller restarts. In that case a ResourceNotReady
        # exception is raised until the spec is complete.
        if not lbaas_spec:
            raise k_exc.ResourceNotReady(loadbalancer_crd)
        lbaas_spec_sgs = lbaas_spec.get('security_groups_ids', [])
if (lb.get('security_groups') and
lb.get('security_groups') != lbaas_spec_sgs):
sgs = [lb_sg for lb_sg in lb['security_groups']
if lb_sg not in default_sgs]
if lbaas_spec_sgs != default_sgs:
sgs.extend(lbaas_spec_sgs)

# Check if this should update the CRD
lb['security_groups'] = sgs

def _add_new_members(self, loadbalancer_crd):
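        """Create Octavia members for endpoint addresses not yet present.

        Returns True if at least one member was added to the CRD status.
        """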
changed = False
        try:
            self._sync_lbaas_sgs(loadbalancer_crd)
        except k_exc.K8sResourceNotFound:
            LOG.debug("The svc has been deleted while processing the "
                      "endpoints update. No need to add new members.")
            return changed

        lsnr_by_id = {listener['id']: listener
                      for listener in loadbalancer_crd['status'].get(
                          'listeners', [])}
pool_by_lsnr_port = {(lsnr_by_id[p['listener_id']]['protocol'],
lsnr_by_id[p['listener_id']]['port']): p
for p in loadbalancer_crd['status'].get(
'pools', [])}

# NOTE(yboaron): Since LBaaSv2 doesn't support UDP load balancing,
# the LBaaS driver will return 'None' in case of UDP port
# listener creation.
        # We should consider the case in which
        # 'pool_by_lsnr_port[p.protocol, p.port]' is missing.
pool_by_tgt_name = {}
for p in loadbalancer_crd['spec'].get('ports', []):
try:
pool_by_tgt_name[p['name']] = pool_by_lsnr_port[p['protocol'],
p['port']]
except KeyError:
continue

current_targets = {(str(m['ip']), m['port'], m['pool_id'])
for m in loadbalancer_crd['status'].get(
'members', [])}

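        # Create an Octavia member for every Pod endpoint address whose
        # (ip, port, pool) tuple is not already in current_targets.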
for subset in loadbalancer_crd['spec']['subsets']:
subset_ports = subset.get('ports', [])
for subset_address in subset.get('addresses', []):
try:
target_ip = subset_address['ip']
target_ref = subset_address['targetRef']
if target_ref['kind'] != k_const.K8S_OBJ_POD:
continue
except KeyError:
continue
if not pool_by_tgt_name:
continue
for subset_port in subset_ports:
target_port = subset_port['port']
port_name = subset_port.get('name')
try:
pool = pool_by_tgt_name[port_name]
except KeyError:
LOG.debug("No pool found for port: %r", port_name)
continue

if (target_ip, target_port, pool['id']) in current_targets:
continue
# TODO(apuimedo): Do not pass subnet_id at all when in
# L3 mode once old neutron-lbaasv2 is not supported, as
# octavia does not require it
if (config.CONF.octavia_defaults.member_mode ==
k_const.OCTAVIA_L2_MEMBER_MODE):
try:
member_subnet_id = self._get_pod_subnet(target_ref,
target_ip)
except k_exc.K8sResourceNotFound:
LOG.debug("Member namespace has been deleted. No "
"need to add the members as it is "
"going to be deleted")
continue
else:
# We use the service subnet id so that the connectivity
# from VIP to pods happens in layer 3 mode, i.e.,
# routed.
member_subnet_id = loadbalancer_crd['status'][
'loadbalancer']['subnet_id']
first_member_of_the_pool = True
for member in loadbalancer_crd['status'].get(
'members', []):
if pool['id'] == member['pool_id']:
first_member_of_the_pool = False
break
if first_member_of_the_pool:
listener_port = lsnr_by_id[pool['listener_id']][
'port']
else:
listener_port = None
loadbalancer = loadbalancer_crd['status']['loadbalancer']
member = self._drv_lbaas.ensure_member(
loadbalancer=loadbalancer,
pool=pool,
subnet_id=member_subnet_id,
ip=target_ip,
port=target_port,
target_ref_namespace=target_ref['namespace'],
target_ref_name=target_ref['name'],
listener_port=listener_port)
                    loadbalancer_crd['status'].setdefault(
                        'members', []).append(member)
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.patch_crd('status', loadbalancer_crd[
'metadata']['selfLink'], loadbalancer_crd[
'status'])
except k_exc.K8sResourceNotFound:
LOG.debug('KuryrLoadbalancer CRD not found %s',
loadbalancer_crd)
except k_exc.K8sClientException:
                        LOG.exception('Error updating KuryrLoadbalancer CRD '
                                      '%s', loadbalancer_crd)
raise
changed = True
return changed

def _get_pod_subnet(self, target_ref, ip):
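        """Return the ID of the subnet that contains the given Pod IP."""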
# REVISIT(ivc): consider using true pod object instead
pod = {'kind': target_ref['kind'],
'metadata': {'name': target_ref['name'],
'namespace': target_ref['namespace']}}
project_id = self._drv_pod_project.get_project(pod)
subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id)
subnet_ids = [subnet_id for subnet_id, network in subnets_map.items()
for subnet in network.subnets.objects
if ip in subnet.cidr]
if subnet_ids:
return subnet_ids[0]