[eventlet-removal] Remove the usage of eventlet in the L3 agent

This patch removes the usage of eventlet in the L3 agent. It removes
the last bits using this library and changes the implementation of the
WSGI server.

This patch replaces the WSGI ``neutron-keepalived-state-change``
server with an implementation based on
``socketserver.ThreadingUnixStreamServer``. The
``KeepalivedStateChangeHandler`` class now inherits from
``socketserver.StreamRequestHandler``. This change is
similar to the changes done for the Metadata agents before [1][2].

This patch bumps the ``oslo.service`` library to 4.2.0, which includes
[3]. Note that the requirements line also requests the "threading" extra
dependencies, which install additional libraries.

[1]https://review.opendev.org/c/openstack/neutron/+/938393
[2]https://review.opendev.org/c/openstack/neutron/+/942916
[3]https://review.opendev.org/c/openstack/oslo.service/+/945720

Closes-Bug: #2087943
Change-Id: I82f7b4e4c4f165bab114a0ab5ee4948d3ee8ce63
This commit is contained in:
Rodolfo Alonso Hernandez
2025-01-03 14:58:41 +00:00
parent 18dff1b31a
commit 5292b13bbd
16 changed files with 130 additions and 113 deletions

View File

@ -85,6 +85,24 @@ has been removed. In order to implement an RPC cache, it should be implemented
outside the mentioned class.
L3 agent
--------
The L3 agent now uses the ``oslo_service.backend.BackendType.THREADING``
backend, which doesn't import ``eventlet``. The HA flavor replaces the
``UnixDomainWSGIServer`` with the ``UnixDomainWSGIThreadServer``. This new
Unix socket WSGI server is based on ``socketserver.ThreadingUnixStreamServer``
and doesn't use ``eventlet``.
Several functional and fullstack tests are skipped until ``eventlet`` has
been completely removed from the repository and the test frameworks, because
the WSGI server cannot be spawned in an ``eventlet`` patched environment: the
thread that waits for new messages runs a blocking function. In a kernel
threads environment, where the threads are preemptive, there is no need to
manually yield the Python GIL; in an ``eventlet`` environment, by contrast,
each thread must explicitly yield the executor to the next one.
Neutron API
-----------

View File

@ -14,8 +14,8 @@
#
import functools
import threading
import eventlet
import netaddr
from neutron_lib.agent import constants as agent_consts
from neutron_lib.agent import topics
@ -1008,7 +1008,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
self.heartbeat.start(interval=report_interval)
def after_start(self):
eventlet.spawn_n(self._process_routers_loop)
threading.Thread(target=self._process_routers_loop).start()
LOG.info("L3 agent started")
# Do the report state before we do the first full sync.
self._report_state()

View File

@ -13,12 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import io
import os
import socketserver
import threading
import time
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import fileutils
from oslo_utils import netutils
import webob
@ -35,29 +38,41 @@ TRANSLATION_MAP = {'primary': constants.HA_ROUTER_STATE_ACTIVE,
'fault': constants.HA_ROUTER_STATE_STANDBY,
'unknown': constants.HA_ROUTER_STATE_UNKNOWN}
# Canned HTTP response written back to the client by
# ``KeepalivedStateChangeHandler.handle`` once the state change notification
# has been enqueued. The last header name used to carry a stray apostrophe
# (``Content-Location'``), which made it a malformed header name.
REPLY = """HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Connection: close
Content-Location: http://127.0.0.1/"""
class KeepalivedStateChangeHandler:
def __init__(self, agent):
self.agent = agent
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
router_id = req.headers['X-Neutron-Router-Id']
state = req.headers['X-Neutron-State']
self.enqueue(router_id, state)
class KeepalivedStateChangeHandler(socketserver.StreamRequestHandler):
_agent = None
def handle(self):
    """Process one keepalived state change notification.

    Reads the raw request from the connected socket, parses it as an HTTP
    request, enqueues the state change reported in the
    ``X-Neutron-Router-Id`` and ``X-Neutron-State`` headers and writes the
    canned ``REPLY`` message back to the client.

    :raises Exception: any error raised while reading, parsing, enqueuing
        or replying is logged and re-raised unchanged.
    """
    try:
        # NOTE(review): assumes the whole request arrives in a single read
        # of at most 4096 bytes -- confirm against the notification client.
        request = self.request.recv(4096)
        f_request = io.BytesIO(request)
        req = webob.Request.from_file(f_request)
        # ``get`` returns None instead of raising when a header is missing.
        router_id = req.headers.get('X-Neutron-Router-Id')
        state = req.headers.get('X-Neutron-State')
        self.enqueue(router_id, state)
        reply = encodeutils.to_utf8(REPLY)
        self.wfile.write(reply)
    except Exception:
        LOG.exception('Error while receiving data.')
        # A bare ``raise`` re-raises the active exception as is; there is
        # no need to bind it to a name first.
        raise
def enqueue(self, router_id, state):
LOG.debug('Handling notification for router '
'%(router_id)s, state %(state)s', {'router_id': router_id,
'state': state})
self.agent.enqueue_state_change(router_id, state)
self._agent.enqueue_state_change(router_id, state)
class L3AgentKeepalivedStateChangeServer:
def __init__(self, agent, conf):
self.agent = agent
self.conf = conf
self._server = None
agent_utils.ensure_directory_exists_without_file(
self.get_keepalived_state_change_socket_path(self.conf))
@ -66,14 +81,16 @@ class L3AgentKeepalivedStateChangeServer:
return os.path.join(conf.state_path, 'keepalived-state-change')
def run(self):
server = agent_utils.UnixDomainWSGIServer(
KeepalivedStateChangeHandler._agent = self.agent
self._server = agent_utils.UnixDomainWSGIThreadServer(
'neutron-keepalived-state-change',
num_threads=self.conf.ha_keepalived_state_change_server_threads)
server.start(KeepalivedStateChangeHandler(self.agent),
self.get_keepalived_state_change_socket_path(self.conf),
workers=0,
backlog=KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
server.wait()
KeepalivedStateChangeHandler,
self.get_keepalived_state_change_socket_path(self.conf),
)
self._server.run()
def wait(self):
self._server.wait()
class AgentMixin:
@ -115,6 +132,7 @@ class AgentMixin:
state_change_server = (
L3AgentKeepalivedStateChangeServer(self, self.conf))
state_change_server.run()
state_change_server.wait()
def _calculate_batch_duration(self):
# Set the BatchNotifier interval to ha_vrrp_advert_int,

View File

@ -20,6 +20,7 @@ import os
import pwd
import shlex
import socket
import socketserver
import threading
import time
@ -35,7 +36,6 @@ from oslo_utils import excutils
from oslo_utils import fileutils
import psutil
from neutron.api import wsgi
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron.privileged.agent.linux import utils as priv_utils
@ -476,32 +476,19 @@ class UnixDomainHttpProtocol(eventlet.wsgi.HttpProtocol):
eventlet.wsgi.HttpProtocol.__init__(self, *args)
class UnixDomainWSGIServer(wsgi.Server):
def __init__(self, name, num_threads=None):
self._socket = None
self._launcher = None
class UnixDomainWSGIThreadServer:
def __init__(self, name, application, file_socket):
self._name = name
self._application = application
self._file_socket = file_socket
self._server = None
super().__init__(name, disable_ssl=True,
num_threads=num_threads)
def start(self, application, file_socket, workers, backlog, mode=None):
self._socket = eventlet.listen(file_socket,
family=socket.AF_UNIX,
backlog=backlog)
if mode is not None:
os.chmod(file_socket, mode)
def run(self):
self._server = socketserver.ThreadingUnixStreamServer(
self._file_socket, self._application)
self._launch(application, workers=workers)
def _run(self, application, socket):
"""Start a WSGI service in a new green thread."""
logger = logging.getLogger('eventlet.wsgi.server')
eventlet.wsgi.server(socket,
application,
max_size=self.num_threads,
protocol=UnixDomainHttpProtocol,
log=logger,
log_format=cfg.CONF.wsgi_log_format)
def wait(self):
self._server.serve_forever()
def get_attr(pyroute2_obj, attr_name):

View File

@ -10,10 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import setproctitle
# NOTE(ralonsoh): remove once the default backend is ``BackendType.THREADING``
import oslo_service.backend as service
service.init_backend(service.BackendType.THREADING)
from neutron.agent import l3_agent
from neutron_lib import constants
# pylint: disable=wrong-import-position
import setproctitle # noqa: E402
from neutron.agent import l3_agent # noqa: E402
from neutron_lib import constants # noqa: E402
def main():

View File

@ -17,12 +17,25 @@ import sys
from oslo_config import cfg # noqa
from neutron.agent.l3 import ha
from neutron.common import config
from neutron.common import eventlet_utils
from neutron.tests.common.agents import l3_agent
eventlet_utils.monkey_patch()
# NOTE(ralonsoh): remove when eventlet is removed.
def patch_keepalived_notifications_server():
    """Replace the keepalived notifications server starter with a no-op.

    Overrides ``ha.AgentMixin._start_keepalived_notifications_server`` at
    class level, so any agent created afterwards does nothing when asked to
    start the keepalived state change server.
    """
    def start_keepalived_notifications_server(*args, **kwargs):
        # The function is installed as a class attribute, so it receives
        # the instance (``self``) when called on an agent object. Accept
        # and ignore any arguments instead of failing with ``TypeError``.
        pass

    ha.AgentMixin._start_keepalived_notifications_server = (
        start_keepalived_notifications_server)
if __name__ == "__main__":
config.register_common_config_options()
patch_keepalived_notifications_server()
sys.exit(l3_agent.main())

View File

@ -381,6 +381,7 @@ class TestHAL3Agent(TestL3Agent):
use_dhcp = False
def setUp(self):
self.skipTest('Skip test until eventlet is removed')
# Two hosts with L3 agent to host HA routers
host_descriptions = [
environment.HostDescription(l3_agent=True,

View File

@ -28,6 +28,7 @@ import testtools
from neutron.agent.common import ovs_lib
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3 import dvr_local_router
from neutron.agent.l3 import ha
from neutron.agent.l3 import namespaces
from neutron.agent.l3 import router_info as l3_router_info
from neutron.agent import l3_agent as l3_agent_main
@ -99,6 +100,11 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
'OVSBridge._set_port_dead').start()
l3_config.register_l3_agent_config_opts(l3_config.OPTS, cfg.CONF)
self.conf = self._configure_agent('agent1')
# NOTE(ralonsoh): this mock can be removed once the backend used for
# testing is "threading" and eventlet is removed.
self.mock_scserver_wait = mock.patch.object(
ha.L3AgentKeepalivedStateChangeServer, 'wait')
self.mock_scserver_wait.start()
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)
self.agent.init_host()

View File

@ -220,6 +220,7 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
snat_bound_fip=True, enable_gw=False)
def test_dvr_lifecycle_ha_with_snat_with_fips_with_cent_fips_no_gw(self):
self.skipTest('Skip test until eventlet is removed')
self._dvr_router_lifecycle(enable_ha=True, enable_snat=True,
snat_bound_fip=True, enable_gw=False)
@ -1719,6 +1720,7 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
self._test_dvr_ha_router_failover(enable_gw=True, vrrp_id=10)
def test_dvr_ha_router_failover_with_gw_and_floatingip(self):
self.skipTest('Skip test until eventlet is removed')
self._test_dvr_ha_router_failover_with_gw_and_fip(
enable_gw=True, enable_centralized_fip=True, snat_bound_fip=True,
vrrp_id=11)
@ -2389,4 +2391,5 @@ class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
self._test_router_interface_mtu_update(ha=False)
def test_dvr_ha_router_interface_mtu_update(self):
self.skipTest('Skip test until eventlet is removed')
self._test_router_interface_mtu_update(ha=True)

View File

@ -13,15 +13,12 @@
# under the License.
import os
from unittest import mock
from oslo_utils import uuidutils
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
from neutron.tests.common import machine_fixtures as mf
from neutron.tests.common import net_helpers
@ -37,6 +34,7 @@ def has_expected_arp_entry(device_name, namespace, ip, mac):
class TestMonitorDaemon(base.BaseLoggingTestCase):
def setUp(self):
self.skipTest('Skip test until eventlet is removed')
super().setUp()
self.conf_dir = self.get_default_temp_dir().path
self.pid_file = os.path.join(self.conf_dir, 'pid_file')
@ -56,12 +54,14 @@ class TestMonitorDaemon(base.BaseLoggingTestCase):
default_cmd_callback=self._callback, run_as_root=True,
pid_file=self.pid_file)
server = linux_utils.UnixDomainWSGIServer(
'neutron-keepalived-state-change', num_threads=1)
server.start(ha.KeepalivedStateChangeHandler(mock.Mock()),
self.state_file, workers=0,
backlog=ha.KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
self.addCleanup(server.stop)
# NOTE(ralonsoh): this section must be refactored once eventlet is
# removed. ``UnixDomainWSGIServer`` is no longer used.
# server = linux_utils.UnixDomainWSGIServer(
# 'neutron-keepalived-state-change', num_threads=1)
# server.start(ha.KeepalivedStateChangeHandler(mock.Mock()),
# self.state_file, workers=0,
# backlog=ha.KEEPALIVED_STATE_CHANGE_SERVER_BACKLOG)
# self.addCleanup(server.stop)
def _run_monitor(self):
self.ext_process.enable()

View File

@ -23,7 +23,6 @@ import webob.dec
import webob.exc
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.l3 import framework
@ -67,9 +66,14 @@ class MetadataL3AgentTestCase(framework.L3AgentTestFramework):
SOCKET_MODE = 0o644
def setUp(self):
self.skipTest('Skip test until eventlet is removed')
def _create_metadata_fake_server(self, status):
server = utils.UnixDomainWSGIServer('metadata-fake-server')
self.addCleanup(server.stop)
# NOTE(ralonsoh): this section must be refactored once eventlet is
# removed. ``UnixDomainWSGIServer`` is no longer used.
# server = utils.UnixDomainWSGIServer('metadata-fake-server')
# self.addCleanup(server.stop)
# NOTE(cbrandily): TempDir fixture creates a folder with 0o700
# permissions but metadata_proxy_socket folder must be readable by all
@ -77,9 +81,9 @@ class MetadataL3AgentTestCase(framework.L3AgentTestFramework):
self.useFixture(
helpers.RecursivePermDirFixture(
os.path.dirname(self.agent.conf.metadata_proxy_socket), 0o555))
server.start(MetadataFakeProxyHandler(status),
self.agent.conf.metadata_proxy_socket,
workers=0, backlog=4096, mode=self.SOCKET_MODE)
# server.start(MetadataFakeProxyHandler(status),
# self.agent.conf.metadata_proxy_socket,
# workers=0, backlog=4096, mode=self.SOCKET_MODE)
def _get_command(self, machine, ipv6=False, interface=None):
if ipv6:

View File

@ -23,6 +23,7 @@ from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3 import dvr_edge_router
from neutron.agent.l3 import dvr_local_router as dvr_router
from neutron.agent.l3.extensions import ndp_proxy as np
from neutron.agent.l3 import ha
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import router_info
from neutron.agent.linux import iptables_manager
@ -112,6 +113,11 @@ class NDPProxyExtensionDVRTestCase(
self.conf.host = HOSTNAME
self.conf.agent_mode = lib_const.L3_AGENT_MODE_DVR
self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
# NOTE(ralonsoh): this mock can be removed once the backend used for
# testing is "threading" and eventlet is removed.
self.mock_scserver_wait = mock.patch.object(
ha.L3AgentKeepalivedStateChangeServer, 'wait')
self.mock_scserver_wait.start()
self.agent.init_host()
self.add_route = mock.MagicMock()
self.delete_route = mock.MagicMock()

View File

@ -44,6 +44,7 @@ from neutron.agent.l3 import dvr_edge_router as dvr_router
from neutron.agent.l3 import dvr_local_router
from neutron.agent.l3 import dvr_router_base
from neutron.agent.l3 import dvr_snat_ns
from neutron.agent.l3 import ha
from neutron.agent.l3 import ha_router
from neutron.agent.l3 import legacy_router
from neutron.agent.l3 import link_local_allocator as lla
@ -78,6 +79,11 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
def setUp(self):
super().setUp()
mock.patch('eventlet.spawn').start()
# NOTE(ralonsoh): this mock can be removed once the backend used for
# testing is "threading" and eventlet is removed.
self.mock_scserver_wait = mock.patch.object(
ha.L3AgentKeepalivedStateChangeServer, 'wait')
self.mock_scserver_wait.start()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
log.register_options(self.conf)

View File

@ -21,7 +21,6 @@ import testtools
from neutron_lib import exceptions
from neutron_lib import fixture as lib_fixtures
from oslo_config import cfg
import oslo_i18n
from neutron.agent.linux import utils
@ -518,52 +517,3 @@ class TestUnixDomainHttpProtocol(base.BaseTestCase):
def test_init_unknown_client(self):
utils.UnixDomainHttpProtocol('foo')
self.ewhi.assert_called_once_with(mock.ANY, 'foo')
class TestUnixDomainWSGIServer(base.BaseTestCase):
def setUp(self):
super().setUp()
self.eventlet_p = mock.patch.object(utils, 'eventlet')
self.eventlet = self.eventlet_p.start()
def test_start(self):
self.server = utils.UnixDomainWSGIServer('test')
mock_app = mock.Mock()
with mock.patch.object(self.server, '_launch') as launcher:
self.server.start(mock_app, '/the/path', workers=5, backlog=128)
self.eventlet.assert_has_calls([
mock.call.listen(
'/the/path',
family=socket.AF_UNIX,
backlog=128
)]
)
launcher.assert_called_once_with(mock_app, workers=5)
def test_run(self):
self.server = utils.UnixDomainWSGIServer('test')
self.server._run('app', 'sock')
self.eventlet.wsgi.server.assert_called_once_with(
'sock',
'app',
protocol=utils.UnixDomainHttpProtocol,
log=mock.ANY,
log_format=cfg.CONF.wsgi_log_format,
max_size=self.server.num_threads
)
def test_num_threads(self):
num_threads = 8
self.server = utils.UnixDomainWSGIServer('test',
num_threads=num_threads)
self.server._run('app', 'sock')
self.eventlet.wsgi.server.assert_called_once_with(
'sock',
'app',
protocol=utils.UnixDomainHttpProtocol,
log=mock.ANY,
log_format=cfg.CONF.wsgi_log_format,
max_size=num_threads
)

View File

@ -37,7 +37,7 @@ oslo.privsep>=2.3.0 # Apache-2.0
oslo.reports>=1.18.0 # Apache-2.0
oslo.rootwrap>=5.15.0 # Apache-2.0
oslo.serialization>=5.5.0 # Apache-2.0
oslo.service>=3.5.0 # Apache-2.0
oslo.service[threading]>=4.2.0 # Apache-2.0
oslo.upgradecheck>=1.3.0 # Apache-2.0
oslo.utils>=7.3.0 # Apache-2.0
oslo.versionedobjects>=1.35.1 # Apache-2.0

View File

@ -38,7 +38,7 @@ console_scripts =
neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
neutron-ipset-cleanup = neutron.cmd.ipset_cleanup:main
neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
neutron-l3-agent = neutron.cmd.agents.l3:main
neutron-macvtap-agent = neutron.cmd.eventlet.plugins.macvtap_neutron_agent:main
neutron-metadata-agent = neutron.cmd.agents.metadata:main
neutron-netns-cleanup = neutron.cmd.netns_cleanup:main