Merge trunk.
@@ -22,6 +22,7 @@ import logging
 import Queue as queue
 
 from carrot.backends import base
+from eventlet import greenthread
 
 
 class Message(base.BaseMessage):
@@ -38,6 +39,7 @@ class Exchange(object):
     def publish(self, message, routing_key=None):
         logging.debug('(%s) publish (key: %s) %s',
                       self.name, routing_key, message)
+        routing_key = routing_key.split('.')[0]
         if routing_key in self._routes:
             for f in self._routes[routing_key]:
                 logging.debug('Publishing to route %s', f)
@@ -94,6 +96,18 @@ class Backend(object):
         self._exchanges[exchange].bind(self._queues[queue].push,
                                        routing_key)
 
+    def declare_consumer(self, queue, callback, *args, **kwargs):
+        self.current_queue = queue
+        self.current_callback = callback
+
+    def consume(self, *args, **kwargs):
+        while True:
+            item = self.get(self.current_queue)
+            if item:
+                self.current_callback(item)
+                raise StopIteration()
+            greenthread.sleep(0)
+
     def get(self, queue, no_ack=False):
         if not queue in self._queues or not self._queues[queue].size():
             return None
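For context, a minimal sketch of the cooperative-scheduling pattern the new consume() loop relies on (plain eventlet, not nova code; the producer/consumer names below are illustrative): greenthread.sleep(0) yields control, so a busy-wait loop like the fake backend's does not starve other green threads.

from eventlet import greenthread

messages = []

def producer():
    # hypothetical producer standing in for Exchange.publish
    for i in range(3):
        messages.append('msg-%d' % i)
        greenthread.sleep(0)   # yield so the consumer can run

def consumer():
    # hypothetical consumer standing in for Backend.consume
    seen = 0
    while seen < 3:
        if messages:
            print('consumed %s' % messages.pop(0))
            seen += 1
        greenthread.sleep(0)   # yield instead of blocking

p = greenthread.spawn(producer)
c = greenthread.spawn(consumer)
p.wait()
c.wait()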
@@ -222,6 +222,10 @@ DEFINE_string('volume_manager', 'nova.volume.manager.AOEManager',
 DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
               'Manager for scheduler')
 
+# The service to use for image search and retrieval
+DEFINE_string('image_service', 'nova.image.service.LocalImageService',
+              'The service to use for retrieving and searching for images.')
+
 DEFINE_string('host', socket.gethostname(),
               'name of this node')
 
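The new image_service flag, like compute_manager, holds a dotted class path that is resolved at runtime. A rough sketch of how such a path is typically turned into an instance (generic Python, not nova's actual utils.import_class implementation):

import importlib

def import_class(dotted_path):
    # split 'pkg.module.ClassName' into module path and class name
    module_name, _, class_name = dotted_path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, class_name)

# usage, mirroring the utils.import_class(FLAGS.compute_manager)() calls
# in the test changes further down:
# service_cls = import_class('nova.image.service.LocalImageService')
# image_service = service_cls()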
@@ -28,6 +28,7 @@ import uuid
 
 from carrot import connection as carrot_connection
 from carrot import messaging
+from eventlet import greenthread
 from twisted.internet import defer
 from twisted.internet import task
 
@@ -107,6 +108,14 @@ class Consumer(messaging.Consumer):
             logging.exception("Failed to fetch message from queue")
             self.failed_connection = True
 
+    def attach_to_eventlet(self):
+        """Only needed for unit tests!"""
+        def fetch_repeatedly():
+            while True:
+                self.fetch(enable_callbacks=True)
+                greenthread.sleep(0.1)
+        greenthread.spawn(fetch_repeatedly)
+
     def attach_to_twisted(self):
         """Attach a callback to twisted that fires 10 times a second"""
         loop = task.LoopingCall(self.fetch, enable_callbacks=True)
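attach_to_eventlet() boils down to spawning a green thread that polls roughly ten times a second. A stripped-down sketch of that pattern (plain eventlet, not the Consumer class itself; attach and poll_fn are made-up names):

from eventlet import greenthread

def attach(poll_fn, interval=0.1):
    def poll_repeatedly():
        while True:
            poll_fn()                    # e.g. fetch(enable_callbacks=True)
            greenthread.sleep(interval)  # yield between polls
    return greenthread.spawn(poll_repeatedly)

# usage: attach(lambda: consumer.fetch(enable_callbacks=True))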
@@ -106,6 +106,7 @@ def serve(name, main):
 def daemonize(args, name, main):
     """Does the work of daemonizing the process"""
     logging.getLogger('amqplib').setLevel(logging.WARN)
+    files_to_keep = []
     if FLAGS.daemonize:
         logger = logging.getLogger()
         formatter = logging.Formatter(
@@ -114,12 +115,14 @@ def daemonize(args, name, main):
             syslog = logging.handlers.SysLogHandler(address='/dev/log')
             syslog.setFormatter(formatter)
             logger.addHandler(syslog)
+            files_to_keep.append(syslog.socket)
         else:
             if not FLAGS.logfile:
                 FLAGS.logfile = '%s.log' % name
             logfile = logging.FileHandler(FLAGS.logfile)
             logfile.setFormatter(formatter)
             logger.addHandler(logfile)
+            files_to_keep.append(logfile.stream)
         stdin, stdout, stderr = None, None, None
     else:
         stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
@@ -139,6 +142,7 @@ def daemonize(args, name, main):
             stdout=stdout,
             stderr=stderr,
             uid=FLAGS.uid,
-            gid=FLAGS.gid
+            gid=FLAGS.gid,
+            files_preserve=files_to_keep
         ):
         main(args)
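The files_preserve argument is what keeps the log handlers usable after daemonization; the descriptors collected in files_to_keep would otherwise be closed when the process detaches. A minimal sketch, assuming the python-daemon package that this with-block appears to use (the log path is hypothetical):

import logging
import daemon

logger = logging.getLogger('example')
handler = logging.FileHandler('/tmp/example.log')  # hypothetical path
logger.addHandler(handler)

files_to_keep = [handler.stream]  # without this, DaemonContext closes the fd

with daemon.DaemonContext(files_preserve=files_to_keep):
    logger.error('still able to log after the daemon fork')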
@@ -16,6 +16,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from base64 import b64decode
 import json
 import logging
 from M2Crypto import BIO
@@ -63,11 +64,17 @@ class CloudTestCase(test.TrialTestCase):
         self.cloud = cloud.CloudController()
 
         # set up a service
-        self.compute = utils.import_class(FLAGS.compute_manager)
+        self.compute = utils.import_class(FLAGS.compute_manager)()
         self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
                                                     topic=FLAGS.compute_topic,
                                                     proxy=self.compute)
-        self.compute_consumer.attach_to_twisted()
+        self.compute_consumer.attach_to_eventlet()
+        self.network = utils.import_class(FLAGS.network_manager)()
+        self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
+                                                    topic=FLAGS.network_topic,
+                                                    proxy=self.network)
+        self.network_consumer.attach_to_eventlet()
+
 
         self.manager = manager.AuthManager()
         self.user = self.manager.create_user('admin', 'admin', 'admin', True)
@@ -85,15 +92,17 @@ class CloudTestCase(test.TrialTestCase):
         return cloud._gen_key(self.context, self.context.user.id, name)
 
     def test_console_output(self):
-        if FLAGS.connection_type == 'fake':
-            logging.debug("Can't test instances without a real virtual env.")
-            return
-        instance_id = 'foo'
-        inst = yield self.compute.run_instance(instance_id)
-        output = yield self.cloud.get_console_output(self.context, [instance_id])
-        logging.debug(output)
-        self.assert_(output)
-        rv = yield self.compute.terminate_instance(instance_id)
+        image_id = FLAGS.default_image
+        instance_type = FLAGS.default_instance_type
+        max_count = 1
+        kwargs = {'image_id': image_id,
+                  'instance_type': instance_type,
+                  'max_count': max_count }
+        rv = yield self.cloud.run_instances(self.context, **kwargs)
+        instance_id = rv['instancesSet'][0]['instanceId']
+        output = yield self.cloud.get_console_output(context=self.context, instance_id=[instance_id])
+        self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
+        rv = yield self.cloud.terminate_instances(self.context, [instance_id])
 
 
     def test_key_generation(self):
@@ -236,7 +245,8 @@ class CloudTestCase(test.TrialTestCase):
 
     def test_update_of_instance_display_fields(self):
         inst = db.instance_create({}, {})
-        self.cloud.update_instance(self.context, inst['ec2_id'],
+        ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
+        self.cloud.update_instance(self.context, ec2_id,
                                    display_name='c00l 1m4g3')
         inst = db.instance_get({}, inst['id'])
         self.assertEqual('c00l 1m4g3', inst['display_name'])
@@ -133,13 +133,22 @@ class ObjectStoreTestCase(test.TrialTestCase):
         self.assertRaises(NotFound, objectstore.bucket.Bucket, 'new_bucket')
 
     def test_images(self):
+        self.do_test_images('1mb.manifest.xml', True,
+                            'image_bucket1', 'i-testing1')
+
+    def test_images_no_kernel_or_ramdisk(self):
+        self.do_test_images('1mb.no_kernel_or_ramdisk.manifest.xml',
+                            False, 'image_bucket2', 'i-testing2')
+
+    def do_test_images(self, manifest_file, expect_kernel_and_ramdisk,
+                       image_bucket, image_name):
         "Test the image API."
         self.context.user = self.auth_manager.get_user('user1')
         self.context.project = self.auth_manager.get_project('proj1')
 
         # create a bucket for our bundle
-        objectstore.bucket.Bucket.create('image_bucket', self.context)
-        bucket = objectstore.bucket.Bucket('image_bucket')
+        objectstore.bucket.Bucket.create(image_bucket, self.context)
+        bucket = objectstore.bucket.Bucket(image_bucket)
 
         # upload an image manifest/parts
         bundle_path = os.path.join(os.path.dirname(__file__), 'bundle')
@@ -147,18 +156,28 @@ class ObjectStoreTestCase(test.TrialTestCase):
             bucket[os.path.basename(path)] = open(path, 'rb').read()
 
         # register an image
-        image.Image.register_aws_image('i-testing',
-                                       'image_bucket/1mb.manifest.xml',
+        image.Image.register_aws_image(image_name,
+                                       '%s/%s' % (image_bucket, manifest_file),
                                        self.context)
 
         # verify image
-        my_img = image.Image('i-testing')
+        my_img = image.Image(image_name)
         result_image_file = os.path.join(my_img.path, 'image')
         self.assertEqual(os.stat(result_image_file).st_size, 1048576)
 
         sha = hashlib.sha1(open(result_image_file).read()).hexdigest()
         self.assertEqual(sha, '3b71f43ff30f4b15b5cd85dd9e95ebc7e84eb5a3')
 
+        if expect_kernel_and_ramdisk:
+            # Verify the default kernel and ramdisk are set
+            self.assertEqual(my_img.metadata['kernelId'], 'aki-test')
+            self.assertEqual(my_img.metadata['ramdiskId'], 'ari-test')
+        else:
+            # Verify that the default kernel and ramdisk (the one from FLAGS)
+            # doesn't get embedded in the metadata
+            self.assertFalse('kernelId' in my_img.metadata)
+            self.assertFalse('ramdiskId' in my_img.metadata)
+
         # verify image permissions
         self.context.user = self.auth_manager.get_user('user2')
         self.context.project = self.auth_manager.get_project('proj2')