diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index e078f3e5..0683f98d 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -54,15 +54,14 @@ log_opts = [ '%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', - default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s' - '%(message)s', + default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' + ' %(instance)s%(message)s', help='format string to use for log messages without context'), cfg.StrOpt('logging_debug_format_suffix', - default='from (pid=%(process)d) %(funcName)s ' - '%(pathname)s:%(lineno)d', + default='%(funcName)s %(pathname)s:%(lineno)d', help='data to append to log format when level is DEBUG'), cfg.StrOpt('logging_exception_prefix', - default='%(asctime)s TRACE %(name)s %(instance)s', + default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', default=[ diff --git a/ceilometer/openstack/common/network_utils.py b/ceilometer/openstack/common/network_utils.py new file mode 100644 index 00000000..69f67321 --- /dev/null +++ b/ceilometer/openstack/common/network_utils.py @@ -0,0 +1,68 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network-related utilities and helper functions. +""" + +import logging + +LOG = logging.getLogger(__name__) + + +def parse_host_port(address, default_port=None): + """ + Interpret a string as a host:port pair. + An IPv6 address MUST be escaped if accompanied by a port, + because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334 + means both [2001:db8:85a3::8a2e:370:7334] and + [2001:db8:85a3::8a2e:370]:7334. + + >>> parse_host_port('server01:80') + ('server01', 80) + >>> parse_host_port('server01') + ('server01', None) + >>> parse_host_port('server01', default_port=1234) + ('server01', 1234) + >>> parse_host_port('[::1]:80') + ('::1', 80) + >>> parse_host_port('[::1]') + ('::1', None) + >>> parse_host_port('[::1]', default_port=1234) + ('::1', 1234) + >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234) + ('2001:db8:85a3::8a2e:370:7334', 1234) + + """ + if address[0] == '[': + # Escaped ipv6 + _host, _port = address[1:].split(']') + host = _host + if ':' in _port: + port = _port.split(':')[1] + else: + port = default_port + else: + if address.count(':') == 1: + host, port = address.split(':') + else: + # 0 means ipv4, >1 means ipv6. + # We prohibit unescaped ipv6 addresses with port. + host = address + port = default_port + + return (host, None if port is None else int(port)) diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py index 3f31eb84..92f654cd 100644 --- a/ceilometer/openstack/common/notifier/api.py +++ b/ceilometer/openstack/common/notifier/api.py @@ -139,8 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload): driver.notify(context, msg) except Exception, e: LOG.exception(_("Problem '%(e)s' attempting to " - "send to notification system. Payload=%(payload)s") % - locals()) + "send to notification system. " + "Payload=%(payload)s") % locals()) _drivers = None @@ -169,7 +169,7 @@ def add_driver(notification_driver): except ImportError as e: LOG.exception(_("Failed to load notifier %s. " "These notifications will not be sent.") % - notification_driver) + notification_driver) else: # Driver is already loaded; just add the object. _drivers[notification_driver] = notification_driver diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py index c3552104..e0d27af3 100644 --- a/ceilometer/openstack/common/rpc/__init__.py +++ b/ceilometer/openstack/common/rpc/__init__.py @@ -49,15 +49,21 @@ rpc_opts = [ cfg.ListOpt('allowed_rpc_exception_modules', default=['ceilometer.openstack.common.exception', 'nova.exception', + 'cinder.exception', ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), - cfg.StrOpt('control_exchange', - default='nova', - help='AMQP exchange to connect to if using RabbitMQ or Qpid'), cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), + # + # The following options are not registered here, but are expected to be + # present. The project using this library must register these options with + # the configuration so that project-specific defaults may be defined. + # + #cfg.StrOpt('control_exchange', + # default='nova', + # help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] cfg.CONF.register_opts(rpc_opts) diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index 16635149..e86e748a 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -34,6 +34,7 @@ from eventlet import greenpool from eventlet import pools from eventlet import semaphore +from ceilometer.openstack.common import cfg from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import local @@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool): def cleanup(connection_pool): if connection_pool: connection_pool.empty() + + +def get_control_exchange(conf): + try: + return conf.control_exchange + except cfg.NoSuchOptError: + return 'openstack' diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index b489141e..a68154e4 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -33,6 +33,7 @@ from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.rpc import amqp as rpc_amqp from ceilometer.openstack.common.rpc import common as rpc_common +from ceilometer.openstack.common import network_utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -50,10 +51,13 @@ kombu_opts = [ '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', - help='the RabbitMQ host'), + help='The RabbitMQ broker address where a single node is used'), cfg.IntOpt('rabbit_port', default=5672, - help='the RabbitMQ port'), + help='The RabbitMQ broker port where a single node is used'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), cfg.BoolOpt('rabbit_use_ssl', default=False, help='connect over SSL for RabbitMQ'), @@ -80,6 +84,11 @@ kombu_opts = [ cfg.BoolOpt('rabbit_durable_queues', default=False, help='use durable queues in RabbitMQ'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='use H/A queues in RabbitMQ (x-ha-policy: all).' + 'You need to wipe RabbitMQ database when ' + 'changing this option.'), ] @@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts) LOG = rpc_common.LOG +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + class ConsumerBase(object): """Consumer base class.""" @@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" def __init__(self, conf, channel, topic, callback, tag, name=None, - **kwargs): + exchange_name=None, **kwargs): """Init a 'topic' queue. :param channel: the amqp channel to use @@ -207,10 +230,12 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': False, 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange(name=conf.control_exchange, + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) + exchange = kombu.entity.Exchange(name=exchange_name, type='topic', durable=options['durable'], auto_delete=options['auto_delete']) @@ -307,8 +332,12 @@ class TopicPublisher(Publisher): 'auto_delete': False, 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, conf.control_exchange, - topic, type='topic', **options) + exchange_name = rpc_amqp.get_control_exchange(conf) + super(TopicPublisher, self).__init__(channel, + exchange_name, + topic, + type='topic', + **options) class FanoutPublisher(Publisher): @@ -331,6 +360,7 @@ class NotifyPublisher(TopicPublisher): def __init__(self, conf, channel, topic, **kwargs): self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + self.queue_arguments = _get_queue_arguments(conf) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): @@ -343,7 +373,8 @@ class NotifyPublisher(TopicPublisher): exchange=self.exchange, durable=self.durable, name=self.routing_key, - routing_key=self.routing_key) + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) queue.declare() @@ -368,31 +399,37 @@ class Connection(object): if server_params is None: server_params = {} - # Keys to translate from server_params to kombu params server_params_to_kombu_params = {'username': 'userid'} - params = {} - for sp_key, value in server_params.iteritems(): - p_key = server_params_to_kombu_params.get(sp_key, sp_key) - params[p_key] = value + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = network_utils.parse_host_port( + adr, default_port=self.conf.rabbit_port) - params.setdefault('hostname', self.conf.rabbit_host) - params.setdefault('port', self.conf.rabbit_port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) + params = {} - self.params = params + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value - if self.conf.fake_rabbit: - self.params['transport'] = 'memory' - self.memory_transport = True - else: - self.memory_transport = False + params.setdefault('hostname', hostname) + params.setdefault('port', port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) - if self.conf.rabbit_use_ssl: - self.params['ssl'] = self._fetch_ssl_params() + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params + + params_list.append(params) + + self.params_list = params_list + + self.memory_transport = self.conf.fake_rabbit self.connection = None self.reconnect() @@ -422,14 +459,14 @@ class Connection(object): # Return the extended behavior return ssl_params - def _connect(self): + def _connect(self, params): """Connect to rabbit. Re-establish any queues that may have been declared before if we are reconnecting. Exceptions should be handled by the caller. """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % params) try: self.connection.close() except self.connection_errors: @@ -437,7 +474,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection(**self.params) + self.connection = kombu.connection.BrokerConnection(**params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -450,8 +487,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') % + params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -464,11 +501,12 @@ class Connection(object): attempt = 0 while True: + params = self.params_list[attempt % len(self.params_list)] attempt += 1 try: - self._connect() + self._connect(params) return - except (self.connection_errors, IOError), e: + except (IOError, self.connection_errors) as e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -483,12 +521,12 @@ class Connection(object): log_info = {} log_info['err_str'] = str(e) log_info['max_retries'] = self.max_retries - log_info.update(self.params) + log_info.update(params) if self.max_retries and attempt == self.max_retries: - LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + LOG.error(_('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -502,9 +540,9 @@ class Connection(object): sleep_time = min(sleep_time, self.interval_max) log_info['sleep_time'] = sleep_time - LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -512,7 +550,8 @@ class Connection(object): try: return method(*args, **kwargs) except (self.connection_errors, socket.timeout, IOError), e: - pass + if error_callback: + error_callback(e) except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib # to return an error not covered by its transport @@ -522,8 +561,8 @@ class Connection(object): # and try to reconnect in this case. if 'timeout' not in str(e): raise - if error_callback: - error_callback(e) + if error_callback: + error_callback(e) self.reconnect() def get_channel(self): @@ -625,10 +664,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index d50a8b1c..8bc2c5da 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -69,7 +69,7 @@ qpid_opts = [ default=0, help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', - default=5, + default=60, help='Seconds between connection keepalive heartbeats'), cfg.StrOpt('qpid_protocol', default='tcp', @@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase): class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, conf, session, topic, callback, name=None): + def __init__(self, conf, session, topic, callback, name=None, + exchange_name=None): """Init a 'topic' queue. :param session: the amqp session to use @@ -180,9 +181,9 @@ class TopicConsumer(ConsumerBase): :param name: optional queue name, defaults to topic """ + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (conf.control_exchange, - topic), + "%s/%s" % (exchange_name, topic), {}, name or topic, {}) @@ -256,9 +257,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__( - session, - "%s/%s" % (conf.control_exchange, topic)) + exchange_name = rpc_amqp.get_control_exchange(conf) + super(TopicPublisher, self).__init__(session, + "%s/%s" % (exchange_name, topic)) class FanoutPublisher(Publisher): @@ -276,10 +277,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(NotifyPublisher, self).__init__( - session, - "%s/%s" % (conf.control_exchange, topic), - {"durable": True}) + exchange_name = rpc_amqp.get_control_exchange(conf) + super(NotifyPublisher, self).__init__(session, + "%s/%s" % (exchange_name, topic), + {"durable": True}) class Connection(object): @@ -464,10 +465,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None, queue_name=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, + exchange_name=exchange_name, ), topic, callback) diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index bf6a81df..0b2e50a7 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -58,6 +58,9 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), + cfg.IntOpt('rpc_zmq_port_pub', default=9502, + help='ZeroMQ fanout publisher port'), + cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -206,7 +209,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), + self.outq.send([str(topic), str(msg_id), str('cast'), _serialize(data)]) def close(self): @@ -299,6 +302,9 @@ class ConsumerBase(object): else: return [result] + def consume(self, sock): + raise NotImplementedError() + def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) + self.topic_proxy['fanout~'] = \ + ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['fanout~']) + def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB + + # This doesn't change what is in the message, + # it only specifies that these messages go to + # the generic fanout topic. + topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -434,23 +450,32 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) +class CallbackReactor(ZmqBaseReactor): + """ + A consumer class passing messages to a callback + """ + + def __init__(self, conf, callback): + self._cb = callback + super(CallbackReactor, self).__init__(conf) + + def consume(self, sock): + data = sock.recv() + self._cb(data[3]) + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -471,7 +496,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -488,6 +513,26 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) + def _consume_fanout(self, reactor, topic, proxy, bind=False): + for topic, host in matchmaker.queues("publishers~%s" % (topic, )): + inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) + reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + + def declare_topic_consumer(self, topic, callback=None, + queue_name=None): + """declare_topic_consumer is a private method, but + it is being used by Quantum (Folsom). + This has been added compatibility. + """ + # Only consume on the base topic name. + topic = topic.split('.', 1)[0] + + if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): + return + + reactor = CallbackReactor(CONF, callback) + self._consume_fanout(reactor, topic, None, bind=False) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -495,22 +540,35 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Subscription scenarios + # Consume direct-push fanout messages (relay to local consumers) if fanout: - subscribe = ('', fanout)[type(fanout) == str] + # If we're not in here, we can't receive direct fanout messages + if CONF.rpc_zmq_host in matchmaker.queues(topic): + # Consume from all remote publishers. + self._consume_fanout(self.reactor, topic, proxy) + else: + LOG.warn("This service cannot receive direct PUSH fanout " + "messages without being known by the matchmaker.") + return + + # Configure consumer for direct pushes. + subscribe = (topic, fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic + + inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) + # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index bf4f85d4..268c05f4 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -132,6 +132,14 @@ class FanoutBinding(Binding): return False +class PublisherBinding(Binding): + """Match on publishers keys, where key starts with 'publishers.' string.""" + def test(self, key): + if key.startswith('publishers~'): + return True + return False + + class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] +class PublisherRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(PublisherRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "publishers~", strip it for lookup. + nkey = key.split('publishers~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + return map(lambda x: (key + '.' + x, x), self.ring[nkey] + + ['localhost']) class LocalhostExchange(Exchange): @@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() + self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) diff --git a/ceilometer/openstack/common/rpc/service.py b/ceilometer/openstack/common/rpc/service.py new file mode 100644 index 00000000..c9348154 --- /dev/null +++ b/ceilometer/openstack/common/rpc/service.py @@ -0,0 +1,69 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import rpc +from ceilometer.openstack.common import service + + +LOG = logging.getLogger(__name__) + + +class Service(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host.""" + def __init__(self, host, topic, manager=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = rpc.create_connection(new=True) + LOG.debug(_("Creating Consumer connection for Service %s") % + self.topic) + + rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager]) + + # Share this same connection for these Consumers + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) + + self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) + + # Consume from all consumers in a thread + self.conn.consume_in_thread() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: + pass + super(Service, self).stop() diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index c4f6cf04..93b34fc5 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): def normalize_time(timestamp): - """Normalize time in arbitrary timezone to UTC""" + """Normalize time in arbitrary timezone to UTC naive object""" offset = timestamp.utcoffset() - return timestamp.replace(tzinfo=None) - offset if offset else timestamp + if offset is None: + return timestamp + return timestamp.replace(tzinfo=None) - offset def is_older_than(before, seconds): @@ -121,6 +123,10 @@ def marshall_now(now=None): def unmarshall_time(tyme): """Unmarshall a datetime dict.""" - return datetime.datetime(day=tyme['day'], month=tyme['month'], - year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], - second=tyme['second'], microsecond=tyme['microsecond']) + return datetime.datetime(day=tyme['day'], + month=tyme['month'], + year=tyme['year'], + hour=tyme['hour'], + minute=tyme['minute'], + second=tyme['second'], + microsecond=tyme['microsecond']) diff --git a/openstack-common.conf b/openstack-common.conf index 3493f1c6..a5c24580 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,3 +1,3 @@ [DEFAULT] -modules=cfg,iniparser,rpc,importutils,excutils,local,jsonutils,gettextutils,timeutils,notifier,context,log +modules=cfg,iniparser,rpc,importutils,excutils,local,jsonutils,gettextutils,timeutils,notifier,context,log,network_utils base=ceilometer \ No newline at end of file