0bdc50ba99
The function is deprecated because it is equivalent to str(ex) in Python 3. Note that %s implies conversion by str, so in most cases we don't even have to call str. However, a few explicit str calls are kept to simplify unit tests. Depends-on: https://review.opendev.org/c/openstack/oslo.utils/+/938929 Change-Id: Ie04bf64a22781c95d24222c96eee7865b0712312 Signed-off-by: Takashi Kajinami <kajinamit@oss.nttdata.com>
365 lines
13 KiB
Python
365 lines
13 KiB
Python
# Copyright 2024 RedHat Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import hashlib
|
|
|
|
import glance_store as store
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_utils import excutils
|
|
from taskflow.patterns import linear_flow as lf
|
|
from taskflow import retry
|
|
from taskflow import task
|
|
|
|
import glance.async_.flows.api_image_import as image_import
|
|
from glance.common import exception
|
|
from glance.common import store_utils
|
|
from glance.i18n import _, _LW
|
|
from glance import task_cancellation_tracker as tracker
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
CONF = cfg.CONF
|
|
|
|
|
|
class _HashCalculationFailed(exception.GlanceException):
    """Signals that the hash of an image's data could not be calculated."""

    def __init__(self, message):
        # A message is mandatory; it is handed to the base exception as-is.
        super().__init__(message)
|
|
|
|
|
|
class _InvalidLocation(exception.GlanceException):
    """Signals that a location could not be added to the image."""

    def __init__(self, message):
        # A message is mandatory; it is handed to the base exception as-is.
        super().__init__(message)
|
|
|
|
|
|
class _HashCalculationCanceled(exception.GlanceException):
    """Signals that a running hash calculation was canceled."""

    def __init__(self, message):
        # A message is mandatory; it is handed to the base exception as-is.
        super().__init__(message)
|
|
|
|
|
|
class _CalculateHash(task.Task):
    """Calculate the checksum and os_hash_value of an image's data.

    The image data is read chunk by chunk, an MD5 checksum and a hash
    using ``hashing_algo`` are computed, and both are stored on the image.
    """

    def __init__(self, task_id, task_type, image_repo, image_id,
                 hashing_algo, status=None):
        """Store the task parameters and name the task.

        :param task_id: Task ID
        :param task_type: Type of the task
        :param image_repo: Image repository used
        :param image_id: ID of the Image to be processed
        :param hashing_algo: Name of the algorithm passed to hashlib.new()
        :param status: Status assigned to the image in execute() when the
                       image is still 'queued'
        """
        self.task_id = task_id
        self.task_type = task_type
        self.image_repo = image_repo
        self.image_id = image_id
        self.hashing_algo = hashing_algo
        self.image_status = status
        super().__init__(
            name='%s-CalculateHash-%s' % (task_type, task_id))

    def _calculate_hash(self, image):
        """Read the image data once and set checksum and os_hash_value.

        :param image: Image whose data is hashed; its ``checksum`` and
                      ``os_hash_value`` attributes are set on success
        :raises _HashCalculationCanceled: if the tracker reports this task
                                          as canceled while reading
        """
        current_os_hash_value = hashlib.new(self.hashing_algo)
        current_checksum = hashlib.md5(usedforsecurity=False)
        for chunk in image.get_data():
            # Cancellation is checked once per chunk so that a long read
            # can be stopped part-way through.
            if tracker.is_canceled(self.task_id):
                raise _HashCalculationCanceled(
                    _('Hash calculation for image %s has been '
                      'canceled') % self.image_id)
            if chunk is None:
                break
            current_checksum.update(chunk)
            current_os_hash_value.update(chunk)
        image.checksum = current_checksum.hexdigest()
        image.os_hash_value = current_os_hash_value.hexdigest()

    def _set_checksum_and_hash(self, image):
        """Calculate and persist the hashes, retrying on IOError.

        :param image: Image to hash and save
        :raises _HashCalculationCanceled: if the calculation was canceled
        :raises _HashCalculationFailed: if the calculation keeps failing
                                        and the image is not active
        """
        tracker.register_operation(self.task_id)
        retries = 0
        # NOTE(review): the failure handling below only triggers when
        # retries == CONF.http_retries, while this loop condition permits
        # one further attempt after that -- confirm this is intended.
        while retries <= CONF.http_retries and image.os_hash_value is None:
            retries += 1
            try:
                self._calculate_hash(image)
                self.image_repo.save(image)
            except _HashCalculationCanceled as e:
                with excutils.save_and_reraise_exception():
                    LOG.debug('Hash calculation cancelled: %s', e)
            except IOError as e:
                LOG.debug('[%i/%i] Hash calculation failed due to %s',
                          retries, CONF.http_retries, str(e))
                if retries == CONF.http_retries:
                    if image.status != 'active':
                        # NOTE(pdeore): The image location add operation
                        # should succeed so this exception should be raised
                        # only when image status is not active.
                        msg = (_('Hash calculation failed for image %s '
                                 'data') % self.image_id)
                        raise _HashCalculationFailed(msg)
                    else:
                        msg = (_LW("Hash calculation failed for image %s "
                                   "data") % self.image_id)
                        LOG.warning(msg)
            except store.exceptions.NotFound:
                LOG.debug(_('Failed to calculate checksum of %(image_id)s '
                            'as image data has been deleted from the '
                            'backend'), {'image_id': self.image_id})
            finally:
                # Runs after every attempt, successful or not: mark the
                # operation finished and drop the worker marker that
                # execute() recorded on the image.
                tracker.signal_finished(self.task_id)
                image.extra_properties.pop('os_glance_hash_op_host', None)
                self.image_repo.save(image)

    def execute(self):
        """Prepare the image and calculate its checksum and hash."""
        image = self.image_repo.get(self.image_id)
        if image.status == 'queued':
            image.status = self.image_status
        image.os_hash_algo = self.hashing_algo
        # NOTE(abhishekk): Record this worker's
        # worker_self_reference_url in the image metadata, so we
        # know who is calculating the checksum and hash.
        self_url = CONF.worker_self_reference_url or CONF.public_endpoint
        if self_url:
            image.extra_properties['os_glance_hash_op_host'] = self_url
        self.image_repo.save(image)
        self._set_checksum_and_hash(image)

    def revert(self, result, **kwargs):
        """Set os_hash_algo to None when hash calculation fails
        and remove the location by reverting image to queued
        state
        """
        try:
            tracker.signal_finished(self.task_id)
            image = self.image_repo.get(self.image_id)
            if image.status == 'importing':
                # Guard against an empty location list so that the revert
                # itself cannot fail with IndexError.
                if (image.locations and
                        not image.locations[0]['url'].startswith("http")):
                    # NOTE(pdeore): `http` store doesn't allow deletion of
                    # location:
                    image.locations.pop()
                image.status = 'queued'
            image.os_hash_algo = None
            self.image_repo.save(image)
        except exception.NotFound:
            LOG.debug("Image %s might have been deleted from the backend",
                      self.image_id)
|
|
|
|
|
|
class _VerifyValidationData(task.Task):
    """Compare the user supplied os_hash_value with the calculated one."""

    def __init__(self, task_id, task_type, image_repo, image_id,
                 val_data):
        """Store the task parameters and name the task.

        :param task_id: Task ID
        :param task_type: Type of the task
        :param image_repo: Image repository used
        :param image_id: ID of the Image to be processed
        :param val_data: Validation Data provided by user; its
                         'os_hash_value' key is read in execute()
        """
        self.task_id = task_id
        self.task_type = task_type
        self.image_repo = image_repo
        self.image_id = image_id
        self.val_data = val_data
        super().__init__(
            name='%s-VerifyValidationData-%s' % (task_type, task_id))

    def execute(self):
        """Verify the Validation Data with calculated Hash

        :param image_id: Glance Image ID
        :val_data: Validation Data provided by user
        :raises exception.InvalidParameterValue: if the provided
            os_hash_value differs from the one stored on the image
        """
        image = self.image_repo.get(self.image_id)

        if self.val_data['os_hash_value'] != image.os_hash_value:
            msg = (_("os_hash_value: (%s) not matched with actual "
                     "os_hash_value: (%s)") % (
                self.val_data['os_hash_value'],
                image.os_hash_value))
            raise exception.InvalidParameterValue(msg)

    def revert(self, result, **kwargs):
        """Set image status back to queued and
        set the hash values to None
        """
        try:
            image = self.image_repo.get(self.image_id)
            # The location list may already be empty here (for example
            # when another task's revert removed the location first), so
            # only inspect the first entry when there is one.
            if (image.locations and
                    not image.locations[0]['url'].startswith("http")):
                # NOTE(pdeore): `http` store doesn't allow deletion of
                # location
                image.locations.pop()
            image.status = 'queued'
            image.os_hash_algo = None
            image.os_hash_value = None
            image.checksum = None
            self.image_repo.save(image)
        except exception.NotFound:
            LOG.debug("Image %s might have been deleted from the backend",
                      self.image_id)
|
|
|
|
|
|
class _SetHashValues(task.Task):
    """Copy the user provided validation data onto the image."""

    def __init__(self, task_id, task_type, image_repo, image_id,
                 val_data):
        """Remember the task parameters and give the task its name."""
        self.task_id = task_id
        self.task_type = task_type
        self.image_repo = image_repo
        self.image_id = image_id
        self.val_data = val_data
        super().__init__(
            name='%s-SetHashValues-%s' % (task_type, task_id))

    def execute(self):
        """Set user provided hash algo and value hash properties to image
        when do_secure_hash is False.

        :param image_id: Glance Image ID
        :val_data: Validation Data provided by user
        """
        image = self.image_repo.get(self.image_id)
        # Every key of the validation data becomes an image attribute.
        for attr_name, attr_value in self.val_data.items():
            setattr(image, attr_name, attr_value)
        self.image_repo.save(image)
|
|
|
|
|
|
class _UpdateLocationTask(task.Task):
    """Append a new location to the image and save it."""

    def __init__(self, task_id, task_type, image_repo, image_id, url,
                 context):
        """Remember the task parameters and give the task its name."""
        self.task_id = task_id
        self.task_type = task_type
        self.image_repo = image_repo
        self.image_id = image_id
        self.url = url
        self.context = context
        super().__init__(
            name='%s-UpdateLocationTask-%s' % (task_type, task_id))

    def execute(self):
        """Update the image location

        :param image_id: Glance Image ID
        :param url: Location URL
        :raises _InvalidLocation: if the location is rejected as invalid
        """
        image = self.image_repo.get(self.image_id)
        try:
            # NOTE(pdeore): Add metadata key to add the store identifier
            # as location metadata
            location = {
                'url': self.url,
                'metadata': {},
            }
            if CONF.enabled_backends:
                resolved = store_utils.get_updated_store_location(
                    [location], context=self.context)
                location = resolved[0]

            image.locations.append(location)
            self.image_repo.save(image)
        except (exception.Invalid, exception.BadStoreUri) as e:
            raise _InvalidLocation(e.msg)
|
|
|
|
|
|
class _SetImageToActiveTask(task.Task):
    """Move the image to 'active' status."""

    def __init__(self, task_id, task_type, image_repo, image_id):
        """Store the task parameters and name the task.

        :param task_id: Task ID
        :param task_type: Type of the task
        :param image_repo: Image repository used
        :param image_id: ID of the Image to be processed
        """
        self.task_id = task_id
        self.task_type = task_type
        self.image_repo = image_repo
        self.image_id = image_id
        super().__init__(
            name='%s-SetImageToActiveTask-%s' % (task_type, task_id))

    def execute(self):
        """Set Image status to Active

        :param image_id: Glance Image ID
        """
        image = self.image_repo.get(self.image_id)
        image.status = 'active'
        self.image_repo.save(image)

    def revert(self, result, **kwargs):
        """Set image status back to queued and
        remove the location if it's added.
        """
        try:
            image = self.image_repo.get(self.image_id)
            if image.status != 'active':
                # Only look at the first location when one exists, so an
                # image without locations cannot break the revert with an
                # IndexError.
                if (image.locations and
                        not image.locations[0]['url'].startswith("http")):
                    # NOTE(pdeore): `http` store doesn't allow deletion of
                    # location
                    image.locations.pop()
                if image.status == 'importing':
                    image.status = 'queued'
            self.image_repo.save(image)
        except exception.NotFound:
            LOG.debug("Image %s might have been deleted from the backend",
                      self.image_id)
|
|
|
|
|
|
def get_flow(**kwargs):
    """Return task flow

    :param task_id: Task ID
    :param task_type: Type of the task
    :param task_repo: Task repo
    :param image_repo: Image repository used
    :param admin_repo: Optional repository used for the action wrapper
                       instead of image_repo
    :param image_id: ID of the Image to be processed
    :param val_data: Validation Data provided by user (may be omitted,
                     None or empty)
    :param loc_url: Location URL to add to the image
    :param context: Request context passed to the location update task
    :returns: A linear flow containing the tasks for the location import
    """
    task_id = kwargs.get('task_id')
    task_type = kwargs.get('task_type')
    task_repo = kwargs.get('task_repo')
    image_repo = kwargs.get('image_repo')
    admin_repo = kwargs.get('admin_repo')
    image_id = kwargs.get('image_id')
    # An explicit val_data=None is treated like a missing value so the
    # .get() below cannot fail.
    val_data = kwargs.get('val_data') or {}
    loc_url = kwargs.get('loc_url')
    context = kwargs.get('context')

    hashing_algo = val_data.get("os_hash_algo",
                                CONF['hashing_algorithm'])

    # Instantiate an action wrapper with the admin repo if we got one,
    # otherwise with the regular repo.
    action_wrapper = image_import.ImportActionWrapper(
        admin_repo or image_repo, image_id, task_id)

    flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
    flow.add(image_import._ImageLock(task_id, task_type, action_wrapper))
    flow.add(
        _UpdateLocationTask(task_id, task_type, image_repo, image_id,
                            loc_url, context))
    if CONF.do_secure_hash:
        if val_data:
            # With validation data the image only becomes active after
            # the calculated hash has been verified.
            flow.add(
                _CalculateHash(task_id, task_type, image_repo, image_id,
                               hashing_algo, status='importing'))
            flow.add(
                _VerifyValidationData(task_id, task_type, image_repo,
                                      image_id, val_data))
            flow.add(
                _SetImageToActiveTask(task_id, task_type, image_repo,
                                      image_id))
        else:
            # Without validation data the image is activated first and
            # the hash is calculated afterwards.
            flow.add(
                _SetImageToActiveTask(
                    task_id, task_type, image_repo, image_id))
            flow.add(
                _CalculateHash(task_id, task_type, image_repo, image_id,
                               hashing_algo))
    else:
        if val_data:
            flow.add(
                _SetHashValues(task_id, task_type, image_repo, image_id,
                               val_data))
        flow.add(
            _SetImageToActiveTask(task_id, task_type, image_repo, image_id))

    flow.add(
        image_import._CompleteTask(task_id, task_type, task_repo,
                                   action_wrapper))

    return flow
|