Update task message during import

This updates the task.message field with details about the copy,
which will also update the updated_at field on the task. This will
facilitate some rudimentary liveness checking from outside the task
thread. Note that it also checks the state of our task, and will
abort if it has been pushed out of 'processing' state externally.
This will be used in the following patch to faciliate import lock
busting behavior.

Change-Id: I8667c17813f6e701db98595b0b30df9e7b275294
(cherry picked from commit 77d9cfa66e)
This commit is contained in:
Dan Smith 2020-07-27 15:05:58 -07:00
parent 586ca78a23
commit 3da696e0a4
3 changed files with 98 additions and 12 deletions

View File

@ -394,10 +394,11 @@ class _VerifyStaging(task.Task):
class _ImportToStore(task.Task): class _ImportToStore(task.Task):
def __init__(self, task_id, task_type, action_wrapper, uri, def __init__(self, task_id, task_type, task_repo, action_wrapper, uri,
backend, all_stores_must_succeed, set_active): backend, all_stores_must_succeed, set_active):
self.task_id = task_id self.task_id = task_id
self.task_type = task_type self.task_type = task_type
self.task_repo = task_repo
self.action_wrapper = action_wrapper self.action_wrapper = action_wrapper
self.uri = uri self.uri = uri
self.backend = backend self.backend = backend
@ -490,6 +491,20 @@ class _ImportToStore(task.Task):
'copied': total_bytes // units.Mi}) 'copied': total_bytes // units.Mi})
self.last_status = timeutils.now() self.last_status = timeutils.now()
task = script_utils.get_task(self.task_repo, self.task_id)
if task is None:
LOG.error(
'Status callback for task %(task)s found no task object!',
{'task': self.task_id})
raise exception.TaskNotFound(self.task_id)
if task.status != 'processing':
LOG.error('Task %(task)s expected "processing" status, '
'but found "%(status)s"; aborting.')
raise exception.TaskAbortedError()
task.message = _('Copied %i MiB') % (total_bytes // units.Mi)
self.task_repo.save(task)
def revert(self, result, **kwargs): def revert(self, result, **kwargs):
""" """
Remove location from image in case of failure Remove location from image in case of failure
@ -637,6 +652,7 @@ def get_flow(**kwargs):
import_task = lf.Flow(task_name) import_task = lf.Flow(task_name)
import_to_store = _ImportToStore(task_id, import_to_store = _ImportToStore(task_id,
task_name, task_name,
task_repo,
action_wrapper, action_wrapper,
file_uri, file_uri,
store, store,

View File

@ -359,6 +359,10 @@ class ImportTaskError(TaskException, Invalid):
message = _("An import task exception occurred") message = _("An import task exception occurred")
class TaskAbortedError(ImportTaskError):
message = _("Task was aborted externally")
class DuplicateLocation(Duplicate): class DuplicateLocation(Duplicate):
message = _("The location %(location)s already exists") message = _("The location %(location)s already exists")

View File

@ -16,9 +16,10 @@
from unittest import mock from unittest import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import units
import glance.async_.flows.api_image_import as import_flow import glance.async_.flows.api_image_import as import_flow
from glance.common.exception import ImportTaskError from glance.common import exception
from glance.common.scripts.image_import import main as image_import from glance.common.scripts.image_import import main as image_import
from glance import context from glance import context
from glance import gateway from glance import gateway
@ -112,9 +113,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_execute(self): def test_execute(self):
wrapper = mock.MagicMock() wrapper = mock.MagicMock()
action = mock.MagicMock() action = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper.__enter__.return_value = action wrapper.__enter__.return_value = action
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, task_repo, wrapper,
"http://url", "http://url",
"store1", False, "store1", False,
True) True)
@ -132,9 +134,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
image = mock.MagicMock() image = mock.MagicMock()
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
img_repo.get.return_value = image img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, task_repo, wrapper,
"http://url", "http://url",
"store1", False, "store1", False,
True) True)
@ -151,9 +154,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
image = mock.MagicMock() image = mock.MagicMock()
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
img_repo.get.return_value = image img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, task_repo, wrapper,
"http://url", "http://url",
"store1", False, "store1", False,
True) True)
@ -170,9 +174,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
image = mock.MagicMock() image = mock.MagicMock()
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
img_repo.get.return_value = image img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, task_repo, wrapper,
"http://url", "http://url",
None, False, None, False,
True) True)
@ -189,9 +194,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.now') @mock.patch('oslo_utils.timeutils.now')
def test_status_callback_limits_rate(self, mock_now, mock_log): def test_status_callback_limits_rate(self, mock_now, mock_log):
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
task_repo.get.return_value.status = 'processing'
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, task_repo, wrapper,
"http://url", "http://url",
None, False, None, False,
True) True)
@ -240,22 +247,26 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_raises_when_image_deleted(self): def test_raises_when_image_deleted(self):
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url", task_repo, wrapper,
"http://url",
"store1", False, "store1", False,
True) True)
image = self.img_factory.new_image(image_id=UUID1) image = self.img_factory.new_image(image_id=UUID1)
image.status = "deleted" image.status = "deleted"
img_repo.get.return_value = image img_repo.get.return_value = image
self.assertRaises(ImportTaskError, image_import.execute) self.assertRaises(exception.ImportTaskError, image_import.execute)
@mock.patch("glance.async_.flows.api_image_import.image_import") @mock.patch("glance.async_.flows.api_image_import.image_import")
def test_remove_store_from_property(self, mock_import): def test_remove_store_from_property(self, mock_import):
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url", task_repo, wrapper,
"http://url",
"store1", True, "store1", True,
True) True)
extra_properties = {"os_glance_importing_to_stores": "store1,store2"} extra_properties = {"os_glance_importing_to_stores": "store1,store2"}
@ -269,9 +280,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import") @mock.patch("glance.async_.flows.api_image_import.image_import")
def test_raises_when_all_stores_must_succeed(self, mock_import): def test_raises_when_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url", task_repo, wrapper,
"http://url",
"store1", True, "store1", True,
True) True)
image = self.img_factory.new_image(image_id=UUID1) image = self.img_factory.new_image(image_id=UUID1)
@ -285,9 +298,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import") @mock.patch("glance.async_.flows.api_image_import.image_import")
def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import): def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock() img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url", task_repo, wrapper,
"http://url",
"store1", False, "store1", False,
True) True)
image = self.img_factory.new_image(image_id=UUID1) image = self.img_factory.new_image(image_id=UUID1)
@ -302,6 +317,57 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
except cursive_exception.SignatureVerificationError: except cursive_exception.SignatureVerificationError:
self.fail("Exception shouldn't be raised") self.fail("Exception shouldn't be raised")
@mock.patch('glance.common.scripts.utils.get_task')
def test_status_callback_updates_task_message(self, mock_get):
task_repo = mock.MagicMock()
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, mock.MagicMock(),
"http://url",
"store1", False,
True)
task = mock.MagicMock()
task.status = 'processing'
mock_get.return_value = task
action = mock.MagicMock()
image_import._status_callback(action, 128, 256 * units.Mi)
mock_get.assert_called_once_with(task_repo, TASK_ID1)
task_repo.save.assert_called_once_with(task)
self.assertEqual(_('Copied %i MiB' % 256), task.message)
@mock.patch('glance.common.scripts.utils.get_task')
def test_status_aborts_missing_task(self, mock_get):
task_repo = mock.MagicMock()
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, mock.MagicMock(),
"http://url",
"store1", False,
True)
mock_get.return_value = None
action = mock.MagicMock()
self.assertRaises(exception.TaskNotFound,
image_import._status_callback,
action, 128, 256 * units.Mi)
mock_get.assert_called_once_with(task_repo, TASK_ID1)
task_repo.save.assert_not_called()
@mock.patch('glance.common.scripts.utils.get_task')
def test_status_aborts_invalid_task_state(self, mock_get):
task_repo = mock.MagicMock()
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, mock.MagicMock(),
"http://url",
"store1", False,
True)
task = mock.MagicMock()
task.status = 'failed'
mock_get.return_value = task
action = mock.MagicMock()
self.assertRaises(exception.TaskAbortedError,
image_import._status_callback,
action, 128, 256 * units.Mi)
mock_get.assert_called_once_with(task_repo, TASK_ID1)
task_repo.save.assert_not_called()
class TestDeleteFromFS(test_utils.BaseTestCase): class TestDeleteFromFS(test_utils.BaseTestCase):
def test_delete_with_backends_deletes(self): def test_delete_with_backends_deletes(self):