diff --git a/quantum/openstack/common/notifier/log_notifier.py b/quantum/openstack/common/notifier/log_notifier.py
index bc8f61865e..cea98311c4 100644
--- a/quantum/openstack/common/notifier/log_notifier.py
+++ b/quantum/openstack/common/notifier/log_notifier.py
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-
 from oslo.config import cfg
 
 from quantum.openstack.common import jsonutils
diff --git a/quantum/openstack/common/notifier/rpc_notifier.py b/quantum/openstack/common/notifier/rpc_notifier.py
index 8d84aa69c0..e7caa97e28 100644
--- a/quantum/openstack/common/notifier/rpc_notifier.py
+++ b/quantum/openstack/common/notifier/rpc_notifier.py
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-
 from oslo.config import cfg
 
 from quantum.openstack.common import context as req_context
diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py
index 66ffd528f7..4b7b99dc2e 100644
--- a/quantum/openstack/common/rpc/amqp.py
+++ b/quantum/openstack/common/rpc/amqp.py
@@ -32,13 +32,27 @@ import uuid
 
 from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
+from eventlet import queue
+# TODO(pekowski): Remove import cfg and below comment in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
 
 from quantum.openstack.common import excutils
 from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import local
 from quantum.openstack.common import log as logging
 from quantum.openstack.common.rpc import common as rpc_common
 
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+    cfg.BoolOpt('amqp_rpc_single_reply_queue',
+                default=False,
+                help='Enable a fast single reply queue if using AMQP based '
+                'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
 
 LOG = logging.getLogger(__name__)
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
         kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
         kwargs.setdefault("order_as_stack", True)
         super(Pool, self).__init__(*args, **kwargs)
+        self.reply_proxy = None
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
     def empty(self):
         while self.free_items:
             self.get().close()
+        # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is the above "while loop" gets all the cached connections from the
+        # pool and closes them, but never returns them to the pool, a pool
+        # leak. The unit tests hang waiting for an item to be returned to the
+        # pool. The unit tests get here via the tearDown() method. In the run
+        # time code, it gets here via cleanup() and only appears in service.py
+        # just before doing a sys.exit(), so cleanup() only happens once and
+        # the leakage is not a problem.
+        self.connection_cls.pool = None
 
 
 _pool_create_sem = semaphore.Semaphore()
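
Note on the empty() change above: eventlet pools count every item handed out by get() against max_size until it is put() back, so closing drained connections without returning them leaves the pool permanently exhausted. A standalone sketch of the failure mode (FakeConnection and FakePool are invented for illustration; this is not part of the patch):

    from eventlet import pools

    class FakeConnection(object):
        def close(self):
            pass

    class FakePool(pools.Pool):
        def create(self):
            return FakeConnection()

    pool = FakePool(max_size=1)
    conn = pool.get()
    conn.close()  # closed but never pool.put(conn): the only slot leaks
    # A second pool.get() would now block forever; this is the unit-test
    # hang described above. Clearing connection_cls.pool sidesteps it by
    # letting a fresh pool be built on next use.
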
@@ -137,6 +162,12 @@
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        self.connection.join_consumer_pool(callback,
+                                           pool_name,
+                                           topic,
+                                           exchange_name)
+
     def consume_in_thread(self):
         self.connection.consume_in_thread()
 
@@ -148,8 +179,45 @@
             raise rpc_common.InvalidRPCConnectionReuse()
 
 
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
-              ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+    """Connection class for RPC replies / callbacks"""
+    def __init__(self, conf, connection_pool):
+        self._call_waiters = {}
+        self._num_call_waiters = 0
+        self._num_call_waiters_wrn_threshold = 10
+        self._reply_q = 'reply_' + uuid.uuid4().hex
+        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+        self.declare_direct_consumer(self._reply_q, self._process_data)
+        self.consume_in_thread()
+
+    def _process_data(self, message_data):
+        msg_id = message_data.pop('_msg_id', None)
+        waiter = self._call_waiters.get(msg_id)
+        if not waiter:
+            LOG.warn(_('No calling threads waiting for msg_id: %s'
+                       ', message: %s') % (msg_id, message_data))
+        else:
+            waiter.put(message_data)
+
+    def add_call_waiter(self, waiter, msg_id):
+        self._num_call_waiters += 1
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
+            LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshold)
+            self._num_call_waiters_wrn_threshold *= 2
+        self._call_waiters[msg_id] = waiter
+
+    def del_call_waiter(self, msg_id):
+        self._num_call_waiters -= 1
+        del self._call_waiters[msg_id]
+
+    def get_reply_q(self):
+        return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+              failure=None, ending=False, log_failure=True):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
 
@@ -168,13 +236,21 @@
                    'failure': failure}
         if ending:
             msg['ending'] = True
-        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibility.
+        if reply_q:
+            msg['_msg_id'] = msg_id
+            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
 
 
 class RpcContext(rpc_common.CommonRpcContext):
     """Context that supports replying to a rpc.call"""
     def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
+        self.reply_q = kwargs.pop('reply_q', None)
         self.conf = kwargs.pop('conf')
         super(RpcContext, self).__init__(**kwargs)
 
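
To make the new reply routing concrete, here is a sketch of the two payload shapes msg_reply() can now emit (field names are taken from the patch; the queue names are invented for illustration):

    # reply_q unset (legacy path): one direct consumer per call, so the
    # reply is routed purely by msg_id.
    msg = {'result': 42, 'failure': None}
    # conn.direct_send('5bcf...<msg_id hex>', serialized)

    # reply_q set (shared-queue path): the msg_id travels inside the
    # payload so ReplyProxy._process_data() can pop it and locate the
    # matching waiter.
    msg = {'result': 42, 'failure': None, '_msg_id': '5bcf...<msg_id hex>'}
    # conn.direct_send('reply_<uuid4 hex>', serialized)
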
@@ -182,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
         values = self.to_dict()
         values['conf'] = self.conf
         values['msg_id'] = self.msg_id
+        values['reply_q'] = self.reply_q
         return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
               connection_pool=None, log_failure=True):
         if self.msg_id:
-            msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
-                      ending, log_failure)
+            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+                      reply, failure, ending, log_failure)
             if ending:
                 self.msg_id = None
 
@@ -204,6 +281,7 @@ def unpack_context(conf, msg):
         value = msg.pop(key)
         context_dict[key[9:]] = value
     context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
     rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -224,15 +302,54 @@ def pack_context(msg, context):
     msg.update(context_d)
 
 
-class ProxyCallback(object):
-    """Calls methods on a proxy object based on method and args."""
+class _ThreadPoolWithWait(object):
+    """Base class for a delayed invocation manager used by
+    the Connection class to start up green threads
+    to handle incoming messages.
+    """
 
-    def __init__(self, conf, proxy, connection_pool):
-        self.proxy = proxy
+    def __init__(self, conf, connection_pool):
         self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
         self.connection_pool = connection_pool
         self.conf = conf
 
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+    """Wraps a straight callback to allow it to be invoked in a green
+    thread.
+    """
+
+    def __init__(self, conf, callback, connection_pool):
+        """
+        :param conf: cfg.CONF instance
+        :param callback: a callable (probably a function)
+        :param connection_pool: connection pool as returned by
+                                get_connection_pool()
+        """
+        super(CallbackWrapper, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.callback = callback
+
+    def __call__(self, message_data):
+        self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, conf, proxy, connection_pool):
+        super(ProxyCallback, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.proxy = proxy
+
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
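
The net effect of the refactoring above is that ProxyCallback keeps its behaviour while CallbackWrapper reuses the same green-thread pool for bare callbacks. A minimal usage sketch (FakeConf is an assumption for illustration; connection_pool is only stored by the base class, so None suffices here):

    from quantum.openstack.common.rpc import amqp as rpc_amqp

    class FakeConf(object):
        rpc_thread_pool_size = 64

    def on_message(message_data):
        print(message_data)

    wrapper = rpc_amqp.CallbackWrapper(FakeConf(), on_message, None)
    wrapper({'event': 'ping'})  # runs on_message in a green thread
    wrapper.wait()              # inherited wait() calls pool.waitall()
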
@@ -293,11 +410,66 @@ class ProxyCallback(object):
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
 
-    def wait(self):
-        """Wait for all callback threads to exit."""
-        self.pool.waitall()
+
+class MulticallProxyWaiter(object):
+    def __init__(self, conf, msg_id, timeout, connection_pool):
+        self._msg_id = msg_id
+        self._timeout = timeout or conf.rpc_response_timeout
+        self._reply_proxy = connection_pool.reply_proxy
+        self._done = False
+        self._got_ending = False
+        self._conf = conf
+        self._dataqueue = queue.LightQueue()
+        # Add this caller to the reply proxy's call_waiters
+        self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+    def put(self, data):
+        self._dataqueue.put(data)
+
+    def done(self):
+        if self._done:
+            return
+        self._done = True
+        # Remove this caller from reply proxy's call_waiters
+        self._reply_proxy.del_call_waiter(self._msg_id)
+
+    def _process_data(self, data):
+        result = None
+        if data['failure']:
+            failure = data['failure']
+            result = rpc_common.deserialize_remote_exception(self._conf,
+                                                             failure)
+        elif data.get('ending', False):
+            self._got_ending = True
+        else:
+            result = data['result']
+        return result
+
+    def __iter__(self):
+        """Return a result until we get a reply with an 'ending' flag"""
+        if self._done:
+            raise StopIteration
+        while True:
+            try:
+                data = self._dataqueue.get(timeout=self._timeout)
+                result = self._process_data(data)
+            except queue.Empty:
+                LOG.exception(_('Timed out waiting for RPC response.'))
+                self.done()
+                raise rpc_common.Timeout()
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    self.done()
+            if self._got_ending:
+                self.done()
+                raise StopIteration
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            yield result
 
 
+# TODO(pekowski): Remove MulticallWaiter() in Havana.
 class MulticallWaiter(object):
     def __init__(self, conf, connection, timeout):
         self._connection = connection
@@ -353,22 +525,40 @@ def create_connection(conf, new, connection_pool):
     return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
+    # TODO(pekowski): Remove all these comments in Havana.
+    # For amqp_rpc_single_reply_queue = False,
     # Can't use 'with' for multicall, as it returns an iterator
     # that will continue to use the connection.  When it's done,
     # connection.close() will get called which will put it back into
     # the pool
+    # For amqp_rpc_single_reply_queue = True,
+    # The 'with' statement is mandatory for closing the connection
     LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
     pack_context(msg, context)
 
-    conn = ConnectionContext(conf, connection_pool)
-    wait_msg = MulticallWaiter(conf, conn, timeout)
-    conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    # TODO(pekowski): Remove this flag and the code under the if clause
+    # in Havana.
+    if not conf.amqp_rpc_single_reply_queue:
+        conn = ConnectionContext(conf, connection_pool)
+        wait_msg = MulticallWaiter(conf, conn, timeout)
+        conn.declare_direct_consumer(msg_id, wait_msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    else:
+        with _reply_proxy_create_sem:
+            if not connection_pool.reply_proxy:
+                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+        with ConnectionContext(conf, connection_pool) as conn:
+            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
     return wait_msg
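
Whichever branch multicall() takes, callers are unaffected: the public rpc.multicall() still returns an iterator that yields one result per reply until the 'ending' reply arrives. A hedged usage sketch (the topic and method names below are hypothetical):

    from quantum.openstack.common import context
    from quantum.openstack.common import rpc

    ctxt = context.get_admin_context()
    msg = {'method': 'list_agents', 'args': {}}
    for reply in rpc.multicall(ctxt, 'q-plugin', msg, timeout=30):
        # One iteration per reply; a timeout in the waiter surfaces as
        # rpc_common.Timeout.
        print(reply)
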
diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py
index 4377eaac50..4421e7d951 100644
--- a/quantum/openstack/common/rpc/common.py
+++ b/quantum/openstack/common/rpc/common.py
@@ -197,6 +197,28 @@ class Connection(object):
         """
         raise NotImplementedError()
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+
+        :param callback: Callable to be invoked for each message.
+        :type callback: callable accepting one argument
+        :param pool_name: The name of the consumer pool.
+        :type pool_name: str
+        :param topic: The routing topic for desired messages.
+        :type topic: str
+        :param exchange_name: The name of the message exchange where
+                              the client should attach. Defaults to
+                              the configured exchange.
+        :type exchange_name: str
+        """
+        raise NotImplementedError()
+
     def consume_in_thread(self):
         """Spawn a thread to handle incoming messages.
diff --git a/quantum/openstack/common/rpc/impl_kombu.py b/quantum/openstack/common/rpc/impl_kombu.py
index 1635e166e1..dd4d47e218 100644
--- a/quantum/openstack/common/rpc/impl_kombu.py
+++ b/quantum/openstack/common/rpc/impl_kombu.py
@@ -165,9 +165,10 @@ class ConsumerBase(object):
             try:
                 msg = rpc_common.deserialize_msg(message.payload)
                 callback(msg)
-                message.ack()
             except Exception:
                 LOG.exception(_("Failed to process message... skipping it."))
+            finally:
+                message.ack()
 
         self.queue.consume(*args, callback=_callback, **options)
 
@@ -750,6 +751,30 @@ class Connection(object):
         self.proxy_callbacks.append(proxy_cb)
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+        self.declare_topic_consumer(
+            queue_name=pool_name,
+            topic=topic,
+            exchange_name=exchange_name,
+            callback=callback_wrapper,
+        )
+
 
 def create_connection(conf, new=True):
     """Create a connection"""
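
The ConsumerBase change at the top of the impl_kombu hunk is a behavioural fix worth noting: with ack() moved into a finally clause, a message whose callback raises is logged and acknowledged (and therefore skipped) instead of staying unacknowledged and being redelivered indefinitely. The pattern in isolation (a sketch, not the patched code):

    def make_callback(callback, log):
        def _callback(message):
            try:
                callback(message.payload)
            except Exception:
                log.exception("Failed to process message... skipping it.")
            finally:
                # Always ack, so a poison message is skipped rather than
                # redelivered in an endless loop.
                message.ack()
        return _callback
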
diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py
index 689c9704e2..ba0920a936 100644
--- a/quantum/openstack/common/rpc/impl_qpid.py
+++ b/quantum/openstack/common/rpc/impl_qpid.py
@@ -560,6 +560,34 @@ class Connection(object):
 
         return consumer
 
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+
+        consumer = TopicConsumer(conf=self.conf,
+                                 session=self.session,
+                                 topic=topic,
+                                 callback=callback_wrapper,
+                                 name=pool_name,
+                                 exchange_name=exchange_name)
+
+        self._register_consumer(consumer)
+        return consumer
+
 
 def create_connection(conf, new=True):
     """Create a connection"""
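
With both drivers implementing it, join_consumer_pool() gives callers a load-balanced consumer group: exactly one member of each named pool receives a given message, and every pool attached to the topic gets its own copy. A usage sketch of the new API (the pool and topic names are invented):

    from quantum.openstack.common import rpc

    def handle_event(message_data):
        print(message_data)

    conn = rpc.create_connection(new=True)
    conn.join_consumer_pool(callback=handle_event,
                            pool_name='event_workers',
                            topic='notifications.info',
                            exchange_name=None)
    conn.consume_in_thread()
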