Update openstack.common

The main motivation for this update is to fix
the way logging is configured so that all
libraries are able to log at the default level.
The other changes come in because there's no
clean way to update the common libs piecemeal.

bug 1152584

Change-Id: I22563f7c0aa7c48edc81cb85328c3ae3ec8c2f11
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2013-03-14 08:03:40 -07:00
parent e5f6889e75
commit 9c33366fbd
28 changed files with 488 additions and 80 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -39,10 +39,10 @@ from ceilometer.openstack.common.rpc import impl_zmq
CONF = cfg.CONF
CONF.register_opts(rpc.rpc_opts)
CONF.register_opts(impl_zmq.zmq_opts)
CONF(sys.argv[1:], project='ceilometer')
def main():
CONF(sys.argv[1:], project='ceilometer')
logging.setup("ceilometer")
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -37,9 +37,9 @@ class RequestContext(object):
accesses the system, as well as additional request information.
"""
def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False,
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None):
self.auth_tok = auth_tok
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.is_admin = is_admin
@ -55,7 +55,7 @@ class RequestContext(object):
'is_admin': self.is_admin,
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_tok,
'auth_token': self.auth_token,
'request_id': self.request_id}

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Openstack, LLC.
# Copyright (c) 2012 OpenStack Foundation.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -38,14 +38,10 @@ import functools
import inspect
import itertools
import json
import logging
import xmlrpclib
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
@ -85,8 +81,6 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
return 'mock'
if level > max_depth:
LOG.error(_('Max serialization depth exceeded on object: %d %s'),
level, value)
return '?'
# The try block may not be necessary after the class check above,

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -328,7 +328,7 @@ def setup(product_name):
if CONF.log_config:
logging.config.fileConfig(CONF.log_config)
else:
_setup_logging_from_conf(product_name)
_setup_logging_from_conf()
sys.excepthook = _create_logging_excepthook(product_name)
@ -362,8 +362,8 @@ def _find_facility_from_conf():
return facility
def _setup_logging_from_conf(product_name):
log_root = getLogger(product_name).logger
def _setup_logging_from_conf():
log_root = getLogger(None).logger
for handler in log_root.handlers:
log_root.removeHandler(handler)
@ -401,6 +401,7 @@ def _setup_logging_from_conf(product_name):
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
else:
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.debug:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack, LLC.
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -25,25 +25,27 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
import collections
import inspect
import sys
import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
from eventlet import queue
from eventlet import semaphore
# TODO(pekowsk): Remove import cfg and below comment in Havana.
# This import should no longer be needed when the amqp_rpc_single_reply_queue
# option is removed.
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import common as rpc_common
# TODO(pekowski): Remove this option in Havana.
amqp_opts = [
cfg.BoolOpt('amqp_rpc_single_reply_queue',
@ -54,6 +56,7 @@ amqp_opts = [
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@ -236,6 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
'failure': failure}
if ending:
msg['ending'] = True
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
@ -302,6 +306,37 @@ def pack_context(msg, context):
msg.update(context_d)
class _MsgIdCache(object):
"""This class checks any duplicate messages."""
# NOTE: This value is considered can be a configuration item, but
# it is not necessary to change its value in most cases,
# so let this value as static for now.
DUP_MSG_CHECK_SIZE = 16
def __init__(self, **kwargs):
self.prev_msgids = collections.deque([],
maxlen=self.DUP_MSG_CHECK_SIZE)
def check_duplicate_message(self, message_data):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
if UNIQUE_ID in message_data:
msg_id = message_data[UNIQUE_ID]
if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
else:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
@ -349,6 +384,7 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=connection_pool,
)
self.proxy = proxy
self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@ -368,6 +404,7 @@ class ProxyCallback(_ThreadPoolWithWait):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
@ -406,9 +443,11 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error(_('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
class MulticallProxyWaiter(object):
@ -422,6 +461,7 @@ class MulticallProxyWaiter(object):
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
@ -435,6 +475,7 @@ class MulticallProxyWaiter(object):
def _process_data(self, data):
result = None
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
@ -479,6 +520,7 @@ class MulticallWaiter(object):
self._done = False
self._got_ending = False
self._conf = conf
self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
@ -490,6 +532,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
@ -542,6 +585,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)
# TODO(pekowski): Remove this flag and the code under the if clause
@ -575,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
@ -583,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@ -590,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -599,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -610,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:

View File

@ -125,6 +125,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -624,7 +624,7 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -415,7 +415,7 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:

View File

@ -16,6 +16,7 @@
import os
import pprint
import re
import socket
import sys
import types
@ -25,6 +26,7 @@ import eventlet
import greenlet
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
@ -91,8 +93,8 @@ def _serialize(data):
try:
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
with excutils.save_and_reraise_exception():
LOG.error(_("JSON serialization failed."))
raise
def _deserialize(data):
@ -430,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor):
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
self.topic_proxy = {}
@ -455,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor):
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
# The topic is received over the network,
# don't trust this input.
if self.badchars.search(topic) is not None:
emsg = _("Topic contained dangerous characters.")
LOG.warn(emsg)
raise RPCException(emsg)
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
@ -511,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
try:
self.register(consumption_proxy,
@ -521,9 +532,9 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
super(ZmqProxy, self).consume_in_thread()
@ -594,6 +605,9 @@ class Connection(rpc_common.Connection):
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Register with matchmaker.
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
# Subscription scenarios
if fanout:
sock_type = zmq.SUB
@ -620,6 +634,10 @@ class Connection(rpc_common.Connection):
self.topics.append(topic)
def close(self):
_get_matchmaker().stop_heartbeat()
for topic in self.topics:
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
self.reactor.close()
self.topics = []
@ -627,6 +645,7 @@ class Connection(rpc_common.Connection):
self.reactor.wait()
def consume_in_thread(self):
_get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
@ -742,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
raise rpc_common.Timeout, "No match from matchmaker."
raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
@ -785,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, **kwargs):
def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
@ -793,9 +812,8 @@ def notify(conf, context, topic, msg, **kwargs):
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)
topic = topic.replace('.', '-')
cast(conf, context, topic, msg, envelope=envelope)
def cleanup():

View File

@ -22,6 +22,7 @@ import contextlib
import itertools
import json
import eventlet
from oslo.config import cfg
from ceilometer.openstack.common.gettextutils import _
@ -33,6 +34,12 @@ matchmaker_opts = [
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
cfg.IntOpt('matchmaker_heartbeat_freq',
default='300',
help='Heartbeat frequency'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default='600',
help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@ -70,12 +77,73 @@ class Binding(object):
class MatchMakerBase(object):
"""Match Maker Base Class."""
"""
Match Maker Base Class.
Build off HeartbeatMatchMakerBase if building a
heartbeat-capable MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
self.no_heartbeat_msg = _('Matchmaker does not implement '
'registration or heartbeat.')
def register(self, key, host):
"""
Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
"""
Acknowledge that a key.host is alive.
Used internally for updating heartbeats,
but may also be used publically to acknowledge
a system is alive (i.e. rpc message successfully
sent to host)
"""
pass
def is_alive(self, topic, host):
"""
Checks if a host is alive.
"""
pass
def expire(self, topic, host):
"""
Explicitly expire a host's registration.
"""
pass
def send_heartbeats(self):
"""
Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
"""
Unregister a topic.
"""
pass
def start_heartbeat(self):
"""
Spawn heartbeat greenthread.
"""
pass
def stop_heartbeat(self):
"""
Destroys the heartbeat greenthread.
"""
pass
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@ -99,6 +167,103 @@ class MatchMakerBase(object):
return workers
class HeartbeatMatchMakerBase(MatchMakerBase):
"""
Base for a heart-beat capable MatchMaker.
Provides common methods for registering,
unregistering, and maintaining heartbeats.
"""
def __init__(self):
self.hosts = set()
self._heart = None
self.host_topic = {}
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
"""
Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
for key, host in self.host_topic:
self.ack_alive(key, host)
def ack_alive(self, key, host):
"""
Acknowledge that a host.topic is alive.
Used internally for updating heartbeats,
but may also be used publically to acknowledge
a system is alive (i.e. rpc message successfully
sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
"""
Implements registration logic.
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
"""
Implements de-registration logic.
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
"""
Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.host_topic[(key, host)] = host
key_host = '.'.join((key, host))
self.backend_register(key, key_host)
self.ack_alive(key, host)
def unregister(self, key, host):
"""
Unregister a topic.
"""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
def start_heartbeat(self):
"""
Implementation of MatchMakerBase.start_heartbeat
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if len(self.hosts) == 0:
raise MatchMakerException(
_("Register before starting heartbeat."))
def do_heartbeat():
while True:
self.send_heartbeats()
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
"""
Destroys the heartbeat greenthread.
"""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character

View File

@ -0,0 +1,149 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo.config import cfg
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default=None,
help='Password for Redis server. (optional)'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
LOG = logging.getLogger(__name__)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""
Return a list of all hosts.
"""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = filter(
lambda host: self.matchmaker.is_alive(topic, host), hosts)
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""
MatchMaker registering and looking-up hosts with a Redis server.
"""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(self.topic_host[host], host)
def is_alive(self, topic, host):
if self.redis.ttl(host) == -1:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.set(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
@ -43,6 +43,11 @@ def parse_mailmap(mailmap='.mailmap'):
return mapping
def _parse_git_mailmap(git_dir, mailmap='.mailmap'):
mailmap = os.path.join(os.path.dirname(git_dir), mailmap)
return parse_mailmap(mailmap)
def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
@ -127,14 +132,26 @@ def _run_shell_command(cmd, throw_on_error=False):
return out[0].strip()
def _get_git_directory():
parent_dir = os.path.dirname(__file__)
while True:
git_dir = os.path.join(parent_dir, '.git')
if os.path.exists(git_dir):
return git_dir
parent_dir, child = os.path.split(parent_dir)
if not child: # reached to root dir
return None
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
git_dir = _get_git_directory()
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
if os.path.exists('.git'):
git_log_cmd = 'git log --stat'
if git_dir:
git_log_cmd = 'git --git-dir=%s log' % git_dir
changelog = _run_shell_command(git_log_cmd)
mailmap = parse_mailmap()
mailmap = _parse_git_mailmap(git_dir)
with open(new_changelog, "w") as changelog_file:
changelog_file.write(canonicalize_emails(changelog, mailmap))
else:
@ -146,13 +163,15 @@ def generate_authors():
jenkins_email = 'jenkins@review.(openstack|stackforge).org'
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
git_dir = _get_git_directory()
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if os.path.exists('.git'):
if git_dir:
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
git_log_cmd = ("git --git-dir=" + git_dir +
" log --format='%aN <%aE>' | sort -u | "
"egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
mailmap = parse_mailmap()
mailmap = _parse_git_mailmap(git_dir)
with open(new_authors, 'w') as new_authors_fh:
new_authors_fh.write(canonicalize_emails(changelog, mailmap))
if os.path.exists(old_authors):
@ -258,19 +277,21 @@ def get_cmdclass():
return cmdclass
def _get_revno():
def _get_revno(git_dir):
"""Return the number of commits since the most recent tag.
We use git-describe to find this out, but if there are no
tags then we fall back to counting commits since the beginning
of time.
"""
describe = _run_shell_command("git describe --always")
describe = _run_shell_command(
"git --git-dir=%s describe --always" % git_dir)
if "-" in describe:
return describe.rsplit("-", 2)[-2]
# no tags found
revlist = _run_shell_command("git rev-list --abbrev-commit HEAD")
revlist = _run_shell_command(
"git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir)
return len(revlist.splitlines())
@ -279,18 +300,21 @@ def _get_version_from_git(pre_version):
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
if os.path.exists('.git'):
git_dir = _get_git_directory()
if git_dir:
if pre_version:
try:
return _run_shell_command(
"git describe --exact-match",
"git --git-dir=" + git_dir + " describe --exact-match",
throw_on_error=True).replace('-', '.')
except Exception:
sha = _run_shell_command("git log -n1 --pretty=format:%h")
return "%s.a%s.g%s" % (pre_version, _get_revno(), sha)
sha = _run_shell_command(
"git --git-dir=" + git_dir + " log -n1 --pretty=format:%h")
return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha)
else:
return _run_shell_command(
"git describe --always").replace('-', '.')
"git --git-dir=" + git_dir + " describe --always").replace(
'-', '.')
return None

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -25,18 +25,22 @@ import datetime
import iso8601
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None):
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
str = at.strftime(TIME_FORMAT)
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
str += ('Z' if tz == 'UTC' else tz)
return str
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):

View File

@ -1,5 +1,5 @@
# Copyright 2012 OpenStack LLC
# Copyright 2012 OpenStack Foundation
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may