fix consumers to actually be deleted and clean up cloud test

This commit is contained in:
Vishvananda Ishaya
2011-05-25 15:42:24 -07:00
committed by termie
parent 2368bc4fe3
commit 1da5df1d8e
3 changed files with 29 additions and 23 deletions

View File

@@ -79,7 +79,7 @@ class Queue(object):
class Backend(base.BaseBackend):
def __init__(self, connection, **kwargs):
super(Backend, self).__init__(connection, **kwargs)
self.consumers = []
self.consumers = {}
def queue_declare(self, queue, **kwargs):
global QUEUES
@@ -100,13 +100,18 @@ class Backend(base.BaseBackend):
' key %(routing_key)s') % locals())
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
def declare_consumer(self, queue, callback, *args, **kwargs):
self.consumers.append((queue, callback))
def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
LOG.debug("Adding consumer %s", consumer_tag)
self.consumers[consumer_tag] = (queue, callback)
def cancel(self, consumer_tag):
LOG.debug("Removing consumer %s", consumer_tag)
del self.consumers[consumer_tag]
def consume(self, limit=None):
num = 0
while True:
for (queue, callback) in self.consumers:
for (queue, callback) in self.consumers.itervalues():
item = self.get(queue)
if item:
callback(item)

View File

@@ -30,11 +30,11 @@ import time
import traceback
import uuid
import greenlet
from carrot import connection as carrot_connection
from carrot import messaging
import eventlet
from eventlet import greenpool
from eventlet import greenthread
from eventlet import pools
from eventlet import queue
@@ -266,6 +266,7 @@ class ConsumerSet(object):
def __init__(self, conn, consumer_list):
self.consumer_list = set(consumer_list)
self.consumer_set = None
self.enabled = True
self.init(conn)
def init(self, conn):
@@ -283,15 +284,21 @@ class ConsumerSet(object):
self.init(None)
def wait(self, limit=None):
while True:
running = True
while running:
it = self.consumer_set.iterconsume(limit=limit)
if not it:
break
while True:
try:
it.next()
except StopIteration:
return
except greenlet.GreenletExit:
running = False
break
except Exception as e:
LOG.error(_("Received exception %s " % str(e) + \
LOG.error(_("Received exception %s " % type(e) + \
"while processing consumer"))
self.reconnect()
# Break to outer loop

View File

@@ -17,13 +17,8 @@
# under the License.
from base64 import b64decode
import json
from M2Crypto import BIO
from M2Crypto import RSA
import os
import shutil
import tempfile
import time
from eventlet import greenthread
@@ -33,12 +28,10 @@ from nova import db
from nova import flags
from nova import log as logging
from nova import rpc
from nova import service
from nova import test
from nova import utils
from nova import exception
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
@@ -79,6 +72,15 @@ class CloudTestCase(test.TestCase):
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
# NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
rpc_cast = rpc.cast
def finish_cast(*args, **kwargs):
rpc_cast(*args, **kwargs)
greenthread.sleep(0.2)
self.stubs.Set(rpc, 'cast', finish_cast)
def tearDown(self):
network_ref = db.project_get_network(self.context,
self.project.id)
@@ -113,7 +115,6 @@ class CloudTestCase(test.TestCase):
self.cloud.describe_addresses(self.context)
self.cloud.release_address(self.context,
public_ip=address)
greenthread.sleep(0.3)
db.floating_ip_destroy(self.context, address)
def test_associate_disassociate_address(self):
@@ -129,12 +130,10 @@ class CloudTestCase(test.TestCase):
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
public_ip=address)
greenthread.sleep(0.3)
self.network.deallocate_fixed_ip(self.context, fixed)
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
@@ -306,31 +305,26 @@ class CloudTestCase(test.TestCase):
'instance_type': instance_type,
'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs)
greenthread.sleep(0.3)
instance_id = rv['instancesSet'][0]['instanceId']
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
greenthread.sleep(0.3)
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
'%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
def test_key_generation(self):
result = self._create_key('test')