Latest OSLO updates

Change-Id: Ibf223203c8b34f614357fa4539d0dfa953765d6b
This commit is contained in:
Gary Kotton 2013-01-07 13:52:21 +00:00
parent 939200d35d
commit de99585d4f
29 changed files with 512 additions and 187 deletions

View File

@ -27,6 +27,7 @@ from quantum.api.v2 import attributes
from quantum.common import utils
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.version import version_info as quantum_version
@ -49,9 +50,6 @@ core_opts = [
cfg.IntOpt('max_subnet_host_routes', default=20),
cfg.IntOpt('dhcp_lease_duration', default=120),
cfg.BoolOpt('allow_overlapping_ips', default=False),
cfg.StrOpt('control_exchange',
default='quantum',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('host', default=utils.get_hostname()),
cfg.BoolOpt('force_gateway_on_subnet', default=False,
help=_("Ensure that configured gateway is on subnet")),
@ -67,6 +65,7 @@ cfg.CONF.register_cli_opts(core_cli_opts)
def parse(args):
rpc.set_defaults(control_exchange='quantum')
cfg.CONF(args=args, project='quantum',
version='%%prog %s' % quantum_version.version_string_with_vcs())

View File

@ -46,7 +46,7 @@ def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(find_objects(greenlet.greenlet)):
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
traceback.print_stack(gt.gr_frame)
print
@ -61,7 +61,7 @@ def initialize_if_enabled():
}
if CONF.backdoor_port is None:
return
return None
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
@ -73,6 +73,8 @@ def initialize_if_enabled():
pprint.pprint(val)
sys.displayhook = displayhook
eventlet.spawn_n(eventlet.backdoor.backdoor_server,
eventlet.listen(('localhost', CONF.backdoor_port)),
sock = eventlet.listen(('localhost', CONF.backdoor_port))
port = sock.getsockname()[1]
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals)
return port

View File

@ -21,6 +21,8 @@ Exceptions common to OpenStack projects
import logging
from quantum.openstack.common.gettextutils import _
class Error(Exception):
def __init__(self, message=None):
@ -97,7 +99,7 @@ def wrap_exception(f):
except Exception, e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception('Uncaught exception')
logging.exception(_('Uncaught exception'))
#logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e))
raise

View File

@ -24,6 +24,8 @@ import logging
import sys
import traceback
from quantum.openstack.common.gettextutils import _
@contextlib.contextmanager
def save_and_reraise_exception():
@ -43,7 +45,7 @@ def save_and_reraise_exception():
try:
yield
except Exception:
logging.error('Original exception being dropped: %s' %
(traceback.format_exception(type_, value, tb)))
logging.error(_('Original exception being dropped: %s'),
traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb

View File

@ -29,7 +29,7 @@ def import_class(import_str):
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError), exc:
except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %
(class_str,
traceback.format_exception(*sys.exc_info())))

View File

@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0):
level=level + 1)
else:
return value
except TypeError, e:
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)

View File

@ -28,6 +28,7 @@ from eventlet import semaphore
from quantum.openstack.common import cfg
from quantum.openstack.common import fileutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging

View File

@ -49,19 +49,20 @@ from quantum.openstack.common import notifier
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
'%(user_id)s %(project_id)s] %(instance)s'
default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
' %(instance)s%(message)s',
default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
@ -174,7 +175,7 @@ class ContextAdapter(logging.LoggerAdapter):
self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated Config: %s") % msg
stdmsg = _("Deprecated: %s") % msg
if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg)
@ -289,6 +290,12 @@ def setup(product_name):
_setup_logging_from_conf(product_name)
def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts,
logging_context_format_string=
logging_context_format_string)
def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler,

View File

@ -24,6 +24,7 @@ from eventlet import greenthread
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__)
@ -62,10 +63,16 @@ class LoopingCall(object):
try:
while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running:
break
greenthread.sleep(interval)
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e:
self.stop()
done.send(e.retvalue)

View File

@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
for driver in _get_drivers():
try:
driver.notify(context, msg)
except Exception, e:
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. "
"Payload=%(payload)s") % locals())
"Payload=%(payload)s")
% dict(e=e, payload=payload))
_drivers = None
@ -166,7 +167,7 @@ def add_driver(notification_driver):
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError as e:
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)

View File

@ -41,6 +41,6 @@ def notify(context, message):
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message)
except Exception, e:
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@ -0,0 +1,51 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''messaging based notification driver, with message envelopes'''
from quantum.openstack.common import cfg
from quantum.openstack.common import context as req_context
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'topics', default=['notifications', ],
help='AMQP topic(s) used for openstack notifications')
opt_group = cfg.OptGroup(name='rpc_notifier2',
title='Options for rpc_notifier2')
CONF = cfg.CONF
CONF.register_group(opt_group)
CONF.register_opt(notification_topic_opt, opt_group)
def notify(context, message):
"""Sends a notification via RPC"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
for topic in CONF.rpc_notifier2.topics:
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message, envelope=True)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@ -95,17 +95,21 @@ class PeriodicTasks(object):
ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
" ticks left until next run"), locals())
" ticks left until next run"),
dict(full_task_name=full_task_name,
ticks_to_skip=ticks_to_skip))
self._ticks_to_skip[task_name] -= 1
continue
self._ticks_to_skip[task_name] = task._ticks_between_runs
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
LOG.debug(_("Running periodic task %(full_task_name)s"),
dict(full_task_name=full_task_name))
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
locals())
LOG.exception(_("Error during %(full_task_name)s:"
" %(e)s"),
dict(e=e, full_task_name=full_task_name))

View File

@ -50,25 +50,26 @@ rpc_opts = [
default=['quantum.openstack.common.exception',
'nova.exception',
'cinder.exception',
'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
#
# The following options are not registered here, but are expected to be
# present. The project using this library must register these options with
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('control_exchange',
default='openstack',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
cfg.set_defaults(rpc_opts,
control_exchange=control_exchange)
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg):
def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
:param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
return _get_impl().notify(cfg.CONF, context, topic, msg)
return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():

View File

@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
"""
import inspect
import logging
import sys
import uuid
@ -34,10 +33,10 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
from quantum.openstack.common import cfg
from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import common as rpc_common
@ -55,7 +54,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf)
def empty(self):
@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False):
ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
"""
with ConnectionContext(conf, connection_pool) as conn:
if failure:
failure = rpc_common.serialize_remote_exception(failure)
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
try:
msg = {'result': reply, 'failure': failure}
@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None):
connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending)
ending, log_failure)
if ending:
self.msg_id = None
@ -282,11 +282,21 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except Exception as e:
LOG.exception('Exception during message handling')
except rpc_common.ClientException as e:
LOG.debug(_('Expected exception during message handling (%s)') %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic)
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg)
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
conn.topic_send(topic, msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
conn.fanout_send(topic, msg)
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool):
def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
event_type = msg.get('event_type')
LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
@ -420,7 +433,4 @@ def cleanup(connection_pool):
def get_control_exchange(conf):
try:
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'
return conf.control_exchange

View File

@ -18,18 +18,61 @@
# under the License.
import copy
import logging
import sys
import traceback
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import local
from quantum.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
It does *not* apply to the message payload, which must be versioned
independently. For example, when using rpc APIs, a version number is applied
for changes to the API being exposed over rpc. This version number is handled
in the rpc proxy and dispatcher modules.
This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'quantum.version': <RPC Envelope Version as a String>,
'quantum.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'quantum.version'
_MESSAGE_KEY = 'quantum.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@ -40,7 +83,7 @@ class RPCException(Exception):
try:
message = self.message % kwargs
except Exception as e:
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class Connection(object):
"""A connection, returned by rpc.create_connection().
@ -164,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': ('new_pass',),
'run_instance': ('admin_password',), }
SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data)
if has_method:
method = msg_data['method']
if method in SANITIZE:
args_to_sanitize = SANITIZE[method]
for arg in args_to_sanitize:
try:
msg_data['args'][arg] = "<SANITIZED>"
except KeyError:
pass
for arg in SANITIZE.get(msg_data['method'], []):
try:
d = msg_data
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError, e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
return log_func(msg, msg_data)
def serialize_remote_exception(failure_info):
def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
if log_failure:
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data):
# we cannot necessarily change an exception message so we must override
# the __str__ method.
failure.__class__ = new_ex_type
except TypeError as e:
except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument.
failure.args = (message,) + failure.args[1:]
@ -309,3 +364,107 @@ class CommonRpcContext(object):
context.values['read_deleted'] = read_deleted
return context
class ClientException(Exception):
"""This encapsulates some actual exception that is expected to be
hit by an RPC proxy object. Merely instantiating it records the
current exception information, which will be passed back to the
RPC client without exceptional logging."""
def __init__(self):
self._exc_info = sys.exc_info()
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
if type(e) in exceptions:
raise ClientException()
else:
raise
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
ClientException, which is used internally by the RPC layer."""
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
#
# Robustness Principle:
# "Be strict in what you send, liberal in what you accept."
#
# At this point we have to do a bit of guessing about what it
# is we just received. Here is the set of possibilities:
#
# 1) We received a dict. This could be 2 things:
#
# a) Inspect it to see if it looks like a standard message envelope.
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
# be a notification, or a message from before we added a message
# envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg

View File

@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code
base.
EXAMPLES:
EXAMPLES
========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server
@ -50,12 +50,13 @@ side is in nova/compute/manager.py.
Example 1) Adding a new method.
-------------------------------
Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example:
be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
@ -67,10 +68,11 @@ get_host_uptime() method.
Example 2) Adding a new parameter.
----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
The implementation of the method must not expect the parameter to be present.
The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases
@ -101,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
@staticmethod
def _is_compatible(mversion, version):
"""Determine whether versions are compatible.
:param mversion: The API version implemented by a callback.
:param version: The API version requested by an incoming message.
"""
version_parts = version.split('.')
mversion_parts = mversion.split('.')
if int(version_parts[0]) != int(mversion_parts[0]): # Major
return False
if int(version_parts[1]) > int(mversion_parts[1]): # Minor
return False
return True
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
@ -137,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version)
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue

View File

@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
# NOTE(russellb): We specifically want to use json, not our own jsonutils.
# jsonutils has some extra logic to automatically convert objects to primitive
# types so that they can be serialized. We want to catch all cases where
# non-primitive types make it into this code and treat it as an error.
import json
import time
import eventlet
from quantum.openstack.common import jsonutils
from quantum.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
@ -75,6 +79,8 @@ class Consumer(object):
else:
res.append(rval)
done.send(res)
except rpc_common.ClientException as e:
done.send_exception(e._exc_info[1])
except Exception as e:
done.send_exception(e)
@ -121,7 +127,7 @@ def create_connection(conf, new=True):
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
jsonutils.dumps(msg)
json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
@ -154,6 +160,7 @@ def call(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg):
check_serialize(msg)
try:
call(conf, context, topic, msg)
except Exception:

View File

@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
callback(message.payload)
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
self.connection.close()
self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
@ -573,12 +575,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
@ -644,6 +648,11 @@ class Connection(object):
pass
self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup():

View File

@ -17,7 +17,6 @@
import functools
import itertools
import logging
import time
import uuid
@ -29,6 +28,7 @@ import qpid.messaging.exceptions
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common
@ -41,6 +41,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
@ -121,7 +124,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
self.callback(message.content)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@ -274,25 +278,32 @@ class Connection(object):
self.session = None
self.consumers = {}
self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf
if server_params and 'hostname' in server_params:
# NOTE(russellb) This enables support for cast_to_server.
server_params['qpid_hosts'] = [
'%s:%d' % (server_params['hostname'],
server_params.get('port', 5672))
]
params = {
'hostname': self.conf.qpid_hostname,
'port': self.conf.qpid_port,
'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
}
params.update(server_params or {})
self.broker = params['hostname'] + ":" + str(params['port'])
self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
self.connection_create()
self.connection_create(self.brokers[0])
self.reconnect()
def connection_create(self):
def connection_create(self, broker):
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker)
self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@ -320,10 +331,14 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError:
pass
attempt = 0
delay = 1
while True:
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
try:
self.connection_create()
self.connection_create(broker)
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
@ -333,10 +348,9 @@ class Connection(object):
time.sleep(delay)
delay = min(2 * delay, 60)
else:
LOG.info(_('Connected to AMQP server on %s'), broker)
break
LOG.info(_('Connected to AMQP server on %s'), self.broker)
self.session = self.connection.session()
if self.consumers:
@ -362,12 +376,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
@ -422,6 +438,11 @@ class Connection(object):
pass
self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
@ -497,6 +518,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@ -512,6 +534,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
@ -570,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup():

View File

@ -205,7 +205,9 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
@ -250,7 +252,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', [])
data.setdefault('args', {})
try:
result = proxy.dispatch(
@ -259,7 +261,14 @@ class InternalContext(object):
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException, e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception:
LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
@ -314,7 +323,7 @@ class ConsumerBase(object):
return
data.setdefault('version', None)
data.setdefault('args', [])
data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@ -426,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
@ -473,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@ -524,7 +533,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None):
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -533,7 +543,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(msg_id, topic, payload)
conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@ -602,7 +612,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None):
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@ -628,7 +639,8 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout)
_topic, _topic, msg, timeout, serialize,
force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@ -669,6 +681,8 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)

View File

@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
import contextlib
import itertools
import json
import logging
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
matchmaker_opts = [

View File

@ -57,6 +57,11 @@ class Service(service.Service):
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in a thread
self.conn.consume_in_thread()

View File

@ -27,7 +27,7 @@ import sys
import time
import eventlet
import greenlet
import extras
import logging as std_logging
from quantum.openstack.common import cfg
@ -36,11 +36,8 @@ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import threadgroup
try:
from quantum.openstack.common import rpc
except ImportError:
rpc = None
rpc = extras.try_import('quantum.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -54,7 +51,7 @@ class Launcher(object):
:returns: None
"""
self._services = []
self._services = threadgroup.ThreadGroup('launcher')
eventlet_backdoor.initialize_if_enabled()
@staticmethod
@ -75,8 +72,7 @@ class Launcher(object):
:returns: None
"""
gt = eventlet.spawn(self.run_service, service)
self._services.append(gt)
self._services.add_thread(self.run_service, service)
def stop(self):
"""Stop all services which are currently running.
@ -84,8 +80,7 @@ class Launcher(object):
:returns: None
"""
for service in self._services:
service.kill()
self._services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@ -93,11 +88,7 @@ class Launcher(object):
:returns: None
"""
for service in self._services:
try:
service.wait()
except greenlet.GreenletExit:
pass
self._services.wait()
class SignalExit(SystemExit):
@ -132,9 +123,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc:
status = exc.code
finally:
self.stop()
if rpc:
rpc.cleanup()
self.stop()
return status
@ -252,7 +243,10 @@ class ProcessLauncher(object):
def _wait_child(self):
try:
pid, status = os.wait()
# Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
@ -260,10 +254,12 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
LOG.info(_('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
@ -282,6 +278,10 @@ class ProcessLauncher(object):
while self.running:
wrap = self._wait_child()
if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
@ -309,8 +309,8 @@ class ProcessLauncher(object):
class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self):
self.tg = threadgroup.ThreadGroup('service')
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service', threads)
def start(self):
pass

View File

@ -117,8 +117,12 @@ def write_requirements():
def _run_shell_command(cmd):
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE)
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
stdout=subprocess.PIPE)
else:
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE)
out = output.communicate()
if len(out) == 0:
return None
@ -272,6 +276,9 @@ def get_cmdclass():
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {}
@ -307,14 +314,19 @@ def get_cmdclass():
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
for builder in ['html', 'man']:
for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass

View File

@ -18,7 +18,6 @@ from eventlet import greenlet
from eventlet import greenpool
from eventlet import greenthread
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
@ -27,19 +26,17 @@ LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs):
'''
Callback function to be passed to GreenThread.link() when we spawn()
Calls the ThreadGroup to notify if.
'''
""" Callback function to be passed to GreenThread.link() when we spawn()
Calls the :class:`ThreadGroup` to notify if.
"""
kwargs['group'].thread_done(kwargs['thread'])
class Thread(object):
"""
Wrapper around a greenthread, that holds a reference to
the ThreadGroup. The Thread will notify the ThreadGroup
when it has done so it can be removed from the threads
list.
""" Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, name, thread, group):
self.name = name
@ -54,11 +51,11 @@ class Thread(object):
class ThreadGroup(object):
"""
The point of this class is to:
- keep track of timers and greenthreads (making it easier to stop them
""" The point of the ThreadGroup classis to:
* keep track of timers and greenthreads (making it easier to stop them
when need be).
- provide an easy API to add timers.
* provide an easy API to add timers.
"""
def __init__(self, name, thread_pool_size=10):
self.name = name

View File

@ -71,11 +71,15 @@ def normalize_time(timestamp):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
@ -87,7 +91,10 @@ def utcnow_ts():
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
return utcnow.override_time
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
@ -95,14 +102,21 @@ utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time."""
"""
Override utils.utcnow to return a constant time or a list thereof,
one at a time.
"""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
utcnow.override_time += timedelta
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
@ -135,3 +149,16 @@ def unmarshall_time(tyme):
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))

View File

@ -24,19 +24,6 @@ import pkg_resources
import setup
class _deferred_version_string(object):
"""Internal helper class which provides delayed version calculation."""
def __init__(self, version_info, prefix):
self.version_info = version_info
self.prefix = prefix
def __str__(self):
return "%s%s" % (self.prefix, self.version_info.version_string())
def __repr__(self):
return "%s%s" % (self.prefix, self.version_info.version_string())
class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None):
@ -57,14 +44,15 @@ class VersionInfo(object):
self.python_package = python_package
self.pre_version = pre_version
self.version = None
self._cached_version = None
def _generate_version(self):
"""Defer to the openstack.common.setup routines for making a
version from git."""
if self.pre_version is None:
return setup.get_post_version(self.python_package)
return setup.get_post_version(self.package)
else:
return setup.get_pre_version(self.python_package, self.pre_version)
return setup.get_pre_version(self.package, self.pre_version)
def _newer_version(self, pending_version):
"""Check to see if we're working with a stale version or not.
@ -138,11 +126,14 @@ class VersionInfo(object):
else:
return '%s-dev' % (version_parts[0],)
def deferred_version_string(self, prefix=""):
def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested
"""
return _deferred_version_string(self, prefix)
if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

View File

@ -5,6 +5,7 @@ amqplib==0.6.1
anyjson>=0.2.4
argparse
eventlet>=0.9.17
extras
greenlet>=0.3.1
httplib2
iso8601>=0.1.4