Files
Andriy Kurilin 14e494d197 Base mypy support
Change-Id: Idf4283556dbc5a0af9a07d317d13cf20d9b496fb
Signed-off-by: Andriy Kurilin <andr.kurilin@gmail.com>
2025-09-09 17:36:10 +02:00

94 lines
3.3 KiB
Python

# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally import exceptions
from rally.task import atomic
from rally_openstack.common.services.image import image as image_service
class GlanceMixin(atomic.ActionTimerMixin):
def _get_client(self):
return self._clients.glance(self.version)
def get_image(self, image):
"""Get specified image.
:param image: ID or object with ID of image to obtain.
"""
from glanceclient import exc as glance_exc
image_id = getattr(image, "id", image)
try:
aname = "glance_v%s.get_image" % self.version
with atomic.ActionTimer(self, aname):
return self._get_client().images.get(image_id)
except glance_exc.HTTPNotFound:
raise exceptions.GetResourceNotFound(resource=image)
def delete_image(self, image_id):
"""Delete image."""
aname = "glance_v%s.delete_image" % self.version
with atomic.ActionTimer(self, aname):
self._get_client().images.delete(image_id)
def download_image(self, image_id, do_checksum=True):
"""Retrieve data of an image.
:param image_id: ID of the image to download.
:param do_checksum: Enable/disable checksum validation.
:returns: An iterable body or None
"""
aname = "glance_v%s.download_image" % self.version
with atomic.ActionTimer(self, aname):
return self._get_client().images.data(image_id,
do_checksum=do_checksum)
class UnifiedGlanceMixin(object):
@staticmethod
def _unify_image(image):
if hasattr(image, "visibility"):
return image_service.UnifiedImage(id=image.id, name=image.name,
status=image.status,
visibility=image.visibility)
else:
return image_service.UnifiedImage(
id=image.id, name=image.name,
status=image.status,
visibility=("public" if image.is_public else "private"))
def get_image(self, image):
"""Get specified image.
:param image: ID or object with ID of image to obtain.
"""
image_obj = self._impl.get_image(image=image)
return self._unify_image(image_obj)
def delete_image(self, image_id):
"""Delete image."""
self._impl.delete_image(image_id=image_id)
def download_image(self, image_id, do_checksum=True):
"""Download data for an image.
:param image_id: image id to look up
:param do_checksum: Enable/disable checksum validation
:rtype: iterable containing image data or None
"""
return self._impl.download_image(image_id, do_checksum=do_checksum)