intermediate commit to checkpoint progress

all relevant tests are passing except volume, next step is volume manager fixery
This commit is contained in:
Andy Smith
2010-12-08 17:18:27 -08:00
parent 47dd2062a2
commit 609ee0c715
9 changed files with 100 additions and 143 deletions

View File

@@ -25,18 +25,18 @@ import json
import logging
import sys
import time
import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
from twisted.internet import defer
from twisted.internet import task
from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
from nova import context
from nova import utils
FLAGS = flags.FLAGS
@@ -128,17 +128,9 @@ class Consumer(messaging.Consumer):
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
def fetch_repeatedly():
while True:
self.fetch(enable_callbacks=True)
greenthread.sleep(0.1)
greenthread.spawn(fetch_repeatedly)
def attach_to_twisted(self):
"""Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1)
return loop
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
timer.start(0.1)
return timer
class Publisher(messaging.Publisher):
@@ -197,10 +189,13 @@ class AdapterConsumer(TopicConsumer):
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
# pylint: disable-msg=W0142
d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
if msg_id:
d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
d.addErrback(lambda e: msg_reply(msg_id, None, e))
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
msg_reply(msg_id, rval, None)
except Exception as e:
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
return
@@ -244,11 +239,11 @@ def msg_reply(msg_id, reply=None, failure=None):
failure should be a twisted failure object"""
if failure:
message = failure.getErrorMessage()
traceback = failure.getTraceback()
message = str(failure[1])
tb = traceback.format_exception(*failure)
logging.error("Returning exception %s to caller", message)
logging.error(traceback)
failure = (failure.type.__name__, str(failure.value), traceback)
logging.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
@@ -313,8 +308,8 @@ def call(context, topic, msg):
_pack_context(msg, context)
class WaitMessage(object):
def __call__(self, data, message):
LOG.debug('data %s, msg %s', data, message)
"""Acks message and sets result."""
message.ack()
if data['failure']:
@@ -337,41 +332,11 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
if isinstance(wait_msg.result, Exception):
raise wait_msg.result
return wait_msg.result
def call_twisted(context, topic, msg):
"""Sends a message on a topic and wait for a response"""
LOG.debug("Making asynchronous call...")
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug("MSG_ID is %s" % (msg_id))
_pack_context(msg, context)
conn = Connection.instance()
d = defer.Deferred()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
def deferred_receive(data, message):
"""Acks message and callbacks or errbacks"""
message.ack()
if data['failure']:
return d.errback(RemoteError(*data['failure']))
else:
return d.callback(data['result'])
consumer.register_callback(deferred_receive)
injected = consumer.attach_to_twisted()
# clean up after the injected listened and return x
d.addCallback(lambda x: injected.stop() and x or x)
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
return d
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")

View File

@@ -16,10 +16,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
#import logging
from M2Crypto import X509
import unittest
import eventlet
logging = eventlet.import_patched('logging')
from nova import crypto
from nova import flags
from nova import test

View File

@@ -27,8 +27,6 @@ import tempfile
import time
from eventlet import greenthread
from twisted.internet import defer
import unittest
from xml.etree import ElementTree
from nova import context
@@ -186,7 +184,7 @@ class CloudTestCase(test.TrialTestCase):
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
rv = yield defer.succeed(time.sleep(1))
greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
logging.debug(info['state'])
if info['state'] == power_state.RUNNING:

View File

@@ -22,8 +22,6 @@ Tests For Compute
import datetime
import logging
from twisted.internet import defer
from nova import context
from nova import db
from nova import exception
@@ -33,6 +31,7 @@ from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
FLAGS = flags.FLAGS
@@ -94,24 +93,22 @@ class ComputeTestCase(test.TrialTestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
@defer.inlineCallbacks
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("Running instances: %s", instances)
self.assertEqual(len(instances), 1)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("After terminating instances: %s", instances)
self.assertEqual(len(instances), 0)
@defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
@@ -119,42 +116,40 @@ class ComputeTestCase(test.TrialTestCase):
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
yield self.compute.run_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
self.context = self.context.elevated(True)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
@defer.inlineCallbacks
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
yield self.compute.reboot_instance(self.context, instance_id)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
self.compute.reboot_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
@defer.inlineCallbacks
def test_console_output(self):
"""Make sure we can get console output from instance"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
console = yield self.compute.get_console_output(self.context,
console = self.compute.get_console_output(self.context,
instance_id)
self.assert_(console)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
@defer.inlineCallbacks
def test_run_instance_existing(self):
"""Ensure failure when running an instance that already exists"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
self.assertFailure(self.compute.run_instance(self.context,
instance_id),
exception.Error)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error,
self.compute.run_instance,
self.context,
instance_id)
self.compute.terminate_instance(self.context, instance_id)

View File

@@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue
"""
import logging
from twisted.internet import defer
from nova import context
from nova import flags
from nova import rpc
@@ -40,23 +38,22 @@ class RpcTestCase(test.TrialTestCase):
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
self.consumer.attach_to_twisted()
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
result = yield rpc.call_twisted(self.context,
'test', {"method": "echo",
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
result = yield rpc.call_twisted(self.context,
'test', {"method": "context",
"args": {"value": value}})
result = rpc.call(self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
@@ -67,14 +64,17 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
self.assertFailure(rpc.call_twisted(self.context, 'test',
{"method": "fail",
"args": {"value": value}}),
rpc.RemoteError)
self.assertRaises(rpc.RemoteError,
rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
yield rpc.call_twisted(self.context,
'test', {"method": "fail",
"args": {"value": value}})
rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
@@ -89,13 +89,13 @@ class TestReceiver(object):
def echo(context, value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value)
return defer.succeed(value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
logging.debug("Received %s", context)
return defer.succeed(context.to_dict())
return context.to_dict()
@staticmethod
def fail(context, value):

View File

@@ -143,7 +143,6 @@ class ServiceTestCase(test.TrialTestCase):
# whether it is disconnected, it looks for a variable on itself called
# 'model_disconnected' and report_state doesn't really do much so this
# these are mostly just for coverage
@defer.inlineCallbacks
def test_report_state_no_service(self):
host = 'foo'
binary = 'bar'
@@ -174,9 +173,8 @@ class ServiceTestCase(test.TrialTestCase):
topic,
'nova.tests.service_unittest.FakeManager')
serv.startService()
yield serv.report_state()
serv.report_state()
@defer.inlineCallbacks
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@@ -205,10 +203,9 @@ class ServiceTestCase(test.TrialTestCase):
topic,
'nova.tests.service_unittest.FakeManager')
serv.startService()
yield serv.report_state()
serv.report_state()
self.assert_(serv.model_disconnected)
@defer.inlineCallbacks
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
@@ -240,6 +237,6 @@ class ServiceTestCase(test.TrialTestCase):
'nova.tests.service_unittest.FakeManager')
serv.startService()
serv.model_disconnected = True
yield serv.report_state()
serv.report_state()
self.assert_(not serv.model_disconnected)

View File

@@ -235,7 +235,7 @@ class NWFilterTestCase(test.TrialTestCase):
'project_id': 'fake'})
inst_id = instance_ref['id']
def _ensure_all_called(_):
def _ensure_all_called():
instance_filter = 'nova-instance-%s' % instance_ref['name']
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
@@ -253,7 +253,6 @@ class NWFilterTestCase(test.TrialTestCase):
instance = db.instance_get(self.context, inst_id)
d = self.fw.setup_nwfilters_for_instance(instance)
d.addCallback(_ensure_all_called)
d.addCallback(lambda _: self.teardown_security_group())
_ensure_all_called()
self.teardown_security_group()
return d

View File

@@ -21,8 +21,6 @@ Tests for Volume Code.
"""
import logging
from twisted.internet import defer
from nova import context
from nova import exception
from nova import db
@@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase):
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)['id']
@defer.inlineCallbacks
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
yield self.volume.delete_volume(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
@defer.inlineCallbacks
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
defer.returnValue(True)
return True
try:
volume_id = self._create_volume('1001')
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
@defer.inlineCallbacks
def test_too_many_volumes(self):
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
vols = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
vols.append(volume_id)
volume_id = self._create_volume()
self.assertFailure(self.volume.create_volume(self.context,
volume_id),
db.NoMoreTargets)
self.assertRaises(db.NoMoreTargets,
self.volume.create_volume,
self.context,
volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
for volume_id in vols:
yield self.volume.delete_volume(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
@defer.inlineCallbacks
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
@@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase):
instance_id = db.instance_create(self.context, inst)['id']
mountpoint = "/dev/sdf"
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
yield self.compute.attach_volume(self.context,
instance_id,
volume_id,
mountpoint)
self.compute.attach_volume(self.context,
instance_id,
volume_id,
mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
@@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase):
instance_ref = db.volume_get_instance(self.context, volume_id)
self.assertEqual(instance_ref['id'], instance_id)
self.assertFailure(self.volume.delete_volume(self.context, volume_id),
exception.Error)
self.assertRaises(exception.Error,
self.volume.delete_volume,
self.context,
volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
yield self.compute.detach_volume(self.context,
instance_id,
volume_id)
self.compute.detach_volume(self.context,
instance_id,
volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
yield self.volume.delete_volume(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.Error,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_id)
@defer.inlineCallbacks
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
@@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase):
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
logging.debug("Target %s allocated", iscsi_target)
deferreds = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
d = self.volume.create_volume(self.context, volume_id)
d.addCallback(_check)
d.addErrback(self.fail)
deferreds.append(d)
yield defer.DeferredList(deferreds)
_check(d)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)

View File

@@ -39,11 +39,16 @@ Due to our use of multiprocessing it we frequently get some ignorable
"""
import eventlet
eventlet.monkey_patch()
import __main__
import os
import sys
from twisted.scripts import trial as trial_script
import unittest
from nova import flags
from nova import twistd
@@ -56,12 +61,12 @@ from nova.tests.compute_unittest import *
from nova.tests.flags_unittest import *
from nova.tests.misc_unittest import *
from nova.tests.network_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
#from nova.tests.objectstore_unittest import *
#from nova.tests.process_unittest import *
from nova.tests.quota_unittest import *
from nova.tests.rpc_unittest import *
from nova.tests.scheduler_unittest import *
from nova.tests.service_unittest import *
#from nova.tests.service_unittest import *
from nova.tests.twistd_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.virt_unittest import *
@@ -82,6 +87,8 @@ if __name__ == '__main__':
config = OptionsClass()
argv = config.parseOptions()
argv = FLAGS(sys.argv)
FLAGS.verbose = True
# TODO(termie): these should make a call instead of doing work on import
@@ -90,6 +97,7 @@ if __name__ == '__main__':
else:
from nova.tests.real_flags import *
# Establish redirect for STDERR
sys.stderr.flush()
err = open(FLAGS.tests_stderr, 'w+', 0)