diff --git a/glare/api/v1/resource.py b/glare/api/v1/resource.py index 351abb2..73039d5 100644 --- a/glare/api/v1/resource.py +++ b/glare/api/v1/resource.py @@ -333,13 +333,13 @@ class ArtifactsController(api_versioning.VersionedResource): if content_type == ('application/vnd+openstack.glare-custom-location' '+json'): url = data.pop('url') - return self.engine.add_blob_dict_location( - req.context, type_name, artifact_id, - field_name, blob_key, url, data) + return self.engine.add_blob_location( + req.context, type_name, artifact_id, field_name, url, data, + blob_key) else: - return self.engine.upload_blob_dict( - req.context, type_name, artifact_id, - field_name, blob_key, data, content_type) + return self.engine.upload_blob(req.context, type_name, artifact_id, + field_name, data, content_type, + blob_key) @supported_versions(min_ver='1.0') @log_request_progress @@ -370,7 +370,7 @@ class ArtifactsController(api_versioning.VersionedResource): :param blob_key: name of Dict of blobs (optional) :return: iterator that returns blob data """ - data, meta = self.engine.download_blob_dict( + data, meta = self.engine.download_blob( req.context, type_name, artifact_id, field_name, blob_key) result = {'data': data, 'meta': meta} return result diff --git a/glare/db/sqlalchemy/api.py b/glare/db/sqlalchemy/api.py index da99660..7ff5bef 100644 --- a/glare/db/sqlalchemy/api.py +++ b/glare/db/sqlalchemy/api.py @@ -27,6 +27,7 @@ import six import sqlalchemy from sqlalchemy import and_ import sqlalchemy.exc +from sqlalchemy import exists from sqlalchemy import func from sqlalchemy import or_ import sqlalchemy.orm as orm @@ -162,6 +163,14 @@ def _create_or_update(context, artifact_id, values, session): artifact.updated_at = timeutils.utcnow() if 'status' in values and values['status'] == 'active': + if session.query( + exists().where( + models.ArtifactBlob.status == 'saving' and + models.ArtifactBlob.artifact_id == artifact_id) + ).one()[0]: + raise exception.Conflict( + "You cannot activate artifact if it has " + "uploading blobs.") artifact.activated_at = timeutils.utcnow() artifact.update(values) artifact.save(session=session) diff --git a/glare/engine.py b/glare/engine.py index 34cd562..ac766b5 100644 --- a/glare/engine.py +++ b/glare/engine.py @@ -17,14 +17,17 @@ import copy import jsonpatch from oslo_log import log as logging +from oslo_utils import excutils from glare.common import exception from glare.common import policy +from glare.common import store_api from glare.common import utils from glare.db import artifact_api -from glare.i18n import _ +from glare.i18n import _, _LI from glare import locking from glare.notification import Notifier +from glare.objects.meta import fields as glare_fields from glare.objects.meta import registry as glare_registry LOG = logging.getLogger(__name__) @@ -240,67 +243,204 @@ class Engine(object): Notifier.notify(context, "artifact.delete", af) @classmethod - @lock_engine.locked(['type_name', 'artifact_id']) - def add_blob_location(cls, context, type_name, - artifact_id, field_name, location, blob_meta): + def add_blob_location(cls, context, type_name, artifact_id, field_name, + location, blob_meta, blob_key=None): + """Add external location to blob. + + :param context: user context + :param type_name: name of artifact type + :param artifact_id: id of the artifact to be updated + :param field_name: name of blob or blob dict field + :param location: external blob url + :param blob_meta: dictionary containing blob metadata like md5 checksum + :param blob_key: if field_name is blob dict it specifies concrete key + in this dict + :return updated artifact + """ af = cls._get_artifact(context, type_name, artifact_id) action_name = 'artifact:set_location' policy.authorize(action_name, af.to_dict(), context) - modified_af = af.add_blob_location(context, af, field_name, location, - blob_meta) + + blob_name = "%s[%s]" % (field_name, blob_key)\ + if blob_key else field_name + + blob = {'url': location, 'size': None, 'md5': None, 'sha1': None, + 'sha256': None, 'status': glare_fields.BlobFieldType.ACTIVE, + 'external': True, 'content_type': None} + md5 = blob_meta.pop("md5", None) + if md5 is None: + msg = (_("Incorrect blob metadata %(meta)s. MD5 must be specified " + "for external location in artifact blob %(blob_name)."), + {"meta": str(blob_meta), "blob_name": blob_name}) + raise exception.BadRequest(msg) + else: + blob["md5"] = md5 + blob["sha1"] = blob_meta.pop("sha1", None) + blob["sha256"] = blob_meta.pop("sha256", None) + modified_af = cls.update_blob( + context, type_name, artifact_id, blob, field_name, blob_key, + validate=True) + LOG.info(_LI("External location %(location)s has been created " + "successfully for artifact %(artifact)s blob %(blob)s"), + {'location': location, 'artifact': af.id, + 'blob': blob_name}) + Notifier.notify(context, action_name, modified_af) return modified_af.to_dict() @classmethod - @lock_engine.locked(['type_name', 'artifact_id']) - def add_blob_dict_location(cls, context, type_name, artifact_id, - field_name, blob_key, location, blob_meta): - af = cls._get_artifact(context, type_name, artifact_id) - action_name = 'artifact:set_location' - policy.authorize(action_name, af.to_dict(), context) - modified_af = af.add_blob_dict_location(context, af, field_name, - blob_key, location, blob_meta) - Notifier.notify(context, action_name, modified_af) - return modified_af.to_dict() - - @classmethod - @lock_engine.locked(['type_name', 'artifact_id']) def upload_blob(cls, context, type_name, artifact_id, field_name, fd, - content_type): - """Upload Artifact blob""" + content_type, blob_key=None): + """Upload Artifact blob. + + :param context: user context + :param type_name: name of artifact type + :param artifact_id: id of the artifact to be updated + :param field_name: name of blob or blob dict field + :param fd: file descriptor that Glare uses to upload the file + :param field_name: name of blob dict field + :param content_type: data content-type + :param blob_key: if field_name is blob dict it specifies concrete key + in this dict + :return file iterator for requested file + """ af = cls._get_artifact(context, type_name, artifact_id) action_name = "artifact:upload" policy.authorize(action_name, af.to_dict(), context) - modified_af = af.upload_blob(context, af, field_name, fd, content_type) + + blob_name = "%s[%s]" % (field_name, blob_key)\ + if blob_key else field_name + + try: + # call upload hook + fd = af.validate_upload(context, af, field_name, fd) + except Exception as e: + raise exception.BadRequest(message=str(e)) + + # create an an empty blob instance in db with 'saving' status + blob = {'url': None, 'size': None, 'md5': None, 'sha1': None, + 'sha256': None, 'status': glare_fields.BlobFieldType.SAVING, + 'external': False, 'content_type': content_type} + modified_af = cls.update_blob( + context, type_name, artifact_id, blob, field_name, blob_key, + validate=True) + + if blob_key is None: + blob_id = getattr(modified_af, field_name)['id'] + else: + blob_id = getattr(modified_af, field_name)[blob_key]['id'] + + # try to perform blob uploading to storage backend + try: + location_uri, size, checksums = store_api.save_blob_to_store( + blob_id, fd, context, af.get_max_blob_size(field_name)) + except Exception: + # if upload failed remove blob from db and storage + with excutils.save_and_reraise_exception(logger=LOG): + if blob_key is None: + af.update_blob(context, af.id, {field_name: None}) + else: + blob_dict_attr = modified_af[field_name] + del blob_dict_attr[blob_key] + af.update_blob(context, af.id, + {field_name: blob_dict_attr}) + LOG.info(_LI("Successfully finished blob upload for artifact " + "%(artifact)s blob field %(blob)s."), + {'artifact': af.id, 'blob': blob_name}) + + # update blob info and activate it + blob.update({'url': location_uri, + 'status': glare_fields.BlobFieldType.ACTIVE, + 'size': size}) + blob.update(checksums) + modified_af = cls.update_blob( + context, type_name, artifact_id, blob, field_name, blob_key) + Notifier.notify(context, action_name, modified_af) return modified_af.to_dict() @classmethod @lock_engine.locked(['type_name', 'artifact_id']) - def upload_blob_dict(cls, context, type_name, artifact_id, field_name, - blob_key, fd, content_type): - """Upload Artifact blob to dict""" + def update_blob(cls, context, type_name, artifact_id, blob, + field_name, blob_key=None, validate=False): + """Update blob info. + :param context: user context + :param type_name: name of artifact type + :param artifact_id: id of the artifact to be updated + :param blob: blob representation in dict format + :param field_name: name of blob or blob dict field + :param blob_key: if field_name is blob dict it specifies concrete key + in this dict + :param validate: enable validation of possibility of blob uploading + :return updated artifact + """ af = cls._get_artifact(context, type_name, artifact_id) - action_name = "artifact:upload" - policy.authorize(action_name, af.to_dict(), context) - modified_af = af.upload_blob_dict(context, af, field_name, blob_key, - fd, content_type) - Notifier.notify(context, action_name, modified_af) - return modified_af.to_dict() + if validate: + af.validate_upload_allowed(context, af, field_name, blob_key) + if blob_key is None: + setattr(af, field_name, blob) + return af.update_blob( + context, af.id, {field_name: getattr(af, field_name)}) + else: + blob_dict_attr = getattr(af, field_name) + blob_dict_attr[blob_key] = blob + return af.update_blob( + context, af.id, {field_name: blob_dict_attr}) @classmethod - def download_blob(cls, context, type_name, artifact_id, field_name): - """Download blob from artifact""" + def download_blob(cls, context, type_name, artifact_id, field_name, + blob_key=None): + """Download binary data from Glare Artifact. + :param context: user context + :param type_name: name of artifact type + :param artifact_id: id of the artifact to be updated + :param field_name: name of blob or blob dict field + :param blob_key: if field_name is blob dict it specifies concrete key + in this dict + :return: file iterator for requested file + """ af = cls._get_artifact(context, type_name, artifact_id, read_only=True) policy.authorize("artifact:download", af.to_dict(), context) - return af.download_blob(context, af, field_name) - @classmethod - def download_blob_dict(cls, context, type_name, artifact_id, field_name, - blob_key): - """Download blob from artifact""" - af = cls._get_artifact(context, type_name, artifact_id, - read_only=True) - policy.authorize("artifact:download", af.to_dict(), context) - return af.download_blob_dict(context, af, field_name, blob_key) + blob_name = "%s[%s]" % (field_name, blob_key)\ + if blob_key else field_name + + # check if property is downloadable + if blob_key is None and not af.is_blob(field_name): + msg = _("%s is not a blob") % field_name + raise exception.BadRequest(msg) + if blob_key is not None and not af.is_blob_dict(field_name): + msg = _("%s is not a blob dict") % field_name + raise exception.BadRequest(msg) + + if af.status == af.STATUS.DEACTIVATED and not context.is_admin: + msg = _("Only admin is allowed to download artifact data " + "when it's deactivated") + raise exception.Forbidden(message=msg) + + # get blob info from dict or directly + if blob_key is None: + blob = getattr(af, field_name) + else: + try: + blob = getattr(af, field_name)[blob_key] + except KeyError: + msg = _("Blob with name %s is not found") % blob_name + raise exception.NotFound(message=msg) + + if blob is None or blob['status'] != glare_fields.BlobFieldType.ACTIVE: + msg = _("%s is not ready for download") % blob_name + raise exception.BadRequest(message=msg) + + meta = {'md5': blob.get('md5'), + 'sha1': blob.get('sha1'), + 'sha256': blob.get('sha256'), + 'external': blob.get('external')} + if blob['external']: + data = {'url': blob['url']} + else: + data = store_api.load_from_store(uri=blob['url'], context=context) + meta['size'] = blob.get('size') + meta['content_type'] = blob.get('content_type') + return data, meta diff --git a/glare/objects/base.py b/glare/objects/base.py index 0353aff..7e3b522 100644 --- a/glare/objects/base.py +++ b/glare/objects/base.py @@ -18,7 +18,6 @@ import uuid from oslo_config import cfg from oslo_log import log as logging -from oslo_utils import excutils from oslo_utils import timeutils from oslo_versionedobjects import base from oslo_versionedobjects import fields @@ -625,6 +624,11 @@ class BaseArtifact(base.VersionedObject): 'for artifact %(id)s') % {'name': name, 'id': af.id} raise exception.Conflict(msg) + elif b['status'] == glare_fields.\ + BlobFieldType.SAVING: + msg = _('Blob %(name)s is saving for artifact %(id)s' + ) % {'name': name, 'id': af.id} + raise exception.Conflict(msg) else: b['status'] = glare_fields.BlobFieldType.PENDING_DELETE blobs[name] = b @@ -638,6 +642,11 @@ class BaseArtifact(base.VersionedObject): 'for artifact %(id)s') % {'name': name, 'id': af.id} raise exception.Conflict(msg) + elif b['status'] == glare_fields. \ + BlobFieldType.SAVING: + msg = _('Blob %(name)s is saving for artifact ' + '%(id)s') % {'name': name, 'id': af.id} + raise exception.Conflict(msg) else: b['status'] = glare_fields.\ BlobFieldType.PENDING_DELETE @@ -773,12 +782,17 @@ class BaseArtifact(base.VersionedObject): return cls._init_artifact(context, af) @classmethod - def _get_max_blob_size(cls, field_name): + def get_max_blob_size(cls, field_name): return getattr(cls.fields[field_name], 'max_blob_size', attribute.BlobAttribute.DEFAULT_MAX_BLOB_SIZE) @classmethod - def _validate_upload_allowed(cls, context, af, field_name, blob_key=None): + def validate_upload_allowed(cls, context, af, field_name, blob_key=None): + """Validate if given blob is ready for uploading.""" + + blob_name = "%s[%s]" % (field_name, blob_key)\ + if blob_key else field_name + if field_name not in cls.fields: msg = _("%s property does not exist") % field_name raise exception.BadRequest(msg) @@ -800,244 +814,22 @@ class BaseArtifact(base.VersionedObject): msg = _("Cannot re-upload blob %(blob)s for artifact " "%(af)s") % {'blob': field_name, 'af': af.id} raise exception.Conflict(message=msg) - - @classmethod - def upload_blob(cls, context, af, field_name, fd, content_type): - """Upload binary object as artifact property - - :param context: user context - :param af: current Artifact definition - :param field_name: name of blob field - :param fd: file descriptor that Glare uses to upload the file - :param content_type: data content-type - :return: updated Artifact definition in Glare - """ - fd = cls.validate_upload(context, af, field_name, fd) - cls._validate_upload_allowed(context, af, field_name) - LOG.debug("Parameters validation for artifact %(artifact)s blob " - "upload passed for blob %(blob)s. " + "upload passed for blob %(blob_name)s. " "Start blob uploading to backend.", - {'artifact': af.id, 'blob': field_name}) - blob = {'url': None, 'size': None, 'md5': None, 'sha1': None, - 'sha256': None, 'status': glare_fields.BlobFieldType.SAVING, - 'external': False, 'content_type': content_type} - setattr(af, field_name, blob) - cls.db_api.update( - context, af.id, {field_name: getattr(af, field_name)}) - blob_id = getattr(af, field_name)['id'] - - try: - location_uri, size, checksums = store_api.save_blob_to_store( - blob_id, fd, context, cls._get_max_blob_size(field_name)) - blob.update({'url': location_uri, - 'status': glare_fields.BlobFieldType.ACTIVE, - 'size': size}) - blob.update(checksums) - setattr(af, field_name, blob) - af_upd = cls.db_api.update( - context, af.id, {field_name: getattr(af, field_name)}) - LOG.info(_LI("Successfully finished blob upload for artifact " - "%(artifact)s blob field %(blob)s."), - {'artifact': af.id, 'blob': field_name}) - return cls._init_artifact(context, af_upd) - except Exception: - with excutils.save_and_reraise_exception(logger=LOG): - cls.db_api.update(context, af.id, {field_name: None}) + {'artifact': af.id, 'blob_name': blob_name}) @classmethod - def download_blob(cls, context, af, field_name): - """Download binary data from Glare Artifact. - - :param context: user context - :param af: Artifact definition in Glare repo - :param field_name: name of blob field - :return: file iterator for requested file - """ - if not cls.is_blob(field_name): - msg = _("%s is not a blob") % field_name - raise exception.BadRequest(msg) - if af.status == cls.STATUS.DEACTIVATED and not context.is_admin: - msg = _("Only admin is allowed to download artifact data " - "when it's deactivated") - raise exception.Forbidden(message=msg) - blob = getattr(af, field_name) - if blob is None or blob['status'] != glare_fields.BlobFieldType.ACTIVE: - msg = _("%s is not ready for download") % field_name - raise exception.BadRequest(message=msg) - meta = {'md5': blob.get('md5'), - 'sha1': blob.get('sha1'), - 'sha256': blob.get('sha256'), - 'external': blob.get('external')} - if blob['external']: - data = {'url': blob['url']} - else: - data = store_api.load_from_store(uri=blob['url'], context=context) - meta['size'] = blob.get('size') - meta['content_type'] = blob.get('content_type') - return data, meta - - @classmethod - def upload_blob_dict(cls, context, af, field_name, blob_key, fd, - content_type): + def update_blob(cls, context, af_id, values): """Upload binary object as artifact property :param context: user context - :param af: current Artifact definition - :param blob_key: name of blob key in dict - :param fd: file descriptor that Glare uses to upload the file - :param field_name: name of blob dict field - :param content_type: data content-type - :return: updated Artifact definition in Glare + :param af_id: id of modified artifact + :param values: updated blob values + :return updated Artifact definition in Glare """ - fd = cls.validate_upload(context, af, field_name, fd) - cls._validate_upload_allowed(context, af, field_name, blob_key) - - LOG.debug("Parameters validation for artifact %(artifact)s blob " - "upload passed for blob dict %(blob)s with key %(key)s. " - "Start blob uploading to backend.", - {'artifact': af.id, 'blob': field_name, 'key': blob_key}) - blob = {'url': None, 'size': None, 'md5': None, 'sha1': None, - 'sha256': None, 'status': glare_fields.BlobFieldType.SAVING, - 'external': False, 'content_type': content_type} - blob_dict_attr = getattr(af, field_name) - blob_dict_attr[blob_key] = blob - cls.db_api.update( - context, af.id, {field_name: blob_dict_attr}) - blob_id = getattr(af, field_name)[blob_key]['id'] - try: - location_uri, size, checksums = store_api.save_blob_to_store( - blob_id, fd, context, cls._get_max_blob_size(field_name)) - blob.update({'url': location_uri, - 'status': glare_fields.BlobFieldType.ACTIVE, - 'size': size}) - blob.update(checksums) - af_values = cls.db_api.update( - context, af.id, {field_name: blob_dict_attr}) - LOG.info(_LI("Successfully finished blob upload for artifact " - "%(artifact)s blob dict field %(blob)s with key."), - {'artifact': af.id, 'blob': field_name, 'key': blob_key}) - return cls._init_artifact(context, af_values) - except Exception: - with excutils.save_and_reraise_exception(logger=LOG): - del blob_dict_attr[blob_key] - cls.db_api.update(context, af.id, {field_name: blob_dict_attr}) - - @classmethod - def download_blob_dict(cls, context, af, field_name, blob_key): - """Download binary data from Glare Artifact. - - :param context: user context - :param af: Artifact definition in Glare repo - :param blob_key: name of blob key in dict - :param field_name: name of blob dict field - :return: file iterator for requested file - """ - if not cls.is_blob_dict(field_name): - msg = _("%s is not a blob dict") % field_name - raise exception.BadRequest(msg) - - if af.status == cls.STATUS.DEACTIVATED and not context.is_admin: - msg = _("Only admin is allowed to download artifact data " - "when it's deactivated") - raise exception.Forbidden(message=msg) - try: - blob = getattr(af, field_name)[blob_key] - except KeyError: - msg = _("Blob with name %(blob_name)s is not found in blob " - "dictionary %(blob_dict)s") % (blob_key, field_name) - raise exception.NotFound(message=msg) - if blob is None or blob['status'] != glare_fields.BlobFieldType.ACTIVE: - msg = _("Blob %(blob_name)s from blob dictionary %(blob_dict)s " - "is not ready for download") % (blob_key, field_name) - LOG.error(msg) - raise exception.BadRequest(message=msg) - meta = {'md5': blob.get('md5'), - 'sha1': blob.get('sha1'), - 'sha256': blob.get('sha256'), - 'external': blob.get('external')} - - if blob['external']: - data = {'url': blob['url']} - else: - data = store_api.load_from_store(uri=blob['url'], context=context) - meta['size'] = blob.get('size') - meta['content_type'] = blob.get('content_type') - return data, meta - - @classmethod - def add_blob_location(cls, context, af, field_name, location, blob_meta): - """Upload binary object as artifact property - - :param context: user context - :param af: current Artifact definition - :param field_name: name of blob field - :param location: blob url - :return: updated Artifact definition in Glare - """ - cls._validate_upload_allowed(context, af, field_name) - LOG.debug("Parameters validation for artifact %(artifact)s location " - "passed for blob %(blob)s. Start location check for artifact" - ".", {'artifact': af.id, 'blob': field_name}) - - blob = {'url': location, 'size': None, 'md5': None, 'sha1': None, - 'sha256': None, 'status': glare_fields.BlobFieldType.ACTIVE, - 'external': True, 'content_type': None} - - md5 = blob_meta.pop("md5", None) - if md5 is None: - msg = (_("Incorrect blob metadata %(meta)s. MD5 must be specified " - "for external location in artifact blob %(field_name)."), - {"meta": str(blob_meta), "field_name": field_name}) - raise exception.BadRequest(msg) - else: - blob["md5"] = md5 - blob["sha1"] = blob_meta.pop("sha1", None) - blob["sha256"] = blob_meta.pop("sha256", None) - - setattr(af, field_name, blob) - updated_af = cls.db_api.update( - context, af.id, {field_name: getattr(af, field_name)}) - LOG.info(_LI("External location %(location)s has been created " - "successfully for artifact %(artifact)s blob %(blob)s"), - {'location': location, 'artifact': af.id, - 'blob': field_name}) - return cls._init_artifact(context, updated_af) - - @classmethod - def add_blob_dict_location(cls, context, af, field_name, - blob_key, location, blob_meta): - cls._validate_upload_allowed(context, af, field_name, blob_key) - - blob = {'url': location, 'size': None, 'md5': None, 'sha1': None, - 'sha256': None, 'status': glare_fields.BlobFieldType.ACTIVE, - 'external': True, 'content_type': None} - - md5 = blob_meta.pop("md5", None) - if md5 is None: - msg = (_("Incorrect blob metadata %(meta)s. MD5 must be specified " - "for external location in artifact blob " - "%(field_name)[%(blob_key)s]."), - {"meta": str(blob_meta), "field_name": field_name, - "blob_key": str(blob_key)}) - raise exception.BadRequest(msg) - else: - blob["md5"] = md5 - blob["sha1"] = blob_meta.pop("sha1", None) - blob["sha256"] = blob_meta.pop("sha256", None) - - blob_dict_attr = getattr(af, field_name) - blob_dict_attr[blob_key] = blob - updated_af = cls.db_api.update( - context, af.id, {field_name: blob_dict_attr}) - - LOG.info( - _LI("External location %(location)s has been created successfully " - "for artifact %(artifact)s blob dict %(blob)s with key " - "%(key)s"), - {'location': location, 'artifact': af.id, - 'blob': field_name, 'key': blob_key}) - return cls._init_artifact(context, updated_af) + af_upd = cls.db_api.update(context, af_id, values) + return cls._init_artifact(context, af_upd) @classmethod def validate_activate(cls, context, af, values=None): @@ -1225,19 +1017,5 @@ class ReadOnlyMixin(object): raise exception.Forbidden("This type is read only.") @classmethod - def upload_blob(cls, context, af, field_name, fd, content_type): - raise exception.Forbidden("This type is read only.") - - @classmethod - def upload_blob_dict(cls, context, af, field_name, blob_key, fd, - content_type): - raise exception.Forbidden("This type is read only.") - - @classmethod - def add_blob_location(cls, context, af, field_name, location, blob_meta): - raise exception.Forbidden("This type is read only.") - - @classmethod - def add_blob_dict_location(cls, context, af, field_name, - blob_key, location, blob_meta): + def update_blob(cls, context, af_id, values): raise exception.Forbidden("This type is read only.")