Update openstack.common

This update brings in the new join_consumer_pool()
method of the message bus connection class.

Change-Id: Ie5b9bf93c9aaf8f4f85b47b2394969741ba5fef4
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2013-02-14 18:12:41 -05:00
parent bc1bd84b91
commit 6ae709c268
5 changed files with 177 additions and 9 deletions

52
bin/ceilometer-rpc-zmq-receiver Executable file

@ -0,0 +1,52 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import os
import sys
# If ../ceilometer/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'ceilometer', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common.rpc import impl_zmq
CONF = cfg.CONF
CONF.register_opts(rpc.rpc_opts)
CONF.register_opts(impl_zmq.zmq_opts)
CONF(sys.argv[1:], project='ceilometer')
def main():
logging.setup("ceilometer")
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
reactor.consume_in_thread()
reactor.wait()
if __name__ == '__main__':
main()

@ -137,6 +137,12 @@ class ConnectionContext(rpc_common.Connection):
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
self.connection.join_consumer_pool(callback,
pool_name,
topic,
exchange_name)
def consume_in_thread(self):
self.connection.consume_in_thread()
@ -224,15 +230,54 @@ def pack_context(msg, context):
msg.update(context_d)
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
to handle incoming messages.
"""
def __init__(self, conf, proxy, connection_pool):
self.proxy = proxy
def __init__(self, conf, connection_pool):
self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
self.connection_pool = connection_pool
self.conf = conf
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class CallbackWrapper(_ThreadPoolWithWait):
"""Wraps a straight callback to allow it to be invoked in a green
thread.
"""
def __init__(self, conf, callback, connection_pool):
"""
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
def __call__(self, message_data):
self.pool.spawn_n(self.callback, message_data)
class ProxyCallback(_ThreadPoolWithWait):
"""Calls methods on a proxy object based on method and args."""
def __init__(self, conf, proxy, connection_pool):
super(ProxyCallback, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.proxy = proxy
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@ -293,10 +338,6 @@ class ProxyCallback(object):
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):

@ -196,6 +196,28 @@ class Connection(object):
"""
raise NotImplementedError()
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
:param callback: Callable to be invoked for each message.
:type callback: callable accepting one argument
:param pool_name: The name of the consumer pool.
:type pool_name: str
:param topic: The routing topic for desired messages.
:type topic: str
:param exchange_name: The name of the message exchange where
the client should attach. Defaults to
the configured exchange.
:type exchange_name: str
"""
raise NotImplementedError()
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.

@ -165,9 +165,10 @@ class ConsumerBase(object):
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options)
@ -750,6 +751,30 @@ class Connection(object):
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
"""
callback_wrapper = rpc_amqp.CallbackWrapper(
conf=self.conf,
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(
queue_name=pool_name,
topic=topic,
exchange_name=exchange_name,
callback=callback_wrapper,
)
def create_connection(conf, new=True):
"""Create a connection"""

@ -560,6 +560,34 @@ class Connection(object):
return consumer
def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None):
"""Register as a member of a group of consumers for a given topic from
the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
one is created.
"""
callback_wrapper = rpc_amqp.CallbackWrapper(
conf=self.conf,
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
)
self.proxy_callbacks.append(callback_wrapper)
consumer = TopicConsumer(conf=self.conf,
session=self.session,
topic=topic,
callback=callback_wrapper,
name=pool_name,
exchange_name=exchange_name)
self._register_consumer(consumer)
return consumer
def create_connection(conf, new=True):
"""Create a connection"""