diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 71283ec17f8..4479dc668da 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -15,6 +15,7 @@ import multiprocessing import os +import queue import signal import socket import time @@ -61,7 +62,7 @@ class TestNeutronServer(base.BaseLoggingTestCase): # Make sure all processes are stopped os.kill(self.service_pid, signal.SIGKILL) - def _start_server(self, callback, workers): + def _start_server(self, callback, workers, processes_queue=None): """Run a given service. :param callback: callback that will start the required service @@ -91,15 +92,7 @@ class TestNeutronServer(base.BaseLoggingTestCase): # If number of workers is 1 it is assumed that we run # a service in the current process. if self.workers > 1: - # Wait at most 10 seconds to spawn workers - condition = lambda: self.workers == len(self._get_workers()) - - utils.wait_until_true( - condition, timeout=10, sleep=0.1, - exception=RuntimeError( - "Failed to start %d workers." % self.workers)) - - workers = self._get_workers() + workers = self._get_workers(10, processes_queue=processes_queue) self.assertEqual(len(workers), self.workers) return workers @@ -110,21 +103,35 @@ class TestNeutronServer(base.BaseLoggingTestCase): return [self.service_pid] - def _get_workers(self): + def _get_workers(self, timeout, processes_queue=None): """Get the list of processes in which WSGI server is running.""" - def safe_ppid(proc): try: return proc.ppid() except psutil.NoSuchProcess: return None - if self.workers > 1: - return [proc.pid for proc in psutil.process_iter() - if safe_ppid(proc) == self.service_pid] - else: - return [proc.pid for proc in psutil.process_iter() - if proc.pid == self.service_pid] + def get_workers_pid(): + if self.workers > 1: + return [proc.pid for proc in psutil.process_iter() + if safe_ppid(proc) == self.service_pid] + else: + return [proc.pid for proc in psutil.process_iter() + if proc.pid == self.service_pid] + + exception = RuntimeError('Failed to start %d workers.' % self.workers) + + if processes_queue: + try: + return processes_queue.get(timeout=timeout) + except queue.Empty: + raise exception + + # Wait at most 10 seconds to spawn workers + condition = lambda: self.workers == len(get_workers_pid()) + utils.wait_until_true(condition, timeout=timeout, sleep=0.1, + exception=exception) + return get_workers_pid() def _check_active(self): """Dummy service activity check.""" @@ -137,7 +144,8 @@ class TestNeutronServer(base.BaseLoggingTestCase): def _fake_reset(self): self._mp_queue.put(FAKE_RESET_MSG) - def _test_restart_service_on_sighup(self, service, workers=1): + def _test_restart_service_on_sighup(self, service, workers=1, + processes_queue=None): """Test that a service correctly (re)starts on receiving SIGHUP. 1. Start a service with a given number of workers. @@ -145,7 +153,8 @@ class TestNeutronServer(base.BaseLoggingTestCase): 3. Wait for workers (if any) to (re)start. """ - self._start_server(callback=service, workers=workers) + self._start_server(callback=service, workers=workers, + processes_queue=processes_queue) os.kill(self.service_pid, signal.SIGHUP) # After sending SIGHUP it is expected that there will be as many @@ -244,6 +253,7 @@ class TestRPCServer(TestNeutronServer): self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True) self.plugin = self._plugin_patcher.start() self.plugin.return_value.rpc_workers_supported = True + self._processes_queue = multiprocessing.Queue() def _serve_rpc(self, workers=1): """Start RPC server with a given number of workers.""" @@ -264,11 +274,13 @@ class TestRPCServer(TestNeutronServer): CONF.set_override("rpc_state_report_workers", 0) rpc_workers_launcher = service.start_rpc_workers() + self._processes_queue.put(list(rpc_workers_launcher.children)) rpc_workers_launcher.wait() def test_restart_rpc_on_sighup_multiple_workers(self): - self._test_restart_service_on_sighup(service=self._serve_rpc, - workers=2) + self._test_restart_service_on_sighup( + service=self._serve_rpc, workers=2, + processes_queue=self._processes_queue) class TestPluginWorker(TestNeutronServer):