Merge "Limit the number of concurrent create_ports requests"
commit 6b10dcde9e
@@ -331,7 +331,7 @@ class PodVIFDriver(DriverBase, metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     def request_vifs(self, pod, project_id, subnets, security_groups,
-                     num_ports):
+                     num_ports, semaphore):
         """Creates Neutron ports for pods and returns them as VIF objects list.
 
         It follows the same pattern as request_vif but creating the specified
@@ -351,6 +351,8 @@ class PodVIFDriver(DriverBase, metaclass=abc.ABCMeta):
                                 returned by
                                 `PodSecurityGroupsDriver.get_security_groups`
         :param num_ports: number of ports to be created
+        :param semaphore: an eventlet Semaphore to limit the number of bulk
+                          port creation requests running in parallel
         :return: VIF objects list
         """
         raise NotImplementedError()
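The interface change above only adds a parameter; creating and sharing the semaphore is the caller's job, and honouring it is the implementer's. A rough sketch, assuming a hypothetical driver object and populate() helper that are not part of this commit, of how one eventlet semaphore would be shared across request_vifs() calls:

    import eventlet.semaphore

    # One shared semaphore bounds how many bulk port-creation requests can be
    # in flight at once; every caller must pass the same object.
    create_ports_semaphore = eventlet.semaphore.Semaphore(20)

    def populate(driver, pod, project_id, subnets, security_groups, num_ports):
        # The driver implementation is expected to wrap its bulk Neutron call
        # in "with semaphore:", so excess requests simply wait their turn.
        return driver.request_vifs(pod, project_id, subnets, security_groups,
                                   num_ports, create_ports_semaphore)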
@@ -56,7 +56,7 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver):
         return ovu.neutron_to_osvif_vif_nested_vlan(port, subnets, vlan_id)
 
     def request_vifs(self, pod, project_id, subnets, security_groups,
-                     num_ports, trunk_ip=None):
+                     num_ports, semaphore, trunk_ip=None):
         """This method creates subports and returns a list with their vifs.
 
         It creates up to num_ports subports and attaches them to the trunk
@@ -86,13 +86,16 @@ class NestedVlanPodVIFDriver(nested_vif.NestedPodVIFDriver):
             return []
 
         bulk_port_rq = {'ports': [port_rq] * len(subports_info)}
-        try:
-            ports = list(os_net.create_ports(bulk_port_rq))
-        except os_exc.SDKException:
-            for subport_info in subports_info:
-                self._release_vlan_id(subport_info['segmentation_id'])
-            LOG.exception("Error creating bulk ports: %s", bulk_port_rq)
-            raise
+        # Restrict the number of bulk port creation requests that may run
+        # in parallel.
+        with semaphore:
+            try:
+                ports = list(os_net.create_ports(bulk_port_rq))
+            except os_exc.SDKException:
+                for subport_info in subports_info:
+                    self._release_vlan_id(subport_info['segmentation_id'])
+                LOG.exception("Error creating bulk ports: %s", bulk_port_rq)
+                raise
         self._check_port_binding(ports)
         if not self._tag_on_creation:
             utils.tag_neutron_resources(ports)
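For reference, a self-contained sketch of what the "with semaphore:" guard added above does under eventlet; the limit, sleep time and fake_bulk_create() function are made up for illustration and are not taken from the commit:

    import time

    import eventlet
    import eventlet.semaphore

    limit = 2
    semaphore = eventlet.semaphore.Semaphore(limit)

    def fake_bulk_create(i):
        # Stand-in for list(os_net.create_ports(bulk_port_rq)).
        with semaphore:
            eventlet.sleep(0.1)
            return i

    pool = eventlet.GreenPool(size=10)
    start = time.time()
    results = list(pool.imap(fake_bulk_create, range(6)))
    # With limit=2 and six ~0.1 s calls this takes roughly 0.3 s: the calls
    # run two at a time instead of all six hitting the backend at once.
    print(results, round(time.time() - start, 2))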
@@ -58,18 +58,21 @@ class NeutronPodVIFDriver(base.PodVIFDriver):
         return ovu.neutron_to_osvif_vif(port.binding_vif_type, port, subnets)
 
     def request_vifs(self, pod, project_id, subnets, security_groups,
-                     num_ports):
+                     num_ports, semaphore):
         os_net = clients.get_network_client()
 
         rq = self._get_port_request(pod, project_id, subnets, security_groups,
                                     unbound=True)
 
         bulk_port_rq = {'ports': [rq] * num_ports}
-        try:
-            ports = list(os_net.create_ports(bulk_port_rq))
-        except os_exc.SDKException:
-            LOG.exception("Error creating bulk ports: %s", bulk_port_rq)
-            raise
+        # Restrict the number of bulk port creation requests that may run
+        # in parallel.
+        with semaphore:
+            try:
+                ports = list(os_net.create_ports(bulk_port_rq))
+            except os_exc.SDKException:
+                LOG.exception("Error creating bulk ports: %s", bulk_port_rq)
+                raise
 
         vif_plugin = ports[0].binding_vif_type
 
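Both drivers now follow the same shape: build one bulk request, take the shared semaphore, call create_ports, and re-raise SDK errors after logging them. A condensed, hypothetical helper that captures that shape (the commit keeps this logic inline in each driver rather than factoring it out):

    import logging

    from openstack import exceptions as os_exc

    LOG = logging.getLogger(__name__)

    def bulk_create_ports(os_net, port_rq, num_ports, semaphore):
        # Hypothetical helper, not part of the commit.
        bulk_port_rq = {'ports': [port_rq] * num_ports}
        # Restrict the number of bulk port creation requests running in
        # parallel.
        with semaphore:
            try:
                # list() materializes the result before it is used further.
                return list(os_net.create_ports(bulk_port_rq))
            except os_exc.SDKException:
                LOG.exception("Error creating bulk ports: %s", bulk_port_rq)
                raise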
@@ -107,6 +107,7 @@ VIF_TYPE_TO_DRIVER_MAPPING = {
 
 NODE_PORTS_CLEAN_FREQUENCY = 600  # seconds
 POPULATE_POOL_TIMEOUT = 420  # seconds
+BULK_PORTS_CREATION_REQUESTS = 20
 
 
 class NoopVIFPool(base.VIFPoolDriver):
@@ -162,6 +163,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
       when populating pools.
     - ports_pool_update_frequency: interval in seconds between ports pool
       updates for recycling ports.
+    Also, it has a Semaphore _create_ports_semaphore to restrict the number
+    of bulk port creation calls running in parallel.
     """
 
     def __init__(self):
@@ -257,7 +260,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
                 project_id=pool_key[1],
                 subnets=subnets,
                 security_groups=security_groups,
-                num_ports=num_ports)
+                num_ports=num_ports,
+                semaphore=self._create_ports_semaphore)
         except os_exc.SDKException as exc:
             kubernetes.add_event(
                 pod, 'FailToPopulateVIFPool',
@@ -380,6 +384,8 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
         self._last_update = collections.defaultdict()
         self._lock = threading.Lock()
         self._populate_pool_lock = collections.defaultdict(threading.Lock)
+        semaphore = eventlet.semaphore.Semaphore(BULK_PORTS_CREATION_REQUESTS)
+        self._create_ports_semaphore = semaphore
 
     def _get_trunks_info(self):
         """Returns information about trunks and their subports.
@@ -1142,7 +1148,8 @@ class NestedVIFPool(BaseVIFPool):
                 subnets=subnets,
                 security_groups=security_groups,
                 num_ports=num_ports,
-                trunk_ip=trunk_ip)
+                trunk_ip=trunk_ip,
+                semaphore=self._create_ports_semaphore)
 
         pool_key = self._get_pool_key(trunk_ip, project_id, None, subnets)
         for vif in vifs:
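The pool side wires this together: a module-level cap, one semaphore created per pool driver instance, and the same object handed to every request_vifs() call. A trimmed sketch under those assumptions; PoolSketch and populate() are placeholder names, the real call is the one inside BaseVIFPool shown in the hunks above:

    import eventlet.semaphore

    BULK_PORTS_CREATION_REQUESTS = 20

    class PoolSketch(object):
        def __init__(self, vif_driver):
            self._drv_vif = vif_driver
            self._create_ports_semaphore = eventlet.semaphore.Semaphore(
                BULK_PORTS_CREATION_REQUESTS)

        def populate(self, pod, pool_key, subnets, security_groups, num_ports):
            # Reusing the instance-level semaphore is what makes the limit
            # global across concurrent populate calls; creating a fresh
            # Semaphore per call would not bound anything.
            return self._drv_vif.request_vifs(
                pod=pod,
                project_id=pool_key[1],
                subnets=subnets,
                security_groups=security_groups,
                num_ports=num_ports,
                semaphore=self._create_ports_semaphore)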
@@ -9,7 +9,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import eventlet
 import munch
 from unittest import mock
 
@@ -106,9 +106,11 @@ class TestNestedVlanPodVIFDriver(test_base.TestCase):
                                                        subports_info)
         os_net.create_ports.return_value = (p for p in [port, port])
         m_to_vif.return_value = vif
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         self.assertEqual([vif, vif], cls.request_vifs(
-            m_driver, pod, project_id, subnets, security_groups, num_ports))
+            m_driver, pod, project_id, subnets, security_groups, num_ports,
+            semaphore))
 
         m_driver._get_parent_port.assert_called_once_with(pod)
         m_driver._get_trunk_id.assert_called_once_with(parent_port)
@@ -145,10 +147,11 @@ class TestNestedVlanPodVIFDriver(test_base.TestCase):
         m_driver._get_trunk_id.return_value = trunk_id
         m_driver._create_subports_info.return_value = (port_request,
                                                        subports_info)
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         self.assertEqual([], cls.request_vifs(m_driver, pod, project_id,
                                               subnets, security_groups,
-                                              num_ports))
+                                              num_ports, semaphore))
 
         m_driver._get_parent_port.assert_called_once_with(pod)
         m_driver._get_trunk_id.assert_called_once_with(parent_port)
@@ -185,10 +188,12 @@ class TestNestedVlanPodVIFDriver(test_base.TestCase):
         m_driver._create_subports_info.return_value = (port_request,
                                                        subports_info)
         os_net.create_ports.side_effect = os_exc.SDKException
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         self.assertRaises(
             os_exc.SDKException, cls.request_vifs,
-            m_driver, pod, project_id, subnets, security_groups, num_ports)
+            m_driver, pod, project_id, subnets, security_groups, num_ports,
+            semaphore)
 
         m_driver._get_parent_port.assert_called_once_with(pod)
         m_driver._get_trunk_id.assert_called_once_with(parent_port)
@@ -228,9 +233,10 @@ class TestNestedVlanPodVIFDriver(test_base.TestCase):
                                                        subports_info)
         os_net.create_ports.return_value = (p for p in [port, port])
         os_net.add_trunk_subports.side_effect = os_exc.ConflictException
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         self.assertEqual([], cls.request_vifs(m_driver, pod, project_id,
-                         subnets, security_groups, num_ports))
+                         subnets, security_groups, num_ports, semaphore))
 
         m_driver._get_parent_port.assert_called_once_with(pod)
         m_driver._get_trunk_id.assert_called_once_with(parent_port)
@@ -273,9 +279,10 @@ class TestNestedVlanPodVIFDriver(test_base.TestCase):
                                                        subports_info)
         os_net.create_ports.return_value = (p for p in [port, port])
         os_net.add_trunk_subports.side_effect = os_exc.SDKException
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         self.assertEqual([], cls.request_vifs(m_driver, pod, project_id,
-                         subnets, security_groups, num_ports))
+                         subnets, security_groups, num_ports, semaphore))
 
         m_driver._get_parent_port.assert_called_once_with(pod)
         m_driver._get_trunk_id.assert_called_once_with(parent_port)
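In the tests, the semaphore is stubbed with mock.MagicMock(spec=eventlet.semaphore.Semaphore(20)). Spec'ing the mock against a real Semaphore instance keeps it usable as a context manager by the code under test, and it also allows an extra assertion that the guarded section was entered; the assertion below is only a possibility shown for illustration, not something the tests above do:

    import eventlet
    from unittest import mock

    semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))

    # The code under test can still do "with semaphore:" ...
    with semaphore:
        pass

    # ... and a test could verify the context manager was actually used.
    semaphore.__enter__.assert_called_once()
    semaphore.__exit__.assert_called_once()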
@@ -12,7 +12,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import eventlet
 from unittest import mock
 
 from kuryr.lib import constants as kl_const
@@ -82,9 +82,11 @@ class NeutronPodVIFDriver(test_base.TestCase):
 
         os_net.create_ports.return_value = (p for p in [port, port])
         m_to_vif.return_value = vif
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         self.assertEqual([vif, vif], cls.request_vifs(
-            m_driver, pod, project_id, subnets, security_groups, num_ports))
+            m_driver, pod, project_id, subnets, security_groups, num_ports,
+            semaphore))
 
         m_driver._get_port_request.assert_called_once_with(
             pod, project_id, subnets, security_groups, unbound=True)
@@ -115,13 +117,15 @@ class NeutronPodVIFDriver(test_base.TestCase):
         port1_1 = munch.Munch({'id': port_id, 'binding_vif_type': vif_plugin})
         vif = mock.sentinel.vif
         bulk_rq = {'ports': [port_request for _ in range(num_ports)]}
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
 
         os_net.create_ports.return_value = (p for p in [port1, port2])
         os_net.get_port.return_value = port1_1
         m_to_vif.return_value = vif
 
         self.assertEqual([vif, vif], cls.request_vifs(
-            m_driver, pod, project_id, subnets, security_groups, num_ports))
+            m_driver, pod, project_id, subnets, security_groups, num_ports,
+            semaphore))
 
         m_driver._get_port_request.assert_called_once_with(
             pod, project_id, subnets, security_groups, unbound=True)
@@ -148,11 +152,12 @@ class NeutronPodVIFDriver(test_base.TestCase):
         m_driver._get_port_request.return_value = port_request
         bulk_rq = {'ports': [port_request for _ in range(num_ports)]}
 
+        semaphore = mock.MagicMock(spec=eventlet.semaphore.Semaphore(20))
         os_net.create_ports.side_effect = os_exc.SDKException
 
         self.assertRaises(os_exc.SDKException, cls.request_vifs,
                           m_driver, pod, project_id, subnets,
-                          security_groups, num_ports)
+                          security_groups, num_ports, semaphore)
 
         m_driver._get_port_request.assert_called_once_with(
             pod, project_id, subnets, security_groups, unbound=True)
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import collections
+import eventlet
 import threading
 from unittest import mock
 import uuid
@@ -165,6 +166,11 @@ class BaseVIFPool(test_base.TestCase):
         m_driver._available_ports_pools = {}
         m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
         m_driver._recovered_pools = True
+        m_driver._lock = threading.Lock()
+        m_driver._populate_pool_lock = {
+            pool_key: mock.MagicMock(spec=threading.Lock())}
+        m_driver._create_ports_semaphore = mock.MagicMock(
+            spec=eventlet.semaphore.Semaphore(20))
 
         oslo_cfg.CONF.set_override('ports_pool_min',
                                    5,