Create an image in a separate thread

Creating of an image by a running instance stops the instance. This
takes a time, and a client can resend the request due to timeout. The
resended request fails because instance is already not in appropriate
state (it's stoping, stopped, or scheduling at the moment).

Thus ec2api should make such images asynchronously. It should create a
DB item to reserve the image id, return it to a caller, and then stop
the instance and do other things. While image is asynchronously created
and no Glance image exists, ec2api should return the image in responses
for describe operation. Describe operation should return the image with
and without selective filter by the image id passed in.

Also it should consider a race when asynchronous task has not finished,
but Glance image is already created, and another client describe images.
In that case ec2api should not create a new db item, but reuse existed
one.

If the creation failed, ec2api should expose the image as failed and
allow to delete it.

Unit tests for auto_update_db and handle_unpaired_item will be added in
a separate review, because some refactoring is required for that.

Change-Id: Iba4ade5000bea4af35d97abd1e708bbbc57f372a
This commit is contained in:
Feodor Tersin 2015-09-07 16:46:44 +03:00
parent 4cd4f28870
commit 90cbc6030f
5 changed files with 210 additions and 48 deletions

View File

@ -376,6 +376,9 @@ class UniversalDescriber(object):
return formatted_items
def handle_unpaired_item(self, item, formatted_items):
self.delete_obsolete_item(item)
def describe(self, context, ids=None, names=None, filter=None,
max_results=None, next_token=None):
if max_results and max_results < 5:
@ -418,7 +421,7 @@ class UniversalDescriber(object):
# NOTE(Alex): delete obsolete items
for item in self.items:
if item['id'] not in paired_items_ids:
self.delete_obsolete_item(item)
self.handle_unpaired_item(item, formatted_items)
# NOTE(Alex): some requested items are not found
if self.ids or self.names:
params = {'id': next(iter(self.ids or self.names))}

View File

@ -360,6 +360,8 @@ def get_os_image(context, ec2_image_id):
if not ids:
raise exception.InvalidAMIIDNotFound(id=ec2_image_id)
_id, os_id = ids[0]
if not os_id:
return None
glance = clients.glance(context)
try:
return glance.images.get(os_id)

View File

@ -37,7 +37,7 @@ from ec2api.api import ec2utils
from ec2api.api import instance as instance_api
from ec2api.db import api as db_api
from ec2api import exception
from ec2api.i18n import _, _LE, _LI
from ec2api.i18n import _, _LE, _LI, _LW
LOG = logging.getLogger(__name__)
@ -127,40 +127,70 @@ def create_image(context, instance_id, name=None, description=None,
raise exception.IncorrectState(reason=msg)
restart_instance = True
os_instance.stop()
# wait instance for really stopped
start_time = time.time()
while os_instance.status != 'SHUTOFF':
time.sleep(1)
os_instance.get()
# NOTE(yamahata): timeout and error. 1 hour for now for safety.
# Is it too short/long?
# Or is there any better way?
timeout = 1 * 60 * 60
if time.time() > start_time + timeout:
err = _("Couldn't stop instance within %d sec") % timeout
raise exception.EC2Exception(message=err)
# meaningful image name
name_map = dict(instance=instance['os_id'], now=timeutils.isotime())
name = name or _('image of %(instance)s at %(now)s') % name_map
glance = clients.glance(context)
with common.OnCrashCleaner() as cleaner:
os_image_id = os_instance.create_image(name)
cleaner.addCleanup(glance.images.delete, os_image_id)
# TODO(andrey-mp): snapshot and volume also must be deleted in case
# of error
os_image = glance.images.get(os_image_id)
image = db_api.add_item(context, _get_os_image_kind(os_image),
{'os_id': os_image_id,
'is_public': False,
'description': description})
def delayed_create(context, image, name, os_instance):
try:
os_instance.stop()
# wait instance for really stopped
start_time = time.time()
while os_instance.status != 'SHUTOFF':
time.sleep(1)
os_instance.get()
# NOTE(yamahata): timeout and error. 1 hour for now for safety.
# Is it too short/long?
# Or is there any better way?
timeout = 1 * 60 * 60
if time.time() > start_time + timeout:
err = (_("Couldn't stop instance within %d sec") % timeout)
raise exception.EC2Exception(message=err)
os_image_id = os_instance.create_image(
name, metadata={'ec2_id': image['id']})
image['os_id'] = os_image_id
db_api.update_item(context, image)
except Exception:
LOG.exception(_LE('Failed to complete image %s creation'),
image.id)
try:
image['state'] = 'failed'
db_api.update_item(context, image)
except Exception:
LOG.warning(_LW("Couldn't set 'failed' state for db image %s"),
image.id, exc_info=True)
try:
os_instance.start()
except Exception:
LOG.warning(_LW('Failed to start instance %(i_id)s after '
'completed creation of image %(image_id)s'),
{'i_id': instance['id'],
'image_id': image['id']},
exc_info=True)
image = {'is_public': False,
'description': description}
if restart_instance:
os_instance.start()
# NOTE(ft): image type is hardcoded, because we don't know it now,
# but cannot change it later. But Nova doesn't specify container format
# for snapshots of volume backed instances, so that it is 'ami' in fact
image = db_api.add_item(context, 'ami', image)
eventlet.spawn_n(delayed_create, context, image, name, os_instance)
else:
glance = clients.glance(context)
with common.OnCrashCleaner() as cleaner:
os_image_id = os_instance.create_image(name)
cleaner.addCleanup(glance.images.delete, os_image_id)
# TODO(andrey-mp): snapshot and volume also must be deleted in case
# of error
os_image = glance.images.get(os_image_id)
image['os_id'] = os_image_id
image = db_api.add_item(context, _get_os_image_kind(os_image),
image)
return {'imageId': image['id']}
@ -244,13 +274,20 @@ def register_image(context, name=None, image_location=None,
def deregister_image(context, image_id):
os_image = ec2utils.get_os_image(context, image_id)
_check_owner(context, os_image)
if not os_image:
image = db_api.get_item_by_id(context, image_id)
if image.get('state') != 'failed':
# TODO(ft): figure out corresponding AWS error
raise exception.IncorrectState(
reason='Image is still being created')
else:
_check_owner(context, os_image)
glance = clients.glance(context)
try:
glance.images.delete(os_image.id)
except glance_exception.HTTPNotFound:
pass
glance = clients.glance(context)
try:
glance.images.delete(os_image.id)
except glance_exception.HTTPNotFound:
pass
db_api.delete_item(context, image_id)
return True
@ -315,6 +352,8 @@ class ImageDescriber(common.TaggableItemsDescriber):
if len(images_ids) < len(self.ids):
missed_ids = self.ids - images_ids
raise exception.InvalidAMIIDNotFound(id=next(iter(missed_ids)))
self.pending_images = {i['id']: i for i in local_images
if not i['os_id']}
self.snapshot_ids = dict(
(s['os_id'], s['id'])
for s in db_api.get_items(self.context, 'snap'))
@ -323,15 +362,29 @@ class ImageDescriber(common.TaggableItemsDescriber):
return images
def get_os_items(self):
return clients.glance(self.context).images.list()
os_images = list(clients.glance(self.context).images.list())
self.ec2_created_os_images = {
os_image.properties['ec2_id']: os_image
for os_image in os_images
if (os_image.properties.get('ec2_id') and
self.context.project_id == os_image.owner)}
return os_images
def auto_update_db(self, image, os_image):
if not image:
kind = _get_os_image_kind(os_image)
if self.context.project_id == os_image.owner:
image = ec2utils.get_db_item_by_os_id(
self.context, kind, os_image.id, self.items_dict,
os_image=os_image)
if os_image.properties.get('ec2_id') in self.pending_images:
# NOTE(ft): the image is being creating, Glance had created
# image, but creating thread doesn't yet update db item
image = self.pending_images[os_image.metadata['ec2_id']]
image['os_id'] = os_image.id
image['is_public'] = os_image.is_public
db_api.update_item(self.context, image)
else:
image = ec2utils.get_db_item_by_os_id(
self.context, kind, os_image.id, self.items_dict,
os_image=os_image)
else:
image_id = ec2utils.os_id_to_ec2_id(
self.context, kind, os_image.id,
@ -362,6 +415,37 @@ class ImageDescriber(common.TaggableItemsDescriber):
def get_tags(self):
return db_api.get_tags(self.context, ('ami', 'ari', 'aki'), self.ids)
def handle_unpaired_item(self, item, formatted_items):
if item['os_id']:
super(ImageDescriber, self).handle_unpaired_item(item,
formatted_items)
else:
# NOTE(ft): process creating images, ignoring ids mapping
if 'is_public' in item:
# NOTE(ft): the image is being creating, Glance had created
# image, but creating thread doesn't yet update db item
os_image = self.ec2_created_os_images.get(item['id'])
if os_image:
item['os_id'] = os_image.id
item['is_public'] = os_image.is_public
db_api.update_item(self.context, item)
image = self.format(item, os_image)
else:
# NOTE(ft): Glance image is yet not created, but DB item
# exists. So that we adds EC2 image to output results
# with all data we have.
# TODO(ft): check if euca2ools can process such result
image = {'imageId': item['id'],
'imageOwnerId': self.context.project_id,
'imageType': IMAGE_TYPES[
ec2utils.get_ec2_id_kind(item['id'])],
'isPublic': item['is_public']}
if 'description' in item:
image['description'] = item['description']
image['imageState'] = item.get('state', 'pending')
formatted_items.append(image)
self.ids.remove(item['id'])
def describe_images(context, executable_by=None, image_id=None,
owner=None, filter=None):
@ -420,6 +504,10 @@ def describe_image_attribute(context, image_id, attribute):
raise exception.InvalidRequest()
os_image = ec2utils.get_os_image(context, image_id)
if not os_image:
# TODO(ft): figure out corresponding AWS error
raise exception.IncorrectState(
reason='Image is still being created or failed')
_check_owner(context, os_image)
image = ec2utils.get_db_item(context, image_id)
@ -433,6 +521,10 @@ def modify_image_attribute(context, image_id, attribute=None,
description=None, launch_permission=None,
product_code=None, user_id=None, value=None):
os_image = ec2utils.get_os_image(context, image_id)
if not os_image:
# TODO(ft): figure out corresponding AWS error
raise exception.IncorrectState(
reason='Image is still being created or failed')
attributes = set()
@ -539,7 +631,10 @@ def _format_image(context, image, os_image, images_dict, ids_dict,
}
if 'description' in image:
ec2_image['description'] = image['description']
state = GLANCE_STATUS_TO_EC2.get(os_image.status, 'error')
if 'state' in image:
state = image['state']
else:
state = GLANCE_STATUS_TO_EC2.get(os_image.status, 'error')
if state in ('available', 'pending'):
state = _s3_image_state_map.get(os_image.properties.get('image_state'),
state)

View File

@ -265,6 +265,11 @@ class EC2UtilsTestCase(testtools.TestCase):
ec2utils.get_os_image,
fake_context, fakes.random_ec2_id('ami'))
# check case of creating image
db_api.get_items_ids.return_value = [(fakes.ID_EC2_IMAGE_1, None)]
self.assertIsNone(ec2utils.get_os_image(fake_context,
fakes.ID_EC2_IMAGE_1))
@mock.patch('neutronclient.v2_0.client.Client')
def test_get_os_public_network(self, neutron):
neutron = neutron.return_value

View File

@ -106,9 +106,11 @@ class ImageTestCase(base.ApiTestCase):
'SHUTOFF')
if next(stop_called) else None)
image_id = fakes.random_ec2_id('ami')
os_instance.create_image.return_value = image_id
self.glance.images.get.return_value = fakes.OSImage({'id': image_id},
from_get=True)
os_image_id = fakes.random_os_id()
os_instance.create_image.return_value = os_image_id
self.glance.images.get.return_value = fakes.OSImage(
{'id': os_image_id},
from_get=True)
self.nova.servers.get.return_value = os_instance
is_ebs_instance.return_value = True
self.db_api.add_item.side_effect = tools.get_db_api_add_item(image_id)
@ -124,15 +126,30 @@ class ImageTestCase(base.ApiTestCase):
mock.ANY, fakes.ID_EC2_INSTANCE_2)
self.nova.servers.get.assert_called_once_with(fakes.ID_OS_INSTANCE_2)
is_ebs_instance.assert_called_once_with(mock.ANY, os_instance.id)
expected_image = {'is_public': False,
'description': 'fake desc'}
if no_reboot:
expected_image['os_id'] = os_image_id
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'ami', {'os_id': image_id,
'is_public': False,
'description': 'fake desc'})
mock.ANY, 'ami', expected_image)
if not no_reboot:
eventlet.sleep()
if not no_reboot:
os_instance.stop.assert_called_once_with()
os_instance.get.assert_called_once_with()
os_instance.start.assert_called_once_with()
if no_reboot:
os_instance.create_image.assert_called_once_with('fake_name')
else:
os_instance.create_image.assert_called_once_with(
'fake_name', metadata={'ec2_id': image_id})
self.db_api.update_item.assert_called_once_with(
mock.ANY, {'id': image_id,
'is_public': False,
'description': 'fake desc',
'os_id': os_image_id,
'vpc_id': None})
self.db_api.reset_mock()
self.nova.servers.reset_mock()
@ -276,6 +293,7 @@ class ImageTestCase(base.ApiTestCase):
def test_deregister_image(self):
self._setup_model()
# normal flow
resp = self.execute('DeregisterImage',
{'ImageId': fakes.ID_EC2_IMAGE_1})
self.assertThat(resp, matchers.DictMatches({'return': True}))
@ -284,12 +302,32 @@ class ImageTestCase(base.ApiTestCase):
self.glance.images.delete.assert_called_once_with(
fakes.ID_OS_IMAGE_1)
# deregister image which failed on asynchronously creation
self.glance.reset_mock()
image_id = fakes.random_ec2_id('ami')
self.add_mock_db_items({'id': image_id,
'os_id': None,
'state': 'failed'})
resp = self.execute('DeregisterImage',
{'ImageId': image_id})
self.assertThat(resp, matchers.DictMatches({'return': True}))
self.db_api.delete_item.assert_called_with(mock.ANY, image_id)
self.assertFalse(self.glance.images.delete.called)
def test_deregister_image_invalid_parameters(self):
self._setup_model()
self.assert_execution_error('InvalidAMIID.NotFound', 'DeregisterImage',
{'ImageId': fakes.random_ec2_id('ami')})
# deregister asynchronously creating image
image_id = fakes.random_ec2_id('ami')
self.add_mock_db_items({'id': image_id,
'os_id': None})
self.assert_execution_error('IncorrectState',
'DeregisterImage',
{'ImageId': image_id})
def test_describe_images(self):
self._setup_model()
@ -393,6 +431,15 @@ class ImageTestCase(base.ApiTestCase):
{'blockDeviceMapping': (
fakes.EC2_IMAGE_2['blockDeviceMapping'])})
def test_describe_image_attributes_invalid_parameters(self):
image_id = fakes.random_ec2_id('ami')
self.set_mock_db_items({'id': image_id,
'os_id': None})
self.assert_execution_error('IncorrectState',
'DescribeImageAttribute',
{'ImageId': image_id,
'Attribute': 'kernel'})
@mock.patch.object(fakes.OSImage, 'update', autospec=True)
def test_modify_image_attributes(self, osimage_update):
self._setup_model()
@ -408,6 +455,15 @@ class ImageTestCase(base.ApiTestCase):
self.assertEqual(fakes.ID_OS_IMAGE_1,
osimage_update.call_args[0][0].id)
def test_modify_image_attributes_invalid_parameters(self):
image_id = fakes.random_ec2_id('ami')
self.set_mock_db_items({'id': image_id,
'os_id': None})
self.assert_execution_error('IncorrectState',
'ModifyImageAttribute',
{'ImageId': image_id,
'Attribute': 'kernel'})
def _setup_model(self):
self.set_mock_db_items(fakes.DB_IMAGE_1, fakes.DB_IMAGE_2,
fakes.DB_SNAPSHOT_1, fakes.DB_SNAPSHOT_2,
@ -415,9 +471,10 @@ class ImageTestCase(base.ApiTestCase):
fakes.DB_VOLUME_1, fakes. DB_VOLUME_2)
self.db_api.get_public_items.return_value = []
# NOTE(ft): glance.image.list returns an iterator, not just a list
self.glance.images.list.side_effect = (
lambda: [fakes.OSImage(fakes.OS_IMAGE_1),
fakes.OSImage(fakes.OS_IMAGE_2)])
lambda: (fakes.OSImage(i)
for i in (fakes.OS_IMAGE_1, fakes.OS_IMAGE_2)))
self.glance.images.get.side_effect = (
lambda os_id: (fakes.OSImage(fakes.OS_IMAGE_1, from_get=True)
if os_id == fakes.ID_OS_IMAGE_1 else