Rename ec2 get_console_output's instance ID argument to 'instance_id'. It's passed as a kwarg, based on key in the http query, so it must be named this way.
This commit is contained in:
@@ -258,9 +258,9 @@ class CloudController(object):
|
||||
def delete_security_group(self, context, group_name, **kwargs):
|
||||
return True
|
||||
|
||||
def get_console_output(self, context, ec2_id_list, **kwargs):
|
||||
# ec2_id_list is passed in as a list of instances
|
||||
ec2_id = ec2_id_list[0]
|
||||
def get_console_output(self, context, instance_id, **kwargs):
|
||||
# instance_id is passed in as a list of instances
|
||||
ec2_id = instance_id[0]
|
||||
internal_id = ec2_id_to_internal_id(ec2_id)
|
||||
instance_ref = db.instance_get_by_internal_id(context, internal_id)
|
||||
return rpc.call('%s.%s' % (FLAGS.compute_topic,
|
||||
|
||||
@@ -69,6 +69,9 @@ def list(context, filter_list=[]):
|
||||
|
||||
optionally filtered by a list of image_id """
|
||||
|
||||
if FLAGS.connection_type == 'fake':
|
||||
return [{ 'imageId' : 'bar'}]
|
||||
|
||||
# FIXME: send along the list of only_images to check for
|
||||
response = conn(context).make_request(
|
||||
method='GET',
|
||||
|
||||
@@ -22,6 +22,7 @@ import logging
|
||||
import Queue as queue
|
||||
|
||||
from carrot.backends import base
|
||||
from eventlet import greenthread
|
||||
|
||||
|
||||
class Message(base.BaseMessage):
|
||||
@@ -38,6 +39,7 @@ class Exchange(object):
|
||||
def publish(self, message, routing_key=None):
|
||||
logging.debug('(%s) publish (key: %s) %s',
|
||||
self.name, routing_key, message)
|
||||
routing_key = routing_key.split('.')[0]
|
||||
if routing_key in self._routes:
|
||||
for f in self._routes[routing_key]:
|
||||
logging.debug('Publishing to route %s', f)
|
||||
@@ -94,6 +96,18 @@ class Backend(object):
|
||||
self._exchanges[exchange].bind(self._queues[queue].push,
|
||||
routing_key)
|
||||
|
||||
def declare_consumer(self, queue, callback, *args, **kwargs):
|
||||
self.current_queue = queue
|
||||
self.current_callback = callback
|
||||
|
||||
def consume(self, *args, **kwargs):
|
||||
while True:
|
||||
item = self.get(self.current_queue)
|
||||
if item:
|
||||
self.current_callback(item)
|
||||
raise StopIteration()
|
||||
greenthread.sleep(0)
|
||||
|
||||
def get(self, queue, no_ack=False):
|
||||
if not queue in self._queues or not self._queues[queue].size():
|
||||
return None
|
||||
|
||||
@@ -28,6 +28,7 @@ import uuid
|
||||
|
||||
from carrot import connection as carrot_connection
|
||||
from carrot import messaging
|
||||
from eventlet import greenthread
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import task
|
||||
|
||||
@@ -107,6 +108,14 @@ class Consumer(messaging.Consumer):
|
||||
logging.exception("Failed to fetch message from queue")
|
||||
self.failed_connection = True
|
||||
|
||||
def attach_to_eventlet(self):
|
||||
"""Only needed for unit tests!"""
|
||||
def fetch_repeatedly():
|
||||
while True:
|
||||
self.fetch(enable_callbacks=True)
|
||||
greenthread.sleep(0.1)
|
||||
greenthread.spawn(fetch_repeatedly)
|
||||
|
||||
def attach_to_twisted(self):
|
||||
"""Attach a callback to twisted that fires 10 times a second"""
|
||||
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from base64 import b64decode
|
||||
import json
|
||||
import logging
|
||||
from M2Crypto import BIO
|
||||
@@ -63,11 +64,17 @@ class CloudTestCase(test.TrialTestCase):
|
||||
self.cloud = cloud.CloudController()
|
||||
|
||||
# set up a service
|
||||
self.compute = utils.import_class(FLAGS.compute_manager)
|
||||
self.compute = utils.import_class(FLAGS.compute_manager)()
|
||||
self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
|
||||
topic=FLAGS.compute_topic,
|
||||
proxy=self.compute)
|
||||
self.compute_consumer.attach_to_twisted()
|
||||
self.compute_consumer.attach_to_eventlet()
|
||||
self.network = utils.import_class(FLAGS.network_manager)()
|
||||
self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
|
||||
topic=FLAGS.network_topic,
|
||||
proxy=self.network)
|
||||
self.network_consumer.attach_to_eventlet()
|
||||
|
||||
|
||||
self.manager = manager.AuthManager()
|
||||
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
|
||||
@@ -85,15 +92,17 @@ class CloudTestCase(test.TrialTestCase):
|
||||
return cloud._gen_key(self.context, self.context.user.id, name)
|
||||
|
||||
def test_console_output(self):
|
||||
if FLAGS.connection_type == 'fake':
|
||||
logging.debug("Can't test instances without a real virtual env.")
|
||||
return
|
||||
instance_id = 'foo'
|
||||
inst = yield self.compute.run_instance(instance_id)
|
||||
output = yield self.cloud.get_console_output(self.context, [instance_id])
|
||||
logging.debug(output)
|
||||
self.assert_(output)
|
||||
rv = yield self.compute.terminate_instance(instance_id)
|
||||
image_id = FLAGS.default_image
|
||||
instance_type = FLAGS.default_instance_type
|
||||
max_count = 1
|
||||
kwargs = {'image_id': image_id,
|
||||
'instance_type': instance_type,
|
||||
'max_count': max_count }
|
||||
rv = yield self.cloud.run_instances(self.context, **kwargs)
|
||||
instance_id = rv['instancesSet'][0]['instanceId']
|
||||
output = yield self.cloud.get_console_output(context=self.context, instance_id=[instance_id])
|
||||
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
|
||||
rv = yield self.cloud.terminate_instances(self.context, [instance_id])
|
||||
|
||||
|
||||
def test_key_generation(self):
|
||||
|
||||
Reference in New Issue
Block a user