From 3da696e0a4c45b35cd9241ddc7c80c490b290ef2 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Mon, 27 Jul 2020 15:05:58 -0700 Subject: [PATCH] Update task message during import This updates the task.message field with details about the copy, which will also update the updated_at field on the task. This will facilitate some rudimentary liveness checking from outside the task thread. Note that it also checks the state of our task, and will abort if it has been pushed out of 'processing' state externally. This will be used in the following patch to faciliate import lock busting behavior. Change-Id: I8667c17813f6e701db98595b0b30df9e7b275294 (cherry picked from commit 77d9cfa66e3c2e70a1cc515e7504fd5dd946549a) --- glance/async_/flows/api_image_import.py | 18 +++- glance/common/exception.py | 4 + .../async_/flows/test_api_image_import.py | 88 ++++++++++++++++--- 3 files changed, 98 insertions(+), 12 deletions(-) diff --git a/glance/async_/flows/api_image_import.py b/glance/async_/flows/api_image_import.py index 74970ef9c6..c7f9926413 100644 --- a/glance/async_/flows/api_image_import.py +++ b/glance/async_/flows/api_image_import.py @@ -394,10 +394,11 @@ class _VerifyStaging(task.Task): class _ImportToStore(task.Task): - def __init__(self, task_id, task_type, action_wrapper, uri, + def __init__(self, task_id, task_type, task_repo, action_wrapper, uri, backend, all_stores_must_succeed, set_active): self.task_id = task_id self.task_type = task_type + self.task_repo = task_repo self.action_wrapper = action_wrapper self.uri = uri self.backend = backend @@ -490,6 +491,20 @@ class _ImportToStore(task.Task): 'copied': total_bytes // units.Mi}) self.last_status = timeutils.now() + task = script_utils.get_task(self.task_repo, self.task_id) + if task is None: + LOG.error( + 'Status callback for task %(task)s found no task object!', + {'task': self.task_id}) + raise exception.TaskNotFound(self.task_id) + if task.status != 'processing': + LOG.error('Task %(task)s expected "processing" status, ' + 'but found "%(status)s"; aborting.') + raise exception.TaskAbortedError() + + task.message = _('Copied %i MiB') % (total_bytes // units.Mi) + self.task_repo.save(task) + def revert(self, result, **kwargs): """ Remove location from image in case of failure @@ -637,6 +652,7 @@ def get_flow(**kwargs): import_task = lf.Flow(task_name) import_to_store = _ImportToStore(task_id, task_name, + task_repo, action_wrapper, file_uri, store, diff --git a/glance/common/exception.py b/glance/common/exception.py index 097957a70f..0d8f09bba1 100644 --- a/glance/common/exception.py +++ b/glance/common/exception.py @@ -359,6 +359,10 @@ class ImportTaskError(TaskException, Invalid): message = _("An import task exception occurred") +class TaskAbortedError(ImportTaskError): + message = _("Task was aborted externally") + + class DuplicateLocation(Duplicate): message = _("The location %(location)s already exists") diff --git a/glance/tests/unit/async_/flows/test_api_image_import.py b/glance/tests/unit/async_/flows/test_api_image_import.py index 7d10a40203..79280fecca 100644 --- a/glance/tests/unit/async_/flows/test_api_image_import.py +++ b/glance/tests/unit/async_/flows/test_api_image_import.py @@ -16,9 +16,10 @@ from unittest import mock from oslo_config import cfg +from oslo_utils import units import glance.async_.flows.api_image_import as import_flow -from glance.common.exception import ImportTaskError +from glance.common import exception from glance.common.scripts.image_import import main as image_import from glance import context from glance import gateway @@ -112,9 +113,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_execute(self): wrapper = mock.MagicMock() action = mock.MagicMock() + task_repo = mock.MagicMock() wrapper.__enter__.return_value = action image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, + task_repo, wrapper, "http://url", "store1", False, True) @@ -132,9 +134,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase): image = mock.MagicMock() img_repo = mock.MagicMock() img_repo.get.return_value = image + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, + task_repo, wrapper, "http://url", "store1", False, True) @@ -151,9 +154,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase): image = mock.MagicMock() img_repo = mock.MagicMock() img_repo.get.return_value = image + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, + task_repo, wrapper, "http://url", "store1", False, True) @@ -170,9 +174,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase): image = mock.MagicMock() img_repo = mock.MagicMock() img_repo.get.return_value = image + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, + task_repo, wrapper, "http://url", None, False, True) @@ -189,9 +194,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase): @mock.patch('oslo_utils.timeutils.now') def test_status_callback_limits_rate(self, mock_now, mock_log): img_repo = mock.MagicMock() + task_repo = mock.MagicMock() + task_repo.get.return_value.status = 'processing' wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, + task_repo, wrapper, "http://url", None, False, True) @@ -240,22 +247,26 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_raises_when_image_deleted(self): img_repo = mock.MagicMock() + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, "http://url", + task_repo, wrapper, + "http://url", "store1", False, True) image = self.img_factory.new_image(image_id=UUID1) image.status = "deleted" img_repo.get.return_value = image - self.assertRaises(ImportTaskError, image_import.execute) + self.assertRaises(exception.ImportTaskError, image_import.execute) @mock.patch("glance.async_.flows.api_image_import.image_import") def test_remove_store_from_property(self, mock_import): img_repo = mock.MagicMock() + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, "http://url", + task_repo, wrapper, + "http://url", "store1", True, True) extra_properties = {"os_glance_importing_to_stores": "store1,store2"} @@ -269,9 +280,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase): @mock.patch("glance.async_.flows.api_image_import.image_import") def test_raises_when_all_stores_must_succeed(self, mock_import): img_repo = mock.MagicMock() + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, "http://url", + task_repo, wrapper, + "http://url", "store1", True, True) image = self.img_factory.new_image(image_id=UUID1) @@ -285,9 +298,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase): @mock.patch("glance.async_.flows.api_image_import.image_import") def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import): img_repo = mock.MagicMock() + task_repo = mock.MagicMock() wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, - wrapper, "http://url", + task_repo, wrapper, + "http://url", "store1", False, True) image = self.img_factory.new_image(image_id=UUID1) @@ -302,6 +317,57 @@ class TestImportToStoreTask(test_utils.BaseTestCase): except cursive_exception.SignatureVerificationError: self.fail("Exception shouldn't be raised") + @mock.patch('glance.common.scripts.utils.get_task') + def test_status_callback_updates_task_message(self, mock_get): + task_repo = mock.MagicMock() + image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, + task_repo, mock.MagicMock(), + "http://url", + "store1", False, + True) + task = mock.MagicMock() + task.status = 'processing' + mock_get.return_value = task + action = mock.MagicMock() + image_import._status_callback(action, 128, 256 * units.Mi) + mock_get.assert_called_once_with(task_repo, TASK_ID1) + task_repo.save.assert_called_once_with(task) + self.assertEqual(_('Copied %i MiB' % 256), task.message) + + @mock.patch('glance.common.scripts.utils.get_task') + def test_status_aborts_missing_task(self, mock_get): + task_repo = mock.MagicMock() + image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, + task_repo, mock.MagicMock(), + "http://url", + "store1", False, + True) + mock_get.return_value = None + action = mock.MagicMock() + self.assertRaises(exception.TaskNotFound, + image_import._status_callback, + action, 128, 256 * units.Mi) + mock_get.assert_called_once_with(task_repo, TASK_ID1) + task_repo.save.assert_not_called() + + @mock.patch('glance.common.scripts.utils.get_task') + def test_status_aborts_invalid_task_state(self, mock_get): + task_repo = mock.MagicMock() + image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, + task_repo, mock.MagicMock(), + "http://url", + "store1", False, + True) + task = mock.MagicMock() + task.status = 'failed' + mock_get.return_value = task + action = mock.MagicMock() + self.assertRaises(exception.TaskAbortedError, + image_import._status_callback, + action, 128, 256 * units.Mi) + mock_get.assert_called_once_with(task_repo, TASK_ID1) + task_repo.save.assert_not_called() + class TestDeleteFromFS(test_utils.BaseTestCase): def test_delete_with_backends_deletes(self):