# Source path (truncated by the viewer this file was copied from):
# kuryr-kubernetes/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy_securit...
#
# Viewer metadata: 438 lines, 18 KiB, Python

# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from kuryr_kubernetes.controller.drivers import network_policy_security_groups
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
from oslo_config import cfg
class TestNetworkPolicySecurityGroupsDriver(test_base.TestCase):
    """Unit tests for the ``network_policy_security_groups`` module.

    Exercises ``NetworkPolicySecurityGroupsDriver.get_security_groups`` and
    ``NetworkPolicySecurityGroupsDriver.delete_sg_rules`` together with the
    module-level ``_create_sg_rules`` function. Every collaborator the tests
    touch (CRD lookup, selector matching, security group rule helpers,
    configuration) is replaced with a mock.
    """

    # ------------------------------------------------------------------
    # Fixture builders
    # ------------------------------------------------------------------
    @staticmethod
    def _make_sg_rule(direction, port, sg_id, **extra):
        """Build a ``{'security_group_rule': {...}}`` dict for one TCP port.

        :param direction: value stored under ``direction``.
        :param port: value used for both ``port_range_min`` and
                     ``port_range_max``.
        :param sg_id: value stored under ``security_group_id``.
        :param extra: additional keys merged into the inner rule dict
                      (e.g. ``id`` or ``remote_ip_prefix``).
        :returns: a freshly built dict; nothing is shared between calls.
        """
        rule = {
            'description': 'Kuryr-Kubernetes NetPolicy SG rule',
            'direction': direction,
            'ethertype': 'IPv4',
            'port_range_max': port,
            'port_range_min': port,
            'protocol': 'tcp',
            'security_group_id': sg_id,
        }
        rule.update(extra)
        return {'security_group_rule': rule}

    @staticmethod
    def _make_crd_list(*crds):
        """Wrap ``crds`` in the ``kind: List`` envelope used by the tests.

        :param crds: CRD dicts to place, in order, under ``items``.
        :returns: a new list-envelope dict.
        """
        return {
            'apiVersion': 'v1',
            'items': list(crds),
            'kind': 'List',
            'metadata': {
                'resourceVersion': '',
                'selfLink': mock.sentinel.selfLink,
            },
        }

    @staticmethod
    def _make_pod(namespace, labels=None, annotations=None, pod_ip=None):
        """Build a pod dict fixture.

        ``labels``, ``annotations`` and the ``status`` section are only
        present in the result when the corresponding argument is given, so
        a pod without labels really has no ``labels`` key.

        :param namespace: value stored under ``metadata.namespace``.
        :param labels: optional dict stored under ``metadata.labels``.
        :param annotations: optional dict stored under
                            ``metadata.annotations``.
        :param pod_ip: optional value stored under ``status.podIP``.
        :returns: a new pod dict.
        """
        metadata = {
            'name': mock.sentinel.pod_name,
            'namespace': namespace,
        }
        if labels is not None:
            metadata['labels'] = labels
        if annotations is not None:
            metadata['annotations'] = annotations

        pod = {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': metadata,
            'spec': {
                'containers': [{
                    'image': 'kuryr/demo',
                    'imagePullPolicy': 'Always',
                    'name': mock.sentinel.pod_name,
                }],
            },
        }
        if pod_ip is not None:
            pod['status'] = {'podIP': pod_ip}
        return pod

    @staticmethod
    def _make_knp_crd(sg_id, ingress_sg_rules):
        """Build the ``KuryrNetPolicy`` CRD fixture used by the rule tests.

        The embedded ``networkpolicy_spec`` has a single ingress block with
        one ``from`` entry (namespace selector ``name: dev`` plus pod
        selector ``tier: backend``) on TCP port 6379.

        :param sg_id: value stored under ``spec.securityGroupId``.
        :param ingress_sg_rules: list stored (not copied) under
                                 ``spec.ingressSgRules``.
        :returns: a new CRD dict.
        """
        return {
            'apiVersion': 'openstack.org/v1',
            'kind': 'KuryrNetPolicy',
            'metadata': {'name': 'np-test-network-policy',
                         'namespace': 'default'},
            'spec': {
                'egressSgRules': [],
                'ingressSgRules': ingress_sg_rules,
                'networkpolicy_spec': {
                    'ingress': [{
                        'from': [{
                            'namespaceSelector': {
                                'matchLabels': {'name': 'dev'}},
                            'podSelector': {
                                'matchLabels': {'tier': 'backend'}},
                        }],
                        'ports': [{'port': 6379, 'protocol': 'TCP'}],
                    }],
                    'podSelector': {'matchLabels': {'app': 'demo'}},
                    'policyTypes': ['Ingress'],
                },
                'podSelector': {'matchLabels': {'app': 'demo'}},
                'securityGroupId': sg_id,
            },
        }

    # ------------------------------------------------------------------
    # Test setup
    # ------------------------------------------------------------------
    def setUp(self):
        """Create the CRD, pod and rule fixtures plus the driver under test."""
        super(TestNetworkPolicySecurityGroupsDriver, self).setUp()
        self._project_id = mock.sentinel.project_id
        self._sg_id = mock.sentinel.sg_id
        self._sg_id2 = mock.sentinel.sg_id2
        self._namespace = 'default'

        # CRD whose pod selector has both matchExpressions and matchLabels.
        self._crd = {
            'metadata': {'name': mock.sentinel.name,
                         'selfLink': mock.sentinel.selfLink},
            'spec': {
                'egressSgRules': [
                    self._make_sg_rule('egress', 5978, self._sg_id,
                                       id=mock.sentinel.id)],
                'ingressSgRules': [
                    self._make_sg_rule('ingress', 6379, self._sg_id,
                                       id=mock.sentinel.id)],
                'podSelector': {
                    'matchExpressions': [{
                        'key': 'environment',
                        'operator': 'In',
                        'values': ['production'],
                    }],
                    'matchLabels': {'run': 'demo'},
                },
                'securityGroupId': self._sg_id,
                'securityGroupName': mock.sentinel.sg_name,
            },
        }

        # CRD with an empty pod selector and a different security group.
        self._crd2 = {
            'metadata': {'name': mock.sentinel.name3,
                         'selfLink': mock.sentinel.selfLink},
            'spec': {
                'ingressSgRules': [
                    self._make_sg_rule('ingress', 8080, self._sg_id2,
                                       id=mock.sentinel.id)],
                'podSelector': {},
                'securityGroupId': self._sg_id2,
                'securityGroupName': mock.sentinel.sg_name,
            },
        }

        self._crds = self._make_crd_list(self._crd)
        self._multiple_crds = self._make_crd_list(self._crd, self._crd2)
        self._empty_crds = self._make_crd_list()

        self._pod = self._make_pod(
            self._namespace,
            labels={'run': 'demo', 'environment': 'production'})
        self._pod2 = self._make_pod(
            self._namespace,
            labels={'run': 'demo', 'environment': 'development'},
            annotations={
                'openstack.org/kuryr-pod-label': '{'
                '"run": "demo","environment": "development"}'})
        self._pod_without_label = self._make_pod(self._namespace)

        # The K8s client fixture is installed before the driver is built.
        self.kubernetes = self.useFixture(k_fix.MockK8sClient()).client
        self._driver = (
            network_policy_security_groups.NetworkPolicySecurityGroupsDriver())

        self._crd_sg_id = mock.sentinel.crd_sg_id
        # A fresh list is passed so the test may let the driver append to it.
        self._crd_without_rules = self._make_knp_crd(self._crd_sg_id, [])

        self._pod_ip = mock.sentinel.pod_ip
        self._pod_dev_namespace = self._make_pod(
            'dev',
            labels={'tier': 'backend'},
            annotations={
                'openstack.org/kuryr-pod-label': '{"tier": "backend"}'},
            pod_ip=self._pod_ip)

        self._sg_rule_body = self._make_sg_rule(
            'ingress', 6379, self._crd_sg_id,
            remote_ip_prefix=self._pod_ip)

        self._new_rule_id = mock.sentinel.id
        self._crd_with_rule = self._make_knp_crd(
            self._crd_sg_id,
            [self._make_sg_rule('ingress', 6379, self._crd_sg_id,
                                id=self._new_rule_id,
                                remote_ip_prefix=self._pod_ip)])

    # ------------------------------------------------------------------
    # _create_sg_rules
    # ------------------------------------------------------------------
    @mock.patch('kuryr_kubernetes.controller.drivers.utils.'
                'create_security_group_rule')
    @mock.patch('kuryr_kubernetes.controller.drivers.utils.'
                'create_security_group_rule_body')
    @mock.patch.object(network_policy_security_groups,
                       '_match_selector', return_value=True)
    @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_ip')
    def test__create_sg_rules(self, m_get_pod_ip, m_match_selector,
                              m_create_sg_rule_body, m_create_sg_rule):
        """A matching pod yields one new rule and ``matched`` is truthy."""
        m_get_pod_ip.return_value = self._pod_ip
        m_create_sg_rule_body.return_value = self._sg_rule_body
        sgr_id = mock.sentinel.sgr_id
        m_create_sg_rule.return_value = sgr_id
        crd = self._crd_without_rules
        pod = self._pod_dev_namespace
        matched = False
        # NOTE(review): new_sg_rule is the very same dict object that the
        # mocked create_security_group_rule_body returns. If the function
        # under test stores that object in crd_rules, the final equality
        # check compares the object with itself - confirm against the
        # driver before relying on this assertion.
        new_sg_rule = self._sg_rule_body

        policy = crd['spec']['networkpolicy_spec']
        rule_list = policy['ingress']
        crd_rules = crd['spec'].get('ingressSgRules')

        for rule_block in rule_list:
            for rule in rule_block.get('from', []):
                namespace_selector = rule.get('namespaceSelector')
                pod_selector = rule.get('podSelector')
                matched = network_policy_security_groups._create_sg_rules(
                    crd, pod, namespace_selector, pod_selector, rule_block,
                    crd_rules, 'ingress', matched)

        # The fixture has a single 'from' entry, so the selectors left over
        # from the loop are the ones the single call above received.
        new_sg_rule['namespaceSelector'] = namespace_selector
        new_sg_rule['podSelector'] = pod_selector
        new_sg_rule['security_group_rule']['id'] = sgr_id

        m_match_selector.assert_called_once_with(
            pod_selector, pod['metadata']['labels'])
        m_get_pod_ip.assert_called_once_with(pod)
        m_create_sg_rule_body.assert_called_once()
        m_create_sg_rule.assert_called_once()
        self.assertEqual([new_sg_rule], crd_rules)
        self.assertTrue(matched)

    @mock.patch.object(network_policy_security_groups,
                       '_match_selector', return_value=False)
    def test__create_sg_rules_no_match(self, m_match_selector):
        """With ``_match_selector`` returning False, ``matched`` is falsy."""
        crd = self._crd_without_rules
        pod = self._pod2

        policy = crd['spec']['networkpolicy_spec']
        rule_list = policy['ingress']
        crd_rules = crd['spec'].get('ingressSgRules')

        for rule_block in rule_list:
            for rule in rule_block.get('from', []):
                namespace_selector = rule.get('namespaceSelector')
                pod_selector = rule.get('podSelector')
                matched = network_policy_security_groups._create_sg_rules(
                    crd, pod, namespace_selector, pod_selector, rule_block,
                    crd_rules, 'ingress', False)

        self.assertFalse(matched)

    # ------------------------------------------------------------------
    # delete_sg_rules
    # ------------------------------------------------------------------
    @mock.patch('kuryr_kubernetes.controller.drivers.utils.'
                'patch_kuryr_crd')
    @mock.patch.object(network_policy_security_groups,
                       '_get_kuryrnetpolicy_crds')
    @mock.patch('kuryr_kubernetes.controller.drivers.utils.'
                'delete_security_group_rule')
    @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_ip')
    def test_delete_sg_rules(self, m_get_pod_ip, m_delete_sg_rule,
                             m_get_knp_crds, m_patch_kuryr_crd):
        """The pod's rule is deleted and the CRD patched with empty rules."""
        crd = self._crd_with_rule
        i_rule = crd['spec'].get('ingressSgRules')[0]
        sgr_id = i_rule['security_group_rule'].get('id')
        m_get_pod_ip.return_value = self._pod_ip
        m_get_knp_crds.return_value = self._make_crd_list(crd)
        # Two independent empty lists: the expected ingress and egress
        # rules must not alias one another.
        i_rules = []
        e_rules = []
        pod = self._pod_dev_namespace

        self._driver.delete_sg_rules(pod)

        m_get_knp_crds.assert_called_once()
        m_get_pod_ip.assert_called_once_with(pod)
        m_delete_sg_rule.assert_called_once_with(sgr_id)
        m_patch_kuryr_crd.assert_called_with(
            crd, i_rules, e_rules, crd['spec'].get('podSelector'))

    # ------------------------------------------------------------------
    # get_security_groups
    # ------------------------------------------------------------------
    @mock.patch('kuryr_kubernetes.config.CONF')
    @mock.patch.object(network_policy_security_groups,
                       '_get_kuryrnetpolicy_crds')
    def test_get_sgs_for_pod_without_label(self, m_get_crds, m_cfg):
        """A pod without labels gets the configured ``pod_security_groups``."""
        m_get_crds.return_value = self._crds
        sg_list = [str(mock.sentinel.sg_id)]
        m_cfg.neutron_defaults.pod_security_groups = sg_list

        sgs = self._driver.get_security_groups(self._pod_without_label,
                                               self._project_id)

        m_get_crds.assert_called_once_with(namespace=self._namespace)
        self.assertEqual(sg_list, sgs)

    @mock.patch.object(network_policy_security_groups,
                       '_match_expressions')
    @mock.patch.object(network_policy_security_groups,
                       '_match_labels')
    @mock.patch.object(network_policy_security_groups,
                       '_get_kuryrnetpolicy_crds')
    def test_get_sgs_for_pod_with_label(self, m_get_crds, m_match_labels,
                                        m_match_expressions):
        """When both matchers succeed the CRD's security group is returned."""
        m_get_crds.return_value = self._crds
        m_match_expressions.return_value = True
        m_match_labels.return_value = True
        pod_labels = self._pod['metadata']['labels']

        resp = self._driver.get_security_groups(self._pod, self._project_id)

        m_get_crds.assert_called_once_with(namespace=self._namespace)
        m_match_expressions.assert_called_once_with(
            self._crd['spec']['podSelector']['matchExpressions'], pod_labels)
        m_match_labels.assert_called_once_with(
            self._crd['spec']['podSelector']['matchLabels'], pod_labels)
        self.assertEqual([str(self._sg_id)], resp)

    @mock.patch('kuryr_kubernetes.config.CONF')
    @mock.patch.object(network_policy_security_groups,
                       '_match_expressions')
    @mock.patch.object(network_policy_security_groups,
                       '_match_labels')
    @mock.patch.object(network_policy_security_groups,
                       '_get_kuryrnetpolicy_crds')
    def test_get_sgs_for_pod_with_label_no_match(self, m_get_crds,
                                                 m_match_labels,
                                                 m_match_expressions, m_cfg):
        """If the expressions do not match, configured groups are returned."""
        m_get_crds.return_value = self._crds
        m_match_expressions.return_value = False
        m_match_labels.return_value = True
        sg_list = [mock.sentinel.sg_id]
        m_cfg.neutron_defaults.pod_security_groups = sg_list
        pod_labels = self._pod2['metadata']['labels']

        sgs = self._driver.get_security_groups(self._pod2, self._project_id)

        m_get_crds.assert_called_once_with(namespace=self._namespace)
        m_match_expressions.assert_called_once_with(
            self._crd['spec']['podSelector']['matchExpressions'], pod_labels)
        m_match_labels.assert_called_once_with(
            self._crd['spec']['podSelector']['matchLabels'], pod_labels)
        self.assertEqual(sg_list, sgs)

    @mock.patch.object(network_policy_security_groups,
                       '_get_kuryrnetpolicy_crds')
    def test_get_sgs_no_crds(self, m_get_crds):
        """No CRDs and no configured groups raises ``RequiredOptError``."""
        m_get_crds.return_value = self._empty_crds
        cfg.CONF.set_override('pod_security_groups', [],
                              group='neutron_defaults')
        # The override is applied to the global config object; undo it so
        # it cannot leak into tests that run afterwards.
        self.addCleanup(cfg.CONF.clear_override, 'pod_security_groups',
                        group='neutron_defaults')

        self.assertRaises(cfg.RequiredOptError,
                          self._driver.get_security_groups, self._pod,
                          self._project_id)
        m_get_crds.assert_called_with(namespace=self._namespace)

    @mock.patch.object(network_policy_security_groups,
                       '_match_expressions')
    @mock.patch.object(network_policy_security_groups,
                       '_match_labels')
    @mock.patch.object(network_policy_security_groups,
                       '_get_kuryrnetpolicy_crds')
    def test_get_sgs_multiple_crds(self, m_get_crds, m_match_labels,
                                   m_match_expressions):
        """With two CRDs listed, both security group ids are returned."""
        m_match_expressions.return_value = True
        m_match_labels.return_value = True
        m_get_crds.return_value = self._multiple_crds

        resp = self._driver.get_security_groups(self._pod, self._project_id)

        m_get_crds.assert_called_once_with(namespace=self._namespace)
        self.assertEqual([str(self._sg_id), str(self._sg_id2)], resp)