api/ec2: an unit test for create image

unit test for ec2 create image.
This is incomplete as there is no unit test for register image.
This commit is contained in:
Isaku Yamahata
2011-06-22 12:55:14 +09:00
parent 9530583667
commit cb6ca62a73

View File

@@ -294,6 +294,24 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, comp1['id'])
db.service_destroy(self.context, comp2['id'])
def _block_device_mapping_create(self, instance_id, mappings):
volumes = []
for bdm in mappings:
db.block_device_mapping_create(self.context, bdm)
if 'volume_id' in bdm:
values = {'id': bdm['volume_id']}
for bdm_key, vol_key in [('snapshot_id', 'snapshot_id'),
('snapshot_size', 'volume_size'),
('delete_on_termination',
'delete_on_termination')]:
if bdm_key in bdm:
values[vol_key] = bdm[bdm_key]
vol = db.volume_create(self.context, values)
db.volume_attached(self.context, vol['id'],
instance_id, bdm['device_name'])
volumes.append(vol)
return volumes
def _assertInstance(self, instance_id):
ec2_instance_id = ec2utils.id_to_ec2_id(instance_id)
result = self.cloud.describe_instances(self.context,
@@ -334,7 +352,8 @@ class CloudTestCase(test.TestCase):
'volume_id': '2'},
{'instance_id': instance_id,
'device_name': '/dev/sdb2',
'volume_id': '3'},
'volume_id': '3',
'volume_size': 1},
{'instance_id': instance_id,
'device_name': '/dev/sdb3',
'delete_on_termination': True,
@@ -365,21 +384,7 @@ class CloudTestCase(test.TestCase):
'device_name': '/dev/sdb9',
'virtual_name': 'ephemeral3'}]
volumes = []
for bdm in mappings:
db.block_device_mapping_create(self.context, bdm)
if bdm.get('volume_id'):
values = {'volume_id': bdm['volume_id']}
for bdm_key, vol_key in [('snapshot_id', 'snapshot_id'),
('snapshot_size', 'volume_size'),
('delete_on_termination',
'delete_on_termination')]:
if bdm.get(bdm_key):
values[vol_key] = bdm[bdm_key]
vol = db.volume_create(self.context, values)
db.volume_attached(self.context, vol['id'],
instance_id, bdm['device_name'])
volumes.append(vol)
volumes = self._block_device_mapping_create(instance_id, mappings)
ec2_instance_id, result = self._assertInstance(instance_id)
expected_result = {'instanceId': ec2_instance_id,
@@ -472,7 +477,7 @@ class CloudTestCase(test.TestCase):
if d1[key] == d2[key]:
self.assertDictMatch(d1, d2)
def _setUpImageSet(self):
def _setUpImageSet(self, create_volumes_and_snapshots=False):
mappings1 = [
{'device': '/dev/sda1', 'virtual': 'root'},
@@ -502,6 +507,7 @@ class CloudTestCase(test.TestCase):
'properties': {
'kernel_id': 1,
'type': 'machine',
'image_state': 'available',
'mappings': mappings1,
'block_device_mapping': block_device_mapping1,
}
@@ -531,6 +537,22 @@ class CloudTestCase(test.TestCase):
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'detail', fake_detail)
volumes = []
snapshots = []
if create_volumes_and_snapshots:
for bdm in block_device_mapping1:
if 'volume_id' in bdm:
vol = self._volume_create(bdm['volume_id'])
volumes.append(vol['id'])
if 'snapshot_id' in bdm:
snap = db.snapshot_create(self.context,
{'id': bdm['snapshot_id'],
'volume_id': 76543210,
'status': "available",
'volume_size': 1})
snapshots.append(snap['id'])
return (volumes, snapshots)
def _assertImageSet(self, result, root_device_type, root_device_name):
self.assertEqual(1, len(result['imagesSet']))
result = result['imagesSet'][0]
@@ -951,11 +973,13 @@ class CloudTestCase(test.TestCase):
self._restart_compute_service()
def _volume_create(self):
def _volume_create(self, volume_id=None):
kwargs = {'status': 'available',
'host': self.volume.host,
'size': 1,
'attach_status': 'detached', }
if volume_id:
kwargs['id'] = volume_id
return db.volume_create(self.context, kwargs)
def _assert_volume_attached(self, vol, instance_id, mountpoint):
@@ -1175,3 +1199,33 @@ class CloudTestCase(test.TestCase):
self.cloud.delete_snapshot(self.context, snapshot_id)
greenthread.sleep(0.3)
db.volume_destroy(self.context, vol['id'])
def test_create_image(self):
"""Make sure that CreateImage works"""
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval=0.3)
(volumes, snapshots) = self._setUpImageSet(
create_volumes_and_snapshots=True)
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1}
ec2_instance_id = self._run_instance_wait(**kwargs)
# TODO(yamahata): s3._s3_create() can't be tested easily by unit test
# as there is no unit test for s3.create()
## result = self.cloud.create_image(self.context, ec2_instance_id,
## no_reboot=True)
## ec2_image_id = result['imageId']
## created_image = self.cloud.describe_images(self.context,
## [ec2_image_id])
self.cloud.terminate_instances(self.context, [ec2_instance_id])
for vol in volumes:
db.volume_destroy(self.context, vol)
for snap in snapshots:
db.snapshot_destroy(self.context, snap)
# TODO(yamahata): clean up snapshot created by CreateImage.
self._restart_compute_service()