Cancel hashing operation if image is deleted

Use file-based locking to track the progress of the hash calculation
task. If the image is deleted while hashing is in progress and the
delete call is not processed on the host where the hash operation is
running, the request is redirected to that host, which cancels the
hash operation and then deletes the image.

Closes-Bug: #2045769
Change-Id: I956804491170b92686dc4de7b924371356b2626a
This commit is contained in:
Abhishek Kekane
2025-05-28 09:45:51 +00:00
parent 1311d5fe2f
commit f84e4f0123
5 changed files with 199 additions and 44 deletions

View File

@@ -47,6 +47,7 @@ from glance.i18n import _, _LE, _LI, _LW
import glance.notifier
from glance.quota import keystone as ks_quota
import glance.schema
from glance import task_cancellation_tracker as tracker
LOG = logging.getLogger(__name__)
@@ -57,6 +58,8 @@ CONF.import_opt('container_formats', 'glance.common.config',
CONF.import_opt('show_multiple_locations', 'glance.common.config')
CONF.import_opt('hashing_algorithm', 'glance.common.config')
PROXYABLE_HOSTS = ['os_glance_stage_host', 'os_glance_hash_op_host']
def proxy_response_error(orig_code, orig_explanation):
"""Construct a webob.exc.HTTPError exception on the fly.
@@ -237,12 +240,13 @@ class ImagesController(object):
{'image': image.image_id, 'task': task.task_id,
'keys': ','.join(changed)})
def _proxy_request_to_stage_host(self, image, req, body=None):
"""Proxy a request to a staging host.
def _proxy_request_to_host(self, image, req, body=None):
"""Proxy a request to a respected host.
When an image was staged on another worker, that worker may record its
worker_self_reference_url on the image, indicating that other workers
should proxy requests to it while the image is staged. This method
When an image was staged on another worker, or a location-add workflow
has a hash calculation operation in progress on another worker, that
worker may record its worker_self_reference_url on the image, indicating
that other workers should proxy requests to it. This method
replays our current request against the remote host, returns the
result, and performs any response error translation required.
@@ -258,13 +262,12 @@ class ImagesController(object):
remote host.
"""
stage_host = image.extra_properties['os_glance_stage_host']
host = self.is_proxyable(image)
LOG.info(_LI('Proxying %s request to host %s '
'which has image staged'),
req.method, stage_host)
'which has image.'),
req.method, host)
client = glance_context.get_ksa_client(req.context)
url = '%s%s' % (stage_host, req.path)
req_id_hdr = 'x-openstack-request-id'
url = '%s%s' % (host, req.path)
request_method = getattr(client, req.method.lower())
try:
r = request_method(url, json=body, timeout=60)
@@ -294,17 +297,18 @@ class ImagesController(object):
return CONF.worker_self_reference_url or CONF.public_endpoint
def is_proxyable(self, image):
"""Decide if an action is proxyable to a stage host.
If the image has a staging host recorded with a URL that does not match
ours, then we can proxy our request to that host.
"""
Return the host from extra_properties for any key in PROXYABLE_HOSTS
if it is not a local host.
:param image: The Image from the repo
:returns: bool indicating proxyable status
:returns: host string if proxyable, else None
"""
return (
'os_glance_stage_host' in image.extra_properties and
image.extra_properties['os_glance_stage_host'] != self.self_url)
for host_key in PROXYABLE_HOSTS:
host = image.extra_properties.get(host_key)
if host and host != self.self_url:
return host
return None
@utils.mutating
def import_image(self, req, image_id, body):
@@ -435,7 +439,7 @@ class ImagesController(object):
# NOTE(danms): Image is staged on another worker; proxy the
# import request to that worker with the user's token, as if
# they had called it themselves.
return self._proxy_request_to_stage_host(image, req, body)
return self._proxy_request_to_host(image, req, body)
task_input = {'image_id': image_id,
'import_req': body,
@@ -834,7 +838,7 @@ class ImagesController(object):
:raises: webob.exc.HTTPClientError if so raised by the remote server.
"""
try:
self._proxy_request_to_stage_host(image, req)
self._proxy_request_to_host(image, req)
except webob.exc.HTTPServerError:
# This means we would have raised a 50x error, indicating
# we did not succeed with the request to the remote host.
@@ -867,6 +871,15 @@ class ImagesController(object):
# Delete was proxied, so we are done here.
return
# NOTE(abhishekk): Here we can cancel the hash calculation
# operation and signal exit to the worker that is performing
# the hash
operation_id = image.extra_properties.get('os_glance_import_task')
os_hashing_host = image.extra_properties.get(
'os_glance_hash_op_host')
if operation_id and os_hashing_host:
tracker.cancel_operation(operation_id)
# NOTE(abhishekk): Delete the data from staging area
if CONF.enabled_backends:
separator, staging_dir = store_utils.get_dir_separator()
@@ -1697,7 +1710,7 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer):
class ResponseSerializer(wsgi.JSONResponseSerializer):
# These properties will be filtered out from the response and not
# exposed to the client
_hidden_properties = ['os_glance_stage_host']
_hidden_properties = ['os_glance_stage_host', 'os_glance_hash_op_host']
def __init__(self, schema=None, location_schema=None):
super(ResponseSerializer, self).__init__()

View File

@@ -18,6 +18,7 @@ import glance_store as store
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task
@@ -26,6 +27,7 @@ import glance.async_.flows.api_image_import as image_import
from glance.common import exception
from glance.common import store_utils
from glance.i18n import _, _LW
from glance import task_cancellation_tracker as tracker
LOG = logging.getLogger(__name__)
@@ -44,6 +46,12 @@ class _InvalidLocation(exception.GlanceException):
super(_InvalidLocation, self).__init__(message)
class _HashCalculationCanceled(exception.GlanceException):
    """Raised to abort a hash calculation that has been canceled.

    :param message: Human-readable explanation passed to the base
                    exception.
    """

    def __init__(self, message):
        super().__init__(message)
class _CalculateHash(task.Task):
def __init__(self, task_id, task_type, image_repo, image_id,
@@ -61,6 +69,10 @@ class _CalculateHash(task.Task):
current_os_hash_value = hashlib.new(self.hashing_algo)
current_checksum = hashlib.md5(usedforsecurity=False)
for chunk in image.get_data():
if tracker.is_canceled(self.task_id):
raise _HashCalculationCanceled(
_('Hash calculation for image %s has been '
'canceled') % self.image_id)
if chunk is None:
break
current_checksum.update(chunk)
@@ -69,12 +81,17 @@ class _CalculateHash(task.Task):
image.os_hash_value = current_os_hash_value.hexdigest()
def _set_checksum_and_hash(self, image):
tracker.register_operation(self.task_id)
retries = 0
while retries <= CONF.http_retries and image.os_hash_value is None:
retries += 1
try:
self._calculate_hash(image)
self.image_repo.save(image)
except _HashCalculationCanceled as e:
with excutils.save_and_reraise_exception():
LOG.debug('Hash calculation cancelled: %s',
encodeutils.exception_to_unicode(e))
except IOError as e:
LOG.debug('[%i/%i] Hash calculation failed due to %s',
retries, CONF.http_retries,
@@ -92,26 +109,25 @@ class _CalculateHash(task.Task):
"data") % self.image_id)
LOG.warning(msg)
except store.exceptions.NotFound:
# NOTE(pdeore): This can happen if image delete attempted
# when hash calculation is in progress, which deletes the
# image data from backend(specially rbd) but image remains
# in 'active' state.
# see: https://bugs.launchpad.net/glance/+bug/2045769
# Once this ceph side issue is fixed, we'll keep only the
# warning message here and will remove the deletion part
# which is a temporary workaround.
LOG.debug(_('Failed to calculate checksum of %(image_id)s '
'as image data has been deleted from the '
'backend'), {'image_id': self.image_id})
image.delete()
self.image_repo.remove(image)
break
finally:
tracker.signal_finished(self.task_id)
image.extra_properties.pop('os_glance_hash_op_host', None)
self.image_repo.save(image)
def execute(self):
image = self.image_repo.get(self.image_id)
if image.status == 'queued':
image.status = self.image_status
image.os_hash_algo = self.hashing_algo
# NOTE(abhishekk): Record this worker's
# worker_self_reference_url in the image metadata, so we
# know who is calculating the checksum and hash.
self_url = CONF.worker_self_reference_url or CONF.public_endpoint
if self_url:
image.extra_properties['os_glance_hash_op_host'] = self_url
self.image_repo.save(image)
self._set_checksum_and_hash(image)
@@ -121,6 +137,7 @@ class _CalculateHash(task.Task):
state
"""
try:
tracker.signal_finished(self.task_id)
image = self.image_repo.get(self.image_id)
if image.status == 'importing':
if not image.locations[0]['url'].startswith("http"):

View File

@@ -1455,6 +1455,7 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
group='glance_store')
node_staging_uri = 'file://%s' % os.path.join(
self.test_dir, 'staging')
utils.safe_mkdirs(node_staging_uri[7:])
self.config(node_staging_uri=node_staging_uri)
self.config(default_store='file', group='glance_store')
glance_store.create_stores(CONF)

View File

@@ -24,6 +24,7 @@ from oslo_utils import units
import glance.async_.flows.location_import as import_flow
from glance.common import exception
from glance import context
from glance import task_cancellation_tracker as task_tracker
import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils
@@ -58,7 +59,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
project_id=TENANT1,
overwrite=False)
def test_execute_calculate_hash(self):
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_calculate_hash(self, mock_register_operation,
mock_signal_finished):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
hashing_algo = CONF.hashing_algorithm
@@ -85,7 +89,54 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
self.assertIsNotNone(self.image.os_hash_value)
self.assertEqual('active', self.image.status)
def test_hash_calculation_retry_count(self):
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_cancel_hash_op(self, mock_register_operation,
                                mock_signal_finished):
    # Verify that a canceled operation aborts hash calculation with
    # _HashCalculationCanceled, leaves checksum/os_hash_value unset and
    # keeps the image 'active'.
    #
    # NOTE: signal_finished is patched like in the sibling tests so the
    # cleanup in the 'finally' clause does not touch the real tracker,
    # and so we can assert that cleanup is signalled on cancellation.
    self.loc_url = '%s/fake_location_1' % (BASE_URI)
    self.image.status = 'queued'
    hashing_algo = CONF.hashing_algorithm

    # Add the location and activate the image before hashing starts.
    location_update = import_flow._UpdateLocationTask(
        TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1, self.loc_url,
        self.context)
    location_update.execute()
    self.assertEqual(1, self.image.locations.append.call_count)

    set_image_active = import_flow._SetImageToActiveTask(
        TASK_ID1, TASK_TYPE, self.image_repo, IMAGE_ID1)
    set_image_active.execute()
    self.assertEqual('active', self.image.status)

    # Simulate cancellation
    expected_size = 4 * units.Ki
    expected_data = b"*" * expected_size
    self.image.get_data.return_value = io.BytesIO(expected_data)
    self.image.checksum = None
    self.image.os_hash_value = None
    hash_calculation = import_flow._CalculateHash(TASK_ID1, TASK_TYPE,
                                                  self.image_repo,
                                                  IMAGE_ID1,
                                                  hashing_algo)

    with mock.patch.object(
            task_tracker, 'is_canceled') as mock_is_canceled:
        mock_is_canceled.return_value = True
        with mock.patch.object(
                import_flow.LOG, 'debug') as mock_debug:
            self.assertRaises(import_flow._HashCalculationCanceled,
                              hash_calculation.execute)

    mock_debug.assert_any_call('Hash calculation cancelled: %s',
                               mock.ANY)
    mock_register_operation.assert_called()
    # The operation must be marked finished even though it was aborted.
    mock_signal_finished.assert_called()
    self.assertIsNone(self.image.checksum)
    self.assertIsNone(self.image.os_hash_value)
    self.assertEqual('active', self.image.status)
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_hash_calculation_retry_count(self, mock_register_operation,
mock_signal_finished):
hashing_algo = CONF.hashing_algorithm
self.image.checksum = None
self.image.os_hash_value = None
@@ -109,7 +160,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
hash_calculation.revert(None)
self.assertIsNone(self.image.os_hash_algo)
def test_execute_hash_calculation_fails_without_validation_data(self):
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_hash_calculation_fails_without_validation_data(
self, mock_register_operation, mock_signal_finished):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.hash_task_input.update(loc_url=self.loc_url)
@@ -173,7 +227,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
self.assertEqual('active', self.image.status)
self.assertEqual(1, len(self.image.locations))
def test_execute_hash_calculation_fails_for_store_other_that_http(self):
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_hash_calculation_fails_for_store_other_that_http(
self, mock_register_operation, mock_signal_finished):
self.loc_url = "cinder://image/fake_location"
self.hash_task_input.update(loc_url=self.loc_url)
self.image.status = 'queued'
@@ -215,7 +272,10 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
self.assertEqual('queued', self.image.status)
self.assertEqual(0, len(self.image.locations))
def test_execute_hash_calculation_fails_if_image_data_deleted(self):
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_hash_calculation_fails_if_image_data_deleted(
self, mock_register_operation, mock_signal_finished):
self.loc_url = '%s/fake_location_1' % (BASE_URI)
self.image.status = 'queued'
self.hash_task_input.update(loc_url=self.loc_url)
@@ -241,10 +301,6 @@ class TestCalculateHashTask(test_utils.BaseTestCase):
hashing_algo)
self.image.get_data.side_effect = store.exceptions.NotFound
hash_calculation.execute()
# Check if Image delete and image_repo.delete has been called
# if exception raised
self.image.delete.assert_called_once()
self.image_repo.remove.assert_called_once_with(self.image)
class TestVerifyValidationDataTask(test_utils.BaseTestCase):
@@ -262,7 +318,13 @@ class TestVerifyValidationDataTask(test_utils.BaseTestCase):
self.image.container_format = 'bare'
self.config(do_secure_hash=True)
def test_execute_with_valid_validation_data(self):
@mock.patch.object(task_tracker, 'is_canceled')
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_with_valid_validation_data(self, mock_register_operation,
mock_signal_finished,
mock_is_canceled):
mock_is_canceled.return_value = False
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = {"url": url, "metadata": {"store": "foo"}}
@@ -302,7 +364,13 @@ class TestVerifyValidationDataTask(test_utils.BaseTestCase):
set_image_active.execute()
self.assertEqual('active', self.image.status)
def test_execute_with_os_hash_value_other_than_512(self):
@mock.patch.object(task_tracker, 'is_canceled')
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_with_os_hash_value_other_than_512(
self, mock_register_operation, mock_signal_finished,
mock_is_canceled):
mock_is_canceled.return_value = False
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = {"url": url, "metadata": {"store": "foo"}}
@@ -340,7 +408,10 @@ class TestVerifyValidationDataTask(test_utils.BaseTestCase):
set_image_active.execute()
self.assertEqual('active', self.image.status)
def test_execute_with_invalid_validation_data(self):
@mock.patch.object(task_tracker, 'signal_finished')
@mock.patch.object(task_tracker, 'register_operation')
def test_execute_with_invalid_validation_data(
self, mock_register_operation, mock_signal_finished):
url = '%s/fake_location_1' % BASE_URI
self.image.status = 'queued'
self.image.locations = [{"url": url, "metadata": {"store": "foo"}}]

View File

@@ -38,6 +38,7 @@ from glance.common import store_utils
from glance import domain
import glance.notifier
import glance.schema
from glance import task_cancellation_tracker as tracker
from glance.tests.unit import base
from glance.tests.unit.keymgr import fake as fake_keymgr
import glance.tests.unit.utils as unit_test_utils
@@ -3168,6 +3169,58 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual('deleted', deleted_img['status'])
self.assertNotIn('%s/%s' % (BASE_URI, UUID1), self.store.data)
def test_delete_cancels_operation(self):
    # Deleting an image that records both an import task id and a hash
    # operation host must cancel that operation via the tracker and
    # still delete the image locally.
    request = unit_test_utils.get_fake_request()
    operation_id = 'fake-operation-id'
    store_key = '%s/%s' % (BASE_URI, UUID1)

    # Set up an image with an import task operation id
    self.db.image_update(None, UUID1, {'properties': {
        'os_glance_import_task': operation_id,
        'os_glance_hash_op_host': 'https://glance-worker1.openstack.org',
    }})
    self.assertIn(store_key, self.store.data)

    # Force the local (non-proxied) delete path.
    with mock.patch.object(self.controller, 'is_proxyable',
                           return_value=False):
        with mock.patch.object(tracker, 'cancel_operation') as cancel:
            self.controller.delete(request, UUID1)

    cancel.assert_called_once_with(operation_id)

    image_row = self.db.image_get(
        request.context, UUID1, force_show_deleted=True)
    self.assertTrue(image_row['deleted'])
    self.assertEqual('deleted', image_row['status'])
    self.assertNotIn(store_key, self.store.data)
@mock.patch('glance.context.get_ksa_client')
def test_image_delete_proxies_hash_op_host(self, mock_client):
    # Make sure that we proxy to the remote side when we need to
    remote_host = 'https://glance-worker1.openstack.org'
    self.config(
        worker_self_reference_url='http://glance-worker2.openstack.org')
    request = unit_test_utils.get_fake_request(
        '/v2/images/%s' % UUID4, method='DELETE')

    # The image records a hash operation host other than this worker.
    fake_image = FakeImage(status='uploading')
    fake_image.extra_properties['os_glance_hash_op_host'] = remote_host

    # Canned response returned by the remote worker.
    mock_client.return_value.delete.return_value = mock.MagicMock(
        location='/target',
        status_code=202,
        reason='Thanks',
        headers={'x-openstack-request-id': 'remote-req'})

    with mock.patch.object(
            glance.notifier.ImageRepoProxy, 'get') as m_get:
        m_get.return_value = fake_image
        self.controller.delete(request, UUID4)

    # Make sure we called the expected remote URL and passed
    # the body.
    mock_client.return_value.delete.assert_called_once_with(
        '%s/v2/images/%s' % (remote_host, UUID4),
        json=None, timeout=60)
def test_delete_not_allowed_by_policy(self):
request = unit_test_utils.get_fake_request()
with mock.patch.object(self.controller.policy, 'enforce') as mock_enf: