wsgi: Reap stale workers (after a timeout) following a reload
Add a new tunable, `stale_worker_timeout`, defaulting to 86400 (i.e. 24 hours). Once this time elapses following a reload, the manager process will issue SIGKILLs to any remaining stale workers. This gives operators a way to configure a limit for how long old code and configs may still be running in their cluster. To enable this, the temporary reload child (which waits for the reload to complete then closes the accept socket on all the old workers) has grown the ability to send state to the re-exec'ed manager. Currently, this is limited to just the set of pre-re-exec child PIDs and their reload times, though it was designed to be reasonably extensible. This allows the new manager to recognize stale workers as they exit instead of logging Ignoring wait() result from unknown PID ... With the improved knowledge of subprocesses, we can kick the log level for the above message up from info to warning; we no longer expect it to trigger in practice. Drive-by: Add logging to ServersPerPortStrategy.register_worker_exit that's comparable to what WorkersStrategy does. Change-Id: I8227939d04fda8db66fb2f131f2c71ce8741c7d9
This commit is contained in:
parent
734ed9cdd8
commit
ae6300af86
doc/source
etc
account-server.conf-samplecontainer-server.conf-sampleobject-server.conf-sampleproxy-server.conf-sample
swift/common
test/unit/common
@ -58,13 +58,16 @@ is as follows:
|
||||
socket. Once all workers have started and can accept new connections,
|
||||
the manager notifies the socket-closer via a pipe. The socket-closer
|
||||
closes the old worker listen sockets so they stop accepting new
|
||||
connections, then exits.
|
||||
connections, passes the list of old workers to the new manager,
|
||||
then exits.
|
||||
|
||||
.. image:: images/reload_process_tree_5.svg
|
||||
|
||||
5. Old workers continue servicing any in-progress connections, while new
|
||||
connections are picked up by new workers. Once an old worker completes
|
||||
all of its oustanding requests, it exits.
|
||||
all of its oustanding requests, it exits. Beginning with Swift 2.33.0,
|
||||
if any workers persist beyond ``stale_worker_timeout``, the new manager
|
||||
will clean them up with ``KILL`` signals.
|
||||
|
||||
.. image:: images/reload_process_tree_6.svg
|
||||
|
||||
|
@ -117,6 +117,12 @@ use = egg:swift#account
|
||||
# will be denied until the disk ha s more space available. Percentage
|
||||
# will be used if the value ends with a '%'.
|
||||
# fallocate_reserve = 1%
|
||||
#
|
||||
# When reloading servers with SIGUSR1, workers running with old config/code
|
||||
# are allowed some time to finish serving in-flight requests. Use this to
|
||||
# configure the grace period (in seconds), after which the reloaded server
|
||||
# will issue SIGKILLs to remaining stale workers.
|
||||
# stale_worker_timeout = 86400
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
@ -127,6 +127,12 @@ use = egg:swift#container
|
||||
# will be denied until the disk ha s more space available. Percentage
|
||||
# will be used if the value ends with a '%'.
|
||||
# fallocate_reserve = 1%
|
||||
#
|
||||
# When reloading servers with SIGUSR1, workers running with old config/code
|
||||
# are allowed some time to finish serving in-flight requests. Use this to
|
||||
# configure the grace period (in seconds), after which the reloaded server
|
||||
# will issue SIGKILLs to remaining stale workers.
|
||||
# stale_worker_timeout = 86400
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
@ -245,6 +245,12 @@ use = egg:swift#object
|
||||
# Work only with ionice_class.
|
||||
# ionice_class =
|
||||
# ionice_priority =
|
||||
#
|
||||
# When reloading servers with SIGUSR1, workers running with old config/code
|
||||
# are allowed some time to finish serving in-flight requests. Use this to
|
||||
# configure the grace period (in seconds), after which the reloaded server
|
||||
# will issue SIGKILLs to remaining stale workers.
|
||||
# stale_worker_timeout = 86400
|
||||
|
||||
[filter:healthcheck]
|
||||
use = egg:swift#healthcheck
|
||||
|
@ -330,6 +330,12 @@ use = egg:swift#proxy
|
||||
# ionice_class =
|
||||
# ionice_priority =
|
||||
#
|
||||
# When reloading servers with SIGUSR1, workers running with old config/code
|
||||
# are allowed some time to finish serving in-flight requests. Use this to
|
||||
# configure the grace period (in seconds), after which the reloaded server
|
||||
# will issue SIGKILLs to remaining stale workers.
|
||||
# stale_worker_timeout = 86400
|
||||
#
|
||||
# When upgrading from liberasurecode<=1.5.0, you may want to continue writing
|
||||
# legacy CRCs until all nodes are upgraded and capabale of reading fragments
|
||||
# with zlib CRCs. liberasurecode>=1.6.2 checks for the environment variable
|
||||
|
@ -5090,3 +5090,19 @@ class CooperativeIterator(ClosingIterator):
|
||||
sleep()
|
||||
self.count += 1
|
||||
return super(CooperativeIterator, self)._get_next_item()
|
||||
|
||||
|
||||
def get_ppid(pid):
|
||||
"""
|
||||
Get the parent process's PID given a child pid.
|
||||
|
||||
:raises OSError: if the child pid cannot be found
|
||||
"""
|
||||
try:
|
||||
with open('/proc/%d/stat' % pid) as fp:
|
||||
stats = fp.read().split()
|
||||
return int(stats[3])
|
||||
except IOError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise OSError(errno.ESRCH, 'No such process')
|
||||
raise
|
||||
|
@ -18,8 +18,10 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import struct
|
||||
import sys
|
||||
from textwrap import dedent
|
||||
import time
|
||||
@ -45,6 +47,7 @@ from swift.common.utils import capture_stdio, disable_fallocate, \
|
||||
SIGNUM_TO_NAME = {getattr(signal, n): n for n in dir(signal)
|
||||
if n.startswith('SIG') and '_' not in n}
|
||||
NOTIFY_FD_ENV_KEY = '__SWIFT_SERVER_NOTIFY_FD'
|
||||
CHILD_STATE_FD_ENV_KEY = '__SWIFT_SERVER_CHILD_STATE_FD'
|
||||
|
||||
# Set maximum line size of message headers to be accepted.
|
||||
wsgi.MAX_HEADER_LINE = constraints.MAX_HEADER_SIZE
|
||||
@ -474,6 +477,13 @@ class StrategyBase(object):
|
||||
# children to easily drop refs to sibling sockets in post_fork_hook().
|
||||
self.tracking_data = {}
|
||||
|
||||
# When doing a seamless reload, we inherit a bunch of child processes
|
||||
# that should all clean themselves up fairly quickly; track them here
|
||||
self.reload_pids = dict()
|
||||
# If they don't cleanup quickly, we'll start killing them after this
|
||||
self.stale_worker_timeout = utils.non_negative_float(
|
||||
conf.get('stale_worker_timeout', 86400))
|
||||
|
||||
def post_fork_hook(self):
|
||||
"""
|
||||
Called in each forked-off child process, prior to starting the actual
|
||||
@ -523,9 +533,21 @@ class StrategyBase(object):
|
||||
# connections. This is used for seamless reloading using SIGUSR1.
|
||||
reexec_signal_fd = os.getenv(NOTIFY_FD_ENV_KEY)
|
||||
if reexec_signal_fd:
|
||||
if ',' in reexec_signal_fd:
|
||||
reexec_signal_fd, worker_state_fd = reexec_signal_fd.split(',')
|
||||
reexec_signal_fd = int(reexec_signal_fd)
|
||||
os.write(reexec_signal_fd, str(os.getpid()).encode('utf8'))
|
||||
os.close(reexec_signal_fd)
|
||||
worker_state_fd = os.getenv(CHILD_STATE_FD_ENV_KEY)
|
||||
try:
|
||||
self.read_state_from_old_manager(worker_state_fd)
|
||||
except Exception as e:
|
||||
# This was all opportunistic anyway; old swift wouldn't even
|
||||
# *try* to send us any state -- we don't want *new* code to
|
||||
# fail just because *old* code didn't live up to its promise
|
||||
self.logger.warning(
|
||||
'Failed to read state from the old manager: %r', e,
|
||||
exc_info=True)
|
||||
|
||||
# Finally, signal systemd (if appropriate) that process started
|
||||
# properly.
|
||||
@ -533,6 +555,110 @@ class StrategyBase(object):
|
||||
|
||||
self.signaled_ready = True
|
||||
|
||||
def read_state_from_old_manager(self, worker_state_fd):
|
||||
"""
|
||||
Read worker state from the old manager's socket-closer.
|
||||
|
||||
The socket-closing process is the last thing to still have the worker
|
||||
PIDs in its head, so it sends us a JSON dict (prefixed by its length)
|
||||
of the form::
|
||||
|
||||
{
|
||||
"old_pids": {
|
||||
"<old worker>": "<first reload time>",
|
||||
...
|
||||
}
|
||||
}
|
||||
|
||||
More data may be added in the future.
|
||||
|
||||
:param worker_state_fd: The file descriptor that should have the
|
||||
old worker state. Should be passed to us
|
||||
via the ``__SWIFT_SERVER_CHILD_STATE_FD``
|
||||
environment variable.
|
||||
"""
|
||||
if not worker_state_fd:
|
||||
return
|
||||
worker_state_fd = int(worker_state_fd)
|
||||
try:
|
||||
# The temporary manager may have up and died while trying to send
|
||||
# state; hopefully its logs will have more about what went wrong
|
||||
# -- let's just log at warning here
|
||||
data_len = os.read(worker_state_fd, 4)
|
||||
if len(data_len) != 4:
|
||||
self.logger.warning(
|
||||
'Invalid worker state received; expected 4 bytes '
|
||||
'followed by a payload but only received %d bytes',
|
||||
len(data_len))
|
||||
return
|
||||
|
||||
data_len = struct.unpack('!I', data_len)[0]
|
||||
data = b''
|
||||
while len(data) < data_len:
|
||||
chunk = os.read(worker_state_fd, data_len - len(data))
|
||||
if not chunk:
|
||||
break
|
||||
data += chunk
|
||||
if len(data) != data_len:
|
||||
self.logger.warning(
|
||||
'Incomplete worker state received; expected %d '
|
||||
'bytes but only received %d', data_len, len(data))
|
||||
return
|
||||
|
||||
# OK, the temporary manager was able to tell us how much it wanted
|
||||
# to send and send it; from here on, error seems appropriate.
|
||||
try:
|
||||
old_state = json.loads(data)
|
||||
except ValueError:
|
||||
self.logger.error(
|
||||
'Invalid worker state received; '
|
||||
'invalid JSON: %r', data)
|
||||
return
|
||||
|
||||
try:
|
||||
old_pids = {
|
||||
int(pid): float(reloaded)
|
||||
for pid, reloaded in old_state["old_pids"].items()}
|
||||
except (KeyError, TypeError) as err:
|
||||
self.logger.error(
|
||||
'Invalid worker state received; '
|
||||
'error reading old pids: %s', err)
|
||||
self.logger.debug('Received old worker pids: %s', old_pids)
|
||||
self.reload_pids.update(old_pids)
|
||||
|
||||
def smother(old_pids=old_pids, timeout=self.stale_worker_timeout):
|
||||
own_pid = os.getpid()
|
||||
kill_times = sorted(((reloaded + timeout, pid)
|
||||
for pid, reloaded in old_pids.items()),
|
||||
reverse=True)
|
||||
while kill_times:
|
||||
kill_time, pid = kill_times.pop()
|
||||
now = time.time()
|
||||
if kill_time > now:
|
||||
sleep(kill_time - now)
|
||||
try:
|
||||
ppid = utils.get_ppid(pid)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ESRCH:
|
||||
self.logger.error("Could not determine parent "
|
||||
"for stale pid %d: %s", pid, e)
|
||||
continue
|
||||
if ppid == own_pid:
|
||||
self.logger.notice("Killing long-running stale worker "
|
||||
"%d after %ds", pid, int(timeout))
|
||||
try:
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ESRCH:
|
||||
self.logger.error(
|
||||
"Could not kill stale pid %d: %s", pid, e)
|
||||
# else, pid got re-used?
|
||||
|
||||
eventlet.spawn_n(smother)
|
||||
|
||||
finally:
|
||||
os.close(worker_state_fd)
|
||||
|
||||
|
||||
class WorkersStrategy(StrategyBase):
|
||||
"""
|
||||
@ -622,11 +748,17 @@ class WorkersStrategy(StrategyBase):
|
||||
:param int pid: The PID of the worker that exited.
|
||||
"""
|
||||
|
||||
if self.reload_pids.pop(pid, None):
|
||||
self.logger.notice('Removing stale child %d from parent %d',
|
||||
pid, os.getpid())
|
||||
return
|
||||
|
||||
sock = self.tracking_data.pop(pid, None)
|
||||
if sock is None:
|
||||
self.logger.info('Ignoring wait() result from unknown PID %s', pid)
|
||||
self.logger.warning('Ignoring wait() result from unknown PID %d',
|
||||
pid)
|
||||
else:
|
||||
self.logger.error('Removing dead child %s from parent %s',
|
||||
self.logger.error('Removing dead child %d from parent %d',
|
||||
pid, os.getpid())
|
||||
greenio.shutdown_safe(sock)
|
||||
sock.close()
|
||||
@ -639,6 +771,9 @@ class WorkersStrategy(StrategyBase):
|
||||
for sock in self.tracking_data.values():
|
||||
yield sock
|
||||
|
||||
def get_worker_pids(self):
|
||||
return list(self.tracking_data.keys())
|
||||
|
||||
|
||||
class ServersPerPortStrategy(StrategyBase):
|
||||
"""
|
||||
@ -786,14 +921,23 @@ class ServersPerPortStrategy(StrategyBase):
|
||||
:param int pid: The PID of the worker that exited.
|
||||
"""
|
||||
|
||||
if self.reload_pids.pop(pid, None):
|
||||
self.logger.notice('Removing stale child %d from parent %d',
|
||||
pid, os.getpid())
|
||||
return
|
||||
|
||||
for port_data in self.tracking_data.values():
|
||||
for idx, (child_pid, sock) in enumerate(port_data):
|
||||
if child_pid == pid:
|
||||
self.logger.error('Removing dead child %d from parent %d',
|
||||
pid, os.getpid())
|
||||
port_data[idx] = (None, None)
|
||||
greenio.shutdown_safe(sock)
|
||||
sock.close()
|
||||
return
|
||||
|
||||
self.logger.warning('Ignoring wait() result from unknown PID %d', pid)
|
||||
|
||||
def iter_sockets(self):
|
||||
"""
|
||||
Yields all known listen sockets.
|
||||
@ -803,6 +947,12 @@ class ServersPerPortStrategy(StrategyBase):
|
||||
for _pid, sock in port_data:
|
||||
yield sock
|
||||
|
||||
def get_worker_pids(self):
|
||||
return [
|
||||
pid
|
||||
for port_data in self.tracking_data.values()
|
||||
for pid, _sock in port_data]
|
||||
|
||||
|
||||
def check_config(conf_path, app_section, *args, **kwargs):
|
||||
# Load configuration, Set logger and Load request processor
|
||||
@ -1000,24 +1150,29 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
|
||||
# then the old server can't actually ever exit.
|
||||
strategy.set_close_on_exec_on_listen_sockets()
|
||||
read_fd, write_fd = os.pipe()
|
||||
state_rfd, state_wfd = os.pipe()
|
||||
orig_server_pid = os.getpid()
|
||||
child_pid = os.fork()
|
||||
if child_pid:
|
||||
# parent; set env var for fds and reexec ourselves
|
||||
os.close(read_fd)
|
||||
os.close(state_wfd)
|
||||
os.putenv(NOTIFY_FD_ENV_KEY, str(write_fd))
|
||||
os.putenv(CHILD_STATE_FD_ENV_KEY, str(state_rfd))
|
||||
myself = os.path.realpath(sys.argv[0])
|
||||
logger.info("Old server PID=%d re'execing as: %r",
|
||||
orig_server_pid, [myself] + list(sys.argv))
|
||||
if hasattr(os, 'set_inheritable'):
|
||||
# See https://www.python.org/dev/peps/pep-0446/
|
||||
os.set_inheritable(write_fd, True)
|
||||
os.set_inheritable(state_rfd, True)
|
||||
os.execv(myself, sys.argv) # nosec B606
|
||||
logger.error('Somehow lived past os.execv()?!')
|
||||
exit('Somehow lived past os.execv()?!')
|
||||
elif child_pid == 0:
|
||||
# child
|
||||
os.close(write_fd)
|
||||
os.close(state_rfd)
|
||||
logger.info('Old server temporary child PID=%d waiting for '
|
||||
"re-exec'ed PID=%d to signal readiness...",
|
||||
os.getpid(), orig_server_pid)
|
||||
@ -1032,6 +1187,16 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
|
||||
logger.info('Old server temporary child PID=%d notified '
|
||||
'to shutdown old listen sockets by PID=%s',
|
||||
os.getpid(), got_pid)
|
||||
# Ensure new process knows about old children
|
||||
stale_pids = dict(strategy.reload_pids)
|
||||
stale_pids[os.getpid()] = now = time.time()
|
||||
stale_pids.update({
|
||||
pid: now for pid in strategy.get_worker_pids()})
|
||||
data = json.dumps({
|
||||
"old_pids": stale_pids,
|
||||
}).encode('ascii')
|
||||
os.write(state_wfd, struct.pack('!I', len(data)) + data)
|
||||
os.close(state_wfd)
|
||||
else:
|
||||
logger.warning('Old server temporary child PID=%d *NOT* '
|
||||
'notified to shutdown old listen sockets; '
|
||||
|
@ -2826,6 +2826,9 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
self.assertRaises(
|
||||
TypeError, md5, None, usedforsecurity=False)
|
||||
|
||||
def test_get_my_ppid(self):
|
||||
self.assertEqual(os.getppid(), utils.get_ppid(os.getpid()))
|
||||
|
||||
|
||||
class TestUnlinkOlder(unittest.TestCase):
|
||||
|
||||
@ -4715,6 +4718,29 @@ class TestDistributeEvenly(unittest.TestCase):
|
||||
self.assertEqual(out, [[0], [1], [2], [3], [4], [], []])
|
||||
|
||||
|
||||
@mock.patch('swift.common.utils.open')
|
||||
class TestGetPpid(unittest.TestCase):
|
||||
def test_happy_path(self, mock_open):
|
||||
mock_open.return_value.__enter__().read.return_value = \
|
||||
'pid comm stat 456 see the procfs(5) man page for more info\n'
|
||||
self.assertEqual(utils.get_ppid(123), 456)
|
||||
self.assertIn(mock.call('/proc/123/stat'), mock_open.mock_calls)
|
||||
|
||||
def test_not_found(self, mock_open):
|
||||
mock_open.side_effect = IOError(errno.ENOENT, "Not there")
|
||||
with self.assertRaises(OSError) as caught:
|
||||
utils.get_ppid(123)
|
||||
self.assertEqual(caught.exception.errno, errno.ESRCH)
|
||||
self.assertEqual(mock_open.mock_calls[0], mock.call('/proc/123/stat'))
|
||||
|
||||
def test_not_allowed(self, mock_open):
|
||||
mock_open.side_effect = OSError(errno.EPERM, "Not for you")
|
||||
with self.assertRaises(OSError) as caught:
|
||||
utils.get_ppid(123)
|
||||
self.assertEqual(caught.exception.errno, errno.EPERM)
|
||||
self.assertEqual(mock_open.mock_calls[0], mock.call('/proc/123/stat'))
|
||||
|
||||
|
||||
class TestShardName(unittest.TestCase):
|
||||
def test(self):
|
||||
ts = utils.Timestamp.now()
|
||||
|
@ -17,10 +17,14 @@
|
||||
|
||||
import configparser
|
||||
import errno
|
||||
import json
|
||||
import logging
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import unittest
|
||||
import os
|
||||
import eventlet
|
||||
|
||||
from collections import defaultdict
|
||||
from io import BytesIO
|
||||
@ -1274,6 +1278,79 @@ class CommonTestMixin(object):
|
||||
mock.call(self.logger),
|
||||
], mock_capture.mock_calls)
|
||||
|
||||
def test_stale_pid_loading(self):
|
||||
class FakeTime(object):
|
||||
def __init__(self, step=10):
|
||||
self.patchers = [
|
||||
mock.patch('swift.common.wsgi.time.time',
|
||||
side_effect=self.time),
|
||||
mock.patch('swift.common.wsgi.sleep',
|
||||
side_effect=self.sleep),
|
||||
]
|
||||
self.now = 0
|
||||
self.step = step
|
||||
self.sleeps = []
|
||||
|
||||
def time(self):
|
||||
self.now += self.step
|
||||
return self.now
|
||||
|
||||
def sleep(self, delta):
|
||||
if delta < 0:
|
||||
raise ValueError('cannot sleep negative time: %s' % delta)
|
||||
self.now += delta
|
||||
self.sleeps.append(delta)
|
||||
|
||||
def __enter__(self):
|
||||
for patcher in self.patchers:
|
||||
patcher.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, *a):
|
||||
for patcher in self.patchers:
|
||||
patcher.stop()
|
||||
|
||||
notify_rfd, notify_wfd = os.pipe()
|
||||
state_rfd, state_wfd = os.pipe()
|
||||
stale_process_data = {
|
||||
"old_pids": {123: 5, 456: 6, 78: 27, 90: 28},
|
||||
}
|
||||
to_write = json.dumps(stale_process_data).encode('ascii')
|
||||
os.write(state_wfd, struct.pack('!I', len(to_write)) + to_write)
|
||||
os.close(state_wfd)
|
||||
self.assertEqual(self.strategy.reload_pids, {})
|
||||
os.environ['__SWIFT_SERVER_NOTIFY_FD'] = str(notify_wfd)
|
||||
os.environ['__SWIFT_SERVER_CHILD_STATE_FD'] = str(state_rfd)
|
||||
with mock.patch('swift.common.wsgi.capture_stdio'), \
|
||||
mock.patch('swift.common.utils.get_ppid') as mock_ppid, \
|
||||
mock.patch('os.kill') as mock_kill, FakeTime() as fake_time:
|
||||
mock_ppid.side_effect = [
|
||||
os.getpid(),
|
||||
OSError(errno.ENOENT, "Not there"),
|
||||
OSError(errno.EPERM, "Not for you"),
|
||||
os.getpid(),
|
||||
]
|
||||
self.strategy.signal_ready()
|
||||
self.assertEqual(self.strategy.reload_pids,
|
||||
stale_process_data['old_pids'])
|
||||
|
||||
# We spawned our child-killer, but it hasn't been scheduled yet
|
||||
self.assertEqual(mock_ppid.mock_calls, [])
|
||||
self.assertEqual(mock_kill.mock_calls, [])
|
||||
self.assertEqual(fake_time.sleeps, [])
|
||||
|
||||
# *Now* we let it run (with mocks still enabled)
|
||||
eventlet.sleep()
|
||||
|
||||
self.assertEqual(str(os.getpid()).encode('ascii'),
|
||||
os.read(notify_rfd, 30))
|
||||
os.close(notify_rfd)
|
||||
|
||||
self.assertEqual(mock_kill.mock_calls, [
|
||||
mock.call(123, signal.SIGKILL),
|
||||
mock.call(90, signal.SIGKILL)])
|
||||
self.assertEqual(fake_time.sleeps, [86395, 2])
|
||||
|
||||
|
||||
class TestServersPerPortStrategy(unittest.TestCase, CommonTestMixin):
|
||||
def setUp(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user