Browse Source

Make import task capable of running as admin on behalf of user

This makes the api_image_import task capable of running as an admin on
behalf of a user if so authorized by the API. It includes a new object
called ImportActionWrapper which provides a bundle of utility methods
which can be run either against the user-authorized or admin-authorized
ImageRepo passed in from the API. It encapsulates all the actions
we are able and willing to run as an admin for the user.

This is currently not drivable by the API because the policy check is
still statically defined as "admin or owner" but this change is
offered without any needed modification to the functional tests to
prove that it does not regress existing functionality. The following
patch will introduce a more robust knob for allowing users to do this,
and it brings the functional test changes with it.

Changes:
 - Removed the change to the images API to pass in an admin repo
 - Reinstated the ImportActionWrapper usage that was removed from
   backport commit 2b3ce07c32

Change-Id: Iac75956e314ec6f41db18430486bd8be9754e780
(cherry picked from commit 2fd0c25733)
changes/02/748002/1
Dan Smith 3 months ago
parent
commit
eb08fbae0f
2 changed files with 477 additions and 117 deletions
  1. +220
    -78
      glance/async_/flows/api_image_import.py
  2. +257
    -39
      glance/tests/unit/async_/flows/test_api_image_import.py

+ 220
- 78
glance/async_/flows/api_image_import.py View File

@@ -82,6 +82,188 @@ class _NoStoresSucceeded(exception.GlanceException):
super(_NoStoresSucceeded, self).__init__(message)


class ImportActionWrapper(object):
"""Wrapper for all the image metadata operations we do during an import.

This is used to consolidate the changes we make to image metadata during
an import operation, and can be used with an admin-capable repo to
enable non-owner controlled modification of that data if desired.

Use this as a context manager to make multiple changes followed by
a save of the image in one operation. An _ImportActions object is
yielded from the context manager, which defines the available operations.

:param image_repo: The ImageRepo we should use to fetch/save the image
:param image-id: The ID of the image we should be altering
"""

def __init__(self, image_repo, image_id):
self._image_repo = image_repo
self._image_id = image_id

def __enter__(self):
self._image = self._image_repo.get(self._image_id)
self._image_previous_status = self._image.status
return _ImportActions(self._image)

def __exit__(self, type, value, traceback):
if type is not None:
# NOTE(danms): Do not save the image if we raised in context
return

if self._image_previous_status != self._image.status:
LOG.debug('Image %(image_id)s status changing from '
'%(old_status)s to %(new_status)s',
{'image_id': self._image_id,
'old_status': self._image_previous_status,
'new_status': self._image.status})
self._image_repo.save(self._image, self._image_previous_status)


class _ImportActions(object):
"""Actions available for being performed on an image during import.

This defines the available actions that can be performed on an image
during import, which may be done with an image owned by another user.

Do not instantiate this object directly, get it from ImportActionWrapper.
"""

IMPORTING_STORES_KEY = 'os_glance_importing_to_stores'
IMPORT_FAILED_KEY = 'os_glance_failed_import'

def __init__(self, image):
self._image = image

@property
def image_id(self):
return self._image.image_id

@property
def image_status(self):
return self._image.status

def merge_store_list(self, list_key, stores, subtract=False):
stores = set([store for store in stores if store])
existing = set(
self._image.extra_properties.get(list_key, '').split(','))

if subtract:
if stores - existing:
LOG.debug('Stores %(stores)s not in %(key)s for '
'image %(image_id)s',
{'stores': ','.join(sorted(stores - existing)),
'key': list_key,
'image_id': self.image_id})
merged_stores = existing - stores
else:
merged_stores = existing | stores

stores_list = ','.join(sorted((store for store in
merged_stores if store)))
self._image.extra_properties[list_key] = stores_list
LOG.debug('Image %(image_id)s %(key)s=%(stores)s',
{'image_id': self.image_id,
'key': list_key,
'stores': stores_list})

def add_importing_stores(self, stores):
"""Add a list of stores to the importing list.

Add stores to os_glance_importing_to_stores

:param stores: A list of store names
"""
self.merge_store_list(self.IMPORTING_STORES_KEY, stores)

def remove_importing_stores(self, stores):
"""Remove a list of stores from the importing list.

Remove stores from os_glance_importing_to_stores

:param stores: A list of store names
"""
self.merge_store_list(self.IMPORTING_STORES_KEY, stores, subtract=True)

def add_failed_stores(self, stores):
"""Add a list of stores to the failed list.

Add stores to os_glance_failed_import

:param stores: A list of store names
"""
self.merge_store_list(self.IMPORT_FAILED_KEY, stores)

def remove_failed_stores(self, stores):
"""Remove a list of stores from the failed list.

Remove stores from os_glance_failed_import

:param stores: A list of store names
"""
self.merge_store_list(self.IMPORT_FAILED_KEY, stores, subtract=True)

def set_image_data(self, uri, task_id, backend, set_active):
"""Populate image with data on a specific backend.

This is used during an image import operation to populate the data
in a given store for the image. If this object wraps an admin-capable
image_repo, then this will be done with admin credentials on behalf
of a user already determined to be able to perform this operation
(such as a copy-image import of an existing image owned by another
user).

:param uri: Source URL for image data
:param task_id: The task responsible for this operation
:param backend: The backend store to target the data
:param set_active: Whether or not to set the image to 'active'
state after the operation completes
"""
return image_import.set_image_data(self._image, uri, task_id,
backend=backend,
set_active=set_active)

def set_image_status(self, status):
"""Set the image status.

:param status: The new status of the image
"""
self._image.status = status

def remove_location_for_store(self, backend):
"""Remove a location from an image given a backend store.

Given a backend store, remove the corresponding location from the
image's set of locations. If the last location is removed, remove
the image checksum, hash information, and size.

:param backend: The backend store to remove from the image
"""

for i, location in enumerate(self._image.locations):
if location.get('metadata', {}).get('store') == backend:
try:
self._image.locations.pop(i)
except (store_exceptions.NotFound,
store_exceptions.Forbidden):
msg = (_("Error deleting from store %(store)s when "
"reverting.") % {'store': backend})
LOG.warning(msg)
# NOTE(yebinama): Some store drivers doesn't document which
# exceptions they throw.
except Exception:
msg = (_("Unexpected exception when deleting from store "
"%(store)s.") % {'store': backend})
LOG.warning(msg)
else:
if len(self._image.locations) == 0:
self._image.checksum = None
self._image.os_hash_algo = None
self._image.os_hash_value = None
self._image.size = None
break


class _DeleteFromFS(task.Task):

def __init__(self, task_id, task_type):
@@ -201,13 +383,12 @@ class _VerifyStaging(task.Task):

class _ImportToStore(task.Task):

def __init__(self, task_id, task_type, image_repo, uri, image_id, backend,
all_stores_must_succeed, set_active):
def __init__(self, task_id, task_type, action_wrapper, uri,
backend, all_stores_must_succeed, set_active):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.action_wrapper = action_wrapper
self.uri = uri
self.image_id = image_id
self.backend = backend
self.all_stores_must_succeed = all_stores_must_succeed
self.set_active = set_active
@@ -217,7 +398,6 @@ class _ImportToStore(task.Task):
def execute(self, file_path=None):
"""Bringing the imported image to back end store

:param image_id: Glance Image ID
:param file_path: path to the image file
"""
# NOTE(flaper87): Let's dance... and fall
@@ -260,14 +440,19 @@ class _ImportToStore(task.Task):
# NOTE(jokke): The different options here are kind of pointless as we
# will need the file path anyways for our delete workflow for now.
# For future proofing keeping this as is.
image = self.image_repo.get(self.image_id)
if image.status == "deleted":

with self.action_wrapper as action:
self._execute(action, file_path)

def _execute(self, action, file_path):

if action.image_status == "deleted":
raise exception.ImportTaskError("Image has been deleted, aborting"
" import.")
try:
image_import.set_image_data(image, file_path or self.uri,
self.task_id, backend=self.backend,
set_active=self.set_active)
action.set_image_data(file_path or self.uri,
self.task_id, backend=self.backend,
set_active=self.set_active)
# NOTE(yebinama): set_image_data catches Exception and raises from
# them. Can't be more specific on exceptions catched.
except Exception:
@@ -278,25 +463,10 @@ class _ImportToStore(task.Task):
{'task_id': self.task_id, 'task_type': self.task_type})
LOG.warning(msg)
if self.backend is not None:
failed_import = image.extra_properties.get(
'os_glance_failed_import', '').split(',')
failed_import.append(self.backend)
image.extra_properties['os_glance_failed_import'] = ','.join(
failed_import).lstrip(',')
action.add_failed_stores([self.backend])

if self.backend is not None:
importing = image.extra_properties.get(
'os_glance_importing_to_stores', '').split(',')
try:
importing.remove(self.backend)
image.extra_properties[
'os_glance_importing_to_stores'] = ','.join(
importing).lstrip(',')
except ValueError:
LOG.debug("Store %s not found in property "
"os_glance_importing_to_stores.", self.backend)
# NOTE(flaper87): We need to save the image again after
# the locations have been set in the image.
self.image_repo.save(image)
action.remove_importing_stores([self.backend])

def revert(self, result, **kwargs):
"""
@@ -304,40 +474,16 @@ class _ImportToStore(task.Task):

:param result: taskflow result object
"""
image = self.image_repo.get(self.image_id)
for i, location in enumerate(image.locations):
if location.get('metadata', {}).get('store') == self.backend:
try:
image.locations.pop(i)
except (store_exceptions.NotFound,
store_exceptions.Forbidden):
msg = (_("Error deleting from store %{store}s when "
"reverting.") % {'store': self.backend})
LOG.warning(msg)
# NOTE(yebinama): Some store drivers doesn't document which
# exceptions they throw.
except Exception:
msg = (_("Unexpected exception when deleting from store"
"%{store}s.") % {'store': self.backend})
LOG.warning(msg)
else:
if len(image.locations) == 0:
image.checksum = None
image.os_hash_algo = None
image.os_hash_value = None
image.size = None
self.image_repo.save(image)
break
with self.action_wrapper as action:
action.remove_location_for_store(self.backend)


class _VerifyImageState(task.Task):

def __init__(self, task_id, task_type, image_repo, image_id,
import_method):
def __init__(self, task_id, task_type, action_wrapper, import_method):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
self.action_wrapper = action_wrapper
self.import_method = import_method
super(_VerifyImageState, self).__init__(
name='%s-VerifyImageState-%s' % (task_type, task_id))
@@ -347,16 +493,15 @@ class _VerifyImageState(task.Task):

:param image_id: Glance Image ID
"""
new_image = self.image_repo.get(self.image_id)
if new_image.status != 'active':
raise _NoStoresSucceeded(_('None of the uploads finished!'))
with self.action_wrapper as action:
if action.image_status != 'active':
raise _NoStoresSucceeded(_('None of the uploads finished!'))

def revert(self, result, **kwargs):
"""Set back to queued if this wasn't copy-image job."""
if self.import_method != 'copy-image':
new_image = self.image_repo.get(self.image_id)
new_image.status = 'queued'
self.image_repo.save_image(new_image)
with self.action_wrapper as action:
if self.import_method != 'copy-image':
action.set_image_status('queued')


class _CompleteTask(task.Task):
@@ -415,6 +560,7 @@ def get_flow(**kwargs):
task_type = kwargs.get('task_type')
task_repo = kwargs.get('task_repo')
image_repo = kwargs.get('image_repo')
admin_repo = kwargs.get('admin_repo')
image_id = kwargs.get('image_id')
import_method = kwargs.get('import_req')['method']['name']
uri = kwargs.get('import_req')['method'].get('uri')
@@ -426,6 +572,10 @@ def get_flow(**kwargs):
if not CONF.enabled_backends and not CONF.node_staging_uri.endswith('/'):
separator = '/'

# Instantiate an action wrapper with the admin repo if we got one,
# otherwise with the regular repo.
action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id)

if not uri and import_method in ['glance-direct', 'copy-image']:
if CONF.enabled_backends:
separator, staging_dir = store_utils.get_dir_separator()
@@ -465,9 +615,8 @@ def get_flow(**kwargs):
import_task = lf.Flow(task_name)
import_to_store = _ImportToStore(task_id,
task_name,
image_repo,
action_wrapper,
file_uri,
image_id,
store,
all_stores_must_succeed,
set_active)
@@ -479,8 +628,7 @@ def get_flow(**kwargs):

verify_task = _VerifyImageState(task_id,
task_type,
image_repo,
image_id,
action_wrapper,
import_method)
flow.add(verify_task)

@@ -490,16 +638,10 @@ def get_flow(**kwargs):
image_id)
flow.add(complete_task)

image = image_repo.get(image_id)
from_state = image.status
if import_method != 'copy-image':
image.status = 'importing'

image.extra_properties[
'os_glance_importing_to_stores'] = ','.join((store for store in
stores if
store is not None))
image.extra_properties['os_glance_failed_import'] = ''
image_repo.save(image, from_state=from_state)
with action_wrapper as action:
if import_method != 'copy-image':
action.set_image_status('importing')
action.add_importing_stores(stores)
action.remove_failed_stores(stores)

return flow

+ 257
- 39
glance/tests/unit/async_/flows/test_api_image_import.py View File

@@ -15,16 +15,15 @@

from unittest import mock

from glance_store import exceptions as store_exceptions
from oslo_config import cfg

from glance.api import authorization
import glance.async_.flows.api_image_import as import_flow
from glance.common import exception
from glance.common.exception import ImportTaskError
from glance.common.scripts.image_import import main as image_import
from glance import context
from glance import gateway
import glance.tests.utils as test_utils
from glance_store import exceptions as store_exceptions

from cursive import exception as cursive_exception

@@ -112,9 +111,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):

def test_raises_when_image_deleted(self):
img_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
img_repo, "http://url",
IMAGE_ID1, "store1", False,
wrapper, "http://url",
"store1", False,
True)
image = self.img_factory.new_image(image_id=UUID1)
image.status = "deleted"
@@ -124,9 +124,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_remove_store_from_property(self, mock_import):
img_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
img_repo, "http://url",
IMAGE_ID1, "store1", True,
wrapper, "http://url",
"store1", True,
True)
extra_properties = {"os_glance_importing_to_stores": "store1,store2"}
image = self.img_factory.new_image(image_id=UUID1,
@@ -139,9 +140,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_raises_when_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
img_repo, "http://url",
IMAGE_ID1, "store1", True,
wrapper, "http://url",
"store1", True,
True)
image = self.img_factory.new_image(image_id=UUID1)
img_repo.get.return_value = image
@@ -154,9 +156,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
img_repo, "http://url",
IMAGE_ID1, "store1", False,
wrapper, "http://url",
"store1", False,
True)
image = self.img_factory.new_image(image_id=UUID1)
img_repo.get.return_value = image
@@ -228,10 +231,10 @@ class TestVerifyImageStateTask(test_utils.BaseTestCase):
fake_img = mock.MagicMock(status='active')
mock_repo = mock.MagicMock()
mock_repo.get.return_value = fake_img
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)

task = import_flow._VerifyImageState(TASK_ID1, TASK_TYPE,
mock_repo, IMAGE_ID1,
'anything!')
wrapper, 'anything!')

task.execute()

@@ -240,28 +243,23 @@ class TestVerifyImageStateTask(test_utils.BaseTestCase):
task.execute)

def test_revert_copy_status_unchanged(self):
fake_img = mock.MagicMock(status='active')
mock_repo = mock.MagicMock()
mock_repo.get.return_value = fake_img
wrapper = mock.MagicMock()
task = import_flow._VerifyImageState(TASK_ID1, TASK_TYPE,
mock_repo, IMAGE_ID1,
'copy-image')
wrapper, 'copy-image')
task.revert(mock.sentinel.result)

# If we are doing copy-image, no state update should be made
mock_repo.save_image.assert_not_called()
wrapper.__enter__.return_value.set_image_status.assert_not_called()

def test_reverts_state_nocopy(self):
fake_img = mock.MagicMock(status='importing')
mock_repo = mock.MagicMock()
mock_repo.get.return_value = fake_img
wrapper = mock.MagicMock()
task = import_flow._VerifyImageState(TASK_ID1, TASK_TYPE,
mock_repo, IMAGE_ID1,
'glance-direct')
wrapper, 'glance-direct')
task.revert(mock.sentinel.result)

# Except for copy-image, image state should revert to queued
mock_repo.save_image.assert_called_once()
action = wrapper.__enter__.return_value
action.set_image_status.assert_called_once_with('queued')


class TestImportCopyImageTask(test_utils.BaseTestCase):
@@ -274,21 +272,241 @@ class TestImportCopyImageTask(test_utils.BaseTestCase):
overwrite=False)

@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_copy_as_non_owner(self, mock_import):
img_repo_db = mock.MagicMock()
img_repo = authorization.ImageRepoProxy(img_repo_db, self.context)
def test_init_copy_flow_as_non_owner(self, mock_import):
img_repo = mock.MagicMock()
admin_repo = mock.MagicMock()

fake_req = {"method": {"name": "copy-image"},
"backend": ['cheap']}

# FIXME(danms): Right now, this fails with Forbidden because we
# don't own the image
self.assertRaises(exception.Forbidden,
import_flow.get_flow,
task_id=TASK_ID1,
task_type=TASK_TYPE,
task_repo=mock.MagicMock(),
image_repo=img_repo,
image_id=IMAGE_ID1,
import_req=fake_req,
backend=['cheap'])
fake_img = mock.MagicMock()
fake_img.id = IMAGE_ID1
fake_img.status = 'active'
fake_img.extra_properties = {}
admin_repo.get.return_value = fake_img

import_flow.get_flow(task_id=TASK_ID1,
task_type=TASK_TYPE,
task_repo=mock.MagicMock(),
image_repo=img_repo,
admin_repo=admin_repo,
image_id=IMAGE_ID1,
import_req=fake_req,
backend=['cheap'])

# Assert that we saved the image with the admin repo instead of the
# user-context one at the end of get_flow() when we initialize the
# parameters.
admin_repo.save.assert_called_once_with(fake_img, 'active')
img_repo.save.assert_not_called()


class TestImportActionWrapper(test_utils.BaseTestCase):
def test_wrapper_success(self):
mock_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)
with wrapper as action:
self.assertIsInstance(action, import_flow._ImportActions)
mock_repo.get.assert_called_once_with(IMAGE_ID1)
mock_repo.save.assert_called_once_with(
mock_repo.get.return_value,
mock_repo.get.return_value.status)

def test_wrapper_failure(self):
mock_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)

class SpecificError(Exception):
pass

try:
with wrapper:
raise SpecificError('some failure')
except SpecificError:
# NOTE(danms): Make sure we only caught the test exception
# and aren't hiding anything else
pass

mock_repo.get.assert_called_once_with(IMAGE_ID1)
mock_repo.save.assert_not_called()

@mock.patch.object(import_flow, 'LOG')
def test_wrapper_logs_status(self, mock_log):
mock_repo = mock.MagicMock()
mock_image = mock_repo.get.return_value
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)

mock_image.status = 'foo'
with wrapper as action:
action.set_image_status('bar')

mock_log.debug.assert_called_once_with(
'Image %(image_id)s status changing from '
'%(old_status)s to %(new_status)s',
{'image_id': IMAGE_ID1,
'old_status': 'foo',
'new_status': 'bar'})
self.assertEqual('bar', mock_image.status)


class TestImportActions(test_utils.BaseTestCase):
def setUp(self):
super(TestImportActions, self).setUp()
self.image = mock.MagicMock()
self.image.image_id = IMAGE_ID1
self.image.status = 'active'
self.image.extra_properties = {'speed': '88mph'}
self.image.checksum = mock.sentinel.checksum
self.image.os_hash_algo = mock.sentinel.hash_algo
self.image.os_hash_value = mock.sentinel.hash_value
self.image.size = mock.sentinel.size
self.actions = import_flow._ImportActions(self.image)

def test_image_property_proxies(self):
self.assertEqual(IMAGE_ID1, self.actions.image_id)
self.assertEqual('active', self.actions.image_status)

def test_merge_store_list(self):
# Addition with no existing property works
self.actions.merge_store_list('stores', ['foo', 'bar'])
self.assertEqual({'speed': '88mph',
'stores': 'bar,foo'},
self.image.extra_properties)

# Addition adds to the list
self.actions.merge_store_list('stores', ['baz'])
self.assertEqual('bar,baz,foo', self.image.extra_properties['stores'])

# Removal preserves the rest
self.actions.merge_store_list('stores', ['foo'], subtract=True)
self.assertEqual('bar,baz', self.image.extra_properties['stores'])

# Duplicates aren't duplicated
self.actions.merge_store_list('stores', ['bar'])
self.assertEqual('bar,baz', self.image.extra_properties['stores'])

# Removing the last store leaves the key empty but present
self.actions.merge_store_list('stores', ['baz', 'bar'], subtract=True)
self.assertEqual('', self.image.extra_properties['stores'])

# Make sure we ignore falsey stores
self.actions.merge_store_list('stores', ['', None])
self.assertEqual('', self.image.extra_properties['stores'])

@mock.patch.object(import_flow, 'LOG')
def test_merge_store_logs_info(self, mock_log):
# Removal from non-present key logs debug, but does not fail
self.actions.merge_store_list('stores', ['foo,bar'], subtract=True)
mock_log.debug.assert_has_calls([
mock.call(
'Stores %(stores)s not in %(key)s for image %(image_id)s',
{'image_id': IMAGE_ID1,
'key': 'stores',
'stores': 'foo,bar'}),
mock.call(
'Image %(image_id)s %(key)s=%(stores)s',
{'image_id': IMAGE_ID1,
'key': 'stores',
'stores': ''}),
])

mock_log.debug.reset_mock()

self.actions.merge_store_list('stores', ['foo'])
self.assertEqual('foo', self.image.extra_properties['stores'])

mock_log.debug.reset_mock()

# Removal from a list where store is not present logs debug,
# but does not fail
self.actions.merge_store_list('stores', ['bar'], subtract=True)
self.assertEqual('foo', self.image.extra_properties['stores'])
mock_log.debug.assert_has_calls([
mock.call(
'Stores %(stores)s not in %(key)s for image %(image_id)s',
{'image_id': IMAGE_ID1,
'key': 'stores',
'stores': 'bar'}),
mock.call(
'Image %(image_id)s %(key)s=%(stores)s',
{'image_id': IMAGE_ID1,
'key': 'stores',
'stores': 'foo'}),
])

def test_store_list_helpers(self):
self.actions.add_importing_stores(['foo', 'bar', 'baz'])
self.actions.remove_importing_stores(['bar'])
self.actions.add_failed_stores(['foo', 'bar'])
self.actions.remove_failed_stores(['foo'])
self.assertEqual({'speed': '88mph',
'os_glance_importing_to_stores': 'baz,foo',
'os_glance_failed_import': 'bar'},
self.image.extra_properties)

@mock.patch.object(image_import, 'set_image_data')
def test_set_image_data(self, mock_sid):
self.assertEqual(mock_sid.return_value,
self.actions.set_image_data(
mock.sentinel.uri, mock.sentinel.task_id,
mock.sentinel.backend, mock.sentinel.set_active))
mock_sid.assert_called_once_with(
self.image, mock.sentinel.uri, mock.sentinel.task_id,
backend=mock.sentinel.backend, set_active=mock.sentinel.set_active)

def test_remove_location_for_store(self):
self.image.locations = [
{},
{'metadata': {}},
{'metadata': {'store': 'foo'}},
{'metadata': {'store': 'bar'}},
]

self.actions.remove_location_for_store('foo')
self.assertEqual([{}, {'metadata': {}},
{'metadata': {'store': 'bar'}}],
self.image.locations)

# Add a second definition for bar and make sure only one is removed
self.image.locations.append({'metadata': {'store': 'bar'}})
self.actions.remove_location_for_store('bar')
self.assertEqual([{}, {'metadata': {}},
{'metadata': {'store': 'bar'}}],
self.image.locations)

def test_remove_location_for_store_last_location(self):
self.image.locations = [{'metadata': {'store': 'foo'}}]
self.actions.remove_location_for_store('foo')
self.assertEqual([], self.image.locations)
self.assertIsNone(self.image.checksum)
self.assertIsNone(self.image.os_hash_algo)
self.assertIsNone(self.image.os_hash_value)
self.assertIsNone(self.image.size)

@mock.patch.object(import_flow, 'LOG')
def test_remove_location_for_store_pop_failures(self, mock_log):
class TestList(list):
def pop(self):
pass

self.image.locations = TestList([{'metadata': {'store': 'foo'}}])
with mock.patch.object(self.image.locations, 'pop',
new_callable=mock.PropertyMock) as mock_pop:

mock_pop.side_effect = store_exceptions.NotFound(image='image')
self.actions.remove_location_for_store('foo')
mock_log.warning.assert_called_once_with(
_('Error deleting from store foo when reverting.'))
mock_log.warning.reset_mock()

mock_pop.side_effect = store_exceptions.Forbidden()
self.actions.remove_location_for_store('foo')
mock_log.warning.assert_called_once_with(
_('Error deleting from store foo when reverting.'))
mock_log.warning.reset_mock()

mock_pop.side_effect = Exception
self.actions.remove_location_for_store('foo')
mock_log.warning.assert_called_once_with(
_('Unexpected exception when deleting from store foo.'))
mock_log.warning.reset_mock()

Loading…
Cancel
Save