Optimize zip unpacking
Currently each file in a zip archive requires 2 or 3 calls to the db:
1. initialize a blob instance,
2. (optional) upload the data if the database store is used,
3. activate the blob instance.
So for 100 files there are up to 300 requests, i.e. the number of requests grows as O(n).

With this commit the requests for all files are performed in batch mode:
1. initialize all blob instances,
2. (optional) upload all data if the database store is used,
3. activate all blob instances.
So for any number of files in the archive there are only 2 or 3 requests to the db, i.e. O(1).

Change-Id: I87ea9d1f87dc6519956808a1d1e764f16e6dc611
parent e93e29bb50
commit f28f9c3e28
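To make the arithmetic above concrete, here is a small illustrative sketch (editorial, not part of the diff; the two counting helpers are hypothetical and only model the request counts described in the commit message):

# Hypothetical model of the db request counts described above.
def requests_per_file_mode(n_files, database_store=True):
    # before this change: init + (optional) upload + activate for every file
    per_file = 3 if database_store else 2
    return n_files * per_file            # grows linearly, O(n)


def requests_batch_mode(n_files, database_store=True):
    # with this change: one init, one (optional) upload, one activate in total
    return 3 if database_store else 2    # constant, O(1)


print(requests_per_file_mode(100))       # 300
print(requests_batch_mode(100))          # 3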
@@ -90,6 +90,43 @@ def save_blob_to_store(blob_id, blob, context, max_size,
    return location, data.bytes_read, checksums


@utils.error_handler(error_map)
def save_blobs_to_store(blobs, context, max_size,
                        store_type=None, verifier=None):
    """Save several files to the specified store.

    :param blobs: list of tuples (blob_data_id, data)
    :param context: user context
    :param store_type: type of the store, None means save to default store
    :param verifier: signature verifier
    :return: dict {blob_data_id: (location_uri, size, checksums)}
    """
    # wrap data in CooperativeReader
    blobs = [(blob_data_id,
              utils.LimitingReader(utils.CooperativeReader(data), max_size))
             for (blob_data_id, data) in blobs]

    if store_type == 'database':
        locations = database_api.add_to_backend_batch(blobs, context, verifier)
    else:
        locations = []
        for blob_data_id, data in blobs:
            (location, __, __, __) = backend.add_to_backend(
                CONF, blob_data_id, data, 0, store_type, context, verifier)
            locations.append(location)

    # combine location, size and checksums together
    res = {}
    for i in range(len(locations)):
        data = blobs[i][1]
        checksums = {"md5": data.md5.hexdigest(),
                     "sha1": data.sha1.hexdigest(),
                     "sha256": data.sha256.hexdigest()}
        res[blobs[i][0]] = (locations[i], data.bytes_read, checksums)

    return res


@utils.error_handler(error_map)
def load_from_store(uri, context):
    """Load file from store backend.
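As a side note on the hunk above: a stdlib-only sketch of the size/checksum bookkeeping that the batch call performs for each blob. The HashingReader class below is a hypothetical stand-in for the CooperativeReader/LimitingReader wrappers from the diff, not Glare code:

import hashlib
import io


class HashingReader:
    """Hypothetical stand-in: accumulates size and checksums while reading."""

    def __init__(self, fd):
        self.fd = fd
        self.bytes_read = 0
        self.md5 = hashlib.md5()
        self.sha1 = hashlib.sha1()
        self.sha256 = hashlib.sha256()

    def read(self, size=-1):
        chunk = self.fd.read(size)
        self.bytes_read += len(chunk)
        for h in (self.md5, self.sha1, self.sha256):
            h.update(chunk)
        return chunk


reader = HashingReader(io.BytesIO(b'hello world'))
reader.read()
checksums = {"md5": reader.md5.hexdigest(),
             "sha1": reader.sha1.hexdigest(),
             "sha256": reader.sha256.hexdigest()}
print(reader.bytes_read, checksums["sha256"][:12])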
@@ -706,6 +706,27 @@ def save_blob_data(context, blob_data_id, data, session):
    return "sql://" + blob_data.id


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def save_blob_data_batch(context, blobs, session):
    """Perform batch uploading to database."""
    with session.begin():

        locations = []

        # blobs is a list of tuples (blob_data_id, data)
        for blob_data_id, data in blobs:
            blob_data = models.ArtifactBlobData()
            blob_data.id = blob_data_id
            blob_data.data = data.read()
            session.add(blob_data)
            locations.append("sql://" + blob_data.id)

        session.flush()

    return locations


@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def get_blob_data(context, uri, session):
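The batch function above writes all blob rows within one transaction. A rough stdlib sqlite3 sketch of why that is cheaper than one transaction per row (illustrative only; the table and column names are invented, this is not the SQLAlchemy code from the diff):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE blob_data (id TEXT PRIMARY KEY, data BLOB)')

blobs = [('blob-%d' % i, b'payload') for i in range(100)]

# batched: one transaction and one executemany round-trip for all rows
with conn:
    conn.executemany('INSERT INTO blob_data (id, data) VALUES (?, ?)', blobs)

locations = ['sql://' + blob_id for blob_id, _ in blobs]
print(len(locations), locations[0])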
@@ -15,7 +15,6 @@

"""Contains additional file utils that may be useful for upload hooks."""

import io
import os
import tempfile
import zipfile
@@ -25,10 +24,8 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils

from glare.common import exception
from glare.common import store_api
from glare.common import utils
from glare.i18n import _
from glare.objects.meta import fields as glare_fields

CONF = cfg.CONF
@@ -65,6 +62,57 @@ def extract_zip_to_temporary_folder(tfile):
    return tdir


def unpack_zip_archive_to_artifact_folder(context, af, zip_ref, folder_name):
    """Unpack zip archive to artifact folder.

    :param context: user context
    :param af: artifact object
    :param zip_ref: zip archive to be extracted
    :param folder_name: name of the artifact folder where to extract data
    """
    file_dict = {}
    blobs = []
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            blob_id = uuidutils.generate_uuid()
            # create an empty blob instance in db with 'saving' status
            blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
                    'sha256': None, 'status': 'saving', 'id': blob_id,
                    'external': False,
                    'content_type': 'application/octet-stream'}
            file_dict[name] = blob
            blobs.append((blob_id, utils.BlobIterator(zip_ref.read(name))))

    af = af.update_blob(context, af.id, folder_name, file_dict)

    default_store = getattr(
        CONF, 'artifact_type:' + af.get_type_name()).default_store
    # use the global parameter if the default store isn't set per artifact type
    if default_store is None:
        default_store = CONF.glance_store.default_store

    # try to perform blob uploading to storage backend
    try:
        blobs_info = store_api.save_blobs_to_store(
            blobs, context, af.get_max_blob_size(folder_name),
            default_store)
        for name in zip_ref.namelist():
            if not name.endswith('/'):
                location_uri, size, checksums = blobs_info[
                    file_dict[name]['id']]
                # update blob info and activate it
                file_dict[name].update({'url': location_uri,
                                        'status': 'active',
                                        'size': size})
                file_dict[name].update(checksums)
    except Exception:
        # if the upload failed, remove the blobs from db and storage
        with excutils.save_and_reraise_exception(logger=LOG):
            af.update_blob(context, af.id, folder_name, None)

    af.update_blob(context, af.id, folder_name, file_dict)


def upload_content_file(context, af, data, blob_dict, key_name,
                        content_type='application/octet-stream'):
    """Upload a file to a blob dictionary.
@@ -109,31 +157,3 @@ def upload_content_file(context, af, data, blob_dict, key_name,
    blob.update(checksums)
    getattr(af, blob_dict)[key_name] = blob
    af.update_blob(context, af.id, blob_dict, getattr(af, blob_dict))


def unpack_zip_archive_in_memory(context, af, field_name, fd):
    """Unpack zip archive in memory and write its content to artifact folder.

    :param context: user context
    :param af: artifact object
    :param field_name: blob dict name where to unpack the data
    :param fd: zip archive
    :return: io.BytesIO object - simple stream of in-memory bytes
    """
    flobj = io.BytesIO(fd.read(INMEMORY_OBJECT_SIZE_LIMIT))

    # Raise an exception if there is data left beyond the limit
    data = fd.read(1)
    if data:
        msg = _("The zip you are trying to unpack is too big. "
                "The system upper limit is %s") % INMEMORY_OBJECT_SIZE_LIMIT
        raise exception.RequestEntityTooLarge(msg)

    zip_ref = zipfile.ZipFile(flobj, 'r')
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            upload_content_file(
                context, af, utils.BlobIterator(zip_ref.read(name)),
                field_name, name)
    flobj.seek(0)
    return flobj
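A self-contained sketch (stdlib only, invented names) of the archive walk used by unpack_zip_archive_to_artifact_folder above: directory entries end with '/' and are skipped, and every regular file becomes one pending blob keyed by its path inside the archive:

import io
import uuid
import zipfile

# build a small in-memory archive to walk
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('folder1/aaa.txt', b'hello world')
    zf.writestr('folder1/folder2/', b'')          # directory entry, skipped
    zf.writestr('folder1/folder2/bbb.txt', b'hello again')

file_dict = {}
with zipfile.ZipFile(buf, 'r') as zip_ref:
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            file_dict[name] = {'id': str(uuid.uuid4()),
                               'status': 'saving',
                               'size': len(zip_ref.read(name))}

print(sorted(file_dict))   # ['folder1/aaa.txt', 'folder1/folder2/bbb.txt']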
@@ -24,6 +24,10 @@ class DatabaseStoreAPI(base_api.BaseStoreAPI):
        session = db_api.get_session()
        return db_api.save_blob_data(context, blob_id, data, session)

    def add_to_backend_batch(self, blobs, context, verifier=None):
        session = db_api.get_session()
        return db_api.save_blob_data_batch(context, blobs, session)

    def get_from_store(self, uri, context):
        session = db_api.get_session()
        return db_api.get_blob_data(context, uri, session)
@@ -13,6 +13,7 @@
# limitations under the License.

import os
from time import time

from glare.tests.unit import base
@@ -42,3 +43,38 @@ class TestArtifactHooks(base.BaseTestArtifactAPI):
        self.assertEqual(11, artifact['content']['folder1/bbb.txt']['size'])
        self.assertEqual(
            11, artifact['content']['folder1/folder2/ccc.txt']['size'])

    def test_unpacking_database(self):
        self.config(default_store='database',
                    group='artifact_type:unpacking_artifact')
        self.test_unpacking()

    def test_unpacking_big_archive(self):
        var_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                               '../', 'var'))
        data_path = os.path.join(var_dir, 'hooks_100.zip')

        # rough performance check: the batched upload must finish in
        # under 3 seconds
        start = time()
        with open(data_path, "rb") as data:
            self.controller.upload_blob(
                self.req, 'unpacking_artifact', self.unpacking_artifact['id'],
                'zip', data, 'application/octet-stream')
        end = time()
        self.assertIs(True, (end - start) < 3, (end - start))

        artifact = self.controller.show(self.req, 'unpacking_artifact',
                                        self.unpacking_artifact['id'])
        self.assertEqual(15702, artifact['zip']['size'])
        self.assertEqual('active', artifact['zip']['status'])

        self.assertEqual(100, len(artifact['content']))

        for blob in artifact['content'].values():
            self.assertEqual('active', blob['status'])
            self.assertEqual(15, blob['size'])

    def test_unpacking_database_big_archive(self):
        self.config(default_store='database',
                    group='artifact_type:unpacking_artifact')
        self.test_unpacking_big_archive()
@@ -16,7 +16,6 @@ import io
import zipfile

from glare.common import exception
from glare.common import utils
from glare.objects import base
from glare.objects.meta import file_utils
from glare.objects.meta import wrappers
@@ -49,10 +48,9 @@ class Unpacker(base.BaseArtifact):
            raise exception.RequestEntityTooLarge(msg)

        zip_ref = zipfile.ZipFile(flobj, 'r')
        for name in zip_ref.namelist():
            if not name.endswith('/'):
                file_utils.upload_content_file(
                    context, af, utils.BlobIterator(zip_ref.read(name)),
                    'content', name)

        file_utils.unpack_zip_archive_to_artifact_folder(
            context, af, zip_ref, 'content')

        flobj.seek(0)
        return flobj
BIN  glare/tests/var/hooks_100.zip (new binary file, not shown)