Cleanup import status information after busting a lock
When we bust a lock, we now own the image for that time period and may exclude the other task (if still running) from updating the import status information. If not still running, we should take responsibility of that cleanup since we know what task we stole the lock from. We should, however, only do that if we succeed in grabbing the lock to avoid racing with another thread which might be trying to do the same thing. Change-Id: Iff3dfbfcbfb956a06d77a144e5456bdb556c5a2c
This commit is contained in:
parent
3636915dd4
commit
552da84400
@ -169,7 +169,7 @@ class ImagesController(object):
|
||||
if not task or (task.status in bustable_states and age >= expiry):
|
||||
self._bust_import_lock(admin_image_repo, admin_task_repo,
|
||||
image, task, other_task)
|
||||
return
|
||||
return task
|
||||
|
||||
if task.status in bustable_states:
|
||||
LOG.warning('Image %(image)s has active import task %(task)s in '
|
||||
@ -184,6 +184,30 @@ class ImagesController(object):
|
||||
'%(status)s and does not qualify for expiry.')
|
||||
raise exception.Conflict('Image has active task')
|
||||
|
||||
def _cleanup_stale_task_progress(self, image_repo, image, task):
|
||||
"""Cleanup stale in-progress information from a previous task.
|
||||
|
||||
If we stole the lock from another task, we should try to clean up
|
||||
the in-progress status information from that task while we have
|
||||
the lock.
|
||||
"""
|
||||
stores = task.task_input.get('backend', [])
|
||||
keys = ['os_glance_importing_to_stores', 'os_glance_failed_import']
|
||||
changed = set()
|
||||
for store in stores:
|
||||
for key in keys:
|
||||
values = image.extra_properties.get(key, '').split(',')
|
||||
if store in values:
|
||||
values.remove(store)
|
||||
changed.add(key)
|
||||
image.extra_properties[key] = ','.join(values)
|
||||
if changed:
|
||||
image_repo.save(image)
|
||||
LOG.debug('Image %(image)s had stale import progress info '
|
||||
'%(keys)s from task %(task)s which was cleaned up',
|
||||
{'image': image.image_id, 'task': task.task_id,
|
||||
'keys': ','.join(changed)})
|
||||
|
||||
@utils.mutating
|
||||
def import_image(self, req, image_id, body):
|
||||
image_repo = self.gateway.get_repo(req.context)
|
||||
@ -192,6 +216,7 @@ class ImagesController(object):
|
||||
import_method = body.get('method').get('name')
|
||||
uri = body.get('method').get('uri')
|
||||
all_stores_must_succeed = body.get('all_stores_must_succeed', True)
|
||||
stole_lock_from_task = None
|
||||
|
||||
try:
|
||||
image = image_repo.get(image_id)
|
||||
@ -231,7 +256,7 @@ class ImagesController(object):
|
||||
if 'os_glance_import_task' in image.extra_properties:
|
||||
# NOTE(danms): This will raise exception.Conflict if the
|
||||
# lock is present and valid, or return if absent or invalid.
|
||||
self._enforce_import_lock(req, image)
|
||||
stole_lock_from_task = self._enforce_import_lock(req, image)
|
||||
|
||||
stores = [None]
|
||||
if CONF.enabled_backends:
|
||||
@ -316,6 +341,14 @@ class ImagesController(object):
|
||||
"prior operation is still in progress") % image_id)
|
||||
raise exception.Conflict(msg)
|
||||
|
||||
# NOTE(danms): We now have the import lock on this image. If we
|
||||
# busted the lock above and have a reference to that task, try
|
||||
# to clean up the import status information left over from that
|
||||
# execution.
|
||||
if stole_lock_from_task:
|
||||
self._cleanup_stale_task_progress(image_repo, image,
|
||||
stole_lock_from_task)
|
||||
|
||||
task_repo.add(import_task)
|
||||
task_executor = executor_factory.new_task_executor(req.context)
|
||||
pool = common.get_thread_pool("tasks_pool")
|
||||
|
@ -210,10 +210,8 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
|
||||
# After completion, we expect store1 (original) and store3 (new)
|
||||
# and that the other task is still stuck importing
|
||||
# FIXME(danms): The stuck importing state needs fixing
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
self.assertEqual('store1,store3', image['stores'])
|
||||
self.assertEqual('store2', image['os_glance_importing_to_stores'])
|
||||
self.assertEqual('', image['os_glance_failed_import'])
|
||||
|
||||
# Free up the stalled task and give eventlet time to let it
|
||||
@ -227,12 +225,7 @@ class TestImageImportLocking(functional.SynchronousAPIBase):
|
||||
# terminal state that we expect.
|
||||
image = self.api_get('/v2/images/%s' % image_id).json
|
||||
self.assertEqual('', image.get('os_glance_import_task', ''))
|
||||
# FIXME(danms): With the strict import lock checking in
|
||||
# ImportActionWrapper, we lose the ability to update
|
||||
# importing_to_stores after our lock has been stolen. We
|
||||
# should probably do something about that in the lock-busting
|
||||
# code. We would expect this in that case:
|
||||
# self.assertEqual('', image['os_glance_importing_to_stores'])
|
||||
self.assertEqual('', image['os_glance_importing_to_stores'])
|
||||
self.assertEqual('', image['os_glance_failed_import'])
|
||||
self.assertEqual('store1,store3', image['stores'])
|
||||
|
||||
|
@ -3085,20 +3085,31 @@ class TestImagesController(base.IsolatedUnitTest):
|
||||
mock_spi.assert_called_once_with(image.id, 'os_glance_import_task',
|
||||
'mytask')
|
||||
|
||||
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'save')
|
||||
@mock.patch('glance.db.simple.api.image_set_property_atomic')
|
||||
@mock.patch('glance.db.simple.api.image_delete_property_atomic')
|
||||
@mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task')
|
||||
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
|
||||
def test_image_import_locked_by_bustable_task(self, mock_get, mock_nt,
|
||||
mock_dpi, mock_spi,
|
||||
mock_save,
|
||||
task_status='processing'):
|
||||
if task_status == 'processing':
|
||||
# NOTE(danms): Only set task_input on one of the tested
|
||||
# states to make sure we don't choke on a task without
|
||||
# some of the data set yet.
|
||||
task_input = {'backend': ['store2']}
|
||||
else:
|
||||
task_input = {}
|
||||
task = test_tasks_resource._db_fixture(
|
||||
test_tasks_resource.UUID1,
|
||||
status=task_status)
|
||||
status=task_status,
|
||||
input=task_input)
|
||||
self.db.task_create(None, task)
|
||||
image = FakeImage(status='uploading')
|
||||
# Image is locked by a task in 'processing' state
|
||||
image.extra_properties['os_glance_import_task'] = task['id']
|
||||
image.extra_properties['os_glance_importing_to_stores'] = 'store2'
|
||||
mock_get.return_value = image
|
||||
|
||||
request = unit_test_utils.get_fake_request(tenant=TENANT1)
|
||||
@ -3129,6 +3140,14 @@ class TestImagesController(base.IsolatedUnitTest):
|
||||
mock_spi.assert_called_once_with(image.id, 'os_glance_import_task',
|
||||
'mytask')
|
||||
|
||||
# If we stored task_input with information about the stores
|
||||
# and thus triggered the cleanup code, make sure that cleanup
|
||||
# happened here.
|
||||
if task_status == 'processing':
|
||||
self.assertNotIn('store2',
|
||||
image.extra_properties[
|
||||
'os_glance_importing_to_stores'])
|
||||
|
||||
def test_image_import_locked_by_bustable_terminal_task_failure(self):
|
||||
# Make sure we don't fail with a task status transition error
|
||||
self.test_image_import_locked_by_bustable_task(task_status='failure')
|
||||
@ -3137,6 +3156,46 @@ class TestImagesController(base.IsolatedUnitTest):
|
||||
# Make sure we don't fail with a task status transition error
|
||||
self.test_image_import_locked_by_bustable_task(task_status='success')
|
||||
|
||||
def test_cleanup_stale_task_progress(self):
|
||||
img_repo = mock.MagicMock()
|
||||
image = mock.MagicMock()
|
||||
task = mock.MagicMock()
|
||||
|
||||
# No backend info from the old task, means no action
|
||||
task.task_input = {}
|
||||
image.extra_properties = {}
|
||||
self.controller._cleanup_stale_task_progress(img_repo, image, task)
|
||||
img_repo.save.assert_not_called()
|
||||
|
||||
# If we have info but no stores, no action
|
||||
task.task_input = {'backend': []}
|
||||
self.controller._cleanup_stale_task_progress(img_repo, image, task)
|
||||
img_repo.save.assert_not_called()
|
||||
|
||||
# If task had stores, but image does not have those stores in
|
||||
# the lists, no action
|
||||
task.task_input = {'backend': ['store1', 'store2']}
|
||||
self.controller._cleanup_stale_task_progress(img_repo, image, task)
|
||||
img_repo.save.assert_not_called()
|
||||
|
||||
# If the image has stores in the lists, but not the ones we care
|
||||
# about, make sure they are not disturbed
|
||||
image.extra_properties = {'os_glance_failed_import': 'store3'}
|
||||
self.controller._cleanup_stale_task_progress(img_repo, image, task)
|
||||
img_repo.save.assert_not_called()
|
||||
|
||||
# Only if the image has stores that relate to our old task should
|
||||
# take action, and only on those stores.
|
||||
image.extra_properties = {
|
||||
'os_glance_importing_to_stores': 'foo,store1,bar',
|
||||
'os_glance_failed_import': 'foo,store2,bar',
|
||||
}
|
||||
self.controller._cleanup_stale_task_progress(img_repo, image, task)
|
||||
img_repo.save.assert_called_once_with(image)
|
||||
self.assertEqual({'os_glance_importing_to_stores': 'foo,bar',
|
||||
'os_glance_failed_import': 'foo,bar'},
|
||||
image.extra_properties)
|
||||
|
||||
def test_bust_import_lock_race_to_delete(self):
|
||||
image_repo = mock.MagicMock()
|
||||
task_repo = mock.MagicMock()
|
||||
|
Loading…
Reference in New Issue
Block a user