From 43d41159787f10e51d154a08877bade78f3698f2 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Fri, 5 Feb 2021 14:09:34 -0800 Subject: [PATCH] Pass ImageActionWrapper to internal plugins The _internal_plugins/* tasks are not getting passed the ImportActionWrapper, and thus are doing things against the image that do not respect the task lock, nor apply multiple updates atomically. This fixes that and brings them in line with the base tasks, using the wrapper for things like getting the image_id. The copy_image task looks at the image for a few other things that are not exposed out of the wrapper, so I left the image_repo.get() in that task until I can extend it. They are read-only though, so not a big deal. The web-download test also was not passing the right options to that task (confusing task_repo and image_repo), which this cleans up as well. It was also missing validation of the code that reverts the state to 'queued' when web-download fails, so that is added here as well. Change-Id: I6db86b3e17a6a2f78745b40381b9419fb4404a4e Related-Bug: #1914826 (cherry picked from commit 68646db35d4a389237b9c1db75c074ba90fa1ec6) --- .../flows/_internal_plugins/copy_image.py | 12 +++-- .../flows/_internal_plugins/web_download.py | 17 +++--- glance/async_/flows/api_image_import.py | 3 +- .../unit/async_/flows/test_copy_image.py | 14 +++-- .../unit/async_/flows/test_web_download.py | 54 ++++++++++--------- 5 files changed, 56 insertions(+), 44 deletions(-) diff --git a/glance/async_/flows/_internal_plugins/copy_image.py b/glance/async_/flows/_internal_plugins/copy_image.py index 9ec0260de3..55a29a21a8 100644 --- a/glance/async_/flows/_internal_plugins/copy_image.py +++ b/glance/async_/flows/_internal_plugins/copy_image.py @@ -33,11 +33,12 @@ class _CopyImage(task.Task): default_provides = 'file_uri' - def __init__(self, task_id, task_type, image_repo, image_id): + def __init__(self, task_id, task_type, image_repo, action_wrapper): self.task_id = task_id self.task_type = task_type self.image_repo = image_repo - self.image_id = image_id + self.image_id = action_wrapper.image_id + self.action_wrapper = action_wrapper super(_CopyImage, self).__init__( name='%s-CopyImage-%s' % (task_type, task_id)) @@ -131,13 +132,14 @@ def get_flow(**kwargs): :param task_id: Task ID. :param task_type: Type of the task. :param image_repo: Image repository used. - :param uri: URI the image data is downloaded from. + :param image_id: Image ID. + :param action_wrapper: An api_image_import.ActionWrapper. """ task_id = kwargs.get('task_id') task_type = kwargs.get('task_type') image_repo = kwargs.get('image_repo') - image_id = kwargs.get('image_id') + action_wrapper = kwargs.get('action_wrapper') return lf.Flow(task_type).add( - _CopyImage(task_id, task_type, image_repo, image_id), + _CopyImage(task_id, task_type, image_repo, action_wrapper), ) diff --git a/glance/async_/flows/_internal_plugins/web_download.py b/glance/async_/flows/_internal_plugins/web_download.py index 9ab891be39..6b824c0a13 100644 --- a/glance/async_/flows/_internal_plugins/web_download.py +++ b/glance/async_/flows/_internal_plugins/web_download.py @@ -35,12 +35,12 @@ class _WebDownload(task.Task): default_provides = 'file_uri' - def __init__(self, task_id, task_type, image_repo, image_id, uri): + def __init__(self, task_id, task_type, uri, action_wrapper): self.task_id = task_id self.task_type = task_type - self.image_repo = image_repo - self.image_id = image_id + self.image_id = action_wrapper.image_id self.uri = uri + self.action_wrapper = action_wrapper self._path = None super(_WebDownload, self).__init__( name='%s-WebDownload-%s' % (task_type, task_id)) @@ -140,9 +140,8 @@ class _WebDownload(task.Task): 'image_id': self.image_id}) # NOTE(abhishekk): Revert image state back to 'queued' as # something went wrong. - image = self.image_repo.get(self.image_id) - image.status = 'queued' - self.image_repo.save(image) + with self.action_wrapper as action: + action.set_image_status('queued') # NOTE(abhishekk): Deleting partial image data from staging area if self._path is not None: @@ -166,13 +165,13 @@ def get_flow(**kwargs): :param task_type: Type of the task. :param image_repo: Image repository used. :param uri: URI the image data is downloaded from. + :param action_wrapper: An api_image_import.ActionWrapper. """ task_id = kwargs.get('task_id') task_type = kwargs.get('task_type') - image_repo = kwargs.get('image_repo') - image_id = kwargs.get('image_id') uri = kwargs.get('import_req')['method'].get('uri') + action_wrapper = kwargs.get('action_wrapper') return lf.Flow(task_type).add( - _WebDownload(task_id, task_type, image_repo, image_id, uri), + _WebDownload(task_id, task_type, uri, action_wrapper), ) diff --git a/glance/async_/flows/api_image_import.py b/glance/async_/flows/api_image_import.py index 470f184185..f739a7f2e4 100644 --- a/glance/async_/flows/api_image_import.py +++ b/glance/async_/flows/api_image_import.py @@ -730,7 +730,8 @@ def get_flow(**kwargs): flow.add(_ImageLock(task_id, task_type, action_wrapper)) if import_method in ['web-download', 'copy-image']: - internal_plugin = internal_plugins.get_import_plugin(**kwargs) + internal_plugin = internal_plugins.get_import_plugin( + **dict(kwargs, action_wrapper=action_wrapper)) flow.add(internal_plugin) if CONF.enabled_backends: separator, staging_dir = store_utils.get_dir_separator() diff --git a/glance/tests/unit/async_/flows/test_copy_image.py b/glance/tests/unit/async_/flows/test_copy_image.py index fa79053a9c..95f9408705 100644 --- a/glance/tests/unit/async_/flows/test_copy_image.py +++ b/glance/tests/unit/async_/flows/test_copy_image.py @@ -21,6 +21,7 @@ import glance_store as store_api from oslo_config import cfg from glance.async_.flows._internal_plugins import copy_image +from glance.async_.flows import api_image_import import glance.common.exception as exception from glance import domain import glance.tests.unit.utils as unit_test_utils @@ -93,6 +94,11 @@ class TestCopyImageTask(test_utils.BaseTestCase): self.task = self.task_factory.new_task(self.task_type, TENANT1, task_time_to_live=task_ttl, task_input=task_input) + self.task_id = self.task.task_id + self.action_wrapper = api_image_import.ImportActionWrapper( + self.image_repo, self.image_id, self.task_id) + self.image_repo.get.return_value = mock.MagicMock( + extra_properties={'os_glance_import_task': self.task_id}) stores = {'cheap': 'file', 'fast': 'file'} self.config(enabled_backends=stores) @@ -126,7 +132,7 @@ class TestCopyImageTask(test_utils.BaseTestCase): mock_store_api.return_value = self.staging_store copy_image_task = copy_image._CopyImage( self.task.task_id, self.task_type, self.image_repo, - self.image_id) + self.action_wrapper) with mock.patch.object(self.image_repo, 'get') as get_mock: get_mock.return_value = mock.MagicMock( image_id=self.images[0]['id'], @@ -152,7 +158,7 @@ class TestCopyImageTask(test_utils.BaseTestCase): copy_image_task = copy_image._CopyImage( self.task.task_id, self.task_type, self.image_repo, - self.image_id) + self.action_wrapper) with mock.patch.object(self.image_repo, 'get') as get_mock: get_mock.return_value = mock.MagicMock( image_id=self.images[0]['id'], @@ -182,7 +188,7 @@ class TestCopyImageTask(test_utils.BaseTestCase): copy_image_task = copy_image._CopyImage( self.task.task_id, self.task_type, self.image_repo, - self.image_id) + self.action_wrapper) with mock.patch.object(self.image_repo, 'get') as get_mock: get_mock.return_value = mock.MagicMock( image_id=self.images[0]['id'], @@ -206,7 +212,7 @@ class TestCopyImageTask(test_utils.BaseTestCase): mock_store_api.return_value = self.staging_store copy_image_task = copy_image._CopyImage( self.task.task_id, self.task_type, self.image_repo, - self.image_id) + self.action_wrapper) with mock.patch.object(self.image_repo, 'get') as get_mock: get_mock.side_effect = exception.NotFound() diff --git a/glance/tests/unit/async_/flows/test_web_download.py b/glance/tests/unit/async_/flows/test_web_download.py index 02bf37f782..4a1ee9c7cc 100644 --- a/glance/tests/unit/async_/flows/test_web_download.py +++ b/glance/tests/unit/async_/flows/test_web_download.py @@ -40,7 +40,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): super(TestWebDownloadTask, self).setUp() self.config(node_staging_uri='/tmp/staging') - self.task_repo = mock.MagicMock() + self.image_repo = mock.MagicMock() self.image_id = mock.MagicMock() self.uri = mock.MagicMock() self.task_factory = domain.TaskFactory() @@ -60,11 +60,16 @@ class TestWebDownloadTask(test_utils.BaseTestCase): task_time_to_live=task_ttl, task_input=task_input) + self.task_id = self.task.task_id + self.action_wrapper = api_image_import.ImportActionWrapper( + self.image_repo, self.image_id, self.task_id) + self.image_repo.get.return_value = mock.MagicMock( + extra_properties={'os_glance_import_task': self.task_id}) + @mock.patch.object(filesystem.Store, 'add') def test_web_download(self, mock_add): web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) with mock.patch.object(script_utils, 'get_image_data_iter') as mock_iter: mock_add.return_value = ["path", 4] @@ -76,8 +81,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): @mock.patch.object(filesystem.Store, 'add') def test_web_download_with_content_length(self, mock_add): web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) with mock.patch.object(script_utils, 'get_image_data_iter') as mock_iter: mock_iter.return_value.headers = {'content-length': '4'} @@ -89,8 +93,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): @mock.patch.object(filesystem.Store, 'add') def test_web_download_with_invalid_content_length(self, mock_add): web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) with mock.patch.object(script_utils, 'get_image_data_iter') as mock_iter: mock_iter.return_value.headers = {'content-length': "not_valid"} @@ -102,8 +105,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): @mock.patch.object(filesystem.Store, 'add') def test_web_download_fails_when_data_size_different(self, mock_add): web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) with mock.patch.object(script_utils, 'get_image_data_iter') as mock_iter: mock_iter.return_value.headers = {'content-length': '4'} @@ -116,8 +118,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): self.config(node_staging_uri=None) self.assertRaises(glance.common.exception.BadTaskConfiguration, web_download._WebDownload, self.task.task_id, - self.task_type, self.task_repo, self.image_id, - self.uri) + self.task_type, self.uri, self.action_wrapper) @mock.patch.object(cfg.ConfigOpts, "set_override") def test_web_download_node_store_initialization_failed(self, @@ -126,14 +127,12 @@ class TestWebDownloadTask(test_utils.BaseTestCase): mock_load_store.return_value = None self.assertRaises(glance.common.exception.BadTaskConfiguration, web_download._WebDownload, self.task.task_id, - self.task_type, self.task_repo, self.image_id, - self.uri) + self.task_type, self.uri, self.action_wrapper) mock_override.assert_called() def test_web_download_failed(self): web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) with mock.patch.object(script_utils, "get_image_data_iter") as mock_iter: mock_iter.side_effect = glance.common.exception.NotFound @@ -182,8 +181,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): def test_web_download_revert_with_failure(self, mock_store_api, mock_add): web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) with mock.patch.object(script_utils, 'get_image_data_iter') as mock_iter: mock_iter.return_value.headers = {'content-length': '4'} @@ -195,6 +193,9 @@ class TestWebDownloadTask(test_utils.BaseTestCase): web_download_task.revert(None) mock_store_api.delete_from_backend.assert_called_once_with( "/path/to_downloaded_data") + # NOTE(danms): Since we told revert that we were not at fault, + # we should not have updated the image. + self.image_repo.save.assert_not_called() @mock.patch("glance.async_.flows._internal_plugins.web_download.store_api") def test_web_download_revert_without_failure_multi_store(self, @@ -205,8 +206,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): } self.config(enabled_backends=enabled_backends) web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) web_download_task._path = "/path/to_downloaded_data" web_download_task.revert("/path/to_downloaded_data") mock_store_api.delete.assert_called_once_with( @@ -215,21 +215,26 @@ class TestWebDownloadTask(test_utils.BaseTestCase): @mock.patch("glance.async_.flows._internal_plugins.web_download.store_api") def test_web_download_revert_with_failure_without_path(self, mock_store_api): + image = self.image_repo.get.return_value + image.status = 'importing' result = failure.Failure.from_exception( glance.common.exception.ImportTaskError()) web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) web_download_task.revert(result) mock_store_api.delete_from_backend.assert_not_called() + # NOTE(danms): Since we told revert that we were the problem, + # we should have updated the image status + self.image_repo.save.assert_called_once_with(image, 'importing') + self.assertEqual('queued', image.status) + @mock.patch("glance.async_.flows._internal_plugins.web_download.store_api") def test_web_download_revert_with_failure_with_path(self, mock_store_api): result = failure.Failure.from_exception( glance.common.exception.ImportTaskError()) web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) web_download_task._path = "/path/to_downloaded_data" web_download_task.revert(result) mock_store_api.delete_from_backend.assert_called_once_with( @@ -241,8 +246,7 @@ class TestWebDownloadTask(test_utils.BaseTestCase): glance.common.exception.ImportTaskError()) mock_store_api.delete_from_backend.side_effect = Exception web_download_task = web_download._WebDownload( - self.task.task_id, self.task_type, self.task_repo, - self.image_id, self.uri) + self.task.task_id, self.task_type, self.uri, self.action_wrapper) web_download_task._path = "/path/to_downloaded_data" # this will verify that revert does not break because of failure # while deleting data in staging area