Browse Source
oslo.service has graduated, so heat should consume it. Change-Id: I9c1899eb37505e64967e9cb77da23bb169498aba Closes-Bug: #1466851changes/94/194494/4
20 changed files with 18 additions and 1080 deletions
@ -1,151 +0,0 @@
|
||||
# Copyright (c) 2012 OpenStack Foundation. |
||||
# Administrator of the National Aeronautics and Space Administration. |
||||
# All Rights Reserved. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
from __future__ import print_function |
||||
|
||||
import copy |
||||
import errno |
||||
import gc |
||||
import logging |
||||
import os |
||||
import pprint |
||||
import socket |
||||
import sys |
||||
import traceback |
||||
|
||||
import eventlet.backdoor |
||||
import greenlet |
||||
from oslo_config import cfg |
||||
|
||||
from heat.openstack.common._i18n import _LI |
||||
|
||||
help_for_backdoor_port = ( |
||||
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results " |
||||
"in listening on a random tcp port number; <port> results in listening " |
||||
"on the specified port number (and not enabling backdoor if that port " |
||||
"is in use); and <start>:<end> results in listening on the smallest " |
||||
"unused port number within the specified range of port numbers. The " |
||||
"chosen port is displayed in the service's log file.") |
||||
eventlet_backdoor_opts = [ |
||||
cfg.StrOpt('backdoor_port', |
||||
help="Enable eventlet backdoor. %s" % help_for_backdoor_port) |
||||
] |
||||
|
||||
CONF = cfg.CONF |
||||
CONF.register_opts(eventlet_backdoor_opts) |
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
|
||||
def list_opts(): |
||||
"""Entry point for oslo-config-generator. |
||||
""" |
||||
return [(None, copy.deepcopy(eventlet_backdoor_opts))] |
||||
|
||||
|
||||
class EventletBackdoorConfigValueError(Exception): |
||||
def __init__(self, port_range, help_msg, ex): |
||||
msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' |
||||
'%(help)s' % |
||||
{'range': port_range, 'ex': ex, 'help': help_msg}) |
||||
super(EventletBackdoorConfigValueError, self).__init__(msg) |
||||
self.port_range = port_range |
||||
|
||||
|
||||
def _dont_use_this(): |
||||
print("Don't use this, just disconnect instead") |
||||
|
||||
|
||||
def _find_objects(t): |
||||
return [o for o in gc.get_objects() if isinstance(o, t)] |
||||
|
||||
|
||||
def _print_greenthreads(): |
||||
for i, gt in enumerate(_find_objects(greenlet.greenlet)): |
||||
print(i, gt) |
||||
traceback.print_stack(gt.gr_frame) |
||||
print() |
||||
|
||||
|
||||
def _print_nativethreads(): |
||||
for threadId, stack in sys._current_frames().items(): |
||||
print(threadId) |
||||
traceback.print_stack(stack) |
||||
print() |
||||
|
||||
|
||||
def _parse_port_range(port_range): |
||||
if ':' not in port_range: |
||||
start, end = port_range, port_range |
||||
else: |
||||
start, end = port_range.split(':', 1) |
||||
try: |
||||
start, end = int(start), int(end) |
||||
if end < start: |
||||
raise ValueError |
||||
return start, end |
||||
except ValueError as ex: |
||||
raise EventletBackdoorConfigValueError(port_range, ex, |
||||
help_for_backdoor_port) |
||||
|
||||
|
||||
def _listen(host, start_port, end_port, listen_func): |
||||
try_port = start_port |
||||
while True: |
||||
try: |
||||
return listen_func((host, try_port)) |
||||
except socket.error as exc: |
||||
if (exc.errno != errno.EADDRINUSE or |
||||
try_port >= end_port): |
||||
raise |
||||
try_port += 1 |
||||
|
||||
|
||||
def initialize_if_enabled(): |
||||
backdoor_locals = { |
||||
'exit': _dont_use_this, # So we don't exit the entire process |
||||
'quit': _dont_use_this, # So we don't exit the entire process |
||||
'fo': _find_objects, |
||||
'pgt': _print_greenthreads, |
||||
'pnt': _print_nativethreads, |
||||
} |
||||
|
||||
if CONF.backdoor_port is None: |
||||
return None |
||||
|
||||
start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) |
||||
|
||||
# NOTE(johannes): The standard sys.displayhook will print the value of |
||||
# the last expression and set it to __builtin__._, which overwrites |
||||
# the __builtin__._ that gettext sets. Let's switch to using pprint |
||||
# since it won't interact poorly with gettext, and it's easier to |
||||
# read the output too. |
||||
def displayhook(val): |
||||
if val is not None: |
||||
pprint.pprint(val) |
||||
sys.displayhook = displayhook |
||||
|
||||
sock = _listen('localhost', start_port, end_port, eventlet.listen) |
||||
|
||||
# In the case of backdoor port being zero, a port number is assigned by |
||||
# listen(). In any case, pull the port number out here. |
||||
port = sock.getsockname()[1] |
||||
LOG.info( |
||||
_LI('Eventlet backdoor listening on %(port)s for process %(pid)d'), |
||||
{'port': port, 'pid': os.getpid()} |
||||
) |
||||
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, |
||||
locals=backdoor_locals) |
||||
return port |
@ -1,147 +0,0 @@
|
||||
# Copyright 2010 United States Government as represented by the |
||||
# Administrator of the National Aeronautics and Space Administration. |
||||
# Copyright 2011 Justin Santa Barbara |
||||
# All Rights Reserved. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import logging |
||||
import sys |
||||
import time |
||||
|
||||
from eventlet import event |
||||
from eventlet import greenthread |
||||
|
||||
from heat.openstack.common._i18n import _LE, _LW |
||||
|
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
# NOTE(zyluo): This lambda function was declared to avoid mocking collisions |
||||
# with time.time() called in the standard logging module |
||||
# during unittests. |
||||
_ts = lambda: time.time() |
||||
|
||||
|
||||
class LoopingCallDone(Exception): |
||||
"""Exception to break out and stop a LoopingCallBase. |
||||
|
||||
The poll-function passed to LoopingCallBase can raise this exception to |
||||
break out of the loop normally. This is somewhat analogous to |
||||
StopIteration. |
||||
|
||||
An optional return-value can be included as the argument to the exception; |
||||
this return-value will be returned by LoopingCallBase.wait() |
||||
|
||||
""" |
||||
|
||||
def __init__(self, retvalue=True): |
||||
""":param retvalue: Value that LoopingCallBase.wait() should return.""" |
||||
self.retvalue = retvalue |
||||
|
||||
|
||||
class LoopingCallBase(object): |
||||
def __init__(self, f=None, *args, **kw): |
||||
self.args = args |
||||
self.kw = kw |
||||
self.f = f |
||||
self._running = False |
||||
self.done = None |
||||
|
||||
def stop(self): |
||||
self._running = False |
||||
|
||||
def wait(self): |
||||
return self.done.wait() |
||||
|
||||
|
||||
class FixedIntervalLoopingCall(LoopingCallBase): |
||||
"""A fixed interval looping call.""" |
||||
|
||||
def start(self, interval, initial_delay=None): |
||||
self._running = True |
||||
done = event.Event() |
||||
|
||||
def _inner(): |
||||
if initial_delay: |
||||
greenthread.sleep(initial_delay) |
||||
|
||||
try: |
||||
while self._running: |
||||
start = _ts() |
||||
self.f(*self.args, **self.kw) |
||||
end = _ts() |
||||
if not self._running: |
||||
break |
||||
delay = end - start - interval |
||||
if delay > 0: |
||||
LOG.warning(_LW('task %(func_name)r run outlasted ' |
||||
'interval by %(delay).2f sec'), |
||||
{'func_name': self.f, 'delay': delay}) |
||||
greenthread.sleep(-delay if delay < 0 else 0) |
||||
except LoopingCallDone as e: |
||||
self.stop() |
||||
done.send(e.retvalue) |
||||
except Exception: |
||||
LOG.exception(_LE('in fixed duration looping call')) |
||||
done.send_exception(*sys.exc_info()) |
||||
return |
||||
else: |
||||
done.send(True) |
||||
|
||||
self.done = done |
||||
|
||||
greenthread.spawn_n(_inner) |
||||
return self.done |
||||
|
||||
|
||||
class DynamicLoopingCall(LoopingCallBase): |
||||
"""A looping call which sleeps until the next known event. |
||||
|
||||
The function called should return how long to sleep for before being |
||||
called again. |
||||
""" |
||||
|
||||
def start(self, initial_delay=None, periodic_interval_max=None): |
||||
self._running = True |
||||
done = event.Event() |
||||
|
||||
def _inner(): |
||||
if initial_delay: |
||||
greenthread.sleep(initial_delay) |
||||
|
||||
try: |
||||
while self._running: |
||||
idle = self.f(*self.args, **self.kw) |
||||
if not self._running: |
||||
break |
||||
|
||||
if periodic_interval_max is not None: |
||||
idle = min(idle, periodic_interval_max) |
||||
LOG.debug('Dynamic looping call %(func_name)r sleeping ' |
||||
'for %(idle).02f seconds', |
||||
{'func_name': self.f, 'idle': idle}) |
||||
greenthread.sleep(idle) |
||||
except LoopingCallDone as e: |
||||
self.stop() |
||||
done.send(e.retvalue) |
||||
except Exception: |
||||
LOG.exception(_LE('in dynamic looping call')) |
||||
done.send_exception(*sys.exc_info()) |
||||
return |
||||
else: |
||||
done.send(True) |
||||
|
||||
self.done = done |
||||
|
||||
greenthread.spawn(_inner) |
||||
return self.done |
@ -1,507 +0,0 @@
|
||||
# Copyright 2010 United States Government as represented by the |
||||
# Administrator of the National Aeronautics and Space Administration. |
||||
# Copyright 2011 Justin Santa Barbara |
||||
# All Rights Reserved. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
"""Generic Node base class for all workers that run on hosts.""" |
||||
|
||||
import errno |
||||
import io |
||||
import logging |
||||
import os |
||||
import random |
||||
import signal |
||||
import sys |
||||
import time |
||||
|
||||
import eventlet |
||||
from eventlet import event |
||||
from oslo_config import cfg |
||||
|
||||
from heat.openstack.common import eventlet_backdoor |
||||
from heat.openstack.common._i18n import _LE, _LI, _LW |
||||
from heat.openstack.common import systemd |
||||
from heat.openstack.common import threadgroup |
||||
|
||||
|
||||
CONF = cfg.CONF |
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
|
||||
def _sighup_supported(): |
||||
return hasattr(signal, 'SIGHUP') |
||||
|
||||
|
||||
def _is_daemon(): |
||||
# The process group for a foreground process will match the |
||||
# process group of the controlling terminal. If those values do |
||||
# not match, or ioctl() fails on the stdout file handle, we assume |
||||
# the process is running in the background as a daemon. |
||||
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics |
||||
try: |
||||
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) |
||||
except io.UnsupportedOperation: |
||||
# Could not get the fileno for stdout, so we must be a daemon. |
||||
is_daemon = True |
||||
except OSError as err: |
||||
if err.errno == errno.ENOTTY: |
||||
# Assume we are a daemon because there is no terminal. |
||||
is_daemon = True |
||||
else: |
||||
raise |
||||
return is_daemon |
||||
|
||||
|
||||
def _is_sighup_and_daemon(signo): |
||||
if not (_sighup_supported() and signo == signal.SIGHUP): |
||||
# Avoid checking if we are a daemon, because the signal isn't |
||||
# SIGHUP. |
||||
return False |
||||
return _is_daemon() |
||||
|
||||
|
||||
def _signo_to_signame(signo): |
||||
signals = {signal.SIGTERM: 'SIGTERM', |
||||
signal.SIGINT: 'SIGINT'} |
||||
if _sighup_supported(): |
||||
signals[signal.SIGHUP] = 'SIGHUP' |
||||
return signals[signo] |
||||
|
||||
|
||||
def _set_signals_handler(handler): |
||||
signal.signal(signal.SIGTERM, handler) |
||||
signal.signal(signal.SIGINT, handler) |
||||
if _sighup_supported(): |
||||
signal.signal(signal.SIGHUP, handler) |
||||
|
||||
|
||||
class Launcher(object): |
||||
"""Launch one or more services and wait for them to complete.""" |
||||
|
||||
def __init__(self): |
||||
"""Initialize the service launcher. |
||||
|
||||
:returns: None |
||||
|
||||
""" |
||||
self.services = Services() |
||||
self.backdoor_port = eventlet_backdoor.initialize_if_enabled() |
||||
|
||||
def launch_service(self, service): |
||||
"""Load and start the given service. |
||||
|
||||
:param service: The service you would like to start. |
||||
:returns: None |
||||
|
||||
""" |
||||
service.backdoor_port = self.backdoor_port |
||||
self.services.add(service) |
||||
|
||||
def stop(self): |
||||
"""Stop all services which are currently running. |
||||
|
||||
:returns: None |
||||
|
||||
""" |
||||
self.services.stop() |
||||
|
||||
def wait(self): |
||||
"""Waits until all services have been stopped, and then returns. |
||||
|
||||
:returns: None |
||||
|
||||
""" |
||||
self.services.wait() |
||||
|
||||
def restart(self): |
||||
"""Reload config files and restart service. |
||||
|
||||
:returns: None |
||||
|
||||
""" |
||||
cfg.CONF.reload_config_files() |
||||
self.services.restart() |
||||
|
||||
|
||||
class SignalExit(SystemExit): |
||||
def __init__(self, signo, exccode=1): |
||||
super(SignalExit, self).__init__(exccode) |
||||
self.signo = signo |
||||
|
||||
|
||||
class ServiceLauncher(Launcher): |
||||
def _handle_signal(self, signo, frame): |
||||
# Allow the process to be killed again and die from natural causes |
||||
_set_signals_handler(signal.SIG_DFL) |
||||
raise SignalExit(signo) |
||||
|
||||
def handle_signal(self): |
||||
_set_signals_handler(self._handle_signal) |
||||
|
||||
def _wait_for_exit_or_signal(self, ready_callback=None): |
||||
status = None |
||||
signo = 0 |
||||
|
||||
LOG.debug('Full set of CONF:') |
||||
CONF.log_opt_values(LOG, logging.DEBUG) |
||||
|
||||
try: |
||||
if ready_callback: |
||||
ready_callback() |
||||
super(ServiceLauncher, self).wait() |
||||
except SignalExit as exc: |
||||
signame = _signo_to_signame(exc.signo) |
||||
LOG.info(_LI('Caught %s, exiting'), signame) |
||||
status = exc.code |
||||
signo = exc.signo |
||||
except SystemExit as exc: |
||||
status = exc.code |
||||
finally: |
||||
self.stop() |
||||
|
||||
return status, signo |
||||
|
||||
def wait(self, ready_callback=None): |
||||
systemd.notify_once() |
||||
while True: |
||||
self.handle_signal() |
||||
status, signo = self._wait_for_exit_or_signal(ready_callback) |
||||
if not _is_sighup_and_daemon(signo): |
||||
return status |
||||
self.restart() |
||||
|
||||
|
||||
class ServiceWrapper(object): |
||||
def __init__(self, service, workers): |
||||
self.service = service |
||||
self.workers = workers |
||||
self.children = set() |
||||
self.forktimes = [] |
||||
|
||||
|
||||
class ProcessLauncher(object): |
||||
_signal_handlers_set = set() |
||||
|
||||
@classmethod |
||||
def _handle_class_signals(cls, *args, **kwargs): |
||||
for handler in cls._signal_handlers_set: |
||||
handler(*args, **kwargs) |
||||
|
||||
def __init__(self, wait_interval=0.01): |
||||
"""Constructor. |
||||
|
||||
:param wait_interval: The interval to sleep for between checks |
||||
of child process exit. |
||||
""" |
||||
self.children = {} |
||||
self.sigcaught = None |
||||
self.running = True |
||||
self.wait_interval = wait_interval |
||||
rfd, self.writepipe = os.pipe() |
||||
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') |
||||
self.handle_signal() |
||||
|
||||
def handle_signal(self): |
||||
self._signal_handlers_set.add(self._handle_signal) |
||||
_set_signals_handler(self._handle_class_signals) |
||||
|
||||
def _handle_signal(self, signo, frame): |
||||
self.sigcaught = signo |
||||
self.running = False |
||||
|
||||
# Allow the process to be killed again and die from natural causes |
||||
_set_signals_handler(signal.SIG_DFL) |
||||
|
||||
def _pipe_watcher(self): |
||||
# This will block until the write end is closed when the parent |
||||
# dies unexpectedly |
||||
self.readpipe.read(1) |
||||
|
||||
LOG.info(_LI('Parent process has died unexpectedly, exiting')) |
||||
|
||||
sys.exit(1) |
||||
|
||||
def _child_process_handle_signal(self): |
||||
# Setup child signal handlers differently |
||||
def _sighup(*args): |
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL) |
||||
raise SignalExit(signal.SIGHUP) |
||||
|
||||
# Parent signals with SIGTERM when it wants us to go away. |
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL) |
||||
if _sighup_supported(): |
||||
signal.signal(signal.SIGHUP, _sighup) |
||||
# Block SIGINT and let the parent send us a SIGTERM |
||||
signal.signal(signal.SIGINT, signal.SIG_IGN) |
||||
|
||||
def _child_wait_for_exit_or_signal(self, launcher): |
||||
status = 0 |
||||
signo = 0 |
||||
|
||||
# NOTE(johannes): All exceptions are caught to ensure this |
||||
# doesn't fallback into the loop spawning children. It would |
||||
# be bad for a child to spawn more children. |
||||
try: |
||||
launcher.wait() |
||||
except SignalExit as exc: |
||||
signame = _signo_to_signame(exc.signo) |
||||
LOG.info(_LI('Child caught %s, exiting'), signame) |
||||
status = exc.code |
||||
signo = exc.signo |
||||
except SystemExit as exc: |
||||
status = exc.code |
||||
except BaseException: |
||||
LOG.exception(_LE('Unhandled exception')) |
||||
status = 2 |
||||
finally: |
||||
launcher.stop() |
||||
|
||||
return status, signo |
||||
|
||||
def _child_process(self, service): |
||||
self._child_process_handle_signal() |
||||
|
||||
# Reopen the eventlet hub to make sure we don't share an epoll |
||||
# fd with parent and/or siblings, which would be bad |
||||
eventlet.hubs.use_hub() |
||||
|
||||
# Close write to ensure only parent has it open |
||||
os.close(self.writepipe) |
||||
# Create greenthread to watch for parent to close pipe |
||||
eventlet.spawn_n(self._pipe_watcher) |
||||
|
||||
# Reseed random number generator |
||||
random.seed() |
||||
|
||||
launcher = Launcher() |
||||
launcher.launch_service(service) |
||||
return launcher |
||||
|
||||
def _start_child(self, wrap): |
||||
if len(wrap.forktimes) > wrap.workers: |
||||
# Limit ourselves to one process a second (over the period of |
||||
# number of workers * 1 second). This will allow workers to |
||||
# start up quickly but ensure we don't fork off children that |
||||
# die instantly too quickly. |
||||
if time.time() - wrap.forktimes[0] < wrap.workers: |
||||
LOG.info(_LI('Forking too fast, sleeping')) |
||||
time.sleep(1) |
||||
|
||||
wrap.forktimes.pop(0) |
||||
|
||||
wrap.forktimes.append(time.time()) |
||||
|
||||
pid = os.fork() |
||||
if pid == 0: |
||||
launcher = self._child_process(wrap.service) |
||||
while True: |
||||
self._child_process_handle_signal() |
||||
status, signo = self._child_wait_for_exit_or_signal(launcher) |
||||
if not _is_sighup_and_daemon(signo): |
||||
break |
||||
launcher.restart() |
||||
|
||||
os._exit(status) |
||||
|
||||
LOG.info(_LI('Started child %d'), pid) |
||||
|
||||
wrap.children.add(pid) |
||||
self.children[pid] = wrap |
||||
|
||||
return pid |
||||
|
||||
def launch_service(self, service, workers=1): |
||||
wrap = ServiceWrapper(service, workers) |
||||
|
||||
LOG.info(_LI('Starting %d workers'), wrap.workers) |
||||
while self.running and len(wrap.children) < wrap.workers: |
||||
self._start_child(wrap) |
||||
|
||||
def _wait_child(self): |
||||
try: |
||||
# Don't block if no child processes have exited |
||||
pid, status = os.waitpid(0, os.WNOHANG) |
||||
if not pid: |
||||
return None |
||||
except OSError as exc: |
||||
if exc.errno not in (errno.EINTR, errno.ECHILD): |
||||
raise |
||||
return None |
||||
|
||||
if os.WIFSIGNALED(status): |
||||
sig = os.WTERMSIG(status) |
||||
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), |
||||
dict(pid=pid, sig=sig)) |
||||
else: |
||||
code = os.WEXITSTATUS(status) |
||||
LOG.info(_LI('Child %(pid)s exited with status %(code)d'), |
||||
dict(pid=pid, code=code)) |
||||
|
||||
if pid not in self.children: |
||||
LOG.warning(_LW('pid %d not in child list'), pid) |
||||
return None |
||||
|
||||
wrap = self.children.pop(pid) |
||||
wrap.children.remove(pid) |
||||
return wrap |
||||
|
||||
def _respawn_children(self): |
||||
while self.running: |
||||
wrap = self._wait_child() |
||||
if not wrap: |
||||
# Yield to other threads if no children have exited |
||||
# Sleep for a short time to avoid excessive CPU usage |
||||
# (see bug #1095346) |
||||
eventlet.greenthread.sleep(self.wait_interval) |
||||
continue |
||||
while self.running and len(wrap.children) < wrap.workers: |
||||
self._start_child(wrap) |
||||
|
||||
def wait(self): |
||||
"""Loop waiting on children to die and respawning as necessary.""" |
||||
|
||||
systemd.notify_once() |
||||
LOG.debug('Full set of CONF:') |
||||
CONF.log_opt_values(LOG, logging.DEBUG) |
||||
|
||||
try: |
||||
while True: |
||||
self.handle_signal() |
||||
self._respawn_children() |
||||
# No signal means that stop was called. Don't clean up here. |
||||
if not self.sigcaught: |
||||
return |
||||
|
||||
signame = _signo_to_signame(self.sigcaught) |
||||
LOG.info(_LI('Caught %s, stopping children'), signame) |
||||
if not _is_sighup_and_daemon(self.sigcaught): |
||||
break |
||||
|
||||
cfg.CONF.reload_config_files() |
||||
for service in set( |
||||
[wrap.service for wrap in self.children.values()]): |
||||
service.reset() |
||||
|
||||
for pid in self.children: |
||||
os.kill(pid, signal.SIGHUP) |
||||
|
||||
self.running = True |
||||
self.sigcaught = None |
||||
except eventlet.greenlet.GreenletExit: |
||||
LOG.info(_LI("Wait called after thread killed. Cleaning up.")) |
||||
|
||||
self.stop() |
||||
|
||||
def stop(self): |
||||
"""Terminate child processes and wait on each.""" |
||||
self.running = False |
||||
for pid in self.children: |
||||
try: |
||||
os.kill(pid, signal.SIGTERM) |
||||
except OSError as exc: |
||||
if exc.errno != errno.ESRCH: |
||||
raise |
||||
|
||||
# Wait for children to die |
||||
if self.children: |
||||
LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) |
||||
while self.children: |
||||
self._wait_child() |
||||
|
||||
|
||||
class Service(object): |
||||
"""Service object for binaries running on hosts.""" |
||||
|
||||
def __init__(self, threads=1000): |
||||
self.tg = threadgroup.ThreadGroup(threads) |
||||
|
||||
# signal that the service is done shutting itself down: |
||||
self._done = event.Event() |
||||
|
||||
def reset(self): |
||||
# NOTE(Fengqian): docs for Event.reset() recommend against using it |
||||
self._done = event.Event() |
||||
|
||||
def start(self): |
||||
pass |
||||
|
||||
def stop(self, graceful=False): |
||||
self.tg.stop(graceful) |
||||
self.tg.wait() |
||||
# Signal that service cleanup is done: |
||||
if not self._done.ready(): |
||||
self._done.send() |
||||
|
||||
def wait(self): |
||||
self._done.wait() |
||||
|
||||
|
||||
class Services(object): |
||||
|
||||
def __init__(self): |
||||
self.services = [] |
||||
self.tg = threadgroup.ThreadGroup() |
||||
self.done = event.Event() |
||||
|
||||
def add(self, service): |
||||
self.services.append(service) |
||||
self.tg.add_thread(self.run_service, service, self.done) |
||||
|
||||
def stop(self): |
||||
# wait for graceful shutdown of services: |
||||
for service in self.services: |
||||
service.stop() |
||||
service.wait() |
||||
|
||||
# Each service has performed cleanup, now signal that the run_service |
||||
# wrapper threads can now die: |
||||
if not self.done.ready(): |
||||
self.done.send() |
||||
|
||||
# reap threads: |
||||
self.tg.stop() |
||||
|
||||
def wait(self): |
||||
self.tg.wait() |
||||
|
||||
def restart(self): |
||||
self.stop() |
||||
self.done = event.Event() |
||||
for restart_service in self.services: |
||||
restart_service.reset() |
||||
self.tg.add_thread(self.run_service, restart_service, self.done) |
||||
|
||||
@staticmethod |
||||
def run_service(service, done): |
||||
"""Service start wrapper. |
||||
|
||||
:param service: service to run |
||||
:param done: event to wait on until a shutdown is triggered |
||||
:returns: None |
||||
|
||||
""" |
||||
service.start() |
||||
done.wait() |
||||
|
||||
|
||||
def launch(service, workers=1): |
||||
if workers is None or workers == 1: |
||||
launcher = ServiceLauncher() |
||||
launcher.launch_service(service) |
||||
else: |
||||
launcher = ProcessLauncher() |
||||
launcher.launch_service(service, workers=workers) |
||||
|
||||
return launcher |
@ -1,105 +0,0 @@
|
||||
# Copyright 2012-2014 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
""" |
||||
Helper module for systemd service readiness notification. |
||||
""" |
||||
|
||||
import logging |
||||
import os |
||||
import socket |
||||
import sys |
||||
|
||||
|
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
|
||||
def _abstractify(socket_name): |
||||
if socket_name.startswith('@'): |
||||
# abstract namespace socket |
||||
socket_name = '\0%s' % socket_name[1:] |
||||
return socket_name |
||||
|
||||
|
||||
def _sd_notify(unset_env, msg): |
||||
notify_socket = os.getenv('NOTIFY_SOCKET') |
||||
if notify_socket: |
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
||||
try: |
||||
sock.connect(_abstractify(notify_socket)) |
||||
sock.sendall(msg) |
||||
if unset_env: |
||||
del os.environ['NOTIFY_SOCKET'] |
||||
except EnvironmentError: |
||||
LOG.debug("Systemd notification failed", exc_info=True) |
||||
finally: |
||||
sock.close() |
||||
|
||||
|
||||
def notify(): |
||||
"""Send notification to Systemd that service is ready. |
||||
|
||||
For details see |
||||
http://www.freedesktop.org/software/systemd/man/sd_notify.html |
||||
""" |
||||
_sd_notify(False, 'READY=1') |
||||
|
||||
|
||||
def notify_once(): |
||||
"""Send notification once to Systemd that service is ready. |
||||
|
||||
Systemd sets NOTIFY_SOCKET environment variable with the name of the |
||||
socket listening for notifications from services. |
||||
This method removes the NOTIFY_SOCKET environment variable to ensure |
||||
notification is sent only once. |
||||
""" |
||||
_sd_notify(True, 'READY=1') |
||||
|
||||
|
||||
def onready(notify_socket, timeout): |
||||
"""Wait for systemd style notification on the socket. |
||||
|
||||
:param notify_socket: local socket address |
||||
:type notify_socket: string |
||||
:param timeout: socket timeout |
||||
:type timeout: float |
||||
:returns: 0 service ready |
||||
1 service not ready |
||||
2 timeout occurred |
||||
""" |
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
||||
sock.settimeout(timeout) |
||||
sock.bind(_abstractify(notify_socket)) |
||||
try: |
||||
msg = sock.recv(512) |
||||
except socket.timeout: |
||||
return 2 |
||||
finally: |
||||
sock.close() |
||||
if 'READY=1' in msg: |
||||
return 0 |
||||
else: |
||||
return 1 |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
# simple CLI for testing |
||||
if len(sys.argv) == 1: |
||||
notify() |
||||
elif len(sys.argv) >= 2: |
||||
timeout = float(sys.argv[1]) |
||||
notify_socket = os.getenv('NOTIFY_SOCKET') |
||||
if notify_socket: |
||||
retval = onready(notify_socket, timeout) |
||||
sys.exit(retval) |
@ -1,150 +0,0 @@
|
||||
# Copyright 2012 Red Hat, Inc. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
import logging |
||||
import threading |
||||
|
||||
import eventlet |
||||
from eventlet import greenpool |
||||
|
||||
from heat.openstack.common._i18n import _LE |
||||
from heat.openstack.common import loopingcall |
||||
|
||||
|
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
|
||||
def _thread_done(gt, *args, **kwargs): |
||||
"""Callback function to be passed to GreenThread.link() when we spawn() |
||||
Calls the :class:`ThreadGroup` to notify if. |
||||
|
||||
""" |
||||
kwargs['group'].thread_done(kwargs['thread']) |
||||
|
||||
|
||||
class Thread(object): |
||||
"""Wrapper around a greenthread, that holds a reference to the |
||||
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when |
||||
it has done so it can be removed from the threads list. |
||||
""" |
||||
def __init__(self, thread, group): |
||||
self.thread = thread |
||||
self.thread.link(_thread_done, group=group, thread=self) |
||||
|
||||
def stop(self): |
||||
self.thread.kill() |
||||
|
||||
def wait(self): |
||||
return self.thread.wait() |
||||
|
||||
def link(self, func, *args, **kwargs): |
||||
self.thread.link(func, *args, **kwargs) |
||||
|
||||
|
||||
class ThreadGroup(object): |
||||
"""The point of the ThreadGroup class is to: |
||||
|
||||
* keep track of timers and greenthreads (making it easier to stop them |
||||
when need be). |
||||
* provide an easy API to add timers. |
||||
""" |
||||
def __init__(self, thread_pool_size=10): |
||||
self.pool = greenpool.GreenPool(thread_pool_size) |
||||
self.threads = [] |
||||
self.timers = [] |
||||
|
||||
def add_dynamic_timer(self, callback, initial_delay=None, |
||||
periodic_interval_max=None, *args, **kwargs): |
||||
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) |
||||
timer.start(initial_delay=initial_delay, |
||||
periodic_interval_max=periodic_interval_max) |
||||
self.timers.append(timer) |
||||
|
||||
def add_timer(self, interval, callback, initial_delay=None, |
||||
*args, **kwargs): |
||||
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) |
||||
pulse.start(interval=interval, |
||||
initial_delay=initial_delay) |
||||
self.timers.append(pulse) |
||||
|
||||
def add_thread(self, callback, *args, **kwargs): |
||||
gt = self.pool.spawn(callback, *args, **kwargs) |
||||
th = Thread(gt, self) |
||||
self.threads.append(th) |
||||
return th |
||||
|
||||
def thread_done(self, thread): |
||||
self.threads.remove(thread) |
||||
|
||||
def _stop_threads(self): |
||||
current = threading.current_thread() |
||||
|
||||
# Iterate over a copy of self.threads so thread_done doesn't |
||||
# modify the list while we're iterating |
||||
for x in self.threads[:]: |
||||
if x is current: |
||||
# don't kill the current thread. |
||||
continue |
||||
try: |
||||
x.stop() |
||||
except eventlet.greenlet.GreenletExit: |
||||
pass |
||||
except Exception: |
||||
LOG.exception(_LE('Error stopping thread.')) |
||||
|
||||
def stop_timers(self): |
||||
for x in self.timers: |
||||
try: |
||||
x.stop() |
||||
except Exception: |
||||
LOG.exception(_LE('Error stopping timer.')) |
||||
self.timers = [] |
||||
|
||||
def stop(self, graceful=False): |
||||
"""stop function has the option of graceful=True/False. |
||||
|
||||
* In case of graceful=True, wait for all threads to be finished. |
||||
Never kill threads. |
||||
* In case of graceful=False, kill threads immediately. |
||||
""" |
||||
self.stop_timers() |
||||
if graceful: |
||||
# In case of graceful=True, wait for all threads to be |
||||
# finished, never kill threads |
||||
self.wait() |
||||
else: |
||||
# In case of graceful=False(Default), kill threads |
||||
# immediately |
||||
self._stop_threads() |
||||
|
||||
def wait(self): |
||||
for x in self.timers: |
||||
try: |
||||
x.wait() |
||||
except eventlet.greenlet.GreenletExit: |
||||
pass |
||||
except Exception: |
||||
LOG.exception(_LE('Error waiting on ThreadGroup.')) |
||||
current = threading.current_thread() |
||||
|
||||
# Iterate over a copy of self.threads so thread_done doesn't |
||||
# modify the list while we're iterating |
||||
for x in self.threads[:]: |
||||
if x is current: |
||||
continue |
||||
try: |
||||
x.wait() |
||||
except eventlet.greenlet.GreenletExit: |
||||
pass |
||||
except Exception as ex: |
||||
LOG.exception(ex) |
Loading…
Reference in new issue