Fix LBaaS SG rules update on deployment scale

When a service is created with a Network Policy applied and
deployments are scaled up or down, the LBaaS SG rules should be
updated accordingly. Right now, the LBaaS/Service does not react to
deployment scaling.
This commit fixes the issue by ensuring that the LBaaS SG is updated
on pod events.

Also, when Pods, Network Policies and SVCs are created together, it
might happen that the LBaaS SG keeps the default SG rules even though
the policy is being enforced. This commit ensures the right SG rules
are applied to a LBaaS regardless of the order in which the k8s
resources are created. This is done by setting the LBaaS Spec
annotation whenever a request to update the SG rules is made and
retrieving the Spec again whenever a LBaaS member is created.
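
A condensed sketch of that flow (driver plumbing and the per-port
loop are elided; see the lbaasv2.py and lbaas.py hunks below for the
real code):

    from kuryr_kubernetes import utils

    def update_lbaas_sg(service, sgs):
        # Persist the new SG IDs on the Spec annotation so that member
        # creation, which re-reads the Spec, applies the right rules
        # regardless of resource-creation order.
        lbaas = utils.get_lbaas_spec(service)
        if not lbaas:
            return
        lbaas.security_groups_ids = sgs
        utils.set_lbaas_spec(service, lbaas)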

Change-Id: I1c54d17a5fcff5387ffae2b132f5036ee9bf07ca
Closes-Bug: 1816015
Maysa Macedo 2019-02-15 13:41:33 +00:00
parent ce3fe712ac
commit ba89bd027f
10 changed files with 196 additions and 124 deletions

kuryr_kubernetes/controller/drivers/lbaasv2.py

@ -148,6 +148,7 @@ class LBaaSv2Driver(base.LBaaSDriver):
protocol, sg_rule_name, new_sgs=None):
LOG.debug("Applying members security groups.")
neutron = clients.get_neutron_client()
lb_sg = None
if CONF.octavia_defaults.sg_mode == 'create':
if new_sgs:
lb_name = sg_rule_name.split(":")[0]
@ -155,7 +156,15 @@ class LBaaSv2Driver(base.LBaaSDriver):
else:
lb_sg = self._find_listeners_sg(loadbalancer)
else:
lb_sg = self._get_vip_port(loadbalancer).get('security_groups')[0]
vip_port = self._get_vip_port(loadbalancer)
if vip_port:
lb_sg = vip_port.get('security_groups')[0]
# NOTE (maysams) It might happen that the update of LBaaS SG
# has been triggered and the LBaaS SG was not created yet.
# This update is skipped until the LBaaS members are created.
if not lb_sg:
return
lbaas_sg_rules = neutron.list_security_group_rules(
security_group_id=lb_sg)
@ -747,7 +756,12 @@ class LBaaSv2Driver(base.LBaaSDriver):
# NOTE(ltomasbo): lb_name parameter is only passed when sg_mode
# is 'create' and in that case there is only one sg associated
# to the loadbalancer
return sgs['security_groups'][0]['id']
try:
sg_id = sgs['security_groups'][0]['id']
except IndexError:
sg_id = None
LOG.debug("Security Group not created yet for LBaaS.")
return sg_id
try:
sgs = neutron.list_security_groups(
name=loadbalancer.name, project_id=loadbalancer.project_id)
@ -917,6 +931,9 @@ class LBaaSv2Driver(base.LBaaSDriver):
if not lbaas:
return
lbaas.security_groups_ids = sgs
utils.set_lbaas_spec(service, lbaas)
for port in svc_ports:
port_protocol = port['protocol']
lbaas_port = port['port']

kuryr_kubernetes/controller/drivers/utils.py

@ -392,3 +392,16 @@ def tag_neutron_resources(resource, res_ids):
LOG.warning("Failed to tag %s %s with %s. Ignoring, but this "
"is still unexpected.", resource, res_id, tags,
exc_info=True)
def get_services(namespace):
kubernetes = clients.get_kubernetes_client()
try:
services = kubernetes.get(
'{}/namespaces/{}/services'.format(constants.K8S_API_BASE,
namespace))
except k_exc.K8sClientException:
LOG.exception('Exception when getting K8s services in '
'namespace %s', namespace)
raise
return services
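
For illustration, a minimal usage sketch of the new helper (the
'default' namespace and the loop body are hypothetical):

    from kuryr_kubernetes.controller.drivers import utils as driver_utils

    services = driver_utils.get_services('default')
    for service in services.get('items', []):
        if service['metadata']['name'] == 'kubernetes':
            # The API service is not backed by a Kuryr-managed LBaaS,
            # so the callers below always skip it.
            continue
        print(service['metadata']['name'])  # e.g. recompute its SG rules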

kuryr_kubernetes/controller/handlers/lbaas.py

@ -24,6 +24,7 @@ from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
@ -47,7 +48,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
def on_present(self, service):
lbaas_spec = self._get_lbaas_spec(service)
lbaas_spec = utils.get_lbaas_spec(service)
if self._should_ignore(service):
LOG.debug("Skipping Kubernetes service %s of an unsupported kind "
@ -58,7 +59,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
if self._has_lbaas_spec_changes(service, lbaas_spec):
lbaas_spec = self._generate_lbaas_spec(service)
self._set_lbaas_spec(service, lbaas_spec)
utils.set_lbaas_spec(service, lbaas_spec)
def _is_supported_type(self, service):
spec = service['spec']
@ -167,55 +168,6 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler):
return [obj_lbaas.LBaaSPortSpec(**port)
for port in self._get_service_ports(service)]
def _get_endpoints_link(self, service):
svc_link = service['metadata']['selfLink']
link_parts = svc_link.split('/')
if link_parts[-2] != 'services':
raise k_exc.IntegrityError(_(
"Unsupported service link: %(link)s") % {
'link': svc_link})
link_parts[-2] = 'endpoints'
return "/".join(link_parts)
def _set_lbaas_spec(self, service, lbaas_spec):
# TODO(ivc): extract annotation interactions
if lbaas_spec is None:
LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec)
annotation = None
else:
lbaas_spec.obj_reset_changes(recursive=True)
LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec)
annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(),
sort_keys=True)
svc_link = service['metadata']['selfLink']
ep_link = self._get_endpoints_link(service)
k8s = clients.get_kubernetes_client()
try:
k8s.annotate(ep_link,
{k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation})
except k_exc.K8sClientException:
# REVISIT(ivc): only raise ResourceNotReady for NotFound
raise k_exc.ResourceNotReady(ep_link)
k8s.annotate(svc_link,
{k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation},
resource_version=service['metadata']['resourceVersion'])
def _get_lbaas_spec(self, service):
# TODO(ivc): same as '_set_lbaas_spec'
try:
annotations = service['metadata']['annotations']
annotation = annotations[k_const.K8S_ANNOTATION_LBAAS_SPEC]
except KeyError:
return None
obj_dict = jsonutils.loads(annotation)
obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict)
LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj)
return obj
class LoadBalancerHandler(k8s_base.ResourceEventHandler):
"""LoadBalancerHandler handles K8s Endpoints events.
@ -355,9 +307,30 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
return changed
def _sync_lbaas_sgs(self, endpoints, lbaas_state, lbaas_spec):
# NOTE (maysams) Need to retrieve the LBaaS Spec again due to
# the possibility of it being updated after the LBaaS creation
# process has started.
svc_link = self._get_service_link(endpoints)
k8s = clients.get_kubernetes_client()
service = k8s.get(svc_link)
lbaas_spec = utils.get_lbaas_spec(service)
lb = lbaas_state.loadbalancer
default_sgs = config.CONF.neutron_defaults.pod_security_groups
lbaas_spec_sgs = lbaas_spec.security_groups_ids
if lb.security_groups and lb.security_groups != lbaas_spec_sgs:
sgs = [lb_sg for lb_sg in lb.security_groups
if lb_sg not in default_sgs]
if lbaas_spec_sgs != default_sgs:
sgs.extend(lbaas_spec_sgs)
lb.security_groups = sgs
def _add_new_members(self, endpoints, lbaas_state, lbaas_spec):
changed = False
self._sync_lbaas_sgs(endpoints, lbaas_state, lbaas_spec)
lsnr_by_id = {l.id: l for l in lbaas_state.listeners}
pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol,
lsnr_by_id[p.listener_id].port): p
@ -649,15 +622,6 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler):
lbaas_state.service_pub_ip_info = None
changed = True
default_sgs = config.CONF.neutron_defaults.pod_security_groups
lbaas_spec_sgs = lbaas_spec.security_groups_ids
if lb.security_groups and lb.security_groups != lbaas_spec_sgs:
sgs = [lb_sg for lb_sg in lb.security_groups
if lb_sg not in default_sgs]
if lbaas_spec_sgs != default_sgs:
sgs.extend(lbaas_spec_sgs)
lb.security_groups = sgs
lbaas_state.loadbalancer = lb
return changed
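
A worked example of the SG-merging rule that now lives in
_sync_lbaas_sgs, using hypothetical IDs in place of Neutron SG UUIDs:
default pod SGs are dropped from the LB and the Spec's policy-derived
SGs are appended.

    default_sgs = ['default-pod-sg']
    lb_security_groups = ['default-pod-sg', 'old-policy-sg']
    lbaas_spec_sgs = ['new-policy-sg']

    if lb_security_groups and lb_security_groups != lbaas_spec_sgs:
        sgs = [lb_sg for lb_sg in lb_security_groups
               if lb_sg not in default_sgs]
        if lbaas_spec_sgs != default_sgs:
            sgs.extend(lbaas_spec_sgs)
        # Result: sgs == ['old-policy-sg', 'new-policy-sg']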

kuryr_kubernetes/controller/handlers/policy.py

@ -20,7 +20,6 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
@ -81,7 +80,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
if pods_to_update:
# NOTE(ltomasbo): only need to change services if the pods that
# they point to are updated
services = self._get_services(policy['metadata']['namespace'])
services = driver_utils.get_services(
policy['metadata']['namespace'])
for service in services.get('items'):
# TODO(ltomasbo): Skip other services that are not affected
# by the policy
@ -116,7 +116,8 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
self._drv_policy.release_network_policy(netpolicy_crd)
services = self._get_services(policy['metadata']['namespace'])
services = driver_utils.get_services(
policy['metadata']['namespace'])
for service in services.get('items'):
if service['metadata']['name'] == 'kubernetes':
continue
@ -137,15 +138,3 @@ class NetworkPolicyHandler(k8s_base.ResourceEventHandler):
if utils.has_limit(sg_quota):
return utils.is_available('security_groups', sg_quota, sg_func)
return True
def _get_services(self, namespace):
kubernetes = clients.get_kubernetes_client()
services = {"items": []}
try:
services = kubernetes.get(
'{}/namespaces/{}/services'.format(k_const.K8S_API_BASE,
namespace))
except exceptions.K8sClientException:
LOG.exception("Kubernetes Client Exception.")
raise
return services

kuryr_kubernetes/controller/handlers/vif.py

@ -73,6 +73,8 @@ class VIFHandler(k8s_base.ResourceEventHandler):
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
self._drv_multi_vif = drivers.MultiVIFDriver.get_enabled_drivers()
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = drivers.ServiceSecurityGroupsDriver.get_instance()
def on_present(self, pod):
if driver_utils.is_host_network(pod) or not self._is_pending_node(pod):
@ -83,9 +85,8 @@ class VIFHandler(k8s_base.ResourceEventHandler):
return
state = driver_utils.get_pod_state(pod)
LOG.debug("Got VIFs from annotation: %r", state)
project_id = self._drv_project.get_project(pod)
if not state:
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
subnets = self._drv_subnets.get_subnets(pod, project_id)
@ -127,10 +128,16 @@ class VIFHandler(k8s_base.ResourceEventHandler):
self._set_pod_state(pod, state)
self._drv_sg.create_sg_rules(pod)
if self._is_network_policy_enabled():
services = driver_utils.get_services(
pod['metadata']['namespace'])
self._update_services(services, project_id)
def on_deleted(self, pod):
if driver_utils.is_host_network(pod):
return
services = driver_utils.get_services(pod['metadata']['namespace'])
project_id = self._drv_project.get_project(pod)
self._drv_sg.delete_sg_rules(pod)
try:
@ -150,6 +157,8 @@ class VIFHandler(k8s_base.ResourceEventHandler):
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if self._is_network_policy_enabled():
self._update_services(services, project_id)
@MEMOIZE
def is_ready(self, quota):
@ -198,3 +207,16 @@ class VIFHandler(k8s_base.ResourceEventHandler):
{constants.K8S_ANNOTATION_VIF: annotation,
constants.K8S_ANNOTATION_LABEL: labels_annotation},
resource_version=pod['metadata']['resourceVersion'])
def _update_services(self, services, project_id):
for service in services.get('items'):
if service['metadata']['name'] == 'kubernetes':
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)
self._drv_lbaas.update_lbaas_sg(service, sgs)
def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')
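
That check keys off two kuryr.conf options; a sketch of a
configuration that satisfies it (handler list abridged and
illustrative):

    [kubernetes]
    enabled_handlers = vif,lbaasspec,lbaas,policy
    service_security_groups_driver = policy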

kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py

@ -48,7 +48,9 @@ class TestLBaaSSpecHandler(test_base.TestCase):
self.assertEqual(mock.sentinel.drv_subnets, handler._drv_subnets)
self.assertEqual(mock.sentinel.drv_sg, handler._drv_sg)
def test_on_present(self):
@mock.patch('kuryr_kubernetes.utils.set_lbaas_spec')
@mock.patch('kuryr_kubernetes.utils.get_lbaas_spec')
def test_on_present(self, m_get_lbaas_spec, m_set_lbaas_spec):
svc_event = mock.sentinel.svc_event
old_spec = mock.sentinel.old_spec
new_spec = mock.sentinel.new_spec
@ -58,7 +60,7 @@ class TestLBaaSSpecHandler(test_base.TestCase):
m_drv_project.get_project.return_value = project_id
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
m_handler._get_lbaas_spec.return_value = old_spec
m_get_lbaas_spec.return_value = old_spec
m_handler._has_lbaas_spec_changes.return_value = True
m_handler._generate_lbaas_spec.return_value = new_spec
m_handler._should_ignore.return_value = False
@ -66,43 +68,49 @@ class TestLBaaSSpecHandler(test_base.TestCase):
h_lbaas.LBaaSSpecHandler.on_present(m_handler, svc_event)
m_handler._get_lbaas_spec.assert_called_once_with(svc_event)
m_get_lbaas_spec.assert_called_once_with(svc_event)
m_handler._has_lbaas_spec_changes.assert_called_once_with(svc_event,
old_spec)
m_handler._generate_lbaas_spec.assert_called_once_with(svc_event)
m_handler._set_lbaas_spec.assert_called_once_with(svc_event, new_spec)
m_set_lbaas_spec.assert_called_once_with(svc_event, new_spec)
def test_on_present_no_changes(self):
@mock.patch('kuryr_kubernetes.utils.set_lbaas_spec')
@mock.patch('kuryr_kubernetes.utils.get_lbaas_spec')
def test_on_present_no_changes(self, m_get_lbaas_spec,
m_set_lbaas_spec):
svc_event = mock.sentinel.svc_event
old_spec = mock.sentinel.old_spec
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
m_handler._get_lbaas_spec.return_value = old_spec
m_get_lbaas_spec.return_value = old_spec
m_handler._has_lbaas_spec_changes.return_value = False
m_handler._should_ignore.return_value = False
h_lbaas.LBaaSSpecHandler.on_present(m_handler, svc_event)
m_handler._get_lbaas_spec.assert_called_once_with(svc_event)
m_get_lbaas_spec.assert_called_once_with(svc_event)
m_handler._has_lbaas_spec_changes.assert_called_once_with(svc_event,
old_spec)
m_handler._generate_lbaas_spec.assert_not_called()
m_handler._set_lbaas_spec.assert_not_called()
m_set_lbaas_spec.assert_not_called()
def test_on_present_no_selector(self):
@mock.patch('kuryr_kubernetes.utils.set_lbaas_spec')
@mock.patch('kuryr_kubernetes.utils.get_lbaas_spec')
def test_on_present_no_selector(self, m_get_lbaas_spec,
m_set_lbaas_spec):
svc_event = {'metadata': {'name': 'dummy_name'}}
old_spec = mock.sentinel.old_spec
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
m_handler._get_lbaas_spec.return_value = old_spec
m_get_lbaas_spec.return_value = old_spec
m_handler._should_ignore.return_value = True
h_lbaas.LBaaSSpecHandler.on_present(m_handler, svc_event)
m_handler._get_lbaas_spec.assert_called_once_with(svc_event)
m_get_lbaas_spec.assert_called_once_with(svc_event)
m_handler._has_lbaas_spec_changes.assert_not_called()
m_handler._generate_lbaas_spec.assert_not_called()
m_handler._set_lbaas_spec.assert_not_called()
m_set_lbaas_spec.assert_not_called()
def test_get_service_ip(self):
svc_body = {'spec': {'type': 'ClusterIP',
@ -328,22 +336,6 @@ class TestLBaaSSpecHandler(test_base.TestCase):
m_handler._get_service_ports.assert_called_once_with(
mock.sentinel.service)
def test_get_endpoints_link(self):
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
service = {'metadata': {
'selfLink': "/api/v1/namespaces/default/services/test"}}
ret = h_lbaas.LBaaSSpecHandler._get_endpoints_link(m_handler, service)
expected_link = "/api/v1/namespaces/default/endpoints/test"
self.assertEqual(expected_link, ret)
def test_get_endpoints_link__integrity_error(self):
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
service = {'metadata': {
'selfLink': "/api/v1/namespaces/default/not-services/test"}}
self.assertRaises(k_exc.IntegrityError,
h_lbaas.LBaaSSpecHandler._get_endpoints_link,
m_handler, service)
def test_set_lbaas_spec(self):
self.skipTest("skipping until generalised annotation handling is "
"implemented")
@ -821,6 +813,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
for member in state.members)
return observed_targets
@mock.patch('kuryr_kubernetes.controller.handlers.lbaas.'
'LoadBalancerHandler._sync_lbaas_sgs')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
@ -828,7 +822,7 @@ class TestLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project,
m_get_drv_subnets):
m_get_drv_subnets, m_sync_lbaas_sgs):
# REVISIT(ivc): test methods separately and verify ensure/release
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
@ -855,6 +849,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
self.assertEqual(sorted(expected_targets.items()), observed_targets)
self.assertEqual(expected_ip, str(state.loadbalancer.ip))
@mock.patch('kuryr_kubernetes.controller.handlers.lbaas.'
'LoadBalancerHandler._sync_lbaas_sgs')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
@ -862,7 +858,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
def test_sync_lbaas_members_udp(self, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets):
m_get_drv_project, m_get_drv_subnets,
m_sync_lbaas_sgs):
# REVISIT(ivc): test methods separately and verify ensure/release
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
@ -889,6 +886,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
self.assertEqual([], observed_targets)
self.assertEqual(expected_ip, str(state.loadbalancer.ip))
@mock.patch('kuryr_kubernetes.controller.handlers.lbaas.'
'LoadBalancerHandler._sync_lbaas_sgs')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
@ -896,7 +895,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
def test_sync_lbaas_members_svc_listener_port_edit(
self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets):
self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets,
m_sync_lbaas_sgs):
# REVISIT(ivc): test methods separately and verify ensure/release
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
@ -943,6 +943,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
self.skipTest("skipping until generalised annotation handling is "
"implemented")
@mock.patch('kuryr_kubernetes.controller.handlers.lbaas.'
'LoadBalancerHandler._sync_lbaas_sgs')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.PodSubnetsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base'
@ -950,7 +952,8 @@ class TestLoadBalancerHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.base'
'.LBaaSDriver.get_instance')
def test_add_new_members_udp(self, m_get_drv_lbaas,
m_get_drv_project, m_get_drv_subnets):
m_get_drv_project, m_get_drv_subnets,
m_sync_lbaas_sgs):
project_id = str(uuid.uuid4())
subnet_id = str(uuid.uuid4())
current_ip = '1.1.1.1'

kuryr_kubernetes/tests/unit/controller/handlers/test_policy.py

@ -115,8 +115,9 @@ class TestPolicyHandler(test_base.TestCase):
handler._drv_project)
self.assertEqual(m_get_policy_driver.return_value, handler._drv_policy)
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
def test_on_present(self, m_host_network):
def test_on_present(self, m_host_network, m_get_services):
modified_pod = mock.sentinel.modified_pod
match_pod = mock.sentinel.match_pod
m_host_network.return_value = False
@ -131,7 +132,7 @@ class TestPolicyHandler(test_base.TestCase):
sg1 = [mock.sentinel.sg1]
sg2 = [mock.sentinel.sg2]
self._get_security_groups.side_effect = [sg1, sg2]
self._handler._get_services.return_value = {'items': []}
m_get_services.return_value = {'items': []}
policy.NetworkPolicyHandler.on_present(self._handler, self._policy)
namespaced_pods.assert_not_called()
@ -147,8 +148,10 @@ class TestPolicyHandler(test_base.TestCase):
self._update_vif_sgs.assert_has_calls(calls)
self._update_lbaas_sg.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
def test_on_present_without_knps_on_namespace(self, m_host_network):
def test_on_present_without_knps_on_namespace(self, m_host_network,
m_get_services):
modified_pod = mock.sentinel.modified_pod
match_pod = mock.sentinel.match_pod
m_host_network.return_value = False
@ -160,7 +163,7 @@ class TestPolicyHandler(test_base.TestCase):
sg2 = [mock.sentinel.sg2]
sg3 = [mock.sentinel.sg3]
self._get_security_groups.side_effect = [sg2, sg3]
self._handler._get_services.return_value = {'items': []}
m_get_services.return_value = {'items': []}
policy.NetworkPolicyHandler.on_present(self._handler, self._policy)
ensure_nw_policy.assert_called_once_with(self._policy,
@ -176,8 +179,9 @@ class TestPolicyHandler(test_base.TestCase):
self._update_vif_sgs.assert_has_calls(calls)
self._update_lbaas_sg.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
def test_on_present_with_services(self, m_host_network):
def test_on_present_with_services(self, m_host_network, m_get_services):
modified_pod = mock.sentinel.modified_pod
match_pod = mock.sentinel.match_pod
m_host_network.return_value = False
@ -193,7 +197,7 @@ class TestPolicyHandler(test_base.TestCase):
sg2 = [mock.sentinel.sg2]
self._get_security_groups.side_effect = [sg1, sg2]
service = {'metadata': {'name': 'service-test'}}
self._handler._get_services.return_value = {'items': [service]}
m_get_services.return_value = {'items': [service]}
policy.NetworkPolicyHandler.on_present(self._handler, self._policy)
namespaced_pods.assert_not_called()
@ -209,8 +213,9 @@ class TestPolicyHandler(test_base.TestCase):
self._update_vif_sgs.assert_has_calls(calls)
self._update_lbaas_sg.assert_called_once()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
def test_on_deleted(self, m_host_network):
def test_on_deleted(self, m_host_network, m_get_services):
namespace_pod = mock.sentinel.namespace_pod
match_pod = mock.sentinel.match_pod
m_host_network.return_value = False
@ -222,7 +227,7 @@ class TestPolicyHandler(test_base.TestCase):
sg1 = [mock.sentinel.sg1]
sg2 = [mock.sentinel.sg2]
self._get_security_groups.side_effect = [sg1, sg2]
self._handler._get_services.return_value = {'items': []}
m_get_services.return_value = {'items': []}
release_nw_policy = self._handler._drv_policy.release_network_policy
knp_on_ns = self._handler._drv_policy.knps_on_namespace
knp_on_ns.return_value = False

kuryr_kubernetes/tests/unit/controller/handlers/test_vif.py

@ -42,9 +42,11 @@ class TestVIFHandler(test_base.TestCase):
self._pod_version = mock.sentinel.pod_version
self._pod_link = mock.sentinel.pod_link
self._pod_namespace = mock.sentinel.namespace
self._pod = {
'metadata': {'resourceVersion': self._pod_version,
'selfLink': self._pod_link},
'selfLink': self._pod_link,
'namespace': self._pod_namespace},
'status': {'phase': k_const.K8S_POD_STATUS_PENDING},
'spec': {'hostNetwork': False,
'nodeName': 'hostname'}
@ -168,11 +170,14 @@ class TestVIFHandler(test_base.TestCase):
self._activate_vif.assert_not_called()
self._set_pod_state.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_activate(self, m_get_pod_state, m_host_network):
def test_on_present_activate(self, m_get_pod_state, m_host_network,
m_get_services):
m_get_pod_state.return_value = self._state
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
self._vif.active = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
@ -239,11 +244,13 @@ class TestVIFHandler(test_base.TestCase):
self._security_groups)
self._activate_vif.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted(self, m_get_pod_state, m_host_network):
def test_on_deleted(self, m_get_pod_state, m_host_network, m_get_services):
m_get_pod_state.return_value = self._state
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
@ -251,14 +258,16 @@ class TestVIFHandler(test_base.TestCase):
self._project_id,
self._security_groups)
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted_with_additional_vifs(self, m_get_pod_state,
m_host_network):
m_host_network, m_get_services):
additional_vif = os_obj.vif.VIFBase()
self._state.additional_vifs = {'eth1': additional_vif}
m_get_pod_state.return_value = self._state
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
@ -280,11 +289,14 @@ class TestVIFHandler(test_base.TestCase):
m_get_pod_state.assert_not_called()
self._release_vif.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted_no_annotation(self, m_get_pod_state, m_host_network):
def test_on_deleted_no_annotation(self, m_get_pod_state, m_host_network,
m_get_services):
m_get_pod_state.return_value = None
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
h_vif.VIFHandler.on_deleted(self._handler, self._pod)

kuryr_kubernetes/tests/unit/test_utils.py

@ -157,3 +157,10 @@ class TestUtils(test_base.TestCase):
self.assertEqual(resp, False)
kubernetes.get.assert_called_once()
def test_get_endpoints_link(self):
service = {'metadata': {
'selfLink': "/api/v1/namespaces/default/services/test"}}
ret = utils.get_endpoints_link(service)
expected_link = "/api/v1/namespaces/default/endpoints/test"
self.assertEqual(expected_link, ret)

kuryr_kubernetes/utils.py

@ -223,3 +223,43 @@ def get_lbaas_spec(service):
obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict)
LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj)
return obj
def set_lbaas_spec(service, lbaas_spec):
# TODO(ivc): extract annotation interactions
if lbaas_spec is None:
LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec)
annotation = None
else:
lbaas_spec.obj_reset_changes(recursive=True)
LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec)
annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(),
sort_keys=True)
svc_link = service['metadata']['selfLink']
ep_link = get_endpoints_link(service)
k8s = clients.get_kubernetes_client()
try:
k8s.annotate(ep_link,
{constants.K8S_ANNOTATION_LBAAS_SPEC: annotation})
except exceptions.K8sResourceNotFound:
raise exceptions.ResourceNotReady(ep_link)
except exceptions.K8sClientException:
LOG.debug("Failed to annotate endpoint %r", ep_link)
raise
k8s.annotate(svc_link,
{constants.K8S_ANNOTATION_LBAAS_SPEC: annotation},
resource_version=service['metadata']['resourceVersion'])
def get_endpoints_link(service):
svc_link = service['metadata']['selfLink']
link_parts = svc_link.split('/')
if link_parts[-2] != 'services':
raise exceptions.IntegrityError(_(
"Unsupported service link: %(link)s") % {
'link': svc_link})
link_parts[-2] = 'endpoints'
return "/".join(link_parts)