Update wsgi.py to support config reload on SIGHUP

These changes are taken from the Glance wsgi.py, from which the
Heat wsgi.py originated. They add support for reloading the
service with new configuration parameters when SIGHUP is
received.

The configuration reload only works if the server is running
with workers. If workers is 0, the service won't reload.

Co-Authored-By: Tetiana Lashchova <tlashchova@mirantis.com>
Change-Id: I7fd391209591c149fb95136d92dd1d82cfb7a6de
Closes-Bug: #1449807
Closes-Bug: #1276694
tyagi 2015-05-21 02:53:14 -07:00 committed by Tetiana Lashchova
parent ddd4189a2d
commit 2cf8aa6fad
9 changed files with 415 additions and 58 deletions
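
The mechanism introduced below follows a common parent/worker reload
pattern: the parent's SIGHUP handler raises an exception out of its
os.wait() loop, configuration files are re-read, existing children are
marked stale and allowed to finish their in-flight requests, and a
fresh set of workers is forked with the new settings. The following is
a simplified standalone sketch of that pattern, not the Heat code
itself; the WORKERS environment variable and the sleeping worker body
are placeholders for the real oslo.config reload and WSGI request loop.

import os
import signal
import sys
import time


class SIGHUPInterrupt(Exception):
    """Raised by the parent's SIGHUP handler to unwind the wait loop."""


def _hup(*args):
    # Ignore further SIGHUPs until the reload re-installs the handler.
    signal.signal(signal.SIGHUP, signal.SIG_IGN)
    raise SIGHUPInterrupt()


def read_worker_count():
    # Placeholder for cfg.CONF.reload_config_files(); a real service
    # re-reads its config file here and returns the new 'workers' value.
    return int(os.environ.get('WORKERS', '2'))


def fork_worker():
    pid = os.fork()
    if pid == 0:
        # Placeholder worker: a real child serves WSGI requests and
        # finishes in-flight work before exiting on SIGHUP.
        signal.signal(signal.SIGHUP, lambda *a: sys.exit(0))
        while True:
            time.sleep(1)
    return pid


def main():
    signal.signal(signal.SIGHUP, _hup)
    children = {fork_worker() for _ in range(read_worker_count())}
    stale = set()
    while True:
        try:
            pid, _status = os.wait()
            children.discard(pid)
            stale.discard(pid)
            # Respawn workers that died unexpectedly (not stale ones).
            while len(children) < read_worker_count():
                children.add(fork_worker())
        except SIGHUPInterrupt:
            # Reload: old workers become stale, a fresh set is forked
            # with the newly read configuration.
            for pid in children:
                os.kill(pid, signal.SIGHUP)
            stale, children = children, set()
            while len(children) < read_worker_count():
                children.add(fork_worker())
            signal.signal(signal.SIGHUP, _hup)
        except KeyboardInterrupt:
            for pid in children | stale:
                os.kill(pid, signal.SIGTERM)
            break


if __name__ == '__main__':
    main()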


@@ -64,8 +64,8 @@ if __name__ == '__main__':
{'host': host, 'port': port})
profiler.setup('heat-api', host)
gmr.TextGuruMeditation.setup_autorun(version)
server = wsgi.Server()
server.start(app, cfg.CONF.heat_api, default_port=port)
server = wsgi.Server('heat-api', cfg.CONF.heat_api)
server.start(app, default_port=port)
systemd.notify_once()
server.wait()
except RuntimeError as e:


@@ -68,8 +68,8 @@ if __name__ == '__main__':
{'host': host, 'port': port})
profiler.setup('heat-api-cfn', host)
gmr.TextGuruMeditation.setup_autorun(version)
server = wsgi.Server()
server.start(app, cfg.CONF.heat_api_cfn, default_port=port)
server = wsgi.Server('heat-api-cfn', cfg.CONF.heat_api_cfn)
server.start(app, default_port=port)
systemd.notify_once()
server.wait()
except RuntimeError as e:


@@ -68,8 +68,9 @@ if __name__ == '__main__':
{'host': host, 'port': port})
profiler.setup('heat-api-cloudwatch', host)
gmr.TextGuruMeditation.setup_autorun(version)
server = wsgi.Server()
server.start(app, cfg.CONF.heat_api_cloudwatch, default_port=port)
server = wsgi.Server('heat-api-cloudwatch',
cfg.CONF.heat_api_cloudwatch)
server.start(app, default_port=port)
systemd.notify_once()
server.wait()
except RuntimeError as e:


@@ -520,3 +520,7 @@ class ObjectFieldInvalid(HeatException):
class KeystoneServiceNameConflict(HeatException):
msg_fmt = _("Keystone has more than one service with same name "
"%(service)s. Please use service id instead of name")
class SIGHUPInterrupt(HeatException):
msg_fmt = _("System SIGHUP signal received.")


@@ -33,6 +33,8 @@ from eventlet.green import socket
from eventlet.green import ssl
import eventlet.greenio
import eventlet.wsgi
import functools
from oslo_concurrency import processutils
from oslo_config import cfg
import oslo_i18n as i18n
from oslo_log import log as logging
@@ -77,7 +79,7 @@ api_opts = [
help=_("Location of the SSL key file to use "
"for enabling SSL mode."),
deprecated_group='DEFAULT'),
cfg.IntOpt('workers', default=0,
cfg.IntOpt('workers', default=processutils.get_worker_count(),
help=_("Number of workers for Heat service."),
deprecated_group='DEFAULT'),
cfg.IntOpt('max_header_line', default=16384,
@@ -85,6 +87,10 @@ api_opts = [
'max_header_line may need to be increased when using '
'large tokens (typically those generated by the '
'Keystone v3 API with big service catalogs).')),
cfg.IntOpt('tcp_keepidle', default=600,
help=_('The value for the socket option TCP_KEEPIDLE. This is '
'the time in seconds that the connection must be idle '
'before TCP starts sending keepalive probes.')),
]
api_group = cfg.OptGroup('heat_api')
cfg.CONF.register_group(api_group)
@@ -119,6 +125,10 @@ api_cfn_opts = [
'max_header_line may need to be increased when using '
'large tokens (typically those generated by the '
'Keystone v3 API with big service catalogs).')),
cfg.IntOpt('tcp_keepidle', default=600,
help=_('The value for the socket option TCP_KEEPIDLE. This is '
'the time in seconds that the connection must be idle '
'before TCP starts sending keepalive probes.')),
]
api_cfn_group = cfg.OptGroup('heat_api_cfn')
cfg.CONF.register_group(api_cfn_group)
@@ -153,6 +163,10 @@ api_cw_opts = [
'max_header_line may need to be increased when using '
'large tokens (typically those generated by the '
'Keystone v3 API with big service catalogs.)')),
cfg.IntOpt('tcp_keepidle', default=600,
help=_('The value for the socket option TCP_KEEPIDLE. This is '
'the time in seconds that the connection must be idle '
'before TCP starts sending keepalive probes.')),
]
api_cw_group = cfg.OptGroup('heat_api_cloudwatch')
cfg.CONF.register_group(api_cw_group)
@@ -227,11 +241,9 @@ def get_socket(conf, default_port):
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr, backlog=conf.backlog,
sock = eventlet.listen(bind_addr,
backlog=conf.backlog,
family=address_family)
if use_ssl:
sock = ssl.wrap_socket(sock, certfile=cert_file,
keyfile=key_file)
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
@@ -240,13 +252,6 @@ def get_socket(conf, default_port):
raise RuntimeError(_("Could not bind to %(bind_addr)s"
"after trying for 30 seconds")
% {'bind_addr': bind_addr})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# in my experience, sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
return sock
@@ -265,53 +270,64 @@ class WritableLogger(object):
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, threads=1000):
def __init__(self, name, conf, threads=1000):
os.umask(0o27) # ensure files are created with the correct privileges
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = WritableLogger(self._logger)
self.name = name
self.threads = threads
self.children = []
self.children = set()
self.stale_children = set()
self.running = True
self.pgid = os.getpid()
self.conf = conf
try:
os.setpgid(self.pgid, self.pgid)
except OSError:
self.pgid = 0
def start(self, application, conf, default_port):
def kill_children(self, *args):
"""Kills the entire process group."""
LOG.error(_LE('SIGTERM received'))
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.running = False
os.killpg(0, signal.SIGTERM)
def hup(self, *args):
"""
Reloads configuration files with zero down time.
"""
LOG.error(_LE('SIGHUP received'))
signal.signal(signal.SIGHUP, signal.SIG_IGN)
raise exception.SIGHUPInterrupt
def start(self, application, default_port):
"""
Run a WSGI server with the given application.
:param application: The application to run in the WSGI server
:param conf: a cfg.ConfigOpts object
:param default_port: Port to bind to if none is specified in conf
"""
def kill_children(*args):
"""Kills the entire process group."""
LOG.error(_LE('SIGTERM received'))
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.running = False
os.killpg(0, signal.SIGTERM)
def hup(*args):
"""
Shuts down the server(s), but allows running requests to complete
"""
LOG.error(_LE('SIGHUP received'))
signal.signal(signal.SIGHUP, signal.SIG_IGN)
os.killpg(0, signal.SIGHUP)
signal.signal(signal.SIGHUP, hup)
eventlet.wsgi.MAX_HEADER_LINE = conf.max_header_line
eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line
self.application = application
self.sock = get_socket(conf, default_port)
self.default_port = default_port
self.configure_socket()
self.start_wsgi()
os.umask(0o27) # ensure files are created with the correct privileges
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = WritableLogger(self._logger)
if conf.workers == 0:
def start_wsgi(self):
if self.conf.workers == 0:
# Useful for profiling, test, debug etc.
self.pool = eventlet.GreenPool(size=self.threads)
self.pool.spawn_n(self._single_run, application, self.sock)
self.pool.spawn_n(self._single_run, self.application, self.sock)
return
LOG.info(_LI("Starting %d workers"), conf.workers)
signal.signal(signal.SIGTERM, kill_children)
signal.signal(signal.SIGHUP, hup)
while len(self.children) < conf.workers:
LOG.info(_LI("Starting %d workers"), self.conf.workers)
signal.signal(signal.SIGTERM, self.kill_children)
signal.signal(signal.SIGINT, self.kill_children)
signal.signal(signal.SIGHUP, self.hup)
while len(self.children) < self.conf.workers:
self.run_child()
def wait_on_children(self):
@@ -319,9 +335,8 @@ class Server(object):
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
LOG.error(_LE('Removing dead child %s'), pid)
self.children.remove(pid)
self.run_child()
self._remove_children(pid)
self._verify_and_respawn_children(pid, status)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
@@ -329,10 +344,151 @@ class Server(object):
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
os.killpg(0, signal.SIGTERM)
break
except exception.SIGHUPInterrupt:
self.reload()
continue
eventlet.greenio.shutdown_safe(self.sock)
self.sock.close()
LOG.debug('Exited')
def configure_socket(self, old_conf=None, has_changed=None):
"""
Ensure a socket exists and is appropriately configured.
This function is called on start up, and can also be
called in the event of a configuration reload.
When called for the first time a new socket is created.
If reloading and either bind_host or bind port have been
changed the existing socket must be closed and a new
socket opened (laws of physics).
In all other cases (bind_host/bind_port have not changed)
the existing socket is reused.
:param old_conf: Cached old configuration settings (if any)
:param has changed: callable to determine if a parameter has changed
"""
# Do we need a fresh socket?
new_sock = (old_conf is None or (
has_changed('bind_host') or
has_changed('bind_port')))
# Will we be using https?
use_ssl = not (not self.conf.cert_file or not self.conf.key_file)
# Were we using https before?
old_use_ssl = (old_conf is not None and not (
not old_conf.get('key_file') or
not old_conf.get('cert_file')))
# Do we now need to perform an SSL wrap on the socket?
wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock)
# Do we now need to perform an SSL unwrap on the socket?
unwrap_sock = use_ssl is False and old_use_ssl is True
if new_sock:
self._sock = None
if old_conf is not None:
self.sock.close()
_sock = get_socket(self.conf, self.default_port)
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
_sock.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
self._sock = _sock
if wrap_sock:
self.sock = ssl.wrap_socket(self._sock,
certfile=self.conf.cert_file,
keyfile=self.conf.key_file)
if unwrap_sock:
self.sock = self._sock
if new_sock and not use_ssl:
self.sock = self._sock
# Pick up newly deployed certs
if old_conf is not None and use_ssl is True and old_use_ssl is True:
if has_changed('cert_file'):
self.sock.certfile = self.conf.cert_file
if has_changed('key_file'):
self.sock.keyfile = self.conf.key_file
if new_sock or (old_conf is not None and has_changed('tcp_keepidle')):
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
self.conf.tcp_keepidle)
if old_conf is not None and has_changed('backlog'):
self.sock.listen(self.conf.backlog)
def _remove_children(self, pid):
if pid in self.children:
self.children.remove(pid)
LOG.info(_LI('Removed dead child %s'), pid)
elif pid in self.stale_children:
self.stale_children.remove(pid)
LOG.info(_LI('Removed stale child %s'), pid)
else:
LOG.warn(_LW('Unrecognised child %s'), pid)
def _verify_and_respawn_children(self, pid, status):
if len(self.stale_children) == 0:
LOG.debug('No stale children')
if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
LOG.error(_LE('Not respawning child %d, cannot '
'recover from termination'), pid)
if not self.children and not self.stale_children:
LOG.info(
_LI('All workers have terminated. Exiting'))
self.running = False
else:
if len(self.children) < self.conf.workers:
self.run_child()
def stash_conf_values(self):
"""
Make a copy of some of the current global CONF's settings.
Allows determining if any of these values have changed
when the config is reloaded.
"""
conf = {}
conf['bind_host'] = self.conf.bind_host
conf['bind_port'] = self.conf.bind_port
conf['backlog'] = self.conf.backlog
conf['key_file'] = self.conf.key_file
conf['cert_file'] = self.conf.cert_file
return conf
def reload(self):
"""
Reload and re-apply configuration settings
Existing child processes are sent a SIGHUP signal
and will exit after completing existing requests.
New child processes, which will have the updated
configuration, are spawned. This allows preventing
interruption to the service.
"""
def _has_changed(old, new, param):
old = old.get(param)
new = getattr(new, param)
return (new != old)
old_conf = self.stash_conf_values()
has_changed = functools.partial(_has_changed, old_conf, self.conf)
cfg.CONF.reload_config_files()
os.killpg(self.pgid, signal.SIGHUP)
self.stale_children = self.children
self.children = set()
# Ensure any logging config changes are picked up
logging.setup(cfg.CONF, self.name)
self.configure_socket(old_conf, has_changed)
self.start_wsgi()
def wait(self):
"""Wait until all servers have completed running."""
try:
@@ -344,16 +500,32 @@ class Server(object):
pass
def run_child(self):
def child_hup(*args):
"""Shuts down child processes, existing requests are handled."""
signal.signal(signal.SIGHUP, signal.SIG_IGN)
eventlet.wsgi.is_accepting = False
self.sock.close()
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGHUP, child_hup)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# ignore the interrupt signal to avoid a race whereby
# a child worker receives the signal before the parent
# and is respawned unnecessarily as a result
signal.signal(signal.SIGINT, signal.SIG_IGN)
# The child has no need to stash the unwrapped
# socket, and the reference prevents a clean
# exit on sighup
self._sock = None
self.run_server()
LOG.info(_LI('Child %d exiting normally'), os.getpid())
return
# self.pool.waitall() is now called in wsgi's server so
# it's safe to exit here
sys.exit(0)
else:
LOG.info(_LI('Started child %s'), pid)
self.children.append(pid)
self.children.add(pid)
def run_server(self):
"""Run a WSGI server."""


@@ -15,13 +15,16 @@
# under the License.
import fixtures
import json
from oslo_config import cfg
import mock
import six
import socket
import stubout
import webob
from oslo_config import cfg
from heat.api.aws import exception as aws_exception
from heat.common import exception
from heat.common import wsgi
@@ -398,3 +401,77 @@ class JSONRequestDeserializerTest(common.HeatTestCase):
'(%s bytes) exceeds maximum allowed size (%s bytes).' % (
len(body), cfg.CONF.max_json_body_size))
self.assertEqual(msg, six.text_type(error))
class GetSocketTestCase(common.HeatTestCase):
def setUp(self):
super(GetSocketTestCase, self).setUp()
self.useFixture(fixtures.MonkeyPatch(
"heat.common.wsgi.get_bind_addr",
lambda x, y: ('192.168.0.13', 1234)))
addr_info_list = [(2, 1, 6, '', ('192.168.0.13', 80)),
(2, 2, 17, '', ('192.168.0.13', 80)),
(2, 3, 0, '', ('192.168.0.13', 80))]
self.useFixture(fixtures.MonkeyPatch(
"heat.common.wsgi.socket.getaddrinfo",
lambda *x: addr_info_list))
self.useFixture(fixtures.MonkeyPatch(
"heat.common.wsgi.time.time",
mock.Mock(side_effect=[0, 1, 5, 10, 20, 35])))
wsgi.cfg.CONF.heat_api.cert_file = '/etc/ssl/cert'
wsgi.cfg.CONF.heat_api.key_file = '/etc/ssl/key'
wsgi.cfg.CONF.heat_api.ca_file = '/etc/ssl/ca_cert'
wsgi.cfg.CONF.heat_api.tcp_keepidle = 600
def test_correct_configure_socket(self):
mock_socket = mock.Mock()
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.ssl.wrap_socket',
mock_socket))
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.eventlet.listen',
lambda *x, **y: mock_socket))
server = wsgi.Server(name='heat-api', conf=cfg.CONF.heat_api)
server.default_port = 1234
server.configure_socket()
self.assertIn(mock.call.setsockopt(
socket.SOL_SOCKET,
socket.SO_REUSEADDR,
1), mock_socket.mock_calls)
self.assertIn(mock.call.setsockopt(
socket.SOL_SOCKET,
socket.SO_KEEPALIVE,
1), mock_socket.mock_calls)
if hasattr(socket, 'TCP_KEEPIDLE'):
self.assertIn(mock.call().setsockopt(
socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
wsgi.cfg.CONF.heat_api.tcp_keepidle), mock_socket.mock_calls)
def test_get_socket_without_all_ssl_reqs(self):
wsgi.cfg.CONF.heat_api.key_file = None
self.assertRaises(RuntimeError, wsgi.get_socket,
wsgi.cfg.CONF.heat_api, 1234)
def test_get_socket_with_bind_problems(self):
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.eventlet.listen',
mock.Mock(side_effect=(
[wsgi.socket.error(socket.errno.EADDRINUSE)] * 3 + [None]))))
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.ssl.wrap_socket',
lambda *x, **y: None))
self.assertRaises(RuntimeError, wsgi.get_socket,
wsgi.cfg.CONF.heat_api, 1234)
def test_get_socket_with_unexpected_socket_errno(self):
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.eventlet.listen',
mock.Mock(side_effect=wsgi.socket.error(socket.errno.ENOMEM))))
self.useFixture(fixtures.MonkeyPatch(
'heat.common.wsgi.ssl.wrap_socket',
lambda *x, **y: None))
self.assertRaises(wsgi.socket.error, wsgi.get_socket,
wsgi.cfg.CONF.heat_api, 1234)


@@ -0,0 +1,98 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_concurrency import processutils
from six.moves import configparser
from heat_integrationtests.common import test
class ReloadOnSighupTest(test.HeatIntegrationTest):
def setUp(self):
self.config_file = "/etc/heat/heat.conf"
super(ReloadOnSighupTest, self).setUp()
def _set_config_value(self, service, key, value):
config = configparser.ConfigParser()
config.read(self.config_file)
config.set(service, key, value)
with open(self.config_file, 'wb') as f:
config.write(f)
def _get_config_value(self, service, key):
config = configparser.ConfigParser()
config.read(self.config_file)
val = config.get(service, key)
return val
def _get_heat_api_pids(self, service):
# get the pids of all heat-api processes
if service == "heat_api":
process = "heat-api|grep -Ev 'grep|cloudwatch|cfn'"
else:
process = "%s|grep -Ev 'grep'" % service.replace('_', '-')
cmd = "ps -ef|grep %s|awk '{print $2}'" % process
out, err = processutils.execute(cmd, shell=True)
self.assertIsNotNone(out, "heat-api service not running. %s" % err)
pids = filter(None, out.split('\n'))
# get the parent pids of all heat-api processes
cmd = "ps -ef|grep %s|awk '{print $3}'" % process
out, _ = processutils.execute(cmd, shell=True)
parent_pids = filter(None, out.split('\n'))
heat_api_parent = list(set(pids) & set(parent_pids))[0]
heat_api_children = list(set(pids) - set(parent_pids))
return heat_api_parent, heat_api_children
def _change_config(self, service, old_workers, new_workers):
pre_reload_parent, pre_reload_children = self._get_heat_api_pids(
service)
self.assertEqual(old_workers, len(pre_reload_children))
# change the config values
self._set_config_value(service, 'workers', new_workers)
cmd = "kill -HUP %s" % pre_reload_parent
processutils.execute(cmd, shell=True)
# wait till heat-api reloads
eventlet.sleep(2)
post_reload_parent, post_reload_children = self._get_heat_api_pids(
service)
self.assertEqual(pre_reload_parent, post_reload_parent)
self.assertEqual(new_workers, len(post_reload_children))
# test if all child processes are newly created
self.assertEqual(set(post_reload_children) & set(pre_reload_children),
set())
def _reload(self, service):
old_workers = int(self._get_config_value(service, 'workers'))
new_workers = old_workers + 1
self.addCleanup(self._set_config_value, service, 'workers',
old_workers)
self._change_config(service, old_workers, new_workers)
# revert all the changes made
self._change_config(service, new_workers, old_workers)
def test_api_reload_on_sighup(self):
self._reload('heat_api')
def test_api_cfn_reload_on_sighup(self):
self._reload('heat_api_cfn')
def test_api_cloudwatch_on_sighup(self):
self._reload('heat_api_cloudwatch')
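
A rough manual counterpart to the integration test above, assuming the
same config file and section names it uses; the parent PID below is a
placeholder to be looked up with ps or pgrep on the host:

import os
import signal

# After raising 'workers' under [heat_api] in /etc/heat/heat.conf,
# signal the heat-api parent; it should fork the new number of workers
# and the old children should exit once their in-flight requests finish.
HEAT_API_PARENT_PID = 12345  # placeholder, not a real PID

os.kill(HEAT_API_PARENT_PID, signal.SIGHUP)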


@@ -24,7 +24,11 @@ echo -e 'notification_driver=messagingv2\n' >> $localconf
echo -e 'num_engine_workers=2\n' >> $localconf
echo -e 'plugin_dirs=$HEAT_DIR/heat_integrationtests/common/test_resources\n' >> $localconf
echo -e 'hidden_stack_tags=hidden\n' >> $localconf
echo -e '[heat_api]\nworkers=1\n' >> $localconf
echo -e '[heat_api_cfn]\nworkers=1\n' >> $localconf
echo -e '[heat_api_cloudwatch]\nworkers=1' >> $localconf
if [ "$ENABLE_CONVERGENCE" == "true" ] ; then
echo -e 'convergence_engine=true\n' >> $localconf
fi
fi


@@ -5,6 +5,7 @@ pbr<2.0,>=0.11
kombu>=3.0.7
oslo.log>=1.2.0 # Apache-2.0
oslo.messaging!=1.12.0,>=1.8.0 # Apache-2.0
oslo.concurrency>=2.1.0
oslo.config>=1.11.0 # Apache-2.0
oslo.utils>=1.6.0 # Apache-2.0
paramiko>=1.13.0