Browse Source

Add tests for _ImportToStore.execute()

There are some existing tests for the body of this task, but none for the
meat of the task when things go right.

Change-Id: I6fd0505a265c8ffb03bf8c3c0f2d4a1176485b51
(cherry picked from commit 94a672c7cd)
changes/05/748005/1
Dan Smith 1 month ago
parent
commit
ec46cbca97
1 changed files with 73 additions and 0 deletions
  1. +73
    -0
      glance/tests/unit/async_/flows/test_api_image_import.py

+ 73
- 0
glance/tests/unit/async_/flows/test_api_image_import.py View File

@@ -109,6 +109,79 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
overwrite=False)
self.img_factory = self.gateway.get_image_factory(self.context)

def test_execute(self):
wrapper = mock.MagicMock()
action = mock.MagicMock()
wrapper.__enter__.return_value = action
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
"store1", False,
True)
# Assert file_path is honored
with mock.patch.object(image_import, '_execute') as mock_execute:
image_import.execute(mock.sentinel.path)
mock_execute.assert_called_once_with(action, mock.sentinel.path)

# Assert file_path is optional
with mock.patch.object(image_import, '_execute') as mock_execute:
image_import.execute()
mock_execute.assert_called_once_with(action, None)

def test_execute_body_with_store(self):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
"store1", False,
True)
action = mock.MagicMock()
image_import._execute(action, mock.sentinel.path)
action.set_image_data.assert_called_once_with(
mock.sentinel.path,
TASK_ID1, backend='store1',
set_active=True)
action.remove_importing_stores(['store1'])

def test_execute_body_with_store_no_path(self):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
"store1", False,
True)
action = mock.MagicMock()
image_import._execute(action, None)
action.set_image_data.assert_called_once_with(
'http://url',
TASK_ID1, backend='store1',
set_active=True)
action.remove_importing_stores(['store1'])

def test_execute_body_without_store(self):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
None, False,
True)
action = mock.MagicMock()
image_import._execute(action, mock.sentinel.path)
action.set_image_data.assert_called_once_with(
mock.sentinel.path,
TASK_ID1, backend=None,
set_active=True)
action.remove_importing_stores.assert_not_called()

def test_raises_when_image_deleted(self):
img_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)


Loading…
Cancel
Save