lots of fixes for rpc and extra imports

This commit is contained in:
Vishvananda Ishaya
2011-05-25 15:42:24 -07:00
committed by termie
parent aba7847b8a
commit b3338e3ca5
2 changed files with 34 additions and 46 deletions

View File

@@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit")
EXCHANGES = {}
QUEUES = {}
CONSUMERS = {}
class Message(base.BaseMessage):
@@ -101,17 +102,20 @@ class Backend(base.BaseBackend):
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
global CONSUMERS
LOG.debug("Adding consumer %s", consumer_tag)
self.consumers[consumer_tag] = (queue, callback)
CONSUMERS[consumer_tag] = (queue, callback)
def cancel(self, consumer_tag):
global CONSUMERS
LOG.debug("Removing consumer %s", consumer_tag)
del self.consumers[consumer_tag]
del CONSUMERS[consumer_tag]
def consume(self, limit=None):
global CONSUMERS
num = 0
while True:
for (queue, callback) in self.consumers.itervalues():
for (queue, callback) in CONSUMERS.itervalues():
item = self.get(queue)
if item:
callback(item)
@@ -147,5 +151,7 @@ class Backend(base.BaseBackend):
def reset_all():
global EXCHANGES
global QUEUES
global CONSUMERS
EXCHANGES = {}
QUEUES = {}
CONSUMERS = {}

View File

@@ -33,9 +33,7 @@ import uuid
from carrot import connection as carrot_connection
from carrot import messaging
import eventlet
from eventlet import greenpool
from eventlet import greenthread
from eventlet import pools
from eventlet import queue
@@ -142,30 +140,30 @@ class Consumer(messaging.Consumer):
FLAGS.rabbit_max_retries)
sys.exit(1)
#def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
# """Wraps the parent fetch with some logic for failed connection."""
# # TODO(vish): the logic for failed connections and logging should be
# # refactored into some sort of connection manager object
# try:
# if self.failed_connection:
# # NOTE(vish): connection is defined in the parent class, we can
# # recreate it as long as we create the backend too
# # pylint: disable=W0201
# self.connection = Connection.recreate()
# self.backend = self.connection.create_backend()
# self.declare()
# return super(Consumer, self).fetch(
# no_ack, auto_ack, enable_callbacks)
# if self.failed_connection:
# LOG.error(_('Reconnected to queue'))
# self.failed_connection = False
# # NOTE(vish): This is catching all errors because we really don't
# # want exceptions to be logged 10 times a second if some
# # persistent failure occurs.
# except Exception, e: # pylint: disable=W0703
# if not self.failed_connection:
# LOG.exception(_('Failed to fetch message from queue: %s' % e))
# self.failed_connection = True
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connection."""
# TODO(vish): the logic for failed connections and logging should be
# refactored into some sort of connection manager object
try:
if self.failed_connection:
# NOTE(vish): connection is defined in the parent class, we can
# recreate it as long as we create the backend too
# pylint: disable=W0201
self.connection = Connection.recreate()
self.backend = self.connection.create_backend()
self.declare()
return super(Consumer, self).fetch(
no_ack, auto_ack, enable_callbacks)
if self.failed_connection:
LOG.error(_('Reconnected to queue'))
self.failed_connection = False
# NOTE(vish): This is catching all errors because we really don't
# want exceptions to be logged 10 times a second if some
# persistent failure occurs.
except Exception, e: # pylint: disable=W0703
if not self.failed_connection:
LOG.exception(_('Failed to fetch message from queue: %s' % e))
self.failed_connection = True
def close(self, *args, **kwargs):
LOG.debug('Closing consumer %s', self.consumer_tag)
@@ -325,6 +323,7 @@ class ConsumerSet(object):
def close(self):
self.consumer_set.close()
class Publisher(messaging.Publisher):
"""Publisher base class."""
pass
@@ -511,23 +510,6 @@ class MulticallWaiter(object):
return self.wait()
def wait(self):
# TODO(termie): This is probably really a much simpler issue but am
# trying to solve the problem quickly. This works but
# I'd prefer to dig in and do it the best way later on.
#def _waiter():
# i = 0
# while not self._closed:
# LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag)
# i += 1
# try:
# self._consumer.wait(limit=1)
# except StopIteration:
# pass
# self._consumer.close()
# ConnectionPool.put(self._consumer.connection)
#eventlet.spawn(_waiter)
while True:
rv = None
while rv is None and not self._closed: