Fix NPs for OVN LBs with hairpin traffic

In case of hairpin LB traffic (member of the LB calls the LB and the
request is directed back to the same member) OVN replaces the source-ip
of the request with the LB IP. This means that pods with network
policies applied may have that traffic blocked when it should be
allowed.

To fix that, this commit makes sure that SGs used for NPs include ingress
rules for each of the Services in its namespace. It's not ideal but
seems to be a fair compromise between opening as little traffic as
possible and increasing number of security groups and rules.

As this commit makes sure all the NPs in the namespaces are reanalyzed
every time a Service is created or deleted, a few fixes to support that
are also made.

Change-Id: I7e0458c4071e4a43ab4d158429e05c67cd897a3c
Closes-Bug: 1923452
(cherry picked from commit e84a6a707e)
This commit is contained in:
Michał Dulko 2021-04-12 14:33:15 +02:00
parent bd63ba203a
commit 643effc340
13 changed files with 201 additions and 91 deletions

View File

@ -27,6 +27,7 @@ K8S_API_CRD_KURYRLOADBALANCERS = K8S_API_CRD + '/kuryrloadbalancers'
K8S_API_CRD_KURYRPORTS = K8S_API_CRD + '/kuryrports'
K8S_API_POLICIES = '/apis/networking.k8s.io/v1/networkpolicies'
K8S_API_NETWORKING = '/apis/networking.k8s.io/v1'
K8S_API_NETWORKING_NAMESPACES = K8S_API_NETWORKING + '/namespaces'
K8S_API_NPWG_CRD = '/apis/k8s.cni.cncf.io/v1'

View File

@ -35,6 +35,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
"""Provide security groups actions based on K8s Network Policies"""
def __init__(self):
super().__init__()
self.os_net = clients.get_network_client()
self.kubernetes = clients.get_kubernetes_client()
self.nodes_subnets_driver = base.NodesSubnetsDriver.get_instance()
@ -131,9 +132,52 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
i_rules, e_rules = self._parse_network_policy_rules(policy)
# Add default rules to allow traffic from host and svc subnet
i_rules += self._get_default_np_rules()
# Add rules allowing ingress from LBs
# FIXME(dulek): Rules added below cannot work around the Amphora
# source-ip problem as Amphora does not use LB VIP for
# LB->members traffic, but that other IP attached to the
# Amphora VM in the service subnet. It's ridiculous.
i_rules += self._get_service_ingress_rules(policy)
return i_rules, e_rules
def _get_service_ingress_rules(self, policy):
    """Build ingress SG rules opening traffic from namespace Services.

    This method returns ingress rules allowing traffic from the
    clusterIPs of all the Services in the policy's namespace. It is
    required for OVN LBs to work around the fact that OVN changes the
    source-ip to the LB IP in hairpin traffic. This shouldn't be a
    security problem, as that can only happen when the pod receiving the
    traffic is the one that calls the service.

    FIXME(dulek): Once OVN supports selecting a single, configurable
    source-IP for hairpin traffic, consider using it instead.
    """
    if CONF.octavia_defaults.enforce_sg_rules:
        # With enforce_sg_rules enabled one of the default rules already
        # opens ingress from the whole services subnet, so per-Service
        # rules would just be redundant.
        return []

    namespace = policy['metadata']['namespace']
    svc_path = f'{constants.K8S_API_NAMESPACES}/{namespace}/services'
    svc_list = self.kubernetes.get(svc_path).get('items', [])

    result = []
    for service in svc_list:
        # Skip Services that are in the process of being removed.
        if service['metadata'].get('deletionTimestamp'):
            continue
        cluster_ip = service['spec'].get('clusterIP')
        # Headless Services carry 'None' (or nothing) as clusterIP.
        if not cluster_ip or cluster_ip == 'None':
            continue
        result.append(driver_utils.create_security_group_rule_body(
            'ingress', cidr=cluster_ip,
            description=f"Allow traffic from local namespace service "
                        f"{service['metadata']['name']}"))
    return result
def _get_default_np_rules(self):
"""Add extra SG rule to allow traffic from svcs and host.
@ -519,6 +563,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
def _create_svc_egress_sg_rule(self, policy_namespace, sg_rule_body_list,
resource=None, port=None, protocol=None):
# FIXME(dulek): We could probably filter by namespace here for pods
# and namespace resources?
services = driver_utils.get_services()
if not resource:
svc_subnet = utils.get_subnet_cidr(
@ -530,6 +576,15 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
return
for service in services.get('items'):
if service['metadata'].get('deletionTimestamp'):
# Ignore services being deleted
continue
cluster_ip = service['spec'].get('clusterIP')
if not cluster_ip or cluster_ip == 'None':
# Headless services has 'None' as clusterIP, ignore.
continue
svc_name = service['metadata']['name']
svc_namespace = service['metadata']['namespace']
if self._is_pod(resource):
@ -542,8 +597,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
if pod_ip and pod_ip not in targets:
continue
elif pod_labels:
if not driver_utils.match_labels(
svc_selector, pod_labels):
if not driver_utils.match_labels(svc_selector, pod_labels):
continue
elif resource.get('cidr'):
# NOTE(maysams) Accounts for traffic to pods under
@ -567,13 +621,8 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
ns_name = service['metadata']['namespace']
if ns_name != resource['metadata']['name']:
continue
cluster_ip = service['spec'].get('clusterIP')
if not cluster_ip or cluster_ip == 'None':
# Headless services has 'None' as clusterIP.
continue
rule = driver_utils.create_security_group_rule_body(
'egress', port, protocol=protocol,
cidr=cluster_ip)
'egress', port, protocol=protocol, cidr=cluster_ip)
if rule not in sg_rule_body_list:
sg_rule_body_list.append(rule)

View File

@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from oslo_config import cfg
from oslo_log import log as logging
@ -44,20 +42,6 @@ def _get_namespace_labels(namespace):
return namespaces['metadata'].get('labels')
def _bump_networkpolicy(knp):
kubernetes = clients.get_kubernetes_client()
try:
kubernetes.annotate(
knp['metadata']['annotations']['networkPolicyLink'],
{constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())})
except exceptions.K8sResourceNotFound:
raise
except exceptions.K8sClientException:
LOG.exception("Kubernetes Client Exception")
raise
def _create_sg_rules_with_container_ports(container_ports, matched):
"""Checks if security group rules based on container ports will be updated
@ -321,7 +305,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
@ -350,7 +334,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
@ -384,7 +368,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
@ -407,7 +391,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver):
if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue

View File

@ -14,6 +14,7 @@
# under the License.
import urllib
import uuid
import netaddr
from openstack import exceptions as os_exc
@ -348,7 +349,7 @@ def get_networkpolicies(namespace=None):
try:
if namespace:
np_path = '{}/{}/networkpolicies'.format(
constants.K8S_API_CRD_NAMESPACES, namespace)
constants.K8S_API_NETWORKING_NAMESPACES, namespace)
else:
np_path = constants.K8S_API_POLICIES
nps = kubernetes.get(np_path)
@ -625,3 +626,40 @@ def get_endpoints_targets(name, namespace):
for endpoint in ep_slice.get('endpoints', []):
target_ips.extend(endpoint.get('addresses', []))
return target_ips
def bump_networkpolicy(knp):
    """Annotate the NP linked from *knp* to force its recalculation.

    Raises K8sResourceNotFound when the linked NetworkPolicy is gone and
    K8sClientException (after logging) on any other API failure.
    """
    k8s = clients.get_kubernetes_client()
    np_link = knp['metadata']['annotations']['networkPolicyLink']
    annotation = {constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())}
    try:
        k8s.annotate(np_link, annotation)
    except k_exc.K8sResourceNotFound:
        # Re-raise without logging; the caller decides how to treat a
        # deleted NP.
        raise
    except k_exc.K8sClientException:
        LOG.exception("Failed to annotate network policy %s to force its "
                      "recalculation.", utils.get_res_unique_name(knp))
        raise
def bump_networkpolicies(namespace=None):
    """Annotate all NPs (optionally limited to *namespace*) to bump them.

    Failures on individual policies are logged and skipped, so a single
    broken NP does not prevent the rest from being recalculated.
    """
    k8s = clients.get_kubernetes_client()
    for np in get_networkpolicies(namespace):
        annotation = {constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())}
        try:
            k8s.annotate(utils.get_res_link(np), annotation)
        except k_exc.K8sResourceNotFound:
            # The NP got deleted meanwhile, nothing to bump.
            continue
        except k_exc.K8sClientException:
            LOG.warning("Failed to annotate network policy %s to force its "
                        "recalculation.", utils.get_res_unique_name(np))
def is_network_policy_enabled():
    """Tell whether NP support is fully enabled in the configuration.

    Both the 'policy' handler and the 'policy' service security groups
    driver must be configured for network policies to be effective.
    """
    handlers = CONF.kubernetes.enabled_handlers
    sg_driver = CONF.kubernetes.service_security_groups_driver
    return 'policy' in handlers and sg_driver == 'policy'

View File

@ -45,7 +45,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler):
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
if self._is_network_policy_enabled():
if driver_utils.is_network_policy_enabled():
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = (
drivers.ServiceSecurityGroupsDriver.get_instance())
@ -82,7 +82,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler):
# update SG and svc SGs
namespace = driver_utils.get_namespace(ns_name)
crd_selectors = self._drv_sg.update_namespace_sg_rules(namespace)
if (self._is_network_policy_enabled() and crd_selectors and
if (driver_utils.is_network_policy_enabled() and crd_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_selectors, project_id)
@ -111,7 +111,7 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler):
'metadata': {'name': kuryrnet_crd['spec']['nsName']}}
crd_selectors = self._drv_sg.delete_namespace_sg_rules(namespace)
if (self._is_network_policy_enabled() and crd_selectors and
if (driver_utils.is_network_policy_enabled() and crd_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
project_id = kuryrnet_crd['spec']['projectId']
services = driver_utils.get_services()
@ -127,11 +127,6 @@ class KuryrNetworkHandler(k8s_base.ResourceEventHandler):
kuryrnet_crd)
raise
def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')
def _update_services(self, services, crd_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(

View File

@ -192,7 +192,9 @@ class KuryrNetworkPolicyHandler(k8s_base.ResourceEventHandler):
# by the policy
# NOTE(maysams): Network Policy is not enforced on Services
# without selectors for Amphora Octavia provider.
if (not service['spec'].get('selector') or not
# NOTE(dulek): Skip services being deleted.
if (not service['spec'].get('selector') or
service['metadata'].get('deletionTimestamp') or not
self._is_service_affected(service, pods_to_update)):
continue
sgs = self._drv_svc_sg.get_security_groups(service, project_id)

View File

@ -52,7 +52,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
self._drv_multi_vif = drivers.MultiVIFDriver.get_enabled_drivers()
if self._is_network_policy_enabled():
if driver_utils.is_network_policy_enabled():
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = (drivers.ServiceSecurityGroupsDriver
.get_instance())
@ -117,7 +117,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
except k_exc.K8sClientException:
raise k_exc.ResourceNotReady(pod['metadata']['name'])
if self._is_network_policy_enabled():
if driver_utils.is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
services = driver_utils.get_services()
@ -188,7 +188,7 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
vif = objects.base.VersionedObject.obj_from_primitive(data['vif'])
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if (self._is_network_policy_enabled() and crd_pod_selectors and
if (driver_utils.is_network_policy_enabled() and crd_pod_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors, project_id)
@ -278,11 +278,6 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
self.k8s.patch_crd('status', utils.get_res_link(kuryrport_crd),
{'vifs': vif_dict})
def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')
def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(

View File

@ -20,6 +20,7 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
@ -45,6 +46,10 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()
def _bump_network_policies(self, svc):
    """Force recalculation of all NPs in the Service's namespace.

    No-op when network policy support is disabled in the configuration.
    """
    if not driver_utils.is_network_policy_enabled():
        return
    driver_utils.bump_networkpolicies(svc['metadata']['namespace'])
def on_present(self, service):
reason = self._should_ignore(service)
if reason:
@ -62,6 +67,9 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
if loadbalancer_crd is None:
try:
# Bump all the NPs in the namespace to force SG rules
# recalculation.
self._bump_network_policies(service)
self.create_crd_spec(service)
except k_exc.K8sNamespaceTerminating:
LOG.warning('Namespace %s is being terminated, ignoring '
@ -111,6 +119,9 @@ class ServiceHandler(k8s_base.ResourceEventHandler):
klb_crd_path = (f"{k_const.K8S_API_CRD_NAMESPACES}/"
f"{svc_namespace}/kuryrloadbalancers/{svc_name}")
# Bump all the NPs in the namespace to force SG rules
# recalculation.
self._bump_network_policies(service)
try:
k8s.delete(klb_crd_path)
except k_exc.K8sResourceNotFound:

View File

@ -14,12 +14,16 @@
from unittest import mock
from oslo_config import cfg
from kuryr_kubernetes.controller.drivers import network_policy
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
from kuryr_kubernetes import utils
CONF = cfg.CONF
def get_pod_obj():
return {
@ -185,8 +189,41 @@ class TestNetworkPolicyDriver(test_base.TestCase):
m_get_crd, m_get_default):
m_utils.get_subnet_cidr.return_value = mock.sentinel.cidr
m_parse.return_value = (self._i_rules, self._e_rules)
self._driver.ensure_network_policy(
self._policy)
self.kubernetes.get = mock.Mock(return_value={})
self._driver.ensure_network_policy(self._policy)
m_get_crd.assert_called_once()
m_add_crd.assert_called_once()
m_get_default.assert_called_once()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
            'create_security_group_rule_body')
@mock.patch.object(network_policy.NetworkPolicyDriver,
                   '_get_default_np_rules')
@mock.patch.object(network_policy.NetworkPolicyDriver,
                   '_get_knp_crd', return_value=False)
@mock.patch.object(network_policy.NetworkPolicyDriver,
                   '_create_knp_crd')
@mock.patch.object(network_policy.NetworkPolicyDriver,
                   '_parse_network_policy_rules')
@mock.patch.object(utils, 'get_subnet_cidr')
def test_ensure_network_policy_services(self, m_utils, m_parse, m_add_crd,
                                        m_get_crd, m_get_default,
                                        m_create_sgr):
    # Verify that ensure_network_policy() creates an ingress SG rule only
    # for Services with a real clusterIP, skipping ones being deleted and
    # headless ones. Only applies with enforce_sg_rules disabled, as the
    # per-Service rules are skipped as redundant otherwise.
    CONF.set_override('enforce_sg_rules', False, group='octavia_defaults')
    self.addCleanup(CONF.set_override, 'enforce_sg_rules', True,
                    group='octavia_defaults')
    m_utils.get_subnet_cidr.return_value = mock.sentinel.cidr
    m_parse.return_value = (self._i_rules, self._e_rules)
    # Only the last Service should produce a rule: the first is being
    # deleted, the second is headless ('None'), the third has no IP set.
    svcs = [
        {'metadata': {'name': 'foo', 'deletionTimestamp': 'foobar'}},
        {'metadata': {'name': 'bar'}, 'spec': {'clusterIP': 'None'}},
        {'metadata': {'name': 'baz'}, 'spec': {'clusterIP': None}},
        {'metadata': {'name': ''}, 'spec': {'clusterIP': '192.168.0.130'}},
    ]
    self.kubernetes.get = mock.Mock(return_value={'items': svcs})
    self._driver.ensure_network_policy(self._policy)
    m_create_sgr.assert_called_once_with('ingress', cidr='192.168.0.130',
                                         description=mock.ANY)
    m_get_crd.assert_called_once()
    m_add_crd.assert_called_once()
    m_get_default.assert_called_once()
@ -203,6 +240,7 @@ class TestNetworkPolicyDriver(test_base.TestCase):
m_utils.get_subnet_cidr.return_value = mock.sentinel.cidr
m_parse.return_value = (self._i_rules, self._e_rules)
m_get_crd.side_effect = exceptions.K8sClientException
self.kubernetes.get = mock.Mock(return_value={})
self.assertRaises(exceptions.K8sClientException,
self._driver.ensure_network_policy, self._policy)
m_get_default.assert_called_once()
@ -220,6 +258,7 @@ class TestNetworkPolicyDriver(test_base.TestCase):
m_utils.get_subnet_cidr.return_value = mock.sentinel.cidr
m_parse.return_value = (self._i_rules, self._e_rules)
m_add_crd.side_effect = exceptions.K8sClientException
self.kubernetes.get = mock.Mock(return_value={})
self.assertRaises(exceptions.K8sClientException,
self._driver.ensure_network_policy, self._policy)
m_get_crd.assert_called()

View File

@ -381,8 +381,7 @@ class TestNetworkPolicySecurityGroupsDriver(test_base.TestCase):
crd, pod, pod_selector, rule_block, 'ingress', False)
self.assertEqual(matched, False)
@mock.patch('kuryr_kubernetes.controller.drivers.'
'network_policy_security_groups._bump_networkpolicy')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.bump_networkpolicy')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'get_kuryrnetworkpolicy_crds')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_ip')
@ -488,8 +487,7 @@ class TestNetworkPolicySecurityGroupsDriver(test_base.TestCase):
m_get_crds.assert_called_once_with(namespace=self._namespace)
self.assertEqual([self._sg_id, self._sg_id2], resp)
@mock.patch('kuryr_kubernetes.controller.drivers.'
'network_policy_security_groups._bump_networkpolicy')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.bump_networkpolicy')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'get_kuryrnetworkpolicy_crds')
def test_delete_namespace_sg_rule(self, m_get_knp_crd, m_bump):
@ -503,8 +501,7 @@ class TestNetworkPolicySecurityGroupsDriver(test_base.TestCase):
m_get_knp_crd.assert_called_once()
m_bump.assert_called_once()
@mock.patch('kuryr_kubernetes.controller.drivers.'
'network_policy_security_groups._bump_networkpolicy')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.bump_networkpolicy')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'delete_security_group_rule')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'

View File

@ -13,13 +13,16 @@
# limitations under the License.
from unittest import mock
from kuryr_kubernetes.controller.drivers import utils
from oslo_config import cfg
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import utils
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
CONF = cfg.CONF
class TestUtils(test_base.TestCase):
@ -86,3 +89,29 @@ class TestUtils(test_base.TestCase):
utils.match_selector({'matchLabels': {'app': 'demo',
'foo': 'bar'}},
{'app': 'demo'}))
def test_is_network_policy_enabled(self):
    # Neither the 'policy' handler nor the 'policy' SG driver enabled.
    CONF.set_override('enabled_handlers', ['fake_handler'],
                      group='kubernetes')
    CONF.set_override('service_security_groups_driver', 'foo',
                      group='kubernetes')
    self.assertFalse(utils.is_network_policy_enabled())
    # Handler enabled but SG driver still wrong — must remain disabled.
    CONF.set_override('enabled_handlers', ['policy'],
                      group='kubernetes')
    CONF.set_override('service_security_groups_driver', 'foo',
                      group='kubernetes')
    self.assertFalse(utils.is_network_policy_enabled())
    # Both handler and SG driver set to 'policy' — enabled.
    CONF.set_override('enabled_handlers', ['policy'],
                      group='kubernetes')
    self.addCleanup(CONF.clear_override, 'enabled_handlers',
                    group='kubernetes')
    CONF.set_override('service_security_groups_driver', 'policy',
                      group='kubernetes')
    self.addCleanup(CONF.clear_override, 'service_security_groups_driver',
                    group='kubernetes')
    self.assertTrue(utils.is_network_policy_enabled())

View File

@ -26,6 +26,8 @@ from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'is_network_policy_enabled', mock.Mock(return_value=True))
class TestKuryrNetworkHandler(test_base.TestCase):
def setUp(self):

View File

@ -309,8 +309,8 @@ class TestKuryrPortHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_np(self, ged, is_np_enabled, get_k8s_client,
@ -394,8 +394,8 @@ class TestKuryrPortHandler(test_base.TestCase):
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.LBaaSDriver.'
'get_instance')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
@ -705,38 +705,6 @@ class TestKuryrPortHandler(test_base.TestCase):
utils.get_res_link(self._kp),
arg)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_is_network_policy_enabled(self, ged, k8s):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
CONF.set_override('enabled_handlers', ['fake_handler'],
group='kubernetes')
CONF.set_override('service_security_groups_driver', 'foo',
group='kubernetes')
self.assertFalse(kp._is_network_policy_enabled())
CONF.set_override('enabled_handlers', ['policy'],
group='kubernetes')
CONF.set_override('service_security_groups_driver', 'foo',
group='kubernetes')
self.assertFalse(kp._is_network_policy_enabled())
CONF.set_override('enabled_handlers', ['policy'],
group='kubernetes')
self.addCleanup(CONF.clear_override, 'enabled_handlers',
group='kubernetes')
CONF.set_override('service_security_groups_driver', 'policy',
group='kubernetes')
self.addCleanup(CONF.clear_override, 'service_security_groups_driver',
group='kubernetes')
self.assertTrue(kp._is_network_policy_enabled())
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'service_matches_affected_pods')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')