Handle SIGHUP: neutron-server (multiprocess) and metadata agent

All launchers implemented in common.service require each service to
implement reset method because it is called in case a process
receives a SIGHUP.

This change adds the reset method to neutron.service.RpcWorker and
neutron.wsgi.WorkerService which are used to wrap rpc and api
workers correspondingly.

Now neutron-server running in multiprocess mode (api_workers > 0 and
rpc_workers > 0) and metadata agent don't die on receiving SIGHUP and support
reloading policy_path and logging options in config.

Note that reset is called only in case a service is running in daemon mode.

Other changes made in the scope of this patch that need to be mentioned:

* Don't empty self._servers list in RpcWorker's stop method

  When a service is restarted all services are gracefully shutdowned,
  resetted and started again (see openstack.common.service code).
  As graceful shutdown implies calling service.stop() and then
  service.wait() we don't want to clean self._servers list because
  it would be impossible to wait for them to stop processing
  requests and cleaning up their resources.
  Otherwise, this would lead to problems with rpc after starting
  the rpc server again.

* Create a duplicate socket each time WorkerService starts

  When api worker is stopped it kills the eventlet wsgi server
  which internally closes the wsgi server socket object. This server
  socket object becomes not usable which leads to "Bad file
  descriptor" errors on service restart.

Added functional and unit tests.

DocImpact
Partial-Bug: #1276694
Change-Id: I75b00946b7cae891c6eb192e853118e7d49e4a24
This commit is contained in:
Elena Ezhova 2015-04-07 14:58:13 +03:00
parent d0bbfc090b
commit 6d0d729731
7 changed files with 321 additions and 8 deletions

View File

@ -31,6 +31,7 @@ from paste import deploy
from neutron.api.v2 import attributes
from neutron.common import utils
from neutron.i18n import _LI
from neutron import policy
from neutron import version
@ -210,6 +211,14 @@ def setup_logging():
LOG.debug("command line: %s", " ".join(sys.argv))
def reset_service():
# Reset worker in case SIGHUP is called.
# Note that this is called only in case a service is running in
# daemon mode.
setup_logging()
policy.refresh()
def load_paste_app(app_name):
"""Builds and returns a WSGI app from a paste config file.

View File

@ -32,7 +32,6 @@ from neutron.i18n import _LE, _LI
from neutron import manager
from neutron.openstack.common import loopingcall
from neutron.openstack.common import service as common_service
from neutron import policy
from neutron import wsgi
@ -128,7 +127,10 @@ class RpcWorker(object):
for server in self._servers:
if isinstance(server, rpc_server.MessageHandlingServer):
server.stop()
self._servers = []
@staticmethod
def reset():
config.reset_service()
def serve_rpc():
@ -288,8 +290,7 @@ class Service(n_rpc.Service):
LOG.exception(_LE("Exception occurs when waiting for timer"))
def reset(self):
config.setup_logging()
policy.refresh()
config.reset_service()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""

View File

@ -4,5 +4,6 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
psutil>=1.1.1,<2.0.0
psycopg2
MySQL-python

View File

@ -0,0 +1,247 @@
# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib2
import mock
import os
import signal
import socket
import time
import traceback
from oslo_config import cfg
import psutil
from neutron.agent.linux import utils
from neutron import service
from neutron.tests import base
from neutron import wsgi
CONF = cfg.CONF
# This message will be written to temporary file each time
# reset method is called.
FAKE_RESET_MSG = "reset".encode("utf-8")
TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
class TestNeutronServer(base.BaseTestCase):
def setUp(self):
super(TestNeutronServer, self).setUp()
self.service_pid = None
self.workers = None
self.temp_file = self.get_temp_file_path("test_server.tmp")
self.health_checker = None
self.pipein, self.pipeout = os.pipe()
self.addCleanup(self._destroy_workers)
def _destroy_workers(self):
if self.service_pid:
# Make sure all processes are stopped
os.kill(self.service_pid, signal.SIGKILL)
def _start_server(self, callback, workers):
"""Run a given service.
:param callback: callback that will start the required service
:param workers: number of service workers
:returns: list of spawned workers' pids
"""
self.workers = workers
# Fork a new process in which server will be started
pid = os.fork()
if pid == 0:
status = 0
try:
callback(workers)
except SystemExit as exc:
status = exc.code
except BaseException:
traceback.print_exc()
status = 2
# Really exit
os._exit(status)
self.service_pid = pid
if self.workers > 0:
# Wait at most 10 seconds to spawn workers
condition = lambda: self.workers == len(self._get_workers())
utils.wait_until_true(
condition, timeout=10, sleep=0.1,
exception=RuntimeError(
"Failed to start %d workers." % self.workers))
workers = self._get_workers()
self.assertEqual(len(workers), self.workers)
return workers
# Wait for a service to start.
utils.wait_until_true(self.health_checker, timeout=10, sleep=0.1,
exception=RuntimeError(
"Failed to start service."))
return [self.service_pid]
def _get_workers(self):
"""Get the list of processes in which WSGI server is running."""
if self.workers > 0:
return [proc.pid for proc in psutil.process_iter()
if proc.ppid == self.service_pid]
else:
return [proc.pid for proc in psutil.process_iter()
if proc.pid == self.service_pid]
def _fake_reset(self):
"""Writes FAKE_RESET_MSG to temporary file on each call."""
with open(self.temp_file, 'a') as f:
f.write(FAKE_RESET_MSG)
def _test_restart_service_on_sighup(self, service, workers=0):
"""Test that a service correctly restarts on receiving SIGHUP.
1. Start a service with a given number of workers.
2. Send SIGHUP to the service.
3. Wait for workers (if any) to restart.
4. Assert that the pids of the workers didn't change after restart.
"""
start_workers = self._start_server(callback=service, workers=workers)
os.kill(self.service_pid, signal.SIGHUP)
# Wait for temp file to be created and its size become equal
# to size of FAKE_RESET_MSG repeated (workers + 1) times.
expected_size = len(FAKE_RESET_MSG) * (workers + 1)
condition = lambda: (os.path.isfile(self.temp_file)
and os.stat(self.temp_file).st_size ==
expected_size)
utils.wait_until_true(
condition, timeout=5, sleep=0.1,
exception=RuntimeError(
"Timed out waiting for file %(filename)s to be created and "
"its size become equal to %(size)s." %
{'filename': self.temp_file,
'size': expected_size}))
# Verify that reset has been called for parent process in which
# a service was started and for each worker by checking that
# FAKE_RESET_MSG has been written to temp file workers + 1 times.
with open(self.temp_file, 'r') as f:
res = f.readline()
self.assertEqual(FAKE_RESET_MSG * (workers + 1), res)
# Make sure worker pids don't change
end_workers = self._get_workers()
self.assertEqual(start_workers, end_workers)
class TestWsgiServer(TestNeutronServer):
"""Tests for neutron.wsgi.Server."""
def setUp(self):
super(TestWsgiServer, self).setUp()
self.health_checker = self._check_active
self.port = None
@staticmethod
def application(environ, start_response):
"""A primitive test application."""
response_body = 'Response'
status = '200 OK'
response_headers = [('Content-Type', 'text/plain'),
('Content-Length', str(len(response_body)))]
start_response(status, response_headers)
return [response_body]
def _check_active(self):
"""Check a wsgi service is active by making a GET request."""
port = int(os.read(self.pipein, 5))
conn = httplib2.HTTPConnectionWithTimeout("localhost", port)
try:
conn.request("GET", "/")
resp = conn.getresponse()
return resp.status == 200
except socket.error:
return False
def _run_wsgi(self, workers=0):
"""Start WSGI server with a test application."""
# Mock reset method to check that it is being called
# on receiving SIGHUP.
with mock.patch("neutron.wsgi.WorkerService.reset") as reset_method:
reset_method.side_effect = self._fake_reset
server = wsgi.Server("Test")
server.start(self.application, 0, "0.0.0.0",
workers=workers)
# Memorize a port that was chosen for the service
self.port = server.port
os.write(self.pipeout, str(self.port))
server.wait()
def test_restart_wsgi_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._run_wsgi,
workers=2)
class TestRPCServer(TestNeutronServer):
"""Tests for neutron RPC server."""
def setUp(self):
super(TestRPCServer, self).setUp()
self.setup_coreplugin(TARGET_PLUGIN)
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start()
self.plugin.return_value.rpc_workers_supported = True
self.health_checker = self._check_active
def _check_active(self):
time.sleep(5)
return True
def _serve_rpc(self, workers=0):
"""Start RPC server with a given number of workers."""
# Mock reset method to check that it is being called
# on receiving SIGHUP.
with mock.patch("neutron.service.RpcWorker.reset") as reset_method:
with mock.patch(
"neutron.manager.NeutronManager.get_plugin"
) as get_plugin:
reset_method.side_effect = self._fake_reset
get_plugin.return_value = self.plugin
CONF.set_override("rpc_workers", workers)
launcher = service.serve_rpc()
launcher.wait()
def test_restart_rpc_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._serve_rpc,
workers=2)

View File

@ -0,0 +1,33 @@
# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import service
from neutron.tests import base
class TestRpcWorker(base.BaseTestCase):
@mock.patch("neutron.policy.refresh")
@mock.patch("neutron.common.config.setup_logging")
def test_reset(self, setup_logging_mock, refresh_mock):
_plugin = mock.Mock()
rpc_worker = service.RpcWorker(_plugin)
rpc_worker.reset()
setup_logging_mock.assert_called_once_with()
refresh_mock.assert_called_once_with()

View File

@ -65,6 +65,18 @@ class TestWorkerService(base.BaseTestCase):
workerservice.start()
self.assertFalse(apimock.called)
@mock.patch("neutron.policy.refresh")
@mock.patch("neutron.common.config.setup_logging")
def test_reset(self, setup_logging_mock, refresh_mock):
_service = mock.Mock()
_app = mock.Mock()
worker_service = wsgi.WorkerService(_service, _app)
worker_service.reset()
setup_logging_mock.assert_called_once_with()
refresh_mock.assert_called_once_with()
class TestWSGIServer(base.BaseTestCase):
"""WSGI server tests."""
@ -132,7 +144,7 @@ class TestWSGIServer(base.BaseTestCase):
mock.call(
server._run,
None,
mock_listen.return_value)
mock_listen.return_value.dup.return_value)
])
def test_app(self):

View File

@ -37,6 +37,7 @@ import six
import webob.dec
import webob.exc
from neutron.common import config
from neutron.common import exceptions as exception
from neutron import context
from neutron.db import api
@ -99,12 +100,17 @@ class WorkerService(object):
self._server = None
def start(self):
# When api worker is stopped it kills the eventlet wsgi server which
# internally closes the wsgi server socket object. This server socket
# object becomes not usable which leads to "Bad file descriptor"
# errors on service restart.
# Duplicate a socket object to keep a file descriptor usable.
dup_sock = self._service._socket.dup()
if CONF.use_ssl:
self._service._socket = self._service.wrap_ssl(
self._service._socket)
dup_sock = self._service.wrap_ssl(dup_sock)
self._server = self._service.pool.spawn(self._service._run,
self._application,
self._service._socket)
dup_sock)
def wait(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
@ -115,6 +121,10 @@ class WorkerService(object):
self._server.kill()
self._server = None
@staticmethod
def reset():
config.reset_service()
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""