Merge "Pools support with Network Policies"

Authored by Zuul on 2019-02-12 11:29:54 +00:00, committed by Gerrit Code Review
commit 5a6a396854
2 changed files with 447 additions and 134 deletions
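This merge reworks the VIF pool drivers so that security groups are no longer part of the pool key. Each pool instead becomes a dictionary keyed by a sorted tuple of security group IDs, so a pooled port can serve a pod with different Network Policy security groups by updating the port in Neutron rather than discarding it. A minimal sketch of the before/after layout (host, project and ID values are illustrative only, not taken from the code):

    # Before: one flat list of ports per (host, project, SGs, net) key
    pools = {
        ('10.0.0.5', 'project-1', ('sg-a',), 'net-1'): ['port-1', 'port-2'],
    }
    # After: the security-group tuple moves inside the pool value
    pools = {
        ('10.0.0.5', 'project-1', 'net-1'): {
            ('sg-a',): ['port-1'],
            ('sg-b',): ['port-2'],
        },
    }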

kuryr_kubernetes/controller/drivers/vif_pool.py

@@ -167,23 +167,25 @@ class BaseVIFPool(base.VIFPoolDriver):
def update_vif_sgs(self, pod, sgs):
self._drv_vif.update_vif_sgs(pod, sgs)
def _get_pool_size(self, pool_key=None):
return len(self._available_ports_pools.get(pool_key, []))
def _get_pool_size(self, pool_key):
pool = self._available_ports_pools.get(pool_key, {})
pool_members = []
for port_list in pool.values():
pool_members.extend(port_list)
return len(pool_members)
def _get_host_addr(self, pod):
return pod['status']['hostIP']
def _get_pool_key(self, host, project_id, security_groups, net_id=None,
subnets=None):
def _get_pool_key(self, host, project_id, net_id=None, subnets=None):
if not net_id and subnets:
net_obj = list(subnets.values())[0]
net_id = net_obj.id
pool_key = (host, project_id, tuple(sorted(security_groups)),
net_id)
pool_key = (host, project_id, net_id)
return pool_key
def _get_pool_key_net(self, pool_key):
return pool_key[3]
return pool_key[2]
def request_vif(self, pod, project_id, subnets, security_groups):
try:
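With the nested layout, _get_pool_size has to count across every security-group sublist rather than taking the length of a single list. A roughly equivalent version of the new method, shown only as an illustration:

    def _get_pool_size(self, pool_key):
        # Sum the ports across all security-group sublists of this pool
        pool = self._available_ports_pools.get(pool_key, {})
        return sum(len(ports) for ports in pool.values())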
@@ -191,33 +193,37 @@ class BaseVIFPool(base.VIFPoolDriver):
except KeyError:
LOG.warning("Pod has not been scheduled yet.")
raise
pool_key = self._get_pool_key(host_addr, project_id, security_groups,
None, subnets)
pool_key = self._get_pool_key(host_addr, project_id, None, subnets)
try:
return self._get_port_from_pool(pool_key, pod, subnets)
return self._get_port_from_pool(pool_key, pod, subnets,
tuple(sorted(security_groups)))
except exceptions.ResourceNotReady:
LOG.warning("Ports pool does not have available ports!")
eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
tuple(sorted(security_groups)))
raise
def _get_port_from_pool(self, pool_key, pod, subnets):
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
raise NotImplementedError()
def _populate_pool(self, pool_key, pod, subnets):
def _populate_pool(self, pool_key, pod, subnets, security_groups):
# REVISIT(ltomasbo): Drop the subnets parameter and get the information
# from the pool_key, which will be required when multi-network is
# supported
now = time.time()
try:
if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency <
self._last_update.get(pool_key, 0)):
LOG.info("Not enough time since the last pool update")
pool_updates = self._last_update.get(pool_key)
if pool_updates:
last_update = pool_updates.get(security_groups, 0)
try:
if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency <
last_update):
LOG.info("Not enough time since the last pool update")
return
except AttributeError:
LOG.info("Kuryr-controller not yet ready to populate pools")
return
except AttributeError:
LOG.info("Kuryr-controller not yet ready to populate pools")
return
self._last_update[pool_key] = now
self._last_update[pool_key] = {security_groups: now}
pool_size = self._get_pool_size(pool_key)
if pool_size < oslo_cfg.CONF.vif_pool.ports_pool_min:
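_last_update becomes two-level as well: pool key first, then security-group tuple, mapping to the time of the last population. The throttle in _populate_pool therefore only skips repopulating the specific (pool, security groups) combination that was refreshed recently. A standalone sketch of that check (the helper name is hypothetical):

    import time

    def recently_populated(last_update, pool_key, sg_key, frequency):
        # True when this (pool, SG) combination was populated less than
        # `frequency` seconds ago and should be skipped for now
        last = last_update.get(pool_key, {}).get(sg_key, 0)
        return time.time() - frequency < last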
@@ -227,18 +233,19 @@ class BaseVIFPool(base.VIFPoolDriver):
pod=pod,
project_id=pool_key[1],
subnets=subnets,
security_groups=list(pool_key[2]),
security_groups=security_groups,
num_ports=num_ports)
for vif in vifs:
self._existing_vifs[vif.id] = vif
self._available_ports_pools.setdefault(pool_key,
[]).append(vif.id)
self._available_ports_pools.setdefault(
pool_key, {}).setdefault(
security_groups, []).append(vif.id)
def release_vif(self, pod, vif, project_id, security_groups):
host_addr = self._get_host_addr(pod)
pool_key = self._get_pool_key(host_addr, project_id, security_groups,
vif.network.id, None)
pool_key = self._get_pool_key(host_addr, project_id, vif.network.id,
None)
try:
if not self._existing_vifs.get(vif.id):
@@ -300,12 +307,10 @@ class BaseVIFPool(base.VIFPoolDriver):
@lockutils.synchronized('return_to_pool_baremetal')
@lockutils.synchronized('return_to_pool_nested')
def sync_pools(self):
self._available_ports_pools = collections.defaultdict(
collections.deque)
self._existing_vifs = collections.defaultdict(collections.defaultdict)
self._recyclable_ports = collections.defaultdict(
collections.defaultdict)
self._last_update = collections.defaultdict(collections.defaultdict)
self._available_ports_pools = collections.defaultdict()
self._existing_vifs = collections.defaultdict()
self._recyclable_ports = collections.defaultdict()
self._last_update = collections.defaultdict()
# NOTE(ltomasbo): Ensure previously created ports are recovered into
# their respective pools
self._recover_precreated_ports()
@@ -377,11 +382,45 @@ class NeutronVIFPool(BaseVIFPool):
def _get_host_addr(self, pod):
return pod['spec']['nodeName']
def _get_port_from_pool(self, pool_key, pod, subnets):
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
try:
port_id = self._available_ports_pools[pool_key].pop()
except (IndexError, AttributeError):
pool_ports = self._available_ports_pools[pool_key]
except (KeyError, AttributeError):
raise exceptions.ResourceNotReady(pod)
try:
port_id = pool_ports[security_groups].pop()
except (KeyError, IndexError):
# Get another port from the pool and update its security groups
# to the requested ones. The port is taken from the group that
# was least recently updated.
pool_updates = self._last_update.get(pool_key, {})
if not pool_updates:
# No pool update info. Pick any non-empty group
for sg_group, ports in pool_ports.items():
if len(ports) > 0:
port_id = pool_ports[sg_group].pop()
break
else:
raise exceptions.ResourceNotReady(pod)
else:
min_date = -1
for sg_group, date in pool_updates.items():
if pool_ports.get(sg_group):
if min_date == -1 or date < min_date:
min_date = date
min_sg_group = sg_group
if min_date == -1:
# pool is empty, no port to reuse
raise exceptions.ResourceNotReady(pod)
port_id = pool_ports[min_sg_group].pop()
neutron = clients.get_neutron_client()
neutron.update_port(
port_id,
{
"port": {
'security_groups': list(security_groups)
}
})
if config.CONF.kubernetes.port_debug:
neutron = clients.get_neutron_client()
neutron.update_port(
@@ -395,7 +434,8 @@ class NeutronVIFPool(BaseVIFPool):
# check if the pool needs to be populated
if (self._get_pool_size(pool_key) <
oslo_cfg.CONF.vif_pool.ports_pool_min):
eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
return self._existing_vifs[port_id]
def _return_ports_to_pool(self):
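When the sublist for the requested security groups is empty, _get_port_from_pool falls back to a port from another sublist: it prefers the group whose update timestamp is oldest and that still has ports, then retags the chosen port's security groups in Neutron before handing it out. The selection could be factored into a helper along these lines (hypothetical name, illustration only):

    def pick_fallback_sg(pool_ports, pool_updates):
        # Return the SG key whose non-empty sublist was updated least
        # recently, or None when every sublist is empty
        candidates = [(date, sg) for sg, date in pool_updates.items()
                      if pool_ports.get(sg)]
        if not candidates:
            return None
        return min(candidates)[1]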
@@ -426,7 +466,8 @@ class NeutronVIFPool(BaseVIFPool):
device_owner=kl_const.DEVICE_OWNER)
for port in kuryr_ports:
if port['id'] in self._recyclable_ports:
sg_current[port['id']] = port['security_groups']
sg_current[port['id']] = tuple(sorted(
port['security_groups']))
for port_id, pool_key in self._recyclable_ports.copy().items():
if (not oslo_cfg.CONF.vif_pool.ports_pool_max or
@@ -435,25 +476,24 @@ class NeutronVIFPool(BaseVIFPool):
port_name = (constants.KURYR_PORT_NAME
if config.CONF.kubernetes.port_debug
else '')
if (config.CONF.kubernetes.port_debug or
list(pool_key[2]) != sg_current.get(port_id)):
if config.CONF.kubernetes.port_debug:
try:
neutron.update_port(
port_id,
{
"port": {
'name': port_name,
'device_id': '',
'security_groups': list(pool_key[2])
'device_id': ''
}
})
except n_exc.NeutronClientException:
LOG.warning("Error preparing port %s to be "
LOG.warning("Error changing name for port %s to be "
"reused, put back on the cleanable "
"pool.", port_id)
continue
self._available_ports_pools.setdefault(
pool_key, []).append(port_id)
pool_key, {}).setdefault(
sg_current.get(port_id), []).append(port_id)
else:
try:
del self._existing_vifs[port_id]
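On the recycle path, _trigger_return_to_pool no longer resets a port's security groups to match the pool key; it reads the port's current security groups from Neutron and files the port under the matching sublist, roughly:

    # Illustrative: sort the SG list so it forms a stable dictionary key
    sg_key = tuple(sorted(port['security_groups']))
    pools.setdefault(pool_key, {}).setdefault(sg_key, []).append(port_id)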
@@ -502,12 +542,13 @@ class NeutronVIFPool(BaseVIFPool):
net_obj = subnet[subnet_id]
pool_key = self._get_pool_key(port_host,
port['project_id'],
port['security_groups'],
net_obj.id, None)
self._existing_vifs[port['id']] = vif
self._available_ports_pools.setdefault(
pool_key, []).append(port['id'])
pool_key, {}).setdefault(
tuple(sorted(port['security_groups'])), []).append(
port['id'])
LOG.info("PORTS POOL: pools updated with pre-created ports")
self._create_healthcheck_file()
@@ -524,10 +565,13 @@ class NeutronVIFPool(BaseVIFPool):
# on the available_ports_pools dict. The next call forces it to be on
# that dict before cleaning it up
self._trigger_return_to_pool()
for pool_key, ports_id in self._available_ports_pools.items():
for pool_key, ports in self._available_ports_pools.items():
if self._get_pool_key_net(pool_key) != net_id:
continue
self._available_ports_pools[pool_key] = []
ports_id = []
for sg_ports in ports.values():
ports_id.extend(sg_ports)
for port_id in ports_id:
try:
del self._existing_vifs[port_id]
@@ -560,11 +604,45 @@ class NestedVIFPool(BaseVIFPool):
def set_vif_driver(self, driver):
self._drv_vif = driver
def _get_port_from_pool(self, pool_key, pod, subnets):
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
try:
port_id = self._available_ports_pools[pool_key].pop()
except (IndexError, AttributeError):
pool_ports = self._available_ports_pools[pool_key]
except (KeyError, AttributeError):
raise exceptions.ResourceNotReady(pod)
try:
port_id = pool_ports[security_groups].pop()
except (KeyError, IndexError):
# Get another port from the pool and update its security groups
# to the requested ones. The port is taken from the group that
# was least recently updated.
pool_updates = self._last_update.get(pool_key, {})
if not pool_updates:
# No pool update info. Pick any non-empty group
for sg_group, ports in pool_ports.items():
if len(ports) > 0:
port_id = pool_ports[sg_group].pop()
break
else:
raise exceptions.ResourceNotReady(pod)
else:
min_date = -1
for sg_group, date in pool_updates.items():
if pool_ports.get(sg_group):
if min_date == -1 or date < min_date:
min_date = date
min_sg_group = sg_group
if min_date == -1:
# pool is empty, no port to reuse
raise exceptions.ResourceNotReady(pod)
port_id = pool_ports[min_sg_group].pop()
neutron = clients.get_neutron_client()
neutron.update_port(
port_id,
{
"port": {
'security_groups': list(security_groups)
}
})
if config.CONF.kubernetes.port_debug:
neutron = clients.get_neutron_client()
neutron.update_port(
@@ -577,7 +655,8 @@ class NestedVIFPool(BaseVIFPool):
# check if the pool needs to be populated
if (self._get_pool_size(pool_key) <
oslo_cfg.CONF.vif_pool.ports_pool_min):
eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
return self._existing_vifs[port_id]
def _return_ports_to_pool(self):
@@ -608,7 +687,8 @@ class NestedVIFPool(BaseVIFPool):
device_owner=['trunk:subport', kl_const.DEVICE_OWNER])
for subport in kuryr_subports:
if subport['id'] in self._recyclable_ports:
sg_current[subport['id']] = subport['security_groups']
sg_current[subport['id']] = tuple(sorted(
subport['security_groups']))
for port_id, pool_key in self._recyclable_ports.copy().items():
if (not oslo_cfg.CONF.vif_pool.ports_pool_max or
@@ -617,24 +697,23 @@ class NestedVIFPool(BaseVIFPool):
port_name = (constants.KURYR_PORT_NAME
if config.CONF.kubernetes.port_debug
else '')
if (config.CONF.kubernetes.port_debug or
list(pool_key[2]) != sg_current.get(port_id)):
if config.CONF.kubernetes.port_debug:
try:
neutron.update_port(
port_id,
{
"port": {
'name': port_name,
'security_groups': list(pool_key[2])
}
})
except n_exc.NeutronClientException:
LOG.warning("Error preparing port %s to be "
LOG.warning("Error changing name for port %s to be "
"reused, put back on the cleanable "
"pool.", port_id)
continue
self._available_ports_pools.setdefault(
pool_key, []).append(port_id)
pool_key, {}).setdefault(
sg_current.get(port_id), []).append(port_id)
else:
trunk_id = self._get_trunk_id(neutron, pool_key)
try:
@@ -713,8 +792,6 @@ class NestedVIFPool(BaseVIFPool):
net_obj = subnet[subnet_id]
pool_key = self._get_pool_key(host_addr,
kuryr_subport['project_id'],
kuryr_subport[
'security_groups'],
net_obj.id, None)
if action == 'recover':
@@ -723,7 +800,9 @@ class NestedVIFPool(BaseVIFPool):
self._existing_vifs[kuryr_subport['id']] = vif
self._available_ports_pools.setdefault(
pool_key, []).append(kuryr_subport['id'])
pool_key, {}).setdefault(tuple(sorted(
kuryr_subport['security_groups'])),
[]).append(kuryr_subport['id'])
elif action == 'free':
try:
@@ -733,8 +812,9 @@ class NestedVIFPool(BaseVIFPool):
self._drv_vif._release_vlan_id(
subport['segmentation_id'])
del self._existing_vifs[kuryr_subport['id']]
self._available_ports_pools[pool_key].remove(
kuryr_subport['id'])
self._available_ports_pools[pool_key][
tuple(sorted(kuryr_subport['security_groups']
))].remove(kuryr_subport['id'])
except n_exc.PortNotFoundClient:
LOG.debug('Unable to release port %s as it no '
'longer exists.', kuryr_subport['id'])
@@ -764,12 +844,11 @@ class NestedVIFPool(BaseVIFPool):
num_ports=num_ports,
trunk_ip=trunk_ip)
pool_key = self._get_pool_key(trunk_ip, project_id, security_groups,
None, subnets)
pool_key = self._get_pool_key(trunk_ip, project_id, None, subnets)
for vif in vifs:
self._existing_vifs[vif.id] = vif
self._available_ports_pools.setdefault(pool_key,
[]).append(vif.id)
self._available_ports_pools.setdefault(pool_key, {}).setdefault(
tuple(sorted(security_groups)), []).append(vif.id)
def free_pool(self, trunk_ips=None):
"""Removes subports from the pool and deletes neutron port resource.
@ -791,19 +870,21 @@ class NestedVIFPool(BaseVIFPool):
# on the available_ports_pools dict. The next call forces it to be on
# that dict before cleaning it up
self._trigger_return_to_pool()
for pool_key, ports_ids in self._available_ports_pools.items():
for pool_key, ports in self._available_ports_pools.items():
if self._get_pool_key_net(pool_key) != net_id:
continue
self._available_ports_pools[pool_key] = []
trunk_id = self._get_trunk_id(neutron, pool_key)
ports_id = [p_id for sg_ports in ports.values()
for p_id in sg_ports]
try:
self._drv_vif._remove_subports(neutron, trunk_id, ports_ids)
self._drv_vif._remove_subports(neutron, trunk_id, ports_id)
except n_exc.NeutronClientException:
LOG.exception('Error removing subports from trunk: %s',
trunk_id)
continue
for port_id in ports_ids:
for port_id in ports_id:
try:
self._drv_vif._release_vlan_id(
self._existing_vifs[port_id].vlan_id)

kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py

@@ -187,15 +187,14 @@ class BaseVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
pool_key = (mock.sentinel.host_addr, project_id,
tuple(security_groups))
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
vif = osv_vif.VIFOpenVSwitch(id='0fa0e837-d34e-4580-a6c4-04f5f607d93e')
vifs = [vif]
m_driver._existing_vifs = {}
m_driver._available_ports_pools = {}
m_driver._last_update = {pool_key: 1}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
oslo_cfg.CONF.set_override('ports_pool_min',
5,
@@ -206,7 +205,8 @@ class BaseVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = 2
vif_driver.request_vifs.return_value = vifs
cls._populate_pool(m_driver, pool_key, pod, subnets)
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_called_once()
m_driver._drv_vif.request_vifs.assert_called_once()
@@ -218,16 +218,16 @@ class BaseVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
pool_key = (mock.sentinel.host_addr, project_id,
tuple(security_groups))
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
oslo_cfg.CONF.set_override('ports_pool_update_frequency',
15,
group='vif_pool')
m_driver._last_update = {pool_key: 1}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
cls._populate_pool(m_driver, pool_key, pod, subnets)
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_not_called()
@mock.patch('time.time', return_value=50)
@@ -244,9 +244,8 @@ class BaseVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
pool_key = (mock.sentinel.host_addr, project_id,
tuple(security_groups))
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
oslo_cfg.CONF.set_override('ports_pool_update_frequency',
15,
@@ -254,10 +253,11 @@
oslo_cfg.CONF.set_override('ports_pool_min',
5,
group='vif_pool')
m_driver._last_update = {pool_key: 1}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
m_driver._get_pool_size.return_value = 10
cls._populate_pool(m_driver, pool_key, pod, subnets)
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_called_once()
m_driver._drv_vif.request_vifs.assert_not_called()
@@ -341,11 +341,12 @@ class NeutronVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@@ -362,7 +363,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@@ -386,11 +387,12 @@ class NeutronVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@@ -404,7 +406,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@@ -424,11 +426,124 @@ class NeutronVIFPool(test_base.TestCase):
pod = get_pod_obj()
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
m_driver._available_ports_pools = {pool_key: collections.deque([])}
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets)
m_driver, pool_key, pod, subnets,
tuple(security_groups))
neutron.update_port.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = get_pod_obj()
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1,
tuple(security_groups_2): 0}}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse_no_update_info(self,
m_eventlet):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = get_pod_obj()
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
def test__get_port_from_pool_empty_pool_reuse_no_ports(self):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = get_pod_obj()
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets, tuple(
security_groups))
neutron.update_port.assert_not_called()
@@ -438,7 +553,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@@ -462,7 +577,6 @@ class NeutronVIFPool(test_base.TestCase):
"port": {
'name': constants.KURYR_PORT_NAME,
'device_id': '',
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@@ -473,7 +587,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@@ -499,7 +613,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.sentinel.vif
@@ -524,7 +638,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@@ -552,7 +666,6 @@ class NeutronVIFPool(test_base.TestCase):
"port": {
'name': constants.KURYR_PORT_NAME,
'device_id': '',
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@@ -562,7 +675,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.sentinel.vif
@@ -588,7 +701,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
@@ -639,8 +752,7 @@ class NeutronVIFPool(test_base.TestCase):
vif = mock.sentinel.vif
m_to_osvif.return_value = vif
pool_key = (port['binding:host_id'], port['project_id'],
tuple(port['security_groups']), net_id)
pool_key = (port['binding:host_id'], port['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
m_driver._get_trunks_info.return_value = ({}, {}, {})
@@ -652,7 +764,8 @@ class NeutronVIFPool(test_base.TestCase):
m_to_osvif.assert_called_once_with(vif_plugin, port, subnet)
self.assertEqual(m_driver._existing_vifs[port_id], vif)
self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id])
self.assertEqual(m_driver._available_ports_pools[pool_key],
{tuple(port['security_groups']): [port_id]})
@mock.patch('kuryr_kubernetes.os_vif_util.neutron_to_osvif_vif')
@mock.patch('kuryr_kubernetes.utils.get_subnet')
@@ -681,9 +794,10 @@ class NeutronVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {port_id: mock.sentinel.vif}
m_driver._get_pool_key_net.return_value = net_id
@@ -701,9 +815,10 @@ class NeutronVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {}
neutron.delete_port.side_effect = n_exc.PortNotFoundClient
@@ -765,11 +880,12 @@ class NestedVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@@ -783,7 +899,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@@ -806,11 +922,12 @@ class NestedVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@@ -824,7 +941,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@@ -843,11 +960,124 @@ class NestedVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
m_driver._available_ports_pools = {pool_key: collections.deque([])}
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets)
m_driver, pool_key, pod, subnets, tuple(
security_groups))
neutron.update_port.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = mock.sentinel.pod
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1,
tuple(security_groups_2): 0}}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse_no_update_info(self,
m_eventlet):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = mock.sentinel.pod
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
def test__get_port_from_pool_empty_pool_reuse_no_ports(self):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = mock.sentinel.pod
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets, tuple(
security_groups))
neutron.update_port.assert_not_called()
@@ -857,7 +1087,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@@ -880,7 +1110,6 @@
{
"port": {
'name': constants.KURYR_PORT_NAME,
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@@ -891,7 +1120,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@@ -920,7 +1149,7 @@ class NestedVIFPool(test_base.TestCase):
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.MagicMock()
@@ -953,7 +1182,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@@ -977,7 +1206,6 @@ class NestedVIFPool(test_base.TestCase):
{
"port": {
'name': constants.KURYR_PORT_NAME,
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@@ -990,7 +1218,7 @@ class NestedVIFPool(test_base.TestCase):
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.MagicMock()
@@ -1027,7 +1255,7 @@ class NestedVIFPool(test_base.TestCase):
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
trunk_id = str(uuid.uuid4())
@@ -1163,15 +1391,15 @@ class NestedVIFPool(test_base.TestCase):
vif = mock.sentinel.vif
m_to_osvif.return_value = vif
pool_key = (port['binding:host_id'], port['project_id'],
tuple(port['security_groups']), net_id)
pool_key = (port['binding:host_id'], port['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
cls._precreated_ports(m_driver, 'recover')
m_driver._get_trunks_info.assert_called_once()
self.assertEqual(m_driver._existing_vifs[port_id], vif)
self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id])
self.assertEqual(m_driver._available_ports_pools[pool_key],
{tuple(port['security_groups']): [port_id]})
neutron.delete_port.assert_not_called()
def test__precreated_ports_free(self):
@@ -1200,10 +1428,10 @@ class NestedVIFPool(test_base.TestCase):
m_driver._get_trunks_info.return_value = (p_ports, a_subports,
subnets)
pool_key = (port['binding:host_id'], port['project_id'],
tuple(port['security_groups']), net_id)
pool_key = (port['binding:host_id'], port['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {
pool_key: {tuple(port['security_groups']): [port_id]}}
m_driver._existing_vifs = {port_id: mock.sentinel.vif}
cls._precreated_ports(m_driver, 'free')
@@ -1214,7 +1442,8 @@ class NestedVIFPool(test_base.TestCase):
m_driver._drv_vif._release_vlan_id.assert_called_once()
self.assertEqual(m_driver._existing_vifs, {})
self.assertEqual(m_driver._available_ports_pools[pool_key], [])
self.assertEqual(m_driver._available_ports_pools[pool_key][tuple(
port['security_groups'])], [])
@mock.patch('kuryr_kubernetes.os_vif_util.'
'neutron_to_osvif_vif_nested_vlan')
@@ -1302,8 +1531,7 @@ class NestedVIFPool(test_base.TestCase):
vif = mock.sentinel.vif
m_to_osvif.return_value = vif
pool_key = (port1['binding:host_id'], port1['project_id'],
tuple(port1['security_groups']), net_id)
pool_key = (port1['binding:host_id'], port1['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
cls._precreated_ports(m_driver, 'recover')
@@ -1311,7 +1539,8 @@ class NestedVIFPool(test_base.TestCase):
self.assertEqual(m_driver._existing_vifs, {port_id1: vif,
port_id2: vif})
self.assertEqual(m_driver._available_ports_pools[pool_key],
[port_id1, port_id2])
{tuple(port1['security_groups']): [port_id1,
port_id2]})
neutron.delete_port.assert_not_called()
@ddt.data(('recover'), ('free'))
@@ -1382,13 +1611,14 @@
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
trunk_id = str(uuid.uuid4())
vif = mock.MagicMock()
vlan_id = mock.sentinel.vlan_id
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {port_id: vif}
m_driver._get_trunk_id.return_value = trunk_id
@@ -1415,13 +1645,14 @@
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
trunk_id = str(uuid.uuid4())
vif = mock.MagicMock()
vlan_id = mock.sentinel.vlan_id
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {port_id: vif}
m_driver._get_trunk_id.return_value = trunk_id
@@ -1450,13 +1681,14 @@
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
trunk_id = str(uuid.uuid4())
vif = mock.MagicMock()
vlan_id = mock.sentinel.vlan_id
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {}
m_driver._get_trunk_id.return_value = trunk_id