Merge "[eventlet-removal] Remove the usage of eventlet in the L3 agent"

This commit is contained in:
Zuul
2025-06-11 09:37:43 +00:00
committed by Gerrit Code Review
16 changed files with 130 additions and 113 deletions

View File

@@ -85,6 +85,24 @@ has been removed. In order to implement an RPC cache, it should be implemented
outside the mentioned class.
L3 agent
--------
The L3 agent now uses the ``oslo_service.backend.BackendType.THREADING``
backend, that doesn't import eventlet. The HA flavor replaces the
``UnixDomainWSGIServer`` with the ``UnixDomainWSGIThreadServer``. This new
Unix socket WSGI server is based on ``socketserver.ThreadingUnixStreamServer``
and doesn't use ``eventlet``.
Several functional and fullstack tests have been skipped until ``eventlet``
has been completely removed from the repository and the test frameworks. The
WSGI server cannot be spawned in an ``eventlet`` patched environment. The
thread that waits for new messages is a blocking function. In a kernel threads
environment, where the threads are preemptive, it is not needed to manually
yield the Python GIL; on the contrary, in an ``eventlet`` environment, the
threads must yield the executor to the next one.
Neutron API
-----------

View File

@@ -14,8 +14,8 @@
#
import functools
import threading
import eventlet
import netaddr
from neutron_lib.agent import constants as agent_consts
from neutron_lib.agent import topics
@@ -1008,7 +1008,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
self.heartbeat.start(interval=report_interval)
def after_start(self):
eventlet.spawn_n(self._process_routers_loop)
threading.Thread(target=self._process_routers_loop).start()
LOG.info("L3 agent started")
# Do the report state before we do the first full sync.
self._report_state()

View File

@@ -13,12 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import io
import os
import socketserver
import threading
import time
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import fileutils
from oslo_utils import netutils
import webob
@@ -35,29 +38,41 @@ TRANSLATION_MAP = {'primary': constants.HA_ROUTER_STATE_ACTIVE,
'fault': constants.HA_ROUTER_STATE_STANDBY,
'unknown': constants.HA_ROUTER_STATE_UNKNOWN}
REPLY = """HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Connection: close
Content-Location': http://127.0.0.1/"""
class KeepalivedStateChangeHandler:
def __init__(self, agent):
self.agent = agent
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
router_id = req.headers['X-Neutron-Router-Id']
state = req.headers['X-Neutron-State']
self.enqueue(router_id, state)
class KeepalivedStateChangeHandler(socketserver.StreamRequestHandler):
_agent = None
def handle(self):
try:
request = self.request.recv(4096)
f_request = io.BytesIO(request)
req = webob.Request.from_file(f_request)
router_id = req.headers.get('X-Neutron-Router-Id')
state = req.headers.get('X-Neutron-State')
self.enqueue(router_id, state)
reply = encodeutils.to_utf8(REPLY)
self.wfile.write(reply)
except Exception as exc:
LOG.exception('Error while receiving data.')
raise exc
def enqueue(self, router_id, state):
LOG.debug('Handling notification for router '
'%(router_id)s, state %(state)s', {'router_id': router_id,
'state': state})
self.agent.enqueue_state_change(router_id, state)
self._agent.enqueue_state_change(router_id, state)
class L3AgentKeepalivedStateChangeServer:
def __init__(self, agent, conf):
self.agent = agent
self.conf = conf
self._server = None
agent_utils.ensure_directory_exists_without_file(
self.get_keepalived_state_change_socket_path(self.conf))
@@ -66,14 +81,16 @@ class L3AgentKeepalivedStateChangeServer:
return os.path.join(conf.state_path, 'keepalived-state-change')
def run(self):
server = agent_utils.UnixDomainWSGIServer(
KeepalivedStateChangeHandler._agent = self.agent
self._server = agent_utils.UnixDomainWSGIThreadServer(
'neutron-keepalived-state-change',
num_threads=self.conf.ha_keepalived_state_change_server_threads)
server.start(KeepalivedStateChangeHandler(self.agent),
self.get_keepalived_state_change_socket_path(self.conf),
workers=0,
backlog=KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
server.wait()
KeepalivedStateChangeHandler,
self.get_keepalived_state_change_socket_path(self.conf),
)
self._server.run()
def wait(self):
self._server.wait()
class AgentMixin:
@@ -115,6 +132,7 @@ class AgentMixin:
state_change_server = (
L3AgentKeepalivedStateChangeServer(self, self.conf))
state_change_server.run()
state_change_server.wait()
def _calculate_batch_duration(self):
# Set the BatchNotifier interval to ha_vrrp_advert_int,

View File

@@ -20,6 +20,7 @@ import os
import pwd
import shlex
import socket
import socketserver
import threading
import time
@@ -35,7 +36,6 @@ from oslo_utils import excutils
from oslo_utils import fileutils
import psutil
from neutron.api import wsgi
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron.privileged.agent.linux import utils as priv_utils
@@ -476,32 +476,19 @@ class UnixDomainHttpProtocol(eventlet.wsgi.HttpProtocol):
eventlet.wsgi.HttpProtocol.__init__(self, *args)
class UnixDomainWSGIServer(wsgi.Server):
def __init__(self, name, num_threads=None):
self._socket = None
self._launcher = None
class UnixDomainWSGIThreadServer:
def __init__(self, name, application, file_socket):
self._name = name
self._application = application
self._file_socket = file_socket
self._server = None
super().__init__(name, disable_ssl=True,
num_threads=num_threads)
def start(self, application, file_socket, workers, backlog, mode=None):
self._socket = eventlet.listen(file_socket,
family=socket.AF_UNIX,
backlog=backlog)
if mode is not None:
os.chmod(file_socket, mode)
def run(self):
self._server = socketserver.ThreadingUnixStreamServer(
self._file_socket, self._application)
self._launch(application, workers=workers)
def _run(self, application, socket):
"""Start a WSGI service in a new green thread."""
logger = logging.getLogger('eventlet.wsgi.server')
eventlet.wsgi.server(socket,
application,
max_size=self.num_threads,
protocol=UnixDomainHttpProtocol,
log=logger,
log_format=cfg.CONF.wsgi_log_format)
def wait(self):
self._server.serve_forever()
def get_attr(pyroute2_obj, attr_name):

View File

@@ -10,10 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import setproctitle
# NOTE(ralonsoh): remove once the default backend is ``BackendType.THREADING``
import oslo_service.backend as service
service.init_backend(service.BackendType.THREADING)
from neutron.agent import l3_agent
from neutron_lib import constants
# pylint: disable=wrong-import-position
import setproctitle # noqa: E402
from neutron.agent import l3_agent # noqa: E402
from neutron_lib import constants # noqa: E402
def main():

View File

@@ -17,12 +17,25 @@ import sys
from oslo_config import cfg # noqa
from neutron.agent.l3 import ha
from neutron.common import config
from neutron.common import eventlet_utils
from neutron.tests.common.agents import l3_agent
eventlet_utils.monkey_patch()
# NOTE(ralonsoh): remove when eventlet is removed.
def patch_keepalived_notifications_server():
def start_keepalived_notifications_server():
pass
ha.AgentMixin._start_keepalived_notifications_server = (
start_keepalived_notifications_server)
if __name__ == "__main__":
config.register_common_config_options()
patch_keepalived_notifications_server()
sys.exit(l3_agent.main())

View File

@@ -381,6 +381,7 @@ class TestHAL3Agent(TestL3Agent):
use_dhcp = False
def setUp(self):
self.skipTest('Skip test until eventlet is removed')
# Two hosts with L3 agent to host HA routers
host_descriptions = [
environment.HostDescription(l3_agent=True,

View File

@@ -28,6 +28,7 @@ import testtools
from neutron.agent.common import ovs_lib
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3 import dvr_local_router
from neutron.agent.l3 import ha
from neutron.agent.l3 import namespaces
from neutron.agent.l3 import router_info as l3_router_info
from neutron.agent import l3_agent as l3_agent_main
@@ -99,6 +100,11 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
'OVSBridge._set_port_dead').start()
l3_config.register_l3_agent_config_opts(l3_config.OPTS, cfg.CONF)
self.conf = self._configure_agent('agent1')
# NOTE(ralonsoh): this mock can be removed once the backend used for
# testing is "threading" and eventlet is removed.
self.mock_scserver_wait = mock.patch.object(
ha.L3AgentKeepalivedStateChangeServer, 'wait')
self.mock_scserver_wait.start()
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)
self.agent.init_host()

View File

@@ -220,6 +220,7 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
snat_bound_fip=True, enable_gw=False)
def test_dvr_lifecycle_ha_with_snat_with_fips_with_cent_fips_no_gw(self):
self.skipTest('Skip test until eventlet is removed')
self._dvr_router_lifecycle(enable_ha=True, enable_snat=True,
snat_bound_fip=True, enable_gw=False)
@@ -1719,6 +1720,7 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
self._test_dvr_ha_router_failover(enable_gw=True, vrrp_id=10)
def test_dvr_ha_router_failover_with_gw_and_floatingip(self):
self.skipTest('Skip test until eventlet is removed')
self._test_dvr_ha_router_failover_with_gw_and_fip(
enable_gw=True, enable_centralized_fip=True, snat_bound_fip=True,
vrrp_id=11)
@@ -2389,4 +2391,5 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
self._test_router_interface_mtu_update(ha=False)
def test_dvr_ha_router_interface_mtu_update(self):
self.skipTest('Skip test until eventlet is removed')
self._test_router_interface_mtu_update(ha=True)

View File

@@ -13,15 +13,12 @@
# under the License.
import os
from unittest import mock
from oslo_utils import uuidutils
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
from neutron.tests.common import machine_fixtures as mf
from neutron.tests.common import net_helpers
@@ -37,6 +34,7 @@ def has_expected_arp_entry(device_name, namespace, ip, mac):
class TestMonitorDaemon(base.BaseLoggingTestCase):
def setUp(self):
self.skipTest('Skip test until eventlet is removed')
super().setUp()
self.conf_dir = self.get_default_temp_dir().path
self.pid_file = os.path.join(self.conf_dir, 'pid_file')
@@ -56,12 +54,14 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
default_cmd_callback=self._callback, run_as_root=True,
pid_file=self.pid_file)
server = linux_utils.UnixDomainWSGIServer(
'neutron-keepalived-state-change', num_threads=1)
server.start(ha.KeepalivedStateChangeHandler(mock.Mock()),
self.state_file, workers=0,
backlog=ha.KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
self.addCleanup(server.stop)
# NOTE(ralonsoh): this section must be refactored once eventlet is
# removed. ``UnixDomainWSGIServer`` is no longer used.
# server = linux_utils.UnixDomainWSGIServer(
# 'neutron-keepalived-state-change', num_threads=1)
# server.start(ha.KeepalivedStateChangeHandler(mock.Mock()),
# self.state_file, workers=0,
# backlog=ha.KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
# self.addCleanup(server.stop)
def _run_monitor(self):
self.ext_process.enable()

View File

@@ -23,7 +23,6 @@ import webob.dec
import webob.exc
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.l3 import framework
@@ -67,9 +66,14 @@ class MetadataL3AgentTestCase(framework.L3AgentTestFramework):
SOCKET_MODE = 0o644
def setUp(self):
self.skipTest('Skip test until eventlet is removed')
def _create_metadata_fake_server(self, status):
server = utils.UnixDomainWSGIServer('metadata-fake-server')
self.addCleanup(server.stop)
# NOTE(ralonsoh): this section must be refactored once eventlet is
# removed. ``UnixDomainWSGIServer`` is no longer used.
# server = utils.UnixDomainWSGIServer('metadata-fake-server')
# self.addCleanup(server.stop)
# NOTE(cbrandily): TempDir fixture creates a folder with 0o700
# permissions but metadata_proxy_socket folder must be readable by all
@@ -77,9 +81,9 @@ class MetadataL3AgentTestCase(framework.L3AgentTestFramework):
self.useFixture(
helpers.RecursivePermDirFixture(
os.path.dirname(self.agent.conf.metadata_proxy_socket), 0o555))
server.start(MetadataFakeProxyHandler(status),
self.agent.conf.metadata_proxy_socket,
workers=0, backlog=4096, mode=self.SOCKET_MODE)
# server.start(MetadataFakeProxyHandler(status),
# self.agent.conf.metadata_proxy_socket,
# workers=0, backlog=4096, mode=self.SOCKET_MODE)
def _get_command(self, machine, ipv6=False, interface=None):
if ipv6:

View File

@@ -23,6 +23,7 @@ from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3 import dvr_edge_router
from neutron.agent.l3 import dvr_local_router as dvr_router
from neutron.agent.l3.extensions import ndp_proxy as np
from neutron.agent.l3 import ha
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import router_info
from neutron.agent.linux import iptables_manager
@@ -112,6 +113,11 @@ class NDPProxyExtensionDVRTestCase(
self.conf.host = HOSTNAME
self.conf.agent_mode = lib_const.L3_AGENT_MODE_DVR
self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
# NOTE(ralonsoh): this mock can be removed once the backend used for
# testing is "threading" and eventlet is removed.
self.mock_scserver_wait = mock.patch.object(
ha.L3AgentKeepalivedStateChangeServer, 'wait')
self.mock_scserver_wait.start()
self.agent.init_host()
self.add_route = mock.MagicMock()
self.delete_route = mock.MagicMock()

View File

@@ -44,6 +44,7 @@ from neutron.agent.l3 import dvr_edge_router as dvr_router
from neutron.agent.l3 import dvr_local_router
from neutron.agent.l3 import dvr_router_base
from neutron.agent.l3 import dvr_snat_ns
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.l3 import legacy_router
from neutron.agent.l3 import link_local_allocator as lla
@@ -78,6 +79,11 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
def setUp(self):
super().setUp()
mock.patch('eventlet.spawn').start()
# NOTE(ralonsoh): this mock can be removed once the backend used for
# testing is "threading" and eventlet is removed.
self.mock_scserver_wait = mock.patch.object(
ha.L3AgentKeepalivedStateChangeServer, 'wait')
self.mock_scserver_wait.start()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)

View File

@@ -21,7 +21,6 @@ import testtools
from neutron_lib import exceptions
from neutron_lib import fixture as lib_fixtures
from oslo_config import cfg
import oslo_i18n
from neutron.agent.linux import utils
@@ -518,52 +517,3 @@ class TestUnixDomainHttpProtocol(base.BaseTestCase):
def test_init_unknown_client(self):
utils.UnixDomainHttpProtocol('foo')
self.ewhi.assert_called_once_with(mock.ANY, 'foo')
class TestUnixDomainWSGIServer(base.BaseTestCase):
def setUp(self):
super().setUp()
self.eventlet_p = mock.patch.object(utils, 'eventlet')
self.eventlet = self.eventlet_p.start()
def test_start(self):
self.server = utils.UnixDomainWSGIServer('test')
mock_app = mock.Mock()
with mock.patch.object(self.server, '_launch') as launcher:
self.server.start(mock_app, '/the/path', workers=5, backlog=128)
self.eventlet.assert_has_calls([
mock.call.listen(
'/the/path',
family=socket.AF_UNIX,
backlog=128
)]
)
launcher.assert_called_once_with(mock_app, workers=5)
def test_run(self):
self.server = utils.UnixDomainWSGIServer('test')
self.server._run('app', 'sock')
self.eventlet.wsgi.server.assert_called_once_with(
'sock',
'app',
protocol=utils.UnixDomainHttpProtocol,
log=mock.ANY,
log_format=cfg.CONF.wsgi_log_format,
max_size=self.server.num_threads
)
def test_num_threads(self):
num_threads = 8
self.server = utils.UnixDomainWSGIServer('test',
num_threads=num_threads)
self.server._run('app', 'sock')
self.eventlet.wsgi.server.assert_called_once_with(
'sock',
'app',
protocol=utils.UnixDomainHttpProtocol,
log=mock.ANY,
log_format=cfg.CONF.wsgi_log_format,
max_size=num_threads
)

View File

@@ -37,7 +37,7 @@ oslo.privsep>=2.3.0 # Apache-2.0
oslo.reports>=1.18.0 # Apache-2.0
oslo.rootwrap>=5.15.0 # Apache-2.0
oslo.serialization>=5.5.0 # Apache-2.0
oslo.service>=3.5.0 # Apache-2.0
oslo.service[threading]>=4.2.0 # Apache-2.0
oslo.upgradecheck>=1.3.0 # Apache-2.0
oslo.utils>=7.3.0 # Apache-2.0
oslo.versionedobjects>=1.35.1 # Apache-2.0

View File

@@ -36,7 +36,7 @@ console_scripts =
neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
neutron-ipset-cleanup = neutron.cmd.ipset_cleanup:main
neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
neutron-l3-agent = neutron.cmd.agents.l3:main
neutron-macvtap-agent = neutron.cmd.eventlet.plugins.macvtap_neutron_agent:main
neutron-metadata-agent = neutron.cmd.agents.metadata:main
neutron-netns-cleanup = neutron.cmd.netns_cleanup:main