diff --git a/glare/common/store_api.py b/glare/common/store_api.py index 8784646..e91ba02 100644 --- a/glare/common/store_api.py +++ b/glare/common/store_api.py @@ -11,14 +11,20 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + from glance_store import backend from glance_store import exceptions as store_exc from oslo_config import cfg +from oslo_log import log as logging from glare.common import exception from glare.common import utils +from glare.i18n import _LW +from glare.store import database CONF = cfg.CONF +LOG = logging.getLogger(__name__) +database_api = database.DatabaseStoreAPI() error_map = [{'catch': store_exc.NotFound, 'raise': exception.NotFound}, @@ -60,15 +66,24 @@ def save_blob_to_store(blob_id, blob, context, max_size, :param blob: blob file iterator :param context: user context :param verifier:signature verified - :return: tuple of values: (location_uri, size, checksums, metadata) + :return: tuple of values: (location_uri, size, checksums) """ + if store_type not in set(CONF.glance_store.stores + ['database']): + LOG.warning(_LW("Incorrect backend configuration - scheme '%s' is not" + " supported. Fallback to default store.") + % store_type) + store_type = None data = utils.LimitingReader(utils.CooperativeReader(blob), max_size) - (location, size, md5checksum, metadata) = backend.add_to_backend( - CONF, blob_id, data, 0, store_type, context, verifier) - checksums = {"md5": md5checksum, + if store_type == 'database': + location = database_api.add_to_backend( + blob_id, data.read(None), context, verifier) + else: + (location, size, md5checksum, __) = backend.add_to_backend( + CONF, blob_id, data, 0, store_type, context, verifier) + checksums = {"md5": data.md5.hexdigest(), "sha1": data.sha1.hexdigest(), "sha256": data.sha256.hexdigest()} - return location, size, checksums + return location, data.bytes_read, checksums @utils.error_handler(error_map) @@ -79,6 +94,9 @@ def load_from_store(uri, context): :param context: user context :return: file iterator """ + if uri.startswith("sql://"): + return utils.BlobIterator( + database_api.get_from_store(uri, context)) return backend.get_from_backend(uri=uri, context=context)[0] @@ -89,9 +107,6 @@ def delete_blob(uri, context): :param uri: blob uri :param context: user context """ + if uri.startswith("sql://"): + return database_api.delete_from_store(uri, context) return backend.delete_from_backend(uri, context) - - -@utils.error_handler(error_map) -def get_blob_size(uri, context): - return backend.get_size_from_backend(uri, context) diff --git a/glare/common/utils.py b/glare/common/utils.py index e98cac9..c0f2dd8 100644 --- a/glare/common/utils.py +++ b/glare/common/utils.py @@ -216,6 +216,7 @@ class LimitingReader(object): self.data = data self.limit = limit self.bytes_read = 0 + self.md5 = hashlib.md5() self.sha1 = hashlib.sha1() self.sha256 = hashlib.sha256() @@ -232,6 +233,7 @@ class LimitingReader(object): len_result = len(result) self.bytes_read += len_result if len_result: + self.md5.update(result) self.sha1.update(result) self.sha256.update(result) if self.bytes_read > self.limit: @@ -642,3 +644,22 @@ class DictDiffer(object): msg += "\tChanged keys: %s\n" % ', '.join(self.changed()) msg += "\tUnchanged keys: %s\n" % ', '.join(self.unchanged()) return msg + + +class BlobIterator(object): + """ + Reads data from a blob, one chunk at a time. + """ + + def __init__(self, data, chunk_size=65536): + self.chunk_size = chunk_size + self.data = data + + def __iter__(self): + bytes_left = len(self.data) + i = 0 + while bytes_left > 0: + data = self.data[i * self.chunk_size:(i + 1) * self.chunk_size] + bytes_left -= len(data) + yield data + raise StopIteration() diff --git a/glare/common/wsgi.py b/glare/common/wsgi.py index ace0e69..7ce799e 100644 --- a/glare/common/wsgi.py +++ b/glare/common/wsgi.py @@ -156,8 +156,6 @@ def get_socket(default_port): """ Bind socket to bind ip:port in conf - note: Mostly comes from Swift with a few small changes... - :param default_port: port to bind to if none is specified in conf :returns: a socket object as returned from socket.listen or diff --git a/glare/db/sqlalchemy/api.py b/glare/db/sqlalchemy/api.py index eb8604c..c86237f 100644 --- a/glare/db/sqlalchemy/api.py +++ b/glare/db/sqlalchemy/api.py @@ -630,3 +630,44 @@ def delete_lock(context, lock_id, session): except orm.exc.NoResultFound: msg = _("Cannot delete a lock with id %s.") % lock_id raise exception.NotFound(msg) + + +@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500, + stop_max_attempt_number=50) +def save_blob_data(context, blob_data_id, data, session): + """Save blob data to database.""" + + blob_data = models.ArtifactBlobData() + blob_data.id = blob_data_id + blob_data.data = data + blob_data.save(session=session) + return "sql://" + blob_data.id + + +@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500, + stop_max_attempt_number=50) +def get_blob_data(context, uri, session): + """Download blob data from database.""" + + blob_data_id = uri[6:] + try: + blob_data = session.query( + models.ArtifactBlobData).filter_by(id=blob_data_id).one() + except orm.exc.NoResultFound: + msg = _("Cannot find a blob data with id %s.") % blob_data_id + raise exception.NotFound(msg) + return blob_data.data + + +@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500, + stop_max_attempt_number=50) +def delete_blob_data(context, uri, session): + """Delete blob data from database.""" + + blob_data_id = uri[6:] + try: + session.query( + models.ArtifactBlobData).filter_by(id=blob_data_id).delete() + except orm.exc.NoResultFound: + msg = _("Cannot delete a blob data with id %s.") % blob_data_id + raise exception.NotFound(msg) diff --git a/glare/engine.py b/glare/engine.py index fdfdd36..c7ca1ef 100644 --- a/glare/engine.py +++ b/glare/engine.py @@ -25,7 +25,7 @@ from glare.common import exception from glare.common import policy from glare.common import store_api from glare.common import utils -from glare.i18n import _, _LI, _LW +from glare.i18n import _, _LI from glare.notification import Notifier from glare.objects import base from glare.objects.meta import fields as glare_fields @@ -329,14 +329,9 @@ class Engine(object): try: default_store = af.get_default_store( context, af, field_name, blob_key) - if default_store not in set(CONF.glance_store.stores): - LOG.warning(_LW('Incorrect backend configuration - scheme ' - '"%s" is not supported. Fallback to ' - 'default store.'), default_store) - default_store = None location_uri, size, checksums = store_api.save_blob_to_store( blob_id, fd, context, af.get_max_blob_size(field_name), - default_store) + store_type=default_store) except Exception: # if upload failed remove blob from db and storage with excutils.save_and_reraise_exception(logger=LOG): diff --git a/glare/store/__init__.py b/glare/store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/glare/store/base_api.py b/glare/store/base_api.py new file mode 100644 index 0000000..c6d8eb4 --- /dev/null +++ b/glare/store/base_api.py @@ -0,0 +1,45 @@ +# Copyright 2017 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class BaseStoreAPI(object): + + def add_to_backend(self, context, blob_id, data, verifier=None): + """Save data to database store type and return location info + + :param blob_id: id of artifact + :param data: file iterator + :param context: user context + :param verifier:signature verified + :return: database location uri + """ + raise NotImplementedError() + + def get_from_store(self, uri, context): + """Load file from database store + + :param uri: blob uri + :param context: user context + :return: file iterator + """ + raise NotImplementedError() + + def delete_from_store(self, uri, context): + """Delete blob from database store + + :param uri: blob uri + :param context: user context + """ + raise NotImplementedError() diff --git a/glare/store/database.py b/glare/store/database.py new file mode 100644 index 0000000..9edbd5f --- /dev/null +++ b/glare/store/database.py @@ -0,0 +1,33 @@ +# Copyright 2017 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from glare.db.sqlalchemy import api as db_api +from glare.store import base_api + + +class DatabaseStoreAPI(base_api.BaseStoreAPI): + """Class that stores all data in sql database.""" + + def add_to_backend(self, blob_id, data, context, verifier=None): + session = db_api.get_session() + return db_api.save_blob_data(context, blob_id, data, session) + + def get_from_store(self, uri, context): + session = db_api.get_session() + return db_api.get_blob_data(context, uri, session) + + def delete_from_store(self, uri, context): + session = db_api.get_session() + return db_api.delete_blob_data(context, uri, session) diff --git a/glare/tests/functional/test_database_store.py b/glare/tests/functional/test_database_store.py new file mode 100644 index 0000000..cb7a1d1 --- /dev/null +++ b/glare/tests/functional/test_database_store.py @@ -0,0 +1,172 @@ +# Copyright 2017 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import hashlib +import requests + +from glare.tests.functional import base + + +class TestMultiStore(base.TestArtifact): + enabled_types = (u'sample_artifact:database',) + + def test_blob_dicts(self): + # Getting empty artifact list + url = '/sample_artifact' + response = self.get(url=url, status=200) + expected = {'first': '/artifacts/sample_artifact', + 'sample_artifact': [], + 'schema': '/schemas/sample_artifact'} + self.assertEqual(expected, response) + + # Create a test artifact + art = self.create_artifact(status=201, + data={'name': 'test', + 'version': '1.0', + 'string_required': '123'}) + self.assertIsNotNone(art['id']) + + # Get the artifact which should have a generated id and status + # 'drafted' + url = '/sample_artifact/%s' % art['id'] + art_1 = self.get(url=url, status=200) + self.assertIsNotNone(art_1['id']) + self.assertEqual('drafted', art_1['status']) + + # Upload data to blob dict + headers = {'Content-Type': 'application/octet-stream'} + data = "data" * 100 + + self.put(url=url + '/dict_of_blobs/new_blob', + data=data, status=200, headers=headers) + + # Download data from blob dict + self.assertEqual(data, + self.get(url=url + '/dict_of_blobs/new_blob', + status=200)) + + # download blob from undefined dict property + self.get(url=url + '/not_a_dict/not_a_blob', status=400) + + def test_blob_upload(self): + # create artifact with blob + data = 'data' + self.create_artifact( + data={'name': 'test_af', 'blob': data, + 'version': '0.0.1'}, status=400) + art = self.create_artifact(data={'name': 'test_af', + 'version': '0.0.1', + 'string_required': 'test'}) + url = '/sample_artifact/%s' % art['id'] + headers = {'Content-Type': 'application/octet-stream'} + + # upload to non-existing property + self.put(url=url + '/blob_non_exist', data=data, status=400, + headers=headers) + + # upload too big value + big_data = "this is the smallest big data" + self.put(url=url + '/small_blob', data=big_data, status=413, + headers=headers) + # upload correct blob value + self.put(url=url + '/small_blob', data=big_data[:2], headers=headers) + + # Upload artifact via different user + self.set_user('user2') + self.put(url=url + '/blob', data=data, status=404, + headers=headers) + + # Upload file to the artifact + self.set_user('user1') + art = self.put(url=url + '/blob', data=data, status=200, + headers=headers) + self.assertEqual('active', art['blob']['status']) + self.assertEqual('application/octet-stream', + art['blob']['content_type']) + self.assertIn('url', art['blob']) + self.assertNotIn('id', art['blob']) + + # reUpload file to artifact + self.put(url=url + '/blob', data=data, status=409, + headers=headers) + # upload blob dict + self.put(url + '/dict_of_blobs/test_key', data=data, headers=headers) + # test re-upload failed + self.put(url + '/dict_of_blobs/test_key', data=data, headers=headers, + status=409) + + # upload few other blobs to the dict + for elem in ('aaa', 'bbb', 'ccc', 'ddd'): + self.put(url + '/dict_of_blobs/' + elem, data=data, + headers=headers) + + # upload to active artifact + self.patch(url, self.make_active) + self.put(url + '/dict_of_blobs/key2', data=data, status=403, + headers=headers) + + self.delete(url) + + def test_blob_download(self): + data = 'some_arbitrary_testing_data' + art = self.create_artifact(data={'name': 'test_af', + 'version': '0.0.1'}) + url = '/sample_artifact/%s' % art['id'] + + # download not uploaded blob + self.get(url=url + '/blob', status=400) + + # download blob from not existing artifact + self.get(url=url + '1/blob', status=404) + + # download blob from undefined property + self.get(url=url + '/not_a_blob', status=400) + + headers = {'Content-Type': 'application/octet-stream'} + art = self.put(url=url + '/blob', data=data, status=200, + headers=headers) + self.assertEqual('active', art['blob']['status']) + md5 = hashlib.md5(data.encode('UTF-8')).hexdigest() + sha1 = hashlib.sha1(data.encode('UTF-8')).hexdigest() + sha256 = hashlib.sha256(data.encode('UTF-8')).hexdigest() + self.assertEqual(md5, art['blob']['md5']) + self.assertEqual(sha1, art['blob']['sha1']) + self.assertEqual(sha256, art['blob']['sha256']) + + # check that content-length is in response + response = requests.get(self._url(url + '/blob'), + headers=self._headers()) + self.assertEqual('27', response.headers["content-length"]) + + # check that all checksums are in response + self.assertEqual('0825587cc011b7e76381b65e19d5ec27', + response.headers["Content-MD5"]) + self.assertEqual('89eb4b969b721ba8c3aff18ad7d69454f651a697', + response.headers["X-Openstack-Glare-Content-SHA1"]) + self.assertEqual('bbfd48c7ec792fc462e58232d4d9f407' + 'ecefb75cc9e9823336166556b499ea4d', + response.headers["X-Openstack-Glare-Content-SHA256"]) + + blob_data = self.get(url=url + '/blob') + self.assertEqual(data, blob_data) + + # download artifact via admin + self.set_user('admin') + blob_data = self.get(url=url + '/blob') + self.assertEqual(data, blob_data) + + # try to download blob via different user + self.set_user('user2') + self.get(url=url + '/blob', status=404)