Don't set lock on whole blob upload

Change-Id: I7787e6237dedbde9da735e51ee17665d65a19e2e
This commit is contained in:
Mike Fedosin 2016-10-05 19:12:44 +03:00 committed by Mike Fedosin
parent 4e85e9b5a5
commit a6f6754b87
4 changed files with 224 additions and 297 deletions

View File

@ -333,13 +333,13 @@ class ArtifactsController(api_versioning.VersionedResource):
if content_type == ('application/vnd+openstack.glare-custom-location'
'+json'):
url = data.pop('url')
return self.engine.add_blob_dict_location(
req.context, type_name, artifact_id,
field_name, blob_key, url, data)
return self.engine.add_blob_location(
req.context, type_name, artifact_id, field_name, url, data,
blob_key)
else:
return self.engine.upload_blob_dict(
req.context, type_name, artifact_id,
field_name, blob_key, data, content_type)
return self.engine.upload_blob(req.context, type_name, artifact_id,
field_name, data, content_type,
blob_key)
@supported_versions(min_ver='1.0')
@log_request_progress
@ -370,7 +370,7 @@ class ArtifactsController(api_versioning.VersionedResource):
:param blob_key: name of Dict of blobs (optional)
:return: iterator that returns blob data
"""
data, meta = self.engine.download_blob_dict(
data, meta = self.engine.download_blob(
req.context, type_name, artifact_id, field_name, blob_key)
result = {'data': data, 'meta': meta}
return result

View File

@ -27,6 +27,7 @@ import six
import sqlalchemy
from sqlalchemy import and_
import sqlalchemy.exc
from sqlalchemy import exists
from sqlalchemy import func
from sqlalchemy import or_
import sqlalchemy.orm as orm
@ -162,6 +163,14 @@ def _create_or_update(context, artifact_id, values, session):
artifact.updated_at = timeutils.utcnow()
if 'status' in values and values['status'] == 'active':
if session.query(
exists().where(
models.ArtifactBlob.status == 'saving' and
models.ArtifactBlob.artifact_id == artifact_id)
).one()[0]:
raise exception.Conflict(
"You cannot activate artifact if it has "
"uploading blobs.")
artifact.activated_at = timeutils.utcnow()
artifact.update(values)
artifact.save(session=session)

View File

@ -17,14 +17,17 @@ import copy
import jsonpatch
from oslo_log import log as logging
from oslo_utils import excutils
from glare.common import exception
from glare.common import policy
from glare.common import store_api
from glare.common import utils
from glare.db import artifact_api
from glare.i18n import _
from glare.i18n import _, _LI
from glare import locking
from glare.notification import Notifier
from glare.objects.meta import fields as glare_fields
from glare.objects.meta import registry as glare_registry
LOG = logging.getLogger(__name__)
@ -240,67 +243,204 @@ class Engine(object):
Notifier.notify(context, "artifact.delete", af)
@classmethod
@lock_engine.locked(['type_name', 'artifact_id'])
def add_blob_location(cls, context, type_name,
artifact_id, field_name, location, blob_meta):
def add_blob_location(cls, context, type_name, artifact_id, field_name,
location, blob_meta, blob_key=None):
"""Add external location to blob.
:param context: user context
:param type_name: name of artifact type
:param artifact_id: id of the artifact to be updated
:param field_name: name of blob or blob dict field
:param location: external blob url
:param blob_meta: dictionary containing blob metadata like md5 checksum
:param blob_key: if field_name is blob dict it specifies concrete key
in this dict
:return updated artifact
"""
af = cls._get_artifact(context, type_name, artifact_id)
action_name = 'artifact:set_location'
policy.authorize(action_name, af.to_dict(), context)
modified_af = af.add_blob_location(context, af, field_name, location,
blob_meta)
blob_name = "%s[%s]" % (field_name, blob_key)\
if blob_key else field_name
blob = {'url': location, 'size': None, 'md5': None, 'sha1': None,
'sha256': None, 'status': glare_fields.BlobFieldType.ACTIVE,
'external': True, 'content_type': None}
md5 = blob_meta.pop("md5", None)
if md5 is None:
msg = (_("Incorrect blob metadata %(meta)s. MD5 must be specified "
"for external location in artifact blob %(blob_name)."),
{"meta": str(blob_meta), "blob_name": blob_name})
raise exception.BadRequest(msg)
else:
blob["md5"] = md5
blob["sha1"] = blob_meta.pop("sha1", None)
blob["sha256"] = blob_meta.pop("sha256", None)
modified_af = cls.update_blob(
context, type_name, artifact_id, blob, field_name, blob_key,
validate=True)
LOG.info(_LI("External location %(location)s has been created "
"successfully for artifact %(artifact)s blob %(blob)s"),
{'location': location, 'artifact': af.id,
'blob': blob_name})
Notifier.notify(context, action_name, modified_af)
return modified_af.to_dict()
@classmethod
@lock_engine.locked(['type_name', 'artifact_id'])
def add_blob_dict_location(cls, context, type_name, artifact_id,
field_name, blob_key, location, blob_meta):
af = cls._get_artifact(context, type_name, artifact_id)
action_name = 'artifact:set_location'
policy.authorize(action_name, af.to_dict(), context)
modified_af = af.add_blob_dict_location(context, af, field_name,
blob_key, location, blob_meta)
Notifier.notify(context, action_name, modified_af)
return modified_af.to_dict()
@classmethod
@lock_engine.locked(['type_name', 'artifact_id'])
def upload_blob(cls, context, type_name, artifact_id, field_name, fd,
content_type):
"""Upload Artifact blob"""
content_type, blob_key=None):
"""Upload Artifact blob.
:param context: user context
:param type_name: name of artifact type
:param artifact_id: id of the artifact to be updated
:param field_name: name of blob or blob dict field
:param fd: file descriptor that Glare uses to upload the file
:param field_name: name of blob dict field
:param content_type: data content-type
:param blob_key: if field_name is blob dict it specifies concrete key
in this dict
:return file iterator for requested file
"""
af = cls._get_artifact(context, type_name, artifact_id)
action_name = "artifact:upload"
policy.authorize(action_name, af.to_dict(), context)
modified_af = af.upload_blob(context, af, field_name, fd, content_type)
blob_name = "%s[%s]" % (field_name, blob_key)\
if blob_key else field_name
try:
# call upload hook
fd = af.validate_upload(context, af, field_name, fd)
except Exception as e:
raise exception.BadRequest(message=str(e))
# create an an empty blob instance in db with 'saving' status
blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
'sha256': None, 'status': glare_fields.BlobFieldType.SAVING,
'external': False, 'content_type': content_type}
modified_af = cls.update_blob(
context, type_name, artifact_id, blob, field_name, blob_key,
validate=True)
if blob_key is None:
blob_id = getattr(modified_af, field_name)['id']
else:
blob_id = getattr(modified_af, field_name)[blob_key]['id']
# try to perform blob uploading to storage backend
try:
location_uri, size, checksums = store_api.save_blob_to_store(
blob_id, fd, context, af.get_max_blob_size(field_name))
except Exception:
# if upload failed remove blob from db and storage
with excutils.save_and_reraise_exception(logger=LOG):
if blob_key is None:
af.update_blob(context, af.id, {field_name: None})
else:
blob_dict_attr = modified_af[field_name]
del blob_dict_attr[blob_key]
af.update_blob(context, af.id,
{field_name: blob_dict_attr})
LOG.info(_LI("Successfully finished blob upload for artifact "
"%(artifact)s blob field %(blob)s."),
{'artifact': af.id, 'blob': blob_name})
# update blob info and activate it
blob.update({'url': location_uri,
'status': glare_fields.BlobFieldType.ACTIVE,
'size': size})
blob.update(checksums)
modified_af = cls.update_blob(
context, type_name, artifact_id, blob, field_name, blob_key)
Notifier.notify(context, action_name, modified_af)
return modified_af.to_dict()
@classmethod
@lock_engine.locked(['type_name', 'artifact_id'])
def upload_blob_dict(cls, context, type_name, artifact_id, field_name,
blob_key, fd, content_type):
"""Upload Artifact blob to dict"""
def update_blob(cls, context, type_name, artifact_id, blob,
field_name, blob_key=None, validate=False):
"""Update blob info.
:param context: user context
:param type_name: name of artifact type
:param artifact_id: id of the artifact to be updated
:param blob: blob representation in dict format
:param field_name: name of blob or blob dict field
:param blob_key: if field_name is blob dict it specifies concrete key
in this dict
:param validate: enable validation of possibility of blob uploading
:return updated artifact
"""
af = cls._get_artifact(context, type_name, artifact_id)
action_name = "artifact:upload"
policy.authorize(action_name, af.to_dict(), context)
modified_af = af.upload_blob_dict(context, af, field_name, blob_key,
fd, content_type)
Notifier.notify(context, action_name, modified_af)
return modified_af.to_dict()
if validate:
af.validate_upload_allowed(context, af, field_name, blob_key)
if blob_key is None:
setattr(af, field_name, blob)
return af.update_blob(
context, af.id, {field_name: getattr(af, field_name)})
else:
blob_dict_attr = getattr(af, field_name)
blob_dict_attr[blob_key] = blob
return af.update_blob(
context, af.id, {field_name: blob_dict_attr})
@classmethod
def download_blob(cls, context, type_name, artifact_id, field_name):
"""Download blob from artifact"""
def download_blob(cls, context, type_name, artifact_id, field_name,
blob_key=None):
"""Download binary data from Glare Artifact.
:param context: user context
:param type_name: name of artifact type
:param artifact_id: id of the artifact to be updated
:param field_name: name of blob or blob dict field
:param blob_key: if field_name is blob dict it specifies concrete key
in this dict
:return: file iterator for requested file
"""
af = cls._get_artifact(context, type_name, artifact_id,
read_only=True)
policy.authorize("artifact:download", af.to_dict(), context)
return af.download_blob(context, af, field_name)
@classmethod
def download_blob_dict(cls, context, type_name, artifact_id, field_name,
blob_key):
"""Download blob from artifact"""
af = cls._get_artifact(context, type_name, artifact_id,
read_only=True)
policy.authorize("artifact:download", af.to_dict(), context)
return af.download_blob_dict(context, af, field_name, blob_key)
blob_name = "%s[%s]" % (field_name, blob_key)\
if blob_key else field_name
# check if property is downloadable
if blob_key is None and not af.is_blob(field_name):
msg = _("%s is not a blob") % field_name
raise exception.BadRequest(msg)
if blob_key is not None and not af.is_blob_dict(field_name):
msg = _("%s is not a blob dict") % field_name
raise exception.BadRequest(msg)
if af.status == af.STATUS.DEACTIVATED and not context.is_admin:
msg = _("Only admin is allowed to download artifact data "
"when it's deactivated")
raise exception.Forbidden(message=msg)
# get blob info from dict or directly
if blob_key is None:
blob = getattr(af, field_name)
else:
try:
blob = getattr(af, field_name)[blob_key]
except KeyError:
msg = _("Blob with name %s is not found") % blob_name
raise exception.NotFound(message=msg)
if blob is None or blob['status'] != glare_fields.BlobFieldType.ACTIVE:
msg = _("%s is not ready for download") % blob_name
raise exception.BadRequest(message=msg)
meta = {'md5': blob.get('md5'),
'sha1': blob.get('sha1'),
'sha256': blob.get('sha256'),
'external': blob.get('external')}
if blob['external']:
data = {'url': blob['url']}
else:
data = store_api.load_from_store(uri=blob['url'], context=context)
meta['size'] = blob.get('size')
meta['content_type'] = blob.get('content_type')
return data, meta

View File

@ -18,7 +18,6 @@ import uuid
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
@ -625,6 +624,11 @@ class BaseArtifact(base.VersionedObject):
'for artifact %(id)s') % {'name': name,
'id': af.id}
raise exception.Conflict(msg)
elif b['status'] == glare_fields.\
BlobFieldType.SAVING:
msg = _('Blob %(name)s is saving for artifact %(id)s'
) % {'name': name, 'id': af.id}
raise exception.Conflict(msg)
else:
b['status'] = glare_fields.BlobFieldType.PENDING_DELETE
blobs[name] = b
@ -638,6 +642,11 @@ class BaseArtifact(base.VersionedObject):
'for artifact %(id)s') % {'name': name,
'id': af.id}
raise exception.Conflict(msg)
elif b['status'] == glare_fields. \
BlobFieldType.SAVING:
msg = _('Blob %(name)s is saving for artifact '
'%(id)s') % {'name': name, 'id': af.id}
raise exception.Conflict(msg)
else:
b['status'] = glare_fields.\
BlobFieldType.PENDING_DELETE
@ -773,12 +782,17 @@ class BaseArtifact(base.VersionedObject):
return cls._init_artifact(context, af)
@classmethod
def _get_max_blob_size(cls, field_name):
def get_max_blob_size(cls, field_name):
return getattr(cls.fields[field_name], 'max_blob_size',
attribute.BlobAttribute.DEFAULT_MAX_BLOB_SIZE)
@classmethod
def _validate_upload_allowed(cls, context, af, field_name, blob_key=None):
def validate_upload_allowed(cls, context, af, field_name, blob_key=None):
"""Validate if given blob is ready for uploading."""
blob_name = "%s[%s]" % (field_name, blob_key)\
if blob_key else field_name
if field_name not in cls.fields:
msg = _("%s property does not exist") % field_name
raise exception.BadRequest(msg)
@ -800,244 +814,22 @@ class BaseArtifact(base.VersionedObject):
msg = _("Cannot re-upload blob %(blob)s for artifact "
"%(af)s") % {'blob': field_name, 'af': af.id}
raise exception.Conflict(message=msg)
@classmethod
def upload_blob(cls, context, af, field_name, fd, content_type):
"""Upload binary object as artifact property
:param context: user context
:param af: current Artifact definition
:param field_name: name of blob field
:param fd: file descriptor that Glare uses to upload the file
:param content_type: data content-type
:return: updated Artifact definition in Glare
"""
fd = cls.validate_upload(context, af, field_name, fd)
cls._validate_upload_allowed(context, af, field_name)
LOG.debug("Parameters validation for artifact %(artifact)s blob "
"upload passed for blob %(blob)s. "
"upload passed for blob %(blob_name)s. "
"Start blob uploading to backend.",
{'artifact': af.id, 'blob': field_name})
blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
'sha256': None, 'status': glare_fields.BlobFieldType.SAVING,
'external': False, 'content_type': content_type}
setattr(af, field_name, blob)
cls.db_api.update(
context, af.id, {field_name: getattr(af, field_name)})
blob_id = getattr(af, field_name)['id']
try:
location_uri, size, checksums = store_api.save_blob_to_store(
blob_id, fd, context, cls._get_max_blob_size(field_name))
blob.update({'url': location_uri,
'status': glare_fields.BlobFieldType.ACTIVE,
'size': size})
blob.update(checksums)
setattr(af, field_name, blob)
af_upd = cls.db_api.update(
context, af.id, {field_name: getattr(af, field_name)})
LOG.info(_LI("Successfully finished blob upload for artifact "
"%(artifact)s blob field %(blob)s."),
{'artifact': af.id, 'blob': field_name})
return cls._init_artifact(context, af_upd)
except Exception:
with excutils.save_and_reraise_exception(logger=LOG):
cls.db_api.update(context, af.id, {field_name: None})
{'artifact': af.id, 'blob_name': blob_name})
@classmethod
def download_blob(cls, context, af, field_name):
"""Download binary data from Glare Artifact.
:param context: user context
:param af: Artifact definition in Glare repo
:param field_name: name of blob field
:return: file iterator for requested file
"""
if not cls.is_blob(field_name):
msg = _("%s is not a blob") % field_name
raise exception.BadRequest(msg)
if af.status == cls.STATUS.DEACTIVATED and not context.is_admin:
msg = _("Only admin is allowed to download artifact data "
"when it's deactivated")
raise exception.Forbidden(message=msg)
blob = getattr(af, field_name)
if blob is None or blob['status'] != glare_fields.BlobFieldType.ACTIVE:
msg = _("%s is not ready for download") % field_name
raise exception.BadRequest(message=msg)
meta = {'md5': blob.get('md5'),
'sha1': blob.get('sha1'),
'sha256': blob.get('sha256'),
'external': blob.get('external')}
if blob['external']:
data = {'url': blob['url']}
else:
data = store_api.load_from_store(uri=blob['url'], context=context)
meta['size'] = blob.get('size')
meta['content_type'] = blob.get('content_type')
return data, meta
@classmethod
def upload_blob_dict(cls, context, af, field_name, blob_key, fd,
content_type):
def update_blob(cls, context, af_id, values):
"""Upload binary object as artifact property
:param context: user context
:param af: current Artifact definition
:param blob_key: name of blob key in dict
:param fd: file descriptor that Glare uses to upload the file
:param field_name: name of blob dict field
:param content_type: data content-type
:return: updated Artifact definition in Glare
:param af_id: id of modified artifact
:param values: updated blob values
:return updated Artifact definition in Glare
"""
fd = cls.validate_upload(context, af, field_name, fd)
cls._validate_upload_allowed(context, af, field_name, blob_key)
LOG.debug("Parameters validation for artifact %(artifact)s blob "
"upload passed for blob dict %(blob)s with key %(key)s. "
"Start blob uploading to backend.",
{'artifact': af.id, 'blob': field_name, 'key': blob_key})
blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
'sha256': None, 'status': glare_fields.BlobFieldType.SAVING,
'external': False, 'content_type': content_type}
blob_dict_attr = getattr(af, field_name)
blob_dict_attr[blob_key] = blob
cls.db_api.update(
context, af.id, {field_name: blob_dict_attr})
blob_id = getattr(af, field_name)[blob_key]['id']
try:
location_uri, size, checksums = store_api.save_blob_to_store(
blob_id, fd, context, cls._get_max_blob_size(field_name))
blob.update({'url': location_uri,
'status': glare_fields.BlobFieldType.ACTIVE,
'size': size})
blob.update(checksums)
af_values = cls.db_api.update(
context, af.id, {field_name: blob_dict_attr})
LOG.info(_LI("Successfully finished blob upload for artifact "
"%(artifact)s blob dict field %(blob)s with key."),
{'artifact': af.id, 'blob': field_name, 'key': blob_key})
return cls._init_artifact(context, af_values)
except Exception:
with excutils.save_and_reraise_exception(logger=LOG):
del blob_dict_attr[blob_key]
cls.db_api.update(context, af.id, {field_name: blob_dict_attr})
@classmethod
def download_blob_dict(cls, context, af, field_name, blob_key):
"""Download binary data from Glare Artifact.
:param context: user context
:param af: Artifact definition in Glare repo
:param blob_key: name of blob key in dict
:param field_name: name of blob dict field
:return: file iterator for requested file
"""
if not cls.is_blob_dict(field_name):
msg = _("%s is not a blob dict") % field_name
raise exception.BadRequest(msg)
if af.status == cls.STATUS.DEACTIVATED and not context.is_admin:
msg = _("Only admin is allowed to download artifact data "
"when it's deactivated")
raise exception.Forbidden(message=msg)
try:
blob = getattr(af, field_name)[blob_key]
except KeyError:
msg = _("Blob with name %(blob_name)s is not found in blob "
"dictionary %(blob_dict)s") % (blob_key, field_name)
raise exception.NotFound(message=msg)
if blob is None or blob['status'] != glare_fields.BlobFieldType.ACTIVE:
msg = _("Blob %(blob_name)s from blob dictionary %(blob_dict)s "
"is not ready for download") % (blob_key, field_name)
LOG.error(msg)
raise exception.BadRequest(message=msg)
meta = {'md5': blob.get('md5'),
'sha1': blob.get('sha1'),
'sha256': blob.get('sha256'),
'external': blob.get('external')}
if blob['external']:
data = {'url': blob['url']}
else:
data = store_api.load_from_store(uri=blob['url'], context=context)
meta['size'] = blob.get('size')
meta['content_type'] = blob.get('content_type')
return data, meta
@classmethod
def add_blob_location(cls, context, af, field_name, location, blob_meta):
"""Upload binary object as artifact property
:param context: user context
:param af: current Artifact definition
:param field_name: name of blob field
:param location: blob url
:return: updated Artifact definition in Glare
"""
cls._validate_upload_allowed(context, af, field_name)
LOG.debug("Parameters validation for artifact %(artifact)s location "
"passed for blob %(blob)s. Start location check for artifact"
".", {'artifact': af.id, 'blob': field_name})
blob = {'url': location, 'size': None, 'md5': None, 'sha1': None,
'sha256': None, 'status': glare_fields.BlobFieldType.ACTIVE,
'external': True, 'content_type': None}
md5 = blob_meta.pop("md5", None)
if md5 is None:
msg = (_("Incorrect blob metadata %(meta)s. MD5 must be specified "
"for external location in artifact blob %(field_name)."),
{"meta": str(blob_meta), "field_name": field_name})
raise exception.BadRequest(msg)
else:
blob["md5"] = md5
blob["sha1"] = blob_meta.pop("sha1", None)
blob["sha256"] = blob_meta.pop("sha256", None)
setattr(af, field_name, blob)
updated_af = cls.db_api.update(
context, af.id, {field_name: getattr(af, field_name)})
LOG.info(_LI("External location %(location)s has been created "
"successfully for artifact %(artifact)s blob %(blob)s"),
{'location': location, 'artifact': af.id,
'blob': field_name})
return cls._init_artifact(context, updated_af)
@classmethod
def add_blob_dict_location(cls, context, af, field_name,
blob_key, location, blob_meta):
cls._validate_upload_allowed(context, af, field_name, blob_key)
blob = {'url': location, 'size': None, 'md5': None, 'sha1': None,
'sha256': None, 'status': glare_fields.BlobFieldType.ACTIVE,
'external': True, 'content_type': None}
md5 = blob_meta.pop("md5", None)
if md5 is None:
msg = (_("Incorrect blob metadata %(meta)s. MD5 must be specified "
"for external location in artifact blob "
"%(field_name)[%(blob_key)s]."),
{"meta": str(blob_meta), "field_name": field_name,
"blob_key": str(blob_key)})
raise exception.BadRequest(msg)
else:
blob["md5"] = md5
blob["sha1"] = blob_meta.pop("sha1", None)
blob["sha256"] = blob_meta.pop("sha256", None)
blob_dict_attr = getattr(af, field_name)
blob_dict_attr[blob_key] = blob
updated_af = cls.db_api.update(
context, af.id, {field_name: blob_dict_attr})
LOG.info(
_LI("External location %(location)s has been created successfully "
"for artifact %(artifact)s blob dict %(blob)s with key "
"%(key)s"),
{'location': location, 'artifact': af.id,
'blob': field_name, 'key': blob_key})
return cls._init_artifact(context, updated_af)
af_upd = cls.db_api.update(context, af_id, values)
return cls._init_artifact(context, af_upd)
@classmethod
def validate_activate(cls, context, af, values=None):
@ -1225,19 +1017,5 @@ class ReadOnlyMixin(object):
raise exception.Forbidden("This type is read only.")
@classmethod
def upload_blob(cls, context, af, field_name, fd, content_type):
raise exception.Forbidden("This type is read only.")
@classmethod
def upload_blob_dict(cls, context, af, field_name, blob_key, fd,
content_type):
raise exception.Forbidden("This type is read only.")
@classmethod
def add_blob_location(cls, context, af, field_name, location, blob_meta):
raise exception.Forbidden("This type is read only.")
@classmethod
def add_blob_dict_location(cls, context, af, field_name,
blob_key, location, blob_meta):
def update_blob(cls, context, af_id, values):
raise exception.Forbidden("This type is read only.")