diff --git a/bin/ceilometer-rpc-zmq-receiver b/bin/ceilometer-rpc-zmq-receiver index 8ac28b6e..f6e878ef 100755 --- a/bin/ceilometer-rpc-zmq-receiver +++ b/bin/ceilometer-rpc-zmq-receiver @@ -1,7 +1,7 @@ #!/usr/bin/env python # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -39,10 +39,10 @@ from ceilometer.openstack.common.rpc import impl_zmq CONF = cfg.CONF CONF.register_opts(rpc.rpc_opts) CONF.register_opts(impl_zmq.zmq_opts) -CONF(sys.argv[1:], project='ceilometer') def main(): + CONF(sys.argv[1:], project='ceilometer') logging.setup("ceilometer") with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: diff --git a/ceilometer/openstack/common/context.py b/ceilometer/openstack/common/context.py index dd7dd04c..e9cfd73c 100644 --- a/ceilometer/openstack/common/context.py +++ b/ceilometer/openstack/common/context.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -37,9 +37,9 @@ class RequestContext(object): accesses the system, as well as additional request information. """ - def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False, + def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False, read_only=False, show_deleted=False, request_id=None): - self.auth_tok = auth_tok + self.auth_token = auth_token self.user = user self.tenant = tenant self.is_admin = is_admin @@ -55,7 +55,7 @@ class RequestContext(object): 'is_admin': self.is_admin, 'read_only': self.read_only, 'show_deleted': self.show_deleted, - 'auth_token': self.auth_tok, + 'auth_token': self.auth_token, 'request_id': self.request_id} diff --git a/ceilometer/openstack/common/eventlet_backdoor.py b/ceilometer/openstack/common/eventlet_backdoor.py index 8b81ebf8..c0ad460f 100644 --- a/ceilometer/openstack/common/eventlet_backdoor.py +++ b/ceilometer/openstack/common/eventlet_backdoor.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 Openstack, LLC. +# Copyright (c) 2012 OpenStack Foundation. # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py index 414cc27e..fce39595 100644 --- a/ceilometer/openstack/common/excutils.py +++ b/ceilometer/openstack/common/excutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2012, Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/importutils.py b/ceilometer/openstack/common/importutils.py index 9dec764f..3bd277f4 100644 --- a/ceilometer/openstack/common/importutils.py +++ b/ceilometer/openstack/common/importutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index c34a8c32..ab3620c3 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -38,14 +38,10 @@ import functools import inspect import itertools import json -import logging import xmlrpclib -from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import timeutils -LOG = logging.getLogger(__name__) - def to_primitive(value, convert_instances=False, convert_datetime=True, level=0, max_depth=3): @@ -85,8 +81,6 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, return 'mock' if level > max_depth: - LOG.error(_('Max serialization depth exceeded on object: %d %s'), - level, value) return '?' # The try block may not be necessary after the class check above, diff --git a/ceilometer/openstack/common/local.py b/ceilometer/openstack/common/local.py index 8bdc837a..f1bfc824 100644 --- a/ceilometer/openstack/common/local.py +++ b/ceilometer/openstack/common/local.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index 01d2a68f..2c482f34 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -328,7 +328,7 @@ def setup(product_name): if CONF.log_config: logging.config.fileConfig(CONF.log_config) else: - _setup_logging_from_conf(product_name) + _setup_logging_from_conf() sys.excepthook = _create_logging_excepthook(product_name) @@ -362,8 +362,8 @@ def _find_facility_from_conf(): return facility -def _setup_logging_from_conf(product_name): - log_root = getLogger(product_name).logger +def _setup_logging_from_conf(): + log_root = getLogger(None).logger for handler in log_root.handlers: log_root.removeHandler(handler) @@ -401,7 +401,8 @@ def _setup_logging_from_conf(product_name): if CONF.log_format: handler.setFormatter(logging.Formatter(fmt=CONF.log_format, datefmt=datefmt)) - handler.setFormatter(LegacyFormatter(datefmt=datefmt)) + else: + handler.setFormatter(LegacyFormatter(datefmt=datefmt)) if CONF.debug: log_root.setLevel(logging.DEBUG) diff --git a/ceilometer/openstack/common/network_utils.py b/ceilometer/openstack/common/network_utils.py index 69f67321..5224e01a 100644 --- a/ceilometer/openstack/common/network_utils.py +++ b/ceilometer/openstack/common/network_utils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2012 OpenStack LLC. +# Copyright 2012 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/__init__.py b/ceilometer/openstack/common/notifier/__init__.py index 482d54e4..45c3b46a 100644 --- a/ceilometer/openstack/common/notifier/__init__.py +++ b/ceilometer/openstack/common/notifier/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py index 1361faa5..c1c897dd 100644 --- a/ceilometer/openstack/common/notifier/api.py +++ b/ceilometer/openstack/common/notifier/api.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/log_notifier.py b/ceilometer/openstack/common/notifier/log_notifier.py index 496fa6d2..fc63e9de 100644 --- a/ceilometer/openstack/common/notifier/log_notifier.py +++ b/ceilometer/openstack/common/notifier/log_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/no_op_notifier.py b/ceilometer/openstack/common/notifier/no_op_notifier.py index ee1ddbdc..bc7a56ca 100644 --- a/ceilometer/openstack/common/notifier/no_op_notifier.py +++ b/ceilometer/openstack/common/notifier/no_op_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/rpc_notifier.py b/ceilometer/openstack/common/notifier/rpc_notifier.py index 55111007..ee6c0361 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/rpc_notifier2.py b/ceilometer/openstack/common/notifier/rpc_notifier2.py index 5de2e8a2..f8a299c2 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier2.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier2.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/test_notifier.py b/ceilometer/openstack/common/notifier/test_notifier.py index 5e348803..96c1746b 100644 --- a/ceilometer/openstack/common/notifier/test_notifier.py +++ b/ceilometer/openstack/common/notifier/test_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py index 3c482548..155de8b0 100644 --- a/ceilometer/openstack/common/policy.py +++ b/ceilometer/openstack/common/policy.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 OpenStack, LLC. +# Copyright (c) 2012 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index 3574891b..d8e6ba06 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -25,25 +25,27 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses AMQP, but is deprecated and predates this code. """ +import collections import inspect import sys import uuid from eventlet import greenpool from eventlet import pools -from eventlet import semaphore from eventlet import queue - +from eventlet import semaphore # TODO(pekowsk): Remove import cfg and below comment in Havana. # This import should no longer be needed when the amqp_rpc_single_reply_queue # option is removed. from oslo.config import cfg + from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import local from ceilometer.openstack.common import log as logging from ceilometer.openstack.common.rpc import common as rpc_common + # TODO(pekowski): Remove this option in Havana. amqp_opts = [ cfg.BoolOpt('amqp_rpc_single_reply_queue', @@ -54,6 +56,7 @@ amqp_opts = [ cfg.CONF.register_opts(amqp_opts) +UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -236,6 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, 'failure': failure} if ending: msg['ending'] = True + _add_unique_id(msg) # If a reply_q exists, add the msg_id to the reply and pass the # reply_q to direct_send() to use it as the response queue. # Otherwise use the msg_id for backward compatibilty. @@ -302,6 +306,37 @@ def pack_context(msg, context): msg.update(context_d) +class _MsgIdCache(object): + """This class checks any duplicate messages.""" + + # NOTE: This value is considered can be a configuration item, but + # it is not necessary to change its value in most cases, + # so let this value as static for now. + DUP_MSG_CHECK_SIZE = 16 + + def __init__(self, **kwargs): + self.prev_msgids = collections.deque([], + maxlen=self.DUP_MSG_CHECK_SIZE) + + def check_duplicate_message(self, message_data): + """AMQP consumers may read same message twice when exceptions occur + before ack is returned. This method prevents doing it. + """ + if UNIQUE_ID in message_data: + msg_id = message_data[UNIQUE_ID] + if msg_id not in self.prev_msgids: + self.prev_msgids.append(msg_id) + else: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + + +def _add_unique_id(msg): + """Add unique_id for checking duplicate messages.""" + unique_id = uuid.uuid4().hex + msg.update({UNIQUE_ID: unique_id}) + LOG.debug(_('UNIQUE_ID is %s.') % (unique_id)) + + class _ThreadPoolWithWait(object): """Base class for a delayed invocation manager used by the Connection class to start up green threads @@ -349,6 +384,7 @@ class ProxyCallback(_ThreadPoolWithWait): connection_pool=connection_pool, ) self.proxy = proxy + self.msg_id_cache = _MsgIdCache() def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -368,6 +404,7 @@ class ProxyCallback(_ThreadPoolWithWait): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + self.msg_id_cache.check_duplicate_message(message_data) ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) @@ -406,9 +443,11 @@ class ProxyCallback(_ThreadPoolWithWait): connection_pool=self.connection_pool, log_failure=False) except Exception: - LOG.exception(_('Exception during message handling')) - ctxt.reply(None, sys.exc_info(), - connection_pool=self.connection_pool) + # sys.exc_info() is deleted by LOG.exception(). + exc_info = sys.exc_info() + LOG.error(_('Exception during message handling'), + exc_info=exc_info) + ctxt.reply(None, exc_info, connection_pool=self.connection_pool) class MulticallProxyWaiter(object): @@ -422,6 +461,7 @@ class MulticallProxyWaiter(object): self._dataqueue = queue.LightQueue() # Add this caller to the reply proxy's call_waiters self._reply_proxy.add_call_waiter(self, self._msg_id) + self.msg_id_cache = _MsgIdCache() def put(self, data): self._dataqueue.put(data) @@ -435,6 +475,7 @@ class MulticallProxyWaiter(object): def _process_data(self, data): result = None + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] result = rpc_common.deserialize_remote_exception(self._conf, @@ -479,6 +520,7 @@ class MulticallWaiter(object): self._done = False self._got_ending = False self._conf = conf + self.msg_id_cache = _MsgIdCache() def done(self): if self._done: @@ -490,6 +532,7 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, @@ -542,6 +585,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) + _add_unique_id(msg) pack_context(msg, context) # TODO(pekowski): Remove this flag and the code under the if clause @@ -575,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool): def cast(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, rpc_common.serialize_msg(msg)) @@ -583,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.fanout_send(topic, rpc_common.serialize_msg(msg)) @@ -590,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -599,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def fanout_cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a fanout exchange to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -610,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 88ebe42d..b0f5c694 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -125,6 +125,10 @@ class Timeout(RPCException): message = _("Timeout while waiting on RPC response.") +class DuplicateMessageError(RPCException): + message = _("Found duplicate message(%(msg_id)s). Skipping it.") + + class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") diff --git a/ceilometer/openstack/common/rpc/impl_fake.py b/ceilometer/openstack/common/rpc/impl_fake.py index 72041bfe..7c3e69ca 100644 --- a/ceilometer/openstack/common/rpc/impl_fake.py +++ b/ceilometer/openstack/common/rpc/impl_fake.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index d3c86a62..9e2a34bc 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -624,8 +624,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 6bdec04e..cd28cc34 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # Copyright 2011 - 2012, Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -415,8 +415,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 4a3f5ff3..1ee5554a 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -16,6 +16,7 @@ import os import pprint +import re import socket import sys import types @@ -25,6 +26,7 @@ import eventlet import greenlet from oslo.config import cfg +from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils @@ -91,8 +93,8 @@ def _serialize(data): try: return jsonutils.dumps(data, ensure_ascii=True) except TypeError: - LOG.error(_("JSON serialization failed.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("JSON serialization failed.")) def _deserialize(data): @@ -430,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor): def __init__(self, conf): super(ZmqProxy, self).__init__(conf) + pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) + self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) self.topic_proxy = {} @@ -455,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor): LOG.info(_("Creating proxy for topic: %s"), topic) try: + # The topic is received over the network, + # don't trust this input. + if self.badchars.search(topic) is not None: + emsg = _("Topic contained dangerous characters.") + LOG.warn(emsg) + raise RPCException(emsg) + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), sock_type, bind=True) @@ -511,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor): ipc_dir, run_as_root=True) utils.execute('chmod', '750', ipc_dir, run_as_root=True) except utils.ProcessExecutionError: - LOG.error(_("Could not create IPC directory %s") % - (ipc_dir, )) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) try: self.register(consumption_proxy, @@ -521,9 +532,9 @@ class ZmqProxy(ZmqBaseReactor): zmq.PULL, out_bind=True) except zmq.ZMQError: - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) super(ZmqProxy, self).consume_in_thread() @@ -594,6 +605,9 @@ class Connection(rpc_common.Connection): self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): + # Register with matchmaker. + _get_matchmaker().register(topic, CONF.rpc_zmq_host) + # Subscription scenarios if fanout: sock_type = zmq.SUB @@ -620,6 +634,10 @@ class Connection(rpc_common.Connection): self.topics.append(topic) def close(self): + _get_matchmaker().stop_heartbeat() + for topic in self.topics: + _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + self.reactor.close() self.topics = [] @@ -627,6 +645,7 @@ class Connection(rpc_common.Connection): self.reactor.wait() def consume_in_thread(self): + _get_matchmaker().start_heartbeat() self.reactor.consume_in_thread() @@ -742,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None, LOG.warn(_("No matchmaker results. Not casting.")) # While not strictly a timeout, callers know how to handle # this exception and a timeout isn't too big a lie. - raise rpc_common.Timeout, "No match from matchmaker." + raise rpc_common.Timeout(_("No match from matchmaker.")) # This supports brokerless fanout (addresses > 1) for queue in queues: @@ -785,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs): _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) -def notify(conf, context, topic, msg, **kwargs): +def notify(conf, context, topic, msg, envelope): """ Send notification event. Notifications are sent to topic-priority. @@ -793,9 +812,8 @@ def notify(conf, context, topic, msg, **kwargs): """ # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. - topic.replace('.', '-') - kwargs['envelope'] = kwargs.get('envelope', True) - cast(conf, context, topic, msg, **kwargs) + topic = topic.replace('.', '-') + cast(conf, context, topic, msg, envelope=envelope) def cleanup(): diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index ed5cebac..7ae6b7e8 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -22,6 +22,7 @@ import contextlib import itertools import json +import eventlet from oslo.config import cfg from ceilometer.openstack.common.gettextutils import _ @@ -33,6 +34,12 @@ matchmaker_opts = [ cfg.StrOpt('matchmaker_ringfile', default='/etc/nova/matchmaker_ring.json', help='Matchmaker ring file (JSON)'), + cfg.IntOpt('matchmaker_heartbeat_freq', + default='300', + help='Heartbeat frequency'), + cfg.IntOpt('matchmaker_heartbeat_ttl', + default='600', + help='Heartbeat time-to-live.'), ] CONF = cfg.CONF @@ -70,12 +77,73 @@ class Binding(object): class MatchMakerBase(object): - """Match Maker Base Class.""" - + """ + Match Maker Base Class. + Build off HeartbeatMatchMakerBase if building a + heartbeat-capable MatchMaker. + """ def __init__(self): # Array of tuples. Index [2] toggles negation, [3] is last-if-true self.bindings = [] + self.no_heartbeat_msg = _('Matchmaker does not implement ' + 'registration or heartbeat.') + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + pass + + def ack_alive(self, key, host): + """ + Acknowledge that a key.host is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + pass + + def is_alive(self, topic, host): + """ + Checks if a host is alive. + """ + pass + + def expire(self, topic, host): + """ + Explicitly expire a host's registration. + """ + pass + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + pass + + def unregister(self, key, host): + """ + Unregister a topic. + """ + pass + + def start_heartbeat(self): + """ + Spawn heartbeat greenthread. + """ + pass + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + pass + def add_binding(self, binding, rule, last=True): self.bindings.append((binding, rule, False, last)) @@ -99,6 +167,103 @@ class MatchMakerBase(object): return workers +class HeartbeatMatchMakerBase(MatchMakerBase): + """ + Base for a heart-beat capable MatchMaker. + Provides common methods for registering, + unregistering, and maintaining heartbeats. + """ + def __init__(self): + self.hosts = set() + self._heart = None + self.host_topic = {} + + super(HeartbeatMatchMakerBase, self).__init__() + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + for key, host in self.host_topic: + self.ack_alive(key, host) + + def ack_alive(self, key, host): + """ + Acknowledge that a host.topic is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + raise NotImplementedError("Must implement ack_alive") + + def backend_register(self, key, host): + """ + Implements registration logic. + Called by register(self,key,host) + """ + raise NotImplementedError("Must implement backend_register") + + def backend_unregister(self, key, key_host): + """ + Implements de-registration logic. + Called by unregister(self,key,host) + """ + raise NotImplementedError("Must implement backend_unregister") + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + self.hosts.add(host) + self.host_topic[(key, host)] = host + key_host = '.'.join((key, host)) + + self.backend_register(key, key_host) + + self.ack_alive(key, host) + + def unregister(self, key, host): + """ + Unregister a topic. + """ + if (key, host) in self.host_topic: + del self.host_topic[(key, host)] + + self.hosts.discard(host) + self.backend_unregister(key, '.'.join((key, host))) + + LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + + def start_heartbeat(self): + """ + Implementation of MatchMakerBase.start_heartbeat + Launches greenthread looping send_heartbeats(), + yielding for CONF.matchmaker_heartbeat_freq seconds + between iterations. + """ + if len(self.hosts) == 0: + raise MatchMakerException( + _("Register before starting heartbeat.")) + + def do_heartbeat(): + while True: + self.send_heartbeats() + eventlet.sleep(CONF.matchmaker_heartbeat_freq) + + self._heart = eventlet.spawn(do_heartbeat) + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + if self._heart: + self._heart.kill() + + class DirectBinding(Binding): """ Specifies a host in the key via a '.' character diff --git a/ceilometer/openstack/common/rpc/matchmaker_redis.py b/ceilometer/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 00000000..325a6512 --- /dev/null +++ b/ceilometer/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from ceilometer.openstack.common import importutils +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default=None, + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """ + Return a list of all hosts. + """ + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """ + MatchMaker registering and looking-up hosts with a Redis server. + """ + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.StrictRedis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute() diff --git a/ceilometer/openstack/common/setup.py b/ceilometer/openstack/common/setup.py index 22f864d5..030df61c 100644 --- a/ceilometer/openstack/common/setup.py +++ b/ceilometer/openstack/common/setup.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # All Rights Reserved. # @@ -43,6 +43,11 @@ def parse_mailmap(mailmap='.mailmap'): return mapping +def _parse_git_mailmap(git_dir, mailmap='.mailmap'): + mailmap = os.path.join(os.path.dirname(git_dir), mailmap) + return parse_mailmap(mailmap) + + def canonicalize_emails(changelog, mapping): """Takes in a string and an email alias mapping and replaces all instances of the aliases in the string with their real email. @@ -127,14 +132,26 @@ def _run_shell_command(cmd, throw_on_error=False): return out[0].strip() +def _get_git_directory(): + parent_dir = os.path.dirname(__file__) + while True: + git_dir = os.path.join(parent_dir, '.git') + if os.path.exists(git_dir): + return git_dir + parent_dir, child = os.path.split(parent_dir) + if not child: # reached to root dir + return None + + def write_git_changelog(): """Write a changelog based on the git changelog.""" new_changelog = 'ChangeLog' + git_dir = _get_git_directory() if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'): - if os.path.exists('.git'): - git_log_cmd = 'git log --stat' + if git_dir: + git_log_cmd = 'git --git-dir=%s log' % git_dir changelog = _run_shell_command(git_log_cmd) - mailmap = parse_mailmap() + mailmap = _parse_git_mailmap(git_dir) with open(new_changelog, "w") as changelog_file: changelog_file.write(canonicalize_emails(changelog, mailmap)) else: @@ -146,13 +163,15 @@ def generate_authors(): jenkins_email = 'jenkins@review.(openstack|stackforge).org' old_authors = 'AUTHORS.in' new_authors = 'AUTHORS' + git_dir = _get_git_directory() if not os.getenv('SKIP_GENERATE_AUTHORS'): - if os.path.exists('.git'): + if git_dir: # don't include jenkins email address in AUTHORS file - git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | " + git_log_cmd = ("git --git-dir=" + git_dir + + " log --format='%aN <%aE>' | sort -u | " "egrep -v '" + jenkins_email + "'") changelog = _run_shell_command(git_log_cmd) - mailmap = parse_mailmap() + mailmap = _parse_git_mailmap(git_dir) with open(new_authors, 'w') as new_authors_fh: new_authors_fh.write(canonicalize_emails(changelog, mailmap)) if os.path.exists(old_authors): @@ -258,19 +277,21 @@ def get_cmdclass(): return cmdclass -def _get_revno(): +def _get_revno(git_dir): """Return the number of commits since the most recent tag. We use git-describe to find this out, but if there are no tags then we fall back to counting commits since the beginning of time. """ - describe = _run_shell_command("git describe --always") + describe = _run_shell_command( + "git --git-dir=%s describe --always" % git_dir) if "-" in describe: return describe.rsplit("-", 2)[-2] # no tags found - revlist = _run_shell_command("git rev-list --abbrev-commit HEAD") + revlist = _run_shell_command( + "git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir) return len(revlist.splitlines()) @@ -279,18 +300,21 @@ def _get_version_from_git(pre_version): revision if there is one, or tag plus number of additional revisions if the current revision has no tag.""" - if os.path.exists('.git'): + git_dir = _get_git_directory() + if git_dir: if pre_version: try: return _run_shell_command( - "git describe --exact-match", + "git --git-dir=" + git_dir + " describe --exact-match", throw_on_error=True).replace('-', '.') except Exception: - sha = _run_shell_command("git log -n1 --pretty=format:%h") - return "%s.a%s.g%s" % (pre_version, _get_revno(), sha) + sha = _run_shell_command( + "git --git-dir=" + git_dir + " log -n1 --pretty=format:%h") + return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha) else: return _run_shell_command( - "git describe --always").replace('-', '.') + "git --git-dir=" + git_dir + " describe --always").replace( + '-', '.') return None diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index e2c27405..60943659 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,18 +25,22 @@ import datetime import iso8601 -TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" -PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" +# ISO 8601 extended time format with microseconds +_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f' +_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' +PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND -def isotime(at=None): +def isotime(at=None, subsecond=False): """Stringify time in ISO 8601 format""" if not at: at = utcnow() - str = at.strftime(TIME_FORMAT) + st = at.strftime(_ISO8601_TIME_FORMAT + if not subsecond + else _ISO8601_TIME_FORMAT_SUBSECOND) tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' - str += ('Z' if tz == 'UTC' else tz) - return str + st += ('Z' if tz == 'UTC' else tz) + return st def parse_isotime(timestr): diff --git a/ceilometer/openstack/common/version.py b/ceilometer/openstack/common/version.py index 5500f1b9..d1f83682 100644 --- a/ceilometer/openstack/common/version.py +++ b/ceilometer/openstack/common/version.py @@ -1,5 +1,5 @@ -# Copyright 2012 OpenStack LLC +# Copyright 2012 OpenStack Foundation # Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may