Merge "Optimize zip unpacking"
commit 2563f34425
@@ -90,6 +90,43 @@ def save_blob_to_store(blob_id, blob, context, max_size,
    return location, data.bytes_read, checksums


@utils.error_handler(error_map)
def save_blobs_to_store(blobs, context, max_size,
                        store_type=None, verifier=None):
    """Save several files to the specified store.

    :param blobs: list of tuples (blob_data_id, data)
    :param context: user context
    :param store_type: type of the store; None means save to the default store
    :param verifier: signature verifier
    :return: dict {blob_data_id: (location_uri, size, checksums)}
    """
    # wrap data in CooperativeReader
    blobs = [(blob_data_id,
              utils.LimitingReader(utils.CooperativeReader(data), max_size))
             for (blob_data_id, data) in blobs]

    if store_type == 'database':
        locations = database_api.add_to_backend_batch(blobs, context, verifier)
    else:
        locations = []
        for blob_data_id, data in blobs:
            (location, __, __, __) = backend.add_to_backend(
                CONF, blob_data_id, data, 0, store_type, context, verifier)
            locations.append(location)

    # combine location, size and checksums together
    res = {}
    for i in range(len(locations)):
        data = blobs[i][1]
        checksums = {"md5": data.md5.hexdigest(),
                     "sha1": data.sha1.hexdigest(),
                     "sha256": data.sha256.hexdigest()}
        res[blobs[i][0]] = (locations[i], data.bytes_read, checksums)

    return res


@utils.error_handler(error_map)
def load_from_store(uri, context):
    """Load file from store backend.
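
Side note on the checksum handling above: the result dict works because the wrapped readers accumulate md5/sha1/sha256 digests and a running byte count while data streams through them. The standalone sketch below illustrates that pattern with plain hashlib; DigestingReader is a hypothetical stand-in written for this example, not Glare's actual LimitingReader.

import hashlib
import io


class DigestingReader(object):
    """Hypothetical digest-tracking reader, used only for illustration."""

    def __init__(self, fd):
        self.fd = fd
        self.bytes_read = 0
        self.md5 = hashlib.md5()
        self.sha1 = hashlib.sha1()
        self.sha256 = hashlib.sha256()

    def read(self, size=-1):
        # update every digest and the byte counter as data passes through
        chunk = self.fd.read(size)
        self.bytes_read += len(chunk)
        for h in (self.md5, self.sha1, self.sha256):
            h.update(chunk)
        return chunk


reader = DigestingReader(io.BytesIO(b"example payload"))
while reader.read(4):
    pass
checksums = {"md5": reader.md5.hexdigest(),
             "sha1": reader.sha1.hexdigest(),
             "sha256": reader.sha256.hexdigest()}
print(reader.bytes_read, checksums)
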
@@ -706,6 +706,27 @@ def save_blob_data(context, blob_data_id, data, session):
    return "sql://" + blob_data.id


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def save_blob_data_batch(context, blobs, session):
    """Perform batch uploading to the database."""
    with session.begin():

        locations = []

        # blobs is a list of tuples (blob_data_id, data)
        for blob_data_id, data in blobs:
            blob_data = models.ArtifactBlobData()
            blob_data.id = blob_data_id
            blob_data.data = data.read()
            session.add(blob_data)
            locations.append("sql://" + blob_data.id)

        session.flush()

    return locations


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def get_blob_data(context, uri, session):
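
The gain from save_blob_data_batch is that every blob row is inserted inside a single transaction rather than one transaction per blob. Below is a minimal self-contained sketch of that pattern; it assumes SQLAlchemy 1.4+ and uses a toy BlobData model with an in-memory SQLite engine, not Glare's real models, session handling or retry decorator.

from sqlalchemy import Column, LargeBinary, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class BlobData(Base):
    """Toy analogue of models.ArtifactBlobData, for illustration only."""
    __tablename__ = 'blob_data'
    id = Column(String(36), primary_key=True)
    data = Column(LargeBinary)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

blobs = [('id-1', b'first payload'), ('id-2', b'second payload')]
locations = []
with session.begin():
    # all inserts share one transaction, mirroring the batch helper above
    for blob_id, payload in blobs:
        session.add(BlobData(id=blob_id, data=payload))
        locations.append('sql://' + blob_id)
print(locations)
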
@@ -15,7 +15,6 @@

"""Contains additional file utils that may be useful for upload hooks."""

import io
import os
import tempfile
import zipfile
@@ -25,10 +24,8 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils

from glare.common import exception
from glare.common import store_api
from glare.common import utils
from glare.i18n import _
from glare.objects.meta import fields as glare_fields

CONF = cfg.CONF
@@ -65,6 +62,57 @@ def extract_zip_to_temporary_folder(tfile):
    return tdir


def unpack_zip_archive_to_artifact_folder(context, af, zip_ref, folder_name):
    """Unpack zip archive to artifact folder.

    :param context: user context
    :param af: artifact object
    :param zip_ref: zip archive to be extracted
    :param folder_name: name of the artifact folder to extract the data into
    """
    file_dict = {}
    blobs = []
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            blob_id = uuidutils.generate_uuid()
            # create an empty blob instance in the db with 'saving' status
            blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
                    'sha256': None, 'status': 'saving', 'id': blob_id,
                    'external': False,
                    'content_type': 'application/octet-stream'}
            file_dict[name] = blob
            blobs.append((blob_id, utils.BlobIterator(zip_ref.read(name))))

    af = af.update_blob(context, af.id, folder_name, file_dict)

    default_store = getattr(
        CONF, 'artifact_type:' + af.get_type_name()).default_store
    # use the global parameter if no default store is set per artifact type
    if default_store is None:
        default_store = CONF.glance_store.default_store

    # try to upload the blobs to the storage backend
    try:
        blobs_info = store_api.save_blobs_to_store(
            blobs, context, af.get_max_blob_size(folder_name),
            default_store)
        for name in zip_ref.namelist():
            if not name.endswith('/'):
                location_uri, size, checksums = blobs_info[
                    file_dict[name]['id']]
                # update blob info and activate it
                file_dict[name].update({'url': location_uri,
                                        'status': 'active',
                                        'size': size})
                file_dict[name].update(checksums)
    except Exception:
        # if the upload failed, remove the blobs from the db and storage
        with excutils.save_and_reraise_exception(logger=LOG):
            af.update_blob(context, af.id, folder_name, None)

    af.update_blob(context, af.id, folder_name, file_dict)


def upload_content_file(context, af, data, blob_dict, key_name,
                        content_type='application/octet-stream'):
    """Upload a file to a blob dictionary.
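
For reference, the first loop in unpack_zip_archive_to_artifact_folder only collects regular file entries: directory members end with '/' and carry no data, so they are skipped. The standard-library sketch below (toy in-memory archive, stdlib uuid instead of oslo's uuidutils) walks a namelist() the same way.

import io
import uuid
import zipfile

# build a toy archive in memory: one directory entry and two files
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('folder1/', '')
    zf.writestr('folder1/aaa.txt', 'first file')
    zf.writestr('bbb.txt', 'second file')
buf.seek(0)

zip_ref = zipfile.ZipFile(buf, 'r')
file_dict = {}
blobs = []
for name in zip_ref.namelist():
    if not name.endswith('/'):  # directory entries carry no data
        blob_id = str(uuid.uuid4())
        file_dict[name] = {'id': blob_id, 'status': 'saving'}
        blobs.append((blob_id, zip_ref.read(name)))

print(sorted(file_dict))  # ['bbb.txt', 'folder1/aaa.txt']
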
@@ -109,31 +157,3 @@ def upload_content_file(context, af, data, blob_dict, key_name,
    blob.update(checksums)
    getattr(af, blob_dict)[key_name] = blob
    af.update_blob(context, af.id, blob_dict, getattr(af, blob_dict))


def unpack_zip_archive_in_memory(context, af, field_name, fd):
    """Unpack zip archive in memory and write its content to artifact folder.

    :param context: user context
    :param af: artifact object
    :param field_name: blob dict name where to unpack the data
    :param fd: zip archive
    :return: io.BytesIO object - simple stream of in-memory bytes
    """
    flobj = io.BytesIO(fd.read(INMEMORY_OBJECT_SIZE_LIMIT))

    # raise an exception if there is anything left to read
    data = fd.read(1)
    if data:
        msg = _("The zip you are trying to unpack is too big. "
                "The system upper limit is %s") % INMEMORY_OBJECT_SIZE_LIMIT
        raise exception.RequestEntityTooLarge(msg)

    zip_ref = zipfile.ZipFile(flobj, 'r')
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            upload_content_file(
                context, af, utils.BlobIterator(zip_ref.read(name)),
                field_name, name)
    flobj.seek(0)
    return flobj
@@ -24,6 +24,10 @@ class DatabaseStoreAPI(base_api.BaseStoreAPI):
        session = db_api.get_session()
        return db_api.save_blob_data(context, blob_id, data, session)

    def add_to_backend_batch(self, blobs, context, verifier=None):
        session = db_api.get_session()
        return db_api.save_blob_data_batch(context, blobs, session)

    def get_from_store(self, uri, context):
        session = db_api.get_session()
        return db_api.get_blob_data(context, uri, session)
@@ -13,6 +13,7 @@
# limitations under the License.

import os
from time import time

from glare.tests.unit import base
@@ -42,3 +43,38 @@ class TestArtifactHooks(base.BaseTestArtifactAPI):
        self.assertEqual(11, artifact['content']['folder1/bbb.txt']['size'])
        self.assertEqual(
            11, artifact['content']['folder1/folder2/ccc.txt']['size'])

    def test_unpacking_database(self):
        self.config(default_store='database',
                    group='artifact_type:unpacking_artifact')
        self.test_unpacking()

    def test_unpacking_big_archive(self):
        var_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                               '../', 'var'))
        data_path = os.path.join(var_dir, 'hooks_100.zip')

        # rough benchmark: the upload must complete in under 3 seconds
        start = time()
        with open(data_path, "rb") as data:
            self.controller.upload_blob(
                self.req, 'unpacking_artifact', self.unpacking_artifact['id'],
                'zip', data, 'application/octet-stream')
        end = time()
        self.assertIs(True, (end - start) < 3, (end - start))

        artifact = self.controller.show(self.req, 'unpacking_artifact',
                                        self.unpacking_artifact['id'])
        self.assertEqual(15702, artifact['zip']['size'])
        self.assertEqual('active', artifact['zip']['status'])

        self.assertEqual(100, len(artifact['content']))

        for blob in artifact['content'].values():
            self.assertEqual('active', blob['status'])
            self.assertEqual(15, blob['size'])

    def test_unpacking_database_big_archive(self):
        self.config(default_store='database',
                    group='artifact_type:unpacking_artifact')
        self.test_unpacking_big_archive()
@@ -16,7 +16,6 @@ import io
import zipfile

from glare.common import exception
from glare.common import utils
from glare.objects import base
from glare.objects.meta import file_utils
from glare.objects.meta import wrappers
@@ -49,10 +48,9 @@ class Unpacker(base.BaseArtifact):
            raise exception.RequestEntityTooLarge(msg)

        zip_ref = zipfile.ZipFile(flobj, 'r')
        for name in zip_ref.namelist():
            if not name.endswith('/'):
                file_utils.upload_content_file(
                    context, af, utils.BlobIterator(zip_ref.read(name)),
                    'content', name)

        file_utils.unpack_zip_archive_to_artifact_folder(
            context, af, zip_ref, 'content')

        flobj.seek(0)
        return flobj
BIN glare/tests/var/hooks_100.zip (new binary file, not shown)