Adds guard against upload contention
Protect against concurrent image uploads by allowing the initial request to proceed and succeed and subsequent concurrent requests to fail. We now allow the upload activation to specify what state it expects to transition to 'active' from. If by the time the db is updated the transition has already occured i.e. another upload has succeeded, the upload will fail and data will be cleaned up. Fixes: bug 1241025 Change-Id: Ie532b61484bec660910fb9a37429397bde8ef11f
This commit is contained in:
parent
448ef4a899
commit
51ecc8016a
|
@ -594,7 +594,8 @@ class Controller(controller.BaseController):
|
|||
|
||||
return location, loc_meta
|
||||
|
||||
def _activate(self, req, image_id, location, location_metadata=None):
|
||||
def _activate(self, req, image_id, location, location_metadata=None,
|
||||
from_state=None):
|
||||
"""
|
||||
Sets the image status to `active` and the image's location
|
||||
attribute.
|
||||
|
@ -612,12 +613,24 @@ class Controller(controller.BaseController):
|
|||
'metadata': location_metadata}]
|
||||
|
||||
try:
|
||||
s = from_state
|
||||
image_meta_data = registry.update_image_metadata(req.context,
|
||||
image_id,
|
||||
image_meta)
|
||||
image_meta,
|
||||
from_state=s)
|
||||
self.notifier.info("image.activate", redact_loc(image_meta_data))
|
||||
self.notifier.info("image.update", redact_loc(image_meta_data))
|
||||
return image_meta_data
|
||||
except exception.Duplicate:
|
||||
# Delete image data since it has been supersceded by another
|
||||
# upload.
|
||||
LOG.debug("duplicate operation - deleting image data for %s "
|
||||
"(location:%s)" %
|
||||
(image_id, image_meta['location']))
|
||||
upload_utils.initiate_deletion(req, image_meta['location'],
|
||||
image_id, CONF.delayed_delete)
|
||||
# Then propagate the exception.
|
||||
raise
|
||||
except exception.Invalid as e:
|
||||
msg = _("Failed to activate image. Got error: %(e)s") % {'e': e}
|
||||
LOG.debug(msg)
|
||||
|
@ -645,7 +658,8 @@ class Controller(controller.BaseController):
|
|||
return self._activate(req,
|
||||
image_id,
|
||||
location,
|
||||
location_metadata) if location else None
|
||||
location_metadata,
|
||||
from_state='saving') if location else None
|
||||
|
||||
def _get_size(self, context, image_meta, location):
|
||||
# retrieve the image size from remote store (if not provided)
|
||||
|
@ -918,6 +932,11 @@ class Controller(controller.BaseController):
|
|||
raise HTTPForbidden(explanation=msg,
|
||||
request=req,
|
||||
content_type="text/plain")
|
||||
except (exception.Conflict, exception.Duplicate) as e:
|
||||
LOG.info(unicode(e))
|
||||
raise HTTPConflict(body='Image operation conflicts',
|
||||
request=req,
|
||||
content_type='text/plain')
|
||||
else:
|
||||
self.notifier.info('image.update', redact_loc(image_meta))
|
||||
|
||||
|
|
|
@ -166,7 +166,9 @@ def upload_data_to_store(req, image_meta, image_data, store, notifier):
|
|||
except exception.Duplicate as e:
|
||||
msg = _("Attempt to upload duplicate image: %s") % e
|
||||
LOG.debug(msg)
|
||||
safe_kill(req, image_id)
|
||||
# NOTE(dosaboy): do not delete the image since it is likely that this
|
||||
# conflict is a result of another concurrent upload that will be
|
||||
# successful.
|
||||
notifier.error('image.upload', msg)
|
||||
raise webob.exc.HTTPConflict(explanation=msg,
|
||||
request=req,
|
||||
|
|
|
@ -78,6 +78,11 @@ class Duplicate(GlanceException):
|
|||
message = _("An object with the same identifier already exists.")
|
||||
|
||||
|
||||
class Conflict(GlanceException):
|
||||
message = _("An object with the same identifier is currently being "
|
||||
"operated on.")
|
||||
|
||||
|
||||
class StorageFull(GlanceException):
|
||||
message = _("There is not enough disk space on the image storage media.")
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ def image_create(client, values):
|
|||
|
||||
|
||||
@_get_client
|
||||
def image_update(client, image_id, values, purge_props=False):
|
||||
def image_update(client, image_id, values, purge_props=False, from_state=None):
|
||||
"""
|
||||
Set the given properties on an image and update it.
|
||||
|
||||
|
@ -75,7 +75,7 @@ def image_update(client, image_id, values, purge_props=False):
|
|||
"""
|
||||
return client.image_update(values=values,
|
||||
image_id=image_id,
|
||||
purge_props=purge_props)
|
||||
purge_props=purge_props, from_state=from_state)
|
||||
|
||||
|
||||
@_get_client
|
||||
|
|
|
@ -542,7 +542,8 @@ def image_create(context, image_values):
|
|||
|
||||
|
||||
@log_call
|
||||
def image_update(context, image_id, image_values, purge_props=False):
|
||||
def image_update(context, image_id, image_values, purge_props=False,
|
||||
from_state=None):
|
||||
global DATA
|
||||
try:
|
||||
image = DATA['images'][image_id]
|
||||
|
|
|
@ -258,16 +258,18 @@ def _wrap_db_error(f):
|
|||
|
||||
def image_create(context, values):
|
||||
"""Create an image from the values dictionary."""
|
||||
return _image_update(context, values, None, False)
|
||||
return _image_update(context, values, None, purge_props=False)
|
||||
|
||||
|
||||
def image_update(context, image_id, values, purge_props=False):
|
||||
def image_update(context, image_id, values, purge_props=False,
|
||||
from_state=None):
|
||||
"""
|
||||
Set the given properties on an image and update it.
|
||||
|
||||
:raises NotFound if image does not exist.
|
||||
"""
|
||||
return _image_update(context, values, image_id, purge_props)
|
||||
return _image_update(context, values, image_id, purge_props,
|
||||
from_state=from_state)
|
||||
|
||||
|
||||
def image_destroy(context, image_id):
|
||||
|
@ -743,7 +745,8 @@ def _update_values(image_ref, values):
|
|||
setattr(image_ref, k, values[k])
|
||||
|
||||
|
||||
def _image_update(context, values, image_id, purge_props=False):
|
||||
def _image_update(context, values, image_id, purge_props=False,
|
||||
from_state=None):
|
||||
"""
|
||||
Used internally by image_create and image_update
|
||||
|
||||
|
@ -768,9 +771,10 @@ def _image_update(context, values, image_id, purge_props=False):
|
|||
|
||||
location_data = values.pop('locations', None)
|
||||
|
||||
new_status = values.get('status', None)
|
||||
if image_id:
|
||||
image_ref = _image_get(context, image_id, session=session)
|
||||
|
||||
current = image_ref.status
|
||||
# Perform authorization check
|
||||
_check_mutate_authorization(context, image_ref)
|
||||
else:
|
||||
|
@ -797,20 +801,49 @@ def _image_update(context, values, image_id, purge_props=False):
|
|||
#NOTE(iccha-sethi): updated_at must be explicitly set in case
|
||||
# only ImageProperty table was modifited
|
||||
values['updated_at'] = timeutils.utcnow()
|
||||
image_ref.update(values)
|
||||
|
||||
# Validate the attributes before we go any further. From my
|
||||
# investigation, the @validates decorator does not validate
|
||||
# on new records, only on existing records, which is, well,
|
||||
# idiotic.
|
||||
values = _validate_image(image_ref.to_dict())
|
||||
_update_values(image_ref, values)
|
||||
if image_id:
|
||||
query = session.query(models.Image).filter_by(id=image_id)
|
||||
if from_state:
|
||||
query = query.filter_by(status=from_state)
|
||||
|
||||
try:
|
||||
image_ref.save(session=session)
|
||||
except sqlalchemy.exc.IntegrityError:
|
||||
raise exception.Duplicate("Image ID %s already exists!"
|
||||
% values['id'])
|
||||
if new_status:
|
||||
_validate_image(values)
|
||||
|
||||
# Validate fields for Images table. This is similar to what is done
|
||||
# for the query result update except that we need to do it prior
|
||||
# in this case.
|
||||
# TODO(dosaboy): replace this with a dict comprehension once py26
|
||||
# support is deprecated.
|
||||
keys = values.keys()
|
||||
for k in keys:
|
||||
if k not in image_ref.to_dict():
|
||||
del values[k]
|
||||
updated = query.update(values, synchronize_session='fetch')
|
||||
|
||||
if not updated:
|
||||
msg = (_('cannot transition from %(current)s to '
|
||||
'%(next)s in update (wanted '
|
||||
'from_state=%(from)s)') %
|
||||
{'current': current, 'next': new_status,
|
||||
'from': from_state})
|
||||
raise exception.Conflict(msg)
|
||||
|
||||
image_ref = _image_get(context, image_id, session=session)
|
||||
else:
|
||||
image_ref.update(values)
|
||||
# Validate the attributes before we go any further. From my
|
||||
# investigation, the @validates decorator does not validate
|
||||
# on new records, only on existing records, which is, well,
|
||||
# idiotic.
|
||||
values = _validate_image(image_ref.to_dict())
|
||||
_update_values(image_ref, values)
|
||||
|
||||
try:
|
||||
image_ref.save(session=session)
|
||||
except sqlalchemy.exc.IntegrityError:
|
||||
raise exception.Duplicate("Image ID %s already exists!"
|
||||
% values['id'])
|
||||
|
||||
_set_properties_for_image(context, image_ref, properties, purge_props,
|
||||
session)
|
||||
|
|
|
@ -413,6 +413,7 @@ class Controller(object):
|
|||
:retval Returns the updated image information as a mapping,
|
||||
"""
|
||||
image_data = body['image']
|
||||
from_state = body.get('from_state', None)
|
||||
|
||||
# Prohibit modification of 'owner'
|
||||
if not req.context.is_admin and 'owner' in image_data:
|
||||
|
@ -428,11 +429,15 @@ class Controller(object):
|
|||
'image_data': image_data})
|
||||
image_data = _normalize_image_location_for_db(image_data)
|
||||
if purge_props == "true":
|
||||
updated_image = self.db_api.image_update(req.context, id,
|
||||
image_data, True)
|
||||
purge_props = True
|
||||
else:
|
||||
updated_image = self.db_api.image_update(req.context, id,
|
||||
image_data)
|
||||
purge_props = False
|
||||
|
||||
updated_image = self.db_api.image_update(req.context, id,
|
||||
image_data,
|
||||
purge_props=purge_props,
|
||||
from_state=from_state)
|
||||
|
||||
msg = _("Updating metadata for image %(id)s")
|
||||
LOG.info(msg % {'id': id})
|
||||
return dict(image=make_image_dict(updated_image))
|
||||
|
@ -459,6 +464,11 @@ class Controller(object):
|
|||
raise exc.HTTPNotFound(body='Image not found',
|
||||
request=req,
|
||||
content_type='text/plain')
|
||||
except exception.Conflict as e:
|
||||
LOG.info(unicode(e))
|
||||
raise exc.HTTPConflict(body='Image operation conflicts',
|
||||
request=req,
|
||||
content_type='text/plain')
|
||||
|
||||
|
||||
def _limit_locations(image):
|
||||
|
|
|
@ -164,10 +164,11 @@ def add_image_metadata(context, image_meta):
|
|||
|
||||
|
||||
def update_image_metadata(context, image_id, image_meta,
|
||||
purge_props=False):
|
||||
purge_props=False, from_state=None):
|
||||
LOG.debug(_("Updating image metadata for image %s..."), image_id)
|
||||
c = get_registry_client(context)
|
||||
return c.update_image(image_id, image_meta, purge_props)
|
||||
return c.update_image(image_id, image_meta, purge_props=purge_props,
|
||||
from_state=from_state)
|
||||
|
||||
|
||||
def delete_image_metadata(context, image_id):
|
||||
|
|
|
@ -169,7 +169,8 @@ class RegistryClient(BaseClient):
|
|||
image = data['image']
|
||||
return self.decrypt_metadata(image)
|
||||
|
||||
def update_image(self, image_id, image_metadata, purge_props=False):
|
||||
def update_image(self, image_id, image_metadata, purge_props=False,
|
||||
from_state=None):
|
||||
"""
|
||||
Updates Registry's information about an image
|
||||
"""
|
||||
|
@ -178,6 +179,7 @@ class RegistryClient(BaseClient):
|
|||
|
||||
encrypted_metadata = self.encrypt_metadata(image_metadata['image'])
|
||||
image_metadata['image'] = encrypted_metadata
|
||||
image_metadata['from_state'] = from_state
|
||||
body = json.dumps(image_metadata)
|
||||
|
||||
headers = {
|
||||
|
|
|
@ -20,6 +20,7 @@ import copy
|
|||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import mock
|
||||
import StringIO
|
||||
|
||||
from oslo.config import cfg
|
||||
|
@ -30,6 +31,7 @@ import glance.api
|
|||
import glance.api.common
|
||||
from glance.api.v1 import images
|
||||
from glance.api.v1 import router
|
||||
from glance.api.v1 import upload_utils
|
||||
import glance.common.config
|
||||
import glance.context
|
||||
from glance.db.sqlalchemy import api as db_api
|
||||
|
@ -1144,6 +1146,110 @@ class TestGlanceAPI(base.IsolatedUnitTest):
|
|||
self.assertEqual(res.status_int, 200)
|
||||
self.assertEqual("pending_delete", res.headers['x-image-meta-status'])
|
||||
|
||||
def test_upload_to_image_status_saving(self):
|
||||
"""Test image upload conflict.
|
||||
|
||||
If an image is uploaded before an existing upload operation completes
|
||||
to the same image, the original image should succeed and the
|
||||
conflicting should fail and any data deleted.
|
||||
"""
|
||||
fixture_headers = {'x-image-meta-store': 'file',
|
||||
'x-image-meta-disk-format': 'vhd',
|
||||
'x-image-meta-container-format': 'ovf',
|
||||
'x-image-meta-name': 'some-foo-image'}
|
||||
|
||||
# create an image but don't upload yet.
|
||||
req = webob.Request.blank("/images")
|
||||
req.method = 'POST'
|
||||
for k, v in fixture_headers.iteritems():
|
||||
req.headers[k] = v
|
||||
|
||||
res = req.get_response(self.api)
|
||||
self.assertEquals(res.status_int, 201)
|
||||
res_body = json.loads(res.body)['image']
|
||||
|
||||
image_id = res_body['id']
|
||||
self.assertTrue('/images/%s' % image_id in res.headers['location'])
|
||||
|
||||
# verify the status is 'queued'
|
||||
self.assertEqual('queued', res_body['status'])
|
||||
|
||||
orig_get_image_metadata = registry.get_image_metadata
|
||||
orig_image_get = db_api._image_get
|
||||
orig_image_update = db_api._image_update
|
||||
|
||||
# this will be used to track what is called and their order.
|
||||
called = []
|
||||
# use this to determine if we are within a db session i.e. atomic
|
||||
# operation, that is setting our active state.
|
||||
test_status = {'activate_session_started': False}
|
||||
# We want first status check to be 'queued' so we get past the first
|
||||
# guard.
|
||||
test_status['queued_guard_passed'] = False
|
||||
|
||||
def mock_image_update(context, values, image_id, purge_props=False,
|
||||
from_state=None):
|
||||
if values.get('status', None) == 'active':
|
||||
# We only expect this state to be entered once.
|
||||
if test_status['activate_session_started']:
|
||||
raise Exception("target session already started")
|
||||
|
||||
test_status['activate_session_started'] = True
|
||||
called.append('update_active')
|
||||
|
||||
return orig_image_update(context, values, image_id,
|
||||
purge_props=purge_props,
|
||||
from_state=from_state)
|
||||
|
||||
def mock_image_get(*args, **kwargs):
|
||||
"""Force status to 'saving' if not within activate db session.
|
||||
|
||||
If we are in the activate db session we return 'active' which we
|
||||
then expect to cause exception.Conflict to be raised since this
|
||||
indicates that another upload has succeeded.
|
||||
"""
|
||||
image = orig_image_get(*args, **kwargs)
|
||||
if test_status['activate_session_started']:
|
||||
called.append('image_get_active')
|
||||
setattr(image, 'status', 'active')
|
||||
else:
|
||||
setattr(image, 'status', 'saving')
|
||||
|
||||
return image
|
||||
|
||||
def mock_get_image_metadata(*args, **kwargs):
|
||||
"""Force image status sequence.
|
||||
"""
|
||||
called.append('get_image_meta')
|
||||
meta = orig_get_image_metadata(*args, **kwargs)
|
||||
if not test_status['queued_guard_passed']:
|
||||
meta['status'] = 'queued'
|
||||
test_status['queued_guard_passed'] = True
|
||||
|
||||
return meta
|
||||
|
||||
req = webob.Request.blank("/images/%s" % image_id)
|
||||
req.method = 'PUT'
|
||||
req.headers['Content-Type'] = \
|
||||
'application/octet-stream'
|
||||
req.body = "chunk00000remainder"
|
||||
|
||||
mpo = mock.patch.object
|
||||
with mpo(upload_utils, 'initiate_deletion') as mock_initiate_deletion:
|
||||
with mpo(registry, 'get_image_metadata', mock_get_image_metadata):
|
||||
with mpo(db_api, '_image_get', mock_image_get):
|
||||
with mpo(db_api, '_image_update', mock_image_update):
|
||||
res = req.get_response(self.api)
|
||||
self.assertEquals(res.status_int, 409)
|
||||
|
||||
# Check expected call sequence
|
||||
self.assertEqual(['get_image_meta', 'get_image_meta',
|
||||
'update_active', 'image_get_active'],
|
||||
called)
|
||||
|
||||
# Ensure cleanup occured.
|
||||
self.assertTrue(mock_initiate_deletion.called)
|
||||
|
||||
def test_register_and_upload(self):
|
||||
"""
|
||||
Test that the process of registering an image with
|
||||
|
|
|
@ -225,7 +225,8 @@ class TestUploadUtils(base.StoreClearingUnitTest):
|
|||
|
||||
def _test_upload_data_to_store_exception_with_notify(self,
|
||||
exc_class,
|
||||
expected_class):
|
||||
expected_class,
|
||||
image_killed=True):
|
||||
req = unit_test_utils.get_fake_request()
|
||||
|
||||
location = "file://foo/bar"
|
||||
|
@ -242,8 +243,9 @@ class TestUploadUtils(base.StoreClearingUnitTest):
|
|||
mox.IgnoreArg(),
|
||||
image_meta['size']).AndRaise(exc_class)
|
||||
|
||||
self.mox.StubOutWithMock(upload_utils, "safe_kill")
|
||||
upload_utils.safe_kill(req, image_meta['id'])
|
||||
if image_killed:
|
||||
self.mox.StubOutWithMock(upload_utils, "safe_kill")
|
||||
upload_utils.safe_kill(req, image_meta['id'])
|
||||
|
||||
notifier = self.mox.CreateMockAnything()
|
||||
notifier.error('image.upload', mox.IgnoreArg())
|
||||
|
@ -256,9 +258,13 @@ class TestUploadUtils(base.StoreClearingUnitTest):
|
|||
self.mox.VerifyAll()
|
||||
|
||||
def test_upload_data_to_store_duplicate(self):
|
||||
"""See note in glance.api.v1.upload_utils on why we don't want image to
|
||||
be deleted in this case.
|
||||
"""
|
||||
self._test_upload_data_to_store_exception_with_notify(
|
||||
exception.Duplicate,
|
||||
webob.exc.HTTPConflict)
|
||||
webob.exc.HTTPConflict,
|
||||
image_killed=False)
|
||||
|
||||
def test_upload_data_to_store_forbidden(self):
|
||||
self._test_upload_data_to_store_exception_with_notify(
|
||||
|
|
|
@ -444,7 +444,8 @@ class TestRegistryV2Client(base.IsolatedUnitTest,
|
|||
def test_image_update(self):
|
||||
"""Tests that the registry API updates the image"""
|
||||
fixture = {'name': 'fake public image #2',
|
||||
'disk_format': 'vmdk'}
|
||||
'disk_format': 'vmdk',
|
||||
'status': 'saving'}
|
||||
|
||||
self.assertTrue(self.client.image_update(image_id=UUID2,
|
||||
values=fixture))
|
||||
|
@ -455,7 +456,36 @@ class TestRegistryV2Client(base.IsolatedUnitTest,
|
|||
for k, v in fixture.items():
|
||||
self.assertEqual(v, data[k])
|
||||
|
||||
def test_image_update_not_existing(self):
|
||||
def test_image_update_conflict(self):
|
||||
"""Tests that the registry API updates the image"""
|
||||
next_state = 'saving'
|
||||
fixture = {'name': 'fake public image #2',
|
||||
'disk_format': 'vmdk',
|
||||
'status': next_state}
|
||||
|
||||
image = self.client.image_get(image_id=UUID2)
|
||||
current = image['status']
|
||||
self.assertEqual(current, 'active')
|
||||
|
||||
# image is in 'active' state so this should cause a failure.
|
||||
from_state = 'saving'
|
||||
|
||||
self.assertRaises(exception.Conflict, self.client.image_update,
|
||||
image_id=UUID2, values=fixture,
|
||||
from_state=from_state)
|
||||
|
||||
try:
|
||||
self.client.image_update(image_id=UUID2, values=fixture,
|
||||
from_state=from_state)
|
||||
except exception.Conflict as exc:
|
||||
msg = (_('cannot transition from %(current)s to '
|
||||
'%(next)s in update (wanted '
|
||||
'from_state=%(from)s)') %
|
||||
{'current': current, 'next': next_state,
|
||||
'from': from_state})
|
||||
self.assertEqual(str(exc), msg)
|
||||
|
||||
def _test_image_update_not_existing(self):
|
||||
"""Tests non existing image update doesn't work"""
|
||||
fixture = self.get_fixture(status='bad status')
|
||||
|
||||
|
|
Loading…
Reference in New Issue