Wait until workers have been launched

In "test_restart_rpc_on_sighup_multiple_workers", the test needs to
wait until the RPC workers have been properly launched by
``oslo_service.service.ProcessLauncher.launch_service``. Once this
method returns, it is guaranteed that the child worker processes
are running and the signal process handlers are attending the
SIGHUP signal that will reset them.

Conflicts:
    neutron/tests/functional/test_server.py

Closes-Bug: #1938428
Change-Id: I1dc56092d099223accc3aefa8e303310c4f6787e
(cherry picked from commit fafcabdbe0)
This commit is contained in:
Rodolfo Alonso Hernandez 2021-07-29 10:30:01 +00:00
parent 7dc407da94
commit ce54a37ddd
1 changed files with 35 additions and 22 deletions

View File

@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import multiprocessing
import os import os
import queue
import signal import signal
import socket import socket
import time import time
@ -61,7 +63,7 @@ class TestNeutronServer(base.BaseLoggingTestCase):
# Make sure all processes are stopped # Make sure all processes are stopped
os.kill(self.service_pid, signal.SIGKILL) os.kill(self.service_pid, signal.SIGKILL)
def _start_server(self, callback, workers): def _start_server(self, callback, workers, processes_queue=None):
"""Run a given service. """Run a given service.
:param callback: callback that will start the required service :param callback: callback that will start the required service
@ -91,15 +93,7 @@ class TestNeutronServer(base.BaseLoggingTestCase):
# If number of workers is 1 it is assumed that we run # If number of workers is 1 it is assumed that we run
# a service in the current process. # a service in the current process.
if self.workers > 1: if self.workers > 1:
# Wait at most 10 seconds to spawn workers workers = self._get_workers(10, processes_queue=processes_queue)
condition = lambda: self.workers == len(self._get_workers())
utils.wait_until_true(
condition, timeout=10, sleep=0.1,
exception=RuntimeError(
"Failed to start %d workers." % self.workers))
workers = self._get_workers()
self.assertEqual(len(workers), self.workers) self.assertEqual(len(workers), self.workers)
return workers return workers
@ -110,21 +104,35 @@ class TestNeutronServer(base.BaseLoggingTestCase):
return [self.service_pid] return [self.service_pid]
def _get_workers(self): def _get_workers(self, timeout, processes_queue=None):
"""Get the list of processes in which WSGI server is running.""" """Get the list of processes in which WSGI server is running."""
def safe_ppid(proc): def safe_ppid(proc):
try: try:
return proc.ppid() return proc.ppid()
except psutil.NoSuchProcess: except psutil.NoSuchProcess:
return None return None
if self.workers > 1: def get_workers_pid():
return [proc.pid for proc in psutil.process_iter() if self.workers > 1:
if safe_ppid(proc) == self.service_pid] return [proc.pid for proc in psutil.process_iter()
else: if safe_ppid(proc) == self.service_pid]
return [proc.pid for proc in psutil.process_iter() else:
if proc.pid == self.service_pid] return [proc.pid for proc in psutil.process_iter()
if proc.pid == self.service_pid]
exception = RuntimeError('Failed to start %d workers.' % self.workers)
if processes_queue:
try:
return processes_queue.get(timeout=timeout)
except queue.Empty:
raise exception
# Wait at most 10 seconds to spawn workers
condition = lambda: self.workers == len(get_workers_pid())
utils.wait_until_true(condition, timeout=timeout, sleep=0.1,
exception=exception)
return get_workers_pid()
def _check_active(self): def _check_active(self):
"""Dummy service activity check.""" """Dummy service activity check."""
@ -139,7 +147,8 @@ class TestNeutronServer(base.BaseLoggingTestCase):
with open(self.temp_file, 'ab') as f: with open(self.temp_file, 'ab') as f:
f.write(FAKE_RESET_MSG) f.write(FAKE_RESET_MSG)
def _test_restart_service_on_sighup(self, service, workers=1): def _test_restart_service_on_sighup(self, service, workers=1,
processes_queue=None):
"""Test that a service correctly (re)starts on receiving SIGHUP. """Test that a service correctly (re)starts on receiving SIGHUP.
1. Start a service with a given number of workers. 1. Start a service with a given number of workers.
@ -147,7 +156,8 @@ class TestNeutronServer(base.BaseLoggingTestCase):
3. Wait for workers (if any) to (re)start. 3. Wait for workers (if any) to (re)start.
""" """
self._start_server(callback=service, workers=workers) self._start_server(callback=service, workers=workers,
processes_queue=processes_queue)
os.kill(self.service_pid, signal.SIGHUP) os.kill(self.service_pid, signal.SIGHUP)
# After sending SIGHUP it is expected that there will be as many # After sending SIGHUP it is expected that there will be as many
@ -257,6 +267,7 @@ class TestRPCServer(TestNeutronServer):
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True) self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start() self.plugin = self._plugin_patcher.start()
self.plugin.return_value.rpc_workers_supported = True self.plugin.return_value.rpc_workers_supported = True
self._processes_queue = multiprocessing.Queue()
def _serve_rpc(self, workers=1): def _serve_rpc(self, workers=1):
"""Start RPC server with a given number of workers.""" """Start RPC server with a given number of workers."""
@ -277,11 +288,13 @@ class TestRPCServer(TestNeutronServer):
CONF.set_override("rpc_state_report_workers", 0) CONF.set_override("rpc_state_report_workers", 0)
rpc_workers_launcher = service.start_rpc_workers() rpc_workers_launcher = service.start_rpc_workers()
self._processes_queue.put(list(rpc_workers_launcher.children))
rpc_workers_launcher.wait() rpc_workers_launcher.wait()
def test_restart_rpc_on_sighup_multiple_workers(self): def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc, self._test_restart_service_on_sighup(
workers=2) service=self._serve_rpc, workers=2,
processes_queue=self._processes_queue)
class TestPluginWorker(TestNeutronServer): class TestPluginWorker(TestNeutronServer):