Add NotificationServer to fix swift-reload
This implementation uses abstract sockets for process notifications, similar to systemd's notify sockets, but notifiers use a PID-specific name from a well-known namespace and listeners are assumed to be ephemeral.

Update swift-reload to use these instead of polling child processes to determine when a server reload has completed.

Bonus: it also acts as a non-blocking lock to prevent two swift-reload commands from reloading a process at the same time.

Closes-Bug: #2098405
Related-Change: Ib2dd9513d3bb7c7686e6fa35485317bbad915876
Change-Id: I5f36aba583650bddddff5e55ac557302d023ea1b
This commit is contained in:
@@ -28,11 +28,11 @@ import errno
|
||||
import os
|
||||
import os.path
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from swift.common.manager import get_child_pids
|
||||
from swift.common.utils import NotificationServer
|
||||
|
||||
|
||||
EXIT_BAD_PID = 2 # similar to argparse exiting 2 on an unknown arg
|
||||
@@ -97,37 +97,32 @@ def main(args=None):
|
||||
|
||||
if args.wait:
|
||||
try:
|
||||
original_children = get_child_pids(args.pid)
|
||||
children_since_reload = set()
|
||||
with NotificationServer(args.pid, args.timeout) as notifications:
|
||||
if args.verbose:
|
||||
print("Sending USR1 signal")
|
||||
os.kill(args.pid, signal.SIGUSR1)
|
||||
|
||||
if args.verbose:
|
||||
print("Sending USR1 signal")
|
||||
os.kill(args.pid, signal.SIGUSR1)
|
||||
try:
|
||||
ready = False
|
||||
while not ready:
|
||||
data = notifications.receive()
|
||||
for data in data.split(b"\n"):
|
||||
if args.verbose:
|
||||
if data in (b"READY=1", b"RELOADING=1",
|
||||
b"STOPPING=1"):
|
||||
print("Process is %s" %
|
||||
data.decode("ascii")[:-2])
|
||||
else:
|
||||
print("Received notification %r" % data)
|
||||
|
||||
start = time.time()
|
||||
while time.time() - start < args.timeout:
|
||||
children = get_child_pids(args.pid)
|
||||
new_children = (children - original_children
|
||||
- children_since_reload)
|
||||
if new_children:
|
||||
if args.verbose:
|
||||
print("Found new children: %s" % ", ".join(
|
||||
str(pid) for pid in new_children))
|
||||
children_since_reload |= new_children
|
||||
if children_since_reload - children:
|
||||
# At least one new child exited; presumably, it was
|
||||
# the temporary child waiting to shutdown sockets
|
||||
break
|
||||
# We want this to be fairly low, since the temporary child
|
||||
# may not hang around very long
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
print("Timed out reloading %s" % script, file=sys.stderr)
|
||||
exit(EXIT_RELOAD_TIMEOUT)
|
||||
|
||||
except subprocess.CalledProcessError:
|
||||
# This could pop during any of the calls to get_child_pids
|
||||
print("Process seems to have died!", file=sys.stderr)
|
||||
if data == b"READY=1":
|
||||
ready = True
|
||||
except socket.timeout:
|
||||
print("Timed out reloading %s" % script, file=sys.stderr)
|
||||
exit(EXIT_RELOAD_TIMEOUT)
|
||||
except OSError as e:
|
||||
print("Could not bind notification socket: %s" % e,
|
||||
file=sys.stderr)
|
||||
exit(EXIT_RELOAD_FAILED)
|
||||
else: # --no-wait
|
||||
if args.verbose:
|
||||
|
||||
@@ -4878,14 +4878,67 @@ def get_db_files(db_path):
|
||||
return sorted(results)
|
||||
|
||||
|
||||
def get_pid_notify_socket(pid=None):
    """
    Build the address of the abstract notification socket for a process.

    The address is the fixed prefix ``\\0swift-notifications\\0`` followed
    by the decimal pid. This is used by the ``swift-reload`` command.

    :param pid: the pid to build the address for; when ``None``, the pid of
                the current process is used
    :returns: the socket address, as a string with a leading NUL byte
    """
    target_pid = os.getpid() if pid is None else pid
    return '\0swift-notifications\0{}'.format(target_pid)
|
||||
|
||||
|
||||
class NotificationServer(object):
    """
    Receive notifications sent to a pid-specific notification socket.

    The server binds a unix datagram socket to the address returned by
    :func:`get_pid_notify_socket` for ``pid``. It may be used as a context
    manager: entering starts the server (unless it was already started) and
    exiting closes it.

    :param pid: the pid whose notification address should be bound
    :param read_timeout: timeout applied to the socket with ``settimeout``
                         once it has been bound
    """

    # Maximum number of bytes read by a single call to receive()
    RECV_SIZE = 1024

    def __init__(self, pid, read_timeout):
        self.pid = pid
        self.read_timeout = read_timeout
        # The bound socket; None whenever the server is not started
        self.sock = None

    def receive(self):
        """
        Read one message, of at most ``RECV_SIZE`` bytes, from the socket.

        The server must have been started first.

        :returns: the received bytes
        """
        return self.sock.recv(self.RECV_SIZE)

    def close(self):
        """
        Close the socket, if there is one, and mark the server as stopped.

        Safe to call repeatedly, or on a server that was never started, so
        that an explicit ``close()`` inside a ``with`` block does not make
        the subsequent ``__exit__`` fail.
        """
        sock, self.sock = self.sock, None
        if sock is not None:
            sock.close()

    def start(self):
        """
        Create the socket and bind it to the pid-specific address.

        :raises RuntimeError: if the server has already been started
        """
        if self.sock is not None:
            raise RuntimeError('notification server already started')

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        started = False
        try:
            self.sock.bind(get_pid_notify_socket(self.pid))
            self.sock.settimeout(self.read_timeout)
            started = True
        finally:
            # Do not leave a half-initialized socket behind if binding or
            # configuring it failed; the original exception still propagates.
            if not started:
                self.close()

    def __enter__(self):
        if self.sock is None:
            self.start()
        return self

    def __exit__(self, *args):
        self.close()
|
||||
|
||||
|
||||
def systemd_notify(logger=None, msg=b"READY=1"):
|
||||
"""
|
||||
Send systemd-compatible notifications.
|
||||
|
||||
Notify the service manager that started this process, if it has set the
|
||||
NOTIFY_SOCKET environment variable. For example, systemd will set this
|
||||
when the unit has ``Type=notify``. More information can be found in
|
||||
systemd documentation:
|
||||
Attempt to send the message to swift's pid-specific notification socket;
|
||||
see :func:`get_pid_notify_socket`. This is used by the ``swift-reload``
|
||||
command.
|
||||
|
||||
Additionally, notify the service manager that started this process, if
|
||||
it has set the NOTIFY_SOCKET environment variable. For example, systemd
|
||||
will set this when the unit has ``Type=notify``. More information can
|
||||
be found in systemd documentation:
|
||||
https://www.freedesktop.org/software/systemd/man/sd_notify.html
|
||||
|
||||
Common messages include::
|
||||
@@ -4893,15 +4946,18 @@ def systemd_notify(logger=None, msg=b"READY=1"):
|
||||
READY=1
|
||||
RELOADING=1
|
||||
STOPPING=1
|
||||
STATUS=<some string>
|
||||
|
||||
:param logger: a logger object
|
||||
:param msg: the message to send
|
||||
"""
|
||||
if not isinstance(msg, bytes):
|
||||
msg = msg.encode('utf8')
|
||||
notify_socket = os.getenv('NOTIFY_SOCKET')
|
||||
if notify_socket:
|
||||
|
||||
notify_sockets = [get_pid_notify_socket()]
|
||||
systemd_socket = os.getenv('NOTIFY_SOCKET')
|
||||
if systemd_socket:
|
||||
notify_sockets.append(systemd_socket)
|
||||
for notify_socket in notify_sockets:
|
||||
if notify_socket.startswith('@'):
|
||||
# abstract namespace socket
|
||||
notify_socket = '\0%s' % notify_socket[1:]
|
||||
@@ -4910,8 +4966,9 @@ def systemd_notify(logger=None, msg=b"READY=1"):
|
||||
try:
|
||||
sock.connect(notify_socket)
|
||||
sock.sendall(msg)
|
||||
except EnvironmentError:
|
||||
if logger:
|
||||
except EnvironmentError as e:
|
||||
if logger and not (notify_socket == notify_sockets[0] and
|
||||
e.errno == errno.ECONNREFUSED):
|
||||
logger.debug("Systemd notification failed", exc_info=True)
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
import mock
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import unittest
|
||||
|
||||
@@ -122,8 +123,8 @@ class TestMain(unittest.TestCase):
|
||||
self.mock_validate = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
patcher = mock.patch.object(reload, 'get_child_pids')
|
||||
self.mock_get_child_pids = patcher.start()
|
||||
patcher = mock.patch.object(reload, 'NotificationServer')
|
||||
self.mock_notify_server = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
patcher = mock.patch('os.kill')
|
||||
@@ -138,13 +139,11 @@ class TestMain(unittest.TestCase):
|
||||
],
|
||||
'swift-proxy-server',
|
||||
)
|
||||
self.mock_get_child_pids.side_effect = [
|
||||
{'worker1', 'worker2'},
|
||||
{'worker1', 'worker2', 'foster parent'},
|
||||
{'worker1', 'worker2', 'foster parent', 'new worker'},
|
||||
{'worker1', 'worker2', 'new worker'},
|
||||
self.mock_notify_server().__enter__().receive.side_effect = [
|
||||
b'RELOADING=1',
|
||||
b'READY=1',
|
||||
]
|
||||
self.assertIsNone(reload.main(['123']))
|
||||
self.assertIsNone(reload.main(['123', '-v']))
|
||||
self.assertEqual(self.mock_check_call.mock_calls, [mock.call([
|
||||
'/usr/bin/swift-proxy-server',
|
||||
'/etc/swift/proxy-server.conf',
|
||||
@@ -164,10 +163,9 @@ class TestMain(unittest.TestCase):
|
||||
],
|
||||
'swift-proxy-server',
|
||||
)
|
||||
self.mock_get_child_pids.side_effect = [
|
||||
{'worker1', 'worker2'},
|
||||
{'worker1', 'worker2', 'foster parent'},
|
||||
{'worker1', 'worker2', 'foster parent', 'new worker'},
|
||||
self.mock_notify_server().__enter__().receive.side_effect = [
|
||||
b'RELOADING=1',
|
||||
socket.timeout,
|
||||
]
|
||||
with self.assertRaises(SystemExit) as caught:
|
||||
reload.main(['123'])
|
||||
|
||||
@@ -2683,18 +2683,27 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
m_socket.reset_mock()
|
||||
m_sock.reset_mock()
|
||||
utils.systemd_notify()
|
||||
self.assertEqual(m_socket.call_count, 0)
|
||||
self.assertEqual(m_sock.connect.call_count, 0)
|
||||
self.assertEqual(m_sock.sendall.call_count, 0)
|
||||
self.assertEqual(m_socket.mock_calls, [
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM)])
|
||||
self.assertEqual(m_sock.connect.mock_calls, [
|
||||
mock.call(utils.get_pid_notify_socket())])
|
||||
self.assertEqual(m_sock.sendall.mock_calls, [
|
||||
mock.call(b'READY=1')])
|
||||
|
||||
# File notification socket
|
||||
m_socket.reset_mock()
|
||||
m_sock.reset_mock()
|
||||
os.environ['NOTIFY_SOCKET'] = 'foobar'
|
||||
utils.systemd_notify()
|
||||
m_socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
m_sock.connect.assert_called_once_with('foobar')
|
||||
m_sock.sendall.assert_called_once_with(b'READY=1')
|
||||
self.assertEqual(m_socket.mock_calls, [
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM),
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM)])
|
||||
self.assertEqual(m_sock.connect.mock_calls, [
|
||||
mock.call(utils.get_pid_notify_socket()),
|
||||
mock.call('foobar')])
|
||||
self.assertEqual(m_sock.sendall.mock_calls, [
|
||||
mock.call(b'READY=1'),
|
||||
mock.call(b'READY=1')])
|
||||
# Still there, so we can send STOPPING/RELOADING messages
|
||||
self.assertIn('NOTIFY_SOCKET', os.environ)
|
||||
|
||||
@@ -2702,18 +2711,30 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
m_sock.reset_mock()
|
||||
logger = debug_logger()
|
||||
utils.systemd_notify(logger, "RELOADING=1")
|
||||
m_socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
m_sock.connect.assert_called_once_with('foobar')
|
||||
m_sock.sendall.assert_called_once_with(b'RELOADING=1')
|
||||
self.assertEqual(m_socket.mock_calls, [
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM),
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM)])
|
||||
self.assertEqual(m_sock.connect.mock_calls, [
|
||||
mock.call(utils.get_pid_notify_socket()),
|
||||
mock.call('foobar')])
|
||||
self.assertEqual(m_sock.sendall.mock_calls, [
|
||||
mock.call(b'RELOADING=1'),
|
||||
mock.call(b'RELOADING=1')])
|
||||
|
||||
# Abstract notification socket
|
||||
m_socket.reset_mock()
|
||||
m_sock.reset_mock()
|
||||
os.environ['NOTIFY_SOCKET'] = '@foobar'
|
||||
utils.systemd_notify()
|
||||
m_socket.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
m_sock.connect.assert_called_once_with('\0foobar')
|
||||
m_sock.sendall.assert_called_once_with(b'READY=1')
|
||||
self.assertEqual(m_socket.mock_calls, [
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM),
|
||||
mock.call(socket.AF_UNIX, socket.SOCK_DGRAM)])
|
||||
self.assertEqual(m_sock.connect.mock_calls, [
|
||||
mock.call(utils.get_pid_notify_socket()),
|
||||
mock.call('\x00foobar')])
|
||||
self.assertEqual(m_sock.sendall.mock_calls, [
|
||||
mock.call(b'READY=1'),
|
||||
mock.call(b'READY=1')])
|
||||
self.assertIn('NOTIFY_SOCKET', os.environ)
|
||||
|
||||
# Test logger with connection error
|
||||
@@ -2733,8 +2754,9 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
m_logger.reset_mock()
|
||||
utils.systemd_notify(logger=m_logger)
|
||||
self.assertEqual(0, m_sock.sendall.call_count)
|
||||
m_logger.debug.assert_called_once_with(
|
||||
"Systemd notification failed", exc_info=True)
|
||||
self.assertEqual(m_logger.debug.mock_calls, [
|
||||
mock.call("Systemd notification failed", exc_info=True),
|
||||
mock.call("Systemd notification failed", exc_info=True)])
|
||||
|
||||
# Test it for real
|
||||
def do_test_real_socket(socket_address, notify_socket):
|
||||
@@ -2755,6 +2777,11 @@ cluster_dfw1 = http://dfw1.host/v1/
|
||||
# test abstract socket address
|
||||
do_test_real_socket('\0foobar', '@foobar')
|
||||
|
||||
with utils.NotificationServer(os.getpid(), 1) as swift_listener:
|
||||
do_test_real_socket('\0foobar', '@foobar')
|
||||
self.assertEqual(swift_listener.receive(),
|
||||
b'READY=1')
|
||||
|
||||
def test_md5_with_data(self):
|
||||
if not self.fips_enabled:
|
||||
digest = md5(self.md5_test_data).hexdigest()
|
||||
|
||||
Reference in New Issue
Block a user