Merge "Mark task as failed in case of flow failure"
This commit is contained in:
commit
74aeb021c4
@ -24,6 +24,7 @@ from stevedore import named
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow import retry
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
|
||||
from glance.common import exception
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
@ -148,11 +149,11 @@ class _ImportToFS(task.Task):
|
||||
path = self.store.add(image_id, data, 0, context=None)[0]
|
||||
return path
|
||||
|
||||
def revert(self, image_id, result=None, **kwargs):
|
||||
# NOTE(flaper87): If result is None, it probably
|
||||
# means this task failed. Otherwise, we would have
|
||||
# a result from its execution.
|
||||
if result is None:
|
||||
def revert(self, image_id, result, **kwargs):
|
||||
if isinstance(result, failure.Failure):
|
||||
LOG.exception(_LE('Task: %(task_id)s failed to import image '
|
||||
'%(image_id)s to the filesystem.') %
|
||||
{'task_id': self.task_id, 'image_id': image_id})
|
||||
return
|
||||
|
||||
if os.path.exists(result.split("file://")[-1]):
|
||||
|
@ -129,3 +129,7 @@ class TaskExecutor(glance.async.TaskExecutor):
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE('Failed to execute task %(task_id)s: %(exc)s') %
|
||||
{'task_id': task_id, 'exc': exc.message})
|
||||
# TODO(sabari): Check for specific exceptions and update the
|
||||
# task failure message.
|
||||
task.fail(_('Task failed due to Internal Error'))
|
||||
self.task_repo.save(task)
|
||||
|
@ -21,6 +21,7 @@ from oslo_config import cfg
|
||||
from six.moves import cStringIO
|
||||
from six.moves import urllib
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
|
||||
import glance.async.flows.base_import as import_flow
|
||||
from glance.async import taskflow_executor
|
||||
@ -151,6 +152,44 @@ class TestImportTask(test_utils.BaseTestCase):
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
self.assertTrue(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_revert_import_to_fs(self):
|
||||
self.config(engine_mode='serial', group='taskflow_executor')
|
||||
|
||||
img_factory = mock.MagicMock()
|
||||
|
||||
executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
|
||||
def create_image(*args, **kwargs):
|
||||
kwargs['image_id'] = UUID1
|
||||
return self.img_factory.new_image(*args, **kwargs)
|
||||
|
||||
self.img_repo.get.return_value = self.image
|
||||
img_factory.new_image.side_effect = create_image
|
||||
|
||||
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
|
||||
dmock.side_effect = RuntimeError
|
||||
|
||||
with mock.patch.object(import_flow._ImportToFS, 'revert') as rmock:
|
||||
self.assertRaises(RuntimeError,
|
||||
executor.begin_processing, self.task.task_id)
|
||||
self.assertTrue(rmock.called)
|
||||
self.assertIsInstance(rmock.call_args[1]['result'],
|
||||
failure.Failure)
|
||||
|
||||
image_path = os.path.join(self.test_dir, self.image.image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
# Note(sabari): The image should not have been uploaded to
|
||||
# the store as the flow failed before ImportToStore Task.
|
||||
self.assertFalse(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_revert(self):
|
||||
self.config(engine_mode='serial',
|
||||
group='taskflow_executor')
|
||||
|
@ -78,3 +78,14 @@ class TestTaskExecutor(test_utils.BaseTestCase):
|
||||
# assert the call
|
||||
load_mock.assert_called_once()
|
||||
engine.assert_called_once()
|
||||
|
||||
def test_task_fail(self):
|
||||
with mock.patch.object(engines, 'load') as load_mock:
|
||||
engine = mock.Mock()
|
||||
load_mock.return_value = engine
|
||||
engine.run.side_effect = RuntimeError
|
||||
self.task_repo.get.return_value = self.task
|
||||
self.assertRaises(RuntimeError, self.executor.begin_processing,
|
||||
self.task.task_id)
|
||||
self.assertEqual('failure', self.task.status)
|
||||
self.task_repo.save.assert_called_with(self.task)
|
||||
|
Loading…
Reference in New Issue
Block a user