diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index ba08f5e40..e9702c53f 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -151,11 +151,13 @@ class ConnectionContext(rpc_common.Connection): def create_worker(self, topic, proxy, pool_name): self.connection.create_worker(topic, proxy, pool_name) - def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + def join_consumer_pool(self, callback, pool_name, topic, exchange_name, + ack_on_error=True): self.connection.join_consumer_pool(callback, pool_name, topic, - exchange_name) + exchange_name, + ack_on_error) def consume_in_thread(self): self.connection.consume_in_thread() diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index b7b12b321..9e7e04e31 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -129,6 +129,7 @@ class ConsumerBase(object): self.tag = str(tag) self.kwargs = kwargs self.queue = None + self.ack_on_error = kwargs.get('ack_on_error', True) self.reconnect(channel) def reconnect(self, channel): @@ -138,6 +139,36 @@ class ConsumerBase(object): self.queue = kombu.entity.Queue(**self.kwargs) self.queue.declare() + def _callback_handler(self, message, callback): + """Call callback with deserialized message. + + Messages that are processed without exception are ack'ed. + + If the message processing generates an exception, it will be + ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed. + Rejection is better than waiting for the message to timeout. + Rejected messages are immediately requeued. + """ + + ack_msg = False + try: + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) + ack_msg = True + except Exception: + if self.ack_on_error: + ack_msg = True + LOG.exception(_("Failed to process message" + " ... skipping it.")) + else: + LOG.exception(_("Failed to process message" + " ... will requeue.")) + finally: + if ack_msg: + message.ack() + else: + message.reject() + def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the @@ -150,8 +181,6 @@ class ConsumerBase(object): If kwargs['nowait'] is True, then this call will block until a message is read. - Messages will automatically be acked if the callback doesn't - raise an exception """ options = {'consumer_tag': self.tag} @@ -162,13 +191,7 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - try: - msg = rpc_common.deserialize_msg(message.payload) - callback(msg) - except Exception: - LOG.exception(_("Failed to process message... skipping it.")) - finally: - message.ack() + self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -635,8 +658,8 @@ class Connection(object): def _consume(): if info['do_consume']: - queues_head = self.consumers[:-1] - queues_tail = self.consumers[-1] + queues_head = self.consumers[:-1] # not fanout. + queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) @@ -685,11 +708,12 @@ class Connection(object): self.declare_consumer(DirectConsumer, topic, callback) def declare_topic_consumer(self, topic, callback=None, queue_name=None, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Create a 'topic' consumer.""" self.declare_consumer(functools.partial(TopicConsumer, name=queue_name, exchange_name=exchange_name, + ack_on_error=ack_on_error, ), topic, callback) @@ -754,7 +778,7 @@ class Connection(object): self.declare_topic_consumer(topic, proxy_cb, pool_name) def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. @@ -775,6 +799,7 @@ class Connection(object): topic=topic, exchange_name=exchange_name, callback=callback_wrapper, + ack_on_error=ack_on_error, ) diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index ff717db5b..24ecbe6ab 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -152,6 +152,7 @@ class ConsumerBase(object): except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: + # TODO(sandy): Need support for optional ack_on_error. self.session.acknowledge(message) def get_receiver(self): @@ -615,7 +616,7 @@ class Connection(object): return consumer def join_consumer_pool(self, callback, pool_name, topic, - exchange_name=None): + exchange_name=None, ack_on_error=True): """Register as a member of a group of consumers for a given topic from the specified exchange. diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 0dede3443..9cd64e6e4 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -358,7 +358,6 @@ class ZmqBaseReactor(ConsumerBase): def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.mapping = {} self.proxies = {} self.threads = [] self.sockets = [] @@ -366,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase): self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) - def register(self, proxy, in_addr, zmq_type_in, out_addr=None, - zmq_type_out=None, in_bind=True, out_bind=True, - subscribe=None): + def register(self, proxy, in_addr, zmq_type_in, + in_bind=True, subscribe=None): LOG.info(_("Registering reactor")) @@ -384,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("In reactor registered")) - if not out_addr: - return - - if zmq_type_out not in (zmq.PUSH, zmq.PUB): - raise RPCException("Bad output socktype") - - # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) - - self.mapping[inq] = outq - self.mapping[outq] = inq - self.sockets.append(outq) - - LOG.info(_("Out reactor registered")) - def consume_in_thread(self): def _consume(sock): LOG.info(_("Consuming socket")) @@ -516,8 +499,7 @@ class ZmqProxy(ZmqBaseReactor): try: self.register(consumption_proxy, consume_in, - zmq.PULL, - out_bind=True) + zmq.PULL) except zmq.ZMQError: if os.access(ipc_dir, os.X_OK): with excutils.save_and_reraise_exception(): @@ -559,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) - if sock in self.mapping: - LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { - 'data': data}) - self.mapping[sock].send(data) - return proxy = self.proxies[sock]