K8s Services support: LoadBalancerHandler
This patch implements LoadBalancerHandler that handles K8s Endpoints
events and tracks changes in LBaaSServiceSpec to update Neutron LBaaS
accordingly and to reflect its actual state in LBaaSState.

Change-Id: I718daf6d3def981c1bde5ca9831f955766935fbd
Partially-Implements: blueprint kuryr-k8s-integration
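The handler communicates purely through annotations on the Endpoints object: it consumes the LBaaSServiceSpec written by LBaaSSpecHandler and writes back the resulting LBaaSState after each sync. Roughly, the annotated object looks like the sketch below (illustrative values only; the real payloads are JSON-serialized oslo.versionedobjects primitives):

```python
# Illustrative shape only -- field values here are hypothetical.
endpoints = {
    'metadata': {
        'selfLink': '/api/v1/namespaces/default/endpoints/my-svc',
        'resourceVersion': '12345',
        'annotations': {
            # written by LBaaSSpecHandler, read via _get_lbaas_spec()
            'openstack.org/kuryr-lbaas-spec': '{"versioned_object.name": '
                                              '"LBaaSServiceSpec", ...}',
            # written back by _set_lbaas_state() after a successful sync
            'openstack.org/kuryr-lbaas-state': '{"versioned_object.name": '
                                               '"LBaaSState", ...}',
        },
    },
    'subsets': [...],  # standard K8s Endpoints subsets
}
```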
commit e3209864dc (parent b0072c5816)
@@ -63,6 +63,10 @@ k8s_opts = [
    cfg.StrOpt('pod_vif_driver',
               help=_("The driver that provides VIFs for Kubernetes Pods."),
               default='generic'),
    cfg.StrOpt('endpoints_lbaas_driver',
               help=_("The driver that provides LoadBalancers for Kubernetes "
                      "Endpoints"),
               default='lbaasv2'),
]

neutron_defaults = [
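For reference, a minimal standalone sketch of how the new option surfaces through oslo.config once `k8s_opts` is registered. The `[kubernetes]` group name is an assumption based on how Kuryr registers this option list elsewhere; this snippet is not part of the patch:

```python
from oslo_config import cfg

CONF = cfg.CONF
CONF.register_opts([cfg.StrOpt('endpoints_lbaas_driver',
                               default='lbaasv2')],
                   group='kubernetes')
CONF(args=[])  # initialize without a config file, so defaults apply

print(CONF.kubernetes.endpoints_lbaas_driver)  # -> 'lbaasv2'
```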
@@ -26,6 +26,7 @@ K8S_POD_STATUS_PENDING = 'Pending'
K8S_ANNOTATION_PREFIX = 'openstack.org/kuryr'
K8S_ANNOTATION_VIF = K8S_ANNOTATION_PREFIX + '-vif'
K8S_ANNOTATION_LBAAS_SPEC = K8S_ANNOTATION_PREFIX + '-lbaas-spec'
K8S_ANNOTATION_LBAAS_STATE = K8S_ANNOTATION_PREFIX + '-lbaas-state'

K8S_OS_VIF_NOOP_PLUGIN = "noop"
@@ -257,7 +257,7 @@ class PodVIFDriver(DriverBase):
class LBaaSDriver(DriverBase):
    """Manages Neutron/Octavia load balancer to support Kubernetes Services."""

-    ALIAS = 'lbaas'
+    ALIAS = 'endpoints_lbaas'

    @abc.abstractmethod
    def ensure_loadbalancer(self, endpoints, project_id, subnet_id, ip,
@@ -53,7 +53,6 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
        spec = service['spec']
        if spec.get('type') == 'ClusterIP':
            return spec.get('clusterIP')
        return None

    def _get_subnet_id(self, service, project_id, ip):
        subnets_mapping = self._drv_subnets.get_subnets(service, project_id)
@@ -181,3 +180,315 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
        obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict)
        LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj)
        return obj


class LoadBalancerHandler(k8s_base.ResourceEventHandler):
    """LoadBalancerHandler handles K8s Endpoints events.

    LoadBalancerHandler handles K8s Endpoints events and tracks changes in
    LBaaSServiceSpec to update Neutron LBaaS accordingly and to reflect its
    actual state in LBaaSState.
    """

    OBJECT_KIND = k_const.K8S_OBJ_ENDPOINTS

    def __init__(self):
        self._drv_lbaas = drv_base.LBaaSDriver.get_instance()
        self._drv_pod_project = drv_base.PodProjectDriver.get_instance()
        self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance()

    def on_present(self, endpoints):
        lbaas_spec = self._get_lbaas_spec(endpoints)
        if self._should_ignore(endpoints, lbaas_spec):
            return

        lbaas_state = self._get_lbaas_state(endpoints)
        if not lbaas_state:
            lbaas_state = obj_lbaas.LBaaSState()

        if self._sync_lbaas_members(endpoints, lbaas_state, lbaas_spec):
            # REVISIT(ivc): since _sync_lbaas_members is responsible for
            # creating all lbaas components (i.e. load balancer, listeners,
            # pools, members), it is currently possible for it to fail (due
            # to invalid Kuryr/K8s/Neutron configuration, e.g. Members' IPs
            # not belonging to configured Neutron subnet or Service IP being
            # in use by gateway or VMs) leaving some Neutron entities without
            # properly updating annotation. Some sort of failsafe mechanism is
            # required to deal with such situations (e.g. cleanup, or skip
            # failing items, or validate configuration) to prevent annotation
            # being out of sync with the actual Neutron state.
            self._set_lbaas_state(endpoints, lbaas_state)

    def on_deleted(self, endpoints):
        lbaas_state = self._get_lbaas_state(endpoints)
        if not lbaas_state:
            return
        # NOTE(ivc): deleting pool deletes its members
        lbaas_state.members = []
        self._sync_lbaas_members(endpoints, lbaas_state,
                                 obj_lbaas.LBaaSServiceSpec())

    def _should_ignore(self, endpoints, lbaas_spec):
        return not(lbaas_spec and
                   self._has_pods(endpoints) and
                   self._is_lbaas_spec_in_sync(endpoints, lbaas_spec))

    def _is_lbaas_spec_in_sync(self, endpoints, lbaas_spec):
        # REVISIT(ivc): consider other options instead of using 'name'
        ep_ports = list(set(port.get('name')
                            for subset in endpoints.get('subsets', [])
                            for port in subset.get('ports', [])))
        spec_ports = [port.name for port in lbaas_spec.ports]

        return sorted(ep_ports) == sorted(spec_ports)

    def _has_pods(self, endpoints):
        return any(True
                   for subset in endpoints.get('subsets', [])
                   for address in subset.get('addresses', [])
                   if address.get('targetRef', {}).get('kind') == 'Pod')

    def _sync_lbaas_members(self, endpoints, lbaas_state, lbaas_spec):
        changed = False

        if self._remove_unused_members(endpoints, lbaas_state, lbaas_spec):
            changed = True

        if self._sync_lbaas_pools(endpoints, lbaas_state, lbaas_spec):
            changed = True

        if self._add_new_members(endpoints, lbaas_state, lbaas_spec):
            changed = True

        return changed

    def _add_new_members(self, endpoints, lbaas_state, lbaas_spec):
        changed = False

        lsnr_by_id = {l.id: l for l in lbaas_state.listeners}
        pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
                              lsnr_by_id[p.listener_id].port): p
                             for p in lbaas_state.pools}
        pool_by_tgt_name = {p.name: pool_by_lsnr_port[p.protocol, p.port]
                            for p in lbaas_spec.ports}
        current_targets = {(str(m.ip), m.port) for m in lbaas_state.members}

        for subset in endpoints.get('subsets', []):
            subset_ports = subset.get('ports', [])
            for subset_address in subset.get('addresses', []):
                try:
                    target_ip = subset_address['ip']
                    target_ref = subset_address['targetRef']
                    if target_ref['kind'] != k_const.K8S_OBJ_POD:
                        continue
                except KeyError:
                    continue

                for subset_port in subset_ports:
                    target_port = subset_port['port']
                    if (target_ip, target_port) in current_targets:
                        continue
                    port_name = subset_port.get('name')
                    pool = pool_by_tgt_name[port_name]
                    target_subnet_id = self._get_pod_subnet(target_ref,
                                                            target_ip)
                    member = self._drv_lbaas.ensure_member(
                        endpoints=endpoints,
                        loadbalancer=lbaas_state.loadbalancer,
                        pool=pool,
                        subnet_id=target_subnet_id,
                        ip=target_ip,
                        port=target_port,
                        target_ref=target_ref)
                    lbaas_state.members.append(member)
                    changed = True

        return changed

    def _get_pod_subnet(self, target_ref, ip):
        # REVISIT(ivc): consider using true pod object instead
        pod = {'kind': target_ref['kind'],
               'metadata': {'name': target_ref['name'],
                            'namespace': target_ref['namespace']}}
        project_id = self._drv_pod_project.get_project(pod)
        subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id)
        # FIXME(ivc): potentially unsafe [0] index
        return [subnet_id for subnet_id, network in six.iteritems(subnets_map)
                for subnet in network.subnets.objects
                if ip in subnet.cidr][0]

    def _remove_unused_members(self, endpoints, lbaas_state, lbaas_spec):
        spec_port_names = {p.name for p in lbaas_spec.ports}
        current_targets = {(a['ip'], p['port'])
                           for s in endpoints['subsets']
                           for a in s['addresses']
                           for p in s['ports']
                           if p.get('name') in spec_port_names}
        removed_ids = set()
        for member in lbaas_state.members:
            if (str(member.ip), member.port) in current_targets:
                continue
            self._drv_lbaas.release_member(endpoints,
                                           lbaas_state.loadbalancer,
                                           member)
            removed_ids.add(member.id)
        if removed_ids:
            lbaas_state.members = [m for m in lbaas_state.members
                                   if m.id not in removed_ids]
        return bool(removed_ids)

    def _sync_lbaas_pools(self, endpoints, lbaas_state, lbaas_spec):
        changed = False

        if self._remove_unused_pools(endpoints, lbaas_state, lbaas_spec):
            changed = True

        if self._sync_lbaas_listeners(endpoints, lbaas_state, lbaas_spec):
            changed = True

        if self._add_new_pools(endpoints, lbaas_state, lbaas_spec):
            changed = True

        return changed

    def _add_new_pools(self, endpoints, lbaas_state, lbaas_spec):
        changed = False

        current_listeners_ids = {pool.listener_id
                                 for pool in lbaas_state.pools}
        for listener in lbaas_state.listeners:
            if listener.id in current_listeners_ids:
                continue
            pool = self._drv_lbaas.ensure_pool(endpoints,
                                               lbaas_state.loadbalancer,
                                               listener)
            lbaas_state.pools.append(pool)
            changed = True

        return changed

    def _remove_unused_pools(self, endpoints, lbaas_state, lbaas_spec):
        current_pools = {m.pool_id for m in lbaas_state.members}
        removed_ids = set()
        for pool in lbaas_state.pools:
            if pool.id in current_pools:
                continue
            self._drv_lbaas.release_pool(endpoints,
                                         lbaas_state.loadbalancer,
                                         pool)
            removed_ids.add(pool.id)
        if removed_ids:
            lbaas_state.pools = [p for p in lbaas_state.pools
                                 if p.id not in removed_ids]
        return bool(removed_ids)

    def _sync_lbaas_listeners(self, endpoints, lbaas_state, lbaas_spec):
        changed = False

        if self._remove_unused_listeners(endpoints, lbaas_state, lbaas_spec):
            changed = True

        if self._sync_lbaas_loadbalancer(endpoints, lbaas_state, lbaas_spec):
            changed = True

        if self._add_new_listeners(endpoints, lbaas_spec, lbaas_state):
            changed = True

        return changed

    def _add_new_listeners(self, endpoints, lbaas_spec, lbaas_state):
        changed = False
        current_port_tuples = {(listener.protocol, listener.port)
                               for listener in lbaas_state.listeners}
        for port_spec in lbaas_spec.ports:
            protocol = port_spec.protocol
            port = port_spec.port
            if (protocol, port) in current_port_tuples:
                continue

            listener = self._drv_lbaas.ensure_listener(
                endpoints=endpoints,
                loadbalancer=lbaas_state.loadbalancer,
                protocol=protocol,
                port=port)
            lbaas_state.listeners.append(listener)
            changed = True
        return changed

    def _remove_unused_listeners(self, endpoints, lbaas_state, lbaas_spec):
        current_listeners = {p.listener_id for p in lbaas_state.pools}

        removed_ids = set()
        for listener in lbaas_state.listeners:
            if listener.id in current_listeners:
                continue
            self._drv_lbaas.release_listener(endpoints,
                                             lbaas_state.loadbalancer,
                                             listener)
            removed_ids.add(listener.id)
        if removed_ids:
            lbaas_state.listeners = [l for l in lbaas_state.listeners
                                     if l.id not in removed_ids]
        return bool(removed_ids)

    def _sync_lbaas_loadbalancer(self, endpoints, lbaas_state, lbaas_spec):
        changed = False
        lb = lbaas_state.loadbalancer

        if lb and lb.ip != lbaas_spec.ip:
            self._drv_lbaas.release_loadbalancer(
                endpoints=endpoints,
                loadbalancer=lb)
            lb = None
            changed = True

        if not lb and lbaas_spec.ip:
            lb = self._drv_lbaas.ensure_loadbalancer(
                endpoints=endpoints,
                project_id=lbaas_spec.project_id,
                subnet_id=lbaas_spec.subnet_id,
                ip=lbaas_spec.ip,
                security_groups_ids=lbaas_spec.security_groups_ids)
            changed = True

        lbaas_state.loadbalancer = lb
        return changed

    def _get_lbaas_spec(self, endpoints):
        # TODO(ivc): same as '_get_lbaas_state'
        try:
            annotations = endpoints['metadata']['annotations']
            annotation = annotations[k_const.K8S_ANNOTATION_LBAAS_SPEC]
        except KeyError:
            return None
        obj_dict = jsonutils.loads(annotation)
        obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict)
        LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj)
        return obj

    def _set_lbaas_state(self, endpoints, lbaas_state):
        # TODO(ivc): extract annotation interactions
        if lbaas_state is None:
            LOG.debug("Removing LBaaSState annotation: %r", lbaas_state)
            annotation = None
        else:
            lbaas_state.obj_reset_changes(recursive=True)
            LOG.debug("Setting LBaaSState annotation: %r", lbaas_state)
            annotation = jsonutils.dumps(lbaas_state.obj_to_primitive(),
                                         sort_keys=True)
        k8s = clients.get_kubernetes_client()
        k8s.annotate(endpoints['metadata']['selfLink'],
                     {k_const.K8S_ANNOTATION_LBAAS_STATE: annotation},
                     resource_version=endpoints['metadata']['resourceVersion'])

    def _get_lbaas_state(self, endpoints):
        # TODO(ivc): same as '_set_lbaas_state'
        try:
            annotations = endpoints['metadata']['annotations']
            annotation = annotations[k_const.K8S_ANNOTATION_LBAAS_STATE]
        except KeyError:
            return None
        obj_dict = jsonutils.loads(annotation)
        obj = obj_lbaas.LBaaSState.obj_from_primitive(obj_dict)
        LOG.debug("Got LBaaSState from annotation: %r", obj)
        return obj
@@ -45,6 +45,7 @@ class KuryrK8sService(service.Service):
            self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource))
        pipeline.register(h_vif.VIFHandler())
        pipeline.register(h_lbaas.LBaaSSpecHandler())
        pipeline.register(h_lbaas.LoadBalancerHandler())

    def start(self):
        LOG.info("Service '%s' starting", self.__class__.__name__)
@@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.

import itertools
import mock
import os_vif.objects.network as osv_network
import os_vif.objects.subnet as osv_subnet
from oslo_utils import uuidutils

from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.handlers import lbaas as h_lbaas
from kuryr_kubernetes import exceptions as k_exc
@@ -321,3 +324,303 @@ class TestLBaaSSpecHandler(test_base.TestCase):
    def test_get_lbaas_spec(self):
        self.skipTest("skipping until generalised annotation handling is "
                      "implemented")


class FakeLBaaSDriver(drv_base.LBaaSDriver):
    def ensure_loadbalancer(self, endpoints, project_id, subnet_id, ip,
                            security_groups_ids):
        name = str(ip)
        return obj_lbaas.LBaaSLoadBalancer(name=name,
                                           project_id=project_id,
                                           subnet_id=subnet_id,
                                           ip=ip,
                                           id=uuidutils.generate_uuid())

    def ensure_listener(self, endpoints, loadbalancer, protocol, port):
        name = "%s:%s:%s" % (loadbalancer.name, protocol, port)
        return obj_lbaas.LBaaSListener(name=name,
                                       project_id=loadbalancer.project_id,
                                       loadbalancer_id=loadbalancer.id,
                                       protocol=protocol,
                                       port=port,
                                       id=uuidutils.generate_uuid())

    def ensure_pool(self, endpoints, loadbalancer, listener):
        return obj_lbaas.LBaaSPool(name=listener.name,
                                   project_id=loadbalancer.project_id,
                                   loadbalancer_id=loadbalancer.id,
                                   listener_id=listener.id,
                                   protocol=listener.protocol,
                                   id=uuidutils.generate_uuid())

    def ensure_member(self, endpoints, loadbalancer, pool, subnet_id, ip, port,
                      target_ref):
        name = "%s:%s:%s" % (loadbalancer.name, ip, port)
        return obj_lbaas.LBaaSMember(name=name,
                                     project_id=pool.project_id,
                                     pool_id=pool.id,
                                     subnet_id=subnet_id,
                                     ip=ip,
                                     port=port,
                                     id=uuidutils.generate_uuid())

    def release_loadbalancer(self, endpoints, loadbalancer):
        pass

    def release_listener(self, endpoints, loadbalancer, listener):
        pass

    def release_pool(self, endpoints, loadbalancer, pool):
        pass

    def release_member(self, endpoints, loadbalancer, member):
        pass


class TestLoadBalancerHandler(test_base.TestCase):
    @mock.patch('kuryr_kubernetes.controller.drivers.base'
                '.PodSubnetsDriver.get_instance')
    @mock.patch('kuryr_kubernetes.controller.drivers.base'
                '.PodProjectDriver.get_instance')
    @mock.patch('kuryr_kubernetes.controller.drivers.base'
                '.LBaaSDriver.get_instance')
    def test_init(self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets):
        m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas
        m_get_drv_project.return_value = mock.sentinel.drv_project
        m_get_drv_subnets.return_value = mock.sentinel.drv_subnets

        handler = h_lbaas.LoadBalancerHandler()

        self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas)
        self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project)
        self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets)

    def test_on_present(self):
        lbaas_spec = mock.sentinel.lbaas_spec
        lbaas_state = mock.sentinel.lbaas_state
        endpoints = mock.sentinel.endpoints

        m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
        m_handler._get_lbaas_spec.return_value = lbaas_spec
        m_handler._should_ignore.return_value = False
        m_handler._get_lbaas_state.return_value = lbaas_state
        m_handler._sync_lbaas_members.return_value = True

        h_lbaas.LoadBalancerHandler.on_present(m_handler, endpoints)

        m_handler._get_lbaas_spec.assert_called_once_with(endpoints)
        m_handler._should_ignore.assert_called_once_with(endpoints, lbaas_spec)
        m_handler._get_lbaas_state.assert_called_once_with(endpoints)
        m_handler._sync_lbaas_members.assert_called_once_with(
            endpoints, lbaas_state, lbaas_spec)
        m_handler._set_lbaas_state.assert_called_once_with(
            endpoints, lbaas_state)

    @mock.patch('kuryr_kubernetes.objects.lbaas'
                '.LBaaSServiceSpec')
    def test_on_deleted(self, m_svc_spec_ctor):
        endpoints = mock.sentinel.endpoints
        empty_spec = mock.sentinel.empty_spec
        lbaas_state = mock.sentinel.lbaas_state
        m_svc_spec_ctor.return_value = empty_spec

        m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
        m_handler._get_lbaas_state.return_value = lbaas_state

        h_lbaas.LoadBalancerHandler.on_deleted(m_handler, endpoints)

        m_handler._get_lbaas_state.assert_called_once_with(endpoints)
        m_handler._sync_lbaas_members.assert_called_once_with(
            endpoints, lbaas_state, empty_spec)

    def test_should_ignore(self):
        endpoints = mock.sentinel.endpoints
        lbaas_spec = mock.sentinel.lbaas_spec

        # REVISIT(ivc): ddt?
        m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
        m_handler._has_pods.return_value = True
        m_handler._is_lbaas_spec_in_sync.return_value = True

        ret = h_lbaas.LoadBalancerHandler._should_ignore(
            m_handler, endpoints, lbaas_spec)
        self.assertEqual(False, ret)

        m_handler._has_pods.assert_called_once_with(endpoints)
        m_handler._is_lbaas_spec_in_sync.assert_called_once_with(
            endpoints, lbaas_spec)

    def test_is_lbaas_spec_in_sync(self):
        names = ['a', 'b', 'c']
        endpoints = {'subsets': [{'ports': [{'name': n} for n in names]}]}
        lbaas_spec = obj_lbaas.LBaaSServiceSpec(ports=[
            obj_lbaas.LBaaSPortSpec(name=n) for n in reversed(names)])

        m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
        ret = h_lbaas.LoadBalancerHandler._is_lbaas_spec_in_sync(
            m_handler, endpoints, lbaas_spec)

        self.assertEqual(True, ret)

    def test_has_pods(self):
        # REVISIT(ivc): ddt?
        endpoints = {'subsets': [
            {},
            {'addresses': []},
            {'addresses': [{'targetRef': {}}]},
            {'addresses': [{'targetRef': {'kind': k_const.K8S_OBJ_POD}}]}
        ]}

        m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)

        ret = h_lbaas.LoadBalancerHandler._has_pods(m_handler, endpoints)

        self.assertEqual(True, ret)

    def test_get_pod_subnet(self):
        subnet_id = mock.sentinel.subnet_id
        project_id = mock.sentinel.project_id
        target_ref = {'kind': k_const.K8S_OBJ_POD,
                      'name': 'pod-name',
                      'namespace': 'default'}
        ip = '1.2.3.4'
        m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler)
        m_drv_pod_project = mock.Mock()
        m_drv_pod_project.get_project.return_value = project_id
        m_handler._drv_pod_project = m_drv_pod_project
        m_drv_pod_subnets = mock.Mock()
        m_drv_pod_subnets.get_subnets.return_value = {
            subnet_id: osv_network.Network(subnets=osv_subnet.SubnetList(
                objects=[osv_subnet.Subnet(cidr='1.2.3.0/24')]))}
        m_handler._drv_pod_subnets = m_drv_pod_subnets

        observed_subnet_id = h_lbaas.LoadBalancerHandler._get_pod_subnet(
            m_handler, target_ref, ip)

        self.assertEqual(subnet_id, observed_subnet_id)

    def _generate_lbaas_state(self, vip, targets, project_id, subnet_id):
        endpoints = mock.sentinel.endpoints
        drv = FakeLBaaSDriver()
        lb = drv.ensure_loadbalancer(
            endpoints, project_id, subnet_id, vip, None)
        listeners = {}
        pools = {}
        members = {}
        for ip, (listen_port, target_port) in targets.items():
            lsnr = listeners.setdefault(listen_port, drv.ensure_listener(
                endpoints, lb, 'TCP', listen_port))
            pool = pools.setdefault(listen_port, drv.ensure_pool(
                endpoints, lb, lsnr))
            members.setdefault((ip, listen_port, target_port),
                               drv.ensure_member(endpoints, lb, pool,
                                                 subnet_id, ip,
                                                 target_port, None))
        return obj_lbaas.LBaaSState(
            loadbalancer=lb,
            listeners=list(listeners.values()),
            pools=list(pools.values()),
            members=list(members.values()))

    def _generate_lbaas_spec(self, vip, targets, project_id, subnet_id):
        return obj_lbaas.LBaaSServiceSpec(
            ip=vip,
            project_id=project_id,
            subnet_id=subnet_id,
            ports=[obj_lbaas.LBaaSPortSpec(name=str(port),
                                           protocol='TCP',
                                           port=port)
                   for port in set(t[0] for t in targets.values())])

    def _generate_endpoints(self, targets):
        def _target_to_port(item):
            _, (listen_port, target_port) = item
            return {'port': target_port, 'name': str(listen_port)}
        port_with_addrs = [
            (p, [e[0] for e in grp])
            for p, grp in itertools.groupby(
                sorted(targets.items()), _target_to_port)]
        return {
            'subsets': [
                {
                    'addresses': [
                        {
                            'ip': ip,
                            'targetRef': {
                                'kind': k_const.K8S_OBJ_POD,
                                'name': ip,
                                'namespace': 'default'
                            }
                        }
                        for ip in addrs
                    ],
                    'ports': [port]
                }
                for port, addrs in port_with_addrs
            ]
        }

    @mock.patch('kuryr_kubernetes.controller.drivers.base'
                '.PodSubnetsDriver.get_instance')
    @mock.patch('kuryr_kubernetes.controller.drivers.base'
                '.PodProjectDriver.get_instance')
    @mock.patch('kuryr_kubernetes.controller.drivers.base'
                '.LBaaSDriver.get_instance')
    def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project,
                                m_get_drv_subnets):
        # REVISIT(ivc): test methods separately and verify ensure/release
        project_id = uuidutils.generate_uuid()
        subnet_id = uuidutils.generate_uuid()
        current_ip = '1.1.1.1'
        current_targets = {
            '1.1.1.101': (1001, 10001),
            '1.1.1.111': (1001, 10001),
            '1.1.1.201': (2001, 20001)}
        expected_ip = '2.2.2.2'
        expected_targets = {
            '2.2.2.101': (1201, 12001),
            '2.2.2.111': (1201, 12001),
            '2.2.2.201': (2201, 22001)}
        endpoints = self._generate_endpoints(expected_targets)
        state = self._generate_lbaas_state(
            current_ip, current_targets, project_id, subnet_id)
        spec = self._generate_lbaas_spec(expected_ip, expected_targets,
                                         project_id, subnet_id)

        m_drv_lbaas = mock.Mock(wraps=FakeLBaaSDriver())
        m_drv_project = mock.Mock()
        m_drv_project.get_project.return_value = project_id
        m_drv_subnets = mock.Mock()
        m_drv_subnets.get_subnets.return_value = {
            subnet_id: mock.sentinel.subnet}
        m_get_drv_lbaas.return_value = m_drv_lbaas
        m_get_drv_project.return_value = m_drv_project
        m_get_drv_subnets.return_value = m_drv_subnets

        handler = h_lbaas.LoadBalancerHandler()

        with mock.patch.object(handler, '_get_pod_subnet') as m_get_pod_subnet:
            m_get_pod_subnet.return_value = subnet_id
            handler._sync_lbaas_members(endpoints, state, spec)

        lsnrs = {lsnr.id: lsnr for lsnr in state.listeners}
        pools = {pool.id: pool for pool in state.pools}
        observed_targets = sorted(
            (str(member.ip), (
                lsnrs[pools[member.pool_id].listener_id].port,
                member.port))
            for member in state.members)
        self.assertEqual(sorted(expected_targets.items()), observed_targets)
        self.assertEqual(expected_ip, str(state.loadbalancer.ip))

    def test_get_lbaas_spec(self):
        self.skipTest("skipping until generalised annotation handling is "
                      "implemented")

    def test_get_lbaas_state(self):
        self.skipTest("skipping until generalised annotation handling is "
                      "implemented")

    def test_set_lbaas_state(self):
        self.skipTest("skipping until generalised annotation handling is "
                      "implemented")
@@ -61,6 +61,9 @@ kuryr_kubernetes.controller.drivers.pod_vif =
    generic = kuryr_kubernetes.controller.drivers.generic_vif:GenericPodVIFDriver
    nested-vlan = kuryr_kubernetes.controller.drivers.nested_vlan_vif:NestedVlanPodVIFDriver

kuryr_kubernetes.controller.drivers.endpoints_lbaas =
    lbaasv2 = kuryr_kubernetes.controller.drivers.lbaasv2:LBaaSv2Driver

[files]
packages =
    kuryr_kubernetes
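Taken together, the three pieces line up: the `ALIAS = 'endpoints_lbaas'` on LBaaSDriver, the `kuryr_kubernetes.controller.drivers.endpoints_lbaas` entry-point group above, and the `endpoints_lbaas_driver` option defaulting to `lbaasv2`. A hedged sketch of how such a driver could be resolved with stevedore; Kuryr's actual `DriverBase.get_instance()` implementation may differ:

```python
# Assumption: driver loading works roughly like this, via stevedore.
from stevedore import driver as stv_driver

def load_endpoints_lbaas_driver(name='lbaasv2'):
    """Load an LBaaS driver from its entry-point group by name."""
    manager = stv_driver.DriverManager(
        namespace='kuryr_kubernetes.controller.drivers.endpoints_lbaas',
        name=name,
        invoke_on_load=True)
    return manager.driver
```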