Merge "Update openstack.common"

This commit is contained in:
Jenkins 2012-09-28 11:05:12 +00:00 committed by Gerrit Code Review
commit d63eca399b
12 changed files with 379 additions and 92 deletions

View File

@ -54,15 +54,14 @@ log_opts = [
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s'
'%(message)s',
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
' %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='from (pid=%(process)d) %(funcName)s '
'%(pathname)s:%(lineno)d',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s TRACE %(name)s %(instance)s',
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[

View File

@ -0,0 +1,68 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Network-related utilities and helper functions.
"""
import logging
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None):
"""
Interpret a string as a host:port pair.
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
[2001:db8:85a3::8a2e:370]:7334.
>>> parse_host_port('server01:80')
('server01', 80)
>>> parse_host_port('server01')
('server01', None)
>>> parse_host_port('server01', default_port=1234)
('server01', 1234)
>>> parse_host_port('[::1]:80')
('::1', 80)
>>> parse_host_port('[::1]')
('::1', None)
>>> parse_host_port('[::1]', default_port=1234)
('::1', 1234)
>>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
('2001:db8:85a3::8a2e:370:7334', 1234)
"""
if address[0] == '[':
# Escaped ipv6
_host, _port = address[1:].split(']')
host = _host
if ':' in _port:
port = _port.split(':')[1]
else:
port = default_port
else:
if address.count(':') == 1:
host, port = address.split(':')
else:
# 0 means ipv4, >1 means ipv6.
# We prohibit unescaped ipv6 addresses with port.
host = address
port = default_port
return (host, None if port is None else int(port))

View File

@ -139,8 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload):
driver.notify(context, msg)
except Exception, e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. Payload=%(payload)s") %
locals())
"send to notification system. "
"Payload=%(payload)s") % locals())
_drivers = None
@ -169,7 +169,7 @@ def add_driver(notification_driver):
except ImportError as e:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver

View File

@ -49,15 +49,21 @@ rpc_opts = [
cfg.ListOpt('allowed_rpc_exception_modules',
default=['ceilometer.openstack.common.exception',
'nova.exception',
'cinder.exception',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
#
# The following options are not registered here, but are expected to be
# present. The project using this library must register these options with
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)

View File

@ -34,6 +34,7 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local
@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool):
def cleanup(connection_pool):
if connection_pool:
connection_pool.empty()
def get_control_exchange(conf):
try:
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'

View File

@ -33,6 +33,7 @@ from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common.rpc import amqp as rpc_amqp
from ceilometer.openstack.common.rpc import common as rpc_common
from ceilometer.openstack.common import network_utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@ -50,10 +51,13 @@ kombu_opts = [
'(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
help='the RabbitMQ host'),
help='The RabbitMQ broker address where a single node is used'),
cfg.IntOpt('rabbit_port',
default=5672,
help='the RabbitMQ port'),
help='The RabbitMQ broker port where a single node is used'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
help='connect over SSL for RabbitMQ'),
@ -80,6 +84,11 @@ kombu_opts = [
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
'You need to wipe RabbitMQ database when '
'changing this option.'),
]
@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we declare a mirrored queue
as described here:
http://www.rabbitmq.com/ha.html
Setting x-ha-policy to all means that the queue will be mirrored
to all nodes in the cluster.
"""
return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
class ConsumerBase(object):
"""Consumer base class."""
@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, name=None,
**kwargs):
exchange_name=None, **kwargs):
"""Init a 'topic' queue.
:param channel: the amqp channel to use
@ -207,10 +230,12 @@ class TopicConsumer(ConsumerBase):
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=conf.control_exchange,
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
exchange = kombu.entity.Exchange(name=exchange_name,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
@ -307,8 +332,12 @@ class TopicPublisher(Publisher):
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel, conf.control_exchange,
topic, type='topic', **options)
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(channel,
exchange_name,
topic,
type='topic',
**options)
class FanoutPublisher(Publisher):
@ -331,6 +360,7 @@ class NotifyPublisher(TopicPublisher):
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel):
@ -343,7 +373,8 @@ class NotifyPublisher(TopicPublisher):
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
routing_key=self.routing_key)
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
@ -368,31 +399,37 @@ class Connection(object):
if server_params is None:
server_params = {}
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
params = {}
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
ssl_params = self._fetch_ssl_params()
params_list = []
for adr in self.conf.rabbit_hosts:
hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
params.setdefault('hostname', self.conf.rabbit_host)
params.setdefault('port', self.conf.rabbit_port)
params.setdefault('userid', self.conf.rabbit_userid)
params.setdefault('password', self.conf.rabbit_password)
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
params = {}
self.params = params
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
if self.conf.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
else:
self.memory_transport = False
params.setdefault('hostname', hostname)
params.setdefault('port', port)
params.setdefault('userid', self.conf.rabbit_userid)
params.setdefault('password', self.conf.rabbit_password)
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
if self.conf.rabbit_use_ssl:
self.params['ssl'] = self._fetch_ssl_params()
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
params_list.append(params)
self.params_list = params_list
self.memory_transport = self.conf.fake_rabbit
self.connection = None
self.reconnect()
@ -422,14 +459,14 @@ class Connection(object):
# Return the extended behavior
return ssl_params
def _connect(self):
def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % self.params)
"%(hostname)s:%(port)d") % params)
try:
self.connection.close()
except self.connection_errors:
@ -437,7 +474,7 @@ class Connection(object):
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
self.connection = kombu.connection.BrokerConnection(**self.params)
self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
@ -450,8 +487,8 @@ class Connection(object):
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'),
self.params)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
params)
def reconnect(self):
"""Handles reconnecting and re-establishing queues.
@ -464,11 +501,12 @@ class Connection(object):
attempt = 0
while True:
params = self.params_list[attempt % len(self.params_list)]
attempt += 1
try:
self._connect()
self._connect(params)
return
except (self.connection_errors, IOError), e:
except (IOError, self.connection_errors) as e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
@ -483,12 +521,12 @@ class Connection(object):
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
log_info.update(self.params)
log_info.update(params)
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
LOG.error(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
@ -502,9 +540,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
@ -512,7 +550,8 @@ class Connection(object):
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e:
pass
if error_callback:
error_callback(e)
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
@ -522,8 +561,8 @@ class Connection(object):
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
if error_callback:
error_callback(e)
if error_callback:
error_callback(e)
self.reconnect()
def get_channel(self):
@ -625,10 +664,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)

View File

@ -69,7 +69,7 @@ qpid_opts = [
default=0,
help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=5,
default=60,
help='Seconds between connection keepalive heartbeats'),
cfg.StrOpt('qpid_protocol',
default='tcp',
@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, session, topic, callback, name=None):
def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
@ -180,9 +181,9 @@ class TopicConsumer(ConsumerBase):
:param name: optional queue name, defaults to topic
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (conf.control_exchange,
topic),
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
@ -256,9 +257,9 @@ class TopicPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
super(TopicPublisher, self).__init__(
session,
"%s/%s" % (conf.control_exchange, topic))
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic))
class FanoutPublisher(Publisher):
@ -276,10 +277,10 @@ class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
super(NotifyPublisher, self).__init__(
session,
"%s/%s" % (conf.control_exchange, topic),
{"durable": True})
exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic),
{"durable": True})
class Connection(object):
@ -464,10 +465,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)

View File

@ -58,6 +58,9 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
cfg.IntOpt('rpc_zmq_port_pub', default=9502,
help='ZeroMQ fanout publisher port'),
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
@ -206,7 +209,7 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
self.outq.send([str(msg_id), str(topic), str('cast'),
self.outq.send([str(topic), str(msg_id), str('cast'),
_serialize(data)])
def close(self):
@ -299,6 +302,9 @@ class ConsumerBase(object):
else:
return [result]
def consume(self, sock):
raise NotImplementedError()
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
self.topic_proxy['fanout~'] = \
ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['fanout~'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic, msg_id, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
# This doesn't change what is in the message,
# it only specifies that these messages go to
# the generic fanout topic.
topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
@ -434,23 +450,32 @@ class ZmqProxy(ZmqBaseReactor):
else:
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
class CallbackReactor(ZmqBaseReactor):
"""
A consumer class passing messages to a callback
"""
def __init__(self, conf, callback):
self._cb = callback
super(CallbackReactor, self).__init__(conf)
def consume(self, sock):
data = sock.recv()
self._cb(data[3])
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@ -471,7 +496,7 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
msg_id, topic, style, in_msg = data
topic, msg_id, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
@ -488,6 +513,26 @@ class Connection(rpc_common.Connection):
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
def _consume_fanout(self, reactor, topic, proxy, bind=False):
for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
def declare_topic_consumer(self, topic, callback=None,
queue_name=None):
"""declare_topic_consumer is a private method, but
it is being used by Quantum (Folsom).
This has been added compatibility.
"""
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
return
reactor = CallbackReactor(CONF, callback)
self._consume_fanout(reactor, topic, None, bind=False)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
@ -495,22 +540,35 @@ class Connection(rpc_common.Connection):
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Subscription scenarios
# Consume direct-push fanout messages (relay to local consumers)
if fanout:
subscribe = ('', fanout)[type(fanout) == str]
# If we're not in here, we can't receive direct fanout messages
if CONF.rpc_zmq_host in matchmaker.queues(topic):
# Consume from all remote publishers.
self._consume_fanout(self.reactor, topic, proxy)
else:
LOG.warn("This service cannot receive direct PUSH fanout "
"messages without being known by the matchmaker.")
return
# Configure consumer for direct pushes.
subscribe = (topic, fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
# Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)

View File

@ -132,6 +132,14 @@ class FanoutBinding(Binding):
return False
class PublisherBinding(Binding):
"""Match on publishers keys, where key starts with 'publishers.' string."""
def test(self, key):
if key.startswith('publishers~'):
return True
return False
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
return [(key + '.' + host, host)]
class PublisherRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(PublisherRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "publishers~", strip it for lookup.
nkey = key.split('publishers~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
['localhost'])
class LocalhostExchange(Exchange):
@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())

View File

@ -0,0 +1,69 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import service
LOG = logging.getLogger(__name__)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host."""
def __init__(self, host, topic, manager=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
super(Service, self).stop()

View File

@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC"""
"""Normalize time in arbitrary timezone to UTC naive object"""
offset = timestamp.utcoffset()
return timestamp.replace(tzinfo=None) - offset if offset else timestamp
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
@ -121,6 +123,10 @@ def marshall_now(now=None):
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'], month=tyme['month'],
year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
second=tyme['second'], microsecond=tyme['microsecond'])
return datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])

View File

@ -1,3 +1,3 @@
[DEFAULT]
modules=cfg,iniparser,rpc,importutils,excutils,local,jsonutils,gettextutils,timeutils,notifier,context,log
modules=cfg,iniparser,rpc,importutils,excutils,local,jsonutils,gettextutils,timeutils,notifier,context,log,network_utils
base=ceilometer