Pass ImageActionWrapper to internal plugins

The _internal_plugins/* tasks are not getting passed the
ImportActionWrapper, and thus are doing things against the image
that do not respect the task lock, nor apply multiple updates
atomically.

This fixes that and brings them in line with the base tasks, using the
wrapper for things like getting the image_id. The copy_image task
looks at the image for a few other things that are not exposed out
of the wrapper, so I left the image_repo.get() in that task until I
can extend it. They are read-only though, so not a big deal.

The web-download test also was not passing the right options to
that task (confusing task_repo and image_repo), which this cleans
up as well. It was also missing validation of the code that reverts
the state to 'queued' when web-download fails, so that is added here
as well.

Change-Id: I6db86b3e17a6a2f78745b40381b9419fb4404a4e
Related-Bug: #1914826
(cherry picked from commit 68646db35d)
This commit is contained in:
Dan Smith 2021-02-05 14:09:34 -08:00 committed by Lee Yarwood
parent 508800512d
commit 43d4115978
5 changed files with 56 additions and 44 deletions

View File

@ -33,11 +33,12 @@ class _CopyImage(task.Task):
default_provides = 'file_uri'
def __init__(self, task_id, task_type, image_repo, image_id):
def __init__(self, task_id, task_type, image_repo, action_wrapper):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.image_id = action_wrapper.image_id
self.action_wrapper = action_wrapper
super(_CopyImage, self).__init__(
name='%s-CopyImage-%s' % (task_type, task_id))
@ -131,13 +132,14 @@ def get_flow(**kwargs):
:param task_id: Task ID.
:param task_type: Type of the task.
:param image_repo: Image repository used.
:param uri: URI the image data is downloaded from.
:param image_id: Image ID.
:param action_wrapper: An api_image_import.ActionWrapper.
"""
task_id = kwargs.get('task_id')
task_type = kwargs.get('task_type')
image_repo = kwargs.get('image_repo')
image_id = kwargs.get('image_id')
action_wrapper = kwargs.get('action_wrapper')
return lf.Flow(task_type).add(
_CopyImage(task_id, task_type, image_repo, image_id),
_CopyImage(task_id, task_type, image_repo, action_wrapper),
)

View File

@ -35,12 +35,12 @@ class _WebDownload(task.Task):
default_provides = 'file_uri'
def __init__(self, task_id, task_type, image_repo, image_id, uri):
def __init__(self, task_id, task_type, uri, action_wrapper):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.image_id = action_wrapper.image_id
self.uri = uri
self.action_wrapper = action_wrapper
self._path = None
super(_WebDownload, self).__init__(
name='%s-WebDownload-%s' % (task_type, task_id))
@ -140,9 +140,8 @@ class _WebDownload(task.Task):
'image_id': self.image_id})
# NOTE(abhishekk): Revert image state back to 'queued' as
# something went wrong.
image = self.image_repo.get(self.image_id)
image.status = 'queued'
self.image_repo.save(image)
with self.action_wrapper as action:
action.set_image_status('queued')
# NOTE(abhishekk): Deleting partial image data from staging area
if self._path is not None:
@ -166,13 +165,13 @@ def get_flow(**kwargs):
:param task_type: Type of the task.
:param image_repo: Image repository used.
:param uri: URI the image data is downloaded from.
:param action_wrapper: An api_image_import.ActionWrapper.
"""
task_id = kwargs.get('task_id')
task_type = kwargs.get('task_type')
image_repo = kwargs.get('image_repo')
image_id = kwargs.get('image_id')
uri = kwargs.get('import_req')['method'].get('uri')
action_wrapper = kwargs.get('action_wrapper')
return lf.Flow(task_type).add(
_WebDownload(task_id, task_type, image_repo, image_id, uri),
_WebDownload(task_id, task_type, uri, action_wrapper),
)

View File

@ -730,7 +730,8 @@ def get_flow(**kwargs):
flow.add(_ImageLock(task_id, task_type, action_wrapper))
if import_method in ['web-download', 'copy-image']:
internal_plugin = internal_plugins.get_import_plugin(**kwargs)
internal_plugin = internal_plugins.get_import_plugin(
**dict(kwargs, action_wrapper=action_wrapper))
flow.add(internal_plugin)
if CONF.enabled_backends:
separator, staging_dir = store_utils.get_dir_separator()

View File

@ -21,6 +21,7 @@ import glance_store as store_api
from oslo_config import cfg
from glance.async_.flows._internal_plugins import copy_image
from glance.async_.flows import api_image_import
import glance.common.exception as exception
from glance import domain
import glance.tests.unit.utils as unit_test_utils
@ -93,6 +94,11 @@ class TestCopyImageTask(test_utils.BaseTestCase):
self.task = self.task_factory.new_task(self.task_type, TENANT1,
task_time_to_live=task_ttl,
task_input=task_input)
self.task_id = self.task.task_id
self.action_wrapper = api_image_import.ImportActionWrapper(
self.image_repo, self.image_id, self.task_id)
self.image_repo.get.return_value = mock.MagicMock(
extra_properties={'os_glance_import_task': self.task_id})
stores = {'cheap': 'file', 'fast': 'file'}
self.config(enabled_backends=stores)
@ -126,7 +132,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
mock_store_api.return_value = self.staging_store
copy_image_task = copy_image._CopyImage(
self.task.task_id, self.task_type, self.image_repo,
self.image_id)
self.action_wrapper)
with mock.patch.object(self.image_repo, 'get') as get_mock:
get_mock.return_value = mock.MagicMock(
image_id=self.images[0]['id'],
@ -152,7 +158,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
copy_image_task = copy_image._CopyImage(
self.task.task_id, self.task_type, self.image_repo,
self.image_id)
self.action_wrapper)
with mock.patch.object(self.image_repo, 'get') as get_mock:
get_mock.return_value = mock.MagicMock(
image_id=self.images[0]['id'],
@ -182,7 +188,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
copy_image_task = copy_image._CopyImage(
self.task.task_id, self.task_type, self.image_repo,
self.image_id)
self.action_wrapper)
with mock.patch.object(self.image_repo, 'get') as get_mock:
get_mock.return_value = mock.MagicMock(
image_id=self.images[0]['id'],
@ -206,7 +212,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
mock_store_api.return_value = self.staging_store
copy_image_task = copy_image._CopyImage(
self.task.task_id, self.task_type, self.image_repo,
self.image_id)
self.action_wrapper)
with mock.patch.object(self.image_repo, 'get') as get_mock:
get_mock.side_effect = exception.NotFound()

View File

@ -40,7 +40,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
super(TestWebDownloadTask, self).setUp()
self.config(node_staging_uri='/tmp/staging')
self.task_repo = mock.MagicMock()
self.image_repo = mock.MagicMock()
self.image_id = mock.MagicMock()
self.uri = mock.MagicMock()
self.task_factory = domain.TaskFactory()
@ -60,11 +60,16 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
task_time_to_live=task_ttl,
task_input=task_input)
self.task_id = self.task.task_id
self.action_wrapper = api_image_import.ImportActionWrapper(
self.image_repo, self.image_id, self.task_id)
self.image_repo.get.return_value = mock.MagicMock(
extra_properties={'os_glance_import_task': self.task_id})
@mock.patch.object(filesystem.Store, 'add')
def test_web_download(self, mock_add):
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
with mock.patch.object(script_utils,
'get_image_data_iter') as mock_iter:
mock_add.return_value = ["path", 4]
@ -76,8 +81,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
@mock.patch.object(filesystem.Store, 'add')
def test_web_download_with_content_length(self, mock_add):
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
with mock.patch.object(script_utils,
'get_image_data_iter') as mock_iter:
mock_iter.return_value.headers = {'content-length': '4'}
@ -89,8 +93,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
@mock.patch.object(filesystem.Store, 'add')
def test_web_download_with_invalid_content_length(self, mock_add):
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
with mock.patch.object(script_utils,
'get_image_data_iter') as mock_iter:
mock_iter.return_value.headers = {'content-length': "not_valid"}
@ -102,8 +105,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
@mock.patch.object(filesystem.Store, 'add')
def test_web_download_fails_when_data_size_different(self, mock_add):
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
with mock.patch.object(script_utils,
'get_image_data_iter') as mock_iter:
mock_iter.return_value.headers = {'content-length': '4'}
@ -116,8 +118,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
self.config(node_staging_uri=None)
self.assertRaises(glance.common.exception.BadTaskConfiguration,
web_download._WebDownload, self.task.task_id,
self.task_type, self.task_repo, self.image_id,
self.uri)
self.task_type, self.uri, self.action_wrapper)
@mock.patch.object(cfg.ConfigOpts, "set_override")
def test_web_download_node_store_initialization_failed(self,
@ -126,14 +127,12 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
mock_load_store.return_value = None
self.assertRaises(glance.common.exception.BadTaskConfiguration,
web_download._WebDownload, self.task.task_id,
self.task_type, self.task_repo, self.image_id,
self.uri)
self.task_type, self.uri, self.action_wrapper)
mock_override.assert_called()
def test_web_download_failed(self):
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
with mock.patch.object(script_utils,
"get_image_data_iter") as mock_iter:
mock_iter.side_effect = glance.common.exception.NotFound
@ -182,8 +181,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
def test_web_download_revert_with_failure(self, mock_store_api,
mock_add):
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
with mock.patch.object(script_utils,
'get_image_data_iter') as mock_iter:
mock_iter.return_value.headers = {'content-length': '4'}
@ -195,6 +193,9 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
web_download_task.revert(None)
mock_store_api.delete_from_backend.assert_called_once_with(
"/path/to_downloaded_data")
# NOTE(danms): Since we told revert that we were not at fault,
# we should not have updated the image.
self.image_repo.save.assert_not_called()
@mock.patch("glance.async_.flows._internal_plugins.web_download.store_api")
def test_web_download_revert_without_failure_multi_store(self,
@ -205,8 +206,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
}
self.config(enabled_backends=enabled_backends)
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
web_download_task._path = "/path/to_downloaded_data"
web_download_task.revert("/path/to_downloaded_data")
mock_store_api.delete.assert_called_once_with(
@ -215,21 +215,26 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows._internal_plugins.web_download.store_api")
def test_web_download_revert_with_failure_without_path(self,
mock_store_api):
image = self.image_repo.get.return_value
image.status = 'importing'
result = failure.Failure.from_exception(
glance.common.exception.ImportTaskError())
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
web_download_task.revert(result)
mock_store_api.delete_from_backend.assert_not_called()
# NOTE(danms): Since we told revert that we were the problem,
# we should have updated the image status
self.image_repo.save.assert_called_once_with(image, 'importing')
self.assertEqual('queued', image.status)
@mock.patch("glance.async_.flows._internal_plugins.web_download.store_api")
def test_web_download_revert_with_failure_with_path(self, mock_store_api):
result = failure.Failure.from_exception(
glance.common.exception.ImportTaskError())
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
web_download_task._path = "/path/to_downloaded_data"
web_download_task.revert(result)
mock_store_api.delete_from_backend.assert_called_once_with(
@ -241,8 +246,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
glance.common.exception.ImportTaskError())
mock_store_api.delete_from_backend.side_effect = Exception
web_download_task = web_download._WebDownload(
self.task.task_id, self.task_type, self.task_repo,
self.image_id, self.uri)
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
web_download_task._path = "/path/to_downloaded_data"
# this will verify that revert does not break because of failure
# while deleting data in staging area