Merge "[eventlet-removal] Replace `eventlet.GreenPool` in the DHCP agent"
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
from concurrent import futures
|
||||
import functools
|
||||
import os
|
||||
import threading
|
||||
@@ -133,7 +134,8 @@ class DhcpAgent(manager.Manager):
|
||||
self._process_monitor = external_process.ProcessMonitor(
|
||||
config=self.conf,
|
||||
resource_type='dhcp')
|
||||
self._pool = eventlet.GreenPool(size=DHCP_PROCESS_THREADS)
|
||||
self._pool = utils.ThreadPoolExecutorWithBlock(
|
||||
max_workers=DHCP_PROCESS_THREADS)
|
||||
self._queue = queue.ResourceProcessingQueue()
|
||||
self._network_bulk_allocations = {}
|
||||
# Each dhcp-agent restart should trigger a restart of all
|
||||
@@ -298,7 +300,6 @@ class DhcpAgent(manager.Manager):
|
||||
"""
|
||||
only_nets = set([] if (not networks or None in networks) else networks)
|
||||
LOG.info('Synchronizing state')
|
||||
pool = eventlet.GreenPool(self.conf.num_sync_threads)
|
||||
known_network_ids = set(self.cache.get_network_ids())
|
||||
|
||||
try:
|
||||
@@ -314,12 +315,19 @@ class DhcpAgent(manager.Manager):
|
||||
LOG.exception('Unable to sync network state on '
|
||||
'deleted network %s', deleted_id)
|
||||
|
||||
for network in active_networks:
|
||||
if (not only_nets or # specifically resync all
|
||||
network.id not in known_network_ids or # missing net
|
||||
network.id in only_nets): # specific network to sync
|
||||
pool.spawn(self.safe_configure_dhcp_for_network, network)
|
||||
pool.waitall()
|
||||
with utils.ThreadPoolExecutorWithBlock(
|
||||
max_workers=self.conf.num_sync_threads) as pool:
|
||||
fs = []
|
||||
for network in active_networks:
|
||||
if (not only_nets or # specifically resync all
|
||||
# missing net
|
||||
network.id not in known_network_ids or
|
||||
# specific network to sync
|
||||
network.id in only_nets):
|
||||
fs.append(pool.submit(
|
||||
self.safe_configure_dhcp_for_network, network)
|
||||
)
|
||||
futures.wait(fs)
|
||||
# we notify all ports in case some were created while the agent
|
||||
# was down
|
||||
self.dhcp_ready_ports |= set(self.cache.get_port_ids(only_nets))
|
||||
@@ -577,7 +585,7 @@ class DhcpAgent(manager.Manager):
|
||||
LOG.debug("Starting _process_loop")
|
||||
|
||||
while True:
|
||||
self._pool.spawn_n(self._process_resource_update)
|
||||
self._pool.submit(self._process_resource_update)
|
||||
|
||||
def _process_resource_update(self):
|
||||
for tmp, update in self._queue.each_update_to_next_resource():
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
from concurrent import futures
|
||||
import copy
|
||||
import datetime
|
||||
import signal
|
||||
@@ -523,12 +524,12 @@ class TestDhcpAgent(base.BaseTestCase):
|
||||
def test_sync_state_disabled_net(self):
|
||||
self._test_sync_state_helper(['b'], ['a'])
|
||||
|
||||
def test_sync_state_waitall(self):
|
||||
with mock.patch.object(dhcp_agent.eventlet.GreenPool, 'waitall') as w:
|
||||
def test_sync_state_wait(self):
|
||||
with mock.patch.object(futures, 'wait') as mock_wait:
|
||||
active_net_ids = ['1', '2', '3', '4', '5']
|
||||
known_net_ids = ['1', '2', '3', '4', '5']
|
||||
self._test_sync_state_helper(known_net_ids, active_net_ids)
|
||||
w.assert_called_once_with()
|
||||
mock_wait.assert_called_once()
|
||||
|
||||
def test_sync_state_for_all_networks_plugin_error(self):
|
||||
with mock.patch(DHCP_PLUGIN) as plug:
|
||||
|
||||
Reference in New Issue
Block a user