Update openstack/common tree

This update is a prerequisite for working on moving
the notification listener code into Oslo.

blueprint move-listener-framework-oslo

Change-Id: Iffd0a5903eb378df004de7b919df249bc053aa81
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann
2013-02-13 12:14:35 -05:00
parent 85d086def3
commit c49264a285
13 changed files with 235 additions and 123 deletions

View File

@@ -863,7 +863,7 @@ class SubCommandOpt(Opt):
description=self.description, description=self.description,
help=self.help) help=self.help)
if not self.handler is None: if self.handler is not None:
self.handler(subparsers) self.handler(subparsers)
@@ -1297,6 +1297,24 @@ class ConfigOpts(collections.Mapping):
__import__(module_str) __import__(module_str)
self._get_opt_info(name, group) self._get_opt_info(name, group)
def import_group(self, group, module_str):
"""Import an option group from a module.
Import a module and check that a given option group is registered.
This is intended for use with global configuration objects
like cfg.CONF where modules commonly register options with
CONF at module load time. If one module requires an option group
defined by another module it can use this method to explicitly
declare the dependency.
:param group: an option OptGroup object or group name
:param module_str: the name of a module to import
:raises: ImportError, NoSuchGroupError
"""
__import__(module_str)
self._get_group(group)
@__clear_cache @__clear_cache
def set_override(self, name, override, group=None): def set_override(self, name, override, group=None):
"""Override an opt value. """Override an opt value.
@@ -1547,8 +1565,8 @@ class ConfigOpts(collections.Mapping):
group = group_or_name if isinstance(group_or_name, OptGroup) else None group = group_or_name if isinstance(group_or_name, OptGroup) else None
group_name = group.name if group else group_or_name group_name = group.name if group else group_or_name
if not group_name in self._groups: if group_name not in self._groups:
if not group is None or not autocreate: if group is not None or not autocreate:
raise NoSuchGroupError(group_name) raise NoSuchGroupError(group_name)
self.register_group(OptGroup(name=group_name)) self.register_group(OptGroup(name=group_name))
@@ -1568,7 +1586,7 @@ class ConfigOpts(collections.Mapping):
group = self._get_group(group) group = self._get_group(group)
opts = group._opts opts = group._opts
if not opt_name in opts: if opt_name not in opts:
raise NoSuchOptError(opt_name, group) raise NoSuchOptError(opt_name, group)
return opts[opt_name] return opts[opt_name]
@@ -1606,7 +1624,7 @@ class ConfigOpts(collections.Mapping):
opt = info['opt'] opt = info['opt']
if opt.required: if opt.required:
if ('default' in info or 'override' in info): if 'default' in info or 'override' in info:
continue continue
if self._get(opt.dest, group) is None: if self._get(opt.dest, group) is None:
@@ -1625,7 +1643,7 @@ class ConfigOpts(collections.Mapping):
""" """
self._args = args self._args = args
for opt, group in self._all_cli_opts(): for opt, group in sorted(self._all_cli_opts()):
opt._add_to_cli(self._oparser, group) opt._add_to_cli(self._oparser, group)
return vars(self._oparser.parse_args(args)) return vars(self._oparser.parse_args(args))

View File

@@ -34,15 +34,21 @@ This module provides a few things:
import datetime import datetime
import functools
import inspect import inspect
import itertools import itertools
import json import json
import logging
import xmlrpclib import xmlrpclib
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def to_primitive(value, convert_instances=False, level=0):
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives. """Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances, Handy for JSON serialization. We can optionally handle instances,
@@ -78,12 +84,19 @@ def to_primitive(value, convert_instances=False, level=0):
if getattr(value, '__module__', None) == 'mox': if getattr(value, '__module__', None) == 'mox':
return 'mock' return 'mock'
if level > 3: if level > max_depth:
LOG.error(_('Max serialization depth exceeded on object: %d %s'),
level, value)
return '?' return '?'
# The try block may not be necessary after the class check above, # The try block may not be necessary after the class check above,
# but just in case ... # but just in case ...
try: try:
recursive = functools.partial(to_primitive,
convert_instances=convert_instances,
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
# It's not clear why xmlrpclib created their own DateTime type, but # It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly # for our purposes, make it a datetime type which is explicitly
# handled # handled
@@ -91,33 +104,19 @@ def to_primitive(value, convert_instances=False, level=0):
value = datetime.datetime(*tuple(value.timetuple())[:6]) value = datetime.datetime(*tuple(value.timetuple())[:6])
if isinstance(value, (list, tuple)): if isinstance(value, (list, tuple)):
o = [] return [recursive(v) for v in value]
for v in value:
o.append(to_primitive(v, convert_instances=convert_instances,
level=level))
return o
elif isinstance(value, dict): elif isinstance(value, dict):
o = {} return dict((k, recursive(v)) for k, v in value.iteritems())
for k, v in value.iteritems(): elif convert_datetime and isinstance(value, datetime.datetime):
o[k] = to_primitive(v, convert_instances=convert_instances,
level=level)
return o
elif isinstance(value, datetime.datetime):
return timeutils.strtime(value) return timeutils.strtime(value)
elif hasattr(value, 'iteritems'): elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()), return recursive(dict(value.iteritems()), level=level + 1)
convert_instances=convert_instances,
level=level + 1)
elif hasattr(value, '__iter__'): elif hasattr(value, '__iter__'):
return to_primitive(list(value), return recursive(list(value))
convert_instances=convert_instances,
level=level)
elif convert_instances and hasattr(value, '__dict__'): elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles. # Likely an instance of something. Watch for cycles.
# Ignore class member vars. # Ignore class member vars.
return to_primitive(value.__dict__, return recursive(value.__dict__, level=level + 1)
convert_instances=convert_instances,
level=level + 1)
else: else:
return value return value
except TypeError: except TypeError:

View File

@@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
def __getattribute__(self, attr): def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr) rval = corolocal.local.__getattribute__(self, attr)
if rval: if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
# the weak reference and return the inner value here.
rval = rval() rval = rval()
return rval return rval
@@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
return corolocal.local.__setattr__(self, attr, value) return corolocal.local.__setattr__(self, attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal() store = WeakLocal()
# A "weak" store uses weak references and allows an object to fall out of scope
# when it falls out of scope in the code that uses the thread local storage. A
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local

View File

@@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy rpc.proxy
""" """
import inspect
import logging
from ceilometer.openstack.common import cfg from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import local
LOG = logging.getLogger(__name__)
rpc_opts = [ rpc_opts = [
@@ -62,7 +70,8 @@ rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'), help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]
cfg.CONF.register_opts(rpc_opts) CONF = cfg.CONF
CONF.register_opts(rpc_opts)
def set_defaults(control_exchange): def set_defaults(control_exchange):
@@ -83,10 +92,27 @@ def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection :returns: An instance of openstack.common.rpc.common.Connection
""" """
return _get_impl().create_connection(cfg.CONF, new=new) return _get_impl().create_connection(CONF, new=new)
def call(context, topic, msg, timeout=None): def _check_for_lock():
if not CONF.debug:
return None
if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True
return False
def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something. """Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this :param context: Information that identifies the user that has made this
@@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None):
"args" : dict_of_kwargs } "args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout. :param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option. If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if an RPC call is made
with a lock held.
:returns: A dict from the remote method. :returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response :raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached. is not received before the timeout is reached.
""" """
return _get_impl().call(cfg.CONF, context, topic, msg, timeout) if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg): def cast(context, topic, msg):
@@ -124,7 +154,7 @@ def cast(context, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().cast(cfg.CONF, context, topic, msg) return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg):
@@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) return _get_impl().fanout_cast(CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None): def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator. """Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in In this case, the remote method will be returning multiple values in
@@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None):
"args" : dict_of_kwargs } "args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout. :param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option. If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if an RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is :returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value an index that starts at 0 and increases by one for each value
@@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response :raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached. is not received before the timeout is reached.
""" """
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False): def notify(context, topic, msg, envelope=False):
@@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg) msg)
@@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg) topic, msg)
@@ -263,10 +297,10 @@ def _get_impl():
global _RPCIMPL global _RPCIMPL
if _RPCIMPL is None: if _RPCIMPL is None:
try: try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) _RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError: except ImportError:
# For backwards compatibility with older nova config. # For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc', impl = CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc') 'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl) _RPCIMPL = importutils.import_module(impl)
return _RPCIMPL return _RPCIMPL

View File

@@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool) conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout) wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg)) conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg return wait_msg

View File

@@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data):
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution. # order to prevent arbitrary code execution.
if not module in conf.allowed_rpc_exception_modules: if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace) return RemoteError(name, failure.get('message'), trace)
try: try:

View File

@@ -66,7 +66,8 @@ kombu_opts = [
help='the RabbitMQ userid'), help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password', cfg.StrOpt('rabbit_password',
default='guest', default='guest',
help='the RabbitMQ password'), help='the RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host', cfg.StrOpt('rabbit_virtual_host',
default='/', default='/',
help='the RabbitMQ virtual host'), help='the RabbitMQ virtual host'),
@@ -302,9 +303,15 @@ class Publisher(object):
channel=channel, channel=channel,
routing_key=self.routing_key) routing_key=self.routing_key)
def send(self, msg): def send(self, msg, timeout=None):
"""Send a message""" """Send a message"""
self.producer.publish(msg) if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
#
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
else:
self.producer.publish(msg)
class DirectPublisher(Publisher): class DirectPublisher(Publisher):
@@ -653,7 +660,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks: for proxy_cb in self.proxy_callbacks:
proxy_cb.wait() proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs): def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
def _error_callback(exc): def _error_callback(exc):
@@ -663,7 +670,7 @@ class Connection(object):
def _publish(): def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs) publisher = cls(self.conf, self.channel, topic, **kwargs)
publisher.send(msg) publisher.send(msg, timeout)
self.ensure(_error_callback, _publish) self.ensure(_error_callback, _publish)
@@ -691,9 +698,9 @@ class Connection(object):
"""Send a 'direct' message""" """Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg) self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg): def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message""" """Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg) self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg): def fanout_send(self, topic, msg):
"""Send a 'fanout' message""" """Send a 'fanout' message"""
@@ -701,7 +708,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs): def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic""" """Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg, **kwargs) self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None): def consume(self, limit=None):
"""Consume from all queues/consumers""" """Consume from all queues/consumers"""

View File

@@ -51,7 +51,8 @@ qpid_opts = [
help='Username for qpid connection'), help='Username for qpid connection'),
cfg.StrOpt('qpid_password', cfg.StrOpt('qpid_password',
default='', default='',
help='Password for qpid connection'), help='Password for qpid connection',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms', cfg.StrOpt('qpid_sasl_mechanisms',
default='', default='',
help='Space separated list of SASL mechanisms to use for auth'), help='Space separated list of SASL mechanisms to use for auth'),
@@ -486,9 +487,20 @@ class Connection(object):
"""Send a 'direct' message""" """Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg) self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg): def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message""" """Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg) #
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
# so let's create an actual qpid message here and get some
# value-add on the go.
#
# WARNING: Request timeout happens to be in the same units as
# qpid's TTL (seconds). If this changes in the future, then this
# will need to be altered accordingly.
#
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg): def fanout_send(self, topic, msg):
"""Send a 'fanout' message""" """Send a 'fanout' message"""

View File

@@ -17,7 +17,6 @@
import os import os
import pprint import pprint
import socket import socket
import string
import sys import sys
import types import types
import uuid import uuid
@@ -90,7 +89,7 @@ def _serialize(data):
Error if a developer passes us bad data. Error if a developer passes us bad data.
""" """
try: try:
return str(jsonutils.dumps(data, ensure_ascii=True)) return jsonutils.dumps(data, ensure_ascii=True)
except TypeError: except TypeError:
LOG.error(_("JSON serialization failed.")) LOG.error(_("JSON serialization failed."))
raise raise
@@ -218,10 +217,11 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
msg_id = msg_id or 0
if serialize: if serialize:
data = rpc_common.serialize_msg(data, force_envelope) data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
_serialize(data)])
def close(self): def close(self):
self.outq.close() self.outq.close()
@@ -295,13 +295,13 @@ class InternalContext(object):
ctx.replies) ctx.replies)
LOG.debug(_("Sending reply")) LOG.debug(_("Sending reply"))
cast(CONF, ctx, topic, { _multi_send(_cast, ctx, topic, {
'method': '-process_reply', 'method': '-process_reply',
'args': { 'args': {
'msg_id': msg_id, 'msg_id': msg_id, # Include for Folsom compat.
'response': response 'response': response
} }
}) }, _msg_id=msg_id)
class ConsumerBase(object): class ConsumerBase(object):
@@ -321,21 +321,22 @@ class ConsumerBase(object):
return [result] return [result]
def process(self, style, target, proxy, ctx, data): def process(self, style, target, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
# Method starting with - are # Method starting with - are
# processed internally. (non-valid method name) # processed internally. (non-valid method name)
method = data['method'] method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
return
# Internal method # Internal method
# uses internal context for safety. # uses internal context for safety.
if data['method'][0] == '-': if method == '-reply':
# For reply / process_reply self.private_ctx.reply(ctx, proxy, **data['args'])
method = method[1:]
if method == 'reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return return
data.setdefault('version', None)
data.setdefault('args', {})
proxy.dispatch(ctx, data['version'], proxy.dispatch(ctx, data['version'],
data['method'], **data['args']) data['method'], **data['args'])
@@ -436,20 +437,12 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
if topic.startswith('fanout~'):
sock_type = zmq.PUB sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)]
else: else:
sock_type = zmq.PUSH sock_type = zmq.PUSH
if not topic in self.topic_proxy: if topic not in self.topic_proxy:
def publisher(waiter): def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic) LOG.info(_("Creating proxy for topic: %s"), topic)
@@ -600,8 +593,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread() self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, def _cast(addr, context, topic, msg, timeout=None, serialize=True,
force_envelope=False): force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@@ -610,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr) conn = ZmqClient(addr)
# assumes cast can't return an exception # assumes cast can't return an exception
conn.cast(msg_id, topic, payload, serialize, force_envelope) conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception") raise RPCException("Cast failed. ZMQ Socket Exception")
finally: finally:
@@ -618,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close() conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None, def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False): serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response # timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
@@ -654,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
) )
LOG.debug(_("Sending cast")) LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload, _cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope) serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply")) LOG.debug(_("Cast sent; Waiting reply"))
@@ -662,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
msg = msg_waiter.recv() msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response")) LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1]) responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error. # ZMQError trumps the Timeout error.
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("ZMQ Socket Error") raise RPCException("ZMQ Socket Error")
except (IndexError, KeyError):
raise RPCException(_("RPC Message Invalid."))
finally: finally:
if 'msg_waiter' in vars(): if 'msg_waiter' in vars():
msg_waiter.close() msg_waiter.close()
@@ -682,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, serialize=True, def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False): force_envelope=False, _msg_id=None):
""" """
Wraps the sending of messages, Wraps the sending of messages,
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
@@ -708,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast': if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context, eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout, serialize, _topic, msg, timeout, serialize,
force_envelope) force_envelope, _msg_id)
return return
return method(_addr, context, _topic, _topic, msg, timeout, return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope) serialize, force_envelope)
@@ -777,21 +772,9 @@ def _get_ctxt():
return ZMQ_CTX return ZMQ_CTX
def _get_matchmaker(): def _get_matchmaker(*args, **kwargs):
global matchmaker global matchmaker
if not matchmaker: if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class' matchmaker = importutils.import_object(
mm_path = CONF.rpc_zmq_matchmaker.split('.') CONF.rpc_zmq_matchmaker, *args, **kwargs)
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
# Only initialize a class.
if mm_path[-1][0] not in string.ascii_uppercase:
LOG.error(_("Matchmaker could not be loaded.\n"
"rpc_zmq_matchmaker is not a class."))
raise RPCException(_("Error loading Matchmaker."))
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
return matchmaker return matchmaker

View File

@@ -201,24 +201,25 @@ class FanoutRingExchange(RingExchange):
class LocalhostExchange(Exchange): class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local.""" """Exchange where all direct topics are local."""
def __init__(self): def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__() super(Exchange, self).__init__()
def run(self, key): def run(self, key):
return [(key.split('.')[0] + '.localhost', 'localhost')] return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange): class DirectExchange(Exchange):
""" """
Exchange where all topic keys are split, sending to second half. Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host" i.e. "compute.host" sends a message to "compute.host" running on "host"
""" """
def __init__(self): def __init__(self):
super(Exchange, self).__init__() super(Exchange, self).__init__()
def run(self, key): def run(self, key):
b, e = key.split('.', 1) e = key.split('.', 1)[1]
return [(b, e)] return [(key, e)]
class MatchMakerRing(MatchMakerBase): class MatchMakerRing(MatchMakerBase):
@@ -237,11 +238,11 @@ class MatchMakerLocalhost(MatchMakerBase):
Match Maker where all bare topics resolve to localhost. Match Maker where all bare topics resolve to localhost.
Useful for testing. Useful for testing.
""" """
def __init__(self): def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__() super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange()) self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase): class MatchMakerStub(MatchMakerBase):

View File

@@ -258,7 +258,23 @@ def get_cmdclass():
return cmdclass return cmdclass
def get_version_from_git(pre_version): def _get_revno():
"""Return the number of commits since the most recent tag.
We use git-describe to find this out, but if there are no
tags then we fall back to counting commits since the beginning
of time.
"""
describe = _run_shell_command("git describe --always")
if "-" in describe:
return describe.rsplit("-", 2)[-2]
# no tags found
revlist = _run_shell_command("git rev-list --abbrev-commit HEAD")
return len(revlist.splitlines())
def _get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current """Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions revision if there is one, or tag plus number of additional revisions
if the current revision has no tag.""" if the current revision has no tag."""
@@ -266,21 +282,19 @@ def get_version_from_git(pre_version):
if os.path.isdir('.git'): if os.path.isdir('.git'):
if pre_version: if pre_version:
try: try:
return _run_shell_command( return _run_shell_command(
"git describe --exact-match", "git describe --exact-match",
throw_on_error=True).replace('-', '.') throw_on_error=True).replace('-', '.')
except Exception: except Exception:
sha = _run_shell_command("git log -n1 --pretty=format:%h") sha = _run_shell_command("git log -n1 --pretty=format:%h")
describe = _run_shell_command("git describe --always") return "%s.a%s.g%s" % (pre_version, _get_revno(), sha)
revno = describe.rsplit("-", 2)[-2]
return "%s.a%s.g%s" % (pre_version, revno, sha)
else: else:
return _run_shell_command( return _run_shell_command(
"git describe --always").replace('-', '.') "git describe --always").replace('-', '.')
return None return None
def get_version_from_pkg_info(package_name): def _get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can.""" """Get the version from PKG-INFO file if we can."""
try: try:
pkg_info_file = open('PKG-INFO', 'r') pkg_info_file = open('PKG-INFO', 'r')
@@ -311,10 +325,10 @@ def get_version(package_name, pre_version=None):
version = os.environ.get("OSLO_PACKAGE_VERSION", None) version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version: if version:
return version return version
version = get_version_from_pkg_info(package_name) version = _get_version_from_pkg_info(package_name)
if version: if version:
return version return version
version = get_version_from_git(pre_version) version = _get_version_from_git(pre_version)
if version: if version:
return version return version
raise Exception("Versioning for this project requires either an sdist" raise Exception("Versioning for this project requires either an sdist"

View File

@@ -98,6 +98,11 @@ def utcnow():
return datetime.datetime.utcnow() return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp"""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None utcnow.override_time = None
@@ -162,3 +167,16 @@ def delta_seconds(before, after):
except AttributeError: except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds + return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6)) float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""
Determines if time is going to happen in the next window seconds.
:param dt: the time
:param window: minimum seconds remaining for the time to be considered not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

View File

@@ -33,12 +33,27 @@ class VersionInfo(object):
self.version = None self.version = None
self._cached_version = None self._cached_version = None
def __str__(self):
"""Make the VersionInfo object behave like a string."""
return self.version_string()
def __repr__(self):
"""Include the name."""
return "VersionInfo(%s:%s)" % (self.package, self.version_string())
def _get_version_from_pkg_resources(self): def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record """Get the version of the package from the pkg_resources record
associated with the package.""" associated with the package."""
requirement = pkg_resources.Requirement.parse(self.package) try:
provider = pkg_resources.get_provider(requirement) requirement = pkg_resources.Requirement.parse(self.package)
return provider.version provider = pkg_resources.get_provider(requirement)
return provider.version
except pkg_resources.DistributionNotFound:
# The most likely cause for this is running tests in a tree
# produced from a tarball where the package itself has not been
# installed into anything. Revert to setup-time logic.
from ceilometer.openstack.common import setup
return setup.get_version(self.package)
def release_string(self): def release_string(self):
"""Return the full version of the package including suffixes indicating """Return the full version of the package including suffixes indicating