Pull latest service module from Oslo

Get latest service module from Oslo to prepare for multi-process API service implementation.
Below are the commits included in this pull.

Changes being pulled into in service module are:
* e7bc8c9 2013-11-20 | Merge "os._exit in _start_child may cause unexpected exception"
* 96a2d4e 2013-11-07 | os._exit in _start_child may cause unexpected exception
* 1771a77 2013-11-05 | Adjust import order according to PEP8 imports rule
* 3110c0f 2013-10-17 | Use multiprocessing.Event to ensure services have started
* b5fba9e 2013-09-18 | Move comment in service.py to correct location
* 11cc74f 2013-08-26 | Fixes issue with SUGHUP in services on Windows
* 825ace5 2013-06-17 | Add service restart function in oslo-incubator
* c935d1c 2013-07-16 | Merge "Allow launchers to be stopped multiple times"
* dc8aa79 2013-07-08 | Allow launchers to be stopped multiple times
* 1a2df89 2013-06-25 | Enable H302 hacking check
* 52e857a 2013-06-19 | Ignore any exceptions from rpc.cleanup().
* 5518ad3 2013-05-16 | Add graceful service shutdown support to Launcher

And these dependent modules
 - cinder/openstack/common/eventlet_backdoor.py
    * 1dcc747 2013-07-15 | Fix stylistic problems with help text
    * 1a2df89 2013-06-25 | Enable H302 hacking check
    * c7c55b2 2013-06-20 | Improve usability when backdoor_port is nonzero
 - cinder/openstack/common/gettextutils.py
    * 3970d46 2013-11-02 | Fix typos in oslo
    * 88db9c8 2013-10-03 | When translating if no locale is given use default locale
 - cinder/openstack/common/jsonutils.py
    * 3d7504b 2013-09-23 | Ensure that Message objects will be sent via RPC in unicode format
    * 1807d32 2013-08-22 | jsonutils: make types py3 compatible
    * bdef862 2013-08-22 | jsonutils: do not require xmlrpclib
    * ded9bd6 2013-08-04 | Make dependency on netaddr optional
    * 7b7566b 2013-06-25 | Add netaddr.IPAddress support to to_primitive()
 - cinder/openstack/common/local.py
    * cb2a2b6 2013-06-28 | Modify local.py to not be dependent on Eventlet
    * 547ab34 2013-03-11 | Fix Copyright Headers - Rename LLC to Foundation
 - cinder/openstack/common/log.py
    * a82e889 2013-11-14 | Merge "Do not name variables as builtins"
    * 2251cb5 2013-11-13 | Do not name variables as builtins
    * 25c5854 2013-11-13 | Adds admin_password as key to be sanitized when logging
    * cbfded9 2013-11-11 | Default iso8601 logging to WARN
    * 76b0cd1 2013-11-04 | Add mask password impl from other projects
 - cinder/openstack/common/loopingcall.py
    * 1a2df89 2013-06-25 | Enable H302 hacking check
 - cinder/openstack/common/threadgroup.py
    * 9d3c34b 2013-10-25 | Add a link method to Thread
    * 1a2df89 2013-06-25 | Enable H302 hacking check
 - cinder/openstack/common/timeutils.py
    * f3b5f17 2013-11-12 | Add helper method total_seconds in timeutils.py
    * 53ebd30 2013-10-18 | python3: use six.text_types for unicode()
    * 3bc6f79 2013-09-19 | Fix timeutils.set_override_time not defaulting to current wall time
    * af76064 2013-08-29 | Optimize timeutils.utcnow_ts()
    * df3f2ba 2013-07-26 | BaseException.message is deprecated since Python 2.6
    * d28fa69 2013-06-27 | python3: Add python3 compatibility.

Partial bp: multi-process-api-service

Change-Id: Ifd25eae9eb2d6ae53bcf1665c3d5b7db4144433c
This commit is contained in:
Zhiteng Huang 2013-11-21 17:02:23 +08:00
parent b4a9c25699
commit 9c2029a801
11 changed files with 386 additions and 99 deletions

View File

@ -18,8 +18,11 @@
from __future__ import print_function
import errno
import gc
import os
import pprint
import socket
import sys
import traceback
@ -28,14 +31,34 @@ import eventlet.backdoor
import greenlet
from oslo.config import cfg
from cinder.openstack.common.gettextutils import _ # noqa
from cinder.openstack.common import log as logging
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
"in listening on a random tcp port number; <port> results in listening "
"on the specified port number (and not enabling backdoor if that port "
"is in use); and <start>:<end> results in listening on the smallest "
"unused port number within the specified range of port numbers. The "
"chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
cfg.IntOpt('backdoor_port',
cfg.StrOpt('backdoor_port',
default=None,
help='port for eventlet backdoor to listen')
help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts)
LOG = logging.getLogger(__name__)
class EventletBackdoorConfigValueError(Exception):
def __init__(self, port_range, help_msg, ex):
msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
'%(help)s' %
{'range': port_range, 'ex': ex, 'help': help_msg})
super(EventletBackdoorConfigValueError, self).__init__(msg)
self.port_range = port_range
def _dont_use_this():
@ -60,6 +83,33 @@ def _print_nativethreads():
print()
def _parse_port_range(port_range):
if ':' not in port_range:
start, end = port_range, port_range
else:
start, end = port_range.split(':', 1)
try:
start, end = int(start), int(end)
if end < start:
raise ValueError
return start, end
except ValueError as ex:
raise EventletBackdoorConfigValueError(port_range, ex,
help_for_backdoor_port)
def _listen(host, start_port, end_port, listen_func):
try_port = start_port
while True:
try:
return listen_func((host, try_port))
except socket.error as exc:
if (exc.errno != errno.EADDRINUSE or
try_port >= end_port):
raise
try_port += 1
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
@ -72,6 +122,8 @@ def initialize_if_enabled():
if CONF.backdoor_port is None:
return None
start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint
@ -82,8 +134,13 @@ def initialize_if_enabled():
pprint.pprint(val)
sys.displayhook = displayhook
sock = eventlet.listen(('localhost', CONF.backdoor_port))
sock = _listen('localhost', start_port, end_port, eventlet.listen)
# In the case of backdoor port being zero, a port number is assigned by
# listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
LOG.info(_('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()})
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port

View File

@ -317,7 +317,7 @@ def get_available_languages(domain):
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
# this check when the master list updates to >=1.0, and all projects udpate
# this check when the master list updates to >=1.0, and update all projects
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
@ -329,13 +329,21 @@ def get_available_languages(domain):
def get_localized_message(message, user_locale):
"""Gets a localized version of the given message in the given locale."""
"""Gets a localized version of the given message in the given locale.
If the message is not a Message object the message is returned as-is.
If the locale is None the message is translated to the default locale.
:returns: the translated message in unicode, or the original message if
it could not be translated
"""
translated = message
if isinstance(message, Message):
if user_locale:
message.locale = user_locale
return six.text_type(message)
else:
return message
original_locale = message.locale
message.locale = user_locale
translated = six.text_type(message)
message.locale = original_locale
return translated
class LocaleHandler(logging.Handler):

View File

@ -38,13 +38,19 @@ import functools
import inspect
import itertools
import json
import types
import xmlrpclib
try:
import xmlrpclib
except ImportError:
# NOTE(jd): xmlrpclib is not shipped with Python 3
xmlrpclib = None
import six
from cinder.openstack.common import gettextutils
from cinder.openstack.common import importutils
from cinder.openstack.common import timeutils
netaddr = importutils.try_import("netaddr")
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
@ -52,7 +58,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
_simple_types = (types.NoneType, int, basestring, bool, float, long)
_simple_types = (six.string_types + six.integer_types
+ (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
@ -124,11 +131,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif isinstance(value, gettextutils.Message):
return value.data
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
@ -137,6 +146,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
elif netaddr and isinstance(value, netaddr.IPAddress):
return six.text_type(value)
else:
if any(test(value) for test in _nasty_type_tests):
return six.text_type(value)

View File

@ -15,16 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Greenthread local storage of variables using weak references"""
"""Local storage of variables using weak references"""
import threading
import weakref
from eventlet import corolocal
class WeakLocal(corolocal.local):
class WeakLocal(threading.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
rval = super(WeakLocal, self).__getattribute__(attr)
if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
@ -34,7 +33,7 @@ class WeakLocal(corolocal.local):
def __setattr__(self, attr, value):
value = weakref.ref(value)
return corolocal.local.__setattr__(self, attr, value)
return super(WeakLocal, self).__setattr__(attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
@ -45,4 +44,4 @@ store = WeakLocal()
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local
strong_store = threading.local()

View File

@ -35,6 +35,7 @@ import logging
import logging.config
import logging.handlers
import os
import re
import sys
import traceback
@ -50,6 +51,24 @@ from cinder.openstack.common import local
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
# NOTE(ldbragst): Let's build a list of regex objects using the list of
# _SANITIZE_KEYS we already have. This way, we only have to add the new key
# to the list of _SANITIZE_KEYS and we can generate regular expressions
# for XML and JSON automatically.
_SANITIZE_PATTERNS = []
_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
r'(<%(key)s>).*?(</%(key)s>)',
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
for key in _SANITIZE_KEYS:
for pattern in _FORMAT_PATTERNS:
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
_SANITIZE_PATTERNS.append(reg_ex)
common_cli_opts = [
cfg.BoolOpt('debug',
short='d',
@ -136,6 +155,7 @@ log_opts = [
'qpid=WARN',
'sqlalchemy=WARN',
'suds=INFO',
'iso8601=WARN',
],
help='list of logger=LEVEL pairs'),
cfg.BoolOpt('publish_errors',
@ -214,6 +234,39 @@ def _get_log_file_path(binary=None):
return None
def mask_password(message, secret="***"):
"""Replace password with 'secret' in message.
:param message: The string which includes security information.
:param secret: value with which to replace passwords, defaults to "***".
:returns: The unicode value of message with the password fields masked.
For example:
>>> mask_password("'adminPass' : 'aaaaa'")
"'adminPass' : '***'"
>>> mask_password("'admin_pass' : 'aaaaa'")
"'admin_pass' : '***'"
>>> mask_password('"password" : "aaaaa"')
'"password" : "***"'
>>> mask_password("'original_password' : 'aaaaa'")
"'original_password' : '***'"
>>> mask_password("u'original_password' : u'aaaaa'")
"u'original_password' : u'***'"
"""
message = six.text_type(message)
# NOTE(ldbragst): Check to see if anything in message contains any key
# specified in _SANITIZE_KEYS, if not then just return the message since
# we don't have to mask any passwords.
if not any(key in message for key in _SANITIZE_KEYS):
return message
secret = r'\g<1>' + secret + r'\g<2>'
for pattern in _SANITIZE_PATTERNS:
message = re.sub(pattern, secret, message)
return message
class BaseLoggerAdapter(logging.LoggerAdapter):
def audit(self, msg, *args, **kwargs):
@ -336,10 +389,10 @@ class JSONFormatter(logging.Formatter):
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
def logging_excepthook(exc_type, value, tb):
extra = {}
if CONF.verbose:
extra['exc_info'] = (type, value, tb)
extra['exc_info'] = (exc_type, value, tb)
getLogger(product_name).critical(str(value), **extra)
return logging_excepthook

View File

@ -22,7 +22,7 @@ import sys
from eventlet import event
from eventlet import greenthread
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common.gettextutils import _ # noqa
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils

View File

@ -20,6 +20,7 @@
"""Generic Node base class for all workers that run on hosts."""
import errno
import logging as std_logging
import os
import random
import signal
@ -27,11 +28,11 @@ import sys
import time
import eventlet
import logging as std_logging
from eventlet import event
from oslo.config import cfg
from cinder.openstack.common import eventlet_backdoor
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common.gettextutils import _ # noqa
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import threadgroup
@ -42,6 +43,29 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
def _is_sighup(signo):
return _sighup_supported() and signo == signal.SIGHUP
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
@ -51,20 +75,9 @@ class Launcher(object):
:returns: None
"""
self._services = threadgroup.ThreadGroup()
self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_service(service):
"""Start and wait for a service to finish.
:param service: service to run and wait for.
:returns: None
"""
service.start()
service.wait()
def launch_service(self, service):
"""Load and start the given service.
@ -73,7 +86,7 @@ class Launcher(object):
"""
service.backdoor_port = self.backdoor_port
self._services.add_thread(self.run_service, service)
self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
@ -81,7 +94,7 @@ class Launcher(object):
:returns: None
"""
self._services.stop()
self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@ -89,7 +102,16 @@ class Launcher(object):
:returns: None
"""
self._services.wait()
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
@ -101,33 +123,48 @@ class SignalExit(SystemExit):
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
def wait(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _wait_for_exit_or_signal(self, ready_callback=None):
status = None
signo = 0
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
status = None
try:
if ready_callback:
ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
if rpc:
rpc.cleanup()
self.stop()
return status
if rpc:
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_('Exception during rpc cleanup.'))
return status, signo
def wait(self, ready_callback=None):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal(ready_callback)
if not _is_sighup(signo):
return status
self.restart()
class ServiceWrapper(object):
@ -145,17 +182,17 @@ class ProcessLauncher(object):
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@ -166,16 +203,49 @@ class ProcessLauncher(object):
sys.exit(1)
def _child_process(self, service):
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = 0
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
@ -189,7 +259,8 @@ class ProcessLauncher(object):
random.seed()
launcher = Launcher()
launcher.run_service(service)
launcher.launch_service(service)
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
@ -207,24 +278,13 @@ class ProcessLauncher(object):
pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.service)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
finally:
wrap.service.stop()
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if not _is_sighup(signo):
break
launcher.restart()
os._exit(status)
@ -270,12 +330,7 @@ class ProcessLauncher(object):
wrap.children.remove(pid)
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
@ -284,14 +339,28 @@ class ProcessLauncher(object):
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_('Caught %s, stopping children'), signame)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while True:
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if not _is_sighup(self.sigcaught):
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
for pid in self.children:
try:
@ -313,15 +382,74 @@ class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
def stop(self):
self.tg.stop()
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=None):
if workers:

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from eventlet import greenlet
import eventlet
from eventlet import greenpool
from eventlet import greenthread
@ -48,6 +48,9 @@ class Thread(object):
def wait(self):
return self.thread.wait()
def link(self, func, *args, **kwargs):
self.thread.link(func, *args, **kwargs)
class ThreadGroup(object):
"""The point of the ThreadGroup classis to:
@ -79,6 +82,7 @@ class ThreadGroup(object):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
return th
def thread_done(self, thread):
self.threads.remove(thread)
@ -105,7 +109,7 @@ class ThreadGroup(object):
for x in self.timers:
try:
x.wait()
except greenlet.GreenletExit:
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
@ -115,7 +119,7 @@ class ThreadGroup(object):
continue
try:
x.wait()
except greenlet.GreenletExit:
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)

View File

@ -21,8 +21,10 @@ Time related utilities and helper functions.
import calendar
import datetime
import time
import iso8601
import six
# ISO 8601 extended time format with microseconds
@ -48,9 +50,9 @@ def parse_isotime(timestr):
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
raise ValueError(six.text_type(e))
except TypeError as e:
raise ValueError(e.message)
raise ValueError(six.text_type(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
@ -75,20 +77,25 @@ def normalize_time(timestamp):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, basestring):
if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, basestring):
if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
@ -110,12 +117,15 @@ def iso8601_from_timestamp(timestamp):
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
:param override_time: datetime instance or list thereof. If not
given, defaults to the current UTC time.
"""
utcnow.override_time = override_time
utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
@ -168,6 +178,15 @@ def delta_seconds(before, after):
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
return total_seconds(delta)
def total_seconds(delta):
"""Return the total seconds of datetime.timedelta object.
Compute total seconds of datetime.timedelta, datetime.timedelta
doesn't have method total_seconds in Python2.6, calculate it manually.
"""
try:
return delta.total_seconds()
except AttributeError:

View File

@ -573,7 +573,14 @@
# Options defined in cinder.openstack.common.eventlet_backdoor
#
# port for eventlet backdoor to listen (integer value)
# Enable eventlet backdoor. Acceptable values are 0, <port>,
# and <start>:<end>, where 0 results in listening on a random
# tcp port number; <port> results in listening on the
# specified port number (and not enabling backdoor if that
# port is in use); and <start>:<end> results in listening on
# the smallest unused port number within the specified range
# of port numbers. The chosen port is displayed in the
# service's log file. (string value)
#backdoor_port=<None>
@ -621,7 +628,7 @@
#logging_exception_prefix=%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s %(instance)s
# list of logger=LEVEL pairs (list value)
#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO
#default_log_levels=amqp=WARN,amqplib=WARN,boto=WARN,keystone=INFO,qpid=WARN,sqlalchemy=WARN,suds=INFO,iso8601=WARN
# publish error events (boolean value)
#publish_errors=false

View File

@ -25,6 +25,7 @@ module=rpc
module=scheduler
module=scheduler.filters
module=scheduler.weights
module=service
module=strutils
module=timeutils
module=uuidutils