Merge "Implement database store for Glare"
This commit is contained in:
commit
63a765168b
@ -11,14 +11,20 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from glance_store import backend
|
||||
from glance_store import exceptions as store_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from glare.common import exception
|
||||
from glare.common import utils
|
||||
from glare.i18n import _LW
|
||||
from glare.store import database
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
database_api = database.DatabaseStoreAPI()
|
||||
|
||||
error_map = [{'catch': store_exc.NotFound,
|
||||
'raise': exception.NotFound},
|
||||
@ -60,15 +66,24 @@ def save_blob_to_store(blob_id, blob, context, max_size,
|
||||
:param blob: blob file iterator
|
||||
:param context: user context
|
||||
:param verifier:signature verified
|
||||
:return: tuple of values: (location_uri, size, checksums, metadata)
|
||||
:return: tuple of values: (location_uri, size, checksums)
|
||||
"""
|
||||
if store_type not in set(CONF.glance_store.stores + ['database']):
|
||||
LOG.warning(_LW("Incorrect backend configuration - scheme '%s' is not"
|
||||
" supported. Fallback to default store.")
|
||||
% store_type)
|
||||
store_type = None
|
||||
data = utils.LimitingReader(utils.CooperativeReader(blob), max_size)
|
||||
(location, size, md5checksum, metadata) = backend.add_to_backend(
|
||||
if store_type == 'database':
|
||||
location = database_api.add_to_backend(
|
||||
blob_id, data.read(None), context, verifier)
|
||||
else:
|
||||
(location, size, md5checksum, __) = backend.add_to_backend(
|
||||
CONF, blob_id, data, 0, store_type, context, verifier)
|
||||
checksums = {"md5": md5checksum,
|
||||
checksums = {"md5": data.md5.hexdigest(),
|
||||
"sha1": data.sha1.hexdigest(),
|
||||
"sha256": data.sha256.hexdigest()}
|
||||
return location, size, checksums
|
||||
return location, data.bytes_read, checksums
|
||||
|
||||
|
||||
@utils.error_handler(error_map)
|
||||
@ -79,6 +94,9 @@ def load_from_store(uri, context):
|
||||
:param context: user context
|
||||
:return: file iterator
|
||||
"""
|
||||
if uri.startswith("sql://"):
|
||||
return utils.BlobIterator(
|
||||
database_api.get_from_store(uri, context))
|
||||
return backend.get_from_backend(uri=uri, context=context)[0]
|
||||
|
||||
|
||||
@ -89,9 +107,6 @@ def delete_blob(uri, context):
|
||||
:param uri: blob uri
|
||||
:param context: user context
|
||||
"""
|
||||
if uri.startswith("sql://"):
|
||||
return database_api.delete_from_store(uri, context)
|
||||
return backend.delete_from_backend(uri, context)
|
||||
|
||||
|
||||
@utils.error_handler(error_map)
|
||||
def get_blob_size(uri, context):
|
||||
return backend.get_size_from_backend(uri, context)
|
||||
|
@ -216,6 +216,7 @@ class LimitingReader(object):
|
||||
self.data = data
|
||||
self.limit = limit
|
||||
self.bytes_read = 0
|
||||
self.md5 = hashlib.md5()
|
||||
self.sha1 = hashlib.sha1()
|
||||
self.sha256 = hashlib.sha256()
|
||||
|
||||
@ -232,6 +233,7 @@ class LimitingReader(object):
|
||||
len_result = len(result)
|
||||
self.bytes_read += len_result
|
||||
if len_result:
|
||||
self.md5.update(result)
|
||||
self.sha1.update(result)
|
||||
self.sha256.update(result)
|
||||
if self.bytes_read > self.limit:
|
||||
@ -642,3 +644,22 @@ class DictDiffer(object):
|
||||
msg += "\tChanged keys: %s\n" % ', '.join(self.changed())
|
||||
msg += "\tUnchanged keys: %s\n" % ', '.join(self.unchanged())
|
||||
return msg
|
||||
|
||||
|
||||
class BlobIterator(object):
|
||||
"""
|
||||
Reads data from a blob, one chunk at a time.
|
||||
"""
|
||||
|
||||
def __init__(self, data, chunk_size=65536):
|
||||
self.chunk_size = chunk_size
|
||||
self.data = data
|
||||
|
||||
def __iter__(self):
|
||||
bytes_left = len(self.data)
|
||||
i = 0
|
||||
while bytes_left > 0:
|
||||
data = self.data[i * self.chunk_size:(i + 1) * self.chunk_size]
|
||||
bytes_left -= len(data)
|
||||
yield data
|
||||
raise StopIteration()
|
||||
|
@ -156,8 +156,6 @@ def get_socket(default_port):
|
||||
"""
|
||||
Bind socket to bind ip:port in conf
|
||||
|
||||
note: Mostly comes from Swift with a few small changes...
|
||||
|
||||
:param default_port: port to bind to if none is specified in conf
|
||||
|
||||
:returns: a socket object as returned from socket.listen or
|
||||
|
@ -630,3 +630,44 @@ def delete_lock(context, lock_id, session):
|
||||
except orm.exc.NoResultFound:
|
||||
msg = _("Cannot delete a lock with id %s.") % lock_id
|
||||
raise exception.NotFound(msg)
|
||||
|
||||
|
||||
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
|
||||
stop_max_attempt_number=50)
|
||||
def save_blob_data(context, blob_data_id, data, session):
|
||||
"""Save blob data to database."""
|
||||
|
||||
blob_data = models.ArtifactBlobData()
|
||||
blob_data.id = blob_data_id
|
||||
blob_data.data = data
|
||||
blob_data.save(session=session)
|
||||
return "sql://" + blob_data.id
|
||||
|
||||
|
||||
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
|
||||
stop_max_attempt_number=50)
|
||||
def get_blob_data(context, uri, session):
|
||||
"""Download blob data from database."""
|
||||
|
||||
blob_data_id = uri[6:]
|
||||
try:
|
||||
blob_data = session.query(
|
||||
models.ArtifactBlobData).filter_by(id=blob_data_id).one()
|
||||
except orm.exc.NoResultFound:
|
||||
msg = _("Cannot find a blob data with id %s.") % blob_data_id
|
||||
raise exception.NotFound(msg)
|
||||
return blob_data.data
|
||||
|
||||
|
||||
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
|
||||
stop_max_attempt_number=50)
|
||||
def delete_blob_data(context, uri, session):
|
||||
"""Delete blob data from database."""
|
||||
|
||||
blob_data_id = uri[6:]
|
||||
try:
|
||||
session.query(
|
||||
models.ArtifactBlobData).filter_by(id=blob_data_id).delete()
|
||||
except orm.exc.NoResultFound:
|
||||
msg = _("Cannot delete a blob data with id %s.") % blob_data_id
|
||||
raise exception.NotFound(msg)
|
||||
|
@ -25,7 +25,7 @@ from glare.common import exception
|
||||
from glare.common import policy
|
||||
from glare.common import store_api
|
||||
from glare.common import utils
|
||||
from glare.i18n import _, _LI, _LW
|
||||
from glare.i18n import _, _LI
|
||||
from glare.notification import Notifier
|
||||
from glare.objects import base
|
||||
from glare.objects.meta import fields as glare_fields
|
||||
@ -329,14 +329,9 @@ class Engine(object):
|
||||
try:
|
||||
default_store = af.get_default_store(
|
||||
context, af, field_name, blob_key)
|
||||
if default_store not in set(CONF.glance_store.stores):
|
||||
LOG.warning(_LW('Incorrect backend configuration - scheme '
|
||||
'"%s" is not supported. Fallback to '
|
||||
'default store.'), default_store)
|
||||
default_store = None
|
||||
location_uri, size, checksums = store_api.save_blob_to_store(
|
||||
blob_id, fd, context, af.get_max_blob_size(field_name),
|
||||
default_store)
|
||||
store_type=default_store)
|
||||
except Exception:
|
||||
# if upload failed remove blob from db and storage
|
||||
with excutils.save_and_reraise_exception(logger=LOG):
|
||||
|
0
glare/store/__init__.py
Normal file
0
glare/store/__init__.py
Normal file
45
glare/store/base_api.py
Normal file
45
glare/store/base_api.py
Normal file
@ -0,0 +1,45 @@
|
||||
# Copyright 2017 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class BaseStoreAPI(object):
|
||||
|
||||
def add_to_backend(self, context, blob_id, data, verifier=None):
|
||||
"""Save data to database store type and return location info
|
||||
|
||||
:param blob_id: id of artifact
|
||||
:param data: file iterator
|
||||
:param context: user context
|
||||
:param verifier:signature verified
|
||||
:return: database location uri
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_from_store(self, uri, context):
|
||||
"""Load file from database store
|
||||
|
||||
:param uri: blob uri
|
||||
:param context: user context
|
||||
:return: file iterator
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def delete_from_store(self, uri, context):
|
||||
"""Delete blob from database store
|
||||
|
||||
:param uri: blob uri
|
||||
:param context: user context
|
||||
"""
|
||||
raise NotImplementedError()
|
33
glare/store/database.py
Normal file
33
glare/store/database.py
Normal file
@ -0,0 +1,33 @@
|
||||
# Copyright 2017 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from glare.db.sqlalchemy import api as db_api
|
||||
from glare.store import base_api
|
||||
|
||||
|
||||
class DatabaseStoreAPI(base_api.BaseStoreAPI):
|
||||
"""Class that stores all data in sql database."""
|
||||
|
||||
def add_to_backend(self, blob_id, data, context, verifier=None):
|
||||
session = db_api.get_session()
|
||||
return db_api.save_blob_data(context, blob_id, data, session)
|
||||
|
||||
def get_from_store(self, uri, context):
|
||||
session = db_api.get_session()
|
||||
return db_api.get_blob_data(context, uri, session)
|
||||
|
||||
def delete_from_store(self, uri, context):
|
||||
session = db_api.get_session()
|
||||
return db_api.delete_blob_data(context, uri, session)
|
172
glare/tests/functional/test_database_store.py
Normal file
172
glare/tests/functional/test_database_store.py
Normal file
@ -0,0 +1,172 @@
|
||||
# Copyright 2017 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import hashlib
|
||||
import requests
|
||||
|
||||
from glare.tests.functional import base
|
||||
|
||||
|
||||
class TestMultiStore(base.TestArtifact):
|
||||
enabled_types = (u'sample_artifact:database',)
|
||||
|
||||
def test_blob_dicts(self):
|
||||
# Getting empty artifact list
|
||||
url = '/sample_artifact'
|
||||
response = self.get(url=url, status=200)
|
||||
expected = {'first': '/artifacts/sample_artifact',
|
||||
'sample_artifact': [],
|
||||
'schema': '/schemas/sample_artifact'}
|
||||
self.assertEqual(expected, response)
|
||||
|
||||
# Create a test artifact
|
||||
art = self.create_artifact(status=201,
|
||||
data={'name': 'test',
|
||||
'version': '1.0',
|
||||
'string_required': '123'})
|
||||
self.assertIsNotNone(art['id'])
|
||||
|
||||
# Get the artifact which should have a generated id and status
|
||||
# 'drafted'
|
||||
url = '/sample_artifact/%s' % art['id']
|
||||
art_1 = self.get(url=url, status=200)
|
||||
self.assertIsNotNone(art_1['id'])
|
||||
self.assertEqual('drafted', art_1['status'])
|
||||
|
||||
# Upload data to blob dict
|
||||
headers = {'Content-Type': 'application/octet-stream'}
|
||||
data = "data" * 100
|
||||
|
||||
self.put(url=url + '/dict_of_blobs/new_blob',
|
||||
data=data, status=200, headers=headers)
|
||||
|
||||
# Download data from blob dict
|
||||
self.assertEqual(data,
|
||||
self.get(url=url + '/dict_of_blobs/new_blob',
|
||||
status=200))
|
||||
|
||||
# download blob from undefined dict property
|
||||
self.get(url=url + '/not_a_dict/not_a_blob', status=400)
|
||||
|
||||
def test_blob_upload(self):
|
||||
# create artifact with blob
|
||||
data = 'data'
|
||||
self.create_artifact(
|
||||
data={'name': 'test_af', 'blob': data,
|
||||
'version': '0.0.1'}, status=400)
|
||||
art = self.create_artifact(data={'name': 'test_af',
|
||||
'version': '0.0.1',
|
||||
'string_required': 'test'})
|
||||
url = '/sample_artifact/%s' % art['id']
|
||||
headers = {'Content-Type': 'application/octet-stream'}
|
||||
|
||||
# upload to non-existing property
|
||||
self.put(url=url + '/blob_non_exist', data=data, status=400,
|
||||
headers=headers)
|
||||
|
||||
# upload too big value
|
||||
big_data = "this is the smallest big data"
|
||||
self.put(url=url + '/small_blob', data=big_data, status=413,
|
||||
headers=headers)
|
||||
# upload correct blob value
|
||||
self.put(url=url + '/small_blob', data=big_data[:2], headers=headers)
|
||||
|
||||
# Upload artifact via different user
|
||||
self.set_user('user2')
|
||||
self.put(url=url + '/blob', data=data, status=404,
|
||||
headers=headers)
|
||||
|
||||
# Upload file to the artifact
|
||||
self.set_user('user1')
|
||||
art = self.put(url=url + '/blob', data=data, status=200,
|
||||
headers=headers)
|
||||
self.assertEqual('active', art['blob']['status'])
|
||||
self.assertEqual('application/octet-stream',
|
||||
art['blob']['content_type'])
|
||||
self.assertIn('url', art['blob'])
|
||||
self.assertNotIn('id', art['blob'])
|
||||
|
||||
# reUpload file to artifact
|
||||
self.put(url=url + '/blob', data=data, status=409,
|
||||
headers=headers)
|
||||
# upload blob dict
|
||||
self.put(url + '/dict_of_blobs/test_key', data=data, headers=headers)
|
||||
# test re-upload failed
|
||||
self.put(url + '/dict_of_blobs/test_key', data=data, headers=headers,
|
||||
status=409)
|
||||
|
||||
# upload few other blobs to the dict
|
||||
for elem in ('aaa', 'bbb', 'ccc', 'ddd'):
|
||||
self.put(url + '/dict_of_blobs/' + elem, data=data,
|
||||
headers=headers)
|
||||
|
||||
# upload to active artifact
|
||||
self.patch(url, self.make_active)
|
||||
self.put(url + '/dict_of_blobs/key2', data=data, status=403,
|
||||
headers=headers)
|
||||
|
||||
self.delete(url)
|
||||
|
||||
def test_blob_download(self):
|
||||
data = 'some_arbitrary_testing_data'
|
||||
art = self.create_artifact(data={'name': 'test_af',
|
||||
'version': '0.0.1'})
|
||||
url = '/sample_artifact/%s' % art['id']
|
||||
|
||||
# download not uploaded blob
|
||||
self.get(url=url + '/blob', status=400)
|
||||
|
||||
# download blob from not existing artifact
|
||||
self.get(url=url + '1/blob', status=404)
|
||||
|
||||
# download blob from undefined property
|
||||
self.get(url=url + '/not_a_blob', status=400)
|
||||
|
||||
headers = {'Content-Type': 'application/octet-stream'}
|
||||
art = self.put(url=url + '/blob', data=data, status=200,
|
||||
headers=headers)
|
||||
self.assertEqual('active', art['blob']['status'])
|
||||
md5 = hashlib.md5(data.encode('UTF-8')).hexdigest()
|
||||
sha1 = hashlib.sha1(data.encode('UTF-8')).hexdigest()
|
||||
sha256 = hashlib.sha256(data.encode('UTF-8')).hexdigest()
|
||||
self.assertEqual(md5, art['blob']['md5'])
|
||||
self.assertEqual(sha1, art['blob']['sha1'])
|
||||
self.assertEqual(sha256, art['blob']['sha256'])
|
||||
|
||||
# check that content-length is in response
|
||||
response = requests.get(self._url(url + '/blob'),
|
||||
headers=self._headers())
|
||||
self.assertEqual('27', response.headers["content-length"])
|
||||
|
||||
# check that all checksums are in response
|
||||
self.assertEqual('0825587cc011b7e76381b65e19d5ec27',
|
||||
response.headers["Content-MD5"])
|
||||
self.assertEqual('89eb4b969b721ba8c3aff18ad7d69454f651a697',
|
||||
response.headers["X-Openstack-Glare-Content-SHA1"])
|
||||
self.assertEqual('bbfd48c7ec792fc462e58232d4d9f407'
|
||||
'ecefb75cc9e9823336166556b499ea4d',
|
||||
response.headers["X-Openstack-Glare-Content-SHA256"])
|
||||
|
||||
blob_data = self.get(url=url + '/blob')
|
||||
self.assertEqual(data, blob_data)
|
||||
|
||||
# download artifact via admin
|
||||
self.set_user('admin')
|
||||
blob_data = self.get(url=url + '/blob')
|
||||
self.assertEqual(data, blob_data)
|
||||
|
||||
# try to download blob via different user
|
||||
self.set_user('user2')
|
||||
self.get(url=url + '/blob', status=404)
|
Loading…
Reference in New Issue
Block a user