Merge "Handle SIGHUP: neutron-server (multiprocess) and metadata agent"

This commit is contained in:
Jenkins 2015-06-10 20:25:16 +00:00 committed by Gerrit Code Review
commit a3226a405c
7 changed files with 321 additions and 8 deletions

View File

@ -31,6 +31,7 @@ from paste import deploy
from neutron.api.v2 import attributes
from neutron.common import utils
from neutron.i18n import _LI
from neutron import policy
from neutron import version
@ -210,6 +211,14 @@ def setup_logging():
LOG.debug("command line: %s", " ".join(sys.argv))
def reset_service():
# Reset worker in case SIGHUP is called.
# Note that this is called only in case a service is running in
# daemon mode.
setup_logging()
policy.refresh()
def load_paste_app(app_name):
"""Builds and returns a WSGI app from a paste config file.

View File

@ -32,7 +32,6 @@ from neutron.i18n import _LE, _LI
from neutron import manager
from neutron.openstack.common import loopingcall
from neutron.openstack.common import service as common_service
from neutron import policy
from neutron import wsgi
@ -128,7 +127,10 @@ class RpcWorker(object):
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
server.stop()
self._servers = []
@staticmethod
def reset():
config.reset_service()
def serve_rpc():
@ -288,8 +290,7 @@ class Service(n_rpc.Service):
LOG.exception(_LE("Exception occurs when waiting for timer"))
def reset(self):
config.setup_logging()
policy.refresh()
config.reset_service()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""

View File

@ -4,5 +4,6 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
psutil>=1.1.1,<2.0.0
psycopg2
PyMySQL>=0.6.2 # MIT License

View File

@ -0,0 +1,247 @@
# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib2
import mock
import os
import signal
import socket
import time
import traceback
from oslo_config import cfg
import psutil
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
from neutron import wsgi
CONF = cfg.CONF
# This message will be written to temporary file each time
# reset method is called.
FAKE_RESET_MSG = "reset".encode("utf-8")
TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
class TestNeutronServer(base.BaseTestCase):
def setUp(self):
super(TestNeutronServer, self).setUp()
self.service_pid = None
self.workers = None
self.temp_file = self.get_temp_file_path("test_server.tmp")
self.health_checker = None
self.pipein, self.pipeout = os.pipe()
self.addCleanup(self._destroy_workers)
def _destroy_workers(self):
if self.service_pid:
# Make sure all processes are stopped
os.kill(self.service_pid, signal.SIGKILL)
def _start_server(self, callback, workers):
"""Run a given service.
:param callback: callback that will start the required service
:param workers: number of service workers
:returns: list of spawned workers' pids
"""
self.workers = workers
# Fork a new process in which server will be started
pid = os.fork()
if pid == 0:
status = 0
try:
callback(workers)
except SystemExit as exc:
status = exc.code
except BaseException:
traceback.print_exc()
status = 2
# Really exit
os._exit(status)
self.service_pid = pid
if self.workers > 0:
# Wait at most 10 seconds to spawn workers
condition = lambda: self.workers == len(self._get_workers())
utils.wait_until_true(
condition, timeout=10, sleep=0.1,
exception=RuntimeError(
"Failed to start %d workers." % self.workers))
workers = self._get_workers()
self.assertEqual(len(workers), self.workers)
return workers
# Wait for a service to start.
utils.wait_until_true(self.health_checker, timeout=10, sleep=0.1,
exception=RuntimeError(
"Failed to start service."))
return [self.service_pid]
def _get_workers(self):
"""Get the list of processes in which WSGI server is running."""
if self.workers > 0:
return [proc.pid for proc in psutil.process_iter()
if proc.ppid == self.service_pid]
else:
return [proc.pid for proc in psutil.process_iter()
if proc.pid == self.service_pid]
def _fake_reset(self):
"""Writes FAKE_RESET_MSG to temporary file on each call."""
with open(self.temp_file, 'a') as f:
f.write(FAKE_RESET_MSG)
def _test_restart_service_on_sighup(self, service, workers=0):
"""Test that a service correctly restarts on receiving SIGHUP.
1. Start a service with a given number of workers.
2. Send SIGHUP to the service.
3. Wait for workers (if any) to restart.
4. Assert that the pids of the workers didn't change after restart.
"""
start_workers = self._start_server(callback=service, workers=workers)
os.kill(self.service_pid, signal.SIGHUP)
# Wait for temp file to be created and its size become equal
# to size of FAKE_RESET_MSG repeated (workers + 1) times.
expected_size = len(FAKE_RESET_MSG) * (workers + 1)
condition = lambda: (os.path.isfile(self.temp_file)
and os.stat(self.temp_file).st_size ==
expected_size)
utils.wait_until_true(
condition, timeout=5, sleep=0.1,
exception=RuntimeError(
"Timed out waiting for file %(filename)s to be created and "
"its size become equal to %(size)s." %
{'filename': self.temp_file,
'size': expected_size}))
# Verify that reset has been called for parent process in which
# a service was started and for each worker by checking that
# FAKE_RESET_MSG has been written to temp file workers + 1 times.
with open(self.temp_file, 'r') as f:
res = f.readline()
self.assertEqual(FAKE_RESET_MSG * (workers + 1), res)
# Make sure worker pids don't change
end_workers = self._get_workers()
self.assertEqual(start_workers, end_workers)
class TestWsgiServer(TestNeutronServer):
"""Tests for neutron.wsgi.Server."""
def setUp(self):
super(TestWsgiServer, self).setUp()
self.health_checker = self._check_active
self.port = None
@staticmethod
def application(environ, start_response):
"""A primitive test application."""
response_body = 'Response'
status = '200 OK'
response_headers = [('Content-Type', 'text/plain'),
('Content-Length', str(len(response_body)))]
start_response(status, response_headers)
return [response_body]
def _check_active(self):
"""Check a wsgi service is active by making a GET request."""
port = int(os.read(self.pipein, 5))
conn = httplib2.HTTPConnectionWithTimeout("localhost", port)
try:
conn.request("GET", "/")
resp = conn.getresponse()
return resp.status == 200
except socket.error:
return False
def _run_wsgi(self, workers=0):
"""Start WSGI server with a test application."""
# Mock reset method to check that it is being called
# on receiving SIGHUP.
with mock.patch("neutron.wsgi.WorkerService.reset") as reset_method:
reset_method.side_effect = self._fake_reset
server = wsgi.Server("Test")
server.start(self.application, 0, "0.0.0.0",
workers=workers)
# Memorize a port that was chosen for the service
self.port = server.port
os.write(self.pipeout, str(self.port))
server.wait()
def test_restart_wsgi_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._run_wsgi,
workers=2)
class TestRPCServer(TestNeutronServer):
"""Tests for neutron RPC server."""
def setUp(self):
super(TestRPCServer, self).setUp()
self.setup_coreplugin(TARGET_PLUGIN)
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start()
self.plugin.return_value.rpc_workers_supported = True
self.health_checker = self._check_active
def _check_active(self):
time.sleep(5)
return True
def _serve_rpc(self, workers=0):
"""Start RPC server with a given number of workers."""
# Mock reset method to check that it is being called
# on receiving SIGHUP.
with mock.patch("neutron.service.RpcWorker.reset") as reset_method:
with mock.patch(
"neutron.manager.NeutronManager.get_plugin"
) as get_plugin:
reset_method.side_effect = self._fake_reset
get_plugin.return_value = self.plugin
CONF.set_override("rpc_workers", workers)
launcher = service.serve_rpc()
launcher.wait()
def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc,
workers=2)

View File

@ -0,0 +1,33 @@
# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import service
from neutron.tests import base
class TestRpcWorker(base.BaseTestCase):
@mock.patch("neutron.policy.refresh")
@mock.patch("neutron.common.config.setup_logging")
def test_reset(self, setup_logging_mock, refresh_mock):
_plugin = mock.Mock()
rpc_worker = service.RpcWorker(_plugin)
rpc_worker.reset()
setup_logging_mock.assert_called_once_with()
refresh_mock.assert_called_once_with()

View File

@ -65,6 +65,18 @@ class TestWorkerService(base.BaseTestCase):
workerservice.start()
self.assertFalse(apimock.called)
@mock.patch("neutron.policy.refresh")
@mock.patch("neutron.common.config.setup_logging")
def test_reset(self, setup_logging_mock, refresh_mock):
_service = mock.Mock()
_app = mock.Mock()
worker_service = wsgi.WorkerService(_service, _app)
worker_service.reset()
setup_logging_mock.assert_called_once_with()
refresh_mock.assert_called_once_with()
class TestWSGIServer(base.BaseTestCase):
"""WSGI server tests."""
@ -132,7 +144,7 @@ class TestWSGIServer(base.BaseTestCase):
mock.call(
server._run,
None,
mock_listen.return_value)
mock_listen.return_value.dup.return_value)
])
def test_app(self):

View File

@ -37,6 +37,7 @@ import six
import webob.dec
import webob.exc
from neutron.common import config
from neutron.common import exceptions as exception
from neutron import context
from neutron.db import api
@ -99,12 +100,17 @@ class WorkerService(object):
self._server = None
def start(self):
# When api worker is stopped it kills the eventlet wsgi server which
# internally closes the wsgi server socket object. This server socket
# object becomes not usable which leads to "Bad file descriptor"
# errors on service restart.
# Duplicate a socket object to keep a file descriptor usable.
dup_sock = self._service._socket.dup()
if CONF.use_ssl:
self._service._socket = self._service.wrap_ssl(
self._service._socket)
dup_sock = self._service.wrap_ssl(dup_sock)
self._server = self._service.pool.spawn(self._service._run,
self._application,
self._service._socket)
dup_sock)
def wait(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
@ -115,6 +121,10 @@ class WorkerService(object):
self._server.kill()
self._server = None
@staticmethod
def reset():
config.reset_service()
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""