Sync service and systemd modules from oslo-incubator

This patch make systemd know when neutron-service was started. This is
needed in HA environment, previously systemd returned success even
before neutron-server was able to handle requests.

Current oslo-incubator commit on HEAD:
b7ad6ddab8b1d61bf4f52ccaa461a9d68809747b

Implements: blueprint service-readiness
Change-Id: Ic9e4abd11b614a896fbd7454b9a604a69a248d0f
This commit is contained in:
Jakub Libosvar 2014-03-17 16:36:01 +01:00
parent db232425e6
commit 149679b2dc
3 changed files with 143 additions and 32 deletions

View File

@ -38,9 +38,10 @@ from eventlet import event
from oslo.config import cfg from oslo.config import cfg
from neutron.openstack.common import eventlet_backdoor from neutron.openstack.common import eventlet_backdoor
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common.gettextutils import _LE, _LI, _LW
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import systemd
from neutron.openstack.common import threadgroup from neutron.openstack.common import threadgroup
@ -163,7 +164,7 @@ class ServiceLauncher(Launcher):
status = None status = None
signo = 0 signo = 0
LOG.debug(_('Full set of CONF:')) LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG) CONF.log_opt_values(LOG, std_logging.DEBUG)
try: try:
@ -172,7 +173,7 @@ class ServiceLauncher(Launcher):
super(ServiceLauncher, self).wait() super(ServiceLauncher, self).wait()
except SignalExit as exc: except SignalExit as exc:
signame = _signo_to_signame(exc.signo) signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame) LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code status = exc.code
signo = exc.signo signo = exc.signo
except SystemExit as exc: except SystemExit as exc:
@ -184,7 +185,7 @@ class ServiceLauncher(Launcher):
rpc.cleanup() rpc.cleanup()
except Exception: except Exception:
# We're shutting down, so it doesn't matter at this point. # We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.')) LOG.exception(_LE('Exception during rpc cleanup.'))
return status, signo return status, signo
@ -235,7 +236,7 @@ class ProcessLauncher(object):
# dies unexpectedly # dies unexpectedly
self.readpipe.read() self.readpipe.read()
LOG.info(_('Parent process has died unexpectedly, exiting')) LOG.info(_LI('Parent process has died unexpectedly, exiting'))
sys.exit(1) sys.exit(1)
@ -266,13 +267,13 @@ class ProcessLauncher(object):
launcher.wait() launcher.wait()
except SignalExit as exc: except SignalExit as exc:
signame = _signo_to_signame(exc.signo) signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame) LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code status = exc.code
signo = exc.signo signo = exc.signo
except SystemExit as exc: except SystemExit as exc:
status = exc.code status = exc.code
except BaseException: except BaseException:
LOG.exception(_('Unhandled exception')) LOG.exception(_LE('Unhandled exception'))
status = 2 status = 2
finally: finally:
launcher.stop() launcher.stop()
@ -305,7 +306,7 @@ class ProcessLauncher(object):
# start up quickly but ensure we don't fork off children that # start up quickly but ensure we don't fork off children that
# die instantly too quickly. # die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers: if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_('Forking too fast, sleeping')) LOG.info(_LI('Forking too fast, sleeping'))
time.sleep(1) time.sleep(1)
wrap.forktimes.pop(0) wrap.forktimes.pop(0)
@ -324,7 +325,7 @@ class ProcessLauncher(object):
os._exit(status) os._exit(status)
LOG.info(_('Started child %d'), pid) LOG.info(_LI('Started child %d'), pid)
wrap.children.add(pid) wrap.children.add(pid)
self.children[pid] = wrap self.children[pid] = wrap
@ -334,7 +335,7 @@ class ProcessLauncher(object):
def launch_service(self, service, workers=1): def launch_service(self, service, workers=1):
wrap = ServiceWrapper(service, workers) wrap = ServiceWrapper(service, workers)
LOG.info(_('Starting %d workers'), wrap.workers) LOG.info(_LI('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers: while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap) self._start_child(wrap)
@ -351,15 +352,15 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status): if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status) sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig)) dict(pid=pid, sig=sig))
else: else:
code = os.WEXITSTATUS(status) code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)s exited with status %(code)d'), LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code)) dict(pid=pid, code=code))
if pid not in self.children: if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid) LOG.warning(_LW('pid %d not in child list'), pid)
return None return None
wrap = self.children.pop(pid) wrap = self.children.pop(pid)
@ -381,22 +382,25 @@ class ProcessLauncher(object):
def wait(self): def wait(self):
"""Loop waiting on children to die and respawning as necessary.""" """Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:')) LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG) CONF.log_opt_values(LOG, std_logging.DEBUG)
while True: try:
self.handle_signal() while True:
self._respawn_children() self.handle_signal()
if self.sigcaught: self._respawn_children()
signame = _signo_to_signame(self.sigcaught) if self.sigcaught:
LOG.info(_('Caught %s, stopping children'), signame) signame = _signo_to_signame(self.sigcaught)
if not _is_sighup_and_daemon(self.sigcaught): LOG.info(_LI('Caught %s, stopping children'), signame)
break if not _is_sighup_and_daemon(self.sigcaught):
break
for pid in self.children: for pid in self.children:
os.kill(pid, signal.SIGHUP) os.kill(pid, signal.SIGHUP)
self.running = True self.running = True
self.sigcaught = None self.sigcaught = None
except eventlet.greenlet.GreenletExit:
LOG.info(_LI("Wait called after thread killed. Cleaning up."))
for pid in self.children: for pid in self.children:
try: try:
@ -407,7 +411,7 @@ class ProcessLauncher(object):
# Wait for children to die # Wait for children to die
if self.children: if self.children:
LOG.info(_('Waiting on %d children to exit'), len(self.children)) LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
while self.children: while self.children:
self._wait_child() self._wait_child()
@ -484,14 +488,16 @@ class Services(object):
""" """
service.start() service.start()
systemd.notify_once()
done.wait() done.wait()
def launch(service, workers=None): def launch(service, workers=1):
if workers: if workers is None or workers == 1:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
else:
launcher = ServiceLauncher() launcher = ServiceLauncher()
launcher.launch_service(service) launcher.launch_service(service)
else:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
return launcher return launcher

View File

@ -0,0 +1,104 @@
# Copyright 2012-2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helper module for systemd service readiness notification.
"""
import os
import socket
import sys
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def _abstractify(socket_name):
if socket_name.startswith('@'):
# abstract namespace socket
socket_name = '\0%s' % socket_name[1:]
return socket_name
def _sd_notify(unset_env, msg):
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
sock.connect(_abstractify(notify_socket))
sock.sendall(msg)
if unset_env:
del os.environ['NOTIFY_SOCKET']
except EnvironmentError:
LOG.debug("Systemd notification failed", exc_info=True)
finally:
sock.close()
def notify():
"""Send notification to Systemd that service is ready.
For details see
http://www.freedesktop.org/software/systemd/man/sd_notify.html
"""
_sd_notify(False, 'READY=1')
def notify_once():
"""Send notification once to Systemd that service is ready.
Systemd sets NOTIFY_SOCKET environment variable with the name of the
socket listening for notifications from services.
This method removes the NOTIFY_SOCKET environment variable to ensure
notification is sent only once.
"""
_sd_notify(True, 'READY=1')
def onready(notify_socket, timeout):
"""Wait for systemd style notification on the socket.
:param notify_socket: local socket address
:type notify_socket: string
:param timeout: socket timeout
:type timeout: float
:returns: 0 service ready
1 service not ready
2 timeout occured
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.bind(_abstractify(notify_socket))
try:
msg = sock.recv(512)
except socket.timeout:
return 2
finally:
sock.close()
if 'READY=1' in msg:
return 0
else:
return 1
if __name__ == '__main__':
# simple CLI for testing
if len(sys.argv) == 1:
notify()
elif len(sys.argv) >= 2:
timeout = float(sys.argv[1])
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
retval = onready(notify_socket, timeout)
sys.exit(retval)

View File

@ -26,6 +26,7 @@ module=processutils
module=rpc module=rpc
module=service module=service
module=sslutils module=sslutils
module=systemd
module=threadgroup module=threadgroup
module=timeutils module=timeutils
module=uuidutils module=uuidutils