Implement database store for Glare

Change-Id: I3aa0207e8649b8724692b27fade94dff5744bb98
This commit is contained in:
Mike Fedosin 2017-01-10 17:25:52 +03:00
parent c597ed074f
commit 7a1642c7f4
9 changed files with 339 additions and 19 deletions

View File

@ -11,14 +11,20 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance_store import backend
from glance_store import exceptions as store_exc
from oslo_config import cfg
from oslo_log import log as logging
from glare.common import exception
from glare.common import utils
from glare.i18n import _LW
from glare.store import database
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
database_api = database.DatabaseStoreAPI()
error_map = [{'catch': store_exc.NotFound,
'raise': exception.NotFound},
@ -60,15 +66,24 @@ def save_blob_to_store(blob_id, blob, context, max_size,
:param blob: blob file iterator
:param context: user context
:param verifier:signature verified
:return: tuple of values: (location_uri, size, checksums, metadata)
:return: tuple of values: (location_uri, size, checksums)
"""
if store_type not in set(CONF.glance_store.stores + ['database']):
LOG.warning(_LW("Incorrect backend configuration - scheme '%s' is not"
" supported. Fallback to default store.")
% store_type)
store_type = None
data = utils.LimitingReader(utils.CooperativeReader(blob), max_size)
(location, size, md5checksum, metadata) = backend.add_to_backend(
CONF, blob_id, data, 0, store_type, context, verifier)
checksums = {"md5": md5checksum,
if store_type == 'database':
location = database_api.add_to_backend(
blob_id, data.read(None), context, verifier)
else:
(location, size, md5checksum, __) = backend.add_to_backend(
CONF, blob_id, data, 0, store_type, context, verifier)
checksums = {"md5": data.md5.hexdigest(),
"sha1": data.sha1.hexdigest(),
"sha256": data.sha256.hexdigest()}
return location, size, checksums
return location, data.bytes_read, checksums
@utils.error_handler(error_map)
@ -79,6 +94,9 @@ def load_from_store(uri, context):
:param context: user context
:return: file iterator
"""
if uri.startswith("sql://"):
return utils.BlobIterator(
database_api.get_from_store(uri, context))
return backend.get_from_backend(uri=uri, context=context)[0]
@ -89,9 +107,6 @@ def delete_blob(uri, context):
:param uri: blob uri
:param context: user context
"""
if uri.startswith("sql://"):
return database_api.delete_from_store(uri, context)
return backend.delete_from_backend(uri, context)
@utils.error_handler(error_map)
def get_blob_size(uri, context):
    """Return the size in bytes of the blob stored at *uri*.

    :param uri: blob uri
    :param context: user context
    :return: blob size in bytes
    """
    # Blobs kept in the sql database are not visible to glance_store, so
    # they must be handled here, mirroring load_from_store/delete_blob.
    # NOTE: this loads the whole blob to measure it; acceptable because
    # database-backed blobs are size-limited by design — TODO confirm.
    if uri.startswith("sql://"):
        return len(database_api.get_from_store(uri, context))
    return backend.get_size_from_backend(uri, context)

View File

@ -216,6 +216,7 @@ class LimitingReader(object):
self.data = data
self.limit = limit
self.bytes_read = 0
self.md5 = hashlib.md5()
self.sha1 = hashlib.sha1()
self.sha256 = hashlib.sha256()
@ -232,6 +233,7 @@ class LimitingReader(object):
len_result = len(result)
self.bytes_read += len_result
if len_result:
self.md5.update(result)
self.sha1.update(result)
self.sha256.update(result)
if self.bytes_read > self.limit:
@ -642,3 +644,22 @@ class DictDiffer(object):
msg += "\tChanged keys: %s\n" % ', '.join(self.changed())
msg += "\tUnchanged keys: %s\n" % ', '.join(self.unchanged())
return msg
class BlobIterator(object):
    """Reads data from an in-memory blob, one chunk at a time.

    :param data: the full blob payload (bytes or str)
    :param chunk_size: maximum size of each yielded chunk
    """

    def __init__(self, data, chunk_size=65536):
        self.chunk_size = chunk_size
        self.data = data

    def __iter__(self):
        # The previous implementation never advanced its chunk index, so it
        # yielded the FIRST chunk repeatedly, and its trailing
        # ``raise StopIteration()`` becomes a RuntimeError inside a
        # generator under PEP 479 (Python 3.7+).  Iterating by offset
        # fixes both: each slice is a distinct chunk and the generator
        # terminates by simply returning.
        for offset in range(0, len(self.data), self.chunk_size):
            yield self.data[offset:offset + self.chunk_size]

View File

@ -156,8 +156,6 @@ def get_socket(default_port):
"""
Bind socket to bind ip:port in conf
note: Mostly comes from Swift with a few small changes...
:param default_port: port to bind to if none is specified in conf
:returns: a socket object as returned from socket.listen or

View File

@ -630,3 +630,44 @@ def delete_lock(context, lock_id, session):
except orm.exc.NoResultFound:
msg = _("Cannot delete a lock with id %s.") % lock_id
raise exception.NotFound(msg)
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def save_blob_data(context, blob_data_id, data, session):
    """Persist blob data in the database.

    :param context: user context
    :param blob_data_id: id under which the data is stored
    :param data: raw blob payload
    :param session: database session to save within
    :return: location uri of the stored blob ("sql://<id>")
    """
    record = models.ArtifactBlobData()
    record.id = blob_data_id
    record.data = data
    record.save(session=session)
    return "sql://" + record.id
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def get_blob_data(context, uri, session):
    """Download blob data from database.

    :param context: user context
    :param uri: blob location uri of the form "sql://<id>"
    :param session: database session to query within
    :return: raw blob payload
    :raises: exception.NotFound if no row matches the id
    """
    # Drop the "sql://" scheme prefix (6 characters) to get the record id.
    record_id = uri[6:]
    query = session.query(models.ArtifactBlobData).filter_by(id=record_id)
    try:
        row = query.one()
    except orm.exc.NoResultFound:
        msg = _("Cannot find a blob data with id %s.") % record_id
        raise exception.NotFound(msg)
    return row.data
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def delete_blob_data(context, uri, session):
    """Delete blob data from database.

    :param context: user context
    :param uri: blob location uri of the form "sql://<id>"
    :param session: database session to delete within
    :raises: exception.NotFound if no row with that id exists
    """
    blob_data_id = uri[6:]
    # Query.delete() returns the number of rows matched and never raises
    # NoResultFound, so the former try/except here was dead code and a
    # missing blob was silently ignored.  Check the count instead.
    deleted = session.query(
        models.ArtifactBlobData).filter_by(id=blob_data_id).delete()
    if not deleted:
        msg = _("Cannot delete a blob data with id %s.") % blob_data_id
        raise exception.NotFound(msg)

View File

@ -25,7 +25,7 @@ from glare.common import exception
from glare.common import policy
from glare.common import store_api
from glare.common import utils
from glare.i18n import _, _LI, _LW
from glare.i18n import _, _LI
from glare.notification import Notifier
from glare.objects import base
from glare.objects.meta import fields as glare_fields
@ -329,14 +329,9 @@ class Engine(object):
try:
default_store = af.get_default_store(
context, af, field_name, blob_key)
if default_store not in set(CONF.glance_store.stores):
LOG.warning(_LW('Incorrect backend configuration - scheme '
'"%s" is not supported. Fallback to '
'default store.'), default_store)
default_store = None
location_uri, size, checksums = store_api.save_blob_to_store(
blob_id, fd, context, af.get_max_blob_size(field_name),
default_store)
store_type=default_store)
except Exception:
# if upload failed remove blob from db and storage
with excutils.save_and_reraise_exception(logger=LOG):

0
glare/store/__init__.py Normal file
View File

45
glare/store/base_api.py Normal file
View File

@ -0,0 +1,45 @@
# Copyright 2017 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class BaseStoreAPI(object):
    """Interface that every Glare store backend must implement."""

    def add_to_backend(self, blob_id, data, context, verifier=None):
        """Save data to the store and return its location info.

        Parameter order matches the concrete implementations and call
        sites (blob_id, data, context); the previous signature listed
        context first, which contradicted DatabaseStoreAPI.

        :param blob_id: id of the blob
        :param data: blob data to store
        :param context: user context
        :param verifier: signature verifier
        :return: location uri of the stored blob
        """
        raise NotImplementedError()

    def get_from_store(self, uri, context):
        """Load blob data from the store.

        :param uri: blob uri
        :param context: user context
        :return: blob data
        """
        raise NotImplementedError()

    def delete_from_store(self, uri, context):
        """Delete blob data from the store.

        :param uri: blob uri
        :param context: user context
        """
        raise NotImplementedError()

33
glare/store/database.py Normal file
View File

@ -0,0 +1,33 @@
# Copyright 2017 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glare.db.sqlalchemy import api as db_api
from glare.store import base_api
class DatabaseStoreAPI(base_api.BaseStoreAPI):
    """Store implementation that keeps all blob data in the sql database.

    Each operation opens a fresh db session and delegates to the
    corresponding glare.db.sqlalchemy.api helper.
    """

    def add_to_backend(self, blob_id, data, context, verifier=None):
        # ``verifier`` is accepted for interface compatibility; the db
        # helper does not take it.
        return db_api.save_blob_data(
            context, blob_id, data, db_api.get_session())

    def get_from_store(self, uri, context):
        return db_api.get_blob_data(context, uri, db_api.get_session())

    def delete_from_store(self, uri, context):
        return db_api.delete_blob_data(context, uri, db_api.get_session())

View File

@ -0,0 +1,172 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import requests
from glare.tests.functional import base
class TestMultiStore(base.TestArtifact):
enabled_types = (u'sample_artifact:database',)
def test_blob_dicts(self):
    """Check upload and download of blobs held in a dict-of-blobs field."""
    # Getting empty artifact list
    url = '/sample_artifact'
    response = self.get(url=url, status=200)
    expected = {'first': '/artifacts/sample_artifact',
                'sample_artifact': [],
                'schema': '/schemas/sample_artifact'}
    self.assertEqual(expected, response)

    # Create a test artifact
    art = self.create_artifact(status=201,
                               data={'name': 'test',
                                     'version': '1.0',
                                     'string_required': '123'})
    self.assertIsNotNone(art['id'])

    # Get the artifact which should have a generated id and status
    # 'drafted'
    url = '/sample_artifact/%s' % art['id']
    art_1 = self.get(url=url, status=200)
    self.assertIsNotNone(art_1['id'])
    self.assertEqual('drafted', art_1['status'])

    # Upload data to blob dict under the key 'new_blob'
    headers = {'Content-Type': 'application/octet-stream'}
    data = "data" * 100
    self.put(url=url + '/dict_of_blobs/new_blob',
             data=data, status=200, headers=headers)

    # Download data from blob dict and verify the round trip is lossless
    self.assertEqual(data,
                     self.get(url=url + '/dict_of_blobs/new_blob',
                              status=200))

    # download blob from undefined dict property must fail with 400
    self.get(url=url + '/not_a_dict/not_a_blob', status=400)
def test_blob_upload(self):
    """Exercise blob upload paths: bad fields, quotas, ownership, re-upload."""
    # create artifact with blob: inline blob data in create is rejected
    data = 'data'
    self.create_artifact(
        data={'name': 'test_af', 'blob': data,
              'version': '0.0.1'}, status=400)
    art = self.create_artifact(data={'name': 'test_af',
                                     'version': '0.0.1',
                                     'string_required': 'test'})
    url = '/sample_artifact/%s' % art['id']
    headers = {'Content-Type': 'application/octet-stream'}

    # upload to non-existing property must fail with 400
    self.put(url=url + '/blob_non_exist', data=data, status=400,
             headers=headers)

    # upload too big value: exceeds small_blob's max size -> 413
    big_data = "this is the smallest big data"
    self.put(url=url + '/small_blob', data=big_data, status=413,
             headers=headers)
    # upload correct blob value (a 2-byte prefix fits the limit)
    self.put(url=url + '/small_blob', data=big_data[:2], headers=headers)

    # Upload artifact via different user: hidden from user2 -> 404
    self.set_user('user2')
    self.put(url=url + '/blob', data=data, status=404,
             headers=headers)

    # Upload file to the artifact as the owner
    self.set_user('user1')
    art = self.put(url=url + '/blob', data=data, status=200,
                   headers=headers)
    self.assertEqual('active', art['blob']['status'])
    self.assertEqual('application/octet-stream',
                     art['blob']['content_type'])
    self.assertIn('url', art['blob'])
    # internal blob id must not leak into the API representation
    self.assertNotIn('id', art['blob'])

    # reUpload file to artifact: overwriting an uploaded blob -> 409
    self.put(url=url + '/blob', data=data, status=409,
             headers=headers)

    # upload blob dict
    self.put(url + '/dict_of_blobs/test_key', data=data, headers=headers)
    # test re-upload to the same dict key failed with 409
    self.put(url + '/dict_of_blobs/test_key', data=data, headers=headers,
             status=409)

    # upload few other blobs to the dict
    for elem in ('aaa', 'bbb', 'ccc', 'ddd'):
        self.put(url + '/dict_of_blobs/' + elem, data=data,
                 headers=headers)

    # upload to active artifact is forbidden -> 403
    self.patch(url, self.make_active)
    self.put(url + '/dict_of_blobs/key2', data=data, status=403,
             headers=headers)

    self.delete(url)
def test_blob_download(self):
data = 'some_arbitrary_testing_data'
art = self.create_artifact(data={'name': 'test_af',
'version': '0.0.1'})
url = '/sample_artifact/%s' % art['id']
# download not uploaded blob
self.get(url=url + '/blob', status=400)
# download blob from not existing artifact
self.get(url=url + '1/blob', status=404)
# download blob from undefined property
self.get(url=url + '/not_a_blob', status=400)
headers = {'Content-Type': 'application/octet-stream'}
art = self.put(url=url + '/blob', data=data, status=200,
headers=headers)
self.assertEqual('active', art['blob']['status'])
md5 = hashlib.md5(data.encode('UTF-8')).hexdigest()
sha1 = hashlib.sha1(data.encode('UTF-8')).hexdigest()
sha256 = hashlib.sha256(data.encode('UTF-8')).hexdigest()
self.assertEqual(md5, art['blob']['md5'])
self.assertEqual(sha1, art['blob']['sha1'])
self.assertEqual(sha256, art['blob']['sha256'])
# check that content-length is in response
response = requests.get(self._url(url + '/blob'),
headers=self._headers())
self.assertEqual('27', response.headers["content-length"])
# check that all checksums are in response
self.assertEqual('0825587cc011b7e76381b65e19d5ec27',
response.headers["Content-MD5"])
self.assertEqual('89eb4b969b721ba8c3aff18ad7d69454f651a697',
response.headers["X-Openstack-Glare-Content-SHA1"])
self.assertEqual('bbfd48c7ec792fc462e58232d4d9f407'
'ecefb75cc9e9823336166556b499ea4d',
response.headers["X-Openstack-Glare-Content-SHA256"])
blob_data = self.get(url=url + '/blob')
self.assertEqual(data, blob_data)
# download artifact via admin
self.set_user('admin')
blob_data = self.get(url=url + '/blob')
self.assertEqual(data, blob_data)
# try to download blob via different user
self.set_user('user2')
self.get(url=url + '/blob', status=404)