Merge "Update wsgi.py to support reload from conf on SIGHUP"

Jenkins 2015-07-07 07:50:24 +00:00 committed by Gerrit Code Review
commit 1765389942
9 changed files with 415 additions and 58 deletions
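The whole reload mechanism added here is driven by SIGHUP: the parent API process re-reads its configuration files, lets existing workers finish their in-flight requests, and spawns fresh workers with the new settings. A rough operator-facing sketch, assuming the [heat_api] section already exists in /etc/heat/heat.conf (the path the integration test below also uses) and that the parent heat-api pid has already been looked up:

    # Hypothetical sketch, mirroring heat_integrationtests further down:
    # bump the worker count, then ask the running heat-api parent to reload.
    import os
    import signal
    from six.moves import configparser

    CONF_FILE = '/etc/heat/heat.conf'

    def bump_workers_and_reload(parent_pid, new_workers):
        config = configparser.ConfigParser()
        config.read(CONF_FILE)
        config.set('heat_api', 'workers', str(new_workers))
        with open(CONF_FILE, 'w') as f:
            config.write(f)
        # The parent's SIGHUP handler raises SIGHUPInterrupt, which makes
        # wait_on_children() call Server.reload(): stale workers drain and
        # exit, new workers start with the re-read configuration.
        os.kill(int(parent_pid), signal.SIGHUP)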

View File

@@ -64,8 +64,8 @@ if __name__ == '__main__':
{'host': host, 'port': port})
profiler.setup('heat-api', host)
gmr.TextGuruMeditation.setup_autorun(version)
server = wsgi.Server()
server.start(app, cfg.CONF.heat_api, default_port=port)
server = wsgi.Server('heat-api', cfg.CONF.heat_api)
server.start(app, default_port=port)
systemd.notify_once()
server.wait()
except RuntimeError as e:

View File

@@ -68,8 +68,8 @@ if __name__ == '__main__':
{'host': host, 'port': port})
profiler.setup('heat-api-cfn', host)
gmr.TextGuruMeditation.setup_autorun(version)
server = wsgi.Server()
server.start(app, cfg.CONF.heat_api_cfn, default_port=port)
server = wsgi.Server('heat-api-cfn', cfg.CONF.heat_api_cfn)
server.start(app, default_port=port)
systemd.notify_once()
server.wait()
except RuntimeError as e:

View File

@@ -68,8 +68,9 @@ if __name__ == '__main__':
{'host': host, 'port': port})
profiler.setup('heat-api-cloudwatch', host)
gmr.TextGuruMeditation.setup_autorun(version)
server = wsgi.Server()
server.start(app, cfg.CONF.heat_api_cloudwatch, default_port=port)
server = wsgi.Server('heat-api-cloudwatch',
cfg.CONF.heat_api_cloudwatch)
server.start(app, default_port=port)
systemd.notify_once()
server.wait()
except RuntimeError as e:
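
All three API entry points above change in the same way: the server name and the service's config group move from start() into the Server constructor, so the server can later re-read its own options on reload. A minimal before/after usage fragment, with names taken straight from the hunks above (app, port and cfg are assumed to be set up by the surrounding script):

    # Before this commit: the config group was only handed over at start time.
    server = wsgi.Server()
    server.start(app, cfg.CONF.heat_api, default_port=port)

    # After: the Server keeps its name and conf, which is what lets reload()
    # re-read the [heat_api] options and respawn workers on SIGHUP.
    server = wsgi.Server('heat-api', cfg.CONF.heat_api)
    server.start(app, default_port=port)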

View File

@@ -520,3 +520,7 @@ class ObjectFieldInvalid(HeatException):
class KeystoneServiceNameConflict(HeatException):
msg_fmt = _("Keystone has more than one service with same name "
"%(service)s. Please use service id instead of name")
class SIGHUPInterrupt(HeatException):
msg_fmt = _("System SIGHUP signal received.")

View File

@@ -33,6 +33,8 @@ from eventlet.green import socket
from eventlet.green import ssl
import eventlet.greenio
import eventlet.wsgi
import functools
from oslo_concurrency import processutils
from oslo_config import cfg
import oslo_i18n as i18n
from oslo_log import log as logging
@@ -77,7 +79,7 @@ api_opts = [
help=_("Location of the SSL key file to use "
"for enabling SSL mode."),
deprecated_group='DEFAULT'),
cfg.IntOpt('workers', default=0,
cfg.IntOpt('workers', default=processutils.get_worker_count(),
help=_("Number of workers for Heat service."),
deprecated_group='DEFAULT'),
cfg.IntOpt('max_header_line', default=16384,
@@ -85,6 +87,10 @@ api_opts = [
'max_header_line may need to be increased when using '
'large tokens (typically those generated by the '
'Keystone v3 API with big service catalogs).')),
cfg.IntOpt('tcp_keepidle', default=600,
help=_('The value for the socket option TCP_KEEPIDLE. This is '
'the time in seconds that the connection must be idle '
'before TCP starts sending keepalive probes.')),
]
api_group = cfg.OptGroup('heat_api')
cfg.CONF.register_group(api_group)
@@ -119,6 +125,10 @@ api_cfn_opts = [
'max_header_line may need to be increased when using '
'large tokens (typically those generated by the '
'Keystone v3 API with big service catalogs).')),
cfg.IntOpt('tcp_keepidle', default=600,
help=_('The value for the socket option TCP_KEEPIDLE. This is '
'the time in seconds that the connection must be idle '
'before TCP starts sending keepalive probes.')),
]
api_cfn_group = cfg.OptGroup('heat_api_cfn')
cfg.CONF.register_group(api_cfn_group)
@@ -153,6 +163,10 @@ api_cw_opts = [
'max_header_line may need to be increased when using '
'large tokens (typically those generated by the '
'Keystone v3 API with big service catalogs.)')),
cfg.IntOpt('tcp_keepidle', default=600,
help=_('The value for the socket option TCP_KEEPIDLE. This is '
'the time in seconds that the connection must be idle '
'before TCP starts sending keepalive probes.')),
]
api_cw_group = cfg.OptGroup('heat_api_cloudwatch')
cfg.CONF.register_group(api_cw_group)
@@ -227,11 +241,9 @@ def get_socket(conf, default_port):
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr, backlog=conf.backlog,
sock = eventlet.listen(bind_addr,
backlog=conf.backlog,
family=address_family)
if use_ssl:
sock = ssl.wrap_socket(sock, certfile=cert_file,
keyfile=key_file)
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
@@ -240,13 +252,6 @@ def get_socket(conf, default_port):
raise RuntimeError(_("Could not bind to %(bind_addr)s"
"after trying for 30 seconds")
% {'bind_addr': bind_addr})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# in my experience, sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
return sock
@@ -265,53 +270,64 @@ class WritableLogger(object):
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, threads=1000):
def __init__(self, name, conf, threads=1000):
os.umask(0o27) # ensure files are created with the correct privileges
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = WritableLogger(self._logger)
self.name = name
self.threads = threads
self.children = []
self.children = set()
self.stale_children = set()
self.running = True
self.pgid = os.getpid()
self.conf = conf
try:
os.setpgid(self.pgid, self.pgid)
except OSError:
self.pgid = 0
def start(self, application, conf, default_port):
def kill_children(self, *args):
"""Kills the entire process group."""
LOG.error(_LE('SIGTERM received'))
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.running = False
os.killpg(0, signal.SIGTERM)
def hup(self, *args):
"""
Reloads configuration files with zero down time.
"""
LOG.error(_LE('SIGHUP received'))
signal.signal(signal.SIGHUP, signal.SIG_IGN)
raise exception.SIGHUPInterrupt
def start(self, application, default_port):
"""
Run a WSGI server with the given application.
:param application: The application to run in the WSGI server
:param conf: a cfg.ConfigOpts object
:param default_port: Port to bind to if none is specified in conf
"""
def kill_children(*args):
"""Kills the entire process group."""
LOG.error(_LE('SIGTERM received'))
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.running = False
os.killpg(0, signal.SIGTERM)
def hup(*args):
"""
Shuts down the server(s), but allows running requests to complete
"""
LOG.error(_LE('SIGHUP received'))
signal.signal(signal.SIGHUP, signal.SIG_IGN)
os.killpg(0, signal.SIGHUP)
signal.signal(signal.SIGHUP, hup)
eventlet.wsgi.MAX_HEADER_LINE = conf.max_header_line
eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line
self.application = application
self.sock = get_socket(conf, default_port)
self.default_port = default_port
self.configure_socket()
self.start_wsgi()
os.umask(0o27) # ensure files are created with the correct privileges
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = WritableLogger(self._logger)
if conf.workers == 0:
def start_wsgi(self):
if self.conf.workers == 0:
# Useful for profiling, test, debug etc.
self.pool = eventlet.GreenPool(size=self.threads)
self.pool.spawn_n(self._single_run, application, self.sock)
self.pool.spawn_n(self._single_run, self.application, self.sock)
return
LOG.info(_LI("Starting %d workers"), conf.workers)
signal.signal(signal.SIGTERM, kill_children)
signal.signal(signal.SIGHUP, hup)
while len(self.children) < conf.workers:
LOG.info(_LI("Starting %d workers"), self.conf.workers)
signal.signal(signal.SIGTERM, self.kill_children)
signal.signal(signal.SIGINT, self.kill_children)
signal.signal(signal.SIGHUP, self.hup)
while len(self.children) < self.conf.workers:
self.run_child()
def wait_on_children(self):
@@ -319,9 +335,8 @@ class Server(object):
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
LOG.error(_LE('Removing dead child %s'), pid)
self.children.remove(pid)
self.run_child()
self._remove_children(pid)
self._verify_and_respawn_children(pid, status)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
@@ -329,10 +344,151 @@ class Server(object):
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
os.killpg(0, signal.SIGTERM)
break
except exception.SIGHUPInterrupt:
self.reload()
continue
eventlet.greenio.shutdown_safe(self.sock)
self.sock.close()
LOG.debug('Exited')
def configure_socket(self, old_conf=None, has_changed=None):
"""
Ensure a socket exists and is appropriately configured.
This function is called on start up, and can also be
called in the event of a configuration reload.
When called for the first time a new socket is created.
If reloading and either bind_host or bind port have been
changed the existing socket must be closed and a new
socket opened (laws of physics).
In all other cases (bind_host/bind_port have not changed)
the existing socket is reused.
:param old_conf: Cached old configuration settings (if any)
:param has_changed: callable to determine if a parameter has changed
"""
# Do we need a fresh socket?
new_sock = (old_conf is None or (
has_changed('bind_host') or
has_changed('bind_port')))
# Will we be using https?
use_ssl = not (not self.conf.cert_file or not self.conf.key_file)
# Were we using https before?
old_use_ssl = (old_conf is not None and not (
not old_conf.get('key_file') or
not old_conf.get('cert_file')))
# Do we now need to perform an SSL wrap on the socket?
wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
# Do we now need to perform an SSL unwrap on the socket?
unwrap_sock = use_ssl is False and old_use_ssl is True
if new_sock:
self._sock = None
if old_conf is not None:
self.sock.close()
_sock = get_socket(self.conf, self.default_port)
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
self._sock = _sock
if wrap_sock:
self.sock = ssl.wrap_socket(self._sock,
certfile=self.conf.cert_file,
keyfile=self.conf.key_file)
if unwrap_sock:
self.sock = self._sock
if new_sock and not use_ssl:
self.sock = self._sock
# Pick up newly deployed certs
if old_conf is not None and use_ssl is True and old_use_ssl is True:
if has_changed('cert_file'):
self.sock.certfile = self.conf.cert_file
if has_changed('key_file'):
self.sock.keyfile = self.conf.key_file
if new_sock or (old_conf is not None and has_changed('tcp_keepidle')):
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
self.conf.tcp_keepidle)
if old_conf is not None and has_changed('backlog'):
self.sock.listen(self.conf.backlog)
def _remove_children(self, pid):
if pid in self.children:
self.children.remove(pid)
LOG.info(_LI('Removed dead child %s'), pid)
elif pid in self.stale_children:
self.stale_children.remove(pid)
LOG.info(_LI('Removed stale child %s'), pid)
else:
LOG.warn(_LW('Unrecognised child %s'), pid)
def _verify_and_respawn_children(self, pid, status):
if len(self.stale_children) == 0:
LOG.debug('No stale children')
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
LOG.error(_LE('Not respawning child %d, cannot '
'recover from termination'), pid)
if not self.children and not self.stale_children:
LOG.info(
_LI('All workers have terminated. Exiting'))
self.running = False
else:
if len(self.children) < self.conf.workers:
self.run_child()
def stash_conf_values(self):
"""
Make a copy of some of the current global CONF's settings.
Allows determining if any of these values have changed
when the config is reloaded.
"""
conf = {}
conf['bind_host'] = self.conf.bind_host
conf['bind_port'] = self.conf.bind_port
conf['backlog'] = self.conf.backlog
conf['key_file'] = self.conf.key_file
conf['cert_file'] = self.conf.cert_file
return conf
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This avoids any
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = self.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, self.conf)
cfg.CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children
self.children = set()
# Ensure any logging config changes are picked up
logging.setup(cfg.CONF, self.name)
self.configure_socket(old_conf, has_changed)
self.start_wsgi()
def wait(self):
"""Wait until all servers have completed running."""
try:
@@ -344,16 +500,32 @@ class Server(object):
pass
def run_child(self):
def child_hup(*args):
"""Shuts down child processes, existing requests are handled."""
signal.signal(signal.SIGHUP, signal.SIG_IGN)
eventlet.wsgi.is_accepting = False
self.sock.close()
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGHUP, child_hup)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# ignore the interrupt signal to avoid a race whereby
# a child worker receives the signal before the parent
# and is respawned unnecessarily as a result
signal.signal(signal.SIGINT, signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server()
LOG.info(_LI('Child %d exiting normally'), os.getpid())
return
# self.pool.waitall() is now called in wsgi's server so
# it's safe to exit here
sys.exit(0)
else:
LOG.info(_LI('Started child %s'), pid)
self.children.append(pid)
self.children.add(pid)
def run_server(self):
"""Run a WSGI server."""

View File

@@ -15,13 +15,16 @@
# under the License.
import fixtures
import json
from oslo_config import cfg
import mock
import six
import socket
import stubout
import webob
from oslo_config import cfg
from heat.api.aws import exception as aws_exception
from heat.common import exception
from heat.common import wsgi
@@ -398,3 +401,77 @@ class JSONRequestDeserializerTest(common.HeatTestCase):
'(%s bytes) exceeds maximum allowed size (%s bytes).' % (
len(body), cfg.CONF.max_json_body_size))
self.assertEqual(msg, six.text_type(error))
class GetSocketTestCase(common.HeatTestCase):
def setUp(self):
super(GetSocketTestCase, self).setUp()
self.useFixture(fixtures.MonkeyPatch(
"heat.common.wsgi.get_bind_addr",
lambda x, y: ('192.168.0.13', 1234)))
addr_info_list = [(2, 1, 6, '', ('192.168.0.13', 80)),
(2, 2, 17, '', ('192.168.0.13', 80)),
(2, 3, 0, '', ('192.168.0.13', 80))]
self.useFixture(fixtures.MonkeyPatch(
"heat.common.wsgi.socket.getaddrinfo",
lambda *x: addr_info_list))
self.useFixture(fixtures.MonkeyPatch(
"heat.common.wsgi.time.time",
mock.Mock(side_effect=[0, 1, 5, 10, 20, 35])))
wsgi.cfg.CONF.heat_api.cert_file = '/etc/ssl/cert'
wsgi.cfg.CONF.heat_api.key_file = '/etc/ssl/key'
wsgi.cfg.CONF.heat_api.ca_file = '/etc/ssl/ca_cert'
wsgi.cfg.CONF.heat_api.tcp_keepidle = 600
def test_correct_configure_socket(self):
mock_socket = mock.Mock()
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.ssl.wrap_socket',
mock_socket))
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.eventlet.listen',
lambda *x, **y: mock_socket))
server = wsgi.Server(name='heat-api', conf=cfg.CONF.heat_api)
server.default_port = 1234
server.configure_socket()
self.assertIn(mock.call.setsockopt(
socket.SOL_SOCKET,
socket.SO_REUSEADDR,
1), mock_socket.mock_calls)
self.assertIn(mock.call.setsockopt(
socket.SOL_SOCKET,
socket.SO_KEEPALIVE,
1), mock_socket.mock_calls)
if hasattr(socket, 'TCP_KEEPIDLE'):
self.assertIn(mock.call().setsockopt(
socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
wsgi.cfg.CONF.heat_api.tcp_keepidle), mock_socket.mock_calls)
def test_get_socket_without_all_ssl_reqs(self):
wsgi.cfg.CONF.heat_api.key_file = None
self.assertRaises(RuntimeError, wsgi.get_socket,
wsgi.cfg.CONF.heat_api, 1234)
def test_get_socket_with_bind_problems(self):
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.eventlet.listen',
mock.Mock(side_effect=(
[wsgi.socket.error(socket.errno.EADDRINUSE)] * 3 + [None]))))
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.ssl.wrap_socket',
lambda *x, **y: None))
self.assertRaises(RuntimeError, wsgi.get_socket,
wsgi.cfg.CONF.heat_api, 1234)
def test_get_socket_with_unexpected_socket_errno(self):
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.eventlet.listen',
mock.Mock(side_effect=wsgi.socket.error(socket.errno.ENOMEM))))
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.ssl.wrap_socket',
lambda *x, **y: None))
self.assertRaises(wsgi.socket.error, wsgi.get_socket,
wsgi.cfg.CONF.heat_api, 1234)

View File

@@ -0,0 +1,98 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_concurrency import processutils
from six.moves import configparser
from heat_integrationtests.common import test
class ReloadOnSighupTest(test.HeatIntegrationTest):
def setUp(self):
self.config_file = "/etc/heat/heat.conf"
super(ReloadOnSighupTest, self).setUp()
def _set_config_value(self, service, key, value):
config = configparser.ConfigParser()
config.read(self.config_file)
config.set(service, key, value)
with open(self.config_file, 'wb') as f:
config.write(f)
def _get_config_value(self, service, key):
config = configparser.ConfigParser()
config.read(self.config_file)
val = config.get(service, key)
return val
def _get_heat_api_pids(self, service):
# get the pids of all heat-api processes
if service == "heat_api":
process = "heat-api|grep -Ev 'grep|cloudwatch|cfn'"
else:
process = "%s|grep -Ev 'grep'" % service.replace('_', '-')
cmd = "ps -ef|grep %s|awk '{print $2}'" % process
out, err = processutils.execute(cmd, shell=True)
self.assertIsNotNone(out, "heat-api service not running. %s" % err)
pids = filter(None, out.split('\n'))
# get the parent pids of all heat-api processes
cmd = "ps -ef|grep %s|awk '{print $3}'" % process
out, _ = processutils.execute(cmd, shell=True)
parent_pids = filter(None, out.split('\n'))
heat_api_parent = list(set(pids) & set(parent_pids))[0]
heat_api_children = list(set(pids) - set(parent_pids))
return heat_api_parent, heat_api_children
def _change_config(self, service, old_workers, new_workers):
pre_reload_parent, pre_reload_children = self._get_heat_api_pids(
service)
self.assertEqual(old_workers, len(pre_reload_children))
# change the config values
self._set_config_value(service, 'workers', new_workers)
cmd = "kill -HUP %s" % pre_reload_parent
processutils.execute(cmd, shell=True)
# wait till heat-api reloads
eventlet.sleep(2)
post_reload_parent, post_reload_children = self._get_heat_api_pids(
service)
self.assertEqual(pre_reload_parent, post_reload_parent)
self.assertEqual(new_workers, len(post_reload_children))
# test if all child processes are newly created
self.assertEqual(set(post_reload_children) & set(pre_reload_children),
set())
def _reload(self, service):
old_workers = int(self._get_config_value(service, 'workers'))
new_workers = old_workers + 1
self.addCleanup(self._set_config_value, service, 'workers',
old_workers)
self._change_config(service, old_workers, new_workers)
# revert all the changes made
self._change_config(service, new_workers, old_workers)
def test_api_reload_on_sighup(self):
self._reload('heat_api')
def test_api_cfn_reload_on_sighup(self):
self._reload('heat_api_cfn')
def test_api_cloudwatch_on_sighup(self):
self._reload('heat_api_cloudwatch')

View File

@@ -24,7 +24,11 @@ echo -e 'notification_driver=messagingv2\n' >> $localconf
echo -e 'num_engine_workers=2\n' >> $localconf
echo -e 'plugin_dirs=$HEAT_DIR/heat_integrationtests/common/test_resources\n' >> $localconf
echo -e 'hidden_stack_tags=hidden\n' >> $localconf
echo -e '[heat_api]\nworkers=1\n' >> $localconf
echo -e '[heat_api_cfn]\nworkers=1\n' >> $localconf
echo -e '[heat_api_cloudwatch]\nworkers=1' >> $localconf
if [ "$ENABLE_CONVERGENCE" == "true" ] ; then
echo -e 'convergence_engine=true\n' >> $localconf
fi
fi

View File

@@ -5,6 +5,7 @@ pbr<2.0,>=0.11
kombu>=3.0.7
oslo.log>=1.2.0 # Apache-2.0
oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0
oslo.concurrency>=2.1.0
oslo.config>=1.11.0 # Apache-2.0
oslo.utils>=1.6.0 # Apache-2.0
paramiko>=1.13.0