Pass ImageActionWrapper to internal plugins
The _internal_plugins/* tasks are not getting passed the ImportActionWrapper, and thus are doing things against the image that do not respect the task lock, nor apply multiple updates atomically. This fixes that and brings them in line with the base tasks, using the wrapper for things like getting the image_id. The copy_image task looks at the image for a few other things that are not exposed out of the wrapper, so I left the image_repo.get() in that task until I can extend it. They are read-only though, so not a big deal. The web-download test also was not passing the right options to that task (confusing task_repo and image_repo), which this cleans up as well. It was also missing validation of the code that reverts the state to 'queued' when web-download fails, so that is added here as well. Change-Id: I6db86b3e17a6a2f78745b40381b9419fb4404a4e Related-Bug: #1914826
This commit is contained in:
parent
f2452863e7
commit
68646db35d
|
@ -33,11 +33,12 @@ class _CopyImage(task.Task):
|
|||
|
||||
default_provides = 'file_uri'
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo, image_id):
|
||||
def __init__(self, task_id, task_type, image_repo, action_wrapper):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.image_id = image_id
|
||||
self.image_id = action_wrapper.image_id
|
||||
self.action_wrapper = action_wrapper
|
||||
super(_CopyImage, self).__init__(
|
||||
name='%s-CopyImage-%s' % (task_type, task_id))
|
||||
|
||||
|
@ -131,13 +132,14 @@ def get_flow(**kwargs):
|
|||
:param task_id: Task ID.
|
||||
:param task_type: Type of the task.
|
||||
:param image_repo: Image repository used.
|
||||
:param uri: URI the image data is downloaded from.
|
||||
:param image_id: Image ID.
|
||||
:param action_wrapper: An api_image_import.ActionWrapper.
|
||||
"""
|
||||
task_id = kwargs.get('task_id')
|
||||
task_type = kwargs.get('task_type')
|
||||
image_repo = kwargs.get('image_repo')
|
||||
image_id = kwargs.get('image_id')
|
||||
action_wrapper = kwargs.get('action_wrapper')
|
||||
|
||||
return lf.Flow(task_type).add(
|
||||
_CopyImage(task_id, task_type, image_repo, image_id),
|
||||
_CopyImage(task_id, task_type, image_repo, action_wrapper),
|
||||
)
|
||||
|
|
|
@ -35,12 +35,12 @@ class _WebDownload(task.Task):
|
|||
|
||||
default_provides = 'file_uri'
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo, image_id, uri):
|
||||
def __init__(self, task_id, task_type, uri, action_wrapper):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.image_id = image_id
|
||||
self.image_id = action_wrapper.image_id
|
||||
self.uri = uri
|
||||
self.action_wrapper = action_wrapper
|
||||
self._path = None
|
||||
super(_WebDownload, self).__init__(
|
||||
name='%s-WebDownload-%s' % (task_type, task_id))
|
||||
|
@ -140,9 +140,8 @@ class _WebDownload(task.Task):
|
|||
'image_id': self.image_id})
|
||||
# NOTE(abhishekk): Revert image state back to 'queued' as
|
||||
# something went wrong.
|
||||
image = self.image_repo.get(self.image_id)
|
||||
image.status = 'queued'
|
||||
self.image_repo.save(image)
|
||||
with self.action_wrapper as action:
|
||||
action.set_image_status('queued')
|
||||
|
||||
# NOTE(abhishekk): Deleting partial image data from staging area
|
||||
if self._path is not None:
|
||||
|
@ -166,13 +165,13 @@ def get_flow(**kwargs):
|
|||
:param task_type: Type of the task.
|
||||
:param image_repo: Image repository used.
|
||||
:param uri: URI the image data is downloaded from.
|
||||
:param action_wrapper: An api_image_import.ActionWrapper.
|
||||
"""
|
||||
task_id = kwargs.get('task_id')
|
||||
task_type = kwargs.get('task_type')
|
||||
image_repo = kwargs.get('image_repo')
|
||||
image_id = kwargs.get('image_id')
|
||||
uri = kwargs.get('import_req')['method'].get('uri')
|
||||
action_wrapper = kwargs.get('action_wrapper')
|
||||
|
||||
return lf.Flow(task_type).add(
|
||||
_WebDownload(task_id, task_type, image_repo, image_id, uri),
|
||||
_WebDownload(task_id, task_type, uri, action_wrapper),
|
||||
)
|
||||
|
|
|
@ -730,7 +730,8 @@ def get_flow(**kwargs):
|
|||
flow.add(_ImageLock(task_id, task_type, action_wrapper))
|
||||
|
||||
if import_method in ['web-download', 'copy-image']:
|
||||
internal_plugin = internal_plugins.get_import_plugin(**kwargs)
|
||||
internal_plugin = internal_plugins.get_import_plugin(
|
||||
**dict(kwargs, action_wrapper=action_wrapper))
|
||||
flow.add(internal_plugin)
|
||||
if CONF.enabled_backends:
|
||||
separator, staging_dir = store_utils.get_dir_separator()
|
||||
|
|
|
@ -21,6 +21,7 @@ import glance_store as store_api
|
|||
from oslo_config import cfg
|
||||
|
||||
from glance.async_.flows._internal_plugins import copy_image
|
||||
from glance.async_.flows import api_image_import
|
||||
import glance.common.exception as exception
|
||||
from glance import domain
|
||||
import glance.tests.unit.utils as unit_test_utils
|
||||
|
@ -93,6 +94,11 @@ class TestCopyImageTask(test_utils.BaseTestCase):
|
|||
self.task = self.task_factory.new_task(self.task_type, TENANT1,
|
||||
task_time_to_live=task_ttl,
|
||||
task_input=task_input)
|
||||
self.task_id = self.task.task_id
|
||||
self.action_wrapper = api_image_import.ImportActionWrapper(
|
||||
self.image_repo, self.image_id, self.task_id)
|
||||
self.image_repo.get.return_value = mock.MagicMock(
|
||||
extra_properties={'os_glance_import_task': self.task_id})
|
||||
|
||||
stores = {'cheap': 'file', 'fast': 'file'}
|
||||
self.config(enabled_backends=stores)
|
||||
|
@ -126,7 +132,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
|
|||
mock_store_api.return_value = self.staging_store
|
||||
copy_image_task = copy_image._CopyImage(
|
||||
self.task.task_id, self.task_type, self.image_repo,
|
||||
self.image_id)
|
||||
self.action_wrapper)
|
||||
with mock.patch.object(self.image_repo, 'get') as get_mock:
|
||||
get_mock.return_value = mock.MagicMock(
|
||||
image_id=self.images[0]['id'],
|
||||
|
@ -152,7 +158,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
|
|||
|
||||
copy_image_task = copy_image._CopyImage(
|
||||
self.task.task_id, self.task_type, self.image_repo,
|
||||
self.image_id)
|
||||
self.action_wrapper)
|
||||
with mock.patch.object(self.image_repo, 'get') as get_mock:
|
||||
get_mock.return_value = mock.MagicMock(
|
||||
image_id=self.images[0]['id'],
|
||||
|
@ -182,7 +188,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
|
|||
|
||||
copy_image_task = copy_image._CopyImage(
|
||||
self.task.task_id, self.task_type, self.image_repo,
|
||||
self.image_id)
|
||||
self.action_wrapper)
|
||||
with mock.patch.object(self.image_repo, 'get') as get_mock:
|
||||
get_mock.return_value = mock.MagicMock(
|
||||
image_id=self.images[0]['id'],
|
||||
|
@ -206,7 +212,7 @@ class TestCopyImageTask(test_utils.BaseTestCase):
|
|||
mock_store_api.return_value = self.staging_store
|
||||
copy_image_task = copy_image._CopyImage(
|
||||
self.task.task_id, self.task_type, self.image_repo,
|
||||
self.image_id)
|
||||
self.action_wrapper)
|
||||
with mock.patch.object(self.image_repo, 'get') as get_mock:
|
||||
get_mock.side_effect = exception.NotFound()
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
super(TestWebDownloadTask, self).setUp()
|
||||
|
||||
self.config(node_staging_uri='/tmp/staging')
|
||||
self.task_repo = mock.MagicMock()
|
||||
self.image_repo = mock.MagicMock()
|
||||
self.image_id = mock.MagicMock()
|
||||
self.uri = mock.MagicMock()
|
||||
self.task_factory = domain.TaskFactory()
|
||||
|
@ -60,11 +60,16 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
task_time_to_live=task_ttl,
|
||||
task_input=task_input)
|
||||
|
||||
self.task_id = self.task.task_id
|
||||
self.action_wrapper = api_image_import.ImportActionWrapper(
|
||||
self.image_repo, self.image_id, self.task_id)
|
||||
self.image_repo.get.return_value = mock.MagicMock(
|
||||
extra_properties={'os_glance_import_task': self.task_id})
|
||||
|
||||
@mock.patch.object(filesystem.Store, 'add')
|
||||
def test_web_download(self, mock_add):
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
with mock.patch.object(script_utils,
|
||||
'get_image_data_iter') as mock_iter:
|
||||
mock_add.return_value = ["path", 4]
|
||||
|
@ -76,8 +81,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
@mock.patch.object(filesystem.Store, 'add')
|
||||
def test_web_download_with_content_length(self, mock_add):
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
with mock.patch.object(script_utils,
|
||||
'get_image_data_iter') as mock_iter:
|
||||
mock_iter.return_value.headers = {'content-length': '4'}
|
||||
|
@ -89,8 +93,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
@mock.patch.object(filesystem.Store, 'add')
|
||||
def test_web_download_with_invalid_content_length(self, mock_add):
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
with mock.patch.object(script_utils,
|
||||
'get_image_data_iter') as mock_iter:
|
||||
mock_iter.return_value.headers = {'content-length': "not_valid"}
|
||||
|
@ -102,8 +105,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
@mock.patch.object(filesystem.Store, 'add')
|
||||
def test_web_download_fails_when_data_size_different(self, mock_add):
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
with mock.patch.object(script_utils,
|
||||
'get_image_data_iter') as mock_iter:
|
||||
mock_iter.return_value.headers = {'content-length': '4'}
|
||||
|
@ -116,8 +118,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
self.config(node_staging_uri=None)
|
||||
self.assertRaises(glance.common.exception.BadTaskConfiguration,
|
||||
web_download._WebDownload, self.task.task_id,
|
||||
self.task_type, self.task_repo, self.image_id,
|
||||
self.uri)
|
||||
self.task_type, self.uri, self.action_wrapper)
|
||||
|
||||
@mock.patch.object(cfg.ConfigOpts, "set_override")
|
||||
def test_web_download_node_store_initialization_failed(self,
|
||||
|
@ -126,14 +127,12 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
mock_load_store.return_value = None
|
||||
self.assertRaises(glance.common.exception.BadTaskConfiguration,
|
||||
web_download._WebDownload, self.task.task_id,
|
||||
self.task_type, self.task_repo, self.image_id,
|
||||
self.uri)
|
||||
self.task_type, self.uri, self.action_wrapper)
|
||||
mock_override.assert_called()
|
||||
|
||||
def test_web_download_failed(self):
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
with mock.patch.object(script_utils,
|
||||
"get_image_data_iter") as mock_iter:
|
||||
mock_iter.side_effect = glance.common.exception.NotFound
|
||||
|
@ -182,8 +181,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
def test_web_download_revert_with_failure(self, mock_store_api,
|
||||
mock_add):
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
with mock.patch.object(script_utils,
|
||||
'get_image_data_iter') as mock_iter:
|
||||
mock_iter.return_value.headers = {'content-length': '4'}
|
||||
|
@ -195,6 +193,9 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
web_download_task.revert(None)
|
||||
mock_store_api.delete_from_backend.assert_called_once_with(
|
||||
"/path/to_downloaded_data")
|
||||
# NOTE(danms): Since we told revert that we were not at fault,
|
||||
# we should not have updated the image.
|
||||
self.image_repo.save.assert_not_called()
|
||||
|
||||
@mock.patch("glance.async_.flows._internal_plugins.web_download.store_api")
|
||||
def test_web_download_revert_without_failure_multi_store(self,
|
||||
|
@ -205,8 +206,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
}
|
||||
self.config(enabled_backends=enabled_backends)
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
web_download_task._path = "/path/to_downloaded_data"
|
||||
web_download_task.revert("/path/to_downloaded_data")
|
||||
mock_store_api.delete.assert_called_once_with(
|
||||
|
@ -215,21 +215,26 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
@mock.patch("glance.async_.flows._internal_plugins.web_download.store_api")
|
||||
def test_web_download_revert_with_failure_without_path(self,
|
||||
mock_store_api):
|
||||
image = self.image_repo.get.return_value
|
||||
image.status = 'importing'
|
||||
result = failure.Failure.from_exception(
|
||||
glance.common.exception.ImportTaskError())
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
web_download_task.revert(result)
|
||||
mock_store_api.delete_from_backend.assert_not_called()
|
||||
|
||||
# NOTE(danms): Since we told revert that we were the problem,
|
||||
# we should have updated the image status
|
||||
self.image_repo.save.assert_called_once_with(image, 'importing')
|
||||
self.assertEqual('queued', image.status)
|
||||
|
||||
@mock.patch("glance.async_.flows._internal_plugins.web_download.store_api")
|
||||
def test_web_download_revert_with_failure_with_path(self, mock_store_api):
|
||||
result = failure.Failure.from_exception(
|
||||
glance.common.exception.ImportTaskError())
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
web_download_task._path = "/path/to_downloaded_data"
|
||||
web_download_task.revert(result)
|
||||
mock_store_api.delete_from_backend.assert_called_once_with(
|
||||
|
@ -241,8 +246,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase):
|
|||
glance.common.exception.ImportTaskError())
|
||||
mock_store_api.delete_from_backend.side_effect = Exception
|
||||
web_download_task = web_download._WebDownload(
|
||||
self.task.task_id, self.task_type, self.task_repo,
|
||||
self.image_id, self.uri)
|
||||
self.task.task_id, self.task_type, self.uri, self.action_wrapper)
|
||||
web_download_task._path = "/path/to_downloaded_data"
|
||||
# this will verify that revert does not break because of failure
|
||||
# while deleting data in staging area
|
||||
|
|
Loading…
Reference in New Issue