From ca0e624eb1a351df32df9d1b3098569f0ec8b1df Mon Sep 17 00:00:00 2001 From: Christian Berendt Date: Tue, 24 Mar 2015 22:09:13 +0100 Subject: [PATCH] Improve faafo-producer * replace python-daemon with service from oslo-incubator * use periodic_tasks from oslo-incubator instead of time.sleep() * several minor overall improvements Change-Id: Ifa25f005dc9dac57a521d516c9251841f7bc08a8 --- faafo/openstack/__init__.py | 0 faafo/openstack/common/__init__.py | 0 faafo/openstack/common/_i18n.py | 45 ++ faafo/openstack/common/eventlet_backdoor.py | 151 ++++++ faafo/openstack/common/loopingcall.py | 147 ++++++ faafo/openstack/common/periodic_task.py | 232 +++++++++ faafo/openstack/common/service.py | 503 ++++++++++++++++++++ faafo/openstack/common/systemd.py | 105 ++++ faafo/openstack/common/threadgroup.py | 149 ++++++ faafo/producer.py | 151 +++--- openstack-common.conf | 8 + requirements.txt | 1 + tox.ini | 2 +- 13 files changed, 1415 insertions(+), 79 deletions(-) create mode 100644 faafo/openstack/__init__.py create mode 100644 faafo/openstack/common/__init__.py create mode 100644 faafo/openstack/common/_i18n.py create mode 100644 faafo/openstack/common/eventlet_backdoor.py create mode 100644 faafo/openstack/common/loopingcall.py create mode 100644 faafo/openstack/common/periodic_task.py create mode 100644 faafo/openstack/common/service.py create mode 100644 faafo/openstack/common/systemd.py create mode 100644 faafo/openstack/common/threadgroup.py create mode 100644 openstack-common.conf diff --git a/faafo/openstack/__init__.py b/faafo/openstack/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/faafo/openstack/common/__init__.py b/faafo/openstack/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/faafo/openstack/common/_i18n.py b/faafo/openstack/common/_i18n.py new file mode 100644 index 0000000..9c516a8 --- /dev/null +++ b/faafo/openstack/common/_i18n.py @@ -0,0 +1,45 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""oslo.i18n integration module. + +See http://docs.openstack.org/developer/oslo.i18n/usage.html + +""" + +try: + import oslo_i18n + + # NOTE(dhellmann): This reference to o-s-l-o will be replaced by the + # application name when this module is synced into the separate + # repository. It is OK to have more than one translation function + # using the same domain, since there will still only be one message + # catalog. + _translators = oslo_i18n.TranslatorFactory(domain='faafo') + + # The primary translation function using the well-known name "_" + _ = _translators.primary + + # Translators for log levels. + # + # The abbreviated names are meant to reflect the usual use of a short + # name like '_'. The "L" is for "log" and the other letter comes from + # the level. 
+ _LI = _translators.log_info + _LW = _translators.log_warning + _LE = _translators.log_error + _LC = _translators.log_critical +except ImportError: + # NOTE(dims): Support for cases where a project wants to use + # code from oslo-incubator, but is not ready to be internationalized + # (like tempest) + _ = _LI = _LW = _LE = _LC = lambda x: x diff --git a/faafo/openstack/common/eventlet_backdoor.py b/faafo/openstack/common/eventlet_backdoor.py new file mode 100644 index 0000000..c098f97 --- /dev/null +++ b/faafo/openstack/common/eventlet_backdoor.py @@ -0,0 +1,151 @@ +# Copyright (c) 2012 OpenStack Foundation. +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import copy +import errno +import gc +import logging +import os +import pprint +import socket +import sys +import traceback + +import eventlet.backdoor +import greenlet +from oslo_config import cfg + +from faafo.openstack.common._i18n import _LI + +help_for_backdoor_port = ( + "Acceptable values are 0, , and :, where 0 results " + "in listening on a random tcp port number; results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and : results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") +eventlet_backdoor_opts = [ + cfg.StrOpt('backdoor_port', + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) +] + +CONF = cfg.CONF +CONF.register_opts(eventlet_backdoor_opts) +LOG = logging.getLogger(__name__) + + +def list_opts(): + """Entry point for oslo-config-generator. + """ + return [(None, copy.deepcopy(eventlet_backdoor_opts))] + + +class EventletBackdoorConfigValueError(Exception): + def __init__(self, port_range, help_msg, ex): + msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. 
' + '%(help)s' % + {'range': port_range, 'ex': ex, 'help': help_msg}) + super(EventletBackdoorConfigValueError, self).__init__(msg) + self.port_range = port_range + + +def _dont_use_this(): + print("Don't use this, just disconnect instead") + + +def _find_objects(t): + return [o for o in gc.get_objects() if isinstance(o, t)] + + +def _print_greenthreads(): + for i, gt in enumerate(_find_objects(greenlet.greenlet)): + print(i, gt) + traceback.print_stack(gt.gr_frame) + print() + + +def _print_nativethreads(): + for threadId, stack in sys._current_frames().items(): + print(threadId) + traceback.print_stack(stack) + print() + + +def _parse_port_range(port_range): + if ':' not in port_range: + start, end = port_range, port_range + else: + start, end = port_range.split(':', 1) + try: + start, end = int(start), int(end) + if end < start: + raise ValueError + return start, end + except ValueError as ex: + raise EventletBackdoorConfigValueError(port_range, ex, + help_for_backdoor_port) + + +def _listen(host, start_port, end_port, listen_func): + try_port = start_port + while True: + try: + return listen_func((host, try_port)) + except socket.error as exc: + if (exc.errno != errno.EADDRINUSE or + try_port >= end_port): + raise + try_port += 1 + + +def initialize_if_enabled(): + backdoor_locals = { + 'exit': _dont_use_this, # So we don't exit the entire process + 'quit': _dont_use_this, # So we don't exit the entire process + 'fo': _find_objects, + 'pgt': _print_greenthreads, + 'pnt': _print_nativethreads, + } + + if CONF.backdoor_port is None: + return None + + start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) + + # NOTE(johannes): The standard sys.displayhook will print the value of + # the last expression and set it to __builtin__._, which overwrites + # the __builtin__._ that gettext sets. Let's switch to using pprint + # since it won't interact poorly with gettext, and it's easier to + # read the output too. + def displayhook(val): + if val is not None: + pprint.pprint(val) + sys.displayhook = displayhook + + sock = _listen('localhost', start_port, end_port, eventlet.listen) + + # In the case of backdoor port being zero, a port number is assigned by + # listen(). In any case, pull the port number out here. + port = sock.getsockname()[1] + LOG.info( + _LI('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()} + ) + eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, + locals=backdoor_locals) + return port diff --git a/faafo/openstack/common/loopingcall.py b/faafo/openstack/common/loopingcall.py new file mode 100644 index 0000000..e989aa6 --- /dev/null +++ b/faafo/openstack/common/loopingcall.py @@ -0,0 +1,147 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import logging +import sys +import time + +from eventlet import event +from eventlet import greenthread + +from faafo.openstack.common._i18n import _LE, _LW + +LOG = logging.getLogger(__name__) + +# NOTE(zyluo): This lambda function was declared to avoid mocking collisions +# with time.time() called in the standard logging module +# during unittests. +_ts = lambda: time.time() + + +class LoopingCallDone(Exception): + """Exception to break out and stop a LoopingCallBase. + + The poll-function passed to LoopingCallBase can raise this exception to + break out of the loop normally. This is somewhat analogous to + StopIteration. + + An optional return-value can be included as the argument to the exception; + this return-value will be returned by LoopingCallBase.wait() + + """ + + def __init__(self, retvalue=True): + """:param retvalue: Value that LoopingCallBase.wait() should return.""" + self.retvalue = retvalue + + +class LoopingCallBase(object): + def __init__(self, f=None, *args, **kw): + self.args = args + self.kw = kw + self.f = f + self._running = False + self.done = None + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() + + +class FixedIntervalLoopingCall(LoopingCallBase): + """A fixed interval looping call.""" + + def start(self, interval, initial_delay=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + start = _ts() + self.f(*self.args, **self.kw) + end = _ts() + if not self._running: + break + delay = end - start - interval + if delay > 0: + LOG.warn(_LW('task %(func_name)r run outlasted ' + 'interval by %(delay).2f sec'), + {'func_name': self.f, 'delay': delay}) + greenthread.sleep(-delay if delay < 0 else 0) + except LoopingCallDone as e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_LE('in fixed duration looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn_n(_inner) + return self.done + + +class DynamicLoopingCall(LoopingCallBase): + """A looping call which sleeps until the next known event. + + The function called should return how long to sleep for before being + called again. + """ + + def start(self, initial_delay=None, periodic_interval_max=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + idle = self.f(*self.args, **self.kw) + if not self._running: + break + + if periodic_interval_max is not None: + idle = min(idle, periodic_interval_max) + LOG.debug('Dynamic looping call %(func_name)r sleeping ' + 'for %(idle).02f seconds', + {'func_name': self.f, 'idle': idle}) + greenthread.sleep(idle) + except LoopingCallDone as e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_LE('in dynamic looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn(_inner) + return self.done diff --git a/faafo/openstack/common/periodic_task.py b/faafo/openstack/common/periodic_task.py new file mode 100644 index 0000000..63094de --- /dev/null +++ b/faafo/openstack/common/periodic_task.py @@ -0,0 +1,232 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import logging +import random +import time + +from oslo_config import cfg +import six + +from faafo.openstack.common._i18n import _, _LE, _LI + + +periodic_opts = [ + cfg.BoolOpt('run_external_periodic_tasks', + default=True, + help='Some periodic tasks can be run in a separate process. ' + 'Should we run them here?'), +] + +CONF = cfg.CONF +CONF.register_opts(periodic_opts) + +LOG = logging.getLogger(__name__) + +DEFAULT_INTERVAL = 60.0 + + +def list_opts(): + """Entry point for oslo-config-generator.""" + return [(None, copy.deepcopy(periodic_opts))] + + +class InvalidPeriodicTaskArg(Exception): + message = _("Unexpected argument for periodic task creation: %(arg)s.") + + +def periodic_task(*args, **kwargs): + """Decorator to indicate that a method is a periodic task. + + This decorator can be used in two ways: + + 1. Without arguments '@periodic_task', this will be run on the default + interval of 60 seconds. + + 2. With arguments: + @periodic_task(spacing=N [, run_immediately=[True|False]] + [, name=[None|"string"]) + this will be run on approximately every N seconds. If this number is + negative the periodic task will be disabled. If the run_immediately + argument is provided and has a value of 'True', the first run of the + task will be shortly after task scheduler starts. If + run_immediately is omitted or set to 'False', the first time the + task runs will be approximately N seconds after the task scheduler + starts. If name is not provided, __name__ of function is used. + """ + def decorator(f): + # Test for old style invocation + if 'ticks_between_runs' in kwargs: + raise InvalidPeriodicTaskArg(arg='ticks_between_runs') + + # Control if run at all + f._periodic_task = True + f._periodic_external_ok = kwargs.pop('external_process_ok', False) + if f._periodic_external_ok and not CONF.run_external_periodic_tasks: + f._periodic_enabled = False + else: + f._periodic_enabled = kwargs.pop('enabled', True) + f._periodic_name = kwargs.pop('name', f.__name__) + + # Control frequency + f._periodic_spacing = kwargs.pop('spacing', 0) + f._periodic_immediate = kwargs.pop('run_immediately', False) + if f._periodic_immediate: + f._periodic_last_run = None + else: + f._periodic_last_run = time.time() + return f + + # NOTE(sirp): The `if` is necessary to allow the decorator to be used with + # and without parenthesis. + # + # In the 'with-parenthesis' case (with kwargs present), this function needs + # to return a decorator function since the interpreter will invoke it like: + # + # periodic_task(*args, **kwargs)(f) + # + # In the 'without-parenthesis' case, the original function will be passed + # in as the first argument, like: + # + # periodic_task(f) + if kwargs: + return decorator + else: + return decorator(args[0]) + + +class _PeriodicTasksMeta(type): + def _add_periodic_task(cls, task): + """Add a periodic task to the list of periodic tasks. + + The task should already be decorated by @periodic_task. 
+ + :return: whether task was actually enabled + """ + name = task._periodic_name + + if task._periodic_spacing < 0: + LOG.info(_LI('Skipping periodic task %(task)s because ' + 'its interval is negative'), + {'task': name}) + return False + if not task._periodic_enabled: + LOG.info(_LI('Skipping periodic task %(task)s because ' + 'it is disabled'), + {'task': name}) + return False + + # A periodic spacing of zero indicates that this task should + # be run on the default interval to avoid running too + # frequently. + if task._periodic_spacing == 0: + task._periodic_spacing = DEFAULT_INTERVAL + + cls._periodic_tasks.append((name, task)) + cls._periodic_spacing[name] = task._periodic_spacing + return True + + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) + + # NOTE(sirp): if the attribute is not present then we must be the base + # class, so, go ahead an initialize it. If the attribute is present, + # then we're a subclass so make a copy of it so we don't step on our + # parent's toes. + try: + cls._periodic_tasks = cls._periodic_tasks[:] + except AttributeError: + cls._periodic_tasks = [] + + try: + cls._periodic_spacing = cls._periodic_spacing.copy() + except AttributeError: + cls._periodic_spacing = {} + + for value in cls.__dict__.values(): + if getattr(value, '_periodic_task', False): + cls._add_periodic_task(value) + + +def _nearest_boundary(last_run, spacing): + """Find nearest boundary which is in the past, which is a multiple of the + spacing with the last run as an offset. + + Eg if last run was 10 and spacing was 7, the new last run could be: 17, 24, + 31, 38... + + 0% to 5% of the spacing value will be added to this value to ensure tasks + do not synchronize. This jitter is rounded to the nearest second, this + means that spacings smaller than 20 seconds will not have jitter. + """ + current_time = time.time() + if last_run is None: + return current_time + delta = current_time - last_run + offset = delta % spacing + # Add up to 5% jitter + jitter = int(spacing * (random.random() / 20)) + return current_time - offset + jitter + + +@six.add_metaclass(_PeriodicTasksMeta) +class PeriodicTasks(object): + def __init__(self): + super(PeriodicTasks, self).__init__() + self._periodic_last_run = {} + for name, task in self._periodic_tasks: + self._periodic_last_run[name] = task._periodic_last_run + + def add_periodic_task(self, task): + """Add a periodic task to the list of periodic tasks. + + The task should already be decorated by @periodic_task. 
+ """ + if self.__class__._add_periodic_task(task): + self._periodic_last_run[task._periodic_name] = ( + task._periodic_last_run) + + def run_periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + idle_for = DEFAULT_INTERVAL + for task_name, task in self._periodic_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + + spacing = self._periodic_spacing[task_name] + last_run = self._periodic_last_run[task_name] + + # Check if due, if not skip + idle_for = min(idle_for, spacing) + if last_run is not None: + delta = last_run + spacing - time.time() + if delta > 0: + idle_for = min(idle_for, delta) + continue + + LOG.debug("Running periodic task %(full_task_name)s", + {"full_task_name": full_task_name}) + self._periodic_last_run[task_name] = _nearest_boundary( + last_run, spacing) + + try: + task(self, context) + except Exception as e: + if raise_on_error: + raise + LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"), + {"full_task_name": full_task_name, "e": e}) + time.sleep(0) + + return idle_for diff --git a/faafo/openstack/common/service.py b/faafo/openstack/common/service.py new file mode 100644 index 0000000..06ff2d9 --- /dev/null +++ b/faafo/openstack/common/service.py @@ -0,0 +1,503 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Generic Node base class for all workers that run on hosts.""" + +import errno +import logging +import os +import random +import signal +import sys +import time + +try: + # Importing just the symbol here because the io module does not + # exist in Python 2.6. + from io import UnsupportedOperation # noqa +except ImportError: + # Python 2.6 + UnsupportedOperation = None + +import eventlet +from eventlet import event +from oslo_config import cfg + +from faafo.openstack.common import eventlet_backdoor +from faafo.openstack.common._i18n import _LE, _LI, _LW +from faafo.openstack.common import systemd +from faafo.openstack.common import threadgroup + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +def _sighup_supported(): + return hasattr(signal, 'SIGHUP') + + +def _is_daemon(): + # The process group for a foreground process will match the + # process group of the controlling terminal. If those values do + # not match, or ioctl() fails on the stdout file handle, we assume + # the process is running in the background as a daemon. + # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics + try: + is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) + except OSError as err: + if err.errno == errno.ENOTTY: + # Assume we are a daemon because there is no terminal. + is_daemon = True + else: + raise + except UnsupportedOperation: + # Could not get the fileno for stdout, so we must be a daemon. 
+ is_daemon = True + return is_daemon + + +def _is_sighup_and_daemon(signo): + if not (_sighup_supported() and signo == signal.SIGHUP): + # Avoid checking if we are a daemon, because the signal isn't + # SIGHUP. + return False + return _is_daemon() + + +def _signo_to_signame(signo): + signals = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'} + if _sighup_supported(): + signals[signal.SIGHUP] = 'SIGHUP' + return signals[signo] + + +def _set_signals_handler(handler): + signal.signal(signal.SIGTERM, handler) + signal.signal(signal.SIGINT, handler) + if _sighup_supported(): + signal.signal(signal.SIGHUP, handler) + + +class Launcher(object): + """Launch one or more services and wait for them to complete.""" + + def __init__(self): + """Initialize the service launcher. + + :returns: None + + """ + self.services = Services() + self.backdoor_port = eventlet_backdoor.initialize_if_enabled() + + def launch_service(self, service): + """Load and start the given service. + + :param service: The service you would like to start. + :returns: None + + """ + service.backdoor_port = self.backdoor_port + self.services.add(service) + + def stop(self): + """Stop all services which are currently running. + + :returns: None + + """ + self.services.stop() + + def wait(self): + """Waits until all services have been stopped, and then returns. + + :returns: None + + """ + self.services.wait() + + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() + + +class SignalExit(SystemExit): + def __init__(self, signo, exccode=1): + super(SignalExit, self).__init__(exccode) + self.signo = signo + + +class ServiceLauncher(Launcher): + def _handle_signal(self, signo, frame): + # Allow the process to be killed again and die from natural causes + _set_signals_handler(signal.SIG_DFL) + raise SignalExit(signo) + + def handle_signal(self): + _set_signals_handler(self._handle_signal) + + def _wait_for_exit_or_signal(self, ready_callback=None): + status = None + signo = 0 + + LOG.debug('Full set of CONF:') + CONF.log_opt_values(LOG, logging.DEBUG) + + try: + if ready_callback: + ready_callback() + super(ServiceLauncher, self).wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_LI('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + finally: + self.stop() + + return status, signo + + def wait(self, ready_callback=None): + systemd.notify_once() + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal(ready_callback) + if not _is_sighup_and_daemon(signo): + return status + self.restart() + + +class ServiceWrapper(object): + def __init__(self, service, workers): + self.service = service + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + _signal_handlers_set = set() + + @classmethod + def _handle_class_signals(cls, *args, **kwargs): + for handler in cls._signal_handlers_set: + handler(*args, **kwargs) + + def __init__(self): + """Constructor.""" + + self.children = {} + self.sigcaught = None + self.running = True + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() + + def handle_signal(self): + self._signal_handlers_set.add(self._handle_signal) + _set_signals_handler(self._handle_class_signals) + + def _handle_signal(self, signo, frame): + self.sigcaught = signo + self.running = False + + 
# Allow the process to be killed again and die from natural causes + _set_signals_handler(signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read() + + LOG.info(_LI('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process_handle_signal(self): + # Setup child signal handlers differently + def _sigterm(*args): + signal.signal(signal.SIGTERM, signal.SIG_DFL) + raise SignalExit(signal.SIGTERM) + + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + + signal.signal(signal.SIGTERM, _sigterm) + if _sighup_supported(): + signal.signal(signal.SIGHUP, _sighup) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def _child_wait_for_exit_or_signal(self, launcher): + status = 0 + signo = 0 + + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + try: + launcher.wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_LI('Child caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_LE('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn_n(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.launch_service(service) + return launcher + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. 
+ if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_LI('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if not _is_sighup_and_daemon(signo): + break + launcher.restart() + + os._exit(status) + + LOG.info(_LI('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_service(self, service, workers=1): + wrap = ServiceWrapper(service, workers) + + LOG.info(_LI('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + # Block while any of child processes have exited + pid, status = os.waitpid(0, 0) + if not pid: + return None + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), + dict(pid=pid, sig=sig)) + else: + code = os.WEXITSTATUS(status) + LOG.info(_LI('Child %(pid)s exited with status %(code)d'), + dict(pid=pid, code=code)) + + if pid not in self.children: + LOG.warning(_LW('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def _respawn_children(self): + while self.running: + wrap = self._wait_child() + if not wrap: + continue + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + systemd.notify_once() + LOG.debug('Full set of CONF:') + CONF.log_opt_values(LOG, logging.DEBUG) + + try: + while True: + self.handle_signal() + self._respawn_children() + # No signal means that stop was called. Don't clean up here. + if not self.sigcaught: + return + + signame = _signo_to_signame(self.sigcaught) + LOG.info(_LI('Caught %s, stopping children'), signame) + if not _is_sighup_and_daemon(self.sigcaught): + break + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None + except eventlet.greenlet.GreenletExit: + LOG.info(_LI("Wait called after thread killed. 
Cleaning up.")) + + self.stop() + + def stop(self): + """Terminate child processes and wait on each.""" + self.running = False + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Wait for children to die + if self.children: + LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) + while self.children: + self._wait_child() + + +class Service(object): + """Service object for binaries running on hosts.""" + + def __init__(self, threads=1000): + self.tg = threadgroup.ThreadGroup(threads) + + # signal that the service is done shutting itself down: + self._done = event.Event() + + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + + def start(self): + pass + + def stop(self, graceful=False): + self.tg.stop(graceful) + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() + + def wait(self): + self.tg.wait() + + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + + +def launch(service, workers=1): + if workers is None or workers == 1: + launcher = ServiceLauncher() + launcher.launch_service(service) + else: + launcher = ProcessLauncher() + launcher.launch_service(service, workers=workers) + + return launcher diff --git a/faafo/openstack/common/systemd.py b/faafo/openstack/common/systemd.py new file mode 100644 index 0000000..36243b3 --- /dev/null +++ b/faafo/openstack/common/systemd.py @@ -0,0 +1,105 @@ +# Copyright 2012-2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper module for systemd service readiness notification. 
+""" + +import logging +import os +import socket +import sys + + +LOG = logging.getLogger(__name__) + + +def _abstractify(socket_name): + if socket_name.startswith('@'): + # abstract namespace socket + socket_name = '\0%s' % socket_name[1:] + return socket_name + + +def _sd_notify(unset_env, msg): + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.connect(_abstractify(notify_socket)) + sock.sendall(msg) + if unset_env: + del os.environ['NOTIFY_SOCKET'] + except EnvironmentError: + LOG.debug("Systemd notification failed", exc_info=True) + finally: + sock.close() + + +def notify(): + """Send notification to Systemd that service is ready. + + For details see + http://www.freedesktop.org/software/systemd/man/sd_notify.html + """ + _sd_notify(False, 'READY=1') + + +def notify_once(): + """Send notification once to Systemd that service is ready. + + Systemd sets NOTIFY_SOCKET environment variable with the name of the + socket listening for notifications from services. + This method removes the NOTIFY_SOCKET environment variable to ensure + notification is sent only once. + """ + _sd_notify(True, 'READY=1') + + +def onready(notify_socket, timeout): + """Wait for systemd style notification on the socket. + + :param notify_socket: local socket address + :type notify_socket: string + :param timeout: socket timeout + :type timeout: float + :returns: 0 service ready + 1 service not ready + 2 timeout occurred + """ + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.settimeout(timeout) + sock.bind(_abstractify(notify_socket)) + try: + msg = sock.recv(512) + except socket.timeout: + return 2 + finally: + sock.close() + if 'READY=1' in msg: + return 0 + else: + return 1 + + +if __name__ == '__main__': + # simple CLI for testing + if len(sys.argv) == 1: + notify() + elif len(sys.argv) >= 2: + timeout = float(sys.argv[1]) + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + retval = onready(notify_socket, timeout) + sys.exit(retval) diff --git a/faafo/openstack/common/threadgroup.py b/faafo/openstack/common/threadgroup.py new file mode 100644 index 0000000..d7e92ba --- /dev/null +++ b/faafo/openstack/common/threadgroup.py @@ -0,0 +1,149 @@ +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import threading + +import eventlet +from eventlet import greenpool + +from faafo.openstack.common import loopingcall + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + """Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ + kwargs['group'].thread_done(kwargs['thread']) + + +class Thread(object): + """Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. 
+ """ + def __init__(self, thread, group): + self.thread = thread + self.thread.link(_thread_done, group=group, thread=self) + + def stop(self): + self.thread.kill() + + def wait(self): + return self.thread.wait() + + def link(self, func, *args, **kwargs): + self.thread.link(func, *args, **kwargs) + + +class ThreadGroup(object): + """The point of the ThreadGroup class is to: + + * keep track of timers and greenthreads (making it easier to stop them + when need be). + * provide an easy API to add timers. + """ + def __init__(self, thread_pool_size=10): + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_dynamic_timer(self, callback, initial_delay=None, + periodic_interval_max=None, *args, **kwargs): + timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) + timer.start(initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max) + self.timers.append(timer) + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(gt, self) + self.threads.append(th) + return th + + def thread_done(self, thread): + self.threads.remove(thread) + + def _stop_threads(self): + current = threading.current_thread() + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) + + def stop_timers(self): + for x in self.timers: + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + self.timers = [] + + def stop(self, graceful=False): + """stop function has the option of graceful=True/False. + + * In case of graceful=True, wait for all threads to be finished. + Never kill threads. + * In case of graceful=False, kill threads immediately. 
+ """ + self.stop_timers() + if graceful: + # In case of graceful=True, wait for all threads to be + # finished, never kill threads + self.wait() + else: + # In case of graceful=False(Default), kill threads + # immediately + self._stop_threads() + + def wait(self): + for x in self.timers: + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) + current = threading.current_thread() + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: + if x is current: + continue + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) diff --git a/faafo/producer.py b/faafo/producer.py index 4d65f26..d37d39d 100644 --- a/faafo/producer.py +++ b/faafo/producer.py @@ -14,27 +14,23 @@ import json import random -import sys -import time import uuid -import daemon import kombu from kombu.pools import producers from oslo_config import cfg from oslo_log import log import requests +from faafo.openstack.common import periodic_task +from faafo.openstack.common import service from faafo import queues from faafo import version -CONF = cfg.CONF +LOG = log.getLogger('faafo.producer') cli_opts = [ - cfg.BoolOpt('daemonize', - default=False, - help='Run in background.'), cfg.StrOpt('amqp-url', default='amqp://faafo:secretsecret@localhost:5672/', help='AMQP connection URL'), @@ -74,96 +70,95 @@ producer_opts = [ help="The minimum value for the parameter 'yb'."), cfg.IntOpt("min-iterations", default=128, help="The minimum number of iterations."), - cfg.FloatOpt("min-pause", default=1.0, - help="The minimum pause in seconds."), - cfg.FloatOpt("max-pause", default=10.0, - help="The maximum pause in seconds."), cfg.IntOpt("min-tasks", default=1, help="The minimum number of generated tasks."), cfg.IntOpt("max-tasks", default=10, - help="The maximum number of generated tasks.") + help="The maximum number of generated tasks."), + cfg.IntOpt("interval", default=10, help="Interval in seconds.") ] -CONF.register_cli_opts(cli_opts) -CONF.register_cli_opts(producer_opts) - -log.register_options(CONF) -log.setup(CONF, 'producer', version=version.version_info.version_string()) -log.set_defaults() - -CONF(args=sys.argv[1:], - project='producer', - version=version.version_info.version_string()) - -LOG = log.getLogger(__name__) +cfg.CONF.register_cli_opts(cli_opts) +cfg.CONF.register_cli_opts(producer_opts) -def generate_task(): - random.seed() +class ProducerService(service.Service, periodic_task.PeriodicTasks): + def __init__(self): + super(ProducerService, self).__init__() + self.messaging = kombu.Connection(cfg.CONF.amqp_url) + self._periodic_last_run = {} - width = random.randint(CONF.min_width, CONF.max_width) - height = random.randint(CONF.min_height, CONF.max_height) - iterations = random.randint(CONF.min_iterations, CONF.max_iterations) - xa = random.uniform(CONF.min_xa, CONF.max_xa) - xb = random.uniform(CONF.min_xb, CONF.max_xb) - ya = random.uniform(CONF.min_ya, CONF.max_ya) - yb = random.uniform(CONF.min_yb, CONF.max_yb) + @periodic_task.periodic_task(spacing=cfg.CONF.interval, + run_immediately=True) + def generate_task(self, context): + random.seed() + number = random.randint(cfg.CONF.min_tasks, cfg.CONF.max_tasks) + LOG.info("generating %d task(s)" % number) + for i in xrange(0, number): + task = self.get_random_task() + # NOTE(berendt): only necessary when using requests < 2.4.2 + headers = {'Content-type': 'application/json', + 
'Accept': 'text/plain'} + requests.post("%s/api/fractal" % cfg.CONF.api_url, + json.dumps(task), headers=headers) + LOG.info("generated task: %s" % task) + with producers[self.messaging].acquire(block=True) as producer: + producer.publish( + task, + serializer='pickle', + exchange=queues.task_exchange, + declare=[queues.task_exchange], + routing_key='tasks') - task = { - 'uuid': str(uuid.uuid4()), - 'width': width, - 'height': height, - 'iterations': iterations, - 'xa': xa, - 'xb': xb, - 'ya': ya, - 'yb': yb - } + self.add_periodic_task(generate_task) + self.tg.add_dynamic_timer(self.periodic_tasks) - return task + def periodic_tasks(self): + """Tasks to be run at a periodic interval.""" + return self.run_periodic_tasks(None) - -def run(messaging, api_url): - while True: + @staticmethod + def get_random_task(): random.seed() - number = random.randint(CONF.min_tasks, CONF.max_tasks) - LOG.info("generating %d task(s)" % number) - for i in xrange(0, number): - task = generate_task() - # NOTE(berendt): only necessary when using requests < 2.4.2 - headers = {'Content-type': 'application/json', - 'Accept': 'text/plain'} - requests.post("%s/api/fractal" % api_url, json.dumps(task), - headers=headers) - LOG.info("generated task: %s" % task) - with producers[messaging].acquire(block=True) as producer: - producer.publish( - task, - serializer='pickle', - exchange=queues.task_exchange, - declare=[queues.task_exchange], - routing_key='tasks') - if CONF.one_shot: - break + width = random.randint(cfg.CONF.min_width, cfg.CONF.max_width) + height = random.randint(cfg.CONF.min_height, cfg.CONF.max_height) + iterations = random.randint(cfg.CONF.min_iterations, + cfg.CONF.max_iterations) + xa = random.uniform(cfg.CONF.min_xa, cfg.CONF.max_xa) + xb = random.uniform(cfg.CONF.min_xb, cfg.CONF.max_xb) + ya = random.uniform(cfg.CONF.min_ya, cfg.CONF.max_ya) + yb = random.uniform(cfg.CONF.min_yb, cfg.CONF.max_yb) - pause = random.uniform(CONF.min_pause, CONF.max_pause) - LOG.info("sleeping for %f seconds" % pause) - time.sleep(pause) + task = { + 'uuid': str(uuid.uuid4()), + 'width': width, + 'height': height, + 'iterations': iterations, + 'xa': xa, + 'xb': xb, + 'ya': ya, + 'yb': yb + } + + return task def main(): - messaging = kombu.Connection(CONF.amqp_url) + log.register_options(cfg.CONF) + log.setup(cfg.CONF, 'producer', + version=version.version_info.version_string()) + log.set_defaults() - if CONF.daemonize: - with daemon.DaemonContext(): - run(messaging, CONF.api_url) + cfg.CONF(project='producer', prog='faafo-producer', + version=version.version_info.version_string()) + + srv = ProducerService() + + if cfg.CONF.one_shot: + srv.periodic_tasks() else: - try: - run(messaging, CONF.api_url) - except Exception as e: - sys.exit("ERROR: %s" % e) - + launcher = service.launch(srv) + launcher.wait() if __name__ == '__main__': main() diff --git a/openstack-common.conf b/openstack-common.conf new file mode 100644 index 0000000..11527ef --- /dev/null +++ b/openstack-common.conf @@ -0,0 +1,8 @@ +[DEFAULT] + +# The list of modules to copy from oslo-incubator.git +module=periodic_task +module=service + +# The base module to hold the copy of openstack.common +base=faafo diff --git a/requirements.txt b/requirements.txt index b987bb5..d57f730 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ pbr>=0.6,!=0.7,<1.0 anyjson>=0.3.3 amqp +eventlet>=0.16.1,!=0.17.0 kombu>=2.5.0 PyMySQL>=0.6.2 # MIT License Pillow==2.4.0 # MIT diff --git a/tox.ini b/tox.ini index 28f1c4d..e95986e 100644 --- a/tox.ini +++ 
b/tox.ini @@ -20,4 +20,4 @@ commands = flake8 {posargs} [flake8] show-source = True -exclude=.venv,.git,.tox,*egg*,build +exclude=.venv,.git,.tox,*egg*,build,*openstack/common*
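
Usage notes (editorial sketches against the modules synced above; they are not part of the commit itself):

The eventlet_backdoor module registers a single option, backdoor_port, and its help text documents the three accepted forms. A hypothetical config-file fragment that enables the backdoor on the first unused port in a range:

    [DEFAULT]
    # 0 = listen on a random port; a single number = exactly that port
    # (the backdoor stays disabled if the port is taken); start:end =
    # smallest unused port in the range. The chosen port is written to
    # the service's log file.
    backdoor_port = 9000:9090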
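
loopingcall.FixedIntervalLoopingCall re-runs a callable on a fixed interval inside a greenthread; raising LoopingCallDone ends the loop and hands an optional value back through wait(). A minimal, self-contained sketch (the counting task is purely illustrative):

    import itertools

    from faafo.openstack.common import loopingcall

    counter = itertools.count()

    def poll():
        # Stop after the fifth run and return a result through wait().
        if next(counter) >= 4:
            raise loopingcall.LoopingCallDone(retvalue='done')

    timer = loopingcall.FixedIntervalLoopingCall(poll)
    timer.start(interval=0.5)
    print(timer.wait())  # blocks until LoopingCallDone, then prints 'done'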
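
periodic_task builds on top of this: methods of a PeriodicTasks subclass decorated with @periodic_task are collected by the metaclass, and run_periodic_tasks() executes whatever is due and returns how many seconds the caller may idle before the next task. A sketch with illustrative names (Manager, report):

    from faafo.openstack.common import periodic_task


    class Manager(periodic_task.PeriodicTasks):

        # spacing=30 runs this roughly every 30 seconds;
        # run_immediately=True schedules the first run right after
        # startup instead of one full spacing later.
        @periodic_task.periodic_task(spacing=30, run_immediately=True)
        def report(self, context):
            print('reporting')


    manager = Manager()
    idle_for = manager.run_periodic_tasks(None)  # runs report(), returns 30.0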
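
faafo/producer.py ties the two layers together: ProducerService subclasses both Service and PeriodicTasks and hands run_periodic_tasks() to its thread group as a dynamic timer, which sleeps exactly as long as the returned idle time says; that is what replaces the old time.sleep() loop. A condensed sketch of the wiring (MyService and tick are stand-ins; unlike the patch, which re-creates _periodic_last_run by hand, this version calls both base initializers explicitly, since Service.__init__ does not chain to PeriodicTasks):

    from faafo.openstack.common import periodic_task
    from faafo.openstack.common import service


    class MyService(service.Service, periodic_task.PeriodicTasks):
        def __init__(self):
            service.Service.__init__(self)
            periodic_task.PeriodicTasks.__init__(self)
            # The dynamic timer re-arms itself with whatever
            # periodic_tasks() returns.
            self.tg.add_dynamic_timer(self.periodic_tasks)

        def periodic_tasks(self):
            return self.run_periodic_tasks(None)

        @periodic_task.periodic_task(spacing=10, run_immediately=True)
        def tick(self, context):
            print('tick')


    # workers=1 selects ServiceLauncher (in-process); workers > 1 would
    # fork children through ProcessLauncher instead.
    launcher = service.launch(MyService(), workers=1)
    launcher.wait()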