Sync necessary modules from oslo incubator for worker service

Sync modelus: service, periodic_task and related dependencies
from oslo-incubator. Using the following command in oslo-incubator:

python update.py --base rally --dest-dir ../rally --modules \
periodic_task,service

Change-Id: I0b8278404d53a91add7a8cf3876121aad1e497c2
This commit is contained in:
liyingjun
2014-06-24 17:39:34 +08:00
parent ec6cd401e0
commit be17a13505
13 changed files with 1387 additions and 116 deletions

View File

@@ -11,6 +11,8 @@ module=log
module=test module=test
module=fixture module=fixture
module=uuidutils module=uuidutils
module=service
module=periodic_task
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=rally base=rally

View File

@@ -0,0 +1,145 @@
# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import errno
import gc
import os
import pprint
import socket
import sys
import traceback
import eventlet
import eventlet.backdoor
import greenlet
from oslo.config import cfg
from rally.openstack.common.gettextutils import _LI
from rally.openstack.common import log as logging
help_for_backdoor_port = (
"Acceptable values are 0, <port>, and <start>:<end>, where 0 results "
"in listening on a random tcp port number; <port> results in listening "
"on the specified port number (and not enabling backdoor if that port "
"is in use); and <start>:<end> results in listening on the smallest "
"unused port number within the specified range of port numbers. The "
"chosen port is displayed in the service's log file.")
eventlet_backdoor_opts = [
cfg.StrOpt('backdoor_port',
help="Enable eventlet backdoor. %s" % help_for_backdoor_port)
]
CONF = cfg.CONF
CONF.register_opts(eventlet_backdoor_opts)
LOG = logging.getLogger(__name__)
class EventletBackdoorConfigValueError(Exception):
def __init__(self, port_range, help_msg, ex):
msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. '
'%(help)s' %
{'range': port_range, 'ex': ex, 'help': help_msg})
super(EventletBackdoorConfigValueError, self).__init__(msg)
self.port_range = port_range
def _dont_use_this():
print("Don't use this, just disconnect instead")
def _find_objects(t):
return [o for o in gc.get_objects() if isinstance(o, t)]
def _print_greenthreads():
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print(i, gt)
traceback.print_stack(gt.gr_frame)
print()
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print(threadId)
traceback.print_stack(stack)
print()
def _parse_port_range(port_range):
if ':' not in port_range:
start, end = port_range, port_range
else:
start, end = port_range.split(':', 1)
try:
start, end = int(start), int(end)
if end < start:
raise ValueError
return start, end
except ValueError as ex:
raise EventletBackdoorConfigValueError(port_range, ex,
help_for_backdoor_port)
def _listen(host, start_port, end_port, listen_func):
try_port = start_port
while True:
try:
return listen_func((host, try_port))
except socket.error as exc:
if (exc.errno != errno.EADDRINUSE or
try_port >= end_port):
raise
try_port += 1
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:
return None
start_port, end_port = _parse_port_range(str(CONF.backdoor_port))
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
# the __builtin__._ that gettext sets. Let's switch to using pprint
# since it won't interact poorly with gettext, and it's easier to
# read the output too.
def displayhook(val):
if val is not None:
pprint.pprint(val)
sys.displayhook = displayhook
sock = _listen('localhost', start_port, end_port, eventlet.listen)
# In the case of backdoor port being zero, a port number is assigned by
# listen(). In any case, pull the port number out here.
port = sock.getsockname()[1]
LOG.info(
_LI('Eventlet backdoor listening on %(port)s for process %(pid)d') %
{'port': port, 'pid': os.getpid()}
)
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port

View File

@@ -34,7 +34,7 @@ import six
_AVAILABLE_LANGUAGES = {} _AVAILABLE_LANGUAGES = {}
# FIXME(dhellmann): Remove this when moving to rally.i18n. # FIXME(dhellmann): Remove this when moving to oslo.i18n.
USE_LAZY = False USE_LAZY = False
@@ -116,7 +116,7 @@ class TranslatorFactory(object):
# NOTE(dhellmann): When this module moves out of the incubator into # NOTE(dhellmann): When this module moves out of the incubator into
# rally.i18n, these global variables can be moved to an integration # oslo.i18n, these global variables can be moved to an integration
# module within each application. # module within each application.
# Create the global translation functions. # Create the global translation functions.
@@ -147,7 +147,7 @@ def enable_lazy():
your project is importing _ directly instead of using the your project is importing _ directly instead of using the
gettextutils.install() way of importing the _ function. gettextutils.install() way of importing the _ function.
""" """
# FIXME(dhellmann): This function will be removed in rally.i18n, # FIXME(dhellmann): This function will be removed in oslo.i18n,
# because the TranslatorFactory makes it superfluous. # because the TranslatorFactory makes it superfluous.
global _, _LI, _LW, _LE, _LC, USE_LAZY global _, _LI, _LW, _LE, _LC, USE_LAZY
tf = TranslatorFactory('rally', lazy=True) tf = TranslatorFactory('rally', lazy=True)
@@ -373,8 +373,8 @@ def get_available_languages(domain):
'zh_Hant_HK': 'zh_HK', 'zh_Hant_HK': 'zh_HK',
'zh_Hant': 'zh_TW', 'zh_Hant': 'zh_TW',
'fil': 'tl_PH'} 'fil': 'tl_PH'}
for (locale, alias) in six.iteritems(aliases): for (locale_, alias) in six.iteritems(aliases):
if locale in language_list and alias not in language_list: if locale_ in language_list and alias not in language_list:
language_list.append(alias) language_list.append(alias)
_AVAILABLE_LANGUAGES[domain] = language_list _AVAILABLE_LANGUAGES[domain] = language_list

View File

@@ -24,10 +24,10 @@ import traceback
def import_class(import_str): def import_class(import_str):
"""Returns a class from a string including module and class.""" """Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.') mod_str, _sep, class_str = import_str.rpartition('.')
__import__(mod_str)
try: try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str) return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError): except AttributeError:
raise ImportError('Class %s cannot be found (%s)' % raise ImportError('Class %s cannot be found (%s)' %
(class_str, (class_str,
traceback.format_exception(*sys.exc_info()))) traceback.format_exception(*sys.exc_info())))

View File

@@ -31,25 +31,29 @@ This module provides a few things:
''' '''
import codecs
import datetime import datetime
import functools import functools
import inspect import inspect
import itertools import itertools
import json import sys
try:
import xmlrpclib if sys.version_info < (2, 7):
except ImportError: # On Python <= 2.6, json module is not C boosted, so try to use
# NOTE(jaypipes): xmlrpclib was renamed to xmlrpc.client in Python3 # simplejson module if available
# however the function and object call signatures try:
# remained the same. This whole try/except block should import simplejson as json
# be removed and replaced with a call to six.moves once except ImportError:
# six 1.4.2 is released. See http://bit.ly/1bqrVzu import json
import xmlrpc.client as xmlrpclib else:
import json
import six import six
import six.moves.xmlrpc_client as xmlrpclib
from rally.openstack.common import gettextutils from rally.openstack.common import gettextutils
from rally.openstack.common import importutils from rally.openstack.common import importutils
from rally.openstack.common import strutils
from rally.openstack.common import timeutils from rally.openstack.common import timeutils
netaddr = importutils.try_import("netaddr") netaddr = importutils.try_import("netaddr")
@@ -164,12 +168,12 @@ def dumps(value, default=to_primitive, **kwargs):
return json.dumps(value, default=default, **kwargs) return json.dumps(value, default=default, **kwargs)
def loads(s): def loads(s, encoding='utf-8', **kwargs):
return json.loads(s) return json.loads(strutils.safe_decode(s, encoding), **kwargs)
def load(s): def load(fp, encoding='utf-8', **kwargs):
return json.load(s) return json.load(codecs.getreader(encoding)(fp), **kwargs)
try: try:

View File

@@ -59,7 +59,10 @@ _SANITIZE_PATTERNS = []
_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', _FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
r'(<%(key)s>).*?(</%(key)s>)', r'(<%(key)s>).*?(</%(key)s>)',
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])'] r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])',
r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?[\'"])'
'.*?([\'"])',
r'(%(key)s\s*--?[A-z]+\s*).*?([\s])']
for key in _SANITIZE_KEYS: for key in _SANITIZE_KEYS:
for pattern in _FORMAT_PATTERNS: for pattern in _FORMAT_PATTERNS:
@@ -84,14 +87,11 @@ logging_cli_opts = [
cfg.StrOpt('log-config-append', cfg.StrOpt('log-config-append',
metavar='PATH', metavar='PATH',
deprecated_name='log-config', deprecated_name='log-config',
help='The name of logging configuration file. It does not ' help='The name of a logging configuration file. This file '
'disable existing loggers, but just appends specified ' 'is appended to any existing logging configuration '
'logging configuration to any other existing logging ' 'files. For details about logging configuration files, '
'options. Please see the Python logging module ' 'see the Python logging module documentation.'),
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format', cfg.StrOpt('log-format',
default=None,
metavar='FORMAT', metavar='FORMAT',
help='DEPRECATED. ' help='DEPRECATED. '
'A logging.Formatter log message format string which may ' 'A logging.Formatter log message format string which may '
@@ -103,7 +103,7 @@ logging_cli_opts = [
default=_DEFAULT_LOG_DATE_FORMAT, default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT', metavar='DATE_FORMAT',
help='Format string for %%(asctime)s in log records. ' help='Format string for %%(asctime)s in log records. '
'Default: %(default)s'), 'Default: %(default)s .'),
cfg.StrOpt('log-file', cfg.StrOpt('log-file',
metavar='PATH', metavar='PATH',
deprecated_name='logfile', deprecated_name='logfile',
@@ -112,30 +112,30 @@ logging_cli_opts = [
cfg.StrOpt('log-dir', cfg.StrOpt('log-dir',
deprecated_name='logdir', deprecated_name='logdir',
help='(Optional) The base directory used for relative ' help='(Optional) The base directory used for relative '
'--log-file paths'), '--log-file paths.'),
cfg.BoolOpt('use-syslog', cfg.BoolOpt('use-syslog',
default=False, default=False,
help='Use syslog for logging. ' help='Use syslog for logging. '
'Existing syslog format is DEPRECATED during I, ' 'Existing syslog format is DEPRECATED during I, '
'and then will be changed in J to honor RFC5424'), 'and will change in J to honor RFC5424.'),
cfg.BoolOpt('use-syslog-rfc-format', cfg.BoolOpt('use-syslog-rfc-format',
# TODO(bogdando) remove or use True after existing # TODO(bogdando) remove or use True after existing
# syslog format deprecation in J # syslog format deprecation in J
default=False, default=False,
help='(Optional) Use syslog rfc5424 format for logging. ' help='(Optional) Enables or disables syslog rfc5424 format '
'If enabled, will add APP-NAME (RFC5424) before the ' 'for logging. If enabled, prefixes the MSG part of the '
'MSG part of the syslog message. The old format ' 'syslog message with APP-NAME (RFC5424). The '
'without APP-NAME is deprecated in I, ' 'format without the APP-NAME is deprecated in I, '
'and will be removed in J.'), 'and will be removed in J.'),
cfg.StrOpt('syslog-log-facility', cfg.StrOpt('syslog-log-facility',
default='LOG_USER', default='LOG_USER',
help='Syslog facility to receive log lines') help='Syslog facility to receive log lines.')
] ]
generic_log_opts = [ generic_log_opts = [
cfg.BoolOpt('use_stderr', cfg.BoolOpt('use_stderr',
default=True, default=True,
help='Log output to standard error') help='Log output to standard error.')
] ]
log_opts = [ log_opts = [
@@ -143,18 +143,18 @@ log_opts = [
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [%(request_id)s %(user_identity)s] ' '%(name)s [%(request_id)s %(user_identity)s] '
'%(instance)s%(message)s', '%(instance)s%(message)s',
help='Format string to use for log messages with context'), help='Format string to use for log messages with context.'),
cfg.StrOpt('logging_default_format_string', cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s', '%(name)s [-] %(instance)s%(message)s',
help='Format string to use for log messages without context'), help='Format string to use for log messages without context.'),
cfg.StrOpt('logging_debug_format_suffix', cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d', default='%(funcName)s %(pathname)s:%(lineno)d',
help='Data to append to log format when level is DEBUG'), help='Data to append to log format when level is DEBUG.'),
cfg.StrOpt('logging_exception_prefix', cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s ' default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s', '%(instance)s',
help='Prefix each line of exception output with this format'), help='Prefix each line of exception output with this format.'),
cfg.ListOpt('default_log_levels', cfg.ListOpt('default_log_levels',
default=[ default=[
'amqp=WARN', 'amqp=WARN',
@@ -167,25 +167,25 @@ log_opts = [
'iso8601=WARN', 'iso8601=WARN',
'requests.packages.urllib3.connectionpool=WARN' 'requests.packages.urllib3.connectionpool=WARN'
], ],
help='List of logger=LEVEL pairs'), help='List of logger=LEVEL pairs.'),
cfg.BoolOpt('publish_errors', cfg.BoolOpt('publish_errors',
default=False, default=False,
help='Publish error events'), help='Enables or disables publication of error events.'),
cfg.BoolOpt('fatal_deprecations', cfg.BoolOpt('fatal_deprecations',
default=False, default=False,
help='Make deprecations fatal'), help='Enables or disables fatal status of deprecations.'),
# NOTE(mikal): there are two options here because sometimes we are handed # NOTE(mikal): there are two options here because sometimes we are handed
# a full instance (and could include more information), and other times we # a full instance (and could include more information), and other times we
# are just handed a UUID for the instance. # are just handed a UUID for the instance.
cfg.StrOpt('instance_format', cfg.StrOpt('instance_format',
default='[instance: %(uuid)s] ', default='[instance: %(uuid)s] ',
help='If an instance is passed with the log message, format ' help='The format for an instance that is passed with the log '
'it like this'), 'message. '),
cfg.StrOpt('instance_uuid_format', cfg.StrOpt('instance_uuid_format',
default='[instance: %(uuid)s] ', default='[instance: %(uuid)s] ',
help='If an instance UUID is passed with the log message, ' help='The format for an instance UUID that is passed with the '
'format it like this'), 'log message. '),
] ]
CONF = cfg.CONF CONF = cfg.CONF
@@ -424,9 +424,7 @@ class JSONFormatter(logging.Formatter):
def _create_logging_excepthook(product_name): def _create_logging_excepthook(product_name):
def logging_excepthook(exc_type, value, tb): def logging_excepthook(exc_type, value, tb):
extra = {} extra = {'exc_info': (exc_type, value, tb)}
if CONF.verbose or CONF.debug:
extra['exc_info'] = (exc_type, value, tb)
getLogger(product_name).critical( getLogger(product_name).critical(
"".join(traceback.format_exception_only(exc_type, value)), "".join(traceback.format_exception_only(exc_type, value)),
**extra) **extra)
@@ -451,7 +449,7 @@ def _load_log_config(log_config_append):
logging.config.fileConfig(log_config_append, logging.config.fileConfig(log_config_append,
disable_existing_loggers=False) disable_existing_loggers=False)
except moves.configparser.Error as exc: except moves.configparser.Error as exc:
raise LogConfigError(log_config_append, str(exc)) raise LogConfigError(log_config_append, six.text_type(exc))
def setup(product_name, version='unknown'): def setup(product_name, version='unknown'):
@@ -464,9 +462,8 @@ def setup(product_name, version='unknown'):
def set_defaults(logging_context_format_string): def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts, cfg.set_defaults(
logging_context_format_string= log_opts, logging_context_format_string=logging_context_format_string)
logging_context_format_string)
def _find_facility_from_conf(): def _find_facility_from_conf():
@@ -543,9 +540,14 @@ def _setup_logging_from_conf(project, version):
log_root.addHandler(streamlog) log_root.addHandler(streamlog)
if CONF.publish_errors: if CONF.publish_errors:
handler = importutils.import_object( try:
"rally.openstack.common.log_handler.PublishErrorsHandler", handler = importutils.import_object(
logging.ERROR) "rally.openstack.common.log_handler.PublishErrorsHandler",
logging.ERROR)
except ImportError:
handler = importutils.import_object(
"oslo.messaging.notify.log_handler.PublishErrorsHandler",
logging.ERROR)
log_root.addHandler(handler) log_root.addHandler(handler)
datefmt = CONF.log_date_format datefmt = CONF.log_date_format
@@ -571,9 +573,15 @@ def _setup_logging_from_conf(project, version):
for pair in CONF.default_log_levels: for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=') mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod) logger = logging.getLogger(mod)
logger.setLevel(level) # NOTE(AAzza) in python2.6 Logger.setLevel doesn't convert string name
# to integer code.
if sys.version_info < (2, 7):
level = logging.getLevelName(level_name)
logger.setLevel(level)
else:
logger.setLevel(level_name)
_loggers = {} _loggers = {}
@@ -662,14 +670,19 @@ class ContextFormatter(logging.Formatter):
record.__dict__[key] = '' record.__dict__[key] = ''
if record.__dict__.get('request_id'): if record.__dict__.get('request_id'):
self._fmt = CONF.logging_context_format_string fmt = CONF.logging_context_format_string
else: else:
self._fmt = CONF.logging_default_format_string fmt = CONF.logging_default_format_string
if (record.levelno == logging.DEBUG and if (record.levelno == logging.DEBUG and
CONF.logging_debug_format_suffix): CONF.logging_debug_format_suffix):
self._fmt += " " + CONF.logging_debug_format_suffix fmt += " " + CONF.logging_debug_format_suffix
if sys.version_info < (3, 2):
self._fmt = fmt
else:
self._style = logging.PercentStyle(fmt)
self._fmt = self._style._fmt
# Cache this on the record, Logger will respect our formatted copy # Cache this on the record, Logger will respect our formatted copy
if record.exc_info: if record.exc_info:
record.exc_text = self.formatException(record.exc_info, record) record.exc_text = self.formatException(record.exc_info, record)

View File

@@ -0,0 +1,142 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from eventlet import event
from eventlet import greenthread
from rally.openstack.common.gettextutils import _LE, _LW
from rally.openstack.common import log as logging
from rally.openstack.common import timeutils
LOG = logging.getLogger(__name__)
class LoopingCallDone(Exception):
"""Exception to break out and stop a LoopingCallBase.
The poll-function passed to LoopingCallBase can raise this exception to
break out of the loop normally. This is somewhat analogous to
StopIteration.
An optional return-value can be included as the argument to the exception;
this return-value will be returned by LoopingCallBase.wait()
"""
def __init__(self, retvalue=True):
""":param retvalue: Value that LoopingCallBase.wait() should return."""
self.retvalue = retvalue
class LoopingCallBase(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
self.done = None
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
"""A fixed interval looping call."""
def start(self, interval, initial_delay=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running:
break
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_LW('task %(func_name)s run outlasted '
'interval by %(delay)s sec'),
{'func_name': repr(self.f), 'delay': -delay})
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_LE('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn_n(_inner)
return self.done
class DynamicLoopingCall(LoopingCallBase):
"""A looping call which sleeps until the next known event.
The function called should return how long to sleep for before being
called again.
"""
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug('Dynamic looping call %(func_name)s sleeping '
'for %(idle).02f seconds',
{'func_name': repr(self.f), 'idle': idle})
greenthread.sleep(idle)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_LE('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done

View File

@@ -0,0 +1,183 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo.config import cfg
import six
from rally.openstack.common.gettextutils import _, _LE, _LI
from rally.openstack.common import log as logging
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help='Some periodic tasks can be run in a separate process. '
'Should we run them here?'),
]
CONF = cfg.CONF
CONF.register_opts(periodic_opts)
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 60.0
class InvalidPeriodicTaskArg(Exception):
message = _("Unexpected argument for periodic task creation: %(arg)s.")
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on the default
interval of 60 seconds.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]])
this will be run on approximately every N seconds. If this number is
negative the periodic task will be disabled. If the run_immediately
argument is provided and has a value of 'True', the first run of the
task will be shortly after task scheduler starts. If
run_immediately is omitted or set to 'False', the first time the
task runs will be approximately N seconds after the task scheduler
starts.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)
# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_immediate = kwargs.pop('run_immediately', False)
if f._periodic_immediate:
f._periodic_last_run = None
else:
f._periodic_last_run = time.time()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
# and without parenthesis.
#
# In the 'with-parenthesis' case (with kwargs present), this function needs
# to return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
# In the 'without-parenthesis' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
if kwargs:
return decorator
else:
return decorator(args[0])
class _PeriodicTasksMeta(type):
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead an initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
except AttributeError:
cls._periodic_tasks = []
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._periodic_spacing = {}
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
if task._periodic_spacing < 0:
LOG.info(_LI('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_LI('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue
# A periodic spacing of zero indicates that this task should
# be run on the default interval to avoid running too
# frequently.
if task._periodic_spacing == 0:
task._periodic_spacing = DEFAULT_INTERVAL
cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
def __init__(self):
super(PeriodicTasks, self).__init__()
self._periodic_last_run = {}
for name, task in self._periodic_tasks:
self._periodic_last_run[name] = task._periodic_last_run
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
# If a periodic task is _nearly_ due, then we'll run it early
idle_for = min(idle_for, spacing)
if last_run is not None:
delta = last_run + spacing - time.time()
if delta > 0.2:
idle_for = min(idle_for, delta)
continue
LOG.debug("Running periodic task %(full_task_name)s",
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = time.time()
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"),
{"full_task_name": full_task_name, "e": e})
time.sleep(0)
return idle_for

View File

@@ -0,0 +1,512 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Generic Node base class for all workers that run on hosts."""
import errno
import logging as std_logging
import os
import random
import signal
import sys
import time
try:
# Importing just the symbol here because the io module does not
# exist in Python 2.6.
from io import UnsupportedOperation # noqa
except ImportError:
# Python 2.6
UnsupportedOperation = None
import eventlet
from eventlet import event
from oslo.config import cfg
from rally.openstack.common import eventlet_backdoor
from rally.openstack.common.gettextutils import _LE, _LI, _LW
from rally.openstack.common import importutils
from rally.openstack.common import log as logging
from rally.openstack.common import systemd
from rally.openstack.common import threadgroup
rpc = importutils.try_import('rally.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match, or ioctl() fails on the stdout file handle, we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except OSError as err:
if err.errno == errno.ENOTTY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
except UnsupportedOperation:
# Could not get the fileno for stdout, so we must be a daemon.
is_daemon = True
return is_daemon
def _is_sighup_and_daemon(signo):
if not (_sighup_supported() and signo == signal.SIGHUP):
# Avoid checking if we are a daemon, because the signal isn't
# SIGHUP.
return False
return _is_daemon()
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
def __init__(self):
"""Initialize the service launcher.
:returns: None
"""
self.services = Services()
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
def launch_service(self, service):
"""Load and start the given service.
:param service: The service you would like to start.
:returns: None
"""
service.backdoor_port = self.backdoor_port
self.services.add(service)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
self.services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
self.services.wait()
def restart(self):
"""Reload config files and restart service.
:returns: None
"""
cfg.CONF.reload_config_files()
self.services.restart()
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
super(SignalExit, self).__init__(exccode)
self.signo = signo
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _wait_for_exit_or_signal(self, ready_callback=None):
status = None
signo = 0
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG)
try:
if ready_callback:
ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
finally:
self.stop()
if rpc:
try:
rpc.cleanup()
except Exception:
# We're shutting down, so it doesn't matter at this point.
LOG.exception(_LE('Exception during rpc cleanup.'))
return status, signo
def wait(self, ready_callback=None):
systemd.notify_once()
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal(ready_callback)
if not _is_sighup_and_daemon(signo):
return status
self.restart()
class ServiceWrapper(object):
def __init__(self, service, workers):
self.service = service
self.workers = workers
self.children = set()
self.forktimes = []
class ProcessLauncher(object):
def __init__(self, wait_interval=0.01):
"""Constructor.
:param wait_interval: The interval to sleep for between checks
of child process exit.
"""
self.children = {}
self.sigcaught = None
self.running = True
self.wait_interval = wait_interval
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
_set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
_set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read()
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
sys.exit(1)
def _child_process_handle_signal(self):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
def _sighup(*args):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _child_wait_for_exit_or_signal(self, launcher):
status = 0
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
signame = _signo_to_signame(exc.signo)
LOG.info(_LI('Child caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_LE('Unhandled exception'))
status = 2
finally:
launcher.stop()
return status, signo
def _child_process(self, service):
self._child_process_handle_signal()
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher()
launcher.launch_service(service)
return launcher
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_LI('Forking too fast, sleeping'))
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if not _is_sighup_and_daemon(signo):
break
launcher.restart()
os._exit(status)
LOG.info(_LI('Started child %d'), pid)
wrap.children.add(pid)
self.children[pid] = wrap
return pid
def launch_service(self, service, workers=1):
wrap = ServiceWrapper(service, workers)
LOG.info(_LI('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def _wait_child(self):
try:
# Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
return None
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_LW('pid %d not in child list'), pid)
return None
wrap = self.children.pop(pid)
wrap.children.remove(pid)
return wrap
def _respawn_children(self):
while self.running:
wrap = self._wait_child()
if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(self.wait_interval)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
systemd.notify_once()
LOG.debug('Full set of CONF:')
CONF.log_opt_values(LOG, std_logging.DEBUG)
try:
while True:
self.handle_signal()
self._respawn_children()
# No signal means that stop was called. Don't clean up here.
if not self.sigcaught:
return
signame = _signo_to_signame(self.sigcaught)
LOG.info(_LI('Caught %s, stopping children'), signame)
if not _is_sighup_and_daemon(self.sigcaught):
break
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
except eventlet.greenlet.GreenletExit:
LOG.info(_LI("Wait called after thread killed. Cleaning up."))
self.stop()
def stop(self):
"""Terminate child processes and wait on each."""
self.running = False
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
except OSError as exc:
if exc.errno != errno.ESRCH:
raise
# Wait for children to die
if self.children:
LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()
class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
def reset(self):
# NOTE(Fengqian): docs for Event.reset() recommend against using it
self._done = event.Event()
def start(self):
pass
def stop(self):
self.tg.stop()
self.tg.wait()
# Signal that service cleanup is done:
if not self._done.ready():
self._done.send()
def wait(self):
self._done.wait()
class Services(object):
def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
def stop(self):
# wait for graceful shutdown of services:
for service in self.services:
service.stop()
service.wait()
# Each service has performed cleanup, now signal that the run_service
# wrapper threads can now die:
if not self.done.ready():
self.done.send()
# reap threads:
self.tg.stop()
def wait(self):
self.tg.wait()
def restart(self):
self.stop()
self.done = event.Event()
for restart_service in self.services:
restart_service.reset()
self.tg.add_thread(self.run_service, restart_service, self.done)
@staticmethod
def run_service(service, done):
"""Service start wrapper.
:param service: service to run
:param done: event to wait on until a shutdown is triggered
:returns: None
"""
service.start()
done.wait()
def launch(service, workers=1):
if workers is None or workers == 1:
launcher = ServiceLauncher()
launcher.launch_service(service)
else:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
return launcher

View File

@@ -17,6 +17,7 @@
System-level utilities and helper functions. System-level utilities and helper functions.
""" """
import math
import re import re
import sys import sys
import unicodedata import unicodedata
@@ -26,16 +27,21 @@ import six
from rally.openstack.common.gettextutils import _ from rally.openstack.common.gettextutils import _
# Used for looking up extensions of text UNIT_PREFIX_EXPONENT = {
# to their 'multiplied' byte amount 'k': 1,
BYTE_MULTIPLIERS = { 'K': 1,
'': 1, 'Ki': 1,
't': 1024 ** 4, 'M': 2,
'g': 1024 ** 3, 'Mi': 2,
'm': 1024 ** 2, 'G': 3,
'k': 1024, 'Gi': 3,
'T': 4,
'Ti': 4,
}
UNIT_SYSTEM_INFO = {
'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')),
'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')),
} }
BYTE_REGEX = re.compile(r'(^-?\d+)(\D*)')
TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes') TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no') FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
@@ -58,12 +64,12 @@ def int_from_bool_as_string(subject):
return bool_from_string(subject) and 1 or 0 return bool_from_string(subject) and 1 or 0
def bool_from_string(subject, strict=False): def bool_from_string(subject, strict=False, default=False):
"""Interpret a string as a boolean. """Interpret a string as a boolean.
A case-insensitive match is performed such that strings matching 't', A case-insensitive match is performed such that strings matching 't',
'true', 'on', 'y', 'yes', or '1' are considered True and, when 'true', 'on', 'y', 'yes', or '1' are considered True and, when
`strict=False`, anything else is considered False. `strict=False`, anything else returns the value specified by 'default'.
Useful for JSON-decoded stuff and config file parsing. Useful for JSON-decoded stuff and config file parsing.
@@ -72,7 +78,7 @@ def bool_from_string(subject, strict=False):
Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'. Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
""" """
if not isinstance(subject, six.string_types): if not isinstance(subject, six.string_types):
subject = str(subject) subject = six.text_type(subject)
lowered = subject.strip().lower() lowered = subject.strip().lower()
@@ -88,11 +94,12 @@ def bool_from_string(subject, strict=False):
'acceptable': acceptable} 'acceptable': acceptable}
raise ValueError(msg) raise ValueError(msg)
else: else:
return False return default
def safe_decode(text, incoming=None, errors='strict'): def safe_decode(text, incoming=None, errors='strict'):
"""Decodes incoming str using `incoming` if they're not already unicode. """Decodes incoming text/bytes string using `incoming` if they're not
already unicode.
:param incoming: Text's current encoding :param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid :param errors: Errors handling policy. See here for valid
@@ -101,7 +108,7 @@ def safe_decode(text, incoming=None, errors='strict'):
representation of it. representation of it.
:raises TypeError: If text is not an instance of str :raises TypeError: If text is not an instance of str
""" """
if not isinstance(text, six.string_types): if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be decoded" % type(text)) raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, six.text_type): if isinstance(text, six.text_type):
@@ -131,7 +138,7 @@ def safe_decode(text, incoming=None, errors='strict'):
def safe_encode(text, incoming=None, def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'): encoding='utf-8', errors='strict'):
"""Encodes incoming str/unicode using `encoding`. """Encodes incoming text/bytes string using `encoding`.
If incoming is not specified, text is expected to be encoded with If incoming is not specified, text is expected to be encoded with
current python's default encoding. (`sys.getdefaultencoding`) current python's default encoding. (`sys.getdefaultencoding`)
@@ -144,7 +151,7 @@ def safe_encode(text, incoming=None,
representation of it. representation of it.
:raises TypeError: If text is not an instance of str :raises TypeError: If text is not an instance of str
""" """
if not isinstance(text, six.string_types): if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be encoded" % type(text)) raise TypeError("%s can't be encoded" % type(text))
if not incoming: if not incoming:
@@ -152,49 +159,59 @@ def safe_encode(text, incoming=None,
sys.getdefaultencoding()) sys.getdefaultencoding())
if isinstance(text, six.text_type): if isinstance(text, six.text_type):
if six.PY3: return text.encode(encoding, errors)
return text.encode(encoding, errors).decode(incoming)
else:
return text.encode(encoding, errors)
elif text and encoding != incoming: elif text and encoding != incoming:
# Decode text before encoding it with `encoding` # Decode text before encoding it with `encoding`
text = safe_decode(text, incoming, errors) text = safe_decode(text, incoming, errors)
if six.PY3: return text.encode(encoding, errors)
return text.encode(encoding, errors).decode(incoming) else:
else: return text
return text.encode(encoding, errors)
return text
def to_bytes(text, default=0): def string_to_bytes(text, unit_system='IEC', return_int=False):
"""Converts a string into an integer of bytes. """Converts a string into an float representation of bytes.
Looks at the last characters of the text to determine The units supported for IEC ::
what conversion is needed to turn the input text into a byte number.
Supports "B, K(B), M(B), G(B), and T(B)". (case insensitive) Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it)
KB, KiB, MB, MiB, GB, GiB, TB, TiB
The units supported for SI ::
kb(it), Mb(it), Gb(it), Tb(it)
kB, MB, GB, TB
Note that the SI unit system does not support capital letter 'K'
:param text: String input for bytes size conversion. :param text: String input for bytes size conversion.
:param default: Default return value when text is blank. :param unit_system: Unit system for byte size conversion.
:param return_int: If True, returns integer representation of text
in bytes. (default: decimal)
:returns: Numerical representation of text in bytes.
:raises ValueError: If text has an invalid value.
""" """
match = BYTE_REGEX.search(text) try:
base, reg_ex = UNIT_SYSTEM_INFO[unit_system]
except KeyError:
msg = _('Invalid unit system: "%s"') % unit_system
raise ValueError(msg)
match = reg_ex.match(text)
if match: if match:
magnitude = int(match.group(1)) magnitude = float(match.group(1))
mult_key_org = match.group(2) unit_prefix = match.group(2)
if not mult_key_org: if match.group(3) in ['b', 'bit']:
return magnitude magnitude /= 8
elif text:
msg = _('Invalid string format: %s') % text
raise TypeError(msg)
else: else:
return default msg = _('Invalid string format: %s') % text
mult_key = mult_key_org.lower().replace('b', '', 1) raise ValueError(msg)
multiplier = BYTE_MULTIPLIERS.get(mult_key) if not unit_prefix:
if multiplier is None: res = magnitude
msg = _('Unknown byte multiplier: %s') % mult_key_org else:
raise TypeError(msg) res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix])
return magnitude * multiplier if return_int:
return int(math.ceil(res))
return res
def to_slug(value, incoming=None, errors="strict"): def to_slug(value, incoming=None, errors="strict"):

View File

@@ -0,0 +1,106 @@
# Copyright 2012-2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helper module for systemd service readiness notification.
"""
import os
import socket
import sys
from rally.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def _abstractify(socket_name):
if socket_name.startswith('@'):
# abstract namespace socket
socket_name = '\0%s' % socket_name[1:]
return socket_name
def _sd_notify(unset_env, msg):
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
sock.connect(_abstractify(notify_socket))
sock.sendall(msg)
if unset_env:
del os.environ['NOTIFY_SOCKET']
except EnvironmentError:
LOG.debug("Systemd notification failed", exc_info=True)
finally:
sock.close()
def notify():
"""Send notification to Systemd that service is ready.
For details see
http://www.freedesktop.org/software/systemd/man/sd_notify.html
"""
_sd_notify(False, 'READY=1')
def notify_once():
"""Send notification once to Systemd that service is ready.
Systemd sets NOTIFY_SOCKET environment variable with the name of the
socket listening for notifications from services.
This method removes the NOTIFY_SOCKET environment variable to ensure
notification is sent only once.
"""
_sd_notify(True, 'READY=1')
def onready(notify_socket, timeout):
"""Wait for systemd style notification on the socket.
:param notify_socket: local socket address
:type notify_socket: string
:param timeout: socket timeout
:type timeout: float
:returns: 0 service ready
1 service not ready
2 timeout occurred
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
sock.bind(_abstractify(notify_socket))
try:
msg = sock.recv(512)
except socket.timeout:
return 2
finally:
sock.close()
if 'READY=1' in msg:
return 0
else:
return 1
if __name__ == '__main__':
# simple CLI for testing
if len(sys.argv) == 1:
notify()
elif len(sys.argv) >= 2:
timeout = float(sys.argv[1])
notify_socket = os.getenv('NOTIFY_SOCKET')
if notify_socket:
retval = onready(notify_socket, timeout)
sys.exit(retval)

View File

@@ -0,0 +1,147 @@
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import eventlet
from eventlet import greenpool
from rally.openstack.common import log as logging
from rally.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs):
"""Callback function to be passed to GreenThread.link() when we spawn()
Calls the :class:`ThreadGroup` to notify if.
"""
kwargs['group'].thread_done(kwargs['thread'])
class Thread(object):
"""Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
def stop(self):
self.thread.kill()
def wait(self):
return self.thread.wait()
def link(self, func, *args, **kwargs):
self.thread.link(func, *args, **kwargs)
class ThreadGroup(object):
"""The point of the ThreadGroup class is to:
* keep track of timers and greenthreads (making it easier to stop them
when need be).
* provide an easy API to add timers.
"""
def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
def add_dynamic_timer(self, callback, initial_delay=None,
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def add_timer(self, interval, callback, initial_delay=None,
*args, **kwargs):
pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
pulse.start(interval=interval,
initial_delay=initial_delay)
self.timers.append(pulse)
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
return th
def thread_done(self, thread):
self.threads.remove(thread)
def _stop_threads(self):
current = threading.current_thread()
# Iterate over a copy of self.threads so thread_done doesn't
# modify the list while we're iterating
for x in self.threads[:]:
if x is current:
# don't kill the current thread.
continue
try:
x.stop()
except Exception as ex:
LOG.exception(ex)
def stop_timers(self):
for x in self.timers:
try:
x.stop()
except Exception as ex:
LOG.exception(ex)
self.timers = []
def stop(self, graceful=False):
"""stop function has the option of graceful=True/False.
* In case of graceful=True, wait for all threads to be finished.
Never kill threads.
* In case of graceful=False, kill threads immediately.
"""
self.stop_timers()
if graceful:
# In case of graceful=True, wait for all threads to be
# finished, never kill threads
self.wait()
else:
# In case of graceful=False(Default), kill threads
# immediately
self._stop_threads()
def wait(self):
for x in self.timers:
try:
x.wait()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)
current = threading.current_thread()
# Iterate over a copy of self.threads so thread_done doesn't
# modify the list while we're iterating
for x in self.threads[:]:
if x is current:
continue
try:
x.wait()
except eventlet.greenlet.GreenletExit:
pass
except Exception as ex:
LOG.exception(ex)

View File

@@ -114,7 +114,7 @@ def utcnow():
def iso8601_from_timestamp(timestamp): def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp.""" """Returns an iso8601 formatted date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp)) return isotime(datetime.datetime.utcfromtimestamp(timestamp))
@@ -134,7 +134,7 @@ def set_time_override(override_time=None):
def advance_time_delta(timedelta): def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta.""" """Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None) assert utcnow.override_time is not None
try: try:
for dt in utcnow.override_time: for dt in utcnow.override_time:
dt += timedelta dt += timedelta
@@ -201,8 +201,8 @@ def total_seconds(delta):
def is_soon(dt, window): def is_soon(dt, window):
"""Determines if time is going to happen in the next window seconds. """Determines if time is going to happen in the next window seconds.
:params dt: the time :param dt: the time
:params window: minimum seconds to remain to consider the time not soon :param window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration :return: True if expiration is within the given duration
""" """