Merge "Resolve issues with object races."

This commit is contained in:
Zuul 2020-11-18 19:38:55 +00:00 committed by Gerrit Code Review
commit 659a91ea30
5 changed files with 573 additions and 3 deletions

View File

@ -227,7 +227,12 @@ class LBaaSv2Driver(base.LBaaSDriver):
if CONF.octavia_defaults.enforce_sg_rules:
vip_port = self._get_vip_port(loadbalancer)
if vip_port:
lb_sg = vip_port.security_group_ids[0]
try:
lb_sg = vip_port.security_group_ids[0]
except IndexError:
LOG.warning("We still waiting for SG to be created for "
"VIP %s", vip_port)
raise k_exc.ResourceNotReady(listener_id)
else:
LOG.debug("Skipping sg update for lb %s", loadbalancer['name'])
return

View File

@ -225,8 +225,17 @@ class KuryrPortHandler(k8s_base.ResourceEventHandler):
return False
# Request the default interface of pod
main_vif = self._drv_vif_pool.request_vif(
pod, project_id, subnets, security_groups)
try:
main_vif = self._drv_vif_pool.request_vif(pod, project_id,
subnets,
security_groups)
except os_exc.ResourceNotFound:
# NOTE(gryf): It might happen, that between getting security
# groups above and requesting VIF, network policy is deleted,
# hence we will get 404 from OpenStackSDK. Let's retry, to refresh
# information regarding SG.
LOG.warning("SG not found during VIF requesting. Retrying.")
raise k_exc.ResourceNotReady(pod['metadata']['name'])
if not main_vif:
pod_name = pod['metadata']['name']

View File

@ -16,6 +16,7 @@
import uuid
from openstack.network.v2 import port as os_port
from openstack.network.v2 import security_group_rule as os_sgr
from os_vif import objects as osv_objects
from os_vif.objects import vif as osv_vif
from oslo_serialization import jsonutils
@ -140,3 +141,23 @@ def get_port_obj(port_id='07cfe856-11cc-43d9-9200-ff4dc02d3620',
'updated_at': u'2019-12-04T15:06:09Z'}
port_data.update(kwargs)
return os_port.Port(**port_data)
def get_sgr_obj(sgr_id='7621d1e0-a2d2-4496-94eb-ffd375d20877',
sg_id='cfb3dfc4-7a43-4ba1-b92d-b8b2650d7f88',
protocol='tcp', direction='ingress'):
sgr_data = {'description': '',
'direction': direction,
'ether_type': 'IPv4',
'id': sgr_id,
'port_range_max': 8080,
'port_range_min': 8080,
'project_id': '5ea46368c7fe436bb8732738c149fbce',
'protocol': protocol,
'remote_group_id': None,
'remote_ip_prefix': None,
'security_group_id': sg_id,
'tenant_id': '5ea46368c7fe436bb8732738c149fbce'}
return os_sgr.SecurityGroupRule(**sgr_data)

View File

@ -23,10 +23,12 @@ from openstack.load_balancer.v2 import member as o_mem
from openstack.load_balancer.v2 import pool as o_pool
from oslo_config import cfg
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import lbaasv2 as d_lbaasv2
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.objects import lbaas as obj_lbaas
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests import fake
from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix
CONF = cfg.CONF
@ -1041,3 +1043,505 @@ class TestLBaaSv2Driver(test_base.TestCase):
def test_provisioning_timer(self):
# REVISIT(ivc): add test if _provisioning_timer is to stay
self.skipTest("not implemented")
class TestLBaaSv2AppyMembersSecurityGroup(test_base.TestCase):
def setUp(self):
super().setUp()
self.lb = {'id': 'a4de5f1a-ac03-45b1-951d-39f108d52e7d',
'ip': '10.0.0.142',
'name': 'default/lb',
'port_id': '5be1b3c4-7d44-4597-9294-cadafdf1ec69',
'project_id': '7ef23242bb3f4773a58da681421ab26e',
'provider': 'amphora',
'security_groups': ['328900a2-c328-41cc-946f-56ae8720ec0d'],
'subnet_id': 'c85e2e10-1fad-4218-ad10-7de4aa5de7ce'}
self.port = 80
self.target_port = 8080
self.protocol = 'TCP'
self.sg_rule_name = 'default/lb:TCP:80'
self.listener_id = '858869ec-e4fa-4715-b22f-bd08889c6235'
self.new_sgs = ['48cfc812-a442-44bf-989f-8dbaf23a7007']
self.vip = fake.get_port_obj()
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test__apply_members_security_groups_no_enforce(self, gnc):
CONF.set_override('enforce_sg_rules', False, group='octavia_defaults')
self.addCleanup(CONF.clear_override, 'enforce_sg_rules',
group='octavia_defaults')
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = None
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_not_called()
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test__apply_members_security_groups_no_vip(self, gnc):
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = None
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test__apply_members_security_groups_no_sg(self, gnc):
self.new_sgs = None
self.vip.security_group_ids = []
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
self.assertRaises(k_exc.ResourceNotReady,
cls._apply_members_security_groups, m_driver,
self.lb, self.port, self.target_port, self.protocol,
self.sg_rule_name, self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test__apply_members_security_groups_conf_with_octavia_acls(self, gnc):
self.new_sgs = None
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port = mock.Mock(return_value=self.vip)
m_driver._octavia_acls = True
m_driver._create_listeners_acls = mock.Mock()
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name, self.listener_id,
self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
m_driver._create_listeners_acls.assert_called_once_with(
self.lb, self.port, self.target_port, self.protocol,
self.vip.security_group_ids[0], self.new_sgs, self.listener_id)
def test__apply_members_security_groups_new_sgs(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
os_net.security_group_rules.return_value = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name, self.listener_id,
self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.new_sgs[0])])
def test__apply_members_security_groups_conf_lb_sgs(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
os_net.security_group_rules.side_effect = ([], [sgr])
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
remote_ip_prefix=sgr.remote_ip_prefix,
security_group_id=sgr.security_group_id,
description=self.sg_rule_name)
def test__apply_members_security_groups_conf_lb_sgs_conflict(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
os_net.security_group_rules.side_effect = ([], [sgr])
os_net.create_security_group_rule.side_effect = (os_exc
.ConflictException)
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
remote_ip_prefix=None,
security_group_id=self.vip.security_group_ids[0],
description=self.sg_rule_name)
def test__apply_members_security_groups_conf_lb_sgs_sdkexception(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
os_net.security_group_rules.side_effect = ([], [sgr])
os_net.create_security_group_rule.side_effect = os_exc.SDKException
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
remote_ip_prefix=None,
security_group_id=self.vip.security_group_ids[0],
description=self.sg_rule_name)
@mock.patch("kuryr_kubernetes.utils.get_service_subnet_version",
return_value=k_const.IP_VERSION_6)
def test__apply_members_security_groups_ipv6_add_default(self, gssv):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
os_net.security_group_rules.return_value = []
CONF.set_override('pod_security_groups', self.new_sgs,
group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name, self.listener_id,
self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_called_once_with(
security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id'])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv6,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
security_group_id=self.vip.security_group_ids[0],
description=self.sg_rule_name)
def test__apply_members_security_groups_add_default_conflict(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
os_net.security_group_rules.return_value = []
CONF.set_override('pod_security_groups', self.new_sgs,
group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
os_net.create_security_group_rule.side_effect = (os_exc
.ConflictException)
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name, self.listener_id,
self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_called_once_with(
security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id'])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
security_group_id=self.vip.security_group_ids[0],
description=self.sg_rule_name)
def test__apply_members_security_groups_add_default_sdk_exception(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
os_net.security_group_rules.return_value = []
CONF.set_override('pod_security_groups', self.new_sgs,
group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
os_net.create_security_group_rule.side_effect = os_exc.SDKException
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name, self.listener_id,
self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_called_once_with(
security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id'])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
security_group_id=self.vip.security_group_ids[0],
description=self.sg_rule_name)
def test__apply_members_security_groups_same_sg(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
self.vip.security_group_ids = self.new_sgs
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
os_net.security_group_rules.return_value = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name, self.listener_id,
self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_called_once_with(
security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id'])
def test__apply_members_security_groups_unmatched_target_port(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
self.target_port = 9090
os_net.security_group_rules.side_effect = ([], [sgr])
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_not_called()
def test__apply_members_security_groups_egress(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj(direction='egress')
os_net.security_group_rules.side_effect = ([], [sgr])
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_not_called()
def test__apply_members_security_groups_no_delete_lbaas_rules(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
self.lb['security_groups'] = []
self.new_sgs = []
sgr = fake.get_sgr_obj()
os_net.security_group_rules.return_value = [sgr]
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_called_once_with(
security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id'])
os_net.create_security_group_rule.assert_not_called()
def test__apply_members_security_groups_delete_matched_lbaas_rules(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
os_net.security_group_rules.side_effect = ([sgr], [sgr])
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
remote_ip_prefix=sgr.remote_ip_prefix,
security_group_id=sgr.security_group_id,
description=self.sg_rule_name)
os_net.delete_security_group_rule.assert_called_once_with(sgr.id)
def test__apply_members_security_groups_delete_unmatched_lbaas_rules(self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
os_net.security_group_rules.side_effect = ([sgr], [sgr])
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
self.port = 8080
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
remote_ip_prefix=sgr.remote_ip_prefix,
security_group_id=sgr.security_group_id,
description=self.sg_rule_name)
m_driver._delete_rule_if_no_match.assert_called_once_with(sgr, [sgr])
def test__apply_members_security_groups_delete_no_default_lbaas_rules(
self):
os_net = self.useFixture(k_fix.MockNetworkClient()).client
cls = d_lbaasv2.LBaaSv2Driver
m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver)
m_driver._get_vip_port.return_value = self.vip
m_driver._octavia_acls = False
sgr = fake.get_sgr_obj()
os_net.security_group_rules.side_effect = ([sgr], [sgr])
self.new_sgs = []
CONF.set_override('pod_security_groups', [], group='neutron_defaults')
self.addCleanup(CONF.clear_override, 'pod_security_groups',
group='neutron_defaults')
m_driver._is_default_rule.return_value = False
cls._apply_members_security_groups(m_driver, self.lb, self.port,
self.target_port, self.protocol,
self.sg_rule_name,
self.listener_id, self.new_sgs)
m_driver._get_vip_port.assert_called_once_with(self.lb)
os_net.security_group_rules.assert_has_calls([
mock.call(security_group_id=self.vip.security_group_ids[0],
project_id=self.lb['project_id']),
mock.call(security_group_id=self.lb['security_groups'][0])])
os_net.create_security_group_rule.assert_called_once_with(
direction='ingress',
ether_type=k_const.IPv4,
port_range_min=self.port,
port_range_max=self.port,
protocol=self.protocol,
remote_ip_prefix=sgr.remote_ip_prefix,
security_group_id=sgr.security_group_id,
description=self.sg_rule_name)

View File

@ -564,6 +564,37 @@ class TestKuryrPortHandler(test_base.TestCase):
mock.sentinel.subnets,
self._security_groups)
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'request_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_subnet.'
'DefaultPodSubnetDriver.get_subnets')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_vifs_resource_not_found(self, ged, k8s, get_project, get_sg,
get_subnets, request_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.return_value = self._pod
get_sg.return_value = self._security_groups
get_project.return_value = self._project_id
get_subnets.return_value = mock.sentinel.subnets
request_vif.side_effect = os_exc.ResourceNotFound()
self.assertRaises(k_exc.ResourceNotReady, kp.get_vifs, self._kp)
kp.k8s.get.assert_called_once_with(self._pod_uri)
get_project.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
get_subnets.assert_called_once_with(self._pod, self._project_id)
request_vif.assert_called_once_with(self._pod, self._project_id,
mock.sentinel.subnets,
self._security_groups)
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'