Add testing for _CompleteTask in api_image_import

This critical task has no testing. Add some before we add more
functionality to it.

Change-Id: I4af4202d6650b60aebbed813b7640cf018466e12
This commit is contained in:
Dan Smith 2020-07-28 08:32:46 -07:00
parent 765bb7a2c7
commit 2e11ecb15e
1 changed files with 35 additions and 0 deletions

View File

@ -510,3 +510,38 @@ class TestImportActions(test_utils.BaseTestCase):
mock_log.warning.assert_called_once_with(
_('Unexpected exception when deleting from store foo.'))
mock_log.warning.reset_mock()
@mock.patch('glance.common.scripts.utils.get_task')
class TestCompleteTask(test_utils.BaseTestCase):
def setUp(self):
super(TestCompleteTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
def test_execute(self, mock_get_task):
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, IMAGE_ID1)
mock_get_task.return_value = self.task
complete.execute()
mock_get_task.assert_called_once_with(self.task_repo,
TASK_ID1)
self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1})
self.task_repo.save.assert_called_once_with(self.task)
def test_execute_no_task(self, mock_get_task):
mock_get_task.return_value = None
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, IMAGE_ID1)
complete.execute()
self.task_repo.save.assert_not_called()
def test_execute_succeed_fails(self, mock_get_task):
mock_get_task.return_value = self.task
self.task.succeed.side_effect = Exception('testing')
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, IMAGE_ID1)
complete.execute()
self.task.fail.assert_called_once_with(
_('Error: <class \'Exception\'>: testing'))
self.task_repo.save.assert_called_once_with(self.task)