Sync service and threadgroup modules from oslo
blueprint use-oslo-services Change-Id: I8641dbed0a83a6d57c0b6c1a02796eb3e633fafd
This commit is contained in:
parent
e4f05bada6
commit
1298b1880a
|
@ -41,6 +41,8 @@ import json
|
|||
import types
|
||||
import xmlrpclib
|
||||
|
||||
import six
|
||||
|
||||
from nova.openstack.common import timeutils
|
||||
|
||||
|
||||
|
@ -93,7 +95,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||
# value of itertools.count doesn't get caught by nasty_type_tests
|
||||
# and results in infinite loop when list(value) is called.
|
||||
if type(value) == itertools.count:
|
||||
return unicode(value)
|
||||
return six.text_type(value)
|
||||
|
||||
# FIXME(vish): Workaround for LP bug 852095. Without this workaround,
|
||||
# tests that raise an exception in a mocked method that
|
||||
|
@ -137,12 +139,12 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||
return recursive(value.__dict__, level=level + 1)
|
||||
else:
|
||||
if any(test(value) for test in _nasty_type_tests):
|
||||
return unicode(value)
|
||||
return six.text_type(value)
|
||||
return value
|
||||
except TypeError:
|
||||
# Class objects are tricky since they may define something like
|
||||
# __iter__ defined but it isn't callable as list().
|
||||
return unicode(value)
|
||||
return six.text_type(value)
|
||||
|
||||
|
||||
def dumps(value, default=to_primitive, **kwargs):
|
||||
|
|
|
@ -43,9 +43,9 @@ import traceback
|
|||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common.gettextutils import _
|
||||
from nova.openstack.common import importutils
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import local
|
||||
from nova.openstack.common import notifier
|
||||
|
||||
|
||||
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
|
@ -322,17 +322,6 @@ class JSONFormatter(logging.Formatter):
|
|||
return jsonutils.dumps(message)
|
||||
|
||||
|
||||
class PublishErrorsHandler(logging.Handler):
|
||||
def emit(self, record):
|
||||
if ('nova.openstack.common.notifier.log_notifier' in
|
||||
CONF.notification_driver):
|
||||
return
|
||||
notifier.api.notify(None, 'error.publisher',
|
||||
'error_notification',
|
||||
notifier.api.ERROR,
|
||||
dict(error=record.msg))
|
||||
|
||||
|
||||
def _create_logging_excepthook(product_name):
|
||||
def logging_excepthook(type, value, tb):
|
||||
extra = {}
|
||||
|
@ -428,7 +417,10 @@ def _setup_logging_from_conf():
|
|||
log_root.addHandler(streamlog)
|
||||
|
||||
if CONF.publish_errors:
|
||||
log_root.addHandler(PublishErrorsHandler(logging.ERROR))
|
||||
handler = importutils.import_object(
|
||||
"nova.openstack.common.log_handler.PublishErrorsHandler",
|
||||
logging.ERROR)
|
||||
log_root.addHandler(handler)
|
||||
|
||||
datefmt = CONF.log_date_format
|
||||
for handler in log_root.handlers:
|
||||
|
|
|
@ -123,7 +123,7 @@ def execute(*cmd, **kwargs):
|
|||
elif isinstance(check_exit_code, int):
|
||||
check_exit_code = [check_exit_code]
|
||||
|
||||
if len(kwargs):
|
||||
if kwargs:
|
||||
raise UnknownArgumentError(_('Got unknown keyword args '
|
||||
'to utils.execute: %r') % kwargs)
|
||||
|
||||
|
|
|
@ -0,0 +1,333 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2011 Justin Santa Barbara
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Generic Node base class for all workers that run on hosts."""
|
||||
|
||||
import errno
|
||||
import os
|
||||
import random
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import logging as std_logging
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common import eventlet_backdoor
|
||||
from nova.openstack.common.gettextutils import _
|
||||
from nova.openstack.common import importutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common import threadgroup
|
||||
|
||||
|
||||
rpc = importutils.try_import('nova.openstack.common.rpc')
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Launcher(object):
|
||||
"""Launch one or more services and wait for them to complete."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the service launcher.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
self._services = threadgroup.ThreadGroup()
|
||||
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
|
||||
|
||||
@staticmethod
|
||||
def run_service(service):
|
||||
"""Start and wait for a service to finish.
|
||||
|
||||
:param service: service to run and wait for.
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
service.start()
|
||||
service.wait()
|
||||
|
||||
def launch_service(self, service):
|
||||
"""Load and start the given service.
|
||||
|
||||
:param service: The service you would like to start.
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
service.backdoor_port = self.backdoor_port
|
||||
self._services.add_thread(self.run_service, service)
|
||||
|
||||
def stop(self):
|
||||
"""Stop all services which are currently running.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
self._services.stop()
|
||||
|
||||
def wait(self):
|
||||
"""Waits until all services have been stopped, and then returns.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
self._services.wait()
|
||||
|
||||
|
||||
class SignalExit(SystemExit):
|
||||
def __init__(self, signo, exccode=1):
|
||||
super(SignalExit, self).__init__(exccode)
|
||||
self.signo = signo
|
||||
|
||||
|
||||
class ServiceLauncher(Launcher):
|
||||
def _handle_signal(self, signo, frame):
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
|
||||
raise SignalExit(signo)
|
||||
|
||||
def wait(self):
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
|
||||
LOG.debug(_('Full set of CONF:'))
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
status = None
|
||||
try:
|
||||
super(ServiceLauncher, self).wait()
|
||||
except SignalExit as exc:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT'}[exc.signo]
|
||||
LOG.info(_('Caught %s, exiting'), signame)
|
||||
status = exc.code
|
||||
except SystemExit as exc:
|
||||
status = exc.code
|
||||
finally:
|
||||
if rpc:
|
||||
rpc.cleanup()
|
||||
self.stop()
|
||||
return status
|
||||
|
||||
|
||||
class ServiceWrapper(object):
|
||||
def __init__(self, service, workers):
|
||||
self.service = service
|
||||
self.workers = workers
|
||||
self.children = set()
|
||||
self.forktimes = []
|
||||
|
||||
|
||||
class ProcessLauncher(object):
|
||||
def __init__(self):
|
||||
self.children = {}
|
||||
self.sigcaught = None
|
||||
self.running = True
|
||||
rfd, self.writepipe = os.pipe()
|
||||
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
|
||||
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
|
||||
def _handle_signal(self, signo, frame):
|
||||
self.sigcaught = signo
|
||||
self.running = False
|
||||
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
|
||||
def _pipe_watcher(self):
|
||||
# This will block until the write end is closed when the parent
|
||||
# dies unexpectedly
|
||||
self.readpipe.read()
|
||||
|
||||
LOG.info(_('Parent process has died unexpectedly, exiting'))
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
def _child_process(self, service):
|
||||
# Setup child signal handlers differently
|
||||
def _sigterm(*args):
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
raise SignalExit(signal.SIGTERM)
|
||||
|
||||
signal.signal(signal.SIGTERM, _sigterm)
|
||||
# Block SIGINT and let the parent send us a SIGTERM
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
|
||||
# Reopen the eventlet hub to make sure we don't share an epoll
|
||||
# fd with parent and/or siblings, which would be bad
|
||||
eventlet.hubs.use_hub()
|
||||
|
||||
# Close write to ensure only parent has it open
|
||||
os.close(self.writepipe)
|
||||
# Create greenthread to watch for parent to close pipe
|
||||
eventlet.spawn_n(self._pipe_watcher)
|
||||
|
||||
# Reseed random number generator
|
||||
random.seed()
|
||||
|
||||
launcher = Launcher()
|
||||
launcher.run_service(service)
|
||||
|
||||
def _start_child(self, wrap):
|
||||
if len(wrap.forktimes) > wrap.workers:
|
||||
# Limit ourselves to one process a second (over the period of
|
||||
# number of workers * 1 second). This will allow workers to
|
||||
# start up quickly but ensure we don't fork off children that
|
||||
# die instantly too quickly.
|
||||
if time.time() - wrap.forktimes[0] < wrap.workers:
|
||||
LOG.info(_('Forking too fast, sleeping'))
|
||||
time.sleep(1)
|
||||
|
||||
wrap.forktimes.pop(0)
|
||||
|
||||
wrap.forktimes.append(time.time())
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# NOTE(johannes): All exceptions are caught to ensure this
|
||||
# doesn't fallback into the loop spawning children. It would
|
||||
# be bad for a child to spawn more children.
|
||||
status = 0
|
||||
try:
|
||||
self._child_process(wrap.service)
|
||||
except SignalExit as exc:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT'}[exc.signo]
|
||||
LOG.info(_('Caught %s, exiting'), signame)
|
||||
status = exc.code
|
||||
except SystemExit as exc:
|
||||
status = exc.code
|
||||
except BaseException:
|
||||
LOG.exception(_('Unhandled exception'))
|
||||
status = 2
|
||||
finally:
|
||||
wrap.service.stop()
|
||||
|
||||
os._exit(status)
|
||||
|
||||
LOG.info(_('Started child %d'), pid)
|
||||
|
||||
wrap.children.add(pid)
|
||||
self.children[pid] = wrap
|
||||
|
||||
return pid
|
||||
|
||||
def launch_service(self, service, workers=1):
|
||||
wrap = ServiceWrapper(service, workers)
|
||||
|
||||
LOG.info(_('Starting %d workers'), wrap.workers)
|
||||
while self.running and len(wrap.children) < wrap.workers:
|
||||
self._start_child(wrap)
|
||||
|
||||
def _wait_child(self):
|
||||
try:
|
||||
# Don't block if no child processes have exited
|
||||
pid, status = os.waitpid(0, os.WNOHANG)
|
||||
if not pid:
|
||||
return None
|
||||
except OSError as exc:
|
||||
if exc.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
return None
|
||||
|
||||
if os.WIFSIGNALED(status):
|
||||
sig = os.WTERMSIG(status)
|
||||
LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
|
||||
dict(pid=pid, sig=sig))
|
||||
else:
|
||||
code = os.WEXITSTATUS(status)
|
||||
LOG.info(_('Child %(pid)s exited with status %(code)d'),
|
||||
dict(pid=pid, code=code))
|
||||
|
||||
if pid not in self.children:
|
||||
LOG.warning(_('pid %d not in child list'), pid)
|
||||
return None
|
||||
|
||||
wrap = self.children.pop(pid)
|
||||
wrap.children.remove(pid)
|
||||
return wrap
|
||||
|
||||
def wait(self):
|
||||
"""Loop waiting on children to die and respawning as necessary"""
|
||||
|
||||
LOG.debug(_('Full set of CONF:'))
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
while self.running:
|
||||
wrap = self._wait_child()
|
||||
if not wrap:
|
||||
# Yield to other threads if no children have exited
|
||||
# Sleep for a short time to avoid excessive CPU usage
|
||||
# (see bug #1095346)
|
||||
eventlet.greenthread.sleep(.01)
|
||||
continue
|
||||
|
||||
while self.running and len(wrap.children) < wrap.workers:
|
||||
self._start_child(wrap)
|
||||
|
||||
if self.sigcaught:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT'}[self.sigcaught]
|
||||
LOG.info(_('Caught %s, stopping children'), signame)
|
||||
|
||||
for pid in self.children:
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except OSError as exc:
|
||||
if exc.errno != errno.ESRCH:
|
||||
raise
|
||||
|
||||
# Wait for children to die
|
||||
if self.children:
|
||||
LOG.info(_('Waiting on %d children to exit'), len(self.children))
|
||||
while self.children:
|
||||
self._wait_child()
|
||||
|
||||
|
||||
class Service(object):
|
||||
"""Service object for binaries running on hosts."""
|
||||
|
||||
def __init__(self, threads=1000):
|
||||
self.tg = threadgroup.ThreadGroup(threads)
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
self.tg.stop()
|
||||
|
||||
def wait(self):
|
||||
self.tg.wait()
|
||||
|
||||
|
||||
def launch(service, workers=None):
|
||||
if workers:
|
||||
launcher = ProcessLauncher()
|
||||
launcher.launch_service(service, workers=workers)
|
||||
else:
|
||||
launcher = ServiceLauncher()
|
||||
launcher.launch_service(service)
|
||||
return launcher
|
|
@ -0,0 +1,121 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from eventlet import greenlet
|
||||
from eventlet import greenpool
|
||||
from eventlet import greenthread
|
||||
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common import loopingcall
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _thread_done(gt, *args, **kwargs):
|
||||
""" Callback function to be passed to GreenThread.link() when we spawn()
|
||||
Calls the :class:`ThreadGroup` to notify if.
|
||||
|
||||
"""
|
||||
kwargs['group'].thread_done(kwargs['thread'])
|
||||
|
||||
|
||||
class Thread(object):
|
||||
""" Wrapper around a greenthread, that holds a reference to the
|
||||
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
|
||||
it has done so it can be removed from the threads list.
|
||||
"""
|
||||
def __init__(self, thread, group):
|
||||
self.thread = thread
|
||||
self.thread.link(_thread_done, group=group, thread=self)
|
||||
|
||||
def stop(self):
|
||||
self.thread.kill()
|
||||
|
||||
def wait(self):
|
||||
return self.thread.wait()
|
||||
|
||||
|
||||
class ThreadGroup(object):
|
||||
""" The point of the ThreadGroup classis to:
|
||||
|
||||
* keep track of timers and greenthreads (making it easier to stop them
|
||||
when need be).
|
||||
* provide an easy API to add timers.
|
||||
"""
|
||||
def __init__(self, thread_pool_size=10):
|
||||
self.pool = greenpool.GreenPool(thread_pool_size)
|
||||
self.threads = []
|
||||
self.timers = []
|
||||
|
||||
def add_dynamic_timer(self, callback, initial_delay=None,
|
||||
periodic_interval_max=None, *args, **kwargs):
|
||||
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
|
||||
timer.start(initial_delay=initial_delay,
|
||||
periodic_interval_max=periodic_interval_max)
|
||||
self.timers.append(timer)
|
||||
|
||||
def add_timer(self, interval, callback, initial_delay=None,
|
||||
*args, **kwargs):
|
||||
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
|
||||
pulse.start(interval=interval,
|
||||
initial_delay=initial_delay)
|
||||
self.timers.append(pulse)
|
||||
|
||||
def add_thread(self, callback, *args, **kwargs):
|
||||
gt = self.pool.spawn(callback, *args, **kwargs)
|
||||
th = Thread(gt, self)
|
||||
self.threads.append(th)
|
||||
|
||||
def thread_done(self, thread):
|
||||
self.threads.remove(thread)
|
||||
|
||||
def stop(self):
|
||||
current = greenthread.getcurrent()
|
||||
for x in self.threads:
|
||||
if x is current:
|
||||
# don't kill the current thread.
|
||||
continue
|
||||
try:
|
||||
x.stop()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.stop()
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
self.timers = []
|
||||
|
||||
def wait(self):
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
||||
current = greenthread.getcurrent()
|
||||
for x in self.threads:
|
||||
if x is current:
|
||||
continue
|
||||
try:
|
||||
x.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
except Exception as ex:
|
||||
LOG.exception(ex)
|
|
@ -27,7 +27,9 @@ module=processutils
|
|||
module=redhat-eventlet.patch
|
||||
module=rootwrap
|
||||
module=rpc
|
||||
module=service
|
||||
module=strutils
|
||||
module=threadgroup
|
||||
module=timeutils
|
||||
module=uuidutils
|
||||
|
||||
|
|
Loading…
Reference in New Issue