Kashyap | Refactored all the tests to be concurrency friendly, we can now run all the tests concurrently

This commit is contained in:
Kashyap Kopparam
2014-09-10 15:44:31 +05:30
parent be2724ef83
commit 7f9be535a6

View File

@@ -9,33 +9,36 @@ from ..ec2driver_config import *
class EC2DriverTest(unittest.TestCase):
_multiprocess_shared_ = True
@classmethod
def setUp(self):
print "Establishing connection with AWS"
self.ec2_conn = ec2.connect_to_region(aws_region, aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
self.creds = get_nova_creds()
self.nova = client.Client(**self.creds)
self.server = None
self.servers = []
def spawn_ec2_instance(self):
print "Spawning an instance"
image = self.nova.images.find(name="cirros-0.3.1-x86_64-uec")
flavor = self.nova.flavors.find(name="m1.tiny")
self.server = self.nova.servers.create(name="cirros-test", image=image.id, flavor=flavor.id)
instance = self.nova.servers.get(self.server.id)
server = self.nova.servers.create(name="cirros-test", image=image.id, flavor=flavor.id)
instance = self.nova.servers.get(server.id)
while instance.status != 'ACTIVE':
time.sleep(10)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(server.id)
self.servers.append(instance)
return instance
def test_spawn(self):
self.spawn_ec2_instance()
instance = self.spawn_ec2_instance()
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[self.server.metadata['ec2_id']], filters=None,
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[instance.metadata['ec2_id']], filters=None,
dry_run=False, max_results=None)
self.assertEqual(ec2_instance[0].id, self.server.metadata['ec2_id'])
self.assertEqual(ec2_instance[0].id, instance.metadata['ec2_id'])
def test_destroy(self):
instance = self.spawn_ec2_instance()
@@ -71,10 +74,10 @@ class EC2DriverTest(unittest.TestCase):
while instance.status != 'SHUTOFF':
time.sleep(5)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
#assert power off
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[self.server.metadata['ec2_id']], filters=None,
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[instance.metadata['ec2_id']], filters=None,
dry_run=False, max_results=None)[0]
self.assertEqual(ec2_instance.state, "stopped")
@@ -86,14 +89,14 @@ class EC2DriverTest(unittest.TestCase):
# we are waiting for the status to actually get to 'Reboot' before beginning to wait for it to go to 'Active' status
while instance.status != 'REBOOT':
# We don't sleep here because the soft reboot may take less than a second
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
while instance.status != 'ACTIVE':
time.sleep(5)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
#assert restarted
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[self.server.metadata['ec2_id']], filters=None,
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[instance.metadata['ec2_id']], filters=None,
dry_run=False, max_results=None)[0]
self.assertEqual(ec2_instance.state, "running")
@@ -105,21 +108,21 @@ class EC2DriverTest(unittest.TestCase):
# we are waiting for the status to actually get to 'Hard Reboot' before beginning to wait for it to go to 'Active' status
while instance.status != 'HARD_REBOOT':
time.sleep(5)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
while instance.status != 'ACTIVE':
time.sleep(5)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
#assert restarted
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[self.server.metadata['ec2_id']], filters=None,
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[instance.metadata['ec2_id']], filters=None,
dry_run=False, max_results=None)[0]
self.assertEqual(ec2_instance.state, "running")
def test_resize(self):
instance = self.spawn_ec2_instance()
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[self.server.metadata['ec2_id']], filters=None,
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[instance.metadata['ec2_id']], filters=None,
dry_run=False, max_results=None)[0]
self.assertEqual(ec2_instance.instance_type, "t2.micro")
@@ -132,23 +135,24 @@ class EC2DriverTest(unittest.TestCase):
# wait for the status to actually go to Verify_Resize, before confirming the resize.
while instance.status != 'VERIFY_RESIZE':
time.sleep(5)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
# Confirm the resize
self.nova.servers.confirm_resize(instance)
while instance.status != 'ACTIVE':
time.sleep(5)
instance = self.nova.servers.get(self.server.id)
instance = self.nova.servers.get(instance.id)
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[self.server.metadata['ec2_id']], filters=None,
ec2_instance = self.ec2_conn.get_only_instances(instance_ids=[instance.metadata['ec2_id']], filters=None,
dry_run=False, max_results=None)[0]
self.assertEqual(ec2_instance.instance_type, "t2.small")
@classmethod
def tearDown(self):
if self.server is not None:
print "Cleanup: Destroying the instance used for testing"
self.server.delete()
print "Cleanup: Destroying the instance used for testing"
for instance in self.servers:
instance.delete()
if __name__ == '__main__':
unittest.main()
unittest.main()