fix test_rpc and kombu stuff

This commit is contained in:
Chris Behrens
2011-08-28 18:17:21 -07:00
parent a966a416ff
commit 7b8bf743e0
5 changed files with 79 additions and 32 deletions

View File

@@ -1,2 +0,0 @@
Move some code duplication between carrot/kombu into common.py
The other FIXMEs in __init__.py and impl_kombu.py

View File

@@ -38,18 +38,13 @@ RPCIMPL = import_object(impl_table.get(FLAGS.rpc_backend,
def create_connection(new=True):
return RPCIMPL.Connection.instance(new=True)
return RPCIMPL.create_connection(new=new)
def create_consumer(conn, topic, proxy, fanout=False):
return RPCIMPL.create_consumer(conn, topic, proxy, fanout)
def create_consumer_set(conn, consumers):
# FIXME(comstud): replace however necessary
return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
def call(context, topic, msg):
return RPCIMPL.call(context, topic, msg)

View File

@@ -33,6 +33,7 @@ import uuid
from carrot import connection as carrot_connection
from carrot import messaging
import eventlet
from eventlet import greenpool
from eventlet import pools
from eventlet import queue
@@ -42,10 +43,10 @@ from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
from nova import log as logging
from nova import utils
from nova.rpc.common import RemoteError, LOG
# Needed for tests
eventlet.monkey_patch()
FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
@@ -57,6 +58,11 @@ flags.DEFINE_integer('rpc_conn_pool_size', 30,
class Connection(carrot_connection.BrokerConnection):
"""Connection instance object."""
def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
self._rpc_consumers = []
self._rpc_consumer_thread = None
@classmethod
def instance(cls, new=True):
"""Returns the instance."""
@@ -94,6 +100,42 @@ class Connection(carrot_connection.BrokerConnection):
pass
return cls.instance()
def close(self):
self.cancel_consumer_thread()
for consumer in self._rpc_consumers:
try:
consumer.close()
except Exception:
# ignore all errors
pass
self._rpc_consumers = []
super(Connection, self).close()
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
consumer_set = ConsumerSet(connection=self,
consumer_list=self._rpc_consumers)
def _consumer_thread():
try:
consumer_set.wait()
except greenlet.GreenletExit:
return
if not self._rpc_consumer_thread:
self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
return self._rpc_consumer_thread
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self._rpc_consumer_thread:
self._rpc_consumer_thread.kill()
try:
self._rpc_consumer_thread.wait()
except greenlet.GreenletExit:
pass
self._rpc_consumer_thread = None
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
@@ -119,6 +161,7 @@ class Consumer(messaging.Consumer):
"""
def __init__(self, *args, **kwargs):
connection = kwargs.get('connection')
max_retries = FLAGS.rabbit_max_retries
sleep_time = FLAGS.rabbit_retry_interval
tries = 0
@@ -148,6 +191,7 @@ class Consumer(messaging.Consumer):
LOG.error(_('Unable to connect to AMQP server '
'after %(tries)d tries. Shutting down.') % locals())
sys.exit(1)
connection._rpc_consumers.append(self)
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connection."""
@@ -175,12 +219,6 @@ class Consumer(messaging.Consumer):
LOG.exception(_('Failed to fetch message from queue: %s' % e))
self.failed_connection = True
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
timer.start(0.1)
return timer
class AdapterConsumer(Consumer):
"""Calls methods on a proxy object based on method and args."""
@@ -251,7 +289,7 @@ class AdapterConsumer(Consumer):
# NOTE(vish): this iterates through the generator
list(rval)
except Exception as e:
logging.exception('Exception during message handling')
LOG.exception('Exception during message handling')
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
return

View File

@@ -14,9 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from nova import flags
from nova.rpc.common import RemoteError, LOG
import kombu
import kombu.entity
import kombu.messaging
@@ -24,8 +21,22 @@ import kombu.connection
import itertools
import sys
import time
import traceback
import types
import uuid
import eventlet
from eventlet import greenpool
from eventlet import pools
import greenlet
from nova import context
from nova import exception
from nova import flags
from nova.rpc.common import RemoteError, LOG
# Needed for tests
eventlet.monkey_patch()
FLAGS = flags.FLAGS
@@ -317,7 +328,7 @@ class Connection(object):
pass
time.sleep(1)
self.connection = kombu.connection.Connection(**self.params)
self.queue_num = itertools.count(1)
self.consumer_num = itertools.count(1)
try:
self.connection.ensure_connection(errback=self.connect_error,
@@ -634,7 +645,7 @@ class RpcContext(context.RequestContext):
class MulticallWaiter(object):
def __init__(self, connection):
self._connection = connection
self._iterator = connection.consume()
self._iterator = connection.iterconsume()
self._result = None
self._done = False